www.digitalmars.com

digitalmars.D - Parallel ByLine, ByChunk?

dsimcha <dsimcha yahoo.com> writes:
It's sometimes useful to read a file by line or by chunk such that the next
element is fetched in a background thread while the current one is being used.
std.parallelism.asyncBuf almost fits the bill here, except that, since the
buffers in std.stdio.File.ByLine and ByChunk are recycled, each element has to
be duplicated before asyncBuf buffers it.  This extra copy is inefficient.
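To make the duplication problem concrete, here is a minimal D sketch. RecyclingRange is a hypothetical stand-in for File.byLine (it is not a Phobos type): front() always returns a slice of the same buffer, and popFront() overwrites that buffer. Collecting the slices without copying leaves every element aliasing the one buffer, so the usual workaround copies each element with map!(s => s.idup) before taskPool.asyncBuf reads ahead. That per-element copy is what a buffer-recycling design would avoid.

```d
import std.algorithm : map;
import std.array : array;
import std.parallelism : taskPool;

/// Hypothetical stand-in for File.byLine: front always returns a slice of
/// the same buffer, and popFront overwrites that buffer with the next element.
struct RecyclingRange
{
    private char[] buf;   // the single recycled buffer
    private int i, n;     // current index and element count

    this(int n)
    {
        this.n = n;
        buf = new char[](1);
        fill();
    }

    private void fill()
    {
        if (i < n)
            buf[0] = cast(char)('a' + i);   // element i is "a", "b", "c", ...
    }

    @property bool empty() const { return i >= n; }
    @property char[] front() { return buf; }
    void popFront() { ++i; fill(); }
}

void main()
{
    import std.stdio : writeln;

    // Keeping the slices without copying: every element aliases one buffer.
    writeln(RecyclingRange(3).array);

    // The workaround: copy each element before asyncBuf buffers it.
    writeln(taskPool.asyncBuf(RecyclingRange(3).map!(s => s.idup), 2).array);
}
```

The first line collects three slices of the same buffer, all showing the last element written; the second gets the expected sequence, paid for with one allocation per element.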

Is there any interest in a ParallelByChunk and ParallelByLine in
std.stdio.File?  These would keep a pool of nBuffers recycled buffers, where
nBuffers is specified by the user.  A background thread would read ahead
until all buffers are full, then wait on a condition variable.  When
popFront() has been called several times and a large number of buffers are
free again, the background thread would be woken to fill the newly freed
buffers.  All of this would happen in parallel with the client using each
chunk/line obtained by front(), and it would be fully encapsulated and safe,
even though the implementation would probably involve some low-level
concurrency (i.e. core.thread and core.sync instead of std.concurrency or
std.parallelism).
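A minimal sketch of the proposed design, assuming core.thread and core.sync as described. ReadAhead and its fill delegate (which refills a buffer in place and returns false at end of input) are hypothetical names, not a Phobos API, and an in-memory source stands in for a file. One simplification: the background thread is woken on every popFront() rather than only once a large number of buffers are free.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

/// Hypothetical read-ahead range: a background thread fills a ring of
/// nBuffers recycled buffers while the caller consumes them in order.
final class ReadAhead(T)
{
    private T[][] bufs;                   // recycled buffers, used as a ring
    private size_t produced, consumed;    // buffers filled / released so far
    private bool finished;                // the source is exhausted
    private Mutex mtx;
    private Condition cond;
    private bool delegate(ref T[]) fill;  // refills a buffer; false = no more data

    this(bool delegate(ref T[]) fill, size_t nBuffers)
    {
        assert(nBuffers > 0);
        this.fill = fill;
        bufs.length = nBuffers;
        mtx = new Mutex;
        cond = new Condition(mtx);
        auto t = new Thread(&produce);
        t.isDaemon = true;   // don't block program exit if the caller stops early
        t.start();
    }

    private void produce()
    {
        for (;;)
        {
            size_t slot;
            synchronized (mtx)
            {
                // All buffers full: sleep until popFront releases one.
                while (produced == consumed + bufs.length)
                    cond.wait();
                slot = produced % bufs.length;   // never the slot front() exposes
            }
            // Fill outside the lock so the caller keeps working meanwhile.
            immutable more = fill(bufs[slot]);
            synchronized (mtx)
            {
                if (more) ++produced;
                else finished = true;
                cond.notifyAll();
            }
            if (!more) return;
        }
    }

    // Caller must hold mtx: wait until front is ready or the source is done.
    private void waitForData()
    {
        while (produced == consumed && !finished)
            cond.wait();
    }

    @property bool empty()
    {
        synchronized (mtx) { waitForData(); return produced == consumed; }
    }

    /// The returned slice may be overwritten any time after the next popFront.
    @property T[] front()
    {
        synchronized (mtx) { waitForData(); return bufs[consumed % bufs.length]; }
    }

    void popFront()
    {
        synchronized (mtx) { ++consumed; cond.notifyAll(); }
    }
}

void main()
{
    import std.stdio : writeln;

    string[] lines = ["alpha", "beta", "gamma", "delta"];
    size_t next;
    // Hypothetical source: copy the next line into the recycled buffer.
    bool fillLine(ref char[] buf)
    {
        if (next == lines.length) return false;
        buf.length = lines[next].length;
        buf[] = lines[next][];
        ++next;
        return true;
    }

    foreach (line; new ReadAhead!char(&fillLine, 3))
        writeln(line);
}
```

The slot arithmetic (produced % nBuffers versus consumed % nBuffers) is what keeps the buffer behind front() from being refilled until popFront() releases it.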

If there is interest, does anyone have any suggestions for making this buffer
recycling range solution more general instead of such an ad-hoc solution that
requires re-writing byChunk and byLine and using low-level concurrency?  I'd
like to encapsulate this pattern and put it in std.parallelism somehow, but I
can't think of an easy way.
Aug 09 2011
Jonathan M Davis <jmdavisProg gmx.com> writes:
On Tuesday, August 09, 2011 18:58:22 dsimcha wrote:
 Is there any interest in a ParallelByChunk and ParallelByLine in
 std.stdio.File?
I suspect that this is what the OP of the "Modify thread-local storage from parent thread" thread in D.Learn is looking for.

- Jonathan M Davis
Aug 09 2011
Graham Fawcett <fawcett uwindsor.ca> writes:
On Tue, 09 Aug 2011 18:58:22 +0000, dsimcha wrote:
 Is there any interest in a ParallelByChunk and ParallelByLine in
 std.stdio.File? These would have a set of some user specified size
 nBuffers of buffers, which would be recycled. A background thread
 would read ahead until all buffers are full, then wait on a
 condition variable. When popFront() has been called several times
 and a large number of buffers are available, the background thread
 would be woken to fill the newly available buffers.
Interesting. A possible use-case would be reading from a set of files, say, a directory full of log files which need to be analyzed line-by-line. So a reader that accepted a range of files might be handy. (I guess that the file concatenation could be handled at the OS level, with "cat" and a pipe, but a range-of-files would be more convenient.)

On a tangent, the first thing that came to mind when you mentioned this was Tim Bray's WideFinder benchmark:

   http://wikis.sun.com/display/WideFinder/Wide+Finder+Home

ParallelByLine would make for a great WideFinder implementation.

Best,
Graham
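The range-of-files idea can be sketched with existing Phobos pieces, assuming a Phobos recent enough to have File.byLineCopy and std.algorithm's cache. linesOfFiles is a hypothetical helper name. Because byLineCopy allocates a new string per line, this version sidesteps buffer recycling entirely, at the cost of one allocation per line.

```d
import std.algorithm : cache, joiner, map;
import std.parallelism : taskPool;
import std.stdio : File, writeln;

/// Hypothetical helper: lazily concatenate the lines of several files, like
/// `cat a.log b.log`, without leaving D.
auto linesOfFiles(R)(R fileNames)
{
    return fileNames
        .map!(name => File(name).byLineCopy)  // one line range per file
        .cache                                // open each file only once
        .joiner;                              // flatten into one line range
}

void main(string[] args)
{
    // byLineCopy hands out a fresh string per line (nothing is recycled),
    // so asyncBuf can safely read ahead in a worker thread.
    foreach (line; taskPool.asyncBuf(linesOfFiles(args[1 .. $])))
        writeln(line);
}
```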
Aug 09 2011
dsimcha <dsimcha yahoo.com> writes:
On 8/9/2011 3:14 PM, Graham Fawcett wrote:
 ParallelByLine would make for a great WideFinder implementation.
I made a pull request (https://github.com/D-Programming-Language/phobos/pull/179) for another overload of asyncBuf in std.parallelism that will make parallel byLine or byChunk trivial to implement (reading lines in parallel is actually the documentation example). I'm not sure whether it's worth adding an actual ParallelByLine to std.stdio, given how trivial the new asyncBuf overload makes it. At any rate, I'd rather get the lower level infrastructure finalized and into Phobos before I build the higher level stuff on top of it.
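For reference, a sketch of reading lines through the delegate-taking overload of TaskPool.asyncBuf, modeled on its documentation example and assuming the overload that ended up in std.parallelism (a next delegate that takes a buffer by ref, plus an empty delegate). readAllLines is a hypothetical wrapper that collects the lines only so the behavior is easy to check; real code would process each line inside the loop.

```d
import std.parallelism : taskPool;
import std.stdio : File, writeln;

/// Hypothetical wrapper: reads every line of `file` through the next/empty
/// delegate overload of TaskPool.asyncBuf. `next` refills a caller-owned,
/// recycled buffer, so each line must be copied (.idup) to be kept.
string[] readAllLines(File file)
{
    void next(ref char[] buf)
    {
        file.readln(buf);
        if (buf.length && buf[$ - 1] == '\n')
            buf = buf[0 .. $ - 1];   // drop the line terminator
    }

    string[] result;
    foreach (line; taskPool.asyncBuf(&next, &file.eof))
        result ~= line.idup;         // copy: the buffer will be reused
    return result;
}

void main(string[] args)
{
    foreach (line; readAllLines(File(args[1])))
        writeln(line);
}
```

One caveat, assuming File.eof mirrors C's feof: eof only turns true after a read has actually hit end-of-file, so a file that ends in a newline will likely produce one extra empty line at the end.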
Aug 09 2011
dsimcha <dsimcha yahoo.com> writes:
Ok, I've actually managed to come up with a good way to do this without writing
any new low-level concurrency code.  The idea is to create two delegates:
nextDel reads the next element into a user-provided buffer, and emptyDel tests
whether there's anything left to read.  RoundRobinBuffer then turns this pair
into a range that cycles through a fixed set of buffers, and
std.parallelism.asyncBuf fills the first half of the buffers in the background
while the caller reads the second half, and vice versa.  This pattern can be
encapsulated as an overload of TaskPool.asyncBuf with a signature like:

auto asyncBuf(T)(
    void delegate(ref T[]) nextDel,
    bool delegate() emptyDel,
    size_t nBuffers
);

Prototype/demo code:

import std.stdio;

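// Input range over a fixed pool of recycled buffers.  nextDel refills the
// current buffer in place; emptyDel reports whether the source is exhausted.
struct RoundRobinBuffer(T) {
private:
    T[][] bufs;     // the recycled buffers
    size_t index;   // buffer currently exposed by front()
    void delegate(ref T[]) nextDel;
    bool delegate() emptyDel;
    bool _empty;
    bool primed;    // true once bufs[index] holds the current element

    // Lazily read the current element into bufs[index].
    void prime() {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }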

public:

    this(
        void delegate(ref T[]) nextDel,
        bool delegate() emptyDel,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;
    }

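    // Returns a slice of a recycled buffer; it stays valid only until the
    // ring wraps around to the same buffer again.
    T[] front() @property {
        if(!primed) prime();
        return bufs[index];
    }

    void popFront() {
        // If the source is exhausted, the current front was the last element.
        if(empty || emptyDel()) {
            _empty = true;
            return;
        }

        // Move to the next buffer in the ring; front() refills it lazily.
        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const pure nothrow @safe {
        return _empty;
    }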
}

void main() {
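    import std.algorithm : endsWith;
    import std.parallelism;

    auto f = File("test2.d");

    // Read the next line into the caller-supplied buffer (reusing its
    // memory) and strip the trailing newline.
    void readNext(ref char[] buf) {
        f.readln(buf);
        if(buf.endsWith('\n')) {
            buf.length -= 1;
        }
    }

    // 20 recycled buffers: asyncBuf(b, 10) below reads one batch of 10 while
    // filling the other, so at most 20 lines are alive at any time.
    auto b = RoundRobinBuffer!char(
        &readNext,
        &f.eof,
        20
    );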

    foreach(line; taskPool.asyncBuf(b, 10)) {
        writeln(line);
    }
}
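Why pair 20 round-robin buffers with asyncBuf(b, 10)? Taking the description above at face value (asyncBuf fills one batch of 10 while the caller reads the other), up to 2 * 10 slices can be alive at once, and element i always lands in buffer i % nBuffers. The following single-threaded check of that arithmetic is only a sketch: oldestSurvives is a hypothetical helper, and the two-batch bound is an assumption taken from that description rather than from asyncBuf's source.

```d
/// Hypothetical helper: write `live` consecutive elements into a ring of
/// `nBuffers` recycled buffers and report whether the oldest is still intact.
bool oldestSurvives(size_t nBuffers, size_t live)
{
    auto ring = new size_t[](nBuffers);
    foreach (i; 0 .. live)
        ring[i % nBuffers] = i;   // element i lands in slot i % nBuffers
    return ring[0] == 0;
}

void main()
{
    import std.stdio : writeln;

    enum bufSize = 10;   // the asyncBuf batch size used in the demo
    writeln(oldestSurvives(20, 2 * bufSize));
    writeln(oldestSurvives(15, 2 * bufSize));
}
```

With 20 buffers the oldest live slice survives; with 15 (or any count below 20) it is overwritten while the caller may still be reading it.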
Aug 09 2011
Graham Fawcett <fawcett uwindsor.ca> writes:
On Tue, 09 Aug 2011 20:22:40 +0000, dsimcha wrote:

 Ok, I've actually managed to come up with a good way to do this without
 writing any new low-level concurrency code.  The idea is to create two
 delegates:  nextDel reads the next element into a user-provided buffer. 
 emptyDel tests whether there's anything left to read.  
Doesn't that essentially implement the InputRange protocol, but with delegates instead of an object? Not saying it's bad, just wondering what the benefits/tradeoffs of your delegate-based approach are.

Graham
Aug 10 2011
dsimcha <dsimcha yahoo.com> writes:
On 8/10/2011 7:58 PM, Graham Fawcett wrote:
 On Tue, 09 Aug 2011 20:22:40 +0000, dsimcha wrote:

 Ok, I've actually managed to come up with a good way to do this without
 writing any new low-level concurrency code.  The idea is to create two
 delegates:  nextDel reads the next element into a user-provided buffer.
 emptyDel tests whether there's anything left to read.
 Doesn't that essentially implement the InputRange protocol, but with
 delegates instead of an object? Not saying it's bad, just wondering what
 the benefits/tradeoffs of your delegate-based approach are.

 Graham
The fundamental difference between what I've defined and an input range is that the next() delegate lets you provide your own buffer. Of course, you can implement input ranges on top of this by defining an object that manages the buffering, which is exactly what I did. I could have used objects instead (e.g. a class/struct with a next() and empty() member function), but as a matter of personal preference I err on the side of avoiding objects because, when used excessively, they lead to boilerplate code.
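To illustrate the object-based alternative mentioned above, here is a sketch in which a struct exposes next() and empty() as member functions and is handed to the delegate overload of TaskPool.asyncBuf through &src.next and &src.empty. ArraySource and drain are hypothetical names. The key difference from an input range shows up in next(): the caller supplies the buffer, so the same few buffers are recycled instead of the source owning (or allocating) each element.

```d
import std.parallelism : taskPool;

/// Hypothetical object-style source: the same next/empty protocol written as
/// member functions. Unlike an input range's front, next() fills a buffer
/// that the caller owns and passes in.
struct ArraySource
{
    string[] items;
    size_t pos;

    /// Copy the next item into `buf`, reusing its memory where possible.
    void next(ref char[] buf)
    {
        if (pos == items.length)   // nothing left: hand back an empty buffer
        {
            buf.length = 0;
            return;
        }
        buf.length = items[pos].length;
        buf[] = items[pos][];
        ++pos;
    }

    bool empty() const { return pos == items.length; }
}

/// Member-function delegates adapt the object to the delegate-based overload.
string[] drain(ArraySource* src)
{
    string[] result;
    foreach (chunk; taskPool.asyncBuf(&src.next, &src.empty))
        result ~= chunk.idup;      // copy: the buffers are recycled
    return result;
}

void main()
{
    import std.stdio : writeln;

    auto src = ArraySource(["red", "green", "blue"]);
    writeln(drain(&src));
}
```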
Aug 10 2011