Skip to main content
Glama

MCP Policy Gatekeeper

by GILSMON
server_validation.pyโ€ข8.07 kB
#!/usr/bin/env python3 """ MCP Server for Command Validation Intercepts shell commands and enforces organizational policies """ import asyncio import json import re from typing import Any from mcp.server import Server from mcp.types import Tool, TextContent, ErrorData import mcp.server.stdio # Policy Rules Configuration POLICY_RULES = { "destructive_commands": { "patterns": [ r"\brm\s+-rf\s+/", # rm -rf / r"\brm\s+-rf\s+\*", # rm -rf * r"\bdd\s+.*of=/dev/", # dd to device r"\bmkfs\.", # filesystem formatting r"\b:(){:|:&};:", # fork bomb r"\bchmod\s+-R\s+777", # dangerous permissions ], "message": "Destructive command blocked by ORG-SEC-001" }, "network_exfiltration": { "patterns": [ r"curl\s+.*\|\s*bash", # curl pipe to bash r"wget\s+.*\|\s*sh", # wget pipe to shell r"nc\s+-l", # netcat listener r"python.*-m\s+http\.server", # simple http server ], "message": "Network exfiltration risk blocked by ORG-SEC-002" }, "system_modification": { "patterns": [ r"\bsudo\s+rm", # sudo rm r"\bapt-get\s+remove", # package removal r"\byum\s+remove", r"\bsystemctl\s+stop", # stopping services r"\bkill\s+-9\s+1", # killing init ], "message": "System modification requires approval - ORG-OPS-003" }, "sensitive_paths": { "patterns": [ r"/etc/passwd", r"/etc/shadow", r"\.ssh/id_rsa", r"\.aws/credentials", r"\.env", ], "message": "Access to sensitive files blocked - ORG-SEC-004" } } # Allowlist for safe commands ALLOWED_COMMANDS = { "git", "ls", "cat", "echo", "pwd", "cd", "grep", "find", "npm", "yarn", "python", "node", "pip", "docker", "kubectl", "terraform", "make", "cargo" } class CommandValidator: """Validates shell commands against organizational policies""" def __init__(self): self.violation_count = 0 self.audit_log = [] def validate_command(self, command: str) -> dict: """ Validates a command against all policy rules Returns: {"allowed": bool, "reason": str, "policy": str} """ command = command.strip() # Check if command starts with allowed command first_word = command.split()[0] if command.split() else "" base_command = first_word.split("/")[-1] # Handle /usr/bin/git # Check against policy rules for policy_name, policy in POLICY_RULES.items(): for pattern in policy["patterns"]: if re.search(pattern, command, re.IGNORECASE): self.violation_count += 1 self.audit_log.append({ "command": command, "policy": policy_name, "status": "BLOCKED" }) return { "allowed": False, "reason": policy["message"], "policy": policy_name, "pattern_matched": pattern } # Log allowed command self.audit_log.append({ "command": command, "status": "ALLOWED" }) return { "allowed": True, "reason": "Command passed all policy checks", "policy": None } def get_stats(self) -> dict: """Returns validation statistics""" return { "total_commands": len(self.audit_log), "blocked": self.violation_count, "allowed": len(self.audit_log) - self.violation_count, "recent_log": self.audit_log[-10:] # Last 10 commands } # Initialize validator validator = CommandValidator() # Create MCP server server = Server("command-validator") @server.list_tools() async def list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="validate_command", description="Validates a shell command against organizational security policies before execution. Returns validation result with policy details.", inputSchema={ "type": "object", "properties": { "command": { "type": "string", "description": "The shell command to validate" } }, "required": ["command"] } ), Tool( name="execute_validated_command", description="Validates and executes a shell command if it passes all policy checks. Use this instead of direct shell access.", inputSchema={ "type": "object", "properties": { "command": { "type": "string", "description": "The shell command to validate and execute" } }, "required": ["command"] } ), Tool( name="get_validation_stats", description="Get statistics about command validations (blocked vs allowed)", inputSchema={ "type": "object", "properties": {} } ), Tool( name="list_policies", description="List all active security policies and their rules", inputSchema={ "type": "object", "properties": {} } ) ] @server.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent]: """Handle tool calls""" if name == "validate_command": command = arguments.get("command", "") result = validator.validate_command(command) return [TextContent( type="text", text=json.dumps(result, indent=2) )] elif name == "execute_validated_command": command = arguments.get("command", "") validation = validator.validate_command(command) if not validation["allowed"]: return [TextContent( type="text", text=json.dumps({ "status": "blocked", "command": command, "reason": validation["reason"], "policy": validation["policy"] }, indent=2) )] # In production, you would execute the command here # For safety, we'll simulate execution return [TextContent( type="text", text=json.dumps({ "status": "success", "command": command, "validation": "passed", "output": f"[SIMULATED] Command '{command}' would be executed here" }, indent=2) )] elif name == "get_validation_stats": stats = validator.get_stats() return [TextContent( type="text", text=json.dumps(stats, indent=2) )] elif name == "list_policies": policies = { name: { "message": policy["message"], "pattern_count": len(policy["patterns"]) } for name, policy in POLICY_RULES.items() } return [TextContent( type="text", text=json.dumps(policies, indent=2) )] else: raise ValueError(f"Unknown tool: {name}") async def main(): """Run the MCP server""" async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GILSMON/mcpServer_as_gatekeeper'

If you have feedback or need assistance with the MCP directory API, please join our Discord server