Why is the Cassandra Python driver not returning COUNT() of all rows?


I am inserting a DataFrame with around 14k rows into a DataStax Astra Cassandra database. I am on the free tier, which has a 25 MB storage limit; my dataset is around 1.5 MB. The code shows no errors on insert or fetch, but when I count the rows after fetching I get only around 1.5k. I can't figure out whether the problem lies in the insertion code or the fetching code, despite racking my brains and searching Google multiple times. Here is my code:

cassandraDBLoad.py

import sys
import time

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile
from cassandra.query import BatchStatement

# read_params is the project's own config helper (not shown here)

def progressbar(it, prefix="", size=60, out=sys.stdout): # Python3.3+
    count = len(it)
    def show(j):
        x = int(size*j/count)
        print("{}[{}{}] {}/{}".format(prefix, u"█"*x, "."*(size-x), j, count), 
                end='\r', file=out, flush=True)
    show(0)
    for i, item in enumerate(it):
        yield item
        show(i+1)
    print("\n", flush=True, file=out)

def cassandraDBLoad(config_path):
    try:
        config = read_params(config_path)

        execution_profile = ExecutionProfile(request_timeout=10)
        cassandra_config = {'secure_connect_bundle': config["connect_cassandra"]["cloud_config"]}
        auth_provider = PlainTextAuthProvider(
                config["connect_cassandra"]["client_id"],
                config["connect_cassandra"]["client_secret"]
                )
        cluster = Cluster(cloud=cassandra_config, auth_provider=auth_provider)
        session = cluster.connect()
        session.default_timeout = None
        connect_db = session.execute("select release_version from system.local")
        set_keyspace = session.set_keyspace(config["cassandra_db"]["keyspace"])
        
        table_ = config["cassandra_db"]["data_table"]
        define_columns = config["cassandra_db"]["define_columns"]
        
        create_table = f"CREATE TABLE IF NOT EXISTS {table_}({define_columns});"
        start_create = time.process_time()
        table_result = session.execute(create_table)
        
        train = pd.read_csv(config["data_source"]["train_source"])
        test = pd.read_csv(config["data_source"]["test_source"])
    
        #Combine test and train into one file
        train['source']='train'
        test['source']='test'
        df = pd.concat([train, test],ignore_index=True)
        df = df.fillna('NA')
        columns = list(df)
        for col in columns:
            df[col] = df[col].map(str)
        
        columns = config["cassandra_db"]["columns"]
        insert_qry = f"INSERT INTO {table_}({columns}) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) IF NOT EXISTS"
        statement = session.prepare(insert_qry)
        
        start_insert = time.process_time()
        batch = BatchStatement()
        for i in progressbar(range(len(df)), "Inserting: ", 40):
            time.sleep(0.1)            
            session.execute_async(
                statement,
                    [
                        df.iat[i,0], df.iat[i,1], df.iat[i,2], df.iat[i,3], df.iat[i,4], df.iat[i,5], 
                        df.iat[i,6], df.iat[i,7], df.iat[i,8], df.iat[i,9], df.iat[i,10], df.iat[i,11], 
                        df.iat[i,12]
                    ]
                )
            print("Time taken to insert df - " + str((time.process_time() - start_insert)/60) + " minutes")

    except Exception as e:
        raise Exception("(cassandraDBLoad): Something went wrong in the CassandraDB Load operations\n" + str(e))

The above code takes around 30 minutes to insert the rows. I have 12 GB of RAM and 2 CPU cores.
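
As a side note on the insert loop: session.execute_async returns a ResponseFuture immediately, so the loop can finish before the writes are actually acknowledged. Below is a minimal sketch (not the code I ran, just an illustration using the driver's documented ResponseFuture.result() call) of collecting the futures and blocking on them so every write is confirmed:

# Sketch: session, statement and df are as defined above.
# Collect the futures and block on each one so a failed or
# dropped write surfaces as an exception instead of being lost.
futures = []
for i in range(len(df)):
    params = [df.iat[i, j] for j in range(13)]
    futures.append(session.execute_async(statement, params))

for future in futures:
    future.result()  # raises if the corresponding INSERT failed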

preprocess_data.py

import pandas as pd
from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

# read_params is the project's own config helper (not shown here)

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

def preprocess_data(config_path):
    try:
        config = read_params(config_path)
        cassandra_config = {'secure_connect_bundle': config["connect_cassandra"]["cloud_config"]}
        auth_provider = PlainTextAuthProvider(
                config["connect_cassandra"]["client_id"],
                config["connect_cassandra"]["client_secret"]
                )
        cluster = Cluster(cloud=cassandra_config, auth_provider=auth_provider)
        session = cluster.connect()
        session.set_keyspace(config["cassandra_db"]["keyspace"])
        session.row_factory = pandas_factory
        #session.default_fetch_size = None

        count_query = f"SELECT COUNT(*) from {config['cassandra_db']['data_table']} LIMIT 20000"
        count_rslt = session.execute(count_query, timeout=None)
        print(count_rslt._current_rows)
        query = f"SELECT * from {config['cassandra_db']['data_table']}"
        simple_statement = SimpleStatement(query, consistency_level=ConsistencyLevel.ONE, fetch_size=None)
        execute_result = session.execute(simple_statement, timeout=None)
        data = execute_result._current_rows
  
        print(data.shape)
        
    except Exception as e:
        raise Exception("(preprocessData): " + str(e))

CSV files link: https://drive.google.com/drive/folders/1O03lNTMfSwhUKG61zOs7fNxXIRe44GRp?usp=sharing. Kindly help me either insert the full DataFrame or fetch all the rows, depending on where the issue lies.

1 Answer

Answered by Erick Ramirez:

My best guess is that there is a mismatch between what you think you are counting, and the actual rows in the table.

I say it's a "guess" because it is impossible to know for certain without seeing the table's CQL schema. Let me illustrate with some examples.

For a table with a simple primary key (a partition key and nothing else), each partition has exactly one row. For example:

CREATE TABLE users (
    username text,
    realname text,
    email text,
    PRIMARY KEY (username)
)

If there are 500 users in the table, this query will return a count of 500, matching the number of rows:

SELECT COUNT(username) FROM users

For a table with a compound primary key (composed of partition key + at least one clustering key), each partition can have ONE OR MORE rows. For example:

CREATE TABLE user_emails (
    username text,
    email_type text,
    email_address text
    ...
    PRIMARY KEY (username, email_type)
)

A user (partition) can have one or more rows of emails: personal, work, etc. If there were 500 users in the user_emails table, this query returns the total number of rows across all partitions, not the number of users, so it can come back higher than 500:

SELECT COUNT(username) FROM user_emails

The reverse mismatch is also possible: INSERT in Cassandra is an upsert, so records that share the same primary key collapse into a single row (and with IF NOT EXISTS, later duplicates are simply not applied). Loading 14k records can therefore legitimately leave far fewer rows in the table.
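
To make the upsert behaviour concrete, here is a minimal sketch against the user_emails table above (session is assumed to be an already-connected driver Session):

# Two INSERTs with the same primary key (username, email_type)
# are upserts: the second overwrites the first instead of adding a row.
session.execute("INSERT INTO user_emails (username, email_type, email_address) "
                "VALUES ('alice', 'work', 'alice@work.example')")
session.execute("INSERT INTO user_emails (username, email_type, email_address) "
                "VALUES ('alice', 'work', 'alice@office.example')")

row = session.execute("SELECT COUNT(*) FROM user_emails").one()
print(row[0])  # 1 -- two inserts, but only one row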

As a side note, counting rows in Cassandra isn't the same thing as counting records in traditional relational databases. I've explained it in detail in this DBA Stack Exchange post -- Why is COUNT() bad in Cassandra?

As I mentioned in your other post, if you have to count records, then use the DataStax Bulk Loader (DSBulk), which was developed for this purpose. DSBulk has a nice feature for counting data in large tables in a distributed manner. It is open source, so Apache Cassandra users can use it for free. Cheers!
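
For example, counting a table with DSBulk looks something like dsbulk count -k my_keyspace -t my_table (keyspace and table names here are placeholders; check the DSBulk documentation for the full set of options).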