I have a piece of code that ideally, I only want to stream the bin log existed in the list. Else, exit the code.
This is part of the code:
def list_binlog_files(host_name, port, username, password, database, stream_log_file):
if stream_log_file is None:
logging.warning("stream_log_file is None. Exiting the function.")
return None, None # Return None for both binlog_files and latest_binlog
db_connection = pymysql.connect(
host=host_name, port=port, user=username, password=password, database=database
)
try:
cursor = db_connection.cursor()
cursor.execute("SHOW BINARY LOGS")
binlog_files = [row[0] for row in cursor.fetchall()]
# Filter the list to include only binlog files after stream.log_file
binlog_files = [file for file in binlog_files if file >= stream_log_file]
# Determine the latest binlog file
latest_binlog = binlog_files[-1] if binlog_files else None
return binlog_files, latest_binlog
finally:
cursor.close()
db_connection.close()
def main():
binlog_files, latest_binlog = list_binlog_files(
DATABASE_HOST,
DATABASE_PORT,
DATABASE_USER,
DATABASE_PASS,
DATABASE_NAME,
stream_log_file,
)
iterator = TimeoutIterator(stream.__iter__(), 1)
try:
if binlog_files is not None:
if stream.log_file in binlog_files:
logging.info(f"Fetching bin log: {stream.log_file}")
for binlog_event in iterator:
logging.info(f"Fetching bin log: {stream.log_file}")
if binlog_event == iterator.get_sentinel():
writer.try_flush()
continue
for row in binlog_event.rows:
data = json.dumps(
{
"schema": binlog_event.schema,
"region": region,
"country": country,
"table": binlog_event.table,
"type": type(binlog_event).__name__,
"row": row,
"file": stream.log_file,
"position": stream.log_pos,
},
default=str,
)
writer.write({"Data": data}, stream.log_file, stream.log_pos)
elif stream.log_file in binlog_files:
logging.info(f"{stream.log_file} is not in the list. Exiting.")
exit_with_cleanup(stream, writer)
else:
logging.info("f you!")
exit_with_cleanup(stream, writer)
else:
#irrelevant
except OperationalError as ex:
# check if error code is 1236 (ER_MASTER_FATAL_ERROR_READING_BINLOG)
if ex.args[0] == 1236:
logging.error("Replication error: %s", ex.args)
# reset the binglog position in db and exit so that the process can be restarted
logging.info("Resetting log position to the beginning of binlog")
position_logger.clear_position()
exit_with_cleanup(stream, writer)
else:
raise
except KeyboardInterrupt:
logging.info("Interrupt detected")
exit_with_cleanup(stream, writer)
I omitted part of the code to simplify but I can share more.
Curent problem is, let's say List of bin logs to fetch ['mysql-bin-changelog.11111', 'mysql-bin-changelog.11112']
With the current logic, it should exit if stream.log_file in binlog_files:
when it tries to stream mysql-bin-changelog.11113
because it is not in the list. But it still stream the rest of the bin logs. How can I improve this?