```
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# price is numeric in the sample rows, so IntegerType is used here
# (with the original StringType, createDataFrame's schema verification rejects the integer values)
ItemStruct = StructType([StructField("BomId", StringType()), StructField("price", IntegerType())])
BomStruct = StructType([StructField("OrderId", StringType()), StructField("items", ArrayType(ItemStruct))])
sampledata_sof = [Row("123-A", [Row("Bom-11", 120), Row("Bom-12", 140)]), Row("100-A", [Row("Bom-23", 170), Row("Bom-24", 190)])]
dfSampleBom = spark.createDataFrame(spark.sparkContext.parallelize(sampledata_sof), BomStruct)
dfSampleBom.printSchema()
dfSampleBom.show()
```
Question: Given the above structure, how can I achieve the following? If Bom-11 is present in items, add an item Bom-99 (price $99). Expected output: the row with OrderId = 123-A should include {Bom-99, 99} in its list of items. In other words, I would like to generate and conditionally add one or a few elements to the items ArrayType column.
I tried using `df.rdd.map(lambda x: generateItems(x))` but got the error pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
The number of items in the DataFrame is on the order of thousands, so I would like a solution that Spark can natively distribute and process efficiently. (I have read that UDFs may not be distributed across worker nodes, so I am not sure whether that is even an option.)
You can use `filter` first to identify whether `items` contains `Bom-11` or not, then use `array_insert` (PySpark 3.4+) or `concat` (PySpark 3.1+) to insert a struct into the existing array.
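A minimal sketch of that approach against the sample DataFrame above (the `new_item`, `has_bom_11`, `df_concat`, and `df_insert` names are illustrative, not from the original answer):

```
from pyspark.sql import functions as F

# New element; its fields must match the element struct of the items column
new_item = F.struct(F.lit("Bom-99").alias("BomId"), F.lit(99).alias("price"))

# True when the items array contains an entry whose BomId is "Bom-11"
has_bom_11 = F.size(F.filter("items", lambda x: x["BomId"] == "Bom-11")) > 0

# PySpark 3.1+: append by concatenating the existing array with a one-element array
df_concat = dfSampleBom.withColumn(
    "items",
    F.when(has_bom_11, F.concat("items", F.array(new_item))).otherwise(F.col("items")),
)

# PySpark 3.4+: array_insert places the element at a 1-based position;
# size(items) + 1 appends it after the last element
df_insert = dfSampleBom.withColumn(
    "items",
    F.when(has_bom_11, F.array_insert("items", F.size("items") + 1, new_item)).otherwise(F.col("items")),
)

df_concat.show(truncate=False)
```

Both variants are built entirely from native Spark SQL expressions, so they are distributed across the executors without a Python UDF or an RDD map.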