Cassandra Python driver not fetching the next page of results

DataStax Python driver for Cassandra version 3.25.0, Python 3.9

Session.execute() fetches the first 100 records. According to the documentation, the driver is supposed to transparently fetch the next page when iteration reaches the end of the current page. Instead, it fetches the same page again and again, so the first 100 records are all that is ever accessible.

The for loop that prints records goes infinite.
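A minimal diagnostic sketch (not from the original post): walk the ResultSet one page at a time using the driver's `ResultSet.current_rows`, `has_more_pages`, `paging_state` and `fetch_next_page()`, and stop with an error if a paging state repeats, which is one way the symptom described above could show up. `iter_pages` and `FakeResultSet` are hypothetical names; the fake exists only so the sketch runs without a cluster.

```python
from typing import Iterator, List, Tuple


def iter_pages(result_set) -> Iterator[Tuple[int, List]]:
    """Walk a ResultSet one page at a time, failing fast if paging stalls.

    Relies on ResultSet.current_rows, has_more_pages, paging_state and
    fetch_next_page(); plain iteration over the ResultSet fetches the
    same pages implicitly.
    """
    seen_states = set()
    page_number = 1
    yield page_number, list(result_set.current_rows)
    while result_set.has_more_pages:
        state = result_set.paging_state
        if state in seen_states:
            # The same paging state came back twice: continuing would
            # re-request the same page forever.
            raise RuntimeError(f"paging_state did not advance after page {page_number}")
        seen_states.add(state)
        result_set.fetch_next_page()  # synchronous request for the next page
        page_number += 1
        yield page_number, list(result_set.current_rows)


class FakeResultSet:
    """Hypothetical in-memory stand-in for cassandra.cluster.ResultSet."""

    def __init__(self, pages, states):
        self._pages = pages    # rows of each page
        self._states = states  # paging state sent with each page (None on the last)
        self._index = 0

    @property
    def current_rows(self):
        return self._pages[self._index]

    @property
    def paging_state(self):
        return self._states[self._index]

    @property
    def has_more_pages(self):
        return self.paging_state is not None

    def fetch_next_page(self):
        self._index += 1


for page_number, rows in iter_pages(FakeResultSet([[1, 2], [3, 4], [5]], [b"p1", b"p2", None])):
    print(page_number, rows)  # prints "1 [1, 2]", then "2 [3, 4]", then "3 [5]"
```

Against a real cluster, `iter_pages(session.execute(statement))` would take the place of the fake; printing the page number and `len(rows)` per page shows whether the pages actually advance.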

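from ssl import CERT_NONE

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

# ssl_context, db_host, db_port, db_user and db_pwd are set up earlier (not shown)
ssl_context.verify_mode = CERT_NONE

cluster = Cluster(contact_points=[db_host], port=db_port,
                  auth_provider=PlainTextAuthProvider(db_user, db_pwd),
                  ssl_context=ssl_context)
session = cluster.connect()

query = "SELECT * FROM content_usage"
statement = SimpleStatement(query, fetch_size=100)
results = session.execute(statement)

# Iterating a ResultSet is expected to fetch further pages transparently
for row in results:
    print(f"{row}")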

I found other similar threads, but none of them has an answer either. Has anyone encountered this issue before? Any help is appreciated.

Answer by Erick Ramirez

The logic in your code is only calling execute() once, so the contents of results will only ever be the same list of 100 rows.

You need to call execute() in your loop to get the next page of results like this:

query = "SELECT * FROM content_usage"
statement = SimpleStatement(query, fetch_size=100)

for row in session.execute(statement):
    process_row(row)

For more info, see Paging with the Python driver. Cheers!

Answer by absurdfarce

I'm a bit confused by the initial statement of the problem. You mentioned that the initial page of results is fetched repeatedly and that these are the only results available to your program. You also indicated that the for loop responsible for printing results turns into an infinite loop when you run the program. These statements seem contradictory to me; how can you know what the driver has fetched if you never get any output? I'm assuming that's what you meant by "goes infinite"... if I'm wrong please correct me.

The following code seems to run as expected against Cassandra 4.0.0 using cassandra-driver 3.25.0 on Python 3.9.0:

import argparse
import logging
import time

from cassandra.cluster import Cluster, SimpleStatement

def setupLogging():
    log = logging.getLogger()
    log.setLevel('DEBUG')

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
    log.addHandler(handler)

def setupSchema(session):
    session.execute("""create keyspace if not exists "baz" with replication = {'class':'SimpleStrategy', 'replication_factor':1};""")
    session.execute("""create table if not exists baz.qux (run_ts bigint, idx int, uuid timeuuid, primary key (run_ts,idx))""")
    session.execute("""truncate baz.qux""")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-d','--debug', action='store_true')
    args = parser.parse_args()

    cluster = Cluster()
    session = cluster.connect()

    if args.debug:
        setupLogging()
    setupSchema(session)

    run_ts = int(time.time())
    insert_stmt = session.prepare("""insert into baz.qux (run_ts,idx,uuid) values (?,?,now())""")
    for idx in range(10000):
        session.execute(insert_stmt, [run_ts, idx])

    query = "select * from baz.qux"
    stmt = SimpleStatement(query, fetch_size=100)
    results = session.execute(stmt)

    for row in results:
        print(f"{row}")

    cluster.shutdown()

$ time (python foo.py | wc -l)
10000

real    0m12.452s
user    0m3.786s
sys     0m2.197s

You might try running your sample app with debug logging enabled (see sample code above for how to enable this). It sounds like something might be off in your Cassandra configuration (or perhaps your client setup); the additional logging might help you identify what (if anything) is getting in the way.
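The `setupLogging()` helper above sets the root logger to DEBUG, which also makes every other library in the process verbose. A narrower variant, sketched under the assumption that the driver's modules log through loggers named after the module (for example `cassandra.cluster`), raises only the `cassandra` logger to DEBUG; `enable_driver_debug_logging` is a hypothetical helper name, not part of the driver.

```python
import logging


def enable_driver_debug_logging() -> logging.Logger:
    """Send DEBUG output from the driver's loggers to stderr.

    Targets the "cassandra" logger, the parent of module-level loggers
    such as "cassandra.cluster" and "cassandra.connection", instead of
    the root logger, so other libraries' log levels are left alone.
    """
    driver_log = logging.getLogger("cassandra")
    driver_log.setLevel(logging.DEBUG)
    # The driver may already attach a NullHandler here, so look specifically
    # for a StreamHandler to avoid adding duplicates on repeated calls.
    if not any(isinstance(h, logging.StreamHandler) for h in driver_log.handlers):
        handler = logging.StreamHandler()  # defaults to sys.stderr
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        driver_log.addHandler(handler)
    return driver_log


if __name__ == "__main__":
    log = enable_driver_debug_logging()
    log.getChild("cluster").debug("driver debug logging enabled")
```

Calling this before `cluster.connect()` should surface the driver's connection and query activity without changing how the rest of the application logs.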

Answer by Justin

Below is the code snippet that finally worked for me, after restricting the driver version to 3.20:

# query, params and process_row are defined elsewhere
statement = session.prepare(query)

# Execute the query once and retrieve the first page of results
results = session.execute(statement, params)
for row in results.current_rows:
    process_row(row)

# Fetch more pages until they are exhausted, passing back the paging state
while results.has_more_pages:
    page_state = results.paging_state
    results = session.execute(statement, parameters=params, paging_state=page_state)
    for row in results.current_rows:
        process_row(row)
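The loop above can be packaged as a generator so callers iterate rows directly. This is a sketch, not part of the original answer: `iter_rows_by_paging_state` and `FakeSession` are hypothetical names, the fake encodes a row offset as its paging state purely for illustration (real paging states are opaque bytes returned by the server), and the guard against a repeated paging state is an addition to the original loop.

```python
from types import SimpleNamespace
from typing import Any, Iterator, Optional, Sequence


def iter_rows_by_paging_state(session, statement,
                              params: Optional[Sequence[Any]] = None) -> Iterator[Any]:
    """Yield every row, re-executing the statement once per page.

    Each request passes the paging_state returned with the previous page
    back to session.execute(), as in the loop above.
    """
    paging_state = None  # None means "start from the first page"
    seen_states = set()
    while True:
        results = session.execute(statement, parameters=params,
                                  paging_state=paging_state)
        yield from results.current_rows
        if not results.has_more_pages:
            return
        paging_state = results.paging_state
        if paging_state in seen_states:
            # Re-sending a paging state we already used would replay a page forever.
            raise RuntimeError("paging_state did not advance")
        seen_states.add(paging_state)


class FakeSession:
    """Hypothetical stand-in for a driver Session, for running this sketch offline."""

    def __init__(self, rows, page_size):
        self._rows = list(rows)
        self._page_size = page_size

    def execute(self, statement, parameters=None, paging_state=None):
        # The fake's paging state is just the next row offset, encoded as bytes.
        start = 0 if paging_state is None else int(paging_state.decode())
        end = start + self._page_size
        more = end < len(self._rows)
        return SimpleNamespace(
            current_rows=self._rows[start:end],
            has_more_pages=more,
            paging_state=str(end).encode() if more else None,
        )


rows = list(iter_rows_by_paging_state(FakeSession(range(7), page_size=3), "SELECT ..."))
print(rows)  # [0, 1, 2, 3, 4, 5, 6]
```

With a real session, `for row in iter_rows_by_paging_state(session, statement, params): process_row(row)` would replace the two loops, and a stalled paging state raises instead of looping forever.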