{# Jinja2 template for FastAPI logging middleware with enhanced performance metrics #}
import logging
import time
import json
import sqlite3
import psutil
import threading
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.types import Message
from typing import Dict, Optional
# Create logs directory if it doesn't exist
import os
from pathlib import Path
logs_dir = Path("logs")
logs_dir.mkdir(exist_ok=True)
# Setup SQLite database for request logs
db_dir = Path("db")
db_dir.mkdir(exist_ok=True)
DB_PATH = db_dir / "request_logs.db"
def init_db():
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
# Create the base table
cursor.execute('''
CREATE TABLE IF NOT EXISTS request_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
type TEXT,
method TEXT,
path TEXT,
status_code INTEGER,
process_time_ms INTEGER,
client_host TEXT,
client_port TEXT,
headers TEXT,
query_params TEXT,
request_body TEXT,
response_body TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create schema version table for migration tracking
cursor.execute('''
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT
)
''')
# Check current schema version
cursor.execute("SELECT MAX(version) FROM schema_version")
current_version = cursor.fetchone()[0] or 0
# Apply migrations
migrate_database(cursor, current_version)
conn.commit()
conn.close()
def migrate_database(cursor, current_version):
"""Apply database migrations based on current version."""
# Migration 1: Add Phase 1 enhancement columns
if current_version < 1:
print("Applying migration 1: Adding Phase 1 enhancement columns...")
# Check which columns already exist
cursor.execute("PRAGMA table_info(request_logs)")
existing_columns = {col[1] for col in cursor.fetchall()}
# Add new columns if they don't exist
new_columns = [
("session_id", "TEXT"),
("test_scenario", "TEXT"),
("correlation_id", "TEXT"),
("user_agent", "TEXT"),
("response_size", "INTEGER"),
("is_admin", "BOOLEAN DEFAULT 0")
]
for column_name, column_type in new_columns:
if column_name not in existing_columns:
try:
cursor.execute(f'ALTER TABLE request_logs ADD COLUMN {column_name} {column_type}')
print(f"Added column: {column_name}")
except Exception as e:
print(f"Warning: Could not add column {column_name}: {e}")
# Record migration
cursor.execute(
"INSERT INTO schema_version (version, description) VALUES (?, ?)",
(1, "Added Phase 1 enhancement columns: session_id, test_scenario, correlation_id, user_agent, response_size, is_admin")
)
print("Migration 1 completed successfully")
# Migration 2: Create test sessions table (Phase 1)
if current_version < 2:
print("Applying migration 2: Creating test sessions table...")
cursor.execute('''
CREATE TABLE IF NOT EXISTS test_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT UNIQUE NOT NULL,
name TEXT,
description TEXT,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ended_at TIMESTAMP,
metadata TEXT,
total_requests INTEGER DEFAULT 0,
success_rate REAL DEFAULT 0.0
)
''')
# Record migration
cursor.execute(
"INSERT INTO schema_version (version, description) VALUES (?, ?)",
(2, "Created test_sessions table for session tracking")
)
print("Migration 2 completed successfully")
# Migration 3: Create performance metrics table (Phase 1)
if current_version < 3:
print("Applying migration 3: Creating performance metrics table...")
cursor.execute('''
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
endpoint TEXT NOT NULL,
method TEXT NOT NULL,
avg_response_time REAL,
min_response_time REAL,
max_response_time REAL,
request_count INTEGER,
error_count INTEGER,
time_window TEXT
)
''')
# Record migration
cursor.execute(
"INSERT INTO schema_version (version, description) VALUES (?, ?)",
(3, "Created performance_metrics table for performance tracking")
)
print("Migration 3 completed successfully")
# Migration 5: Create enhanced performance metrics table (Phase 2 Part 4)
if current_version < 5:
print("Applying migration 5: Creating enhanced performance metrics table...")
# Drop the old performance_metrics table if it exists
cursor.execute('DROP TABLE IF EXISTS performance_metrics')
# Create the new enhanced performance_metrics table
cursor.execute('''
CREATE TABLE performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id INTEGER,
response_time_ms REAL NOT NULL,
memory_usage_mb REAL,
cpu_usage_percent REAL,
database_queries INTEGER DEFAULT 0,
cache_hits INTEGER DEFAULT 0,
cache_misses INTEGER DEFAULT 0,
request_size_bytes INTEGER DEFAULT 0,
response_size_bytes INTEGER DEFAULT 0,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (request_id) REFERENCES request_logs (id)
)
''')
# Record migration
cursor.execute(
"INSERT INTO schema_version (version, description) VALUES (?, ?)",
(5, "Recreated enhanced performance_metrics table for comprehensive performance tracking")
)
print("Migration 5 completed successfully")
# Migration 6: Create enhanced test sessions table (Phase 2 Part 4)
if current_version < 6:
print("Applying migration 6: Creating enhanced test sessions table...")
# Drop the old test_sessions table if it exists
cursor.execute('DROP TABLE IF EXISTS test_sessions')
# Create the new enhanced test_sessions table
cursor.execute('''
CREATE TABLE test_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT UNIQUE NOT NULL,
scenario_name TEXT,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP,
total_requests INTEGER DEFAULT 0,
avg_response_time REAL DEFAULT 0.0,
status TEXT DEFAULT 'active',
metadata TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Record migration
cursor.execute(
"INSERT INTO schema_version (version, description) VALUES (?, ?)",
(6, "Recreated enhanced test_sessions table for advanced session tracking")
)
print("Migration 6 completed successfully")
# Migration 7: Create mock_scenarios table (missing table)
if current_version < 7:
print("Applying migration 7: Creating mock_scenarios table...")
cursor.execute('''
CREATE TABLE IF NOT EXISTS mock_scenarios (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT,
config TEXT,
is_active BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Record migration
cursor.execute(
"INSERT INTO schema_version (version, description) VALUES (?, ?)",
(7, "Created mock_scenarios table for scenario management")
)
print("Migration 7 completed successfully")
def get_schema_version():
"""Get current database schema version."""
try:
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
cursor.execute("SELECT MAX(version) FROM schema_version")
version = cursor.fetchone()[0] or 0
conn.close()
return version
except Exception:
return 0
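# Illustrative note (not executed): on a fresh database, init_db() applies the
# migrations defined above, so get_schema_version() is expected to report 7,
# the highest version recorded in schema_version.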
def extract_session_info(request):
"""Extract session information from request headers."""
headers = dict(request.headers) if request.headers else {}
# Extract session ID from various possible headers
session_id = (
headers.get('x-session-id') or
headers.get('x-test-session') or
headers.get('session-id') or
None
)
# Extract test scenario
test_scenario = (
headers.get('x-test-scenario') or
headers.get('test-scenario') or
None
)
# Extract correlation ID
correlation_id = (
headers.get('x-correlation-id') or
headers.get('correlation-id') or
headers.get('x-request-id') or
None
)
# Extract user agent
user_agent = headers.get('user-agent', '')
return session_id, test_scenario, correlation_id, user_agent
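# Example (illustrative only): a test client sending
#   X-Session-ID: load-test-42
#   X-Test-Scenario: checkout-happy-path
#   X-Correlation-ID: abc123
# is recorded with session_id='load-test-42', test_scenario='checkout-happy-path'
# and correlation_id='abc123'. ASGI servers lower-case header names, which is why
# the lookups above use lower-case keys.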
# Performance monitoring utilities
class PerformanceMonitor:
"""Thread-safe performance monitoring for request metrics."""
def __init__(self):
self._lock = threading.Lock()
self._db_query_count = 0
self._cache_hits = 0
self._cache_misses = 0
def reset_counters(self):
"""Reset performance counters for a new request."""
with self._lock:
self._db_query_count = 0
self._cache_hits = 0
self._cache_misses = 0
def increment_db_queries(self, count: int = 1):
"""Increment database query counter."""
with self._lock:
self._db_query_count += count
def increment_cache_hits(self, count: int = 1):
"""Increment cache hit counter."""
with self._lock:
self._cache_hits += count
def increment_cache_misses(self, count: int = 1):
"""Increment cache miss counter."""
with self._lock:
self._cache_misses += count
def get_counters(self) -> Dict[str, int]:
"""Get current counter values."""
with self._lock:
return {
'db_queries': self._db_query_count,
'cache_hits': self._cache_hits,
'cache_misses': self._cache_misses
}
def get_memory_usage() -> float:
"""Get current memory usage in MB."""
try:
process = psutil.Process()
memory_info = process.memory_info()
return memory_info.rss / 1024 / 1024 # Convert bytes to MB
except Exception:
return 0.0
def get_cpu_usage() -> float:
"""Get current CPU usage percentage."""
try:
return psutil.cpu_percent(interval=None)
except Exception:
return 0.0
def store_performance_metrics(request_id: int, response_time_ms: float,
request_size: int, response_size: int,
counters: Dict[str, int]):
"""Store performance metrics in the database."""
try:
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
memory_usage = get_memory_usage()
cpu_usage = get_cpu_usage()
cursor.execute('''
INSERT INTO performance_metrics (
request_id, response_time_ms, memory_usage_mb, cpu_usage_percent,
database_queries, cache_hits, cache_misses,
request_size_bytes, response_size_bytes, recorded_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (
request_id, response_time_ms, memory_usage, cpu_usage,
counters.get('db_queries', 0), counters.get('cache_hits', 0),
counters.get('cache_misses', 0), request_size, response_size
))
conn.commit()
conn.close()
except Exception as e:
print(f"Error storing performance metrics: {e}")
def update_test_session(session_id: str, response_time_ms: float, scenario_name: Optional[str] = None):
"""Update test session with new request data."""
if not session_id:
return
try:
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
# Check if session exists
cursor.execute("SELECT id, total_requests, avg_response_time FROM test_sessions WHERE session_id = ?", (session_id,))
session = cursor.fetchone()
if session:
# Update existing session
session_db_id, total_requests, avg_response_time = session
new_total = total_requests + 1
new_avg = ((avg_response_time * total_requests) + response_time_ms) / new_total
cursor.execute('''
UPDATE test_sessions
SET total_requests = ?, avg_response_time = ?, updated_at = CURRENT_TIMESTAMP
WHERE session_id = ?
''', (new_total, new_avg, session_id))
else:
# Create new session
cursor.execute('''
INSERT INTO test_sessions (
session_id, scenario_name, total_requests, avg_response_time, status
) VALUES (?, ?, 1, ?, 'active')
''', (session_id, scenario_name, response_time_ms))
conn.commit()
conn.close()
except Exception as e:
print(f"Error updating test session: {e}")
# Global performance monitor instance
performance_monitor = PerformanceMonitor()
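# Sketch (assumed usage, not wired up anywhere in this template): endpoints or data
# access helpers can report work to the shared monitor so it shows up in
# performance_metrics. The middleware only resets and reads these counters; nothing
# increments them automatically.
#   from logging_middleware import performance_monitor  # module name as assumed below
#   performance_monitor.increment_db_queries()          # after each SQL statement
#   performance_monitor.increment_cache_hits()          # on a cache hit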
# Initialize database
init_db()
# Configure logging to both file and console
logger = logging.getLogger("mock_api_logger")
logger.setLevel(logging.INFO)
# Add console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# Add a file handler with a date-stamped filename (plain FileHandler does not rotate; the date is fixed when the module is imported)
from datetime import datetime
log_file = logs_dir / f"requests_{datetime.now().strftime('%Y-%m-%d')}.log"
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(message)s') # Just the message for cleaner JSON parsing
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# Prevent duplicate logging
logger.propagate = False
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
start_time = time.time()
# Reset performance counters for this request
performance_monitor.reset_counters()
# Reading the body here consumes Starlette's request stream, so we capture it
# once and replay it to the endpoint through a replacement receive() callable.
# This is fine for buffered bodies (e.g. JSON); streaming uploads and multipart
# form data need more careful handling.
request_body_bytes = await request.body()
request_size = len(request_body_bytes) if request_body_bytes else 0
async def replay_receive() -> Message:
return {"type": "http.request", "body": request_body_bytes, "more_body": False}
request_body_str = ""
try:
request_body_str = request_body_bytes.decode('utf-8')
except UnicodeDecodeError:
request_body_str = "[binary data]"
request_log_data = {
"timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime(start_time)),
"type": "request",
"method": request.method,
"path": request.url.path,
"query_params": str(request.query_params),
"client_host": request.client.host if request.client else "N/A",
"client_port": request.client.port if request.client else "N/A",
"headers": dict(request.headers),
"body": request_body_str, # Consider truncating large bodies
}
logger.info(json.dumps(request_log_data))
# Hand the captured body back to the endpoint by rebuilding the Request with
# replay_receive(). This leans on Starlette internals and works for buffered
# bodies; endpoints that stream the body may need a different approach.
response = await call_next(Request(request.scope, receive=replay_receive))
process_time_ms = int((time.time() - start_time) * 1000)
response_body_bytes = b""
async for chunk in response.body_iterator:
response_body_bytes += chunk
response_body_str = ""
try:
response_body_str = response_body_bytes.decode('utf-8')
except UnicodeDecodeError:
response_body_str = "[binary data]"
response_log_data = {
"timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime()),
"type": "response",
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"process_time_ms": process_time_ms,
"headers": dict(response.headers),
"body": response_body_str, # Consider truncating
}
logger.info(json.dumps(response_log_data))
# All requests on the business port are non-admin (admin requests arrive on a separate port)
is_admin = False
# Extract session information
session_id, test_scenario, correlation_id, user_agent = extract_session_info(request)
# Calculate response size
response_size = len(response_body_bytes) if response_body_bytes else 0
# DEBUG: Log admin request detection
print(f"DEBUG MIDDLEWARE: Request path: {request.url.path}")
print(f"DEBUG MIDDLEWARE: Is admin request: {is_admin}")
print(f"DEBUG MIDDLEWARE: Session ID: {session_id}")
print(f"DEBUG MIDDLEWARE: Test scenario: {test_scenario}")
# Store request and response in SQLite database
request_id = None
try:
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
# Defensive check: make sure the enhanced columns exist before inserting (covers databases created before the migrations above ran)
cursor.execute("PRAGMA table_info(request_logs)")
columns = cursor.fetchall()
column_names = {col[1] for col in columns}
# Ensure all required columns exist (backward compatibility)
required_columns = {
'is_admin': 'BOOLEAN DEFAULT 0',
'session_id': 'TEXT',
'test_scenario': 'TEXT',
'correlation_id': 'TEXT',
'user_agent': 'TEXT',
'response_size': 'INTEGER'
}
for column_name, column_type in required_columns.items():
if column_name not in column_names:
try:
cursor.execute(f'ALTER TABLE request_logs ADD COLUMN {column_name} {column_type}')
conn.commit()
print(f"DEBUG MIDDLEWARE: Added column: {column_name}")
except Exception as e:
print(f"DEBUG MIDDLEWARE: Could not add column {column_name}: {e}")
# Insert the log entry with all available data
cursor.execute('''
INSERT INTO request_logs (
timestamp, type, method, path, status_code,
process_time_ms, client_host, client_port,
headers, query_params, request_body, response_body,
is_admin, session_id, test_scenario, correlation_id,
user_agent, response_size
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime()),
"request",
request.method,
request.url.path,
response.status_code,
process_time_ms,
request.client.host if request.client else "N/A",
str(request.client.port) if request.client else "N/A",
json.dumps(dict(request.headers)),
str(request.query_params),
request_body_str,
response_body_str,
1 if is_admin else 0,
session_id,
test_scenario,
correlation_id,
user_agent,
response_size
))
# Get the request ID for performance metrics
request_id = cursor.lastrowid
conn.commit()
print(f"DEBUG MIDDLEWARE: Stored enhanced request log with ID: {request_id}")
conn.close()
except Exception as e:
logger.error(f"Failed to store request log in database: {e}")
print(f"DEBUG MIDDLEWARE: Database error: {e}")
# Store performance metrics if request was logged successfully
if request_id:
try:
counters = performance_monitor.get_counters()
store_performance_metrics(
request_id, process_time_ms, request_size, response_size, counters
)
print(f"DEBUG MIDDLEWARE: Stored performance metrics for request {request_id}")
except Exception as e:
print(f"DEBUG MIDDLEWARE: Error storing performance metrics: {e}")
# Update test session if session_id is present
if session_id:
try:
update_test_session(session_id, process_time_ms, test_scenario)
print(f"DEBUG MIDDLEWARE: Updated test session: {session_id}")
except Exception as e:
print(f"DEBUG MIDDLEWARE: Error updating test session: {e}")
# Rebuild the response: iterating body_iterator above consumed the original body stream
return Response(
content=response_body_bytes,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
# In the generated main.py, this middleware will be added to the app:
# from .logging_middleware import LoggingMiddleware # Assuming it's saved as logging_middleware.py
# app.add_middleware(LoggingMiddleware)
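# A fuller (hypothetical) wiring and smoke-test sketch, assuming the generated app
# lives in main.py next to this module:
#   from fastapi import FastAPI
#   from logging_middleware import LoggingMiddleware
#   app = FastAPI()
#   app.add_middleware(LoggingMiddleware)
# With the server running, a request such as
#   curl -H "X-Session-ID: demo-1" -H "X-Test-Scenario: smoke" http://localhost:8000/health
# (endpoint and port are placeholders) should produce a JSON line in
# logs/requests_<date>.log and rows in db/request_logs.db
# (request_logs, performance_metrics and test_sessions).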