www.digitalmars.com         C & C++   DMDScript  

digitalmars.D - Peeking concurrency messages

reply Anonymouse <zorael gmail.com> writes:
Concurrency messages are sent via std.concurrency's send and 
receive functions. Receiving a message consumes it, in the sense 
that receiving again will catch the next message. The only way of 
telling whether there is one waiting is to consume the first and 
commit to handling it. In other words, there is currently no way 
of checking whether the message box is empty; all the (thread ID 
Tid) member variables via which you could tell are private to the 
std.concurrency module.

Is there any chance we could get the ability to see if a Tid has 
messages waiting?

---

diff --git a/std/concurrency.d b/std/concurrency.d
index c89b6c3b4..65850fd38 100644
--- a/std/concurrency.d
+++ b/std/concurrency.d
   -334,6 +334,13    public:
          formattedWrite(sink, "Tid(%x)", cast(void*) mbox);
      }

+    /**
+     * Returns whether or not this Tid has messages waiting.
+     */
+    bool hasMessages()  safe pure nothrow  nogc
+    {
+        return (mbox.m_localMsgs + mbox.m_sharedBox.length) > 0;
+    }
  }

   system unittest

---

void foo()
{
     assert(!thisTid.hasMessages);
     thisTid.send(123);
     assert(thisTid.hasMessages);
     int i = receiveOnly!int();
     assert(!thisTid.hasMessages);
}

This would be immensely useful to me and I would use it straight 
away.
Jun 23
next sibling parent reply Francesco Mecca <me francescomecca.eu> writes:
On Sunday, 23 June 2019 at 09:33:27 UTC, Anonymouse wrote:
 Concurrency messages are sent via std.concurrency's send and 
 receive functions. Receiving a message consumes it, in the 
 sense that receiving again will catch the next message. The 
 only way of telling whether there is one waiting is to consume 
 the first and commit to handling it. In other words, there is 
 currently no way of checking whether the message box is empty; 
 all the (thread ID Tid) member variables via which you could 
 tell are private to the std.concurrency module.

 [...]
Can you explain your use case? Maybe there is an elegant solution without resorting to check if the mailbox is full. Otherwise you could try a PR
Jun 23
parent reply Anonymouse <zorael gmail.com> writes:
On Sunday, 23 June 2019 at 13:59:38 UTC, Francesco Mecca wrote:
 On Sunday, 23 June 2019 at 09:33:27 UTC, Anonymouse wrote:
 Concurrency messages are sent via std.concurrency's send and 
 receive functions. Receiving a message consumes it, in the 
 sense that receiving again will catch the next message. The 
 only way of telling whether there is one waiting is to consume 
 the first and commit to handling it. In other words, there is 
 currently no way of checking whether the message box is empty; 
 all the (thread ID Tid) member variables via which you could 
 tell are private to the std.concurrency module.

 [...]
Can you explain your use case? Maybe there is an elegant solution without resorting to check if the mailbox is full. Otherwise you could try a PR
TL;DR: I need to check for messages very often, and they each incur one allocation per check because of closures[1], even if there were no messages to receive. Being able to tell if there are messages waiting and thus skip the receive attempt if not would get rid of the vast majority of these allocations. Right now it's one allocation per second, with runtimes in the ranges of days and weeks. Without, it would be a some dozens of allocations per day from this part of the code (at the current level of use). I'm hesitant to add more dependencies, since I'm having problems with compilation memory use already. I can try a PR if it all doesn't strike anyone as an obviously bad idea. --- I work on an IRC bot[2] that splits its functionality into plugins. The main program loop only reads from the server, parses whatever it gets, and serially calls each plugin with whatever it translated it to. It's a simple design, but it requires a way for plugins to be able to send requests back to the main loop, for it to send to the server or to affect the program in other ways (think a request for the program to quit). Some but not all of these plugins are threaded, such as ones doing http requests and other things that don't lend themselves well to being done in the same thread as everything else, and these need to communicate back as well. An elegant solution is for all plugins, including the non-threaded ones, to send these requests as concurrency messages, for the main loop to catch and sequentially deal with. That way there'll be no data races, and threaded plugins work just as well as single-threaded ones do. I do blocking reads from the server, which time out after n seconds. Inbetween reads it checks for concurrency messages sent by plugins, to see if one wanted something, acts if so, and then resumes reading. The message checks are done by using receiveTimeout(0.seconds, ...)[3], which makes it instantly time out if there were no messages there to receive, so as to resume reading immediately. (A normal call of receive(...) would block.) Thinking in extremes, if the read timeout of n seconds is infinitely large and nothing is being read from the server, requests from plugins would completely stall. If one plugin wanted the program to quit, that request would stay there indefinitely with the main loop blocking on the socket read. On the other hand, if n is infinitely small plugin message accuracy will be great, but because of how closures work[1] each attempt at reading messages allocates, regardless of whether anything was read. So if n is infinitely small, it goes toward infinitely many allocations. I have the socket read timeout at one second, but that still means best-case one allocation per second, more if there is activity from the server. For something running constantly over days and weeks, this adds up with allocation counts in the millions that all did nothing. If I was able to tell whether the main loop's thread had messages waiting, I could avoid these allocations by only attempting to receive them when there are any there to actually receive. There would be hundreds of real messages, not millions of empty-yet-allocating receives. I can't change how closures work, I can only try to avoid allocating too many of them. [1]: https://forum.dlang.org/thread/igotwkxyjmezneicsqqg forum.dlang.org [2]: https://github.com/zorael/kameloso [3]: https://github.com/zorael/kameloso/blob/a340a76a/source/kameloso/kameloso.d#L501-L520
Jun 23
next sibling parent Francesco Mecca <me francescomecca.eu> writes:
On Sunday, 23 June 2019 at 16:25:58 UTC, Anonymouse wrote:
 On Sunday, 23 June 2019 at 13:59:38 UTC, Francesco Mecca wrote:
 On Sunday, 23 June 2019 at 09:33:27 UTC, Anonymouse wrote:
 Concurrency messages are sent via std.concurrency's send and 
 receive functions. Receiving a message consumes it, in the 
 sense that receiving again will catch the next message. The 
 only way of telling whether there is one waiting is to 
 consume the first and commit to handling it. In other words, 
 there is currently no way of checking whether the message box 
 is empty; all the (thread ID Tid) member variables via which 
 you could tell are private to the std.concurrency module.

 [...]
Can you explain your use case? Maybe there is an elegant solution without resorting to check if the mailbox is full. Otherwise you could try a PR
TL;DR: I need to check for messages very often, and they each incur one allocation per check because of closures[1], even if there were no messages to receive. Being able to tell if there are messages waiting and thus skip the receive attempt if not would get rid of the vast majority of these allocations. Right now it's one allocation per second, with runtimes in the ranges of days and weeks. Without, it would be a some dozens of allocations per day from this part of the code (at the current level of use). I'm hesitant to add more dependencies, since I'm having problems with compilation memory use already. I can try a PR if it all doesn't strike anyone as an obviously bad idea. --- I work on an IRC bot[2] that splits its functionality into plugins. The main program loop only reads from the server, parses whatever it gets, and serially calls each plugin with whatever it translated it to. It's a simple design, but it requires a way for plugins to be able to send requests back to the main loop, for it to send to the server or to affect the program in other ways (think a request for the program to quit). Some but not all of these plugins are threaded, such as ones doing http requests and other things that don't lend themselves well to being done in the same thread as everything else, and these need to communicate back as well. An elegant solution is for all plugins, including the non-threaded ones, to send these requests as concurrency messages, for the main loop to catch and sequentially deal with. That way there'll be no data races, and threaded plugins work just as well as single-threaded ones do. I do blocking reads from the server, which time out after n seconds. Inbetween reads it checks for concurrency messages sent by plugins, to see if one wanted something, acts if so, and then resumes reading. The message checks are done by using receiveTimeout(0.seconds, ...)[3], which makes it instantly time out if there were no messages there to receive, so as to resume reading immediately. (A normal call of receive(...) would block.) Thinking in extremes, if the read timeout of n seconds is infinitely large and nothing is being read from the server, requests from plugins would completely stall. If one plugin wanted the program to quit, that request would stay there indefinitely with the main loop blocking on the socket read. On the other hand, if n is infinitely small plugin message accuracy will be great, but because of how closures work[1] each attempt at reading messages allocates, regardless of whether anything was read. So if n is infinitely small, it goes toward infinitely many allocations. I have the socket read timeout at one second, but that still means best-case one allocation per second, more if there is activity from the server. For something running constantly over days and weeks, this adds up with allocation counts in the millions that all did nothing. If I was able to tell whether the main loop's thread had messages waiting, I could avoid these allocations by only attempting to receive them when there are any there to actually receive. There would be hundreds of real messages, not millions of empty-yet-allocating receives. I can't change how closures work, I can only try to avoid allocating too many of them. [1]: https://forum.dlang.org/thread/igotwkxyjmezneicsqqg forum.dlang.org [2]: https://github.com/zorael/kameloso [3]: https://github.com/zorael/kameloso/blob/a340a76a/source/kameloso/kameloso.d#L501-L520
First of all, kudos for Kameloso, it is a very nice project. Having peeked at the code I think that you have the following possibilities (in random order): 1. attempt a PR. std.concurrency is very low level but I see no reason why it shouldn't be accepted. 2. try to allocate the closure on the stack with one of these methods: * https://github.com/sociomantic-tsunami/ocean/blob/e53ac93fbf3bfa9b2dceec1a2b6dc4a0ec7f78b2/src/ocean/core/TypeConvert.d#L249-L311 * use scope delegates * there are other methods, I can't remember now and my search skills are failing me 3. redefine your design: i see some possibilities for that. * you could use fibers and have one fiber waiting on std.concurrency.receive with no timeout and yield on the socket read. If the fiber is blocked on mailbox receive control goes back to the fiber reading on sockets. * have a consumer/producer pattern where the producer blocks on receive and the other thread reads a shared value in between socket reads * have the thread that reads on sockets as an actor producing a mailbox message when finished reading so that you either have a mailbox message from plugins or network activity.
Jun 25
prev sibling next sibling parent ikod <geller.garry gmail.com> writes:
On Sunday, 23 June 2019 at 16:25:58 UTC, Anonymouse wrote:

 I do blocking reads from the server, which time out after n 
 seconds. Inbetween reads it checks for concurrency messages 
 sent by plugins, to see if one wanted something, acts if so,
Another possible design (sorry if you already considered it and threw it away) would be call select/poll/kqueue on server socket and on the socketpair which 'connects' threaded plugin with the main loop. Each time plugin have message it sends it over std.concurrency and then wakes up main loop sending single byte over socketpair. This slightly complicate design and adds sockets housekeeping, but it works.
Jun 25
prev sibling parent ikod <geller.garry gmail.com> writes:
On Sunday, 23 June 2019 at 16:25:58 UTC, Anonymouse wrote:
 On Sunday, 23 June 2019 at 13:59:38 UTC, Francesco Mecca wrote:
 On Sunday, 23 June 2019 at 09:33:27 UTC, Anonymouse wrote:
 Concurrency messages are sent via std.concurrency's send and 
 receive functions. Receiving a message consumes it, in the 
 sense that receiving again will catch the next message. The 
 only way of telling whether there is one waiting is to 
 consume the first and commit to handling it. In other words, 
 there is currently no way of checking whether the message box 
 is empty; all the (thread ID Tid) member variables via which 
 you could tell are private to the std.concurrency module.

 [...]
Can you explain your use case? Maybe there is an elegant solution without resorting to check if the mailbox is full. Otherwise you could try a PR
TL;DR: I need to check for messages very often, and they each
Another possible design (sorry if you already considered it and threw it away) would be call select/poll/kqueue on server socket and on the socketpair which 'connects' threaded plugin with the main loop. Each time plugin have message it sends it over std.concurrency and then wakes up main loop sending single byte over socketpair. This slightly complicate design and adds sockets housekeeping, but it works.
Jun 25
prev sibling parent Francesco Mecca <me francescomecca.eu> writes:
if you are ok with an external dependency, vibe has a channel 
implementation that returns the  number of messages in the buffer

http://vibed.org/api/vibe.core.channel/Channel
Jun 23