Skip to main content
Glama
bpamiri
by bpamiri
crud.py9.65 kB
"""CRUD operation tools for mssql-mcp.""" import logging from typing import Any from ..app import mcp from ..connection import QueryError from ..server import get_connection_manager from ..utils.safety import parse_table_name logger = logging.getLogger(__name__) def _get_primary_key_columns(schema: str, table: str) -> list[str]: """Get primary key column(s) for a table. Args: schema: Schema name table: Table name Returns: List of primary key column names """ manager = get_connection_manager() query = """ SELECT c.COLUMN_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE c ON tc.CONSTRAINT_NAME = c.CONSTRAINT_NAME AND tc.TABLE_SCHEMA = c.TABLE_SCHEMA AND tc.TABLE_NAME = c.TABLE_NAME WHERE tc.TABLE_SCHEMA = %s AND tc.TABLE_NAME = %s AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY' ORDER BY c.ORDINAL_POSITION """ rows = manager.execute_query(query, (schema, table)) return [row["COLUMN_NAME"] for row in rows] @mcp.tool() def read_rows( table: str, id: Any | None = None, ids: list[Any] | None = None, filter: str | None = None, columns: list[str] | None = None, max_rows: int | None = None, ) -> dict[str, Any]: """Read rows from a table by primary key or filter. Provide one of: id (single row), ids (multiple rows), or filter (WHERE clause). Args: table: Table name (can include schema: 'dbo.Users' or 'Users') id: Single primary key value (for composite keys, use filter) ids: List of primary key values filter: WHERE clause without 'WHERE' keyword (e.g., "status = 'active'") columns: List of columns to return (default: all columns) max_rows: Maximum rows to return Returns: Dictionary with: - table: Full table name - rows: List of row dictionaries - count: Number of rows returned """ try: manager = get_connection_manager() config = manager.config schema, table_name = parse_table_name(table) # Determine columns col_list = ", ".join(columns) if columns else "*" # Determine effective row limit effective_max_rows = min(max_rows or config.max_rows, config.max_rows) # Build query params: list[Any] = [] if id is not None: # Single row by primary key pk_cols = _get_primary_key_columns(schema, table_name) if not pk_cols: return {"error": f"No primary key found for table {schema}.{table_name}"} query = ( f"SELECT TOP {effective_max_rows} {col_list} " f"FROM [{schema}].[{table_name}] WHERE [{pk_cols[0]}] = %s" ) params = [id] elif ids is not None: # Multiple rows by primary keys pk_cols = _get_primary_key_columns(schema, table_name) if not pk_cols: return {"error": f"No primary key found for table {schema}.{table_name}"} placeholders = ", ".join(["%s"] * len(ids)) query = ( f"SELECT TOP {effective_max_rows} {col_list} " f"FROM [{schema}].[{table_name}] WHERE [{pk_cols[0]}] IN ({placeholders})" ) params = list(ids) elif filter is not None: # Custom filter # Note: filter params not supported - use execute_query for complex cases query = ( f"SELECT TOP {effective_max_rows} {col_list} " f"FROM [{schema}].[{table_name}] WHERE {filter}" ) else: # All rows (with limit) query = f"SELECT TOP {effective_max_rows} {col_list} FROM [{schema}].[{table_name}]" rows = manager.execute_query(query, tuple(params) if params else None) return { "table": f"{schema}.{table_name}", "rows": rows, "count": len(rows), } except Exception as e: logger.error(f"Error reading rows from {table}: {e}") return {"error": str(e)} @mcp.tool() def insert_row(table: str, data: dict[str, Any]) -> dict[str, Any]: """Insert a new row into a table. Args: table: Table name (can include schema: 'dbo.Users' or 'Users') data: Dictionary of column names and values to insert Returns: Dictionary with: - status: 'success' or error - table: Full table name - inserted: The inserted row (including generated identity columns) """ try: manager = get_connection_manager() config = manager.config # Check read-only mode if config.read_only: return {"error": "Write operations disabled in read-only mode"} schema, table_name = parse_table_name(table) if not data: return {"error": "No data provided for insert"} # Build INSERT statement with OUTPUT clause cols = ", ".join([f"[{k}]" for k in data]) placeholders = ", ".join(["%s"] * len(data)) values = tuple(data.values()) query = f""" INSERT INTO [{schema}].[{table_name}] ({cols}) OUTPUT INSERTED.* VALUES ({placeholders}) """ rows = manager.execute_query(query, values) # The OUTPUT clause returns the inserted row inserted = rows[0] if rows else data return { "status": "success", "table": f"{schema}.{table_name}", "inserted": inserted, } except QueryError as e: logger.error(f"Error inserting row into {table}: {e}") return {"error": str(e)} except Exception as e: logger.error(f"Unexpected error inserting row into {table}: {e}") return {"error": str(e)} @mcp.tool() def update_row(table: str, id: Any, data: dict[str, Any]) -> dict[str, Any]: """Update an existing row by primary key. Args: table: Table name (can include schema: 'dbo.Users' or 'Users') id: Primary key value of the row to update data: Dictionary of column names and new values Returns: Dictionary with: - status: 'success' or error - table: Full table name - updated: The updated row """ try: manager = get_connection_manager() config = manager.config # Check read-only mode if config.read_only: return {"error": "Write operations disabled in read-only mode"} schema, table_name = parse_table_name(table) if not data: return {"error": "No data provided for update"} # Get primary key column pk_cols = _get_primary_key_columns(schema, table_name) if not pk_cols: return {"error": f"No primary key found for table {schema}.{table_name}"} # Build UPDATE statement with OUTPUT clause set_clauses = ", ".join([f"[{k}] = %s" for k in data]) params = tuple(data.values()) + (id,) query = f""" UPDATE [{schema}].[{table_name}] SET {set_clauses} OUTPUT INSERTED.* WHERE [{pk_cols[0]}] = %s """ rows = manager.execute_query(query, params) if not rows: return { "error": f"No row found with {pk_cols[0]} = {id}", "table": f"{schema}.{table_name}", } return { "status": "success", "table": f"{schema}.{table_name}", "updated": rows[0], } except QueryError as e: logger.error(f"Error updating row in {table}: {e}") return {"error": str(e)} except Exception as e: logger.error(f"Unexpected error updating row in {table}: {e}") return {"error": str(e)} @mcp.tool() def delete_row(table: str, id: Any) -> dict[str, Any]: """Delete a row by primary key. Args: table: Table name (can include schema: 'dbo.Users' or 'Users') id: Primary key value of the row to delete Returns: Dictionary with: - status: 'deleted' or error - table: Full table name - id: The deleted row's ID - rows_affected: Number of rows deleted (should be 1) """ try: manager = get_connection_manager() config = manager.config # Check read-only mode if config.read_only: return {"error": "Delete operations disabled in read-only mode"} schema, table_name = parse_table_name(table) # Get primary key column pk_cols = _get_primary_key_columns(schema, table_name) if not pk_cols: return {"error": f"No primary key found for table {schema}.{table_name}"} # Build DELETE statement query = f"DELETE FROM [{schema}].[{table_name}] WHERE [{pk_cols[0]}] = %s" affected = manager.execute_non_query(query, (id,)) if affected == 0: return { "error": f"No row found with {pk_cols[0]} = {id}", "table": f"{schema}.{table_name}", } return { "status": "deleted", "table": f"{schema}.{table_name}", "id": id, "rows_affected": affected, } except QueryError as e: logger.error(f"Error deleting row from {table}: {e}") return {"error": str(e)} except Exception as e: logger.error(f"Unexpected error deleting row from {table}: {e}") return {"error": str(e)}

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bpamiri/pymssql-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server