Spark collect_list change data_type from array to string

1.8k views Asked by At

I am having a following aggregation

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
         collect_list(array($"b",$"c",$"data2")).alias("final_data2"))

Here I am doing some aggregation and collecting the result with collect_list. Earlier we were using spark 1 and it was giving me below data types.

 |-- final_data1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- final_data2: array (nullable = true)
 |    |-- element: string (containsNull = true)

Now we have to migrate to spark 2 but we are getting below schema.

|-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

On getting first() record below is the difference

spark 1.6

[2020-09-26, Ayush, 103.67] => datatype string

spark 2 

WrappedArray(2020-09-26, Ayush, 103.67)

How can I keep the same data type?

Edit - Tried Using Concat

One way I got exact schema like Spark 1.6 is by using concat like this

val df_date_agg = df
    .groupBy($"msisdn",$"event_date",$"network")
    .agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
    .groupBy($"msisdn")
    .agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit(","),lit($"data_mou_dly"),lit("]")))

Will it affect my code performance?? Is there a better way to do this?

2

There are 2 answers

0
Oli On BEST ANSWER

Since you want a string representation of an array, how about casting the array into a string like this?

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1") cast "string").alias("final_data1"),
         collect_list(array($"b",$"c",$"data2") cast "string").alias("final_data2"))

It might simply be what your old version of spark was doing.

The solution you propose would probably work as well by the way but wrapping your column references with lit is not necessary (lit($"event_date")). $"event_date" is enough.

1
Nihad TP On

Fllttening final1 and final2 columns would fix this problem.

val data = Seq((1,"A", "B"), (1, "C", "D"), (2,"E", "F"), (2,"G", "H"), (2,"I", "J"))

val df = spark.createDataFrame(
  data
).toDF("col1", "col2", "col3")

val old_df = df.groupBy(col("col1")).agg(
    collect_list(
        array(
            col("col2"), 
            col("col3")
            )
    ).as("final")
    )
val new_df = old_df.select(col("col1"), flatten(col("final")).as("final_new"))
println("Input Dataframe")

df.show(false)
println("Old schema format")
old_df.show(false)
old_df.printSchema()

println("New schema format")
new_df.show(false)
new_df.printSchema()

Output:

Input Dataframe
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |A   |B   |
|1   |C   |D   |
|2   |E   |F   |
|2   |G   |H   |
|2   |I   |J   |
+----+----+----+

Old schema format
+----+------------------------+
|col1|final                   |
+----+------------------------+
|1   |[[A, B], [C, D]]        |
|2   |[[E, F], [G, H], [I, J]]|
+----+------------------------+

root
 |-- col1: integer (nullable = false)
 |-- final: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

New schema format
+----+------------------+
|col1|final_new         |
+----+------------------+
|1   |[A, B, C, D]      |
|2   |[E, F, G, H, I, J]|
+----+------------------+

root
 |-- col1: integer (nullable = false)
 |-- final_new: array (nullable = true)
 |    |-- element: string (containsNull = true)

In you specefic case

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
         collect_list(array($"b",$"c",$"data2")).alias("final_data2"))
         .select(flatten(col("final_data1").as("final_data1"), flatten(col("final_data2).as("final_data2))