pyspark dataframe parent child hierarchy issue

2.4k views Asked by At

In continuation with the issue: pyspark dataframe withColumn command not working

I have a input dataframe: df_input (updated df_input)

|comment|inp_col|inp_val|
|11     |a      |a1     |
|12     |a      |a2     |
|12     |f      |&a     |
|12     |a      |f9     |
|15     |b      |b3     |
|16     |b      |b4     |
|17     |c      |&b     |
|17     |c      |c5     |
|17     |d      |&c     |
|17     |d      |d6     |
|17     |e      |&d     |
|17     |e      |e7     |

If you see the inp_col and inp_val is having a hierarchy and it can be n number with the root value. Here the parent value are "b" and "a".

Now, as per my requirement I have to replace the child values starting with "&" to its corresponding values. I have tried in iterating over the list of values starting with '&' values in inp_val column and replacing with list of values over every iteration. But, it didn't get worked. I'm facing issue how to get the list with parent and child list values.

tried code:

list_1 = [row['inp_val'] for row in tst.select(tst.inp_val).where(tst.inp_val.substr(0, 1) == '&').collect()]
# removing the '&' at every starting of the list values
list_2 = [list_val[1:] for list_val in list_1]
tst_1 = tst.withColumn("val_extract", when(tst.inp_val.substr(0, 1) == '&', regexp(tst.inp_val, "&", "")).otherwise(tst.inp_val))
for val in list_2:
   df_leaf = tst_1.select(tst_1.val_extract).where(tst_1.inp_col == val)
   list_3 = [row['val_extract'] for row in df_leaf.collect()]

   tst_1 = tst_1.withColumn('bool', when(tst_1.val_extract == val, 'True').otherwise('False'))
   tst_1 = tst_1.withColumn('val_extract', when(tst_1.bool == 'True', str(list_3)).otherwise(tst_1.val_extract)).drop('bool')

Updated Expected Output:

|comment|inp_col|inp_val|inp_extract                  |
|11     |a      |a1     |['a1']                       |
|12     |a      |a2     |['a2']                       |
|12     |f      |&a     |['a1, 'a2']                  |
|12     |f      |f9     |['f9']                       |
|15     |b      |b3     |['b3']                       |
|16     |b      |b4     |['b4']                       |
|17     |c      |&b     |['b3', 'b4']                 |
|18     |c      |c5     |['c5']                       |
|19     |d      |&c     |['b3', 'b4', 'c5']           |
|20     |d      |d6     |['d6']                       |
|21     |e      |&d     |['b3', 'b4', 'c5', 'd6']     |
|22     |e      |e7     |['e7']                       |

After that I can try and do explode to get multiple row. But, the aove output is what we require and not able to get certain percent result.

2

There are 2 answers

0
srikanth holur On

You can join the data frame to itself to get this.

input : 
df.show()

+-------+-------+---------+
|comment|inp_col|input_val|
+-------+-------+---------+
|     11|      a|       a1|
|     12|      a|       a2|
|     13|      f|       &a|
|     14|      b|       b3|
|     15|      b|       b4|
|     16|      d|       &b|
+-------+-------+---------+

import pyspark.sql.functions as F


df.createOrReplaceTempView("df1")
df.withColumn("input_val", F.regexp_replace(F.col("input_val"), "&", "")).createOrReplaceTempView("df2")

spark.sql("""select * from (select coalesce(df2.comment,df1.comment) as comment , 
coalesce(df2.inp_col,df1.inp_col) as inp_col,
 coalesce(df2.input_val,df2.input_val) as input_val ,
 case when df1.input_val is not null then df1.input_val else df2.input_val end  as output
 from df1  full outer join df2 on df2.input_val = df1.inp_col) where input_val is not null order by comment  """).show()
Output
+-------+-------+---------+------+
|comment|inp_col|input_val|output|
+-------+-------+---------+------+
|     11|      a|       a1|    a1|
|     12|      a|       a2|    a2|
|     13|      f|        a|    a1|
|     13|      f|        a|    a2|
|     14|      b|       b3|    b3|
|     15|      b|       b4|    b4|
|     16|      d|        b|    b3|
|     16|      d|        b|    b4|
+-------+-------+---------+------+
4
murtihash On

If you really want to avoid using graphs and your case is not more complex than shown above, try this.

from pyspark.sql import functions as F

df.show() #sampledataframe

#+-------+---------+---------+
#|comment|input_col|input_val|
#+-------+---------+---------+
#|     11|        a|       a1|
#|     12|        a|       a2|
#|     12|        f|       &a|
#|     12|        f|       f9|
#|     15|        b|       b3|
#|     16|        b|       b4|
#|     17|        c|       &b|
#|     17|        c|       c5|
#|     17|        d|       &c|
#|     17|        d|       d6|
#|     17|        e|       &d|
#|     17|        e|       e7|
#+-------+---------+---------+

df1=df.join(df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))\
          .withColumnRenamed("input_col","x1"),F.expr("""input_val rlike x1"""),'left')\
  .withColumn("new_col", F.when(F.expr("""substring(input_val,0,1)!""")!=F.lit('&'), F.array("input_val"))\
                    .otherwise(F.col("y1"))).drop("x1","y1")

df2=df1.join(df1.selectExpr("input_val as input_val1","new_col as new_col1"), F.expr("""array_contains(new_col,input_val1) and\
           substring(input_val1,0,1)=='&'"""),'left')


df2.join(df2.selectExpr("input_val1 as val2","new_col1 as col2")\
         .dropna(),F.expr("""array_contains(new_col1,val2)"""),'left')\
  .withColumn("inp_extract", F.when(F.expr("""substring(input_val,0,1)!='&'"""), F.col("new_col"))\
                        .otherwise(F.expr("""filter(concat(\
                        coalesce(new_col,array()),\
                        coalesce(new_col1,array()),\
                        coalesce(col2, array()))\
                        ,x-> x is not null and substring(x,0,1)!='&')""")))\

  .select("comment","input_col","input_val",F.array_sort("inp_extract").alias("inp_extract")).show()

#+-------+---------+---------+----------------+
#|comment|input_col|input_val|     inp_extract|
#+-------+---------+---------+----------------+
#|     11|        a|       a1|            [a1]|
#|     12|        a|       a2|            [a2]|
#|     12|        f|       &a|        [a1, a2]|
#|     12|        f|       f9|            [f9]|
#|     15|        b|       b3|            [b3]|
#|     16|        b|       b4|            [b4]|
#|     17|        c|       &b|        [b3, b4]|
#|     17|        c|       c5|            [c5]|
#|     17|        d|       &c|    [b3, b4, c5]|
#|     17|        d|       d6|            [d6]|
#|     17|        e|       &d|[b3, b4, c5, d6]|
#|     17|        e|       e7|            [e7]|
#+-------+---------+---------+----------------+