LogNotes

2025-09-02 06:37:14
#mysql #mariadb provocation script for `buffer_pool_dirty_pages` and `wsrep_local_cert_failures`

$ cat provo.py 

import mysql.connector
from mysql.connector import Error
import threading
import time
import os
import logging
import datetime

# Configure logging: DEBUG and above goes to both provo.log and the console.
log_destinations = [
    logging.FileHandler('provo.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
    level=logging.DEBUG,
    handlers=log_destinations,
)
logger = logging.getLogger(__name__)

# Configuration for Galera nodes (adjust for single-node or multi-node)
# NOTE(review): all three entries point at the same host/port, so as written
# this provokes conflicts against a single node; replace with distinct hosts
# to exercise a real multi-node cluster.
nodes = [
    {"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"},
    {"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"},
    {"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"},
]

# Scratch database/table that every worker thread hammers.
db_name = 'provocation_db'
table_name = 'provocation_table'
# One slot per node thread: all threads start their update loops together.
barrier = threading.Barrier(len(nodes))  # Barrier for multi-node synchronization

def check_node_status(node):
    """Verify a MariaDB/Galera node is reachable and log its wsrep state.

    Re-raises any connector error so callers never proceed with a dead node.
    """
    try:
        with mysql.connector.connect(
            host=node['host'], port=node['port'], user=node['user'], password=node['password']
        ) as conn:
            with conn.cursor() as cursor:
                # General MariaDB health check.
                cursor.execute("SELECT VERSION();")
                version = cursor.fetchone()[0]
                logger.debug(f"Node {node['host']}:{node['port']} MariaDB version: {version}")

                def first_value(query):
                    # Run a SHOW query and return the value column, or 'N/A'
                    # when the variable does not exist (non-Galera build).
                    cursor.execute(query)
                    row = cursor.fetchone()
                    return row[1] if row else 'N/A'

                # Galera-specific status, queried in the same order as before.
                cluster_status = first_value("SHOW STATUS LIKE 'wsrep_cluster_status';")
                local_state = first_value("SHOW STATUS LIKE 'wsrep_local_state';")
                cluster_size = first_value("SHOW STATUS LIKE 'wsrep_cluster_size';")
                wsrep_on = first_value("SHOW VARIABLES LIKE 'wsrep_on';")
                logger.debug(f"Node {node['host']}:{node['port']} status: "
                           f"cluster_status={cluster_status}, "
                           f"local_state={local_state}, "
                           f"cluster_size={cluster_size}, "
                           f"wsrep_on={wsrep_on}")
    except Error as e:
        logger.error(f"Failed to check status for {node['host']}:{node['port']}: {e}")
        raise  # Re-raise to prevent proceeding with failed nodes

def create_db_and_table(cursor):
    """Create (or verify) the provocation database and table; idempotent."""
    logger.debug("Creating database and table if not exists")
    ddl_statements = (
        f"CREATE DATABASE IF NOT EXISTS {db_name} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;",
        f"USE {db_name};",
        f"""CREATE TABLE IF NOT EXISTS {table_name} (
            id INT PRIMARY KEY AUTO_INCREMENT,
            node1_val INT DEFAULT 0,
            node2_val INT DEFAULT 0,
            node3_val INT DEFAULT 0,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        ) ENGINE=InnoDB;""",
    )
    try:
        for statement in ddl_statements:
            cursor.execute(statement)
        logger.debug(f"Database {db_name} and table {table_name} created or verified")
    except Error as e:
        logger.error(f"Error creating database/table: {e}")
        raise

def insert_initial_record(cursor):
    """Insert the single shared record, but only when the table is empty.

    The original ``INSERT ... ON DUPLICATE KEY UPDATE`` could never hit a
    duplicate — the table's only key is the AUTO_INCREMENT ``id``, which is
    not supplied — so every call appended a fresh row (main thread + one per
    worker). The ``WHERE NOT EXISTS`` guard keeps exactly one contention
    target for all threads.

    Raises:
        Error: on any MySQL failure (logged first, then re-raised).
    """
    logger.debug("Inserting initial record")
    try:
        cursor.execute(
            f"""INSERT INTO {table_name} (node1_val, node2_val, node3_val)
            SELECT 0, 0, 0 FROM DUAL
            WHERE NOT EXISTS (SELECT 1 FROM {table_name});"""
        )
        logger.debug("Initial record inserted")
    except Error as e:
        logger.error(f"Error inserting initial record: {e}")
        raise

def get_record_id(cursor):
    """Return the id of the shared record, choosing the lowest id.

    ``ORDER BY id`` makes the choice deterministic; a bare ``LIMIT 1`` lets
    the server return an arbitrary row, so concurrent threads could end up
    updating different records and never conflict.

    Raises:
        LookupError: if the table contains no rows (still a subclass of
            ``Exception``, so existing broad handlers keep working).
        Error: on any MySQL failure.
    """
    logger.debug("Fetching record ID")
    try:
        cursor.execute(f"SELECT id FROM {table_name} ORDER BY id LIMIT 1;")
        result = cursor.fetchone()
        if result is None:
            logger.error("No record found in table")
            raise LookupError("No record found in table")
        logger.debug(f"Record ID fetched: {result[0]}")
        return result[0]
    except Error as e:
        logger.error(f"Error fetching record ID: {e}")
        raise

def log_table_state(conn):
    """Dump every row of the provocation table to the debug log (best-effort)."""
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {db_name}.{table_name};")
            logger.debug(f"Current table state: {cursor.fetchall()}")
    except Error as e:
        # Deliberately swallowed: a failed snapshot must not stop the updates.
        logger.error(f"Error logging table state: {e}")

def update_record(conn, record_id, node_index):
    """Run 100 sequential conflicting updates against the shared record.

    Each iteration writes ``node_index * 1000 + i`` into all three value
    columns inside an explicit transaction; concurrent threads doing the
    same is what provokes Galera certification failures.

    Fixes over the original: the cursor is closed via ``finally`` (it leaked
    if cleanup raised), and ``ROLLBACK`` is guarded — it can itself fail on
    a lost connection, which previously escaped the loop unhandled.
    """
    cursor = conn.cursor()
    logger.debug(f"Starting updates for node {node_index}, record ID: {record_id}")
    try:
        for i in range(100):
            try:
                val = node_index * 1000 + i
                logger.debug(f"Node {node_index}, iteration {i}: Updating values to {val}")
                cursor.execute("START TRANSACTION;")
                cursor.execute(
                    f"""UPDATE {table_name} SET
                        node1_val = %s,
                        node2_val = %s,
                        node3_val = %s
                    WHERE id = %s;""",
                    (val, val, val, record_id)
                )
                cursor.execute("COMMIT;")
                logger.debug(f"Node {node_index}, iteration {i}: Update committed")
                log_table_state(conn)
                time.sleep(0.001)  # Small delay to increase conflict likelihood
            except mysql.connector.Error as e:
                logger.error(f"Node {node_index}, iteration {i}: Error during update: {e}")
                try:
                    cursor.execute("ROLLBACK;")
                    logger.debug(f"Node {node_index}, iteration {i}: Transaction rolled back")
                except mysql.connector.Error as rollback_err:
                    logger.error(f"Node {node_index}, iteration {i}: Rollback failed: {rollback_err}")
    finally:
        cursor.close()
    logger.debug(f"Finished updates for node {node_index}")

def get_metrics(node):
    """Collect and log the wsrep/InnoDB counters this provocation targets."""
    logger.debug(f"Collecting metrics for {node['host']}:{node['port']}")
    try:
        with mysql.connector.connect(
            host=node['host'], port=node['port'], user=node['user'], password=node['password']
        ) as conn:
            with conn.cursor() as cursor:
                def status_value(variable):
                    # SHOW STATUS returns (name, value) rows; take the value,
                    # falling back to 'N/A' when the variable is absent.
                    cursor.execute(f"SHOW STATUS LIKE '{variable}';")
                    row = cursor.fetchone()
                    return row[1] if row else "N/A"

                cert_failures = status_value('wsrep_local_cert_failures')
                dirty_pages = status_value('Innodb_buffer_pool_pages_dirty')
                cluster_status = status_value('wsrep_cluster_status')
                local_state = status_value('wsrep_local_state')
                cluster_size = status_value('wsrep_cluster_size')

                logger.info(f"Node {node['host']}:{node['port']}: "
                          f"wsrep_local_cert_failures={cert_failures}, "
                          f"buffer_pool_dirty_pages={dirty_pages}, "
                          f"cluster_status={cluster_status}, "
                          f"local_state={local_state}, "
                          f"cluster_size={cluster_size}")
    except Error as e:
        logger.error(f"Error fetching metrics from {node['host']}:{node['port']}: {e}")

def process_node(node, node_index):
    """Thread body: prepare one node, sync at the barrier, run the update loop.

    Fix over the original: any failure before ``barrier.wait()`` (e.g. the
    re-raise from ``check_node_status``) previously left sibling threads
    blocked on the barrier forever — the barrier requires all parties.
    Aborting it turns their wait into a BrokenBarrierError instead of a
    deadlock. The handler also catches ``Exception`` (not just ``Error``)
    so non-connector failures from the helpers are cleaned up too.
    """
    try:
        logger.debug(f"Connecting to {node['host']}:{node['port']} for node {node_index}")
        check_node_status(node)
        with mysql.connector.connect(
            host=node['host'], port=node['port'], user=node['user'], password=node['password'], autocommit=True
        ) as conn:
            with conn.cursor() as cursor:
                create_db_and_table(cursor)
                insert_initial_record(cursor)
                record_id = get_record_id(cursor)
                logger.info(f"Node {node['host']}:{node['port']} ready, waiting for others...")
                barrier.wait()
                logger.info(f"Starting updates on {node['host']}:{node['port']}...")
                update_record(conn, record_id, node_index)
            logger.info(f"Finished processing {node['host']}:{node['port']}")
    except Exception as e:
        # Release any siblings parked on the barrier before failing.
        barrier.abort()
        logger.error(f"Error on node {node['host']}:{node['port']}: {e}")
        raise  # Re-raise to ensure thread failure is caught

if __name__ == "__main__":
    try:
        logger.info(f"Starting script at {datetime.datetime.now()}")
        # One-time schema setup against the first node before fanning out.
        logger.debug(f"Initializing database on {nodes[0]['host']}:{nodes[0]['port']}...")
        check_node_status(nodes[0])
        with mysql.connector.connect(
            host=nodes[0]['host'],
            port=nodes[0]['port'],
            user=nodes[0]['user'],
            password=nodes[0]['password'],
            autocommit=True
        ) as conn:
            with conn.cursor() as cursor:
                create_db_and_table(cursor)
                insert_initial_record(cursor)

        # One thread per configured node, all synchronized by the barrier.
        logger.debug("Starting threads for parallel updates")
        threads = []
        for idx, node in enumerate(nodes):
            thread = threading.Thread(target=process_node, args=(node, idx+1), name=f"Node-{idx+1}")
            threads.append(thread)
            thread.start()

        logger.debug("Waiting for all threads to complete")
        for thread in threads:
            thread.join()

        logger.info("\nCollecting metrics...")
        for node in nodes:
            get_metrics(node)

        logger.info("All nodes updated in parallel.")
        logger.info(f"Script completed at {datetime.datetime.now()}")

    except Exception as e:
        # logger.exception records the full traceback, not just the message,
        # which the original logger.error call silently dropped.
        logger.exception(f"Error in main: {e}")
← Previous Next →
Back to list