digitalmars.D.learn - Synchronize Class fields between different threads
- DrCataclysm (57/57) Nov 10 2017 I am trying to understand concurrent/parallel programming with D
- rikki cattermole (5/5) Nov 10 2017 Remember this bit: Everything on the heap, is not thread-local, it is
- DrCataclysm (19/25) Nov 10 2017 this is my implementation of Accept
- rikki cattermole (2/29) Nov 10 2017 Assuming _client is in a class, heap.
- bauss (9/37) Nov 10 2017 _client is allocated in the heap.
- DrCataclysm (7/48) Nov 10 2017 thank you, i thought i was going mad.
I am trying to understand concurrent/parallel programming with D but i just don't get how i should usesome of the concepts. This is the code i am using to tying out stuff. public class TCPListener { ubyte[] _messageBuffer; Socket _server; Socket _client; // define server in constructor this(string address, ushort port) { _server = new TcpSocket(); _server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); } // starts accept() in a new thread void startlistening(){...} // accepts connection and assigns client // after that start receiving in new thread and keep accepting void accept() {...} // receives data and adds to buffer // emits message if lineend in buffer void receiving() {...} // gets the client and sends the data (prefferably in its own task/thread) // emits signal when completed void send(string data) {...} } And I would like to use it in the following matter: int main() { auto o1 = new SpecialisedTCPListener(); //do other stuff } class SpecialisedTCPListener{ TCPListener _listener; this(){ _listener= new TCPListener("127.0.0.1", 10000); _listener.connect(&MessageReceived); _listener.connect(&SendCallback); _listener.startListening(); } public void MessageReceived(string message){ auto answer = doSomeThings(message); // send the answer _listener.Send(answer); } // did it send correctly? public void SendCallback(CallBackData e){...} } When i tried this approach it did not work. Accepting and receiving worked normally but sending was impossible because _client was thread local und would return a nullpointer. How do i pass fields to the different threads? I tried using spawn to start the threads but that only works with functions and not with class methods. What would be better ways to do something like this?
Nov 10 2017
Remember this bit: Everything on the heap, is not thread-local, it is global. This includes everything inside a class. When you synchronize (statement) it is locking and then unlocking a mutex. A class has a mutex, simple! It only prevent multiple threads modifying a single thing at specific times, thats all.
Nov 10 2017
On Friday, 10 November 2017 at 13:50:56 UTC, rikki cattermole wrote:Remember this bit: Everything on the heap, is not thread-local, it is global. This includes everything inside a class. When you synchronize (statement) it is locking and then unlocking a mutex. A class has a mutex, simple! It only prevent multiple threads modifying a single thing at specific times, thats all.this is my implementation of Accept private void Accept(){ // start accepting in a different thread try{ _client = _server.accept(); emit(ClientConnected(_client.remoteAddress.toAddrString)); auto _acceptTask = task(&this.Accept); _acceptTask.executeInNewThread(); Receive(); } catch (SocketAcceptException e){ writeln("Error while accepting connection: " ~ e.msg); } } Is _client on the Heap or the Stack? If it is on the Stack, how would i get in on the Heap?
Nov 10 2017
On 10/11/2017 2:13 PM, DrCataclysm wrote:On Friday, 10 November 2017 at 13:50:56 UTC, rikki cattermole wrote:Assuming _client is in a class, heap.Remember this bit: Everything on the heap, is not thread-local, it is global. This includes everything inside a class. When you synchronize (statement) it is locking and then unlocking a mutex. A class has a mutex, simple! It only prevent multiple threads modifying a single thing at specific times, thats all.this is my implementation of Accept private void Accept(){ // start accepting in a different thread try{ _client = _server.accept(); emit(ClientConnected(_client.remoteAddress.toAddrString)); auto _acceptTask = task(&this.Accept); _acceptTask.executeInNewThread(); Receive(); } catch (SocketAcceptException e){ writeln("Error while accepting connection: " ~ e.msg); } } Is _client on the Heap or the Stack? If it is on the Stack, how would i get in on the Heap?
Nov 10 2017
On Friday, 10 November 2017 at 14:13:26 UTC, DrCataclysm wrote:On Friday, 10 November 2017 at 13:50:56 UTC, rikki cattermole wrote:_client is allocated in the heap. Socket accept(); returns the socket created from Socket accepting(). Which is like below: protected Socket accepting() pure nothrow { return new Socket; }Remember this bit: Everything on the heap, is not thread-local, it is global. This includes everything inside a class. When you synchronize (statement) it is locking and then unlocking a mutex. A class has a mutex, simple! It only prevent multiple threads modifying a single thing at specific times, thats all.this is my implementation of Accept private void Accept(){ // start accepting in a different thread try{ _client = _server.accept(); emit(ClientConnected(_client.remoteAddress.toAddrString)); auto _acceptTask = task(&this.Accept); _acceptTask.executeInNewThread(); Receive(); } catch (SocketAcceptException e){ writeln("Error while accepting connection: " ~ e.msg); } } Is _client on the Heap or the Stack? If it is on the Stack, how would i get in on the Heap?
Nov 10 2017
On Friday, 10 November 2017 at 14:27:41 UTC, bauss wrote:On Friday, 10 November 2017 at 14:13:26 UTC, DrCataclysm wrote:thank you, i thought i was going mad. It is working now. The problem was that the debugger in eclipse ddt seems to completely broken. If i run it directly from bash it is working. One last question: is there a function in the std to wait for a task to finish within a time limit?On Friday, 10 November 2017 at 13:50:56 UTC, rikki cattermole wrote:_client is allocated in the heap. Socket accept(); returns the socket created from Socket accepting(). Which is like below: protected Socket accepting() pure nothrow { return new Socket; }Remember this bit: Everything on the heap, is not thread-local, it is global. This includes everything inside a class. When you synchronize (statement) it is locking and then unlocking a mutex. A class has a mutex, simple! It only prevent multiple threads modifying a single thing at specific times, thats all.this is my implementation of Accept private void Accept(){ // start accepting in a different thread try{ _client = _server.accept(); emit(ClientConnected(_client.remoteAddress.toAddrString)); auto _acceptTask = task(&this.Accept); _acceptTask.executeInNewThread(); Receive(); } catch (SocketAcceptException e){ writeln("Error while accepting connection: " ~ e.msg); } } Is _client on the Heap or the Stack? If it is on the Stack, how would i get in on the Heap?
Nov 10 2017
On Friday, 10 November 2017 at 14:36:03 UTC, DrCataclysm wrote:On Friday, 10 November 2017 at 14:27:41 UTC, bauss wrote:Not an ideal solution, but should work: (I'm not aware of any build-in solutions using Phobos' tasks. ``` static const timeLimit = 1000; // Wait for the task up to 1000 milliseconds while (!task.done && timeLimit) { import core.time : Thread, dur; Thread.sleep( dur!("msecs")(1) ); // Preventing the CPU to go nuts timeLimit--; } if (task.done) { auto value = task.yieldForce(); } ``` Could make it a function though: ``` bool yieldTimeLimit(Task)(Task task) { while (!task.done && timeLimit) { import core.time : Thread, dur; Thread.sleep( dur!("msecs")(1) ); timeLimit--; } return task.done; } ... if (yieldTimeLimit(task)) { auto value = task.yieldForce(); } ```On Friday, 10 November 2017 at 14:13:26 UTC, DrCataclysm wrote:thank you, i thought i was going mad. It is working now. The problem was that the debugger in eclipse ddt seems to completely broken. If i run it directly from bash it is working. One last question: is there a function in the std to wait for a task to finish within a time limit?On Friday, 10 November 2017 at 13:50:56 UTC, rikki cattermole wrote:_client is allocated in the heap. Socket accept(); returns the socket created from Socket accepting(). Which is like below: protected Socket accepting() pure nothrow { return new Socket; }Remember this bit: Everything on the heap, is not thread-local, it is global. This includes everything inside a class. When you synchronize (statement) it is locking and then unlocking a mutex. A class has a mutex, simple! It only prevent multiple threads modifying a single thing at specific times, thats all.this is my implementation of Accept private void Accept(){ // start accepting in a different thread try{ _client = _server.accept(); emit(ClientConnected(_client.remoteAddress.toAddrString)); auto _acceptTask = task(&this.Accept); _acceptTask.executeInNewThread(); Receive(); } catch (SocketAcceptException e){ writeln("Error while accepting connection: " ~ e.msg); } } Is _client on the Heap or the Stack? If it is on the Stack, how would i get in on the Heap?
Nov 10 2017
On Friday, 10 November 2017 at 15:01:30 UTC, bauss wrote:On Friday, 10 November 2017 at 14:36:03 UTC, DrCataclysm wrote:Pardon my brain fart. The last bit should be: ``` bool yieldTimeLimit(Task)(Task task, size_t timeLimit) { while (!task.done && timeLimit) { import core.time : Thread, dur; Thread.sleep( dur!("msecs")(1) ); timeLimit--; } return task.done; } ... if (task.yieldTimeLimit(1000)) // Waits 1000 milliseconds { auto value = task.yieldForce(); } ```On Friday, 10 November 2017 at 14:27:41 UTC, bauss wrote:Not an ideal solution, but should work: (I'm not aware of any build-in solutions using Phobos' tasks. ``` static const timeLimit = 1000; // Wait for the task up to 1000 milliseconds while (!task.done && timeLimit) { import core.time : Thread, dur; Thread.sleep( dur!("msecs")(1) ); // Preventing the CPU to go nuts timeLimit--; } if (task.done) { auto value = task.yieldForce(); } ``` Could make it a function though: ``` bool yieldTimeLimit(Task)(Task task) { while (!task.done && timeLimit) { import core.time : Thread, dur; Thread.sleep( dur!("msecs")(1) ); timeLimit--; } return task.done; } ... if (yieldTimeLimit(task)) { auto value = task.yieldForce(); } ```On Friday, 10 November 2017 at 14:13:26 UTC, DrCataclysm wrote:thank you, i thought i was going mad. It is working now. The problem was that the debugger in eclipse ddt seems to completely broken. If i run it directly from bash it is working. One last question: is there a function in the std to wait for a task to finish within a time limit?[...]_client is allocated in the heap. Socket accept(); returns the socket created from Socket accepting(). Which is like below: protected Socket accepting() pure nothrow { return new Socket; }
Nov 10 2017
On Friday, 10 November 2017 at 14:36:03 UTC, DrCataclysm wrote:It is working now. The problem was that the debugger in eclipse ddt seems to completely broken. If i run it directly from bash it is working.Be careful with such statements. Typically, this situation means that there are Heisenbugs in the code that appear in certain conditions.
Nov 10 2017