IMPLEMENTATION_GUIDE.md•20.6 kB
# Ultimate MCP Platform - Implementation Guide
**Document Version**: 1.0
**Last Updated**: October 10, 2025
**Status**: In Progress - Phase 1
This document provides step-by-step implementation instructions for enhancing the Ultimate MCP Platform to enterprise/FAANG production standards. It complements the [ENTERPRISE_EVALUATION.md](./ENTERPRISE_EVALUATION.md) strategic roadmap.
---
## Table of Contents
1. [Phase 1: Critical Fixes & Audit Logging](#phase-1-critical-fixes--audit-logging)
2. [Phase 2: RBAC Implementation](#phase-2-rbac-implementation)
3. [Testing Strategy](#testing-strategy)
4. [Integration Checklist](#integration-checklist)
5. [Verification & Validation](#verification--validation)
---
## Phase 1: Critical Fixes & Audit Logging
### ✅ COMPLETED: Syntax Error Fixes
**Files Fixed**:
- `backend/mcp_server/tools/enhanced_exec_tool.py` (lines 323, 382, 446)
- `backend/mcp_server/config.py` (removed unused `os` import)
**Verification**:
```bash
cd backend
source .venv/bin/activate
python -m py_compile mcp_server/tools/enhanced_exec_tool.py # Should succeed
ruff check . --select F,E # Should show fewer errors
```
### ✅ COMPLETED: Audit Logging Module
**Created Files**:
1. `backend/mcp_server/audit/__init__.py` - Module exports
2. `backend/mcp_server/audit/logger.py` - Core audit logging implementation
3. `backend/tests/test_audit.py` - Comprehensive test suite
**Features**:
- 11 audit event types (authentication, authorization, code execution, security violations)
- Structured logging with JSON output
- Neo4j persistence for long-term audit trail
- Query capabilities with filters
- Error resilience (continues if Neo4j unavailable)
**Usage Example**:
```python
from mcp_server.audit import AuditLogger, AuditEventType
# Initialize with Neo4j client
audit_logger = AuditLogger(neo4j_client=neo4j_client)
# Log authentication
await audit_logger.log_authentication(
success=True,
user_id="user-123",
ip_address="192.168.1.100",
request_id=request_id
)
# Log code execution
await audit_logger.log_code_execution(
user_id="user-123",
code_hash=hashlib.sha256(code.encode()).hexdigest(),
language="python",
success=True,
duration_ms=execution_time * 1000
)
# Query audit log
events = await audit_logger.query_audit_log(
event_type=AuditEventType.CODE_EXECUTION,
user_id="user-123",
limit=100
)
```
**Testing**:
```bash
cd backend
source .venv/bin/activate
pytest tests/test_audit.py -v
# All 13 tests should pass
```
---
## Phase 2: RBAC Implementation
### Step 1: Design Role/Permission Model
**Roles** (from least to most privileged):
1. **viewer**: Read-only access
- Permissions: `tools:lint`, `tools:read`, `graph:query`
- No code execution, no data modification
2. **developer**: Developer access
- Permissions: All viewer + `tools:execute`, `tools:test`, `tools:generate`
- Can execute code, run tests, generate code
- Cannot modify graph structure
3. **admin**: Full administrative access
- Permissions: All developer + `graph:upsert`, `graph:delete`, `system:admin`
- Can modify graph, access system administration
**Permission Format**: `resource:action`
- Resources: `tools`, `graph`, `system`
- Actions: `read`, `write`, `execute`, `delete`, `admin`
### Step 2: Implement RBAC Manager
**File**: `backend/mcp_server/auth/rbac.py`
```python
"""Role-Based Access Control (RBAC) implementation."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Any
class Role(Enum):
"""User roles."""
VIEWER = "viewer"
DEVELOPER = "developer"
ADMIN = "admin"
@dataclass
class Permission:
"""Permission definition."""
resource: str # 'tools', 'graph', 'system'
action: str # 'read', 'write', 'execute', 'delete', 'admin'
def __str__(self) -> str:
return f"{self.resource}:{self.action}"
# Role → Permissions mapping
ROLE_PERMISSIONS: dict[Role, list[Permission]] = {
Role.VIEWER: [
Permission("tools", "read"),
Permission("tools", "lint"),
Permission("graph", "query"),
],
Role.DEVELOPER: [
# Inherits all viewer permissions
Permission("tools", "read"),
Permission("tools", "lint"),
Permission("graph", "query"),
# Additional developer permissions
Permission("tools", "execute"),
Permission("tools", "test"),
Permission("tools", "generate"),
],
Role.ADMIN: [
# Inherits all developer permissions
Permission("tools", "read"),
Permission("tools", "lint"),
Permission("graph", "query"),
Permission("tools", "execute"),
Permission("tools", "test"),
Permission("tools", "generate"),
# Additional admin permissions
Permission("graph", "upsert"),
Permission("graph", "delete"),
Permission("system", "admin"),
],
}
class RBACManager:
"""Role-Based Access Control manager."""
def __init__(self, neo4j_client: Any = None):
"""Initialize RBAC manager.
Args:
neo4j_client: Optional Neo4j client for persistence
"""
self.neo4j_client = neo4j_client
def get_role_permissions(self, role: Role) -> list[Permission]:
"""Get all permissions for a role.
Args:
role: User role
Returns:
List of permissions
"""
return ROLE_PERMISSIONS.get(role, [])
def check_permission(
self, roles: list[Role], required_permission: Permission
) -> bool:
"""Check if user has required permission.
Args:
roles: User's roles
required_permission: Permission to check
Returns:
True if user has permission
"""
user_permissions = set()
for role in roles:
# Assuming get_role_permissions returns a list.
# For better performance, ROLE_PERMISSIONS could store sets directly.
user_permissions.update(self.get_role_permissions(role))
return required_permission in user_permissions
async def assign_role(self, user_id: str, role: Role) -> None:
"""Assign role to user.
Args:
user_id: User identifier
role: Role to assign
"""
if not self.neo4j_client:
return
query = """
MERGE (u:User {user_id: $user_id})
MERGE (r:Role {name: $role})
MERGE (u)-[:HAS_ROLE]->(r)
"""
await self.neo4j_client.execute_write(
query, {"user_id": user_id, "role": role.value}
)
async def get_user_roles(self, user_id: str) -> list[Role]:
"""Get user's roles.
Args:
user_id: User identifier
Returns:
List of user's roles
"""
if not self.neo4j_client:
return [Role.VIEWER] # Default role
query = """
MATCH (u:User {user_id: $user_id})-[:HAS_ROLE]->(r:Role)
RETURN r.name AS role
"""
results = await self.neo4j_client.execute_read(query, {"user_id": user_id})
roles = []
for result in results:
try:
roles.append(Role(result["role"]))
except ValueError:
pass # Invalid role, skip
return roles or [Role.VIEWER] # Default to viewer if no roles found
```
### Step 3: Update JWT Token Structure
**File**: `backend/mcp_server/auth/jwt_handler.py`
```python
"""JWT token handling with RBAC support."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from .rbac import Role
class JWTHandler:
"""Handle JWT token creation and validation."""
def __init__(self, secret_key: str, algorithm: str = "HS256"):
"""Initialize JWT handler.
Args:
secret_key: Secret key for signing tokens
algorithm: JWT algorithm
"""
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(
self,
user_id: str,
roles: list[Role],
expires_in_hours: int = 24,
additional_claims: dict[str, Any] | None = None,
) -> str:
"""Create JWT token with roles.
Args:
user_id: User identifier
roles: User's roles
expires_in_hours: Token expiration time
additional_claims: Additional claims to include
Returns:
Signed JWT token
"""
now = datetime.now(timezone.utc)
expires_at = now + timedelta(hours=expires_in_hours)
payload = {
"sub": user_id,
"roles": [role.value for role in roles],
"iat": now,
"exp": expires_at,
"iss": "ultimate-mcp",
}
if additional_claims:
payload.update(additional_claims)
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> dict[str, Any]:
"""Verify and decode JWT token.
Args:
token: JWT token to verify
Returns:
Decoded token payload
Raises:
jwt.InvalidTokenError: If token is invalid
"""
return jwt.decode(
token, self.secret_key, algorithms=[self.algorithm], issuer="ultimate-mcp"
)
def extract_roles(self, token: str) -> list[Role]:
"""Extract roles from JWT token.
Args:
token: JWT token
Returns:
List of user roles
"""
try:
payload = self.verify_token(token)
role_strings = payload.get("roles", [])
valid_roles = []
for role_str in role_strings:
try:
valid_roles.append(Role(role_str))
except ValueError:
# Optionally log the invalid role string for debugging
# logger.warning(f"Invalid role '{role_str}' found in token.")
pass
return valid_roles or [Role.VIEWER]
except jwt.InvalidTokenError:
return [Role.VIEWER] # Default to viewer on token error
```
### Step 4: Create Permission Decorator
**File**: `backend/mcp_server/auth/decorators.py`
```python
"""Authentication and authorization decorators."""
from __future__ import annotations
from functools import wraps
from typing import Any, Callable
from fastapi import HTTPException, Request, status
from .rbac import Permission, RBACManager
def require_permission(resource: str, action: str) -> Callable:
"""Decorator to require specific permission.
Args:
resource: Resource name
action: Action name
Returns:
Decorator function
"""
required_permission = Permission(resource, action)
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
# Extract request from args/kwargs
request: Request | None = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
if not request:
request = kwargs.get("request")
if not request:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Request object not found",
)
# Get security context from request state
security_context = getattr(request.state, "security_context", None)
if not security_context:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
)
# Check permission
rbac_manager = getattr(request.app.state, "rbac_manager", None)
if not rbac_manager:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="RBAC manager not initialized",
)
roles = security_context.roles
if not rbac_manager.check_permission(roles, required_permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: {required_permission}",
)
return await func(*args, **kwargs)
return wrapper
return decorator
```
### Step 5: Integration with Enhanced Server
**Modifications to**: `backend/mcp_server/enhanced_server.py`
```python
# Add imports
from mcp_server.audit import AuditLogger
from mcp_server.auth.rbac import RBACManager, Role
from mcp_server.auth.jwt_handler import JWTHandler
from mcp_server.auth.decorators import require_permission
# Add global components
audit_logger: AuditLogger | None = None
rbac_manager: RBACManager | None = None
jwt_handler: JWTHandler | None = None
# Update lifespan to initialize new components
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
global neo4j_client, security_manager, metrics_collector, health_checker
global audit_logger, rbac_manager, jwt_handler
# ... existing initialization ...
# Initialize audit logger
audit_logger = AuditLogger(neo4j_client=neo4j_client)
# Initialize RBAC manager
rbac_manager = RBACManager(neo4j_client=neo4j_client)
# Initialize JWT handler
jwt_handler = JWTHandler(secret_key=config.security.secret_key)
# Store in app state
app.state.audit_logger = audit_logger
app.state.rbac_manager = rbac_manager
app.state.jwt_handler = jwt_handler
# ... rest of initialization ...
# Update execute_code endpoint with permission check
@app.post("/api/v1/execute", response_model=ExecutionResponse)
@limiter.limit("10/minute")
@require_permission("tools", "execute") # Add permission check
async def execute_code(
request: Request,
execution_request: ExecutionRequest,
security_context: SecurityContext = Depends(get_security_context),
) -> ExecutionResponse:
"""Execute code with enhanced security and monitoring."""
# Log execution attempt
if audit_logger:
await audit_logger.log_code_execution(
user_id=security_context.user_id,
code_hash=hashlib.sha256(execution_request.code.encode()).hexdigest(),
language=execution_request.language,
success=True, # Will be updated
duration_ms=0, # Will be updated
ip_address=security_context.ip_address,
request_id=str(uuid.uuid4()),
)
# ... existing execution logic ...
```
---
## Testing Strategy
### Audit Logging Tests
**Run Tests**:
```bash
cd backend
source .venv/bin/activate
pytest tests/test_audit.py -v --cov=mcp_server/audit
```
**Expected Output**:
- ✅ test_log_authentication_success
- ✅ test_log_authentication_failure
- ✅ test_log_authorization_granted
- ✅ test_log_authorization_denied
- ✅ test_log_code_execution
- ✅ test_log_security_violation
- ✅ test_query_audit_log_no_filters
- ✅ test_query_audit_log_with_filters
- ✅ test_audit_logger_without_neo4j
- ✅ test_audit_event_persistence_failure
**Coverage Target**: >90%
### RBAC Tests
**File**: `backend/tests/test_rbac.py` (to be created)
```python
"""Tests for RBAC functionality."""
import pytest
from mcp_server.auth.rbac import Permission, RBACManager, Role
@pytest.mark.asyncio
async def test_viewer_permissions():
"""Test viewer role has correct permissions."""
rbac = RBACManager()
# Viewer can lint
assert rbac.check_permission(
[Role.VIEWER],
Permission("tools", "lint")
)
# Viewer cannot execute
assert not rbac.check_permission(
[Role.VIEWER],
Permission("tools", "execute")
)
@pytest.mark.asyncio
async def test_developer_permissions():
"""Test developer role has correct permissions."""
rbac = RBACManager()
# Developer can execute
assert rbac.check_permission(
[Role.DEVELOPER],
Permission("tools", "execute")
)
# Developer cannot upsert graph
assert not rbac.check_permission(
[Role.DEVELOPER],
Permission("graph", "upsert")
)
@pytest.mark.asyncio
async def test_admin_permissions():
"""Test admin role has all permissions."""
rbac = RBACManager()
# Admin can do everything
assert rbac.check_permission(
[Role.ADMIN],
Permission("graph", "upsert")
)
assert rbac.check_permission(
[Role.ADMIN],
Permission("system", "admin")
)
```
---
## Integration Checklist
### Phase 1 Completion Checklist
- [x] Fix syntax errors in enhanced_exec_tool.py
- [x] Remove unused imports
- [x] Run ruff auto-fix
- [x] Create audit logging module
- [x] Write comprehensive audit logging tests
- [ ] Create RBAC module
- [ ] Implement JWT handler with roles
- [ ] Add permission decorators
- [ ] Integrate with enhanced server
- [ ] Write RBAC tests
- [ ] Update API documentation
- [ ] Test end-to-end authentication flow
- [ ] Update deployment documentation
### Pre-Production Validation
Before moving to Phase 2:
1. **Code Quality**
```bash
ruff check backend/mcp_server --select E,F,I,B,UP,N,S
mypy backend/mcp_server
```
2. **Test Coverage**
```bash
pytest backend/tests --cov=backend/mcp_server --cov-report=term-missing --cov-fail-under=80
```
3. **Security Scan**
```bash
bandit -r backend/mcp_server -ll
safety check --file backend/requirements_enhanced.txt
```
4. **Integration Test**
- Start full stack (backend + Neo4j)
- Test authentication flow
- Verify audit logs in Neo4j
- Test permission enforcement
- Check metrics endpoint
---
## Verification & Validation
### Manual Testing Script
```bash
#!/bin/bash
# Test Phase 1 implementation
set -e
echo "=== Phase 1 Validation ==="
# 1. Start Neo4j
echo "Starting Neo4j..."
docker run -d --name test-neo4j \
-p 7474:7474 -p 7687:7687 \
-e NEO4J_AUTH=neo4j/testpassword123 \
neo4j:5.23.0
sleep 10
# 2. Start backend
echo "Starting backend..."
export NEO4J_PASSWORD=testpassword123
export AUTH_TOKEN=$(openssl rand -hex 24)
export SECRET_KEY=$(openssl rand -hex 32)
cd backend
source .venv/bin/activate
uvicorn mcp_server.enhanced_server:app --host 0.0.0.0 --port 8000 &
BACKEND_PID=$!
sleep 5
# 3. Test health
echo "Testing health endpoint..."
curl -f http://localhost:8000/health || { echo "Health check failed"; exit 1; }
# 4. Test authentication
echo "Testing authentication..."
TOKEN=$(curl -X POST http://localhost:8000/auth/token \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"admin"}' | jq -r '.access_token')
# 5. Test code execution with audit
echo "Testing code execution..."
curl -X POST http://localhost:8000/api/v1/execute \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"code":"print(\"hello\")", "language":"python"}' || { echo "Execution failed"; exit 1; }
# 6. Query audit log
echo "Querying audit log..."
curl -X GET http://localhost:8000/api/v1/audit?limit=10 \
-H "Authorization: Bearer $TOKEN"
# 7. Cleanup
echo "Cleaning up..."
kill $BACKEND_PID
docker stop test-neo4j
docker rm test-neo4j
echo "=== All tests passed! ==="
```
### Success Criteria
Phase 1 is complete when:
1. ✅ All syntax errors fixed
2. ✅ Audit logging module implemented and tested
3. ⏳ RBAC system implemented and tested
4. ⏳ JWT handler with roles implemented
5. ⏳ Permission decorators working
6. ⏳ Integration tests passing
7. ⏳ Documentation updated
8. ⏳ Code quality checks passing (ruff, mypy, bandit)
9. ⏳ Test coverage >80%
---
## Next Steps
After Phase 1 completion, proceed to:
- **Phase 2**: Security Hardening (Secrets Management, Enhanced Sandboxing)
- **Phase 3**: Observability Platform (OpenTelemetry, Prometheus, ELK)
Refer to [ENTERPRISE_EVALUATION.md](./ENTERPRISE_EVALUATION.md) for the full roadmap.
---
**Document Owner**: Engineering Team
**Review Cadence**: Weekly
**Last Reviewed**: October 10, 2025