Spark-Cassandra connector read of a Cassandra table returning NoSuchElementException


I have been trying to copy a table from one keyspace to another using Apache Spark. I keep getting a java.util.NoSuchElementException: null when iterating over the DataFrame with .foreachPartition().

Spark 3.4.1
spark-cassandra-connector_2.12 3.4.1
AWS Keyspaces (Cassandra 3.11 compatible)

I read the data via:

Dataset<Row> data = session.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", keyspace)
                .option("table", table)
                .load();
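
For context, here is a minimal sketch of how the SparkSession (session) can be pointed at AWS Keyspaces through the connector; the region, credentials, and app name below are placeholder assumptions, not my actual values:

import org.apache.spark.sql.SparkSession;

// Minimal sketch: connector settings for an AWS Keyspaces endpoint.
// Region, credentials, and app name are placeholders.
SparkSession session = SparkSession.builder()
        .appName("keyspace-copy")                                                        // placeholder
        .config("spark.cassandra.connection.host", "cassandra.us-east-1.amazonaws.com")  // placeholder region
        .config("spark.cassandra.connection.port", "9142")                               // Keyspaces listens on 9142 (TLS)
        .config("spark.cassandra.connection.ssl.enabled", "true")
        .config("spark.cassandra.auth.username", "SERVICE_USER")                         // service-specific credentials
        .config("spark.cassandra.auth.password", "SERVICE_PASSWORD")
        .getOrCreate();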

data.foreachPartition(t -> {
    // ...
    // save each row to Cassandra
    // ...
});
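
The elided body saves each row to the destination table. For context, a minimal sketch of an equivalent copy using the connector's DataFrame writer instead of manual iteration; targetKeyspace is a placeholder for the destination keyspace, not a name from my setup:

// Sketch: let the connector write the whole DataFrame, assuming the
// destination table already exists. "targetKeyspace" is a placeholder.
data.write()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", targetKeyspace)
        .option("table", table)
        .mode("append")
        .save();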

Error Message:

01:10:50.600 [Executor task launch worker for task 12.0 in stage 0.0 (TID 12)] ERROR org.apache.spark.executor.Executor - Exception in task 12.0 in stage 0.0 (TID 12)
java.util.NoSuchElementException: null
    at com.datastax.oss.driver.internal.core.util.CountingIterator.next(CountingIterator.java:102) ~[cassandra-1.0.0.jar:?]
    at com.datastax.spark.connector.rdd.reader.PrefetchingResultSetIterator.next(PrefetchingResultSetIterator.scala:51) ~[spark-cassandra-connector_2.12-3.4.1.jar:3.4.1]
    at com.datastax.spark.connector.rdd.reader.PrefetchingResultSetIterator.next(PrefetchingResultSetIterator.scala:22) ~[spark-cassandra-connector_2.12-3.4.1.jar:3.4.1]
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.18-M1.jar:?]
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:496) ~[scala-library-2.12.18-M1.jar:?]
    at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.next(CassandraScanPartitionReaderFactory.scala:66) ~[spark-cassandra-connector_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at scala.Option.exists(Option.scala:376) ~[scala-library-2.12.18-M1.jar:?]
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.18-M1.jar:?]
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.next(InMemoryRelation.scala:87) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.next(InMemoryRelation.scala:79) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.18-M1.jar:?]
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:139) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) ~[spark-core_2.12-3.4.1.jar:3.4.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) [spark-core_2.12-3.4.1.jar:3.4.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
01:10:50.640 [task-result-getter-0] ERROR org.apache.spark.scheduler.TaskSetManager - Task 12 in stage 0.0 failed 1 times; aborting job

The read and write code is shown above; I was not expecting this error.
