TensorFlow affecting multiprocessing/threading


I have an image classifier that attempts to classify images into three categories. I trained my model using the open-source TensorFlow for Poets codelab: https://codelabs.developers.google.com/codelabs/tensorflow-for-poets/#4

I have a list of about 350,000 images that I want to classify, and I'm using multiprocessing with threading to download a batch of images locally, classify them, and then purge the image directory.

Multi-processing/threading:

import os
import threading
import requests
from multiprocessing import Pool, cpu_count

# extensions and tmp_images_dir are module-level globals defined elsewhere

def download_image(photo_url):
    ext = photo_url.split('.')[-1]
    if ext in extensions:
        image_data = requests.get(photo_url, timeout=5).content
        photo_name = '{}.{}'.format(os.urandom(12).encode('hex'), ext)
        photo_dest = os.path.join(tmp_images_dir, photo_name)
        with open(photo_dest, 'wb') as f:
            f.write(image_data)

def thread_handler(image_list):
    jobs = []
    for image in image_list.split(','):
        thread = threading.Thread(target=download_image, args=(image,))
        jobs.append(thread)
        thread.start()
    for j in jobs:
        j.join()


def multi_download_images(image_list):
    pool = Pool(processes=cpu_count() - 1)
    print 'downloading images...'
    pool.map(thread_handler, image_list)
    pool.close()
    pool.join()

Image Classification:

import tensorflow as tf
from progressbar import ProgressBar

# graph_dir and label_lines (the retrained graph file and its labels) are loaded elsewhere

def classify_image_batch():
    print 'labeling images...'
    image_list = os.listdir(tmp_images_dir)
    with tf.gfile.FastGFile(graph_dir, 'rb') as f:
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(f.read())
        _ = tf.import_graph_def(graph_def, name='')
    results = []
    with tf.Session() as sess:
        softmax_tensor = sess.graph.get_tensor_by_name('final_result:0')
        bar = ProgressBar()
        for image in bar(image_list):
            try:
                image_path = os.path.join(tmp_images_dir, image)
                image_data = tf.gfile.FastGFile(image_path, 'rb').read()
                predictions = sess.run(softmax_tensor,
                                       {'DecodeJpeg/contents:0': image_data})
                top_k = predictions[0].argsort()[-len(predictions[0]):][::-1]
                category = label_lines[top_k[0]]
                results.append((image, category))
            except Exception:
                results.append((image, 'failed'))
    update_mongo_with_classification(results)

Loop:

import gc

def loop_image_classification(image_list):
    if tf.gfile.Exists(tmp_images_dir):
        tf.gfile.DeleteRecursively(tmp_images_dir)
    tf.gfile.MakeDirs(tmp_images_dir)

    multi_download_images(image_list)
    classify_image_batch()
    gc.collect()

I loop this download/classify/purge cycle, with a gc.collect() call at the end of each iteration to clean anything up.

This works smoothly for the first few iterations, but then my machine stops using all of its cores and relies on a single core to download and classify, resulting in 1/16 of the normal performance. My gut instinct is that something is leaking from TensorFlow, since I am closing and joining my threads and pools. TensorFlow employs all of my cores during the first few speedy classification loops.

I have read about the graph_def growing too large, and about nodes being added to the graph for each JPEG read in. But truth be told, what happens under the hood in TensorFlow is still a mystery to me. I'd appreciate any direction that would help me with this problem or make me better at TensorFlow.

Thanks!
