digitalmars.D.learn - parallelism with message passing

Hello,
I'm starting one process in the main thread, which in turn is going
to kick off another process but has no way of passing along
main's Tid, hence the shared(Tid) nonsense.
* This works in serial (i.e. with workUnitSize > x.length in
main's foreach loop).
* It also works when passing the tid directly into f.run(mainTid)
with workUnitSize == 1.
But I get a crash when attempting to run in parallel with a
"shared" tid (see below).
Any ideas how to make this work?
Thanks,
Josh

// garbage: newb playing with threads
module test;
import core.thread;
import std.concurrency, std.parallelism, std.stdio;

shared(Tid) mainTid;

struct Foo {
     void run() {
         Thread.sleep(dur!"seconds"(1));
         writeln(cast(Tid)mainTid);
         send(cast(Tid)mainTid, true);
     }
}

void someAction(Tid tid) {
     mainTid = cast(shared(Tid))tid;
}

void main() {
     Foo[] x = [Foo(), Foo(), Foo()];
     spawn( &someAction, thisTid );
     foreach(f; taskPool.parallel(x, 1)) {
         f.run();
         receiveTimeout(dur!"seconds"(3),
             (bool x) { writeln("received"); }
         );
     }
}
// exception output:
core.exception.AssertError /usr/share/dmd/src/phobos/std/concurrency.d(1007): null this
----------------
5   test                                0x000000010604a4ad _d_assert_msg + 69
6   test                                0x0000000106032523 bool std.concurrency.MessageBox.get!(core.time.Duration, void function(bool)*).get(scope core.time.Duration, scope void function(bool)*) + 83
7   test                                0x00000001060324bb bool std.concurrency.receiveTimeout!(void function(bool)*).receiveTimeout(core.time.Duration, void function(bool)*) + 59
8   test                                0x000000010602c5c7 void test.main().int __foreachbody1416(ref test.Foo) + 71
9   test                                0x00000001060320f0 int std.parallelism.ParallelForeach!(test.Foo[]).ParallelForeach.opApply(scope int delegate(ref test.Foo)).void doIt() + 256
10  test                                0x00000001060583f4 void std.parallelism.run!(void delegate()).run(void delegate()) + 20
11  test                                0x000000010605800c void std.parallelism.__T4TaskS213std11parallelism3runTDFZvZ.Task.impl(void*) + 24
12  test                                0x0000000106056da3 void std.parallelism.AbstractTask.job() + 15
13  test                                0x0000000106056df9 void std.parallelism.TaskPool.doJob(std.parallelism.AbstractTask*) + 33
14  test                                0x0000000106056f20 void std.parallelism.TaskPool.workLoop() + 132
15  test                                0x000000010603f915 void core.thread.Thread.run() + 49
16  test                                0x000000010603ec5a thread_entryPoint + 334
17  libsystem_c.dylib                   0x00007fff9681f742 _pthread_start + 327
18  libsystem_c.dylib                   0x00007fff9680c181 thread_start + 13
19  ???                                 0x0000000000000000 0x0 + 0
----------------
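
For comparison, here is a rough sketch of the variant that passes the tid
directly. The trace shows receiveTimeout being called from the foreach body
on a TaskPool worker thread, so this sketch assumes (not confirmed) that the
"null this" comes from receiving on a thread that has no message box of its
own. The workers therefore only send, and all receiving happens on the
calling thread after the loop. The run(Tid) signature and the runAndCollect
helper are made up for illustration and are not part of the code above.

```d
import core.time : seconds;
import std.concurrency : Tid, thisTid, send, receiveTimeout;
import std.parallelism : taskPool;
import std.stdio : writeln;

struct Foo {
    // Hypothetical variant of run(): the receiver's Tid is an explicit argument.
    void run(Tid owner) {
        send(owner, true);
    }
}

// Hypothetical helper: runs every Foo in parallel, then collects the
// replies on the calling thread. Returns how many replies arrived.
size_t runAndCollect(Foo[] x) {
    Tid owner = thisTid; // the caller's Tid, captured by the loop body

    foreach (f; taskPool.parallel(x, 1)) {
        f.run(owner); // workers only send, they never receive
    }

    size_t received = 0;
    foreach (i; 0 .. x.length) {
        // Wait up to 3 seconds for each expected message.
        if (!receiveTimeout(3.seconds, (bool ok) { ++received; }))
            break;
    }
    return received;
}

void main() {
    auto n = runAndCollect([Foo(), Foo(), Foo()]);
    writeln("received ", n, " replies");
}
```

Capturing the Tid in a local also sidesteps the shared(Tid) global and the
question of whether someAction has run before the first send.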
Nov 20 2012