import asyncio
import logging
import os
from typing import Dict, Any
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from dotenv import load_dotenv
from .core.factory import AdapterFactory
from .interfaces.database_adapter import DatabaseAdapter
# Load environment variables via python-dotenv at import time, so the
# os.getenv() lookups performed later in get_adapter() can see them.
load_dotenv()
# Configure logging: root logger at INFO, plus a named logger for this module.
# NOTE(review): basicConfig() logs to stderr by default; presumably that matters
# because main() runs the server over stdio -- confirm before adding handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sql_mcp")
# Initialize the MCP server instance; the handlers below register themselves
# on it through its decorators (list_resources, read_resource, list_tools,
# call_tool).
app = Server("sql-mcp")
# Global adapter, created lazily by get_adapter(); None until the first call.
# In a real scenario, we might support multiple active connections/adapters,
# but for now we follow the pattern of 1 server instance = 1 DB connection context.
_adapter: DatabaseAdapter = None
def get_adapter() -> DatabaseAdapter:
    """Return the module-wide database adapter, building it on first use.

    On the first call the adapter type is read from ``DB_TYPE`` (default
    ``"mysql"``), the connection settings are collected from the ``DB_*``
    environment variables, and the result of ``AdapterFactory.create`` is
    cached in the module-level ``_adapter``. Later calls return the cached
    object without touching the environment again.

    Returns:
        The cached adapter instance.
    """
    global _adapter

    # Fast path: the adapter has already been built.
    if _adapter is not None:
        return _adapter

    db_type = os.getenv("DB_TYPE", "mysql")
    settings = dict(
        host=os.getenv("DB_HOST", "localhost"),
        # No default port here: a missing DB_PORT is passed as None so the
        # adapter can pick its own default. A set value is passed as a string.
        port=os.getenv("DB_PORT"),
        user=os.getenv("DB_USER", "root"),
        password=os.getenv("DB_PASSWORD", ""),
        database=os.getenv("DB_DATABASE", "test"),
        # "server" is an alias of "host": both are populated from DB_HOST.
        server=os.getenv("DB_HOST", "localhost"),
        dsn=os.getenv("DB_DSN", ""),
        server_name=os.getenv("DB_SERVER_NAME", ""),
    )

    logger.info(f"Initializing adapter for {db_type}...")
    _adapter = AdapterFactory.create(db_type, settings)
    return _adapter
@app.list_resources()
async def list_resources() -> list[Resource]:
    """List one MCP resource per table reported by the adapter.

    Returns:
        A ``Resource`` for every entry of ``adapter.list_tables()``, with a
        ``db://<table>/data`` URI and ``text/plain`` MIME type.
    """
    resources: list[Resource] = []
    for t in get_adapter().list_tables():
        entry = Resource(
            uri=f"db://{t}/data",
            name=f"Table: {t}",
            mimeType="text/plain",
            description=f"Data in table {t}",
        )
        resources.append(entry)
    return resources
@app.read_resource()
async def read_resource(uri: Any) -> str:
    """Return up to 50 rows of the table named in a ``db://<table>/data`` URI.

    Args:
        uri: Resource URI; any object whose ``str()`` is the URI text. The
            table name is the second-to-last ``/``-separated segment, which
            matches the URIs built by ``list_resources``.

    Returns:
        ``str()`` of the ``"rows"`` entry of the adapter's query result.

    Raises:
        ValueError: If the URI has no table segment, or the segment is not a
            plain (optionally dot-qualified) SQL identifier.
    """
    # Function-scope import so this handler does not depend on the file's
    # top-level import block.
    import re

    adapter = get_adapter()
    uri_str = str(uri)
    segments = uri_str.split("/")  # generic parsing
    table_name = segments[-2] if len(segments) >= 2 else ""

    # The table name comes from a client-supplied URI and is interpolated
    # into the SQL text (identifiers cannot be bound as query parameters).
    # Accept only word characters and "$", optionally dot-qualified, so that
    # quotes, whitespace, semicolons and comment markers can never reach the
    # database.
    if not re.fullmatch(r"[\w$]+(?:\.[\w$]+)*", table_name):
        raise ValueError(f"Invalid table name in resource URI: {uri_str}")

    # NOTE(review): "LIMIT 50" is not valid in every SQL dialect -- confirm
    # that each adapter behind AdapterFactory accepts it.
    result = adapter.execute_query(f"SELECT * FROM {table_name} LIMIT 50")
    return str(result["rows"])
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Describe the tools this server exposes.

    Returns:
        Three ``Tool`` definitions, in this order: ``execute_query``
        (requires ``query``), ``describe_table`` (requires ``table_name``)
        and ``get_db_version`` (no arguments).
    """

    def single_string_schema(field: str) -> dict:
        # JSON schema for an object with exactly one required string field.
        return {
            "type": "object",
            "properties": {
                field: {"type": "string"}
            },
            "required": [field],
        }

    query_tool = Tool(
        name="execute_query",
        description="Execute a SQL query on the configured database",
        inputSchema=single_string_schema("query"),
    )
    describe_tool = Tool(
        name="describe_table",
        description="Get table schema",
        inputSchema=single_string_schema("table_name"),
    )
    version_tool = Tool(
        name="get_db_version",
        description="Get database version info",
        inputSchema={"type": "object", "properties": {}},
    )
    return [query_tool, describe_tool, version_tool]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Run the named tool against the database adapter.

    Args:
        name: Tool name; one of ``execute_query``, ``describe_table`` or
            ``get_db_version``.
        arguments: Tool arguments. ``execute_query`` requires ``query`` and
            ``describe_table`` requires ``table_name``. ``None`` is treated
            as an empty mapping.

    Returns:
        A single-element list holding a text ``TextContent``: the
        stringified adapter result, ``"Unknown tool: <name>"`` for an
        unrecognised name, or ``"Error: <message>"`` when a required
        argument is missing or the adapter raises.
    """
    adapter = get_adapter()
    args = arguments or {}

    def reply(text: object) -> list[TextContent]:
        # Always coerce to str so a non-string adapter result (e.g. from
        # get_version) cannot fail TextContent construction.
        return [TextContent(type="text", text=str(text))]

    # Report a missing required argument explicitly, instead of letting a
    # KeyError surface as the cryptic "Error: 'query'".
    required_argument = {
        "execute_query": "query",
        "describe_table": "table_name",
    }.get(name)
    if required_argument is not None and required_argument not in args:
        return reply(
            f"Error: Missing required argument '{required_argument}' "
            f"for tool '{name}'"
        )

    try:
        if name == "execute_query":
            return reply(adapter.execute_query(args["query"]))
        if name == "describe_table":
            return reply(adapter.describe_table(args["table_name"]))
        if name == "get_db_version":
            return reply(adapter.get_version())
        return reply(f"Unknown tool: {name}")
    except Exception as e:
        # Boundary handler: the failure is still returned to the client as
        # text, but the traceback is now logged rather than silently dropped.
        logger.exception("Tool %s failed", name)
        return reply(f"Error: {str(e)}")
async def main():
    """Run the MCP server over the stdio transport.

    Opens the stdio streams with ``stdio_server()`` and hands them to
    ``app.run`` together with the app's initialization options.
    """
    # Imported here rather than at module top, as in the original code.
    from mcp.server.stdio import stdio_server

    logger.info("Starting SQL MCP Server...")
    async with stdio_server() as streams:
        incoming, outgoing = streams
        init_options = app.create_initialization_options()
        await app.run(incoming, outgoing, init_options)
# Script entry point: run the server on a fresh asyncio event loop when this
# module is executed directly rather than imported.
# NOTE(review): the relative imports at the top of this file mean the module
# presumably has to be launched as part of its package (python -m ...) --
# confirm the intended launch command.
if __name__ == "__main__":
    asyncio.run(main())