databricks-connect, Py4JJavaError while trying to use collect()


I'm encountering an issue while running the following line of code in JupyterLab, which is connected to a Spark cluster:

df.select('col_1').distinct().collect()

This exact code executes successfully in a Databricks notebook, but when run locally, it results in an error. My environment includes Java 8 and databricks-connect version 10.4.40.

The issue arises only with operations that attempt to return data to the Python side, such as collect() and toPandas().
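
For example, both of these trigger the same error, while the plain transformation builds without complaint:

df.select('col_1').distinct()            # OK: lazy, nothing is returned to Python
df.select('col_1').distinct().collect()  # Py4JJavaError
df.select('col_1').toPandas()            # Py4JJavaError

The full traceback: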

/tmp/ipykernel_53/2883452107.py:1 in <module>                                                    │
│                                                                                                  │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_53/2883452107.py'                           │
│                                                                                                  │
│ /usr/local/lib/python3.9/site-packages/pyspark/sql/dataframe.py:715 in collect                   │
│                                                                                                  │
│    712 │   │   │   │   os.remove(filename)                                                       │
│    713 │   │   # Default path used in OSS Spark / for non-DF-ACL clusters:                       │
│    714 │   │   with SCCallSiteSync(self._sc) as css:                                             │
│ ❱  715 │   │   │   sock_info = self._jdf.collectToPython()                                       │
│    716 │   │   return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))  │
│    717 │                                                                                         │
│    718 │   def toLocalIterator(self, prefetchPartitions=False):                                  │
│                                                                                                  │
│ ╭───────────────── locals ──────────────────╮                                                    │
│ │  css = None                               │                                                    │
│ │ self = DataFrame[CD_COMP: decimal(38,18)] │                                                    │
│ ╰───────────────────────────────────────────╯                                                    │
│                                                                                                  │
│ /usr/local/lib/python3.9/site-packages/py4j/java_gateway.py:1304 in __call__                     │
│                                                                                                  │
│   1301 │   │   │   proto.END_COMMAND_PART                                                        │
│   1302 │   │                                                                                     │
│   1303 │   │   answer = self.gateway_client.send_command(command)                                │
│ ❱ 1304 │   │   return_value = get_return_value(                                                  │
│   1305 │   │   │   answer, self.gateway_client, self.target_id, self.name)                       │
│   1306 │   │                                                                                     │
│   1307 │   │   for temp_arg in temp_args:                                                        │
│                                                                                                  │
│ ╭──────────────────────────────── locals ────────────────────────────────╮                       │
│ │       answer = 'xro101'                                                │                       │
│ │         args = ()                                                      │                       │
│ │ args_command = ''                                                      │                       │
│ │      command = 'c\no98\ncollectToPython\ne\n'                          │                       │
│ │         self = <py4j.java_gateway.JavaMember object at 0x7f3b137ecaf0> │                       │
│ │    temp_args = []                                                      │                       │
│ ╰────────────────────────────────────────────────────────────────────────╯                       │
│                                                                                                  │
│ /usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py:117 in deco                          │
│                                                                                                  │
│   114 def capture_sql_exception(f):                                                              │
│   115 │   def deco(*a, **kw):                                                                    │
│   116 │   │   try:                                                                               │
│ ❱ 117 │   │   │   return f(*a, **kw)                                                             │
│   118 │   │   except py4j.protocol.Py4JJavaError as e:                                           │
│   119 │   │   │   converted = convert_exception(e.java_exception)                                │
│   120 │   │   │   if not isinstance(converted, UnknownException):                                │
│                                                                                                  │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │         a = (                                                                                │ │
│ │             │   'xro101',                                                                    │ │
│ │             │   <py4j.clientserver.JavaClient object at 0x7f3afed79280>,                     │ │
│ │             │   'o98',                                                                       │ │
│ │             │   'collectToPython'                                                            │ │
│ │             )                                                                                │ │
│ │ converted = UnknownException('java.lang.NumberFormatException: null',                        │ │
│ │             'java.lang.NumberFormatException: null\n\tat                                     │ │
│ │             java.lang.Long.parseLong(Long.java:552)\n\tat                                    │ │
│ │             java.lang.Long.parseLong(Long.java:631)\n\tat                                    │ │
│ │             scala.collection.immutable.StringLike.toLong(StringLike.scala:309)\n\tat         │ │
│ │             scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)\n\tat        │ │
│ │             scala.collection.immutable.StringOps.toLong(StringOps.scala:33)\n\tat            │ │
│ │             com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2(Databricks… │ │
│ │             com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2$adapted(Da… │ │
│ │             scala.Option.map(Option.scala:230)\n\tat                                         │ │
│ │             com.databricks.spark.util.DatabricksConnectConf$.getOrgId(DatabricksConnectConf… │ │
│ │             com.databricks.sql.DatabricksSQLConf$.$anonfun$SPARK_SERVICE_ORG_ID$1(Databrick… │ │
│ │             scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)\n\tat     │ │
│ │             org.apache.spark.internal.config.ConfigEntryWithDefaultFunction.defaultValueStr… │ │
│ │             org.apache.spark.sql.internal.SQLConf.$anonfun$getAllDefaultConfs$1(SQLConf.sca… │ │
│ │             scala.collection.Iterator.foreach(Iterator.scala:943)\n\tat                      │ │
│ │             scala.collection.Iterator.foreach$(Iterator.scala:943)\n\tat                     │ │
│ │             scala.collection.AbstractIterator.foreach(Iterator.scala:1431)\n\tat             │ │
│ │             scala.collection.IterableLike.foreach(IterableLike.scala:74)\n\tat               │ │
│ │             scala.collection.IterableLike.foreach$(IterableLike.scala:73)\n\tat              │ │
│ │             scala.collection.AbstractIterable.foreach(Iterable.scala:56)\n\tat               │ │
│ │             org.apache.spark.sql.internal.SQLConf.getAllDefaultConfs(SQLConf.scala:4954)\n\… │ │
│ │             org.apache.spark.sql.internal.SQLConf.recordNonDefaultConfs(SQLConf.scala:4968)… │ │
│ │             org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(… │ │
│ │             org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution… │ │
│ │             org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(… │ │
│ │             org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)\n\tat       │ │
│ │             org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecutio… │ │
│ │             org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.sc… │ │
│ │             org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)\n\tat                │ │
│ │             org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3737)\n\tat           │ │
│ │             sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat                │ │
│ │             sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n… │ │
│ │             sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.ja… │ │
│ │             java.lang.reflect.Method.invoke(Method.java:498)\n\tat                           │ │
│ │             py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat               │ │
│ │             py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)\n\tat         │ │
│ │             py4j.Gateway.invoke(Gateway.java:295)\n\tat                                      │ │
│ │             py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat       │ │
│ │             py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat                     │ │
│ │             py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)\n\… │ │
│ │             py4j.ClientServerConnection.run(ClientServerConnection.java:115)\n\tat           │ │
│ │             java.lang.Thread.run(Thread.java:750)\n', None)                                  │ │
│ │         f = <function get_return_value at 0x7f3b1b3b2550>                                    │ │
│ │        kw = {}                                                                               │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                  │
│ /usr/local/lib/python3.9/site-packages/py4j/protocol.py:326 in get_return_value                  │
│                                                                                                  │
│   323 │   │   │   type = answer[1]                                                               │
│   324 │   │   │   value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)                     │
│   325 │   │   │   if answer[1] == REFERENCE_TYPE:                                                │
│ ❱ 326 │   │   │   │   raise Py4JJavaError(                                                       │
│   327 │   │   │   │   │   "An error occurred while calling {0}{1}{2}.\n".                        │
│   328 │   │   │   │   │   format(target_id, ".", name), value)                                   │
│   329 │   │   │   else:                                                                          │
│                                                                                                  │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮                     │
│ │         answer = 'xro101'                                                │                     │
│ │ gateway_client = <py4j.clientserver.JavaClient object at 0x7f3afed79280> │                     │
│ │           name = 'collectToPython'                                       │                     │
│ │      target_id = 'o98'                                                   │                     │
│ │           type = 'r'                                                     │                     │
│ │          value = JavaObject id=o101                                      │                     │
│ ╰──────────────────────────────────────────────────────────────────────────╯                     │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
Py4JJavaError: An error occurred while calling o98.collectToPython.
: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:552)
        at java.lang.Long.parseLong(Long.java:631)
        at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
        at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
        at com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2(DatabricksConnectConf.scala:94)
        at com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2$adapted(DatabricksConnectConf.scala:94)
        at scala.Option.map(Option.scala:230)
        at com.databricks.spark.util.DatabricksConnectConf$.getOrgId(DatabricksConnectConf.scala:94)
        at com.databricks.sql.DatabricksSQLConf$.$anonfun$SPARK_SERVICE_ORG_ID$1(DatabricksSQLConf.scala:1879)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at org.apache.spark.internal.config.ConfigEntryWithDefaultFunction.defaultValueString(ConfigEntry.scala:170)
        at org.apache.spark.sql.internal.SQLConf.$anonfun$getAllDefaultConfs$1(SQLConf.scala:4956)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.internal.SQLConf.getAllDefaultConfs(SQLConf.scala:4954)
        at org.apache.spark.sql.internal.SQLConf.recordNonDefaultConfs(SQLConf.scala:4968)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:196)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:393)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:192)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:147)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:343)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3737)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
        at java.lang.Thread.run(Thread.java:750)

I've tried different Java versions and different pandas versions, but nothing solved the problem.

1 Answer

Answered by JayashankarGS:

First, ensure that the version of databricks-connect you are using matches your Databricks runtime.

If your cluster runs Databricks Runtime 13, you need to install databricks-connect version 13; the client and runtime versions must match.
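
As a quick check (a minimal sketch; the version strings are just examples), you can compare the locally installed client against your cluster's runtime:

import importlib.metadata

# Version of the locally installed databricks-connect client;
# its major.minor must match the cluster's Databricks runtime.
print(importlib.metadata.version("databricks-connect"))  # e.g. 10.4.40 -> needs a 10.4 cluster

# If they differ, reinstall pinned to the cluster's runtime, e.g.:
#   pip uninstall -y pyspark databricks-connect
#   pip install "databricks-connect==10.4.*"

After reinstalling and reconfiguring, running databricks-connect test verifies the setup end to end.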

For Databricks runtime 12.2 and below, follow this documentation; if it is 13.0 and above, use this documentation.

The NumberFormatException: null you are encountering is raised when a null or empty string is parsed as a Long. When Spark casts a value to a type it doesn't match, the result silently becomes null, and downstream code that parses that value as a number then fails.

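A minimal sketch of that casting behaviour (illustrative only; the sample data and column name are made up):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# A string column whose values cannot all be parsed as numbers
df = spark.createDataFrame([("1",), ("abc",), (None,)], "col_1 string")

# Spark's cast never throws: unparseable values silently become null.
# Java's Long.parseLong, by contrast, raises NumberFormatException on null,
# which is exactly what the stack trace above shows.
df.withColumn("as_long", F.col("col_1").cast("long")).show()
# +-----+-------+
# |col_1|as_long|
# +-----+-------+
# |    1|      1|
# |  abc|   null|
# | null|   null|
# +-----+-------+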

In your case, the stack trace fails inside DatabricksConnectConf$.getOrgId while parsing a configuration value, which points to a version mismatch between databricks-connect and the Databricks runtime rather than a problem with your data.

If you still encounter the same error after checking the versions, remove records with null values or trim the strings in your column to eliminate surrounding spaces.
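
For instance, a minimal cleanup sketch, assuming col_1 is a string column:

from pyspark.sql import functions as F

cleaned = (
    df.select(F.trim(F.col("col_1")).alias("col_1"))  # strip surrounding spaces
      .na.drop(subset=["col_1"])                      # drop rows where col_1 is null
)
cleaned.distinct().collect()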