digitalmars.D.learn - Question on shared memory concurrency
- Andy Valencia (32/32) Mar 03 I tried a shared memory parallel increment. Yes, it's basically
- Richard (Rikki) Andrew Cattermole (22/22) Mar 03 A way to do this without spawning threads manually:
- Andy Valencia (8/10) Mar 04 Thank you! Of course, a thread dispatch per atomic increment is
- evilrat (5/15) Mar 04 There is `__gshared` type qualifier, but unlike plain `shared` it
- Andy Valencia (32/37) Mar 04 For any other newbie dlang voyagers, here's a version which works
- Andy Valencia (36/40) Mar 05 Using a technique I found in a unit test in std/concurrency.d, I
I tried a shared memory parallel increment. Yes, it's basically a cache line thrasher, but I wanted to see what's involved in shared memory programming. Even though I tried to follow all the rules to make true shared memory (not thread local), it appears I failed, as the wait loop at the end only sees its own local 250 million increments?

```d
import core.atomic : atomicFetchAdd;
import std.stdio : writeln;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void doadd(ref shared(uint) val)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val, 1);
    }
}

void main()
{
    shared(uint) val = 0;

    for (int x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, val);
    }
    doadd(val);
    while (val != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
Mar 03
A way to do this without spawning threads manually:

```d
import std.parallelism : TaskPool, parallel, taskPool, defaultPoolThreads;
import std.stdio : writeln;
import std.range : iota;

enum NSWEPT = 1_000_000;
enum NCPU = 4;

void main() {
    import core.atomic : atomicLoad, atomicOp;

    shared(uint) value;

    // Must be set before the default pool is first used.
    defaultPoolThreads(NCPU);
    TaskPool pool = taskPool();

    foreach(_; pool.parallel(iota(NSWEPT))) {
        atomicOp!"+="(value, 1);
    }

    writeln(pool.size);
    writeln(atomicLoad(value));
}
```

Unfortunately I could only use the default task pool; creating a new one took too long on run.dlang.io. I also had to decrease NSWEPT, because anything larger would take too long.
Mar 03
On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> A way to do this without spawning threads manually: ...

Thank you! Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so it's not surprising you had to trim the iterations.

But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?
Mar 04
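One possible answer to the question above, offered as a hedged sketch rather than anything a poster in this thread proposed: a module-level variable declared `shared` is a single process-wide instance rather than thread-local, so spawned threads can reach it by name with no `ref` parameter or allocator involved. The names `counter` and `runWorkers` are made up for this illustration, and NSWEPT is reduced so the run stays short. Completion is signalled with `ownerTid.send`, the same idea as the last post in this thread.

```d
import core.atomic : atomicFetchAdd, atomicLoad, atomicStore;
import std.concurrency : spawn, send, receiveOnly, ownerTid;

enum uint NSWEPT = 1_000_000;   // reduced from the thread's values for a quick run
enum uint NCPU = 4;

// Module-level `shared` data: one instance for the whole process (not TLS).
// `counter` is a hypothetical name used only in this sketch.
shared uint counter;

void doadd()
{
    foreach (_; 0 .. NSWEPT / NCPU)
        atomicFetchAdd(counter, 1);
    ownerTid.send(true);        // tell the owner this worker is done
}

// Hypothetical helper: runs NCPU workers and returns the final count.
uint runWorkers()
{
    atomicStore(counter, 0u);
    foreach (_; 0 .. NCPU)
        spawn(&doadd);
    foreach (_; 0 .. NCPU)
        receiveOnly!bool();     // wait for every worker to report in
    return atomicLoad(counter);
}

void main()
{
    assert(runWorkers() == NSWEPT);
}
```

Waiting on messages instead of polling with `Thread.sleep` means the owner wakes only when a worker has actually finished.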
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> > A way to do this without spawning threads manually: ...
>
> Thank you! Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so not surprising you had to trim the iterations. But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?

There is the `__gshared` attribute, but unlike plain `shared`, it is up to you to ensure valid concurrent access, as stated in the docs: https://dlang.org/spec/const3.html#shared_global
Mar 04
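To make the `__gshared` suggestion concrete, here is a minimal sketch, assuming a plain `synchronized` block is an acceptable way to "ensure valid concurrent access". It is not code from the thread: `counter` and `runWorkers` are hypothetical names, `core.thread.Thread` is used instead of `spawn` so the workers can simply be joined, and NSWEPT is kept small because a lock per increment is slow.

```d
import core.thread : Thread;

enum uint NSWEPT = 400_000;     // kept small: one lock acquisition per increment
enum uint NCPU = 4;

// `__gshared` gives one process-wide instance, but its type stays plain `uint`,
// so the compiler does not enforce any synchronization for us.
__gshared uint counter;         // hypothetical name for this sketch

void doadd()
{
    foreach (_; 0 .. NSWEPT / NCPU)
    {
        synchronized            // critical section guarding the increment
        {
            ++counter;
        }
    }
}

// Hypothetical helper: starts NCPU threads, joins them, returns the total.
uint runWorkers()
{
    counter = 0;
    Thread[] workers;
    foreach (_; 0 .. NCPU)
    {
        auto t = new Thread(&doadd);
        t.start();
        workers ~= t;
    }
    foreach (t; workers)
        t.join();
    return counter;
}

void main()
{
    assert(runWorkers() == NSWEPT);
}
```

Dropping the `synchronized` block would still compile, which is exactly the risk the docs warn about: the increments could then race and the final count could come up short.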
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> ...
> I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?

For any other newbie dlang voyagers, here's a version which works as expected using the system memory allocator. On my little i7 I get 1.48 secs wallclock with 5.26 CPU seconds.

```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;
import core.memory : GC;

const uint NSWEPT = 100_000_000;
const uint NCPU = 4;

void doadd(shared uint *buf)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(buf[0], 1);
    }
}

void main()
{
    shared uint *buf = cast(shared uint *)GC.calloc(uint.sizeof * 1, GC.BlkAttr.NO_SCAN);

    for (uint x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, buf);
    }
    doadd(buf);
    while (buf[0] != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
Mar 04
On Monday, 4 March 2024 at 18:08:52 UTC, Andy Valencia wrote:
> For any other newbie dlang voyagers, here's a version which works as expected using the system memory allocator. On my little i7 I get 1.48 secs wallclock with 5.26 CPU seconds. ...

Using a technique I found in a unit test in std/concurrency.d, I managed to share process memory without GC. It counted up to 1,000,000,000 on my low end i7 in:

```
real    0m15.666s
user    0m59.913s
sys     0m0.004s
```

```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn, send, receiveOnly, ownerTid;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void doadd()
{
    auto val = receiveOnly!(shared(int)[]);
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val[0], 1);
    }
    ownerTid.send(true);
}

void main()
{
    static shared int[] val = new shared(int)[1];

    // Parallel workers
    for (int x = 0; x < NCPU; ++x) {
        auto tid = spawn(&doadd);
        tid.send(val);
    }

    // Pick up all completed workers
    for (int x = 0; x < NCPU; ++x) {
        receiveOnly!(bool);
    }
    assert(val[0] == NSWEPT);
}
```
Mar 05