digitalmars.D - [vibe.d] WebSocket mess with "cast(shared) ..."
- madkote (211/212) Aug 09 2023
Hi,
the idea is to perform some heavy calculation in a worker thread and send
the results directly through the WS connection back to the client.
Note that connection close is handled, so the WS connection stays open
while the worker thread is executing.
Maybe I am doing something wrong here? I guess there is some
mess with references to the WebSocket object.
On the first run (clients connect to the server), only one
or two clients do not receive all results. On the second
attempt to connect to the server, clients __NEVER__ receive any
results from the worker threads (which are created anew on each
connection).
```d
/*
* Case 01
* -------
* Worker task is executed always in same worker thread.
*
* How-To:
* - open 4 parallel WS connection to service.
* - in each connection send channel id, eg. "ch01", "ch02", ...
* - observe logs
* - worker thread name is always same
* - worker cannot write to WS?
*/
import vibe.d;
import vibe.vibe;
import vibe.core.core;
import vibe.http.server;
import vibe.http.websockets;
import vibe.http.router;
import vibe.inet.url;
import core.time;
import core.thread : Thread;
import std.conv;
static void workerFuncPingPongWS(Task caller, string channel_id,
        shared WebSocket s) nothrow {
    // Cast the shared qualifier away again inside the worker thread.
    WebSocket ws = cast(WebSocket) s;
    int counter = 5;
    try {
        logInfo("WORKER :: thread-id=%s caller=%s channel-id=%s THREAD=%s",
                thisTid, caller, channel_id, Thread.getThis().name);
        while (receiveOnly!string() == "ping" && --counter) {
            logInfo("%s :: %s :: pong=%s", Thread.getThis().name,
                    channel_id, counter);
            try {
                // This is the write that fails with "Error writing data to socket."
                ws.send("pong-" ~ channel_id ~ "-" ~ Thread.getThis().name);
            } catch (Exception o) {
                logError(">>> exception=%s", o);
            }
            caller.send("pong");
            sleep(2.seconds);
        }
        caller.send("goodbye");
    } catch (Exception e) assert(false, e.msg);
}
class WebsocketService {
    @path("/ws") void getWebsocket1(scope WebSocket ws) {
        logInfo("X> connected=%s, ws=%s code=%s THREAD=%s",
                ws.connected, &ws, ws.closeCode, Thread.getThis().name);
        auto channel_id = ws.receiveText;
        logInfo("Receive channel '%s'.", channel_id);
        auto callee = runWorkerTaskH(&workerFuncPingPongWS,
                Task.getThis, channel_id, cast(shared) ws);
        // Ping the worker until it answers with something other than "pong".
        do {
            logInfo("ping");
            callee.send("ping");
        } while (receiveOnly!string() == "pong");
        // Afterwards echo client messages until the client sends "stop".
        while (true) {
            auto txt = ws.receiveText;
            logInfo("Receive '%s'. thisTid=%s", txt, thisTid);
            if (txt == "stop") {
                break;
            }
            ws.send(txt ~ " pong");
        }
        logInfo("Client disconnected - worker is done. THREAD=%s",
                Thread.getThis().name);
    }
}
void helloWorld(HTTPServerRequest req, HTTPServerResponse res)
{
    res.writeBody("Hello");
}
void main()
{
    logInfo("APP::CASE::01");
    auto router = new URLRouter;
    router.registerWebInterface(new WebsocketService());
    router.get("/hello", &helloWorld);
    auto settings = new HTTPServerSettings;
    settings.port = 8080;
    settings.bindAddresses = ["::1", "127.0.0.1"];
    auto listener = listenHTTP(settings, router);
    scope (exit)
    {
        listener.stopListening();
    }
    runApplication();
}
```
Server console output:
```bash
[main(----) INF] APP::CASE::01
[main(----) INF] Listening for requests on http://[::1]:8080/
[main(----) INF] Listening for requests on http://127.0.0.1:8080/
[main(E7UW) INF] X> connected=true, ws=7F34D6BA1DD8 code=0 THREAD=main
[main(0Bjv) INF] X> connected=true, ws=7F34D8BA3DD8 code=0 THREAD=main
[main(0MxP) INF] X> connected=true, ws=7F34D7BA2DD8 code=0 THREAD=main
[main(bsKy) INF] X> connected=true, ws=7F34D5BA0DD8 code=0 THREAD=main
[main(E7UW) INF] Receive channel '2-fibonacci'.
[main(0Bjv) INF] Receive channel '0-fibonacci'.
[main(0MxP) INF] Receive channel '3-fibonacci'.
[main(bsKy) INF] Receive channel '1-fibonacci'.
[vibe-15(biLB) INF] WORKER :: thread-id=Tid(7f34db0876e0) caller=7F34DB074C00:1 channel-id=2-fibonacci THREAD=vibe-15
[vibe-4(tmZQ) INF] WORKER :: thread-id=Tid(7f34db087840) caller=7F34DB074A00:1 channel-id=3-fibonacci THREAD=vibe-4
[vibe-15(AXdw) INF] WORKER :: thread-id=Tid(7f34db087790) caller=7F34DB074800:3 channel-id=0-fibonacci THREAD=vibe-15
[vibe-15(DCEi) INF] WORKER :: thread-id=Tid(7f34db0878f0) caller=7F34DB074E00:1 channel-id=1-fibonacci THREAD=vibe-15
exception=object.Exception@../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/net.d(819): Error writing data to socket.
----------------
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/net.d:819 @safe ulong vibe.core.net.TCPConnection.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0d143fca]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/internal/interfaceproxy.d-mixin-302:302 @safe ulong vibe.internal.interfaceproxy.InterfaceProxy!(vibe.core.stream.Stream).InterfaceProxy.ProxyImpl!(vibe.core.net.TCPConnection).ProxyImpl.__mixin8.__mixin3.__mixin3.__mixin3.__mixin3.__mixin3.__mixin3.__mixin2.write(scope void[], scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0cf30395]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/internal/interfaceproxy.d-mixin-196:196 @safe ulong vibe.internal.interfaceproxy.InterfaceProxy!(vibe.core.stream.OutputStream).InterfaceProxy.__mixin22.__mixin3.__mixin2.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0cfcf442]
../../.dub/packages/vibe-d/0.9.6/vibe-d/stream/vibe/stream/wrapper.d:199 @safe ulong vibe.stream.wrapper.ConnectionProxyStream.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0d12ce3b]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/stream.d:305 @safe void vibe.core.stream.OutputStream.write(scope const(ubyte)[]) [0x55db0d14d69e]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:959 @safe void vibe.http.websockets.OutgoingWebSocketMessage.sendFrame(bool) [0x55db0d03a211]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:942 @safe void vibe.http.websockets.OutgoingWebSocketMessage.finalize() [0x55db0d03a044]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:688 @safe void vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode).__lambda3() [0x55db0d03b44c]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/sync.d:189 @safe void vibe.core.sync.performLocked!(vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode).__lambda3(), vibe.core.sync.InterruptibleTaskMutex).performLocked(vibe.core.sync.InterruptibleTaskMutex) [0x55db0d03b361]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:685 @safe void vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode) [0x55db0d038fc3]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:657 @safe void vibe.http.websockets.WebSocket.send(scope const(char)[]) [0x55db0d038ef3]
source/app_forum_01_task_thread.d:33 nothrow void app_forum_01_task_thread.workerFuncPingPongWS(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) [0x55db0cf1b209]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/taskpool.d:211 nothrow void vibe.core.taskpool.TaskPool.doRunTaskH!(void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)).doRunTaskH(vibe.core.task.TaskSettings, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.task.Task, ref immutable(char)[], ref shared(vibe.http.websockets.WebSocket)).taskFun(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) [0x55db0cee96d3]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:737 nothrow void vibe.core.task.TaskFuncInfo.set!(void function(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)).set(ref void function(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, ref void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.task.Task, ref immutable(char)[], ref shared(vibe.http.websockets.WebSocket)).callDelegate(ref vibe.core.task.TaskFuncInfo) [0x55db0ceec2f6]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:758 void vibe.core.task.TaskFuncInfo.call() [0x55db0d15edc5]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:457 nothrow void vibe.core.task.TaskFiber.run() [0x55db0d15dfbe]
??:? void core.thread.context.Callable.opCall() [0x55db0d1e3de8]
??:? fiber_entryPoint [0x55db0d23fe1b]
...
```
Client console output. As you can see, channel `0-fibonacci` did
not receive any results from the worker thread!
```bash
...
3-fibonacci results:
['pong-3-fibonacci-vibe-4',
'pong-3-fibonacci-vibe-4',
'pong-3-fibonacci-vibe-4',
'pong-3-fibonacci-vibe-4',
'fibonacci pong',
'0 pong',
'1 pong',
'2 pong',
'3 pong',
'4 pong']
2-fibonacci results:
['pong-2-fibonacci-vibe-15',
'pong-2-fibonacci-vibe-15',
'pong-2-fibonacci-vibe-15',
'pong-2-fibonacci-vibe-15',
'fibonacci pong',
'0 pong',
'1 pong',
'2 pong',
'3 pong',
'4 pong']
0-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
1-fibonacci results:
['pong-1-fibonacci-vibe-15',
'pong-1-fibonacci-vibe-15',
'pong-1-fibonacci-vibe-15',
'pong-1-fibonacci-vibe-15',
'fibonacci pong',
'0 pong',
'1 pong',
'2 pong',
'3 pong',
'4 pong']
```
Client console output on the 2nd attempt (the server worker threads
only produce errors when sending data from the thread). This time none
of the clients received any results from the worker threads.
```bash
1-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
3-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
2-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
0-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
```
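For comparison, here is an untested sketch of the variant I would fall back to if writing from the worker thread is simply not supported. The worker never touches the WebSocket; it only sends strings back to the handler task (the same `send`/`receiveOnly` message passing that already delivers "pong" and "goodbye" in the code above), and the handler task does every `ws.send` itself. The names `computeWorker` and `RelayService` and the loop of four fake results are made up for illustration, and I am only assuming that this avoids the socket error.

```d
import vibe.d;
import std.conv : to;

// Hypothetical worker: produces results and hands them to the owning task.
// It never sees the WebSocket, so no cast(shared) is needed.
void computeWorker(Task owner, string channel_id) nothrow {
    try {
        foreach (i; 0 .. 4) {
            // Stand-in for the heavy calculation.
            owner.send("pong-" ~ channel_id ~ "-" ~ to!string(i));
        }
        owner.send("goodbye");
    } catch (Exception e) assert(false, e.msg);
}

class RelayService {
    @path("/ws") void getWebsocket(scope WebSocket ws) {
        auto channel_id = ws.receiveText;
        auto worker = runWorkerTaskH(&computeWorker, Task.getThis, channel_id);
        // Only the task that owns the connection writes to it.
        while (true) {
            auto msg = receiveOnly!string();
            if (msg == "goodbye") break;
            ws.send(msg);
        }
    }
}

void main() {
    auto router = new URLRouter;
    router.registerWebInterface(new RelayService());
    auto settings = new HTTPServerSettings;
    settings.port = 8080;
    settings.bindAddresses = ["::1", "127.0.0.1"];
    auto listener = listenHTTP(settings, router);
    scope (exit) listener.stopListening();
    runApplication();
}
```

This is not what I actually want, since the results take a detour through the handler task instead of going directly from the worker to the client.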
BTW, see also my other question related to `runWorkerTaskH`. Many
thanks!
madkote <schroeder.compling googlemail.com>