digitalmars.D.learn - Simplest multithreading example
- Brian (47/47) Aug 31 2017 Hello, I am trying to get the most trivial example of
- Michael Coulombe (8/55) Aug 31 2017 Just like a sequential loop, when you do "foreach (i; parallel(I)
- Ali Çehreli (24/37) Aug 31 2017 As Michael Coulombe said, parallel() does that implicitly.
- Brian (46/89) Aug 31 2017 Hello, thank you very much for your quick replies !
- Ali Çehreli (7/9) Sep 01 2017 I still think you can take advantage of std.parallelism:
- ag0aep6g (29/47) Sep 01 2017 Works pretty well for me:
Hello, I am trying to get the most trivial example of multithreading working, but can't seem to figure it out.
I want to split a task across threads, and wait for all those tasks to finish before moving to the next line of code.

The following 2 attempts have failed :

-----------------------------------------------------
Trial 1 :
-----------------------------------------------------

auto I = std.range.iota(0,500);
int [] X; // output
foreach (i; parallel(I) )
    X ~= i;
core.thread.thread_joinAll(); // Apparently not applicable here ?
writeln(X); // some random subset of indices

------------------------------------------------
Trial 2 : (closer to Java)
------------------------------------------------

class DerivedThread : Thread {
    int [] X;
    int i;
    this(int [] X, int i){
        this.X = X;
        this.i = i;
        super(&run);
    }

private:
    void run(){
        X ~= i;
    }
}

void main(){
    auto I = std.range.iota(0,500);
    int [] X; // output
    Thread [] threads;
    foreach (i; I )
        threads ~= new DerivedThread( X,i);
    foreach( thread; threads)
        thread.start();
    foreach( thread; threads)
        thread.join(); // does not seem to do anything
    core.thread.thread_joinAll(); // also not doing anything
    writeln(X); // X contains nothing at all
}

How can I get the program to wait until all threads have finished before moving to the next line of code ?

Thank you !
Aug 31 2017
On Friday, 1 September 2017 at 01:59:07 UTC, Brian wrote:
> How can I get the program to wait until all threads have finished before moving to the next line of code ?

Just like a sequential loop, when you do "foreach (i; parallel(I) ) { ... }", execution will not continue past the foreach loop until all the tasks associated with each element of I have finished.

Your particular example of "X ~= i" in the body of the loop is not thread-safe, so if that is the code you really intend to run, you should protect X with a Mutex or something comparable.
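For illustration, here is a minimal sketch of the mutex approach described above, applied to Trial 1. It assumes a `Mutex` from `core.sync.mutex` used with a `synchronized` statement; the helper name `collectParallel` is hypothetical and not from the thread. The result is sorted at the end because the order in which tasks append is not deterministic.

```d
import core.sync.mutex : Mutex;
import std.algorithm : sort;
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical helper: appends 0 .. n to one shared array from a parallel loop.
int[] collectParallel(int n)
{
    int[] X;                 // shared output, as in Trial 1
    auto mutex = new Mutex;  // guards every append to X

    foreach (i; parallel(iota(0, n)))
    {
        // Only one task at a time may run this block.
        synchronized (mutex)
        {
            X ~= i;
        }
    }

    // The parallel foreach has already waited for all tasks at this point,
    // so no explicit join is needed.
    sort(X);  // arrival order depends on scheduling
    return X;
}

void main()
{
    auto X = collectParallel(500);
    writeln(X.length);
}
```

Since every append is serialized, this only pays off when the loop body does real work outside the locked block.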
Aug 31 2017
On 08/31/2017 06:59 PM, Brian wrote:
> auto I = std.range.iota(0,500);
> int [] X; // output
> foreach (i; parallel(I) )
>     X ~= i;
> core.thread.thread_joinAll(); // Apparently not applicable here ?

As Michael Coulombe said, parallel() does that implicitly.

If the problem is to generate numbers in parallel, I restructured the code by letting each thread touch only its element of a results array that has already been resized for all the results (so that there is no race condition):

import std.stdio;
import std.parallelism;
import std.range;

void main() {
    auto arrs = new int[][](totalCPUs);
    const perWorker = 10;
    foreach (i, arr; parallel(arrs)) {
        const beg = cast(int)i * perWorker;
        const end = beg + perWorker;
        arrs[i] = std.range.iota(beg,end).array;
    }

    writeln(arrs);
}

If needed, std.algorithm.joiner can be used to make it a single sequence of ints:

import std.algorithm;
writeln(arrs.joiner);

Ali
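The same idea can be sketched directly against Trial 1: allocate the output array once, then let each task write only its own slot. This is a hedged illustration rather than code from the thread; `fillParallel` is a hypothetical helper name.

```d
import std.parallelism : parallel;
import std.stdio : writeln;

// Hypothetical helper: fills an array with 0 .. n-1 in parallel.
int[] fillParallel(size_t n)
{
    auto X = new int[](n);  // allocated once, before going parallel

    // Each task receives a reference to exactly one element,
    // so no two tasks ever write to the same memory.
    foreach (i, ref x; parallel(X))
        x = cast(int) i;

    return X;  // the foreach has already waited for all tasks
}

void main()
{
    writeln(fillParallel(500).length);
}
```

No lock is needed here because the array is never resized or reallocated inside the loop.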
Aug 31 2017
On Friday, 1 September 2017 at 04:43:29 UTC, Ali Çehreli wrote:
> As Michael Coulombe said, parallel() does that implicitly.
>
> If the problem is to generate numbers in parallel, I restructured the code by letting each thread touch only its element of a results array that has already been resized for all the results (so that there is no race condition)

Hello, thank you very much for your quick replies !

I was trying to make a trivial example, but the 'real' problem is trying to split a huge calculation to different threads.

Schematically :

double [] hugeCalc(int i){
    // Code that takes a long time
}

so if I do

double[][int] _hugeCalcCache;
foreach(i ; I)
    _hugeCalcCache[i] = hugeCalc(i);

of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :

foreach(i ; parallel(I) )
    _hugeCalcCache[i] = hugeCalc(i);

but as you can guess, it doesn't work that easily.

Very interesting approach about letting only the thread touch a particular element, I will try that.

FYI I did manage to make the following work, but not sure if this is really still multi-threaded ?

int [] I;
foreach (i; 0 .. 500) I ~= i;
int [] X; // output

class DerivedThread : Thread {
    private int [] i;
    this(int [] i){
        this.i = i;
        super(&run);
    }
    private void run(){
        synchronized{ // Need synchronization here !
            foreach( i0; i)
                X ~= i0;
        }
    }
}

Thread [] threads;
foreach (i; std.range.chunks( I, 50 ) )
    threads ~= new DerivedThread( i);
foreach( thread; threads)
    thread.start();

core.thread.thread_joinAll(); // Does in fact seem to 'join all' threads
writeln(X);
Aug 31 2017
On 08/31/2017 10:27 PM, Brian wrote:
> the 'real' problem is trying to split a huge calculation to different threads.

I still think you can take advantage of std.parallelism:

https://dlang.org/phobos/std_parallelism.html

Unfortunately, its features like asyncBuf, map, and amap do not stand out in the documentation. Here's my interpretation of them:

http://ddili.org/ders/d.en/parallelism.html

Ali
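As one possible reading of that suggestion, the sketch below uses `taskPool.amap`, which applies a function to every element of a range in parallel and returns the results as an array in input order. The body of `hugeCalc` is a placeholder assumption standing in for the expensive calculation described earlier in the thread.

```d
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

// Placeholder for the real, slow calculation (assumption for this sketch).
double[] hugeCalc(int i)
{
    return [i * 2.0];
}

void main()
{
    // amap evaluates eagerly and blocks until every element is done;
    // results[k] corresponds to the k-th input element.
    double[][] results = taskPool.amap!hugeCalc(iota(0, 100));
    writeln(results.length);
}
```

Because `amap` allocates and fills the result array itself, there is no shared container for the caller to protect.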
Sep 01 2017
On 09/01/2017 07:27 AM, Brian wrote:
> double [] hugeCalc(int i){
>     // Code that takes a long time
> }
>
> so if I do
>
> double[][int] _hugeCalcCache;
> foreach(i ; I)
>     _hugeCalcCache[i] = hugeCalc(i);
>
> of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :
>
> foreach(i ; parallel(I) )
>     _hugeCalcCache[i] = hugeCalc(i);
>
> but as you can guess, it doesn't work that easily.

Works pretty well for me:

----
double [] hugeCalc(int i)
{
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}

void main()
{
    static import std.range;
    import std.parallelism: parallel;
    auto I = std.range.iota(0, 10);
    double[][int] _hugeCalcCache;
    foreach(i ; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);
}
----

That runs in about 3 seconds here. The serial version would of course take about 10 seconds. So, parallelism achieved!

Though I don't know if it's safe to access an associative array concurrently like that. I'd use a normal dynamic array instead and initialize it before going parallel:

----
auto _hugeCalcCache = new double[][](10);
----
Sep 01 2017
On Friday, 1 September 2017 at 20:02:23 UTC, ag0aep6g wrote:
> Though I don't know if it's safe to access an associative array concurrently like that. I'd use a normal dynamic array instead and initialize it before going parallel:
>
> ----
> auto _hugeCalcCache = new double[][](10);
> ----

Thanks very much for your help, I finally had time to try your suggestions. The initial example you showed does indeed have the same problem of not iterating over all values :

double [] hugeCalc(int i){
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}

static import std.range;
import std.parallelism: parallel;
auto I = std.range.iota(0, 100);
double[][int] _hugeCalcCache;
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);

writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)

but this does seem to work using your other method of initialization :

auto _hugeCalcCache = new double[][](100);
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);

foreach( double[] x ; _hugeCalcCache)
    writeln( x ); // this now contains all values

so I guess initializing the whole array at compile time makes it thread safe ?
(The second case runs in 16 seconds on my computer.)

Anyways it seems to work, thanks again !
Sep 04 2017
On 09/05/2017 03:15 AM, Brian wrote:
> Thanks very much for your help, I finally had time to try your suggestions. The initial example you showed does indeed have the same problem of not iterating over all values :
>
> [...]
>
> writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)

Yeah. As expected, associative array accesses are apparently not thread-safe.

A simple writeln is a terrible way to figure that out, though. I'd suggest sorting the keys and comparing that to `I`:

----
import std.algorithm: equal, sort;
auto sortedKeys = _hugeCalcCache.keys.sort();
assert(sortedKeys.equal(I));
----

> but this does seem to work using your other method of initialization :
>
> auto _hugeCalcCache = new double[][](100);
> foreach(i ; parallel(I))
>     _hugeCalcCache[i] = hugeCalc(i);
>
> foreach( double[] x ; _hugeCalcCache)
>     writeln( x ); // this now contains all values
>
> so I guess initializing the whole array at compile time makes it thread safe ?

There's nothing compile-timey about the code. The initialization is done at run-time, but before the parallel stuff starts.

Note that the type of `_hugeCalcCache` here is different from above. Here it's `double[][]`, i.e. a dynamic array. Above it's `double[][int]`, i.e. an associative array. Those types are quite different, despite their similar names.

You can prepare an associative array in a similar way, before doing the parallel stuff. Then it might be thread-safe (not sure):

----
double[][int] _hugeCalcCache; /* associative array */

/* First initialize the elements serially: */
foreach(i; I) _hugeCalcCache[i] = [];

/* Then do the huge calculations in parallel: */
foreach(i; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);
----

But if your keys are consecutive numbers, I see no point in using an associative array.
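If the keys are not consecutive and an associative array is really wanted, one way to sidestep the open thread-safety question is to keep the associative array out of the parallel loop altogether. The sketch below is an assumption-laden illustration, not something proposed in the thread: results go into a pre-sized dynamic array in parallel, and the associative array is filled serially afterwards. `buildCache` and the body of `hugeCalc` are hypothetical.

```d
import std.parallelism : parallel;
import std.stdio : writeln;

// Placeholder for the real, slow calculation (assumption for this sketch).
double[] hugeCalc(int i)
{
    return [i * 2.0];
}

// Hypothetical helper: computes in parallel, fills the AA serially.
double[][int] buildCache(int[] keys)
{
    // One slot per key, allocated before the parallel loop starts.
    auto results = new double[][](keys.length);

    // Each task writes only results[idx]; the AA is not touched here.
    foreach (idx, key; parallel(keys))
        results[idx] = hugeCalc(key);

    // Single-threaded from here on, so AA insertion cannot race.
    double[][int] cache;
    foreach (idx, key; keys)
        cache[key] = results[idx];
    return cache;
}

void main()
{
    auto cache = buildCache([3, 17, 42, 1000]);
    writeln(cache.length);
}
```

The serial fill is cheap compared with the calculation itself, so the parallel speed-up is preserved.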
Sep 05 2017