I want to know under which circumstances Spark will call merge as part of a UDAF.
Motivation: I am using a lot of UDAFs OVER a Window in my Spark project. Often I want to answer a question like:
How many times was a credit card transaction made in the same country as the current transaction within a window of 30 days?
The window starts at the current transaction but should not include it in the count. It needs the value from the current transaction to know which country to count over the past 30 days.
val rollingWindow = Window
  .partitionBy(partitionByColumn)
  .orderBy(orderByColumn.desc)
  .rangeBetween(0, windowSize)
df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))
I wrote my customUDAF to do the counting. I always use .orderBy(orderByColumn.desc), and thanks to .desc the current transaction appears first in the window during the calculation.
A UDAF requires an implementation of the merge function, which merges two intermediate aggregation buffers in parallel computations. If any mergers occur, my current transaction may not be the same for different buffers and the results of the UDAF will be incorrect.
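To illustrate the concern, here is a minimal plain-Scala sketch with no Spark involved. The names Buffer, update, merge, sequential and partial are hypothetical stand-ins for the UDAF buffer and methods, not Spark APIs. It only models what could happen if the rows of one window were split into chunks that are aggregated separately and merged afterwards.

```scala
// Plain-Scala model of a "keep the first value" aggregate; no Spark involved.
// All names here are hypothetical stand-ins for the UDAF buffer and methods.
object FirstValueModel {
  final case class Buffer(first: Option[String], numMerge: Int)

  val empty: Buffer = Buffer(None, 0)

  // Mirrors update: remember only the first value this buffer has seen.
  def update(b: Buffer, value: String): Buffer =
    if (b.first.isEmpty) b.copy(first = Some(value)) else b

  // Mirrors merge: keep the left buffer's first value and count the merge.
  def merge(left: Buffer, right: Buffer): Buffer =
    Buffer(left.first.orElse(right.first), left.numMerge + right.numMerge + 1)

  // One buffer sees all rows in order, so no merge is needed.
  def sequential(rows: Seq[String]): Buffer =
    rows.foldLeft(empty)(update)

  // Rows are split into chunks, each aggregated on its own, then merged.
  def partial(chunks: Seq[Seq[String]]): Buffer =
    chunks.map(sequential).reduce(merge)
}
```

With a single buffer, FirstValueModel.sequential(Seq("US", "DE", "US", "FR")) keeps "US" as the first value. If the same rows were instead aggregated as the chunks Seq("DE", "US") and Seq("US", "FR"), FirstValueModel.partial would report "DE" as the first value and one merge, which is the kind of wrong result I am worried about.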
I wrote a UDAF that counts the number of mergers on my dataset and keeps only the first transaction in the window, to be compared with the current transaction.
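import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

class FirstUDAF() extends UserDefinedAggregateFunction {
  def inputSchema = new StructType()
    .add("x", StringType)
    .add("y", StringType)
  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)
  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)
  def deterministic = true
  // Each buffer starts with no value and a counter of 1.
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = ""
    buffer(1) = 1
  }
  // Keep only the first value this buffer sees.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)
  }
  // Sum the counters, so a result above 1 means a merge happened.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }
  def evaluate(buffer: Row) = buffer
}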
When I run it with Spark 2.0.1 on a local master with 16 CPUs, there are never any mergers and the first transaction in the window is always the current transaction. This is what I want. In the near future I will run my code on a 100x bigger dataset on a real distributed Spark cluster, and I want to know whether mergers can happen there.
Questions:
- Under which circumstances/conditions do mergers take place in a UDAF?
- Do Windows with orderBy ever have mergers?
- Is it possible to tell Spark not to do mergers?
merge is called when partial applications of the aggregate function ("map side aggregation") are merged after the shuffle ("reduce side aggregation").
Do Windows with orderBy ever have mergers? In the current implementation, never. As of now, window functions are just a fancy groupByKey, and there is no partial aggregation. This is of course an implementation detail and might change without further notice in the future.
Is it possible to tell Spark not to do mergers? It is not. However, if the data is already partitioned by the aggregation key, there is no need for merge and only combine is used.
Finally, the motivating question (how many times a credit card transaction was made in the same country as the current transaction within a window of 30 days) does not call for UDAFs or window functions. I would probably create tumbling windows with o.a.s.sql.functions.window, aggregate by user, country and window, and join back with the input.
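A rough sketch of that last suggestion, assuming Spark 2.x on the classpath and an input DataFrame with columns named "user", "country" and a timestamp column "ts". Those column names, the object name and the output column "txInWindow" are assumptions made for illustration and do not come from the question.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

object TumblingCountSketch {
  // Column names "user", "country" and "ts" are assumed for illustration.
  def countPerWindow(df: DataFrame): DataFrame = {
    // Tag every row with the start of the 30-day tumbling window containing it.
    val tagged = df
      .withColumn("w", window(col("ts"), "30 days"))
      .withColumn("windowStart", col("w.start"))
      .drop("w")

    // Aggregate by user, country and window.
    val counts = tagged
      .groupBy("user", "country", "windowStart")
      .count()
      .withColumnRenamed("count", "txInWindow")

    // Join the per-window counts back to the input rows.
    tagged.join(counts, Seq("user", "country", "windowStart"))
  }
}
```

Note that tumbling windows bucket transactions into fixed 30-day intervals, so this approximates a look-back relative to each transaction rather than reproducing it exactly, and the count includes the current transaction itself.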