Glue PySpark script to delete records from a Hudi table without loading it into a DataFrame


I have a hudi table in S3, which is registered in the Glue Catalog. I wish to write a Glue pyspark job to delete all records that have a certain value in one of the fields.

The code samples I managed to find all start by loading the table into a DataFrame, then applying a filter and eventually writing back to the table.

Here's a sample code to describe the desired outcome:

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import Filter
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Create a GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)

# Get the arguments passed to the script and initialize the job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Specify the Hudi table path on Amazon S3
hudi_table_path = "s3://your-s3-bucket/hudi-table-path"

# Value to filter records on
value_to_delete = "your_value"

# Load the Hudi table from the Glue Catalog as a dynamic frame
hudi_dyf = glueContext.create_dynamic_frame.from_catalog(database="your-database-name", table_name="your-table-name")

# Keep only the records that do NOT have the specified value
filtered_dyf = Filter.apply(frame=hudi_dyf, f=lambda x: x["your_field_name"] != value_to_delete)

# Write the filtered records back to the table
glueContext.write_dynamic_frame.from_catalog(frame=filtered_dyf, database="your-database-name", table_name="your-table-name", transformation_ctx="datasink")

# Commit the job
job.commit()

I'm wondering if there's a way to do it without loading the entire table into memory - something like a DELETE command in SQL. My concern is performance and cost for tables with a huge number of records.

1 Answer

Answered by Shubham Joshi:

Below is another way to perform a hard delete with the help of Spark SQL, where you do not need to create a DataFrame on the full data and then apply a filter transformation. This is the sample code I used in my job to perform a hard delete on a test table where emp_id = 1:

# Select only the records to delete; the predicate is pushed down, so the full table is not scanned
hard_delete_df = spark.sql("SELECT * FROM mydb.emptable WHERE emp_id = '1'")
hard_delete_df.show()

# Reuse the existing Hudi write options, switching the write operation to 'delete'
hudi_options['hoodie.datasource.write.operation'] = 'delete'
hard_delete_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)
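
The snippet assumes hudi_options and final_base_path are already defined for the table. A minimal sketch of what they might look like (the table name, record key, precombine field, and path are placeholders):

# Minimal Hudi write options the snippet above relies on (placeholder values)
hudi_options = {
    'hoodie.table.name': 'emptable',
    'hoodie.datasource.write.table.name': 'emptable',
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.precombine.field': 'ts',
}
final_base_path = "s3://your-s3-bucket/hudi-table-path/"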

Also, you can use Spark SQL directly to perform the delete; refer to the official Hudi documentation:

DELETE FROM hudi_table WHERE uuid = '3f3d9565-7261-40e6-9b39-b8aa784f95e2';
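
For this to work inside a Glue PySpark job, Hudi's SQL extensions have to be enabled on the SparkSession. A rough sketch, assuming Glue 4.0 (Spark 3.3) with the Hudi data lake format enabled and placeholder database/table names:

from pyspark.sql import SparkSession

# Hudi's SQL extensions are required for DELETE statements
# (class names as documented by Hudi for Spark 3.2+)
spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .getOrCreate()

# Runs as a Hudi delete: only the file groups containing matching records
# are rewritten, so the whole table is never loaded into memory
spark.sql("DELETE FROM mydb.emptable WHERE emp_id = '1'")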

Hope this helps!