Spark Cassandra Connector : ERROR AppendDataExec: Data source write support CassandraBulkWrite


I have a simple Cassandra table like this:

CREATE TABLE my_keyspace.my_table (
    my_composite_pk_a bigint,
    my_composite_pk_b ascii,
    value blob,
    PRIMARY KEY ((my_composite_pk_a, my_composite_pk_b))
) WITH bloom_filter_fp_chance = 0.1
   AND gc_grace_seconds = 86400
   AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
   AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'enabled': 'true'}
   AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};

Note that value is a blob of roughly 1 MB per row.

I have a Spark app that is as simple as:

spark
  // read data from parquet
  .read.parquet("...")
  // skip large partitions, to avoid overwhelming Cassandra
  .withColumn("bytes_count",length(col("value")))
  .filter("bytes_count < 1000000") // < 1MB
  // project
  .select("my_composite_pk_a", "my_composite_pk_b", "value")
  // commit to Cassandra
  .writeTo("cassandra.my_keyspace.my_table")
  .append()

The following properties are used to configure the Spark Cassandra connector:

spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes=6 
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows=1 
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key=none 
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec=6 
spark.sql.catalog.cassandra.spark.cassandra.connection.host=node1,node2 
spark.sql.catalog.cassandra.spark.cassandra.connection.port=9042 
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level=LOCAL_QUORUM 
spark.sql.catalog.cassandra.spark.cassandra.output.metrics=false 
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS=90000 
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count=100 
spark.sql.catalog.cassandra=com.datastax.spark.connector.datasource.CassandraCatalog 
spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions 
spark.sql.catalog.cassandra.spark.cassandra.auth.username=USERNAME 
spark.sql.catalog.cassandra.spark.cassandra.auth.password=PASSWORD

The relevant Spark properties are:

--total-executor-cores 6 
--executor-cores 6 
--executor-memory 15G 
--driver-memory 6G 
--driver-cores 4 

So there is a single executor with 6 cores.

The following error appears on a task while the job is running, so the whole Spark app crashes:

...
24/03/06 10:07:24 WARN TaskSetManager: Lost task 886.0 in stage 3.0 (TID 2959, node1, executor 0): java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
...
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from node4:40201 in 10 seconds
...
24/03/06 10:07:24 ERROR AppendDataExec: Data source write support CassandraBulkWrite(
    org.apache.spark.sql.SparkSession@2e8eafb2,
    com.datastax.spark.connector.cql.CassandraConnector@1a865ebf,
    TableDef(
        my_keyspace,
        my_table,
        ArrayBuffer(
            ColumnDef(
                my_composite_pk_a,
                PartitionKeyColumn,
                BigIntType
            ), 
            ColumnDef(
                my_composite_pk_b,
                PartitionKeyColumn,
                AsciiType
            )
        ),
        ArrayBuffer(),
        Stream(
            ColumnDef(
                my_value,
                RegularColumn,
                BlobType
            )
        ),
        Stream(),
        false,
        false,
        Map()
    ),
    WriteConf(
        RowsInBatch(1),
        1000,
        None,
        LOCAL_QUORUM,
        false,
        false,
        6,
        Some(6.0),
        TTLOption(DefaultValue),
        TimestampOption(DefaultValue),false,None),
        StructType(StructField(snapshot,LongType,true),
        StructField(data_key,StringType,true),
        StructField(value,BinaryType,true)
    ),
    org.apache.spark.SparkConf@3903cfc9
)

It seems there is a write error with the Cassandra connector.

Do you have any idea why?

1 Answer

Answer from Jaxon M:

1 MB blobs with a batch size of only 1 can put stress on Cassandra. Consider increasing spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows (currently 1) for more efficient writes.

Please note that extremely large batch sizes could lead to memory issues, so start with a moderate increase and experiment.
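
A minimal sketch of how the batch size could be raised when building the SparkSession, assuming the same catalog name (cassandra) as in the question; the value 10 is only an illustrative starting point, and the concurrency/throughput caps are kept at the question's original values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cassandra-bulk-write")
  // group a few rows per unlogged batch instead of writing one row at a time
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows", "10")
  // keep the write concurrency and throughput caps modest while each blob is ~1 MB
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes", "6")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec", "6")
  .getOrCreate()

The rest of the job (the parquet read, the size filter, and writeTo("cassandra.my_keyspace.my_table").append()) would stay unchanged.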