How to create a map column to count occurrences without a UDAF


I would like to create a Map column which counts the number of occurrences.

For instance:

+---+----+
|  b|   a|
+---+----+
|  1|   b|
|  2|null|
|  1|   a|
|  1|   a|
+---+----+

would result in

+---+--------------------+
|  b|                 res|
+---+--------------------+
|  1|[a -> 2.0, b -> 1.0]|
|  2|                  []|
+---+--------------------+

For the moment, in Spark 2.4.6, I was able to do it using a UDAF.

While upgrading to Spark 3, I was wondering whether I could get rid of this UDAF (I tried using the new aggregate method without success).

Is there an efficient way to do it? (For the efficiency part, I am able to test easily)
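For reproducibility, a minimal way to build this sample input (my own sketch, assuming a SparkSession named spark) is:

import spark.implicits._

val df = Seq(
  (1, "b"),
  (2, null),
  (1, "a"),
  (1, "a")
).toDF("b", "a")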


There are 4 answers

Answer by Raphael Roth (accepted):

Here is a Spark 3 solution:

import org.apache.spark.sql.functions._

// first count per (b, a), then fold the counts of each b into a map,
// skipping null keys
df.groupBy($"b", $"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull, struct($"a", $"count"))
      )
    ).as("res")
  )
  .show()

gives:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+
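Side note (my addition, not part of the original answer): the map values above are longs; if you want doubles as in the question's expected output (a -> 2.0), you could cast the count before building the struct:

df.groupBy($"b", $"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        // the cast makes the map values 2.0 / 1.0 as in the question
        when($"a".isNotNull, struct($"a", $"count".cast("double")))
      )
    ).as("res")
  )
  .show()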

Here is a solution using an Aggregator:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder

val countOcc = new Aggregator[String, Map[String, Int], Map[String, Int]] with Serializable {
    // empty buffer
    def zero: Map[String, Int] = Map.empty[String, Int]
    // add one occurrence, ignoring null keys
    def reduce(b: Map[String, Int], a: String) =
      if (a != null) b + (a -> (b.getOrElse(a, 0) + 1)) else b
    // combine two partial count maps
    def merge(b1: Map[String, Int], b2: Map[String, Int]) = {
      val keys = b1.keys.toSet.union(b2.keys.toSet)
      keys.map { k => k -> (b1.getOrElse(k, 0) + b2.getOrElse(k, 0)) }.toMap
    }
    def finish(b: Map[String, Int]) = b
    def bufferEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
    def outputEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
}

val countOccUDAF = udaf(countOcc)

df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

gives:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+
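
As a usage note (my own addition, not part of the answer): the same Aggregator can also be registered for use from SQL; the function name count_occ and the view name t below are just illustrative:

// register the aggregator and call it from a SQL query
spark.udf.register("count_occ", udaf(countOcc))
df.createOrReplaceTempView("t")
spark.sql("SELECT b, count_occ(a) AS res FROM t GROUP BY b").show()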

Answer by Raphael Roth:

You could always use collect_list with a UDF, but only if your groupings are not too large:

// builds a histogram (value -> count) from the collected values
val udf_histo = udf((x: Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b",udf_histo($"as").as("res"))
  .show()

gives:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

This should be faster than a UDAF, see: Spark custom aggregation: collect_list + UDF vs UDAF
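
A small portability note of my own: on Scala 2.13 builds, mapValues returns a lazy MapView, which Spark cannot use as a UDF result type; materializing the map avoids that:

// variant of udf_histo that also works on Scala 2.13
val udf_histo = udf((x: Seq[String]) => x.groupBy(identity).map { case (k, v) => k -> v.size })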


Answer by werner:

Here is a solution with a single groupBy and a slightly more complex SQL expression. This solution works for Spark 2.4+.

df.groupBy("b")
  .agg(expr("sort_array(collect_set(a)) as set"),
       expr("sort_array(collect_list(a)) as list"))
  .withColumn("res",
       expr("map_from_arrays(set,transform(set, x -> size(filter(list, y -> y=x))))"))
  .show()

Output:

+---+------+---------+----------------+
|  b|   set|     list|             res|
+---+------+---------+----------------+
|  1|[a, b]|[a, a, b]|[a -> 2, b -> 1]|
|  2|    []|       []|              []|
+---+------+---------+----------------+

The idea is to collect the data from column a twice: once into a set and once into a list. Then, with the help of transform, for each element of the set the number of occurrences of that element in the list is counted. Finally, the set and the counts are combined with map_from_arrays.

However, I cannot say whether this approach is really faster than a UDAF.
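
For reference, the same logic can be written as a single SQL query; this is only a sketch of mine (the temp view name t and the aliases a_set / a_list are arbitrary):

df.createOrReplaceTempView("t")
spark.sql("""
  SELECT b,
         map_from_arrays(a_set,
                         transform(a_set, x -> size(filter(a_list, y -> y = x)))) AS res
  FROM (
    SELECT b,
           sort_array(collect_set(a))  AS a_set,
           sort_array(collect_list(a)) AS a_list
    FROM t
    GROUP BY b
  )
""").show()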


Answer by Sanket9394:

We can achieve this in Spark 2.4:

// GET THE COUNTS
val groupedCountDf = originalDf.groupBy("b", "a").count

// CREATE MAPS FOR EVERY COUNT | EMPTY MAP FOR NULL KEY
// AGGREGATE THEM AS ARRAY
val dfWithArrayOfMaps = groupedCountDf
  .withColumn("newMap", when($"a".isNotNull, map($"a", $"count")).otherwise(map()))
  .groupBy("b")
  .agg(collect_list($"newMap") as "multimap")

// EXPRESSION TO CONVERT ARRAY[MAP] -> MAP
val mapConcatExpr = expr("aggregate(multimap, map(), (k, v) -> map_concat(k, v))")

val finalDf = dfWithArrayOfMaps.select($"b", mapConcatExpr.as("merged_data"))
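
One caveat from my side, not from the original answer: the bare map() zero value defaults to map<string,string>, which can trigger a type-mismatch error in aggregate on some Spark versions. If that happens, casting the zero to the type of the collected maps is a possible workaround (untested sketch):

// hypothetical variant: give the zero value the same type as the collected maps
val mapConcatExpr = expr(
  "aggregate(multimap, cast(map() as map<string,bigint>), (acc, m) -> map_concat(acc, m))")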