Consider the code:
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object JsonAwsSchemaExample {

  /**
   * Reads JSON data from S3 with an explicit schema in PERMISSIVE mode and
   * prints the first rows.
   *
   * NOTE(review): when reading data files, Spark forces every field of a
   * user-provided schema to nullable, so the `nullable = false` flag on
   * `myErrorField` below is silently ignored. `_corrupt_record` is only
   * populated for rows that are not parseable JSON — a well-formed row that
   * merely lacks `myErrorField` produces a null, never a corrupt-record
   * entry. To reject such rows, validate after the read (e.g. filter on
   * `myErrorField IS NULL`) or switch the reader to FAILFAST mode.
   */
  def main(args: Array[String]): Unit = {
    // Resolve AWS credentials via the default chain
    // (env vars, system properties, profile, instance metadata, ...).
    val awsCredentials = new DefaultAWSCredentialsProviderChain().getCredentials

    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("spark-example")
      .getOrCreate()

    // Hoist the Hadoop configuration instead of re-fetching it per setting,
    // and wire the resolved credentials into the s3a connector.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.access.key", awsCredentials.getAWSAccessKeyId)
    hadoopConf.set("fs.s3a.secret.key", awsCredentials.getAWSSecretKey)
    hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")
    // v2 committer: commits task output by rename-per-file, faster on S3.
    hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

    // Explicit read schema; `_corrupt_record` must be declared here for
    // PERMISSIVE mode to capture unparseable rows into it.
    val schema = StructType(Array(
      StructField("_corrupt_record", StringType, true),
      // nullable = false is ignored on file reads — see note above.
      StructField("myErrorField", StringType, false),
      StructField("dataType", StringType, true)
    ))

    spark.read
      .schema(schema)
      // Dropped .option("enforceSchema", true): that option belongs to the
      // CSV source only and is a no-op for the JSON reader.
      .option("mode", "PERMISSIVE")
      .json("s3a://my-bucket/my-table")
      .show(20, false)
  }
}
This gives me an output like:
+---------------+------------+------------+
|_corrupt_record|myErrorField|dataType |
+---------------+------------+------------+
|null |null |notification|
|null |null |notification|
|null |null |notification|
|null |null |notification|
|null |null |notification|
|null |null |notification|
|null |null |notification|
|null |null |notification|
|null |null |notification|
|null |null |notification|
+---------------+------------+------------+
The myErrorField column
is declared with nullable
set to false, but Spark does not place any records in the _corrupt_record
column. Is there a way to force this?