Python: How use psycopg2.extras.execute_values for bulk insert

154 views Asked by At

I'm trying to read parquet files with geometry in Well Known Blob into a PostgreSQL / PostGIS database using DUCK DB and psycopg2

This Works for a single row but is really slow

SQL = """INSERT INTO my_table(a,b,c,geometry,x,y)  VALUES (%s,%s,%s,ST_GeomFromWKB(%s, 4326),%s,%s);"""
params = [a,b,c,geometry,x,y]
cursor = pg_conn.cursor()
cursor.execute(SQL, params)

The code bellow is based on this page This enables you to use a template in theory similar to the single row method above

Unfortuatly it's Erroring at insert_values_iterator <class 'dict_keyiterator'> 'dict_keyiterator' object is not subscriptable

I realise my_dict is not correctly formatted for this, I don’t know what is supposed to be

Could someone explain what I'm doing wrong NOTE: This is the gist of what I'm doing and may contain typos

# -*- coding: utf-8 -*-
import duckdb
import os
from typing import Iterator, Dict, Any

import psycopg2
import psycopg2.extras
pg_conn = psycopg2.connect(
    host='localhost', port='5432'


# Set Up DuckDB
db = duckdb.connect()

db.execute("INSTALL spatial;")
db.execute("LOAD spatial;")
db.execute("INSTALL parquet;")
db.execute("LOAD parquet;")

def insert_values_iterator(
    my_list: Iterator[Dict[str, Any]],
    page_size: int = 100,
) -> None:
    pg_conn.autocommit = True
    params = '(%s,%s,%s,ST_GeomFromWKB(%s, 4326),%s,%s)'
    print(type(Trans)) ## <class 'list_iterator'>

    ## my_list contains 10000 rows 
        with pg_conn.cursor() as cursor:
            psycopg2.extras.execute_values(cursor, f"""
            INSERT INTO my_table(a,b,c,geometry,x,y)  VALUES %s;""",
                    row['a'], # "Protected Attributes"
                    row['b'],  #  Not getting here to see it's value
                    ) for row in my_list) , page_size=page_size, template= params

            ) # End psycopg2.extras.execute_values

    except Exception as e:
        with open("ErrLog.txt", "a") as log:
            log.write(str(e) )

def Process_Parquet(pq_file, db, pg_conn):
    # Empty list
    my_list = []
    RowCount = 0

        # Use DuckDB To read files
        duckSQL = f"SELECT a,b,JSON(c) as c_json,geometry,x,y FROM '{pq_file}';"
        rows = db.fetchall()
        for row in rows:
            a = row[0]
            b = row[1]
            c =row[2]
            geometry = row[3]
            x = row[4]

            # Add row to my_list
                {'a': a, 'b': c,'geometry': geometry,'x': x, 'y': y})
            # pass my_dict Dictionary to insert function
            if RowCount >= 10000:
                insert_values_iterator (pg_conn, iter(my_list) )
                # Reset
                RowCount =0

        # Deal with leftovers
        if len(my_list) >0:
           insert_values_iterator (pg_conn, iter(my_list) )

    except Exception as e:
        with open("ErrLog.txt", "a") as log:
            log.write(str(e) + ', ' + pq_file)

pq_file = 'Path\To\my.parqet'
    if os.path.exists(pq_file):
        Process_Parquet(pq_file, db, pg_conn)


I now realise I should be passing a list of dictionaries to insert_values_iterator

Inside this method row['a'] is still "Protected Attributes" I’m not able step through the code to see what it’s value is Each row should look something like

 {'a': a, 'b': c,'geometry': geometry,'x': x, 'y': y}

This is now being passed to the database

however in the databse column a contains the entire row

a,b,c{json string}, geom as WKB,x,y

and the other columns are populated correctly even the geometry column

Hopping someone can see why


There are 0 answers