"""
Unified MCP server entry point
Integrates all MCP tools and services into a single server
"""
import asyncio
import json
import logging
from typing import Dict, Any, Optional

from sqlalchemy import create_engine

# MCP SDK imports (assuming standard MCP server setup)
from mcp.server import Server
from mcp.types import TextContent, Tool

# Repository layer
from repositories.postgres_repository import PostgresRepository
from repositories.vector_repository import VectorRepository
from repositories.embedding_repository import EmbeddingRepository

# Service layer
from services.schema_service import SchemaService
from services.sql_service import SQLService
from services.semantic_service import SemanticService
from services.synthesis_service import SynthesisService
from services.smart_search_service import SmartSearchService

# MCP tools layer
from presentation.tools.schema_tools import SchemaTools
from presentation.tools.sql_tools import SQLTools
from presentation.tools.semantic_tools import SemanticTools
from presentation.tools.synthesis_tools import SynthesisTools

# Configuration
from shared.models import DatabaseConnection, EmbeddingConfig
from shared.exceptions import ConfigurationError
# Module-wide logger named after this module; handlers and levels are set up
# by whichever entry point calls logging.basicConfig.
logger: logging.Logger = logging.getLogger(__name__)
class MCPDatabaseServer:
    """
    Unified MCP server for database operations.

    Wires the repository, service and MCP-tool layers together and exposes
    them through a low-level ``mcp.server.Server``:

    * ``initialize()`` builds every layer and registers the ``list_tools``
      and ``call_tool`` handlers.
    * ``start()`` initializes the server and serves requests over stdio.
    * ``close()`` releases the SQLAlchemy connection pool.
    """

    # Keys needed to build a connection string when none is given explicitly.
    _REQUIRED_DB_KEYS = ("host", "port", "database", "username", "password")

    def __init__(self, config: Dict[str, Any]):
        """
        Store the configuration and declare (but do not build) all components.

        Args:
            config: Server configuration. Sections read by this class are
                ``database`` (required), ``llm`` and ``embedding`` (optional).
        """
        self.config = config
        self.server = Server("database-mcp-server")
        # Infrastructure components
        self.engine = None
        # Repository layer
        self.postgres_repo = None
        self.vector_repo = None
        self.embedding_repo = None
        # Service layer
        self.schema_service = None
        self.sql_service = None
        self.semantic_service = None
        self.synthesis_service = None
        self.smart_search_service = None
        # MCP tools
        self.schema_tools = None
        self.sql_tools = None
        self.semantic_tools = None
        self.synthesis_tools = None

    async def initialize(self) -> None:
        """
        Build every layer in dependency order and register the MCP handlers.

        Raises:
            ConfigurationError: if any setup step fails; the underlying
                exception is chained as ``__cause__``.
        """
        try:
            logger.info("Initializing MCP Database Server...")
            await self._setup_database()
            await self._setup_repositories()
            await self._setup_services()
            await self._setup_mcp_tools()
            await self._register_tools()
            logger.info("MCP Database Server initialized successfully")
        except Exception as e:
            logger.exception("Failed to initialize MCP server: %s", e)
            # Don't leave a half-built connection pool behind.
            self.close()
            raise ConfigurationError(f"Server initialization failed: {e}") from e

    def close(self) -> None:
        """Dispose of the SQLAlchemy engine's connection pool, if one exists."""
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    async def _setup_database(self) -> None:
        """
        Create the SQLAlchemy engine from the ``database`` config section.

        Uses ``connection_string`` when present; otherwise builds one from
        the individual host/port/database/username/password keys.

        Raises:
            ConfigurationError: if the section is empty or required keys
                are missing.
        """
        db_config = self.config.get('database', {})
        if not db_config:
            raise ConfigurationError("Database configuration is required")

        connection_string = db_config.get('connection_string')
        if not connection_string:
            missing = [key for key in self._REQUIRED_DB_KEYS if key not in db_config]
            if missing:
                # Report every missing key at once instead of a bare KeyError.
                raise ConfigurationError(
                    f"Database configuration is missing: {', '.join(missing)}"
                )
            db_conn = DatabaseConnection(
                host=db_config['host'],
                port=db_config['port'],
                database=db_config['database'],
                username=db_config['username'],
                password=db_config['password'],
                schema=db_config.get('schema'),
            )
            connection_string = db_conn.get_connection_string()

        # create_engine is lazy: connections are opened on first use.
        self.engine = create_engine(connection_string)
        logger.info("Database connection established")

    async def _setup_repositories(self) -> None:
        """Initialize the repository layer on top of the shared engine."""
        self.postgres_repo = PostgresRepository(self.engine)
        self.vector_repo = VectorRepository(self.engine)
        self.embedding_repo = EmbeddingRepository(self.engine)
        logger.info("Repository layer initialized")

    async def _setup_services(self) -> None:
        """Initialize the service layer, including the smart-search orchestrator."""
        llm_config = self.config.get('llm', {})
        self.schema_service = SchemaService(self.postgres_repo)
        self.sql_service = SQLService(
            self.postgres_repo,
            self.schema_service,
            llm_config,
        )
        self.semantic_service = SemanticService(
            self.vector_repo,
            self.config.get('embedding', {}),
        )
        self.synthesis_service = SynthesisService(llm_config)
        # Smart search orchestrator
        self.smart_search_service = SmartSearchService(
            self.schema_service,
            self.sql_service,
            self.semantic_service,
            self.synthesis_service,
            llm_config,
        )
        logger.info("Service layer initialized")

    async def _setup_mcp_tools(self) -> None:
        """Wrap each service in its MCP tool adapter."""
        self.schema_tools = SchemaTools(self.schema_service)
        self.sql_tools = SQLTools(self.sql_service)
        self.semantic_tools = SemanticTools(self.semantic_service)
        self.synthesis_tools = SynthesisTools(self.synthesis_service)
        logger.info("MCP tools initialized")

    @staticmethod
    def _tool_definitions() -> list:
        """Return the ``Tool`` descriptors advertised to MCP clients."""
        return [
            # Schema tools
            Tool(
                name="get_schema_info",
                description="Get database schema information",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "include_sample_data": {
                            "type": "boolean",
                            "description": "Whether to include sample data",
                            "default": False,
                        }
                    },
                },
            ),
            Tool(
                name="find_relevant_tables",
                description="Find tables relevant to a question",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "question": {
                            "type": "string",
                            "description": "Natural language question",
                        }
                    },
                    "required": ["question"],
                },
            ),
            Tool(
                name="get_table_details",
                description="Get detailed information about a specific table",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "Name of the table",
                        }
                    },
                    "required": ["table_name"],
                },
            ),
            # SQL tools
            Tool(
                name="execute_sql",
                description="Execute a SQL query safely",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "SQL query to execute",
                        },
                        "limit": {
                            "type": "boolean",
                            "description": "Whether to apply row limit for safety",
                            "default": True,
                        },
                    },
                    "required": ["sql"],
                },
            ),
            Tool(
                name="validate_sql",
                description="Validate SQL without executing",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "SQL query to validate",
                        }
                    },
                    "required": ["sql"],
                },
            ),
            Tool(
                name="explain_sql",
                description="Get execution plan for SQL query",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "SQL query to explain",
                        }
                    },
                    "required": ["sql"],
                },
            ),
            # Semantic tools
            Tool(
                name="semantic_search",
                description="Perform semantic search",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query",
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of results",
                            "default": 10,
                        },
                        "table_filter": {
                            "type": "string",
                            "description": "Optional table filter",
                        },
                        "fk_filter": {
                            "type": "string",
                            "description": "Optional foreign key filter",
                        },
                    },
                    "required": ["query"],
                },
            ),
            # Smart search - the main orchestrated search
            Tool(
                name="smart_search",
                description="Intelligent search that automatically determines the best strategy",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "question": {
                            "type": "string",
                            "description": "Natural language question",
                        },
                        "include_schema": {
                            "type": "boolean",
                            "description": "Whether to include schema info in response",
                            "default": True,
                        },
                        "max_sql_queries": {
                            "type": "integer",
                            "description": "Maximum SQL queries to execute",
                            "default": 3,
                        },
                        "max_semantic_results": {
                            "type": "integer",
                            "description": "Maximum semantic search results",
                            "default": 10,
                        },
                    },
                    "required": ["question"],
                },
            ),
        ]

    @staticmethod
    def _to_text_content(payload: Any) -> list:
        """
        Wrap a tool result as a single MCP text-content item holding JSON.

        ``default=str`` is deliberate: query results may contain values such
        as dates or decimals that ``json`` cannot encode natively.
        """
        return [TextContent(type="text", text=json.dumps(payload, default=str))]

    async def _register_tools(self) -> None:
        """
        Register the ``list_tools`` and ``call_tool`` handlers on the server.

        The low-level ``mcp.server.Server`` has no ``add_tool``; tools are
        advertised by a ``list_tools`` handler and routed by one
        ``call_tool`` handler.
        """
        tools = self._tool_definitions()
        # Tool name -> coroutine function; built after _setup_mcp_tools so the
        # bound methods below exist.
        handlers: Dict[str, Any] = {
            # Schema tools
            "get_schema_info": self.schema_tools.get_schema_info,
            "find_relevant_tables": self.schema_tools.find_relevant_tables,
            "get_table_details": self.schema_tools.get_table_details,
            # SQL tools
            "execute_sql": self.sql_tools.execute_sql,
            "validate_sql": self.sql_tools.validate_sql,
            "explain_sql": self.sql_tools.explain_sql,
            # Semantic tools
            "semantic_search": self.semantic_tools.semantic_search,
            # Smart search
            "smart_search": self.smart_search_service.search,
        }

        @self.server.list_tools()
        async def handle_list_tools() -> list:
            """Advertise every tool this server supports."""
            return list(tools)

        @self.server.call_tool()
        async def handle_tool_call(name: str, arguments: Optional[Dict[str, Any]]) -> list:
            """Dispatch an MCP tool call and return its result as JSON text."""
            handler = handlers.get(name)
            if handler is None:
                result = {
                    'success': False,
                    'error': f"Unknown tool: {name}",
                }
            else:
                try:
                    result = await handler(**(arguments or {}))
                except Exception as e:
                    logger.exception("Tool call failed: %s - %s", name, e)
                    result = {
                        'success': False,
                        'error': str(e),
                    }
            return self._to_text_content(result)

        logger.info("All MCP tools registered")

    async def start(self, transport_type: str = "stdio") -> None:
        """
        Initialize the server and serve MCP requests until the transport closes.

        Args:
            transport_type: Only ``"stdio"`` is supported.

        Raises:
            ConfigurationError: for an unsupported transport or failed init.
        """
        try:
            # Reject bad transports before opening any database resources.
            if transport_type != "stdio":
                raise ConfigurationError(f"Unsupported transport type: {transport_type}")
            await self.initialize()
            from mcp.server.stdio import stdio_server
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    self.server.create_initialization_options(),
                )
        except Exception as e:
            logger.exception("Failed to start MCP server: %s", e)
            raise
        finally:
            self.close()
def _load_config_from_env() -> Dict[str, Any]:
    """
    Build the server configuration from environment variables.

    Every setting falls back to the placeholder of the original hard-coded
    example, so the result is unchanged when no variables are set.

    Raises:
        ValueError: if ``DB_PORT`` is set but is not an integer.
    """
    import os

    env = os.environ.get
    return {
        'database': {
            'host': env('DB_HOST', 'localhost'),
            'port': int(env('DB_PORT', '5432')),
            'database': env('DB_NAME', 'your_database'),
            'username': env('DB_USER', 'your_username'),
            'password': env('DB_PASSWORD', 'your_password'),
        },
        'llm': {
            'provider': env('LLM_PROVIDER', 'openai'),
            'model': env('LLM_MODEL', 'gpt-4'),
            'api_key': env('LLM_API_KEY', 'your_api_key'),
        },
        'embedding': {
            'provider': env('EMBEDDING_PROVIDER', 'openai'),
            'model': env('EMBEDDING_MODEL', 'text-embedding-ada-002'),
            'api_key': env('EMBEDDING_API_KEY', 'your_api_key'),
        },
    }


async def main(config: Optional[Dict[str, Any]] = None) -> None:
    """
    Main entry point: configure logging, build the server and start it.

    Args:
        config: Explicit server configuration. When omitted, it is read from
            environment variables via ``_load_config_from_env``.
    """
    if config is None:
        config = _load_config_from_env()

    # Setup logging (basicConfig writes to stderr by default, keeping stdout
    # free for the stdio transport).
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Create and start server
    server = MCPDatabaseServer(config)
    await server.start()
if __name__ == "__main__":
    # Executed as a script (not imported): drive the async entry point on a
    # fresh event loop.
    asyncio.run(main())