Here is how I have things set up:
- I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
- I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also responsible for mapping the columns and creating the Redshift table.
By re-running the job, I am getting duplicate rows in Redshift (as expected). However, is there a way to replace or delete rows before inserting the new data, using a key or the partitions set up in Glue?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields
from pyspark.sql.functions import lit
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
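# Each mapping tuple is (source column, source type, target column, target type)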
columnMapping = [
("id", "int", "id", "int"),
("name", "string", "name", "string"),
]
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource1")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")
## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")
job.commit()
This was the solution I got from AWS Glue Support:
As you may know, although you can create primary keys, Redshift doesn't enforce uniqueness. Therefore, if you rerun a Glue job, duplicate rows can get inserted. Some of the ways to maintain uniqueness are listed below; a rough sketch of each option follows the list:
1. Use a staging table to insert all rows and then perform an upsert/merge [1] into the main table; this has to be done outside of Glue.
2. Add another column to your Redshift table [2], such as an insert timestamp, so duplicates are allowed but you know which row came first or last, and delete the duplicates afterwards if you need to.
3. Load the previously inserted data into a dataframe and then compare it with the data to be inserted, to avoid inserting duplicates [3].
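To make option 1 concrete, here is a minimal sketch of the staging-table merge from [1]. It assumes the Glue job is pointed at a staging table (e.g. "dbtable": "table01_staging" in the connection options) and that the merge then runs against Redshift outside of Glue, for example with psycopg2. The table names, the key column ("id"), and the connection details are placeholders, not values from an actual setup:

import psycopg2

# Delete the rows in the main table that the staging load replaces, then copy
# the staging rows over and drop the staging table.
MERGE_SQL = """
DELETE FROM table01
USING table01_staging
WHERE table01.id = table01_staging.id;

INSERT INTO table01
SELECT * FROM table01_staging;

DROP TABLE table01_staging;
"""

conn = psycopg2.connect(
    host="example-cluster.abc123.us-east-1.redshift.amazonaws.com",  # placeholder
    port=5439,
    dbname="db01",
    user="example_user",          # placeholder
    password="example_password",  # placeholder
)
try:
    with conn:  # commits on success, rolls back on error
        with conn.cursor() as cur:
            cur.execute(MERGE_SQL)
finally:
    conn.close()

If your Glue version's Redshift writer accepts "preactions"/"postactions" keys in connection_options, the same DELETE/INSERT statements can instead be attached to the write itself.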
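For option 2, the only change inside the Glue job is stamping each row with a load timestamp; the cleanup is a dedup query run against Redshift afterwards (for example the same way as the merge above). The column name insert_ts and the dedup SQL below are just one common pattern, not something prescribed by the support answer:

from pyspark.sql.functions import lit, current_timestamp

# Replaces the existing withColumn('platform', ...) line in the job: tag every
# row with the time it was loaded so rows from different reruns can be told apart.
data1 = df1.withColumn('platform', lit('test')) \
           .withColumn('insert_ts', current_timestamp())

# Later, in Redshift (outside Glue), keep only the newest copy of each id:
DEDUP_SQL = """
CREATE TEMP TABLE latest AS
SELECT id, MAX(insert_ts) AS max_ts
FROM table01
GROUP BY id;

DELETE FROM table01
USING latest
WHERE table01.id = latest.id
  AND table01.insert_ts < latest.max_ts;
"""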
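For option 3, one way to compare against what is already in Redshift is to read the existing keys back into a dataframe and drop them from the new batch with a left anti join before the write step. This would slot in between the DynamicFrame.fromDF call and the write in the job above; it assumes a catalog table that points at the Redshift table ("table01_redshift" is a made-up name) and that "id" is the key:

# Read the ids that are already present in the Redshift table, via a catalog
# entry that points at Redshift ("table01_redshift" is a placeholder name).
existing_ids = glueContext.create_dynamic_frame.from_catalog(
    database = "db01",
    table_name = "table01_redshift",
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "existing_ids"
).toDF().select("id")

# Keep only rows whose id is not already in the target (left anti join), then
# convert back to a DynamicFrame so the existing write step can be reused.
new_rows = data1.toDF().join(existing_ids, on = "id", how = "left_anti")
data1 = DynamicFrame.fromDF(new_rows, glueContext, "data_dedup1")

Note that this only guards against keys that already exist in Redshift, and it re-reads the full id set on every run, which may be slow for large tables.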
[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html and http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/
[2] - https://github.com/databricks/spark-redshift/issues/238
[3] - https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html