Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
admin_middleware_log_template.j2โ€ข9.59 kB
{# Jinja2 template for FastAPI admin logging middleware with separate admin logging #} import logging import time import json import sqlite3 import psutil import threading from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint from starlette.types import Message from typing import Dict, Optional # Create logs directory if it doesn't exist import os from pathlib import Path logs_dir = Path("logs") logs_dir.mkdir(exist_ok=True) # Setup SQLite database for admin logs (separate from business API logs) db_dir = Path("db") db_dir.mkdir(exist_ok=True) ADMIN_DB_PATH = db_dir / "admin_logs.db" def init_admin_db(): conn = sqlite3.connect(str(ADMIN_DB_PATH)) cursor = conn.cursor() # Create the admin logs table cursor.execute(''' CREATE TABLE IF NOT EXISTS admin_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, type TEXT, method TEXT, path TEXT, status_code INTEGER, process_time_ms INTEGER, client_host TEXT, client_port TEXT, headers TEXT, query_params TEXT, request_body TEXT, response_body TEXT, session_id TEXT, user_agent TEXT, response_size INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Create schema version table for migration tracking cursor.execute(''' CREATE TABLE IF NOT EXISTS admin_schema_version ( version INTEGER PRIMARY KEY, applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, description TEXT ) ''') # Check current schema version cursor.execute("SELECT MAX(version) FROM admin_schema_version") current_version = cursor.fetchone()[0] or 0 # Apply migrations if needed if current_version < 1: # Record initial schema cursor.execute( "INSERT INTO admin_schema_version (version, description) VALUES (?, ?)", (1, "Initial admin logs schema with separate admin logging") ) conn.commit() conn.close() def extract_admin_session_info(request): """Extract session information from admin request headers.""" headers = dict(request.headers) if request.headers else {} # Extract session ID from various possible headers session_id = ( headers.get('x-session-id') or headers.get('x-admin-session') or headers.get('session-id') or None ) # Extract user agent user_agent = headers.get('user-agent', '') return session_id, user_agent # Performance monitoring utilities for admin class AdminPerformanceMonitor: """Thread-safe performance monitoring for admin request metrics.""" def __init__(self): self._lock = threading.Lock() self._admin_request_count = 0 def increment_admin_requests(self, count: int = 1): """Increment admin request counter.""" with self._lock: self._admin_request_count += count def get_admin_counters(self) -> Dict[str, int]: """Get current admin counter values.""" with self._lock: return { 'admin_requests': self._admin_request_count } def get_memory_usage() -> float: """Get current memory usage in MB.""" try: process = psutil.Process() memory_info = process.memory_info() return memory_info.rss / 1024 / 1024 # Convert bytes to MB except Exception: return 0.0 def get_cpu_usage() -> float: """Get current CPU usage percentage.""" try: return psutil.cpu_percent(interval=None) except Exception: return 0.0 # Global admin performance monitor instance admin_performance_monitor = AdminPerformanceMonitor() # Initialize admin database init_admin_db() # Configure logging to both file and console for admin admin_logger = logging.getLogger("admin_api_logger") admin_logger.setLevel(logging.INFO) # Add console handler admin_console_handler = logging.StreamHandler() admin_console_handler.setLevel(logging.INFO) admin_console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') admin_console_handler.setFormatter(admin_console_formatter) admin_logger.addHandler(admin_console_handler) # Add file handler with daily rotation for admin from datetime import datetime admin_log_file = logs_dir / f"admin_requests_{datetime.now().strftime('%Y-%m-%d')}.log" admin_file_handler = logging.FileHandler(admin_log_file) admin_file_handler.setLevel(logging.INFO) admin_file_formatter = logging.Formatter('%(message)s') # Just the message for cleaner JSON parsing admin_file_handler.setFormatter(admin_file_formatter) admin_logger.addHandler(admin_file_handler) # Prevent duplicate logging admin_logger.propagate = False class AdminLoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next: RequestResponseEndpoint): start_time = time.time() # Increment admin request counter admin_performance_monitor.increment_admin_requests() # Attempt to read request body without consuming it for the endpoint request_body_bytes = await request.body() request_size = len(request_body_bytes) if request_body_bytes else 0 async def receive() -> Message: return {"type": "http.request", "body": request_body_bytes, "more_body": False} # Create a new Request object with the captured body for the endpoint request_body_str = "" try: request_body_str = request_body_bytes.decode('utf-8') except UnicodeDecodeError: request_body_str = "[binary data]" admin_request_log_data = { "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime(start_time)), "type": "admin_request", "method": request.method, "path": request.url.path, "query_params": str(request.query_params), "client_host": request.client.host if request.client else "N/A", "client_port": request.client.port if request.client else "N/A", "headers": dict(request.headers), "body": request_body_str, } admin_logger.info(json.dumps(admin_request_log_data)) # Create a new request object with the new receive channel response = await call_next(Request(request.scope, receive=receive)) process_time_ms = int((time.time() - start_time) * 1000) response_body_bytes = b"" async for chunk in response.body_iterator: response_body_bytes += chunk response_body_str = "" try: response_body_str = response_body_bytes.decode('utf-8') except UnicodeDecodeError: response_body_str = "[binary data]" admin_response_log_data = { "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime()), "type": "admin_response", "method": request.method, "path": request.url.path, "status_code": response.status_code, "process_time_ms": process_time_ms, "headers": dict(response.headers), "body": response_body_str, } admin_logger.info(json.dumps(admin_response_log_data)) # Extract admin session information session_id, user_agent = extract_admin_session_info(request) # Calculate response size response_size = len(response_body_bytes) if response_body_bytes else 0 # Store admin request and response in separate SQLite database try: conn = sqlite3.connect(str(ADMIN_DB_PATH)) cursor = conn.cursor() # Insert the admin log entry cursor.execute(''' INSERT INTO admin_logs ( timestamp, type, method, path, status_code, process_time_ms, client_host, client_port, headers, query_params, request_body, response_body, session_id, user_agent, response_size ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime()), "admin_request", request.method, request.url.path, response.status_code, process_time_ms, request.client.host if request.client else "N/A", str(request.client.port) if request.client else "N/A", json.dumps(dict(request.headers)), str(request.query_params), request_body_str, response_body_str, session_id, user_agent, response_size )) conn.commit() conn.close() except Exception as e: admin_logger.error(f"Failed to store admin request log in database: {e}") # Return a new response with the consumed body return Response( content=response_body_bytes, status_code=response.status_code, headers=dict(response.headers), media_type=response.media_type ) # In the generated admin main.py, this middleware will be added to the admin app: # from admin_logging_middleware import AdminLoggingMiddleware # admin_app.add_middleware(AdminLoggingMiddleware)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server