Skip to main content
Glama

MCP Server for OpenMetadata

by yangkyeongmo
pipelines.py6.48 kB
"""Pipeline entity management for OpenMetadata. This module provides comprehensive pipeline management operations including CRUD operations, field filtering, pagination support, and task relationship management. Pipelines enable the flow of data from source to destination through processing steps. """ from typing import Any, Callable, Dict, List, Optional, Union import mcp.types as types from src.openmetadata.openmetadata_client import get_client def get_all_functions() -> List[tuple[Callable, str, str]]: """Return list of (function, name, description) tuples for registration. Returns: List of tuples containing function reference, tool name, and description """ return [ (list_pipelines, "list_pipelines", "List pipelines from OpenMetadata with pagination and filtering"), (get_pipeline, "get_pipeline", "Get details of a specific pipeline by ID"), (get_pipeline_by_name, "get_pipeline_by_name", "Get details of a specific pipeline by fully qualified name"), (create_pipeline, "create_pipeline", "Create a new pipeline in OpenMetadata"), (update_pipeline, "update_pipeline", "Update an existing pipeline in OpenMetadata"), (delete_pipeline, "delete_pipeline", "Delete a pipeline from OpenMetadata"), ] async def list_pipelines( limit: int = 10, offset: int = 0, fields: Optional[str] = None, service: Optional[str] = None, include_deleted: bool = False, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """List pipelines with pagination. Args: limit: Maximum number of pipelines to return (1 to 1000000) offset: Number of pipelines to skip fields: Comma-separated list of fields to include service: Filter pipelines by service name include_deleted: Whether to include deleted pipelines Returns: List of MCP content types containing pipeline list and metadata """ client = get_client() params = {"limit": min(max(1, limit), 1000000), "offset": max(0, offset)} if fields: params["fields"] = fields if service: params["service"] = service if include_deleted: params["include"] = "all" result = client.get("pipelines", params=params) # Add UI URL for web interface integration if "data" in result: for pipeline in result["data"]: pipeline_fqn = pipeline.get("fullyQualifiedName", "") if pipeline_fqn: pipeline["ui_url"] = f"{client.host}/pipeline/{pipeline_fqn}" return [types.TextContent(type="text", text=str(result))] async def get_pipeline( pipeline_id: str, fields: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Get details of a specific pipeline by ID. Args: pipeline_id: ID of the pipeline fields: Comma-separated list of fields to include Returns: List of MCP content types containing pipeline details """ client = get_client() params = {} if fields: params["fields"] = fields result = client.get(f"pipelines/{pipeline_id}", params=params) # Add UI URL for web interface integration pipeline_fqn = result.get("fullyQualifiedName", "") if pipeline_fqn: result["ui_url"] = f"{client.host}/pipeline/{pipeline_fqn}" return [types.TextContent(type="text", text=str(result))] async def get_pipeline_by_name( fqn: str, fields: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Get details of a specific pipeline by fully qualified name. Args: fqn: Fully qualified name of the pipeline fields: Comma-separated list of fields to include Returns: List of MCP content types containing pipeline details """ client = get_client() params = {} if fields: params["fields"] = fields result = client.get(f"pipelines/name/{fqn}", params=params) # Add UI URL for web interface integration pipeline_fqn = result.get("fullyQualifiedName", "") if pipeline_fqn: result["ui_url"] = f"{client.host}/pipeline/{pipeline_fqn}" return [types.TextContent(type="text", text=str(result))] async def create_pipeline( pipeline_data: Dict[str, Any], ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Create a new pipeline. Args: pipeline_data: Pipeline data including name, description, tasks, etc. Returns: List of MCP content types containing created pipeline details """ client = get_client() result = client.post("pipelines", json_data=pipeline_data) # Add UI URL for web interface integration pipeline_fqn = result.get("fullyQualifiedName", "") if pipeline_fqn: result["ui_url"] = f"{client.host}/pipeline/{pipeline_fqn}" return [types.TextContent(type="text", text=str(result))] async def update_pipeline( pipeline_id: str, pipeline_data: Dict[str, Any], ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Update an existing pipeline. Args: pipeline_id: ID of the pipeline to update pipeline_data: Updated pipeline data Returns: List of MCP content types containing updated pipeline details """ client = get_client() result = client.put(f"pipelines/{pipeline_id}", json_data=pipeline_data) # Add UI URL for web interface integration pipeline_fqn = result.get("fullyQualifiedName", "") if pipeline_fqn: result["ui_url"] = f"{client.host}/pipeline/{pipeline_fqn}" return [types.TextContent(type="text", text=str(result))] async def delete_pipeline( pipeline_id: str, hard_delete: bool = False, recursive: bool = False, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """Delete a pipeline. Args: pipeline_id: ID of the pipeline to delete hard_delete: Whether to perform a hard delete recursive: Whether to recursively delete children Returns: List of MCP content types confirming deletion """ client = get_client() params = {"hardDelete": hard_delete, "recursive": recursive} client.delete(f"pipelines/{pipeline_id}", params=params) return [types.TextContent(type="text", text=f"Pipeline {pipeline_id} deleted successfully")]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yangkyeongmo/mcp-server-openmetadata'

If you have feedback or need assistance with the MCP directory API, please join our Discord server