Tecton MCP Server
Official
by tecton-ai
from typing import Any, List, Dict
import httpx
import click
from mcp.server.fastmcp import FastMCP
from utils import _err, _log, run_command
from tecton._internals import metadata_service
from tecton import conf
from tecton.cli.cli import cli
from tecton_core.fco_container import FcoContainer
from tecton_proto.metadataservice.metadata_service__client_pb2 import (
GetAllFeatureServicesRequest,
ListWorkspacesRequest,
QueryFeatureViewsRequest,
GetAllTransformationsRequest,
GetTransformationRequest,
GetAllVirtualDataSourcesRequest,
GetAllEntitiesRequest,
GetFeatureViewRequest
)
# Create the FastMCP server instance, named "tecton"; tools are registered on it via @mcp.tool()
mcp = FastMCP("tecton")
def get_command_help(command, prefix="") -> Dict:
    """Build a nested description of a click command and everything below it.

    Args:
        command: The click command (or group) to describe.
        prefix: Text placed in front of the command's name in the ``name``
            entry. Recursive calls pass the parent's full name followed by
            a space.

    Returns:
        Dict: ``name``, ``help`` and ``options`` entries for the command, plus
        a ``commands`` list of child descriptions when the command is a
        ``click.Group``. Hidden subcommands are left out.
    """
    info = {
        "name": command.name if not prefix else prefix + command.name,
        "help": command.help,
        "options": [],
    }

    # One entry per parameter that yields a help record.
    context = click.Context(command)
    records = (
        parameter.get_help_record(context)
        for parameter in command.get_params(context)
    )
    info["options"].extend(
        {"name": record[0], "help": record[1]} for record in records if record
    )

    # Plain commands have no children to describe.
    if not isinstance(command, click.Group):
        return info

    info["commands"] = []
    for child_name in sorted(command.commands):
        child = command.commands[child_name]
        if child.hidden:
            continue
        # Children are named "<parent full name> <child name>".
        child_prefix = f"{prefix or ''}{command.name} "
        info["commands"].append(get_command_help(child, child_prefix))
    return info
@mcp.tool()
async def tecton_cli_help() -> dict:
    """
    Query the tecton CLI for a structured representation of all available commands and their options.

    Can be used to explore the CLI's functionality, including commands for checking user identity,
    managing workspaces, and cluster connectivity.

    Returns:
        dict: A nested dictionary describing the CLI command structure with the following keys:
            - name (str): The command name
            - help (str): The command's help text
            - options (list): List of dictionaries containing option names and help text
            - commands (list): List of nested command dictionaries (for subcommands)
    """
    # Describe the root CLI group; subcommands are expanded recursively by the helper.
    cli_structure = get_command_help(cli)
    return cli_structure
@mcp.tool()
async def tecton_cli_execute(command: str = "") -> str:
    """
    Execute a tecton cli command.

    Use the tecton_cli_help tool to figure out which commands you have at your disposal.

    Args:
        command: tecton command to execute, including any flags for that command.
            Do not prefix with the name of the cli, tecton

    Returns:
        str: The result of the command. May indicate success or failure
    """
    try:
        _log(f"Running tecton cli command {command}")
        # NOTE(review): `command` comes from the tool caller and is interpolated
        # into a single command string. If run_command goes through a shell,
        # metacharacters in it would be interpreted; confirm against utils.run_command.
        code, out, err = run_command(f"tecton {command}")
        if code != 0:
            # Non-zero exit: surface stderr first, then whatever was printed to stdout.
            return _err(f"{err}\n\n{out}")
        return out
    except Exception as e:
        # Pass a formatted message (not the exception object), matching every
        # other _err call in this module.
        return _err(f"Failed to execute tecton cli command: {str(e)}")
@mcp.tool()
async def list_workspaces() -> List[str]:
    """
    List the names of all workspaces in the currently connected Tecton cluster.

    Returns:
        List[str]: The workspace names reported by the metadata service.
        An empty list is returned if the operation fails.
    """
    try:
        workspace_request = ListWorkspacesRequest()
        client = metadata_service.instance()
        reply = client.ListWorkspaces(workspace_request)
        names = []
        for ws in reply.workspaces:
            names.append(ws.name)
        return names
    except Exception as error:
        # Best effort: report the failure and fall back to "no workspaces".
        _err(f"Failed to list workspaces: {error!s}")
        return []
@mcp.tool()
async def list_feature_views() -> List[Dict]:
    """
    List all feature views in the currently connected tecton cluster.

    Every workspace returned by list_workspaces is queried in turn.

    Returns:
        List[dict]: One dictionary per feature view containing its name, description,
        workspace, and tags, sorted by name. An empty list is returned if the operation fails.
    """
    try:
        collected = []
        for workspace_name in await list_workspaces():
            query = QueryFeatureViewsRequest(workspace=workspace_name)
            reply = metadata_service.instance().QueryFeatureViews(query)
            for fco in reply.fco_container.fcos:
                # Skip container entries that do not carry a named feature view.
                if not (hasattr(fco, "feature_view") and fco.feature_view.fco_metadata.name):
                    continue
                metadata = fco.feature_view.fco_metadata
                collected.append(
                    {
                        "name": metadata.name,
                        "description": metadata.description,
                        "workspace": workspace_name,
                        "tags": dict(metadata.tags),
                    }
                )
        collected.sort(key=lambda entry: entry["name"])
        return collected
    except Exception as error:
        _err(f"Failed to list feature views: {error!s}")
        return []
@mcp.tool()
async def list_feature_services() -> List[Dict]:
    """
    List all feature services in the currently connected tecton cluster.

    Every workspace returned by list_workspaces is queried in turn.

    Returns:
        List[dict]: One dictionary per feature service containing its name, description,
        workspace, and tags, sorted by name. An empty list is returned if the operation fails.
    """
    try:
        collected = []
        for workspace_name in await list_workspaces():
            query = GetAllFeatureServicesRequest(workspace=workspace_name)
            reply = metadata_service.instance().GetAllFeatureServices(query)
            for service in reply.feature_services:
                metadata = service.fco_metadata
                # Entries without a name are ignored.
                if not metadata.name:
                    continue
                collected.append(
                    {
                        "name": metadata.name,
                        "description": metadata.description,
                        "workspace": workspace_name,
                        "tags": dict(metadata.tags),
                    }
                )
        _log("Inspecting FeatureServices")
        collected.sort(key=lambda entry: entry["name"])
        return collected
    except Exception as error:
        _err(f"Failed to list feature services: {error!s}")
        return []
@mcp.tool()
async def list_transformations() -> List[Dict]:
    """
    List all transformations in the currently connected tecton cluster.

    Every workspace returned by list_workspaces is queried in turn.

    Returns:
        List[dict]: One dictionary per transformation containing its name, description,
        workspace, and tags, sorted by name. An empty list is returned if the operation fails.
    """
    try:
        collected = []
        for workspace_name in await list_workspaces():
            query = GetAllTransformationsRequest(workspace=workspace_name)
            reply = metadata_service.instance().GetAllTransformations(query)
            for item in reply.transformations:
                metadata = item.fco_metadata
                # Entries without a name are ignored.
                if not metadata.name:
                    continue
                collected.append(
                    {
                        "name": metadata.name,
                        "description": metadata.description,
                        "workspace": workspace_name,
                        "tags": dict(metadata.tags),
                    }
                )
        collected.sort(key=lambda entry: entry["name"])
        return collected
    except Exception as error:
        _err(f"Failed to list transformations: {error!s}")
        return []
@mcp.tool()
async def get_feature_service_configuration(name: str, workspace: str) -> str:
    """
    Get the entire definition of a feature service from the Tecton cluster.

    Args:
        name: The name of the feature service to retrieve
        workspace: The workspace containing the feature service

    Returns:
        str: A serialized protobuf for a Tecton Feature Service. Also includes all objects this FS depends on, like their FeatureViews etc.
        An empty string is returned if the feature service is not found or any other error occurs.
    """
    try:
        _log(f"Getting feature service configuration for {name} in workspace {workspace}")
        import tecton

        fs = tecton.get_feature_service(name=name, workspace=workspace)
        fs_spec = str(fs._feature_set_config)
        # Same text the tool has always returned: a header line followed by the proto dump.
        return f"\nFeature Service Proto:\n{fs_spec}\n"
    except Exception as e:
        # The message names the feature service (it previously said "feature view",
        # a copy-paste slip from the feature view tool).
        _err(f"Failed to get feature service configuration: {str(e)}")
        return ""
@mcp.tool()
async def get_feature_view_configuration(name: str, workspace: str) -> str:
    """
    Get the entire definition of a feature view from the Tecton cluster.

    Args:
        name: The name of the feature view to retrieve
        workspace: The workspace containing the feature view

    Returns:
        str: A serialized protobuf for a Tecton Feature View, defining metadata, schema, pipeline, and storage.
        Also includes all objects this FV depends on. An empty string is returned if the feature view
        is not found or any other error occurs.
    """
    try:
        _log(f"Getting feature view configuration for {name} in workspace {workspace}")
        import tecton

        feature_view = tecton.get_feature_view(name=name, workspace=workspace)
        definition_text = str(feature_view._feature_definition)
        dependency_texts = [str(spec) for spec in feature_view._get_dependent_specs()]
        dependencies_text = "\n".join(dependency_texts)
        # Assemble the report: the feature view proto first, then its dependencies.
        return (
            "\nFeature View Proto:\n"
            f"{definition_text}\n"
            "Proto of all objects this FV depends on (you will see references in the Feature View Proto above to these objects):\n"
            f"{dependencies_text}\n"
        )
    except Exception as error:
        _err(f"Failed to get feature view definition: {error!s}")
        return ""
@mcp.tool()
async def get_feature_view_code(name: str, workspace: str) -> str:
    """
    Get the code definition of a feature view from the Tecton cluster.

    The lookup fetches the transformation whose name equals the feature view name; this assumes
    both share the same name, which works in most cases.

    Args:
        name: The name of the feature view to retrieve
        workspace: The workspace containing the feature view

    Returns:
        str: The Python code definition of the feature view. An empty string is returned if the
        transformation is not found, lacks the expected fields, or any other error occurs.
    """
    try:
        check_versions = not conf.get_bool("SKIP_OBJECT_VERSION_CHECK")
        lookup = GetTransformationRequest(
            name=name,
            workspace=workspace,
            run_object_version_check=check_versions,
        )
        reply = metadata_service.instance().GetTransformation(lookup)
        spec = FcoContainer.from_proto(reply.fco_container).get_single_root()
        if spec is None:
            raise ValueError(
                f"Transformation '{name}' not found in workspace '{workspace}'. Try running `list_feature_views()` to view all registered feature views."
            )

        # Walk spec.validation_args.transformation.args.user_function.body one
        # attribute at a time, failing with a message naming the missing link.
        attribute_path = (
            ("validation_args", "transformation_spec"),
            ("transformation", "validation_args"),
            ("args", "transformation"),
            ("user_function", "args"),
            ("body", "user_function"),
        )
        node = spec
        for attribute, owner in attribute_path:
            if not hasattr(node, attribute):
                raise ValueError(f"Missing {attribute} in {owner}")
            node = getattr(node, attribute)
        return node
    except Exception as error:
        _err(f"Failed to get feature view code: {error!s}")
        return ""
@mcp.tool()
async def list_data_sources() -> List[Dict]:
    """
    List all data sources in the currently connected tecton cluster.

    Every workspace returned by list_workspaces is queried in turn.

    Returns:
        List[dict]: One dictionary per data source containing its name, description,
        workspace, and tags, sorted by name. An empty list is returned if the operation fails.
    """
    try:
        collected = []
        for workspace_name in await list_workspaces():
            query = GetAllVirtualDataSourcesRequest(workspace=workspace_name)
            reply = metadata_service.instance().GetAllVirtualDataSources(query)
            for source in reply.virtual_data_sources:
                metadata = source.fco_metadata
                # Entries without a name are ignored.
                if not metadata.name:
                    continue
                collected.append(
                    {
                        "name": metadata.name,
                        "description": metadata.description,
                        "workspace": workspace_name,
                        "tags": dict(metadata.tags),
                    }
                )
        collected.sort(key=lambda entry: entry["name"])
        return collected
    except Exception as error:
        _err(f"Failed to list data sources: {error!s}")
        return []
@mcp.tool()
async def list_entities() -> List[Dict]:
    """
    List all entities in the currently connected tecton cluster.

    Every workspace returned by list_workspaces is queried in turn.

    Returns:
        List[dict]: One dictionary per entity containing its name, description,
        workspace, and tags, sorted by name. An empty list is returned if the operation fails.
    """
    try:
        collected = []
        for workspace_name in await list_workspaces():
            query = GetAllEntitiesRequest(workspace=workspace_name)
            reply = metadata_service.instance().GetAllEntities(query)
            for item in reply.entities:
                metadata = item.fco_metadata
                # Entries without a name are ignored.
                if not metadata.name:
                    continue
                collected.append(
                    {
                        "name": metadata.name,
                        "description": metadata.description,
                        "workspace": workspace_name,
                        "tags": dict(metadata.tags),
                    }
                )
        collected.sort(key=lambda entry: entry["name"])
        return collected
    except Exception as error:
        _err(f"Failed to list entities: {error!s}")
        return []
if __name__ == "__main__":
    import sys

    # Initialize and run the server.
    # The stdio transport carries protocol messages over stdout, so the startup
    # notice is written to stderr to keep that stream clean.
    print("Run the main server", file=sys.stderr)
    mcp.run(transport='stdio')