Cannot write Lake Formation governed table data from Glue ETL Job


I am building a POC with Lake Formation in which I read a queue of train movement information and persist the individual events into a governed table using AWS Data Wrangler. This works fine.

Then I am trying to read this governed table with an AWS Glue ETL job and write the resulting data into another governed table. This succeeds and writes parquet files into the S3 bucket/folder underlying that table, but when I try to query the data it isn't readable from Athena (an Athena query just returns no records).

I created the journey table using this AWS Data Wrangler statement:

import awswrangler as aw

aw.catalog.create_parquet_table(database = "train_silver", 
                            table = "journey", 
                            path = "s3://train-silver/journey/",
                            columns_types = {
                                'train_id': 'string',
                                'date': 'date',
                                'stanox': 'string',
                                'start_timestamp': 'timestamp',
                                'created': 'timestamp',
                                'canx_timestamp': 'bigint'
                            },
                            compression = "snappy",
                            partitions_types = {'segment_date': 'date'},
                            table_type = "GOVERNED")

Here's the code for the Glue job:

import sys
from datetime import datetime

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_timestamp, udf

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

logger.info('About to start transaction')

tx_id = glueContext.start_transaction(False)

bronze_table = glueContext.create_dynamic_frame.from_catalog(database = "train_bronze", table_name = "train_movements_governed", 
    additional_options = { "transactionId": tx_id })
logger.info('About to save the bronze table to a view')
bronze_table.toDF().registerTempTable("train_movements")

max_journey_timestamp = 0

journey_df = spark.sql("""
    SELECT train_id, loc_stanox as stanox, CAST(canx_timestamp as bigint) AS canx_timestamp, segment_date
    FROM train_movements
    WHERE canx_type = 'AT ORIGIN'
    AND cast(canx_timestamp AS bigint) > {}""".format(max_journey_timestamp))

journey_df = journey_df.withColumn("created",current_timestamp())

def date_from_timestamp(timestamp_int):
    return datetime.fromtimestamp(int(timestamp_int) / 1000.0).date()
date_UDF = udf(lambda z: date_from_timestamp(z))

def date_time_from_timestamp(timestamp_int):
    return datetime.fromtimestamp(int(timestamp_int) / 1000.0)
date_time_UDF = udf(lambda z: date_time_from_timestamp(z))

journey_df = journey_df.withColumn("date", date_UDF(col("canx_timestamp")))
journey_df = journey_df.withColumn("start_timestamp", date_time_UDF(col("canx_timestamp")))
journey_df.printSchema()

try:
    save_journey_frame = DynamicFrame.fromDF(journey_df, glueContext, "journey_df")
    logger.info('Saving ' + str(save_journey_frame.count()) + ' new journeys')
    journeySink = glueContext.write_dynamic_frame.from_catalog(frame = save_journey_frame, database = "train_silver", table_name = "journey", 
        additional_options = { "callDeleteObjectsOnCancel": True, "transactionId": tx_id })
    logger.info('Committing transaction')
    glueContext.commit_transaction(tx_id)
    logger.info('Transaction committed')
except Exception:
    glueContext.cancel_transaction(tx_id)
    raise
logger.info('Committing the job')
job.commit()

When the Glue job is run, there are parquet files in the table folder, but they aren't organized into the partition folders defined by my table definition. [Screenshot: journey folder containing parquet files but no partition folders]

I also tried writing a Glue job that reads the parquet files in that folder; they contain all the rows they should.
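
A minimal sketch of that kind of check, reading the files directly from S3 rather than through the catalog:

# Read the parquet files directly from S3, bypassing the Lake Formation catalog,
# to confirm the rows are physically present in the files the Glue job wrote.
raw_df = spark.read.parquet("s3://train-silver/journey/")
print(raw_df.count())
raw_df.show(5)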

Here's a screenshot of me trying to query the data in Athena: [Screenshot: SELECT query in Athena scanning no data and returning no rows]

What am I missing here? How do I get the data added to the governed table from a Spark Glue job so that I can query it from Athena?

1 Answer

Answered by Tasio

I think the problem is that the list of objects registered on the governed table is not being updated.

You can check that using this AWS CLI command:

aws lakeformation get-table-objects --database-name train_silver --table-name journey
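
The same check from Python with boto3 (a sketch; it assumes credentials with the relevant Lake Formation permissions):

import boto3

lakeformation = boto3.client("lakeformation")

# List the S3 objects that Lake Formation currently associates with the governed table.
# If the parquet files written by the Glue job are missing here, they were written to S3
# but never registered against the table, which is why Athena returns no rows.
response = lakeformation.get_table_objects(
    DatabaseName="train_silver",
    TableName="journey",
)
for partition in response.get("Objects", []):
    print(partition.get("PartitionValues"), [obj["Uri"] for obj in partition.get("Objects", [])])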

From the Format Options for ETL Inputs and Outputs page in the AWS Glue documentation:

For writing Apache Parquet, AWS Glue ETL only supports writing to a governed table by specifying an option for a custom Parquet writer type optimized for Dynamic Frames. When writing to a governed table with the parquet format, you should add the key useGlueParquetWriter with a value of true in the table parameters.
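
You can check what your table currently has by inspecting its parameters, for example with boto3 (a sketch using the table names from the question):

import boto3

glue = boto3.client("glue")

# Inspect the journey table's parameters; for parquet writes from Glue ETL to a governed
# table you want either "useGlueParquetWriter": "true" or "classification": "glueparquet".
table = glue.get_table(DatabaseName="train_silver", Name="journey")["Table"]
print(table.get("Parameters", {}))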

You can alternatively set the classification parameter of your table to "glueparquet" when you create it (you can also update this):

aw.catalog.create_parquet_table(database = "train_silver", 
                        table = "journey", 
                        path = "s3://train-silver/journey/",
                        columns_types = {
                            'train_id': 'string',
                            'date': 'date',
                            'stanox': 'string',
                            'start_timestamp': 'timestamp',
                            'created': 'timestamp',
                            'canx_timestamp': 'bigint'
                        },
                        compression = "snappy",
                        parameters={
                            "classification": "glueparquet"
                        },
                        partitions_types = {'segment_date': 'date'},
                        table_type = "GOVERNED")