digitalmars.D - Multi-processing : processes, mutexes and shared data
- Luhrel (222/222) Jul 15 2022 Hello there,
Hello there, I wanted to create a multi-process program, and share some data between the processes (for a heavy task, so a thread wouldn't be efficient). However, I didn't see any way to multi-process a function in the druntime/phobos, so I created a simple Process class : ``` module process; import core.sys.posix.unistd : fork, _exit; import core.sys.posix.sys.wait; import std.process : ProcessException; import std.functional : toDelegate; class Process { private: pid_t pid; public: this(T, Args...)(T function(Args) fn, Args args) { this(fn.toDelegate, args); } this(T, Args...)(T delegate(Args) dg, Args args) if (is(T == int) || is(T == void)) { pid_t pid = fork(); if (pid < 0) { throw ProcessException.newFromErrno("Failed to spawn new process"); } else if (pid == 0) { // child static if (is(T == int)) { _exit(dg(args)); } else { dg(args); _exit(0); } } else { // parent this.pid = pid; } } int wait() { int status = void; waitpid(pid, &status, 0); return WEXITSTATUS(status); } } ``` Nice. Now, let's use it : ``` import process; import std.stdio; import core.thread.osthread : Thread; import core.time; void job(int i) { Thread.sleep(dur!"seconds"(1)); writeln(i); } void main() { enum NB_PROCESS = 15; Process[NB_PROCESS] processes; foreach(i ; 0 .. NB_PROCESS) { processes[i] = new Process(&job, i); } scope(exit) { foreach(i ; 0 .. NB_PROCESS) { processes[i].wait(); } } } ``` [x] create multiple processes. Ok, now we need to share data between all the processes. Maybe the `synchronized` keyword ? [no](https://dlang.org/spec/statement.html#SynchronizedStatement), that's only for threads. Let's use a Mutex from [core.sync.mutex](https://dlang.org/phobos/core_sync_mutex.html) then. Wait, nope, it does not support multi-processes, `pthread_mutexattr_getpshared` is not set to `PTHREAD_PROCESS_SHARED` [[source code](https://github.com/dlang/dmd/blob/master/druntime/src/core/sync/mutex.d#L92=)][[man](https://man7.org/linux/man-pages/man3/pthread_mutexattr_getpshared.3.html)]. 
So, I need to modify it. Let's copy-pasta `core.sync.mutex` and add the following modifications to the [line 92](https://github.com/dlang/dmd/blob/69ab16a7e81c24bc893851d1fbf68a0ae8baeb53/druntime/src/core/sync/mutex.d#L92=) : ``` !pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) || abort("Error: pthread_mutexattr_setpshared failed."); ``` Rename the module to avoid conflicts, and voilà ! [x] Mutex for multiple processes. Now let's give it a try. But how to check if it correctly works ? We need to create a shared memory space. Maybe a `shared` variable ? [No](http://ddili.org/ders/d.en/concurrency_shared.html), same problem as before ; it's only for multi-threading. Arg, got it. D isn't made for multi-processing, I'll code it. *3 hours later* : ``` module shared_memory; import core.sys.posix.sys.mman; import core.sys.posix.unistd; import core.sys.posix.fcntl; import std.conv : emplace; class SharedMemory(T, Args...) { private: static if (is(T == class)) T cl; T* ptr; string ident; public: nogc this(string identifier, Args args) { ident = identifier; static if (is(T == class)) size_t size = __traits(classInstanceSize, T); else size_t size = T.sizeof; int shm_fd = shm_open(ident.ptr, O_CREAT | O_RDWR, 0x1b6); // 0x1b6 = 0666 (octal) ftruncate(shm_fd, size); void[] memory = mmap(null, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0)[0..size]; static if (is(T == class)) { cl = emplace!(T, Args)(memory, args); ptr = &cl; } else { ptr = emplace!(T, Args)(memory, args); } } // nogc void unlink() { static if (is(T == class)) destroy(cl); shm_unlink(ident.ptr); // shm_unlink also closes the file descriptor } T* data() { return ptr; } } ``` Seems to work, now let's create a test program : ``` import process; import mutex; import shared_memory; import std.stdio; import core.thread.osthread : Thread; import core.time; void job(int i, SharedMemory!Mutex mutex, SharedMemory!ulong sm) { auto m = *mutex.data; Thread.sleep(dur!"msecs"(1000)); // force the scheduler 
to change the execution context m.lock(); Thread.sleep(dur!"msecs"(100)); // ditto foreach (j; 1 .. 100) *sm.data += i * j - i + i*i*i*i; // "random" calculations m.unlock(); } void main() { enum NB_PROCESS = 100; auto sum = new SharedMemory!ulong("sum"); auto mutex = new SharedMemory!Mutex("mutex"); *sum.data = 0; Process[NB_PROCESS] processes; foreach(i ; 0 .. NB_PROCESS) { processes[i] = new Process(&job, i, mutex, sum); } foreach(i ; 0 .. NB_PROCESS) { processes[i].wait(); } if (*sum.data != 193107012120) { writeln("failed ! sum: ", *sum.data); } import core.memory : GC; GC.collect(); // force collection sum.unlink(); mutex.unlink(); } ``` Let's run our program a hundred times : `for i in 0..100; do ./mulproc; done`. No output, so everything worked properly ! Yaay ! [x] Share data between processes *Achievement get: multi-processing in D.* --- Fine. This was fun to do. However, I have two questions : - Why wasn't this implemented in druntime/phobos ? - Can we implement this (also for Non-POSIX systems) in druntime/phobos ? If yes, how to do it properly (i.e. by not using my code) Bests, Luhrel
Jul 15 2022