mcp_client.py•11.8 kB
"""Python MCP HTTP Client for SQLite MCP Server.
This module provides a client library to communicate with the SQLite MCP Server
via HTTP. It's designed for easy integration into Python chatbot applications.
"""
import requests
import json
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class MCPClientError(Exception):
"""Base exception for MCP Client errors."""
pass
class DatabaseNotOpenError(MCPClientError):
"""Exception when database is not open."""
pass
@dataclass
class QueryResult:
"""Result from query execution."""
rows: List[Dict[str, Any]]
column_count: int
record_count: int
success: bool
@dataclass
class InsertResult:
"""Result from insert operation."""
lastID: int
changes: int
table: str
success: bool
@dataclass
class UpdateResult:
"""Result from update operation."""
changes: int
table: str
where_clause: str
success: bool
@dataclass
class DeleteResult:
"""Result from delete operation."""
changes: int
table: str
where_clause: str
success: bool
class MCPClient:
"""HTTP Client for SQLite MCP Server.
This client communicates with the FastAPI-wrapped MCP server via HTTP.
It provides methods for all database operations.
Example:
>>> client = MCPClient("http://localhost:8000")
>>> client.open_database("./data/myapp.db")
>>> results = client.execute_query("SELECT * FROM users")
>>> client.insert("users", {"name": "John", "age": 30})
"""
def __init__(self, base_url: str = "http://localhost:8000", timeout: int = 30):
"""Initialize MCP Client.
Args:
base_url: Base URL of the MCP HTTP server (default: http://localhost:8000)
timeout: Request timeout in seconds (default: 30)
"""
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self._check_connection()
def _check_connection(self) -> bool:
"""Check if server is reachable.
Returns:
True if server is healthy
Raises:
MCPClientError: If server is not reachable
"""
try:
response = requests.get(f"{self.base_url}/health", timeout=5)
response.raise_for_status()
return True
except requests.exceptions.RequestException as e:
raise MCPClientError(
f"Cannot connect to MCP server at {self.base_url}. "
f"Make sure the server is running. Error: {str(e)}"
)
def _make_request(
self,
method: str,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Make HTTP request to MCP server.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint path
data: Request data (for POST requests)
params: Query parameters
Returns:
Response JSON
Raises:
MCPClientError: If request fails
"""
url = f"{self.base_url}{endpoint}"
try:
if method.upper() == "GET":
response = requests.get(url, params=params, timeout=self.timeout)
else:
response = requests.request(
method=method,
url=url,
json=data,
params=params,
timeout=self.timeout,
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
# Try to parse error response
try:
error_detail = e.response.json().get("detail", str(e))
except:
error_detail = str(e)
raise MCPClientError(f"Server error: {error_detail}")
except requests.exceptions.RequestException as e:
raise MCPClientError(f"Request failed: {str(e)}")
# Database Management Methods
def open_database(self, path: str) -> str:
"""Open or create a SQLite database.
Args:
path: Path to the database file
Returns:
Success message
Raises:
MCPClientError: If operation fails
"""
response = self._make_request("POST", "/tools/open_database", {"path": path})
return response["message"]
def close_database(self) -> str:
"""Close the current database connection.
Returns:
Success message
Raises:
MCPClientError: If operation fails
"""
response = self._make_request("POST", "/tools/close_database")
return response["message"]
# Query Methods
def execute_query(
self,
query: str,
parameters: Optional[List[Any]] = None,
) -> QueryResult:
"""Execute a SELECT query.
Args:
query: SQL SELECT query
parameters: Optional query parameters for prepared statements
Returns:
QueryResult with rows and column count
Raises:
MCPClientError: If query fails
"""
data = {
"query": query,
"parameters": parameters or [],
}
response = self._make_request("POST", "/tools/execute_query", data)
return QueryResult(
rows=response["rows"],
column_count=response["column_count"],
record_count=response["record_count"],
success=response["success"],
)
# CRUD Methods
def insert(self, table: str, data: Dict[str, Any]) -> InsertResult:
"""Insert a row into a table.
Args:
table: Table name
data: Dictionary of column names and values
Returns:
InsertResult with lastID and changes count
Raises:
MCPClientError: If insert fails
"""
request_data = {"table": table, "data": data}
response = self._make_request("POST", "/tools/insert", request_data)
return InsertResult(
lastID=response["lastID"],
changes=response["changes"],
table=response["table"],
success=response["success"],
)
def update(
self,
table: str,
data: Dict[str, Any],
where: str,
where_params: Optional[List[Any]] = None,
) -> UpdateResult:
"""Update rows in a table.
Args:
table: Table name
data: Dictionary of column names and new values
where: WHERE clause condition
where_params: Optional parameters for WHERE clause
Returns:
UpdateResult with changes count
Raises:
MCPClientError: If update fails
"""
request_data = {
"table": table,
"data": data,
"where": where,
"where_params": where_params or [],
}
response = self._make_request("POST", "/tools/update", request_data)
return UpdateResult(
changes=response["changes"],
table=response["table"],
where_clause=response["where_clause"],
success=response["success"],
)
def delete(
self,
table: str,
where: str,
where_params: Optional[List[Any]] = None,
) -> DeleteResult:
"""Delete rows from a table.
Args:
table: Table name
where: WHERE clause condition
where_params: Optional parameters for WHERE clause
Returns:
DeleteResult with changes count
Raises:
MCPClientError: If delete fails
"""
request_data = {
"table": table,
"where": where,
"where_params": where_params or [],
}
response = self._make_request("POST", "/tools/delete", request_data)
return DeleteResult(
changes=response["changes"],
table=response["table"],
where_clause=response["where_clause"],
success=response["success"],
)
# Schema Methods
def create_table(self, table: str, schema: str) -> str:
"""Create a new table.
Args:
table: Table name
schema: Column definitions
Returns:
Success message
Raises:
MCPClientError: If table creation fails
"""
request_data = {"table": table, "schema": schema}
response = self._make_request("POST", "/tools/create_table", request_data)
return response["message"]
def list_tables(self) -> List[str]:
"""List all tables in the database.
Returns:
List of table names
Raises:
MCPClientError: If operation fails
"""
response = self._make_request("GET", "/tools/list_tables")
return response["tables"]
def get_table_schema(self, table: str) -> List[Dict[str, Any]]:
"""Get the schema of a table.
Args:
table: Table name
Returns:
List of column information dictionaries
Raises:
MCPClientError: If operation fails
"""
request_data = {"table": table}
response = self._make_request("POST", "/tools/get_table_schema", request_data)
return response["columns"]
# Utility Methods
def get_tools(self) -> List[Dict[str, Any]]:
"""Get list of available tools.
Returns:
List of tool definitions
Raises:
MCPClientError: If operation fails
"""
response = self._make_request("GET", "/tools")
return response["tools"]
def health_check(self) -> Dict[str, Any]:
"""Check server health status.
Returns:
Health status information
Raises:
MCPClientError: If check fails
"""
try:
response = requests.get(f"{self.base_url}/health", timeout=5)
response.raise_for_status()
return response.json()
except Exception as e:
raise MCPClientError(f"Health check failed: {str(e)}")
# Context manager support
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
try:
self.close_database()
except:
pass # Ignore errors on exit
# Convenience functions
def create_client(base_url: str = "http://localhost:8000") -> MCPClient:
"""Create and return an MCP client instance.
Args:
base_url: Base URL of the MCP HTTP server
Returns:
MCPClient instance
Raises:
MCPClientError: If server is not reachable
"""
return MCPClient(base_url)
if __name__ == "__main__":
# Example usage
try:
client = MCPClient()
# Open database
print("Opening database...")
print(client.open_database("./data/sample_data.db"))
# List tables
print("\nListing tables...")
tables = client.list_tables()
print(f"Tables: {tables}")
# Query data
print("\nQuerying spam_number table...")
result = client.execute_query("SELECT * FROM Spam_number LIMIT 3")
print(f"Found {result.record_count} records:")
for row in result.rows:
print(f" {row}")
# Close database
print("\nClosing database...")
print(client.close_database())
except MCPClientError as e:
print(f"Error: {e}")