2025-09-02 06:37:14
#mysql #mariadb provocator `buffer_pool_dirty_pages` и `wsrep_local_cert_failures`
$ cat provo.py
import mysql.connector
from mysql.connector import Error
import threading
import time
import os
import logging
import datetime
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
handlers=[
logging.FileHandler('provo.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Configuration for Galera nodes (adjust for single-node or multi-node)
nodes = [
{"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"},
{"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"},
{"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"},
]
db_name = 'provocation_db'
table_name = 'provocation_table'
barrier = threading.Barrier(len(nodes)) # Barrier for multi-node synchronization
def check_node_status(node):
"""Check MariaDB/Galera node status"""
try:
with mysql.connector.connect(
host=node['host'], port=node['port'], user=node['user'], password=node['password']
) as conn:
with conn.cursor() as cursor:
# Check general MariaDB status
cursor.execute("SELECT VERSION();")
version = cursor.fetchone()[0]
logger.debug(f"Node {node['host']}:{node['port']} MariaDB version: {version}")
# Check Galera-specific status
cursor.execute("SHOW STATUS LIKE 'wsrep_cluster_status';")
cluster_status = cursor.fetchone()
cursor.execute("SHOW STATUS LIKE 'wsrep_local_state';")
local_state = cursor.fetchone()
cursor.execute("SHOW STATUS LIKE 'wsrep_cluster_size';")
cluster_size = cursor.fetchone()
cursor.execute("SHOW VARIABLES LIKE 'wsrep_on';")
wsrep_on = cursor.fetchone()
logger.debug(f"Node {node['host']}:{node['port']} status: "
f"cluster_status={cluster_status[1] if cluster_status else 'N/A'}, "
f"local_state={local_state[1] if local_state else 'N/A'}, "
f"cluster_size={cluster_size[1] if cluster_size else 'N/A'}, "
f"wsrep_on={wsrep_on[1] if wsrep_on else 'N/A'}")
except Error as e:
logger.error(f"Failed to check status for {node['host']}:{node['port']}: {e}")
raise # Re-raise to prevent proceeding with failed nodes
def create_db_and_table(cursor):
logger.debug("Creating database and table if not exists")
try:
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;")
cursor.execute(f"USE {db_name};")
cursor.execute(f"""CREATE TABLE IF NOT EXISTS {table_name} (
id INT PRIMARY KEY AUTO_INCREMENT,
node1_val INT DEFAULT 0,
node2_val INT DEFAULT 0,
node3_val INT DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB;""")
logger.debug(f"Database {db_name} and table {table_name} created or verified")
except Error as e:
logger.error(f"Error creating database/table: {e}")
raise
def insert_initial_record(cursor):
logger.debug("Inserting initial record")
try:
cursor.execute(
f"""INSERT INTO {table_name} (node1_val, node2_val, node3_val)
VALUES (0, 0, 0)
ON DUPLICATE KEY UPDATE node1_val = node1_val;"""
)
logger.debug("Initial record inserted")
except Error as e:
logger.error(f"Error inserting initial record: {e}")
raise
def get_record_id(cursor):
logger.debug("Fetching record ID")
try:
cursor.execute(f"SELECT id FROM {table_name} LIMIT 1;")
result = cursor.fetchone()
if result:
logger.debug(f"Record ID fetched: {result[0]}")
return result[0]
else:
logger.error("No record found in table")
raise Exception("No record found in table")
except Error as e:
logger.error(f"Error fetching record ID: {e}")
raise
def log_table_state(conn):
"""Log current state of the table"""
try:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {db_name}.{table_name};")
rows = cursor.fetchall()
logger.debug(f"Current table state: {rows}")
except Error as e:
logger.error(f"Error logging table state: {e}")
def update_record(conn, record_id, node_index):
cursor = conn.cursor()
logger.debug(f"Starting updates for node {node_index}, record ID: {record_id}")
for i in range(100):
try:
val = node_index * 1000 + i
logger.debug(f"Node {node_index}, iteration {i}: Updating values to {val}")
cursor.execute("START TRANSACTION;")
cursor.execute(
f"""UPDATE {table_name} SET
node1_val = %s,
node2_val = %s,
node3_val = %s
WHERE id = %s;""",
(val, val, val, record_id)
)
cursor.execute("COMMIT;")
logger.debug(f"Node {node_index}, iteration {i}: Update committed")
log_table_state(conn)
time.sleep(0.001) # Reduced delay to increase conflict likelihood
except mysql.connector.Error as e:
logger.error(f"Node {node_index}, iteration {i}: Error during update: {e}")
cursor.execute("ROLLBACK;")
logger.debug(f"Node {node_index}, iteration {i}: Transaction rolled back")
cursor.close()
logger.debug(f"Finished updates for node {node_index}")
def get_metrics(node):
logger.debug(f"Collecting metrics for {node['host']}:{node['port']}")
try:
with mysql.connector.connect(
host=node['host'], port=node['port'], user=node['user'], password=node['password']
) as conn:
with conn.cursor() as cursor:
cursor.execute("SHOW STATUS LIKE 'wsrep_local_cert_failures';")
result = cursor.fetchone()
cert_failures = result[1] if result else "N/A"
cursor.execute("SHOW STATUS LIKE 'Innodb_buffer_pool_pages_dirty';")
result = cursor.fetchone()
dirty_pages = result[1] if result else "N/A"
cursor.execute("SHOW STATUS LIKE 'wsrep_cluster_status';")
cluster_status = cursor.fetchone()
cluster_status = cluster_status[1] if cluster_status else "N/A"
cursor.execute("SHOW STATUS LIKE 'wsrep_local_state';")
local_state = cursor.fetchone()
local_state = local_state[1] if local_state else "N/A"
cursor.execute("SHOW STATUS LIKE 'wsrep_cluster_size';")
cluster_size = cursor.fetchone()
cluster_size = cluster_size[1] if cluster_size else "N/A"
logger.info(f"Node {node['host']}:{node['port']}: "
f"wsrep_local_cert_failures={cert_failures}, "
f"buffer_pool_dirty_pages={dirty_pages}, "
f"cluster_status={cluster_status}, "
f"local_state={local_state}, "
f"cluster_size={cluster_size}")
except Error as e:
logger.error(f"Error fetching metrics from {node['host']}:{node['port']}: {e}")
def process_node(node, node_index):
try:
logger.debug(f"Connecting to {node['host']}:{node['port']} for node {node_index}")
check_node_status(node)
with mysql.connector.connect(
host=node['host'], port=node['port'], user=node['user'], password=node['password'], autocommit=True
) as conn:
with conn.cursor() as cursor:
create_db_and_table(cursor)
insert_initial_record(cursor)
record_id = get_record_id(cursor)
logger.info(f"Node {node['host']}:{node['port']} ready, waiting for others...")
barrier.wait()
logger.info(f"Starting updates on {node['host']}:{node['port']}...")
update_record(conn, record_id, node_index)
logger.info(f"Finished processing {node['host']}:{node['port']}")
except Error as e:
logger.error(f"Error on node {node['host']}:{node['port']}: {e}")
raise # Re-raise to ensure thread failure is caught
if __name__ == "__main__":
try:
logger.info(f"Starting script at {datetime.datetime.now()}")
logger.debug(f"Initializing database on {nodes[0]['host']}:{nodes[0]['port']}...")
check_node_status(nodes[0])
with mysql.connector.connect(
host=nodes[0]['host'],
port=nodes[0]['port'],
user=nodes[0]['user'],
password=nodes[0]['password'],
autocommit=True
) as conn:
with conn.cursor() as cursor:
create_db_and_table(cursor)
insert_initial_record(cursor)
logger.debug("Starting threads for parallel updates")
threads = []
for idx, node in enumerate(nodes):
thread = threading.Thread(target=process_node, args=(node, idx+1), name=f"Node-{idx+1}")
threads.append(thread)
thread.start()
logger.debug("Waiting for all threads to complete")
for thread in threads:
thread.join()
logger.info("\nCollecting metrics...")
for node in nodes:
get_metrics(node)
logger.info("All nodes updated in parallel.")
logger.info(f"Script completed at {datetime.datetime.now()}")
except Exception as e:
logger.error(f"Error in main: {e}")
Back to list