from fastapi import HTTPException
from typing import Dict, Any, Optional
import logging
import os
import json
from cryptography.fernet import Fernet
logger = logging.getLogger(__name__)

# Encryption key for credentials (in production, use proper key management).
# The key is read from the ENCRYPTION_KEY environment variable. A fresh key is
# generated only when the variable is absent, and that fallback is logged
# because a generated key lives only as long as this process.
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY")
if ENCRYPTION_KEY is None:
    logger.warning(
        "ENCRYPTION_KEY is not set; using a freshly generated key. "
        "Credentials encrypted with it cannot be decrypted after this process exits."
    )
    ENCRYPTION_KEY = Fernet.generate_key()
fernet = Fernet(ENCRYPTION_KEY)
# Registry of supported services: service identifier -> human-readable
# description. Membership in this mapping is what marks a service as supported.
VALID_SERVICES = {
    "github": "GitHub API integration",
    "vercel": "Vercel deployment service",
    "linear": "Linear project management",
    "notion": "Notion workspace integration",
    "slack": "Slack messaging platform",
    "discord": "Discord bot integration",
    "google_workspace": "Google Workspace services",
}
async def delegate_to_service(
    service_name: str,
    payload: Dict[str, Any],
    user_id: int
) -> Dict[str, Any]:
    """
    Delegate an MCP request to the handler for ``service_name``.

    The service name is checked against ``VALID_SERVICES``, the user's stored
    credentials are fetched and decrypted, and the request is forwarded to
    ``call_service_module``.

    Args:
        service_name: Name of the service to delegate to
        payload: Request payload data, forwarded unchanged
        user_id: Authenticated user ID

    Returns:
        dict: Response from the service, with ``user_id`` set to the
        authenticated user

    Raises:
        HTTPException: 400 if the service is not supported, 401 if no
            credentials are stored for this user and service, 500 for any
            other failure
    """
    try:
        # Validate service name
        if service_name not in VALID_SERVICES:
            logger.warning(f"Unsupported service requested: {service_name}")
            raise HTTPException(
                status_code=400,
                detail=f"Service '{service_name}' is not supported. Available services: {list(VALID_SERVICES.keys())}"
            )

        logger.info(f"Delegating to service: {service_name} for user: {user_id}")

        # Get encrypted credentials from database
        credentials = await get_encrypted_credentials(user_id, service_name)
        if credentials is None:
            logger.warning(f"No credentials found for user {user_id} and service {service_name}")
            raise HTTPException(
                status_code=401,
                detail=f"No credentials configured for service '{service_name}'"
            )

        # Decrypt credentials
        decrypted_credentials = decrypt_credentials(credentials)

        # Call respective service module
        result = await call_service_module(
            service_name=service_name,
            payload=payload,
            credentials=decrypted_credentials
        )

        # Report the user this request was actually made for, overriding any
        # user_id value the service module put in its response.
        return {**result, "user_id": user_id}
    except HTTPException:
        # Deliberate HTTP errors (raised here or by the helpers) pass through
        # with their own status code and detail.
        raise
    except Exception as e:
        # Keep the full error and traceback in the server log only; exception
        # text can contain internals or secrets, so the client gets a generic
        # message.
        logger.exception(f"Error delegating to service {service_name}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to process request for service '{service_name}'"
        ) from e
async def get_encrypted_credentials(user_id: int, service_name: str) -> Optional[Dict[str, str]]:
    """
    Look up the stored credentials for a user/service pair.

    This is currently a stub: it returns a fixed mock entry selected by
    ``service_name`` only; ``user_id`` is used just for the debug log line.

    Args:
        user_id: User ID
        service_name: Service name

    Returns:
        dict: The mock credential entry for ``service_name``, or None when
        there is no entry or the lookup raised
    """
    try:
        logger.debug(f"Fetching credentials for user {user_id} and service {service_name}")

        # Placeholder for a real query (e.g. against a user_credentials
        # table); demo data keyed by service name.
        stub_store = {
            "github": {"token": "mock_github_token", "username": "mock_user"},
            "vercel": {"token": "mock_vercel_token", "team_id": "mock_team"},
            "linear": {"api_key": "mock_linear_key"},
            "notion": {"token": "mock_notion_token", "database_id": "mock_db"},
            "slack": {"bot_token": "mock_slack_token", "channel": "mock_channel"},
            "discord": {"bot_token": "mock_discord_token", "guild_id": "mock_guild"},
            "google_workspace": {"service_account_json": "mock_service_account"},
        }
        entry = stub_store.get(service_name)
    except Exception as e:
        # Best-effort lookup: any failure is logged and reported as "not found".
        logger.error(f"Error fetching credentials: {e}")
        return None
    return entry
def decrypt_credentials(encrypted_data: Dict[str, str]) -> Dict[str, str]:
    """
    Produce the decrypted form of a credential mapping.

    In this demo implementation no decryption is performed: every value,
    string or not, is copied unchanged into a new dict.

    Args:
        encrypted_data: Encrypted credential data

    Returns:
        dict: A new dict with the same keys and values as ``encrypted_data``

    Raises:
        HTTPException: 500 if the input cannot be iterated as a mapping
    """
    try:
        # In production the string values would be decrypted here; for the
        # demo they pass through as-is.
        plaintext = {field: secret for field, secret in encrypted_data.items()}
        logger.debug("Credentials decrypted successfully")
        return plaintext
    except Exception as e:
        logger.error(f"Error decrypting credentials: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to decrypt credentials"
        )
async def call_service_module(
    service_name: str,
    payload: Dict[str, Any],
    credentials: Dict[str, str],
    user_id: int = 1
) -> Dict[str, Any]:
    """
    Call the respective service module with decrypted credentials.

    Currently returns a canned response per service instead of calling a real
    integration. ``credentials`` is accepted but not used yet; ``payload`` is
    only echoed back in the generic response for services without a canned
    entry.

    Args:
        service_name: Name of the service
        payload: Request payload
        credentials: Decrypted credentials
        user_id: ID of the user the call is made for, reported in the
            response metadata. Defaults to 1, the placeholder value used
            before this parameter existed.

    Returns:
        dict: Response from the service module, extended with ``service``,
        ``user_id`` and ``timestamp`` (current UTC time, ISO 8601 with a
        ``Z`` suffix)

    Raises:
        HTTPException: 500 if building the response fails
    """
    # Imported locally so this function does not rely on a module-level import.
    from datetime import datetime, timezone

    try:
        logger.info(f"Calling service module: {service_name}")

        # For now, return dummy JSON response
        # In production, this would import and call the actual service module
        dummy_responses = {
            "github": {
                "status": "success",
                "action": "github_action",
                "data": {"repositories": 15, "commits": 234},
                "message": "GitHub operation completed"
            },
            "vercel": {
                "status": "success",
                "action": "vercel_deployment",
                "data": {"deployments": 3, "build_time": "45s"},
                "message": "Vercel deployment completed"
            },
            "linear": {
                "status": "success",
                "action": "linear_ticket",
                "data": {"tickets": 8, "completed": 5},
                "message": "Linear issue updated"
            },
            "notion": {
                "status": "success",
                "action": "notion_page",
                "data": {"pages": 2, "databases": 1},
                "message": "Notion page created"
            },
            "slack": {
                "status": "success",
                "action": "slack_message",
                "data": {"channels": 1, "messages_sent": 1},
                "message": "Slack message sent"
            },
            "discord": {
                "status": "success",
                "action": "discord_message",
                "data": {"guilds": 1, "messages": 1},
                "message": "Discord message sent"
            },
            "google_workspace": {
                "status": "success",
                "action": "google_api_call",
                "data": {"files": 3, "updates": 1},
                "message": "Google Workspace operation completed"
            }
        }

        response = dummy_responses.get(service_name)
        if response is None:
            # Generic response for a service without a canned entry; built
            # only when it is actually needed.
            response = {
                "status": "success",
                "action": f"{service_name}_action",
                "data": {"mock": True, "payload": payload},
                "message": f"{service_name.title()} operation completed"
            }

        # Add metadata to response
        response.update({
            "service": service_name,
            "user_id": user_id,
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        })

        logger.info(f"Service {service_name} call completed successfully")
        return response
    except Exception as e:
        logger.error(f"Error calling service module {service_name}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to execute {service_name} operation"
        ) from e
async def validate_service_credentials(service_name: str, credentials: Dict[str, str]) -> bool:
    """
    Check that a credential mapping has every field its service requires.

    Only the presence of the required keys is checked; values are not
    inspected and no remote call is made.

    Args:
        service_name: Name of the service
        credentials: Credentials to validate

    Returns:
        bool: True if all required fields are present; False if any is
        missing, the service is unknown, or the check itself raised
    """
    try:
        # Mock validation: in production this would call the service's API.
        fields_by_service = {
            "github": ["token", "username"],
            "vercel": ["token"],
            "linear": ["api_key"],
            "notion": ["token"],
            "slack": ["bot_token"],
            "discord": ["bot_token"],
            "google_workspace": ["service_account_json"],
        }

        needed = fields_by_service.get(service_name)
        if needed is None:
            # Unknown service: nothing to validate against.
            return False

        # Stop at the first missing field; the else branch runs only when the
        # loop finishes without finding one.
        for field in needed:
            if field not in credentials:
                is_complete = False
                break
        else:
            is_complete = True

        logger.debug(f"Credential validation for {service_name}: {is_complete}")
        return is_complete
    except Exception as e:
        logger.error(f"Error validating credentials for {service_name}: {e}")
        return False