
digitalmars.D.learn - A scheduled control signal with fibers?

reply Ferhat Kurtulmuş <aferust gmail.com> writes:
Hi,

I have a problem that might be solved using fibers, though I have 
no previous experience with them. We are working on a 
variable-rate weeder. A camera is installed on the front of a 
tractor, and a flame weeder is attached to the back. Doing image 
processing (on an RP3), we determine a weed density rate and send 
a PWM signal to the LPG valve to adjust the intensity of the 
flame. It is working well under lab conditions so far. However, 
my control signal has to be continuous with a delayed time shift, 
because the tractor is moving at some ground speed and the flame 
applicator reaches the scene seen by the camera only after about 
1.5 seconds (I cannot change the location of the camera due to 
some unfortunate but unavoidable design constraints). My 
pseudo-code is like:

int main(){
    ...

    while(true){

        int pwmval = getFlameIntensityViaImageProcessing();

        // I need this streamed ctrl signal sent to the valfe
        // with a delayed time shift:
        sendPWMSignalToValfe(pwmval);

        // a possible solution:
        // enum afterNmilliseconds = 1500;
        // schedulePWMSignalToValve(pwmval, afterNmilliseconds);

        ...
    }
    ...
}

How can I implement schedulePWMSignalToValve(pwmval, 
afterNmilliseconds) using fibers?

Thanks in advance.
Sep 25 2020
next sibling parent reply Imperatorn <johan_forsberg_86 hotmail.com> writes:
On Friday, 25 September 2020 at 11:58:53 UTC, Ferhat Kurtulmuş 
wrote:
 Hi,

 I have a problem that might be solved using fibers, though I 
 have no previous experience with them. We are working on a 
 variable-rate weeder. A camera is installed on the front of a 
 tractor, and a flame weeder is attached to the back. Doing 
 image processing (on an RP3), we determine a weed density rate 
 and send a PWM signal to the LPG valve to adjust the intensity 
 of the flame. It is working well under lab conditions so far. 
 However, my control signal has to be continuous with a delayed 
 time shift, because the tractor is moving at some ground speed 
 and the flame applicator reaches the scene seen by the camera 
 only after about 1.5 seconds (I cannot change the location of 
 the camera due to some unfortunate but unavoidable design 
 constraints). My pseudo-code is like:

 [...]
A naive implementation would be to store 1500 ms worth of PWM values in a buffer. I guess memory is not a problem if you're using an RP3? Then just loop through and yield depending on your sample rate (you didn't say what it is).
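For illustration, a minimal sketch of such a delay buffer, assuming a steady ~15 Hz sample rate (the routine names mirror the pseudo-code above and are hypothetical stand-ins):

import core.thread;
import core.time;

// Hypothetical stand-ins for the real routines.
int getFlameIntensityViaImageProcessing() { return 0; }
void sendPWMSignalToValfe(int pwmval) { }

enum samplePeriod = 66.msecs;      // assumed ~15 Hz loop
enum delaySlots = 1500 / 66 + 1;   // about 1.5 s worth of samples

void main() {
    int[delaySlots] buffer;        // starts zeroed, i.e. valve off
    size_t head = 0;

    while (true) {
        int pwmval = getFlameIntensityViaImageProcessing();

        // The slot about to be overwritten holds the value sampled
        // ~1.5 s ago: emit it, then store the fresh value in its place.
        sendPWMSignalToValfe(buffer[head]);
        buffer[head] = pwmval;
        head = (head + 1) % delaySlots;

        Thread.sleep(samplePeriod); // keep the loop near 15 Hz
    }
}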
Sep 25 2020
parent Ferhat Kurtulmuş <aferust gmail.com> writes:
On Friday, 25 September 2020 at 12:43:41 UTC, Imperatorn wrote:
 On Friday, 25 September 2020 at 11:58:53 UTC, Ferhat Kurtulmuş 
 wrote:
 [...]
A naive implementation would be to store 1500 ms worth of PWM values in a buffer. I guess memory is not a problem if you're using an RP3? Then just loop through and yield depending on your sample rate (you didn't say what it is).
I haven't measured the overall sample rate yet, but I can say the image processing (the most costly step) runs at about 15 FPS.
Sep 25 2020
prev sibling parent reply Sebastiaan Koppe <mail skoppe.eu> writes:
On Friday, 25 September 2020 at 11:58:53 UTC, Ferhat Kurtulmuş 
wrote:
 int main(){
     ...

     while(true){

         int pwmval = getFlameIntensityViaImageProcessing();

         // I need this streamed ctrl signal sent to the valfe
         // with a delayed time shift:
         sendPWMSignalToValfe(pwmval);

         // a possible solution:
         // enum afterNmilliseconds = 1500;
         // schedulePWMSignalToValve(pwmval, afterNmilliseconds);

         ...
     }
     ...
 }

 How can I implement schedulePWMSignalToValve(pwmval, afterNmilliseconds) using fibers?

 Thanks in advance.
No need for fibers per se. You can run two threads: one that produces {time: now + 1500.msecs, value: getFlameIntensityViaImageProcessing()} objects, and one that consumes them, waiting until each message's time has passed (msg.time < now) and then calling sendPWMSignalToValfe(msg.value). You would basically rely on std.concurrency's MessageBox to do the queuing, although you could do that manually as well.

You could also run it on one thread if you don't mind a jitter of however long getFlameIntensityViaImageProcessing takes, but you will need a queue.
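A rough sketch of that two-thread setup, assuming a fixed 1.5 s shift (the camera and valve routines are hypothetical stand-ins):

import std.concurrency;
import core.time;
import core.thread;
import std.stdio;

// Hypothetical stand-ins for the real camera and valve routines.
int getFlameIntensityViaImageProcessing() { return 42; }
void sendPWMSignalToValfe(int pwmval) { writeln("PWM: ", pwmval); }

struct Msg { MonoTime due; int value; }

void consumer() {
    for (;;) {
        // The mailbox acts as the FIFO queue: messages arrive in send order.
        receive((Msg msg) {
            auto remaining = msg.due - MonoTime.currTime;
            if (remaining > Duration.zero)
                Thread.sleep(remaining); // wait out the rest of the 1.5 s
            sendPWMSignalToValfe(msg.value);
        });
    }
}

void main() {
    auto tid = spawn(&consumer);
    while (true) {
        int pwmval = getFlameIntensityViaImageProcessing();
        tid.send(Msg(MonoTime.currTime + 1500.msecs, pwmval));
    }
}

Since the consumer handles one message at a time and the deadlines only increase, the outputs stay in order.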
Sep 25 2020
next sibling parent reply Sebastiaan Koppe <mail skoppe.eu> writes:
On Friday, 25 September 2020 at 13:08:16 UTC, Sebastiaan Koppe 
wrote:
 On Friday, 25 September 2020 at 11:58:53 UTC, Ferhat Kurtulmuş wrote:
 How can I implement schedulePWMSignalToValve(pwmval, afterNmilliseconds) using fibers?
No need for fibers per se.
Can also use https://code.dlang.org/packages/timingwheels
Sep 25 2020
parent Ferhat Kurtulmuş <aferust gmail.com> writes:
On Friday, 25 September 2020 at 13:13:50 UTC, Sebastiaan Koppe 
wrote:
 On Friday, 25 September 2020 at 13:08:16 UTC, Sebastiaan Koppe 
 wrote:
 On Friday, 25 September 2020 at 11:58:53 UTC, Ferhat Kurtulmuş wrote:
 How can I implement schedulePWMSignalToValve(pwmval, afterNmilliseconds) using fibers?
No need for fibers per se.
Can also use https://code.dlang.org/packages/timingwheels
I will take a look at that also, thanks.
Sep 25 2020
prev sibling parent reply Ferhat Kurtulmuş <aferust gmail.com> writes:
On Friday, 25 September 2020 at 13:08:16 UTC, Sebastiaan Koppe 
wrote:
 On Friday, 25 September 2020 at 11:58:53 UTC, Ferhat Kurtulmuş 
 wrote:
 [...]
No need for fibers per se. You can run two threads: one that produces {time: now + 1500.msecs, value: getFlameIntensityViaImageProcessing()} objects, and one that consumes them, waiting until each message's time has passed (msg.time < now) and then calling sendPWMSignalToValfe(msg.value). You would basically rely on std.concurrency's MessageBox to do the queuing, although you could do that manually as well. You could also run it on one thread if you don't mind a jitter of however long getFlameIntensityViaImageProcessing takes, but you will need a queue.
That was the first thing I thought of: a FIFO queue. I just didn't want to reinvent the wheel. So you guys are saying go for regular threads, not fibers. Thank you.
Sep 25 2020
parent reply Steven Schveighoffer <schveiguy gmail.com> writes:
On 9/25/20 9:16 AM, Ferhat Kurtulmuş wrote:
 On Friday, 25 September 2020 at 13:08:16 UTC, Sebastiaan Koppe wrote:
 On Friday, 25 September 2020 at 11:58:53 UTC, Ferhat Kurtulmuş wrote:
 [...]
No need for fibers per se. You can run two threads: one that produces {time: now + 1500.msecs, value: getFlameIntensityViaImageProcessing()} objects, and one that consumes them, waiting until each message's time has passed (msg.time < now) and then calling sendPWMSignalToValfe(msg.value). You would basically rely on std.concurrency's MessageBox to do the queuing, although you could do that manually as well. You could also run it on one thread if you don't mind a jitter of however long getFlameIntensityViaImageProcessing takes, but you will need a queue.

That was the first thing I thought of: a FIFO queue. I just didn't want to reinvent the wheel. So you guys are saying go for regular threads, not fibers. Thank you.
Whether a fiber is a valid solution or not depends on that getFlameIntensityViaImageProcessing function. If that is actually code running in your process, a fiber is going to block any other fibers during that operation. So threads are the way to go here.

Fibers work great if you can yield the fiber when waiting on something *external* to complete (like, for instance, I/O). But if the fiber *is* the thing you are waiting on, it's not going to be able to execute anything else until that is done.

Given the rate and the number of concurrent tasks, I'd say threads.

-Steve
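To illustrate the point, a minimal sketch (heavyWork() is a hypothetical stand-in for a CPU-bound call like getFlameIntensityViaImageProcessing()):

import core.thread;
import std.stdio;

// Pure CPU work with no yield points inside.
void heavyWork() {
    long sum;
    foreach (i; 0 .. 50_000_000) sum += i;
}

void main() {
    auto fib = new Fiber({
        heavyWork();   // nothing else runs on this thread until this returns
        Fiber.yield(); // control comes back to the caller only here
        writeln("fiber finishing");
    });

    fib.call();        // blocks for the entire heavyWork() call
    writeln("main regains control only after heavyWork() is done");
    fib.call();        // resume and let the fiber run to completion
}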
Sep 25 2020
parent reply Ferhat Kurtulmuş <aferust gmail.com> writes:
On Friday, 25 September 2020 at 13:37:09 UTC, Steven 
Schveighoffer wrote:

 Given the rate and the number of concurrent tasks, I'd say 
 threads.

 -Steve
Here is my testable and minimal code using 1 extra thread. Thank you all!

import core.thread;
import std.stdio;
import std.concurrency;
import std.container.dlist;
import std.datetime;
import std.datetime.systime;

__gshared DList!Entry queue;
__gshared bool shouldRun = true;

struct Entry {
    SysTime st;
    int val;
}

void main() {
    spawn(&worker);

    while (true) {
        int v;
        "enter your value: ".write; // getFlameIntensityViaImageProcessing()
        readf(" %d", &v);
        if (v == 0) {
            shouldRun = false;
            break;
        }
        queue.insertFront(Entry(Clock.currTime + 1500.msecs, v));
    }
    writeln("main is done.");
}

void worker() {
    while (shouldRun) {
        auto r = queue[];
        if (!r.empty && queue.back.st < Clock.currTime) {
            writeln(queue.back); // consume the value: sendPWMSignalToValfe(pwmval)
            queue.popLastOf(r);
        }
    }
}
Sep 27 2020
parent reply =?UTF-8?Q?Ali_=c3=87ehreli?= <acehreli yahoo.com> writes:
On 9/27/20 3:06 AM, Ferhat Kurtulmu=C5=9F wrote:

 __gshared DList!Entry queue;
 __gshared bool shouldRun =3D true;
Have you considered passing messages with std.concurrency.send() and=20 std.concurrency.receive() and friends? You wouldn't need 'queue' because = all of your threads already have mail boxes to send messages to each othe= r.
 void worker() {
      while(shouldRun){
          auto r =3D queue[];
          if(!r.empty && queue.back.st < Clock.currTime){
              writeln(queue.back); // consume the value
 sendPWMSignalToValfe(pwmval)
              queue.popLastOf(r);
          }
      }
 }
It's not clear whether it's only in your test code but busy-waiting like = that will make your CPU very warm. :) Since requests cannot pass each=20 other, your worker thread should have something like the following in=20 that loop: import core.thread; Thread.sleep(duration); Depending on how accurate the operating system honors your sleep=20 requests (e.g. is it real-time?), you may want to sleep less than=20 'duration' and then busy-wait the rest of the duration. Similar to the=20 difference between spinForce() and yieldForce() of std.parallelism (I=20 understand that your solution should not involve std.parallelism): https://dlang.org/phobos/std_parallelism.html#.Task.spinForce As an improvement when defining durations, you don't need to "hide"=20 units in comments: // enum afterNmilliseconds =3D 1500; // Instead: enum after =3D 1500.msecs; msecs and friends are defined in core.time: https://dlang.org/phobos/core_time.html#.dur Ali
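For illustration, a minimal sketch of that sleep-then-spin idea (the 5 ms margin is an assumed jitter figure, not a measured one):

import core.thread;
import core.time;

// Sleep until 'deadline': sleep coarsely for most of the wait, then
// busy-wait the last stretch for better precision.
void sleepUntil(MonoTime deadline) {
    enum spinMargin = 5.msecs;  // assumed OS wake-up jitter; measure yours
    auto coarse = deadline - spinMargin - MonoTime.currTime;
    if (coarse > Duration.zero)
        Thread.sleep(coarse);                // the OS may overshoot a little
    while (MonoTime.currTime < deadline) {}  // spin the final few msecs
}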
Sep 27 2020
next sibling parent reply Ferhat Kurtulmuş <aferust gmail.com> writes:
On Sunday, 27 September 2020 at 10:40:25 UTC, Ali Çehreli wrote:
 On 9/27/20 3:06 AM, Ferhat Kurtulmuş wrote:
 Have you considered passing messages with 
 std.concurrency.send() and std.concurrency.receive() and 
 friends? You wouldn't need 'queue' because all of your threads 
 already have mailboxes to send messages to each other.
I remember that your book covers passing messages with send(). I will probably rewrite it using that mechanism. You are right; I noticed that when I run the code, I can hear the boosted noise of my desktop fan.
 As an improvement when defining durations, you don't need to 
 "hide" units in comments:

         // enum afterNmilliseconds = 1500;

         // Instead:
         enum after = 1500.msecs;

 msecs and friends are defined in core.time:

   https://dlang.org/phobos/core_time.html#.dur
Thank you for the tip. That was just pseudo-code to explain my situation.

Thanks a lot.

Ferhat
Sep 27 2020
parent Imperatorn <johan_forsberg_86 hotmail.com> writes:
On Sunday, 27 September 2020 at 10:52:58 UTC, Ferhat Kurtulmuş 
wrote:
 On Sunday, 27 September 2020 at 10:40:25 UTC, Ali Çehreli wrote:
 [...]
 [...]
I remember that your book covers passing messages with send(). I will probably rewrite it using that mechanism. You are right; I noticed that when I run the code, I can hear the boosted noise of my desktop fan.
   [...]
Thank you for the tip. That was just pseudo-code to explain my situation. Thanks a lot. Ferhat
The actor model is under-rated imho
Sep 27 2020
prev sibling parent reply Ferhat Kurtulmuş <aferust gmail.com> writes:
On Sunday, 27 September 2020 at 10:40:25 UTC, Ali Çehreli wrote:
 On 9/27/20 3:06 AM, Ferhat Kurtulmuş wrote:

 __gshared DList!Entry queue;
 __gshared bool shouldRun = true;
Have you considered passing messages with std.concurrency.send() and std.concurrency.receive() and friends? You wouldn't need 'queue' because all of your threads already have mailboxes to send messages to each other.
 void worker() {
      while(shouldRun){
          auto r = queue[];
          if(!r.empty && queue.back.st < Clock.currTime){
              writeln(queue.back); // consume the value
 sendPWMSignalToValfe(pwmval)
              queue.popLastOf(r);
          }
      }
 }
It's not clear whether it's only in your test code, but busy-waiting like that will make your CPU very warm. :) Since requests cannot pass each other, your worker thread should have something like the following in that loop:

    import core.thread;
    Thread.sleep(duration);

Depending on how accurately the operating system honors your sleep requests (e.g. is it real-time?), you may want to sleep less than 'duration' and then busy-wait the rest of the duration. This is similar to the difference between spinForce() and yieldForce() of std.parallelism (I understand that your solution should not involve std.parallelism):

    https://dlang.org/phobos/std_parallelism.html#.Task.spinForce

As an improvement when defining durations, you don't need to "hide" units in comments:

    // enum afterNmilliseconds = 1500;

    // Instead:
    enum after = 1500.msecs;

msecs and friends are defined in core.time:

    https://dlang.org/phobos/core_time.html#.dur

Ali
Yes, this solution requires less code and obviously fewer system resources.

void main() {
    while (true) {
        int v;
        "type your value: ".write;
        readf(" %d", &v);
        if (v == 0) {
            break;
        }
        auto childTid = spawn(&spawnedFunc, thisTid);
        send(childTid, v);
    }
    writeln("main is done.");
}

static void spawnedFunc(Tid ownerTid)
{
    receive((int v){
        Thread.sleep(1500.msecs);
        writeln(v);
    });
}

However, there is a big problem now. If I change my main like below, the numbers are not written in the correct order after 1.5 seconds:

void main() {
    foreach (v; 0..10){
        auto childTid = spawn(&spawnedFunc, thisTid);
        send(childTid, v);
    }
    writeln("main is done.");
}
Sep 27 2020
parent reply Ferhat Kurtulmuş <aferust gmail.com> writes:
On Sunday, 27 September 2020 at 12:05:13 UTC, Ferhat Kurtulmuş 
wrote:
 On Sunday, 27 September 2020 at 10:40:25 UTC, Ali Çehreli wrote:
 On 9/27/20 3:06 AM, Ferhat Kurtulmuş wrote:
Oh, it will work fine if I imitate my time-consuming image processing like this. I think it is OK now.

import std.stdio;
import std.concurrency;
import core.thread;

void main() {
    foreach (v; 0..10){
        auto childTid = spawn(&spawnedFunc, thisTid);
        Thread.sleep(10.msecs); // imitate image processing
        send(childTid, v);
    }
    writeln("main is done.");
}

static void spawnedFunc(Tid ownerTid)
{
    receive((int v){
        Thread.sleep(1500.msecs);
        writeln(v);
    });
}
Sep 27 2020
parent reply Ali Çehreli <acehreli yahoo.com> writes:
On 9/27/20 6:33 AM, Ferhat Kurtulmuş wrote:

 On Sunday, 27 September 2020 at 12:05:13 UTC, Ferhat Kurtulmuş wrote:
 On Sunday, 27 September 2020 at 10:40:25 UTC, Ali Çehreli wrote:
 On 9/27/20 3:06 AM, Ferhat Kurtulmuş wrote:

 Oh, it will work fine if I imitate my time-consuming image processing like this. I think it is OK now.

 void main() {
     foreach (v; 0..10){
         auto childTid = spawn(&spawnedFunc, thisTid);

How many flame threads do you need? I thought one image processor and one flame thrower, no? Even if you have a dozen of each, main can start only the image processing threads, and then each image processor can start its own flame thrower. Then each pair will have an owner and a worker.

You don't need to send thisTid because every thread already has an ownerTid defined:

    auto childTid = spawn(&spawnedFunc);

         Thread.sleep(10.msecs); // imitate image processing
         send(childTid, v);

UFCS makes it nicer:

    childTid.send(v);

     }
     writeln("main is done.");
 }

 static void spawnedFunc(Tid ownerTid)

To repeat, you already have a valid ownerTid in this thread. Just remove the parameter.

 {
      receive((int v){
          Thread.sleep(1500.msecs);

I think you should sleep less than that to fire at the exact expected time. Otherwise, an unknown amount of time has already passed by the time this thread is woken up. Instead of sleeping 1500 msecs, something like this may be needed:

- This thread looks at the clock to figure out how long to sleep, e.g. sometimes 1400 msecs
- It sleeps that amount
- It fires when it wakes up

However, you cannot expect to be woken up exactly 1400 msecs later. If timing precision is really important, I recommend gathering some statistics on how far off your thread is when it wakes up. Depending on the statistics, I would sleep less than the expected amount and then burn the CPU until it's the exact time. But maybe precision is not that important; so, forget that idea. :)

          writeln(v);
      });
 }

One more thing: it is common for workers to die with an exception (sometimes with an Error). You must catch it (including Error) in the worker thread and report it somehow, e.g. with a special exception. Otherwise, nobody will know what happened.

This reminds me: if you start the worker with spawnLinked() instead of spawn(), the owner will get a LinkTerminated message if the thread dies. That's another way of detecting that failure.

Ali
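A minimal sketch of that spawnLinked() detection (the failure here is simulated):

import std.concurrency;
import std.stdio;

void worker() {
    // Simulate a crash in the worker thread.
    throw new Exception("simulated failure in the worker");
}

void main() {
    spawnLinked(&worker);
    try {
        // Any receive in the owner throws LinkTerminated once the
        // linked worker dies, even if we were expecting another message.
        receiveOnly!int();
    } catch (LinkTerminated e) {
        writeln("worker terminated: ", e.tid);
    }
}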
Sep 27 2020
parent Ferhat Kurtulmuş <aferust gmail.com> writes:
On Sunday, 27 September 2020 at 16:39:45 UTC, Ali Çehreli wrote:
 On 9/27/20 6:33 AM, Ferhat Kurtulmuş wrote:

 [...]
How many flame threads do you need? I thought one image processor and one flame thrower, no? Even if you have a dozen of each, main can start only the image processing threads, and then each image processor can start its own flame thrower. Then each pair will have an owner and a worker.

[...]
Thank you Ali (Bey). I will take those into account when finalizing my code. Your comments helped a lot!
Sep 27 2020