Spark 3.0 and Cassandra Spark / Python Connectors: Table is not being created prior to write


I'm currently trying to upgrade my application to Spark 3.0.1. For table creation, I drop and create a table using cassandra-driver, the Python Cassandra connector. Then I write a dataframe into the table using the spark-cassandra connector. There isn't really a good way to drop and create the table using only the spark-cassandra connector.

With Spark 2.4, there were no issues with the drop-create-write flow. But with Spark 3.0, the application seems to perform these steps in no particular order, often trying to write before dropping and creating. I have no clue how to ensure that dropping and creating the table happens first. I know the drop and create do happen even while the application errors out on write, because when I query Cassandra via cqlsh I can see the table being dropped and re-created. Any ideas about this behavior in Spark 3.0?

Note: because the schema changes, this particular table needs to be dropped and recreated instead of a straight overwrite.

A code snippet as requested:

        session = self._get_python_cassandra_session(self.env_conf, self.database)
        # build drop table query
        drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
        session.execute(drop_table_query)

        df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
        # build create query
        create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}), );'.format(self.database, tablename, table_columns, table_keys)
        # execute table creation
        session.execute(create_table_query)
        session.shutdown()


        # spark-cassandra connection options
        copts = _cassandra_cluster_spark_options(self.env_conf)
        # set write mode
        copts['confirm.truncate'] = overwrite
        mode = 'overwrite' if overwrite else 'append'
        # write dataframe to cassandra
        get_dataframe_writer(df, 'cassandra', keyspace=self.database,
                             table=tablename, mode=mode, copts=copts).save()

There are 2 answers

L. Chu

I ended up polling Cassandra for the table every 5 seconds (a time.sleep(5) delay) with a 100-second timeout, and writing once the table was found.
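A minimal sketch of that polling loop. The `table_exists` callable is a hypothetical hook; with cassandra-driver it could, for example, check the cluster metadata for the table. The 5-second interval and 100-second timeout match the answer above:

```python
import time

def wait_for_table(table_exists, timeout=100, interval=5):
    """Poll until table_exists() returns True, or raise after `timeout` seconds.

    table_exists: zero-argument callable returning a bool (hypothetical hook).
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if table_exists():
            return True
        time.sleep(interval)
    raise TimeoutError('table did not appear within {} seconds'.format(timeout))
```

With cassandra-driver, one possible `table_exists` is a check against the driver's schema metadata, e.g. `lambda: tablename in session.cluster.metadata.keyspaces[keyspace].tables` (assuming the session's metadata is up to date).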

Alex Ott

In Spark Cassandra Connector 3.0+ you can use new functionality: manipulating keyspaces & tables via the Catalogs API. You can create/alter/drop keyspaces & tables using Spark SQL. For example, you can create a table in Cassandra with the following command:

CREATE TABLE casscatalog.ksname.table_name (
  key_1 Int,
  key_2 Int,
  key_3 Int,
  cc1 String,
  cc2 String,
  cc3 String,
  value String)
USING cassandra
PARTITIONED BY (key_1, key_2, key_3)
TBLPROPERTIES (
    clustering_key='cc1.asc, cc2.desc, cc3.asc',
    compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}'
)

As you can see here, you can specify quite complex primary keys, and also specify table options. The casscatalog piece is a prefix that links to a specific Cassandra cluster (you can use multiple at the same time); it's specified when you're starting the Spark job, like:

spark-shell --packages com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 \
  --conf spark.sql.catalog.casscatalog=com.datastax.spark.connector.datasource.CassandraCatalog
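Applied to the original drop-and-recreate flow, here is a hedged PySpark sketch. The catalog name `casscatalog`, the keyspace, and the column layout are assumptions carried over from this answer, and `build_recreate_ddl` is a hypothetical helper; the point is that `spark.sql()` runs synchronously, so the DROP and CREATE complete before the write:

```python
def build_recreate_ddl(catalog, keyspace, table, columns, partition_keys):
    """Build DROP/CREATE statements for a Cassandra-backed catalog table.

    columns: list of (name, type) pairs; partition_keys: list of column names.
    """
    full_name = '{}.{}.{}'.format(catalog, keyspace, table)
    cols = ', '.join('{} {}'.format(name, ctype) for name, ctype in columns)
    drop_stmt = 'DROP TABLE IF EXISTS {}'.format(full_name)
    create_stmt = (
        'CREATE TABLE {} ({}) USING cassandra '
        'PARTITIONED BY ({})'.format(full_name, cols, ', '.join(partition_keys))
    )
    return drop_stmt, create_stmt

# Usage sketch (requires a SparkSession configured with
# spark.sql.catalog.casscatalog=com.datastax.spark.connector.datasource.CassandraCatalog):
#
# drop_stmt, create_stmt = build_recreate_ddl(
#     'casscatalog', 'ksname', 'table_name',
#     [('key_1', 'Int'), ('value', 'String')], ['key_1'])
# spark.sql(drop_stmt)      # synchronous: completes before the next call
# spark.sql(create_stmt)
# df.writeTo('casscatalog.ksname.table_name').append()
```

Because every statement goes through the same catalog, this avoids the ordering problem between the cassandra-driver session and the Spark write in the question.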

More examples can be found in the documentation.