Add a tag to the list in the DataFrame based on the data from the second DataFrame

I have two DataFrames. The first one has the columns model, cnd, age, tags (a repeated field: a list/array of strings), min and max; the second one has a single main_model column.
I would like to add the MAIN tag to the tags list in the first DataFrame whenever a row's model value matches any model in the DataFrame of main models. The code is written in Scala. Example below:

INPUT:
DataFrame no. 1
+------+-----+----+-------+------+-----+
| model| cnd | age| tags  |  min | max |
+------+-----+----+-------+------+-----+
|  foo1|   xx|  10|  []   |   1  |  2  |
|  foo2|   yy|  20|  []   |   2  |  3  | 
|  foo3|   zz|  30|  []   |   3  |  4  | 
+------+-----+----+-------+------+-----+

DataFrame no. 2 - list of main models to check the first DataFrame's models against

+-----------+
| main_model|
+-----------+
|  foo1     |
|  foo3     | 
|  foo5     | 
+-----------+


OUTPUT:

+------+-----+----+-------+------+-----+
| model| cnd | age| tags  |  min | max |
+------+-----+----+-------+------+-----+
|  foo1|   xx|  10|[MAIN] |   1  |  2  |
|  foo2|   yy|  20|  []   |   2  |  3  | 
|  foo3|   zz|  30|[MAIN] |   3  |  4  | 
+------+-----+----+-------+------+-----+

I haven't been able to find a reasonable solution. So far I've tried:

    dataFrame1.join(dataFrame2, dataFrame1("model") === dataFrame2("main_model"), "left_outer")
      .select(
        dataFrame1("model"),
        dataFrame1("cnd"),
        dataFrame1("age"),
        when(dataFrame2("main_model").isNotNull, concat(dataFrame1("tags"), lit(", MAIN")))
          .otherwise(dataFrame1("tags"))
          .alias("tags"),
        dataFrame1("min"),
        dataFrame1("max")
      )

Best answer, by M_S:

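You were close. Since tags is an array column, concat with a string literal won't work; use array_union instead to merge the existing tags with an array containing "MAIN".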

    import org.apache.spark.sql.functions._
    // toDF needs the SparkSession implicits (spark-shell imports them automatically)
    import spark.implicits._

    val data = Seq(
      ("foo1", "xx", 10, Array[String](), 1, 2),
      ("foo2", "yy", 20, Array[String](), 2, 3),
      ("foo3", "zzxx", 30, Array[String](), 3, 4))
    val df = data.toDF("model", "cnd", "age", "tags", "min", "max")
    val modelDf = Seq("foo1", "foo3", "foo5").toDF("main_model")

    // The left join keeps every row of df; main_model is null where there is no match
    val joinedDf = df.join(modelDf, df("model") === modelDf("main_model"), "left")
    joinedDf.select(
      df("model"),
      df("cnd"),
      df("age"),
      // array_union merges the existing tags with a one-element array (duplicates removed)
      when(modelDf("main_model").isNotNull, array_union(df("tags"), lit(Array("MAIN"))))
        .otherwise(df("tags"))
        .alias("tags"),
      df("min"),
      df("max")
    ).show

And the output:

+-----+----+---+------+---+---+
|model| cnd|age|  tags|min|max|
+-----+----+---+------+---+---+
| foo1|  xx| 10|[MAIN]|  1|  2|
| foo2|  yy| 20|    []|  2|  3|
| foo3|zzxx| 30|[MAIN]|  3|  4|
+-----+----+---+------+---+---+
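As a variant of the answer above, here is a minimal sketch that avoids listing every column in a select: it replaces only tags with withColumn and then drops the helper main_model column. It assumes Spark 2.4+ (for array_union) and that the list of main models is small enough to broadcast; the names TagMainModels and tagMainModels are hypothetical. The distinct() call is there because a left join would duplicate rows of the first DataFrame if a main model appeared more than once in the second one.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, array_union, broadcast, col, lit, when}

object TagMainModels {
  // Hypothetical helper: adds "MAIN" to `tags` for every row whose `model`
  // appears in mainModels("main_model"); all other columns are kept as-is.
  def tagMainModels(df: DataFrame, mainModels: DataFrame): DataFrame = {
    // distinct() prevents duplicated rows if a main model is listed twice;
    // broadcast() is a hint that the lookup table is small (an assumption).
    val lookup = broadcast(mainModels.select(col("main_model")).distinct())
    df.join(lookup, df("model") === lookup("main_model"), "left")
      .withColumn(
        "tags",
        // array_union also avoids adding a second "MAIN" if it is already present
        when(col("main_model").isNotNull, array_union(col("tags"), array(lit("MAIN"))))
          .otherwise(col("tags"))
      )
      .drop("main_model") // withColumn kept `tags` in place, so the column order is unchanged
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("tag-main-models").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("foo1", "xx", 10, Seq.empty[String], 1, 2),
      ("foo2", "yy", 20, Seq.empty[String], 2, 3),
      ("foo3", "zz", 30, Seq.empty[String], 3, 4)
    ).toDF("model", "cnd", "age", "tags", "min", "max")
    val modelDf = Seq("foo1", "foo3", "foo5").toDF("main_model")

    tagMainModels(df, modelDf).orderBy("model").show()
    spark.stop()
  }
}
```

If you prefer to keep concat, on Spark 2.4+ it also accepts array columns, so concat(col("tags"), array(lit("MAIN"))) would work too; unlike array_union, it keeps duplicate entries.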