digitalmars.D.announce - the semi-resident thread pool
- zsxxsz (252/252) May 29 2009 Hi, I written one thread pool in which each thread is semi-resident. The
- Robert Fraser (2/288) May 30 2009 Sweet! Does the code want a license?
- zsxxsz (3/4) May 30 2009 The thread-pool is just one little part of my plan migrating acl_project...
- "Jarl André" <jarl.andre gmail.com> (6/12) May 25 2012 I have copied the source in this article into my own source files
- Denis Koroskin (3/11) May 30 2009 You may consider adding your code to a scrapple project at dsource.org (...
Hi, I have written a thread pool in which each thread is semi-resident. This thread pool is different from Tango's: any thread in the pool exits once it has been idle for the timeout. That is to say, threads exist only for jobs: no jobs, no threads. The thread pool was ported from my C version; with D, delegates made it easier to write. Below is the source code:

module adl.thread_pool;

import core.sys.posix.pthread; // just for pthread_self()
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.c.time;
import std.stdio;

/// One queued unit of work. `call` selects which of `fn` / `dg` is invoked.
private struct Job
{
    Job* next;          // next job in the singly linked FIFO queue
    void function() fn; // invoked when call == CThreadPool.Call.FN
    void delegate() dg; // invoked when call == CThreadPool.Call.DG
    void* arg;          // not read or written by the pool
    int call;           // one of the CThreadPool.Call values
}

/**
 * Thread pool with semi-resident (daemon) worker threads.
 *
 * A worker is started on demand when a job is appended while no worker is
 * idle and fewer than nMaxThread workers exist. A worker that waits for
 * idleTimeout seconds without receiving a job exits again.
 */
class CThreadPool
{
public:
    /**
     * Constructs a CThreadPool
     * param nMaxThread {int} the max number threads in thread pool
     * param idleTimeout {int} when > 0, the idle thread will
     *  exit after idleTimeout seconds, if <= 0, the idle thread
     *  will not exit
     * param sz {size_t} when > 0, the worker threads are created with
     *  a stack of sz bytes; 0 keeps the runtime default.
     */
    this(int nMaxThread, int idleTimeout, size_t sz = 0)
    {
        m_nMaxThread = nMaxThread;
        m_idleTimeout = idleTimeout;
        m_stackSize = sz;
        m_mutex = new Mutex;
        m_cond = new Condition(m_mutex);
    }

    /**
     * Append one task into the thread pool's task queue
     * param fn {void function()} must not be null
     * throws Exception("fn null") when fn is null
     */
    void append(void function() fn)
    {
        if (fn is null)
            throw new Exception("fn null");

        Job* job = new Job;
        job.fn = fn;
        job.next = null;
        job.call = Call.FN;

        m_mutex.lock();
        // Release the lock even if starting a worker thread throws.
        scope (exit) m_mutex.unlock();
        append(job);
    }

    /**
     * Append one task into the thread pool's task queue
     * param dg {void delegate()} must not be null
     * throws Exception("dg null") when dg is null
     */
    void append(void delegate() dg)
    {
        if (dg is null)
            throw new Exception("dg null");

        Job* job = new Job;
        job.dg = dg;
        job.next = null;
        job.call = Call.DG;

        m_mutex.lock();
        // Release the lock even if starting a worker thread throws.
        scope (exit) m_mutex.unlock();
        append(job);
    }

    /**
     * If dg not null, when one new thread is created, dg will be called
     * (from the new worker thread, before it takes its first job).
     * param dg {void delegate()}
     */
    void onThreadInit(void delegate() dg)
    {
        m_onThreadInit = dg;
    }

    /**
     * If dg not null, before one thread exits, dg will be called
     * (from the exiting worker thread).
     * param dg {void delegate()}
     */
    void onThreadExit(void delegate() dg)
    {
        m_onThreadExit = dg;
    }

private:
    enum Call { NO, FN, DG }

    Mutex m_mutex;              // guards every field below
    Condition m_cond;           // signalled when a job is queued
    size_t m_stackSize = 0;     // stack size for new workers, 0 = default
    Job* m_jobHead = null, m_jobTail = null;
    int m_nJob = 0;             // number of queued jobs
    bool m_isQuit = false;      // NOTE(review): never set in this module
    int m_nThread = 0;          // number of live workers
    int m_nMaxThread;
    int m_nIdleThread = 0;      // workers currently blocked in m_cond.wait
    int m_overloadTimeWait = 0; // NOTE(review): never changed from 0 here
    int m_idleTimeout;          // seconds
    time_t m_lastWarn;          // last time the overload branch was taken
    void delegate() m_onThreadInit;
    void delegate() m_onThreadExit;

    /**
     * Links job at the tail of the queue, then wakes an idle worker or
     * starts a new one. Must be called with m_mutex held.
     */
    void append(Job* job)
    {
        if (m_jobHead is null)
            m_jobHead = job;
        else
            m_jobTail.next = job;
        m_jobTail = job;
        m_nJob++;

        if (m_nIdleThread > 0)
        {
            m_cond.notify();
        }
        else if (m_nThread < m_nMaxThread)
        {
            // Honour the stack size requested in the constructor.
            Thread thread = new Thread(&doJob, m_stackSize);
            thread.isDaemon = true;
            thread.start();
            m_nThread++;
        }
        else if (m_nJob > 10 * m_nMaxThread)
        {
            // Overloaded: only the timestamp is recorded, nothing is printed.
            time_t now = time(null);
            if (now - m_lastWarn >= 2)
                m_lastWarn = now;
            // NOTE(review): this would sleep while holding m_mutex; it is
            // unreachable as long as m_overloadTimeWait stays 0.
            if (m_overloadTimeWait > 0)
                Thread.sleep(m_overloadTimeWait);
        }
    }

    /**
     * Worker thread body: runs queued jobs until the pool is quitting or
     * the worker has been idle for the configured timeout.
     */
    void doJob()
    {
        // Widen before multiplying: int * int overflows above ~214 seconds.
        // The factor presumably converts seconds to the 100 ns ticks taken
        // by Condition.wait(long) -- confirm against the druntime in use.
        long period = cast(long) m_idleTimeout * 10_000_000;

        if (m_onThreadInit !is null)
            m_onThreadInit();

        m_mutex.lock();
        for (;;)
        {
            bool timedout = false;

            while (m_jobHead is null && !m_isQuit)
            {
                m_nIdleThread++;
                try
                {
                    if (period > 0)
                        timedout = !m_cond.wait(period);
                    else
                        m_cond.wait();
                }
                catch (SyncException e)
                {
                    m_nIdleThread--;
                    m_nThread--;
                    m_mutex.unlock();
                    if (m_onThreadExit !is null)
                        m_onThreadExit();
                    throw e;
                }
                // Always undo the idle count, including on timeout; a stale
                // count makes append() notify nobody instead of starting a
                // worker, so queued jobs would never run.
                m_nIdleThread--;
                if (timedout)
                    break;
            } /* end while */

            Job* job = m_jobHead;
            if (job !is null)
            {
                m_jobHead = job.next;
                m_nJob--;
                if (m_jobTail is job)
                    m_jobTail = null;

                /* the lock must be released while the job is running */
                m_mutex.unlock();
                try
                {
                    switch (job.call)
                    {
                    case Call.FN:
                        job.fn();
                        break;
                    case Call.DG:
                        job.dg();
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e)
                {
                    // A failing job must not kill the worker: that would
                    // leave m_nThread too high and shrink the pool for good.
                    writefln("Thread(%d) of ThreadPool: job threw: %s",
                             pthread_self(), e.toString());
                }
                /* lock again */
                m_mutex.lock();
            }

            if (m_jobHead is null && m_isQuit)
            {
                m_nThread--;
                if (m_nThread == 0)
                    m_cond.notifyAll();
                break;
            }
            if (m_jobHead is null && timedout)
            {
                m_nThread--;
                break;
            }
        }
        m_mutex.unlock();

        writefln("Thread(%d) of ThreadPool exit now", pthread_self());
        if (m_onThreadExit !is null)
            m_onThreadExit();
    }
}

unittest
{
    CThreadPool pool = new CThreadPool(10, 10);

    void testThreadInit(string s)
    {
        void onThreadInit()
        {
            writefln("thread(%d) was created now, s: %s", pthread_self(), s);
        }
        pool.onThreadInit(&onThreadInit);
    }

    void testThreadExit(string s)
    {
        void onThreadExit()
        {
            writefln("thread(%d) was to exit now, s: %s", pthread_self(), s);
        }
        pool.onThreadExit(&onThreadExit);
    }

    void testAddJobs(string s)
    {
        void threadFun()
        {
            writef("doJob thread id: %d, str: %s\n", pthread_self(), s);
            Thread.sleep(10_000_000);
            writef("doJob thread id: %d, wakeup now\n", pthread_self());
        }
        pool.append(&threadFun);
        pool.append(&threadFun);
        pool.append(&threadFun);
    }

    string s = "hello world";
    string s1 = "new thread was ok now";
    string s2 = "thread exited now";

    testThreadInit(s1);
    testThreadExit(s2);
    testAddJobs(s);
    Thread.sleep(100_000_000);
}
May 29 2009
zsxxsz wrote:
Hi, I have written a thread pool in which each thread is semi-resident. This thread pool is different from Tango's: any thread in the pool exits once it has been idle for the timeout. That is to say, threads exist only for jobs: no jobs, no threads. The thread pool was ported from my C version; with D, delegates made it easier to write. Below is the source code:

module adl.thread_pool;

import core.sys.posix.pthread; // just for pthread_self()
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.c.time;
import std.stdio;

/// One queued unit of work. `call` selects which of `fn` / `dg` is invoked.
private struct Job
{
    Job* next;          // next job in the singly linked FIFO queue
    void function() fn; // invoked when call == CThreadPool.Call.FN
    void delegate() dg; // invoked when call == CThreadPool.Call.DG
    void* arg;          // not read or written by the pool
    int call;           // one of the CThreadPool.Call values
}

/**
 * Thread pool with semi-resident (daemon) worker threads.
 *
 * A worker is started on demand when a job is appended while no worker is
 * idle and fewer than nMaxThread workers exist. A worker that waits for
 * idleTimeout seconds without receiving a job exits again.
 */
class CThreadPool
{
public:
    /**
     * Constructs a CThreadPool
     * param nMaxThread {int} the max number threads in thread pool
     * param idleTimeout {int} when > 0, the idle thread will
     *  exit after idleTimeout seconds, if <= 0, the idle thread
     *  will not exit
     * param sz {size_t} when > 0, the worker threads are created with
     *  a stack of sz bytes; 0 keeps the runtime default.
     */
    this(int nMaxThread, int idleTimeout, size_t sz = 0)
    {
        m_nMaxThread = nMaxThread;
        m_idleTimeout = idleTimeout;
        m_stackSize = sz;
        m_mutex = new Mutex;
        m_cond = new Condition(m_mutex);
    }

    /**
     * Append one task into the thread pool's task queue
     * param fn {void function()} must not be null
     * throws Exception("fn null") when fn is null
     */
    void append(void function() fn)
    {
        if (fn is null)
            throw new Exception("fn null");

        Job* job = new Job;
        job.fn = fn;
        job.next = null;
        job.call = Call.FN;

        m_mutex.lock();
        // Release the lock even if starting a worker thread throws.
        scope (exit) m_mutex.unlock();
        append(job);
    }

    /**
     * Append one task into the thread pool's task queue
     * param dg {void delegate()} must not be null
     * throws Exception("dg null") when dg is null
     */
    void append(void delegate() dg)
    {
        if (dg is null)
            throw new Exception("dg null");

        Job* job = new Job;
        job.dg = dg;
        job.next = null;
        job.call = Call.DG;

        m_mutex.lock();
        // Release the lock even if starting a worker thread throws.
        scope (exit) m_mutex.unlock();
        append(job);
    }

    /**
     * If dg not null, when one new thread is created, dg will be called
     * (from the new worker thread, before it takes its first job).
     * param dg {void delegate()}
     */
    void onThreadInit(void delegate() dg)
    {
        m_onThreadInit = dg;
    }

    /**
     * If dg not null, before one thread exits, dg will be called
     * (from the exiting worker thread).
     * param dg {void delegate()}
     */
    void onThreadExit(void delegate() dg)
    {
        m_onThreadExit = dg;
    }

private:
    enum Call { NO, FN, DG }

    Mutex m_mutex;              // guards every field below
    Condition m_cond;           // signalled when a job is queued
    size_t m_stackSize = 0;     // stack size for new workers, 0 = default
    Job* m_jobHead = null, m_jobTail = null;
    int m_nJob = 0;             // number of queued jobs
    bool m_isQuit = false;      // NOTE(review): never set in this module
    int m_nThread = 0;          // number of live workers
    int m_nMaxThread;
    int m_nIdleThread = 0;      // workers currently blocked in m_cond.wait
    int m_overloadTimeWait = 0; // NOTE(review): never changed from 0 here
    int m_idleTimeout;          // seconds
    time_t m_lastWarn;          // last time the overload branch was taken
    void delegate() m_onThreadInit;
    void delegate() m_onThreadExit;

    /**
     * Links job at the tail of the queue, then wakes an idle worker or
     * starts a new one. Must be called with m_mutex held.
     */
    void append(Job* job)
    {
        if (m_jobHead is null)
            m_jobHead = job;
        else
            m_jobTail.next = job;
        m_jobTail = job;
        m_nJob++;

        if (m_nIdleThread > 0)
        {
            m_cond.notify();
        }
        else if (m_nThread < m_nMaxThread)
        {
            // Honour the stack size requested in the constructor.
            Thread thread = new Thread(&doJob, m_stackSize);
            thread.isDaemon = true;
            thread.start();
            m_nThread++;
        }
        else if (m_nJob > 10 * m_nMaxThread)
        {
            // Overloaded: only the timestamp is recorded, nothing is printed.
            time_t now = time(null);
            if (now - m_lastWarn >= 2)
                m_lastWarn = now;
            // NOTE(review): this would sleep while holding m_mutex; it is
            // unreachable as long as m_overloadTimeWait stays 0.
            if (m_overloadTimeWait > 0)
                Thread.sleep(m_overloadTimeWait);
        }
    }

    /**
     * Worker thread body: runs queued jobs until the pool is quitting or
     * the worker has been idle for the configured timeout.
     */
    void doJob()
    {
        // Widen before multiplying: int * int overflows above ~214 seconds.
        // The factor presumably converts seconds to the 100 ns ticks taken
        // by Condition.wait(long) -- confirm against the druntime in use.
        long period = cast(long) m_idleTimeout * 10_000_000;

        if (m_onThreadInit !is null)
            m_onThreadInit();

        m_mutex.lock();
        for (;;)
        {
            bool timedout = false;

            while (m_jobHead is null && !m_isQuit)
            {
                m_nIdleThread++;
                try
                {
                    if (period > 0)
                        timedout = !m_cond.wait(period);
                    else
                        m_cond.wait();
                }
                catch (SyncException e)
                {
                    m_nIdleThread--;
                    m_nThread--;
                    m_mutex.unlock();
                    if (m_onThreadExit !is null)
                        m_onThreadExit();
                    throw e;
                }
                // Always undo the idle count, including on timeout; a stale
                // count makes append() notify nobody instead of starting a
                // worker, so queued jobs would never run.
                m_nIdleThread--;
                if (timedout)
                    break;
            } /* end while */

            Job* job = m_jobHead;
            if (job !is null)
            {
                m_jobHead = job.next;
                m_nJob--;
                if (m_jobTail is job)
                    m_jobTail = null;

                /* the lock must be released while the job is running */
                m_mutex.unlock();
                try
                {
                    switch (job.call)
                    {
                    case Call.FN:
                        job.fn();
                        break;
                    case Call.DG:
                        job.dg();
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e)
                {
                    // A failing job must not kill the worker: that would
                    // leave m_nThread too high and shrink the pool for good.
                    writefln("Thread(%d) of ThreadPool: job threw: %s",
                             pthread_self(), e.toString());
                }
                /* lock again */
                m_mutex.lock();
            }

            if (m_jobHead is null && m_isQuit)
            {
                m_nThread--;
                if (m_nThread == 0)
                    m_cond.notifyAll();
                break;
            }
            if (m_jobHead is null && timedout)
            {
                m_nThread--;
                break;
            }
        }
        m_mutex.unlock();

        writefln("Thread(%d) of ThreadPool exit now", pthread_self());
        if (m_onThreadExit !is null)
            m_onThreadExit();
    }
}

unittest
{
    CThreadPool pool = new CThreadPool(10, 10);

    void testThreadInit(string s)
    {
        void onThreadInit()
        {
            writefln("thread(%d) was created now, s: %s", pthread_self(), s);
        }
        pool.onThreadInit(&onThreadInit);
    }

    void testThreadExit(string s)
    {
        void onThreadExit()
        {
            writefln("thread(%d) was to exit now, s: %s", pthread_self(), s);
        }
        pool.onThreadExit(&onThreadExit);
    }

    void testAddJobs(string s)
    {
        void threadFun()
        {
            writef("doJob thread id: %d, str: %s\n", pthread_self(), s);
            Thread.sleep(10_000_000);
            writef("doJob thread id: %d, wakeup now\n", pthread_self());
        }
        pool.append(&threadFun);
        pool.append(&threadFun);
        pool.append(&threadFun);
    }

    string s = "hello world";
    string s1 = "new thread was ok now";
    string s2 = "thread exited now";

    testThreadInit(s1);
    testThreadExit(s2);
    testAddJobs(s);
    Thread.sleep(100_000_000);
}

Sweet! Does the code want a license?
May 30 2009
Sweet! Does the code want a license? The thread pool is just one small part of my plan to migrate acl_project, written in C, to adl_project, written in D. The original acl_project has many server frameworks. Anyone can use it under the GPL.
May 30 2009
On Saturday, 30 May 2009 at 13:36:41 UTC, zsxxsz wrote: I have copied the source in this article into my own source files and have just started to use the thread pool. Since you did not provide any license, it will effectively be licensed under the GPL v2, as stated for my own source. I'll add a comment above the code stating who wrote it. Sweet! Does the code want a license? The thread-pool is just one little part of my plan migrating acl_project written with C to adl_project written with D. The original acl_project has many server framework. Anyone can use it under the GPL.
May 25 2012
On Sat, 30 May 2009 08:14:08 +0400, zsxxsz <zhengshuxin hexun.com> wrote: Hi, I written one thread pool in which each thread is semi-resident. The thread-pool is different from the Tango's one. Any thread of the thread-pool will exit when it is idle for the timeout. That is to say, all threads for jobs, and no job no thread. The thread-pool was from my C version. With D, I wrote it more easier with the delegate function. You may consider adding your code to the scrapple project at dsource.org (http://www.dsource.org/projects/scrapple). That way you can keep your code up to date, and more people will be able to use it.
May 30 2009