LogNotes

2025-09-02 06:39:13
#mysql #mariadb #test

$ cat test.py 
import mysql.connector
from mysql.connector import Error
import threading
import time
import logging
import datetime
import random

# Конфигурация логирования
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
    handlers=[
        logging.FileHandler('test_db_load.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Конфигурация одной ноды
node = {"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"}

db_name = 'test_load_db'
table_name = 'test_load_table'

def check_connection(node):
    """Проверка соединения с нодой"""
    try:
        conn = mysql.connector.connect(
            host=node['host'], port=node['port'], user=node['user'], password=node['password']
        )
        logger.info(f"Соединение с {node['host']}:{node['port']} успешно")
        conn.close()
        return True
    except Error as e:
        logger.error(f"Ошибка соединения с {node['host']}:{node['port']}: {e}")
        return False

def create_db_and_table(conn):
    """Создание базы данных и таблицы"""
    try:
        cursor = conn.cursor()
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;")
        cursor.execute(f"USE {db_name};")
        cursor.execute(f"""CREATE TABLE IF NOT EXISTS {table_name} (
            id INT PRIMARY KEY AUTO_INCREMENT,
            value1 INT,
            value2 VARCHAR(255),
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            UNIQUE KEY unique_value2 (value2)
        ) ENGINE=InnoDB;""")
        logger.debug(f"База данных {db_name} и таблица {table_name} созданы или проверены")
        cursor.close()
    except Error as e:
        logger.error(f"Ошибка создания базы данных/таблицы: {e}")
        raise

def insert_records(conn, num_records):
    """Вставка записей для тестирования ошибок вставки (например, дубликат ключа)"""
    cursor = conn.cursor()
    cursor.execute(f"USE {db_name};")  # Выбор базы данных
    logger.debug(f"Начало вставки {num_records} записей")
    for i in range(num_records):
        try:
            value1 = random.randint(1, 1000)
            value2 = f"value_{value1}" if i % 10 != 0 else f"duplicate_value"  # Создание дубликата каждые 10 записей
            cursor.execute(f"INSERT INTO {table_name} (value1, value2) VALUES (%s, %s);", (value1, value2))
            logger.debug(f"Вставлена запись {i + 1}: value1={value1}, value2={value2}")
        except Error as e:
            logger.error(f"Ошибка вставки записи {i + 1}: {e}")
    conn.commit()
    cursor.close()
    logger.debug("Вставка завершена")

def update_records(conn, num_updates):
    """Обновление записей для тестирования deadlock или таймаутов"""
    cursor = conn.cursor()
    cursor.execute(f"USE {db_name};")  # Выбор базы данных
    logger.debug(f"Начало {num_updates} обновлений")
    for i in range(num_updates):
        try:
            cursor.execute("START TRANSACTION;")
            cursor.execute(f"UPDATE {table_name} SET value1 = value1 + %s WHERE id = %s;", (random.randint(1, 10), 1))
            time.sleep(0.05)  # Задержка для увеличения шанса дедлоков
            cursor.execute("COMMIT;")
            logger.debug(f"Обновление {i + 1} завершено")
        except Error as e:
            logger.error(f"Ошибка обновления {i + 1}: {e}")
            cursor.execute("ROLLBACK;")
            logger.debug(f"Обновление {i + 1}: Транзакция откатана")
    conn.commit()
    cursor.close()
    logger.debug("Обновления завершены")

def read_records(conn):
    """Чтение записей для тестирования ошибок чтения или производительности"""
    try:
        cursor = conn.cursor()
        cursor.execute(f"USE {db_name};")  # Выбор базы данных
        cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
        count = cursor.fetchone()[0]
        logger.debug(f"Всего записей в таблице: {count}")
        cursor.execute(f"SELECT * FROM {table_name} ORDER BY updated_at DESC LIMIT 10;")
        rows = cursor.fetchall()
        logger.debug(f"Последние 10 записей: {rows}")
        cursor.close()
    except Error as e:
        logger.error(f"Ошибка чтения: {e}")

def load_test_thread(node):
    """Функция потока для нагрузочного тестирования"""
    try:
        conn = mysql.connector.connect(
            host=node['host'], port=node['port'], user=node['user'], password=node['password'], autocommit=False
        )
        logger.debug("Соединение с базой данных в потоке установлено")
        insert_records(conn, 100)  # Тест вставки с возможными дубликатами
        update_records(conn, 50)  # Тест обновлений с возможными дедлоками
        read_records(conn)  # Тест чтения
        conn.close()
    except Error as e:
        logger.error(f"Ошибка в потоке: {e}")

if __name__ == "__main__":
    try:
        logger.info(f"Начало скрипта нагрузочного тестирования в {datetime.datetime.now()}")
        if not check_connection(node):
            logger.error("Начальное соединение не удалось. Выход.")
            exit(1)

        # Инициализация базы данных
        with mysql.connector.connect(
            host=node['host'],
            port=node['port'],
            user=node['user'],
            password=node['password'],
            autocommit=True
        ) as conn:
            create_db_and_table(conn)

        # Запуск нескольких потоков для нагрузки
        threads = []
        for i in range(5):  # 5 потоков для имитации параллелизма
            thread = threading.Thread(target=load_test_thread, args=(node,), name=f"Load-Thread-{i+1}")
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

        # Финальное чтение состояния таблицы
        with mysql.connector.connect(
            host=node['host'],
            port=node['port'],
            user=node['user'],
            password=node['password'],
            autocommit=True
        ) as conn:
            read_records(conn)

        logger.info("Нагрузочное тестирование завершено успешно.")
        logger.info(f"Скрипт завершен в {datetime.datetime.now()}")

    except Exception as e:
        logger.error(f"Ошибка в main: {e}")
← Previous Next →
Back to list