pyspark - how to add a new element to ArrayType column

```
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# price is numeric in the sample data (120, 140, ...), so it needs IntegerType,
# not StringType, for createDataFrame's schema verification to pass
ItemStruct = StructType([StructField("BomId", StringType()), StructField("price", IntegerType())])
BomStruct = StructType([StructField("OrderId", StringType()), StructField("items", ArrayType(ItemStruct))])
sampledata_sof = [Row("123-A", [Row("Bom-11", 120), Row("Bom-12", 140)]), Row("100-A", [Row("Bom-23", 170), Row("Bom-24", 190)])]

dfSampleBom = spark.createDataFrame(spark.sparkContext.parallelize(sampledata_sof), BomStruct)
dfSampleBom.printSchema()
dfSampleBom.show()
```

Output from the Jupyter notebook:
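The notebook output is not reproduced in the question; reconstructed from the code above (and shown untruncated for readability), it should look roughly like this:

```
root
 |-- OrderId: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- BomId: string (nullable = true)
 |    |    |-- price: integer (nullable = true)

+-------+------------------------------+
|OrderId|items                         |
+-------+------------------------------+
|123-A  |[{Bom-11, 120}, {Bom-12, 140}]|
|100-A  |[{Bom-23, 170}, {Bom-24, 190}]|
+-------+------------------------------+
```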

Question: Given the above structure, how can I achieve the following: if Bom-11 is present in items, add an item Bom-99 (price $99)? Expected output: the row with OrderId = 123-A should include {Bom-99, 99} in its list of items. In other words, I would like to generate and conditionally add one or a few elements to the items ArrayType column.

Tried using

    df.rdd.map(lambda x: generateItems(x))

but got this error:

    pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER]
    It appears that you are attempting to reference SparkContext from a broadcast
    variable, action, or transformation. SparkContext can only be used on the driver,
    not in code that it run on workers. For more information, see SPARK-5063.
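(generateItems is not shown in the question; the sketch below is a hypothetical reconstruction of the pattern behind this error: the mapped function captures the driver-side spark session in its closure, and the SparkSession/SparkContext cannot be used on executors.)

    def generateItems(row):
        # Referencing the driver-side SparkSession inside a function that is
        # shipped to executors is what raises CONTEXT_ONLY_VALID_ON_DRIVER
        extra = spark.createDataFrame([("Bom-99", 99)], ["BomId", "price"])
        return row

    dfSampleBom.rdd.map(lambda x: generateItems(x)).collect()  # raises PySparkRuntimeError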

The number of items in the DataFrame is on the order of thousands, and hence I would like a solution that Spark can natively distribute and process efficiently. (I have read that UDFs may not be distributed across worker nodes, so I am not sure whether that is even an option.)

1 Answer

Answered by Emma

You can use filter first to check whether items contains Bom-11, then use array_insert or concat to add a struct to the existing array.

PySpark 3.4+

```
from pyspark.sql import functions as F

# the element to add, built as a struct matching the item schema
item_to_ingest = F.struct(F.lit('Bom-99').alias('BomId'), F.lit(99).alias('price'))

# note: where array_insert places index -1 differs across Spark versions:
# 3.4.x inserts before the last entry, 3.5+ appends at the end
df = (dfSampleBom.select(
          'OrderId',
          F.when(F.size(F.filter('items', lambda x: x['BomId'] == 'Bom-11')) > 0,
                 F.array_insert('items', -1, item_to_ingest))
          .otherwise(F.col('items')).alias('items')))
```

PySpark 3.1+

```
from pyspark.sql import functions as F

item_to_ingest = F.struct(F.lit('Bom-99').alias('BomId'), F.lit(99).alias('price'))

# concat appends the single-element array to the end of items
df = (dfSampleBom.select(
          'OrderId',
          F.when(F.size(F.filter('items', lambda x: x['BomId'] == 'Bom-11')) > 0,
                 F.concat('items', F.array(item_to_ingest)))
          .otherwise(F.col('items')).alias('items')))
```
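With either variant, the row for OrderId 123-A gains the new struct while 100-A is left untouched; concat always appends it at the end (array_insert with index -1 does too from Spark 3.5 onward), so df.show(truncate=False) should produce roughly:

```
+-------+--------------------------------------------+
|OrderId|items                                       |
+-------+--------------------------------------------+
|123-A  |[{Bom-11, 120}, {Bom-12, 140}, {Bom-99, 99}]|
|100-A  |[{Bom-23, 170}, {Bom-24, 190}]              |
+-------+--------------------------------------------+
```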