digitalmars.D - Support for feeding data to engine threads?
- Jerry Quinn (2/2) Sep 12 2011 I'm looking at porting an app that maintains a work queue to be processe...
- Timon Gehr (3/5) Sep 12 2011 I don't know if I get you right, but std.parallelism uses a task pool.
- Jerry Quinn (6/13) Sep 12 2011 OK I guess what I'm looking for is WorkerLocalStorage. I can create an ...
- Timon Gehr (6/19) Sep 12 2011 Calling workerLocalStorage once suffices.
- Jerry Quinn (2/32) Sep 12 2011 datafile is just a set of parameters for configuring the engine, i.e loc...
- dsimcha (23/36) Sep 12 2011 one of N engines and written out in order. At first glance, std.paralle...
- Jerry Quinn (8/50) Sep 12 2011 An engine is simply a complex class that takes a lot of memory and setup...
- dsimcha (10/12) Sep 12 2011 of N engines and written out in order. At first glance, std.parallelism...
- Jerry Quinn (2/15) Sep 12 2011 The question I was asking was how to execute a huge amount of per-thread...
- dsimcha (6/21) Sep 12 2011 initialization once per thread in the TaskPool framework.
- Jerry Quinn (9/31) Sep 12 2011 I'd probably naturally want to do something like:
- dsimcha (10/22) Sep 12 2011 each thread as thread-local data. Each Engine object has a process() ca...
- David Nadlinger (4/5) Sep 13 2011 A concurrent queue (SharedQueue?) would be a nice addition to Phobos
- dsimcha (3/8) Sep 13 2011 A lock-free queue is one of the examples in TDPL if I recall correctly. ...
I'm looking at porting an app that maintains a work queue to be processed by one of N engines and written out in order. At first glance, std.parallelism already provides the queue, but the Task concept appears to assume that there's no startup cost per thread. Am I missing something or do I need to roll a shared queue object?
Sep 12 2011
On 09/12/2011 07:23 PM, Jerry Quinn wrote:
> At first glance, std.parallelism already provides the queue, but the Task concept appears to assume that there's no startup cost per thread. Am I missing something or do I need to roll a shared queue object?

I don't know if I get you right, but std.parallelism uses a task pool. Usually no threads are started or stopped during processing.
Sep 12 2011
Timon Gehr Wrote:
> I don't know if I get you right, but std.parallelism uses a task pool. Usually no threads are started or stopped during processing.

OK, I guess what I'm looking for is WorkerLocalStorage. I can create an engine per thread. However, I probably need to have each thread do the initialization work. If I create the engine on the main thread, it won't be properly accessed by the worker thread, right? I.e. I need each thread to run engine.init(), which will do a whole pile of loading and setup, before I can start feeding data to the pool. The impression I get is that

    for (int i = 0; i < nthreads; i++)
        taskPool.workerLocalStorage(new Engine(datafile));

will not get me what I want.
Sep 12 2011
On 09/12/2011 08:01 PM, Jerry Quinn wrote:
> The impression I get is that for (int i = 0; i < nthreads; i++) taskPool.workerLocalStorage(new Engine(datafile)) will not get me what I want.

Calling workerLocalStorage once suffices:

    auto engine = taskPool.workerLocalStorage(new Engine(datafile));

This will create one engine per worker thread, all with the same datafile. You can access the engine from each thread with engine.get.

What is the exact role of datafile? Does it have to be distinct for each engine?
Sep 12 2011
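A minimal sketch of the single-call pattern Timon describes, with a running sum standing in for the engine so it runs on its own. Each thread updates only its own slot through `.get`, and `toRange` combines the slots once the parallel loop is over; the function name `parallelSum` is just for illustration.

```d
import std.algorithm : sum;
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

// Sums 0 .. n-1 by letting each thread accumulate into its own slot.
long parallelSum(long n)
{
    // A single call creates a slot for every thread; no loop over threads.
    auto partial = taskPool.workerLocalStorage(0L);

    foreach (i; taskPool.parallel(iota(n)))
    {
        // .get refers to the current thread's slot, so no lock is needed.
        partial.get += i;
    }

    // Once the parallel part is done, combine the per-thread results.
    return partial.toRange.sum;
}

void main()
{
    writeln(parallelSum(1_000)); // 499500
}
```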
Timon Gehr Wrote:
> What is the exact role of datafile? Does it have to be distinct for each engine?

datafile is just a set of parameters for configuring the engine, e.g. location of data, parameter values, etc. In this setting it would be the same for each engine.
Sep 12 2011
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
> The impression I get is that for (int i = 0; i < nthreads; i++) taskPool.workerLocalStorage(new Engine(datafile)) will not get me what I want.

The parameter for taskPool.workerLocalStorage is lazy, so it's even easier:

    auto engines = taskPool.workerLocalStorage(new Engine(datafile));

will create one new engine **per thread** because the lazy parameter is passed to it and evaluated **once per thread**. I'm not sure what an engine is in this context, though.

As far as initialization, you could do something like:

    auto isInitialized = taskPool.workerLocalStorage(false);

    void doStuff() {
        if (!isInitialized.get) {
            engines.get.initialize();
            isInitialized.get = true;
        }
        // Do some real processing.
    }
Sep 12 2011
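The effect of the lazy parameter can be made visible by counting how often the initializer runs. In this sketch `Engine` and `makeEngine` are hypothetical stand-ins; assuming, as the std.parallelism documentation describes, that worker-local storage has one slot per pool worker plus one for a thread outside the pool, the count should equal `taskPool.size + 1`.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import std.parallelism : taskPool;
import std.stdio : writeln;

// Hypothetical stand-in for the engine discussed in the thread.
final class Engine
{
    this(string datafile) { /* expensive loading would happen here */ }
}

shared size_t evaluations; // counts how often the lazy argument runs

Engine makeEngine(string datafile)
{
    atomicOp!"+="(evaluations, 1);
    return new Engine(datafile);
}

// Creates worker-local engines and reports how many were constructed.
size_t countEvaluations()
{
    atomicStore(evaluations, cast(size_t) 0);
    auto engines = taskPool.workerLocalStorage(makeEngine("params.cfg"));
    return atomicLoad(evaluations);
}

void main()
{
    writeln(countEvaluations(), " engines for ", taskPool.size, " workers");
}
```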
dsimcha Wrote:
> The parameter for taskPool.workerLocalStorage is lazy, so it's even easier: taskPool.workerLocalStorage(new Engine(datafile)); will create one new engine **per thread** because the lazy parameter is passed to it and evaluated **once per thread**. I'm not sure what an engine is in this context, though.

An engine is simply a complex class that takes a lot of memory and setup work to prepare. I'm working on language processing, so we use large multi-GB models.

Great, I'm getting closer. So the evaluation happens in the worker thread, not the main thread, right?

The other thing I think I have to do is sequence the initializations. Otherwise the disk/network will thrash trying to load N copies simultaneously. I imagine this can be done with a shared variable inside the engine.

> As far as initialization, you could do something like:

I prefer to do all the initializations before work starts so that timing can be more accurate. These things take a lot to get fully loaded and ready to run.

Thanks for all the help! It might be useful to indicate in the workerLocalStorage documentation that the lazy evaluation happens in the worker's thread.

Jerry
Sep 12 2011
On 9/12/2011 3:32 PM, Jerry Quinn wrote:
> Thanks for all the help! It might be useful to indicate in the workerLocalStorage documentation that the lazy evaluation happens in the worker's thread.

Sorry for the misunderstanding. When workerLocalStorage() is called, the lazy parameter is evaluated once per worker thread, but it's evaluated **in the thread that workerLocalStorage() is called from**, **before workerLocalStorage() returns**.
Sep 12 2011
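dsimcha's correction is easy to check: store in each slot whichever thread evaluated the initializer and compare against the caller. With a plain `workerLocalStorage(new Engine(...))` call, this means all the heavy loading still runs one after another on the calling thread. A sketch using `core.thread.Thread.getThis()`; `allEvaluatedInCaller` is a hypothetical name.

```d
import core.thread : Thread;
import std.algorithm : all;
import std.parallelism : taskPool;
import std.stdio : writeln;

// True if every worker-local slot was initialized by the calling thread.
bool allEvaluatedInCaller()
{
    // Each slot records the thread that evaluated its (lazy) initializer.
    auto evaluator = taskPool.workerLocalStorage(Thread.getThis());
    auto me = Thread.getThis();
    return evaluator.toRange.all!(t => t is me);
}

void main()
{
    writeln(allEvaluatedInCaller()); // true
}
```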
On 9/12/2011 3:32 PM, Jerry Quinn wrote:
> The other thing I think I have to do is sequence the initializations. Otherwise the disk/network will thrash trying to load N copies simultaneously. I imagine this can be done with a shared variable inside the engine.

This can easily be done by creating null engines in the WorkerLocalStorage and then initializing them inside a synchronized block:

    auto wlEngines = taskPool.workerLocalStorage(Engine.init);
    auto wlIsInitialized = taskPool.workerLocalStorage(false);

    // This gets executed on the pool in a Task:
    void taskFunction() {
        if (!wlIsInitialized.get) {
            // Do initialization of thread-local engine in worker thread.
            synchronized wlEngines.get = new Engine(ctorArgs);
            wlIsInitialized.get = true;
        }
        // Do main processing.
    }

> I prefer to do all the initializations before work starts so that timing can be more accurate. These things take a lot to get fully loaded and ready to run.

Unfortunately I don't have a good answer for this constraint. If you can offer a concrete enhancement request for std.parallelism, I'll certainly consider it, but I can't think of a good design for this kind of per-thread initialization routine off the top of my head, either in terms of API or implementation.
Sep 12 2011
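For the up-front constraint, one possible workaround (an editor's sketch, not a std.parallelism feature) is to submit exactly `taskPool.size` setup tasks that each build an engine and then wait on a `core.sync.barrier.Barrier` sized for every worker plus the caller. A worker blocked in the barrier cannot take a second setup task, so each worker runs exactly one, and the caller returns only once all engines exist. `Engine`, `loadEngine`, `initWorker`, and `initAllWorkers` are hypothetical names; the `synchronized` statement serializes the loads, in the spirit of dsimcha's suggestion, so N copies are not read from disk at once.

```d
import core.sync.barrier : Barrier;
import std.algorithm : all;
import std.parallelism : task, taskPool, WorkerLocalStorage;
import std.stdio : writeln;

// Hypothetical stand-in for an engine with expensive setup.
final class Engine
{
    private string params;
    this(string params) { this.params = params; /* load models here */ }
    string process(string line) { return params ~ ":" ~ line; }
}

// Builds the current thread's engine. A bare synchronized statement uses
// one global lock, so only one engine loads at a time.
void loadEngine(WorkerLocalStorage!Engine engines, string params)
{
    synchronized
    {
        engines.get = new Engine(params);
    }
}

// Runs once per worker: load, then block until everyone is done.
void initWorker(WorkerLocalStorage!Engine engines, Barrier ready, string params)
{
    loadEngine(engines, params);
    ready.wait();
}

WorkerLocalStorage!Engine initAllWorkers(string params)
{
    auto engines = taskPool.workerLocalStorage!Engine(null);
    auto ready = new Barrier(cast(uint)(taskPool.size + 1));

    foreach (_; 0 .. taskPool.size)
        taskPool.put(task!initWorker(engines, ready, params));

    // The caller gets an engine too, since the thread running a parallel
    // foreach may execute some loop bodies itself.
    loadEngine(engines, params);
    ready.wait(); // returns once every worker has loaded its engine
    return engines;
}

void main()
{
    auto engines = initAllWorkers("model");
    // ... start timing here: all engines are already loaded ...
    auto lines = ["alpha", "beta", "gamma", "delta"];
    auto results = new string[lines.length];
    foreach (i, line; taskPool.parallel(lines))
        results[i] = engines.get.process(line); // this thread's own engine
    foreach (r; results)
        writeln(r); // model:alpha, model:beta, ... in input order
}
```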
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
> I'm looking at porting an app that maintains a work queue to be processed by one of N engines and written out in order. At first glance, std.parallelism already provides the queue, but the Task concept appears to assume that there's no startup cost per thread. Am I missing something or do I need to roll a shared queue object?

Not sure if I'm misunderstanding what you're asking, but I'll answer the question I think you're asking. std.parallelism's Task can be executed in two ways. executeInNewThread() does start a new thread for every Task. However, you can also submit a Task to a TaskPool and have it executed by an existing worker thread. The whole point of using TaskPool to execute a Task is to avoid paying the thread start-up cost for every Task.
Sep 12 2011
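The two execution paths dsimcha describes look roughly like this, with `square` as a placeholder workload and `runBoth` a hypothetical wrapper. `yieldForce` waits for a result; if no thread has started the Task yet, it runs it in the calling thread instead.

```d
import std.parallelism : task, taskPool;
import std.stdio : writeln;

int square(int x) { return x * x; }

int[] runBoth()
{
    // 1. A dedicated thread for this Task: the start-up cost is paid each time.
    auto t1 = task!square(6);
    t1.executeInNewThread();

    // 2. The same kind of Task queued on the pool: an existing worker
    //    thread runs it, so no new thread is created.
    auto t2 = task!square(7);
    taskPool.put(t2);

    // yieldForce waits for each result (or runs the Task right here
    // if no thread has started it yet).
    return [t1.yieldForce, t2.yieldForce];
}

void main()
{
    writeln(runBoth()); // [36, 49]
}
```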
dsimcha Wrote:
> The whole point of using TaskPool to execute a Task is to avoid paying the thread start-up cost for every Task.

The question I was asking was how to execute a huge amount of per-thread initialization once per thread in the TaskPool framework.
Sep 12 2011
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
> The question I was asking was how to execute a huge amount of per-thread initialization once per thread in the TaskPool framework.

Hmm, currently there isn't an easy/obvious way, though I'm thinking maybe one should be added. I've kind of needed it, too, and I don't anticipate it being very hard to implement. If you can suggest a good API for this, I'll work on it for the next release.
Sep 12 2011
dsimcha Wrote:
> If you can suggest a good API for this, I'll work on it for the next release.

I'd probably naturally want to do something like:

    auto engines = taskPool.init(new Engine(params));
    auto lines = File("foo.txt").byLine();
    auto results = taskPool.map!(engines.process)(lines);
    foreach (r; results)
        writeln(r);

This would create one new Engine object per thread, but initialized within each thread as thread-local data. Each Engine object has a process() call taking a line as input and returning a string in this case. Any free engine can process more work as long as there is still work available.

For the large-scale use case, it could take an extra argument and allow only N simultaneous inits under the covers.
Sep 12 2011
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
> For the large-scale use case, it could take an extra argument and allow only N simultaneous inits under the covers.

From reading your later posts, it seems like what you really want is worker-local storage that's initialized from the thread that owns it rather than the thread that workerLocalStorage() was called from. This may be doable, but I'll have to think about the consequences and the implementation. In the meantime, see my other posts for a workaround.
Sep 12 2011
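Until such a feature exists, D's built-in thread-local storage gives another workaround: module-level variables are thread-local by default, so a module-level `Engine` reference that each thread fills in on first use is always constructed by the thread that will use it. This is a sketch with hypothetical names (`Engine`, `tlsEngine`, `localEngine`, `processAll`); like the `isInitialized` pattern, it initializes lazily on first use rather than before work starts.

```d
import core.thread : Thread;
import std.parallelism : taskPool;
import std.stdio : writeln;

// Hypothetical stand-in for an engine with expensive setup.
final class Engine
{
    Thread owner; // the thread that constructed this engine
    this()
    {
        owner = Thread.getThis();
        // ... load models here ...
    }
    string process(string line) { return line ~ "!"; }
}

// Module-level variables are thread-local by default in D, so each
// thread (every pool worker and the main thread) has its own copy.
Engine tlsEngine;

// Returns this thread's engine, building it in this thread on first use.
Engine localEngine()
{
    if (tlsEngine is null)
    {
        // The null check needs no lock (tlsEngine is per-thread); the lock
        // only keeps several threads from loading their engines at once.
        synchronized
        {
            tlsEngine = new Engine();
        }
    }
    return tlsEngine;
}

// Processes lines in parallel; results[i] keeps the input order.
string[] processAll(string[] lines)
{
    auto results = new string[lines.length];
    foreach (i, line; taskPool.parallel(lines))
    {
        auto e = localEngine();
        assert(e.owner is Thread.getThis()); // built by the thread using it
        results[i] = e.process(line);
    }
    return results;
}

void main()
{
    foreach (r; processAll(["alpha", "beta", "gamma"]))
        writeln(r); // alpha!, beta!, gamma! in that order
}
```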
On 9/12/11 7:23 PM, Jerry Quinn wrote:
> Am I missing something or do I need to roll a shared queue object?

A concurrent queue (SharedQueue?) would be a nice addition to Phobos regardless, imho.

David
Sep 13 2011
== Quote from David Nadlinger (see klickverbot.at)'s article
> A concurrent queue (SharedQueue?) would be a nice addition to Phobos regardless, imho.

A lock-free queue is one of the examples in TDPL, if I recall correctly. Maybe we should just put a slightly embellished version of that in.
Sep 13 2011
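For comparison with the lock-free TDPL example, a simple blocking queue can also be built from a druntime mutex and condition variable. This is a lock-based sketch, not the TDPL queue and not a Phobos type; `BlockingQueue` and `sumThroughQueue` are hypothetical names.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.stdio : writeln;

// A minimal unbounded FIFO whose pop() blocks until an item arrives.
final class BlockingQueue(T)
{
    private T[] items;
    private Mutex mutex;
    private Condition nonEmpty;

    this()
    {
        mutex = new Mutex;
        nonEmpty = new Condition(mutex);
    }

    void push(T item)
    {
        synchronized (mutex)
        {
            items ~= item;
            nonEmpty.notify(); // wake one waiting consumer
        }
    }

    T pop()
    {
        synchronized (mutex)
        {
            while (items.length == 0)
                nonEmpty.wait(); // releases the mutex while waiting
            auto item = items[0];
            items = items[1 .. $];
            return item;
        }
    }
}

// One producer thread pushes 1 .. n; the caller pops and sums them.
long sumThroughQueue(int n)
{
    auto q = new BlockingQueue!int;
    auto producer = new Thread({
        foreach (i; 1 .. n + 1)
            q.push(i);
    });
    producer.start();

    long total = 0;
    foreach (_; 0 .. n)
        total += q.pop(); // blocks until the producer has pushed
    producer.join();
    return total;
}

void main()
{
    writeln(sumThroughQueue(100)); // 5050
}
```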