run_integration_tests.py (12.7 kB)
#!/usr/bin/env python3
"""
Integration test runner for Tiger MCP multi-account workflows.

Provides comprehensive test execution with proper setup, teardown, and
reporting for database, process pool, and MCP server integration tests.
"""

import argparse
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List

import psutil
from loguru import logger

import docker

# Test configuration
TEST_CONFIG = {
    "postgres_port": 15432,
    "redis_port": 16379,
    "test_db_name": "tiger_mcp_test",
    "test_user": "tiger_test",
    "test_password": "tiger_test",
    "docker_network": "tiger-mcp-test-network",
}


class IntegrationTestRunner:
    """Integration test runner with Docker container management."""

    def __init__(self, config: Dict):
        self.config = config
        self.docker_client = None
        self.containers = {}
        self.setup_logging()

    def setup_logging(self):
        """Set up logging configuration."""
        logger.remove()  # Remove default handler

        # Console logging
        logger.add(
            sys.stdout,
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | {message}",
            level="INFO",
        )

        # File logging
        log_file = Path(__file__).parent / "test_results" / "integration_tests.log"
        log_file.parent.mkdir(exist_ok=True)
        logger.add(
            log_file,
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name} | {message}",
            level="DEBUG",
            rotation="10 MB",
            retention="7 days",
        )

    def check_prerequisites(self):
        """Check system prerequisites for running integration tests."""
        logger.info("Checking prerequisites...")

        # Check Docker
        try:
            self.docker_client = docker.from_env()
            self.docker_client.ping()
            logger.info("✓ Docker is available")
        except Exception as e:
            logger.error(f"✗ Docker is not available: {e}")
            return False

        # Check available ports
        for service, port in [
            ("PostgreSQL", self.config["postgres_port"]),
            ("Redis", self.config["redis_port"]),
        ]:
            if self.is_port_in_use(port):
                logger.warning(f"Port {port} is already in use (for {service})")

        # Check system resources
        memory = psutil.virtual_memory()
        if memory.available < 2 * 1024 * 1024 * 1024:  # 2GB
            logger.warning("Less than 2GB of available memory - tests may be slow")

        logger.info("Prerequisites check completed")
        return True

    def is_port_in_use(self, port: int) -> bool:
        """Check if a port is in use."""
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex(("localhost", port)) == 0

    def start_infrastructure(self):
        """Start required infrastructure containers."""
        logger.info("Starting infrastructure containers...")

        # Start PostgreSQL
        logger.info("Starting PostgreSQL container...")
        postgres_container = self.docker_client.containers.run(
            "postgres:15-alpine",
            name="tiger-mcp-postgres-test",
            environment={
                "POSTGRES_DB": self.config["test_db_name"],
                "POSTGRES_USER": self.config["test_user"],
                "POSTGRES_PASSWORD": self.config["test_password"],
                "POSTGRES_INITDB_ARGS": "--auth-host=scram-sha-256",
            },
            ports={"5432/tcp": self.config["postgres_port"]},
            detach=True,
            remove=True,
            network_mode="bridge",
        )
        self.containers["postgres"] = postgres_container

        # Start Redis
        logger.info("Starting Redis container...")
        redis_container = self.docker_client.containers.run(
            "redis:7-alpine",
            name="tiger-mcp-redis-test",
            command="redis-server --appendonly yes",
            ports={"6379/tcp": self.config["redis_port"]},
            detach=True,
            remove=True,
            network_mode="bridge",
        )
        self.containers["redis"] = redis_container

        # Wait for containers to be healthy
        self.wait_for_infrastructure()

    def wait_for_infrastructure(self, timeout: int = 60):
        """Wait for infrastructure containers to be ready."""
        logger.info("Waiting for infrastructure to be ready...")

        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Check PostgreSQL and Redis
                postgres_ready = self.check_postgres_health()
                redis_ready = self.check_redis_health()

                if postgres_ready and redis_ready:
                    logger.info("✓ All infrastructure containers are ready")
                    return True

                logger.info("Waiting for containers to be ready...")
                time.sleep(2)

            except Exception as e:
                logger.debug(f"Health check error (expected during startup): {e}")
                time.sleep(2)

        logger.error("Timeout waiting for infrastructure")
        return False

    def check_postgres_health(self) -> bool:
        """Check PostgreSQL container health."""
        try:
            result = subprocess.run(
                [
                    "pg_isready",
                    "-h",
                    "localhost",
                    "-p",
                    str(self.config["postgres_port"]),
                    "-U",
                    self.config["test_user"],
                    "-d",
                    self.config["test_db_name"],
                ],
                capture_output=True,
                timeout=5,
            )
            return result.returncode == 0
        except Exception:
            return False

    def check_redis_health(self) -> bool:
        """Check Redis container health."""
        try:
            result = subprocess.run(
                [
                    "redis-cli",
                    "-h",
                    "localhost",
                    "-p",
                    str(self.config["redis_port"]),
                    "ping",
                ],
                capture_output=True,
                timeout=5,
            )
            return result.returncode == 0 and b"PONG" in result.stdout
        except Exception:
            return False

    def setup_test_database(self):
        """Set up test database schema."""
        logger.info("Setting up test database schema...")

        # Set environment variables for database setup
        env = os.environ.copy()
        env.update(
            {
                "DATABASE_URL": f"postgresql+asyncpg://{self.config['test_user']}:{self.config['test_password']}@localhost:{self.config['postgres_port']}/{self.config['test_db_name']}",
                "PYTHONPATH": str(Path(__file__).parent.parent.parent / "src"),
            }
        )

        # Run database setup script
        setup_script = Path(__file__).parent / "setup_test_db.py"
        if setup_script.exists():
            result = subprocess.run(
                [sys.executable, str(setup_script)],
                env=env,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                logger.error(f"Database setup failed: {result.stderr}")
                return False

        logger.info("✓ Test database schema created")
        return True

    def run_tests(self, test_args: List[str]) -> bool:
        """Run the integration tests."""
        logger.info("Running integration tests...")

        # Set up test environment
        env = os.environ.copy()
        env.update(
            {
                "DATABASE_URL": f"postgresql+asyncpg://{self.config['test_user']}:{self.config['test_password']}@localhost:{self.config['postgres_port']}/{self.config['test_db_name']}",
                "REDIS_URL": f"redis://localhost:{self.config['redis_port']}/15",
                "ENVIRONMENT": "test",
                "ENCRYPTION_MASTER_KEY": "dGVzdF9lbmNyeXB0aW9uX2tleV8zMl9ieXRlc190ZXN0",
                "JWT_SECRET": "test_jwt_secret_integration_tests_very_secure",
                "TIGER_MOCK_MODE": "true",
                "PYTHONPATH": str(Path(__file__).parent.parent.parent / "src"),
            }
        )

        # Build pytest command
        cmd = [
            sys.executable,
            "-m",
            "pytest",
            str(Path(__file__).parent),
            "-v",
            "--tb=short",
            "--strict-markers",
            "--strict-config",
        ] + test_args

        logger.info(f"Running command: {' '.join(cmd)}")

        # Run tests
        start_time = time.time()
        result = subprocess.run(cmd, env=env)
        duration = time.time() - start_time

        if result.returncode == 0:
            logger.info(f"✓ All tests passed in {duration:.2f}s")
            return True
        else:
            logger.error(f"✗ Tests failed after {duration:.2f}s")
            return False

    def cleanup(self):
        """Clean up containers and resources."""
        logger.info("Cleaning up infrastructure...")

        for name, container in self.containers.items():
            try:
                logger.info(f"Stopping {name} container...")
                container.stop()
                container.wait(timeout=10)
                logger.info(f"✓ {name} container stopped")
            except Exception as e:
                logger.warning(f"Error stopping {name} container: {e}")

        logger.info("Cleanup completed")

    def run(self, test_args: List[str]) -> bool:
        """Run the complete integration test suite."""
        success = True

        try:
            # Check prerequisites
            if not self.check_prerequisites():
                return False

            # Start infrastructure
            self.start_infrastructure()

            # Setup database
            if not self.setup_test_database():
                return False

            # Run tests
            success = self.run_tests(test_args)

        except KeyboardInterrupt:
            logger.info("Test run interrupted by user")
            success = False
        except Exception as e:
            logger.error(f"Test run failed with error: {e}")
            success = False
        finally:
            # Always cleanup
            self.cleanup()

        return success


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Integration test runner for Tiger MCP multi-account workflows"
    )
    parser.add_argument(
        "--test-pattern", "-k", help="Only run tests matching this pattern"
    )
    parser.add_argument("--test-file", "-f", help="Only run tests from this file")
    parser.add_argument(
        "--parallel", "-n", type=int, default=4, help="Number of parallel test workers"
    )
    parser.add_argument(
        "--coverage", action="store_true", help="Run with coverage reporting"
    )
    parser.add_argument(
        "--performance",
        action="store_true",
        help="Run performance tests (longer duration)",
    )
    parser.add_argument(
        "--load-test", action="store_true", help="Run load tests (high resource usage)"
    )
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

    args = parser.parse_args()

    # Build pytest arguments
    test_args = []

    if args.test_pattern:
        test_args.extend(["-k", args.test_pattern])

    if args.test_file:
        test_args.append(args.test_file)

    if args.parallel > 1:
        test_args.extend(["-n", str(args.parallel)])

    if args.coverage:
        test_args.extend(
            [
                "--cov=shared",
                "--cov-report=xml:test_results/coverage.xml",
                "--cov-report=html:test_results/coverage_html",
                "--cov-report=term",
            ]
        )

    if args.performance:
        test_args.extend(["-m", "performance"])

    if args.load_test:
        test_args.extend(["-m", "load_test"])

    if args.verbose:
        test_args.append("-vv")

    # Add JUnit XML output
    test_args.extend(["--junit-xml=test_results/integration-results.xml"])

    # Create test runner and run tests
    runner = IntegrationTestRunner(TEST_CONFIG)
    success = runner.run(test_args)

    # Exit with appropriate code
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luxiaolei/tiger-mcp'
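
The same record can also be fetched from Python using only the standard library. This is a minimal sketch: the endpoint URL is the one shown above, and the response is assumed to be JSON.

    import json
    import urllib.request

    # Fetch the directory entry for this server and pretty-print it
    url = "https://glama.ai/api/mcp/v1/servers/luxiaolei/tiger-mcp"
    with urllib.request.urlopen(url) as response:
        server_info = json.load(response)
    print(json.dumps(server_info, indent=2))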

If you have feedback or need assistance with the MCP directory API, please join our Discord server.