Assignment 4: D thread pool

Due: 5:00pm, Friday, October 25. Value: 50 pts.

In concurrency, it's often helpful to have a thread pool — a fixed set of several “worker threads” that are available for processing jobs. In this assignment, we'll implement a thread pool in D using its message-passing concurrency. (You may not use shared.)

Starter code may be found in factors.d. As distributed, it creates a single computation thread (dispatcher), which receives factoring jobs and processes them in order. Your job is to modify dispatcher so that it creates a pool of 10 worker threads, which will process factoring jobs as they become available. (The dispatcher thread will no longer call first_factor.) This will allow factoring jobs to occur simultaneously on different cores.

There will be several messages sent between threads.

  • When the dispatcher receives a job request, it checks its queue of ready workers. If that queue isn't empty, it removes one worker thread from the queue and forwards the request to that worker. If the queue is empty, it adds the request to its list of pending job requests.
  • When a worker thread receives a job request, it executes the job, sends the result to the requesting thread, and sends a message to the dispatcher saying that the worker is ready for another job.
  • When the dispatcher receives notification that a worker thread has completed a job, it checks its list of pending job requests. If that list isn't empty, it removes one of the pending requests and sends it to the worker. If the list is empty, it adds the worker thread to its list of ready workers.
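
The worker's side of this exchange can be sketched with std.concurrency as follows. This is only an illustration under stated assumptions, not the layout of factors.d: the message shapes (a Tid plus a long for a job, two longs for a result, a bare Tid for "ready", a bool for shutdown) and the firstFactor stand-in are hypothetical, and main plays both the requester and the dispatcher. The dispatcher's two queues are deliberately left out.

```d
import std.concurrency;
import std.stdio;

// Hypothetical stand-in for first_factor in the starter code:
// returns the smallest factor of n that is greater than 1.
long firstFactor(long n) {
    for (long d = 2; d * d <= n; d++) {
        if (n % d == 0) {
            return d;
        }
    }
    return n;
}

// Worker loop: handle one job at a time until told to stop.
void worker(Tid dispatcher) {
    bool running = true;
    while (running) {
        receive(
            // Job request (assumed shape): who asked, and what to factor.
            (Tid requester, long n) {
                send(requester, n, firstFactor(n)); // result goes to the requester
                send(dispatcher, thisTid);          // "ready" goes to the dispatcher
            },
            // Hypothetical shutdown signal so this sketch can exit cleanly.
            (bool stop) { running = false; }
        );
    }
}

void main() {
    // In this sketch, main acts as both requester and dispatcher.
    Tid w = spawn(&worker, thisTid);
    send(w, thisTid, 91L);

    receive((long n, long factor) {
        writefln("first factor of %d is %d", n, factor);
    });
    receive((Tid ready) {
        writeln("worker reported ready");
    });

    send(w, true); // stop the worker
}
```

Sending the requester's Tid along with the job lets the worker reply directly, so results never have to pass back through the dispatcher.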

You will want some way of maintaining a queue of information. The following program illustrates the essential methods for manipulating a queue.

import std.container : DList;
import std.stdio;

void main() {
    DList!int queue = DList!int(cast(int[]) []);
        // Bizarrely, the constructor requires a list of initial elements,
        // which must be of the correct type - thus the cast.
    queue.insertFront(2);              // Use insertFront to enqueue
    int removed = queue.removeAny();   // Use removeAny to dequeue
    writefln("remove %d", removed);
    queue.insertFront(3);
    if (queue.empty()) {               // Use empty to test emptiness
        writefln("is empty");
    } else {
        writefln("nonempty");
    }
    queue.insertFront(4);
    removed = queue.removeAny();
    writefln("remove %d", removed);
    removed = queue.removeAny();
    writefln("remove %d", removed);
    if (queue.empty()) {
        writefln("is empty");
    } else {
        writefln("nonempty");
    }
}
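
The library documentation describes removeAny as removing an element from an unspecified position, so a program that depends on strict first-in, first-out order may prefer to name the end it removes from. The variant below is a small sketch of that idea: the dequeue helper is hypothetical (it is not part of std.container), and it assumes elements are always added with insertFront, so the oldest element sits at the back.

```d
import std.container : DList;
import std.stdio;

// Hypothetical helper: remove and return the oldest element of a queue
// that is filled with insertFront.
int dequeue(ref DList!int queue) {
    int oldest = queue.back;   // the oldest element is at the back
    queue.removeBack();
    return oldest;
}

void main() {
    DList!int queue = DList!int(cast(int[]) []);
    queue.insertFront(2);
    queue.insertFront(3);
    queue.insertFront(4);
    while (!queue.empty) {
        writefln("remove %d", dequeue(queue));   // 2, then 3, then 4
    }
}
```

The same pattern should carry over to other element types, such as a queue of pending requests, by changing the element type of the DList.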