Skip to main content
Glama

Panther MCP Server

Official
Apache 2.0
31
  • Apple
server-architecture-guide.md24.5 kB
# MCP Server Architecture Guide This guide covers the architecture patterns and context management strategies for building robust MCP servers with FastMCP, based on the Panther MCP server implementation and FastMCP best practices. ## Table of Contents - [Server Architecture Overview](#server-architecture-overview) - [Component Organization](#component-organization) - [Context Management](#context-management) - [Server Composition](#server-composition) - [Transport Protocols](#transport-protocols) - [Middleware Patterns](#middleware-patterns) - [State Management](#state-management) - [Logging and Monitoring](#logging-and-monitoring) - [Configuration Management](#configuration-management) ## Server Architecture Overview ### Core Components ``` FastMCP Server ├── Tools (Functions callable by LLMs) ├── Resources (Static/dynamic content providers) ├── Prompts (Reusable message templates) ├── Context (Request-scoped capabilities) ├── Middleware (Cross-cutting concerns) └── Transport (STDIO, HTTP, SSE) ``` ### Basic Server Structure ```python # server.py import logging import sys from fastmcp import FastMCP from .panther_mcp_core import register_all_components # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", stream=sys.stderr, ) def create_server() -> FastMCP: """Create and configure the MCP server.""" # Create server instance mcp = FastMCP( name="panther-mcp-server", instructions="Provides Panther security platform integration capabilities", tags=["security", "siem", "panther"] ) # Register all components register_all_components(mcp) return mcp def main(): """Main entry point.""" mcp = create_server() mcp.run() if __name__ == "__main__": main() ``` ## Component Organization ### 1. Modular Component Structure Organize components by functional domain: ``` src/mcp_panther/panther_mcp_core/ ├── __init__.py # Component registration ├── tools/ # Tool implementations │ ├── __init__.py │ ├── registry.py # Tool decorator │ ├── alerts.py # Alert management tools │ ├── detections.py # Detection management tools │ ├── data_lake.py # Data lake query tools │ ├── users.py # User management tools │ └── ... ├── resources/ # Resource providers │ ├── __init__.py │ ├── registry.py # Resource decorator │ └── config.py # Configuration resources ├── prompts/ # Prompt templates │ ├── __init__.py │ ├── registry.py # Prompt decorator │ └── alert_triage.py # Alert analysis prompts └── client/ # External API integration ├── __init__.py ├── panther_client.py # GraphQL client └── queries.py # GraphQL queries ``` ### 2. Component Registration Pattern Centralized component registration: ```python # panther_mcp_core/__init__.py from fastmcp import FastMCP from . import tools, resources, prompts def register_all_components(mcp: FastMCP) -> None: """Register all MCP components with the server.""" # Register tools tools.register_tools(mcp) # Register resources resources.register_resources(mcp) # Register prompts prompts.register_prompts(mcp) # tools/__init__.py from fastmcp import FastMCP from . import alerts, detections, data_lake, users def register_tools(mcp: FastMCP) -> None: """Register all tool modules.""" # Import modules to trigger decorator registration alerts.register_alert_tools(mcp) detections.register_detection_tools(mcp) data_lake.register_data_lake_tools(mcp) users.register_user_tools(mcp) # tools/alerts.py from .registry import mcp_tool def register_alert_tools(mcp): """Register alert-related tools.""" # Tools are auto-registered via decorators when module is imported pass @mcp_tool async def get_alert(alert_id: str) -> dict: """Get alert information.""" # Implementation ``` ### 3. Registry Pattern for Decorators Custom decorators for component registration: ```python # tools/registry.py from typing import Callable, Any from fastmcp import FastMCP # Global registry to collect decorated functions _tool_registry: list[Callable] = [] def mcp_tool(func: Callable) -> Callable: """Decorator to register MCP tools.""" _tool_registry.append(func) return func def register_tools(mcp: FastMCP) -> None: """Register all collected tools with MCP server.""" for tool_func in _tool_registry: mcp.tool(tool_func) # Usage @mcp_tool async def my_tool() -> dict: """Tool implementation.""" pass ``` ## Context Management ### 1. Context Access Patterns FastMCP provides two ways to access context: #### Dependency Injection (Recommended) ```python from fastmcp import Context @mcp.tool async def process_with_logging( data: str, ctx: Context, # Context injected as parameter ) -> dict[str, Any]: """Tool that uses context for logging and progress.""" await ctx.info(f"Starting processing of {len(data)} characters") try: # Simulate long operation with progress reporting for i in range(0, len(data), 100): chunk = data[i:i+100] await ctx.report_progress( current=i + len(chunk), total=len(data), description=f"Processing chunk {i//100 + 1}" ) # Process chunk await process_chunk(chunk) await ctx.info("Processing completed successfully") return { "success": True, "processed_length": len(data) } except Exception as e: await ctx.error(f"Processing failed: {str(e)}") raise ``` #### Dependency Function (For Nested Calls) ```python from fastmcp import get_context async def helper_function(data: str) -> str: """Helper function that needs context access.""" ctx = get_context() # Get context from current request await ctx.debug(f"Processing data in helper: {len(data)} chars") # Process data result = process_data(data) await ctx.debug("Helper processing complete") return result @mcp.tool async def main_tool(input_data: str) -> dict[str, Any]: """Main tool that calls helper functions.""" # Helper functions can access context automatically processed = await helper_function(input_data) return { "success": True, "result": processed } ``` ### 2. Context Capabilities #### Logging ```python @mcp.tool async def detailed_operation(ctx: Context) -> dict[str, Any]: """Demonstrate all logging levels.""" await ctx.debug("Debug: Starting detailed operation") await ctx.info("Info: Operation parameters validated") await ctx.warning("Warning: Using default timeout value") await ctx.error("Error: Retrying failed request") return {"success": True} ``` #### Progress Reporting ```python @mcp.tool async def batch_processor( items: list[str], ctx: Context, ) -> dict[str, Any]: """Process items with progress reporting.""" results = [] total_items = len(items) for i, item in enumerate(items): # Report progress await ctx.report_progress( current=i + 1, total=total_items, description=f"Processing item: {item}" ) # Process item result = await process_item(item) results.append(result) # Log milestones if (i + 1) % 10 == 0: await ctx.info(f"Completed {i + 1}/{total_items} items") return { "success": True, "processed_count": len(results), "results": results } ``` #### State Management ```python @mcp.tool async def stateful_operation( operation_id: str, ctx: Context, ) -> dict[str, Any]: """Use context state for request-scoped data sharing.""" # Store operation metadata ctx.set_state("operation_id", operation_id) ctx.set_state("start_time", time.time()) # Process with helper functions that can access state result = await process_with_state() # Calculate duration start_time = ctx.get_state("start_time") duration = time.time() - start_time return { "success": True, "operation_id": operation_id, "duration_seconds": duration, "result": result } async def process_with_state() -> str: """Helper that accesses shared state.""" ctx = get_context() operation_id = ctx.get_state("operation_id") await ctx.info(f"Processing operation {operation_id}") # Implementation return "processed" ``` ## Server Composition ### 1. Multi-Server Composition Combine multiple specialized servers: ```python from fastmcp import FastMCP def create_alert_server() -> FastMCP: """Server focused on alert management.""" mcp = FastMCP(name="panther-alerts") # Register alert-specific tools return mcp def create_detection_server() -> FastMCP: """Server focused on detection management.""" mcp = FastMCP(name="panther-detections") # Register detection-specific tools return mcp def create_composite_server() -> FastMCP: """Composite server combining functionality.""" # Create main server main_server = FastMCP(name="panther-composite") # Create specialized servers alert_server = create_alert_server() detection_server = create_detection_server() # Compose servers (if supported by FastMCP) # This pattern may vary based on FastMCP version return main_server ``` ### 2. Modular Registration Register components conditionally: ```python def create_server(enabled_modules: list[str] = None) -> FastMCP: """Create server with selective module loading.""" if enabled_modules is None: enabled_modules = ["alerts", "detections", "data_lake"] mcp = FastMCP(name="panther-mcp-server") # Register modules based on configuration if "alerts" in enabled_modules: from .tools import alerts alerts.register_alert_tools(mcp) if "detections" in enabled_modules: from .tools import detections detections.register_detection_tools(mcp) if "data_lake" in enabled_modules: from .tools import data_lake data_lake.register_data_lake_tools(mcp) return mcp ``` ## Transport Protocols ### 1. STDIO Transport (Default) Standard input/output transport for MCP clients: ```python def main(): """Run server with STDIO transport.""" mcp = create_server() # Default transport is STDIO mcp.run() # Reads from stdin, writes to stdout ``` ### 2. HTTP Transport HTTP-based transport for web integration: ```python def main(): """Run server with HTTP transport.""" mcp = create_server() # Configure for HTTP transport mcp.run( transport="streamable-http", host="0.0.0.0", port=8000 ) ``` ### 3. Environment-Based Transport Selection ```python import os def main(): """Run server with transport from environment.""" mcp = create_server() transport = os.getenv("MCP_TRANSPORT", "stdio") host = os.getenv("MCP_HOST", "127.0.0.1") port = int(os.getenv("MCP_PORT", "3000")) if transport == "streamable-http": mcp.run( transport=transport, host=host, port=port ) else: mcp.run() # Default STDIO ``` ## Middleware Patterns ### 1. Request Logging Middleware ```python from fastmcp import Context import time import logging logger = logging.getLogger(__name__) async def logging_middleware(ctx: Context, call_next): """Log all tool calls with timing.""" start_time = time.time() # Get request information tool_name = getattr(ctx, 'tool_name', 'unknown') logger.info(f"Starting tool call: {tool_name}") try: # Call the actual tool result = await call_next() duration = time.time() - start_time logger.info(f"Tool {tool_name} completed in {duration:.2f}s") return result except Exception as e: duration = time.time() - start_time logger.error(f"Tool {tool_name} failed after {duration:.2f}s: {str(e)}") raise # Register middleware with server def create_server() -> FastMCP: mcp = FastMCP(name="panther-mcp-server") # Register middleware (syntax may vary by FastMCP version) # mcp.middleware(logging_middleware) return mcp ``` ### 2. Authentication Middleware ```python async def auth_middleware(ctx: Context, call_next): """Validate API token for all requests.""" # Check for valid authentication api_token = os.getenv("PANTHER_API_TOKEN") instance_url = os.getenv("PANTHER_INSTANCE_URL") if not api_token or not instance_url: raise ValueError("Missing required authentication configuration") # Store auth info in context for tools to use ctx.set_state("api_token", api_token) ctx.set_state("instance_url", instance_url) # Proceed with request return await call_next() ``` ### 3. Error Handling Middleware ```python async def error_handling_middleware(ctx: Context, call_next): """Global error handling for all tools.""" try: return await call_next() except PermissionError: return { "success": False, "error": "Insufficient permissions for this operation", "error_code": "PERMISSION_DENIED" } except ConnectionError: return { "success": False, "error": "Unable to connect to Panther API", "error_code": "CONNECTION_ERROR" } except Exception as e: logger.exception("Unexpected error in tool execution") return { "success": False, "error": "An unexpected error occurred", "error_code": "INTERNAL_ERROR" } ``` ## State Management ### 1. Request-Scoped State Use context state for data sharing within a single request: ```python @mcp.tool async def multi_step_operation( input_data: str, ctx: Context, ) -> dict[str, Any]: """Operation with multiple steps sharing state.""" # Initialize shared state ctx.set_state("input_data", input_data) ctx.set_state("step_results", []) # Step 1 step1_result = await execute_step1() # Step 2 can access state from step 1 step2_result = await execute_step2() # Get accumulated results all_results = ctx.get_state("step_results") return { "success": True, "steps_completed": len(all_results), "results": all_results } async def execute_step1() -> str: """Step that updates shared state.""" ctx = get_context() input_data = ctx.get_state("input_data") result = f"Step 1 processed: {input_data}" # Update shared state results = ctx.get_state("step_results") results.append(result) ctx.set_state("step_results", results) return result ``` ### 2. Client Connection State Manage client-specific information: ```python class ClientStateManager: """Manage per-client state across requests.""" def __init__(self): self.client_sessions = {} async def get_client_info(self, ctx: Context) -> dict: """Get client information from context.""" # Extract client identifier (implementation depends on FastMCP) client_id = getattr(ctx, 'client_id', 'default') if client_id not in self.client_sessions: self.client_sessions[client_id] = { "created_at": time.time(), "request_count": 0, "last_activity": time.time() } session = self.client_sessions[client_id] session["request_count"] += 1 session["last_activity"] = time.time() return session # Global state manager client_state = ClientStateManager() @mcp.tool async def get_session_info(ctx: Context) -> dict[str, Any]: """Get current session information.""" session = await client_state.get_client_info(ctx) return { "success": True, "session": session } ``` ## Logging and Monitoring ### 1. Structured Logging ```python import logging import json from typing import Any class StructuredFormatter(logging.Formatter): """JSON formatter for structured logging.""" def format(self, record: logging.LogRecord) -> str: log_data = { "timestamp": self.formatTime(record), "level": record.levelname, "logger": record.name, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno } # Add exception info if present if record.exc_info: log_data["exception"] = self.formatException(record.exc_info) # Add extra fields for key, value in record.__dict__.items(): if key not in log_data and not key.startswith('_'): log_data[key] = value return json.dumps(log_data) # Configure structured logging def setup_logging(): """Configure structured logging for the server.""" logger = logging.getLogger() logger.setLevel(logging.INFO) handler = logging.StreamHandler(sys.stderr) handler.setFormatter(StructuredFormatter()) logger.addHandler(handler) ``` ### 2. Tool Performance Monitoring ```python import functools import time from typing import Callable, Any def monitor_performance(func: Callable) -> Callable: """Decorator to monitor tool performance.""" @functools.wraps(func) async def wrapper(*args, **kwargs) -> Any: start_time = time.time() tool_name = func.__name__ try: result = await func(*args, **kwargs) duration = time.time() - start_time # Log performance metrics logger.info( "Tool execution completed", extra={ "tool_name": tool_name, "duration_seconds": duration, "status": "success" } ) return result except Exception as e: duration = time.time() - start_time logger.error( "Tool execution failed", extra={ "tool_name": tool_name, "duration_seconds": duration, "status": "error", "error_type": type(e).__name__, "error_message": str(e) } ) raise return wrapper # Usage @monitor_performance @mcp.tool async def monitored_tool() -> dict[str, Any]: """Tool with performance monitoring.""" # Implementation pass ``` ## Configuration Management ### 1. Environment-Based Configuration ```python import os from typing import Optional from pydantic import BaseSettings class ServerConfig(BaseSettings): """Server configuration from environment variables.""" # Panther API configuration panther_instance_url: str panther_api_token: str # Server configuration server_name: str = "panther-mcp-server" log_level: str = "INFO" # Transport configuration transport: str = "stdio" host: str = "127.0.0.1" port: int = 3000 # Feature flags enable_caching: bool = False enable_metrics: bool = True class Config: env_prefix = "MCP_" case_sensitive = False # Global configuration config = ServerConfig() def create_server() -> FastMCP: """Create server using configuration.""" mcp = FastMCP( name=config.server_name, instructions="Panther security platform integration", ) # Configure logging level logging.getLogger().setLevel(getattr(logging, config.log_level.upper())) # Register components based on configuration register_components(mcp, config) return mcp ``` ### 2. Dynamic Configuration ```python @mcp.resource("config://panther") def get_server_config() -> dict[str, Any]: """Provide current server configuration to clients.""" return { "server_name": config.server_name, "panther_instance": config.panther_instance_url, "features": { "caching_enabled": config.enable_caching, "metrics_enabled": config.enable_metrics, }, "transport": { "type": config.transport, "host": config.host if config.transport == "http" else None, "port": config.port if config.transport == "http" else None, } } ``` ## Deployment Patterns ### 1. Docker Deployment ```dockerfile # Dockerfile FROM python:3.12-slim WORKDIR /app # Install dependencies COPY pyproject.toml uv.lock ./ RUN pip install uv && uv sync --frozen # Copy source code COPY src/ ./src/ # Set environment variables ENV MCP_TRANSPORT=streamable-http ENV MCP_HOST=0.0.0.0 ENV MCP_PORT=8000 # Expose port EXPOSE 8000 # Run server CMD ["uv", "run", "python", "-m", "mcp_panther.server"] ``` ### 2. Process Management ```python import signal import asyncio from typing import Optional class GracefulServer: """Server with graceful shutdown handling.""" def __init__(self): self.mcp: Optional[FastMCP] = None self.shutdown_event = asyncio.Event() def setup_signal_handlers(self): """Setup signal handlers for graceful shutdown.""" def signal_handler(signum, frame): print(f"Received signal {signum}, initiating graceful shutdown...") self.shutdown_event.set() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) async def run(self): """Run server with graceful shutdown.""" self.setup_signal_handlers() self.mcp = create_server() try: # Start server (this would be adapted to FastMCP's async interface) await self.mcp.serve_async() except KeyboardInterrupt: print("Shutdown initiated by user") finally: await self.cleanup() async def cleanup(self): """Cleanup resources during shutdown.""" print("Cleaning up resources...") # Close database connections, cleanup temp files, etc. if self.mcp: # Cleanup MCP server resources pass print("Graceful shutdown complete") def main(): """Main entry point with graceful shutdown.""" server = GracefulServer() asyncio.run(server.run()) ``` ## Best Practices Summary 1. **Modular Architecture**: Organize components by domain/functionality 2. **Registry Pattern**: Use centralized component registration 3. **Context Usage**: Leverage context for logging, progress, and state 4. **Transport Flexibility**: Support multiple transport protocols 5. **Structured Logging**: Use JSON logging for better observability 6. **Configuration Management**: Environment-based configuration with validation 7. **Graceful Shutdown**: Handle shutdown signals properly 8. **Performance Monitoring**: Track tool execution metrics 9. **Error Handling**: Implement comprehensive error handling patterns 10. **Testing Architecture**: Design for testability with dependency injection ## Related Documentation - [MCP Development Best Practices](./mcp-development-best-practices.md) - [Tool Design Patterns](./tool-design-patterns.md) - [MCP Testing Guide](./mcp-testing-guide.md) - [FastMCP Documentation](https://gofastmcp.com/)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/panther-labs/mcp-panther'

If you have feedback or need assistance with the MCP directory API, please join our Discord server