#!/usr/bin/env python3
"""Detailed implementation for HIGH priority security recommendations."""
# 1. š“ HIGH: Continue monitoring for hardcoded credentials in future development
def setup_credential_monitoring():
"""Pre-commit hook to scan for hardcoded credentials."""
# .pre-commit-config.yaml
pre_commit_config = """
repos:
- repo: https://github.com/Yelp/detect-secrets
rev: v1.4.0
hooks:
- id: detect-secrets
args: ['--baseline', '.secrets.baseline']
exclude: |
(?x)^(
.*\.pyc|
.*\.pyo|
.*\.egg-info/.*|
.venv/.*|
__pycache__/.*
)$
- repo: https://github.com/gitguardian/ggshield
rev: v1.25.0
hooks:
- id: ggshield
language: python
stages: [commit]
"""
# GitHub Actions workflow for credential scanning
github_workflow = """
name: Security Scan
on: [push, pull_request]
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run GitGuardian scan
uses: GitGuardian/ggshield-action@v1.25.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITGUARDIAN_API_KEY: ${{ secrets.GITGUARDIAN_API_KEY }}
"""
return pre_commit_config, github_workflow
# 2. š“ HIGH: Implement additional rate limiting for production deployments
class ProductionRateLimiter:
"""Enhanced rate limiting for production MCP server."""
def __init__(self):
self.request_counts = {}
self.blocked_ips = set()
def enhanced_rate_limiting_config(self):
"""Production-ready rate limiting configuration."""
return {
# Per-client rate limits
"requests_per_minute": 60,
"requests_per_hour": 1000,
"requests_per_day": 10000,
# Per-tool rate limits
"assess_security_posture_per_hour": 100,
"health_check_per_minute": 10,
# Burst protection
"burst_limit": 10,
"burst_window_seconds": 60,
# Progressive penalties
"warning_threshold": 0.8, # 80% of limit
"block_threshold": 1.0, # 100% of limit
"block_duration_minutes": 15,
# Whitelist for trusted clients
"whitelisted_clients": [
"claude-desktop",
"cursor-ide",
"internal-monitoring"
]
}
def implement_rate_limiting_middleware(self):
"""Rate limiting middleware implementation."""
middleware_code = '''
import time
from collections import defaultdict, deque
from typing import Dict, Set
from functools import wraps
class RateLimiter:
def __init__(self, config: dict):
self.config = config
self.request_history: Dict[str, deque] = defaultdict(deque)
self.blocked_clients: Set[str] = set()
self.block_expiry: Dict[str, float] = {}
def is_rate_limited(self, client_id: str, tool_name: str) -> tuple[bool, str]:
"""Check if client is rate limited."""
current_time = time.time()
# Remove expired blocks
if client_id in self.block_expiry:
if current_time > self.block_expiry[client_id]:
self.blocked_clients.discard(client_id)
del self.block_expiry[client_id]
# Check if client is blocked
if client_id in self.blocked_clients:
remaining = int(self.block_expiry[client_id] - current_time)
return True, f"Rate limited. Try again in {remaining} seconds"
# Check rate limits
history = self.request_history[client_id]
# Clean old requests
minute_ago = current_time - 60
while history and history[0] < minute_ago:
history.popleft()
# Check limits
requests_last_minute = len(history)
if requests_last_minute >= self.config["requests_per_minute"]:
self._block_client(client_id, current_time)
return True, "Rate limit exceeded"
# Record request
history.append(current_time)
return False, ""
def _block_client(self, client_id: str, current_time: float):
"""Block client for configured duration."""
self.blocked_clients.add(client_id)
block_duration = self.config["block_duration_minutes"] * 60
self.block_expiry[client_id] = current_time + block_duration
def rate_limit_decorator(rate_limiter: RateLimiter):
"""Decorator for MCP tools to enforce rate limiting."""
def decorator(func):
@wraps(func)
async def wrapper(ctx, *args, **kwargs):
client_id = getattr(ctx, 'client_id', 'unknown')
tool_name = func.__name__
is_limited, message = rate_limiter.is_rate_limited(client_id, tool_name)
if is_limited:
return {
"error": "RateLimitExceeded",
"message": message,
"retry_after": rate_limiter.config["block_duration_minutes"] * 60
}
return await func(ctx, *args, **kwargs)
return wrapper
return decorator
'''
return middleware_code
def production_deployment_config(self):
"""Production deployment configuration with rate limiting."""
return {
# Environment variables for production
"environment_variables": {
"AWS_SECURITY_ADVISOR_RATE_LIMIT_ENABLED": "true",
"AWS_SECURITY_ADVISOR_REQUESTS_PER_MINUTE": "60",
"AWS_SECURITY_ADVISOR_REQUESTS_PER_HOUR": "1000",
"AWS_SECURITY_ADVISOR_BLOCK_DURATION": "15",
"AWS_SECURITY_ADVISOR_MONITORING_ENABLED": "true"
},
# Docker configuration
"docker_limits": {
"memory": "512m",
"cpu": "0.5",
"restart_policy": "unless-stopped"
},
# Monitoring and alerting
"monitoring": {
"rate_limit_violations_alert": True,
"blocked_clients_alert": True,
"unusual_traffic_alert": True
}
}
def implement_security_monitoring():
"""Security monitoring implementation."""
monitoring_code = '''
import logging
from datetime import datetime, timedelta
from collections import Counter
class SecurityMonitor:
def __init__(self):
self.security_events = []
self.alert_thresholds = {
"failed_auth_attempts": 5,
"rate_limit_violations": 10,
"suspicious_patterns": 3
}
def log_security_event(self, event_type: str, client_id: str, details: dict):
"""Log security events for monitoring."""
event = {
"timestamp": datetime.utcnow(),
"event_type": event_type,
"client_id": client_id,
"details": details
}
self.security_events.append(event)
# Check for alerts
self._check_alert_conditions(event)
def _check_alert_conditions(self, event):
"""Check if event triggers security alerts."""
recent_events = [
e for e in self.security_events
if e["timestamp"] > datetime.utcnow() - timedelta(hours=1)
]
# Count events by type and client
event_counts = Counter(
(e["event_type"], e["client_id"])
for e in recent_events
)
# Trigger alerts if thresholds exceeded
for (event_type, client_id), count in event_counts.items():
threshold = self.alert_thresholds.get(event_type.split("_")[0] + "_" + event_type.split("_")[1], 999)
if count >= threshold:
self._send_security_alert(event_type, client_id, count)
def _send_security_alert(self, event_type: str, client_id: str, count: int):
"""Send security alert to monitoring system."""
alert = {
"severity": "HIGH",
"event_type": event_type,
"client_id": client_id,
"count": count,
"timestamp": datetime.utcnow().isoformat()
}
# Log to security log
logging.getLogger("security").warning(
f"Security alert: {event_type} from {client_id} ({count} events)"
)
# Send to monitoring system (implement based on your setup)
# self._send_to_monitoring_system(alert)
'''
return monitoring_code
if __name__ == "__main__":
print("š Security Recommendations Implementation Guide")
print("=" * 55)
# 1. Credential Monitoring Setup
print("\n1ļøā£ Hardcoded Credentials Monitoring")
print("-" * 35)
print("ā
Pre-commit hooks for credential scanning")
print("ā
GitHub Actions security workflow")
print("ā
GitGuardian integration")
print("ā
Secrets baseline management")
# 2. Rate Limiting Implementation
print("\n2ļøā£ Production Rate Limiting")
print("-" * 30)
limiter = ProductionRateLimiter()
config = limiter.enhanced_rate_limiting_config()
print(f"ā
Requests per minute: {config['requests_per_minute']}")
print(f"ā
Requests per hour: {config['requests_per_hour']}")
print(f"ā
Block duration: {config['block_duration_minutes']} minutes")
print(f"ā
Burst protection: {config['burst_limit']} requests")
print(f"ā
Whitelisted clients: {len(config['whitelisted_clients'])}")
print("\nšÆ Implementation Steps:")
print("1. Add pre-commit hooks to repository")
print("2. Configure GitHub Actions security scanning")
print("3. Implement rate limiting middleware")
print("4. Add security monitoring and alerting")
print("5. Configure production deployment limits")
print("6. Set up monitoring dashboards")
print("\nā
Ready for production deployment with enhanced security!")