I have the following data in a PySpark dataframe, where the val column contains JSON data stored as a string.
```python
data = [(123, '[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"},{"FLD_NAME":"C","FLD_VAL":"0.3"},{"FLD_NAME":"D","FLD_VAL":"0.4"}]')]
ar = spark.createDataFrame(data, ['id', 'val'])
```
| id | val |
|---|---|
| 123 | [{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"},{"FLD_NAME":"C","FLD_VAL":"0.3"},{"FLD_NAME":"D","FLD_VAL":"0.4"}] |
Now, my aim is to transform the string data in the val column into a dictionary (a map column). For example:
```json
{
  "A": 0.1,
  "B": 0.2,
  "C": 0.3,
  "D": 0.4
}
```
So the resulting data would look like the following:
| id | val |
|---|---|
| 123 | {"A": 0.1, "B": 0.2, "C": 0.3,"D": 0.4} |
Note: I also need to convert the FLD_VAL values from string to decimal.
I have tried the following code:
```python
def func(rows):
    lp = {row['FLD_NAME']: row['FLD_VAL'] for row in rows}
    return lp

arr = ar\
    .rdd\
    .map(lambda row: (row[0], func(row[1])))\
    .groupByKey()\
    .toDF(["id", "val"])
```
This code throws the following error:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 20.0 failed 4 times, most recent failure: Lost task 15.3 in stage 20.0 (TID 186) (10.5.152.101 executor 0): org.apache.spark.api.python.PythonException: 'TypeError: string indices must be integers', from <
```
Here's one way to do it: read the string as JSON, explode the array, extract the individual fields, and build a map from the collected pairs. (Your attempt fails because `row[1]` is still a plain string at that point, so iterating over it yields single characters, and indexing a character with `'FLD_NAME'` raises `TypeError: string indices must be integers`.)
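A sketch of that approach, assuming Spark 2.4 or later (`map_from_entries` was added in 2.4) and an assumed `decimal(10,2)` precision for the values:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Schema of the JSON string: an array of {FLD_NAME, FLD_VAL} structs
schema = ArrayType(StructType([
    StructField("FLD_NAME", StringType()),
    StructField("FLD_VAL", StringType()),
]))

result = (
    ar
    # parse the JSON string into an array of structs
    .withColumn("parsed", F.from_json("val", schema))
    # explode so each {FLD_NAME, FLD_VAL} struct becomes its own row
    .select("id", F.explode("parsed").alias("elem"))
    # pull out the name and cast the value to decimal
    .select(
        "id",
        F.col("elem.FLD_NAME").alias("key"),
        F.col("elem.FLD_VAL").cast("decimal(10,2)").alias("value"),
    )
    # collect the (key, value) pairs per id and rebuild them as a map column
    .groupBy("id")
    .agg(F.map_from_entries(F.collect_list(F.struct("key", "value"))).alias("val"))
)
result.show(truncate=False)
```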
I am sure this can be done more concisely using transform, but the explode version above is more self-explanatory.
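For completeness, a sketch of the `transform` variant (same Spark 2.4+ and `decimal(10,2)` assumptions, reusing the imports above). It builds the key/value structs inside a SQL expression, which avoids the explode/groupBy round trip and the associated shuffle:

```python
result = ar.withColumn(
    "val",
    F.map_from_entries(
        F.expr(
            "transform("
            "  from_json(val, 'array<struct<FLD_NAME:string,FLD_VAL:string>>'),"
            "  x -> struct(x.FLD_NAME, cast(x.FLD_VAL as decimal(10,2)))"
            ")"
        )
    ),
)
```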
Output:
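With the sample row above, `result.show(truncate=False)` should print something like the following (exact rendering depends on the Spark version):

```
+---+--------------------------------------------+
|id |val                                         |
+---+--------------------------------------------+
|123|{A -> 0.10, B -> 0.20, C -> 0.30, D -> 0.40}|
+---+--------------------------------------------+
```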