I've got a Python 2.7 script I'm working on that retrieves rows from a MySQL table, loops through the data to process it, then is supposed to do the following things in this order:
1. UPDATE the rows we just retrieved, setting the locked value in each row to TRUE.
2. After the UPDATE query executes and commits via MySQLdb, run a ThreadPool on the data from the original loop.
What's actually happening is that the UPDATE query seems to be committing somehow after the ThreadPool is done. I tried refactoring it into a try/finally statement to make sure, but now it either still does it afterwards, or just doesn't commit the UPDATE and runs the ThreadPool anyways.
It's a head scratcher, to be sure. I assume I'm just doing something really wrong and obvious but not catching it after looking at this for so long. Any input is greatly appreciated!
Here's the gist:
    from multiprocessing.pool import ThreadPool, IMapIterator
    import MySQLdb as mdb
    import os, sys, time
    import re
    from boto.s3.connection import S3Connection
    from boto.s3.bucket import Bucket
    ...
    con = mdb.connect('localhost', 'user', 'pass', 'db')
    with con:
        cur = con.cursor()
        cur.execute("SELECT preview_queue.filename, preview_queue.product_id, preview_queue.track, products.name, preview_queue.id FROM preview_queue join `catalog_module-products` AS products on products.id = preview_queue.product_id where locked != 1")
        rows = cur.fetchall()
        mp3s_to_download = []
        lock_ids = []
        last_directory = ""
        if len(rows) > 0:
            for row in rows:
                base_dir = str(get_base_dir(row[1], row[3]))
                mp3s_to_download.append([base_dir, str(row[0])])
                if last_directory != "preview_temp/"+base_dir:
                    if not os.path.exists("preview_temp/"+base_dir):
                        try:
                            os.makedirs("preview_temp/"+base_dir)
                        except OSError, e:
                            pass
                    last_directory = "preview_temp/"+base_dir
                lock_ids.append(str(row[4]))
            if len(lock_ids) > 0:
                action_ids = ','.join(lock_ids)
                try:
                    cur.execute("UPDATE preview_queue SET locked = 1 WHERE id IN ({})".format(action_ids))
                    con.commit()
                finally:
                    pool = ThreadPool(processes=20)
                    pool.map(download_file, mp3s_to_download)
        cur.close()
A finally clause is guaranteed to execute, even if the try clause raises an exception. What is probably happening here is that an exception is being raised, preventing the update from being committed, but the threads are triggered anyway.
This doesn't really seem to be an appropriate use of try/finally. Rather, use a normal try/except to catch and log any exceptions, and then perhaps use an else clause to start the threads only if no exception was raised.
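Here is a minimal sketch of that try/except/else shape, not a drop-in replacement for the script in the question. To keep it runnable without a MySQL server, it assumes sqlite3 as a stand-in for MySQLdb. The download_file body and the lock_then_process name are hypothetical placeholders, and the pool is shrunk to 4 threads for the demo. The query parameters (`?` here, `%s` with MySQLdb) are also an addition, so that the driver builds the IN list rather than string formatting.

```python
import logging
import sqlite3  # assumption: stand-in for MySQLdb so the sketch runs anywhere
from multiprocessing.pool import ThreadPool

logging.basicConfig(level=logging.INFO)


def download_file(item):
    # Hypothetical placeholder for the question's download_file().
    base_dir, filename = item
    return "{}/{}".format(base_dir, filename)


def lock_then_process(con, lock_ids, mp3s_to_download):
    """Lock the rows, then start the pool only if the commit succeeded."""
    cur = con.cursor()
    # One placeholder per id; MySQLdb would use %s instead of ?.
    placeholders = ",".join("?" * len(lock_ids))
    try:
        cur.execute(
            "UPDATE preview_queue SET locked = 1 "
            "WHERE id IN ({})".format(placeholders),
            lock_ids,
        )
        con.commit()
    except Exception:
        # Log the failure with its traceback and undo the partial update.
        logging.exception("Could not lock rows %s", lock_ids)
        con.rollback()
        return None
    else:
        # Reached only when execute() and commit() raised nothing.
        pool = ThreadPool(processes=4)
        try:
            return pool.map(download_file, mp3s_to_download)
        finally:
            pool.close()
            pool.join()
    finally:
        # Runs on both paths, so the cursor is always closed.
        cur.close()


# Demo against an in-memory table shaped loosely like preview_queue.
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE preview_queue "
    "(id INTEGER PRIMARY KEY, filename TEXT, locked INTEGER DEFAULT 0)"
)
con.executemany(
    "INSERT INTO preview_queue (id, filename) VALUES (?, ?)",
    [(1, "a.mp3"), (2, "b.mp3")],
)
con.commit()

results = lock_then_process(con, [1, 2], [["dir", "a.mp3"], ["dir", "b.mp3"]])
locked = [row[0] for row in con.execute("SELECT locked FROM preview_queue ORDER BY id")]

# A connection without the table makes the UPDATE fail, so no pool is started.
failed = lock_then_process(sqlite3.connect(":memory:"), [1], [["dir", "x.mp3"]])
```

With this layout a failed UPDATE shows up in the log with its traceback instead of disappearing, and the downloads never start against rows that were not locked.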