# server.py
"""Main MCP server implementation for GCP services."""
import asyncio
import json
import logging
import os
from typing import Any, Dict, List, Optional, Sequence
import structlog
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.server.lowlevel.server import NotificationOptions
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
from .auth import GCPAuthenticator
from .config import Config
from .tools.logging_tools import LoggingTools
from .tools.enterprise_logging_tools import EnterpriseLoggingTools
from .tools.monitoring_tools import MonitoringTools
from .tools.enterprise_monitoring_tools import EnterpriseMonitoringTools
from .exceptions import GCPMCPError
# Module-level structured logger, named after this module; used by
# GCPMCPServer for all of its log events.
logger = structlog.get_logger(__name__)
class GCPMCPServer:
    """MCP server that exposes GCP logging and monitoring tools over stdio.

    The server owns one authenticator and four tool groups (basic and
    enterprise variants of logging and monitoring). Tool calls are routed to
    the owning group by tool name, and every accessible project is published
    as a ``gcp://project/<id>`` resource.
    """

    # Identity advertised to MCP clients during initialization.
    _SERVER_NAME = "gcp-mcp"
    _SERVER_VERSION = "0.1.0"

    # Prefix shared by every project resource URI this server hands out.
    _PROJECT_URI_PREFIX = "gcp://project/"

    # Tool name -> name of the instance attribute holding the tool group that
    # serves it. Built once at class-creation time so a tool call is a single
    # dict lookup instead of rebuilding and scanning four lists per call.
    _TOOL_ROUTES: Dict[str, str] = {
        **dict.fromkeys(
            (
                "query_logs",
                "analyze_error_logs",
                "get_recent_errors",
                "search_logs_by_message",
            ),
            "logging_tools",
        ),
        **dict.fromkeys(
            (
                "advanced_log_query",
                "error_root_cause_analysis",
                "security_log_analysis",
                "performance_log_analysis",
                "log_pattern_discovery",
                "cross_service_trace_analysis",
            ),
            "enterprise_logging_tools",
        ),
        **dict.fromkeys(
            ("get_metrics",),
            "monitoring_tools",
        ),
        **dict.fromkeys(
            (
                "advanced_metrics_query",
                "sla_slo_analysis",
                "alert_policy_analysis",
                "resource_optimization_analysis",
                "custom_dashboard_metrics",
                "infrastructure_health_check",
            ),
            "enterprise_monitoring_tools",
        ),
    }

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the GCP MCP server.

        Args:
            config_path: Path to configuration file. When falsy, the value of
                the ``GCP_MCP_CONFIG`` environment variable is used instead
                (which may itself be unset, in which case ``None`` is passed
                to ``Config.load``).
        """
        # Check for config path in environment variable
        if not config_path:
            config_path = os.environ.get("GCP_MCP_CONFIG")

        self.config = Config.load(config_path)
        self.server = Server(self._SERVER_NAME)
        self.authenticator = GCPAuthenticator(self.config)

        # Every tool group shares the same authenticator and configuration.
        self.logging_tools = LoggingTools(self.authenticator, self.config)
        self.enterprise_logging_tools = EnterpriseLoggingTools(self.authenticator, self.config)
        self.monitoring_tools = MonitoringTools(self.authenticator, self.config)
        self.enterprise_monitoring_tools = EnterpriseMonitoringTools(self.authenticator, self.config)

        # Set up server handlers
        self._setup_handlers()

        # NOTE(review): this logs the complete config dump; confirm Config
        # carries no secrets before shipping these logs anywhere shared.
        logger.info("GCP MCP Server initialized", config=self.config.model_dump())

    @classmethod
    def _project_id_from_uri(cls, uri: Any) -> Optional[str]:
        """Extract the project ID from a ``gcp://project/<id>`` URI.

        Args:
            uri: The resource URI, as a string or any object whose ``str()``
                form is the URI.

        Returns:
            Everything after the leading ``gcp://project/`` prefix, or
            ``None`` when the URI does not start with that prefix.
        """
        text = str(uri)
        if not text.startswith(cls._PROJECT_URI_PREFIX):
            return None
        # Strip only the leading prefix; str.replace would also delete any
        # later occurrence of the prefix inside the remainder.
        return text.removeprefix(cls._PROJECT_URI_PREFIX)

    def _setup_handlers(self) -> None:
        """Register the list/call tool and list/read resource handlers on the MCP server."""

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """Return the tools of all four tool groups, basic groups first per area."""
            tools = []
            for group in (
                self.logging_tools,
                self.enterprise_logging_tools,
                self.monitoring_tools,
                self.enterprise_monitoring_tools,
            ):
                tools.extend(await group.get_tools())
            logger.info("Listed tools", count=len(tools))
            return tools

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent | ImageContent | EmbeddedResource]:
            """Route a tool call to the tool group that owns ``name``.

            Any failure (including an unknown tool name) is logged and
            returned to the client as a single ``Error: ...`` text item
            rather than raised.
            """
            logger.info("Tool called", name=name, arguments=arguments)
            try:
                group_attr = self._TOOL_ROUTES.get(name)
                if group_attr is None:
                    raise GCPMCPError(f"Unknown tool: {name}")
                return await getattr(self, group_attr).handle_tool_call(name, arguments)
            except Exception as e:
                # Deliberate catch-all boundary: the client gets the message,
                # the log keeps the traceback for diagnosis.
                logger.error("Tool call failed", name=name, error=str(e), exc_info=True)
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """Return one JSON resource per project the authenticator reports as accessible."""
            projects = await self.authenticator.list_accessible_projects()
            resources = [
                Resource(
                    uri=f"{self._PROJECT_URI_PREFIX}{project}",
                    name=f"GCP Project: {project}",
                    description=f"Google Cloud Project {project}",
                    mimeType="application/json",
                )
                for project in projects
            ]
            logger.info("Listed resources", count=len(resources))
            return resources

        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            """Return the project info for a ``gcp://project/<id>`` URI as indented JSON.

            Raises:
                GCPMCPError: If the URI does not start with ``gcp://project/``.
            """
            # NOTE(review): the MCP SDK hands the request URI over as a
            # pydantic URL object, which has no str methods under pydantic v2;
            # everything below therefore works on its str() form. Confirm
            # against the installed SDK version.
            logger.info("Reading resource", uri=str(uri))
            project_id = self._project_id_from_uri(uri)
            if project_id is None:
                raise GCPMCPError(f"Unknown resource URI: {uri}")
            project_info = await self.authenticator.get_project_info(project_id)
            return json.dumps(project_info, indent=2)

    async def run(self) -> None:
        """Initialize the authenticator and tool groups, then serve MCP over stdio.

        Raises:
            Exception: Any failure during initialization or serving is logged
                and re-raised unchanged.
        """
        logger.info("Starting GCP MCP Server")
        try:
            # Initialize GCP clients; the authenticator goes first, followed
            # by each tool group in turn.
            await self.authenticator.initialize()
            await self.logging_tools.initialize()
            await self.enterprise_logging_tools.initialize()
            await self.monitoring_tools.initialize()
            await self.enterprise_monitoring_tools.initialize()

            # Run the server
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name=self._SERVER_NAME,
                        server_version=self._SERVER_VERSION,
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities={},
                        ),
                    ),
                )
        except Exception as e:
            logger.error("Server failed", error=str(e))
            raise
        finally:
            logger.info("Server shutting down")
async def main() -> None:
    """Configure logging, then build a GCPMCPServer and run it.

    Standard-library logging is set to INFO, and structlog is configured on
    top of it with a processor chain that ends in JSON rendering.
    """
    logging.basicConfig(level=logging.INFO)

    # Processors run in list order; the JSON renderer must come last because
    # it turns the event dict into the final output string.
    processor_chain = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(
        processors=processor_chain,
        wrapper_class=structlog.stdlib.BoundLogger,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # No explicit config path is given, so the server constructor decides
    # where its configuration comes from.
    await GCPMCPServer().run()
# Script entry point: when this module is executed directly, drive the async
# main() coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())