I'm trying to read parquet files with geometry in Well Known Blob into a PostgreSQL / PostGIS database using DUCK DB and psycopg2
This Works for a single row but is really slow
SQL = """INSERT INTO my_table(a,b,c,geometry,x,y) VALUES (%s,%s,%s,ST_GeomFromWKB(%s, 4326),%s,%s);"""
params = [a,b,c,geometry,x,y]
cursor = pg_conn.cursor()
cursor.execute(SQL, params)
The code bellow is based on this page https://hakibenita.com/fast-load-data-python-postgresql#execute-values-from-iterator-with-page-size This enables you to use a template in theory similar to the single row method above
Unfortuatly it's Erroring at insert_values_iterator <class 'dict_keyiterator'> 'dict_keyiterator' object is not subscriptable
I realise my_dict is not correctly formatted for this, I don’t know what is supposed to be
Could someone explain what I'm doing wrong NOTE: This is the gist of what I'm doing and may contain typos
# -*- coding: utf-8 -*-
import duckdb
import os
from typing import Iterator, Dict, Any
import psycopg2
import psycopg2.extras
pg_conn = psycopg2.connect(
database="my_db",
user='user1',
password='XXX',
host='localhost', port='5432'
)
# Set Up DuckDB
db = duckdb.connect()
db.execute("INSTALL spatial;")
db.execute("LOAD spatial;")
db.execute("INSTALL parquet;")
db.execute("LOAD parquet;")
def insert_values_iterator(
pg_conn,
my_list: Iterator[Dict[str, Any]],
template=None,
page_size: int = 100,
) -> None:
pg_conn.autocommit = True
params = '(%s,%s,%s,ST_GeomFromWKB(%s, 4326),%s,%s)'
print(type(Trans)) ## <class 'list_iterator'>
## my_list contains 10000 rows
try:
with pg_conn.cursor() as cursor:
psycopg2.extras.execute_values(cursor, f"""
INSERT INTO my_table(a,b,c,geometry,x,y) VALUES %s;""",
((
row['a'], # "Protected Attributes"
row['b'], # Not getting here to see it's value
row['c'],
row['geometry'],
row['x'],
row['y']
) for row in my_list) , page_size=page_size, template= params
) # End psycopg2.extras.execute_values
except Exception as e:
print(str(e))
with open("ErrLog.txt", "a") as log:
log.write(str(e) )
def Process_Parquet(pq_file, db, pg_conn):
# Empty list
my_list = []
RowCount = 0
try:
# Use DuckDB To read files
duckSQL = f"SELECT a,b,JSON(c) as c_json,geometry,x,y FROM '{pq_file}';"
db.execute(duckSQL)
rows = db.fetchall()
for row in rows:
a = row[0]
b = row[1]
c =row[2]
geometry = row[3]
x = row[4]
y=row[5]
# Add row to my_list
my_list.append(
{'a': a, 'b': c,'geometry': geometry,'x': x, 'y': y})
# pass my_dict Dictionary to insert function
if RowCount >= 10000:
insert_values_iterator (pg_conn, iter(my_list) )
# Reset
RowCount =0
my_list=[]
# Deal with leftovers
if len(my_list) >0:
insert_values_iterator (pg_conn, iter(my_list) )
except Exception as e:
print(str(e))
with open("ErrLog.txt", "a") as log:
log.write(str(e) + ', ' + pq_file)
pq_file = 'Path\To\my.parqet'
if os.path.exists(pq_file):
Process_Parquet(pq_file, db, pg_conn)
UPDATE
I now realise I should be passing a list of dictionaries to insert_values_iterator
Inside this method row['a'] is still "Protected Attributes" I’m not able step through the code to see what it’s value is Each row should look something like
{'a': a, 'b': c,'geometry': geometry,'x': x, 'y': y}
This is now being passed to the database
however in the databse column a contains the entire row
a,b,c{json string}, geom as WKB,x,y
and the other columns are populated correctly even the geometry column
Hopping someone can see why