I want Spark to ignore bad records while saving into database


I am saving rows to a database using Spark JDBC. Saving the data works fine.

Issue: Spark aborts the save if it encounters any bad records (e.g. a null value in a column where the table expects a non-null value).

What I want: I want Spark to ignore the bad rows and move on to save the next row. How can this be achieved? I don't see much in the documentation. Using StructType is not an option.

Any pointers?

My code looks like this.

class DatabaseWriter {

  def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession): Unit = {
    val dbProperties = getSQLProperties(sparkSession, configurationProp)

    dataFrameTobeWritten.write.mode(SaveMode.Overwrite)
        .option("driver", dbProperties.driverName)
        .option("truncate", "true")
        .option("batchsize", configurationProp.WriterBatchSize())
        .jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
  }
}

1 Answer

FrancescoM

Add a list of not-null columns as a method parameter and use it to build a filter condition that drops the bad rows before writing:

class DatabaseWriter {

  def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession, notNullColumns: List[String]): Unit = {
    val dbProperties = getSQLProperties(sparkSession, configurationProp)

    // Build a SQL condition such as "col1 IS NOT NULL AND col2 IS NOT NULL"
    val filterCondition = notNullColumns.map(c => s"$c IS NOT NULL").mkString(" AND ")

    // Drop the rows that would violate the not-null constraints, then write as before
    dataFrameTobeWritten.filter(filterCondition).write.mode(SaveMode.Overwrite)
      .option("driver", dbProperties.driverName)
      .option("truncate", "true")
      .option("batchsize", configurationProp.WriterBatchSize())
      .jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
  }
}
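
For what it's worth, a call site might look like the sketch below. The SparkSession setup, input path, schema name ("dbo"), table name ("orders"), and the not-null column names are all hypothetical; they only illustrate how the notNullColumns list is passed in.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("writer-example").getOrCreate()

// Hypothetical input; replace with however you build your DataFrame
val ordersDf = spark.read.parquet("/path/to/orders")

// "customer_id" and "order_date" are assumed to be NOT NULL columns in the target table
new DatabaseWriter().writeData(ordersDf, "dbo", "orders", spark, List("customer_id", "order_date"))

If you prefer not to build a SQL string, the same filter can be expressed with the Column API, e.g. notNullColumns.map(c => col(c).isNotNull).reduce(_ && _) (with import org.apache.spark.sql.functions.col), which avoids quoting issues with unusual column names.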