# Ultimate MCP Platform - Implementation Guide

**Document Version**: 1.0
**Last Updated**: October 10, 2025
**Status**: In Progress - Phase 1

This document provides step-by-step implementation instructions for enhancing the Ultimate MCP Platform to enterprise/FAANG production standards. It complements the [ENTERPRISE_EVALUATION.md](./ENTERPRISE_EVALUATION.md) strategic roadmap.

---

## Table of Contents

1. [Phase 1: Critical Fixes & Audit Logging](#phase-1-critical-fixes--audit-logging)
2. [Phase 2: RBAC Implementation](#phase-2-rbac-implementation)
3. [Testing Strategy](#testing-strategy)
4. [Integration Checklist](#integration-checklist)
5. [Verification & Validation](#verification--validation)

---

## Phase 1: Critical Fixes & Audit Logging

### ✅ COMPLETED: Syntax Error Fixes

**Files Fixed**:
- `backend/mcp_server/tools/enhanced_exec_tool.py` (lines 323, 382, 446)
- `backend/mcp_server/config.py` (removed unused `os` import)

**Verification**:
```bash
cd backend
source .venv/bin/activate
python -m py_compile mcp_server/tools/enhanced_exec_tool.py  # Should succeed
ruff check . --select F,E  # Should show fewer errors
```

### ✅ COMPLETED: Audit Logging Module

**Created Files**:
1. `backend/mcp_server/audit/__init__.py` - Module exports
2. `backend/mcp_server/audit/logger.py` - Core audit logging implementation
3. `backend/tests/test_audit.py` - Comprehensive test suite

**Features**:
- 11 audit event types (authentication, authorization, code execution, security violations)
- Structured logging with JSON output
- Neo4j persistence for long-term audit trail
- Query capabilities with filters
- Error resilience (continues if Neo4j unavailable)

**Usage Example**:
```python
import hashlib

from mcp_server.audit import AuditLogger, AuditEventType

# Initialize with Neo4j client
audit_logger = AuditLogger(neo4j_client=neo4j_client)

# Log authentication
await audit_logger.log_authentication(
    success=True,
    user_id="user-123",
    ip_address="192.168.1.100",
    request_id=request_id
)

# Log code execution (only a hash of the code is recorded, not the code itself)
await audit_logger.log_code_execution(
    user_id="user-123",
    code_hash=hashlib.sha256(code.encode()).hexdigest(),
    language="python",
    success=True,
    duration_ms=execution_time * 1000
)

# Query audit log
events = await audit_logger.query_audit_log(
    event_type=AuditEventType.CODE_EXECUTION,
    user_id="user-123",
    limit=100
)
```

**Testing**:
```bash
cd backend
source .venv/bin/activate
pytest tests/test_audit.py -v  # All 13 tests should pass
```

---

## Phase 2: RBAC Implementation

### Step 1: Design Role/Permission Model

**Roles** (from least to most privileged):

1. **viewer**: Read-only access
   - Permissions: `tools:lint`, `tools:read`, `graph:query`
   - No code execution, no data modification

2. **developer**: Developer access
   - Permissions: All viewer + `tools:execute`, `tools:test`, `tools:generate`
   - Can execute code, run tests, generate code
   - Cannot modify graph structure

3. **admin**: Full administrative access
   - Permissions: All developer + `graph:upsert`, `graph:delete`, `system:admin`
   - Can modify graph, access system administration

**Permission Format**: `resource:action`
- Resources: `tools`, `graph`, `system`
- Actions: `read`, `write`, `execute`, `delete`, `admin`, plus the tool- and graph-specific actions used by the roles above (`lint`, `test`, `generate`, `query`, `upsert`)

### Step 2: Implement RBAC Manager

**File**: `backend/mcp_server/auth/rbac.py`

```python
"""Role-Based Access Control (RBAC) implementation."""

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any


class Role(Enum):
    """User roles."""

    VIEWER = "viewer"
    DEVELOPER = "developer"
    ADMIN = "admin"


# frozen=True makes Permission hashable, which check_permission() relies on
# when it collects permissions into a set.
@dataclass(frozen=True)
class Permission:
    """Permission definition."""

    resource: str  # 'tools', 'graph', 'system'
    action: str  # e.g. 'read', 'write', 'execute', 'delete', 'admin'

    def __str__(self) -> str:
        return f"{self.resource}:{self.action}"


# Role → Permissions mapping
ROLE_PERMISSIONS: dict[Role, list[Permission]] = {
    Role.VIEWER: [
        Permission("tools", "read"),
        Permission("tools", "lint"),
        Permission("graph", "query"),
    ],
    Role.DEVELOPER: [
        # Inherits all viewer permissions
        Permission("tools", "read"),
        Permission("tools", "lint"),
        Permission("graph", "query"),
        # Additional developer permissions
        Permission("tools", "execute"),
        Permission("tools", "test"),
        Permission("tools", "generate"),
    ],
    Role.ADMIN: [
        # Inherits all developer permissions
        Permission("tools", "read"),
        Permission("tools", "lint"),
        Permission("graph", "query"),
        Permission("tools", "execute"),
        Permission("tools", "test"),
        Permission("tools", "generate"),
        # Additional admin permissions
        Permission("graph", "upsert"),
        Permission("graph", "delete"),
        Permission("system", "admin"),
    ],
}


class RBACManager:
    """Role-Based Access Control manager."""

    def __init__(self, neo4j_client: Any = None):
        """Initialize RBAC manager.

        Args:
            neo4j_client: Optional Neo4j client for persistence
        """
        self.neo4j_client = neo4j_client

    def get_role_permissions(self, role: Role) -> list[Permission]:
        """Get all permissions for a role.

        Args:
            role: User role

        Returns:
            List of permissions
        """
        return ROLE_PERMISSIONS.get(role, [])

    def check_permission(
        self, roles: list[Role], required_permission: Permission
    ) -> bool:
        """Check if user has required permission.

        Args:
            roles: User's roles
            required_permission: Permission to check

        Returns:
            True if user has permission
        """
        user_permissions: set[Permission] = set()
        for role in roles:
            # Assuming get_role_permissions returns a list.
            # For better performance, ROLE_PERMISSIONS could store sets directly.
            user_permissions.update(self.get_role_permissions(role))
        return required_permission in user_permissions

    async def assign_role(self, user_id: str, role: Role) -> None:
        """Assign role to user.

        Args:
            user_id: User identifier
            role: Role to assign
        """
        if not self.neo4j_client:
            return

        query = """
        MERGE (u:User {user_id: $user_id})
        MERGE (r:Role {name: $role})
        MERGE (u)-[:HAS_ROLE]->(r)
        """
        await self.neo4j_client.execute_write(
            query, {"user_id": user_id, "role": role.value}
        )

    async def get_user_roles(self, user_id: str) -> list[Role]:
        """Get user's roles.

        Args:
            user_id: User identifier

        Returns:
            List of user's roles
        """
        if not self.neo4j_client:
            return [Role.VIEWER]  # Default role

        query = """
        MATCH (u:User {user_id: $user_id})-[:HAS_ROLE]->(r:Role)
        RETURN r.name AS role
        """
        results = await self.neo4j_client.execute_read(query, {"user_id": user_id})

        roles = []
        for result in results:
            try:
                roles.append(Role(result["role"]))
            except ValueError:
                pass  # Invalid role, skip

        return roles or [Role.VIEWER]  # Default to viewer if no roles found
```

### Step 3: Update JWT Token Structure

**File**: `backend/mcp_server/auth/jwt_handler.py`

```python
"""JWT token handling with RBAC support."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any

import jwt

from .rbac import Role


class JWTHandler:
    """Handle JWT token creation and validation."""

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        """Initialize JWT handler.

        Args:
            secret_key: Secret key for signing tokens
            algorithm: JWT algorithm
        """
        self.secret_key = secret_key
        self.algorithm = algorithm

    def create_token(
        self,
        user_id: str,
        roles: list[Role],
        expires_in_hours: int = 24,
        additional_claims: dict[str, Any] | None = None,
    ) -> str:
        """Create JWT token with roles.

        Args:
            user_id: User identifier
            roles: User's roles
            expires_in_hours: Token expiration time
            additional_claims: Additional claims to include

        Returns:
            Signed JWT token
        """
        now = datetime.now(timezone.utc)
        expires_at = now + timedelta(hours=expires_in_hours)

        payload = {
            "sub": user_id,
            "roles": [role.value for role in roles],
            "iat": now,
            "exp": expires_at,
            "iss": "ultimate-mcp",
        }

        # Note: additional claims are applied last, so they can override
        # the standard claims above if they reuse the same keys.
        if additional_claims:
            payload.update(additional_claims)

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> dict[str, Any]:
        """Verify and decode JWT token.

        Args:
            token: JWT token to verify

        Returns:
            Decoded token payload

        Raises:
            jwt.InvalidTokenError: If token is invalid
        """
        return jwt.decode(
            token, self.secret_key, algorithms=[self.algorithm], issuer="ultimate-mcp"
        )

    def extract_roles(self, token: str) -> list[Role]:
        """Extract roles from JWT token.

        Args:
            token: JWT token

        Returns:
            List of user roles
        """
        try:
            payload = self.verify_token(token)
            role_strings = payload.get("roles", [])
            valid_roles = []
            for role_str in role_strings:
                try:
                    valid_roles.append(Role(role_str))
                except ValueError:
                    # Optionally log the invalid role string for debugging
                    # logger.warning(f"Invalid role '{role_str}' found in token.")
                    pass
            return valid_roles or [Role.VIEWER]
        except jwt.InvalidTokenError:
            # Default to viewer on token error. Callers that must reject
            # invalid tokens should call verify_token() first.
            return [Role.VIEWER]
```

### Step 4: Create Permission Decorator

**File**: `backend/mcp_server/auth/decorators.py`

```python
"""Authentication and authorization decorators."""

from __future__ import annotations

from functools import wraps
from typing import Any, Callable

from fastapi import HTTPException, Request, status

from .rbac import Permission, RBACManager


def require_permission(resource: str, action: str) -> Callable:
    """Decorator to require specific permission.

    Args:
        resource: Resource name
        action: Action name

    Returns:
        Decorator function
    """
    required_permission = Permission(resource, action)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Extract request from args/kwargs
            request: Request | None = None
            for arg in args:
                if isinstance(arg, Request):
                    request = arg
                    break
            if not request:
                request = kwargs.get("request")

            if not request:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Request object not found",
                )

            # Get security context from request state
            security_context = getattr(request.state, "security_context", None)
            if not security_context:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required",
                )

            # Check permission
            rbac_manager: RBACManager | None = getattr(
                request.app.state, "rbac_manager", None
            )
            if not rbac_manager:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="RBAC manager not initialized",
                )

            roles = security_context.roles
            if not rbac_manager.check_permission(roles, required_permission):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission denied: {required_permission}",
                )

            return await func(*args, **kwargs)

        return wrapper

    return decorator
```

### Step 5: Integration with Enhanced Server

**Modifications to**: `backend/mcp_server/enhanced_server.py`

```python
# Add imports
import hashlib
import uuid

from mcp_server.audit import AuditLogger
from mcp_server.auth.rbac import RBACManager, Role
from mcp_server.auth.jwt_handler import JWTHandler
from mcp_server.auth.decorators import require_permission

# Add global components
audit_logger: AuditLogger | None = None
rbac_manager: RBACManager | None = None
jwt_handler: JWTHandler | None = None


# Update lifespan to initialize new components
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    global neo4j_client, security_manager, metrics_collector, health_checker
    global audit_logger, rbac_manager, jwt_handler

    # ... existing initialization ...

    # Initialize audit logger
    audit_logger = AuditLogger(neo4j_client=neo4j_client)

    # Initialize RBAC manager
    rbac_manager = RBACManager(neo4j_client=neo4j_client)

    # Initialize JWT handler
    jwt_handler = JWTHandler(secret_key=config.security.secret_key)

    # Store in app state
    app.state.audit_logger = audit_logger
    app.state.rbac_manager = rbac_manager
    app.state.jwt_handler = jwt_handler

    # ... rest of initialization ...


# Update execute_code endpoint with permission check
@app.post("/api/v1/execute", response_model=ExecutionResponse)
@limiter.limit("10/minute")
@require_permission("tools", "execute")  # Add permission check
async def execute_code(
    request: Request,
    execution_request: ExecutionRequest,
    security_context: SecurityContext = Depends(get_security_context),
) -> ExecutionResponse:
    """Execute code with enhanced security and monitoring."""
    # Log execution attempt
    if audit_logger:
        await audit_logger.log_code_execution(
            user_id=security_context.user_id,
            code_hash=hashlib.sha256(execution_request.code.encode()).hexdigest(),
            language=execution_request.language,
            success=True,  # Will be updated
            duration_ms=0,  # Will be updated
            ip_address=security_context.ip_address,
            request_id=str(uuid.uuid4()),
        )

    # ... existing execution logic ...
```

---

## Testing Strategy

### Audit Logging Tests

**Run Tests**:
```bash
cd backend
source .venv/bin/activate
pytest tests/test_audit.py -v --cov=mcp_server/audit
```

**Expected Output**:
- ✅ test_log_authentication_success
- ✅ test_log_authentication_failure
- ✅ test_log_authorization_granted
- ✅ test_log_authorization_denied
- ✅ test_log_code_execution
- ✅ test_log_security_violation
- ✅ test_query_audit_log_no_filters
- ✅ test_query_audit_log_with_filters
- ✅ test_audit_logger_without_neo4j
- ✅ test_audit_event_persistence_failure

**Coverage Target**: >90%

### RBAC Tests

**File**: `backend/tests/test_rbac.py` (to be created)

```python
"""Tests for RBAC functionality."""

import pytest

from mcp_server.auth.rbac import Permission, RBACManager, Role


@pytest.mark.asyncio
async def test_viewer_permissions():
    """Test viewer role has correct permissions."""
    rbac = RBACManager()

    # Viewer can lint
    assert rbac.check_permission(
        [Role.VIEWER], Permission("tools", "lint")
    )

    # Viewer cannot execute
    assert not rbac.check_permission(
        [Role.VIEWER], Permission("tools", "execute")
    )


@pytest.mark.asyncio
async def test_developer_permissions():
    """Test developer role has correct permissions."""
    rbac = RBACManager()

    # Developer can execute
    assert rbac.check_permission(
        [Role.DEVELOPER], Permission("tools", "execute")
    )

    # Developer cannot upsert graph
    assert not rbac.check_permission(
        [Role.DEVELOPER], Permission("graph", "upsert")
    )


@pytest.mark.asyncio
async def test_admin_permissions():
    """Test admin role has all permissions."""
    rbac = RBACManager()

    # Admin can do everything
    assert rbac.check_permission(
        [Role.ADMIN], Permission("graph", "upsert")
    )
    assert rbac.check_permission(
        [Role.ADMIN], Permission("system", "admin")
    )
```

---

## Integration Checklist

### Phase 1 Completion Checklist

- [x] Fix syntax errors in enhanced_exec_tool.py
- [x] Remove unused imports
- [x] Run ruff auto-fix
- [x] Create audit logging module
- [x] Write comprehensive audit logging tests
- [ ] Create RBAC module
- [ ] Implement JWT handler with roles
- [ ] Add permission decorators
- [ ] Integrate with enhanced server
- [ ] Write RBAC tests
- [ ] Update API documentation
- [ ] Test end-to-end authentication flow
- [ ] Update deployment documentation

### Pre-Production Validation

Before moving to Phase 2:

1. **Code Quality**
   ```bash
   ruff check backend/mcp_server --select E,F,I,B,UP,N,S
   mypy backend/mcp_server
   ```

2. **Test Coverage**
   ```bash
   pytest backend/tests --cov=backend/mcp_server --cov-report=term-missing --cov-fail-under=80
   ```

3. **Security Scan**
   ```bash
   bandit -r backend/mcp_server -ll
   safety check --file backend/requirements_enhanced.txt
   ```

4. **Integration Test**
   - Start full stack (backend + Neo4j)
   - Test authentication flow
   - Verify audit logs in Neo4j
   - Test permission enforcement
   - Check metrics endpoint

---

## Verification & Validation

### Manual Testing Script

```bash
#!/bin/bash
# Test Phase 1 implementation

set -e

echo "=== Phase 1 Validation ==="

# 1. Start Neo4j
echo "Starting Neo4j..."
docker run -d --name test-neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/testpassword123 \
  neo4j:5.23.0

sleep 10

# 2. Start backend
echo "Starting backend..."
export NEO4J_PASSWORD=testpassword123
export AUTH_TOKEN=$(openssl rand -hex 24)
export SECRET_KEY=$(openssl rand -hex 32)

cd backend
source .venv/bin/activate
uvicorn mcp_server.enhanced_server:app --host 0.0.0.0 --port 8000 &
BACKEND_PID=$!

sleep 5

# 3. Test health
echo "Testing health endpoint..."
curl -f http://localhost:8000/health || { echo "Health check failed"; exit 1; }

# 4. Test authentication
echo "Testing authentication..."
TOKEN=$(curl -X POST http://localhost:8000/auth/token \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"admin"}' | jq -r '.access_token')

# 5. Test code execution with audit
echo "Testing code execution..."
curl -X POST http://localhost:8000/api/v1/execute \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"code":"print(\"hello\")", "language":"python"}' || { echo "Execution failed"; exit 1; }

# 6. Query audit log (URL quoted so the shell does not treat "?" as a glob)
echo "Querying audit log..."
curl -X GET "http://localhost:8000/api/v1/audit?limit=10" \
  -H "Authorization: Bearer $TOKEN"

# 7. Cleanup
echo "Cleaning up..."
kill $BACKEND_PID
docker stop test-neo4j
docker rm test-neo4j

echo "=== All tests passed! ==="
```

### Success Criteria

Phase 1 is complete when:

1. ✅ All syntax errors fixed
2. ✅ Audit logging module implemented and tested
3. ⏳ RBAC system implemented and tested
4. ⏳ JWT handler with roles implemented
5. ⏳ Permission decorators working
6. ⏳ Integration tests passing
7. ⏳ Documentation updated
8. ⏳ Code quality checks passing (ruff, mypy, bandit)
9. ⏳ Test coverage >80%

---

## Next Steps

After Phase 1 completion, proceed to:

- **Phase 2**: Security Hardening (Secrets Management, Enhanced Sandboxing)
- **Phase 3**: Observability Platform (OpenTelemetry, Prometheus, ELK)

Refer to [ENTERPRISE_EVALUATION.md](./ENTERPRISE_EVALUATION.md) for the full roadmap.

---

**Document Owner**: Engineering Team
**Review Cadence**: Weekly
**Last Reviewed**: October 10, 2025
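
The role model in Step 1 describes each role as "all of the previous role plus extras", while the `ROLE_PERMISSIONS` mapping in Step 2 repeats the inherited entries by hand. As one possible alternative, the sketch below derives each role's effective permissions by walking a parent chain. The names `OWN_PERMISSIONS`, `PARENT_ROLE`, `Permission.parse`, `effective_permissions`, and `has_permission` are hypothetical and not part of the guide's code; only the role names and `resource:action` strings come from Step 1.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, FrozenSet, List, Optional


class Role(Enum):
    VIEWER = "viewer"
    DEVELOPER = "developer"
    ADMIN = "admin"


@dataclass(frozen=True)  # frozen so instances are hashable and usable in sets
class Permission:
    resource: str
    action: str

    @classmethod
    def parse(cls, text: str) -> "Permission":
        """Parse a 'resource:action' string (hypothetical helper)."""
        resource, sep, action = text.partition(":")
        if not sep or not resource or not action:
            raise ValueError(f"expected 'resource:action', got {text!r}")
        return cls(resource, action)


# Only the permissions each role adds on top of its parent (from Step 1).
OWN_PERMISSIONS: Dict[Role, FrozenSet[str]] = {
    Role.VIEWER: frozenset({"tools:lint", "tools:read", "graph:query"}),
    Role.DEVELOPER: frozenset({"tools:execute", "tools:test", "tools:generate"}),
    Role.ADMIN: frozenset({"graph:upsert", "graph:delete", "system:admin"}),
}

# Each role inherits from the next less-privileged role; viewer has no parent.
PARENT_ROLE: Dict[Role, Role] = {
    Role.DEVELOPER: Role.VIEWER,
    Role.ADMIN: Role.DEVELOPER,
}


def effective_permissions(role: Role) -> FrozenSet[Permission]:
    """Collect a role's own permissions plus everything it inherits."""
    collected = set()
    current: Optional[Role] = role
    while current is not None:
        collected.update(Permission.parse(p) for p in OWN_PERMISSIONS[current])
        current = PARENT_ROLE.get(current)
    return frozenset(collected)


def has_permission(roles: List[Role], required: Permission) -> bool:
    """True if any of the user's roles grants the required permission."""
    return any(required in effective_permissions(role) for role in roles)
```

With this layout, adding a permission to `viewer` automatically extends `developer` and `admin`, at the cost of a short walk per check; caching the result per role would remove that cost if it ever matters.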

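
The `execute_code` snippet in Step 5 logs `success=True` and `duration_ms=0` before the code runs, with comments saying both will be updated. One way to record the real outcome is to log once, after execution, from a `finally` block. The sketch below assumes only the keyword arguments that the guide already passes to `log_code_execution`; `RecordingAuditLogger`, `run_with_audit`, and the `runner` callable are hypothetical stand-ins, not the platform's actual classes.

```python
import asyncio
import hashlib
import time
from typing import Any, Awaitable, Callable, Dict, List


class RecordingAuditLogger:
    """Hypothetical in-memory stand-in for AuditLogger, for illustration only."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    async def log_code_execution(self, **fields: Any) -> None:
        self.events.append(fields)


async def run_with_audit(
    audit_logger: Any,
    user_id: str,
    code: str,
    language: str,
    runner: Callable[[str], Awaitable[Any]],
) -> Any:
    """Run `runner(code)` and write one audit event with the real outcome."""
    code_hash = hashlib.sha256(code.encode()).hexdigest()
    started = time.perf_counter()
    success = False
    try:
        result = await runner(code)
        success = True
        return result
    finally:
        # Runs on both success and failure, so failed executions are audited too.
        await audit_logger.log_code_execution(
            user_id=user_id,
            code_hash=code_hash,
            language=language,
            success=success,
            duration_ms=(time.perf_counter() - started) * 1000,
        )


async def fake_runner(code: str) -> str:
    """Hypothetical executor used only to exercise the sketch."""
    return "hello"


if __name__ == "__main__":
    logger = RecordingAuditLogger()
    asyncio.run(run_with_audit(logger, "user-123", 'print("hello")', "python", fake_runner))
    print(logger.events[0]["success"], logger.events[0]["code_hash"][:12])
```

Logging in `finally` keeps a single event per execution; if an "attempted" record is also required before execution starts, that would be a second, separate event.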