Skip to main content
Glama
bpamiri
by bpamiri

read_rows

Retrieve specific rows from SQL Server tables using primary keys or custom filters to access targeted data.

Instructions

Read rows from a table by primary key or filter.

Provide one of: id (single row), ids (multiple rows), or filter (WHERE clause). Args: table: Table name (can include schema: 'dbo.Users' or 'Users') id: Single primary key value (for composite keys, use filter) ids: List of primary key values filter: WHERE clause without 'WHERE' keyword (e.g., "status = 'active'") columns: List of columns to return (default: all columns) max_rows: Maximum rows to return Returns: Dictionary with: - table: Full table name - rows: List of row dictionaries - count: Number of rows returned

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
tableYes
idNo
idsNo
filterNo
columnsNo
max_rowsNo

Implementation Reference

  • The primary handler function for the 'read_rows' tool. Decorated with @mcp.tool() to register it with the MCP server. Parses table name, builds SELECT query based on provided id, ids, filter, or defaults to all rows; applies column selection and row limits; executes via connection manager and returns rows with metadata or error.
    @mcp.tool() def read_rows( table: str, id: Any | None = None, ids: list[Any] | None = None, filter: str | None = None, columns: list[str] | None = None, max_rows: int | None = None, ) -> dict[str, Any]: """Read rows from a table by primary key or filter. Provide one of: id (single row), ids (multiple rows), or filter (WHERE clause). Args: table: Table name (can include schema: 'dbo.Users' or 'Users') id: Single primary key value (for composite keys, use filter) ids: List of primary key values filter: WHERE clause without 'WHERE' keyword (e.g., "status = 'active'") columns: List of columns to return (default: all columns) max_rows: Maximum rows to return Returns: Dictionary with: - table: Full table name - rows: List of row dictionaries - count: Number of rows returned """ try: manager = get_connection_manager() config = manager.config schema, table_name = parse_table_name(table) # Determine columns col_list = ", ".join(columns) if columns else "*" # Determine effective row limit effective_max_rows = min(max_rows or config.max_rows, config.max_rows) # Build query params: list[Any] = [] if id is not None: # Single row by primary key pk_cols = _get_primary_key_columns(schema, table_name) if not pk_cols: return {"error": f"No primary key found for table {schema}.{table_name}"} query = ( f"SELECT TOP {effective_max_rows} {col_list} " f"FROM [{schema}].[{table_name}] WHERE [{pk_cols[0]}] = %s" ) params = [id] elif ids is not None: # Multiple rows by primary keys pk_cols = _get_primary_key_columns(schema, table_name) if not pk_cols: return {"error": f"No primary key found for table {schema}.{table_name}"} placeholders = ", ".join(["%s"] * len(ids)) query = ( f"SELECT TOP {effective_max_rows} {col_list} " f"FROM [{schema}].[{table_name}] WHERE [{pk_cols[0]}] IN ({placeholders})" ) params = list(ids) elif filter is not None: # Custom filter # Note: filter params not supported - use execute_query for complex cases query = ( f"SELECT TOP {effective_max_rows} {col_list} " f"FROM [{schema}].[{table_name}] WHERE {filter}" ) else: # All rows (with limit) query = f"SELECT TOP {effective_max_rows} {col_list} FROM [{schema}].[{table_name}]" rows = manager.execute_query(query, tuple(params) if params else None) return { "table": f"{schema}.{table_name}", "rows": rows, "count": len(rows), } except Exception as e: logger.error(f"Error reading rows from {table}: {e}") return {"error": str(e)}
  • Helper function used by read_rows to dynamically retrieve the primary key column name(s) for ID-based row selection from the INFORMATION_SCHEMA.
    def _get_primary_key_columns(schema: str, table: str) -> list[str]: """Get primary key column(s) for a table. Args: schema: Schema name table: Table name Returns: List of primary key column names """ manager = get_connection_manager() query = """ SELECT c.COLUMN_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE c ON tc.CONSTRAINT_NAME = c.CONSTRAINT_NAME AND tc.TABLE_SCHEMA = c.TABLE_SCHEMA AND tc.TABLE_NAME = c.TABLE_NAME WHERE tc.TABLE_SCHEMA = %s AND tc.TABLE_NAME = %s AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY' ORDER BY c.ORDINAL_POSITION """ rows = manager.execute_query(query, (schema, table)) return [row["COLUMN_NAME"] for row in rows]
  • Imports the crud module (containing read_rows) to trigger @mcp.tool() decorators and register the tools with the global MCP instance.
    from . import crud, databases, export, query, stored_procs, tables __all__ = [ "crud", "databases", "export", "query", "stored_procs", "tables", ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bpamiri/pymssql-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server