Update a highly nested column from string to struct

312 views Asked by At
 |-- x: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- y: long (nullable = true)
 |    |    |-- z: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- log: string (nullable = true)

I have the above nested schema where I want to change column z's log from string to struct.

 |-- x: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- y: long (nullable = true)
 |    |    |-- z: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- log: struct (nullable = true)
 |    |    |    |    |    |    |-- b: string (nullable = true)
 |    |    |    |    |    |    |-- c: string (nullable = true)

I'm not using Spark 3 but Spark 2.4.x. Will prefer Scala way but python works too since this is a one time manual thing to backfill some past data.

Is there a way to do this with some udf or any other way?

I know it's easy to do this via from_json but the nested array of struct is causing issues.

2

There are 2 answers

0
wwnde On

Higher Order functions are your friend in this case. Coalesce basically. Code below

 df = df.withColumn('x', F.expr('transform(x, e -> struct(e.y as y, array(struct(coalesce(("1" as a,"2" as b)) as log))as z))')).printSchema()

root
 |-- x: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- y: long (nullable = true)
 |    |    |-- z: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- log: struct (nullable = false)
 |    |    |    |    |    |-- a: string (nullable = false)
 |    |    |    |    |    |-- b: string (nullable = false)
3
ZygD On

I think it depends on the values in your log column. I mean, the way you want to split the string into 2 separate fields.

The following PySpark code will just "move" your log values to b and c fields.

# Example data:

schema = (
    T.StructType([
        T.StructField('x', T.ArrayType(T.StructType([
            T.StructField('y', T.LongType()),
            T.StructField('z', T.ArrayType(T.StructType([
                T.StructField('log', T.StringType())
            ]))),
        ])))
    ])
)
df = spark.createDataFrame([
    [
        [[
            9,
            [[
                'text'
            ]]
        ]]
    ]
], schema)

df.printSchema()
# root
#  |-- x: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- y: long (nullable = true)
#  |    |    |-- z: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- log: string (nullable = true)
df = df.withColumn('x', F.expr('transform(x, e -> struct(e.y as y, array(struct(struct(e.z.log[0] as b, e.z.log[0] as c) as log)) as z))'))

df.printSchema()
# root
#  |-- x: array (nullable = true)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- y: long (nullable = true)
#  |    |    |-- z: array (nullable = false)
#  |    |    |    |-- element: struct (containsNull = false)
#  |    |    |    |    |-- log: struct (nullable = false)
#  |    |    |    |    |    |-- b: string (nullable = true)
#  |    |    |    |    |    |-- c: string (nullable = true)

If string transformations are needed on log column, e.z.log[0] parts need to be changed to include string transformations.