"""
MCP stdio Server
Provides your database tools and resources via MCP protocol over stdio
"""
import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Any, Dict, Optional

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from mcp.server import Server
from mcp.types import Tool, Resource
import mcp.server.stdio

from services.mcp_resources_service import MCPResourcesService
from services.mcp_tools_service import MCPToolsService
# Configure root logging at INFO. basicConfig's default handler writes to
# stderr, which keeps log lines off stdout.
# NOTE(review): stdout is presumably reserved for MCP protocol frames in stdio
# mode, so nothing in this process should print or log to it - confirm.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the server class and entry point below.
logger = logging.getLogger(__name__)
class StdioMCPServer:
    """MCP server that communicates via stdio.

    Wraps a low-level ``mcp.server.Server`` and wires its resource and tool
    handlers to ``MCPResourcesService`` and ``MCPToolsService``.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create the server, its backing services, and register handlers.

        Args:
            config: Configuration mapping handed to both services. When it is
                omitted (or empty), ``_get_default_config()`` is used instead.
        """
        self.config = config or self._get_default_config()
        self.server = Server("database-mcp-server")

        # Initialize services
        self.resources_service = MCPResourcesService(self.config)
        self.tools_service = MCPToolsService(self.config)

        self._register_handlers()

    def _get_default_config(self) -> Dict[str, Any]:
        """Get default configuration.

        Reads ``Config.SQLALCHEMY_BINDS['db3']`` from the project ``config``
        module and falls back to a local PostgreSQL URL when that lookup
        fails for any reason (module missing, attribute or key absent, ...).

        Returns:
            A dict of the form ``{'database': {'connection_string': <url>}}``.
        """
        try:
            from config import Config
            database_url = Config.SQLALCHEMY_BINDS['db3']
        except Exception as e:
            # Deliberate best-effort fallback. Catching Exception rather than
            # using a bare except lets KeyboardInterrupt/SystemExit propagate,
            # and the warning makes the fallback visible instead of silent.
            logger.warning(
                "Could not read database URL from config (%s); "
                "falling back to the local default",
                e,
            )
            database_url = 'postgresql://postgres:postgres@localhost/postgres'

        return {
            'database': {
                'connection_string': database_url
            }
        }

    def _register_handlers(self) -> None:
        """Register MCP protocol handlers on ``self.server``.

        Each handler catches every exception, logs it with its traceback and
        returns a fallback value so a failing service call does not propagate
        out of the handler.
        """

        @self.server.list_resources()
        async def handle_list_resources() -> list[Resource]:
            """Handle MCP resource listing; returns ``[]`` on failure."""
            try:
                return [
                    Resource(
                        uri=resource_def["uri"],
                        name=resource_def["name"],
                        description=resource_def["description"],
                        mimeType=resource_def["mimeType"],
                    )
                    for resource_def in self.resources_service.list_resources()
                ]
            except Exception as e:
                logger.exception("Error listing resources: %s", e)
                return []

        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            """Handle MCP resource reading.

            Returns the ``"text"`` entry of the service result (``"{}"`` when
            absent), or a JSON object with ``error`` and ``uri`` on failure.
            """
            try:
                result = await self.resources_service.read_resource(uri)
                return result.get("text", "{}")
            except Exception as e:
                logger.exception("Error reading resource %s: %s", uri, e)
                # json.dumps escapes quotes, backslashes and newlines, so the
                # payload stays valid JSON whatever the error message holds.
                # str(uri) keeps this working if uri is not a plain string.
                return json.dumps(
                    {"error": str(e), "uri": str(uri)},
                    ensure_ascii=False,
                )

        @self.server.list_tools()
        async def handle_list_tools() -> list[Tool]:
            """Handle MCP tool listing; returns ``[]`` on failure."""
            try:
                tool_info = self.tools_service.get_available_tools()
                return [
                    Tool(
                        name=tool_def["name"],
                        description=tool_def["description"],
                        inputSchema=tool_def["parameters"],
                    )
                    for tool_def in tool_info.get("tools", [])
                ]
            except Exception as e:
                logger.exception("Error listing tools: %s", e)
                return []

        @self.server.call_tool()
        async def handle_tool_call(name: str, arguments: Dict[str, Any]) -> list[Any]:
            """Handle MCP tool calls.

            Returns a one-element list holding the service result, or a
            one-element list holding an error dict on failure.
            """
            # NOTE(review): the MCP SDK may expect content objects (e.g.
            # TextContent) rather than raw dicts in this list - confirm
            # against the installed SDK version and the service's return type.
            try:
                result = await self.tools_service.handle_tool_call(name, arguments)
                return [result]
            except Exception as e:
                logger.exception("Error handling tool call %s: %s", name, e)
                return [{
                    "success": False,
                    "error": str(e),
                    "tool_name": name,
                    "arguments": arguments
                }]

    async def run(self) -> None:
        """Log the server start-up messages.

        This method only emits log lines; it does not open the stdio
        transport or start serving. The module-level ``main()`` does that.
        """
        logger.info("Starting MCP stdio server...")
        logger.info("Available databases: db1, db2, db3")
        logger.info("Ready to handle MCP protocol via stdio")
async def main() -> None:
    """Main entry point.

    Builds a ``StdioMCPServer`` with its default configuration, opens the
    stdio transport and runs the underlying MCP server on its streams.
    """
    server = StdioMCPServer()

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.server.run(
            read_stream,
            write_stream,
            # Server.run needs real initialization options (server name,
            # version, capabilities) to answer the client's initialize
            # request; passing None gives the session nothing to read them
            # from.
            server.server.create_initialization_options(),
        )
# When executed as a script (not imported), run main() on a new asyncio
# event loop.
if __name__ == "__main__":
    asyncio.run(main())