I'm currently trying to upgrade my application to Spark 3.0.1. For table creation, I drop and create a table using cassandra-driver, the DataStax Python driver for Cassandra. Then I write a dataframe into the table using the spark-cassandra connector. There isn't really a good alternative for dropping and creating the table using only the spark-cassandra connector.
With Spark 2.4, there were no issues with this drop-create-write flow. But with Spark 3.0, the application seems to perform these steps in no particular order, often trying to write before dropping and creating the table. I have no clue how to ensure that dropping and creating the table happens first. I know the drop and create do happen even while the write errors out, because when I query Cassandra via cqlsh I can see the table being dropped and re-created. Any ideas about this behavior in Spark 3.0?
Note: because the schema changes, this particular table needs to be dropped and recreated rather than simply overwritten.
A code snippet as requested:
# open a cassandra-driver session for the DDL statements
session = self._get_python_cassandra_session(self.env_conf, self.database)
# build drop table query
drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
session.execute(drop_table_query)
# derive column definitions and key columns for the CREATE statement
df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
# build create query
create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY ({}));'.format(self.database, tablename, table_columns, table_keys)
# execute table creation
session.execute(create_table_query)
session.shutdown()
# spark-cassandra connection options
copts = _cassandra_cluster_spark_options(self.env_conf)
# set write mode
copts['confirm.truncate'] = overwrite
mode = 'overwrite' if overwrite else 'append'
# write dataframe to cassandra
get_dataframe_writer(df, 'cassandra', keyspace=self.database,
table=tablename, mode=mode, copts=copts).save()
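get_dataframe_writer is just a local helper; assuming it does nothing more than configure a standard DataFrameWriter for the spark-cassandra-connector data source, the write is roughly equivalent to this sketch (the option loop is only illustrative):

# roughly equivalent write, assuming get_dataframe_writer only wraps DataFrameWriter
writer = (df.write
          .format('org.apache.spark.sql.cassandra')
          .mode(mode)
          .option('keyspace', self.database)
          .option('table', tablename))
# pass through connection options such as confirm.truncate
for key, value in copts.items():
    writer = writer.option(key, value)
writer.save()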
I ended up building a polling workaround: sleep 5 seconds between checks, with a 100-second overall timeout, querying Cassandra for the table and performing the write only once the table was found.
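For completeness, the polling looks roughly like this; it's a sketch that assumes a live cassandra-driver session is still available (i.e., it runs before session.shutdown()), and the helper name is just illustrative:

import time

def _wait_for_table(session, keyspace, tablename, timeout=100, interval=5):
    # poll system_schema.tables until the freshly created table is visible
    deadline = time.time() + timeout
    query = ('SELECT table_name FROM system_schema.tables '
             'WHERE keyspace_name = %s AND table_name = %s')
    while time.time() < deadline:
        if session.execute(query, (keyspace, tablename)).one() is not None:
            return
        time.sleep(interval)
    raise TimeoutError('table {}.{} not visible after {}s'.format(keyspace, tablename, timeout))

The call would sit right before the Spark write, but it still feels like a workaround rather than a fix.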