www.digitalmars.com         C & C++   DMDScript  

digitalmars.D.learn - How to implement Canceleable spawn() from parent

reply aberba <karabutaworld gmail.com> writes:
Trying to implement a setInterval() that I can cancel using:

Tid tid = setInterval(2000, (){ writeln("hello");})

And then I can do:

stopInterval(tid);


With something like this:

stopInterval(Tid tid) {
     send(tid, "cancel");
}

import std.stdio : writeln;
import std.concurrency : receive, spawn, thisTid, Tid;

auto setInterval(long milliseconds, void delegate() callback)
{
     auto worker(long mls, void delegate() cb)
     {
         import core.thread.osthread : Thread;
         import std.datetime : seconds, msecs;

         writeln("Starting ", thisTid, "...");

         bool done = false;

         receive((string text) {
            writeln("Received string: ", text);
            done = true;
         });

         do
         {
             // or receive() comes here?

             Thread.sleep(mls.msecs);
             cb();
         }
         while (!done);
     }

     // I guess issue is with the callback
     Tid id = spawn(&worker, milliseconds, &callback);

     return id;
}



Getting error:

Error: template std.concurrency.spawn cannot deduce function from 
argument types !()(void delegate(Tid id)  system, Tid), 
candidates are:
/usr/include/dmd/phobos/std/concurrency.d(460,5):        spawn(F, 
T...)(F fn, T args)
   with F = void delegate(Tid)  system,
        T = (Tid)
   must satisfy the following constraint:
        isSpawnable!(F, T)


Am I even using the right tool here?
Jun 28 2020
parent reply Stanislav Blinov <stanislav.blinov gmail.com> writes:
On Sunday, 28 June 2020 at 13:29:08 UTC, aberba wrote:

 Getting error:

 Error: template std.concurrency.spawn cannot deduce function 
 from argument types !()(void delegate(Tid id)  system, Tid), 
 candidates are:
 /usr/include/dmd/phobos/std/concurrency.d(460,5):        
 spawn(F, T...)(F fn, T args)
   with F = void delegate(Tid)  system,
        T = (Tid)
   must satisfy the following constraint:
        isSpawnable!(F, T)
The error you're getting is because you're passing a pointer to a delegate instead of a delegate.
     Tid id = spawn(&worker, milliseconds, &callback);
^ here But fixing that still won't compile, because when you want to pass a delegate to `spawn`, it needs to be a shared delegate. If I understood your intent correctly, here's how you can do it: import std.stdio : writeln; import std.concurrency; import core.thread.osthread : Thread; import std.datetime.stopwatch; auto setInterval(long milliseconds, void function() callback) { static void worker(Duration d, void function() cb) { writeln("Starting ", thisTid, "..."); bool done = false; StopWatch sw; sw.start; while (true) { // wait for messages for a timespan of at least `d` receiveTimeout( d, (string text) { writeln("Received string: ", text); if (text == "cancel") done = true; }); if (done) break; // a non-cancelling message might've been received before timeout, // so test if it's really time for the callback if (sw.peek >= d) { cb(); sw.reset; } } } Tid id = spawn(&worker, milliseconds.msecs, callback); return id; } void stopInterval(Tid tid) { send(tid, "cancel"); } void main() { auto tid = setInterval(1000, { writeln("tick"); }); Thread.sleep(2.seconds); send(tid, "not cancel"); Thread.sleep(5.seconds); stopInterval(tid); }
Jun 28 2020
next sibling parent reply aberba <karabutaworld gmail.com> writes:
On Sunday, 28 June 2020 at 14:23:01 UTC, Stanislav Blinov wrote:
 On Sunday, 28 June 2020 at 13:29:08 UTC, aberba wrote:
Thanks. I believe this: StopWatch sw; sw.start; works becuse D structs are initialized by default, right? I've never actually done it this way. Little details.
Jun 28 2020
next sibling parent reply aberba <karabutaworld gmail.com> writes:
On Sunday, 28 June 2020 at 23:02:26 UTC, aberba wrote:
 On Sunday, 28 June 2020 at 14:23:01 UTC, Stanislav Blinov wrote:
 On Sunday, 28 June 2020 at 13:29:08 UTC, aberba wrote:
Thanks. I believe this: StopWatch sw; sw.start; works becuse D structs are initialized by default, right? I've never actually done it this way. Little details.
So I checked receiveTimeout() when I was looking for what I could use. I wish there was an example in the docs. https://dlang.org/library/std/concurrency/receive_timeout.html
Jun 28 2020
parent =?UTF-8?Q?Ali_=c3=87ehreli?= <acehreli yahoo.com> writes:
On 6/28/20 4:08 PM, aberba wrote:

 So I checked receiveTimeout() when I was looking for what I could use. I 
 wish there was an example in the docs.
 
 https://dlang.org/library/std/concurrency/receive_timeout.html
I have an example of it: http://ddili.org/ders/d.en/concurrency.html#ix_concurrency.receiveTimeout Ali
Jun 28 2020
prev sibling parent reply Stanislav Blinov <stanislav.blinov gmail.com> writes:
On Sunday, 28 June 2020 at 23:02:26 UTC, aberba wrote:

 I believe this:

 StopWatch sw;
 sw.start;

 works becuse D structs are initialized by default, right?
 I've never actually done it this way. Little details.
Yup. You can also do a auto sw = StopWatch(AutoStart.yes); and not have to call `start` explicitly.
Jun 28 2020
parent reply aberba <karabutaworld gmail.com> writes:
On Sunday, 28 June 2020 at 23:39:07 UTC, Stanislav Blinov wrote:
 On Sunday, 28 June 2020 at 23:02:26 UTC, aberba wrote:

 I believe this:

 StopWatch sw;
 sw.start;

 works becuse D structs are initialized by default, right?
 I've never actually done it this way. Little details.
Yup. You can also do a auto sw = StopWatch(AutoStart.yes); and not have to call `start` explicitly.
Interesting. I should look into Phobos more.
Jun 28 2020
parent Johann Lermer <johann.lermer elvin.eu> writes:
I'm doing this in an X11 application in order to send a timer 
event every 100 milliseconds to the main event queue.

class Application
{
   shared private bool s_tick;

   void clock_task (shared X11.Display* disp, X11.Atom atom, 
X11.Window win)
   {
     for (;;)
     {
       try
       {
         receiveTimeout (100.msecs);

         if (disp && atomicLoad(s_tick))
         {
           // disable ticking until it is allowed again at the end 
of the event loop
           atomicStore(s_tick, false);

           X11.XClientMessageEvent event;
           event.type         = X11.ClientMessage;
           event.window       = win;
           event.message_type = atom;
           event.format       = 32;
           event.data.l       = [0, 0, 0, 0, 0];

           X11.XSendEvent (cast (X11.Display*) disp, win, 0, 0,  
cast(X11.XEvent*)&event);
           X11.XFlush (cast (X11.Display*) disp);
         }
       }
       catch (Throwable)
       {
         return;
       }
     }
   }

   this ()
   {
...
     spawn (&clock_task, cast(shared)x11Display, x11SigClockAtom, 
_x11_proxyWindow);
   }

   run ()
   {
     while (true)
     {
...
       // event processing starts here: read in X11 event and 
convert it to a wit Event
       X11.XEvent x11_event;
       X11.XNextEvent (_x11.display, &x11_event);
...
       atomicStore(s_tick, true);
     }
   }
}
Jun 28 2020
prev sibling parent reply aberba <karabutaworld gmail.com> writes:
On Sunday, 28 June 2020 at 14:23:01 UTC, Stanislav Blinov wrote:
 On Sunday, 28 June 2020 at 13:29:08 UTC, aberba wrote:

        [...]
The error you're getting is because you're passing a pointer to a delegate instead of a delegate. [...]
So with this, without the Thread.sleep() to block main from exiting, the spawned thread will terminate immediately. How do I keep it from happening? Keep it running continuously? From the docs, it says OwnerTerminated exception gets thrown when the sending thread (e.i. main) is terminated.
 Thrown on calls to receive if the thread that spawned the 
 receiving thread has terminated and no more messages exist.
Jun 29 2020
parent reply =?UTF-8?Q?Ali_=c3=87ehreli?= <acehreli yahoo.com> writes:
On 6/29/20 4:34 PM, aberba wrote:

 So with this, without the Thread.sleep() to block main from exiting, the
 spawned thread  will terminate immediately.
You can call core.thread.thread_joinAll at the end of main. Another way would be to wait for a worker's exit by looking for LinkTerminated but you need to start the thread with spawnLinked: http://ddili.org/ders/d.en/concurrency.html#ix_concurrency.LinkTerminated Ali
Jun 29 2020
parent reply aberba <karabutaworld gmail.com> writes:
On Tuesday, 30 June 2020 at 00:33:41 UTC, Ali Çehreli wrote:
 On 6/29/20 4:34 PM, aberba wrote:

 So with this, without the Thread.sleep() to block main from
exiting, the
 spawned thread  will terminate immediately.
You can call core.thread.thread_joinAll at the end of main.
So I tried that initially but my (){ writeln(...) } wasn't printing anything in console. Could that be related to stdout buffering? The program kept running though.
 Another way would be to wait for a worker's exit by looking for 
 LinkTerminated but you need to start the thread with 
 spawnLinked:
Read that too, but doesn't seem like the desired behavior I want. So here's the thing, unlike JavaScript, the D behavior seems to be like a while(bool){} has to be placed directly within the scope of main. Was thinking as long my while loop in setInterval() was running, spawn() will be kept alive. It seem both vibe.d and arsd have a similar setInterval() implementation and they both require using their event loop to keep the program alive. In my case, wanted setInterval() to behave like it's own event loop without placing it in a while(bool){} loop in main()
Jun 30 2020
parent reply Simen =?UTF-8?B?S2rDpnLDpXM=?= <simen.kjaras gmail.com> writes:
On Tuesday, 30 June 2020 at 08:15:54 UTC, aberba wrote:
 On Tuesday, 30 June 2020 at 00:33:41 UTC, Ali Çehreli wrote:
 On 6/29/20 4:34 PM, aberba wrote:

 So with this, without the Thread.sleep() to block main from
exiting, the
 spawned thread  will terminate immediately.
You can call core.thread.thread_joinAll at the end of main.
So I tried that initially but my (){ writeln(...) } wasn't printing anything in console. Could that be related to stdout buffering? The program kept running though.
Seems weird. This works great on my machine: import core.time : Duration, msecs; import core.thread : Thread, thread_joinAll; import std.concurrency : spawn, Tid, send, receiveTimeout; import std.stdio : writeln; private struct IntervalStop {} Tid setInterval(Duration dur, void function() fn) { return spawn((Duration d, void function() f){ while (!receiveTimeout(d, (IntervalStop s){})) { f(); } }, dur, fn); } void stopInterval(Tid tid) { tid.send(IntervalStop()); } void main() { auto a = setInterval(1000.msecs, (){ writeln("Hello from spawned thread A"); }); // Stop it before it happens stopInterval(a); Thread.sleep(2000.msecs); auto b = setInterval(1000.msecs, (){ writeln("Hello from spawned thread B"); }); // Let this one run a little Thread.sleep(2500.msecs); stopInterval(b); auto c = setInterval(1000.msecs, (){ writeln("Hello from spawned thread C"); }); // Sending the wrong message doesn't make it happen or stop prematurely c.send("Stop this at once!"); Thread.sleep(2500.msecs); stopInterval(c); thread_joinAll(); } So I guess the error is elsewhere, but I'm not sure where and how. -- Simen
Jun 30 2020
parent reply aberba <karabutaworld gmail.com> writes:
On Tuesday, 30 June 2020 at 12:48:32 UTC, Simen Kjærås wrote:
 On Tuesday, 30 June 2020 at 08:15:54 UTC, aberba wrote:
 On Tuesday, 30 June 2020 at 00:33:41 UTC, Ali Çehreli wrote:
 On 6/29/20 4:34 PM, aberba wrote:

 So with this, without the Thread.sleep() to block main from
exiting, the
 spawned thread  will terminate immediately.
You can call core.thread.thread_joinAll at the end of main.
So I tried that initially but my (){ writeln(...) } wasn't printing anything in console. Could that be related to stdout buffering? The program kept running though.
 So I guess the error is elsewhere, but I'm not sure where and 
 how.
Yeah, you're right. I changed receiveTimeout() to receive() to try something and forgot to change it back. Jeez, I hate myself. Thanks. So how can I now hide the core.thread.thread_joinAll so the library user doesn't have to type it themselves in main() ? I don't see how that can be done.
Jun 30 2020
next sibling parent Steven Schveighoffer <schveiguy gmail.com> writes:
On 6/30/20 9:44 AM, aberba wrote:
 On Tuesday, 30 June 2020 at 12:48:32 UTC, Simen Kjærås wrote:
 So I guess the error is elsewhere, but I'm not sure where and how.
Yeah, you're right. I changed receiveTimeout() to receive() to try something and forgot to change it back. Jeez, I hate myself. Thanks. So how can I now hide the core.thread.thread_joinAll so the library user doesn't have to type it themselves in main() ? I don't see how that can be done.
I assume you need something more than thread_joinAll, because you need to stop all the threads from executing also. So wrapping this up into a single call would be what you use (it's OK to ask the user to clean up a library manually). -Steve
Jun 30 2020
prev sibling parent reply Simen =?UTF-8?B?S2rDpnLDpXM=?= <simen.kjaras gmail.com> writes:
On Tuesday, 30 June 2020 at 13:44:38 UTC, aberba wrote:
 On Tuesday, 30 June 2020 at 12:48:32 UTC, Simen Kjærås wrote:
 On Tuesday, 30 June 2020 at 08:15:54 UTC, aberba wrote:
 On Tuesday, 30 June 2020 at 00:33:41 UTC, Ali Çehreli wrote:
 On 6/29/20 4:34 PM, aberba wrote:

 So with this, without the Thread.sleep() to block main from
exiting, the
 spawned thread  will terminate immediately.
You can call core.thread.thread_joinAll at the end of main.
So I tried that initially but my (){ writeln(...) } wasn't printing anything in console. Could that be related to stdout buffering? The program kept running though.
 So I guess the error is elsewhere, but I'm not sure where and 
 how.
Yeah, you're right. I changed receiveTimeout() to receive() to try something and forgot to change it back. Jeez, I hate myself. Thanks. So how can I now hide the core.thread.thread_joinAll so the library user doesn't have to type it themselves in main() ? I don't see how that can be done.
__gshared Tid mainTid; static this() { if (mainTid.tupleof[0] is null) { mainTid = thisTid; } } static ~this() { if (thisTid == mainTid) { thread_joinAll(); } } The above code does the trick. So, what does it do? __gshared means 'this variable is accessible to all threads'. static this() runs upon creation of any thread including the main thread. Since the main thread will run first*, it gets to store its Tid in mainTid, and every other thread will see a populated mainTid and leave it alone. In the module destructor, which runs after main(), we call thread_joinAll() iff we're the main thread. Now, why should you not do this? Well first, instead of getting a tidy crash you get a process that doesn't end. Second, there's the race conditions described below. Third, there's the principle of least astonishment. D programmers expect that when main() returns, the program will exit shortly(ish), while this zombie could continue running indefinitely. -- Simen *I'm pretty sure this is possibly wrong, if a module constructor spawns a new thread. There's also a possible race condition where newly spawned modules may conceivably not see a properly initialized mainTid.
Jun 30 2020
parent reply Steven Schveighoffer <schveiguy gmail.com> writes:
On 6/30/20 10:15 AM, Simen Kjærås wrote:
 On Tuesday, 30 June 2020 at 13:44:38 UTC, aberba wrote:
 On Tuesday, 30 June 2020 at 12:48:32 UTC, Simen Kjærås wrote:
 On Tuesday, 30 June 2020 at 08:15:54 UTC, aberba wrote:
 On Tuesday, 30 June 2020 at 00:33:41 UTC, Ali Çehreli wrote:
 On 6/29/20 4:34 PM, aberba wrote:

 So with this, without the Thread.sleep() to block main from
exiting, the
 spawned thread  will terminate immediately.
You can call core.thread.thread_joinAll at the end of main.
So I tried that initially but my (){ writeln(...) } wasn't printing anything in console. Could that be related to stdout buffering? The program kept running though.
 So I guess the error is elsewhere, but I'm not sure where and how.
Yeah, you're right. I changed receiveTimeout() to receive() to try something and forgot to change it back. Jeez, I hate myself. Thanks. So how can I now hide the core.thread.thread_joinAll so the library user doesn't have to type it themselves in main() ? I don't see how that can be done.
__gshared Tid mainTid; static this() {     if (mainTid.tupleof[0] is null) {         mainTid = thisTid;     } } static ~this() {     if (thisTid == mainTid) {         thread_joinAll();     } }
First, you can just use shared static dtor, as this runs once at the end of the program. At the very least, you can run the setting of mainTid in a shared constructor to avoid the race conditions (also no need to check if its set already). Second, I realized, thread_joinAll is already being done by the runtime: https://github.com/dlang/druntime/blob/67618bd2dc8905ad5dee95f3329109aebd839b74/src/rt/dmain2.d#L226 So the question really becomes -- why is it necessary to call thread_joinAll in main? It's because the main thread's TLS static destructor is closing the owner mailbox, which is sending a message to all the threads that the owner is terminated, causing your threads to exit immediately. See here: https://github.com/dlang/phobos/blob/268b56be494cc4f76da54a66a6960fa7e527c4ed/std/concurrency.d#L223 Honestly though, I think this is the correct behavior -- if you exit main, you are expecting the program to not hang indefinitely. -Steve
Jun 30 2020
parent reply aberba <karabutaworld gmail.com> writes:
On Tuesday, 30 June 2020 at 14:43:40 UTC, Steven Schveighoffer 
wrote:
 On 6/30/20 10:15 AM, Simen Kjærås wrote:
 [...]
My thinking is I don't want regular consumers using the package to think about the technicality of thread_joinAll() at all. Thinking about putting it in a mixin like: mixin KeepRunning; Or something
Jul 01 2020
parent =?UTF-8?Q?Ali_=c3=87ehreli?= <acehreli yahoo.com> writes:
On 7/1/20 2:41 AM, aberba wrote:
 On Tuesday, 30 June 2020 at 14:43:40 UTC, Steven Schveighoffer wrote:
 On 6/30/20 10:15 AM, Simen Kj=C3=A6r=C3=A5s wrote:
 [...]
=20 My thinking is I don't want regular consumers using the package to thin=
k=20
 about the technicality of thread_joinAll() at all.
=20
 Thinking about putting it in a mixin like:
=20
 mixin KeepRunning;
=20
 Or something
How about main() starts a thread that starts all the other threads?=20 Then, thread_joinAll() would go inside the non-main :) thread. However, Steve is right: When main() exits, all threads will and should=20 exit. Ali
Jul 01 2020