Combining Spark schemas without duplicates?


To process my data, I extract the schema up front, so that when I read the dataset I can provide the schema instead of going through the expensive step of inferring it.

To construct that schema, I need to merge several different schemas into the final schema, so I have been using the union (++) and distinct methods, but I keep getting an org.apache.spark.sql.AnalysisException: Duplicate column(s) exception.

For example, say we have three schemas with the following structure:

import org.apache.spark.sql.types._

val schema1 = StructType(
  StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
  ), true) :: Nil)

val schema2 = StructType(
  StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
  ), true) :: Nil)

val schema3 = StructType(
  StructField("A", StructType(
    StructField("i", StringType, true) ::
    StructField("ii", StringType, true) :: Nil
  ), true) :: Nil)

val final_schema = (schema1 ++ schema2 ++ schema3).distinct

println(final_schema)

which outputs:

StructType(
    StructField(A,StructType(
        StructField(i,StringType,true)),true),
    StructField(A,StructType(
        StructField(i,StringType,true),
        StructField(ii,StringType,true)),true))

I understand that distinct only filters out fields whose structure exactly matches another. However, I want the result to look like this:

StructType(
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

in which all the fields get combined into one schema. I have sifted through all the methods in the Scala documentation, but I cannot seem to find the right one to solve this. Any ideas?

EDIT:

The end goal is to feed final_schema to sqlContext.read.schema and read an RDD of JSON strings with the read method.
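For reference, a minimal sketch of that read path, assuming the pre-2.0 API from the question and a hypothetical jsonRdd of type RDD[String] (distinct returns a Seq[StructField], so the fields are wrapped back into a StructType):

// Sketch only: jsonRdd is a hypothetical RDD[String] of JSON documents.
// distinct on a StructType yields a Seq[StructField], so re-wrap it first.
val df = sqlContext.read.schema(StructType(final_schema)).json(jsonRdd)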


There are 2 answers

Chobeat (BEST ANSWER)

Try something like this:

(schema1 ++ schema2 ++ schema3).groupBy(getKey).map(_._2.head)

where getKey is a function from a StructField to the properties you want to consider for merging (for example, the column name or the names of the subfields). In the map step you can take the head, or use a more elaborate function to keep a specific schema.
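For the nested case in the question, one elaboration of this idea is to use the field name as the key and merge each group recursively instead of taking the head. A minimal sketch (mergeFields is a hypothetical helper, and it assumes non-struct types never conflict between the inputs):

import org.apache.spark.sql.types._

// Hypothetical helper: group fields by name, then reduce each group,
// recursively merging nested StructTypes instead of picking one side.
def mergeFields(fields: Seq[StructField]): Seq[StructField] =
  fields.groupBy(_.name).values.map { group =>
    group.reduce { (a, b) =>
      (a.dataType, b.dataType) match {
        // Both fields are structs: merge their children recursively.
        case (sa: StructType, sb: StructType) =>
          a.copy(dataType = StructType(mergeFields(sa.fields ++ sb.fields)))
        // Otherwise keep the first field (assumes the types agree).
        case _ => a
      }
    }
  }.toSeq

val final_schema = StructType(mergeFields(schema1 ++ schema2 ++ schema3))

Applied to the three schemas above, this yields a single A struct containing both i and ii.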

Rituparno Behera

Spark with Scala:

// toSet keeps only one copy of each field that matches exactly
val consolidatedSchema = (test1Df.schema ++ test2Df.schema).toSet
val uniqueConsolidatedSchemas = StructType(consolidatedSchema.toSeq)

Spark with Java:

StructType consolidatedSchema = test1Df.schema().merge(test2Df.schema());