I have a script that consumes a queue on RabbitMQ and writes the messages to PostgreSQL.
I am prefetching 1000 records at a time, but the only way I've found to commit the data is to run a commit inside the callback function, i.e. once per message, which slows the database writes down considerably. Is there a way, perhaps without using start_consuming(), to prefetch/process and acknowledge messages individually, and then run my DB commit once before fetching the next 1000 messages from RabbitMQ?
import pika
import json
import psycopg2
# RabbitMQ connection, channel, and prefetch window
creds = pika.PlainCredentials('user', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq', 5672, '/', creds))
consumer_channel = connection.channel()
consumer_channel.basic_qos(prefetch_count=1000)

# Declare the queue once and bind it to the exchange
result = consumer_channel.queue_declare(queue='blend_out', durable=True)
queue_name = result.method.queue
consumer_channel.queue_bind(exchange='blend_out', queue='blend_out', routing_key='blend_out')

# PostgreSQL connection
conn = psycopg2.connect(host="postgresql-ha-pgpool", port=5432, dbname="postgres", user="postgres", password="password")
print(' [*] Waiting for data to write. To exit press CTRL+C')
def callback(ch, method, properties, body):
    data = json.loads(body)
    with conn.cursor() as curs:
        sql = """INSERT INTO public.blend_demo (attr_1, attr_1_blend_order, attr_1_source_data, attr_2,
            attr_2_blend_order, attr_2_source_data, attr_3, attr_3_blend_order, attr_3_source_data, attr_4,
            attr_4_blend_order, attr_4_source_data, attr_5, attr_5_blend_order, attr_5_source_data, attr_6,
            attr_6_blend_order, attr_6_source_data, attr_7, attr_7_blend_order, attr_7_source_data, attr_8,
            attr_8_blend_order, attr_8_source_data, attr_9, attr_9_blend_order, attr_9_source_data, attr_10,
            attr_10_blend_order, attr_10_source_data, attr_11, attr_11_blend_order, attr_11_source_data, attr_12,
            attr_12_blend_order, attr_12_source_data, attr_13, attr_13_blend_order, attr_13_source_data, attr_14,
            attr_14_blend_order, attr_14_source_data, attr_15, attr_15_blend_order, attr_15_source_data)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        try:
            curs.execute(sql, (data['attr_1'], data['attr_1_blend_order'], data['attr_1_source_data'],
                               data['attr_2'], data['attr_2_blend_order'], data['attr_2_source_data'],
                               data['attr_3'], data['attr_3_blend_order'], data['attr_3_source_data'],
                               data['attr_4'], data['attr_4_blend_order'], data['attr_4_source_data'],
                               data['attr_5'], data['attr_5_blend_order'], data['attr_5_source_data'],
                               data['attr_6'], data['attr_6_blend_order'], data['attr_6_source_data'],
                               data['attr_7'], data['attr_7_blend_order'], data['attr_7_source_data'],
                               data['attr_8'], data['attr_8_blend_order'], data['attr_8_source_data'],
                               data['attr_9'], data['attr_9_blend_order'], data['attr_9_source_data'],
                               data['attr_10'], data['attr_10_blend_order'], data['attr_10_source_data'],
                               data['attr_11'], data['attr_11_blend_order'], data['attr_11_source_data'],
                               data['attr_12'], data['attr_12_blend_order'], data['attr_12_source_data'],
                               data['attr_13'], data['attr_13_blend_order'], data['attr_13_source_data'],
                               data['attr_14'], data['attr_14_blend_order'], data['attr_14_source_data'],
                               data['attr_15'], data['attr_15_blend_order'], data['attr_15_source_data']))
        except Exception as exc:
            print(f'insert failed: {exc}')
    # Committing here, once per message, is the bottleneck
    conn.commit()
    ch.basic_ack(delivery_tag=method.delivery_tag)
consumer_channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=False)
consumer_channel.start_consuming()
Edit: I was able to accomplish this using channel.consume() and a flag that tracks whether a transaction is open. It now commits every 1000 records, and if there are fewer than 1000 messages left to consume, the generator times out after 5 seconds and the transaction is committed if the open_transaction flag is True.
print(' [*] Waiting for coords to write. To exit press CTRL+C')
open_transaction = False
commit_record_count = 0

for method_frame, properties, body in consumer_channel.consume(queue=queue_name, auto_ack=False, inactivity_timeout=5):
    # inactivity_timeout makes consume() yield (None, None, None) after 5 idle seconds;
    # use that to flush whatever is still uncommitted
    if not method_frame and open_transaction:
        conn.commit()
        print('committed ' + str(commit_record_count) + ' records')
        open_transaction = False
        commit_record_count = 0
    if method_frame:
        # Delivery tags are sequential, so every 1000th delivery commits the batch built so far
        if method_frame.delivery_tag % 1000 == 0:
            conn.commit()
            print('committed ' + str(commit_record_count) + ' records')
            commit_record_count = 0
        data = json.loads(body)
        with conn.cursor() as curs:
            sql = 'INSERT INTO public.coords (source_epsg, target_epsg, source_lat, source_long, target_lat, target_long) VALUES (%s, %s, %s, %s, %s, %s)'
            try:
                curs.execute(sql, (
                    data['source_epsg'], data['target_epsg'], data['source_lat'], data['source_long'],
                    data['target_lat'], data['target_long']))
                open_transaction = True
                commit_record_count += 1
            except Exception as exc:
                print(f'insert failed: {exc}')
        consumer_channel.basic_ack(delivery_tag=method_frame.delivery_tag)

consumer_channel.close()
connection.close()
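A further variation on the same idea (just a sketch, not something I've benchmarked) is to buffer the decoded rows and write each batch with a single psycopg2.extras.execute_values() call, then acknowledge the whole batch at once with basic_ack(multiple=True). BATCH_SIZE and the flush() helper below are names I made up for the example, not part of the code above:

from psycopg2.extras import execute_values

BATCH_SIZE = 1000
batch = []  # buffered (row_values, delivery_tag) pairs

# execute_values expands the single VALUES %s placeholder into many rows
insert_sql = ('INSERT INTO public.coords (source_epsg, target_epsg, source_lat, '
              'source_long, target_lat, target_long) VALUES %s')

def flush(channel, db_conn, buffered):
    """Insert the buffered rows in one statement, commit, then ack the whole batch."""
    if not buffered:
        return
    rows = [row for row, tag in buffered]
    last_tag = buffered[-1][1]
    with db_conn.cursor() as curs:
        execute_values(curs, insert_sql, rows)
    db_conn.commit()
    # multiple=True acknowledges every delivery up to and including last_tag
    channel.basic_ack(delivery_tag=last_tag, multiple=True)
    buffered.clear()

for method_frame, properties, body in consumer_channel.consume(
        queue=queue_name, auto_ack=False, inactivity_timeout=5):
    if method_frame is None:  # idle for 5 seconds: flush whatever is pending
        flush(consumer_channel, conn, batch)
        continue
    data = json.loads(body)
    batch.append(((data['source_epsg'], data['target_epsg'], data['source_lat'],
                   data['source_long'], data['target_lat'], data['target_long']),
                  method_frame.delivery_tag))
    if len(batch) >= BATCH_SIZE:
        flush(consumer_channel, conn, batch)

Since the acks are deferred until flush(), prefetch_count has to be at least BATCH_SIZE, otherwise the broker stops delivering before a batch fills up and everything only flushes on the inactivity timeout.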