2025-09-02 06:39:13
#mysql #mariadb #test
$ cat test.py
import mysql.connector
from mysql.connector import Error
import threading
import time
import logging
import datetime
import random
# Конфигурация логирования
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
handlers=[
logging.FileHandler('test_db_load.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Конфигурация одной ноды
node = {"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"}
db_name = 'test_load_db'
table_name = 'test_load_table'
def check_connection(node):
"""Проверка соединения с нодой"""
try:
conn = mysql.connector.connect(
host=node['host'], port=node['port'], user=node['user'], password=node['password']
)
logger.info(f"Соединение с {node['host']}:{node['port']} успешно")
conn.close()
return True
except Error as e:
logger.error(f"Ошибка соединения с {node['host']}:{node['port']}: {e}")
return False
def create_db_and_table(conn):
"""Создание базы данных и таблицы"""
try:
cursor = conn.cursor()
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;")
cursor.execute(f"USE {db_name};")
cursor.execute(f"""CREATE TABLE IF NOT EXISTS {table_name} (
id INT PRIMARY KEY AUTO_INCREMENT,
value1 INT,
value2 VARCHAR(255),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY unique_value2 (value2)
) ENGINE=InnoDB;""")
logger.debug(f"База данных {db_name} и таблица {table_name} созданы или проверены")
cursor.close()
except Error as e:
logger.error(f"Ошибка создания базы данных/таблицы: {e}")
raise
def insert_records(conn, num_records):
"""Вставка записей для тестирования ошибок вставки (например, дубликат ключа)"""
cursor = conn.cursor()
cursor.execute(f"USE {db_name};") # Выбор базы данных
logger.debug(f"Начало вставки {num_records} записей")
for i in range(num_records):
try:
value1 = random.randint(1, 1000)
value2 = f"value_{value1}" if i % 10 != 0 else f"duplicate_value" # Создание дубликата каждые 10 записей
cursor.execute(f"INSERT INTO {table_name} (value1, value2) VALUES (%s, %s);", (value1, value2))
logger.debug(f"Вставлена запись {i + 1}: value1={value1}, value2={value2}")
except Error as e:
logger.error(f"Ошибка вставки записи {i + 1}: {e}")
conn.commit()
cursor.close()
logger.debug("Вставка завершена")
def update_records(conn, num_updates):
"""Обновление записей для тестирования deadlock или таймаутов"""
cursor = conn.cursor()
cursor.execute(f"USE {db_name};") # Выбор базы данных
logger.debug(f"Начало {num_updates} обновлений")
for i in range(num_updates):
try:
cursor.execute("START TRANSACTION;")
cursor.execute(f"UPDATE {table_name} SET value1 = value1 + %s WHERE id = %s;", (random.randint(1, 10), 1))
time.sleep(0.05) # Задержка для увеличения шанса дедлоков
cursor.execute("COMMIT;")
logger.debug(f"Обновление {i + 1} завершено")
except Error as e:
logger.error(f"Ошибка обновления {i + 1}: {e}")
cursor.execute("ROLLBACK;")
logger.debug(f"Обновление {i + 1}: Транзакция откатана")
conn.commit()
cursor.close()
logger.debug("Обновления завершены")
def read_records(conn):
"""Чтение записей для тестирования ошибок чтения или производительности"""
try:
cursor = conn.cursor()
cursor.execute(f"USE {db_name};") # Выбор базы данных
cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
count = cursor.fetchone()[0]
logger.debug(f"Всего записей в таблице: {count}")
cursor.execute(f"SELECT * FROM {table_name} ORDER BY updated_at DESC LIMIT 10;")
rows = cursor.fetchall()
logger.debug(f"Последние 10 записей: {rows}")
cursor.close()
except Error as e:
logger.error(f"Ошибка чтения: {e}")
def load_test_thread(node):
"""Функция потока для нагрузочного тестирования"""
try:
conn = mysql.connector.connect(
host=node['host'], port=node['port'], user=node['user'], password=node['password'], autocommit=False
)
logger.debug("Соединение с базой данных в потоке установлено")
insert_records(conn, 100) # Тест вставки с возможными дубликатами
update_records(conn, 50) # Тест обновлений с возможными дедлоками
read_records(conn) # Тест чтения
conn.close()
except Error as e:
logger.error(f"Ошибка в потоке: {e}")
if __name__ == "__main__":
try:
logger.info(f"Начало скрипта нагрузочного тестирования в {datetime.datetime.now()}")
if not check_connection(node):
logger.error("Начальное соединение не удалось. Выход.")
exit(1)
# Инициализация базы данных
with mysql.connector.connect(
host=node['host'],
port=node['port'],
user=node['user'],
password=node['password'],
autocommit=True
) as conn:
create_db_and_table(conn)
# Запуск нескольких потоков для нагрузки
threads = []
for i in range(5): # 5 потоков для имитации параллелизма
thread = threading.Thread(target=load_test_thread, args=(node,), name=f"Load-Thread-{i+1}")
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
# Финальное чтение состояния таблицы
with mysql.connector.connect(
host=node['host'],
port=node['port'],
user=node['user'],
password=node['password'],
autocommit=True
) as conn:
read_records(conn)
logger.info("Нагрузочное тестирование завершено успешно.")
logger.info(f"Скрипт завершен в {datetime.datetime.now()}")
except Exception as e:
logger.error(f"Ошибка в main: {e}")
Back to list