27 Sep — D concurrency

Reasons for message passing

D's concurrency library

Important functions, all found in std.concurrency:

Basic example: Prime counting

import std.concurrency;
import std.stdio;
import std.conv : to;

immutable int numThreads = 10;

void main(string[] args) {
    int n = to!int(args[1]);

    foreach (i0 .. numThreads) {
        int k0 = i == 0 ? 2 : i * (n + 1) / numThreads;
        int k1 = (i + 1) * (n + 1) / numThreads;
        spawn(&countPrimesthisTidk0k1);
    }

    int sum = 0;
    foreach (i0 .. numThreads) {
        sum += receiveOnly!int;
    }
    writefln("%d primes between 2 and %d"sumn);
}

void countPrimes(Tid callerTidint loint hi) {
    int count = 0;
    foreach (nlo .. hi) {
        if (isPrime(n)) {
            count++;
        }
    }
    send(callerTidcount);
}

bool isPrime(int n) {
    for (int j = 2j * j <= nj++) {
        if (n % j == 0return false;
    }
    return true;
}

More complex example: Web server with caching

import std.algorithm : find;
import std.array : split;
import std.file : dirEntryDirEntryFileException;
import std.concurrency;
import std.conv : to;
import std.range : retrowalkLength;
import std.stdio;
import std.socket : SocketTcpSocketInternetAddress;

immutable int httpPort = 8080;

shared Socket clientSocket// horrible workaround to give non-immutable
                            // socket to responder thread

void main() {
    Tid cacheTid = spawn(&cacheServer);

    Socket server = new TcpSocket;
    server.bind(new InternetAddress(httpPort));
    server.listen(10);

    while (true) {
        clientSocket = cast(shared Socketserver.accept();
        spawn(&respondthisTidcacheTid);
        receiveOnly!bool(); // don't continue until thread got clientSocket
    }
}

void cacheServer() {
    string[stringcache;

    while (true) {
        receive(
            (Tid tidstring filename) { // cache lookup
                string found = cache.get(filename"");
                if (found.length > 0send(tidfound); // request is in cache
                else                  send(tidfalse); // request not in cache
            },
            (string filenamestring contents) { // store file in cache
                cache[filename] = contents;
            }
        );
    }
}

void respond(Tid callerTidTid cacheTid) {
    Socket socket = cast(SocketclientSocket;
    send(callerTidtrue); // signal that I am done using clientSocket
    try {
        string[] args = parseHttpRequest(socket);
        if (args.length >= 2 && args[0] == "GET") {
            serveFile(socketargs[1][1..$], cacheTid);
        } else {
            sendUnknownMethod(socketargs.length < 1 ? "?" : args[0]);
        }
    } finally {
        socket.close();
    }
}

void serveFile(Socket socketstring filenameTid cacheTid) {
    string contents = "";
    send(cacheTidthisTidfilename);
    receive(
        (string fromCache) { contents = fromCache; },
        (bool notFound) { contents = ""; }
    );
    if (contents.length > 0) {
        sendCachedFile(socketfilenamecontents);
        return;
    }

    DirEntry entry;
    try {
        entry = dirEntry(filename);
    } catch (FileException e) {
        sendUnknownFile(socketfilename);
        return;
    }
    if (!entry.isFile()) {
        sendUnknownFile(socketfilename);
        return;
    }

    File fin;
    try {
        fin = File(filename"r");
    } catch (Exception e) {
        sendUnknownFile(socketfilename);
        return;
    }

    if (entry.size() < 4096) {
        contents = slurp(finentry.size());
        send(cacheTidfilenamecontents);
        sendCachedFile(socketfilenamecontents);
    } else {
        sendOpenedFile(socketfinfilenameentry.size());
    }
}

Helper functions unrelated to concurrency, so not listed here. [Full source]

string[] parseHttpRequest(Socket socket) { // ...
string slurp(File finulong fileLength) { // ...
void sendCachedFile(Socket socketstring filename,
        string contents) { // ...
void sendOpenedFile(Socket socketFile finstring filename,
        ulong fileLength) { // ...
void sendUnknownMethod(Socket socketstring method) { // ...
void sendUnknownFile(Socket socketstring filename) { // ...
string getMimeType(string filename) { // ...