www.digitalmars.com         C & C++   DMDScript  

digitalmars.D - Structured Concurrency vs P2300R7

reply Paolo Invernizzi <paolo.invernizzi gmail.com> writes:
I've seen again Sebastiaan talk about structured concurrency, I 
know it's not a verbatim port of P2300R7, so trying to grok it I 
started to code in D some P2300R7 examples.

The second example is basically some parallel computation over an 
array on doubles, powered by the `execution::bulk` adaptor, 
basically one way to execute one callable multiple times in 
parallel over an input.

What is confusing me is that the Stream concept is not present in 
P2300, so I'm not sure about one basic thing, how to span 
parallel computations and reduce the results.

What's the correct way to instantiate 'n' senders to execute a 
callable via a threadPool scheduler, and pass them as 'whenAll' 
parameter?

What the way to perform the classic D example:

```
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
     elem = log(i + 1.0);
}
```

Are Streams involved?

Thank you
Apr 07 2024
parent reply Sebastiaan Koppe <mail skoppe.eu> writes:
On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
 What the way to perform the classic D example:

 ```
 auto logs = new double[1_000_000];

 foreach (i, ref elem; parallel(logs))
 {
     elem = log(i + 1.0);
 }
 ```

 Are Streams involved?

 Thank you
Contrary to P2300, the `whenAll` implementation in the concurrency library accepts an array as well. ``` import concurrency.thread : stdTaskPool; import concurrency.operations : whenAll; import concurrency : syncWait; import std.algorithm : map; import std.array : array; auto pool = stdTaskPool(32); auto scheduler = pool.getScheduler(); auto logs = new double[1_000_000] logs .map!((i) => just(i) .then((double i) => log(i + 1.0)) .on(scheduler) ) .array .whenAll .syncWait .value; ``` `whenAll` should be able to work with ranges that provide a size, but it doesn't yet, therefor a call to `.array` is required. Note this is not quite equivalent to your code, since it doesn't mutate the array in-place. You can do that using: ``` .map!((ref i) => just(&i) .then((double* i) => *i = log(*i + 1.0)) .on(scheduler) ) ``` Unfortunately `then` isn't smart enough to allow `ref double i`, so you have to deal with ugly pointers. Something to improve. --- As for streams, they are actually something that I am going to deprecate for the newer Sequence concept. While that code hasn't been written, the api would allow you to do the following: ``` auto result = logs .sequence() .transform((double i) => log(i + 1.0)) .parallelize(pool.getScheduler) .toList .syncWait .value ``` For the in-place mutation I suppose we could add a `refSequence` (then we don't need `.toList` anymore). Yet another way would be to use an `asyncScope` and spawn individual tasks: ``` import concurrency.thread : stdTaskPool; import concurrency.operations : whenAll; import concurrency : syncWait; import std.algorithm : map; import std.array : array; import concurrency.asyncscope; auto pool = stdTaskPool(32); auto scheduler = pool.getScheduler(); auto scp = asyncScope(); foreach (i, ref elem; logs) { scp.spawn( just(&elem) .then((double* elem)) => *elem = log(i + 1.0)) .on(scheduler) ); } scp.cleanup.syncWait(); ```
Apr 07 2024
parent Paolo Invernizzi <paolo.invernizzi gmail.com> writes:
On Sunday, 7 April 2024 at 17:48:43 UTC, Sebastiaan Koppe wrote:
 On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
 [...]
Contrary to P2300, the `whenAll` implementation in the concurrency library accepts an array as well. [...]
Thank you Sebastiaan for the explanations, I missed that whenAll accepts array too. So I will avoid to dig into Streams, waiting for Sequence, and yes I think that 'parallelize' is the right sugar to add. In the meantime, the trio Sender / Receiver / Scheduler should be sufficient to play with a lot of things. /P
Apr 08 2024