# http_wrapper.py • 9.95 kB
"""HTTP wrapper for SQLite MCP Server using FastAPI.
This module exposes the SQLite database operations via HTTP REST API,
making it easier for chatbot applications to call MCP tools.
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
from sqlite_mcp.db import SQLiteDatabase
# Module-level database handle; every endpoint in this file goes through it
# and checks ``db.connection`` to see whether a database is currently open.
db = SQLiteDatabase()
# Request/Response Models
class ExecuteQueryRequest(BaseModel):
    """Request body for the ``execute_query`` tool."""

    # SQL text passed to ``db.execute_query``.
    query: str
    # Optional positional values passed alongside the query; defaults to None.
    parameters: Optional[List[Any]] = None
class InsertRequest(BaseModel):
    """Request body for the ``insert`` tool."""

    # Name of the target table.
    table: str
    # Mapping passed to ``db.insert``; presumably column name -> value.
    data: Dict[str, Any]
class UpdateRequest(BaseModel):
    """Request body for the ``update`` tool."""

    # Name of the target table.
    table: str
    # Mapping passed to ``db.update``; presumably column name -> new value.
    data: Dict[str, Any]
    # WHERE clause text, forwarded unchanged to ``db.update``.
    where: str
    # Optional values accompanying the WHERE clause; defaults to None.
    where_params: Optional[List[Any]] = None
class DeleteRequest(BaseModel):
    """Request body for the ``delete`` tool."""

    # Name of the target table.
    table: str
    # WHERE clause text, forwarded unchanged to ``db.delete``.
    where: str
    # Optional values accompanying the WHERE clause; defaults to None.
    where_params: Optional[List[Any]] = None
class CreateTableRequest(BaseModel):
    """Request body for the ``create_table`` tool."""

    # Name of the table to create.
    table: str
    # Schema text passed to ``db.create_table``.
    # NOTE(review): ``schema`` is also the name of a BaseModel attribute;
    # depending on the installed Pydantic version this may warn or be
    # rejected at class-definition time -- confirm before upgrading.
    schema: str
class OpenDatabaseRequest(BaseModel):
    """Request body for the ``open_database`` tool."""

    # Database path handed to ``db.open``.
    path: str
class GetTableSchemaRequest(BaseModel):
    """Request body for the ``get_table_schema`` tool."""

    # Name of the table whose schema is requested.
    table: str
class MissingDatabaseError(HTTPException):
    """HTTP 400 error used when an endpoint is called with no open database."""

    def __init__(self):
        """Build the exception with a fixed status code and detail message."""
        message = "No database is open. Call open_database first."
        super().__init__(status_code=400, detail=message)
# FastAPI application object; the route decorators in this module attach
# their handlers to it.
app = FastAPI(
    title="SQLite MCP Server - HTTP Wrapper",
    description="HTTP REST API wrapper for SQLite MCP Server",
    version="1.0.0",
)
# Health check
@app.get("/health")
async def health_check():
    """Report service status and whether a database connection exists.

    Returns:
        Dict with a fixed ``status`` and ``service`` label plus
        ``database_connected``, which is True when ``db.connection`` is set.
    """
    is_connected = db.connection is not None
    return {
        "status": "healthy",
        "database_connected": is_connected,
        "service": "SQLite MCP HTTP Wrapper",
    }
# Database Management Endpoints
@app.post("/tools/open_database")
async def open_database_endpoint(request: OpenDatabaseRequest):
    """Open or create a SQLite database.

    Args:
        request: OpenDatabaseRequest carrying the database ``path``.

    Returns:
        Dict with ``success``, the message returned by ``db.open`` and the
        requested ``database_path``.

    Raises:
        HTTPException: Status 400 carrying the error text if ``db.open`` fails.
    """
    # NOTE(review): ``request.path`` comes straight from the HTTP body and is
    # passed to ``db.open`` unchanged -- confirm any path restrictions live there.
    try:
        result = db.open(request.path)
        return {"success": True, "message": result, "database_path": request.path}
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post("/tools/close_database")
async def close_database_endpoint():
    """Close the current database connection.

    Returns:
        Dict with ``success`` and the message returned by ``db.close``.

    Raises:
        HTTPException: Status 400 carrying the error text if ``db.close`` fails.
    """
    try:
        result = db.close()
        return {"success": True, "message": result}
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
# Query Endpoints
@app.post("/tools/execute_query")
async def execute_query_endpoint(request: ExecuteQueryRequest):
    """Execute a SELECT query.

    Args:
        request: ExecuteQueryRequest with ``query`` and optional ``parameters``.

    Returns:
        Dict with ``success``, the result ``rows``, ``column_count`` and
        ``record_count`` (the number of rows).

    Raises:
        MissingDatabaseError: If no database connection is open.
        HTTPException: Status 400 carrying the error text if the query or the
            response assembly fails.
    """
    if db.connection is None:
        raise MissingDatabaseError()
    try:
        result = db.execute_query(request.query, request.parameters)
        rows = result["rows"]
        return {
            "success": True,
            "rows": rows,
            "column_count": result["column_count"],
            "record_count": len(rows),
        }
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
# CRUD Endpoints
@app.post("/tools/insert")
async def insert_endpoint(request: InsertRequest):
    """Insert a row into a table.

    Args:
        request: InsertRequest with ``table`` and ``data``.

    Returns:
        Dict with ``success``, ``lastID`` and ``changes`` taken from the
        ``db.insert`` result, plus the target ``table``.

    Raises:
        MissingDatabaseError: If no database connection is open.
        HTTPException: Status 400 carrying the error text if the insert or the
            response assembly fails.
    """
    if db.connection is None:
        raise MissingDatabaseError()
    try:
        result = db.insert(request.table, request.data)
        return {
            "success": True,
            "lastID": result["lastID"],
            "changes": result["changes"],
            "table": request.table,
        }
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post("/tools/update")
async def update_endpoint(request: UpdateRequest):
    """Update rows in a table.

    Args:
        request: UpdateRequest with ``table``, ``data``, ``where`` and
            optional ``where_params``.

    Returns:
        Dict with ``success``, the ``changes`` count from ``db.update``, the
        target ``table`` and the ``where_clause`` that was used.

    Raises:
        MissingDatabaseError: If no database connection is open.
        HTTPException: Status 400 carrying the error text if the update or the
            response assembly fails.
    """
    if db.connection is None:
        raise MissingDatabaseError()
    # NOTE(review): ``where`` is client-supplied text forwarded unchanged to
    # ``db.update`` -- verify how the db layer embeds it in the statement.
    try:
        result = db.update(
            request.table, request.data, request.where, request.where_params
        )
        return {
            "success": True,
            "changes": result["changes"],
            "table": request.table,
            "where_clause": request.where,
        }
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post("/tools/delete")
async def delete_endpoint(request: DeleteRequest):
    """Delete rows from a table.

    Args:
        request: DeleteRequest with ``table``, ``where`` and optional
            ``where_params``.

    Returns:
        Dict with ``success``, the ``changes`` count from ``db.delete``, the
        target ``table`` and the ``where_clause`` that was used.

    Raises:
        MissingDatabaseError: If no database connection is open.
        HTTPException: Status 400 carrying the error text if the delete or the
            response assembly fails.
    """
    if db.connection is None:
        raise MissingDatabaseError()
    # NOTE(review): ``where`` is client-supplied text forwarded unchanged to
    # ``db.delete`` -- verify how the db layer embeds it in the statement.
    try:
        result = db.delete(request.table, request.where, request.where_params)
        return {
            "success": True,
            "changes": result["changes"],
            "table": request.table,
            "where_clause": request.where,
        }
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
# Schema Management Endpoints
@app.post("/tools/create_table")
async def create_table_endpoint(request: CreateTableRequest):
    """Create a new table.

    Args:
        request: CreateTableRequest with ``table`` name and ``schema`` text.

    Returns:
        Dict with ``success``, the message returned by ``db.create_table``
        and the ``table`` name.

    Raises:
        MissingDatabaseError: If no database connection is open.
        HTTPException: Status 400 carrying the error text if table creation
            fails.
    """
    if db.connection is None:
        raise MissingDatabaseError()
    try:
        result = db.create_table(request.table, request.schema)
        return {"success": True, "message": result, "table": request.table}
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.get("/tools/list_tables")
async def list_tables_endpoint():
    """List all tables in the database.

    Returns:
        Dict with ``success``, the ``tables`` entry from ``db.list_tables``
        and ``count`` (its length).

    Raises:
        MissingDatabaseError: If no database connection is open.
        HTTPException: Status 400 carrying the error text if listing or the
            response assembly fails.
    """
    if db.connection is None:
        raise MissingDatabaseError()
    try:
        tables = db.list_tables()["tables"]
        return {"success": True, "tables": tables, "count": len(tables)}
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post("/tools/get_table_schema")
async def get_table_schema_endpoint(request: GetTableSchemaRequest):
    """Get the schema of a table.

    Args:
        request: GetTableSchemaRequest with the ``table`` name.

    Returns:
        Dict with ``success``, the ``table`` name, the ``columns`` entry from
        ``db.get_table_schema`` and ``column_count`` (its length).

    Raises:
        MissingDatabaseError: If no database connection is open.
        HTTPException: Status 400 carrying the error text if the lookup or the
            response assembly fails.
    """
    if db.connection is None:
        raise MissingDatabaseError()
    try:
        columns = db.get_table_schema(request.table)["columns"]
        return {
            "success": True,
            "table": request.table,
            "columns": columns,
            "column_count": len(columns),
        }
    except Exception as exc:
        # Chain explicitly so the root cause stays attached as ``__cause__``.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
# Tools Discovery Endpoint
@app.get("/tools")
async def list_tools():
    """List all available tools.

    Returns:
        Dict with ``tools`` (one entry per tool giving its name, description,
        endpoint and parameter names) and ``total_tools`` (the number of
        entries).
    """
    tools = [
        {
            "name": "open_database",
            "description": "Open or create a SQLite database file",
            "endpoint": "POST /tools/open_database",
            "parameters": ["path"],
        },
        {
            "name": "close_database",
            "description": "Close the current database connection",
            "endpoint": "POST /tools/close_database",
            "parameters": [],
        },
        {
            "name": "execute_query",
            "description": "Execute a SELECT query and return results",
            "endpoint": "POST /tools/execute_query",
            "parameters": ["query", "parameters"],
        },
        {
            "name": "insert",
            "description": "Insert a row into a table",
            "endpoint": "POST /tools/insert",
            "parameters": ["table", "data"],
        },
        {
            "name": "update",
            "description": "Update rows in a table",
            "endpoint": "POST /tools/update",
            "parameters": ["table", "data", "where", "where_params"],
        },
        {
            "name": "delete",
            "description": "Delete rows from a table",
            "endpoint": "POST /tools/delete",
            "parameters": ["table", "where", "where_params"],
        },
        {
            "name": "create_table",
            "description": "Create a new table",
            "endpoint": "POST /tools/create_table",
            "parameters": ["table", "schema"],
        },
        {
            "name": "list_tables",
            "description": "List all tables in the database",
            "endpoint": "GET /tools/list_tables",
            "parameters": [],
        },
        {
            "name": "get_table_schema",
            "description": "Get the schema of a table",
            "endpoint": "POST /tools/get_table_schema",
            "parameters": ["table"],
        },
    ]
    # Derive the count from the list so it cannot drift when tools are
    # added or removed.
    return {"tools": tools, "total_tools": len(tools)}
if __name__ == "__main__":
    # Script entry point: print the useful local URLs, then serve the app.
    import uvicorn

    port = 8000
    base_url = f"http://localhost:{port}"

    # The banner prefixes were mis-encoded bytes; plain text prints reliably
    # on any console encoding.
    print("Starting SQLite MCP HTTP Server...")
    print(f"API Documentation: {base_url}/docs")
    print(f"Health Check: {base_url}/health")
    print(f"Tools: {base_url}/tools")

    # NOTE(review): host 0.0.0.0 listens on all network interfaces, and the
    # endpoints in this file perform no authentication -- confirm this
    # exposure is intended.
    uvicorn.run(app, host="0.0.0.0", port=port)