digitalmars.D - Parallel ByLine, ByChunk?
- dsimcha (21/21) Aug 09 2011 It's sometimes useful to read a file by line or by chunk such that the n...
- Jonathan M Davis (4/27) Aug 09 2011 I suspect that this is what the OP of the "Modify thread-local storage f...
- Graham Fawcett (13/20) Aug 09 2011 Interesting. A possible use-case would be reading from a set of files,
- dsimcha (10/22) Aug 09 2011 I made a pull request
- dsimcha (69/69) Aug 09 2011 Ok, I've actually managed to come up with a good way to do this without ...
- Graham Fawcett (6/10) Aug 10 2011 Doesn't that essentially implement the InputRange protocol, but with
- dsimcha (9/19) Aug 10 2011 The fundamental difference between what I've defined and an input range
It's sometimes useful to read a file by line or by chunk such that the next element is fetched in a background thread while the current one is being used. std.parallelism.asyncBuf almost fits the bill here, except that, since the buffers in std.stdio.File.ByLine and ByChunk are recycled, they need to be duplicated by asyncBuf. This is inefficient.

Is there any interest in a ParallelByChunk and ParallelByLine in std.stdio.File? These would hold a user-specified number, nBuffers, of buffers, which would be recycled. A background thread would read ahead until all buffers are full, then wait on a condition variable. When popFront() has been called several times and a large number of buffers are available, the background thread would be woken to fill the newly available buffers. This would all happen in parallel with the client using each chunk/line obtained by front(), and it would be fully encapsulated and safe, even though the implementation would probably involve some low-level concurrency (i.e. core.thread and core.sync instead of std.concurrency or std.parallelism).

If there is interest, does anyone have any suggestions for making this buffer-recycling range solution more general, instead of an ad-hoc solution that requires rewriting byChunk and byLine and using low-level concurrency? I'd like to encapsulate this pattern and put it in std.parallelism somehow, but I can't think of an easy way.
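For concreteness, a minimal sketch of the status quo being described (the file name is hypothetical): asyncBuf can wrap byLine today, but only by copying each recycled buffer first.

import std.algorithm : map;
import std.parallelism : taskPool;
import std.stdio : File, writeln;

void main()
{
    auto f = File("input.txt");

    // byLine recycles a single internal buffer, so each line must be
    // duplicated (.idup) before asyncBuf may queue it for the consumer.
    auto lines = f.byLine().map!(a => a.idup);

    foreach(line; taskPool.asyncBuf(lines))
        writeln(line); // consume while a background task reads ahead
}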
Aug 09 2011
On Tuesday, August 09, 2011 18:58:22 dsimcha wrote:
> It's sometimes useful to read a file by line or by chunk such that the
> next element is fetched in a background thread while the current one is
> being used. [...]

I suspect that this is what the OP of the "Modify thread-local storage from parent thread" thread in D.Learn is looking for.

- Jonathan M Davis
Aug 09 2011
On Tue, 09 Aug 2011 18:58:22 +0000, dsimcha wrote:
> Is there any interest in a ParallelByChunk and ParallelByLine in
> std.stdio.File? These would hold a user-specified number, nBuffers, of
> buffers, which would be recycled. [...]

Interesting. A possible use-case would be reading from a set of files, say, a directory full of log files which need to be analyzed line-by-line. So a reader that accepted a range of files might be handy. (I guess that the file concatenation could be handled at the OS level, with "cat" and a pipe, but a range-of-files would be more convenient.)

On a tangent, the first thing that came to mind when you mentioned this was Tim Bray's WideFinder benchmark:

http://wikis.sun.com/display/WideFinder/Wide+Finder+Home

ParallelByLine would make for a great WideFinder implementation.

Best,
Graham
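P.S. A rough sketch of the range-of-files idea (hypothetical directory layout, using today's asyncBuf plus the .idup copy it currently requires):

import std.algorithm : joiner, map;
import std.file : SpanMode, dirEntries;
import std.parallelism : taskPool;
import std.stdio : File, writeln;

void main()
{
    // Lazily chain the lines of every *.log file in the current
    // directory into one sequence, then read ahead in the background.
    auto allLines = dirEntries(".", "*.log", SpanMode.shallow)
        .map!(entry => File(entry.name).byLine().map!(l => l.idup))
        .joiner;

    foreach(line; taskPool.asyncBuf(allLines))
        writeln(line); // analyze each line while the next is fetched
}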
Aug 09 2011
On 8/9/2011 3:14 PM, Graham Fawcett wrote:
> Interesting. A possible use-case would be reading from a set of files,
> say, a directory full of log files which need to be analyzed
> line-by-line. So a reader that accepted a range of files might be
> handy. [...]

I made a pull request (https://github.com/D-Programming-Language/phobos/pull/179) for another overload of asyncBuf in std.parallelism that will make a parallel byLine or byChunk trivial to implement (reading lines in parallel is actually the documentation example). I'm not sure whether it's worth adding an actual ParallelByLine to std.stdio, given how trivial the new asyncBuf overload makes it. At any rate, I'd rather get the lower-level infrastructure finalized and into Phobos before I build the higher-level stuff on top of it.
Aug 09 2011
Ok, I've actually managed to come up with a good way to do this without writing any new low-level concurrency code. The idea is to create two delegates: nextDel reads the next element into a user-provided buffer, and emptyDel tests whether there's anything left to read. Then RoundRobinBuffer turns this pair into a range, cycling through a fixed set of buffers. Finally, std.parallelism.asyncBuf handles filling the first half of the buffers while the second half is being read, and vice versa. This pattern can be encapsulated as an overload of TaskPool.asyncBuf with some signature like:

auto asyncBuf(T)(
    void delegate(ref T[]) nextDel,
    bool delegate() emptyDel,
    size_t nBuffers
);

Prototype/demo code:

import std.stdio;

struct RoundRobinBuffer(T)
{
private:
    T[][] bufs;
    size_t index;
    void delegate(ref T[]) nextDel;
    bool delegate() emptyDel;
    bool _empty;
    bool primed;

    void prime()
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }

public:
    this(
        void delegate(ref T[]) nextDel,
        bool delegate() emptyDel,
        size_t nBuffers
    )
    {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;
    }

    // Lazily fills the current buffer on first access.
    T[] front() @property
    {
        if(!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        if(empty || emptyDel())
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const pure nothrow @safe
    {
        return _empty;
    }
}

void main()
{
    import std.parallelism;

    auto f = File("test2.d");

    void readNext(ref char[] buf)
    {
        f.readln(buf);

        import std.algorithm;
        if(std.algorithm.endsWith(buf, '\n'))
        {
            buf.length -= 1;
        }
    }

    auto b = RoundRobinBuffer!char(&readNext, &f.eof, 20);

    foreach(line; taskPool.asyncBuf(b, 10))
    {
        writeln(line);
    }
}
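P.S. For a taste of how the proposed overload would collapse the demo above (hypothetical until the pull request lands; the argument order and defaults of the final version may differ):

import std.parallelism;
import std.stdio;

void main()
{
    auto f = File("test2.d");

    void readNext(ref char[] buf)
    {
        f.readln(buf);
    }

    // The delegate pair goes straight to asyncBuf; no RoundRobinBuffer
    // boilerplate on the caller's side.
    foreach(line; taskPool.asyncBuf(&readNext, &f.eof))
    {
        write(line); // readln keeps the '\n', so write, not writeln
    }
}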
Aug 09 2011
On Tue, 09 Aug 2011 20:22:40 +0000, dsimcha wrote:
> Ok, I've actually managed to come up with a good way to do this without
> writing any new low-level concurrency code. The idea is to create two
> delegates: nextDel reads the next element into a user-provided buffer,
> and emptyDel tests whether there's anything left to read.

Doesn't that essentially implement the InputRange protocol, but with delegates instead of an object? Not saying it's bad, just wondering what the benefits/tradeoffs of your delegate-based approach are.

Graham
Aug 10 2011
On 8/10/2011 7:58 PM, Graham Fawcett wrote:
> On Tue, 09 Aug 2011 20:22:40 +0000, dsimcha wrote:
>> Ok, I've actually managed to come up with a good way to do this
>> without writing any new low-level concurrency code. The idea is to
>> create two delegates: nextDel reads the next element into a
>> user-provided buffer, and emptyDel tests whether there's anything left
>> to read.
>
> Doesn't that essentially implement the InputRange protocol, but with
> delegates instead of an object? Not saying it's bad, just wondering
> what the benefits/tradeoffs of your delegate-based approach are.
>
> Graham

The fundamental difference between what I've defined and an input range is that the nextDel() delegate lets you provide your own buffer. Of course, you can implement input ranges on top of this by defining an object that manages the buffering, which is exactly what I did. I could have used objects instead (e.g. a class/struct with next() and empty() member functions), but as a matter of personal preference I err on the side of avoiding objects because, when used excessively, they lead to boilerplate code.
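P.S. To make the contrast concrete, a small illustration (hypothetical declarations, not Phobos API):

// Input-range style: the producer owns the buffer, and front() hands out
// a view into it -- which is why asyncBuf must .dup each element.
interface LineSource
{
    char[] front();
    void popFront();
    bool empty();
}

// Delegate style from this thread: the caller owns the buffer, so a
// consumer like RoundRobinBuffer can recycle a fixed set of them.
alias NextDel = void delegate(ref char[] buf);
alias EmptyDel = bool delegate();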
Aug 10 2011