Taco Bell Parallel Programming

While working on migrating support.mozilla.org away from Kitsune (a great community support platform that needs love; remember that, internet) I needed to convert about 4M database rows from a custom, Markdown-inspired format to HTML.

The challenge of the task is that it needs to happen as fast as possible so we can dump the database, convert the data and load the database onto the new platform with the minimum possible time between the first and the last step.

I started a fresh MySQL container and started hacking:

Load the database dump

Kitsune's database weighs about 35GiB, so creating and loading the dump is a lengthy procedure. I used some tricks collected from different places, the most notable being:

  • Set innodb_flush_log_at_trx_commit = 2 for more speed. This should not be used in production as it may break ACID compliance but for my use case it's fine.

  • Set innodb_write_io_threads = 16

  • Set innodb_buffer_pool_size=16G and innodb_log_file_size=4G. I read that innodb_log_file_size is recommended to be 1/4th of innodb_buffer_pool_size, and I set the latter based on my available memory.
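The arithmetic behind these settings is easy to script. The following is only a sketch: the helper names innodb_settings and render_mysqld_section are made up for illustration, and the 1/4 ratio is the rule of thumb quoted above, not an official formula. It prints a [mysqld] fragment for a given buffer pool size.

```python
def innodb_settings(buffer_pool_gib):
    """Return the load-time InnoDB settings listed above (hypothetical helper)."""
    return {
        "innodb_flush_log_at_trx_commit": "2",
        "innodb_write_io_threads": "16",
        "innodb_buffer_pool_size": "{}G".format(buffer_pool_gib),
        # Rule of thumb from the text: log file size is 1/4 of the buffer pool.
        "innodb_log_file_size": "{}G".format(buffer_pool_gib // 4),
    }


def render_mysqld_section(settings):
    """Render the settings as a [mysqld] config fragment."""
    lines = ["[mysqld]"]
    lines += ["{} = {}".format(key, value) for key, value in settings.items()]
    return "\n".join(lines)


print(render_mysqld_section(innodb_settings(16)))
```

With 16 as input this reproduces the 16G / 4G pair used here. Remember to revert innodb_flush_log_at_trx_commit before using the server for anything that needs durability.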

Loading the database dump takes about 60 minutes. I'm pretty sure there's room for improvement there.

Extra tip: When dumping such huge databases from production websites make sure to use a replica host and mysqldump's --single-transaction flag to avoid locking the database.

Create a place to store the processed data

Kitsune being a Django project, I created extra fields named content_html in the models with Markdown content, generated the migrations and ran them against the database.

Process the data

An AWS m4.2xlarge puts 8 cores and 32GiB of memory at my disposal, 16GiB of which I allocated to MySQL earlier.

I started with a basic single-core solution:

for question in Question.objects.all():
    question.content_html = parser.wiki_2_html(question.content)
    question.save()  # persist the converted HTML, one transaction per row

which obviously does the job but it's super slow.

Transactions take a fair amount of time, what if we could bundle multiple saves into one transaction?

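from django.db import transaction

def chunks(count, nn=500):
    """Yield successive (low, high) offsets covering count rows, nn at a time."""
    offset = 0

    while offset < count:
        yield (offset, min(offset + nn, count))
        offset += nn

for low, high in chunks(Question.objects.count()):
    # One transaction per chunk instead of one per row.
    with transaction.atomic():
        # Slice an ordered queryset so the chunks do not overlap.
        for question in Question.objects.order_by('id')[low:high]:
            question.content_html = parser.wiki_2_html(question.content)
            question.save()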

This is getting better. Increasing the chunk size to 20000 items, at the cost of more RAM, produces faster results. Anything above this value seems to take about the same time to complete.
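To see what the chunking does without a database at hand, here is a minimal standalone sketch of an offset generator in the same spirit. The row count of 1000000 is an arbitrary illustration value, not a figure from the migration.

```python
def chunks(count, size=500):
    """Yield (low, high) half-open offset pairs that together cover range(count)."""
    for low in range(0, count, size):
        yield low, min(low + size, count)


# A small example: the last chunk is shorter than the others.
print(list(chunks(1200, 500)))

# Each chunk is one transaction, so a bigger chunk means fewer commits.
for size in (500, 20000):
    transactions = sum(1 for _ in chunks(1000000, size))
    print("chunk size {}: {} transactions".format(size, transactions))
```

Fewer, larger transactions mean less commit overhead, but every chunk has to fit in memory, which is the trade-off described above.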

I tried PyPy but didn't get better results, so I stuck with CPython.

Let's add some more cores into the mix using Python's multiprocessing library.

I created a Pool with 7 processes (always leave one core outside the Pool so the system remains responsive) and used apply_async to generate the commands to run by the Pool.

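import multiprocessing as mp
import time

results = []
it = Question.objects.all()
number_of_rows = it.count()
pool = mp.Pool(processes=7)
# Queue one task per chunk; the callback records how many rows each task handled.
[pool.apply_async(process_chunk, (chunk,), callback=results.append) for chunk in chunks(number_of_rows)]

sum_results = 0
while sum_results < number_of_rows:
    print('Progress: {}/{}'.format(sum_results, number_of_rows))
    time.sleep(1)  # avoid spinning a core just to print progress
    sum_results = sum(results)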

Function process_chunk will process, save and return the number of rows processed. Then apply_async will append this number to results which is used in the while loop to give me an overview of what's happening while I'm waiting.
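The progress-tracking pattern can be tried without Django. The following is a minimal sketch, not the script used in the migration: process_chunk is a hypothetical stand-in that only sleeps, and multiprocessing.pool.ThreadPool is swapped in for mp.Pool so the snippet runs anywhere without forking. It exposes the same apply_async interface.

```python
import time
from multiprocessing.pool import ThreadPool  # same API as mp.Pool, but threads


def chunks(count, size):
    """Yield (low, high) half-open offset pairs covering range(count)."""
    for low in range(0, count, size):
        yield low, min(low + size, count)


def process_chunk(chunk):
    """Hypothetical worker: pretend to convert the rows and return how many."""
    low, high = chunk
    time.sleep(0.01)  # stands in for parsing and saving
    return high - low


def run(number_of_rows, chunk_size=100, workers=7):
    results = []
    pool = ThreadPool(processes=workers)
    pending = [
        # The callback appends each task's row count as soon as it finishes.
        pool.apply_async(process_chunk, (chunk,), callback=results.append)
        for chunk in chunks(number_of_rows, chunk_size)
    ]
    pool.close()  # no more tasks will be submitted
    while not all(task.ready() for task in pending):
        print("Progress: {}/{}".format(sum(results), number_of_rows))
        time.sleep(0.05)
    pool.join()
    return sum(results)


print(run(1050))
```

Waiting on the task handles rather than on the running total means the loop also ends if a worker raises, in which case the callback never fires and the total would never reach number_of_rows.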

So far so good, this is significantly faster. It took some tries before getting this right. Two things to remember when dealing with multiprocessing and Django are:

  • ./manage.py shell won't work. I don't know why, but I went ahead and wrote a standalone Python script, imported django and ran django.setup().

  • When a process forks, Django's database connection, which was already created by that time, needs to be cleared out and re-created for every process. The first thing process_chunk does is db.connections.close_all(). Django will take care of re-creating the connection when needed.

OK, I'm good to hit the road, I thought, and I launched the process with all the rows that needed parsing. As time went by I saw the memory usage increase dramatically, and eventually the kernel would kill my process to free up memory.

It seemed that the queries were taking too much memory. I set the Pool to shut down and start a new process for every chunk with maxtasksperchild=1, which helped a bit, but again, the farther into the run, the higher the memory usage. I tried to debug the issue with different Django queries and profiling (good luck with that on a multiprocess program) and I failed. Eventually I needed to find a solution before it was too late, so back to the drawing board.

Process the data, take two

I read this interesting blog post the other day named Taco Bell Programming, in which Ted claims that you can often achieve the desired functionality just by rearranging the Unix tool set, much like Taco Bell produces its menu by rearranging basic ingredients.

What you win with Taco Bell Programming is battle-tested tools and thorough documentation, which should save you time both writing code and debugging problems that are already solved.

I took a step back and re-thought my problem. The single-core solution was working just fine and had no memory issues. What if I could find a program to parallelize multiple runs? And that tool (obviously) exists: it's GNU Parallel.

In the true spirit of other GNU tools, Parallel has a gazillion command line arguments and can do a ton of things related to parallelizing the run of a list of commands.

I'll mention just the ones most important to me at the moment:

  1. Read a list of commands from standard input
  2. Show progress and provide ETA
  3. Limit the run to a number of cores
  4. Retry failed jobs, resume runs and do the overall bookkeeping.
  5. Send jobs to other machines (I wish I had the time to test that, amazing)

Prepare the input to Parallel

I reverted to the original single-core Python script and refactored it a bit so I can drive it with a one-line snippet piped into python. I also removed the chunk generation code since I'll do that elsewhere.

def parse_to_html(it, from_field, to_field):
    with transaction.atomic():
        for p in it:
            setattr(p, to_field, parser.wiki_to_html(getattr(p, from_field)))
            p.save()  # persist the converted field

Now to process all questions I can call this thing using

$ echo "import wikitohtml; it = wikitohtml.Question.objects.all(); wikitohtml.parse_to_html(it, 'content', 'content_html')"  | python -

Then I wrote a small python script to generate the chunks and print out commands to be later used by Parallel

CMD = '''echo "import wikitohtml; it = wikitohtml.Question.objects.filter(id__gte={from_value}, id__lt={to_value}); wikitohtml.parse_to_html(it, 'content', 'content_html')" | python - > /dev/null'''
step = 10000  # ids per generated command
for i in range(0, 1200000, step):
    print(CMD.format(from_value=i, to_value=i + step))

I wouldn't be surprised if Parallel can do the chunking itself, but in this case it's easier for me to fine-tune it using Python.
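Because the generated filters use id__gte and id__lt, each command covers a half-open id range, so neighbouring jobs never touch the same row. Below is a small sketch of that idea. The helper id_ranges and the max_id argument are hypothetical, and in practice the upper bound would come from the table's highest id rather than a hard-coded number.

```python
def id_ranges(max_id, step=10000):
    """Yield (from_value, to_value) pairs for id__gte / id__lt filters."""
    for start in range(0, max_id + 1, step):
        yield start, start + step


def owners(row_id, ranges):
    """Return the ranges that would pick up a given id."""
    return [(low, high) for low, high in ranges if low <= row_id < high]


ranges = list(id_ranges(25000))
print(ranges)

# Every id up to max_id belongs to exactly one range, including the boundaries.
assert all(len(owners(row_id, ranges)) == 1 for row_id in range(25001))
```

Gaps in the id sequence only make some jobs lighter than others; they do not cause rows to be skipped or processed twice.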

Now I can process all questions in parallel using

$ python generate-cmds.py | parallel -j 7 --eta --joblog joblog

So everything is working now in parallel and the memory leak is gone!

But I'm not done yet.


I left the script running for half an hour and then I started seeing MySQL abort transactions that failed to grab a lock. OK, that should be an easy fix: increase the lock wait timeout with SET innodb_lock_wait_timeout = 5000; (up from 50). Later I added --retries 3 to Parallel to make sure that anything that failed would get retried.

That actually made things worse as it introduced everyone's favorite issue in parallel programming, deadlocks. I reverted the MySQL change and looked deeper. Being unfamiliar with Kitsune's code I was not aware that the model.save() methods are doing a number of different things, including saving other objects as well, e.g. Answer.save() also calls Question.save().

Since I'm only processing one field and saving the result into another field that is unrelated to everything else, all the magic that happens in save() can be skipped. Besides dealing with the deadlock, this can actually get us a speed increase for free.

I refactored the python code to use Django's update() which directly hits the database and does not go through save().

def parse_to_html(it, from_field, to_field, id_field='id'):
    with transaction.atomic():
        for p in it:
            html = parser.wiki_to_html(getattr(p, from_field))
            # update() issues a direct SQL UPDATE and bypasses save().
            it.filter(**{id_field: getattr(p, id_field)}).update(**{to_field: html})

Everything works, and indeed update() did speed things up a lot and solved the deadlock issue. The cores are 100% utilized, which means that throwing more CPU power at the problem would buy more speed. Processing all 4 million rows now takes about 30 minutes, down from many, many hours.
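The shape of the final approach, one transaction per id range with each row updated in a single column only, can be tried with nothing but the standard library. This is a stand-in sketch, not the migration code: sqlite3 replaces MySQL and the Django ORM, the table layout is invented for the example, and to_html is a hypothetical placeholder for the real wiki parser.

```python
import html
import sqlite3


def to_html(text):
    """Hypothetical placeholder for the real wiki-to-HTML parser."""
    return "<p>{}</p>".format(html.escape(text))


def convert_range(conn, low, high):
    """Convert rows with low <= id < high, writing only the content_html column."""
    rows = conn.execute(
        "SELECT id, content FROM question WHERE id >= ? AND id < ?", (low, high)
    ).fetchall()
    with conn:  # one transaction for the whole chunk, committed on success
        conn.executemany(
            "UPDATE question SET content_html = ? WHERE id = ?",
            [(to_html(content), row_id) for row_id, content in rows],
        )
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE question (id INTEGER PRIMARY KEY, content TEXT, content_html TEXT)"
)
conn.executemany(
    "INSERT INTO question (id, content) VALUES (?, ?)",
    [(i, "row <{}>".format(i)) for i in range(10)],
)
conn.commit()

print(convert_range(conn, 0, 5))
```

Like update() in Django, the plain UPDATE statement touches nothing but the target column, so no model-level side effects can pull other rows into the transaction.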

