I have a DataFrame, read from an Avro file in Hadoop, with three columns (a, b, c), where a is the key column and, of the two other columns, one is of integer type and the other is of date type.
I order the frame by the integer column and the date column, then call drop_duplicates by the key column (a) on the resulting frame:
frame = frame.orderBy(["b", "c"], ascending=False)
frame = frame.drop_duplicates(["a"])
From the Spark Scala source I can see that orderBy calls the sort method internally, which performs a global sort:
/**
* Returns a new Dataset sorted by the given expressions. For example:
* {{{
* ds.sort($"col1", $"col2".desc)
* }}}
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def sort(sortExprs: Column*): Dataset[T] = {
  sortInternal(global = true, sortExprs)
}
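You can confirm the global Sort node by printing the plan; in the parsed and optimized plans, Sort is displayed with its global flag as the second argument. A quick check using the frame from above:

frame.orderBy(["b", "c"], ascending=False).explain(True)
# The plan contains something like:
#   Sort [b#1 DESC NULLS LAST, c#2 DESC NULLS LAST], true
# where the trailing 'true' is the global flag set by sortInternal.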
Also, the drop_duplicates(cols) method is rewritten into an Aggregate that keeps the key columns and takes first() of every other column, as per the Spark code below:
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
    case d @ Deduplicate(keys, child) if !child.isStreaming =>
      val keyExprIds = keys.map(_.exprId)
      val aggCols = child.output.map { attr =>
        if (keyExprIds.contains(attr.exprId)) {
          attr
        } else {
          Alias(new First(attr).toAggregateExpression(), attr.name)()
        }
      }
      // SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
      // aggregations by checking the number of grouping keys. The key difference here is that a
      // global aggregation always returns at least one row even if there are no input rows. Here
      // we append a literal when the grouping key list is empty so that the result aggregate
      // operator is properly treated as a grouping aggregation.
      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
      val newAgg = Aggregate(nonemptyKeys, aggCols, child)
      val attrMapping = d.output.zip(newAgg.output)
      newAgg -> attrMapping
  }
}
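In DataFrame terms, that rewrite makes drop_duplicates(["a"]) roughly equivalent to the following (a sketch using the question's column names; note that first() carries no ordering guarantee):

from pyspark.sql import functions as F

# Roughly what ReplaceDeduplicateWithAggregate produces for drop_duplicates(["a"]):
# group by the key and take first() of every other column.
deduped = frame.groupBy("a").agg(
    F.first("b").alias("b"),
    F.first("c").alias("c"),
)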
So I am expecting drop_duplicates to retain the first row per key after sorting and drop the others, but I am observing in my Spark jobs that this is not true.
Any thoughts on why?
No.
Sorting by b & c and then dropping by a, would work as you would like, if and only if there was only 1 partition to process. With Big Data that is generally not the case.
This has nothing to do with Avro or PySpark. Also, ordering by b and c may itself be non-deterministic: rows that tie on (b, c) can come back in any relative order.
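If the goal is "keep the top row per key", a deterministic way to express that is a window function rather than sort-then-dedup. A minimal sketch, assuming the schema from the question:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank rows within each key a by b and c descending, then keep the top row.
# Unlike first() after a global sort, row_number() over this window applies
# the ordering per key, regardless of how the data is partitioned.
w = Window.partitionBy("a").orderBy(F.col("b").desc(), F.col("c").desc())
result = (frame
          .withColumn("rn", F.row_number().over(w))
          .where(F.col("rn") == 1)
          .drop("rn"))

If (b, c) does not uniquely rank rows within a key, add more columns to the window's orderBy; otherwise the choice among tied rows is still arbitrary.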