digitalmars.D.learn - Help with Concurrency

bertg <btgirton gmail.com> writes:
I am having trouble with a simple use of concurrency.

Running the following code I get 3 different tid's, multiple 
"sock in" messages printed, but no receives. I am supposed to get 
a "received!" for each "sock in", but I am getting hung up on 
"receiving...".

Am I misusing or misunderstanding the use of mailboxes?

===

class Connection {
     Reactor reactor;
     WebSocket webSocket;

     this(Reactor r, WebSocket ws)
     {
         reactor = r;
         webSocket = ws;

         messageLoop();
     }

     void messageLoop()
     {
         std.concurrency.Tid tid = std.concurrency.thisTid();
         writeln("tid 1 ~ " ~ to!string(std.concurrency.thisTid()));
         writeln("tid 2 ~ " ~ to!string(std.concurrency.thisTid()));
         writeln("tid 3 ~ " ~ to!string(std.concurrency.thisTid()));

         // deal with websocket messages
         spawn(&handleConnectionWebSocket, tid, cast(shared) webSocket);
         // deal with pub/sub
         //spawn();

         while (true) {
             writeln("receiving...");
             std.concurrency.receive(
                 (string msg) {
                     writeln("conn: received ws message: " ~ msg);
                 }
             );
             writeln("received!");
         }
     }
}
void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws)
{
     auto sock = cast(WebSocket) ws;
     while (sock.connected) {
         writeln("sock in");
         auto msgIn = sock.receiveText();
         std.concurrency.send(caller, msgIn);
     }
}
Nov 03 2015
Nicholas Wilson <iamthewilsonator hotmail.com> writes:
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
 [...]
Try giving the following loop a receive that times out, or change while(true) to while(webSocket.connected).
         while (true) {
             writeln("receiving...");
             std.concurrency.receive(
                 (string msg) {
                     writeln("conn: received ws message: " ~ msg);
                 }
             );
             writeln("received!");
         }
     }
 }
 void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws)
 {
     auto sock = cast(WebSocket) ws;
     while (sock.connected) {
         writeln("sock in");
         auto msgIn = sock.receiveText();
         std.concurrency.send(caller, msgIn);
     }
 }
Nov 03 2015
bertg <btgirton gmail.com> writes:
On Wednesday, 4 November 2015 at 01:27:57 UTC, Nicholas Wilson 
wrote:
 On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
[...]
 Try giving the following loop a receive that times out, or change while(true) to while(webSocket.connected).
 [...]
That didn't solve the problem. How would that solve the problem? std.concurrency.receive does not have a timeout either.
Nov 03 2015
Marc Schütz <schuetzm gmx.net> writes:
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
         while (true) {
             writeln("receiving...");
             std.concurrency.receive(
                 (string msg) {
                     writeln("conn: received ws message: " ~ msg);
                 }
             );
             writeln("received!");
         }
     }
 }
 void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws)
 {
     auto sock = cast(WebSocket) ws;
     while (sock.connected) {
         writeln("sock in");
         auto msgIn = sock.receiveText();
         std.concurrency.send(caller, msgIn);
     }
 }
What is the type of `msgIn`? Try inserting `pragma(msg, typeof(msgIn))` after the line where it's declared and look at the compiler's output. My suspicion is that it's something like `char[]` or `const(char)[]`, which doesn't match the `string` (aka `immutable(char)[]`) you're trying to receive.
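A minimal sketch of that check. `receiveTextStub` is a hypothetical stand-in for `sock.receiveText()`; the thread never shows the real return type, so the stub is simply assumed to return `const(char)[]` for illustration. `isExactlyString` is likewise a made-up helper name.

```d
import std.stdio : writeln;

// Hypothetical stand-in for WebSocket.receiveText(). The real return type
// is unknown here; const(char)[] is an assumption for the demo.
const(char)[] receiveTextStub()
{
    return "hello";
}

// True only when T is exactly `string`, i.e. immutable(char)[].
enum isExactlyString(T) = is(T == string);

void main()
{
    auto msgIn = receiveTextStub();

    // Printed by the compiler during the build, not at run time.
    pragma(msg, typeof(msgIn));
    static assert(!isExactlyString!(typeof(msgIn)));

    // .idup makes an immutable copy, so the result is a real `string`.
    auto asString = msgIn.idup;
    static assert(isExactlyString!(typeof(asString)));
    writeln(asString);
}
```

If the pragma reports anything other than `string`, converting with `.idup` before the `send` gives the receiving side a value whose type is the one the `(string msg)` handler names.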
Nov 04 2015
JR <sunspyre gmail.com> writes:
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
 Running the following code I get 3 different tid's, multiple 
 "sock in" messages printed, but no receives. I am supposed to 
 get a "received!" for each "sock in", but I am getting hung up 
 on "receiving...".
[...]
         while (true) {
             writeln("receiving...");
             std.concurrency.receive(
                 (string msg) {
                     writeln("conn: received ws message: " ~ msg);
                 }
             );
             writeln("received!");
         }
Unless I'm reading it wrong, you want std.concurrency.receiveTimeout.

     import core.time;
     import std.concurrency;

     bool received = receiveTimeout(1.seconds, // negative makes it not wait at all
         (string msg) {
             writeln("conn: received ws message: " ~ msg);
         });

     if (received) {
         writeln("received!");
     }
     else {
         writeln("timed out...");
         // stuff?
     }
     }

 Tries to receive but will give up if no matches arrive within 
 duration. Won't wait at all if provided core.time.Duration is 
 negative.
Nov 04 2015
JR <sunspyre gmail.com> writes:
On Wednesday, 4 November 2015 at 16:49:59 UTC, JR wrote:
[...]
And my indentation and brace-balancing there is wrong. Shows how dependent I've become on syntax highlighting.

     import core.time;
     import std.concurrency;

     bool received = receiveTimeout(1.seconds,
         (string msg) {
             writeln("conn: received ws message: " ~ msg);
         }
     );

     if (received) {
         writeln("received!");
     }
     else {
         writeln("timed out...");
         // stuff?
     }
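For anyone who wants to try this outside the WebSocket code, here is a rough self-contained sketch. `worker` and `waitForText` are hypothetical names, and the worker just sends one fixed string instead of reading from a socket.

```d
import core.time : Duration, msecs, seconds;
import std.concurrency : receiveTimeout, send, spawn, thisTid, Tid;
import std.stdio : writeln;

// Hypothetical worker standing in for the WebSocket reader thread.
void worker(Tid owner)
{
    send(owner, "hello from worker");
}

// Waits up to `timeout` for one string message.
// Returns the message, or null if nothing matching arrived in time.
string waitForText(Duration timeout)
{
    string result = null;
    immutable got = receiveTimeout(timeout, (string msg) { result = msg; });
    return got ? result : null;
}

void main()
{
    spawn(&worker, thisTid);

    // The worker sends exactly one message, so this call gets it.
    writeln(waitForText(5.seconds));

    // Nothing else is sent, so this call gives up after the timeout.
    writeln(waitForText(100.msecs) is null);
}
```

Note that a timeout only keeps the loop from blocking forever; if the message type never matches the handler, `receiveTimeout` will keep returning false.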
Nov 04 2015
Dmitri <deemok gmail.com> writes:
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
 I am having trouble with a simple use of concurrency.

 Running the following code I get 3 different tid's, multiple 
 "sock in" messages printed, but no receives. I am supposed to 
 get a "received!" for each "sock in", but I am getting hung up 
 on "receiving...".

 [...]
I had a similarly odd experience with std.concurrency: my receive would not work unless I also received on Variant, even though the Variant receiver was a no-op:

     receive(
         (Event event) {
             // handle event
         },
         (Variant v) {}
     );
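A catch-all Variant handler can also be used as a diagnostic rather than a no-op. The sketch below is only an illustration (`describeNext` is a hypothetical helper); it posts to the current thread's own mailbox so it needs no second thread, and reports the type of whatever the typed handler did not match.

```d
import std.concurrency : receive, send, thisTid;
import std.stdio : writeln;
import std.variant : Variant;

// Receives one message and reports which handler took it.
string describeNext()
{
    string seen;
    receive(
        (string msg) { seen = "string: " ~ msg; },
        // Catch-all: takes any message the earlier handler did not match.
        (Variant v) { seen = "unmatched " ~ v.type.toString(); }
    );
    return seen;
}

void main()
{
    // A thread may send to its own Tid, which keeps the demo single-threaded.
    send(thisTid, 42);
    send(thisTid, "hi");

    writeln(describeNext()); // the int goes to the Variant handler
    writeln(describeNext()); // the string goes to the typed handler
}
```

Printing `v.type` this way is a quick test of whether messages are arriving with a different type than the typed handler expects.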
Nov 05 2015