# database_endpoints.py • 7.74 kB
"""
Database-specific HTTP endpoints for the MCP HTTP Bridge
Provides REST API access to the new clean architecture components
"""
import logging
from typing import Dict, Any, Optional
from fastapi import HTTPException
from pydantic import BaseModel
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Request/Response Models for Database Operations
class SmartSearchRequest(BaseModel):
    # Request body for POST /api/database/smart-search.
    #
    # Each field is forwarded unchanged, as a keyword argument of the same
    # name, to mcp_server.smart_search_service.search().
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    # The question to search for (required).
    question: str

    # Forwarded as `include_schema`; defaults to True.
    include_schema: Optional[bool] = True

    # Forwarded as `max_sql_queries`; defaults to 3.
    max_sql_queries: Optional[int] = 3

    # Forwarded as `max_semantic_results`; defaults to 10.
    max_semantic_results: Optional[int] = 10
class SmartSearchResponse(BaseModel):
    # Response body of POST /api/database/smart-search.
    #
    # Built by unpacking the mapping returned from
    # mcp_server.smart_search_service.search() into this model, so that
    # mapping must supply every required field below.
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    success: bool
    question: str
    strategy_used: str
    response: str
    metadata: Dict[str, Any]

    # Optional error text; None when not provided.
    error: Optional[str] = None
class SchemaInfoRequest(BaseModel):
    # Request body for POST /api/database/schema.
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    # Forwarded as `include_sample_data` to
    # mcp_server.schema_tools.get_schema_info(); defaults to False.
    include_sample_data: Optional[bool] = False
class SchemaInfoResponse(BaseModel):
    # Response body of POST /api/database/schema.
    #
    # Built by unpacking the mapping returned from
    # mcp_server.schema_tools.get_schema_info() into this model, so that
    # mapping must supply every required field below.
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    success: bool
    summary: str
    table_count: int
    relationship_count: int

    # Untyped list; the element shape is decided by the schema tools.
    tables: list

    # Optional error text; None when not provided.
    error: Optional[str] = None
class SQLExecutionRequest(BaseModel):
    # Request body shared by POST /api/database/sql/execute and
    # POST /api/database/sql/validate (the latter reads only `sql`).
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    # SQL text, forwarded as `sql` to mcp_server.sql_tools.
    sql: str

    # Forwarded as `limit` to mcp_server.sql_tools.execute_sql().
    # NOTE(review): typed as a boolean, so presumably an on/off switch for
    # result limiting rather than a row count - confirm against sql_tools.
    limit: Optional[bool] = True
class SQLExecutionResponse(BaseModel):
    # Response body of POST /api/database/sql/execute.
    #
    # Built by unpacking the mapping returned from
    # mcp_server.sql_tools.execute_sql() into this model, so that mapping
    # must supply every required field below.
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    success: bool

    # Optional untyped list; None when not provided.
    data: Optional[list] = None

    rows_affected: int
    execution_time: float

    # Optional query text; None when not provided.
    query: Optional[str] = None

    # Optional error text; None when not provided.
    error: Optional[str] = None
class SemanticSearchRequest(BaseModel):
    # Request body for POST /api/database/semantic/search.
    #
    # Each field is forwarded unchanged, as a keyword argument of the same
    # name, to mcp_server.semantic_tools.semantic_search().
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    # Search text (required).
    query: str

    # Forwarded as `limit`; defaults to 10.
    limit: Optional[int] = 10

    # Forwarded as `table_filter`; None when omitted.
    table_filter: Optional[str] = None

    # Forwarded as `fk_filter`; None when omitted.
    fk_filter: Optional[str] = None
class SemanticSearchResponse(BaseModel):
    # Response body of POST /api/database/semantic/search.
    #
    # Built by unpacking the mapping returned from
    # mcp_server.semantic_tools.semantic_search() into this model, so that
    # mapping must supply every required field below.
    #
    # Documented with comments rather than a class docstring so that the
    # schema generated for this model is left untouched.

    success: bool
    query: str

    # Untyped list; the element shape is decided by the semantic tools.
    results: list

    count: int

    # Optional error text; None when not provided.
    error: Optional[str] = None
def add_database_endpoints(app, mcp_server):
    """Add database-specific endpoints to the FastAPI app.

    Registers the ``/api/database/*`` routes on ``app``. Every handler
    delegates to one component of ``mcp_server`` and shares the same error
    policy:

    * 503 if ``mcp_server`` is falsy when the request arrives;
    * an ``HTTPException`` raised while handling the request is passed
      through unchanged (it already carries its intended status code);
    * any other exception is logged with its traceback and reported as a
      500 whose ``detail`` is ``str(exc)``.

    Args:
        app: Application object exposing the ``post`` and ``get`` route
            decorators.
        mcp_server: Object providing ``smart_search_service``,
            ``schema_tools``, ``sql_tools`` and ``semantic_tools``. May be
            falsy, in which case every endpoint answers 503.
    """

    def _require_server():
        """Return ``mcp_server`` or raise a 503 if it is unavailable."""
        # Called *outside* the handlers' try blocks: raised inside them, the
        # 503 would be caught by `except Exception` and come back as a 500.
        if not mcp_server:
            raise HTTPException(status_code=503, detail="MCP Database Server not available")
        return mcp_server

    def _internal_error(action, exc):
        """Log ``exc`` with its traceback and build the matching 500."""
        logger.error("%s failed: %s", action, exc, exc_info=exc)
        # NOTE(review): the raw exception text is returned to the client.
        # Kept for backward compatibility; consider a generic message if
        # these routes are reachable by untrusted callers.
        return HTTPException(status_code=500, detail=str(exc))

    # Handler docstrings below are left exactly as they were: FastAPI
    # publishes them as the operation descriptions in the OpenAPI schema.

    @app.post("/api/database/smart-search", response_model=SmartSearchResponse)
    async def smart_search(request: SmartSearchRequest):
        """Intelligent search that automatically determines the best strategy"""
        server = _require_server()
        try:
            result = await server.smart_search_service.search(
                question=request.question,
                include_schema=request.include_schema,
                max_sql_queries=request.max_sql_queries,
                max_semantic_results=request.max_semantic_results,
            )
            return SmartSearchResponse(**result)
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("Smart search", e) from e

    @app.post("/api/database/schema", response_model=SchemaInfoResponse)
    async def get_schema_info(request: SchemaInfoRequest):
        """Get database schema information"""
        server = _require_server()
        try:
            result = await server.schema_tools.get_schema_info(
                include_sample_data=request.include_sample_data,
            )
            return SchemaInfoResponse(**result)
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("Get schema info", e) from e

    @app.post("/api/database/sql/execute", response_model=SQLExecutionResponse)
    async def execute_sql(request: SQLExecutionRequest):
        """Execute a SQL query safely"""
        server = _require_server()
        try:
            # NOTE(review): the SQL text comes straight from the request
            # body; any safety checks are presumably performed inside
            # sql_tools.execute_sql - confirm, none are applied here.
            result = await server.sql_tools.execute_sql(
                sql=request.sql,
                limit=request.limit,
            )
            return SQLExecutionResponse(**result)
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("SQL execution", e) from e

    @app.post("/api/database/sql/validate")
    async def validate_sql(request: SQLExecutionRequest):
        """Validate SQL without executing"""
        server = _require_server()
        try:
            # Only `sql` is used; `request.limit` is ignored for validation.
            return await server.sql_tools.validate_sql(sql=request.sql)
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("SQL validation", e) from e

    @app.post("/api/database/semantic/search", response_model=SemanticSearchResponse)
    async def semantic_search(request: SemanticSearchRequest):
        """Perform semantic search"""
        server = _require_server()
        try:
            result = await server.semantic_tools.semantic_search(
                query=request.query,
                limit=request.limit,
                table_filter=request.table_filter,
                fk_filter=request.fk_filter,
            )
            return SemanticSearchResponse(**result)
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("Semantic search", e) from e

    @app.get("/api/database/capabilities")
    async def get_search_capabilities():
        """Get information about search system capabilities"""
        server = _require_server()
        try:
            result = await server.smart_search_service.get_search_capabilities()
            return {
                'success': True,
                'capabilities': result,
            }
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("Get capabilities", e) from e

    @app.get("/api/database/suggest-questions")
    async def suggest_questions(topic: Optional[str] = None):
        """Suggest example questions based on available data"""
        server = _require_server()
        try:
            result = await server.smart_search_service.suggest_questions(topic)
            return {
                'success': True,
                'suggestions': result,
                # A result without a length raises here and becomes a 500.
                'count': len(result),
            }
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("Suggest questions", e) from e

    @app.post("/api/database/tables/find-relevant")
    async def find_relevant_tables(question: str):
        """Find tables relevant to a question"""
        # `question` is declared as a plain str parameter, so FastAPI takes
        # it from the query string rather than from a JSON body.
        server = _require_server()
        try:
            return await server.schema_tools.find_relevant_tables(question=question)
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("Find relevant tables", e) from e

    @app.get("/api/database/tables/{table_name}")
    async def get_table_details(table_name: str):
        """Get detailed information about a specific table"""
        server = _require_server()
        try:
            return await server.schema_tools.get_table_details(table_name=table_name)
        except HTTPException:
            raise
        except Exception as e:
            raise _internal_error("Get table details", e) from e

    logger.info("Database endpoints added to FastAPI app")