Wednesday, January 13, 2010

Worker pool optimization - batching APIs, for some tasks.

What are worker pools anyway?

Worker pools are an optimization for the problem of processing many things in parallel. Rather than having a worker for every item you want to process, you spread the load across the available workers. For example, creating 1,000,000 processes to add 1,000,000 numbers together is rather heavyweight. You probably want to divide the work between 8 processes on your 8-core machine. This is the optimization worker pools make.

In a typical worker pool there is a queue of jobs (or data) for the workers to work on. The workers are usually threads, processes, or separate machines that do the work passed to them.

So for 1000 items on that queue, there are around 1000 accesses to the queue, one per item. The queue usually has to be thread-safe or process-safe, so that the same piece of data is not handed to more than one worker.

This can be an efficient method for some types of data. For example, when jobs take very different amounts of time, like IO tasks over the internet, a central queue is not optimal but works pretty well, because a worker that finishes early simply takes the next job.
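A minimal sketch of the central-queue design, assuming Python 3 threads and the standard `queue.Queue` (the post does not name a particular pool). The helper `queue_pool_map` is hypothetical; it counts how many jobs the workers take off the shared queue, which makes the "one queue access per item" cost visible.

```python
import queue
import threading


def queue_pool_map(f, items, num_workers=4):
    """Map f over items with a central-queue worker pool (one job per item).

    Returns (results, queue_accesses), where queue_accesses counts the jobs
    the workers successfully took off the shared queue.
    """
    jobs = queue.Queue()  # thread-safe: Queue does its own locking
    for index, item in enumerate(items):
        jobs.put((index, item))  # every single item becomes its own job

    results = [None] * len(items)
    fetched = [0] * num_workers  # one counter per worker, so no sharing

    def worker(worker_id):
        while True:
            try:
                index, item = jobs.get_nowait()  # one queue access per job
            except queue.Empty:
                return  # all jobs were queued up front, so empty means done
            fetched[worker_id] += 1
            results[index] = f(item)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, sum(fetched)


results, accesses = queue_pool_map(lambda x: x + 1, list(range(1000)))
print(accesses)  # 1000
```

Because every job is queued before the workers start, an empty queue means the work is done; a real pool that keeps receiving jobs would need a blocking `get()` and a stop signal instead.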

Problem workloads for typical worker pools.

Let's assume it is fairly easy to measure the average (or median, if you like) time each task takes: so either not IO tasks, or tasks of fairly equal length. Then the central queue idea starts to fall down, for the following reasons.

What if the cost of starting a new job is quite high? For example, if starting each job means talking to a machine with 200 ms of network latency (say, an HTTP call to the other side of the planet), or if a new process has to be spawned for each task (say, with fork or exec).

Or what if the cost of accessing the queue is quite high? For example, if the queue is protected by a lock (or by a global lock such as Python's GIL) and there are lots of workers, contention on that lock will be high.

What if there are a lot more than 1000 items, say 10,000,000? With so many items, it is worth trying to reduce or avoid the cost of accessing the queue altogether.
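A rough cost model for the start-up overhead alone, combining the numbers above: 10,000,000 items, 200 ms per job start, and 8 workers. The function `dispatch_overhead` is hypothetical, and it assumes every job pays the same fixed cost, jobs are spread evenly, and each worker pays its costs one after another (no overlapping requests).

```python
def dispatch_overhead(num_jobs, per_job_overhead, num_workers):
    """Rough time spent only on starting jobs, ignoring the work itself.

    Assumes every job pays the same fixed start-up cost (e.g. one network
    round trip), jobs are spread evenly, and each worker pays its costs
    one after another.
    """
    jobs_per_worker = -(-num_jobs // num_workers)  # ceiling division
    return jobs_per_worker * per_job_overhead


num_workers = 8
latency = 0.2  # seconds per job start, the 200 ms example
items = 10_000_000

one_job_per_item = dispatch_overhead(items, latency, num_workers)
one_chunk_per_worker = dispatch_overhead(num_workers, latency, num_workers)
print(one_job_per_item / 3600, "hours vs", one_chunk_per_worker, "seconds")
```

Under these assumptions one job per item costs about 250,000 seconds (roughly 69 hours) of pure waiting, while one chunk per worker costs 0.2 seconds.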

How to optimize the worker pool for these problems?

The obvious solution is to divide the items into chunks first, and then feed those big chunks of work to the workers. Luckily the obvious solution works quite well! It's trivial to divide a whole list into roughly equal-sized chunks, and quick too (a Python one-liner*1).
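A minimal Python 3 sketch of that idea with a plain thread-and-queue pool: the hypothetical `chunked_queue_pool_map` puts about `num_workers` chunks on the queue instead of single items, using the same chunk-size rule as `our_map` in the example further down. With 1000 items and 4 workers, the workers take 4 jobs off the queue instead of 1000.

```python
import queue
import threading


def chunked_queue_pool_map(f, items, num_workers=4):
    """Map f over items, putting whole chunks (not single items) on the queue.

    Returns (results, queue_accesses), where queue_accesses counts the chunks
    the workers successfully took off the shared queue.
    """
    chunk_size = len(items) // num_workers + 1  # same sizing rule as our_map below
    jobs = queue.Queue()
    for start in range(0, len(items), chunk_size):
        jobs.put((start, items[start:start + chunk_size]))  # one job per chunk

    results = [None] * len(items)
    fetched = [0] * num_workers  # one counter per worker, so no sharing

    def worker(worker_id):
        while True:
            try:
                start, chunk = jobs.get_nowait()  # one queue access per chunk
            except queue.Empty:
                return  # all chunks were queued up front, so empty means done
            fetched[worker_id] += 1
            # The whole chunk is processed without touching the queue again.
            results[start:start + len(chunk)] = [f(x) for x in chunk]

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, sum(fetched)


results, accesses = chunked_queue_pool_map(lambda x: x + 1, list(range(1000)))
print(accesses)  # 4
```

The trade-off is load balancing: if one chunk happens to be much slower than the others, the other workers have nothing left to take, which is why this suits tasks of fairly equal length.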

An example of how to improve your worker pool.

Here is some example code that takes a pygame.threads.tmap call, which uses a worker pool fed from a central work queue, and turns it into one that first divides the work into roughly equal parts. Mentally replace pygame.threads.tmap with your own worker pool's map function to get the same effect.

import operator, functools

# Here's our one liner divider, as two lines.
def divide_it(items, chunk_size):
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

# Here is our_map, which turns a map into one that hands out about
# num_workers bigger pieces, then joins their results back together.
def our_map(old_map, f, work_to_do, num_workers):
    bigger_pieces = divide_it(work_to_do, len(work_to_do) // num_workers + 1)
    parts = old_map(lambda piece: [f(x) for x in piece], bigger_pieces)
    return functools.reduce(operator.add, parts, [])

# now an example of how it can speed things up.
if __name__ == "__main__":
    import pygame, pygame.threads, time

    # Divide the work into 8 pieces. (num_workers is only passed to
    # our_map; tmap itself uses its own default number of threads.)
    num_workers = 8
    # Use the pygame threaded map function as our
    # normal worker queue.
    old_map = pygame.threads.tmap

    # make up a big list of work to do.
    work_to_do = list(range(100000))

    # a minimal function to run on all of the items of data.
    f = lambda x: x + 1

    # We time our normal worker queue method.
    t3 = time.time()
    r = pygame.threads.tmap(f, work_to_do)
    t4 = time.time()

    # We use our new map function to divide the data up first.
    t1 = time.time()
    r = our_map(old_map, f, work_to_do, num_workers)
    t2 = time.time()
    del r

    print("dividing the work up time:%s:" % (t2 - t1))
    print("normal threaded worker queue map time:%s:" % (t4 - t3))

$ python
dividing the work up time:0.0565769672394:
normal threaded worker queue map time:6.26608109474:

For our contrived example we have 100,000 pieces of data to work through. Creating a thread for each piece of data would surely take forever, which is why people often use a worker queue instead. However, a normal worker queue can still be improved upon.

Results for a contrived example designed to make this technique look good?

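Dividing the work up this way gives about a 100x speedup here (0.057 s instead of 6.27 s). With a function as cheap as x+1, almost all of the normal map's time goes on per-item queue and thread overhead, which is exactly what batching removes. This won't work for all types of data and functions, but for the cases mentioned above it is a great improvement. Not bad for something that could be written in one line of Python!*1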

It's an interesting case of how massaging your data to suit a batching API design gives good results. It also shows how knowledge of the data you are processing can help speed up parallel code.
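Python's standard library exposes the same batching knob: `multiprocessing.Pool.map`, and its thread-based counterpart `multiprocessing.pool.ThreadPool.map`, take a `chunksize` argument that groups consecutive items into a single task. A minimal sketch using a thread pool, so nothing needs to be pickled; the chunk size mirrors the `our_map` rule above, and `add_one` is just an illustrative function.

```python
from multiprocessing.pool import ThreadPool


def add_one(x):
    return x + 1


work_to_do = list(range(100000))
num_workers = 8
# Same chunk size rule as our_map: about one chunk per worker.
chunk_size = len(work_to_do) // num_workers + 1

with ThreadPool(num_workers) as pool:
    # chunksize groups consecutive items into one task, so the pool's
    # internal task queue sees 8 tasks instead of 100,000.
    result = pool.map(add_one, work_to_do, chunksize=chunk_size)

print(result[:3], len(result))  # [1, 2, 3] 100000
```

When `chunksize` is left out, `Pool.map` picks a chunk size on its own. For CPU-bound work you would swap in `multiprocessing.Pool`, which needs a picklable, module-level function like `add_one` and an `if __name__ == "__main__":` guard on platforms that spawn new processes.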

*1 - Well it could be done in one line if we were functional ninjas... but for sane reading it is split up into 12 lines.
