Stream MYSQL bin log only from the list

61 views Asked by At

I have a piece of code that ideally, I only want to stream the bin log existed in the list. Else, exit the code.

This is part of the code:

def list_binlog_files(host_name, port, username, password, database, stream_log_file):
    if stream_log_file is None:
        logging.warning("stream_log_file is None. Exiting the function.")
        return None, None  # Return None for both binlog_files and latest_binlog

    db_connection = pymysql.connect(
        host=host_name, port=port, user=username, password=password, database=database
    )
    try:
        cursor = db_connection.cursor()
        cursor.execute("SHOW BINARY LOGS")
        binlog_files = [row[0] for row in cursor.fetchall()]

        # Filter the list to include only binlog files after stream.log_file
        binlog_files = [file for file in binlog_files if file >= stream_log_file]

        # Determine the latest binlog file
        latest_binlog = binlog_files[-1] if binlog_files else None

        return binlog_files, latest_binlog

    finally:
        cursor.close()
        db_connection.close()

def main():
    binlog_files, latest_binlog = list_binlog_files(
        DATABASE_HOST,
        DATABASE_PORT,
        DATABASE_USER,
        DATABASE_PASS,
        DATABASE_NAME,
        stream_log_file,
    )
    iterator = TimeoutIterator(stream.__iter__(), 1)
    try:
        if binlog_files is not None:
            if stream.log_file in binlog_files:
                logging.info(f"Fetching bin log: {stream.log_file}")
                for binlog_event in iterator:
                    logging.info(f"Fetching bin log: {stream.log_file}")
                    if binlog_event == iterator.get_sentinel():
                        writer.try_flush()
                        continue

                    for row in binlog_event.rows:
                        data = json.dumps(
                            {
                                "schema": binlog_event.schema,
                                "region": region,
                                "country": country,
                                "table": binlog_event.table,
                                "type": type(binlog_event).__name__,
                                "row": row,
                                "file": stream.log_file,
                                "position": stream.log_pos,
                            },
                            default=str,
                        )
                        writer.write({"Data": data}, stream.log_file, stream.log_pos)
            elif stream.log_file in binlog_files:
                logging.info(f"{stream.log_file} is not in the list. Exiting.")
                exit_with_cleanup(stream, writer)
            else:
                logging.info("f you!")
                exit_with_cleanup(stream, writer)
        else:
             #irrelevant


    except OperationalError as ex:
        # check if error code is 1236 (ER_MASTER_FATAL_ERROR_READING_BINLOG)
        if ex.args[0] == 1236:
            logging.error("Replication error: %s", ex.args)
            # reset the binglog position in db and exit so that the process can be restarted
            logging.info("Resetting log position to the beginning of binlog")
            position_logger.clear_position()
            exit_with_cleanup(stream, writer)
        else:
            raise

    except KeyboardInterrupt:
        logging.info("Interrupt detected")
        exit_with_cleanup(stream, writer)

I omitted part of the code to simplify but I can share more. Curent problem is, let's say List of bin logs to fetch ['mysql-bin-changelog.11111', 'mysql-bin-changelog.11112']

With the current logic, it should exit if stream.log_file in binlog_files: when it tries to stream mysql-bin-changelog.11113 because it is not in the list. But it still stream the rest of the bin logs. How can I improve this?

0

There are 0 answers