I've got this dataframe

df1 = spark.createDataFrame([
    ('c', 'd', 3.0, 4),
    ('c', 'd', 7.3, 8),
    ('c', 'd', 7.3, 2),
    ('c', 'd', 7.3, 8),
    ('e', 'f', 6.0, 3),
    ('e', 'f', 6.0, 8),
    ('e', 'f', 6.0, 3),
    ('c', 'j', 4.2, 3),
    ('c', 'j', 4.3, 9),
], ['a', 'b', 'c', 'd'])
df1.show()
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  c|  d|3.0|  4|
|  c|  d|7.3|  8|
|  c|  d|7.3|  2|
|  c|  d|7.3|  8|
|  e|  f|6.0|  3|
|  e|  f|6.0|  8|
|  e|  f|6.0|  3|
|  c|  j|4.2|  3|
|  c|  j|4.3|  9|
+---+---+---+---+

i did this to get the max of c of the couple a and b

df2 = df1.groupBy('a', 'b').agg(F.max('c').alias('c_max')).select(
        F.col('a'),
        F.col('b'),
        F.col('c_max').alias('c')
    )
df2.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  e|  f|6.0|
|  c|  d|7.3|
|  c|  j|4.3|
+---+---+---+

but now i need to get the values of d that should be

+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  c|  d|7.3|  8|
|  e|  f|6.0|  3|
|  c|  j|4.3|  9|
+---+---+---+---+

i tried to do an inner join between df1 and df2 but that didn't work:

condition = [df1.a ==  df2.a, df1.b ==  df2.b, df1.c ==  df2.c]
df3 = df1.join(df2,condition,"inner")
df3.show()
+---+---+---+---+---+---+---+
|  a|  b|  c|  d|  a|  b|  c|
+---+---+---+---+---+---+---+
|  c|  d|7.3|  8|  c|  d|7.3|
|  c|  d|7.3|  8|  c|  d|7.3|
|  c|  d|7.3|  2|  c|  d|7.3|
|  e|  f|6.0|  3|  e|  f|6.0|
|  e|  f|6.0|  8|  e|  f|6.0|
|  e|  f|6.0|  3|  e|  f|6.0|
|  c|  j|4.3|  9|  c|  j|4.3|
+---+---+---+---+---+---+---+

i'm a beginner in pyspark, so please i need a little help to figure this out

2

There are 2 answers

9
pltc On BEST ANSWER

You can "zip" d and count of d and aggregate as usual to keep the frequency

df3 = (df1
    .groupBy('a', 'b', 'd')
    .agg(F.count('*').alias('d_count'))
    .groupBy('a', 'b')
    .agg(F.max(F.array('d_count', 'd')).alias('d_freq'))
    .select('a', 'b', F.col('d_freq')[1].alias('d'))
)

+---+---+---+
|  a|  b|  d|
+---+---+---+
|  c|  d|  8|
|  c|  j|  9|
|  e|  f|  3|
+---+---+---+

Now join both your df2 and this new df3 will give your desired output.

df2.join(df3, on=['a', 'b']).show()
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  c|  d|7.3|  8|
|  c|  j|4.3|  9|
|  e|  f|6.0|  3|
+---+---+---+---+
1
Emma On

You can first count the frequency and assign the order value by sorting them in descending order. Then, get the first value where the order is 1.

This does not deal with tie breaking, if there are tie in the top frequency, this will pick whatever (non-deterministic).

from pyspark.sql import functions as F

df1 = (df1.withColumn('d_count', F.count('*').over(Window.partitionBy(['a', 'b', 'd'])))
 .withColumn('d_order', F.row_number().over(Window.partitionBy(['a', 'b']).orderBy(F.desc('d_count'))))
 .groupby(['a', 'b'])
 .agg(
   F.max('c').alias('c'),
   F.first(F.when(F.col('d_order') == 1, F.col('d'))).alias('d'))
)
# df1.show()

+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  e|  f|6.0|  3|
|  c|  d|7.3|  8|
|  c|  j|4.3|  9|
+---+---+---+---+