I'm encountering an issue while running the following line of code in JupyterLab, which is connected to a Spark cluster:
df.select('col_1').distinct().collect()
This exact code executes successfully in a Databricks notebook, but when run locally it fails with the error below. My environment uses Java 8 and databricks-connect version 10.4.40.
The issue arises only with operations that attempt to return data to Python, such as collect(), toPandas(), and related calls.
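For reference, the local session is created in the standard databricks-connect way (a minimal sketch; connection details come from databricks-connect configure, and the table name is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # picks up the databricks-connect config
df = spark.table("my_table")                # placeholder table name
df.select('col_1').distinct().collect()     # raises the traceback below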
/tmp/ipykernel_53/2883452107.py:1 in <module>
[Errno 2] No such file or directory: '/tmp/ipykernel_53/2883452107.py'

/usr/local/lib/python3.9/site-packages/pyspark/sql/dataframe.py:715 in collect
  712                 os.remove(filename)
  713         # Default path used in OSS Spark / for non-DF-ACL clusters:
  714         with SCCallSiteSync(self._sc) as css:
❱ 715             sock_info = self._jdf.collectToPython()
  716         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
locals: css = None, self = DataFrame[CD_COMP: decimal(38,18)]

/usr/local/lib/python3.9/site-packages/py4j/java_gateway.py:1304 in __call__
  1303        answer = self.gateway_client.send_command(command)
❱ 1304        return_value = get_return_value(
  1305            answer, self.gateway_client, self.target_id, self.name)
locals: answer = 'xro101', command = 'c\no98\ncollectToPython\ne\n'

/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py:117 in deco
  116         try:
❱ 117             return f(*a, **kw)
  118         except py4j.protocol.Py4JJavaError as e:
  119             converted = convert_exception(e.java_exception)
  120             if not isinstance(converted, UnknownException):
locals: a = ('xro101', <py4j.clientserver.JavaClient object at 0x7f3afed79280>, 'o98', 'collectToPython'),
        converted = UnknownException('java.lang.NumberFormatException: null', …)  (full Java stack trace reproduced below)

/usr/local/lib/python3.9/site-packages/py4j/protocol.py:326 in get_return_value
  323             type = answer[1]
  324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
  325             if answer[1] == REFERENCE_TYPE:
❱ 326                 raise Py4JJavaError(
  327                     "An error occurred while calling {0}{1}{2}.\n".
  328                     format(target_id, ".", name), value)
locals: answer = 'xro101', name = 'collectToPython', target_id = 'o98'
Py4JJavaError: An error occurred while calling o98.collectToPython.
: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
at com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2(DatabricksConnectConf.scala:94)
at com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2$adapted(DatabricksConnectConf.scala:94)
at scala.Option.map(Option.scala:230)
at com.databricks.spark.util.DatabricksConnectConf$.getOrgId(DatabricksConnectConf.scala:94)
at com.databricks.sql.DatabricksSQLConf$.$anonfun$SPARK_SERVICE_ORG_ID$1(DatabricksSQLConf.scala:1879)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.spark.internal.config.ConfigEntryWithDefaultFunction.defaultValueString(ConfigEntry.scala:170)
at org.apache.spark.sql.internal.SQLConf.$anonfun$getAllDefaultConfs$1(SQLConf.scala:4956)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.internal.SQLConf.getAllDefaultConfs(SQLConf.scala:4954)
at org.apache.spark.sql.internal.SQLConf.recordNonDefaultConfs(SQLConf.scala:4968)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:196)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:393)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:192)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:147)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:343)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3737)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)
I've tried different Java versions and different pandas versions, but nothing has solved the problem.
First, ensure that you are using the version of databricks-connect that matches your Databricks runtime: if your runtime is 13, you need to install databricks-connect version 13. For Databricks runtime 12.2 and below, follow this documentation; if it is 13.0 and above, use this documentation. To confirm what is installed locally, a quick check:
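from importlib.metadata import version  # Python 3.8+

# The client's major.minor must match the cluster's runtime,
# e.g. databricks-connect 10.4.x for a DBR 10.4 LTS cluster.
print(version("databricks-connect"))  # prints e.g. '10.4.40'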
The error you are encountering is a java.lang.NumberFormatException: a null or an empty string is being parsed as a Long. Note that when Spark casts a string that cannot be parsed to a numeric type, the resulting value is null rather than an error.
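A minimal sketch of that cast behavior, assuming an active spark session:

from pyspark.sql import functions as F

demo = spark.createDataFrame([("42",), ("",), (None,)], "col_1 string")
demo.select(F.col("col_1").cast("long").alias("as_long")).show()
# "42" -> 42; "" and NULL both come out as NULL instead of raising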
In your case, though, the stack trace shows the parse failing inside DatabricksConnectConf$.getOrgId, so the issue is most likely a version mismatch between databricks-connect and the Databricks runtime. If you still encounter the same error after checking the versions, remove records with null values, or trim the strings in your column to eliminate surrounding spaces. A sketch of that cleanup, assuming col_1 is a string column:
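from pyspark.sql import functions as F

cleaned = (
    df.withColumn("col_1", F.trim(F.col("col_1")))                  # strip surrounding spaces
      .filter(F.col("col_1").isNotNull() & (F.col("col_1") != ""))  # drop nulls and empties
)
cleaned.select("col_1").distinct().collect()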