"""
Thin MCP Server - Protocol layer only
"""
import asyncio
import logging
from contextlib import asynccontextmanager
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, TextContent, ImageContent, EmbeddedResource
# Import services and repositories
from repositories.postgres_repository import PostgresRepository
from repositories.vector_repository import VectorRepository
from services.schema_service import SchemaService
from services.sql_service import SQLService
from services.semantic_service import SemanticService
from services.synthesis_service import SynthesisService
from services.smart_search import SmartSearch
# Import MCP tools
from presentation.tools.search_tools import SearchTools
from presentation.tools.sql_tools import SQLTools
from presentation.tools.schema_tools import SchemaTools
# Import configuration and dependencies
from shared.config import load_config
from shared.exceptions import MCPError
logger = logging.getLogger(__name__)
class MCPDatabaseServer:
    """Thin MCP server that coordinates between the protocol and business logic.

    The class owns the low-level MCP ``Server`` object and the tool adapters
    (``SearchTools``, ``SQLTools``, ``SchemaTools``). It contains no business
    logic itself: each tool call is forwarded to one of the adapters and the
    result is returned to the client as text content.
    """

    def __init__(self, config_path: str = None):
        """Load configuration and create the (not yet running) MCP server.

        Args:
            config_path: Path handed to ``load_config``; ``None`` is passed
                through unchanged.
        """
        self.config = load_config(config_path)
        self.server = Server("database-assistant")
        # The attributes below stay None until initialize_services() has run.
        self.smart_search = None
        self.search_tools = None
        self.sql_tools = None
        self.schema_tools = None

    async def initialize_services(self):
        """Build repositories, services, the orchestrator and the tool adapters.

        Raises:
            MCPError: If any constructor fails. The original exception is
                attached as ``__cause__``.
        """
        try:
            logger.info("Initializing database services...")

            # Repositories: both are built from the same configured engine.
            postgres_repo = PostgresRepository(self.config.database.engine)
            vector_repo = VectorRepository(self.config.database.engine)

            # Services
            schema_service = SchemaService(postgres_repo)
            sql_service = SQLService(postgres_repo, schema_service, self.config.llm)
            semantic_service = SemanticService(vector_repo, self.config.embeddings)
            synthesis_service = SynthesisService(self.config.llm)

            # Orchestrator
            self.smart_search = SmartSearch(
                schema_service=schema_service,
                sql_service=sql_service,
                semantic_service=semantic_service,
                synthesis_service=synthesis_service,
            )

            # MCP tool adapters
            self.search_tools = SearchTools(self.smart_search)
            self.sql_tools = SQLTools(sql_service)
            self.schema_tools = SchemaTools(schema_service)

            logger.info("Services initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize services: {e}")
            # Chain explicitly so the root cause stays attached to the MCPError.
            raise MCPError(f"Service initialization failed: {e}") from e

    @staticmethod
    def _tool_definitions() -> list:
        """Return the ``Tool`` descriptors advertised to MCP clients."""
        return [
            Tool(
                name="answer_question",
                description="Answer natural language questions about the database using SQL and semantic search",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "question": {
                            "type": "string",
                            "description": "Natural language question about the data",
                        },
                        "filters": {
                            "type": "object",
                            "description": "Optional filters to apply to the search",
                            "default": {},
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of results to return",
                            "default": 10,
                        },
                        "include_sql": {
                            "type": "boolean",
                            "description": "Whether to include SQL-based search",
                            "default": True,
                        },
                        "include_semantic": {
                            "type": "boolean",
                            "description": "Whether to include semantic search",
                            "default": True,
                        },
                    },
                    "required": ["question"],
                },
            ),
            Tool(
                name="semantic_search",
                description="Search documents using semantic similarity",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query",
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of results",
                            "default": 10,
                        },
                        "filters": {
                            "type": "object",
                            "description": "Optional filters to apply",
                            "default": {},
                        },
                    },
                    "required": ["query"],
                },
            ),
            Tool(
                name="execute_sql",
                description="Execute a SQL query safely",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "SQL query to execute",
                        },
                        # NOTE(review): `limit` is a boolean flag here but an
                        # integer count in the other tools - confirm this
                        # matches the signature of SQLTools.execute_sql.
                        "limit": {
                            "type": "boolean",
                            "description": "Whether to apply row limit for safety",
                            "default": True,
                        },
                    },
                    "required": ["sql"],
                },
            ),
            Tool(
                name="get_schema_info",
                description="Get database schema information",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "include_sample_data": {
                            "type": "boolean",
                            "description": "Whether to include sample data",
                            "default": False,
                        },
                    },
                },
            ),
            Tool(
                name="get_search_capabilities",
                description="Get information about available search capabilities",
                inputSchema={
                    "type": "object",
                    "properties": {},
                },
            ),
        ]

    async def _dispatch_tool(self, name: str, arguments: dict):
        """Forward one tool call to the matching tool adapter.

        Args:
            name: Tool name as advertised by ``_tool_definitions``.
            arguments: Keyword arguments for the tool; ``None`` is treated as
                an empty mapping so ``**`` expansion cannot fail on it.

        Returns:
            Whatever the underlying adapter coroutine returns.

        Raises:
            MCPError: If ``name`` is not a known tool.
        """
        arguments = arguments or {}
        if name == "answer_question":
            return await self.search_tools.answer_question(**arguments)
        if name == "semantic_search":
            return await self.search_tools.semantic_search(**arguments)
        if name == "execute_sql":
            return await self.sql_tools.execute_sql(**arguments)
        if name == "get_schema_info":
            return await self.schema_tools.get_schema_info(**arguments)
        if name == "get_search_capabilities":
            # This tool declares no properties, so arguments are not forwarded.
            return await self.search_tools.get_search_capabilities()
        raise MCPError(f"Unknown tool: {name}")

    def register_tools(self):
        """Register the list-tools and call-tool handlers on the MCP server."""

        @self.server.list_tools()
        async def handle_list_tools() -> list[Tool]:
            return self._tool_definitions()

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
            try:
                result = await self._dispatch_tool(name, arguments)
                # NOTE(review): str() yields the Python repr of the result, not
                # JSON - confirm that is what clients expect.
                return [TextContent(type="text", text=str(result))]
            except Exception as e:
                # Deliberate best-effort: failures are reported to the client as
                # text instead of propagating. The exception is swallowed here,
                # so log it with its traceback.
                logger.exception("Tool %s failed: %s", name, e)
                error_result = {"success": False, "error": str(e)}
                return [TextContent(type="text", text=str(error_result))]

    async def run(self):
        """Initialize services, register tools and serve MCP over stdio.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        # Function-scope import: NotificationOptions comes from the same
        # package as Server and is needed only here.
        from mcp.server import NotificationOptions

        try:
            await self.initialize_services()
            self.register_tools()
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name="database-assistant",
                        server_version="1.0.0",
                        # Per the MCP SDK, get_capabilities() reads attributes
                        # such as tools_changed from notification_options once
                        # a tool handler is registered, so passing None raises
                        # AttributeError. Pass real (default) options instead.
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities={},
                        ),
                    ),
                )
        except Exception as e:
            logger.error(f"Server failed: {e}")
            raise
def main():
    """Command-line entry point.

    Configures INFO-level logging, takes the optional config path from the
    first command-line argument, then runs the server until it exits.
    """
    import sys

    # Logging setup
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_format)

    # The first positional argument, when present, is the config path.
    cli_args = sys.argv[1:]
    config_path = cli_args[0] if cli_args else None

    # Build the server and drive its coroutine to completion.
    asyncio.run(MCPDatabaseServer(config_path).run())


if __name__ == "__main__":
    main()