Skip to main content
Glama
Teradata

Teradata MCP Server

Official
by Teradata
app.py38.6 kB
from __future__ import annotations """Application factory for the Teradata MCP server. High-level architecture: - server.py is a thin entrypoint that parses CLI/env into a Settings object and calls create_mcp_app(settings), then runs the chosen transport. - create_mcp_app builds a FastMCP instance, configures logging, adds middleware to capture per-request context (including stdio fast-path), sets up Teradata connections (and optional teradataml Feature Store), and registers tools, prompts and resources from both Python modules and YAML files. - Tool handlers are plain Python functions named handle_<toolName> living under src/teradata_mcp_server/tools/*. They remain protocol-agnostic. At startup, we auto-wrap them with a small adapter so they appear as MCP tools with clean signatures. The adapter injects a DB connection and sets QueryBand from the request context when using HTTP. """ import asyncio import inspect import os import re from importlib.resources import files as pkg_files from typing import Annotated, Any import yaml from fastmcp import FastMCP from fastmcp.prompts.prompt import TextContent, Message from pydantic import Field, BaseModel from teradata_mcp_server.config import Settings from teradata_mcp_server import utils as config_utils from teradata_mcp_server.utils import setup_logging, format_text_response, format_error_response, resolve_type_hint from teradata_mcp_server.middleware import RequestContextMiddleware from teradata_mcp_server.tools.utils.queryband import build_queryband from sqlalchemy.engine import Connection from fastmcp.server.dependencies import get_context from teradata_mcp_server.tools.utils import (get_dynamic_function_definition, get_anlytic_function_signature, convert_tdml_docstring_to_mcp_docstring, execute_analytic_function, get_partition_col_order_col_doc_string) import json def create_mcp_app(settings: Settings): """Create and configure the FastMCP app with middleware, tools, prompts, resources.""" logger = setup_logging(settings.logging_level, settings.mcp_transport) # Load tool module loader via teradata tools package try: from teradata_mcp_server import tools as td except ImportError: import tools as td # dev fallback mcp = FastMCP("teradata-mcp-server") # Profiles (load via utils to honor packaged + working-dir overrides) profile_name = settings.profile if not profile_name: logger.info("No profile specified, load all tools, prompts and resources.") config = config_utils.get_profile_config(profile_name) # Feature flags from profiles enableEFS = True if any(re.match(pattern, 'fs_*') for pattern in config.get('tool', [])) else False enableTDVS = True if any(re.match(pattern, 'tdvs_*') for pattern in config.get('tool', [])) else False enableBAR = True if any(re.match(pattern, 'bar_*') for pattern in config.get('tool', [])) else False # Initialize TD connection and optional teradataml/EFS context # Pass settings object to TDConn instead of just connection_url tdconn = td.TDConn(settings=settings) enable_analytic_functions = profile_name and profile_name == 'dataScientist' fs_config = None if enableEFS or enable_analytic_functions: try: import teradataml as tdml tdml.create_context(tdsqlengine=tdconn.engine) except (AttributeError, ImportError, ModuleNotFoundError) as e: logger.warning(f"teradataml not installed - disabling analytic functions: {e}") enable_analytic_functions = False except Exception as e: logger.warning(f"Error creating teradataml context - disabling analytic functions: {e}") enable_analytic_functions = False # Only import FeatureStoreConfig (which depends on tdfs4ds) when EFS tools are enabled try: from teradata_mcp_server.tools.fs.fs_utils import FeatureStoreConfig fs_config = FeatureStoreConfig() # teradataml is optional; warn if unavailable but keep EFS enabled try: import teradataml as tdml except (AttributeError, ImportError, ModuleNotFoundError): logger.warning("teradataml not installed; EFS tools will operate without a teradataml context") except (AttributeError, ImportError, ModuleNotFoundError) as e: logger.warning(f"Feature Store module not available - disabling EFS functionality: {e}") enableEFS = False # TeradataVectorStore connection (optional) tdvs = None if len(os.getenv("TD_BASE_URL", "").strip()) > 0: try: from teradata_mcp_server.tools.tdvs.tdvs_utilies import create_teradataml_context create_teradataml_context() enableTDVS = True except Exception as e: logger.error(f"Unable to establish connection to Teradata Vector Store, disabling: {e}") enableTDVS = False # BAR (Backup and Restore) system dependencies (optional) if enableBAR: try: # Check for BAR system availability by importing required modules import requests from teradata_mcp_server.tools.bar.dsa_client import DSAClient # Verify DSA connection if environment variables are set dsa_base_url = os.getenv("DSA_BASE_URL") dsa_host = os.getenv("DSA_HOST") dsa_port = os.getenv("DSA_PORT") if dsa_base_url or (dsa_host and dsa_port): logger.info("BAR system configured with DSA connection") else: logger.warning("BAR tools enabled but DSA connection not configured (missing DSA_BASE_URL or DSA_HOST/DSA_PORT) - disabling BAR functionality") enableBAR = False except (AttributeError, ImportError, ModuleNotFoundError) as e: logger.warning(f"BAR system dependencies not available - disabling BAR functionality: {e}") enableBAR = False # Middleware (auth + request context) from teradata_mcp_server.tools.auth_cache import SecureAuthCache auth_cache = SecureAuthCache(ttl_seconds=settings.auth_cache_ttl) def get_tdconn(recreate: bool = False): nonlocal tdconn, fs_config if recreate: tdconn = td.TDConn(settings=settings) if enableEFS: try: import teradataml as tdml fs_config = td.FeatureStoreConfig() try: tdml.create_context(tdsqlengine=tdconn.engine) except Exception: pass except Exception: pass return tdconn middleware = RequestContextMiddleware( logger=logger, auth_cache=auth_cache, tdconn_supplier=get_tdconn, auth_mode=settings.auth_mode, transport=settings.mcp_transport, ) mcp.add_middleware(middleware) # Adapters (inlined for simplicity) import socket hostname = socket.gethostname() process_id = f"{hostname}:{os.getpid()}" def execute_db_tool(tool, *args, **kwargs): """Execute a handler with a DB connection and MCP concerns. - Detects whether the handler expects a SQLAlchemy Connection or a raw DB-API connection and injects appropriately. - For HTTP transport, builds and sets Teradata QueryBand per request using the RequestContext captured by middleware. - Formats return values into FastMCP content and captures exceptions with context for easier debugging. """ tool_name = kwargs.pop('tool_name', getattr(tool, '__name__', 'unknown_tool')) tdconn_local = get_tdconn() if not getattr(tdconn_local, "engine", None): logger.info("Reinitializing TDConn") tdconn_local = get_tdconn(recreate=True) sig = inspect.signature(tool) first_param = next(iter(sig.parameters.values())) ann = first_param.annotation use_sqla = inspect.isclass(ann) and issubclass(ann, Connection) try: if use_sqla: from sqlalchemy import text with tdconn_local.engine.connect() as conn: # Always attempt to set QueryBand when a request context is present ctx = get_context() request_context = ctx.get_state("request_context") if ctx else None if request_context is not None: qb = build_queryband( application=mcp.name, profile=profile_name, process_id=process_id, tool_name=tool_name, request_context=request_context, ) try: conn.execute(text(f"SET QUERY_BAND = '{qb}' FOR SESSION")) logger.debug(f"QueryBand set: {qb}") logger.debug(f"Tool request context: {request_context}") except Exception as qb_error: logger.debug(f"Could not set QueryBand: {qb_error}") # If in Basic auth, do not run the tool without proxying if str(getattr(request_context, "auth_scheme", "")).lower() == "basic": return format_error_response( f"Cannot run tool '{tool_name}': failed to set QueryBand for Basic auth. Error: {qb_error}" ) result = tool(conn, *args, **kwargs) else: raw = tdconn_local.engine.raw_connection() try: # Always attempt to set QueryBand when a request context is present ctx = get_context() request_context = ctx.get_state("request_context") if ctx else None if request_context is not None: qb = build_queryband( application=mcp.name, profile=profile_name, process_id=process_id, tool_name=tool_name, request_context=request_context, ) try: cursor = raw.cursor() # Apply at session scope so it persists across statements cursor.execute(f"SET QUERY_BAND = '{qb}' FOR SESSION") cursor.close() logger.debug(f"QueryBand set: {qb}") logger.debug(f"Tool request context: {request_context}") except Exception as qb_error: logger.debug(f"Could not set QueryBand: {qb_error}") if str(getattr(request_context, "auth_scheme", "")).lower() == "basic": return format_error_response( f"Cannot run tool '{tool_name}': failed to set QueryBand for Basic auth. Error: {qb_error}" ) result = tool(raw, *args, **kwargs) finally: raw.close() return format_text_response(result) except Exception as e: logger.error(f"Error in execute_db_tool: {e}", exc_info=True, extra={"session_info": {"tool_name": tool_name}}) return format_error_response(str(e)) def create_mcp_tool( *, executor_func=None, signature, inject_kwargs=None, validate_required=False, tool_name="mcp_tool", tool_description=None, ): """ Unified factory for creating async MCP tool functions. All tool functions use asyncio.to_thread to execute blocking database operations. Args: executor_func: Callable that will be executed. Should be a function that calls execute_db_tool with appropriate arguments. signature: The inspect.Signature for the MCP tool function. inject_kwargs: Dict of kwargs to inject when calling executor_func. validate_required: Whether to validate required parameters are present. tool_name: Name to assign to the MCP tool function. tool_description: Description/docstring for the MCP tool function. Returns: An async function suitable for use as an MCP tool. """ inject_kwargs = inject_kwargs or {} # Extract annotations from signature parameters annotations = { name: param.annotation for name, param in signature.parameters.items() if param.annotation is not inspect.Parameter.empty } if validate_required: # Build list of required parameter names (those without defaults) required_params = [ name for name, param in signature.parameters.items() if param.default is inspect.Parameter.empty ] async def _mcp_tool(**kwargs): missing = [n for n in required_params if n not in kwargs] if missing: raise ValueError(f"Missing required parameters: {missing}") merged_kwargs = {**inject_kwargs, **kwargs} return await asyncio.to_thread(executor_func, **merged_kwargs) else: async def _mcp_tool(**kwargs): merged_kwargs = {**inject_kwargs, **kwargs} return await asyncio.to_thread(executor_func, **merged_kwargs) _mcp_tool.__name__ = tool_name _mcp_tool.__signature__ = signature _mcp_tool.__doc__ = tool_description _mcp_tool.__annotations__ = annotations return _mcp_tool def make_tool_wrapper(func): """Create an MCP-facing wrapper for a handle_* function. - Removes internal parameters (conn, tool_name, fs_config) from the MCP signature while still injecting them into the underlying handler. - Preserves the handler's parameter names and types so MCP clients can render friendly forms. """ sig = inspect.signature(func) inject_kwargs = {} removable = {"conn", "tool_name"} if "fs_config" in sig.parameters: inject_kwargs["fs_config"] = fs_config removable.add("fs_config") params = [ p for name, p in sig.parameters.items() if name not in removable and p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY) ] new_sig = sig.replace(parameters=params) # Create executor function that will be run in thread def executor(**kwargs): return execute_db_tool(func, **kwargs) return create_mcp_tool( executor_func=executor, signature=new_sig, inject_kwargs=inject_kwargs, validate_required=False, tool_name=getattr(func, "__name__", "wrapped_tool"), tool_description=func.__doc__, ) # Register code tools via module loader module_loader = td.initialize_module_loader(config) if module_loader: all_functions = module_loader.get_all_functions() for name, func in all_functions.items(): if not (inspect.isfunction(func) and name.startswith("handle_")): continue tool_name = name[len("handle_"):] if not any(re.match(p, tool_name) for p in config.get('tool', [])): continue # Skip template tools (used for developer reference only) if tool_name.startswith("tmpl_"): logger.debug(f"Skipping template tool: {tool_name}") continue # Skip BAR tools if BAR functionality is disabled if tool_name.startswith("bar_") and not enableBAR: logger.info(f"Skipping BAR tool: {tool_name} (BAR functionality disabled)") continue wrapped = make_tool_wrapper(func) mcp.tool(name=tool_name, description=wrapped.__doc__)(wrapped) logger.info(f"Created tool: {tool_name}") logger.debug(f"Tool Docstring: {wrapped.__doc__}") else: logger.warning("No module loader available, skipping code-defined tool registration") from teradata_mcp_server.tools.constants import TD_ANALYTIC_FUNCS as funcs if enable_analytic_functions: tdml_processed_funcs = set(tdml.analytics.json_parser.json_store._JsonStore._get_function_list()[0].keys()) for func_name in funcs: # Before adding the function, check if function is existed or not. # Connection is not mandatory for MCP server. If connection is not there, then # functions can not be added. if func_name not in tdml_processed_funcs: logger.warning("Function {} is not available. Hence not adding it. ".format(func_name)) continue func_metadata = tdml.analytics.json_parser.json_store._JsonStore.get_function_metadata(func_name) func_obj = getattr(tdml, func_name, None) func_params = func_metadata.function_params inp_data = [t.get_lang_name() for t in func_metadata.input_tables] # Add partition_by parameters for func parameters. additional_args_docs = [] for table in inp_data: func_params["{}_partition_column".format(table)] = None func_params["{}_order_column".format(table)] = None additional_args_docs.append(get_partition_col_order_col_doc_string(table)) # Generate function argument string. func_args_str = get_anlytic_function_signature(func_params) func_name = "tdml_" + func_name func_str = get_dynamic_function_definition().format( analytic_function=func_name, doc_string=func_obj.__init__.__doc__, func_args_str=func_args_str, tables_to_df=json.dumps(inp_data) ) doc_string = convert_tdml_docstring_to_mcp_docstring( func_obj.__init__.__doc__, additional_args_docs) # Execute the generated function definition in the global scope. # Global scope will have all other functions. So reference to other functions will work. exec(func_str, globals()) # Register the function as a tool in MCP server. func = globals()[func_name] mcp.tool(name=func_name, description=doc_string)(func) # Load YAML-defined tools/resources/prompts custom_object_files = [file for file in os.listdir() if file.endswith("_objects.yml")] if module_loader and profile_name: profile_yml_files = module_loader.get_required_yaml_paths() custom_object_files.extend(profile_yml_files) logger.info(f"Loading YAML files for profile '{profile_name}': {len(profile_yml_files)} files") else: tool_yml_resources = [] tools_pkg_root = pkg_files("teradata_mcp_server").joinpath("tools") if tools_pkg_root.is_dir(): for subpkg in tools_pkg_root.iterdir(): if subpkg.is_dir(): for entry in subpkg.iterdir(): if entry.is_file() and entry.name.endswith('.yml'): tool_yml_resources.append(entry) custom_object_files.extend(tool_yml_resources) logger.info(f"Loading all YAML files (no specific profile): {len(tool_yml_resources)} files") custom_objects: dict[str, Any] = {} custom_glossary: dict[str, Any] = {} for file in custom_object_files: try: if hasattr(file, "read_text"): text = file.read_text(encoding='utf-8') else: with open(file, encoding='utf-8', errors='replace') as f: text = f.read() loaded = yaml.safe_load(text) if loaded: custom_objects.update(loaded) except Exception as e: logger.error(f"Failed to load YAML from {file}: {e}") # Prompt helpers def make_custom_prompt(prompt_name: str, prompt: str, desc: str, parameters: dict | None = None): if parameters is None or len(parameters) == 0: async def _dynamic_prompt(): return Message(role="user", content=TextContent(type="text", text=prompt)) _dynamic_prompt.__name__ = prompt_name return mcp.prompt(description=desc)(_dynamic_prompt) else: param_objects: list[inspect.Parameter] = [] annotations: dict[str, Any] = {} for param_name, meta in parameters.items(): meta = meta or {} type_hint_raw = meta.get("type_hint", "str") type_hint = resolve_type_hint(type_hint_raw) required = meta.get("required", True) desc_txt = meta.get("description", "") # Get the type name for display type_name = type_hint.__name__ if hasattr(type_hint, '__name__') else str(type_hint_raw) desc_txt += f" (type: {type_name})" if required and "default" not in meta: default_value = Field(..., description=desc_txt) else: default_value = Field(default=meta.get("default", None), description=desc_txt) param_objects.append( inspect.Parameter( param_name, kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, default=default_value, annotation=type_hint, ) ) annotations[param_name] = type_hint sig = inspect.Signature(param_objects) async def _dynamic_prompt(**kwargs): missing = [name for name, meta in parameters.items() if (meta or {}).get("required", True) and name not in kwargs] if missing: raise ValueError(f"Missing parameters: {missing}") formatted_prompt = prompt.format(**kwargs) return Message(role="user", content=TextContent(type="text", text=formatted_prompt)) _dynamic_prompt.__signature__ = sig _dynamic_prompt.__annotations__ = annotations _dynamic_prompt.__name__ = prompt_name return mcp.prompt(description=desc)(_dynamic_prompt) def make_custom_query_tool(name, tool): description = tool.get("description", "") param_defs = tool.get("parameters", {}) parameters = [] if param_defs: description += "\nArguments:" for param_name, p in param_defs.items(): param_description = p.get("description", "") type_hint_raw = p.get("type_hint", "str") type_hint = resolve_type_hint(type_hint_raw) # Convert type string to actual type class annotation = Annotated[type_hint, param_description] if param_description else type_hint default = p.get("default", inspect.Parameter.empty) # inspect.Parameter.empty if p.get("required", True) else p.get("default", None) parameters.append( inspect.Parameter(param_name, kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, default=default, annotation=annotation) ) # Append parameter name and description to function description # Disabled as already included in Annotations (consider introducing a way to toggle this if clients can't see annotations) if False: type_name = type_hint.__name__ if hasattr(type_hint, '__name__') else str(type_hint_raw) description += f"\n * {param_name}" description += f": {param_description}" if param_description else "" description += f" (type: {type_name})" sig = inspect.Signature(parameters) # Create executor function that will be run in thread def executor(**kwargs): return execute_db_tool(td.handle_base_readQuery, tool["sql"], tool_name=name, **kwargs) tool_func = create_mcp_tool( executor_func=executor, signature=sig, validate_required=True, tool_name=name, ) return mcp.tool(name=name, description=description)(tool_func) """ Generate a SQL generation function that returns a query string for a given cube definition and tool parameters (grain, measures, filters...). """ def generate_cube_query_tool(name, cube): """ Generate a function to create aggregation SQL from a cube definition. :param cube: The cube definition :return: A SQL query string generator function taking dimensions and measures as comma-separated strings. """ def _cube_query_tool(dimensions: str, measures: str, dim_filters: str, meas_filters: str, order_by: str, top: int) -> str: """ Generate a SQL query string for the cube using the specified dimensions and measures. Args: dimensions (str): Comma-separated dimension names (keys in cube['dimensions']). measures (str): Comma-separated measure names (keys in cube['measures']). dim_filters (str): Filter SQL expressions on dimensions. meas_filters (str): Filter SQL expressions on computed measures. order_by (str): Order SQL expressions on selected dimensions and measures. top (int): Filters the top N results. Returns: str: The generated SQL query. """ dim_list_raw = [d.strip() for d in dimensions.split(",") if d.strip()] mes_list_raw = [m.strip() for m in measures.split(",") if m.strip()] # Get dimension expressions from dictionary dim_list = ",\n ".join([ cube["dimensions"][d]["expression"] if d in cube["dimensions"] else d for d in dim_list_raw ]) mes_lines = [] for measure in mes_list_raw: mdef = cube["measures"].get(measure) if mdef is None: raise ValueError(f"Measure '{measure}' not found in cube '{name}'.") expr = mdef["expression"] mes_lines.append(f"{expr} AS {measure}") meas_list = ",\n ".join(mes_lines) top_clause = f"TOP {top}" if top else "" dim_comma = ",\n " if dim_list.strip() else "" where_dim_clause = f"WHERE {dim_filters}" if dim_filters else "" where_meas_clause = f"WHERE {meas_filters}" if meas_filters else "" order_clause = f"ORDER BY {order_by}" if order_by else "" sql = ( f"SELECT {top_clause} * from\n" "(SELECT\n" f" {dim_list}{dim_comma}" f" {meas_list}\n" "FROM (\n" f"sel * from ({cube['sql'].strip()}) a \n" f"{where_dim_clause}" ") AS c\n" f"GROUP BY {', '.join(dim_list_raw)}" ") AS a\n" f"{where_meas_clause}" f"{order_clause}" ";" ) return sql return _cube_query_tool def make_custom_cube_tool(name, cube): # Build allowed values and examples FIRST so we can use them in annotations dimensions_dict = cube.get('dimensions', {}) measures_dict = cube.get('measures', {}) # Build dimension list with descriptions dim_list = [f"{n}: {d.get('description', '')}" for n, d in dimensions_dict.items()] dim_names = list(dimensions_dict.keys()) dimensions_desc = f"Comma-separated dimension names to group by. Allowed: {', '.join(dim_names)}" # Build measure list with descriptions meas_list = [f"{n}: {m.get('description', '')}" for n, m in measures_dict.items()] meas_names = list(measures_dict.keys()) measures_desc = f"Comma-separated measure names to aggregate. Allowed: {', '.join(meas_names)}" # Build filter examples dim_examples = [f"{d} {e}" for d, e in zip(dim_names[:2], ["= 'value'", "in ('X', 'Y', 'Z')"])] if dim_names else [] dim_example = ' AND '.join(dim_examples) if dim_examples else "dimension_name = 'value'" dim_filters_desc = f"Filter expression to apply to dimensions. Valid dimension names: [{', '.join(dim_names)}]. Example: {dim_example}" meas_examples = [f"{m} {e}" for m, e in zip(meas_names[:2], ["> 1000", "= 100"])] if meas_names else [] meas_example = ' AND '.join(meas_examples) if meas_examples else "measure_name > 1000" meas_filters_desc = f"Filter expression to apply to computed measures. Valid measure names: [{', '.join(meas_names)}]. Example: {meas_example}" # Build order example order_examples = [f"{d} {e}" for d, e in zip(dim_names[:2], ["ASC", "DESC"])] if dim_names else [] order_example = ', '.join(order_examples) if order_examples else "dimension_name ASC" order_by_desc = f"Order expression on dimensions and measures. Example: {order_example}" # Now build custom parameters param_defs = cube.get("parameters", {}) parameters = [] required_custom_params = [] for param_name, p in param_defs.items(): param_description = p.get("description", "") type_hint_raw = p.get("type_hint", "str") type_hint = resolve_type_hint(type_hint_raw) # Convert to actual type class annotation = Annotated[type_hint, param_description] if param_description else type_hint default = p.get("default", inspect.Parameter.empty) parameters.append( inspect.Parameter(param_name, kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, default=default, annotation=annotation) ) # Track required custom params for validation if default is inspect.Parameter.empty: required_custom_params.append(param_name) # Build the combined signature: fixed cube parameters + custom parameters # Fixed cube parameters with detailed annotated descriptions cube_params = [ inspect.Parameter("dimensions", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=Annotated[str, dimensions_desc]), inspect.Parameter("measures", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=Annotated[str, measures_desc]), inspect.Parameter("dim_filters", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, default="", annotation=Annotated[str, dim_filters_desc]), inspect.Parameter("meas_filters", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, default="", annotation=Annotated[str, meas_filters_desc]), inspect.Parameter("order_by", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, default="", annotation=Annotated[str, order_by_desc]), inspect.Parameter("top", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, default=None, annotation=Annotated[int, "Limit the number of rows returned (positive integer)"]), ] # Separate required and optional custom parameters all_params = cube_params + parameters required_params = [p for p in all_params if p.default is inspect.Parameter.empty] optional_params = [p for p in all_params if p.default is not inspect.Parameter.empty] # Combine: required first, then optional (Python requirement) sig = inspect.Signature(required_params + optional_params) # Debug: log the signature parameters logger.debug(f"Cube tool '{name}' signature parameters: {list(sig.parameters.keys())}") for param_name, param in sig.parameters.items(): logger.debug(f" {param_name}: annotation={param.annotation}, default={param.default}") # Create executor function that will be run in thread def executor(dimensions, measures, dim_filters="", meas_filters="", order_by="", top=None, **kwargs): # Validate custom parameters missing = [n for n in required_custom_params if n not in kwargs] if missing: raise ValueError(f"Missing required parameters: {missing}") sql_generator = generate_cube_query_tool(name, cube) return execute_db_tool( td.handle_base_readQuery, sql=sql_generator( dimensions=dimensions, measures=measures, dim_filters=dim_filters, meas_filters=meas_filters, order_by=order_by, top=top ), tool_name=name, **kwargs ) # Build detailed dimension and measure lists for docstring dim_lines = [f"\t\t- {item}" for item in dim_list] measure_lines = [f"\t\t- {item}" for item in meas_list] # Build custom parameters documentation custom_param_lines = [] for param_name, p in param_defs.items(): param_desc = p.get('description', '') type_hint_raw = p.get('type_hint', 'str') type_hint = resolve_type_hint(type_hint_raw) param_type = type_hint.__name__ if hasattr(type_hint, '__name__') else str(type_hint_raw) is_required = p.get('default', inspect.Parameter.empty) is inspect.Parameter.empty required_text = " (required)" if is_required else " (optional)" custom_param_lines.append(f" * {param_name} ({param_type}){required_text}: {param_desc}") # Build custom parameters section if there are any custom_params_section = "" if custom_param_lines: custom_params_section = "\n" + chr(10).join(custom_param_lines) + "\n" doc_string = f""" {cube.get('description', '')} This is an OLAP cube tool that presents selected measures at a specified level of aggregation and filtering. Expected inputs: * dimensions (str): {dimensions_desc} {chr(10).join(dim_lines)} * measures (str): {measures_desc} {chr(10).join(measure_lines)} * dim_filters (str): {dim_filters_desc} * meas_filters (str): {meas_filters_desc} * order_by (str): {order_by_desc} * top (int): Limit the number of rows returned (positive integer) {custom_params_section} Returns: Query result as a formatted response. """ tool_func = create_mcp_tool( executor_func=executor, signature=sig, validate_required=False, # Validation happens inside executor for custom params tool_name='get_cube_' + name, tool_description=doc_string, ) return mcp.tool(name=name, description=doc_string)(tool_func) # Register custom objects custom_terms: list[tuple[str, Any, str]] = [] for name, obj in custom_objects.items(): obj_type = obj.get("type") if obj_type == "tool" and any(re.match(pattern, name) for pattern in config.get('tool',[])): fn = make_custom_query_tool(name, obj) globals()[name] = fn logger.info(f"Created tool: {name}") elif obj_type == "prompt" and any(re.match(pattern, name) for pattern in config.get('prompt',[])): fn = make_custom_prompt(name, obj["prompt"], obj.get("description", ""), obj.get("parameters", {})) globals()[name] = fn logger.info(f"Created prompt: {name}") elif obj_type == "cube" and any(re.match(pattern, name) for pattern in config.get('tool',[])): fn = make_custom_cube_tool(name, obj) globals()[name] = fn logger.info(f"Created cube: {name}") elif obj_type == "glossary" and any(re.match(pattern, name) for pattern in config.get('resource',[])): custom_glossary = {k: v for k, v in obj.items() if k != "type"} logger.info(f"Added custom glossary entries for: {name}.") else: logger.info(f"Type {obj_type if obj_type else ''} for custom object {name} is {'unknown' if obj_type else 'undefined'}.") for section in ("measures", "dimensions"): if section in obj and any(re.match(pattern, name) for pattern in config.get('tool',[])): custom_terms.extend((term, details, name) for term, details in obj[section].items()) # Enrich glossary for term, details, tool_name in custom_terms: term_key = term.strip() if term_key not in custom_glossary: custom_glossary[term_key] = {"definition": details.get("description"), "synonyms": [], "tools": [tool_name]} else: if "tools" not in custom_glossary[term_key]: custom_glossary[term_key]["tools"] = [] if tool_name not in custom_glossary[term_key]["tools"]: custom_glossary[term_key]["tools"].append(tool_name) if custom_glossary: @mcp.resource("glossary://all") def get_glossary() -> dict: return custom_glossary @mcp.resource("glossary://definitions") def get_glossary_definitions() -> dict: return {term: details["definition"] for term, details in custom_glossary.items()} @mcp.resource("glossary://term/{term_name}") def get_glossary_term(term_name: str) -> dict: term = custom_glossary.get(term_name) if term: return term else: return {"error": f"Glossary term not found: {term_name}"} # Return the configured app and some handles used by the entrypoint if needed return mcp, logger

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Teradata/teradata-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server