Amazon deequ does not run in container but works locally

I am unable to run deequ functionality when I run the job on k8s, although it works correctly locally. I am using deequ 2.0.0-spark-3.1 as a dependency. As a trivial test, I tried to run the following:

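import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDF; assumes a SparkSession named spark is in scope

val df = Seq(1, 2, 3).toDF("id")
df.show()

val suggestionsResult = ConstraintSuggestionRunner()
  .onData(df)
  .addConstraintRules(Rules.DEFAULT)
  .run()

// Print each suggested constraint together with the Scala code that would enforce it
suggestionsResult.constraintSuggestions.foreach { case (column, suggestions) =>
  suggestions.foreach { suggestion =>
    println(
      s"Constraint suggestion for $column: \t${suggestion.description}\n" +
        s"the corresponding scala code is ${suggestion.codeForConstraint}"
    )
  }
}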

Locally, it prints:

Constraint suggestion for id:   'id' is not null
the corresponding scala code is .isComplete("id")
Constraint suggestion for id:   'id' has no negative values
the corresponding scala code is .isNonNegative("id")

but it throws the following error when I run it on k8s:

java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
    at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
    at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
    at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
    at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
    at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
    at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
    at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.profileAndSuggest(ConstraintSuggestionRunner.scala:203)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.run(ConstraintSuggestionRunner.scala:102)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunBuilder.run(ConstraintSuggestionRunBuilder.scala:226)

Both the container image and my local environment use Spark 3.1.1.
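For reference, a minimal sketch of dependency declarations that keep the application jar from carrying its own copy of Spark, so that only the Spark 3.1.1 installed locally or in the image is used at runtime. It assumes an sbt build with Scala 2.12 (the build tool is not stated above); the versions are the ones mentioned in this question.

```scala
// build.sbt (hypothetical; the actual build file is not shown)
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  // Spark comes from the runtime (local install or container image),
  // so it is marked "provided" and is not packaged with the application
  "org.apache.spark" %% "spark-sql" % "3.1.1" % "provided",
  // deequ build published for Spark 3.1
  "com.amazon.deequ" % "deequ" % "2.0.0-spark-3.1"
)
```

If the job is packaged as a fat jar with sbt-assembly, "provided" dependencies are left out of the assembly, so a second Spark version cannot leak in through the application jar itself.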

Also, when I look inside the deequ jar, I see that it contains classes under an org.apache.spark package.

I am not sure why the jar contains an org.apache.spark package, or whether that is related to the missing method reported in the error (AggregateFunction.toAggregateExpression(boolean), which returns an org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression). If it is the cause, why does the job not fail locally, and how can I resolve this issue?
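One way to narrow this down is to print, from inside the running job, which jar each class named in the stack trace was actually loaded from, once locally and once on k8s. The sketch below uses only standard JVM reflection; `ClasspathProbe`, `whereLoaded` and `hasMethod` are hypothetical names, not deequ or Spark APIs. It also checks whether the runtime `AggregateFunction` has a one-argument `toAggregateExpression(boolean)`. My understanding (an assumption, not verified here) is that Spark 3.x declares it as `toAggregateExpression(isDistinct: Boolean, filter: Option[Expression] = None)`, in which case a call to the one-argument form would come from a `DeequFunctions` class compiled against an older Spark.

```scala
// Hypothetical diagnostic helper, not part of deequ or Spark.
object ClasspathProbe {

  // Load a class without running its static initializer; None if it cannot be loaded.
  private def load(className: String): Option[Class[_]] =
    try Some(Class.forName(className, false, getClass.getClassLoader))
    catch { case _: ClassNotFoundException | _: LinkageError => None }

  // Jar (or directory) URL the class was loaded from; "<bootstrap>" for JDK core classes.
  def whereLoaded(className: String): Option[String] =
    load(className).map { cls =>
      Option(cls.getProtectionDomain.getCodeSource)
        .map(_.getLocation.toString)
        .getOrElse("<bootstrap>")
    }

  // True if the class is loadable and has a public method with exactly these parameter types.
  def hasMethod(className: String, methodName: String, paramTypes: Class[_]*): Boolean =
    load(className).exists { cls =>
      cls.getMethods.exists { m =>
        m.getName == methodName && m.getParameterTypes.toSeq == paramTypes.toSeq
      }
    }

  def main(args: Array[String]): Unit = {
    // Class names copied from the NoSuchMethodError stack trace
    val aggregateFunction =
      "org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction"
    Seq(
      "org.apache.spark.sql.DeequFunctions$",
      "com.amazon.deequ.analyzers.ApproxCountDistinct",
      aggregateFunction
    ).foreach { name =>
      println(s"$name -> ${whereLoaded(name).getOrElse("NOT FOUND")}")
    }
    // The signature the failing call expects: toAggregateExpression(boolean)
    println("AggregateFunction.toAggregateExpression(boolean) present: " +
      hasMethod(aggregateFunction, "toAggregateExpression", java.lang.Boolean.TYPE))
  }
}
```

Calling `ClasspathProbe.main(Array.empty)` at the start of the job in both environments and comparing the printed locations should show whether the container resolves `DeequFunctions` or `AggregateFunction` from a different jar than the local run does (for example, a jar already present in the image).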
