I am a beginner at using PySpark and since I do not have access to the databricks platform, I am currently exploring learning through JupyterNotebook. I have reviewed other forums and tried different methods, however, the problem still persists when I run the .show() method. Below is my code:
import findspark
findspark.init()
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AnotherTestDemo").getOrCreate()
import json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType, IntegerType, DateType
myschema = StructType([
StructField("EmpID", IntegerType(), True),
StructField("Details", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True),
StructField("ADEmail", StringType(), True),
StructField("DOB", StringType(), True),
StructField("State", StringType(), True),
StructField("GenderCode", StringType(), True),
StructField("LocationCode", IntegerType(), True),
StructField("RaceDesc", StringType(), True),
StructField("MaritalDesc", StringType(), True),
]), True),
StructField("JobProfile", StructType([
StructField("Title", StringType(), True),
StructField("BusinessUnit", StringType(), True),
StructField("Supervisor", StringType(), True),
StructField("EmployeeType", StringType(), True),
StructField("DepartmentType", StringType(), True),
StructField("Division", StringType(), True),
StructField("JobFunctionDescription", StringType(), True),
]), True),
StructField("Payroll", StructType([
StructField("EmployeeClassificationType", StringType(), True),
StructField("PayZone", StringType(), True),
]), True),
StructField("Performance", StructType([
StructField("Performance Score", StringType(), True),
StructField("Current Employee Rating", IntegerType(), True),
]), True),
StructField("History", StructType([
StructField("StartDate", DateType(), True),
StructField("EmployeeStatus", StringType(), True),
StructField("ExitDetails", ArrayType(StructType([
StructField("ExitDate", DateType(), True),
StructField("TerminationType", StringType(), True),
StructField("TerminationDescription", StringType(), True),
])), True),
]), True)
])
schemadefined_df = spark.read.json("employee_data.json", schema=myschema, multiLine=True)
from pysparketl.dataframes import flattenDF
with_defined_schema_df = flattenDF(schemadefined_df)
with_defined_schema_df.show()
The specific error message is as follows:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-105-b38a28ad0f7b> in <module>
1 #with_defined_schema_df.show()
C:\Spark\spark-3.4.1-bin-hadoop3\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
897
898 if isinstance(truncate, bool) and truncate:
--> 899 print(self._jdf.showString(n, 20, vertical))
900 else:
901 try:
C:\Spark\spark-3.4.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
1320
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1324
C:\Spark\spark-3.4.1-bin-hadoop3\python\pyspark\errors\exceptions\captured.py in deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
C:\Spark\spark-3.4.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1573.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 87.0 failed 1 times, most recent failure: Lost task 0.0 in stage 87.0 (TID 60) (LAPTOP-KQHH2FNK executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to java.base/java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$1(FileFormat.scala:156)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
at jdk.internal.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to java.base/java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$1(FileFormat.scala:156)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
... 1 more
Further details: PySpark version: 3.4.1 Python version: 3.8.8 I have also restarted the Jupyter kernel and still the same error when running the .show() function