digitalmars.D.learn - Help with Concurrency
- bertg (51/51) Nov 03 2015 I am having trouble with a simple use of concurrency.
- Nicholas Wilson (3/55) Nov 03 2015 Try replacing the following loop to have a receive that times out
- bertg (4/9) Nov 03 2015 That didn't solve the problem. How would that solve the problem?
- Marc =?UTF-8?B?U2Now7x0eg==?= (6/28) Nov 04 2015 What is the type of `msgIn`? Try inserting `pragma(msg,
- JR (19/36) Nov 04 2015 Unless I'm reading it wrong, you want
- JR (16/17) Nov 04 2015 And my indentation and brace-balancing there is wrong. Shows how
- Dmitri (11/17) Nov 05 2015 I had a similarily odd experience with std.concurrency - my
I am having trouble with a simple use of concurrency. Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...". Am I misusing or misunderstanding the use of mailboxes? === class Connection { Reactor reactor; WebSocket webSocket; this(Reactor r, WebSocket ws) { reactor = r; webSocket = ws; messageLoop(); } void messageLoop() { std.concurrency.Tid tid = std.concurrency.thisTid(); writeln("tid 1 ~ " ~ to!string(std.concurrency.thisTid())); writeln("tid 2 ~ " ~ to!string(std.concurrency.thisTid())); writeln("tid 3 ~ " ~ to!string(std.concurrency.thisTid())); // deal with websocket messages spawn(&handleConnectionWebSocket, tid, cast(shared) webSocket); // deal with pub/sub //spawn(); while (true) { writeln("receiving..."); std.concurrency.receive( (string msg) { writeln("conn: received ws message: " ~ msg); } ); writeln("received!"); } } } void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws) { auto sock = cast(WebSocket) ws; while (sock.connected) { writeln("sock in"); auto msgIn = sock.receiveText(); std.concurrency.send(caller, msgIn); } }
Nov 03 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:I am having trouble with a simple use of concurrency. Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...". Am I misusing or misunderstanding the use of mailboxes? === class Connection { Reactor reactor; WebSocket webSocket; this(Reactor r, WebSocket ws) { reactor = r; webSocket = ws; messageLoop(); } void messageLoop() { std.concurrency.Tid tid = std.concurrency.thisTid(); writeln("tid 1 ~ " ~ to!string(std.concurrency.thisTid())); writeln("tid 2 ~ " ~ to!string(std.concurrency.thisTid())); writeln("tid 3 ~ " ~ to!string(std.concurrency.thisTid())); // deal with websocket messages spawn(&handleConnectionWebSocket, tid, cast(shared) webSocket); // deal with pub/sub //spawn();Try replacing the following loop to have a receive that times out or while(true) to while(web socked.connected)while (true) { writeln("receiving..."); std.concurrency.receive( (string msg) { writeln("conn: received ws message: " ~ msg); } ); writeln("received!"); } } } void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws) { auto sock = cast(WebSocket) ws; while (sock.connected) { writeln("sock in"); auto msgIn = sock.receiveText(); std.concurrency.send(caller, msgIn); } }
Nov 03 2015
On Wednesday, 4 November 2015 at 01:27:57 UTC, Nicholas Wilson wrote:On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:That didn't solve the problem. How would that solve the problem? std.concurrency.receive does not have a timeout either.[...]Try replacing the following loop to have a receive that times out or while(true) to while(web socked.connected)[...]
Nov 03 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:while (true) { writeln("receiving..."); std.concurrency.receive( (string msg) { writeln("conn: received ws message: " ~ msg); } ); writeln("received!"); } } } void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws) { auto sock = cast(WebSocket) ws; while (sock.connected) { writeln("sock in"); auto msgIn = sock.receiveText(); std.concurrency.send(caller, msgIn); } }What is the type of `msgIn`? Try inserting `pragma(msg, typeof(msgIn))` after the line where it's declared and look at the compiler's output. My suspicion is that it's something like `char[]` or `const(char)[]`, which doesn't match the `string` (aka `immutable(char)[]`) you're trying to receive.
Nov 04 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...".[...]while (true) { writeln("receiving..."); std.concurrency.receive( (string msg) { writeln("conn: received ws message: " ~ msg); } ); writeln("received!"); }Unless I'm reading it wrong, you want std.concurrency.receiveTimeout. import core.time; import std.concurrency; bool received = receiveTimeout(1.seconds, // negative makes it not wait at all (string msg) { writeln("conn: received ws message: " ~ msg); }); if (received) { writeln("received!"); } else { writeln("timed out..."); // stuff? } }Tries to receive but will give up if no matches arrive within duration. Won't wait at all if provided core.time.Duration is negative.
Nov 04 2015
On Wednesday, 4 November 2015 at 16:49:59 UTC, JR wrote:[...]And my indentation and brace-balancing there is wrong. Shows how dependent I've become on syntax highlighting. import core.time; import std.concurrency; bool received = receiveTimeout(1.seconds, writeln("conn: received ws message: " ~ msg); } ); if (received) { writeln("received!"); } else { writeln("timed out..."); // stuff? }
Nov 04 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:I am having trouble with a simple use of concurrency. Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...". [...]I had a similarily odd experience with std.concurrency - my receive would not work unless I also received on Variant, although the Variant receiver was a no-op: receive( (Event event) { // handle event }, (Variant v) {} );
Nov 05 2015