www.digitalmars.com         C & C++   DMDScript  

digitalmars.D.learn - Exiting blocked threads (socket.accept)

reply "Tim" <tim unknownMailAddress.com> writes:
Hi guys,

I'm doing the following:

class Connection : Thread {
	private Socket pSocket;
	void run() {
		ptrdiff_t received;
		ubyte[0x10] buffer;

		mainloop:
		while(true) {
			received = pSocket.receive(buffer);

			// do some more stuff here
			if (buffer[0 .. received] == "QUIT")
				break mainloop;

		}
	}
	this(Socket s) {
		super(&run);
		pSocket = s;
	}
}

extern(C) void terminateServer(int s) {
	stopServer = true;
}

private bool stopServer = false;

void main() {

	sigaction_t sig;
	sig.sa_handler = &terminateServer;
	sigemptyset(&sig.sa_mask);
	sig.sa_flags = 0x00;
	sigaction(SIGINT, &sig, null);

	s.bind(new InternetAddress(2100));
	s.blocking(false);
	s.listen(0);

	while(!stopServer) {
		try
			(new Connection(s.accept)).start();
		catch (Exception e)
			Thread.yield();
	}

	s.shutdown(SocketShutdown.BOTH);
	s.close();
	
}

That works as expected, except the fact that pressing CTRL+C 
which stops the while(!stopServer) doesn't terminate the mainloop 
in my Connection-class (run()-method). This thread is blocked 
because of the receive()-method... but how can I force this 
thread to exit? Is there any chance to do that? I already tried 
to set the accepted socket to blocking(false) without any 
success...

Thanks in advance for any reply!
Mar 27 2013
next sibling parent Sean Kelly <sean invisibleduck.org> writes:
Have each thread select() on the read end of a pipe that the main thread =
writes to when it wants to trigger a wakeup--write() is legal even in =
signal handlers.=
Mar 27 2013
prev sibling parent reply =?UTF-8?B?TWFydGluIERyYcWhYXI=?= <drasar ics.muni.cz> writes:
Dne 27.3.2013 18:51, Tim napsal(a):
 That works as expected, except the fact that pressing CTRL+C which stops
 the while(!stopServer) doesn't terminate the mainloop in my
 Connection-class (run()-method). This thread is blocked because of the
 receive()-method... but how can I force this thread to exit? Is there
 any chance to do that? I already tried to set the accepted socket to
 blocking(false) without any success...

 Thanks in advance for any reply!
Hi Tim, you have to pass the termination information to the thread. It does not know about it and waits for receive() to return. You will have to employ the select() call to some extent. 1) You can have some form of global variable that indicates termination or you can send the termination info using Tid.send(). The code can then look like this: threadSocket.blocking(false); auto ss = new SocketSet(); while (!shouldEnd) { ss.reset(); ss.add(threadSocket); auto rc = Socket.select(ss, null, null, dur!"msecs"(timeout)); if (rc == 1) { // process your data } } And it would take at most timeout miliseconds for thread to react to termination message. 2) Use what Sean Kelly wrote. Either using a pipe or socketpair. Martin
Mar 27 2013
parent reply "Tim" <tim unknownMailAddress.com> writes:
On Wednesday, 27 March 2013 at 20:16:39 UTC, Martin Drašar wrote:
 Dne 27.3.2013 18:51, Tim napsal(a):
 That works as expected, except the fact that pressing CTRL+C 
 which stops
 the while(!stopServer) doesn't terminate the mainloop in my
 Connection-class (run()-method). This thread is blocked 
 because of the
 receive()-method... but how can I force this thread to exit? 
 Is there
 any chance to do that? I already tried to set the accepted 
 socket to
 blocking(false) without any success...

 Thanks in advance for any reply!
Hi Tim, you have to pass the termination information to the thread. It does not know about it and waits for receive() to return. You will have to employ the select() call to some extent. 1) You can have some form of global variable that indicates termination or you can send the termination info using Tid.send(). The code can then look like this: threadSocket.blocking(false); auto ss = new SocketSet(); while (!shouldEnd) { ss.reset(); ss.add(threadSocket); auto rc = Socket.select(ss, null, null, dur!"msecs"(timeout)); if (rc == 1) { // process your data } } And it would take at most timeout miliseconds for thread to react to termination message. 2) Use what Sean Kelly wrote. Either using a pipe or socketpair. Martin
Thanks! I've never used message passing and I'm currently a bit confused how it works (I came from the Java-area where message passing isn't necessary for something like that)... are there any information/examples about message passing? I sill can't get it to work... I changed my code as follows: class Connection : Thread { private Socket pSocket; void run() { ptrdiff_t received; ubyte[0x10] buffer; SocketSet ss = new SocketSet(); mainloop: while(!stopServer) { ss.reset(); ss.add(pSocket); if (Socket.select(ss, null, null, dur!"msecs"(10)) > 0) { received = pSocket.receive(buffer); // do some more stuff here if (buffer[0 .. received] == "QUIT") break mainloop; } } } this(Socket s) { super(&run); pSocket = s; } } extern(C) void terminateServer(int s) { stopServer = true; } private bool stopServer = false; void main() { sigaction_t sig; sig.sa_handler = &terminateServer; sigemptyset(&sig.sa_mask); sig.sa_flags = 0; sigaction(SIGINT, &sig, null); TcpSocket s = new TcpSocket(); s.bind(new InternetAddress(2100)); //s.blocking(false); s.listen(0); SocketSet ss = new SocketSet(); while(!stopServer) { ss.reset(); ss.add(s); if (Socket.select(ss, null, null, dur!"msecs"(10)) > 0) (new Connection(s.accept)).start(); } writeln("Server stopped"); s.shutdown(SocketShutdown.BOTH); s.close(); } Alright... let's connect to the server... the server accepts, creates an instance of Connection and starts a new thread. Now... I set stopServer to true (CTRL+C). This should stop the server and I also get the message in my main()-method that the server stopped. But the other thread (Connection-thread) doesn't terminate... it runs as long as the connection to the client is alive... by killing the connection from the client-side, the connection thread also terminates (and throws an exception).
Mar 27 2013
next sibling parent =?UTF-8?B?QWxpIMOHZWhyZWxp?= <acehreli yahoo.com> writes:
On 03/27/2013 02:38 PM, Tim wrote:

 are there any information/examples about message passing?
The following chapter is about the std.concurrency module of Phobos: http://ddili.org/ders/d.en/concurrency.html Some of the examples in there send a special 'struct Exit' message to tell the workers to exit. Ali
Mar 27 2013
prev sibling parent reply =?UTF-8?B?TWFydGluIERyYcWhYXI=?= <drasar ics.muni.cz> writes:
Dne 27.3.2013 22:38, Tim napsal(a):
 Thanks! I've never used message passing and I'm currently a bit confused
 how it works (I came from the Java-area where message passing isn't
 necessary for something like that)... are there any information/examples
 about message passing? I sill can't get it to work... I changed my code
 as follows:
Hi, To use message passing, you have to use the std.concurrency module and then jump through some hoops to execute the code in your class in separate thread. Right now, you are using core.thread which is lower level than std.concurrency. Definitely check the page Ali sent you. I have altered your code a bit to send the interrupt to the thread using a socketpair. I had to change the signal code, because it was not working on my windows box (I wasn't sure what to import). This code still relies on a variable set inside the signal handler, because writing to a socket is not nothrow. You could overcome this by using the pipe as Sean suggested. Martin import std.socket; import core.thread; import core.stdc.signal; import std.stdio; __gshared Socket readSock; __gshared Socket writeSock; __gshared bool stopServer = false; class Connection : Thread { private Socket pSocket; void run() { ptrdiff_t received; ubyte[0x10] buffer; SocketSet ss = new SocketSet(); mainloop: while(1) { ss.reset(); ss.add(pSocket); ss.add(readSock); if (Socket.select(ss, null, null) > 0) { if (ss.isSet(pSocket)) { received = pSocket.receive(buffer); writeln("Received data"); // process data } else if (ss.isSet(readSock)) { writeln("Received interrupt"); break mainloop; } } } pSocket.close(); } this(Socket s) { super(&run); pSocket = s; } } extern (C) void terminateServer(int s) nothrow { stopServer = true; } void main() { signal(SIGINT, &terminateServer); TcpSocket s = new TcpSocket(); s.bind(new InternetAddress(2100)); s.listen(0); auto pair = socketPair(); readSock = pair[0]; writeSock = pair[1]; SocketSet ss = new SocketSet(); while (!stopServer) { ss.reset(); ss.add(s); if (Socket.select(ss, null, null, dur!"msecs"(20)) > 0) { writeln("Received new connection"); (new Connection(s.accept)).start(); } } writeSock.send([1]); s.shutdown(SocketShutdown.BOTH); s.close(); writeln("Finished"); }
Mar 28 2013
parent reply "Tim" <tim unknownMailAddress.com> writes:
On Thursday, 28 March 2013 at 07:57:07 UTC, Martin Drašar wrote:
 Dne 27.3.2013 22:38, Tim napsal(a):
 Thanks! I've never used message passing and I'm currently a 
 bit confused
 how it works (I came from the Java-area where message passing 
 isn't
 necessary for something like that)... are there any 
 information/examples
 about message passing? I sill can't get it to work... I 
 changed my code
 as follows:
Hi, To use message passing, you have to use the std.concurrency module and then jump through some hoops to execute the code in your class in separate thread. Right now, you are using core.thread which is lower level than std.concurrency. Definitely check the page Ali sent you. I have altered your code a bit to send the interrupt to the thread using a socketpair. I had to change the signal code, because it was not working on my windows box (I wasn't sure what to import). This code still relies on a variable set inside the signal handler, because writing to a socket is not nothrow. You could overcome this by using the pipe as Sean suggested. Martin import std.socket; import core.thread; import core.stdc.signal; import std.stdio; __gshared Socket readSock; __gshared Socket writeSock; __gshared bool stopServer = false; class Connection : Thread { private Socket pSocket; void run() { ptrdiff_t received; ubyte[0x10] buffer; SocketSet ss = new SocketSet(); mainloop: while(1) { ss.reset(); ss.add(pSocket); ss.add(readSock); if (Socket.select(ss, null, null) > 0) { if (ss.isSet(pSocket)) { received = pSocket.receive(buffer); writeln("Received data"); // process data } else if (ss.isSet(readSock)) { writeln("Received interrupt"); break mainloop; } } } pSocket.close(); } this(Socket s) { super(&run); pSocket = s; } } extern (C) void terminateServer(int s) nothrow { stopServer = true; } void main() { signal(SIGINT, &terminateServer); TcpSocket s = new TcpSocket(); s.bind(new InternetAddress(2100)); s.listen(0); auto pair = socketPair(); readSock = pair[0]; writeSock = pair[1]; SocketSet ss = new SocketSet(); while (!stopServer) { ss.reset(); ss.add(s); if (Socket.select(ss, null, null, dur!"msecs"(20)) > 0) { writeln("Received new connection"); (new Connection(s.accept)).start(); } } writeSock.send([1]); s.shutdown(SocketShutdown.BOTH); s.close(); writeln("Finished"); }
Thanks Martin and Ali. Your solution works as long as I use the receive()-method, but what about using SocketStreams? I replaced socket.receive() with socketStream.readLine() which isn't broken by the solution above...
Mar 28 2013
parent reply Martin Drasar <drasar ics.muni.cz> writes:
On 28.3.2013 11:23, Tim wrote:
 Thanks Martin and Ali. Your solution works as long as I use the
 receive()-method, but what about using SocketStreams? I replaced
 socket.receive() with socketStream.readLine() which isn't broken by the
 solution above...
If you check the documentation, you will see that the SocketStream is a stream for blocking socket. You can't easily make it work with nonblocking select() calls. However, if you want to use the stream interface, you can write the non-blocking stream yourself. If you check the SocketStream at github (https://github.com/D-Programming-Language/phobos/blob/master/std/socketstream.d), you will see that it is pretty small and easy. You can transfer the interrupt code inside the readBlock() method. However, it will probably be for the best to just write your own method to read for socket until the end of the line. Martin
Mar 28 2013
parent reply "Tim" <tim unknownMailAddress.com> writes:
On Thursday, 28 March 2013 at 12:28:05 UTC, Martin Drasar wrote:
 On 28.3.2013 11:23, Tim wrote:
 Thanks Martin and Ali. Your solution works as long as I use the
 receive()-method, but what about using SocketStreams? I 
 replaced
 socket.receive() with socketStream.readLine() which isn't 
 broken by the
 solution above...
If you check the documentation, you will see that the SocketStream is a stream for blocking socket. You can't easily make it work with nonblocking select() calls. However, if you want to use the stream interface, you can write the non-blocking stream yourself. If you check the SocketStream at github (https://github.com/D-Programming-Language/phobos/blob/master/std/socketstream.d), you will see that it is pretty small and easy. You can transfer the interrupt code inside the readBlock() method. However, it will probably be for the best to just write your own method to read for socket until the end of the line. Martin
Thanks for everything, but I'm still having some problems with this solution... I implemented a simple ftp server which looks as follows: import std.socket; import core.thread; import core.stdc.signal; import std.stdio; import std.string; import std.conv : to; __gshared Socket readSock; __gshared Socket writeSock; __gshared bool stopServer = false; class DataChannel { Socket datasocket; this() { datasocket = new TcpSocket(); datasocket.bind(new InternetAddress(0)); datasocket.listen(0); } } class Connection : Thread { private Socket pSocket; void run() { int i = 0x00; DataChannel datachannel; ptrdiff_t received; ubyte[0x20] buffer; SocketSet ss = new SocketSet(); pSocket.send("220 FTP ready.\r\n"); datachannel = new DataChannel(); mainloop: while(1) { i++; ss.reset(); ss.add(pSocket); ss.add(readSock); if (Socket.select(ss, null, null) > 0) { if (ss.isSet(pSocket)) { received = pSocket.receive(buffer); if (i == 1) pSocket.send("331 Password required for anyUser\r\n"); else if (i == 2) pSocket.send("230 User anyUser logged in\r\n"); else if (i == 3) pSocket.send("257 / is the current directory.\r\n"); else if (i == 4) pSocket.send("215 UNIX Type: L8\r\n"); else if (i == 5) pSocket.send("200 Type set to I\r\n"); else if (i == 6) { string[] lip = split(pSocket.localAddress.toAddrString, "."); auto port = (cast(InternetAddress) datachannel.datasocket.localAddress).port; pSocket.send("227 Entering Passive Mode(" ~ lip[0x00] ~ "," ~ lip[0x01] ~ "," ~ lip[0x02] ~ "," ~ lip[0x03] ~ "," ~ to!(string)(port / 256) ~ "," ~ to!(string)(port % 256) ~ ")\r\n"); } else if (i == 7) pSocket.send("150 Here comes the directory listing.\r\n226 Directory listing complete\r\n"); else if (i == 8) pSocket.send("250 CWD command successful.\r\n"); else if (i == 9) pSocket.send("257 / is the current directory.\r\n"); } // process data } else if (ss.isSet(readSock)) { writeln("Received interrupt"); break mainloop; } } pSocket.close(); } this(Socket s) { super(&run); pSocket = s; } } extern (C) void terminateServer(int s) nothrow { stopServer = true; } void main() { signal(SIGINT, &terminateServer); TcpSocket s = new TcpSocket(); s.bind(new InternetAddress(2100)); s.listen(0); auto pair = socketPair(); readSock = pair[0]; writeSock = pair[1]; SocketSet ss = new SocketSet(); while (!stopServer) { ss.reset(); ss.add(s); if (Socket.select(ss, null, null, dur!"msecs"(20)) > 0) { writeln("Received new connection"); (new Connection(s.accept)).start(); } } writeSock.send([1]); s.shutdown(SocketShutdown.BOTH); s.close(); writeln("Finished"); } I know... that's not only a dirty solution, but a terrible (and wrong) solution, but it's for demonstration purposes only. By connecting to this server using a ftp client (let's say FileZilla or gFTP), I can connect until I get an error (which comes because of the dirty solution... but that's not the point here). When I press CTRL+C to terminate the server, this doesn't work anymore. I don't know why (probably because of the datachannel? But there is no loop or something else in the datachannel-class - no accept() or similar)... I've a rfc959-based implemented version of the ftp-server, but the result is the same... pressing CTRL+C doesn't work in some cases.
Mar 28 2013
parent "Tim" <tim unknownMailAddress.com> writes:
On Thursday, 28 March 2013 at 17:57:47 UTC, Tim wrote:
 On Thursday, 28 March 2013 at 12:28:05 UTC, Martin Drasar wrote:
 On 28.3.2013 11:23, Tim wrote:
 Thanks Martin and Ali. Your solution works as long as I use 
 the
 receive()-method, but what about using SocketStreams? I 
 replaced
 socket.receive() with socketStream.readLine() which isn't 
 broken by the
 solution above...
If you check the documentation, you will see that the SocketStream is a stream for blocking socket. You can't easily make it work with nonblocking select() calls. However, if you want to use the stream interface, you can write the non-blocking stream yourself. If you check the SocketStream at github (https://github.com/D-Programming-Language/phobos/blob/master/std/socketstream.d), you will see that it is pretty small and easy. You can transfer the interrupt code inside the readBlock() method. However, it will probably be for the best to just write your own method to read for socket until the end of the line. Martin
Thanks for everything, but I'm still having some problems with this solution... I implemented a simple ftp server which looks as follows: import std.socket; import core.thread; import core.stdc.signal; import std.stdio; import std.string; import std.conv : to; __gshared Socket readSock; __gshared Socket writeSock; __gshared bool stopServer = false; class DataChannel { Socket datasocket; this() { datasocket = new TcpSocket(); datasocket.bind(new InternetAddress(0)); datasocket.listen(0); } } class Connection : Thread { private Socket pSocket; void run() { int i = 0x00; DataChannel datachannel; ptrdiff_t received; ubyte[0x20] buffer; SocketSet ss = new SocketSet(); pSocket.send("220 FTP ready.\r\n"); datachannel = new DataChannel(); mainloop: while(1) { i++; ss.reset(); ss.add(pSocket); ss.add(readSock); if (Socket.select(ss, null, null) > 0) { if (ss.isSet(pSocket)) { received = pSocket.receive(buffer); if (i == 1) pSocket.send("331 Password required for anyUser\r\n"); else if (i == 2) pSocket.send("230 User anyUser logged in\r\n"); else if (i == 3) pSocket.send("257 / is the current directory.\r\n"); else if (i == 4) pSocket.send("215 UNIX Type: L8\r\n"); else if (i == 5) pSocket.send("200 Type set to I\r\n"); else if (i == 6) { string[] lip = split(pSocket.localAddress.toAddrString, "."); auto port = (cast(InternetAddress) datachannel.datasocket.localAddress).port; pSocket.send("227 Entering Passive Mode(" ~ lip[0x00] ~ "," ~ lip[0x01] ~ "," ~ lip[0x02] ~ "," ~ lip[0x03] ~ "," ~ to!(string)(port / 256) ~ "," ~ to!(string)(port % 256) ~ ")\r\n"); } else if (i == 7) pSocket.send("150 Here comes the directory listing.\r\n226 Directory listing complete\r\n"); else if (i == 8) pSocket.send("250 CWD command successful.\r\n"); else if (i == 9) pSocket.send("257 / is the current directory.\r\n"); } // process data } else if (ss.isSet(readSock)) { writeln("Received interrupt"); break mainloop; } } pSocket.close(); } this(Socket s) { super(&run); pSocket = s; } } extern (C) void terminateServer(int s) nothrow { stopServer = true; } void main() { signal(SIGINT, &terminateServer); TcpSocket s = new TcpSocket(); s.bind(new InternetAddress(2100)); s.listen(0); auto pair = socketPair(); readSock = pair[0]; writeSock = pair[1]; SocketSet ss = new SocketSet(); while (!stopServer) { ss.reset(); ss.add(s); if (Socket.select(ss, null, null, dur!"msecs"(20)) > 0) { writeln("Received new connection"); (new Connection(s.accept)).start(); } } writeSock.send([1]); s.shutdown(SocketShutdown.BOTH); s.close(); writeln("Finished"); } I know... that's not only a dirty solution, but a terrible (and wrong) solution, but it's for demonstration purposes only. By connecting to this server using a ftp client (let's say FileZilla or gFTP), I can connect until I get an error (which comes because of the dirty solution... but that's not the point here). When I press CTRL+C to terminate the server, this doesn't work anymore. I don't know why (probably because of the datachannel? But there is no loop or something else in the datachannel-class - no accept() or similar)... I've a rfc959-based implemented version of the ftp-server, but the result is the same... pressing CTRL+C doesn't work in some cases.
I should better double check the documentation... the reason for the problem is the if-then-elseif... I've changed this to: if (ss.isSet(pSocket)) { // ... do stuff here } if (ss.isSet(readSock)) { break mainloop; } As I already mentioned... thanks to Sean, Ali and Martin!
Mar 28 2013