I'm trying to implement a datalake via python with Trino, Dagster as well as openpyxl and I am trying to initialize a table inside a catalog and a schema, (both defined in my config). My first query: " Create schema if not exists example" works whereas the second one does not. Could this be a problem with the transaction ending, so then my 2nd query is not taken into account?
I've tried not closing the cursor before executing the query with no avail, also tried inputting incorrect information inside of the query so that it would force an exception, however it did not trigger it.
Here is a code snippet where lista contains a list with tuples of type (name, type):
@op(required_resource_keys={'trino'})
def init(context,lista):
trino = context.resources.trino
with trino.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""CREATE SCHEMA if not exists my_catalog.example""")
try:
columns_definition = ', '.join([f'{col[0]} {col[1]}' for col in lista])
query = f'''CREATE TABLE if not exists ex1 ({columns_definition})'''
cursor.execute(query)
context.log.info(f'Table created with columns: {columns_definition}')
conn.commit()
except Exception as e:
conn.rollback()
context.log.error(f'Error creating table: {e}')
return []
@repository
def workspace():
config = {
"resources": {
"trino": {
"config": {
"host": "trino",
"port": "8060",
"user": "trino",
"password": "",
"catalog": "my_catalog",
"schema": "example"
}
}
}
}
resource_config = config.get("resources", {}).get("trino", {}).get("config", {})
with build_op_context(resources={'trino':trino_resource.configured(resource_config)}) as con:
return [init(con,read_files_op(con))]