execute_sql
Run SQL queries on Microsoft SQL Server databases through a secure, controlled MCP interface.
Instructions
Execute an SQL query on the SQL Server
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | The SQL query to execute | |
Implementation Reference
- src/mssql_mcp_server/server.py:218-265 (handler) The call_tool function is the actual handler for execute_sql. It receives tool invocations, reads the query argument, connects to MSSQL via pymssql, executes the query, and returns the results as TextContent.
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute a SQL statement on behalf of an MCP tool invocation.

    Args:
        name: Tool name sent by the client; must equal ``get_command()``.
        arguments: Tool arguments; the ``"query"`` entry holds the SQL text.

    Returns:
        A single-element list with a ``TextContent`` carrying one of:
        a table listing (header line plus the first column of every row)
        when a SELECT mentions ``INFORMATION_SCHEMA.TABLES``; comma-joined
        column names and rows for any other SELECT that yields a result
        set; a "Rows affected" message for everything else; or an error
        message when execution fails.

    Raises:
        ValueError: If ``name`` is not the configured tool name, or the
            ``"query"`` argument is missing or empty.
    """
    config = get_db_config()
    command = get_command()
    logger.info("Calling tool: %s with arguments: %s", name, arguments)

    if name != command:
        raise ValueError(f"Unknown tool: {name}")

    query = arguments.get("query")
    if not query:
        raise ValueError("Query is required")

    conn = None
    cursor = None
    try:
        conn = pymssql.connect(**config)
        cursor = conn.cursor()
        cursor.execute(query)

        # A SELECT-first statement can still produce no result set
        # (e.g. SELECT ... INTO); in that case description is None and the
        # statement has to be committed like any other write.
        returns_rows = is_select_query(query) and cursor.description is not None

        if returns_rows and "INFORMATION_SCHEMA.TABLES" in query.upper():
            # Special handling for table listing: header + first column.
            tables = cursor.fetchall()
            result = ["Tables_in_" + config["database"]]
            # str() keeps the join safe when the first column is not text.
            result.extend(str(table[0]) for table in tables)
            text = "\n".join(result)
        elif returns_rows:
            # Regular SELECT queries: header row followed by data rows.
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            lines = [",".join(columns)]
            lines.extend(",".join(map(str, row)) for row in rows)
            text = "\n".join(lines)
        else:
            # Non-SELECT queries: persist the change and report the count.
            conn.commit()
            affected_rows = cursor.rowcount
            text = f"Query executed successfully. Rows affected: {affected_rows}"

        return [TextContent(type="text", text=text)]
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        logger.error("Error executing SQL '%s': %s", query, e)
        return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
    finally:
        # Always release the cursor and connection, including on failure.
        for resource in (cursor, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_error:
                logger.warning("Error closing database resource: %s", close_error)

# - The list_tools function registers the execute_sql tool with its schema (name, description, and inputSchema requiring a 'query' string parameter).
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the single SQL execution tool exposed by this server.

    Returns:
        A one-element list holding a ``Tool`` named after ``get_command()``
        whose input schema is an object with one required string property,
        ``query``.
    """
    tool_name = get_command()
    logger.info("Listing tools...")

    query_property = {
        "type": "string",
        "description": "The SQL query to execute",
    }
    input_schema = {
        "type": "object",
        "properties": {"query": query_property},
        "required": ["query"],
    }
    sql_tool = Tool(
        name=tool_name,
        description="Execute an SQL query on the SQL Server",
        inputSchema=input_schema,
    )
    return [sql_tool]

# - src/mssql_mcp_server/server.py:100-102 (registration) The get_command function returns 'execute_sql' as the default tool command name, and is used by both list_tools and call_tool to define/check the tool name.
def get_command() -> str:
    """Get the command (tool name) used to execute SQL queries.

    Reads the ``MSSQL_COMMAND`` environment variable and falls back to
    ``"execute_sql"`` when it is unset.

    Returns:
        The configured tool name, or ``"execute_sql"`` by default.
    """
    default_command = "execute_sql"
    configured = os.getenv("MSSQL_COMMAND")
    # An empty or whitespace-only value would yield a blank tool name,
    # so treat it the same as an unset variable.
    if configured is None or not configured.strip():
        return default_command
    return configured

# - The is_select_query helper function determines if a query is a SELECT statement, used by the handler to decide how to process query results.
def is_select_query(query: str) -> bool:
    """Check if a query is a SELECT statement, accounting for comments.

    Leading whitespace, single-line (``--``) comments and multi-line
    (``/* */``) comments are skipped in the order they appear, then the
    first keyword is compared case-insensitively against ``SELECT``.

    Statements that begin with any other keyword (for example a ``WITH``
    common table expression) are not classified as SELECT.

    Args:
        query: The SQL text to classify.

    Returns:
        True when the first keyword of the statement is ``SELECT``.
    """
    # Consume comments left to right so a "/*" inside a "--" comment cannot
    # swallow the real statement. This pattern can match the empty string,
    # so the match object is never None.
    leading_noise = re.match(
        r'(?:\s|--[^\n]*|/\*.*?\*/)*', query, flags=re.DOTALL
    )
    statement = query[leading_noise.end():]

    # \b instead of whitespace splitting: "SELECT*" and "SELECT/**/1" are
    # SELECT statements, while "SELECTED" is not.
    return re.match(r'SELECT\b', statement, flags=re.IGNORECASE) is not None

# - src/mssql_mcp_server/server.py:17-30 (helper) The validate_table_name helper function validates and escapes table names to prevent SQL injection.
def validate_table_name(table_name: str) -> str:
    """Validate and escape a table name to prevent SQL injection.

    Accepts ``table`` or ``schema.table`` where each part consists only of
    ASCII letters, digits and underscores, and returns the name wrapped in
    square brackets.

    Args:
        table_name: Candidate table name, optionally schema-qualified.

    Returns:
        ``"[table]"`` or ``"[schema].[table]"``.

    Raises:
        ValueError: If the name contains any other character or has more
            than one dot.
    """
    # fullmatch rather than match with "$": "$" also matches just before a
    # trailing newline, which would let "users\n" through validation.
    if not re.fullmatch(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)?', table_name):
        raise ValueError(f"Invalid table name: {table_name}")

    # Split schema and table if present
    parts = table_name.split('.')
    if len(parts) == 2:
        # Escape both schema and table name
        return f"[{parts[0]}].[{parts[1]}]"
    # Just table name
    return f"[{table_name}]"