Tool-Building in Bioinformatics

TBiB Q4/2006


Threads and Worker Pools

In this lecture we cover multiple execution threads — that enable us to perform multiple tasks in parallel on the same machine — and worker pools — a general architecture for distributing the workload of jobs.

Motivation

In the two previous lectures we have built a framework for making objects located on different hosts communicate through remote method invocation. This lets us distribute a program over a number of machines, but does not in itself give us any parallel execution of our programs.

In this lecture we learn how to write scripts with multiple execution threads, which lets us write programs where tasks can, in principle, run in parallel. We say "in principle" because the number of tasks actually running in parallel is limited by the number of CPUs on the host executing the script, and in CPython further by the global interpreter lock, which allows only one thread at a time to execute Python bytecode.

Building on this, we then implement a framework for running multiple threads distributed over a number of hosts, thus enabling truly parallel execution of programs.

Threads

Using multiple threads is both similar to and different from using multiple processes, as we did in the very first lecture. The similarity is in the (at least conceptually) parallel execution of code: when we run multiple threads, just as with multiple processes, we are conceptually running them at the same time.

The difference is in the environment of the threads: when we run multiple processes, we are really running multiple programs that communicate through file-like objects (pipes or sockets, for example); when we run multiple threads, they run inside the same program and can access the same objects and update the same variables. This lets the threads work more closely together, but it also lets them interfere with each other, with undesired consequences, unless we take measures to prevent it.

Parallel Execution

To run multiple threads, you use the Thread class from the threading module. One way to use the Thread class is to initialize it with a function to be run in a separate thread:

from threading import Thread

def printAs():
    for i in xrange(1000):
        print 'a',
        # wait a bit
        for j in xrange(1000): pass

def printBs():
    for i in xrange(1000):
        print 'b',
        # wait a bit
        for j in xrange(1000): pass


ta = Thread(target=printAs)
tb = Thread(target=printBs)

ta.start()
tb.start()

The printAs() and printBs() functions are made the targets of separate threads, and when these threads are started, by calling the start() method on them, each function runs in its own thread.

If you run this script, you will notice that the ta and tb threads run concurrently, since the output interleaves a's and b's. Actually, three threads are running in this program: ta, tb, and the main thread that started the other two. The main thread, though, has nothing more to do after starting ta and tb; the program keeps running until they have finished.

It is possible to provide parameters to the target functions, either by a sequence passed to the args parameter of the Thread constructor, or by passing a dictionary to the kwargs parameter:

from threading import Thread

def cs(c):
    for i in xrange(1000):
        print c,
        # wait a bit
        for j in xrange(1000): pass

ta = Thread(target=cs,args=('a',)) # a one-element tuple needs the comma
tb = Thread(target=cs,kwargs={'c':'b'})

ta.start()
tb.start()
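A small sketch of both ways of passing arguments, written so that it runs under Python 2 and Python 3 alike; the target function repeat() is made up for the illustration. Note that args must be a sequence: a one-element tuple needs a trailing comma, as in ('a',), since ('a') is just the string 'a' in parentheses. The main thread waits for the threads with join(), which is discussed further below.

```python
from threading import Thread

results = []  # shared list; in CPython list.append can safely be called from several threads

def repeat(c, n=3):
    # hypothetical target function: record the character repeated n times
    results.append(c * n)

print(type(('a')) is str)     # True: the parentheses alone do not make a tuple
print(type(('a',)) is tuple)  # True: the trailing comma does

threads = [
    Thread(target=repeat, args=('a',)),                  # positional arguments
    Thread(target=repeat, kwargs={'c': 'b', 'n': 2}),    # keyword arguments
    Thread(target=repeat, args=['c'], kwargs={'n': 1}),  # both; any sequence works for args
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for each thread to finish before reading results

print(sorted(results))  # ['aaa', 'bb', 'c']
```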

Another way of specifying what a thread should do is to subclass Thread and override the run() method. Calling start() on a Thread object runs its run() method in the new thread; the default run() simply calls the target given to the constructor, so overriding run() takes the place of a target.

from threading import Thread

class CS(Thread):
    def __init__(self,c):
        Thread.__init__(self) # don't forget this!
        self.c = c

    def run(self):
        for i in xrange(1000):
            print self.c,
            # wait a bit
            for j in xrange(1000): pass

ta = CS('a')
tb = CS('b')

ta.start()
tb.start()

It is important to remember to call the Thread constructor in the subclass constructor; otherwise the thread object is not properly initialized and start() will fail.
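In CPython, forgetting that call does not go unnoticed: start() checks whether the Thread constructor has run and raises a RuntimeError if it has not. A sketch with made-up class names that runs under both Python 2 and Python 3:

```python
from threading import Thread

class Forgetful(Thread):
    def __init__(self, c):
        # Thread.__init__(self) is deliberately NOT called here
        self.c = c

    def run(self):
        print(self.c)

class Careful(Thread):
    def __init__(self, c):
        Thread.__init__(self)  # initialize the thread machinery first
        self.c = c

    def run(self):
        self.result = self.c * 2

try:
    Forgetful('a').start()
    started = True
except RuntimeError as e:      # raised by start() because __init__ was skipped
    started = False
    print('RuntimeError: %s' % e)

t = Careful('b')
t.start()
t.join()
print(t.result)                # bb
```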

Race Conditions

Two threads running at the same time can, as mentioned earlier, access the same objects, in contrast to two processes running in parallel. This makes it easier for two threads to communicate than for two processes to communicate, but can lead to other problems if one is not careful. Consider:

from threading import Thread

class Counter(object):
    def __init__(self):
        self.count = 0

    def inc(self):
        self.count += 1

    def dec(self):
        self.count -= 1

def callALot(method):
    for i in xrange(100000):
        method()

counter = Counter()
incer = Thread(target=callALot,args=[counter.inc])
decer = Thread(target=callALot,args=[counter.dec])

incer.start()
decer.start()

# wait for the two threads to finish
incer.join()
decer.join()

print counter.count

The main thread starts two threads, one incrementing a counter and the other decrementing the same counter, and then waits for them to complete by calling the join() method on both. The two threads call their respective methods the same number of times, so one would expect the counter to be incremented and decremented the same number of times, and the final result, printed in the last statement, to be zero. Since the two threads run concurrently, we cannot expect the order of the method calls to be deterministic, so we cannot predict the values the counter takes during the execution, but we do expect it to end up at zero.

However, if we run the script a few times, we may notice that the result is not always zero: the final result is in fact non-deterministic (how often this shows up depends on the Python version and on how its interpreter schedules threads). What is going on?

Consider the statement self.count += 1. This statement seems atomic, but it consists of first reading the current value of self.count, then adding one to it, and finally writing the result back to self.count. If another thread modifies the counter between the read and the write, that modification is lost, since what is written back is the old value plus one, no matter what happened in between.

The same problem, of course, is found in the decrement statement self.count -= 1.
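The separate read and write steps can be seen in the bytecode using the standard dis module. This sketch needs Python 3.4 or later for dis.get_instructions(), and the exact instruction names differ between Python versions; the point is only that loading self.count and storing the new value are distinct instructions. Whether another thread actually gets to run between them depends on the interpreter's scheduling, but the language gives no guarantee that it will not.

```python
import dis

class Counter(object):
    def __init__(self):
        self.count = 0

    def inc(self):
        self.count += 1

# names of the bytecode instructions that the body of inc() compiles to
ops = [ins.opname for ins in dis.get_instructions(Counter.inc)]
print(ops)

# the read of self.count comes before, and separately from, the write back
print(ops.index('LOAD_ATTR') < ops.index('STORE_ATTR'))
```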

The reason that the script behaves non-deterministically is that the two threads can interfere with each other between reading and writing the value of the counter. The exact timing of the statements can vary and this will lead to different results.
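The lost update can be reproduced deterministically by simulating the two threads by hand. The sketch below is a simulation, not real threads, and the function runInterleaved() is made up: each simulated thread performs its read and its write as separate steps, in the order given by a schedule.

```python
def runInterleaved(schedule):
    # Simulate two "threads", 'inc' and 'dec', each doing one read-modify-write
    # on a shared counter. Each entry in schedule names the thread that takes
    # its next step: the first step is the read, the second the write.
    shared = {'count': 0}
    readValue = {}                  # the value each simulated thread has read
    steps = {'inc': 0, 'dec': 0}
    delta = {'inc': 1, 'dec': -1}
    for name in schedule:
        if steps[name] == 0:
            readValue[name] = shared['count']                 # read
        else:
            shared['count'] = readValue[name] + delta[name]   # modify and write
        steps[name] += 1
    return shared['count']

print(runInterleaved(['inc', 'inc', 'dec', 'dec']))  # 0: no overlap
print(runInterleaved(['inc', 'dec', 'inc', 'dec']))  # -1: the increment is lost
print(runInterleaved(['inc', 'dec', 'dec', 'inc']))  # 1: the decrement is lost
```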

Since it is the timing of events that determines the result, this problem is known as a race condition and is one of the most common problems with using multiple threads.

Locks

A statement, or in general a sequence of statements, where a race condition can occur — such as the increment and decrement statements above — is called a critical region.

In the simplest cases, a race condition can occur in a single critical region, and we can avoid the race condition by ensuring that only a single thread can be in the critical region at any one time.

The situation above is slightly more complex, since we have two critical regions and must ensure that only a single thread is inside either at any one time. The solution, however, is the same both for the simple and the slightly more complex case: we protect the critical regions by a lock.

To be permitted inside a critical region, the thread must acquire() the lock, and when it leaves the critical region it must release() it, to allow other threads to enter the critical region.

lock.acquire()

# ... critical region ...

lock.release() 

When a thread tries to acquire a lock, it will block if the lock is already acquired and not unblock until it is released again. When the lock is released, the blocked thread (or if several threads are blocked, one of these) will be unblocked and given the lock, after which it can enter the critical region.
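The blocking behavior can be observed with a second thread that tries to acquire a lock the main thread already holds. In this sketch (it runs under Python 2 and 3), the worker cannot record anything until the main thread has released the lock, so the order of the two entries in events is fixed:

```python
from threading import Thread, Lock

lock = Lock()
events = []

def worker():
    lock.acquire()               # blocks while the main thread holds the lock
    try:
        events.append('worker got the lock')
    finally:
        lock.release()

lock.acquire()                   # the main thread takes the lock first
t = Thread(target=worker)
t.start()
events.append('main releasing the lock')
lock.release()                   # lets the blocked worker continue
t.join()
print(events)   # ['main releasing the lock', 'worker got the lock']
```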

Be careful with exceptions here, though! If an exception is thrown in the critical region above, the lock will not be released and the critical region will be inaccessible to all threads. It is therefore safer to use this idiom for critical regions:

try:
   lock.acquire()
   
   # ... critical region ...

finally:   
   lock.release() 

The finally block is guaranteed to be executed both when an exception is raised and when the critical region is left in the usual way.
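In Python 2.6 and later, locks from the threading module also support the with statement, which acquires the lock on entry and releases it on exit, also when the block is left because of an exception. The sketch below, with a made-up function critical(), checks that the lock is free again after an exception:

```python
from threading import Lock

lock = Lock()

def critical(fail):
    with lock:                   # acquire() on entry, release() on exit
        if fail:
            raise ValueError('error inside the critical region')
        return 'done'

print(critical(False))           # done
try:
    critical(True)
except ValueError:
    pass
print(lock.locked())             # False: released despite the exception
```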

The threading module provides two lock classes: Lock and RLock. Both provide the acquire() and release() methods. Lock is the faster of the two, while RLock is reentrant: the thread that holds an RLock can acquire it again without blocking, and the lock is not released to other threads until it has been released as many times as it was acquired. A Lock, in contrast, blocks any thread that tries to acquire it while it is held, including the thread that already holds it.
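The difference can be seen with the non-blocking form acquire(False), which returns a true value if the lock was acquired and False otherwise, instead of blocking. A blocking acquire() in the Lock case below would hang forever, since the thread would be waiting for itself. A sketch that runs under Python 2 and 3:

```python
from threading import Lock, RLock

rlock = RLock()
rlock.acquire()
reentered = bool(rlock.acquire(False))  # the owner may acquire an RLock again
print(reentered)                        # True
rlock.release()
rlock.release()                         # released as often as acquired: now free

lock = Lock()
lock.acquire()
relocked = bool(lock.acquire(False))    # non-blocking attempt on a held Lock
print(relocked)                         # False, even for the thread holding it
lock.release()
```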

In the simple counter example, we can use a Lock object:

from threading import Thread, Lock

class Counter(object):
    def __init__(self):
        self.count = 0
        self.lock = Lock()

    def inc(self):
        try:
            self.lock.acquire()
            self.count += 1
        finally:
            self.lock.release()

    def dec(self):
        try:
            self.lock.acquire()
            self.count -= 1
        finally:
            self.lock.release()


def callALot(method):
    for i in xrange(100000):
        method()

counter = Counter()
incer = Thread(target=callALot,args=[counter.inc])
decer = Thread(target=callALot,args=[counter.dec])

incer.start()
decer.start()

# wait for the two threads to finish
incer.join()
decer.join()

print counter.count

but with this version:

from threading import Thread, Lock

class Counter(object):
    def __init__(self,times = 10):
        self.count = 0
        self.times = times
        self.lock = Lock()

    def setTimes(self,times):
        try:
            self.lock.acquire()
            self.times = times
        finally:
            self.lock.release()
        
    def incTimes(self):
        try:
            self.lock.acquire()
            for i in xrange(self.times):
                self.inc()
        finally:
            self.lock.release()

    def decTimes(self):
        try:
            self.lock.acquire()
            for i in xrange(self.times):
                self.dec()
        finally:
            self.lock.release()

    def inc(self):
        try:
            self.lock.acquire()
            self.count += 1
        finally:
            self.lock.release()

    def dec(self):
        try:
            self.lock.acquire()
            self.count -= 1
        finally:
            self.lock.release()


def callALot(method):
    for i in xrange(10000):
        method()

counter = Counter()
incer = Thread(target=callALot,args=[counter.incTimes])
decer = Thread(target=callALot,args=[counter.decTimes])

incer.start()
decer.start()

# wait for the two threads to finish
incer.join()
decer.join()

print counter.count

the incTimes() and decTimes() methods need exclusive access to the times attribute while calling inc() and dec() respectively, but as implemented this means that the lock is acquired in both incTimes() and inc() (or decTimes() and dec()), so the thread blocks waiting for itself to release the lock.

In situations such as this, we need a reentrant lock.

EXERCISE TWP.1: Run the script above. Then try it again with an RLock instead of the Lock.

EXERCISE TWP.2*: Can you use __getattribute__() to wrap an object and protect all method calls by an RLock?

EXERCISE TWP.3*: Can you use a meta-class to protect all methods of an object?

Conditions

Now consider a slightly more complicated setup than the counter. Say we have a stack with a push() and a pop() method, where we want the pop() method to block if the stack is empty and return the top element otherwise. If it is blocked we want it to be unblocked when an element is pushed onto the stack, so it can return this element.

Except for the blocking stuff, this isn't that different from the counter, but the blocking stuff complicates things enough!

A first attempt could look like this:

from threading import Lock

class Stack(object):
    def __init__(self):
        self.lock  = Lock()
        self.stack = []

    def isEmpty(self):
        # not really any need for locking...
        return len(self.stack) == 0

    def push(self,obj):
        try:
            self.lock.acquire()
            self.stack.append(obj)
        finally:
            self.lock.release()

    def pop(self):
        try:
            self.lock.acquire()
            while self.isEmpty():
                pass
            return self.stack.pop()
        finally:
            self.lock.release() 

It is inefficient, since the loop in the pop() method spins until the stack becomes non-empty without doing any useful work, but worse still, the stack will never become non-empty while we are waiting for it: we hold the lock, and as long as we hold it, no other thread can push a value onto the stack.

We can fix the last problem by moving the lock acquisition inside the busy loop:

    def pop(self):
        while True:
            try:
                self.lock.acquire()
                if not self.isEmpty():
                    return self.stack.pop()
            finally:
                self.lock.release() 

but we are still left with an inefficient solution, and the control flow is less obvious, since "block while empty" has been translated into "escape the loop if non-empty".

A better solution uses a Condition object. A Condition object provides a way of waiting for a condition to be satisfied, momentarily releasing a lock to allow other threads access to it — in this way allowing them to change the state so the condition becomes satisfied — but reacquiring the lock once the condition becomes satisfied.

For our stack, a condition can be used as follows:

from threading import Thread, Lock, Condition

class Stack(object):
    def __init__(self):
        self.lock = Lock()
        self.nonEmpty = Condition(self.lock)
        self.stack = []

    def isEmpty(self):
        # not really any need for locking...
        return len(self.stack) == 0

    def push(self,obj):
        try:
            self.lock.acquire()
            self.stack.append(obj)
            self.nonEmpty.notify()
        finally:
            self.lock.release()

    def pop(self):
        try:
            self.lock.acquire()
            while self.isEmpty():
                self.nonEmpty.wait()
            return self.stack.pop()
        finally:
            self.lock.release() 

The condition object, self.nonEmpty, is associated with the stack lock in the stack constructor. The pop() method wait()s on it, similar to the busy wait in our first attempt, and the push() method calls notify() on the condition when it has added an element to the stack.

Before explaining what is really going on here, I must admit that the explanation I gave before the code above is slightly inaccurate. Despite its name, a Condition is not a predicate you can wait for, and you are not really waiting until it becomes satisfied. The part about wait() releasing the lock associated with the condition, and reacquiring it before returning again, was correct though, and this "release, block, reacquire" process in wait() is the essence of the Condition functionality.

We are not waiting for the condition to be satisfied, we are just waiting to be notified. When the notify() method is called on the Condition object, the waiting thread will be woken up as soon as the lock can be reacquired. When it is woken up, it will have the lock and can continue in its critical region.

When we are woken up from our wait, there is no guarantee that the condition we are waiting for is actually satisfied — thus the looping check instead of just an if statement — we only know that someone has notified the condition.
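Python 3.2 and later also provide Condition.wait_for(), which takes the predicate and runs this check-and-wait loop for you. The sketch below is a hypothetical variant of the stack using it, not the code above. It also uses the with statement on the condition, which acquires and releases the condition's lock, and a Condition created without an argument, which makes its own RLock.

```python
from threading import Thread, Condition

class Stack(object):
    def __init__(self):
        self.nonEmpty = Condition()   # no lock given: the condition creates its own
        self.stack = []

    def push(self, obj):
        with self.nonEmpty:           # acquire the condition's lock
            self.stack.append(obj)
            self.nonEmpty.notify()

    def pop(self):
        with self.nonEmpty:
            # same as: while not self.stack: self.nonEmpty.wait()
            self.nonEmpty.wait_for(lambda: len(self.stack) > 0)
            return self.stack.pop()

stack = Stack()
popped = []
poppers = [Thread(target=lambda: popped.append(stack.pop())) for _ in range(2)]
for t in poppers:
    t.start()
stack.push('x')
stack.push('y')
for t in poppers:
    t.join()
print(sorted(popped))   # ['x', 'y']
```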

If more than one thread is waiting on the condition, only one will be woken up by the notify. In this example, this is not a problem since each push results in a notify. You can test the stack using this script:

stack = Stack()

def pusher(lst):
    for e in lst:
        stack.push(e)

def poper():
    print stack.pop(),

tpush = Thread(target=pusher,args=[range(2)])
tpop1 = Thread(target=poper)
tpop2 = Thread(target=poper)

tpop1.start()
tpop2.start()

# wait a bit to make sure the popers are waiting
import time
time.sleep(1)

tpush.start()

To make it very likely that the popping threads are already waiting when the values are pushed, we use time.sleep(); otherwise we simply start one thread that pushes two values onto the stack and two threads that each pop one value.

Sometimes, though, it makes a difference whether only one waiting thread, or all waiting threads, are woken up. Consider this minor change to the stack and test program:

from threading import Thread, Lock, Condition

class Stack(object):
    def __init__(self):
        self.lock = Lock()
        self.nonEmpty = Condition(self.lock)
        self.stack = []

    def isEmpty(self):
        # not really any need for locking...
        return len(self.stack) == 0

    def push(self,elements):
        try:
            self.lock.acquire()
            self.stack.extend(elements)
            self.nonEmpty.notify()
        finally:
            self.lock.release()

    def pop(self):
        try:
            self.lock.acquire()
            while self.isEmpty():
                self.nonEmpty.wait()
            return self.stack.pop()
        finally:
            self.lock.release()


stack = Stack()

def pusher(lst):
    stack.push(lst)

def poper():
    print stack.pop()

tpush = Thread(target=pusher,args=[range(2)])
tpop1 = Thread(target=poper)
tpop2 = Thread(target=poper)

tpop1.start()
tpop2.start()

# wait a bit to make sure the popers are waiting
import time
time.sleep(1)

tpush.start()

We still have two waiting threads when we add two elements to the stack, but since notify() wakes up only one of the waiting threads, only one of the elements is popped, and the other waiting thread remains blocked, even though the stack is non-empty.

To wake up all waiting threads, we can instead use the notifyAll() method on the condition.
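The difference between notify() and notify_all() (the Python 3 spelling of notifyAll()) can be checked directly. The sketch needs Python 3.2 or later, where wait() with a timeout returns False if it timed out instead of being notified; the function wake() is made up for the illustration. Two threads wait on a condition, the main thread notifies once both are waiting, and the result lists what each wait() returned. The notify() case takes about two seconds, because the thread that is not woken waits for its full timeout.

```python
import threading
import time

def wake(useNotifyAll):
    cond = threading.Condition()
    waiting = [0]   # how many threads have entered wait()
    woken = []      # the value wait() returned in each thread

    def waiter():
        with cond:
            waiting[0] += 1
            # True if notified, False if the timeout expired first
            woken.append(cond.wait(timeout=2.0))

    threads = [threading.Thread(target=waiter) for _ in range(2)]
    for t in threads:
        t.start()
    while True:
        with cond:
            # each waiter counted itself while holding the lock, so once the
            # count is 2 (seen under the lock) both are blocked inside wait()
            if waiting[0] == 2:
                if useNotifyAll:
                    cond.notify_all()
                else:
                    cond.notify()
                break
        time.sleep(0.01)
    for t in threads:
        t.join()
    return sorted(woken)

single = wake(False)
both = wake(True)
print(single)   # [False, True]: notify() woke one thread, the other timed out
print(both)     # [True, True]: notify_all() woke both
```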

EXERCISE TWP.4: Change the stack to use notifyAll() and test that it now works with multiple sleeping pop'ers.

EXERCISE TWP.5: Return to the append() version of the stack — as opposed to the extend() version. Extend this version with a bound on the number of elements allowed on the stack, such that a push() will block when the stack is full.

Queues

The Queue module (renamed queue in Python 3) implements a class, Queue, very similar to the bounded stack of exercise TWP.5, except, of course, that it is a queue, not a stack.
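A Queue is bounded when it is created with a positive maxsize: put() then blocks while the queue is full and get() blocks while it is empty, and the non-blocking put_nowait() and get_nowait() raise Full and Empty instead. A sketch that runs under both Python versions (the try/except handles the module rename):

```python
try:
    import queue              # Python 3 name of the module
except ImportError:
    import Queue as queue     # Python 2 name, as used in these notes

q = queue.Queue(maxsize=2)    # bounded: at most two items at a time
q.put('first')
q.put('second')               # the queue is now full; a plain put() would block
try:
    q.put_nowait('third')     # the non-blocking version raises instead
    fullRaised = False
except queue.Full:
    fullRaised = True
print(fullRaised)             # True
first = q.get()
print(first)                  # first: items come out in FIFO order
q.put_nowait('third')         # there is room again
print(q.qsize())              # 2
```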

EXERCISE TWP.6: Use a Queue to implement a script that reads the lines of a file and queues them in one thread, and then writes the lines to another file in another thread.

Events

Events, as implemented by the Event class, are a simpler form of conditions. A thread can wait on an event using the wait() method, just as for conditions, but it does not wait for a notification from another thread; it waits for the event to "occur", in the form of a flag in the Event object being set.

When the flag is set — by calling the set() method — all waiting threads will be woken up, similar to a notifyAll() on a condition. The waiting threads are not given access to any lock, however, so they can all run in parallel after being woken up.

Once the flag is set, a new thread calling wait() will not be blocked — after all, it shouldn't wait for an event that has already happened — but it can move on immediately.
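A sketch of the Event methods described above. It runs under Python 2.7 and Python 3, where wait() returns the value of the flag; is_set() is the newer spelling of isSet().

```python
from threading import Event, Thread

started = Event()
print(started.is_set())       # False
print(started.wait(0.1))      # False: timed out, the flag is still unset

results = []

def waiter():
    started.wait()            # blocks until some thread calls started.set()
    results.append('released')

threads = [Thread(target=waiter) for _ in range(3)]
for t in threads:
    t.start()
started.set()                 # wakes every waiting thread at once
for t in threads:
    t.join()
print(results)                # ['released', 'released', 'released']
print(started.wait())         # True: already set, so it returns immediately
started.clear()               # reset the flag so the event can be reused
print(started.is_set())       # False
```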

The use of events is illustrated by the small program below:

from threading import Thread, Event
import time

startGame = Event()

class Game(Thread):
    def __init__(self,name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        self.pinged = Event()
        self.ponged = Event()
        tping = Thread(target=self.pinger)
        tpong = Thread(target=self.ponger)

        startGame.wait()

        tping.start()
        tpong.start()

    def pinger(self):
        while True:
            time.sleep(1)
            print self.name, 'ping'
            self.pinged.set()
            self.ponged.wait()
            self.ponged.clear()
            
    def ponger(self):
        while True:
            self.pinged.wait()
            time.sleep(1)
            print self.name, 'pong'
            self.ponged.set()
            self.pinged.clear()

print 'creating game'
print
Game('game 1').start()

time.sleep(1)
print 'starting game'
print
startGame.set()


time.sleep(5)
print 'creating a new game'
print
Game('game 2').start() 

The program uses events in two different ways: one for starting games, and one for signaling "pings" and "pongs" inside the games.

The start event occurs one second after the first game is created, and when running the program you will notice that the pings and pongs do not start until after this second. The second game is created five seconds after the start event, but it starts playing immediately, since the start event has already happened at this point.

The pinged and ponged events are used to synchronize the pingers and pongers, and are reset, using the clear() method, between each use.

Daemons

The game program above never terminates. The pinger and ponger threads both run never ending loops, and the program will not terminate until all its threads have terminated.

Sometimes, this behavior is exactly what we want, but in other cases we want some threads to be alive only as long as the program runs, rather than have the program run as long as any thread is alive.

Consider exercise RMI.7 from the last lecture. In this exercise you extend the RMI system to allow any client to export objects. But the way the RMI system is designed, this basically means that each program that exports objects must run an RMI server; any program that exports an object must have a thread listening for remote calls.

If we set up a server thread for the RMI server, we want this thread to run as long as the script runs, but not to prevent the program from finishing once the "real" program threads are done.

Threads behaving this way are called daemon threads, and a normal thread is made into a daemon thread using the setDaemon() method, which must be called before the thread is started. The method takes a boolean value: if True, the thread is made into a daemon; if False, it is made into a normal thread.
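In Python 2.6 and later the flag is also available as the daemon attribute, and recent Python 3 versions deprecate setDaemon() in its favor. Changing the flag on a thread that has already been started raises RuntimeError, as this sketch shows; the background() function stands in for a long-running service and is made up for the illustration.

```python
from threading import Thread, Event

stop = Event()

def background():
    # stands in for a long-running service; returns once stop is set
    stop.wait()

t = Thread(target=background)
t.daemon = True            # same effect as t.setDaemon(True); must come before start()
t.start()
try:
    t.daemon = False       # not allowed once the thread is running
    changed = True
except RuntimeError:
    changed = False
print(changed)             # False
print(t.daemon)            # True
stop.set()                 # let the thread finish so we can join it
t.join()
```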

For the RMI server, it could look like this:

import socket, cPickle

def dispatch(conn):
    '''Handle dispatches of calls to exported objects at this server.'''
    # ... dispatch code ...

def startServer(port,isDaemon=False):
    '''Run an RMI server on this host -- this function should be called
     before any object is exported from this host.'''

    global _SERVER_ADDRESS
    global _exportedObjects

    _SERVER_ADDRESS = socket.gethostname(), port
    _exportedObjects = dict()

    from threading import Thread
    class Server(Thread):
        def run(self):
            s = socket.socket()
            s.bind(_SERVER_ADDRESS)
            s.listen(1)

            while True:
                conn, _ = s.accept()
                dispatch(conn)

    server = Server()
    server.setDaemon(isDaemon)
    server.start()

The startServer() function sets up the server at the specified port and starts a thread for handling remote method requests. If the isDaemon parameter is False (the default), it runs as a normal thread; if it is True, it runs as a daemon.

To start a server with a service that should stay permanently available to the system, we would start the server with isDaemon set to False; to start a "client" server, one used to export objects temporarily for some reason, we would start the server with isDaemon set to True.

Worker Pools

Now that we know how to make multi-threaded programs, we can attack distributed multi-threaded programs. There are many different architectures for this, but we will concentrate on a simple one: the worker pool.

Warning!
The RMI module you get here is not particularly robust and should not be used for production code. Use it as an example of the concept, not as a "real" RMI implementation.

For the description of worker pools, and the exercises on them, I will assume that you have a working RMI system as described in the previous lecture, including the solutions to RMI.7 and RMI.10. Since these exercises were not mandatory, not all of you may have done them; if you haven't, you can grab this solution.

EXERCISE TWP.7: Read this RMI module and make sure you understand how it works.

A worker pool is a simple architecture where, as the name suggests, we have a pool of workers for doing the work. A client can request a worker from the pool and then have that worker perform the job after which the worker is returned to the pool, ready for performing new jobs.

The interface of a worker could look like this:

class Worker(rmi.RemoteObject):
    def __init__(self,poolHost):
        '''Create a worker, associated with the worker pool on a given host.'''
        # ... #

    def register(self):
        '''Provide this worker to the pool.'''
        # ... register this worker at the pool to make it
        # available to clients

    def done(self):
        '''Signal from the client that the worker is done and can be
        returned to the pool.'''
        # ... clean up after a completed job and re-register the
        # worker at the pool ...  

To implement a concrete worker, we can subclass this worker and implement the functionality the real worker should provide. To make the worker available to clients, we register it at the pool, where clients can request it. When a client is done with the worker, it calls the done() method to signal this. The worker can then perform any necessary clean-up and re-register itself at the pool.

The worker pool could then have an interface like this:

class WorkerPool(object):

    def provideWorker(self,worker):
        '''Register a worker with the pool.'''
        # ... add a worker to the pool ... 

    def requestWorker(self):
        '''Provide a worker to a client.'''
        # ... return a worker to the caller, block if none are available.
        # A returned worker should be removed from the pool ... 

EXERCISE TWP.8: Implement the worker and worker pool classes based on these interfaces.

If we put these two classes in a module, worker_pool, we can then add these lines at the bottom of the module:

_WP_PORT = 50000
def getWorkerPool(serverAddress):
    '''Get the worker pool at a given host.'''
    return rmi._ClientProxy((serverAddress,_WP_PORT),0) # worker pool at id 0

if __name__ == '__main__':
    rmi.startServer(_WP_PORT)
    rmi.export(WorkerPool(),0) # export the pool at id 0

Running the module starts a worker pool server, listening on the local host at port _WP_PORT (50000), at object id 0.

Clients of the worker pool (worker servers as well as worker clients, since both are clients of the pool) can use the function getWorkerPool() to get a reference to the worker pool.

For example, if we run the pool on host horse08, a worker server can look like this:

import rmi
import worker_pool

class MyWorker(worker_pool.Worker):
    def __init__(self,poolHost):
        worker_pool.Worker.__init__(self,poolHost)

    def done(self):
        # no clean up ... could actually just inherit ...
        worker_pool.Worker.done(self)

    def hello(self):
        print 'hello'

if __name__ == '__main__':
    rmi.startServer(43210)
    worker = MyWorker('horse08')
    worker.register() 

This starts its own RMI server on port 43210, to be able to export objects, then creates a worker and registers it at the worker pool.

A worker client could then look like this:

import worker_pool

workerPool = worker_pool.getWorkerPool('horse08')

worker = workerPool.requestWorker()
worker.hello()
worker.done()

It gets the worker pool, requests a worker, uses it, and signals that it is done. Since this client does not export any objects of its own, it does not need to start an RMI server.

Deadlocks?

Did you implement the worker pool in the obvious way, using a Queue for the workers?

If so, what happens if you request a worker from an empty pool? The RMI server thread blocks when getting from the queue, which means that the server no longer accepts remote calls, which in turn means that no worker can register: the pool is deadlocked!
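The deadlock can be reproduced without any networking by simulating the RMI server as a single dispatch thread that handles one call at a time. In this sketch all names are made up and it runs under Python 2 and 3. A client's request arrives before a worker registers; the blocking get() is given a timeout only so that the demonstration terminates. A real server would wait forever, and the registration queued behind the request is not handled until the request gives up.

```python
import threading
try:
    import queue
except ImportError:
    import Queue as queue

calls = queue.Queue()     # incoming "remote calls", in arrival order
workers = queue.Queue()   # the pool, implemented the obvious way with a Queue
log = []

def requestWorker():
    # the timeout exists only so that this demonstration finishes
    try:
        log.append(('got', workers.get(timeout=0.5)))
    except queue.Empty:
        log.append(('gave up', None))

def provideWorker(worker):
    workers.put(worker)
    log.append(('provided', worker))

def serverLoop():
    # a single dispatch thread, like the RMI server thread:
    # one call is handled completely before the next one is read
    while True:
        call = calls.get()
        if call is None:        # sentinel: shut the loop down
            break
        method, args = call
        method(*args)

server = threading.Thread(target=serverLoop)
server.start()
calls.put((requestWorker, ()))               # a client asks the empty pool
calls.put((provideWorker, ('worker-1',)))    # a worker registers right after
calls.put(None)
server.join()
print(log)   # [('gave up', None), ('provided', 'worker-1')]
```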

We need a design of the worker pool where the RMI thread is never blocked. But we still want the call to the pool to block if the pool is empty. Is this possible?

The problem we are facing here is, of course, a general problem with the RMI system (a blocked method blocks not only its caller but every other call to the same server), but since this is the system we have available, we need to deal with it somehow.

One possibility is to spawn off a new thread for each method call; when we dispatch to requestWorker() we do it in a new thread that is blocked on the queue, while the RMI server thread returns to the dispatching loop to handle new requests.

This can be a bit wasteful, though, since we will have a blocked thread per waiting request, and threads are a limited resource.
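Dispatching each call in its own thread, as described above, can be simulated without any networking. In this sketch (a made-up simulation, not the RMI module; it runs under Python 2 and 3), the request blocks only its own thread, and the registration that arrives next is handled by another thread, which hands the worker to the waiting request:

```python
import threading
try:
    import queue
except ImportError:
    import Queue as queue

workers = queue.Queue()
log = []
logLock = threading.Lock()   # protects log, which several threads append to

def requestWorker():
    worker = workers.get()   # blocks only the thread handling this call
    with logLock:
        log.append(('got', worker))

def provideWorker(worker):
    workers.put(worker)
    with logLock:
        log.append(('provided', worker))

def dispatch(method, *args):
    # one new thread per call, so the dispatching thread never blocks
    t = threading.Thread(target=method, args=args)
    t.start()
    return t

handlers = [dispatch(requestWorker), dispatch(provideWorker, 'worker-1')]
for t in handlers:
    t.join()
print(sorted(log))   # [('got', 'worker-1'), ('provided', 'worker-1')]
```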

Another solution is to queue the request — similar to how we queue workers — and inform the client when a worker becomes available. But with this solution we are not blocking on a request, and the request handling is quite a bit more complicated.

We can make a protocol for the request handling and hide the complexity in a function, however, so let us go with this design.

When we request a worker, we send a request to be queued. This request can look like this:

import threading

class _WorkerRequest(rmi.RemoteObject):
    def __init__(self,pool):
        '''Create a request for a worker from the given pool (a proxy for the remote worker pool).'''
        self.pool = pool
        self.event = threading.Event()

    def request(self):
        '''Request a worker from the pool.'''
        self.pool.requestWorker(self)
        self.event.wait() # block until we get a worker
        return self.worker

    def getWorker(self,worker):
        self.worker = worker
        self.event.set() # unblock request  

The request() method calls the pool service with self as the request. It then blocks until the request is handled, after which it returns the worker it has been assigned. When the pool has a worker available, it calls the getWorker() method on the request, which assigns the worker and unblocks the request. The thread running request() is blocked, but getWorker() is called from the RMI server thread as a remote call from the pool server.

We can wrap this request in a client worker pool class:

class ClientWorkerPool(object):
    def __init__(self,serverHost):
        self.pool = rmi._ClientProxy((serverHost,_WP_PORT),0)

    def request(self):
        return _WorkerRequest(self.pool).request()

and use it like this:

import rmi
import worker_pool
import sys

port = int(sys.argv[1])
rmi.startServer(port,True)

pool = worker_pool.ClientWorkerPool('horse08')
worker = pool.request()
worker.hello()
worker.done() 

Notice that we now need to have an RMI server running at the client, since the request exports a remote object. We run it as a daemon, though, since the exported object is not used after the request, so there is no need for keeping the RMI server alive after the client code is completed.

The worker pool server needs to be updated to this protocol as well, of course:

from Queue import Queue

class WorkerPool(object):
    def __init__(self):
        self.workerQueue = Queue()
        self.requestQueue = Queue()

    def provideWorker(self,worker):
        '''Register a worker with the pool.'''
        self.workerQueue.put(worker)
        self._checkQueues()

    def requestWorker(self,request):
        '''Queue a request; it is handed a worker, through its
        getWorker() method, as soon as one is available.'''
        self.requestQueue.put(request)
        self._checkQueues()

    def _checkQueues(self):
        # pair a waiting request with an available worker, if both exist
        if not self.workerQueue.empty() and not self.requestQueue.empty():
            worker = self.workerQueue.get()
            request = self.requestQueue.get()
            request.getWorker(worker)

but the worker server does not need to be modified; it uses the same interface to the worker pool as before.

EXERCISE TWP.9: The worker pool above checks the queues after each update, using the RMI server thread. Can you instead extend the server with a thread that loops, extracting requests and workers from the queues and pairing them up outside the RMI server thread? The RMI server thread still adds workers and requests, but the queue check is handled in the new thread.

Generalizing the Worker Pool

The design above is appropriate if there is only a single type of worker, but it is insufficient if there are several types: if the workers can do different kinds of tasks, there is no way for the client to specify which type of worker it needs when it requests a worker.

A simple solution is to have different worker pools for each kind of worker, but this means that we need a worker pool server for each type of job running. A better solution is to add a service type to the interface of the pool:

class WorkerPool(object):
    def provideWorker(self,service,worker):
        '''Register a worker with the pool.'''
        # ... register worker for the service ...

    def requestWorker(self,service):
        '''Provide a worker to a client.'''
        # ... return worker implementing the service ...

Now we can register workers of different kinds — as specified by the service they provide — and request workers by the type of job we want to perform — again specified by the service.

The pool contains a pool for each service, and otherwise it works as before.

EXERCISE TWP.10: Extend the worker pool with a pool per service.

Summary

We have learned how to use threads to add parallelism to our programs.

This parallelism is only virtual, though, on machines with a single CPU, but combined with RMI we can distribute computations across several hosts and in this way achieve true parallelism.

We have then used this to build a simple worker pool architecture where different hosts can provide workers to the pool, and where clients can request workers to perform jobs.