Perform Action after prefetch processing is complete


I have a script that consumes a queue on RabbitMQ and writes the messages to PostgreSQL.

I am prefetching 1000 records and writing them, but the only way I have found to commit the data is to run a commit in the callback function, which slows down the database writes. Is there a way, perhaps without using start_consuming(), to prefetch, process, and acknowledge individual messages, and then run my DB commit before fetching the next 1000 messages from RabbitMQ?

import pika
import json
import psycopg2


creds = pika.PlainCredentials('user', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq', 5672, '/', creds))
consumer_channel = connection.channel()
consumer_channel.basic_qos(prefetch_count=1000)

result = consumer_channel.queue_declare(queue='blend_out', durable=True)
queue_name = result.method.queue

consumer_channel.queue_bind(exchange='blend_out', queue='blend_out', routing_key='blend_out')

conn = psycopg2.connect(host="postgresql-ha-pgpool", port=5432, dbname="postgres", user="postgres", password="password")

print(' [*] Waiting for data to write. To exit press CTRL+C')


def callback(ch, method, properties, body):
    data = json.loads(body)
    with conn.cursor() as curs:
        sql = """INSERT INTO public.blend_demo (attr_1, attr_1_blend_order, attr_1_source_data, attr_2, 
        attr_2_blend_order, attr_2_source_data, attr_3, attr_3_blend_order, attr_3_source_data, attr_4, 
        attr_4_blend_order, attr_4_source_data, attr_5, attr_5_blend_order, attr_5_source_data, attr_6, 
        attr_6_blend_order, attr_6_source_data, attr_7, attr_7_blend_order, attr_7_source_data, attr_8, 
        attr_8_blend_order, attr_8_source_data, attr_9, attr_9_blend_order, attr_9_source_data, attr_10, 
        attr_10_blend_order, attr_10_source_data, attr_11, attr_11_blend_order, attr_11_source_data, attr_12, 
        attr_12_blend_order, attr_12_source_data, attr_13, attr_13_blend_order, attr_13_source_data, attr_14, 
        attr_14_blend_order, attr_14_source_data, attr_15, attr_15_blend_order, attr_15_source_data) 
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        try:
            curs.execute(sql, (data['attr_1'], data['attr_1_blend_order'], data['attr_1_source_data'],
                               data['attr_2'], data['attr_2_blend_order'], data['attr_2_source_data'],
                               data['attr_3'], data['attr_3_blend_order'], data['attr_3_source_data'],
                               data['attr_4'], data['attr_4_blend_order'], data['attr_4_source_data'],
                               data['attr_5'], data['attr_5_blend_order'], data['attr_5_source_data'],
                               data['attr_6'], data['attr_6_blend_order'], data['attr_6_source_data'],
                               data['attr_7'], data['attr_7_blend_order'], data['attr_7_source_data'],
                               data['attr_8'], data['attr_8_blend_order'], data['attr_8_source_data'],
                               data['attr_9'], data['attr_9_blend_order'], data['attr_9_source_data'],
                               data['attr_10'], data['attr_10_blend_order'], data['attr_10_source_data'],
                               data['attr_11'], data['attr_11_blend_order'], data['attr_11_source_data'],
                               data['attr_12'], data['attr_12_blend_order'], data['attr_12_source_data'],
                               data['attr_13'], data['attr_13_blend_order'], data['attr_13_source_data'],
                               data['attr_14'], data['attr_14_blend_order'], data['attr_14_source_data'],
                               data['attr_15'], data['attr_15_blend_order'], data['attr_15_source_data']))
        except Exception as exc:
            print(f'insert failed: {exc}')
    conn.commit()
    ch.basic_ack(delivery_tag=method.delivery_tag)

consumer_channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=False)

consumer_channel.start_consuming()
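
One workaround I considered is to keep start_consuming() and batch the work inside the callback: count deliveries, and every 1000 messages commit and then acknowledge the whole batch with basic_ack(multiple=True). A rough sketch of that idea (the batch size is arbitrary and the column list is trimmed for brevity; the real INSERT would carry all 45 parameters as above):

pending = 0  # messages inserted but not yet committed/acknowledged

def batched_callback(ch, method, properties, body):
    global pending
    data = json.loads(body)
    with conn.cursor() as curs:
        curs.execute(
            'INSERT INTO public.blend_demo (attr_1, attr_1_blend_order, attr_1_source_data) VALUES (%s, %s, %s)',
            (data['attr_1'], data['attr_1_blend_order'], data['attr_1_source_data']))
    pending += 1
    if pending >= 1000:
        conn.commit()
        # multiple=True acknowledges every unacked delivery up to and including this tag
        ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)
        pending = 0

consumer_channel.basic_consume(
    queue=queue_name, on_message_callback=batched_callback, auto_ack=False)
consumer_channel.start_consuming()

The drawback is that a final partial batch stays uncommitted and unacknowledged until more messages arrive, which is what the inactivity_timeout approach in the edit below addresses.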

Edit: I was able to accomplish this using channel.consume and setting a flag for when a transaction is open. Now it commits every 1000 records, and if there are fewer than 1000 messages left to consume, it times out after 5 seconds and commits the open transaction if the open_transaction flag is set to True.

print(' [*] Waiting for coords to write. To exit press CTRL+C')
open_transaction = False
commit_record_count = 0

for method_frame, properties, body in consumer_channel.consume(queue=queue_name, auto_ack=False, inactivity_timeout=5):
    # inactivity_timeout yields (None, None, None) when the queue goes quiet,
    # so commit whatever is still pending in the open transaction
    if not method_frame and open_transaction:
        conn.commit()
        open_transaction = False
        print('committed ' + str(commit_record_count) + ' records')
        commit_record_count = 0
    if method_frame:
        # commit the previous batch once every 1000 deliveries
        if method_frame.delivery_tag % 1000 == 0:
            conn.commit()
            print('committed ' + str(commit_record_count) + ' records')
            commit_record_count = 0
        data = json.loads(body)
        with conn.cursor() as curs:
            sql = 'INSERT INTO public.coords (source_epsg, target_epsg, source_lat, source_long, target_lat, target_long) VALUES (%s, %s, %s, %s, %s, %s)'
            try:
                curs.execute(sql, (
                data['source_epsg'], data['target_epsg'], data['source_lat'], data['source_long'], data['target_lat'],
                data['target_long']))
                open_transaction = True
                commit_record_count += 1
            except Exception as exc:
                print(f'insert failed: {exc}')
        consumer_channel.basic_ack(delivery_tag=method_frame.delivery_tag)

consumer_channel.close()
connection.close()
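
If the per-row INSERT itself becomes the bottleneck, the rows could also be buffered in Python and written in a single round trip with psycopg2.extras.execute_values before the commit. A minimal sketch (the flush helper below is illustrative, not part of the script above):

from psycopg2.extras import execute_values

def flush(pg_conn, buffered_rows):
    # One multi-row INSERT instead of one statement per message
    with pg_conn.cursor() as curs:
        execute_values(
            curs,
            'INSERT INTO public.coords (source_epsg, target_epsg, source_lat, source_long, target_lat, target_long) VALUES %s',
            buffered_rows)
    pg_conn.commit()

Each consumed message would append its tuple of values to a list, and the loop would call flush(conn, rows), clear the list, and ack up to the last delivery tag at the same points where it currently commits.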