digitalmars.D.learn - How to create Multi Producer-Single Consumer concurrency
- adnan338 (19/19) Jun 12 2020 I have a list of files to download from the internet. Each of
- =?UTF-8?Q?Ali_=c3=87ehreli?= (59/62) Jun 16 2020 That's almost exactly what I do in some of my programs. I use
- Bagomot (2/69) Jul 13 2022 How to do the same with `taskPool` instead of `spawnLinked`?
- =?UTF-8?Q?Ali_=c3=87ehreli?= (13/14) Jul 13 2022 You are hitting the nail on the head. :) std.parallelism, which taskPool...
- Bagomot (3/21) Jul 13 2022 Thank you! I understood the difference between `std.parallelism`
I have a list of files to download from the internet. Each of those downloads are a time consuming task. What I want to do is download them concurrently and when each of those downloads finish, I want to activate something in my main thread, which will update a progressbar in the GUI thread. So there are multiple "download finished" message producers, and one consumer of those messages. Furthermore, that producer has a callback that triggers an UI object. For example, auto list = [...]; // list of URLs auto downloadableItems = list.length; __gshared int downloaded = 0; auto progressBar = // a gtk progressbar; How should I proceed? I can do a parallel foreach and call download() on each of the URL and update `downloaded`, but I don't know how to listen to `downloaded` when it gets updated so I can make changes in `progressBar` Not that my UI library is not thread safe and I cannot access UI object from different threads.
Jun 12 2020
On 6/12/20 3:02 PM, adnan338 wrote:So there are multiple "download finished" message producers, and one consumer of those messages. Furthermore, that producer has a callback that triggers an UI object.That's almost exactly what I do in some of my programs. I use std.concurrency and the following is a working sketch of what I do. I assumed you get finer individual granularity of progress as opposed to the binary 0% -> 100%. import std.stdio; import std.concurrency; import std.algorithm; import std.range; import std.exception; import std.format; import core.thread; struct Progress { Tid tid; // The id of the reporting thread size_t amount; // The amount of progress so far size_t total; // Total progress (can be file size) } void display(Progress[Tid] progresses) { const amount = progresses.byValue.map!(p => p.amount).sum; const total = progresses.byValue.map!(p => p.total).sum; writefln!"%6.2f%%"(100.0 * amount / total); } // The worker thread function void download(string url) { writefln!"Worker %s downloading %s."(thisTid, url); enum total = 20; foreach (i; 0 .. total) { // Imitate some progress Thread.sleep(100.msecs); // Report progress to owner ownerTid.send(Progress(thisTid, i + 1, total)); } } void main() { auto list = [ "dlang.org", "ddili.org" ]; auto downloaders = list.length .iota .map!(i => spawnLinked(&download, list[i])) .array; Progress[Tid] progresses; size_t finished = 0; while (finished != list.length) { receive( (LinkTerminated arg) { ++finished; // Check whether this thread is exiting prematurely enforce((arg.tid in progresses) && (progresses[arg.tid].amount == progresses[arg.tid].total), format!"Thread %s exited unexpectedly"(arg.tid)); }, (Progress progress) { progresses[progress.tid] = progress; progresses.display(); } ); } writeln("Processing the downloaded files."); } Ali
Jun 16 2020
On Tuesday, 16 June 2020 at 09:10:09 UTC, Ali Çehreli wrote:On 6/12/20 3:02 PM, adnan338 wrote:How to do the same with `taskPool` instead of `spawnLinked`?So there are multiple "download finished" message producers,and oneconsumer of those messages. Furthermore, that producer has acallbackthat triggers an UI object.That's almost exactly what I do in some of my programs. I use std.concurrency and the following is a working sketch of what I do. I assumed you get finer individual granularity of progress as opposed to the binary 0% -> 100%. import std.stdio; import std.concurrency; import std.algorithm; import std.range; import std.exception; import std.format; import core.thread; struct Progress { Tid tid; // The id of the reporting thread size_t amount; // The amount of progress so far size_t total; // Total progress (can be file size) } void display(Progress[Tid] progresses) { const amount = progresses.byValue.map!(p => p.amount).sum; const total = progresses.byValue.map!(p => p.total).sum; writefln!"%6.2f%%"(100.0 * amount / total); } // The worker thread function void download(string url) { writefln!"Worker %s downloading %s."(thisTid, url); enum total = 20; foreach (i; 0 .. total) { // Imitate some progress Thread.sleep(100.msecs); // Report progress to owner ownerTid.send(Progress(thisTid, i + 1, total)); } } void main() { auto list = [ "dlang.org", "ddili.org" ]; auto downloaders = list.length .iota .map!(i => spawnLinked(&download, list[i])) .array; Progress[Tid] progresses; size_t finished = 0; while (finished != list.length) { receive( (LinkTerminated arg) { ++finished; // Check whether this thread is exiting prematurely enforce((arg.tid in progresses) && (progresses[arg.tid].amount == progresses[arg.tid].total), format!"Thread %s exited unexpectedly"(arg.tid)); }, (Progress progress) { progresses[progress.tid] = progress; progresses.display(); } ); } writeln("Processing the downloaded files."); } Ali
Jul 13 2022
On 7/13/22 02:25, Bagomot wrote:How to do the same with `taskPool` instead of `spawnLinked`?You are hitting the nail on the head. :) std.parallelism, which taskPool is a concept of, is for cases where operations are independent. However, producer and consumer are by definition dependent, so it's a problem for std.concurrency, which involves message boxes. You can do the same with std.parallelism or core.thread but you would be implementing some of what std.concurrency already provides. The following are my understandings of these topics: http://ddili.org/ders/d.en/parallelism.html http://ddili.org/ders/d.en/concurrency.html http://ddili.org/ders/d.en/concurrency_shared.html The introduction section of the Concurrency chapter lists some differences. Ali
Jul 13 2022
On Wednesday, 13 July 2022 at 19:06:48 UTC, Ali Çehreli wrote:On 7/13/22 02:25, Bagomot wrote:Thank you! I understood the difference between `std.parallelism` and `std.concurrency`. My question no longer relevant :)How to do the same with `taskPool` instead of `spawnLinked`?You are hitting the nail on the head. :) std.parallelism, which taskPool is a concept of, is for cases where operations are independent. However, producer and consumer are by definition dependent, so it's a problem for std.concurrency, which involves message boxes. You can do the same with std.parallelism or core.thread but you would be implementing some of what std.concurrency already provides. The following are my understandings of these topics: http://ddili.org/ders/d.en/parallelism.html http://ddili.org/ders/d.en/concurrency.html http://ddili.org/ders/d.en/concurrency_shared.html The introduction section of the Concurrency chapter lists some differences. Ali
Jul 13 2022