I am trying to read data from pulsar and write to postgres table with asyncio and asyncpg in Python.
import asyncio
import asyncpg
import requests
import nest_asyncio
nest_asyncio.apply()
class Connection:
loop = asyncio.get_event_loop()
@classmethod
async def connect_persist_postgresql(cls, data):
connection = await asyncpg.connect(
user='postgres,
password='password_postgres',
database='postgres',
host='10.xxx.xxx.xx',
port=5432
)
query = '''INSERT INTO {}.{} (
uid,
col1,
col2,
col3,
col4
)
VALUES (
$1,
$2,
$3,
$4,
$5,
)'''
if isinstance(data, list):
for event in data:
await connection.execute(
query,
'schema',
'table_test',
event['uid'],
event['col1'],
event['col2'],
event['col3'],
event['col4'],
)
else:
await connection.execute(
query,
'schema',
'table_test',
data[0]['uid'],
data[0]['col1'],
data[0]['col2'],
data[0]['col3'],
data[0]['col4']
)
await connection.close()
And then I am connected to pulsar as a consumer, getting data and trying to write into postgresql table with next line: def run_consumer():
client = pulsar.Client(
'pulsar://10.xxx.xx.xx:6650')
consumer = client.subscribe('test-topic-pulsar-postgresql',
'test-subs')
while True:
msg = consumer.receive()
unpacked_list = []
loop = asyncio.get_event_loop()
try:
my_new_string_value = msg.data()
unpacked_list.append(my_new_string_value)
loop.run_until_complete(connect_persist_postgresql(unpacked_list))
except Exception as a:
print(a)
But I am getting an error: This event loop is already running
Any hint is welcomed, thanks in advance.