Read from Pulsar and load to Postgres with asyncio

298 views Asked by At

I am trying to read data from pulsar and write to postgres table with asyncio and asyncpg in Python.

import asyncio
import asyncpg
import requests
import nest_asyncio
nest_asyncio.apply()
        
class Connection:

    loop = asyncio.get_event_loop()

    @classmethod
    async def connect_persist_postgresql(cls, data):
        connection = await asyncpg.connect(
            user='postgres,
            password='password_postgres',
            database='postgres',
            host='10.xxx.xxx.xx',
            port=5432
        )

        query = '''INSERT INTO {}.{} (
                                        uid, 
                                        col1, 
                                        col2, 
                                        col3, 
                                        col4
                                        ) 
                   VALUES (
                           $1, 
                           $2, 
                           $3, 
                           $4, 
                           $5, 
                           )'''
        if isinstance(data, list):
            for event in data:
                await connection.execute(
                    query,
                    'schema',
                    'table_test',
                    event['uid'],
                    event['col1'],
                    event['col2'],
                    event['col3'],
                    event['col4'],
                )
        else:
            await connection.execute(
                query,
                'schema',
                'table_test',
                data[0]['uid'],
                data[0]['col1'],
                data[0]['col2'],
                data[0]['col3'],
                data[0]['col4']
            )

        await connection.close()

And then I am connected to pulsar as a consumer, getting data and trying to write into postgresql table with next line: def run_consumer():

client = pulsar.Client(
    'pulsar://10.xxx.xx.xx:6650')
consumer = client.subscribe('test-topic-pulsar-postgresql',
                            'test-subs')
while True:
    msg = consumer.receive()
    unpacked_list = []
    loop = asyncio.get_event_loop()
    try:
        my_new_string_value = msg.data()
        unpacked_list.append(my_new_string_value)
        loop.run_until_complete(connect_persist_postgresql(unpacked_list))
    except Exception as a:
        print(a)

But I am getting an error: This event loop is already running

Any hint is welcomed, thanks in advance.

0

There are 0 answers