{# admin_middleware_log_template.j2 • 9.59 kB #}
{# Jinja2 template for FastAPI admin logging middleware with separate admin logging #}
import logging
import time
import json
import sqlite3
import psutil
import threading
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.types import Message
from typing import Dict, Optional
# Filesystem layout used by the admin logger. Both paths are relative, so
# they resolve against the working directory of the running process.
import os
from pathlib import Path

logs_dir = Path("logs")
db_dir = Path("db")

# Make sure both directories exist before anything tries to write into them.
for _required_dir in (logs_dir, db_dir):
    _required_dir.mkdir(exist_ok=True)

# Admin logs get their own SQLite file (separate from business API logs).
ADMIN_DB_PATH = db_dir / "admin_logs.db"
def init_admin_db():
    """Create the admin log database schema if it is not already present.

    Opens the SQLite file at ``ADMIN_DB_PATH``, creates the ``admin_logs``
    and ``admin_schema_version`` tables when they are missing, and records
    schema version 1 the first time it runs. Running it again against an
    initialised database leaves the existing tables and version row as-is.

    Any ``sqlite3`` error raised while connecting or executing a statement
    propagates to the caller; the connection is closed either way.
    """
    conn = sqlite3.connect(str(ADMIN_DB_PATH))
    # try/finally so the connection is released even if a statement raises.
    try:
        cursor = conn.cursor()

        # Create the admin logs table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS admin_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                type TEXT,
                method TEXT,
                path TEXT,
                status_code INTEGER,
                process_time_ms INTEGER,
                client_host TEXT,
                client_port TEXT,
                headers TEXT,
                query_params TEXT,
                request_body TEXT,
                response_body TEXT,
                session_id TEXT,
                user_agent TEXT,
                response_size INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Create schema version table for migration tracking
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS admin_schema_version (
                version INTEGER PRIMARY KEY,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                description TEXT
            )
        ''')

        # MAX() over an empty table yields NULL, which is treated as version 0.
        cursor.execute("SELECT MAX(version) FROM admin_schema_version")
        current_version = cursor.fetchone()[0] or 0

        # Apply migrations if needed
        if current_version < 1:
            # Record initial schema
            cursor.execute(
                "INSERT INTO admin_schema_version (version, description) VALUES (?, ?)",
                (1, "Initial admin logs schema with separate admin logging"),
            )

        conn.commit()
    finally:
        conn.close()
def extract_admin_session_info(request):
    """Return ``(session_id, user_agent)`` read from the request headers.

    The session id is the first non-empty value among the ``x-session-id``,
    ``x-admin-session`` and ``session-id`` headers (checked in that order),
    or ``None`` when none of them carries a value. The user agent falls back
    to an empty string when the ``user-agent`` header is absent.
    """
    header_map = {} if not request.headers else dict(request.headers)

    # Candidate header names, highest priority first.
    session_id = None
    for candidate in ('x-session-id', 'x-admin-session', 'session-id'):
        value = header_map.get(candidate)
        if value:
            session_id = value
            break

    return session_id, header_map.get('user-agent', '')
# Performance monitoring utilities for admin
class AdminPerformanceMonitor:
    """Running total of admin requests, guarded by a lock.

    Every read and write of the counter happens while holding ``self._lock``.
    """

    def __init__(self):
        self._admin_request_count = 0
        self._lock = threading.Lock()

    def increment_admin_requests(self, count: int = 1):
        """Add ``count`` (1 by default) to the admin request total."""
        with self._lock:
            self._admin_request_count = self._admin_request_count + count

    def get_admin_counters(self) -> Dict[str, int]:
        """Return the current counter values keyed by counter name."""
        # Copy the value under the lock, build the result dict afterwards.
        with self._lock:
            snapshot = self._admin_request_count
        return {'admin_requests': snapshot}
def get_memory_usage() -> float:
    """Return the process RSS reported by psutil, expressed in megabytes.

    Any exception raised while querying psutil is swallowed and reported
    as ``0.0`` so that metrics collection never breaks the caller.
    """
    try:
        rss_bytes = psutil.Process().memory_info().rss
        megabytes = rss_bytes / 1024 / 1024  # bytes -> KB -> MB
    except Exception:
        return 0.0
    return megabytes
def get_cpu_usage() -> float:
    """Return the CPU usage percentage reported by ``psutil.cpu_percent``.

    The call is made with ``interval=None``. Any exception raised by psutil
    is swallowed and reported as ``0.0``.
    """
    try:
        cpu_percent = psutil.cpu_percent(interval=None)
    except Exception:
        cpu_percent = 0.0
    return cpu_percent
# Global admin performance monitor instance
admin_performance_monitor = AdminPerformanceMonitor()

# Initialize admin database (runs once, when this module is imported)
init_admin_db()

# Configure logging to both file and console for admin
admin_logger = logging.getLogger("admin_api_logger")
admin_logger.setLevel(logging.INFO)

# Add console handler
admin_console_handler = logging.StreamHandler()
admin_console_handler.setLevel(logging.INFO)
admin_console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
admin_console_handler.setFormatter(admin_console_formatter)
admin_logger.addHandler(admin_console_handler)

# Add a date-stamped file handler for admin.
# NOTE: the file name is computed once, when this module is imported, so a
# long-running process keeps writing to the file named after its start date.
# Nothing here rotates the file at midnight.
from datetime import datetime
admin_log_file = logs_dir / f"admin_requests_{datetime.now().strftime('%Y-%m-%d')}.log"
# Explicit UTF-8 so that non-ASCII text in a log message is written the same
# way on every platform instead of depending on the default locale encoding.
admin_file_handler = logging.FileHandler(admin_log_file, encoding="utf-8")
admin_file_handler.setLevel(logging.INFO)
admin_file_formatter = logging.Formatter('%(message)s')  # Just the message for cleaner JSON parsing
admin_file_handler.setFormatter(admin_file_formatter)
admin_logger.addHandler(admin_file_handler)

# Do not pass records on to ancestor loggers, which would log them twice
admin_logger.propagate = False
class AdminLoggingMiddleware(BaseHTTPMiddleware):
    """Log every admin request/response pair to the admin logger and SQLite.

    For each request the middleware:

    1. buffers the request body and logs an ``admin_request`` JSON record,
    2. calls the downstream app, buffers the whole response body and logs an
       ``admin_response`` JSON record,
    3. inserts one row into the ``admin_logs`` table of ``ADMIN_DB_PATH``,
    4. returns a new response that carries the buffered body.

    NOTE(review): headers and bodies are logged verbatim, which can include
    credentials (``Authorization``, cookies, submitted passwords). Confirm
    this is acceptable for the deployment or add redaction.
    NOTE(review): the response body is fully buffered before it is returned,
    so a streamed response only reaches the client once it has finished.
    """

    @staticmethod
    def _decode_body(raw: bytes) -> str:
        """Decode ``raw`` as UTF-8, or return a placeholder if it is not text."""
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            return "[binary data]"

    @staticmethod
    def _utc_timestamp(epoch_seconds: Optional[float] = None) -> str:
        """Format a UTC timestamp; ``None`` means the current time.

        The offset is written literally as ``+0000``: the value always comes
        from ``time.gmtime``, and ``%z`` applied to such a struct is
        platform-dependent (it can report the local offset instead).
        """
        return time.strftime('%Y-%m-%dT%H:%M:%S+0000', time.gmtime(epoch_seconds))

    @staticmethod
    def _store_admin_log(row: tuple) -> None:
        """Insert one row into ``admin_logs``, always closing the connection."""
        conn = sqlite3.connect(str(ADMIN_DB_PATH))
        try:
            conn.execute('''
                INSERT INTO admin_logs (
                    timestamp, type, method, path, status_code,
                    process_time_ms, client_host, client_port,
                    headers, query_params, request_body, response_body,
                    session_id, user_agent, response_size
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', row)
            conn.commit()
        finally:
            conn.close()

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        """Log the request, run the downstream app, log and persist the result.

        Args:
            request: The incoming admin request.
            call_next: Callable that forwards a request to the next handler.

        Returns:
            A ``Response`` with the same status, headers and body as the one
            produced downstream.

        Exceptions raised by ``call_next`` propagate unchanged. A failure to
        write the database row is logged and otherwise ignored.
        """
        start_time = time.time()

        # Increment admin request counter
        admin_performance_monitor.increment_admin_requests()

        # Reading the body here consumes it, so it is replayed to the
        # endpoint through the replacement receive channel below.
        request_body_bytes = await request.body()
        request_body_str = self._decode_body(request_body_bytes)

        body_replayed = False

        async def receive() -> Message:
            nonlocal body_replayed
            if not body_replayed:
                body_replayed = True
                return {"type": "http.request", "body": request_body_bytes, "more_body": False}
            # After the single replay, defer to the real channel so a caller
            # waiting for ``http.disconnect`` is not fed the body again.
            return await request.receive()

        client = request.client
        admin_request_log_data = {
            "timestamp": self._utc_timestamp(start_time),
            "type": "admin_request",
            "method": request.method,
            "path": request.url.path,
            "query_params": str(request.query_params),
            "client_host": client.host if client else "N/A",
            "client_port": client.port if client else "N/A",
            "headers": dict(request.headers),
            "body": request_body_str,
        }
        admin_logger.info(json.dumps(admin_request_log_data))

        # Hand the endpoint a request object that reads from the replay channel
        response = await call_next(Request(request.scope, receive=receive))
        process_time_ms = int((time.time() - start_time) * 1000)

        # Collect chunks in a list and join once instead of repeated bytes +=
        response_chunks = [chunk async for chunk in response.body_iterator]
        response_body_bytes = b"".join(response_chunks)
        response_body_str = self._decode_body(response_body_bytes)

        admin_response_log_data = {
            "timestamp": self._utc_timestamp(),
            "type": "admin_response",
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "process_time_ms": process_time_ms,
            "headers": dict(response.headers),
            "body": response_body_str,
        }
        admin_logger.info(json.dumps(admin_response_log_data))

        # Extract admin session information
        session_id, user_agent = extract_admin_session_info(request)

        # Store admin request and response in separate SQLite database.
        # Best effort: a logging failure must not fail the admin request.
        try:
            self._store_admin_log((
                time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime()),
                "admin_request",
                request.method,
                request.url.path,
                response.status_code,
                process_time_ms,
                client.host if client else "N/A",
                str(client.port) if client else "N/A",
                json.dumps(dict(request.headers)),
                str(request.query_params),
                request_body_str,
                response_body_str,
                session_id,
                user_agent,
                len(response_body_bytes),
            ))
        except Exception as e:
            admin_logger.error(f"Failed to store admin request log in database: {e}")

        # The original body iterator is exhausted, so build a new response
        # around the buffered bytes.
        rebuilt = Response(
            content=response_body_bytes,
            status_code=response.status_code,
            headers=dict(response.headers),
            media_type=response.media_type,
        )
        # dict() keeps one value per header name, which would drop repeated
        # headers such as multiple Set-Cookie lines. Restore the downstream
        # header list in full and keep any header Response added on its own.
        downstream_headers = list(response.raw_headers)
        downstream_names = {name.lower() for name, _ in downstream_headers}
        rebuilt.raw_headers = downstream_headers + [
            header for header in rebuilt.raw_headers
            if header[0].lower() not in downstream_names
        ]
        return rebuilt
# In the generated admin main.py, this middleware will be added to the admin app:
# from admin_logging_middleware import AdminLoggingMiddleware
# admin_app.add_middleware(AdminLoggingMiddleware)