292 lines
10 KiB
Python
292 lines
10 KiB
Python
"""
Load test runner for ATS application.

This script provides a command-line interface for running load tests
with different scenarios and configurations.
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import subprocess
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional
|
|
|
|
# Add the project root to Python path
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from load_tests.config import LoadTestConfig, PERFORMANCE_THRESHOLDS
|
|
from load_tests.test_data_generator import TestDataGenerator
|
|
from load_tests.monitoring import check_performance_thresholds
|
|
|
|
class LoadTestRunner:
    """Build and launch Locust load-test runs for the ATS application.

    Scenarios are looked up in ``LoadTestConfig``. Every run is executed as a
    ``locust`` subprocess, and its HTML/CSV reports are written under
    ``results_dir``.
    """

    def __init__(self):
        """Load the scenario configuration and create the results directory."""
        self.config = LoadTestConfig()
        # Relative path: resolved against the current working directory.
        self.results_dir = "load_tests/results"
        os.makedirs(self.results_dir, exist_ok=True)

    def run_test(self, scenario_name: str, extra_args: Optional[List[str]] = None) -> bool:
        """Run a specific load test scenario.

        Args:
            scenario_name: Name of a scenario known to the configuration.
            extra_args: Additional command-line arguments forwarded to Locust.

        Returns:
            True when the Locust process exits with status 0; False when the
            scenario is unknown, Locust exits non-zero, the run is
            interrupted, or the process cannot be started.
        """
        scenario = self.config.get_scenario(scenario_name)
        if not scenario:
            print(f"Error: Scenario '{scenario_name}' not found.")
            print(f"Available scenarios: {', '.join(self.config.list_scenarios())}")
            return False

        print(f"Running load test scenario: {scenario.name}")
        print(f"Description: {scenario.description}")
        print(f"Users: {scenario.users}, Spawn Rate: {scenario.spawn_rate}")
        print(f"Duration: {scenario.run_time}")
        print(f"Target: {scenario.host}")
        print("-" * 50)

        return self._execute(
            self._build_locust_command(scenario, extra_args),
            self._build_env(scenario),
            label="Load test",
            success_message=f"Load test '{scenario_name}' completed successfully!",
            error_prefix="Unexpected error running load test",
            echo_command=True,
        )

    def _build_env(self, scenario) -> Dict[str, str]:
        """Return a copy of the process environment extended for ``scenario``.

        ``ATS_HOST`` is always set from the scenario host. ``TEST_USERNAME``
        and ``TEST_PASSWORD`` are set only when the scenario carries login
        credentials; a missing key becomes an empty string.
        """
        env = os.environ.copy()
        env['ATS_HOST'] = scenario.host
        credentials = scenario.login_credentials
        if credentials:
            env['TEST_USERNAME'] = credentials.get('username', '')
            env['TEST_PASSWORD'] = credentials.get('password', '')
        return env

    def _execute(
        self,
        cmd: List[str],
        env: Optional[Dict[str, str]] = None,
        *,
        label: str,
        success_message: str,
        error_prefix: str = "Unexpected error",
        echo_command: bool = False,
    ) -> bool:
        """Run ``cmd`` as a subprocess and report the outcome on stdout.

        Args:
            cmd: Command line to execute.
            env: Environment for the child; ``None`` inherits the current one.
            label: Human-readable run type used in failure/interrupt messages.
            success_message: Text printed when the process exits with status 0.
            error_prefix: Prefix of the message printed for unexpected errors.
            echo_command: When True, print the command line before running it.

        Returns:
            True on a zero exit status, False otherwise.
        """
        try:
            if echo_command:
                print(f"Executing: {' '.join(cmd)}")
            # check=True turns a non-zero exit status into CalledProcessError.
            subprocess.run(cmd, env=env, check=True)
        except subprocess.CalledProcessError as exc:
            print(f"{label} failed with exit code: {exc.returncode}")
            return False
        except KeyboardInterrupt:
            # Ctrl+C is reported as a failed run instead of a traceback.
            print(f"\n{label} interrupted by user.")
            return False
        except Exception as exc:
            # E.g. the locust executable is missing or an argument is not a string.
            print(f"{error_prefix}: {exc}")
            return False

        print(success_message)
        return True

    def _build_locust_command(self, scenario, extra_args: Optional[List[str]] = None) -> List[str]:
        """Build the Locust command line for ``scenario``.

        Args:
            scenario: Scenario object providing ``host``, ``users``,
                ``spawn_rate``, ``run_time``, ``name`` and ``user_classes``.
            extra_args: Additional arguments inserted before ``--test-name``.

        Returns:
            The argument list, starting with ``"locust"``.
        """
        # Compute the timestamp once so the HTML report and the CSV files of
        # a single run always share the same suffix.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        cmd = [
            "locust",
            "-f", "load_tests/locustfile.py",
            "--host", scenario.host,
            "--users", str(scenario.users),
            "--spawn-rate", str(scenario.spawn_rate),
            "--run-time", scenario.run_time,
            "--html", f"{self.results_dir}/report_{scenario.name}_{timestamp}.html",
            "--csv", f"{self.results_dir}/stats_{scenario.name}_{timestamp}",
        ]

        # NOTE(review): --user-class and --test-name do not appear to be stock
        # Locust options; presumably the locustfile registers them as custom
        # arguments -- confirm.
        if scenario.user_classes:
            cmd.extend(["--user-class", ",".join(scenario.user_classes)])

        if extra_args:
            cmd.extend(extra_args)

        # Test name used for reporting.
        cmd.extend(["--test-name", scenario.name])
        return cmd

    def _build_worker_command(self) -> List[str]:
        """Return the command line for a worker attaching to a local master."""
        return [
            "locust",
            "-f", "load_tests/locustfile.py",
            "--worker",
            "--master-host", "localhost",
        ]

    def list_scenarios(self):
        """Print every configured scenario with its main settings."""
        print("Available Load Test Scenarios:")
        print("=" * 50)

        for name, scenario in self.config.scenarios.items():
            print(f"\n{name}:")
            print(f"  Description: {scenario.description}")
            print(f"  Users: {scenario.users}, Spawn Rate: {scenario.spawn_rate}")
            print(f"  Duration: {scenario.run_time}")
            # user_classes / tags may be unset; treat that as an empty list.
            print(f"  User Classes: {', '.join(scenario.user_classes or [])}")
            print(f"  Tags: {', '.join(scenario.tags or [])}")

    def generate_test_data(self, config: Optional[Dict[str, int]] = None):
        """Generate test data for load testing.

        Args:
            config: Counts passed to the generator under the keys
                ``job_count``, ``user_count`` and ``application_count``.
                Defaults to 100 jobs, 50 users and 500 applications.
        """
        print("Generating test data...")

        generator = TestDataGenerator()

        if config is None:
            config = {
                "job_count": 100,
                "user_count": 50,
                "application_count": 500,
            }

        test_data = generator.generate_bulk_data(config)
        generator.save_test_data(test_data)
        generator.create_test_files(100)

        print("Test data generation completed!")

    def run_headless_test(self, scenario_name: str, extra_args: Optional[List[str]] = None) -> bool:
        """Run a load test in headless mode (no web UI).

        Args:
            scenario_name: Name of a scenario known to the configuration.
            extra_args: Additional command-line arguments forwarded to Locust.

        Returns:
            True when the Locust process exits with status 0, False otherwise.
        """
        scenario = self.config.get_scenario(scenario_name)
        if not scenario:
            print(f"Error: Scenario '{scenario_name}' not found.")
            return False

        cmd = self._build_locust_command(scenario, extra_args)
        cmd.extend(["--headless"])
        env = self._build_env(scenario)

        print(f"Running headless test: {scenario.name}")
        return self._execute(
            cmd,
            env,
            label="Headless test",
            success_message="Headless test completed successfully!",
        )

    def run_master_worker_test(self, scenario_name: str, master: bool = False, workers: int = 1) -> bool:
        """Run one node of a distributed load test.

        Args:
            scenario_name: Scenario to run. Only consulted when ``master`` is
                True; a worker is started without looking it up.
            master: Start the master node when True, otherwise a worker.
            workers: Number of workers the master waits for.

        Returns:
            True when the Locust process exits with status 0, False otherwise.
        """
        if master:
            scenario = self.config.get_scenario(scenario_name)
            if not scenario:
                print(f"Error: Scenario '{scenario_name}' not found.")
                return False

            cmd = self._build_locust_command(scenario)
            cmd.extend(["--master"])
            cmd.extend(["--expect-workers", str(workers)])

            print(f"Starting master node for: {scenario.name}")
            print(f"Expecting {workers} workers")
        else:
            # The worker command uses nothing from the scenario, so no lookup
            # is done here; the CLI passes an empty scenario name for workers.
            cmd = self._build_worker_command()
            print("Starting worker node")

        return self._execute(
            cmd,
            label="Distributed test",
            success_message="Distributed test completed successfully!",
        )
|
|
|
|
def main():
    """Parse the command line and dispatch to the matching runner action.

    The ``run``, ``headless``, ``master`` and ``worker`` commands terminate
    the process with exit status 0 on success and 1 on failure; ``list`` and
    ``generate-data`` return normally. Without a command the help text is
    printed.
    """
    parser = argparse.ArgumentParser(
        description="ATS Load Test Runner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run a smoke test
  python run_load_tests.py run smoke_test

  # Run a heavy load test in headless mode
  python run_load_tests.py headless heavy_load

  # List all available scenarios
  python run_load_tests.py list

  # Generate test data
  python run_load_tests.py generate-data

  # Run distributed test (master)
  python run_load_tests.py master moderate_load --workers 4

  # Run distributed test (worker)
  python run_load_tests.py worker
        """
    )

    commands = parser.add_subparsers(dest='command', help='Available commands')

    # "run" and "headless" accept the same arguments.
    for cmd_name, cmd_help in (
        ('run', 'Run a load test scenario'),
        ('headless', 'Run load test in headless mode'),
    ):
        scenario_cmd = commands.add_parser(cmd_name, help=cmd_help)
        scenario_cmd.add_argument('scenario', help='Name of the scenario to run')
        scenario_cmd.add_argument('--extra', nargs='*', help='Extra arguments for Locust')

    commands.add_parser('list', help='List all available scenarios')

    # Data generation: three integer counts with defaults.
    data_cmd = commands.add_parser('generate-data', help='Generate test data')
    for flag, default_count, flag_help in (
        ('--jobs', 100, 'Number of jobs to generate'),
        ('--users', 50, 'Number of users to generate'),
        ('--applications', 500, 'Number of applications to generate'),
    ):
        data_cmd.add_argument(flag, type=int, default=default_count, help=flag_help)

    master_cmd = commands.add_parser('master', help='Run as master node in distributed test')
    master_cmd.add_argument('scenario', help='Name of the scenario to run')
    master_cmd.add_argument('--workers', type=int, default=1, help='Number of expected workers')

    commands.add_parser('worker', help='Run as worker node in distributed test')

    args = parser.parse_args()
    command = args.command

    if not command:
        parser.print_help()
        return

    runner = LoadTestRunner()

    # Commands that do not produce an exit status.
    if command == 'list':
        runner.list_scenarios()
        return

    if command == 'generate-data':
        runner.generate_test_data({
            "job_count": args.jobs,
            "user_count": args.users,
            "application_count": args.applications
        })
        return

    # Commands whose boolean result becomes the process exit status.
    if command == 'run':
        succeeded = runner.run_test(args.scenario, args.extra)
    elif command == 'headless':
        succeeded = runner.run_headless_test(args.scenario, args.extra)
    elif command == 'master':
        succeeded = runner.run_master_worker_test(args.scenario, master=True, workers=args.workers)
    elif command == 'worker':
        succeeded = runner.run_master_worker_test('', master=False)
    else:
        return

    sys.exit(0 if succeeded else 1)
|
|
|
|
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
|