Does Spark 3.0.1 support custom Aggregators on window functions?


I wrote a custom Aggregator (a subclass of org.apache.spark.sql.expressions.Aggregator), and Spark invokes it correctly as an aggregate function in a groupBy:

sparkSession
    .createDataFrame(...)
    .groupBy(col("id"))
    .agg(
        new MyCustomAggregator().toColumn().name("aggregation_result"))
    .show();

I would like to use it in a window function instead, because row ordering matters to me. I tried invoking it like this:

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

This is the error I get:

org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.

Is it at all possible to use custom Aggregators as window functions in Spark 3.0.1? If so, what am I missing here?

Best answer, by igor

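Yes, Spark 3 supports custom Aggregators as window functions. Instead of calling toColumn() on the Aggregator, wrap it with functions.udaf and apply the resulting function over the window.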

Here is the Java code:

UserDefinedFunction myCustomAggregation = functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", myCustomAggregation.apply(col("aggregation_input1"), col("aggregation_input2")).over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

AggregationInput here is a simple DTO that holds the row values your aggregation function needs.
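For illustration, such a DTO might look like the sketch below. The field names and types (label, value) are hypothetical and not taken from the question. It assumes the usual JavaBean shape that Encoders.bean works with: a public class, a public no-argument constructor, and a getter/setter pair per property. How the columns passed to apply(...) line up with the bean's properties is not spelled out in the answer, so verify that against your Spark version.

```java
import java.io.Serializable;

// Hypothetical input DTO; the property names and types are illustrative only.
public class AggregationInput implements Serializable {
    private String label;
    private double value;

    // Bean-style classes need a public no-argument constructor.
    public AggregationInput() { }

    public String getLabel() { return label; }
    public void setLabel(String label) { this.label = label; }

    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
}
```

The Aggregator's IN type parameter would then be AggregationInput, matching the encoder passed to functions.udaf.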

So whether you aggregate under groupBy or over a window, you still implement org.apache.spark.sql.expressions.Aggregator; what differs is how you turn it into a column.
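One detail worth keeping in mind with the windowed call above: when a window has an orderBy and no explicit frame, Spark uses a growing frame (range between unbounded preceding and the current row), so the aggregator produces a running result per row, not one value per partition. The plain-Java sketch below imitates that for a single partition, with a simple sum standing in for the custom aggregator. The class, the Row type, and the sample values are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RunningFrameSketch {

    // Hypothetical row of one partition: an ordering key and a value to aggregate.
    static final class Row {
        final int order;
        final double value;

        Row(int order, double value) {
            this.order = order;
            this.value = value;
        }
    }

    // For each row, in ascending order, sum every row whose ordering key is
    // less than or equal to the current row's key (rows with equal keys are peers).
    static List<Double> runningSum(List<Row> partition) {
        List<Row> sorted = new ArrayList<>(partition);
        sorted.sort(Comparator.comparingInt(r -> r.order));

        List<Double> out = new ArrayList<>();
        for (Row current : sorted) {
            double acc = 0.0;
            for (Row r : sorted) {
                if (r.order <= current.order) {
                    acc += r.value;
                }
            }
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> partition = Arrays.asList(
                new Row(2, 10.0), new Row(1, 1.0), new Row(3, 100.0));
        System.out.println(runningSum(partition)); // prints [1.0, 11.0, 111.0]
    }
}
```

If you want one result over the whole partition instead, specify the frame explicitly on the window, for example with rowsBetween(Window.unboundedPreceding(), Window.unboundedFollowing()).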