27 Sep — D concurrency
Reasons for message passing
Shared-memory solutions lead to hard-to-find bugs due to race conditions and deadlocks. Message-passing programs are less prone to such bugs, including no possibility for race conditions.
For architectures with many processors/cores, shared-memory becomes unrealistic. Already, traditional supercomputers have thousands of processors spread across a fast network, and shared memory is unthinkable. But even on personal devices, shared memory starts exhausting the RAM communication bandwidth once we reach eight cores — and CPUs with more cores than that will become widely available in the near future.
D's concurrency library
Important functions, all found in std.concurrency
:
thisTid
to get the “thread ID” or “tid” which is the mailing address for other threads to send messages to us. Many messages to other threads will include the tid so that the other thread can respond.spawn
to create a new thread. Parameters give the function that the other thread should initially call, as well as the parameters to be given that function. It returns the tid of the newly created thread.send
to send a message to another thread. Note that onlyimmutable
-type values may be sent.receive
andreceiveOnly
to block waiting for a message from another thread.
Basic example: Prime counting
import std.concurrency;
import std.stdio;
import std.conv : to;
immutable int numThreads = 10;
void main(string[] args) {
int n = to!int(args[1]);
foreach (i; 0 .. numThreads) {
int k0 = i == 0 ? 2 : i * (n + 1) / numThreads;
int k1 = (i + 1) * (n + 1) / numThreads;
spawn(&countPrimes, thisTid, k0, k1);
}
int sum = 0;
foreach (i; 0 .. numThreads) {
sum += receiveOnly!int;
}
writefln("%d primes between 2 and %d", sum, n);
}
void countPrimes(Tid callerTid, int lo, int hi) {
int count = 0;
foreach (n; lo .. hi) {
if (isPrime(n)) {
count++;
}
}
send(callerTid, count);
}
bool isPrime(int n) {
for (int j = 2; j * j <= n; j++) {
if (n % j == 0) return false;
}
return true;
}
More complex example: Web server with caching
import std.algorithm : find;
import std.array : split;
import std.file : dirEntry, DirEntry, FileException;
import std.concurrency;
import std.conv : to;
import std.range : retro, walkLength;
import std.stdio;
import std.socket : Socket, TcpSocket, InternetAddress;
immutable int httpPort = 8080;
shared Socket clientSocket; // horrible workaround to give non-immutable
// socket to responder thread
void main() {
Tid cacheTid = spawn(&cacheServer);
Socket server = new TcpSocket;
server.bind(new InternetAddress(httpPort));
server.listen(10);
while (true) {
clientSocket = cast(shared Socket) server.accept();
spawn(&respond, thisTid, cacheTid);
receiveOnly!bool(); // don't continue until thread got clientSocket
}
}
void cacheServer() {
string[string] cache;
while (true) {
receive(
(Tid tid, string filename) { // cache lookup
string found = cache.get(filename, "");
if (found.length > 0) send(tid, found); // request is in cache
else send(tid, false); // request not in cache
},
(string filename, string contents) { // store file in cache
cache[filename] = contents;
}
);
}
}
void respond(Tid callerTid, Tid cacheTid) {
Socket socket = cast(Socket) clientSocket;
send(callerTid, true); // signal that I am done using clientSocket
try {
string[] args = parseHttpRequest(socket);
if (args.length >= 2 && args[0] == "GET") {
serveFile(socket, args[1][1..$], cacheTid);
} else {
sendUnknownMethod(socket, args.length < 1 ? "?" : args[0]);
}
} finally {
socket.close();
}
}
void serveFile(Socket socket, string filename, Tid cacheTid) {
string contents = "";
send(cacheTid, thisTid, filename);
receive(
(string fromCache) { contents = fromCache; },
(bool notFound) { contents = ""; }
);
if (contents.length > 0) {
sendCachedFile(socket, filename, contents);
return;
}
DirEntry entry;
try {
entry = dirEntry(filename);
} catch (FileException e) {
sendUnknownFile(socket, filename);
return;
}
if (!entry.isFile()) {
sendUnknownFile(socket, filename);
return;
}
File fin;
try {
fin = File(filename, "r");
} catch (Exception e) {
sendUnknownFile(socket, filename);
return;
}
if (entry.size() < 4096) {
contents = slurp(fin, entry.size());
send(cacheTid, filename, contents);
sendCachedFile(socket, filename, contents);
} else {
sendOpenedFile(socket, fin, filename, entry.size());
}
}
Helper functions unrelated to concurrency, so not listed here. [Full source]
string[] parseHttpRequest(Socket socket) { // ...
string slurp(File fin, ulong fileLength) { // ...
void sendCachedFile(Socket socket, string filename,
string contents) { // ...
void sendOpenedFile(Socket socket, File fin, string filename,
ulong fileLength) { // ...
void sendUnknownMethod(Socket socket, string method) { // ...
void sendUnknownFile(Socket socket, string filename) { // ...
string getMimeType(string filename) { // ...