I'm encountering an issue while running the following line of code in JupyterLab, which is connected to a Spark cluster:
df.select('col_1').distinct().collect()
This exact code executes successfully in a Databricks notebook, but when run locally it fails with the error below. My environment uses Java 8 and databricks-connect version 10.4.40.
The issue arises only with operations that attempt to return data to Python, such as collect() or toPandas().
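For instance, both of the following fail with the traceback shown below, while merely building the DataFrame does not:

```python
df2 = df.select('col_1').distinct()  # lazy; no error at this point
df2.collect()    # fails: rows must be returned to the Python driver
df2.toPandas()   # fails the same way
```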
/tmp/ipykernel_53/2883452107.py:1 in <module> │
│ │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_53/2883452107.py' │
│ │
│ /usr/local/lib/python3.9/site-packages/pyspark/sql/dataframe.py:715 in collect │
│ │
│ 712 │ │ │ │ os.remove(filename) │
│ 713 │ │ # Default path used in OSS Spark / for non-DF-ACL clusters: │
│ 714 │ │ with SCCallSiteSync(self._sc) as css: │
│ ❱ 715 │ │ │ sock_info = self._jdf.collectToPython() │
│ 716 │ │ return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) │
│ 717 │ │
│ 718 │ def toLocalIterator(self, prefetchPartitions=False): │
│ │
│ ╭───────────────── locals ──────────────────╮ │
│ │ css = None │ │
│ │ self = DataFrame[CD_COMP: decimal(38,18)] │ │
│ ╰───────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.9/site-packages/py4j/java_gateway.py:1304 in __call__ │
│ │
│ 1301 │ │ │ proto.END_COMMAND_PART │
│ 1302 │ │ │
│ 1303 │ │ answer = self.gateway_client.send_command(command) │
│ ❱ 1304 │ │ return_value = get_return_value( │
│ 1305 │ │ │ answer, self.gateway_client, self.target_id, self.name) │
│ 1306 │ │ │
│ 1307 │ │ for temp_arg in temp_args: │
│ │
│ ╭──────────────────────────────── locals ────────────────────────────────╮ │
│ │ answer = 'xro101' │ │
│ │ args = () │ │
│ │ args_command = '' │ │
│ │ command = 'c\no98\ncollectToPython\ne\n' │ │
│ │ self = <py4j.java_gateway.JavaMember object at 0x7f3b137ecaf0> │ │
│ │ temp_args = [] │ │
│ ╰────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py:117 in deco │
│ │
│ 114 def capture_sql_exception(f): │
│ 115 │ def deco(*a, **kw): │
│ 116 │ │ try: │
│ ❱ 117 │ │ │ return f(*a, **kw) │
│ 118 │ │ except py4j.protocol.Py4JJavaError as e: │
│ 119 │ │ │ converted = convert_exception(e.java_exception) │
│ 120 │ │ │ if not isinstance(converted, UnknownException): │
│ │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │ a = ( │ │
│ │ │ 'xro101', │ │
│ │ │ <py4j.clientserver.JavaClient object at 0x7f3afed79280>, │ │
│ │ │ 'o98', │ │
│ │ │ 'collectToPython' │ │
│ │ ) │ │
│ │ converted = UnknownException('java.lang.NumberFormatException: null', …) │ │
│ │ f = <function get_return_value at 0x7f3b1b3b2550> │ │
│ │ kw = {} │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.9/site-packages/py4j/protocol.py:326 in get_return_value │
│ │
│ 323 │ │ │ type = answer[1] │
│ 324 │ │ │ value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) │
│ 325 │ │ │ if answer[1] == REFERENCE_TYPE: │
│ ❱ 326 │ │ │ │ raise Py4JJavaError( │
│ 327 │ │ │ │ │ "An error occurred while calling {0}{1}{2}.\n". │
│ 328 │ │ │ │ │ format(target_id, ".", name), value) │
│ 329 │ │ │ else: │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ answer = 'xro101' │ │
│ │ gateway_client = <py4j.clientserver.JavaClient object at 0x7f3afed79280> │ │
│ │ name = 'collectToPython' │ │
│ │ target_id = 'o98' │ │
│ │ type = 'r' │ │
│ │ value = JavaObject id=o101 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
Py4JJavaError: An error occurred while calling o98.collectToPython.
: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
at com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2(DatabricksConnectConf.scala:94)
at com.databricks.spark.util.DatabricksConnectConf$.$anonfun$getOrgId$2$adapted(DatabricksConnectConf.scala:94)
at scala.Option.map(Option.scala:230)
at com.databricks.spark.util.DatabricksConnectConf$.getOrgId(DatabricksConnectConf.scala:94)
at com.databricks.sql.DatabricksSQLConf$.$anonfun$SPARK_SERVICE_ORG_ID$1(DatabricksSQLConf.scala:1879)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.spark.internal.config.ConfigEntryWithDefaultFunction.defaultValueString(ConfigEntry.scala:170)
at org.apache.spark.sql.internal.SQLConf.$anonfun$getAllDefaultConfs$1(SQLConf.scala:4956)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.internal.SQLConf.getAllDefaultConfs(SQLConf.scala:4954)
at org.apache.spark.sql.internal.SQLConf.recordNonDefaultConfs(SQLConf.scala:4968)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:196)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:393)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:192)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:147)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:343)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3737)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)
I've tried different Java versions and different pandas versions, but nothing has solved the problem.
First, ensure that you are using the version of databricks-connect that matches your Databricks runtime. If your runtime is 13, you need to install databricks-connect version 13. For Databricks Runtime 12.2 and below, follow this documentation; if it is 13.0 and above, use this documentation.
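As a quick check, here is a minimal sketch (the version pin and commands assume the classic databricks-connect client used with Databricks Runtime 12.2 and below, matching your 10.4 setup):

```python
# Verify the installed client version; its major.minor must match the
# cluster's runtime (10.4.x for DBR 10.4).
from importlib.metadata import version

print(version("databricks-connect"))

# From a shell, reinstall the matching client and smoke-test the connection.
# `databricks-connect configure` prompts for host, token, cluster ID, org ID,
# and port -- the getOrgId frame in your stack trace suggests the org ID may
# be missing or blank, so don't skip that prompt:
#   pip install -U "databricks-connect==10.4.*"
#   databricks-connect configure
#   databricks-connect test
```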
The error itself is a null or empty string being converted to Long: when a value cannot be parsed as the target type, null is substituted, and parsing that null raises the NumberFormatException. In your case, the stack trace points at DatabricksConnectConf$.getOrgId, which suggests a version mismatch between databricks-connect and the Databricks runtime, or a missing org ID in your connection configuration. If you still encounter the same error after checking the versions, remove records with null values, or strip the strings to eliminate spaces in your column.
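For example, a minimal sketch of that cleanup (assuming col_1 is a string column; cleaned is a hypothetical name):

```python
from pyspark.sql.functions import col, trim

# Strip surrounding whitespace and drop nulls before pulling rows back
# to the Python driver.
cleaned = (
    df.withColumn("col_1", trim(col("col_1")))
      .filter(col("col_1").isNotNull())
)

cleaned.select("col_1").distinct().collect()
```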