base_readQuery
Execute SQL queries on Teradata databases through SQLAlchemy, returning results with fully rendered SQL in metadata for analysis and debugging.
Instructions
Execute a SQL query via SQLAlchemy, bind parameters if provided (prepared SQL), and return the fully rendered SQL (with literals) in metadata.
Arguments: sql - SQL text, with optional bind-parameter placeholders
Returns: ResponseType: formatted response with query results + metadata
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | No | SQL text, with optional bind-parameter placeholders | |
Implementation Reference
# Core handler function that executes SQL query on provided connection, handles
# bind parameters, fetches and formats results as JSON, compiles final SQL with
# literals for metadata, and returns structured response.
def handle_base_readQuery(
    conn: Connection,
    sql: str | None = None,
    tool_name: str | None = None,
    *args,
    **kwargs
):
    """
    Execute a SQL query via SQLAlchemy, bind parameters if provided (prepared SQL), and return the fully rendered SQL (with literals) in metadata.

    Arguments:
      sql    - SQL text, with optional bind-parameter placeholders

    Returns:
      ResponseType: formatted response with query results + metadata
    """
    # NOTE: the docstring above is published as the MCP tool description by the
    # registration code (it passes the handler's __doc__), so its wording is
    # deliberately left unchanged. Additional contract notes live in comments:
    #   conn      - SQLAlchemy connection the statement is executed on
    #   tool_name - name reported in metadata; defaults to "base_readQuery"
    #   **kwargs  - bind-parameter values, keyed by placeholder name
    #   raises    - ValueError when no SQL text is supplied
    logger.debug(f"Tool: handle_base_readQuery: Args: sql: {sql}, args={args!r}, kwargs={kwargs!r}")

    # Fail fast with a clear message instead of letting text(None) fail inside
    # SQLAlchemy with a less helpful error.
    if sql is None:
        raise ValueError("base_readQuery requires a SQL statement in 'sql'")

    # 1. Build a textual SQL statement
    stmt = text(sql)

    # 2. Execute with bind parameters if provided
    result = conn.execute(stmt, kwargs) if kwargs else conn.execute(stmt)

    # 3. Fetch rows & column metadata
    cursor = result.cursor  # underlying DB-API cursor
    raw_rows = cursor.fetchall() or []
    data = rows_to_json(cursor.description, raw_rows)
    columns = [
        {
            "name": col[0],
            "type": getattr(col[1], "__name__", str(col[1])),
        }
        for col in (cursor.description or [])
    ]

    # 4. Compile the statement with literal binds for "final SQL".
    #    Only build the fallback DefaultDialect when conn has no `.dialect`.
    dialect = getattr(conn, "dialect", None) or default.DefaultDialect()
    try:
        # The values passed to execute() are not stored on the statement, so
        # attach them here; otherwise the literal rendering cannot show the
        # values that were actually executed.
        renderable = stmt.bindparams(**kwargs) if kwargs else stmt
        compiled = renderable.compile(
            dialect=dialect,
            compile_kwargs={"literal_binds": True},
        )
        final_sql = str(compiled)
    except Exception as render_error:
        # Rendering is best-effort metadata: the rows are already fetched, so a
        # value that cannot be rendered as a literal (or an extra keyword that
        # is not a placeholder) must not discard the query results.
        logger.debug(f"Tool: handle_base_readQuery: could not render final SQL: {render_error}")
        final_sql = sql

    # 5. Build metadata using the rendered SQL
    metadata = {
        "tool_name": tool_name if tool_name else "base_readQuery",
        "sql": final_sql,
        "columns": columns,
        "row_count": len(data),
    }
    logger.debug(f"Tool: handle_base_readQuery: metadata: {metadata}")
    return create_response(data, metadata)
- src/teradata_mcp_server/app.py:462-487 (registration)Dynamically registers all handle_* functions (including handle_base_readQuery → base_readQuery) from tool modules as MCP tools, using wrappers to inject DB connections and handle signatures.module_loader = td.initialize_module_loader(config) if module_loader: all_functions = module_loader.get_all_functions() for name, func in all_functions.items(): if not (inspect.isfunction(func) and name.startswith("handle_")): continue tool_name = name[len("handle_"):] if not any(re.match(p, tool_name) for p in config.get('tool', [])): continue # Skip template tools (used for developer reference only) if tool_name.startswith("tmpl_"): logger.debug(f"Skipping template tool: {tool_name}") continue # Skip BAR tools if BAR functionality is disabled if tool_name.startswith("bar_") and not enableBAR: logger.info(f"Skipping BAR tool: {tool_name} (BAR functionality disabled)") continue # Skip chat completion tools if chat completion functionality is disabled if tool_name.startswith("chat_") and not enableChat: logger.info(f"Skipping chat completion tool: {tool_name} (chat completion functionality disabled)") continue wrapped = make_tool_wrapper(func) mcp.tool(name=tool_name, description=wrapped.__doc__)(wrapped) logger.info(f"Created tool: {tool_name}") logger.debug(f"Tool Docstring: {wrapped.__doc__}") else:
# Universal executor for DB tool handlers like handle_base_readQuery: manages
# SQLAlchemy/raw connections, sets Teradata QueryBand from request context,
# handles execution and formatting, with error capture.
def execute_db_tool(tool, *args, **kwargs):
    """Execute a handler with a DB connection and MCP concerns.

    - Detects whether the handler expects a SQLAlchemy Connection or a raw
      DB-API connection and injects appropriately.
    - For HTTP transport, builds and sets Teradata QueryBand per request using
      the RequestContext captured by middleware.
    - Formats return values into FastMCP content and captures exceptions with
      context for easier debugging.

    Args:
        tool: handler callable; its first parameter receives the connection.
        *args, **kwargs: forwarded to the handler. ``tool_name`` is popped from
            kwargs and used for QueryBand and log context only.

    Returns:
        The formatted text response of the handler's result, or a formatted
        error response when the handler (or a mandatory QueryBand) fails.
    """
    tool_name = kwargs.pop('tool_name', getattr(tool, '__name__', 'unknown_tool'))
    tdconn_local = get_tdconn()
    if not getattr(tdconn_local, "engine", None):
        logger.info("Reinitializing TDConn")
        tdconn_local = get_tdconn(recreate=True)

    sig = inspect.signature(tool)
    # A handler without parameters must not raise StopIteration here, outside
    # the error-capturing try block; treat it as "no annotation".
    first_param = next(iter(sig.parameters.values()), None)
    ann = first_param.annotation if first_param is not None else inspect.Parameter.empty
    use_sqla = inspect.isclass(ann) and issubclass(ann, Connection)

    def _apply_queryband(run_statement):
        """Set the session QueryBand via ``run_statement(sql)``.

        Returns an error response when the QueryBand could not be set under
        Basic auth (the tool must not run without proxying), otherwise None.
        """
        # Always attempt to set QueryBand when a request context is present
        ctx = get_context()
        request_context = ctx.get_state("request_context") if ctx else None
        if request_context is None:
            return None
        qb = build_queryband(
            application=mcp.name,
            profile=profile_name,
            process_id=process_id,
            tool_name=tool_name,
            request_context=request_context,
        )
        try:
            # Apply at session scope so it persists across statements.
            # NOTE(review): qb is interpolated into the statement text;
            # presumably build_queryband escapes single quotes - verify.
            run_statement(f"SET QUERY_BAND = '{qb}' FOR SESSION")
            logger.debug(f"QueryBand set: {qb}")
            logger.debug(f"Tool request context: {request_context}")
        except Exception as qb_error:
            # Deliberately best-effort: only Basic auth makes this fatal.
            logger.debug(f"Could not set QueryBand: {qb_error}")
            # If in Basic auth, do not run the tool without proxying
            if str(getattr(request_context, "auth_scheme", "")).lower() == "basic":
                return format_error_response(
                    f"Cannot run tool '{tool_name}': failed to set QueryBand for Basic auth. Error: {qb_error}"
                )
        return None

    try:
        if use_sqla:
            from sqlalchemy import text

            with tdconn_local.engine.connect() as conn:
                qb_failure = _apply_queryband(lambda statement: conn.execute(text(statement)))
                if qb_failure is not None:
                    return qb_failure
                result = tool(conn, *args, **kwargs)
        else:
            raw = tdconn_local.engine.raw_connection()
            try:
                def _run_on_raw(statement):
                    cursor = raw.cursor()
                    try:
                        cursor.execute(statement)
                    finally:
                        # Close the cursor even when the statement fails.
                        cursor.close()

                qb_failure = _apply_queryband(_run_on_raw)
                if qb_failure is not None:
                    return qb_failure
                result = tool(raw, *args, **kwargs)
            finally:
                raw.close()
        return format_text_response(result)
    except Exception as e:
        logger.error(f"Error in execute_db_tool: {e}", exc_info=True, extra={"session_info": {"tool_name": tool_name}})
        return format_error_response(str(e))
# Creates MCP tool wrappers for handle_* functions by stripping internal params
# (conn, tool_name), preserving user-facing signature for schema inference, and
# setting up async/threaded execution via execute_db_tool.
def make_tool_wrapper(func):
    """Build the MCP-facing tool for a ``handle_*`` function.

    The parameters ``conn`` and ``tool_name`` (and ``fs_config`` when the
    handler declares it) are hidden from the published signature; ``fs_config``
    is supplied through ``inject_kwargs`` instead. Only positional-or-keyword
    and keyword-only parameters are kept, so ``*args`` / ``**kwargs`` are not
    exposed. The handler itself is invoked through ``execute_db_tool``.
    """
    handler_sig = inspect.signature(func)

    injected = {}
    hidden = {"conn", "tool_name"}
    if "fs_config" in handler_sig.parameters:
        injected["fs_config"] = fs_config
        hidden.add("fs_config")

    exposed_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    public_params = []
    for param_name, param in handler_sig.parameters.items():
        if param_name in hidden:
            continue
        if param.kind not in exposed_kinds:
            continue
        public_params.append(param)
    mcp_signature = handler_sig.replace(parameters=public_params)

    # Create executor function that will be run in thread
    def executor(**kwargs):
        return execute_db_tool(func, **kwargs)

    return create_mcp_tool(
        executor_func=executor,
        signature=mcp_signature,
        inject_kwargs=injected,
        validate_required=False,
        tool_name=getattr(func, "__name__", "wrapped_tool"),
        tool_description=func.__doc__,
    )