Using PyDeequ in a Jupyter Notebook: "Py4JJavaError: An error occurred while calling o70.run"


I'm trying to use PyDeequ in a Jupyter Notebook. When I try to use ConstraintSuggestionRunner, it shows this error:

Py4JJavaError: An error occurred while calling o70.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'

I'm using this setup for the test:

  • SDKMAN
  • sdk install java 8.0.292.hs-adpt
  • Spark 3.0.0

I got these configs from the awslabs/python-deequ README.md file.

import os
from pyspark.sql import SparkSession, Row
import pydeequ

os.environ["SPARK_VERSION"] = "3.0.0"

The error comes from the code below:

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("org.apache.spark.sql.catalyst", "spark-catalyst_2.12-3.1.2-amzn-0.jar")
    .getOrCreate())

df = spark.sparkContext.parallelize([
            Row(a="foo", b=1, c=5),
            Row(a="bar", b=2, c=6),
            Row(a="baz", b=3, c=None)]).toDF()

Complete error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [3], in <cell line: 3>()
      1 from pydeequ.suggestions import *
      3 suggestionResult = ConstraintSuggestionRunner(spark) \
      4              .onData(df) \
      5              .addConstraintRule(DEFAULT()) \
----> 6              .run()
      8 # Constraint Suggestions in JSON format
      9 print(suggestionResult)

File /opt/conda/lib/python3.10/site-packages/pydeequ/suggestions.py:81, in ConstraintSuggestionRunBuilder.run(self)
     74 def run(self):
     75     """
     76     A method that runs the desired ConstraintSuggestionRunBuilder functions on the data to obtain a constraint
     77             suggestion result. The result is then translated to python.
     78 
     79     :return: A constraint suggestion result
     80     """
---> 81     result = self._ConstraintSuggestionRunBuilder.run()
     83     jvmSuggestionResult = self._jvm.com.amazon.deequ.suggestions.ConstraintSuggestionResult
     84     result_json = json.loads(jvmSuggestionResult.getConstraintSuggestionsAsJson(result))

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o70.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
    at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
    at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
    at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
    at scala.collection.immutable.List.flatMap(List.scala:366)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
    at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
    at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
    at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
    at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.profileAndSuggest(ConstraintSuggestionRunner.scala:203)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.run(ConstraintSuggestionRunner.scala:102)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunBuilder.run(ConstraintSuggestionRunBuilder.scala:226)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

1 Answer

Answer by florins:

I had the same issue with almost the same environment (a Jupyter notebook and Spark 3.1.1). I solved the problem with the following steps:

  1. Download deequ-2.0.0-spark-3.1.jar from the Maven repository: https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar

  2. Upload deequ-2.0.0-spark-3.1.jar into a Jupyter-accessible folder, e.g. /home/jovyan/work/java-libs/

  3. In the notebook, set PYSPARK_SUBMIT_ARGS before creating the Spark session (note the `import os` it relies on):

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars file:///home/jovyan/work/java-libs/deequ-2.0.0-spark-3.1.jar pyspark-shell'

  4. Use the initialization code:

    from pyspark.sql import SparkSession, Row

    import pydeequ

    spark = SparkSession.builder.getOrCreate()
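The steps above work because the downloaded jar was built for the Spark version actually running; the original NoSuchMethodError is what a mismatch looks like. A tiny helper can make that pairing explicit. This is only a sketch: the mapping entries below are assumptions based on Deequ's artifact naming, so confirm the exact versions on Maven Central before relying on them.

```python
# Sketch: map a Spark major.minor version to a Deequ Maven coordinate
# built against it. Entries are examples (assumed, not exhaustive) --
# verify the exact artifact versions on Maven Central.
DEEQU_BY_SPARK = {
    "3.0": "com.amazon.deequ:deequ:1.2.2-spark-3.0",
    "3.1": "com.amazon.deequ:deequ:2.0.0-spark-3.1",
}

def deequ_coord(spark_version: str) -> str:
    # Reduce e.g. "3.1.1" to "3.1" and look up the matching artifact.
    major_minor = ".".join(spark_version.split(".")[:2])
    if major_minor not in DEEQU_BY_SPARK:
        raise ValueError(f"no known Deequ build for Spark {spark_version}")
    return DEEQU_BY_SPARK[major_minor]

print(deequ_coord("3.1.1"))  # -> com.amazon.deequ:deequ:2.0.0-spark-3.1
```

A lookup like this fails fast with a clear message instead of letting a mismatched jar surface later as a cryptic NoSuchMethodError inside the JVM.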

By following the above steps I could get rid of the error you mentioned. However, I bumped into a new one:

  py4j.Py4JException: Constructor com.amazon.deequ.suggestions.rules.CategoricalRangeRule([]) does not exist

which I solved with code like this:

import json

from pydeequ.suggestions import *

suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(CompleteIfCompleteRule())
    .addConstraintRule(NonNegativeNumbersRule())
    .addConstraintRule(RetainCompletenessRule())
    .addConstraintRule(RetainTypeRule())
    .addConstraintRule(UniqueIfApproximatelyUniqueRule())
    .run()
)

print(json.dumps(suggestionResult, indent=2))
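Rather than just dumping the whole result, you can iterate the suggestions to pull out the generated constraint code per column. The snippet below runs against a hand-built sample dict whose key names (`constraint_suggestions`, `column_name`, `code_for_constraint`) are assumptions based on PyDeequ's README output format; in the notebook, the real dict is the `suggestionResult` returned by `run()` above.

```python
# Sample dict mimicking the shape of PyDeequ's suggestion JSON
# (key names assumed from the library's README; the real value
# comes from ConstraintSuggestionRunner(...).run()).
suggestionResult = {
    "constraint_suggestions": [
        {
            "column_name": "b",
            "description": "'b' is not null",
            "code_for_constraint": '.isComplete("b")',
        },
        {
            "column_name": "c",
            "description": "'c' has less than 34% missing values",
            "code_for_constraint": '.hasCompleteness("c", lambda x: x >= 0.66)',
        },
    ]
}

# Print one "column: constraint code" line per suggestion.
for s in suggestionResult["constraint_suggestions"]:
    print(f'{s["column_name"]}: {s["code_for_constraint"]}')
```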