# IMPLEMENTATION CHECKLIST - ULTIMATE MCP PLATFORM
## Systematic Enhancement Plan
**Version:** 1.0
**Created:** October 21, 2025
**Priority System:** P0 (Critical) → P1 (High) → P2 (Medium) → P3 (Low)
---
## PRIORITY 0: CRITICAL INFRASTRUCTURE (Complete First)
### ✅ Completion Criteria
- [ ] All P0 items implemented and tested
- [ ] Load test: Handle 500 RPS for 10 minutes
- [ ] Zero database-related failures in stress test
- [ ] Performance improvement: >50% latency reduction
---
### P0.1: Add Retry Logic to Neo4jClient ⏱️ 4h | 🎯 High Impact
**Location:** `backend/mcp_server/database/neo4j_client.py`
**Problem:** Transient database failures fail the request immediately because there is no retry logic
**Implementation:**
```python
# Add dependency
# requirements.txt: tenacity==8.2.3
import logging
from typing import Any

from neo4j.exceptions import ServiceUnavailable, SessionExpired
from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

logger = logging.getLogger(__name__)


class Neo4jClient:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((ServiceUnavailable, SessionExpired)),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
    async def execute_read_with_retry(
        self, query: str, parameters: dict[str, Any] | None = None
    ) -> list[dict[str, Any]]:
        """Execute read query with automatic retry on transient failures."""
        return await self.execute_read(query, parameters)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((ServiceUnavailable, SessionExpired)),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
    async def execute_write_with_retry(
        self, query: str, parameters: dict[str, Any] | None = None
    ) -> None:
        """Execute write query with automatic retry on transient failures."""
        return await self.execute_write(query, parameters)
```
**Testing:**
```python
# tests/test_neo4j_retry.py
import pytest
from neo4j.exceptions import ServiceUnavailable


@pytest.mark.asyncio
async def test_retry_on_service_unavailable(mocker):
    client = Neo4jClient(...)
    mock_execute = mocker.patch.object(client, "execute_read")
    mock_execute.side_effect = [
        ServiceUnavailable("Connection lost"),
        ServiceUnavailable("Connection lost"),
        [{"result": "success"}],  # Third attempt succeeds
    ]
    result = await client.execute_read_with_retry("RETURN 1")
    assert result == [{"result": "success"}]
    assert mock_execute.call_count == 3
```
**Success Metrics:**
- [ ] Retry logic handles 99% of transient failures
- [ ] Average retry count <1.1 per request
- [ ] No cascading failures during network hiccups
---
### P0.2: Integrate Circuit Breakers into Neo4j Operations ⏱️ 6h | 🎯 High Impact
**Location:** `backend/mcp_server/database/neo4j_client.py`
**Problem:** Cascading failures when the database is down
**Implementation:**
```python
from typing import Any

from ..utils.circuit_breaker import CircuitBreakerConfig, CircuitBreakerRegistry


class Neo4jClient:
    def __init__(self, uri, user, password, database, **kwargs):
        # ... existing init ...
        self.circuit_registry = CircuitBreakerRegistry()
        self._read_breaker_config = CircuitBreakerConfig(
            failure_threshold=5,
            success_threshold=2,
            timeout_seconds=30.0,
        )
        self._write_breaker_config = CircuitBreakerConfig(
            failure_threshold=3,  # Stricter for writes
            success_threshold=2,
            timeout_seconds=60.0,
        )

    async def execute_read(self, query: str, parameters: dict[str, Any] | None = None):
        breaker = await self.circuit_registry.get_or_create(
            "neo4j_read", self._read_breaker_config
        )
        return await breaker.call(self._execute_read_internal, query, parameters)

    async def execute_write(self, query: str, parameters: dict[str, Any] | None = None):
        breaker = await self.circuit_registry.get_or_create(
            "neo4j_write", self._write_breaker_config
        )
        return await breaker.call(self._execute_write_internal, query, parameters)

    async def _execute_read_internal(self, query, parameters):
        """Internal read implementation (original execute_read logic)."""
        # Move the original execute_read logic here
        ...
```
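The `..utils.circuit_breaker` import above assumes a registry module. If it does not exist yet, a minimal sketch of the expected interface (the state names, fields, and locking are assumptions, not the final implementation) could look like this:
```python
# backend/mcp_server/utils/circuit_breaker.py -- minimal interface sketch
import asyncio
import time
from dataclasses import dataclass
from enum import Enum


class CircuitBreakerError(Exception):
    """Raised when a call is rejected because the circuit is open."""


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 30.0


class _State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig) -> None:
        self._config = config
        self._state = _State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    async def call(self, func, *args, **kwargs):
        if self._state is _State.OPEN:
            if time.monotonic() - self._opened_at < self._config.timeout_seconds:
                raise CircuitBreakerError("circuit open; failing fast")
            self._state = _State.HALF_OPEN  # cool-down elapsed, allow a probe
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._successes = 0
            self._failures += 1
            if self._failures >= self._config.failure_threshold:
                self._state = _State.OPEN
                self._opened_at = time.monotonic()
            raise
        if self._state is _State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self._config.success_threshold:
                self._state = _State.CLOSED
                self._failures = 0
        else:
            self._failures = 0
        return result


class CircuitBreakerRegistry:
    def __init__(self) -> None:
        self._breakers: dict[str, CircuitBreaker] = {}
        self._lock = asyncio.Lock()

    async def get_or_create(self, name: str, config: CircuitBreakerConfig) -> CircuitBreaker:
        async with self._lock:
            if name not in self._breakers:
                self._breakers[name] = CircuitBreaker(config)
            return self._breakers[name]
```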
**Testing:**
```python
import pytest
from neo4j.exceptions import Neo4jError

from mcp_server.utils.circuit_breaker import CircuitBreakerError


@pytest.mark.asyncio
async def test_circuit_breaker_opens_after_failures():
    client = Neo4jClient(...)
    # Trigger 5 failures
    for _ in range(5):
        with pytest.raises(Neo4jError):
            await client.execute_read("INVALID QUERY")
    # Circuit should be open now
    with pytest.raises(CircuitBreakerError):
        await client.execute_read("RETURN 1")
```
**Success Metrics:**
- [ ] Circuit opens after configured threshold
- [ ] System fails fast when circuit is open (<10ms)
- [ ] Circuit auto-recovers after timeout period
---
### P0.3: Implement ProcessPoolExecutor for Code Execution ⏱️ 4h | 🎯 High Impact
**Location:** `backend/mcp_server/tools/exec_tool.py`
**Problem:** `asyncio.to_thread` exhausts the shared default thread pool under load
**Implementation:**
```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


class ExecutionTool:
    def __init__(self, neo4j: Neo4jClient) -> None:
        self._neo4j = neo4j
        # Use a separate process pool for isolation
        max_workers = min(multiprocessing.cpu_count(), 4)
        self._executor = ProcessPoolExecutor(max_workers=max_workers)
        self._semaphore = asyncio.Semaphore(max_workers * 2)

    async def run(self, request: ExecutionRequest) -> ExecutionResponse:
        ensure_supported_language(request.language)
        if request.language != "python":
            raise ValueError("Execution tool currently supports only Python code.")
        ensure_safe_python(request.code)
        # Use the semaphore to limit concurrent executions
        async with self._semaphore:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                self._executor,
                self._execute_python,  # must be picklable -- see the worker sketch below
                request,
            )
        await self._persist(result)
        return ExecutionResponse(
            id=result.id,
            return_code=result.return_code,
            stdout=result.stdout,
            stderr=result.stderr,
            duration_seconds=result.duration_seconds,
        )

    async def shutdown(self):
        """Gracefully shut down the executor."""
        self._executor.shutdown(wait=True)
```
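One caveat worth flagging: `ProcessPoolExecutor` pickles the callable and its arguments, and a bound method on `ExecutionTool` (which holds a Neo4j client) may not pickle. A safer shape is a module-level worker function; the sketch below is an assumption about `_execute_python`'s contract, returning a plain dict whose fields mirror `ExecutionResponse`:
```python
import subprocess
import sys
import time
import uuid


def _execute_python_worker(code: str, timeout: float = 10.0) -> dict:
    """Runs inside a worker process; module-level so it pickles cleanly."""
    start = time.monotonic()
    proc = subprocess.run(
        [sys.executable, "-I", "-c", code],  # -I: isolated mode (no site/user paths)
        capture_output=True,
        text=True,
        timeout=timeout,  # raises subprocess.TimeoutExpired on runaway code
    )
    return {
        "id": str(uuid.uuid4()),
        "return_code": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "duration_seconds": time.monotonic() - start,
    }
```
`run` would then call `loop.run_in_executor(self._executor, _execute_python_worker, request.code)` and map the returned dict into its result object.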
**Testing:**
```python
import asyncio
import time

import pytest


@pytest.mark.asyncio
async def test_concurrent_execution_scaling():
    tool = ExecutionTool(mock_neo4j)
    # Execute 100 concurrent requests
    requests = [
        ExecutionRequest(code="print('test')", language="python")
        for _ in range(100)
    ]
    start = time.time()
    results = await asyncio.gather(*[tool.run(req) for req in requests])
    duration = time.time() - start
    assert all(r.return_code == 0 for r in results)
    assert duration < 10.0  # Should complete in <10s
```
**Success Metrics:**
- [ ] Handle 100 concurrent executions without blocking
- [ ] Total time for 100 executions <10s (vs ~25s currently)
- [ ] CPU utilization >60% during execution
---
### P0.4: Add Distributed Caching with Redis ⏱️ 8h | 🎯 High Impact
**Location:** `backend/mcp_server/utils/cache.py`
**Problem:** In-memory cache doesn't scale across instances
**Implementation:**
```python
import json
import logging
from typing import Any

import aioredis  # aioredis 2.x; the same API now ships as redis.asyncio in redis-py >= 4.2

logger = logging.getLogger(__name__)


class DistributedCache:
    """Two-tier cache: L1 (in-memory) + L2 (Redis)."""

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        local_max_size: int = 500,
        default_ttl: float = 3600.0,
    ):
        self.redis_url = redis_url
        self.local_cache = InMemoryCache(max_size=local_max_size, default_ttl=default_ttl)
        self.redis: aioredis.Redis | None = None
        self.default_ttl = default_ttl

    async def connect(self):
        """Initialize the Redis connection (from_url is synchronous; connections are lazy)."""
        self.redis = aioredis.from_url(
            self.redis_url,
            encoding="utf-8",
            decode_responses=True,
        )

    async def close(self):
        """Close the Redis connection."""
        if self.redis:
            await self.redis.close()

    async def get(self, key: str) -> Any | None:
        # Try the L1 cache first (fastest)
        value = await self.local_cache.get(key)
        if value is not None:
            return value
        # Try the L2 cache (Redis)
        if self.redis:
            try:
                redis_value = await self.redis.get(key)
                if redis_value:
                    value = json.loads(redis_value)
                    # Populate the L1 cache
                    await self.local_cache.set(key, value)
                    return value
            except Exception as e:
                logger.error(f"Redis get error: {e}")
        return None

    async def set(self, key: str, value: Any, ttl: float | None = None) -> None:
        effective_ttl = ttl or self.default_ttl
        # Set in L1 (local)
        await self.local_cache.set(key, value, effective_ttl)
        # Set in L2 (Redis)
        if self.redis:
            try:
                await self.redis.setex(
                    key,
                    int(effective_ttl),
                    json.dumps(value, default=str),
                )
            except Exception as e:
                logger.error(f"Redis set error: {e}")

    async def delete(self, key: str) -> bool:
        # Delete from both tiers
        local_deleted = await self.local_cache.delete(key)
        if self.redis:
            try:
                await self.redis.delete(key)
            except Exception as e:
                logger.error(f"Redis delete error: {e}")
        return local_deleted
```
**Configuration:**
```python
# backend/mcp_server/config.py
class CacheConfig(SettingsBase):
    enabled: bool = Field(default=True)
    backend: str = Field(default="redis")  # "memory" or "redis"
    redis_url: str = Field(default="redis://localhost:6379")
    local_max_size: int = Field(default=500)
    default_ttl: int = Field(default=3600)
```
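`DistributedCache` needs explicit `connect()`/`close()` calls. One plausible wiring uses the FastAPI lifespan hook (the `settings.cache.redis_url` attribute is an assumption about how `CacheConfig` is exposed):
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Connect the two-tier cache on startup, close it on shutdown
    cache = DistributedCache(redis_url=settings.cache.redis_url)
    await cache.connect()
    app.state.cache = cache
    try:
        yield
    finally:
        await cache.close()


app = FastAPI(lifespan=lifespan)
```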
**Testing:**
```python
import pytest


@pytest.mark.asyncio
async def test_distributed_cache_l1_l2():
    cache = DistributedCache(redis_url="redis://localhost:6379")
    await cache.connect()
    # Set a value
    await cache.set("test_key", {"data": "value"})
    # Clear L1 to force an L2 lookup
    await cache.local_cache.clear()
    # Should retrieve from L2 (Redis)
    value = await cache.get("test_key")
    assert value == {"data": "value"}
    await cache.close()
```
**Success Metrics:**
- [ ] Cache hit rate >80% with multi-instance deployment
- [ ] L1 hit latency <1ms, L2 hit latency <10ms
- [ ] Zero cache inconsistencies across instances
---
### P0.5: Optimize Neo4j Connection Pool ⏱️ 2h | 🎯 Medium Impact
**Location:** `backend/mcp_server/database/neo4j_client.py`
**Problem:** Default pool sizing not optimized for workload
**Implementation:**
```python
import logging
import multiprocessing

from neo4j import AsyncGraphDatabase

logger = logging.getLogger(__name__)


class Neo4jClient:
    def __init__(
        self,
        uri: str,
        user: str,
        password: str,
        database: str,
        *,
        max_connection_pool_size: int | None = None,
        connection_acquisition_timeout: float | None = None,
        max_connection_lifetime: int = 3600,  # 1 hour (was 300s)
    ) -> None:
        # Auto-calculate an optimal pool size if not provided
        if max_connection_pool_size is None:
            cpu_count = multiprocessing.cpu_count()
            # Formula: pool_size = cpu_count * 2 + spare, capped at 100
            max_connection_pool_size = min(cpu_count * 2 + 4, 100)
        if connection_acquisition_timeout is None:
            # Fail fast: don't queue indefinitely
            connection_acquisition_timeout = 5.0
        self._driver = AsyncGraphDatabase.driver(
            uri,
            auth=(user, password),
            max_connection_lifetime=max_connection_lifetime,
            max_connection_pool_size=max_connection_pool_size,
            connection_acquisition_timeout=connection_acquisition_timeout,
            # Additional optimizations
            keep_alive=True,
            connection_timeout=10.0,
            max_transaction_retry_time=15.0,
        )
        self._database = database
        logger.info(
            "Neo4j connection pool configured",
            extra={
                "max_pool_size": max_connection_pool_size,
                "acquisition_timeout": connection_acquisition_timeout,
                "max_lifetime": max_connection_lifetime,
            },
        )
```
**Configuration:**
```yaml
# config/production.yaml
database:
  max_connection_pool_size: 50  # Tune based on load
  connection_acquisition_timeout: 5.0
  max_connection_lifetime: 3600
```
**Success Metrics:**
- [ ] Connection acquisition time <10ms at P99
- [ ] Pool utilization 50-70% under normal load
- [ ] Zero connection timeout errors under 500 RPS
---
### P0.6: Add Query Result Caching ⏱️ 4h | 🎯 High Impact
**Location:** `backend/mcp_server/database/neo4j_client.py`
**Problem:** Expensive metrics queries are re-executed on every request
**Implementation:**
```python
import hashlib
import json
from functools import wraps


def cached_query(ttl: float = 60.0):
    """Decorator factory for caching query results. Defined at module level so it
    can be applied at class-definition time; the wrapper reaches the cache via self."""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            # Generate a cache key from the function name + arguments
            cache_key = self._generate_cache_key(func.__name__, args, kwargs)
            # Try the cache first
            cached_result = await self.query_cache.get(cache_key)
            if cached_result is not None:
                logger.debug(f"Query cache hit: {func.__name__}")
                return cached_result
            # Execute the query
            result = await func(self, *args, **kwargs)
            # Cache the result
            await self.query_cache.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator


class Neo4jClient:
    def __init__(self, ...):
        # ... existing init ...
        self.query_cache = DistributedCache()  # Or InMemoryCache if Redis is unavailable

    def _generate_cache_key(self, func_name: str, args, kwargs) -> str:
        """Generate a deterministic cache key."""
        key_data = {"func": func_name, "args": args, "kwargs": kwargs}
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        return f"neo4j:{hashlib.sha256(key_str.encode()).hexdigest()[:16]}"

    @cached_query(ttl=60.0)  # Cache for 60 seconds
    async def get_metrics(self) -> GraphMetrics:
        """Get graph metrics with caching."""
        # ... existing implementation ...

    async def execute_read(self, query: str, parameters: dict[str, Any] | None = None):
        """Execute a read query, caching only queries that are safe to cache."""
        if self._is_cacheable_query(query):
            return await self._execute_read_cached(query, parameters)
        return await self._execute_read_internal(query, parameters)

    @cached_query(ttl=300.0)  # Cache for 5 minutes
    async def _execute_read_cached(self, query: str, parameters: dict[str, Any] | None = None):
        return await self._execute_read_internal(query, parameters)

    def _is_cacheable_query(self, query: str) -> bool:
        """Check whether a query is safe to cache."""
        query_upper = query.upper().strip()
        # Only cache pure MATCH queries without time-based functions
        return (
            query_upper.startswith("MATCH")
            and "CALL" not in query_upper
            and "DATETIME()" not in query_upper
            and "TIMESTAMP()" not in query_upper
        )
```
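Caching reads is only safe if writes invalidate them. One coarse scheme (a sketch, not existing code) prefixes read-cache keys with a version counter and bumps it after each successful write; `_generate_cache_key` would wrap its result in `_versioned_key`:
```python
CACHE_VERSION_KEY = "neo4j:cache_version"


class Neo4jClient:
    async def _versioned_key(self, base_key: str) -> str:
        """Prefix keys with a version; bumping the version orphans old entries."""
        redis = self.query_cache.redis
        version = (await redis.get(CACHE_VERSION_KEY)) if redis else None
        return f"v{version or 0}:{base_key}"

    async def invalidate_read_cache(self) -> None:
        """Call after each successful write; orphaned entries expire via TTL."""
        if self.query_cache.redis:
            # INCR is atomic, and reading the version straight from Redis
            # (bypassing L1) means every instance sees the bump immediately.
            await self.query_cache.redis.incr(CACHE_VERSION_KEY)
```
This trades one extra Redis round-trip per cached read for cross-instance invalidation without key scans.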
**Testing:**
```python
import time

import pytest


@pytest.mark.asyncio
async def test_query_result_caching():
    client = Neo4jClient(...)
    # First call: cache miss
    result1 = await client.get_metrics()
    # Second call: cache hit
    start = time.time()
    result2 = await client.get_metrics()
    duration = time.time() - start
    assert result1 == result2
    assert duration < 0.01  # <10ms from cache
```
**Success Metrics:**
- [ ] Metrics endpoint latency reduced by 90% (100ms → 10ms)
- [ ] Cache hit rate >70% for read queries
- [ ] No stale data (cache invalidation works correctly)
---
## PRIORITY 1: SECURITY & PERFORMANCE ENHANCEMENTS
### P1.1: Implement Comprehensive Input Sanitization ⏱️ 6h
**Location:** Create `backend/mcp_server/utils/sanitization.py`
**Implementation:**
```python
import html
import re
from typing import Any, Dict


class InputSanitizer:
    """Comprehensive input sanitization for all user inputs."""

    @staticmethod
    def sanitize_string(value: str, max_length: int = 10000) -> str:
        """Sanitize string input: strip dangerous characters."""
        if len(value) > max_length:
            raise ValueError(f"String exceeds max length of {max_length}")
        # Remove null bytes and control characters (except newlines/tabs)
        sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', value)
        # HTML-escape to prevent XSS
        sanitized = html.escape(sanitized)
        return sanitized

    @staticmethod
    def sanitize_dict(data: Dict[str, Any], max_depth: int = 5) -> Dict[str, Any]:
        """Recursively sanitize a dictionary."""
        if max_depth <= 0:
            raise ValueError("Maximum nesting depth exceeded")
        sanitized = {}
        for key, value in data.items():
            # Sanitize the key
            clean_key = InputSanitizer.sanitize_string(str(key), max_length=256)
            # Sanitize the value based on its type
            if isinstance(value, str):
                clean_value = InputSanitizer.sanitize_string(value)
            elif isinstance(value, dict):
                clean_value = InputSanitizer.sanitize_dict(value, max_depth - 1)
            elif isinstance(value, list):
                clean_value = [
                    InputSanitizer.sanitize_string(item) if isinstance(item, str) else item
                    for item in value
                ]
            else:
                clean_value = value
            sanitized[clean_key] = clean_value
        return sanitized
```
**Integration:**
```python
# In tools/graph_tool.py
async def _upsert_node(self, node: GraphNode) -> None:
    ensure_valid_identifier(node.key, field="node.key")
    # Sanitize properties before storage
    sanitized_props = InputSanitizer.sanitize_dict(node.properties)
    labels = ["GraphNode"] + [self._normalise_label(label) for label in node.labels]
    label_fragment = ":".join(labels)
    await self._neo4j.execute_write(
        f"MERGE (n:{label_fragment} {{key: $key}}) SET n += $props",
        {"key": node.key, "props": sanitized_props},
    )
```
---
### P1.2: Add User-Based Rate Limiting ⏱️ 4h
**Implementation:**
```python
import jwt
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address


def get_user_identifier(request: Request) -> str:
    """Extract a user ID from the JWT token for rate limiting."""
    auth = request.headers.get("Authorization")
    if auth and auth.startswith("Bearer "):
        token = auth[7:]
        try:
            payload = jwt.decode(token, settings.auth_token, algorithms=["HS256"])
            return payload.get("sub", get_remote_address(request))
        except jwt.InvalidTokenError:
            pass  # Fall through to IP-based limiting
    return get_remote_address(request)


# Update the limiter
limiter = Limiter(key_func=get_user_identifier)
```
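With the user-aware key function in place, endpoints opt in via `@limiter.limit`; slowapi also requires the limiter on `app.state` and its exception handler registered (the `60/minute` quota is an assumed value):
```python
from fastapi import FastAPI, Request
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/execute_code")
@limiter.limit("60/minute")  # keyed per user id, falling back to client IP
async def execute_code(request: Request) -> dict:
    # ... dispatch to the execution tool ...
    return {"status": "accepted"}
```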
---
### P1.3: Enhance JWT Secret Validation ⏱️ 2h
**Implementation:**
```python
def validate_secret_entropy(secret: str, min_bits: int = 256):
    """Ensure the secret has sufficient length and no obvious weak patterns."""
    if len(secret) < min_bits // 8:
        raise ValueError(
            f"Secret must be at least {min_bits // 8} characters "
            f"for {min_bits}-bit security"
        )
    # Check for common weak patterns
    weak_patterns = ["password", "secret", "test", "demo", "change-me"]
    if any(pattern in secret.lower() for pattern in weak_patterns):
        raise ValueError("Secret contains weak patterns")


# In config.py
class Settings:
    def __init__(self):
        # ... existing code ...
        validate_secret_entropy(self.auth_token, min_bits=256)
```
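For operators, a compliant secret can be generated with the standard library:
```python
import secrets

# 48 random bytes (384 bits of entropy); url-safe base64 yields a 64-character token
print(secrets.token_urlsafe(48))
```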
---
### P1.4: Implement Batch Graph Operations ⏱️ 6h
**Location:** `backend/mcp_server/tools/graph_tool.py`
**Implementation:**
```python
async def upsert(self, payload: GraphUpsertPayload) -> GraphUpsertResponse:
    # Use a single transaction for all operations
    async def batch_upsert(tx):
        # Upsert nodes
        for node in payload.nodes:
            ensure_valid_identifier(node.key, field="node.key")
            labels = ["GraphNode"] + [self._normalise_label(l) for l in node.labels]
            label_fragment = ":".join(labels)
            await tx.run(
                f"MERGE (n:{label_fragment} {{key: $key}}) SET n += $props",
                {"key": node.key, "props": node.properties},
            )
        # Upsert relationships
        for rel in payload.relationships:
            ensure_valid_identifier(rel.start, field="relationship.start")
            ensure_valid_identifier(rel.end, field="relationship.end")
            rel_type = self._normalise_label(rel.type)
            await tx.run(
                f"MATCH (start:GraphNode {{key: $start}}) "
                f"MATCH (end:GraphNode {{key: $end}}) "
                f"MERGE (start)-[r:{rel_type}]->(end) "
                "SET r += $props",
                {"start": rel.start, "end": rel.end, "props": rel.properties},
            )

    await self._neo4j.execute_write_transaction(batch_upsert)
    metrics = await self._neo4j.get_metrics()
    return GraphUpsertResponse(metrics=metrics)
```
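A single transaction removes per-statement commit overhead, but each `tx.run` is still a round-trip. If profiling shows that matters, `UNWIND` collapses the node upserts into one statement per label combination (Cypher cannot parameterize labels), as in this sketch:
```python
async def _batch_upsert_nodes(tx, nodes) -> None:
    # One statement for the whole batch; Neo4j iterates the rows server-side.
    await tx.run(
        "UNWIND $rows AS row "
        "MERGE (n:GraphNode {key: row.key}) "
        "SET n += row.props",
        {"rows": [{"key": n.key, "props": n.properties} for n in nodes]},
    )
```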
---
### P1.5: Add Request Correlation IDs ⏱️ 4h
**Implementation:**
```python
import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request


class CorrelationMiddleware(BaseHTTPMiddleware):
    """Propagate correlation IDs across service boundaries."""

    async def dispatch(self, request: Request, call_next):
        correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        request.state.correlation_id = correlation_id
        # Bind into the structlog logging context
        structlog.contextvars.bind_contextvars(correlation_id=correlation_id)
        try:
            response = await call_next(request)
        finally:
            structlog.contextvars.unbind_contextvars("correlation_id")
        response.headers["X-Correlation-ID"] = correlation_id
        return response


# Add to the app
app.add_middleware(CorrelationMiddleware)
```
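Note that bound contextvars only reach log output if `merge_contextvars` sits in structlog's processor chain; a minimal configuration:
```python
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pulls in the bound correlation_id
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)
```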
---
## PRIORITY 2: CODE QUALITY IMPROVEMENTS
### P2.1: Add Comprehensive Error Context ⏱️ 4h
**Pattern:**
```python
# Before
raise ValueError("Token contains no valid roles")

# After
raise ValueError(
    f"Token contains no valid roles. "
    f"Found role strings: {role_strings}, "
    f"Expected one of: {[r.value for r in Role]}"
)
```
### P2.2: Standardize Logging Patterns ⏱️ 4h
**Pattern:**
```python
# Use structlog everywhere
import structlog

logger = structlog.get_logger(__name__)

logger.info(
    "query_executed",
    query_type="read",
    duration_ms=duration * 1000,
    correlation_id=request.state.correlation_id,
)
```
### P2.3: Add API Versioning ⏱️ 4h
**Implementation:**
```python
from fastapi import APIRouter

v1_router = APIRouter(prefix="/api/v1")


@v1_router.post("/execute_code")
async def execute_code_v1(...):
    ...


app.include_router(v1_router)
```
---
## TESTING CHECKLIST
### Load Tests
- [ ] 500 RPS sustained for 10 minutes (see the Locust sketch after this list)
- [ ] 1000 concurrent users
- [ ] Memory usage stable under load
- [ ] No connection pool exhaustion
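A minimal Locust sketch for the RPS target (endpoint path, payload, and auth header are assumptions):
```python
# locustfile.py
from locust import HttpUser, constant_throughput, task


class ApiUser(HttpUser):
    # 500 users x 1 request/s each = 500 RPS target
    wait_time = constant_throughput(1)

    @task
    def execute_code(self):
        self.client.post(
            "/api/v1/execute_code",
            json={"code": "print('load test')", "language": "python"},
            headers={"Authorization": "Bearer <token>"},  # placeholder
        )
```
Run headless with something like `locust -f locustfile.py --headless -u 500 -r 50 --run-time 10m --host https://staging-host`.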
### Chaos Tests
- [ ] Database network partition recovery
- [ ] Redis failure graceful degradation
- [ ] Process crash recovery
### Security Tests
- [ ] Cypher injection attempts blocked
- [ ] JWT forgery attempts fail
- [ ] Rate limit bypasses prevented
- [ ] XSS payloads sanitized
---
## DEPLOYMENT CHECKLIST
### Pre-Deployment
- [ ] All tests passing (unit + integration + load)
- [ ] Code coverage >90%
- [ ] Security scan passed
- [ ] Performance benchmarks met
### Deployment
- [ ] Database migrations applied
- [ ] Redis cluster configured
- [ ] Monitoring dashboards created
- [ ] Alerts configured
### Post-Deployment
- [ ] Monitor error rates <0.1% for 24h
- [ ] Validate latency P99 <200ms
- [ ] Check resource utilization <70%
- [ ] Verify logs for correlation IDs
---
## ROLLBACK PLAN
If any issues occur:
1. **Immediate:** Revert to previous version
2. **Within 1h:** Identify root cause
3. **Within 4h:** Fix or escalate
4. **Within 24h:** Post-mortem document
**Rollback Command:**
```bash
git revert HEAD~1
docker-compose up -d --build
```
---
**Document Version:** 1.0
**Last Updated:** October 21, 2025