Skip to main content
Glama

MCP Search Server

by Nghiauet
app_server.py16.2 kB
""" MCPAgentServer - Exposes MCPApp as MCP server, and mcp-agent workflows and agents as MCP tools. """ import json from collections.abc import AsyncIterator from contextlib import asynccontextmanager from typing import Any, Dict, List, Type, TYPE_CHECKING from mcp.server.fastmcp import Context as MCPContext, FastMCP from mcp.server.fastmcp.exceptions import ToolError from mcp.server.fastmcp.tools import Tool as FastTool from mcp_agent.app import MCPApp from mcp_agent.agents.agent import Agent from mcp_agent.core.context_dependent import ContextDependent from mcp_agent.executor.workflow import Workflow from mcp_agent.executor.workflow_registry import ( WorkflowRegistry, InMemoryWorkflowRegistry, ) from mcp_agent.logging.logger import get_logger from mcp_agent.mcp.mcp_server_registry import ServerRegistry if TYPE_CHECKING: from mcp_agent.core.context import Context logger = get_logger(__name__) class ServerContext(ContextDependent): """Context object for the MCP App server.""" def __init__(self, mcp: FastMCP, context: "Context", **kwargs): super().__init__(context=context, **kwargs) self.mcp = mcp self.active_agents: Dict[str, Agent] = {} # Initialize workflow registry if not already present if not self.context.workflow_registry: if self.context.config.execution_engine == "asyncio": self.context.workflow_registry = InMemoryWorkflowRegistry() elif self.context.config.execution_engine == "temporal": from mcp_agent.executor.temporal.workflow_registry import ( TemporalWorkflowRegistry, ) self.context.workflow_registry = TemporalWorkflowRegistry( executor=self.context.executor ) else: raise ValueError( f"Unsupported execution engine: {self.context.config.execution_engine}" ) # TODO: saqadri (MAC) - Do we need to notify the client that tools list changed? # Since this is at initialization time, we may not need to # (depends on when the server reports that it's intialized/ready) def register_workflow(self, workflow_name: str, workflow_cls: Type[Workflow]): """Register a workflow class.""" if workflow_name not in self.context.workflows: self.workflows[workflow_name] = workflow_cls # Create tools for this workflow create_workflow_specific_tools(self.mcp, workflow_name, workflow_cls) @property def app(self) -> MCPApp: """Get the MCPApp instance associated with this server context.""" return self.context.app @property def workflows(self) -> Dict[str, Type[Workflow]]: """Get the workflows registered in this server context.""" return self.app.workflows @property def workflow_registry(self) -> WorkflowRegistry: """Get the workflow registry for this server context.""" return self.context.workflow_registry def create_mcp_server_for_app(app: MCPApp) -> FastMCP: """ Create an MCP server for a given MCPApp instance. Args: app: The MCPApp instance to create a server for Returns: A configured FastMCP server instance """ # Create a lifespan function specific to this app @asynccontextmanager async def app_specific_lifespan(mcp: FastMCP) -> AsyncIterator[ServerContext]: """Initialize and manage MCPApp lifecycle.""" # Initialize the app if it's not already initialized await app.initialize() # Create the server context which is available during the lifespan of the server server_context = ServerContext(mcp=mcp, context=app.context) # Register initial workflow tools create_workflow_tools(mcp, server_context) try: yield server_context finally: # Don't clean up the MCPApp here - let the caller handle that pass # Create FastMCP server with the app's name mcp = FastMCP( name=app.name or "mcp_agent_server", # TODO: saqadri (MAC) - create a much more detailed description # based on all the available agents and workflows, # or use the MCPApp's description if available. instructions=f"MCP server exposing {app.name} workflows and agents. Description: {app.description}", lifespan=app_specific_lifespan, ) # region Workflow Tools @mcp.tool(name="workflows-list") def list_workflows(ctx: MCPContext) -> Dict[str, Dict[str, Any]]: """ List all available workflow types with their detailed information. Returns information about each workflow type including name, description, and parameters. This helps in making an informed decision about which workflow to run. """ server_context: ServerContext = ctx.request_context.lifespan_context result = {} for workflow_name, workflow_cls in server_context.workflows.items(): # Get workflow documentation run_fn_tool = FastTool.from_function(workflow_cls.run) # Define common endpoints for all workflows endpoints = [ f"workflows-{workflow_name}-run", f"workflows-{workflow_name}-get_status", ] result[workflow_name] = { "name": workflow_name, "description": workflow_cls.__doc__ or run_fn_tool.description, "capabilities": ["run", "resume", "cancel", "get_status"], "tool_endpoints": endpoints, "run_parameters": run_fn_tool.parameters, } return result @mcp.tool(name="workflows-runs-list") async def list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]: """ List all workflow instances (runs) with their detailed status information. This returns information about actual workflow instances (runs), not workflow types. For each running workflow, returns its ID, name, current state, and available operations. This helps in identifying and managing active workflow instances. Returns: A dictionary mapping workflow instance IDs to their detailed status information. """ server_context: ServerContext = ctx.request_context.lifespan_context # Get all workflow statuses from the registry workflow_statuses = ( await server_context.workflow_registry.list_workflow_statuses() ) return workflow_statuses @mcp.tool(name="workflows-run") async def run_workflow( ctx: MCPContext, workflow_name: str, run_parameters: Dict[str, Any] | None = None, ) -> str: """ Run a workflow with the given name. Args: workflow_name: The name of the workflow to run. run_parameters: Arguments to pass to the workflow run. workflows/list method will return the run_parameters schema for each workflow. Returns: The run ID of the started workflow run, which can be passed to workflows/get_status, workflows/resume, and workflows/cancel. """ return await _workflow_run(ctx, workflow_name, run_parameters) @mcp.tool(name="workflows-get_status") async def get_workflow_status( ctx: MCPContext, workflow_name: str, run_id: str ) -> Dict[str, Any]: """ Get the status of a running workflow. Provides detailed information about a workflow instance including its current state, whether it's running or completed, and any results or errors encountered. Args: workflow_name: The name of the workflow to check. run_id: The ID of the workflow instance to check, received from workflows/run or workflows/runs/list. Returns: A dictionary with comprehensive information about the workflow status. """ return await _workflow_status(ctx, run_id, workflow_name) @mcp.tool(name="workflows-resume") async def resume_workflow( ctx: MCPContext, run_id: str, workflow_name: str | None = None, signal_name: str | None = "resume", payload: str | None = None, ) -> bool: """ Resume a paused workflow. Args: run_id: The ID of the workflow to resume, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to resume. signal_name: Optional name of the signal to send to resume the workflow. This will default to "resume", but can be a custom signal name if the workflow was paused on a specific signal. payload: Optional payload to provide the workflow upon resumption. For example, if a workflow is waiting for human input, this can be the human input. Returns: True if the workflow was resumed, False otherwise. """ server_context: ServerContext = ctx.request_context.lifespan_context workflow_registry = server_context.workflow_registry if not workflow_registry: raise ToolError("Workflow registry not found for MCPApp Server.") logger.info( f"Resuming workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'" ) # Get the workflow instance from the registry result = await workflow_registry.resume_workflow( run_id=run_id, workflow_id=workflow_name, signal_name=signal_name, payload=payload, ) if result: logger.debug( f"Signaled workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'" ) else: logger.error( f"Failed to signal workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'" ) @mcp.tool(name="workflows-cancel") async def cancel_workflow( ctx: MCPContext, run_id: str, workflow_name: str | None = None ) -> bool: """ Cancel a running workflow. Args: run_id: The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to cancel. Returns: True if the workflow was cancelled, False otherwise. """ server_context: ServerContext = ctx.request_context.lifespan_context workflow_registry = server_context.workflow_registry logger.info(f"Cancelling workflow {workflow_name} with ID {run_id}") # Get the workflow instance from the registry result = await workflow_registry.cancel_workflow( run_id=run_id, workflow_id=workflow_name ) if result: logger.debug(f"Cancelled workflow {workflow_name} with ID {run_id}") else: logger.error(f"Failed to cancel workflow {workflow_name} with ID {run_id}") # endregion return mcp # region per-Workflow Tools def create_workflow_tools(mcp: FastMCP, server_context: ServerContext): """ Create workflow-specific tools for registered workflows. This is called at server start to register specific endpoints for each workflow. """ if not server_context: logger.warning("Server config not available for creating workflow tools") return for workflow_name, workflow_cls in server_context.workflows.items(): create_workflow_specific_tools(mcp, workflow_name, workflow_cls) def create_workflow_specific_tools( mcp: FastMCP, workflow_name: str, workflow_cls: Type["Workflow"] ): """Create specific tools for a given workflow.""" run_fn_tool = FastTool.from_function(workflow_cls.run) run_fn_tool_params = json.dumps(run_fn_tool.parameters, indent=2) @mcp.tool( name=f"workflows-{workflow_name}-run", description=f""" Run the '{workflow_name}' workflow and get a run ID back. Workflow Description: {workflow_cls.__doc__} {run_fn_tool.description} Args: run_parameters: Dictionary of parameters for the workflow run. The schema for these parameters is as follows: {run_fn_tool_params} """, ) async def run( ctx: MCPContext, run_parameters: Dict[str, Any] | None = None, ) -> Dict[str, Any]: return await _workflow_run(ctx, workflow_name, run_parameters) @mcp.tool( name=f"workflows-{workflow_name}-get_status", description=f""" Get the status of a running {workflow_name} workflow. Args: run_id: The run ID of the running workflow, received from workflows/{workflow_name}/run. """, ) async def get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]: return await _workflow_status(ctx, run_id=run_id, workflow_id=workflow_name) # endregion def _get_server_descriptions( server_registry: ServerRegistry | None, server_names: List[str] ) -> List: servers: List[dict[str, str]] = [] if server_registry: for server_name in server_names: config = server_registry.get_server_context(server_name) if config: servers.append( { "name": config.name, "description": config.description, } ) else: servers.append({"name": server_name}) else: servers = [{"name": server_name} for server_name in server_names] return servers def _get_server_descriptions_as_string( server_registry: ServerRegistry | None, server_names: List[str] ) -> str: servers = _get_server_descriptions(server_registry, server_names) # Format each server's information as a string server_strings = [] for server in servers: if "description" in server: server_strings.append(f"{server['name']}: {server['description']}") else: server_strings.append(f"{server['name']}") # Join all server strings with a newline return "\n".join(server_strings) # region Workflow Utils async def _workflow_run( ctx: MCPContext, workflow_id: str, run_parameters: Dict[str, Any] | None = None, ) -> str: server_context: ServerContext = ctx.request_context.lifespan_context if workflow_id not in server_context.workflows: raise ToolError(f"Workflow '{workflow_id}' not found.") # Get the workflow class workflow_cls = server_context.workflows[workflow_id] # Create and initialize the workflow instance using the factory method try: # Create workflow instance workflow = await workflow_cls.create( name=workflow_id, context=server_context.context ) run_parameters = run_parameters or {} # Run the workflow asynchronously and get its ID run_id = await workflow.run_async(**run_parameters) logger.info( f"Workflow {workflow_id} started with run ID {run_id}. Parameters: {run_parameters}" ) return run_id except Exception as e: logger.error(f"Error creating workflow {workflow_id}: {str(e)}") raise ToolError(f"Error creating workflow {workflow_id}: {str(e)}") from e async def _workflow_status( ctx: MCPContext, run_id: str, workflow_id: str | None = None ) -> Dict[str, Any]: server_context: ServerContext = ctx.request_context.lifespan_context workflow_registry: WorkflowRegistry = server_context.workflow_registry if not workflow_registry: raise ToolError("Workflow registry not found for MCPApp Server.") status = await workflow_registry.get_workflow_status( run_id=run_id, workflow_id=workflow_id ) return status # endregion

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nghiauet/mcp-agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server