Best way to process Redshift data on Spark (EMR) via Airflow MWAA?


We have an Airflow MWAA cluster and a huge volume of data in our Redshift data warehouse. We currently process the data directly in Redshift (with SQL), but given the amount of data, this puts a lot of pressure on the data warehouse and it is becoming less and less resilient. A potential solution we found would be to decouple the data storage (Redshift) from the data processing (Spark). First of all, what do you think about this solution? To do this, we would like to use Airflow MWAA and Spark SQL to:

  • Transfer data from Redshift to Spark
  • Run the SQL scripts that were previously executed in Redshift
  • Transfer the newly created table from Spark to Redshift

Is this a use case that someone here has already put into production?

What would, in your opinion, be the best way to interact with the Spark cluster: EmrAddStepsOperator or PythonOperator + PySpark?


1 answer

Hussein Awala:

You can use one of these two connectors (a minimal setup sketch follows the list):

  • spark-redshift connector: an open-source connector developed and maintained by Databricks
  • EMR spark-redshift connector: developed by AWS and based on the first one, but with some improvements (github)
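
Below is a minimal sketch of wiring the open-source connector into a PySpark session. The application name and Maven coordinates are placeholders (pick the artifact matching your Spark/Scala version), and recent EMR releases already bundle the AWS version of the connector, so this step may not be needed there:

from pyspark.sql import SparkSession

# Minimal sketch with placeholder coordinates; the Redshift JDBC driver must
# also be available on the classpath for the connector to work.
spark = (
    SparkSession.builder
    .appName("redshift-transform")  # hypothetical job name
    .config("spark.jars.packages",
            "com.databricks:spark-redshift_2.11:3.0.0-preview1")  # placeholder coordinates
    .getOrCreate()
)

# The snippets below call `sql_context.read`; with Spark 2+ the session itself exposes the same API.
sql_context = spark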

To load data from Redshift into Spark, you can read a table and process it in Spark:

# Read a whole Redshift table: the connector unloads it to S3 (tempdir) and loads it into Spark.
# S3 authentication must also be configured (e.g. via the connector's aws_iam_role
# or forward_spark_s3_credentials option).
df = sql_context.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
    .option("dbtable", "my_table") \
    .option("tempdir", "s3a://path/for/temp/data") \
    .load()

Or take advantage of Redshift for part of your processing by reading from a query result (you can filter, join or aggregate your data in Redshift before loading it into Spark):

# Push part of the work down to Redshift by reading the result of a query instead of a full table.
df = sql_context.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
    .option("query", "select x, count(*) from my_table group by x") \
    .option("tempdir", "s3a://path/for/temp/data") \
    .load()

You can do whatever you want with the loaded dataframe, and you can store the result in another data store if needed. You can use the same connector to load the result (or any other dataframe) into Redshift:

# Write the dataframe back to Redshift: Spark stages the data in S3 (tempdir), then Redshift copies it in.
# mode("error") fails if the target table already exists; use "append" or "overwrite" to change that.
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "my_table_copy") \
  .option("tempdir", "s3a://path/for/temp/data") \
  .mode("error") \
  .save()

P.S.: the connector is fully supported by Spark SQL, so you can add the dependencies to your EMR cluster and then use the SparkSqlOperator to extract, transform and re-load your Redshift tables (see the SQL syntax example in the connector documentation), or the SparkSubmitOperator if you prefer Python/Scala/Java jobs.
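
For the Airflow side, here is a minimal MWAA DAG sketch using SparkSubmitOperator. The DAG id, schedule, script location, connection id and package coordinates are all placeholders; it assumes the PySpark job above is stored on S3 and that a Spark connection pointing at the EMR cluster is configured:

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Minimal sketch: ids, paths and coordinates below are placeholders.
with DAG(
    dag_id="redshift_spark_transform",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    transform = SparkSubmitOperator(
        task_id="run_redshift_transform",
        # PySpark job containing the read/transform/write logic shown above
        application="s3://my-bucket/scripts/redshift_transform.py",
        conn_id="spark_default",  # Spark connection pointing at the EMR cluster
        packages="com.databricks:spark-redshift_2.11:3.0.0-preview1",  # placeholder coordinates
        verbose=True,
    )

If you prefer to keep everything on EMR, the same spark-submit command can also be submitted as an EMR step with EmrAddStepsOperator, the other option mentioned in the question.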