Assignment 4: D thread pool
Due: 5:00pm, Friday, October 25. Value: 50 pts.
In concurrency, it's often helpful to have a thread
pool — a fixed set of several “worker threads”
that are available for processing jobs. In this assignment,
we'll implement a thread pool in D using its message-passing
concurrency. (You may not use shared.)
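As a refresher on message passing in D, here is a minimal sketch using spawn, send, receive, and receiveOnly from std.concurrency. The squarer thread, the squareViaThread helper, and the convention of sending a bool to stop the thread are all hypothetical names and choices made for illustration; none of them come from the starter code.

```d
import std.concurrency;
import std.stdio;

// Hypothetical worker thread: answers each (requester, n) message
// with n * n, and stops when it receives any bool.
void squarer() {
    bool running = true;
    while (running) {
        receive(
            (Tid requester, int n) { send(requester, n * n); },
            (bool stop) { running = false; }
        );
    }
}

// Hypothetical helper: spawn a thread, send it one request that
// includes our own Tid, wait for the reply, then stop the thread.
int squareViaThread(int n) {
    Tid tid = spawn(&squarer);
    send(tid, thisTid, n);
    int result = receiveOnly!int();
    send(tid, true);
    return result;
}

void main() {
    writeln(squareViaThread(7)); // prints 49
}
```

Note that no variable is shared between the two threads: the request carries the requester's Tid so that the reply can be sent back as another message.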
Starter code may be found in factors.d. As distributed, it creates
a single computation thread (dispatcher), which receives factoring
jobs and processes them in order. Your job is to modify dispatcher
so that it creates a pool of 10 worker threads, which will process
factoring jobs as they become available. (The dispatcher thread will
no longer call first_factor.) This will allow factoring jobs to occur
simultaneously on different cores.
There will be several messages sent between threads.
- When the dispatcher receives a job request, it checks its list of ready workers. If that list isn't empty, it removes one worker thread from the list and forwards the request to that worker. If the list is empty, it adds the request to its list of pending job requests.
- When a worker thread receives a job request, it executes the job, sends the result to the requesting thread, and sends a message to the dispatcher to say that it is ready for another job.
- When the dispatcher receives notification that a worker thread has completed a job, it checks its list of pending job requests. If that list isn't empty, it removes one of the pending requests and sends it to that worker. If the list is empty, it adds the worker thread to its list of ready workers.
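The three rules above can be sketched as follows. This is only one possible shape under stated assumptions, not the layout of factors.d: the message shapes (a job is a Tid plus a long, a ready notice is a bare Tid), the PendingJob and Shutdown types, the factorAll helper, the poolSize parameter, and firstFactor (a stand-in for the starter code's first_factor, assumed to return the smallest factor greater than 1) are all hypothetical.

```d
import std.algorithm : sort;
import std.concurrency;
import std.container : DList;
import std.stdio;

// Hypothetical types; the real starter code may use different ones.
struct PendingJob { Tid requester; long n; }
struct Shutdown {}

// Stand-in for first_factor (assumed behavior, see lead-in).
long firstFactor(long n) {
    for (long d = 2; d * d <= n; d++)
        if (n % d == 0) return d;
    return n;
}

// Worker: run the job, reply to the requester, then report ready.
void worker(Tid dispatcherTid) {
    bool running = true;
    while (running) {
        receive(
            (Tid requester, long n) {
                send(requester, n, firstFactor(n));
                send(dispatcherTid, thisTid);
            },
            (Shutdown s) { running = false; }
        );
    }
}

// Dispatcher: never factors anything itself; it only matches
// pending jobs with ready workers.
void dispatcher(int poolSize) {
    Tid[] workers;
    DList!Tid ready;
    DList!PendingJob pending;
    foreach (i; 0 .. poolSize) {
        Tid w = spawn(&worker, thisTid);
        workers ~= w;
        ready.insertBack(w);   // every worker starts out ready
    }
    bool running = true;
    while (running) {
        receive(
            (Tid requester, long n) {      // a job request arrived
                if (ready.empty) {
                    pending.insertBack(PendingJob(requester, n));
                } else {
                    Tid w = ready.front;
                    ready.removeFront();
                    send(w, requester, n);
                }
            },
            (Tid w) {                      // a worker finished a job
                if (pending.empty) {
                    ready.insertBack(w);
                } else {
                    PendingJob job = pending.front;
                    pending.removeFront();
                    send(w, job.requester, job.n);
                }
            },
            (Shutdown s) {                 // stop the pool
                foreach (w; workers) send(w, Shutdown());
                running = false;
            }
        );
    }
}

// Hypothetical driver: submit every number, then collect one
// result per submission before shutting the pool down.
long[long] factorAll(long[] numbers, int poolSize) {
    Tid d = spawn(&dispatcher, poolSize);
    foreach (n; numbers) send(d, thisTid, n);
    long[long] factors;
    foreach (i; 0 .. numbers.length)
        receive((long n, long f) { factors[n] = f; });
    send(d, Shutdown());
    return factors;
}

void main() {
    auto factors = factorAll([91L, 97L, 221L, 15L], 3);
    foreach (n; factors.keys.sort())
        writefln("%d: first factor %d", n, factors[n]);
}
```

The dispatcher tells the two kinds of incoming message apart purely by their types, so a worker's ready notice (a lone Tid) can never be mistaken for a job request (a Tid followed by a long). Results may arrive in any order, which is why the driver stores them by number rather than by position.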
You will want some way of maintaining a queue of information. The following program illustrates the essential methods for manipulating a queue.
import std.container : DList;
import std.stdio;

void main() {
    DList!int queue = DList!int(cast(int[]) []);
    // Bizarrely, the constructor requires a list of initial elements,
    // which must be of the correct type - thus the cast.
    queue.insertFront(2); // Use insertFront to enqueue
    int removed = queue.removeAny(); // Use removeAny to dequeue
    writefln("remove %d", removed);
    queue.insertFront(3);
    if (queue.empty()) { // Use empty to test emptiness
        writefln("is empty");
    } else {
        writefln("nonempty");
    }
    queue.insertFront(4);
    removed = queue.removeAny();
    writefln("remove %d", removed);
    removed = queue.removeAny();
    writefln("remove %d", removed);
    if (queue.empty()) {
        writefln("is empty");
    } else {
        writefln("nonempty");
    }
}
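If you would rather spell out first-in, first-out order than rely on which element removeAny picks, the same DList can be used with insertBack, front, and removeFront. The sketch below assumes a default-initialized DList is acceptable in your compiler version; the dequeue helper is a hypothetical name, not part of std.container.

```d
import std.container : DList;
import std.stdio;

// Hypothetical helper: remove and return the oldest element.
// The queue must not be empty.
T dequeue(T)(ref DList!T queue) {
    T oldest = queue.front;
    queue.removeFront();
    return oldest;
}

void main() {
    DList!int queue;
    queue.insertBack(2);   // enqueue at the back
    queue.insertBack(3);
    queue.insertBack(4);
    while (!queue.empty)   // dequeue from the front: 2, 3, 4
        writefln("remove %d", dequeue(queue));
}
```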