digitalmars.D - Structured Concurrency vs P2300R7
- Paolo Invernizzi (23/23) Apr 07 I've seen again Sebastiaan talk about structured concurrency, I
- Sebastiaan Koppe (73/83) Apr 07 Contrary to P2300, the `whenAll` implementation in the
- Paolo Invernizzi (8/13) Apr 08 Thank you Sebastiaan for the explanations, I missed that whenAll
I watched Sebastiaan's talk about structured concurrency again. I know the library is not a verbatim port of P2300R7, so to grok it I started coding some of the P2300R7 examples in D.

The second example is basically a parallel computation over an array of doubles, powered by the `execution::bulk` adaptor, which is one way to execute a callable multiple times in parallel over an input.

What confuses me is that the Stream concept is not present in P2300, so I'm not sure about one basic thing: how to spawn parallel computations and reduce the results.

What is the correct way to instantiate 'n' senders that execute a callable via a thread pool scheduler, and pass them to `whenAll` as parameters? What is the way to perform the classic D example:

```
auto logs = new double[1_000_000];
foreach (i, ref elem; parallel(logs)) {
    elem = log(i + 1.0);
}
```

Are Streams involved?

Thank you
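For reference, a self-contained version of the classic example above might look like the following. This is only a sketch using Phobos (`std.parallelism` and `std.math`); the function name `computeLogs` is a hypothetical helper introduced here for illustration.

```d
import std.math : isClose, log;
import std.parallelism : parallel;

// Hypothetical helper: fills a fresh array in parallel, in place.
double[] computeLogs(size_t n)
{
    auto logs = new double[n];
    // parallel() splits the index range into work units that run
    // on the default task pool; each element is written exactly once.
    foreach (i, ref elem; parallel(logs))
    {
        elem = log(i + 1.0);
    }
    return logs;
}

void main()
{
    auto logs = computeLogs(1_000_000);
    assert(logs[0] == 0.0); // log(1.0)
    assert(isClose(logs[$ - 1], log(1_000_000.0)));
}
```

This is the in-place behaviour that any sender-based version would need to reproduce.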
Apr 07
On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
> What the way to perform the classic D example:
>
> ```
> auto logs = new double[1_000_000];
> foreach (i, ref elem; parallel(logs)) {
>     elem = log(i + 1.0);
> }
> ```
>
> Are Streams involved?
>
> Thank you

Contrary to P2300, the `whenAll` implementation in the concurrency library accepts an array as well.

```
import concurrency : syncWait;
import concurrency.sender : just;                   // module path assumed
import concurrency.operations : on, then, whenAll;  // `on` and `then` assumed to live here too
import concurrency.thread : stdTaskPool;
import std.algorithm : map;
import std.array : array;
import std.math : log;

auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();

auto logs = new double[1_000_000];
logs
    .map!((i) => just(i)
        .then((double i) => log(i + 1.0))
        .on(scheduler)
    )
    .array
    .whenAll
    .syncWait
    .value;
```

`whenAll` should be able to work with ranges that provide a size, but it doesn't yet, therefore a call to `.array` is required.

Note this is not quite equivalent to your code, since it doesn't mutate the array in-place. You can do that using:

```
    .map!((ref i) => just(&i)
        .then((double* i) => *i = log(*i + 1.0))
        .on(scheduler)
    )
```

Unfortunately `then` isn't smart enough to allow `ref double i`, so you have to deal with ugly pointers. Something to improve.

---

As for streams, they are actually something that I am going to deprecate in favour of the newer Sequence concept. While that code hasn't been written, the API would allow you to do the following:

```
auto result = logs
    .sequence()
    .transform((double i) => log(i + 1.0))
    .parallelize(pool.getScheduler)
    .toList
    .syncWait
    .value;
```

For the in-place mutation I suppose we could add a `refSequence` (then we don't need `.toList` anymore).

Yet another way would be to use an `asyncScope` and spawn individual tasks:

```
import concurrency.thread : stdTaskPool;
import concurrency.operations : whenAll;
import concurrency : syncWait;
import std.algorithm : map;
import std.array : array;
import concurrency.asyncscope;

auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();
auto scp = asyncScope();

// `logs` is the array declared in the first example
foreach (i, ref elem; logs) {
    scp.spawn(
        just(&elem)
            .then((double* elem) => *elem = log(i + 1.0))
            .on(scheduler)
    );
}

// wait for all spawned tasks to complete
scp.cleanup.syncWait();
```
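
To make the difference between returning a new array and mutating in place concrete without depending on the concurrency library, here is a rough Phobos-only analogue of the value-returning style. It is a sketch, not the library's mechanism: `logOfNext` and `computeLogsByValue` are hypothetical names, and `taskPool.amap` from `std.parallelism` stands in for the `map` + `whenAll` pipeline.

```d
import std.math : isClose, log;
import std.parallelism : taskPool;
import std.range : iota;

// Hypothetical module-level function so it can be passed to amap by alias.
double logOfNext(size_t i)
{
    return log(i + 1.0);
}

// Returns a newly allocated array instead of writing into an existing one.
double[] computeLogsByValue(size_t n)
{
    // amap eagerly evaluates the function over the range in parallel
    // and collects the results, in order, into a new array.
    return taskPool.amap!logOfNext(iota(n));
}

void main()
{
    auto logs = computeLogsByValue(1_000_000);
    assert(logs.length == 1_000_000);
    assert(logs[0] == 0.0); // log(1.0)
    assert(isClose(logs[$ - 1], log(1_000_000.0)));
}
```

The results come back in input order, which makes this a convenient baseline for checking the output of a sender-based version.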
Apr 07
On Sunday, 7 April 2024 at 17:48:43 UTC, Sebastiaan Koppe wrote:
> On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
>> [...]
>
> Contrary to P2300, the `whenAll` implementation in the concurrency library accepts an array as well. [...]

Thank you Sebastiaan for the explanations, I missed that `whenAll` accepts an array too.

So I will avoid digging into Streams while waiting for Sequence, and yes, I think that `parallelize` is the right sugar to add.

In the meantime, the trio Sender / Receiver / Scheduler should be sufficient to play with a lot of things.

/P
Apr 08