I'm trying to create an ETL job in AWS Glue. The use-case is as follows: When a column gets added in one of the source table after running ETL job, and when we try to re run the etl job, the etl job fails saying column not found (in target table)
How can I enable ETL to create that column in target table. Because ETL already has right to create table when it doesn't exist.
Example:
Source Table:
Table X: column_1, column_2
Table Y: column_1, column_3, column_4
ETL Job Configured to join both of them resulting into
Table_XY: column_1, column_2, column_3, column_4
Until this it works perfectly.
Now if Table Y gets modifies as below
Table Y: column_1, column_3, column_4, **column_5**
And I rerun crawlers (which detects column on source)
Then I rerun the ETL job, it fails with below error message
Column "column_5" not found in schema
How can I solve this?
Updated with Glue Script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "db_source", table_name = "sourc_table_x", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_source", table_name = "sourc_table_x", transformation_ctx = "datasource0")
## @type: DataSource
## @args: [database = "db_source", table_name = "sourc_table_y", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource1"]
## @return: datasource1
## @inputs: []
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_source", table_name = "sourc_table_y", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource1")
## @type: Join
## @args: [keys1 = ['column_1'], keys2 = ['column_1']]
## @return: join2
## @inputs: [frame1 = datasource0, frame2 = datasource1]
join2 = Join.apply(frame1 = datasource0, frame2 = datasource1, keys1 = ['column_1'], keys2 = ['column_1'], transformation_ctx = "join2")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = join2]
resolvechoice2 = ResolveChoice.apply(frame = join2, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "my-db-connection", connection_options = {"dbtable": "target_table_xy", "database": "db_target"}, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "my-db-connection", connection_options = {"dbtable": "target_table_xy", "database": "db_target"}, transformation_ctx = "datasink4")
job.commit()