Skip to main content
Glama
security.py8.3 kB
""" Security utilities and best practices for the Binance MCP Server. This module implements security best practices including input validation, error sanitization, rate limiting, and secure configuration management. """ import os import logging import hashlib import secrets from typing import Dict, Any, Optional, List from functools import wraps logger = logging.getLogger(__name__) class SecurityConfig: """ Security configuration and validation for MCP server. """ def __init__(self): self.rate_limit_enabled = os.getenv("MCP_RATE_LIMIT_ENABLED", "true").lower() == "true" self.max_requests_per_minute = int(os.getenv("MCP_MAX_REQUESTS_PER_MINUTE", "60")) self.enable_input_validation = os.getenv("MCP_INPUT_VALIDATION", "true").lower() == "true" self.log_security_events = os.getenv("MCP_LOG_SECURITY", "true").lower() == "true" def is_secure(self) -> bool: """Check if current configuration meets security requirements.""" return all([ self.rate_limit_enabled, self.max_requests_per_minute <= 120, # Reasonable limit self.enable_input_validation ]) def get_security_warnings(self) -> List[str]: """Get list of security configuration warnings.""" warnings = [] if not self.rate_limit_enabled: warnings.append("Rate limiting is disabled - this may expose the server to abuse") if self.max_requests_per_minute > 120: warnings.append("Rate limit is set too high - recommended maximum is 120 requests/minute") if not self.enable_input_validation: warnings.append("Input validation is disabled - this is a security risk") return warnings def validate_api_credentials() -> bool: """ Validate that API credentials are properly configured and secure. Returns: bool: True if credentials are valid and secure """ api_key = os.getenv("BINANCE_API_KEY") api_secret = os.getenv("BINANCE_API_SECRET") if not api_key or not api_secret: logger.error("Missing API credentials") return False # Validate API key format (basic check) if len(api_key) < 32: logger.warning("API key appears to be too short") return False if len(api_secret) < 32: logger.warning("API secret appears to be too short") return False # Check for common insecure values insecure_values = ["test", "demo", "example", "your_key", "your_secret"] if api_key.lower() in insecure_values or api_secret.lower() in insecure_values: logger.error("API credentials appear to be placeholder values") return False return True def secure_hash(data: str) -> str: """ Create a secure hash of sensitive data for logging/identification purposes. Args: data: The data to hash Returns: str: SHA-256 hash of the data """ return hashlib.sha256(data.encode('utf-8')).hexdigest()[:16] # First 16 chars for brevity def generate_request_id() -> str: """ Generate a secure random request ID for tracking. Returns: str: Cryptographically secure random request ID """ return secrets.token_hex(8) class SecurityMiddleware: """ Security middleware for request validation and monitoring. """ def __init__(self, config: Optional[SecurityConfig] = None): self.config = config or SecurityConfig() self.request_counts = {} def validate_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]: """ Validate incoming request for security issues. Args: request_data: The request data to validate Returns: Dict containing validation result and any security warnings """ warnings = [] is_valid = True # Check for common injection patterns if self._contains_injection_patterns(request_data): warnings.append("Potential injection pattern detected") is_valid = False # Check request size if self._request_too_large(request_data): warnings.append("Request size exceeds limits") is_valid = False return { "valid": is_valid, "warnings": warnings, "request_id": generate_request_id() } def _contains_injection_patterns(self, data: Dict[str, Any]) -> bool: """Check for common injection patterns in request data.""" if not isinstance(data, dict): return False dangerous_patterns = [ 'script', 'javascript:', 'onload=', 'onerror=', 'eval(', 'exec(', 'system(', 'shell_exec', '../', './', '..\\', '.\\', 'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP' ] data_str = str(data).lower() return any(pattern.lower() in data_str for pattern in dangerous_patterns) def _request_too_large(self, data: Dict[str, Any], max_size: int = 1024*1024) -> bool: """Check if request data is too large.""" try: import json return len(json.dumps(data).encode('utf-8')) > max_size except (TypeError, ValueError): return True def security_audit_log(event_type: str, details: Dict[str, Any], level: str = "INFO") -> None: """ Log security-related events with proper formatting. Args: event_type: Type of security event details: Event details (will be sanitized) level: Log level (INFO, WARNING, ERROR) """ security_logger = logging.getLogger(f"{__name__}.security") # Sanitize details to prevent log injection sanitized_details = {k: _sanitize_log_value(v) for k, v in details.items()} log_entry = { "event_type": event_type, "details": sanitized_details, "request_id": generate_request_id() } if level.upper() == "ERROR": security_logger.error(f"SECURITY_EVENT: {log_entry}") elif level.upper() == "WARNING": security_logger.warning(f"SECURITY_EVENT: {log_entry}") else: security_logger.info(f"SECURITY_EVENT: {log_entry}") def _sanitize_log_value(value: Any) -> Any: """Sanitize values before logging to prevent log injection.""" if isinstance(value, str): # Remove control characters and potential log injection patterns return ''.join(c for c in value if c.isprintable())[:500] # Limit length return str(value)[:500] if value is not None else None def secure_tool_wrapper(func): """ Decorator to add security validation to MCP tools. """ @wraps(func) def wrapper(*args, **kwargs): request_id = generate_request_id() try: # Log tool invocation (without sensitive data) security_audit_log( "tool_invocation", { "tool_name": func.__name__, "request_id": request_id, "args_count": len(args), "kwargs_count": len(kwargs) } ) result = func(*args, **kwargs) # Log successful completion if isinstance(result, dict) and result.get("success"): security_audit_log( "tool_success", {"tool_name": func.__name__, "request_id": request_id} ) else: security_audit_log( "tool_failure", {"tool_name": func.__name__, "request_id": request_id}, level="WARNING" ) return result except Exception as e: security_audit_log( "tool_exception", { "tool_name": func.__name__, "request_id": request_id, "error_type": type(e).__name__ }, level="ERROR" ) raise return wrapper

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AnalyticAce/BinanceMCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server