digitalmars.D.learn - std.parallelism: How to wait all tasks finished?
- Cooler Feb 02 2014 I have several tasks. Each task may or may not create another
- Dan Killebrew Feb 02 2014 Are you sure TaskPool.finish isn't what you're looking for?
- Cooler Feb 02 2014 No. After taskPool.finish() no way to add new tasks to the queue.
- Dan Killebrew Feb 03 2014 Then perhaps you need to create a new TaskPool (and make sure
- Cooler Feb 05 2014 Will not help. I don't know beforehand what tasks will be
- Chris Williams Feb 05 2014 You seem to be saying that you want to be able to wait for all
- Andrea Fontana Feb 06 2014 Something like this? (not tested...)
- Cooler Feb 06 2014 It is closer, but after taskPool.finish() all tries to
- Cooler Feb 06 2014 Forgot to say - I know how to solve the topic problem. My
- Andrea Fontana Feb 06 2014 What about sync ++taskCount when you put() something and
- Andrea Fontana Feb 06 2014 Something like this:
- Cooler Feb 06 2014 Now I do almost the same in my program, but instead of while(...)
- Russel Winder Feb 06 2014 What you are describing here is a classic fork/join architecture. The
- Cooler Feb 06 2014 Thank you for your verbose answer. I think the solution could be
I have several tasks. Each task may or may not create another task. What is the best way to wait until all tasks have finished? The code:

    import std.parallelism;

    void procData(){
        if(...)
            taskPool.put(task(&procData));
    }

    void main(){
        taskPool.put(task(&procData));
        taskPool.put(task(&procData));
        ...
        taskPool.put(task(&procData));
        // The next line blocks until all tasks already in the queue have finished.
        // That is almost all I need, but new tasks will not be started.
        taskPool.finish(true);
    }
Feb 02 2014
> // The next line blocks until all tasks already in the queue have finished.
> // That is almost all I need, but new tasks will not be started.
> taskPool.finish(true);

Are you sure TaskPool.finish isn't what you're looking for? Its documentation says: "Signals worker threads to terminate when the queue becomes empty." It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
Feb 02 2014
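The documented guarantee Dan quotes can be checked with a private pool: every task that is already queued when finish(true) is called still runs before the call returns. The disagreement in this thread is only about tasks put() after that point. A minimal sketch; `bump` and `queueThenFinish` are hypothetical names, while `TaskPool`, `task` and `finish` are std.parallelism calls.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import std.parallelism : TaskPool, task;
import std.stdio : writeln;

shared int ran; // how many queued tasks actually executed

void bump()
{
    atomicOp!"+="(ran, 1);
}

int queueThenFinish(int n)
{
    atomicStore(ran, 0);
    // A private pool, so that finish() leaves the global taskPool alone.
    auto pool = new TaskPool(2);
    foreach (_; 0 .. n)
        pool.put(task!bump());
    // Workers keep draining the queue and stop only once it is empty;
    // blocking = true waits for them to terminate.
    pool.finish(true);
    return atomicLoad(ran);
}

void main()
{
    writeln("ran = ", queueThenFinish(10)); // ran = 10
}
```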
On Monday, 3 February 2014 at 06:56:35 UTC, Dan Killebrew wrote:
> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.

No. After taskPool.finish() there is no way to add new tasks to the queue: taskPool.put will not add them.
Feb 02 2014
> No. After taskPool.finish() there is no way to add new tasks to the queue: taskPool.put will not add them.

Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.

    auto phase1 = new TaskPool();
    // make sure all new tasks are added to phase1
    phase1.finish(true);

    auto phase2 = new TaskPool();
    // make sure all new tasks are added to phase2
    phase2.finish(true);
Feb 03 2014
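Dan's phase-per-pool idea can be sketched as follows, assuming the number of phases (here, a maximum recursion depth) is known in advance, which is precisely what Cooler says he cannot assume in the next message. `work`, `maxDepth` and `runPhases` are hypothetical names. Each task puts its children into the next phase's pool, so the pools can be finished one after another.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import std.parallelism : TaskPool, task;
import std.stdio : writeln;

enum maxDepth = 3;    // assumption: the number of phases is known up front
shared int processed; // work items executed across all phases

// Hypothetical work item: every item below maxDepth queues two children,
// always into the *next* phase's pool.
void work(TaskPool[] phases, int depth)
{
    atomicOp!"+="(processed, 1);
    if (depth < maxDepth)
        foreach (_; 0 .. 2)
            phases[depth + 1].put(task!work(phases, depth + 1));
}

int runPhases()
{
    atomicStore(processed, 0);
    auto phases = new TaskPool[maxDepth + 1];
    foreach (ref p; phases)
        p = new TaskPool(2); // two worker threads per phase

    phases[0].put(task!work(phases, 0));

    // Only phase d - 1 puts into phase d, so by the time phases[d].finish()
    // is called, every put() into phases[d] has already happened.
    foreach (p; phases)
        p.finish(true);

    return atomicLoad(processed);
}

void main()
{
    writeln("processed = ", runPhases()); // 1 + 2 + 4 + 8 = 15
}
```

The scheme relies on phase d receiving tasks only from phase d - 1. If the depth is unbounded there is no last pool to finish, which is the limitation Cooler raises.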
On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew wrote:
> Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.

That will not help. I don't know beforehand which tasks will be created: procData is recursive, and it decides whether to create a new task or not.
Feb 05 2014
On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
> That will not help. I don't know beforehand which tasks will be created: procData is recursive, and it decides whether to create a new task or not.

You seem to be saying that you want to be able to wait for all tasks to complete an indefinite number of times, adding more tasks after each one. Why would you want to do that? The pool's queue is unbounded, so just keep adding tasks until you have no more tasks to add. Or, if you have a progression of types, where all tasks of type A have to be complete before you can start running the tasks of type B, then you should be able to have a separate thread pool for each type.
Feb 05 2014
On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
> That will not help. I don't know beforehand which tasks will be created: procData is recursive, and it decides whether to create a new task or not.

Something like this? (not tested...)

    shared bool more = true;
    ...
    ...
    ...
    void procData(){
        if(...) {
            taskPool.put(task(&procData));
            more = true;
        }
    }

    while(true) {
        taskPool.finish(true);
        if (!more) break;
        else more = false;
    }
Feb 06 2014
On Thursday, 6 February 2014 at 11:30:17 UTC, Andrea Fontana wrote:
> Something like this? (not tested...)

It is closer, but after taskPool.finish() every attempt to call taskPool.put() will be rejected. Let me give a clearer example:

    import std.stdio, std.parallelism, core.thread, core.time, core.atomic;

    shared int i;

    void procData(){
        // Increment and test in one atomic step.
        if (atomicOp!"+="(i, 1) >= 100)
            return;
        foreach (_; 0 .. 100)
            taskPool.put(task(&procData)); // New tasks will be rejected after
                                           // taskPool.finish()
    }

    void main(){
        taskPool.put(task(&procData));
        Thread.sleep(1.msecs); // The final value of "i" depends on the duration here
        taskPool.finish(true);
        writefln("i = %s", atomicLoad(i));
    }

In the example above, the total number of tasks executed depends on the sleep duration.
Feb 06 2014
On Thursday, 6 February 2014 at 14:42:57 UTC, Cooler wrote:
> It is closer, but after taskPool.finish() every attempt to call taskPool.put() will be rejected.

Forgot to say: I know how to solve the problem in the subject line. My question is "What is the BEST way?" One idea: maybe introduce a new function, named for example "wait", that blocks until no tasks are running?
Feb 06 2014
On Thursday, 6 February 2014 at 14:52:36 UTC, Cooler wrote:
> One idea: maybe introduce a new function, named for example "wait", that blocks until no tasks are running?

What about a synchronized ++taskCount when you put() something, and --taskCount when a task is done? And in main: while(taskCount > 0) Thread.yield(); ?
Feb 06 2014
On Thursday, 6 February 2014 at 16:07:51 UTC, Andrea Fontana wrote:
> What about a synchronized ++taskCount when you put() something, and --taskCount when a task is done?

Something like this:

    import std.parallelism, core.thread, core.atomic;

    shared size_t taskCount; // tasks queued or still running
    shared size_t i;         // tasks started so far

    void procData()
    {
        // Do the bookkeeping in the body: in/out contracts are
        // compiled out with -release.
        scope (exit) atomicOp!"-="(taskCount, 1);
        if (atomicOp!"+="(i, 1) > 100)
            return;
        foreach (_; 0 .. 100) {
            // Count the child *before* queueing it; otherwise it could finish
            // and drive taskCount to 0 while this task is still spawning.
            atomicOp!"+="(taskCount, 1);
            taskPool.put(task(&procData));
        }
    }

    void main(){
        atomicStore(taskCount, size_t(2));
        taskPool.put(task(&procData));
        taskPool.put(task(&procData));
        while (atomicLoad(taskCount) > 0)
            Thread.yield();
    }
Feb 06 2014
On Thursday, 6 February 2014 at 16:19:38 UTC, Andrea Fontana wrote:
> while(taskCount > 0) Thread.yield();

My program does almost the same now, but instead of while(...) Thread.yield() I wait on a semaphore to be notified, and in the tasks, when --taskCount reaches 0, I call Semaphore.notify(). But none of this looks elegant enough. As I mentioned above, maybe we could introduce a new TaskPool function, named for example "wait", that blocks until no tasks are running? If this situation comes up frequently, perhaps it is worth adding such functionality to Phobos?
Feb 06 2014
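Cooler's counter-plus-semaphore approach, wrapped behind the kind of wait() call he proposes, could look roughly like this. `TaskGroup`, `spawn`, `demo` and the job body are hypothetical names for illustration; this is not an existing Phobos API, and it uses only `std.parallelism`, `core.atomic` and `core.sync.semaphore`.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import core.sync.semaphore : Semaphore;
import std.parallelism : TaskPool, task, taskPool;
import std.stdio : writeln;

/// Hypothetical helper, not part of std.parallelism: counts every job
/// spawned through it (including jobs spawned by those jobs) and
/// offers a wait() that returns once none is queued or running.
final class TaskGroup
{
    private shared size_t pending; // jobs queued or running
    private Semaphore zeroed;      // notified each time `pending` drops to 0
    private TaskPool pool;

    this(TaskPool pool)
    {
        this.pool = pool;
        zeroed = new Semaphore(0);
    }

    void spawn(void delegate() job)
    {
        // Count the job *before* queueing it, so `pending` cannot hit 0
        // while a parent job is still about to spawn children.
        atomicOp!"+="(pending, 1);
        pool.put(task(&execute, job));
    }

    void wait()
    {
        // Re-check after every wake-up: a notification may be stale,
        // left over from an earlier moment when the count touched 0.
        while (atomicLoad(pending) != 0)
            zeroed.wait();
    }

    private void execute(void delegate() job)
    {
        // Runs however `job` exits, including by exception.
        scope (exit)
            if (atomicOp!"-="(pending, 1) == 0)
                zeroed.notify();
        job();
    }
}

shared size_t processed;

size_t demo()
{
    atomicStore(processed, size_t(0));
    auto group = new TaskGroup(taskPool);

    // Hypothetical job modeled on the thread's procData(): the first
    // 100 jobs to start each spawn three more, the rest spawn nothing.
    void procData()
    {
        if (atomicOp!"+="(processed, 1) > 100)
            return;
        foreach (_; 0 .. 3)
            group.spawn(&procData);
    }

    group.spawn(&procData);
    group.wait();
    return atomicLoad(processed);
}

void main()
{
    writeln("processed = ", demo()); // 1 + 3 * 100 = 301
}
```

Compared with the Thread.yield() loop, the caller sleeps on the semaphore instead of spinning, and incrementing before put() closes the window in which the count could reach zero while work is still being spawned.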
On Mon, 2014-02-03 at 00:00 +0000, Cooler wrote:
> I have several tasks. Each task may or may not create another task. What is the best way to wait until all tasks have finished?

What you are describing here is a classic fork/join architecture. The tasks are structured as a tree with synchronization handled by the sub-nodes. As far as I am aware, std.parallelism focuses on data parallelism, which is a scatter/gather (aka map/reduce) model of just a single layer.

All the code fragments in the thread have, I believe, been predicated on working with a thread pool as an explicit global entity. I think the problems have stemmed from taking this viewpoint.

I would suggest following the way the Java fork/join framework (based on Doug Lea's original) works. There is an underlying global thread pool, but the user code uses the fork/join abstraction layer in order to create the tree of synchronization dependencies. In this case, instead of working with tasks directly, there needs to be a type whose job is to be a non-leaf node in the tree, handling synchronization while still creating tasks and submitting them to the pool. This is clearly something that could turn into an addition to std.parallelism, or become std.forkjoin.

Sorry I have no actual code to offer, but the overall design of what is needed is well understood, at least in the Java context. C++ has a long way to go to catch up, as does D. The other thing that then sits on top of this is lazy stream parallelism, which is what Java 8 is adding to the mix.

-- 
Russel.
Feb 06 2014
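One way to get the recursive fork-then-join shape Russel describes with std.parallelism as it is: each task forks its children onto the pool and joins them before returning, so waiting on the root task waits, transitively, for the whole tree. This is a sketch, not the Java framework: the fixed binary tree and the `procData(depth, maxDepth)` signature are hypothetical, while `task`, `taskPool.put`, `workForce` and `yieldForce` are std.parallelism calls.

```d
import std.parallelism : task, taskPool;
import std.stdio : writeln;

// Hypothetical recursive job: a node shallower than maxDepth forks two
// children. Returns the number of nodes in its subtree, including itself.
size_t procData(int depth, int maxDepth)
{
    if (depth >= maxDepth)
        return 1; // leaf: nothing to fork

    // Fork: submit both children to the global pool.
    auto left  = task!procData(depth + 1, maxDepth);
    auto right = task!procData(depth + 1, maxDepth);
    taskPool.put(left);
    taskPool.put(right);

    // Join: wait for both children before returning, so this node's
    // result covers its whole subtree.
    return 1 + left.workForce() + right.workForce();
}

void main()
{
    auto root = task!procData(0, 4);
    taskPool.put(root);
    // Waiting on the root waits, transitively, for every task in the tree.
    writeln("nodes = ", root.yieldForce()); // nodes = 31
}
```

Each parent joins with `workForce` rather than `yieldForce`: according to the std.parallelism documentation, if the child has not started, `workForce` runs it in the calling thread, and if it is already running elsewhere, it executes other queued tasks while waiting, so workers blocked in a join keep making progress. No global counter is needed; the tree of joins is the synchronization.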
On Thursday, 6 February 2014 at 17:58:47 UTC, Russel Winder wrote:
> What you are describing here is a classic fork/join architecture.

Thank you for your detailed answer. I think the solution could be shorter and simpler :)
Feb 06 2014