How to copy data from SQLite to PostgreSQL?

69 views Asked by At

I have a task in which I need to copy data from one SQLite table to an almost identical PostgreSQL table. I wrote code that doesn't work, and I can't figure out what the problem is. It prints that the connection was established successfully and shows no errors, but when I check the content.film_work table in the terminal with the command SELECT * FROM content.film_work; I don't see the data. It shows:

movies_database=# SELECT * FROM content.film_work;



created | modified | id | title | description | creation_date | rating | type | file_path 
---------+----------+----+-------+-------------+---------------+--------+------+-----------
(0 rows)

Here are some steps:

  1. Connect to the SQLite database and copy the rows of the film_work table into dataclass instances.
  2. Pass the list of dataclass instances (one per table row) to another function, where I connect to PostgreSQL.
  3. Connect to PostgreSQL and read the column names of the PostgreSQL table (content.film_work) to pass them to the INSERT SQL query.
  4. Before all this, I change the order of the columns so that the data is passed correctly from SQLite to PostgreSQL.

SQLite table (film_work):

0|id|TEXT|0||1
1|title|TEXT|1||0
2|description|TEXT|0||0
3|creation_date|DATE|0||0
4|file_path|TEXT|0||0
5|rating|FLOAT|0||0
6|type|TEXT|1||0
7|created_at|timestamp with time zone|0||0
8|updated_at|timestamp with time zone|0||0

PostgreSQL table (content.film_work):

 created | modified | id | title | description | creation_date | rating | type | file_path

Code snippet:

# Register psycopg2's UUID support so uuid.UUID values can be used as query parameters.
psycopg2.extras.register_uuid()
# Path of the source SQLite database file; passed to conn_context() by copy_from_sqlite().
db_path = 'db.sqlite'

@contextmanager
def conn_context(db_path: str):
    """Open a SQLite connection for the duration of a ``with`` block.

    Rows produced by the connection are ``sqlite3.Row`` objects, so columns
    can be read by name and a row can be converted with ``dict(row)``.

    Args:
        db_path: Path of the SQLite database file to open.

    Yields:
        The open ``sqlite3.Connection``; it is closed when the block exits,
        whether normally or through an exception.
    """
    connection = sqlite3.connect(db_path)
    # Name-addressable rows instead of plain tuples.
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    finally:
        connection.close()

@dataclass
class FilmWork:
    """One ``film_work`` row, with fields in the PostgreSQL column order.

    The field order (created, modified, id, title, description,
    creation_date, rating, type, file_path) mirrors the target table,
    because instances are flattened positionally with ``astuple`` before
    being inserted.

    Any of ``creation_date``, ``created_at``, ``updated_at``,
    ``description`` and ``rating`` that is ``None`` after construction is
    replaced with a default in ``__post_init__``.
    """

    # NOTE(review): the date annotations are nominal -- the defaults filled in
    # below are formatted strings; confirm what type the SQLite rows deliver.
    created_at: date = None
    updated_at: date = None
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    title: str = ''
    description: str = ''
    creation_date: date = None
    rating: float = 0.0
    type: str = ''
    file_path: str = ''

    # This hook must live inside the class body: a module-level function named
    # __post_init__ is never called by the dataclass machinery.
    def __post_init__(self):
        """Replace ``None`` values with defaults."""
        # Take the timestamp once so that all defaulted fields agree.
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        if self.creation_date is None:
            self.creation_date = now
        if self.created_at is None:
            self.created_at = now
        if self.updated_at is None:
            self.updated_at = now
        if self.description is None:
            self.description = 'Нет описания'
        if self.rating is None:
            self.rating = 0.0


def copy_from_sqlite():
    """Read every row of the SQLite ``film_work`` table and pass it on.

    Each row is converted to a ``FilmWork`` instance by keyword, so the
    SQLite column names must match the dataclass field names. The resulting
    list is handed to ``save_film_work_to_postgres`` while the SQLite
    connection is still open.
    """
    with conn_context(db_path) as sqlite_conn:
        reader = sqlite_conn.cursor()
        reader.execute("SELECT * FROM film_work;")

        films = []
        for row in reader.fetchall():
            # sqlite3.Row -> dict -> keyword arguments of the dataclass.
            films.append(FilmWork(**dict(row)))

        save_film_work_to_postgres(films)

def save_film_work_to_postgres(films: list) -> None:
    """Insert the given film rows into the PostgreSQL table content.film_work.

    Connects with a hard-coded DSN, reads the column names of ``film_work``
    from information_schema, renders every film into a ``(...)`` VALUES
    group with ``cursor.mogrify`` and sends a single multi-row INSERT.

    Args:
        films: Dataclass instances; each one is flattened with ``astuple``,
            so its field order has to line up with the table's column order.

    psycopg2 errors are printed and not re-raised.

    NOTE(review): there is no ``conn.commit()`` before ``conn.close()``.
    Unless autocommit is enabled somewhere not shown here, the INSERT is
    discarded when the connection is closed, which matches the reported
    symptom (no error, no rows).
    NOTE(review): if ``psycopg2.connect`` raises, ``conn`` is never assigned
    and the ``finally`` block fails on the unbound name.
    """
    psycopg2.extras.register_uuid()
    dsn = {
        'dbname': 'movies_database',
        'user': 'app',
        'password': '123qwe',
        'host': 'localhost',
        'port': 5432,
        # Makes "content" the schema used for unqualified table names.
        'options': '-c search_path=content',
    }

    try:
        conn = psycopg2.connect(**dsn)
        print("Successfull connection!")

        with conn.cursor() as cursor:
            # Column names in table order; the query is not restricted to a schema.
            cursor.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = 'film_work' ORDER BY ordinal_position;")
            column_names_list = [row[0] for row in cursor.fetchall()]            
            column_names_str = ','.join(column_names_list)

            # One "%s" placeholder per column, e.g. "%s, %s, %s".
            col_count = ', '.join(['%s'] * len(column_names_list)) 
            # mogrify returns the placeholders with the values already bound and quoted (as bytes).
            bind_values = ','.join(cursor.mogrify(f"({col_count})", astuple(film)).decode('utf-8') for film in films)

            cursor.execute(f"""INSERT INTO content.film_work ({column_names_str}) VALUES {bind_values} """)
    except psycopg2.Error as _e:
        print("Ошибка:", _e)
    finally:
        if conn is not None:
            conn.close()
    

# Run the SQLite -> PostgreSQL copy as soon as this module is executed.
copy_from_sqlite()
1

There is 1 answer

2
Meher Khurana On

Check whether this code executes correctly. I made some changes: data formatting, field initialization, a fix for the column name mismatch, and a conn.commit() call after the INSERT so that the inserted rows are actually saved.

import sqlite3
import uuid
from contextlib import contextmanager
from dataclasses import astuple, dataclass, field
from datetime import datetime, timezone
from uuid import UUID

import psycopg2
from psycopg2 import extras

# Register psycopg2's UUID support so uuid.UUID values can be used as query parameters.
psycopg2.extras.register_uuid()
# Path of the source SQLite database file; passed to conn_context() by copy_from_sqlite().
db_path = 'db.sqlite'

@contextmanager
def conn_context(db_path: str):
    """Yield a SQLite connection and guarantee that it gets closed.

    The connection's ``row_factory`` is ``sqlite3.Row``, which allows
    access to columns by name and conversion of a row via ``dict(row)``.

    Args:
        db_path: Location of the SQLite database file.

    Yields:
        An open ``sqlite3.Connection``. It is closed on leaving the ``with``
        block, including when the block raises.
    """
    sqlite_conn = sqlite3.connect(db_path)
    sqlite_conn.row_factory = sqlite3.Row  # rows addressable by column name
    try:
        yield sqlite_conn
    finally:
        sqlite_conn.close()

@dataclass
class FilmWork:
    """One ``film_work`` row, with fields in the PostgreSQL column order.

    The field order (created, modified, id, title, description,
    creation_date, rating, type, file_path) mirrors the target table, so an
    instance can be flattened positionally into an INSERT.

    Any of ``creation_date``, ``created_at``, ``updated_at``,
    ``description`` and ``rating`` that is ``None`` after construction is
    replaced with a default in ``__post_init__``.
    """

    created_at: datetime = None
    updated_at: datetime = None
    id: UUID = field(default_factory=uuid.uuid4)
    title: str = ''
    description: str = ''
    creation_date: datetime = None
    rating: float = 0.0
    type: str = ''
    file_path: str = ''

    def __post_init__(self):
        """Replace ``None`` values with defaults."""
        # One timezone-aware timestamp for every defaulted field: the source
        # columns are "timestamp with time zone", and a single value keeps
        # created_at and updated_at identical for a freshly defaulted row.
        now = datetime.now(timezone.utc)
        if self.creation_date is None:
            self.creation_date = now
        if self.created_at is None:
            self.created_at = now
        if self.updated_at is None:
            self.updated_at = now
        if self.description is None:
            self.description = 'Нет описания'
        if self.rating is None:
            self.rating = 0.0

def copy_from_sqlite():
    """Load all ``film_work`` rows from SQLite and forward them for saving.

    Every row is turned into a ``FilmWork`` by keyword, so the SQLite column
    names have to match the dataclass field names. The list is passed to
    ``save_film_work_to_postgres`` before the SQLite connection is closed.
    """
    with conn_context(db_path) as sqlite_conn:
        reader = sqlite_conn.cursor()
        reader.execute("SELECT * FROM film_work;")
        rows = reader.fetchall()

        films = []
        for row in rows:
            # sqlite3.Row -> dict -> keyword arguments of the dataclass.
            films.append(FilmWork(**dict(row)))

        save_film_work_to_postgres(films)

def save_film_work_to_postgres(films: list):
    """Insert the given film rows into the PostgreSQL table content.film_work.

    The column list of the INSERT is read from information_schema for the
    ``content.film_work`` table, each film is flattened with ``astuple`` and
    the rows are sent with ``psycopg2.extras.execute_values``. The
    transaction is committed once all rows have been sent.

    Args:
        films: Dataclass instances whose field order matches the column
            order of ``content.film_work``. An empty list is a no-op.

    psycopg2 errors are printed and not re-raised; in that case nothing is
    committed.
    """
    if not films:
        # An INSERT with an empty VALUES list is a syntax error: nothing to do.
        return

    # NOTE(review): credentials are hard-coded; consider reading them from the
    # environment outside of a local exercise.
    dsn = {
        'dbname': 'movies_database',
        'user': 'app',
        'password': '123qwe',
        'host': 'localhost',
        'port': 5432,
        'options': '-c search_path=content',
    }

    # Bound before the try so the finally block is safe if connect() fails.
    conn = None
    try:
        conn = psycopg2.connect(**dsn)
        print("Successful connection!")

        with conn.cursor() as cursor:
            # Restrict to the "content" schema: a film_work table in another
            # schema would otherwise contribute its columns as well.
            cursor.execute(
                "SELECT column_name FROM information_schema.columns "
                "WHERE table_schema = %s AND table_name = %s "
                "ORDER BY ordinal_position;",
                ('content', 'film_work'),
            )
            column_names_str = ','.join(row[0] for row in cursor.fetchall())

            # mogrify/execute need a sequence of values, not a dataclass
            # instance, hence astuple(); execute_values expands the single %s
            # into the multi-row VALUES list.
            extras.execute_values(
                cursor,
                f"INSERT INTO content.film_work ({column_names_str}) VALUES %s",
                [astuple(film) for film in films],
            )

        # Without an explicit commit the INSERT is rolled back on close().
        conn.commit()
    except psycopg2.Error as _e:
        print("Ошибка:", _e)
    finally:
        if conn is not None:
            conn.close()

# Run the copy only when the file is executed as a script, so that importing
# the module (e.g. to reuse FilmWork) does not trigger a database copy.
if __name__ == "__main__":
    copy_from_sqlite()