Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
middleware_log_template.j2โ€ข25.3 kB
{# Jinja2 template for FastAPI logging middleware with enhanced performance metrics #} import logging import time import json import sqlite3 import psutil import threading from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint from starlette.types import Message from typing import Dict, Optional # Create logs directory if it doesn't exist import os from pathlib import Path logs_dir = Path("logs") logs_dir.mkdir(exist_ok=True) # Setup SQLite database for request logs db_dir = Path("db") db_dir.mkdir(exist_ok=True) DB_PATH = db_dir / "request_logs.db" def init_db(): conn = sqlite3.connect(str(DB_PATH)) cursor = conn.cursor() # Create the base table cursor.execute(''' CREATE TABLE IF NOT EXISTS request_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, type TEXT, method TEXT, path TEXT, status_code INTEGER, process_time_ms INTEGER, client_host TEXT, client_port TEXT, headers TEXT, query_params TEXT, request_body TEXT, response_body TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Create schema version table for migration tracking cursor.execute(''' CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER PRIMARY KEY, applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, description TEXT ) ''') # Check current schema version cursor.execute("SELECT MAX(version) FROM schema_version") current_version = cursor.fetchone()[0] or 0 # Apply migrations migrate_database(cursor, current_version) conn.commit() conn.close() def migrate_database(cursor, current_version): """Apply database migrations based on current version.""" # Migration 1: Add Phase 1 enhancement columns if current_version < 1: print("Applying migration 1: Adding Phase 1 enhancement columns...") # Check which columns already exist cursor.execute("PRAGMA table_info(request_logs)") existing_columns = {col[1] for col in cursor.fetchall()} # Add new columns if they don't exist new_columns = [ ("session_id", "TEXT"), ("test_scenario", "TEXT"), ("correlation_id", "TEXT"), ("user_agent", "TEXT"), ("response_size", "INTEGER"), ("is_admin", "BOOLEAN DEFAULT 0") ] for column_name, column_type in new_columns: if column_name not in existing_columns: try: cursor.execute(f'ALTER TABLE request_logs ADD COLUMN {column_name} {column_type}') print(f"Added column: {column_name}") except Exception as e: print(f"Warning: Could not add column {column_name}: {e}") # Record migration cursor.execute( "INSERT INTO schema_version (version, description) VALUES (?, ?)", (1, "Added Phase 1 enhancement columns: session_id, test_scenario, correlation_id, user_agent, response_size, is_admin") ) print("Migration 1 completed successfully") # Migration 2: Create test sessions table (Phase 1) if current_version < 2: print("Applying migration 2: Creating test sessions table...") cursor.execute(''' CREATE TABLE IF NOT EXISTS test_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT UNIQUE NOT NULL, name TEXT, description TEXT, started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ended_at TIMESTAMP, metadata TEXT, total_requests INTEGER DEFAULT 0, success_rate REAL DEFAULT 0.0 ) ''') # Record migration cursor.execute( "INSERT INTO schema_version (version, description) VALUES (?, ?)", (2, "Created test_sessions table for session tracking") ) print("Migration 2 completed successfully") # Migration 3: Create performance metrics table (Phase 1) if current_version < 3: print("Applying migration 3: Creating performance metrics table...") cursor.execute(''' CREATE TABLE IF NOT EXISTS performance_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, endpoint TEXT NOT NULL, method TEXT NOT NULL, avg_response_time REAL, min_response_time REAL, max_response_time REAL, request_count INTEGER, error_count INTEGER, time_window TEXT ) ''') # Record migration cursor.execute( "INSERT INTO schema_version (version, description) VALUES (?, ?)", (3, "Created performance_metrics table for performance tracking") ) print("Migration 3 completed successfully") # Migration 5: Create enhanced performance metrics table (Phase 2 Part 4) if current_version < 5: print("Applying migration 5: Creating enhanced performance metrics table...") # Drop the old performance_metrics table if it exists cursor.execute('DROP TABLE IF EXISTS performance_metrics') # Create the new enhanced performance_metrics table cursor.execute(''' CREATE TABLE performance_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, request_id INTEGER, response_time_ms REAL NOT NULL, memory_usage_mb REAL, cpu_usage_percent REAL, database_queries INTEGER DEFAULT 0, cache_hits INTEGER DEFAULT 0, cache_misses INTEGER DEFAULT 0, request_size_bytes INTEGER DEFAULT 0, response_size_bytes INTEGER DEFAULT 0, recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (request_id) REFERENCES request_logs (id) ) ''') # Record migration cursor.execute( "INSERT INTO schema_version (version, description) VALUES (?, ?)", (5, "Recreated enhanced performance_metrics table for comprehensive performance tracking") ) print("Migration 5 completed successfully") # Migration 6: Create enhanced test sessions table (Phase 2 Part 4) if current_version < 6: print("Applying migration 6: Creating enhanced test sessions table...") # Drop the old test_sessions table if it exists cursor.execute('DROP TABLE IF EXISTS test_sessions') # Create the new enhanced test_sessions table cursor.execute(''' CREATE TABLE test_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT UNIQUE NOT NULL, scenario_name TEXT, start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, end_time TIMESTAMP, total_requests INTEGER DEFAULT 0, avg_response_time REAL DEFAULT 0.0, status TEXT DEFAULT 'active', metadata TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Record migration cursor.execute( "INSERT INTO schema_version (version, description) VALUES (?, ?)", (6, "Recreated enhanced test_sessions table for advanced session tracking") ) print("Migration 6 completed successfully") # Migration 7: Create mock_scenarios table (missing table) if current_version < 7: print("Applying migration 7: Creating mock_scenarios table...") cursor.execute(''' CREATE TABLE IF NOT EXISTS mock_scenarios ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, description TEXT, config TEXT, is_active BOOLEAN DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Record migration cursor.execute( "INSERT INTO schema_version (version, description) VALUES (?, ?)", (7, "Created mock_scenarios table for scenario management") ) print("Migration 7 completed successfully") def get_schema_version(): """Get current database schema version.""" try: conn = sqlite3.connect(str(DB_PATH)) cursor = conn.cursor() cursor.execute("SELECT MAX(version) FROM schema_version") version = cursor.fetchone()[0] or 0 conn.close() return version except Exception: return 0 def extract_session_info(request): """Extract session information from request headers.""" headers = dict(request.headers) if request.headers else {} # Extract session ID from various possible headers session_id = ( headers.get('x-session-id') or headers.get('x-test-session') or headers.get('session-id') or None ) # Extract test scenario test_scenario = ( headers.get('x-test-scenario') or headers.get('test-scenario') or None ) # Extract correlation ID correlation_id = ( headers.get('x-correlation-id') or headers.get('correlation-id') or headers.get('x-request-id') or None ) # Extract user agent user_agent = headers.get('user-agent', '') return session_id, test_scenario, correlation_id, user_agent # Performance monitoring utilities class PerformanceMonitor: """Thread-safe performance monitoring for request metrics.""" def __init__(self): self._lock = threading.Lock() self._db_query_count = 0 self._cache_hits = 0 self._cache_misses = 0 def reset_counters(self): """Reset performance counters for a new request.""" with self._lock: self._db_query_count = 0 self._cache_hits = 0 self._cache_misses = 0 def increment_db_queries(self, count: int = 1): """Increment database query counter.""" with self._lock: self._db_query_count += count def increment_cache_hits(self, count: int = 1): """Increment cache hit counter.""" with self._lock: self._cache_hits += count def increment_cache_misses(self, count: int = 1): """Increment cache miss counter.""" with self._lock: self._cache_misses += count def get_counters(self) -> Dict[str, int]: """Get current counter values.""" with self._lock: return { 'db_queries': self._db_query_count, 'cache_hits': self._cache_hits, 'cache_misses': self._cache_misses } def get_memory_usage() -> float: """Get current memory usage in MB.""" try: process = psutil.Process() memory_info = process.memory_info() return memory_info.rss / 1024 / 1024 # Convert bytes to MB except Exception: return 0.0 def get_cpu_usage() -> float: """Get current CPU usage percentage.""" try: return psutil.cpu_percent(interval=None) except Exception: return 0.0 def store_performance_metrics(request_id: int, response_time_ms: float, request_size: int, response_size: int, counters: Dict[str, int]): """Store performance metrics in the database.""" try: conn = sqlite3.connect(str(DB_PATH)) cursor = conn.cursor() memory_usage = get_memory_usage() cpu_usage = get_cpu_usage() cursor.execute(''' INSERT INTO performance_metrics ( request_id, response_time_ms, memory_usage_mb, cpu_usage_percent, database_queries, cache_hits, cache_misses, request_size_bytes, response_size_bytes, recorded_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', ( request_id, response_time_ms, memory_usage, cpu_usage, counters.get('db_queries', 0), counters.get('cache_hits', 0), counters.get('cache_misses', 0), request_size, response_size )) conn.commit() conn.close() except Exception as e: print(f"Error storing performance metrics: {e}") def update_test_session(session_id: str, response_time_ms: float, scenario_name: Optional[str] = None): """Update test session with new request data.""" if not session_id: return try: conn = sqlite3.connect(str(DB_PATH)) cursor = conn.cursor() # Check if session exists cursor.execute("SELECT id, total_requests, avg_response_time FROM test_sessions WHERE session_id = ?", (session_id,)) session = cursor.fetchone() if session: # Update existing session session_db_id, total_requests, avg_response_time = session new_total = total_requests + 1 new_avg = ((avg_response_time * total_requests) + response_time_ms) / new_total cursor.execute(''' UPDATE test_sessions SET total_requests = ?, avg_response_time = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ? ''', (new_total, new_avg, session_id)) else: # Create new session cursor.execute(''' INSERT INTO test_sessions ( session_id, scenario_name, total_requests, avg_response_time, status ) VALUES (?, ?, 1, ?, 'active') ''', (session_id, scenario_name, response_time_ms)) conn.commit() conn.close() except Exception as e: print(f"Error updating test session: {e}") # Global performance monitor instance performance_monitor = PerformanceMonitor() # Initialize database init_db() # Configure logging to both file and console logger = logging.getLogger("mock_api_logger") logger.setLevel(logging.INFO) # Add console handler console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) # Add file handler with daily rotation from datetime import datetime log_file = logs_dir / f"requests_{datetime.now().strftime('%Y-%m-%d')}.log" file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.INFO) file_formatter = logging.Formatter('%(message)s') # Just the message for cleaner JSON parsing file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) # Prevent duplicate logging logger.propagate = False class LoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next: RequestResponseEndpoint): start_time = time.time() # Reset performance counters for this request performance_monitor.reset_counters() # Attempt to read request body without consuming it for the endpoint # This is tricky with FastAPI/Starlette's request object model # One common approach is to read it and then reconstruct it if necessary # For simple JSON, this is often okay. For streaming/form data, it's more complex. request_body_bytes = await request.body() request_size = len(request_body_bytes) if request_body_bytes else 0 # Store the body so the endpoint can re-read it if necessary # Starlette's Request object has a _body attribute that can be set # or use a more robust method if issues arise with specific content types async def receive() -> Message: return {"type": "http.request", "body": request_body_bytes, "more_body": False} # Create a new Request object with the captured body for the endpoint # This ensures the endpoint receives the body correctly after we've read it here. # However, directly modifying request._receive or creating a new request like this # can be complex. A simpler approach for logging might be to log only metadata # or use a more advanced APM tool if full body logging becomes problematic. # For now, we'll log what we can and note potential complexities. request_body_str = "" try: request_body_str = request_body_bytes.decode('utf-8') except UnicodeDecodeError: request_body_str = "[binary data]" request_log_data = { "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime(start_time)), "type": "request", "method": request.method, "path": request.url.path, "query_params": str(request.query_params), "client_host": request.client.host if request.client else "N/A", "client_port": request.client.port if request.client else "N/A", "headers": dict(request.headers), "body": request_body_str, # Consider truncating large bodies } logger.info(json.dumps(request_log_data)) # Re-assign the body to the request object so it can be read by the endpoint # This is a common pattern but can have edge cases. # Starlette's Request object uses a stream for the body. # Once read, it's consumed. We need to "reset" it. # The `request._receive` mechanism is internal and can change. # A more robust way is to pass the body along if the framework supports it, # or accept that logging the body here means it might not be available downstream # without special handling. # For this mock server, we assume endpoints might not need the body if it's simple. # If they do, this part needs to be very robust. # Create a new scope for the request with the original body # This is a more robust way to ensure the body is available for the endpoint scope = request.scope # Create a new receive channel that will provide the body async def new_receive(): return {"type": "http.request", "body": request_body_bytes, "more_body": False} # Create a new request object with the new receive channel # This is not straightforward as Request objects are not meant to be mutated this way easily. # The most reliable way is often to have the endpoint itself log its received body if critical. # Or, use a framework feature if available. # For now, we log the body as read, and accept limitations. response = await call_next(Request(scope, receive=new_receive)) # Pass new request with body process_time_ms = int((time.time() - start_time) * 1000) response_body_bytes = b"" async for chunk in response.body_iterator: response_body_bytes += chunk response_body_str = "" try: response_body_str = response_body_bytes.decode('utf-8') except UnicodeDecodeError: response_body_str = "[binary data]" response_log_data = { "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime()), "type": "response", "method": request.method, "path": request.url.path, "status_code": response.status_code, "process_time_ms": process_time_ms, "headers": dict(response.headers), "body": response_body_str, # Consider truncating } logger.info(json.dumps(response_log_data)) # All requests on business port are non-admin (admin requests are on separate port) is_admin = False # Extract session information session_id, test_scenario, correlation_id, user_agent = extract_session_info(request) # Calculate response size response_size = len(response_body_bytes) if response_body_bytes else 0 # DEBUG: Log admin request detection print(f"DEBUG MIDDLEWARE: Request path: {request.url.path}") print(f"DEBUG MIDDLEWARE: Is admin request: {is_admin}") print(f"DEBUG MIDDLEWARE: Session ID: {session_id}") print(f"DEBUG MIDDLEWARE: Test scenario: {test_scenario}") # Store request and response in SQLite database request_id = None try: conn = sqlite3.connect(str(DB_PATH)) cursor = conn.cursor() # Check current schema and apply migrations if needed cursor.execute("PRAGMA table_info(request_logs)") columns = cursor.fetchall() column_names = {col[1] for col in columns} # Ensure all required columns exist (backward compatibility) required_columns = { 'is_admin': 'BOOLEAN DEFAULT 0', 'session_id': 'TEXT', 'test_scenario': 'TEXT', 'correlation_id': 'TEXT', 'user_agent': 'TEXT', 'response_size': 'INTEGER' } for column_name, column_type in required_columns.items(): if column_name not in column_names: try: cursor.execute(f'ALTER TABLE request_logs ADD COLUMN {column_name} {column_type}') conn.commit() print(f"DEBUG MIDDLEWARE: Added column: {column_name}") except Exception as e: print(f"DEBUG MIDDLEWARE: Could not add column {column_name}: {e}") # Insert the log entry with all available data cursor.execute(''' INSERT INTO request_logs ( timestamp, type, method, path, status_code, process_time_ms, client_host, client_port, headers, query_params, request_body, response_body, is_admin, session_id, test_scenario, correlation_id, user_agent, response_size ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime()), "request", request.method, request.url.path, response.status_code, process_time_ms, request.client.host if request.client else "N/A", str(request.client.port) if request.client else "N/A", json.dumps(dict(request.headers)), str(request.query_params), request_body_str, response_body_str, 1 if is_admin else 0, session_id, test_scenario, correlation_id, user_agent, response_size )) # Get the request ID for performance metrics request_id = cursor.lastrowid conn.commit() print(f"DEBUG MIDDLEWARE: Stored enhanced request log with ID: {request_id}") conn.close() except Exception as e: logger.error(f"Failed to store request log in database: {e}") print(f"DEBUG MIDDLEWARE: Database error: {e}") # Store performance metrics if request was logged successfully if request_id: try: counters = performance_monitor.get_counters() store_performance_metrics( request_id, process_time_ms, request_size, response_size, counters ) print(f"DEBUG MIDDLEWARE: Stored performance metrics for request {request_id}") except Exception as e: print(f"DEBUG MIDDLEWARE: Error storing performance metrics: {e}") # Update test session if session_id is present if session_id: try: update_test_session(session_id, process_time_ms, test_scenario) print(f"DEBUG MIDDLEWARE: Updated test session: {session_id}") except Exception as e: print(f"DEBUG MIDDLEWARE: Error updating test session: {e}") # Return a new response with the consumed body, as body_iterator consumes it return Response( content=response_body_bytes, status_code=response.status_code, headers=dict(response.headers), media_type=response.media_type ) # In the generated main.py, this middleware will be added to the app: # from .logging_middleware import LoggingMiddleware # Assuming it's saved as logging_middleware.py # app.add_middleware(LoggingMiddleware)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server