sqlite-explorer-fastmcp-mcp-server

by hannesrudolph
Verified
from pathlib import Path import sqlite3 import os from typing import List, Dict, Any, Optional from fastmcp import FastMCP # Initialize FastMCP server mcp = FastMCP("SQLite Explorer", log_level="CRITICAL") # Path to Messages database - must be provided via SQLITE_DB_PATH environment variable if 'SQLITE_DB_PATH' not in os.environ: raise ValueError("SQLITE_DB_PATH environment variable must be set") DB_PATH = Path(os.environ['SQLITE_DB_PATH']) class SQLiteConnection: def __init__(self, db_path: Path): self.db_path = db_path self.conn = None def __enter__(self): self.conn = sqlite3.connect(str(self.db_path)) self.conn.row_factory = sqlite3.Row return self.conn def __exit__(self, exc_type, exc_val, exc_tb): if self.conn: self.conn.close() @mcp.tool() def read_query( query: str, params: Optional[List[Any]] = None, fetch_all: bool = True, row_limit: int = 1000 ) -> List[Dict[str, Any]]: """Execute a query on the Messages database. Args: query: SELECT SQL query to execute params: Optional list of parameters for the query fetch_all: If True, fetches all results. If False, fetches one row. row_limit: Maximum number of rows to return (default 1000) Returns: List of dictionaries containing the query results """ if not DB_PATH.exists(): raise FileNotFoundError(f"Messages database not found at: {DB_PATH}") # Clean and validate the query query = query.strip() # Remove trailing semicolon if present if query.endswith(';'): query = query[:-1].strip() # Check for multiple statements by looking for semicolons not inside quotes def contains_multiple_statements(sql: str) -> bool: in_single_quote = False in_double_quote = False for char in sql: if char == "'" and not in_double_quote: in_single_quote = not in_single_quote elif char == '"' and not in_single_quote: in_double_quote = not in_double_quote elif char == ';' and not in_single_quote and not in_double_quote: return True return False if contains_multiple_statements(query): raise ValueError("Multiple SQL statements are not allowed") # Validate query type (allowing common CTEs) query_lower = query.lower() if not any(query_lower.startswith(prefix) for prefix in ('select', 'with')): raise ValueError("Only SELECT queries (including WITH clauses) are allowed for safety") params = params or [] with SQLiteConnection(DB_PATH) as conn: cursor = conn.cursor() try: # Only add LIMIT if query doesn't already have one if 'limit' not in query_lower: query = f"{query} LIMIT {row_limit}" cursor.execute(query, params) if fetch_all: results = cursor.fetchall() else: results = [cursor.fetchone()] return [dict(row) for row in results if row is not None] except sqlite3.Error as e: raise ValueError(f"SQLite error: {str(e)}") @mcp.tool() def list_tables() -> List[str]: """List all tables in the Messages database. Returns: List of table names in the database """ if not DB_PATH.exists(): raise FileNotFoundError(f"Messages database not found at: {DB_PATH}") with SQLiteConnection(DB_PATH) as conn: cursor = conn.cursor() try: cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' ORDER BY name """) return [row['name'] for row in cursor.fetchall()] except sqlite3.Error as e: raise ValueError(f"SQLite error: {str(e)}") @mcp.tool() def describe_table(table_name: str) -> List[Dict[str, str]]: """Get detailed information about a table's schema. Args: table_name: Name of the table to describe Returns: List of dictionaries containing column information: - name: Column name - type: Column data type - notnull: Whether the column can contain NULL values - dflt_value: Default value for the column - pk: Whether the column is part of the primary key """ if not DB_PATH.exists(): raise FileNotFoundError(f"Messages database not found at: {DB_PATH}") with SQLiteConnection(DB_PATH) as conn: cursor = conn.cursor() try: # Verify table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name=? """, [table_name]) if not cursor.fetchone(): raise ValueError(f"Table '{table_name}' does not exist") # Get table schema cursor.execute(f"PRAGMA table_info({table_name})") columns = cursor.fetchall() return [dict(row) for row in columns] except sqlite3.Error as e: raise ValueError(f"SQLite error: {str(e)}")