AWS Glue: Column "column_name" not found in schema


I'm trying to create an ETL job in AWS Glue. The use case is as follows: a column gets added to one of the source tables after the ETL job has already run. When I rerun the ETL job, it fails, saying the column is not found (in the target table).

How can I make the ETL job create that column in the target table? The job already has the rights to create the table when it doesn't exist.

Example:

Source Table:

Table X: column_1, column_2

Table Y: column_1, column_3, column_4

The ETL job is configured to join both of them, resulting in:

Table_XY: column_1, column_2, column_3, column_4

Up to this point it works perfectly.
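The merged schema from the example above can be illustrated with a plain-Python sketch of the inner join (column names are from the example; the data values and the helper name `inner_join` are made up for illustration):

```python
def inner_join(rows_x, rows_y, key):
    """Inner-join two lists of dict rows on a shared key column."""
    index = {row[key]: row for row in rows_y}
    joined = []
    for row in rows_x:
        match = index.get(row[key])
        if match is not None:
            # Merging the two dicts yields the union of both schemas.
            joined.append({**row, **match})
    return joined

table_x = [{"column_1": 1, "column_2": "a"}]
table_y = [{"column_1": 1, "column_3": "b", "column_4": "c"}]

table_xy = inner_join(table_x, table_y, "column_1")
print(sorted(table_xy[0]))  # ['column_1', 'column_2', 'column_3', 'column_4']
```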

Now, if Table Y gets modified as below:

Table Y: column_1, column_3, column_4, **column_5**

And I rerun the crawlers (which detect the new column in the source),

then rerun the ETL job, it fails with the error message below:

Column "column_5" not found in schema

How can I solve this?
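One workaround I've been considering (an assumption on my part, not a built-in Glue feature) is to compare the frame's schema against the target table's columns before the write, and issue an `ALTER TABLE ... ADD COLUMN` on the target for anything new. A minimal, database-agnostic sketch of the diff logic — the function name and the type strings are hypothetical:

```python
def generate_alter_statements(table, source_columns, target_columns):
    """Return ALTER TABLE statements for columns present in the
    source schema but missing from the target table.

    source_columns: dict of column name -> SQL type (from the frame schema)
    target_columns: set of column names already on the target
    """
    statements = []
    for name, sql_type in source_columns.items():
        if name not in target_columns:
            statements.append(
                f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}"
            )
    return statements

stmts = generate_alter_statements(
    "target_table_xy",
    {"column_1": "INT", "column_5": "VARCHAR(255)"},
    {"column_1", "column_2", "column_3", "column_4"},
)
print(stmts)
# ['ALTER TABLE target_table_xy ADD COLUMN column_5 VARCHAR(255)']
```

The statements would then need to be executed against the target database (e.g. over the same JDBC connection) before `write_dynamic_frame` runs.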

Updated with Glue Script:

```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "db_source", table_name = "sourc_table_x", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_source", table_name = "sourc_table_x", transformation_ctx = "datasource0")

## @type: DataSource
## @args: [database = "db_source", table_name = "sourc_table_y", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource1"]
## @return: datasource1
## @inputs: []
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_source", table_name = "sourc_table_y", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource1")

## @type: Join
## @args: [keys1 = ['column_1'], keys2 = ['column_1']]
## @return: join2
## @inputs: [frame1 = datasource0, frame2 = datasource1]
join2 = Join.apply(frame1 = datasource0, frame2 = datasource1, keys1 = ['column_1'], keys2 = ['column_1'], transformation_ctx = "join2")

## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = join2]
resolvechoice2 = ResolveChoice.apply(frame = join2, choice = "make_cols", transformation_ctx = "resolvechoice2")

## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [catalog_connection = "my-db-connection", connection_options = {"dbtable": "target_table_xy", "database": "db_target"}, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "my-db-connection", connection_options = {"dbtable": "target_table_xy", "database": "db_target"}, transformation_ctx = "datasink4")
job.commit()
```
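If altering the target is not an option, the opposite workaround would be to drop the new source columns before the write so the frame matches the existing target schema — in Glue that would presumably be a `SelectFields.apply(frame = dropnullfields3, paths = [...])` step inserted before the sink. A plain-Python sketch of the projection logic (column names from the example; `project` is a made-up helper mirroring what `SelectFields` does on a DynamicFrame):

```python
def project(rows, keep):
    """Keep only the listed columns in each dict row, dropping any
    column not present in the target schema."""
    return [{k: row[k] for k in keep if k in row} for row in rows]

rows = [{"column_1": 1, "column_3": "b", "column_4": "c", "column_5": "new"}]
target_cols = ["column_1", "column_2", "column_3", "column_4"]

print(project(rows, target_cols))
# [{'column_1': 1, 'column_3': 'b', 'column_4': 'c'}]
```

The downside, of course, is that column_5 would silently never reach the target table.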