digitalmars.D - Peeking concurrency messages
- Anonymouse (38/38) Jun 23 2019 Concurrency messages are sent via std.concurrency's send and
- Francesco Mecca (4/13) Jun 23 2019 Can you explain your use case? Maybe there is an elegant solution
- Anonymouse (66/80) Jun 23 2019 TL;DR: I need to check for messages very often, and they each
- Francesco Mecca (24/107) Jun 25 2019 First of all, kudos for Kameloso, it is a very nice project.
- ikod (8/11) Jun 25 2019 Another possible design (sorry if you already considered it and
- ikod (8/26) Jun 25 2019 Another possible design (sorry if you already considered it and
- Francesco Mecca (3/3) Jun 23 2019 if you are ok with an external dependency, vibe has a channel
Concurrency messages are sent via std.concurrency's send and receive functions. Receiving a message consumes it, in the sense that receiving again will catch the next message. The only way of telling whether there is one waiting is to consume the first and commit to handling it.

In other words, there is currently no way of checking whether the message box is empty; all the (thread ID Tid) member variables via which you could tell are private to the std.concurrency module.

Is there any chance we could get the ability to see if a Tid has messages waiting?

---
diff --git a/std/concurrency.d b/std/concurrency.d
index c89b6c3b4..65850fd38 100644
--- a/std/concurrency.d
+++ b/std/concurrency.d
@@ -334,6 +334,13 @@ public:
         formattedWrite(sink, "Tid(%x)", cast(void*) mbox);
     }
 
+    /**
+     * Returns whether or not this Tid has messages waiting.
+     */
+    bool hasMessages() @safe pure nothrow @nogc
+    {
+        return (mbox.m_localMsgs + mbox.m_sharedBox.length) > 0;
+    }
 }
 
 @system unittest
---

void foo()
{
    assert(!thisTid.hasMessages);
    thisTid.send(123);
    assert(thisTid.hasMessages);
    int i = receiveOnly!int();
    assert(!thisTid.hasMessages);
}

This would be immensely useful to me and I would use it straight away.
Jun 23 2019
On Sunday, 23 June 2019 at 09:33:27 UTC, Anonymouse wrote:
> Concurrency messages are sent via std.concurrency's send and receive functions. Receiving a message consumes it, in the sense that receiving again will catch the next message. The only way of telling whether there is one waiting is to consume the first and commit to handling it.
>
> In other words, there is currently no way of checking whether the message box is empty; all the (thread ID Tid) member variables via which you could tell are private to the std.concurrency module.
> [...]

Can you explain your use case? Maybe there is an elegant solution without resorting to checking whether the mailbox is full. Otherwise you could try a PR.
Jun 23 2019
On Sunday, 23 June 2019 at 13:59:38 UTC, Francesco Mecca wrote:
> On Sunday, 23 June 2019 at 09:33:27 UTC, Anonymouse wrote:
>> Concurrency messages are sent via std.concurrency's send and receive functions. Receiving a message consumes it, in the sense that receiving again will catch the next message.
>> [...]
>
> Can you explain your use case? Maybe there is an elegant solution without resorting to check if the mailbox is full. Otherwise you could try a PR

TL;DR: I need to check for messages very often, and each check incurs one allocation because of closures[1], even if there were no messages to receive. Being able to tell whether there are messages waiting, and thus skip the receive attempt if not, would get rid of the vast majority of these allocations. Right now it's one allocation per second, with runtimes in the range of days and weeks. Without the empty checks, it would be a few dozen allocations per day from this part of the code (at the current level of use).

I'm hesitant to add more dependencies, since I'm having problems with compilation memory use already. I can try a PR if it all doesn't strike anyone as an obviously bad idea.

---

I work on an IRC bot[2] that splits its functionality into plugins. The main program loop only reads from the server, parses whatever it gets, and serially calls each plugin with whatever it translated it to. It's a simple design, but it requires a way for plugins to be able to send requests back to the main loop, for it to send to the server or to affect the program in other ways (think a request for the program to quit).

Some but not all of these plugins are threaded, such as ones doing HTTP requests and other things that don't lend themselves well to being done in the same thread as everything else, and these need to communicate back as well. An elegant solution is for all plugins, including the non-threaded ones, to send these requests as concurrency messages, for the main loop to catch and deal with sequentially. That way there'll be no data races, and threaded plugins work just as well as single-threaded ones do.

I do blocking reads from the server, which time out after n seconds. In between reads the main loop checks for concurrency messages sent by plugins, to see if one wanted something, acts if so, and then resumes reading. The message checks are done by using receiveTimeout(0.seconds, ...)[3], which makes it instantly time out if there were no messages there to receive, so as to resume reading immediately. (A normal call of receive(...) would block.)

Thinking in extremes, if the read timeout of n seconds is infinitely large and nothing is being read from the server, requests from plugins would completely stall. If one plugin wanted the program to quit, that request would stay there indefinitely with the main loop blocking on the socket read.

On the other hand, if n is infinitely small, plugin messages will be handled almost immediately, but because of how closures work[1] each attempt at reading messages allocates, regardless of whether anything was read. So if n is infinitely small, it goes toward infinitely many allocations.

I have the socket read timeout at one second, but that still means best-case one allocation per second, more if there is activity from the server. For something running constantly over days and weeks, this adds up to allocation counts in the millions that all did nothing.

If I were able to tell whether the main loop's thread had messages waiting, I could avoid these allocations by only attempting to receive them when there are any there to actually receive. There would be hundreds of real messages, not millions of empty-yet-allocating receives.

I can't change how closures work, I can only try to avoid allocating too many of them.

[1]: https://forum.dlang.org/thread/igotwkxyjmezneicsqqg@forum.dlang.org
[2]: https://github.com/zorael/kameloso
[3]: https://github.com/zorael/kameloso/blob/a340a76a/source/kameloso/kameloso.d#L501-L520
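The zero-timeout check described above can be sketched roughly as follows. This is a minimal illustration and not the bot's actual code: `drainMailbox` and `lastRequest` are hypothetical names, and the handler here only touches module-level (thread-local) state.

```d
import core.time : seconds;
import std.concurrency : receiveTimeout, send, thisTid;

// Hypothetical stand-in for whatever the main loop does with a request.
string lastRequest;  // module-level variables are thread-local in D

/// Handles all pending string messages without blocking.
/// Returns the number of messages that were handled.
size_t drainMailbox()
{
    size_t handled;

    // With a zero timeout, receiveTimeout returns false right away
    // when there is no matching message in the mailbox.
    while (receiveTimeout(0.seconds,
        (string request) { lastRequest = request; }))
    {
        ++handled;
    }

    return handled;
}

void main()
{
    assert(drainMailbox() == 0);   // empty mailbox: returns immediately

    thisTid.send("say hello");
    thisTid.send("quit");

    assert(drainMailbox() == 2);   // both messages consumed
    assert(lastRequest == "quit");
}
```

In a loop like the one described, such a check would run once after every socket read or read timeout.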
Jun 23 2019
On Sunday, 23 June 2019 at 16:25:58 UTC, Anonymouse wrote:
> TL;DR: I need to check for messages very often, and they each incur one allocation per check because of closures[1], even if there were no messages to receive.
> [...]

First of all, kudos for Kameloso, it is a very nice project.

Having peeked at the code I think that you have the following possibilities (in no particular order):

1. Attempt a PR. std.concurrency is very low level but I see no reason why it shouldn't be accepted.

2. Try to allocate the closure on the stack with one of these methods:
   * https://github.com/sociomantic-tsunami/ocean/blob/e53ac93fbf3bfa9b2dceec1a2b6dc4a0ec7f78b2/src/ocean/core/TypeConvert.d#L249-L311
   * use scope delegates
   * there are other methods, I can't remember them now and my search skills are failing me

3. Redefine your design: I see some possibilities for that.
   * You could use fibers and have one fiber waiting on std.concurrency.receive with no timeout and yield on the socket read. If the fiber is blocked on mailbox receive, control goes back to the fiber reading on sockets.
   * Have a consumer/producer pattern where the producer blocks on receive and the other thread reads a shared value in between socket reads.
   * Have the thread that reads on sockets act as an actor that produces a mailbox message when it has finished reading, so that you either have a mailbox message from plugins or network activity.
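The last design in the list (the socket reader as an actor that posts to the same mailbox as the plugins) might look roughly like this. It is only a sketch under assumptions: `ServerLine`, `PluginRequest`, `ReaderDone`, `readerThread` and `runMainLoop` are hypothetical names, and the reader sends two canned lines instead of reading from a real socket.

```d
import std.concurrency : ownerTid, receive, send, spawn, thisTid;

struct ServerLine { string text; }     // hypothetical: one line read from the server
struct PluginRequest { string text; }  // hypothetical: a request sent by a plugin
struct ReaderDone {}                   // hypothetical: the reader has nothing more to send

// Stand-in for the socket-reading thread. A real one would block on
// the socket and forward each line it reads to the main thread.
void readerThread()
{
    foreach (line; ["PING :irc.example", "NOTICE * :hello"])
        ownerTid.send(ServerLine(line));

    ownerTid.send(ReaderDone());
}

/// Blocks on the mailbox only; server traffic and plugin requests
/// both arrive as messages, so no polling with a timeout is needed.
string[] runMainLoop()
{
    string[] log;
    bool running = true;

    // Queue a plugin request before the reader starts, so the order
    // of the log below is deterministic in this sketch.
    thisTid.send(PluginRequest("quit"));
    spawn(&readerThread);

    while (running)
    {
        receive(
            (ServerLine line)   { log ~= "server: " ~ line.text; },
            (PluginRequest req) { log ~= "plugin: " ~ req.text; },
            (ReaderDone _)      { running = false; }
        );
    }

    return log;
}

void main()
{
    auto log = runMainLoop();
    assert(log == ["plugin: quit",
                   "server: PING :irc.example",
                   "server: NOTICE * :hello"]);
}
```

The trade-off is that parsing results have to cross a thread boundary as messages, which means they must be immutable or otherwise free of unshared mutable references.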
Jun 25 2019
On Sunday, 23 June 2019 at 16:25:58 UTC, Anonymouse wrote:
> I do blocking reads from the server, which time out after n seconds. Inbetween reads it checks for concurrency messages sent by plugins, to see if one wanted something, acts if so,

Another possible design (sorry if you already considered it and threw it away) would be to call select/poll/kqueue on the server socket and on a socketpair which 'connects' the threaded plugin with the main loop. Each time a plugin has a message, it sends it over std.concurrency and then wakes up the main loop by sending a single byte over the socketpair. This slightly complicates the design and adds socket housekeeping, but it works.
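A rough sketch of this wake-up scheme, using std.socket's `socketPair` and `Socket.select`. The names `pluginThread` and `waitForRequest` are hypothetical, only the socketpair is watched here (a real loop would add the server socket to the same set), and casting the socket to `shared` to pass it to `spawn` is a shortcut taken for brevity.

```d
import core.time : seconds;
import std.concurrency : ownerTid, receiveTimeout, send, spawn;
import std.socket : Socket, SocketSet, socketPair;

// Hypothetical plugin thread: posts its request first, then pokes
// the main loop awake by writing one byte to its end of the pair.
void pluginThread(shared Socket wakeEnd)
{
    ownerTid.send("quit");

    auto sock = cast(Socket) wakeEnd;
    ubyte[1] poke = [1];
    sock.send(poke[]);
    sock.close();
}

/// Waits until the plugin signals, then fetches its request.
/// Returns null if nothing arrived within the timeout.
string waitForRequest()
{
    Socket[2] pair = socketPair();
    scope (exit) pair[0].close();

    spawn(&pluginThread, cast(shared) pair[1]);

    auto readSet = new SocketSet();
    readSet.add(pair[0]);  // a real loop would also add the server socket

    string request;

    // Blocks until a socket in the set is readable or 5 seconds pass.
    if (Socket.select(readSet, null, null, 5.seconds) > 0
        && readSet.isSet(pair[0]))
    {
        ubyte[1] poke;
        pair[0].receive(poke[]);  // consume the wake-up byte

        // The message was sent before the byte, so it is already waiting.
        receiveTimeout(0.seconds, (string r) { request = r; });
    }

    return request;
}

void main()
{
    assert(waitForRequest() == "quit");
}
```

With this arrangement the main loop only attempts a receive after a wake-up byte has arrived, so idle periods involve no mailbox checks at all.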
Jun 25 2019
On Sunday, 23 June 2019 at 16:25:58 UTC, Anonymouse wrote:
> TL;DR: I need to check for messages very often, and they each
> [...]

Another possible design (sorry if you already considered it and threw it away) would be to call select/poll/kqueue on the server socket and on a socketpair which 'connects' the threaded plugin with the main loop. Each time a plugin has a message, it sends it over std.concurrency and then wakes up the main loop by sending a single byte over the socketpair. This slightly complicates the design and adds socket housekeeping, but it works.
Jun 25 2019
If you are OK with an external dependency, vibe has a channel implementation that returns the number of messages in the buffer: http://vibed.org/api/vibe.core.channel/Channel
Jun 23 2019