digitalmars.D.learn - Implementing a Monitor
- Minas Mina (91/91) Jul 27 2012 I'm using condition variables to implement a monitor that
- Sean Kelly (15/56) Jul 30 2012 You need to lock the mutex to synchronize access to the shared data.
- Minas (109/109) Jul 31 2012 Thank you for your reply.
I'm using condition variables to implement a monitor that simulates the producer-consumer(unbounded buffer) scenario. Note: monitor is __gshared and is initialized in main(). Then the other threads are created and run. There is one producer thread, and 5 consumer threads. void producer() { while( 1 ) { monitor.produce(); } } void consumer() { while( 1 ) { monitor.consume(); } } class Monitor { Cond cond; char[] buffer; ulong sz; this() { // necessary: The condition variable constructor requires a mutex passed to it cond = new Cond(new Mutex()); } void produce() { // produce a letter a-z char c = 'a' + rand() % 26; writeln("* Producer: Produced a '", c, "' buffer.length = ", buffer.length, "\n"); // put it into the buffer ++buffer.length; buffer[buffer.length - 1] = c; //if( buffer.length > 1 ) notify(cond); // calls condition.notify() //Thread.sleep(dur!"msecs"(1000)); } void consume() { if( buffer.length == 0 ) { writeln("put to sleep"); cwait(cond); // calls Condition.wait() } // take char c = buffer[buffer.length-1]; --buffer.length; "] (", buffer.length, " elements)\n"); } } The output is something like: put to sleep put to sleep put to sleep put to sleep * Producer: Produced a 'n' buffer.length = 0 put to sleep * Producer: Produced a 'w' buffer.length = 1 * Producer: Produced a 'l' buffer.length = 2 * Producer: Produced a 'r' buffer.length = 3 * Producer: Produced a 'b' buffer.length = 4 * Producer: Produced a 'b' buffer.length = 5 * Producer: Produced a 'm' buffer.length = 6 * Producer: Produced a 'q' buffer.length = 7 ... Even though the producer calls notify() when he finishes, none of the consumer threads is ever resumed... (I ran the program for a that the consumer prints but nothing). Why is that happening? Is there something wrong with Condition.notify()? Also, is there a function that I can call to immediatly suspend the running thread? Thanks
Jul 27 2012
You need to lock the mutex to synchronize access to the shared data. On Jul 27, 2012, at 2:31 PM, Minas Mina <minas_mina1990 hotmail.co.uk> = wrote:=20 class Monitor { Cond cond;Mutex mutex;=09 char[] buffer; ulong sz; =09 this() { // necessary: The condition variable constructor =requires a mutex passed to it mutex =3D new Mutex; cond =3D new Cond(mutex);} =09 void produce() { // produce a letter a-z char c =3D 'a' + rand() % 26; writeln("* Producer: Produced a '", c, "' buffer.length ==3D ", buffer.length, "\n"); synchronized(mutex) {=09 // put it into the buffer ++buffer.length; buffer[buffer.length - 1] =3D c; =09 //if( buffer.length > 1 ) notify(cond); // calls condition.notify()}=09 //Thread.sleep(dur!"msecs"(1000)); } =09 void consume() {synchronized(mutex) { // note changed if to whilewhile( buffer.length =3D=3D 0 ) { writeln("put to sleep"); cwait(cond); // calls Condition.wait() } =09 // take char c =3D buffer[buffer.length-1]; --buffer.length;}Also, is there a function that I can call to immediatly suspend the =running thread? Thread.yield(). Though Cond.wait() will suspend the thread as well.=
Jul 30 2012
Thank you for your reply. I have some more questions: I simplified my example. Now my monitor is used for mutual exclusion, to understand how monitors work first. // 3 threads are running P() - the main thread is not one of them void P() { while( run ) { monitor.EnterCritical2(); // critical section writeln(count); monitor.ExitCritical2(); } } class Monitor { Mutex mutex; Cond cond; bool in_use = false; this() { mutex = new Mutex(); cond = new Cond(mutex); } void EnterCritical() { // The synchronized statement is required to ensure only one thread will be able // to access the block synchronized(mutex) { while( in_use ) cwait(cond); in_use = true; ++count; } } void ExitCritical() { synchronized(mutex) { --count; in_use = false; cnotify(cond); } } } 1) In class (in college), I have learned that monitors "by nature" grant access only to one thread in their functions. That's what synchronized(mutex) is used for right? 2) Why synchronized(mutex) and not synchronized(this) (I know this is not good practise, but is there something more?) or not synchronized(some else object)? 3) You correctly changed my "EnterCritical()" code to "While" instead of "if", because, and please correct me if I'm wrong, we are using notify() and not signal(). 4) I wrote my own version of signal. It's: void csignal(Condition cond) { cond.notify(); Thread.yield(); } The logic is that it notifies another Thread and then forces a context switch, so it's like calling signal(). Is it correct? I changed my EnterCritical() to use "if" instead of "while" because I use my own version of signal() in ExitCritical(). void EnterCritical2() { synchronized(mutex) { if( in_use ) cwait(cond); in_use = true; ++count; } } void ExitCritical2() { synchronized(mutex) { --count; in_use = false; csignal(cond); } } However, when "count" is printed in P(), its value is 3 (it was 1 before which it ensured that mutual exclusion was correct). If I change the "if" to "while" it works, but that's not the purpose of signal(). I'm pretty sure that when using "signal" it's "if", not "while". What am I doing wrong? 5) Are the threads running concurrently on one core or in parallel (provided that the PC has more than one cores). Mine has 2 cores x 2 threads each. Thank you. PS: I'm writing some small programs that demonstrate concurrency in D. The reason is I suggested it to my university professor as a way to learn about this stuff in a practical manner. So I guess if he likes them, D will be tought in a university :)
Jul 31 2012