# Project Structure and Function Definitions
This file contains the project directory structure and function definitions to help LLMs understand the codebase.
## Directory Structure
```
mcp-agent/
├── .vscode/
│ ├── extensions.json
│ ├── settings.json
├── schema/
│ ├── mcp-agent.config.schema.json
├── src/
│ ├── mcp_agent/
│ │ ├── agents/
│ │ │ ├── __init__.py
│ │ │ ├── agent.py
│ │ ├── cli/
│ │ │ ├── commands/
│ │ │ │ ├── config.py
│ │ │ ├── __init__.py
│ │ │ ├── __main__.py
│ │ │ ├── main.py
│ │ │ ├── terminal.py
│ │ ├── core/
│ │ │ ├── context.py
│ │ │ ├── context_dependent.py
│ │ │ ├── decorator_app.py
│ │ │ ├── exceptions.py
│ │ ├── data/
│ │ │ ├── artificial_analysis_llm_benchmarks.json
│ │ ├── eval/
│ │ │ ├── __init__.py
│ │ ├── executor/
│ │ │ ├── temporal/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── workflow_registry.py
│ │ │ │ ├── workflow_signal.py
│ │ │ ├── __init__.py
│ │ │ ├── decorator_registry.py
│ │ │ ├── executor.py
│ │ │ ├── signal_registry.py
│ │ │ ├── task_registry.py
│ │ │ ├── workflow.py
│ │ │ ├── workflow_registry.py
│ │ │ ├── workflow_signal.py
│ │ │ ├── workflow_task.py
│ │ ├── human_input/
│ │ │ ├── __init__.py
│ │ │ ├── handler.py
│ │ │ ├── types.py
│ │ ├── logging/
│ │ │ ├── __init__.py
│ │ │ ├── event_progress.py
│ │ │ ├── events.py
│ │ │ ├── json_serializer.py
│ │ │ ├── listeners.py
│ │ │ ├── logger.py
│ │ │ ├── progress_display.py
│ │ │ ├── rich_progress.py
│ │ │ ├── tracing.py
│ │ │ ├── transport.py
│ │ ├── mcp/
│ │ │ ├── __init__.py
│ │ │ ├── gen_client.py
│ │ │ ├── mcp_agent_client_session.py
│ │ │ ├── mcp_aggregator.py
│ │ │ ├── mcp_connection_manager.py
│ │ │ ├── mcp_server_registry.py
│ │ ├── server/
│ │ │ ├── app_server.py
│ │ │ ├── app_server_types.py
│ │ ├── telemetry/
│ │ │ ├── __init__.py
│ │ │ ├── usage_tracking.py
│ │ ├── utils/
│ │ │ ├── common.py
│ │ │ ├── pydantic_type_serializer.py
│ │ ├── workflows/
│ │ │ ├── embedding/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── embedding_base.py
│ │ │ │ ├── embedding_cohere.py
│ │ │ │ ├── embedding_openai.py
│ │ │ ├── evaluator_optimizer/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── evaluator_optimizer.py
│ │ │ ├── intent_classifier/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── intent_classifier_base.py
│ │ │ │ ├── intent_classifier_embedding.py
│ │ │ │ ├── intent_classifier_embedding_cohere.py
│ │ │ │ ├── intent_classifier_embedding_openai.py
│ │ │ │ ├── intent_classifier_llm.py
│ │ │ │ ├── intent_classifier_llm_anthropic.py
│ │ │ │ ├── intent_classifier_llm_openai.py
│ │ │ ├── llm/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── augmented_llm.py
│ │ │ │ ├── augmented_llm_anthropic.py
│ │ │ │ ├── augmented_llm_azure.py
│ │ │ │ ├── augmented_llm_bedrock.py
│ │ │ │ ├── augmented_llm_google.py
│ │ │ │ ├── augmented_llm_ollama.py
│ │ │ │ ├── augmented_llm_openai.py
│ │ │ │ ├── llm_selector.py
│ │ │ ├── orchestrator/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── orchestrator.py
│ │ │ │ ├── orchestrator_models.py
│ │ │ │ ├── orchestrator_prompts.py
│ │ │ ├── parallel/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── fan_in.py
│ │ │ │ ├── fan_out.py
│ │ │ │ ├── parallel_llm.py
│ │ │ ├── router/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── router_base.py
│ │ │ │ ├── router_embedding.py
│ │ │ │ ├── router_embedding_cohere.py
│ │ │ │ ├── router_embedding_openai.py
│ │ │ │ ├── router_llm.py
│ │ │ │ ├── router_llm_anthropic.py
│ │ │ │ ├── router_llm_openai.py
│ │ │ ├── swarm/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── swarm.py
│ │ │ │ ├── swarm_anthropic.py
│ │ │ │ ├── swarm_openai.py
│ │ │ ├── __init__.py
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── config.py
│ │ ├── console.py
│ │ ├── py.typed
├── LLMS.md
├── logs.txt
├── test_output.txt
```
## Project README
<p align="center">
<img src="https://github.com/user-attachments/assets/6f4e40c4-dc88-47b6-b965-5856b69416d2" alt="Logo" width="300" />
</p>
<p align="center">
<em>Build effective agents with Model Context Protocol using simple, composable patterns.</em>
<p align="center">
<a href="https://github.com/lastmile-ai/mcp-agent/tree/main/examples" target="_blank"><strong>Examples</strong></a>
|
<a href="https://www.anthropic.com/research/building-effective-agents" target="_blank"><strong>Building Effective Agents</strong></a>
|
<a href="https://modelcontextprotocol.io/introduction" target="_blank"><strong>MCP</strong></a>
</p>
<p align="center">
<a href="https://pypi.org/project/mcp-agent/"><img src="https://img.shields.io/pypi/v/mcp-agent?color=%2334D058&label=pypi" /></a>
<a href="https://github.com/lastmile-ai/mcp-agent/issues"><img src="https://img.shields.io/github/issues-raw/lastmile-ai/mcp-agent" /></a>
<a href="https://lmai.link/discord/mcp-agent"><img src="https://shields.io/discord/1089284610329952357" alt="discord" /></a>
<img alt="Pepy Total Downloads" src="https://img.shields.io/pepy/dt/mcp-agent?label=pypi%20%7C%20downloads"/>
<a href="https://github.com/lastmile-ai/mcp-agent/blob/main/LICENSE"><img src="https://img.shields.io/pypi/l/mcp-agent" /></a>
</p>
## Overview
**`mcp-agent`** is a simple, composable framework to build agents using [Model Context Protocol](https://modelcontextprotocol.io/introduction).
**Inspiration**: Anthropic announced 2 foundational updates for AI application developers:
1. [Model Context Protocol](https://www.anthropic.com/news/model-context-protocol) - a standardized interface to let any software be accessible to AI assistants via MCP servers.
2. [Building Effective Agents](https://www.anthropic.com/research/building-effective-agents) - a seminal writeup on simple, composable patterns for building production-ready AI agents.
`mcp-agent` puts these two foundational pieces into an AI application framework:
1. It handles the pesky business of managing the lifecycle of MCP server connections so you don't have to.
2. It implements every pattern described in Building Effective Agents, and does so in a _composable_ way, allowing you to chain these patterns together.
3. **Bonus**: It implements [OpenAI's Swarm](https://github.com/openai/swarm) pattern for multi-agent orchestration, but in a model-agnostic way.
Altogether, this is the simplest and easiest way to build robust agent applications. Much like MCP, this project is in early development.
We welcome all kinds of [contributions](/CONTRIBUTING.md), feedback and your help in growing this to become a new standard.
## Get Started
We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects:
```bash
uv add "mcp-agent"
```
Alternatively:
```bash
pip install mcp-agent
```
### Quickstart
> [!TIP]
> The [`examples`](/examples) directory has several example applications to get started with.
> To run an example, clone this repo, then:
>
> ```bash
> cd examples/basic/mcp_basic_agent # Or any other example
> cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml # Update API keys
> uv run main.py
> ```
Here is a basic "finder" agent that uses the fetch and filesystem servers to look up a file, read a blog and write a tweet. [Example link](./examples/basic/mcp_basic_agent/):
<details open>
<summary>finder_agent.py</summary>
```python
import asyncio
import os

from mcp_agent.app import MCPApp
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="hello_world_agent")

async def example_usage():
    async with app.run() as mcp_agent_app:
        logger = mcp_agent_app.logger

        # This agent can read the filesystem or fetch URLs
        finder_agent = Agent(
            name="finder",
            instruction="""You can read local files or fetch URLs.
                Return the requested information when asked.""",
            server_names=["fetch", "filesystem"],  # MCP servers this Agent can use
        )

        async with finder_agent:
            # Automatically initializes the MCP servers and adds their tools for LLM use
            tools = await finder_agent.list_tools()
            logger.info("Tools available:", data=tools)

            # Attach an OpenAI LLM to the agent (defaults to GPT-4o)
            llm = await finder_agent.attach_llm(OpenAIAugmentedLLM)

            # This will perform a file lookup and read using the filesystem server
            result = await llm.generate_str(
                message="Show me what's in README.md verbatim"
            )
            logger.info(f"README.md contents: {result}")

            # Uses the fetch server to fetch the content from URL
            result = await llm.generate_str(
                message="Print the first two paragraphs from https://www.anthropic.com/research/building-effective-agents"
            )
            logger.info(f"Blog intro: {result}")

            # Multi-turn interactions by default
            result = await llm.generate_str("Summarize that in a 128-char tweet")
            logger.info(f"Tweet: {result}")

if __name__ == "__main__":
    asyncio.run(example_usage())
```
</details>
<details>
<summary>mcp_agent.config.yaml</summary>
```yaml
execution_engine: asyncio
logger:
  transports: [console] # You can use [file, console] for both
  level: debug
  path: "logs/mcp-agent.jsonl" # Used for file transport
  # For dynamic log filenames:
  # path_settings:
  #   path_pattern: "logs/mcp-agent-{unique_id}.jsonl"
  #   unique_id: "timestamp" # Or "session_id"
  #   timestamp_format: "%Y%m%d_%H%M%S"

mcp:
  servers:
    fetch:
      command: "uvx"
      args: ["mcp-server-fetch"]
    filesystem:
      command: "npx"
      args:
        [
          "-y",
          "@modelcontextprotocol/server-filesystem",
          "<add_your_directories>",
        ]

openai:
  # Secrets (API keys, etc.) are stored in an mcp_agent.secrets.yaml file which can be gitignored
  default_model: gpt-4o
```
</details>
<details>
<summary>Agent output</summary>
<img width="2398" alt="Image" src="https://github.com/user-attachments/assets/eaa60fdf-bcc6-460b-926e-6fa8534e9089" />
</details>
## Table of Contents
- [Why use mcp-agent?](#why-use-mcp-agent)
- [Example Applications](#examples)
- [Claude Desktop](#claude-desktop)
- [Streamlit](#streamlit)
- [Gmail Agent](#gmail-agent)
- [RAG](#simple-rag-chatbot)
- [Marimo](#marimo)
- [Python](#python)
- [Swarm (CLI)](#swarm)
- [Core Concepts](#core-components)
- [Workflows Patterns](#workflows)
- [Augmented LLM](#augmentedllm)
- [Parallel](#parallel)
- [Router](#router)
- [Intent-Classifier](#intentclassifier)
- [Orchestrator-Workers](#orchestrator-workers)
- [Evaluator-Optimizer](#evaluator-optimizer)
- [OpenAI Swarm](#swarm-1)
- [Advanced](#advanced)
- [Composing multiple workflows](#composability)
- [Signaling and Human input](#signaling-and-human-input)
- [App Config](#app-config)
- [MCP Server Management](#mcp-server-management)
- [Contributing](#contributing)
- [Roadmap](#roadmap)
- [FAQs](#faqs)
## Why use `mcp-agent`?
There are too many AI frameworks out there already. But `mcp-agent` is the only one that is purpose-built for a shared protocol - [MCP](https://modelcontextprotocol.io/introduction). It is also the most lightweight, and is closer to an agent pattern library than a framework.
As [more services become MCP-aware](https://github.com/punkpeye/awesome-mcp-servers), you can use mcp-agent to build robust and controllable AI agents that can leverage those services out-of-the-box.
## Examples
Before we go into the core concepts of mcp-agent, let's show what you can build with it.
In short, you can build any kind of AI application with mcp-agent: multi-agent collaborative workflows, human-in-the-loop workflows, RAG pipelines and more.
### Claude Desktop
You can integrate mcp-agent apps into MCP clients like Claude Desktop.
#### mcp-agent server
This app wraps an mcp-agent application inside an MCP server, and exposes that server to Claude Desktop.
The app exposes agents and workflows that Claude Desktop can invoke to service the user's request.
https://github.com/user-attachments/assets/7807cffd-dba7-4f0c-9c70-9482fd7e0699
This demo shows a multi-agent evaluation task where each agent evaluates aspects of an input poem, and
then an aggregator summarizes their findings into a final response.
**Details**: Starting from a user's request over text, the application:
- dynamically defines agents to do the job
- uses the appropriate workflow to orchestrate those agents (in this case the Parallel workflow)
**Link to code**: [examples/basic/mcp_agent_server](./examples/basic/mcp_agent_server)
> [!NOTE]
> Huge thanks to [Jerron Lim (@StreetLamb)](https://github.com/StreetLamb)
> for developing and contributing this example!
### Streamlit
You can deploy mcp-agent apps using Streamlit.
#### Gmail agent
This app can perform read and write actions on Gmail using text prompts -- e.g. read, delete, or send emails, and mark them as read/unread.
It uses an MCP server for Gmail.
https://github.com/user-attachments/assets/54899cac-de24-4102-bd7e-4b2022c956e3
**Link to code**: [gmail-mcp-server](https://github.com/jasonsum/gmail-mcp-server/blob/add-mcp-agent-streamlit/streamlit_app.py)
> [!NOTE]
> Huge thanks to [Jason Summer (@jasonsum)](https://github.com/jasonsum)
> for developing and contributing this example!
#### Simple RAG Chatbot
This app uses a Qdrant vector database (via an MCP server) to do Q&A over a corpus of text.
https://github.com/user-attachments/assets/f4dcd227-cae9-4a59-aa9e-0eceeb4acaf4
**Link to code**: [examples/usecases/streamlit_mcp_rag_agent](./examples/usecases/streamlit_mcp_rag_agent/)
> [!NOTE]
> Huge thanks to [Jerron Lim (@StreetLamb)](https://github.com/StreetLamb)
> for developing and contributing this example!
### Marimo
[Marimo](https://github.com/marimo-team/marimo) is a reactive Python notebook that replaces Jupyter and Streamlit.
Here's the "file finder" agent from [Quickstart](#quickstart) implemented in Marimo:
<img src="https://github.com/user-attachments/assets/139a95a5-e3ac-4ea7-9c8f-bad6577e8597" width="400"/>
**Link to code**: [examples/usecases/marimo_mcp_basic_agent](./examples/usecases/marimo_mcp_basic_agent/)
> [!NOTE]
> Huge thanks to [Akshay Agrawal (@akshayka)](https://github.com/akshayka)
> for developing and contributing this example!
### Python
You can write mcp-agent apps as Python scripts or Jupyter notebooks.
#### Swarm
This example demonstrates a multi-agent setup for handling different customer service requests in an airline context using the Swarm workflow pattern. The agents can triage requests, handle flight modifications, cancellations, and lost baggage cases.
https://github.com/user-attachments/assets/b314d75d-7945-4de6-965b-7f21eb14a8bd
**Link to code**: [examples/workflows/workflow_swarm](./examples/workflows/workflow_swarm/)
## Core Components
The following are the building blocks of the mcp-agent framework:
- **[MCPApp](./src/mcp_agent/app.py)**: global state and app configuration
- **MCP server management**: [`gen_client`](./src/mcp_agent/mcp/gen_client.py) and [`MCPConnectionManager`](./src/mcp_agent/mcp/mcp_connection_manager.py) to easily connect to MCP servers.
- **[Agent](./src/mcp_agent/agents/agent.py)**: An Agent is an entity that has access to a set of MCP servers and exposes them to an LLM as tool calls. It has a name and purpose (instruction).
- **[AugmentedLLM](./src/mcp_agent/workflows/llm/augmented_llm.py)**: An LLM that is enhanced with tools provided from a collection of MCP servers. Every Workflow pattern described below is an `AugmentedLLM` itself, allowing you to compose and chain them together.
Everything in the framework is a derivative of these core capabilities.
## Workflows
mcp-agent provides implementations for every pattern in Anthropic’s [Building Effective Agents](https://www.anthropic.com/research/building-effective-agents), as well as the OpenAI [Swarm](https://github.com/openai/swarm) pattern.
Each pattern is model-agnostic, and exposed as an `AugmentedLLM`, making everything very composable.
### AugmentedLLM
[AugmentedLLM](./src/mcp_agent/workflows/llm/augmented_llm.py) is an LLM that has access to MCP servers and functions via Agents.
LLM providers implement the AugmentedLLM interface to expose 3 functions:
- `generate`: Generate message(s) given a prompt, possibly over multiple iterations, making tool calls as needed.
- `generate_str`: Calls `generate` and returns the result as a string.
- `generate_structured`: Uses [Instructor](https://github.com/instructor-ai/instructor) to return the generated result as a Pydantic model.
Additionally, `AugmentedLLM` has memory to keep track of short- and long-term history.
<details>
<summary>Example</summary>
```python
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_anthropic import AnthropicAugmentedLLM

finder_agent = Agent(
    name="finder",
    instruction="You are an agent with filesystem + fetch access. Return the requested file or URL contents.",
    server_names=["fetch", "filesystem"],
)

async with finder_agent:
    llm = await finder_agent.attach_llm(AnthropicAugmentedLLM)

    result = await llm.generate_str(
        message="Print the first 2 paragraphs of https://www.anthropic.com/research/building-effective-agents",
        # Can override model, tokens and other defaults
    )
    logger.info(f"Result: {result}")

    # Multi-turn conversation
    result = await llm.generate_str(
        message="Summarize those paragraphs in a 128 character tweet",
    )
    logger.info(f"Result: {result}")
```
</details>
### [Parallel](src/mcp_agent/workflows/parallel/parallel_llm.py)

Fan-out tasks to multiple sub-agents and fan-in the results. Each subtask is an AugmentedLLM, as is the overall Parallel workflow, meaning each subtask can optionally be a more complex workflow itself.
> [!NOTE]
>
> **[Link to full example](examples/workflows/workflow_parallel/main.py)**
<details>
<summary>Example</summary>
```python
proofreader = Agent(name="proofreader", instruction="Review grammar...")
fact_checker = Agent(name="fact_checker", instruction="Check factual consistency...")
style_enforcer = Agent(name="style_enforcer", instruction="Enforce style guidelines...")
grader = Agent(name="grader", instruction="Combine feedback into a structured report.")

parallel = ParallelLLM(
    fan_in_agent=grader,
    fan_out_agents=[proofreader, fact_checker, style_enforcer],
    llm_factory=OpenAIAugmentedLLM,
)

result = await parallel.generate_str("Student short story submission: ...", RequestParams(model="gpt-4o"))
```
</details>
### [Router](src/mcp_agent/workflows/router/)

Given an input, route to the `top_k` most relevant categories. A category can be an Agent, an MCP server or a regular function.
mcp-agent provides several router implementations, including:
- [`EmbeddingRouter`](src/mcp_agent/workflows/router/router_embedding.py): uses embedding models for classification
- [`LLMRouter`](src/mcp_agent/workflows/router/router_llm.py): uses LLMs for classification
> [!NOTE]
>
> **[Link to full example](examples/workflows/workflow_router/main.py)**
<details>
<summary>Example</summary>
```python
def print_hello_world():
    print("Hello, world!")

finder_agent = Agent(name="finder", server_names=["fetch", "filesystem"])
writer_agent = Agent(name="writer", server_names=["filesystem"])

llm = OpenAIAugmentedLLM()
router = LLMRouter(
    llm=llm,
    agents=[finder_agent, writer_agent],
    functions=[print_hello_world],
)

results = await router.route(  # Also available: route_to_agent, route_to_server
    request="Find and print the contents of README.md verbatim",
    top_k=1,
)
chosen_agent = results[0].result
async with chosen_agent:
    ...
```
</details>
### [IntentClassifier](src/mcp_agent/workflows/intent_classifier/)
A close sibling of Router, the Intent Classifier pattern identifies the `top_k` Intents that most closely match a given input.
Just like a Router, mcp-agent provides both an [embedding](src/mcp_agent/workflows/intent_classifier/intent_classifier_embedding.py) and [LLM-based](src/mcp_agent/workflows/intent_classifier/intent_classifier_llm.py) intent classifier.
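For illustration, here is a minimal sketch of LLM-based intent classification. The names below (`Intent`, `OpenAILLMIntentClassifier`, `classify`) are assumptions inferred from the module layout above, not confirmed API:

```python
# Sketch only -- class and method names are assumptions based on the
# intent_classifier module layout, not a confirmed public API.
from mcp_agent.workflows.intent_classifier.intent_classifier_base import Intent
from mcp_agent.workflows.intent_classifier.intent_classifier_llm_openai import (
    OpenAILLMIntentClassifier,
)

intents = [
    Intent(name="greeting", description="The user is greeting or making small talk."),
    Intent(name="file_request", description="The user wants a file found, read, or written."),
]

classifier = OpenAILLMIntentClassifier(intents=intents)

# Return the top_k intents that most closely match the request
results = await classifier.classify(
    request="Can you open README.md for me?",
    top_k=1,
)
```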
### [Evaluator-Optimizer](src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py)

One LLM (the “optimizer”) generates and refines a response, while another (the “evaluator”) critiques it, iterating until the response meets a quality criterion.
> [!NOTE]
>
> **[Link to full example](examples/workflows/workflow_evaluator_optimizer/main.py)**
<details>
<summary>Example</summary>
```python
optimizer = Agent(name="cover_letter_writer", server_names=["fetch"], instruction="Generate a cover letter ...")
evaluator = Agent(name="critiquer", instruction="Evaluate clarity, specificity, relevance...")
llm = EvaluatorOptimizerLLM(
optimizer=optimizer,
evaluator=evaluator,
llm_factory=OpenAIAugmentedLLM,
min_rating=QualityRating.EXCELLENT, # Keep iterating until the minimum quality bar is reached
)
result = await eo_llm.generate_str("Write a job cover letter for an AI framework developer role at LastMile AI.")
print("Final refined cover letter:", result)
```
</details>
### [Orchestrator-workers](src/mcp_agent/workflows/orchestrator/orchestrator.py)

A higher-level LLM generates a plan, assigns its steps to sub-agents, and synthesizes the results.
The Orchestrator workflow automatically parallelizes steps that can be done in parallel, and blocks on dependencies.
> [!NOTE]
>
> **[Link to full example](examples/workflows/workflow_orchestrator_worker/main.py)**
<details>
<summary>Example</summary>
```python
finder_agent = Agent(name="finder", server_names=["fetch", "filesystem"])
writer_agent = Agent(name="writer", server_names=["filesystem"])
proofreader = Agent(name="proofreader", ...)
fact_checker = Agent(name="fact_checker", ...)
style_enforcer = Agent(name="style_enforcer", instructions="Use APA style guide from ...", server_names=["fetch"])
orchestrator = Orchestrator(
llm_factory=AnthropicAugmentedLLM,
available_agents=[finder_agent, writer_agent, proofreader, fact_checker, style_enforcer],
)
task = "Load short_story.md, evaluate it, produce a graded_report.md with multiple feedback aspects."
result = await orchestrator.generate_str(task, RequestParams(model="gpt-4o"))
print(result)
```
</details>
### [Swarm](src/mcp_agent/workflows/swarm/swarm.py)
OpenAI has an experimental multi-agent pattern called [Swarm](https://github.com/openai/swarm), for which mcp-agent provides a model-agnostic reference implementation.
<img src="https://github.com/openai/swarm/blob/main/assets/swarm_diagram.png?raw=true" width=500 />
The mcp-agent Swarm pattern works seamlessly with MCP servers, and is exposed as an `AugmentedLLM`, allowing for composability with other patterns above.
> [!NOTE]
>
> **[Link to full example](examples/workflows/workflow_swarm/main.py)**
<details>
<summary>Example</summary>
```python
triage_agent = SwarmAgent(...)
flight_mod_agent = SwarmAgent(...)
lost_baggage_agent = SwarmAgent(...)
# The triage agent decides whether to route to flight_mod_agent or lost_baggage_agent
swarm = AnthropicSwarm(agent=triage_agent, context_variables={...})
test_input = "My bag was not delivered!"
result = await swarm.generate_str(test_input)
print("Result:", result)
```
</details>
## Advanced
### Composability
An example of composability is using an [Evaluator-Optimizer](#evaluator-optimizer) workflow as the planner LLM inside
the [Orchestrator](#orchestrator-workers) workflow. Generating a high-quality plan to execute is important for robust behavior, and an evaluator-optimizer can help ensure that.
Doing so is seamless in mcp-agent, because each workflow is implemented as an `AugmentedLLM`.
<details>
<summary>Example</summary>
```python
optimizer = Agent(name="plan_optimizer", server_names=[...], instruction="Generate a plan given an objective ...")
evaluator = Agent(name="plan_evaluator", instruction="Evaluate logic, ordering and precision of plan......")
planner_llm = EvaluatorOptimizerLLM(
optimizer=optimizer,
evaluator=evaluator,
llm_factory=OpenAIAugmentedLLM,
min_rating=QualityRating.EXCELLENT,
)
orchestrator = Orchestrator(
llm_factory=AnthropicAugmentedLLM,
available_agents=[finder_agent, writer_agent, proofreader, fact_checker, style_enforcer],
planner=planner_llm # It's that simple
)
...
```
</details>
### Signaling and Human Input
**Signaling**: The framework can pause and resume tasks. An agent or LLM can “signal” that it needs user input, and the workflow pauses until it is received. A developer can likewise signal mid-workflow to request approval or review before continuing.
**Human Input**: If an Agent has a `human_input_callback`, the LLM can call a `__human_input__` tool to request user input mid-workflow.
<details>
<summary>Example</summary>
The [Swarm example](examples/workflows/workflow_swarm/main.py) shows this in action.
```python
from mcp_agent.human_input.handler import console_input_callback

lost_baggage = SwarmAgent(
    name="Lost baggage traversal",
    instruction=lambda context_variables: f"""
        {
            FLY_AIR_AGENT_PROMPT.format(
                customer_context=context_variables.get("customer_context", "None"),
                flight_context=context_variables.get("flight_context", "None"),
            )
        }\n Lost baggage policy: policies/lost_baggage_policy.md""",
    functions=[
        escalate_to_agent,
        initiate_baggage_search,
        transfer_to_triage,
        case_resolved,
    ],
    server_names=["fetch", "filesystem"],
    human_input_callback=console_input_callback,  # Request input from the console
)
```
</details>
### App Config
Create an [`mcp_agent.config.yaml`](/schema/mcp-agent.config.schema.json) and a gitignored [`mcp_agent.secrets.yaml`](./examples/basic/mcp_basic_agent/mcp_agent.secrets.yaml.example) to define MCP app configuration. This controls logging, execution, LLM provider APIs, and MCP server configuration.
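To illustrate the split, a minimal `mcp_agent.secrets.yaml` might hold only provider credentials. The section names below mirror the provider settings in the config schema; the key values are placeholders:

```yaml
# Illustrative sketch -- keep this file gitignored.
openai:
  api_key: "sk-..."
anthropic:
  api_key: "sk-ant-..."
```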
### MCP server management
mcp-agent makes it trivial to connect to MCP servers. Create an [`mcp_agent.config.yaml`](/schema/mcp-agent.config.schema.json) to define server configuration under the `mcp` section:
```yaml
mcp:
  servers:
    fetch:
      command: "uvx"
      args: ["mcp-server-fetch"]
      description: "Fetch content at URLs from the world wide web"
```
#### [`gen_client`](src/mcp_agent/mcp/gen_client.py)
Manage the lifecycle of an MCP server within an async context manager:
```python
from mcp_agent.mcp.gen_client import gen_client

async with gen_client("fetch") as fetch_client:
    # Fetch server is initialized and ready to use
    result = await fetch_client.list_tools()

# Fetch server is automatically disconnected/shutdown
```
The gen_client function makes it easy to spin up connections to MCP servers.
#### Persistent server connections
In many cases, you want an MCP server to stay online for persistent use (e.g. in a multi-step tool use workflow).
For persistent connections, use:
- [`connect`](<(src/mcp_agent/mcp/gen_client.py)>) and [`disconnect`](src/mcp_agent/mcp/gen_client.py)
```python
from mcp_agent.mcp.gen_client import connect, disconnect

fetch_client = None
try:
    fetch_client = await connect("fetch")
    result = await fetch_client.list_tools()
finally:
    await disconnect("fetch")
```
- [`MCPConnectionManager`](src/mcp_agent/mcp/mcp_connection_manager.py)
For even more fine-grained control over server connections, you can use the MCPConnectionManager.
<details>
<summary>Example</summary>
```python
from mcp_agent.context import get_current_context
from mcp_agent.mcp.mcp_connection_manager import MCPConnectionManager

context = get_current_context()
connection_manager = MCPConnectionManager(context.server_registry)

async with connection_manager:
    fetch_client = await connection_manager.get_server("fetch")  # Initializes fetch server
    result = await fetch_client.list_tools()
    fetch_client2 = await connection_manager.get_server("fetch")  # Reuses same server connection

# All servers managed by connection manager are automatically disconnected/shut down
```
</details>
#### MCP Server Aggregator
[`MCPAggregator`](src/mcp_agent/mcp/mcp_aggregator.py) acts as a "server-of-servers".
It provides a single MCP server interface for interacting with multiple MCP servers.
This allows you to expose tools from multiple servers to LLM applications.
<details>
<summary>Example</summary>
```python
from mcp_agent.mcp.mcp_aggregator import MCPAggregator

aggregator = await MCPAggregator.create(server_names=["fetch", "filesystem"])

async with aggregator:
    # combined list of tools exposed by 'fetch' and 'filesystem' servers
    tools = await aggregator.list_tools()

    # namespacing -- invokes the 'fetch' server to call the 'fetch' tool
    fetch_result = await aggregator.call_tool(name="fetch-fetch", arguments={"url": "https://www.anthropic.com/research/building-effective-agents"})

    # no namespacing -- first server in the aggregator exposing that tool wins
    read_file_result = await aggregator.call_tool(name="read_file", arguments={})
```
</details>
## Contributing
We welcome any and all kinds of contributions. Please see the [CONTRIBUTING guidelines](./CONTRIBUTING.md) to get started.
### Special Mentions
There have already been incredible community contributors who are driving this project forward:
- [Shaun Smith (@evalstate)](https://github.com/evalstate) -- who has been leading the charge on countless complex improvements, both to `mcp-agent` and generally to the MCP ecosystem.
- [Jerron Lim (@StreetLamb)](https://github.com/StreetLamb) -- who has contributed countless hours and excellent examples, and great ideas to the project.
- [Jason Summer (@jasonsum)](https://github.com/jasonsum) -- for identifying several issues and adapting his Gmail MCP server to work with mcp-agent
## Roadmap
We will be adding a detailed roadmap (ideally driven by your feedback). The current set of priorities include:
- **Durable Execution** -- allow workflows to pause/resume and serialize state so they can be replayed or paused indefinitely. We are working on integrating [Temporal](./src/mcp_agent/executor/temporal.py) for this purpose.
- **Memory** -- adding support for long-term memory
- **Streaming** -- Support streaming listeners for iterative progress
- **Additional MCP capabilities** -- Expand beyond tool calls to support:
- Resources
- Prompts
- Notifications
## FAQs
### What are the core benefits of using mcp-agent?
mcp-agent provides a streamlined approach to building AI agents using capabilities exposed by **MCP** (Model Context Protocol) servers.
MCP is quite low-level, and this framework handles the mechanics of connecting to servers, working with LLMs, handling external signals (like human input) and supporting persistent state via durable execution. That lets you, the developer, focus on the core business logic of your AI application.
Core benefits:
- 🤝 **Interoperability**: ensures that any tool exposed by any number of MCP servers can seamlessly plug in to your agents.
- ⛓️ **Composability & Customizability**: Implements well-defined workflows, but in a composable way that enables compound workflows, and allows full customization across model provider, logging, orchestrator, etc.
- 💻 **Programmatic control flow**: Keeps things simple: developers write ordinary code instead of thinking in graphs, nodes and edges. For branching logic, write `if` statements; for cycles, use `while` loops (see the sketch after this list).
- 🖐️ **Human Input & Signals**: Supports pausing workflows for external signals, such as human input, which are exposed as tool calls an Agent can make.
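For instance, a control-flow sketch in plain Python. Here `llm` is an attached `AugmentedLLM` as in the Quickstart, and `is_done` is a hypothetical application-specific completion check, not an mcp-agent API:

```python
# Branching and cycles are ordinary Python, not graph definitions.
# `is_done` is a hypothetical helper you would define for your app.
result = await llm.generate_str("Draft a summary of README.md")

while not is_done(result):  # a cycle is just a while loop
    result = await llm.generate_str(f"Improve this draft: {result}")

if "TODO" in result:  # branching is just an if statement
    result = await llm.generate_str("Resolve the remaining TODOs in the draft.")
```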
### Do you need an MCP client to use mcp-agent?
No, you can use mcp-agent anywhere, since it handles MCPClient creation for you. This allows you to leverage MCP servers outside of MCP hosts like Claude Desktop.
Here's all the ways you can set up your mcp-agent application:
#### MCP-Agent Server
You can expose mcp-agent applications as MCP servers themselves (see [example](./examples/basic/mcp_agent_server)), allowing MCP clients to interface with sophisticated AI workflows using the standard tools API of MCP servers. This is effectively a server-of-servers.
#### MCP Client or Host
You can embed mcp-agent in an MCP client directly to manage the orchestration across multiple MCP servers.
#### Standalone
You can use mcp-agent applications in a standalone fashion (i.e. they aren't part of an MCP client). The [`examples`](/examples/) are all standalone applications.
### Tell me a fun fact
I debated naming this project _silsila_ (سلسلہ), which means chain of events in Urdu. mcp-agent is more matter-of-fact, but there's still an easter egg in the project paying homage to silsila.
## Code Examples
The MCP-Agent framework provides multiple ways to create and run AI agents with MCP server connections:
### Basic Agent Example
```python
from mcp_agent.app import MCPApp
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="my_agent")

async def main():
    async with app.run() as agent_app:
        # Create an agent with filesystem and fetch capabilities
        agent = Agent(
            name="finder",
            instruction="You help find and analyze files and web content",
            server_names=["fetch", "filesystem"],
        )

        async with agent:
            # Attach an LLM to the agent
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Generate responses using MCP tools
            result = await llm.generate_str(
                message="Find and read the config file"
            )
            print(result)
```
### Decorator Style Agent
```python
from mcp_agent.core.decorator_app import MCPAgentDecorator

agent_app = MCPAgentDecorator("my-app")

@agent_app.agent(
    name="basic_agent",
    instruction="A simple agent that helps with basic tasks",
    servers=["filesystem"],
)
async def main():
    async with agent_app.run() as agent:
        result = await agent.send("basic_agent", "What files are here?")
        print(result)
```
### Router-Based Workflow
```python
from mcp_agent.workflows.router.router_llm_openai import OpenAILLMRouter
from mcp_agent.agents.agent import Agent

# Create specialized agents
finder_agent = Agent(
    name="finder",
    instruction="Find and read files",
    server_names=["filesystem"],
)
writer_agent = Agent(
    name="writer",
    instruction="Write content to files",
    server_names=["filesystem"],
)

# Router automatically selects the best agent
router = OpenAILLMRouter(agents=[finder_agent, writer_agent])

# Route a request to the most appropriate agent
results = await router.route_to_agent(
    request="Read the config file", top_k=1
)
selected_agent = results[0].result
```
### Configuration
MCP-Agent uses YAML configuration files (`mcp_agent.config.yaml`):
```yaml
execution_engine: "asyncio"
mcp:
servers:
filesystem:
command: "npx"
args: ["-y", "@modelcontextprotocol/server-filesystem"]
fetch:
command: "uvx"
args: ["mcp-server-fetch"]
openai:
default_model: "gpt-4o-mini"
```
## Function and Class Definitions
*Note: Test files, example files, and script files are excluded from this section.*
### src/mcp_agent/agents/agent.py
**Class: `Agent`**
- **Inherits from**: BaseModel
- **Description**: An Agent is an entity that has access to a set of MCP servers and can interact with them.
Each agent should have a purpose defined by its instruction.
- **Attributes**:
- `name` (str): Agent name.
- `instruction` (str | Callable[[Dict], str]) = 'You are a helpful agent.': Instruction for the agent. This can be a string or a callable that takes a dictionary and returns a string. The callable can be used to generate dynamic instructions based on the context.
- `server_names` (List[str]) = Field(default_factory=list): List of MCP server names that the agent can access.
- `functions` (List[Callable]) = Field(default_factory=list): List of local functions that the agent can call.
- `context` (Optional[Context]) = None: The application context that the agent is running in.
- `connection_persistence` (bool) = True: Whether to persist connections to the MCP servers.
- `human_input_callback` (Optional[Callable]) = None: Callback function for requesting human input. Must match HumanInputCallback protocol.
- `llm` (Optional[Any]) = None: The LLM instance that is attached to the agent. This is set in attach_llm method.
- `initialized` (bool) = False: Whether the agent has been initialized. This is set to True after agent.initialize() is completed.
- `model_config` = ConfigDict(arbitrary_types_allowed=True, extra='allow')
- `_function_tool_map` (Dict[str, FastTool]) = PrivateAttr(default_factory=dict)
- `_namespaced_tool_map` (Dict[str, NamespacedTool]) = PrivateAttr(default_factory=dict)
- `_server_to_tool_map` (Dict[str, List[NamespacedTool]]) = PrivateAttr(default_factory=dict)
- `_namespaced_prompt_map` (Dict[str, NamespacedPrompt]) = PrivateAttr(default_factory=dict)
- `_server_to_prompt_map` (Dict[str, List[NamespacedPrompt]]) = PrivateAttr(default_factory=dict)
- `_agent_tasks` ('AgentTasks') = PrivateAttr(default=None)
- `_init_lock` (asyncio.Lock) = PrivateAttr(default_factory=asyncio.Lock)
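To make the attribute list concrete, here is a small construction sketch grounded in the public attributes documented above (values are illustrative; `human_input_callback` would be your own HumanInputCallback-compatible handler):

```python
from mcp_agent.agents.agent import Agent

# Construction sketch using the documented public attributes above.
agent = Agent(
    name="finder",
    instruction="You are a helpful agent.",  # str, or a callable taking a dict
    server_names=["fetch", "filesystem"],    # MCP servers the agent can access
    functions=[],                            # optional local functions exposed as tools
    connection_persistence=True,             # keep MCP server connections alive
    human_input_callback=None,               # or a HumanInputCallback handler
)
```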
**Class: `InitAggregatorRequest`**
- **Inherits from**: BaseModel
- **Description**: Request to load/initialize an agent's servers.
- **Attributes**:
- `agent_name` (str)
- `server_names` (List[str])
- `connection_persistence` (bool) = True
- `force` (bool) = False
**Class: `InitAggregatorResponse`**
- **Inherits from**: BaseModel
- **Description**: Response for the load server request.
- **Attributes**:
- `initialized` (bool)
- `namespaced_tool_map` (Dict[str, NamespacedTool]) = Field(default_factory=dict)
- `server_to_tool_map` (Dict[str, List[NamespacedTool]]) = Field(default_factory=dict)
- `namespaced_prompt_map` (Dict[str, NamespacedPrompt]) = Field(default_factory=dict)
- `server_to_prompt_map` (Dict[str, List[NamespacedPrompt]]) = Field(default_factory=dict)
**Class: `ListToolsRequest`**
- **Inherits from**: BaseModel
- **Description**: Request to list tools for an agent.
- **Attributes**:
- `agent_name` (str)
- `server_name` (Optional[str]) = None
**Class: `CallToolRequest`**
- **Inherits from**: BaseModel
- **Description**: Request to call a tool for an agent.
- **Attributes**:
- `agent_name` (str)
- `server_name` (Optional[str]) = None
- `name` (str)
- `arguments` (Optional[dict[str, str]]) = None
**Class: `ListPromptsRequest`**
- **Inherits from**: BaseModel
- **Description**: Request to list prompts for an agent.
- **Attributes**:
- `agent_name` (str)
- `server_name` (Optional[str]) = None
**Class: `GetPromptRequest`**
- **Inherits from**: BaseModel
- **Description**: Request to get a prompt from an agent.
- **Attributes**:
- `agent_name` (str)
- `server_name` (Optional[str]) = None
- `name` (str)
- `arguments` (Optional[dict[str, str]]) = None
**Class: `GetCapabilitiesRequest`**
- **Inherits from**: BaseModel
- **Description**: Request to get the capabilities of a specific server.
- **Attributes**:
- `agent_name` (str)
- `server_name` (Optional[str]) = None
**Class: `GetServerSessionRequest`**
- **Inherits from**: BaseModel
- **Description**: Request to get the session data of a specific server.
- **Attributes**:
- `agent_name` (str)
- `server_name` (str)
**Class: `GetServerSessionResponse`**
- **Inherits from**: BaseModel
- **Description**: Response to the get server session request.
- **Attributes**:
- `session_id` (str | None) = None
- `session_data` (dict[str, Any]) = Field(default_factory=dict)
- `error` (Optional[str]) = None
**Class: `AgentTasks`**
- **Description**: Agent tasks for executing agent-related activities.
- **Attributes**:
- `server_aggregators_for_agent` (Dict[str, MCPAggregator]) = {}
- `server_aggregators_for_agent_lock` (asyncio.Lock) = asyncio.Lock()
- `agent_refcounts` (dict[str, int]) = {}
**Function:** `Agent.model_post_init(self, __context) -> None`
**Function:** `Agent.attach_llm(self, llm_factory: Callable[..., LLM] | None = None, llm: LLM | None = None) -> LLM`
- **Description**: Create an LLM instance for the agent. Args: llm_factory: A callable that constructs an AugmentedLLM or its subclass. The factory should accept keyword arguments matching the AugmentedLLM constructor parameters. llm: An instance of AugmentedLLM or its subclass. If provided, this will be used instead of creating a new instance. Returns: An instance of AugmentedLLM or one of its subclasses.
- **Parameters**
- `self`
- `llm_factory` (Callable[..., LLM] | None, optional): A callable that constructs an AugmentedLLM or its subclass. The factory should accept keyword arguments matching the AugmentedLLM constructor parameters.
- `llm` (LLM | None, optional): An instance of AugmentedLLM or its subclass. If provided, this will be used instead of creating a new instance.
- **Returns**
- `LLM`: An instance of AugmentedLLM or one of its subclasses.
**Function:** `Agent.initialize(self, force: bool = False)`
- **Description**: Initialize the agent.
- **Parameters**
- `self`
- `force` (bool, optional): Default is False
**Function:** `Agent.shutdown(self)`
- **Description**: Shutdown the agent and close all MCP server connections. NOTE: This method is called automatically when the agent is used as an async context manager.
- **Parameters**
- `self`
**Function:** `Agent.close(self)`
- **Description**: Close the agent and release all resources. Synonymous with shutdown.
- **Parameters**
- `self`
**Function:** `Agent.__aenter__(self)`
**Function:** `Agent.__aexit__(self, exc_type, exc_val, exc_tb)`
**Function:** `Agent.get_capabilities(self, server_name: str | None) -> ServerCapabilities | Dict[str, ServerCapabilities]`
- **Description**: Get the capabilities of a specific server.
- **Parameters**
- `self`
- `server_name` (str | None)
- **Returns**
- `ServerCapabilities | Dict[str, ServerCapabilities]`: Return value
**Function:** `Agent.get_server_session(self, server_name: str)`
- **Description**: Get the session data of a specific server.
- **Parameters**
- `self`
- `server_name` (str)
**Function:** `Agent.list_tools(self, server_name: str | None = None) -> ListToolsResult`
**Function:** `Agent.list_prompts(self, server_name: str | None = None) -> ListPromptsResult`
**Function:** `Agent.get_prompt(self, name: str, arguments: dict[str, str] | None = None) -> GetPromptResult`
**Function:** `Agent.request_human_input(self, request: HumanInputRequest) -> str`
- **Description**: Request input from a human user. Pauses the workflow until input is received. Args: request: The human input request Returns: The input provided by the human Raises: TimeoutError: If the timeout is exceeded ValueError: If human_input_callback is not set or doesn't have the right signature
- **Parameters**
- `self`
- `request` (HumanInputRequest): The human input request
- **Returns**
- `str`: The input provided by the human
- **Raises**: TimeoutError: If the timeout is exceeded ValueError: If human_input_callback is not set or doesn't have the right signature
**Function:** `Agent.call_callback_and_signal()`
**Function:** `Agent.call_tool(self, name: str, arguments: dict | None = None, server_name: str | None = None) -> CallToolResult`
**Function:** `Agent._call_human_input_tool(self, arguments: dict | None = None) -> CallToolResult`
**Function:** `AgentTasks.__init__(self, context: 'Context')`
**Function:** `AgentTasks.initialize_aggregator_task(self, request: InitAggregatorRequest) -> InitAggregatorResponse`
- **Description**: Load/initialize an agent's servers.
- **Parameters**
- `self`
- `request` (InitAggregatorRequest)
- **Returns**
- `InitAggregatorResponse`: Return value
**Function:** `AgentTasks.shutdown_aggregator_task(self, agent_name: str) -> bool`
- **Description**: Shutdown the agent's servers.
- **Parameters**
- `self`
- `agent_name` (str)
- **Returns**
- `bool`: Return value
**Function:** `AgentTasks.list_tools_task(self, request: ListToolsRequest) -> ListToolsResult`
- **Description**: List tools for an agent.
- **Parameters**
- `self`
- `request` (ListToolsRequest)
- **Returns**
- `ListToolsResult`: Return value
**Function:** `AgentTasks.call_tool_task(self, request: CallToolRequest) -> CallToolResult`
- **Description**: Call a tool for an agent.
- **Parameters**
- `self`
- `request` (CallToolRequest)
- **Returns**
- `CallToolResult`: Return value
**Function:** `AgentTasks.list_prompts_task(self, request: ListPromptsRequest) -> ListPromptsResult`
- **Description**: List prompts for an agent.
- **Parameters**
- `self`
- `request` (ListPromptsRequest)
- **Returns**
- `ListPromptsResult`: Return value
**Function:** `AgentTasks.get_prompt_task(self, request: GetPromptRequest) -> GetPromptResult`
- **Description**: Get a prompt for an agent.
- **Parameters**
- `self`
- `request` (GetPromptRequest)
- **Returns**
- `GetPromptResult`: Return value
**Function:** `AgentTasks.get_capabilities_task(self, request: GetCapabilitiesRequest) -> Dict[str, ServerCapabilities]`
- **Description**: Get the capabilities of a specific server.
- **Parameters**
- `self`
- `request` (GetCapabilitiesRequest)
- **Returns**
- `Dict[str, ServerCapabilities]`: Return value
**Function:** `AgentTasks.get_server_session(self, request: GetServerSessionRequest) -> GetServerSessionResponse`
- **Description**: Get the session for a specific server.
- **Parameters**
- `self`
- `request` (GetServerSessionRequest)
- **Returns**
- `GetServerSessionResponse`: Return value
### src/mcp_agent/app.py
**Class: `MCPApp`**
- **Description**: Main application class that manages global state and can host workflows.
Example usage:

    app = MCPApp()

    @app.workflow
    class MyWorkflow(Workflow[str]):
        @app.task
        async def my_task(self):
            pass

        async def run(self):
            await self.my_task()

    async with app.run() as running_app:
        workflow = MyWorkflow()
        result = await workflow.execute()
**Function:** `MCPApp.__init__(self, name: str = 'mcp_application', description: str | None = None, settings: Optional[Settings] | str = None, human_input_callback: Optional[HumanInputCallback] = None, signal_notification: Optional[SignalWaitCallback] = None, upstream_session: Optional['ServerSession'] = None, model_selector: ModelSelector = None)`
- **Description**: Initialize the application with a name and optional settings. Args: name: Name of the application. description: Description of the application. If you expose the MCPApp as an MCP server, provide a detailed description, since it will be used as the server's description. settings: Application configuration. If unspecified, the settings are loaded from mcp_agent.config.yaml; if this is a string, it is treated as the path to the config file to load. human_input_callback: Callback for handling human input. signal_notification: Callback for getting notified on workflow signals/events. upstream_session: Upstream session if the MCPApp is running as a server to an MCP client. model_selector: The built-in ModelSelector to help with model selection. Defaults to None.
- **Parameters**
- `self`
- `name` (str, optional): Name of the application
- `description` (str | None, optional): Description of the application. If you expose the MCPApp as an MCP server, provide a detailed description, since it will be used as the server's description.
- `settings` (Optional[Settings] | str, optional): Application configuration - If unspecified, the settings are loaded from mcp_agent.config.yaml. If this is a string, it is treated as the path to the config file to load.
- `human_input_callback` (Optional[HumanInputCallback], optional): Callback for handling human input
- `signal_notification` (Optional[SignalWaitCallback], optional): Callback for getting notified on workflow signals/events.
- `upstream_session` (Optional['ServerSession'], optional): Upstream session if the MCPApp is running as a server to an MCP client.
- `model_selector` (ModelSelector, optional): The built-in ModelSelector to help with model selection. Default is None
**Function:** `MCPApp.context(self) -> Context`
**Function:** `MCPApp.config(self)`
**Function:** `MCPApp.server_registry(self)`
**Function:** `MCPApp.executor(self)`
**Function:** `MCPApp.engine(self)`
**Function:** `MCPApp.upstream_session(self)`
**Function:** `MCPApp.upstream_session(self, value)`
**Function:** `MCPApp.workflows(self)`
**Function:** `MCPApp.tasks(self)`
**Function:** `MCPApp.session_id(self)`
**Function:** `MCPApp.logger(self)`
**Function:** `MCPApp.initialize(self)`
- **Description**: Initialize the application.
- **Parameters**
- `self`
**Function:** `MCPApp.cleanup(self)`
- **Description**: Cleanup application resources.
- **Parameters**
- `self`
**Function:** `MCPApp.run(self)`
- **Description**: Run the application. Use as an async context manager.
- **Parameters**
- `self`
- **Example**: `async with app.run() as running_app: ...` (the app is initialized inside the `with` block)
**Function:** `MCPApp.workflow(self, cls: Type) -> Type`
- **Description**: Decorator for a workflow class. By default it's a no-op, but different executors can use this to customize behavior for workflow registration. Example: If Temporal is available & we use a TemporalExecutor, this decorator will wrap with temporal_workflow.defn.
- **Parameters**
- `self`
- `cls` (Type)
- **Returns**
- `Type`: Return value
- **Example**: If Temporal is available & we use a TemporalExecutor, this decorator will wrap with temporal_workflow.defn.
**Function:** `MCPApp.workflow_signal(self, fn: Callable[..., R] | None = None) -> Callable[..., R]`
- **Description**: Decorator for a workflow's signal handler. Different executors can use this to customize behavior for workflow signal handling. Args: fn: The function to decorate (optional, for use with direct application) name: Optional custom name for the signal. If not provided, uses the function name. Example: If Temporal is in use, this gets converted to @workflow.signal.
- **Parameters**
- `self`
- `fn` (Callable[..., R] | None, optional): The function to decorate (optional, for use with direct application)
- **Returns**
- `Callable[..., R]`: Return value
- **Example**: If Temporal is in use, this gets converted to @workflow.signal.
**Function:** `MCPApp.decorator(func)`
**Function:** `MCPApp.wrapper()`
**Function:** `MCPApp.workflow_run(self, fn: Callable[..., R]) -> Callable[..., R]`
- **Description**: Decorator for a workflow's main 'run' method. Different executors can use this to customize behavior for workflow execution. Example: If Temporal is in use, this gets converted to @workflow.run.
- **Parameters**
- `self`
- `fn` (Callable[..., R])
- **Returns**
- `Callable[..., R]`: Return value
- **Example**: If Temporal is in use, this gets converted to @workflow.run.
**Function:** `MCPApp.wrapper()`
**Function:** `MCPApp.workflow_task(self, name: str | None = None, schedule_to_close_timeout: timedelta | None = None, retry_policy: Dict[str, Any] | None = None) -> Callable[[Callable[..., R]], Callable[..., R]]`
- **Description**: Decorator to mark a function as a workflow task, automatically registering it in the global activity registry. Args: name: Optional custom name for the activity schedule_to_close_timeout: Maximum time the task can take to complete retry_policy: Retry policy configuration **kwargs: Additional metadata passed to the activity registration Returns: Decorated function that preserves async and typing information Raises: TypeError: If the decorated function is not async ValueError: If the retry policy or timeout is invalid
- **Parameters**
- `self`
- `name` (str | None, optional): Optional custom name for the activity
- `schedule_to_close_timeout` (timedelta | None, optional): Maximum time the task can take to complete
- `retry_policy` (Dict[str, Any] | None, optional): Retry policy configuration
- **Returns**
- `Callable[[Callable[..., R]], Callable[..., R]]`: Decorated function that preserves async and typing information
- **Raises**: TypeError: If the decorated function is not async ValueError: If the retry policy or timeout is invalid
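As an illustrative sketch of registering a task with this decorator (the retry-policy dictionary keys are an assumption; consult the executor in use for the exact shape it accepts):

```python
from datetime import timedelta

# Sketch only -- the retry_policy key names are assumed for illustration.
@app.workflow_task(
    name="summarize_file",
    schedule_to_close_timeout=timedelta(minutes=5),
    retry_policy={"maximum_attempts": 3},
)
async def summarize_file(path: str) -> str:
    # The decorated function must be async (TypeError otherwise).
    ...
```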
**Function:** `MCPApp.decorator(target: Callable[..., R]) -> Callable[..., R]`
**Function:** `MCPApp._bound_adapter()`
**Function:** `MCPApp.is_workflow_task(self, func: Callable[..., Any]) -> bool`
- **Description**: Check if a function is marked as a workflow task. This gets set for functions that are decorated with @workflow_task.
- **Parameters**
- `self`
- `func` (Callable[..., Any])
- **Returns**
- `bool`: Return value
**Function:** `MCPApp._register_global_workflow_tasks(self)`
- **Description**: Register all statically defined workflow tasks with this app instance.
- **Parameters**
- `self`
**Function:** `MCPApp._bound_adapter()`
### src/mcp_agent/cli/commands/config.py
**Function:** `show()`
- **Description**: Show the configuration.
### src/mcp_agent/cli/main.py
**Function:** `main(verbose: bool = typer.Option(False, '--verbose', '-v', help='Enable verbose mode'), quiet: bool = typer.Option(False, '--quiet', '-q', help='Disable output'), color: bool = typer.Option(True, '--color/--no-color', help='Enable/disable color output'))`
- **Description**: Main entry point for the MCP Agent CLI.
- **Parameters**
- `verbose` (bool, optional): Default is typer.Option(False, '--verbose', '-v', help='Enable verbose mode')
- `quiet` (bool, optional): Default is typer.Option(False, '--quiet', '-q', help='Disable output')
- `color` (bool, optional): Default is typer.Option(True, '--color/--no-color', help='Enable/disable color output')
### src/mcp_agent/cli/terminal.py
**Class: `Application`**
**Function:** `Application.__init__(self, verbosity: int = 0, enable_color: bool = True)`
**Function:** `Application.log(self, message: str, level: str = 'info')`
**Function:** `Application.status(self, message: str)`
### src/mcp_agent/config.py
**Module Description**: Reading settings from environment variables and providing a settings object for the application configuration.
**Class: `MCPServerAuthSettings`**
- **Inherits from**: BaseModel
- **Description**: Represents authentication configuration for a server.
- **Attributes**:
- `api_key` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `MCPRootSettings`**
- **Inherits from**: BaseModel
- **Description**: Represents a root directory configuration for an MCP server.
- **Attributes**:
- `uri` (str): The URI identifying the root. Must start with file://
- `name` (Optional[str]) = None: Optional name for the root.
- `server_uri_alias` (Optional[str]) = None: Optional URI alias for presentation to the server
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `MCPServerSettings`**
- **Inherits from**: BaseModel
- **Description**: Represents the configuration for an individual server.
- **Attributes**:
- `name` (str | None) = None: The name of the server.
- `description` (str | None) = None: The description of the server.
- `transport` (Literal['stdio', 'sse', 'streamable_http', 'websocket']) = 'stdio': The transport mechanism.
- `command` (str | None) = None: The command to execute the server (e.g. npx) in stdio mode.
- `args` (List[str]) = Field(default_factory=list): The arguments for the server command in stdio mode.
- `url` (str | None) = None: The URL for the server for SSE, Streamable HTTP or websocket transport.
- `headers` (Dict[str, str] | None) = None: HTTP headers for SSE or Streamable HTTP requests.
- `http_timeout_seconds` (int | None) = None: HTTP request timeout in seconds for SSE or Streamable HTTP requests. Note: This is different from read_timeout_seconds, which determines how long (in seconds) the client will wait for a new event before disconnecting
- `read_timeout_seconds` (int | None) = None: Timeout in seconds the client will wait for a new event before disconnecting from an SSE or Streamable HTTP server connection.
- `terminate_on_close` (bool) = True: For Streamable HTTP transport, whether to terminate the session on connection close.
- `auth` (MCPServerAuthSettings | None) = None: The authentication configuration for the server.
- `roots` (List[MCPRootSettings] | None) = None: Root directories this server has access to.
- `env` (Dict[str, str] | None) = None: Environment variables to pass to the server process.
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
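Grounding these fields, a hypothetical SSE server entry in `mcp_agent.config.yaml` could look like this (the server name, URL, and header values are illustrative):

```yaml
mcp:
  servers:
    my_sse_server: # illustrative name
      transport: "sse"
      url: "http://localhost:8000/sse"
      headers:
        Authorization: "Bearer <token>"
      http_timeout_seconds: 10
      read_timeout_seconds: 300
```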
**Class: `MCPSettings`**
- **Inherits from**: BaseModel
- **Description**: Configuration for all MCP servers.
- **Attributes**:
- `servers` (Dict[str, MCPServerSettings]) = Field(default_factory=dict)
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `AnthropicSettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for using Anthropic models in the MCP Agent application.
- **Attributes**:
- `api_key` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `BedrockSettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for using Bedrock models in the MCP Agent application.
- **Attributes**:
- `aws_access_key_id` (str | None) = None
- `aws_secret_access_key` (str | None) = None
- `aws_session_token` (str | None) = None
- `aws_region` (str | None) = None
- `profile` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `CohereSettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for using Cohere models in the MCP Agent application.
- **Attributes**:
- `api_key` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `OpenAISettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for using OpenAI models in the MCP Agent application.
- **Attributes**:
- `api_key` (str | None) = None
- `reasoning_effort` (Literal['low', 'medium', 'high']) = 'medium'
- `base_url` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `AzureSettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for using Azure models in the MCP Agent application.
- **Attributes**:
- `api_key` (str | None) = None
- `endpoint` (str)
- `credential_scopes` (List[str] | None) = Field(default=['https://cognitiveservices.azure.com/.default'])
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `GoogleSettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for using Google models in the MCP Agent application.
- **Attributes**:
- `api_key` (str | None) = None: Or use the GOOGLE_API_KEY environment variable
- `vertexai` (bool) = False
- `project` (str | None) = None
- `location` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `TemporalSettings`**
- **Inherits from**: BaseModel
- **Description**: Temporal settings for the MCP Agent application.
- **Attributes**:
- `host` (str)
- `namespace` (str) = 'default'
- `task_queue` (str)
- `max_concurrent_activities` (int | None) = None
- `api_key` (str | None) = None
- `timeout_seconds` (int | None) = 60
**Class: `UsageTelemetrySettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for usage telemetry in the MCP Agent application.
Anonymized usage metrics are sent to a telemetry server to help improve the product.
- **Attributes**:
- `enabled` (bool) = True: Enable usage telemetry in the MCP Agent application.
- `enable_detailed_telemetry` (bool) = False: If enabled, detailed telemetry data, including prompts and agents, will be sent to the telemetry server.
**Class: `OpenTelemetrySettings`**
- **Inherits from**: BaseModel
- **Description**: OTEL settings for the MCP Agent application.
- **Attributes**:
- `enabled` (bool) = True
- `service_name` (str) = 'mcp-agent'
- `service_instance_id` (str | None) = None
- `service_version` (str | None) = None
- `otlp_endpoint` (str | None) = None: OTLP endpoint for OpenTelemetry tracing
- `console_debug` (bool) = False: Log spans to console
- `sample_rate` (float) = 1.0: Sample rate for tracing (1.0 = sample everything)
**Class: `LogPathSettings`**
- **Inherits from**: BaseModel
- **Description**: Settings for configuring log file paths with dynamic elements like timestamps or session IDs.
- **Attributes**:
- `path_pattern` (str) = 'logs/mcp-agent-{unique_id}.jsonl': Path pattern for log files with a {unique_id} placeholder. The placeholder will be replaced according to the unique_id setting. Example: "logs/mcp-agent-{unique_id}.jsonl"
- `unique_id` (Literal['timestamp', 'session_id']) = 'timestamp': Type of unique identifier to use in the log filename: - timestamp: Uses the current time formatted according to timestamp_format - session_id: Generates a UUID for the session
- `timestamp_format` (str) = '%Y%m%d_%H%M%S': Format string for timestamps when unique_id is set to "timestamp". Uses Python's datetime.strftime format.
**Class: `LoggerSettings`**
- **Inherits from**: BaseModel
- **Description**: Logger settings for the MCP Agent application.
- **Attributes**:
- `type` (Literal['none', 'console', 'file', 'http']) = 'console'
- `transports` (List[Literal['none', 'console', 'file', 'http']]) = []: List of transports to use (can enable multiple simultaneously)
- `level` (Literal['debug', 'info', 'warning', 'error']) = 'info': Minimum logging level
- `progress_display` (bool) = False: Enable or disable the progress display
- `path` (str) = 'mcp-agent.jsonl': Path to log file, if logger 'type' is 'file'.
- `path_settings` (LogPathSettings | None) = None: Save log files with more advanced path semantics, like having timestamps or session id in the log name.
- `batch_size` (int) = 100: Number of events to accumulate before processing
- `flush_interval` (float) = 2.0: How often to flush events in seconds
- `max_queue_size` (int) = 2048: Maximum queue size for event processing
- `http_endpoint` (str | None) = None: HTTP endpoint for event transport
- `http_headers` (dict[str, str] | None) = None: HTTP headers for event transport
- `http_timeout` (float) = 5.0: HTTP timeout seconds for event transport
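For example, a file logger with timestamped log names can be expressed directly with these two models (a sketch, assuming both are exported from `mcp_agent.config`):
```python
# Sketch: file logging with per-run timestamped filenames.
from mcp_agent.config import LoggerSettings, LogPathSettings

logger = LoggerSettings(
    type="file",
    level="debug",
    path_settings=LogPathSettings(
        path_pattern="logs/mcp-agent-{unique_id}.jsonl",
        unique_id="timestamp",             # placeholder replaced via timestamp_format
        timestamp_format="%Y%m%d_%H%M%S",
    ),
    batch_size=50,        # accumulate up to 50 events before processing
    flush_interval=1.0,   # flush queued events every second
)
```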
**Class: `Settings`**
- **Inherits from**: BaseSettings
- **Description**: Settings class for the MCP Agent application.
- **Attributes**:
- `model_config` = SettingsConfigDict(env_nested_delimiter='__', env_file='.env', env_file_encoding='utf-8', extra='allow', nested_model_default_partial_update=True)
- `mcp` (MCPSettings | None) = MCPSettings(): MCP config, such as MCP servers
- `execution_engine` (Literal['asyncio', 'temporal']) = 'asyncio': Execution engine for the MCP Agent application
- `temporal` (TemporalSettings | None) = None: Settings for Temporal workflow orchestration
- `anthropic` (AnthropicSettings | None) = None: Settings for using Anthropic models in the MCP Agent application
- `bedrock` (BedrockSettings | None) = None: Settings for using Bedrock models in the MCP Agent application
- `cohere` (CohereSettings | None) = None: Settings for using Cohere models in the MCP Agent application
- `openai` (OpenAISettings | None) = None: Settings for using OpenAI models in the MCP Agent application
- `azure` (AzureSettings | None) = None: Settings for using Azure models in the MCP Agent application
- `google` (GoogleSettings | None) = None: Settings for using Google models in the MCP Agent application
- `otel` (OpenTelemetrySettings | None) = OpenTelemetrySettings(): OpenTelemetry logging settings for the MCP Agent application
- `logger` (LoggerSettings | None) = LoggerSettings(): Logger settings for the MCP Agent application
- `usage_telemetry` (UsageTelemetrySettings | None) = UsageTelemetrySettings(): Usage tracking settings for the MCP Agent application
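Because `Settings` extends `BaseSettings` with `env_nested_delimiter='__'`, nested fields can come from environment variables as well as from code. A sketch of both paths (field names taken from the attribute list above; the env-var mapping follows standard pydantic-settings behavior):
```python
import os

from mcp_agent.config import OpenAISettings, Settings

# Programmatic construction.
settings = Settings(
    execution_engine="asyncio",
    openai=OpenAISettings(api_key="sk-...", reasoning_effort="high"),
)

# Equivalent environment override: '__' is the nested delimiter,
# so OPENAI__API_KEY maps to settings.openai.api_key.
os.environ["OPENAI__API_KEY"] = "sk-..."
settings_from_env = Settings()
```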
**Function:** `MCPRootSettings.validate_uri(cls, v: str) -> str`
- **Description**: Validate that the URI starts with file:// (required by specification 2024-11-05)
- **Parameters**
- `cls`
- `v` (str)
- **Returns**
- `str`: Return value
**Function:** `Settings.find_config(cls) -> Path | None`
- **Description**: Find the config file in the current directory or parent directories.
- **Parameters**
- `cls`
- **Returns**
- `Path | None`: Return value
**Function:** `Settings.find_secrets(cls) -> Path | None`
- **Description**: Find the secrets file in the current directory or parent directories.
- **Parameters**
- `cls`
- **Returns**
- `Path | None`: Return value
**Function:** `Settings._find_config(cls, filenames: List[str]) -> Path | None`
- **Description**: Find the config file of one of the possible names in the current directory or parent directories.
- **Parameters**
- `cls`
- `filenames` (List[str])
- **Returns**
- `Path | None`: Return value
**Function:** `get_settings(config_path: str | None = None) -> Settings`
- **Description**: Get settings instance, automatically loading from config file if available.
- **Parameters**
- `config_path` (str | None, optional): Default is None
- **Returns**
- `Settings`: Return value
**Function:** `deep_merge(base: dict, update: dict) -> dict`
- **Description**: Recursively merge two dictionaries, preserving nested structures.
- **Parameters**
- `base` (dict)
- `update` (dict)
- **Returns**
- `dict`: Return value
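The merge semantics are the usual ones for layered configuration: nested dicts are merged key by key, and values from `update` win elsewhere. A plausible implementation matching this signature (a sketch, not necessarily the library's exact code):
```python
def deep_merge(base: dict, update: dict) -> dict:
    """Recursively merge `update` into a copy of `base`, preserving nested structure."""
    merged = dict(base)
    for key, value in update.items():
        if isinstance(merged.get(key), dict) and isinstance(value, dict):
            merged[key] = deep_merge(merged[key], value)  # descend into nested dicts
        else:
            merged[key] = value  # `update` wins for scalars, lists, and new keys
    return merged

# e.g. secrets layered over the base config file:
assert deep_merge({"openai": {"base_url": "x"}}, {"openai": {"api_key": "k"}}) == {
    "openai": {"base_url": "x", "api_key": "k"}
}
```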
### src/mcp_agent/core/context.py
**Module Description**: A central context object to store global state that is shared across the application.
**Class: `Context`**
- **Inherits from**: BaseModel
- **Description**: Context that is passed around through the application.
This is a global context that is shared across the application.
- **Attributes**:
- `config` (Optional[Settings]) = None
- `executor` (Optional[Executor]) = None
- `human_input_handler` (Optional[HumanInputCallback]) = None
- `signal_notification` (Optional[SignalWaitCallback]) = None
- `upstream_session` (Optional[ServerSession]) = None
- `model_selector` (Optional[ModelSelector]) = None
- `session_id` (str | None) = None
- `app` (Optional['MCPApp']) = None
- `server_registry` (Optional[ServerRegistry]) = None
- `task_registry` (Optional[ActivityRegistry]) = None
- `signal_registry` (Optional[SignalRegistry]) = None
- `decorator_registry` (Optional[DecoratorRegistry]) = None
- `workflow_registry` (Optional['WorkflowRegistry']) = None
- `tracer` (Optional[trace.Tracer]) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Function:** `configure_otel(config: 'Settings')`
- **Description**: Configure OpenTelemetry based on the application config.
- **Parameters**
- `config` ('Settings')
**Function:** `configure_logger(config: 'Settings', session_id: str | None = None)`
- **Description**: Configure logging and tracing based on the application config.
- **Parameters**
- `config` ('Settings')
- `session_id` (str | None, optional): Default is None
**Function:** `configure_usage_telemetry(_config: 'Settings')`
- **Description**: Configure usage telemetry based on the application config. TODO: saqadri - implement usage tracking
- **Parameters**
- `_config` ('Settings')
**Function:** `configure_executor(config: 'Settings')`
- **Description**: Configure the executor based on the application config.
- **Parameters**
- `config` ('Settings')
**Function:** `configure_workflow_registry(config: 'Settings', executor: Executor)`
- **Description**: Configure the workflow registry based on the application config.
- **Parameters**
- `config` ('Settings')
- `executor` (Executor)
**Function:** `initialize_context(config: Optional['Settings'] = None, task_registry: Optional[ActivityRegistry] = None, decorator_registry: Optional[DecoratorRegistry] = None, signal_registry: Optional[SignalRegistry] = None, store_globally: bool = False)`
- **Description**: Initialize the global application context.
- **Parameters**
- `config` (Optional['Settings'], optional): Default is None
- `task_registry` (Optional[ActivityRegistry], optional): Default is None
- `decorator_registry` (Optional[DecoratorRegistry], optional): Default is None
- `signal_registry` (Optional[SignalRegistry], optional): Default is None
- `store_globally` (bool, optional): Default is False
**Function:** `cleanup_context()`
- **Description**: Cleanup the global application context.
**Function:** `get_current_context() -> Context`
- **Description**: Synchronous initializer/getter for global application context. For async usage, use aget_current_context instead.
- **Returns**
- `Context`: Return value
**Function:** `run_async()`
**Function:** `get_current_config()`
- **Description**: Get the current application config.
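A typical lifecycle, sketched under the assumption that `initialize_context` and `cleanup_context` are coroutines (consistent with the async flow of this module):
```python
import asyncio

from mcp_agent.config import get_settings
from mcp_agent.core.context import cleanup_context, initialize_context

async def main() -> None:
    settings = get_settings()  # auto-discovers the config file if present
    context = await initialize_context(config=settings, store_globally=True)
    try:
        # Components can now reach shared state: config, executor, registries, ...
        print(context.session_id)
    finally:
        await cleanup_context()

asyncio.run(main())
```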
### src/mcp_agent/core/context_dependent.py
**Class: `ContextDependent`**
- **Description**: Mixin class for components that need context access.
Provides both global fallback and instance-specific context support.
**Function:** `ContextDependent.__init__(self, context: Optional['Context'] = None)`
**Function:** `ContextDependent.context(self) -> 'Context'`
- **Description**: Get context, with graceful fallback to global context if needed. Raises clear error if no context is available.
- **Parameters**
- `self`
- **Returns**
- `'Context'`: Return value
**Function:** `ContextDependent.use_context(self, context: 'Context')`
- **Description**: Temporarily use a different context.
- **Parameters**
- `self`
- `context` ('Context')
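In practice the mixin lets a component run either with an explicitly injected context or against the global fallback. A sketch, assuming `context` is exposed as a property:
```python
from typing import Optional

from mcp_agent.core.context import Context
from mcp_agent.core.context_dependent import ContextDependent

class ReportGenerator(ContextDependent):
    def __init__(self, context: Optional[Context] = None):
        super().__init__(context=context)

    def engine_name(self) -> str:
        # `self.context` falls back to the global context when none was injected,
        # and raises a clear error if no context is available at all.
        cfg = self.context.config
        return cfg.execution_engine if cfg else "unknown"
```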
### src/mcp_agent/core/decorator_app.py
**Module Description**: Decorator-based interface for MCP Agent applications. Provides a simplified way to create and manage agents using decorators.
**Class: `MCPAgentDecorator`**
- **Description**: A decorator-based interface for MCP Agent applications.
Provides a simplified way to create and manage agents using decorators.
**Class: `AgentAppWrapper`**
- **Description**: Wrapper class providing a simplified interface to the agent application.
**Function:** `MCPAgentDecorator.__init__(self, name: str, config_path: Optional[str] = None)`
- **Description**: Initialize the decorator interface. Args: name: Name of the application config_path: Optional path to config file
- **Parameters**
- `self`
- `name` (str): Name of the application
- `config_path` (Optional[str], optional): Optional path to config file
**Function:** `MCPAgentDecorator._load_config(self)`
- **Description**: Load configuration, properly handling YAML without dotenv processing
- **Parameters**
- `self`
**Function:** `MCPAgentDecorator.agent(self, name: str, instruction: str, servers: List[str]) -> Callable`
- **Description**: Decorator to create and register an agent. Args: name: Name of the agent instruction: Base instruction for the agent servers: List of server names the agent should connect to
- **Parameters**
- `self`
- `name` (str): Name of the agent
- `instruction` (str): Base instruction for the agent
- `servers` (List[str]): List of server names the agent should connect to
- **Returns**
- `Callable`: Return value
**Function:** `MCPAgentDecorator.decorator(func: Callable) -> Callable`
**Function:** `MCPAgentDecorator.wrapper()`
**Function:** `MCPAgentDecorator.run(self)`
- **Description**: Context manager for running the application. Handles setup and teardown of the app and agents.
- **Parameters**
- `self`
**Function:** `AgentAppWrapper.__init__(self, app: MCPApp, agents: Dict[str, Agent])`
**Function:** `AgentAppWrapper.send(self, agent_name: str, message: str) -> Any`
- **Description**: Send a message to a specific agent and get the response. Args: agent_name: Name of the agent to send message to message: Message to send Returns: Agent's response
- **Parameters**
- `self`
- `agent_name` (str): Name of the agent to send message to
- `message` (str): Message to send
- **Returns**
- `Any`: Agent's response
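End to end, the decorator interface reads roughly like the following sketch; it assumes `run()` is an async context manager yielding an `AgentAppWrapper`, and that a server named `fetch` exists in the MCP config:
```python
import asyncio

from mcp_agent.core.decorator_app import MCPAgentDecorator

app = MCPAgentDecorator("demo-app")

@app.agent(
    name="finder",
    instruction="You find and summarize information.",
    servers=["fetch"],  # must match a server defined in mcp.servers config
)
def finder():
    pass

async def main() -> None:
    async with app.run() as agent_app:  # AgentAppWrapper
        response = await agent_app.send("finder", "Summarize https://example.com")
        print(response)

asyncio.run(main())
```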
### src/mcp_agent/core/exceptions.py
**Module Description**: Custom exceptions for the mcp-agent library. Enables user-friendly error handling for common issues.
**Class: `MCPAgentError`**
- **Inherits from**: Exception
- **Description**: Base exception class for FastAgent errors
**Class: `ServerConfigError`**
- **Inherits from**: MCPAgentError
- **Description**: Raised when there are issues with MCP server configuration
Example: Server name referenced in agent.servers[] but not defined in config
**Class: `AgentConfigError`**
- **Inherits from**: MCPAgentError
- **Description**: Raised when there are issues with Agent or Workflow configuration
Example: Parallel fan-in references unknown agent
**Class: `ProviderKeyError`**
- **Inherits from**: MCPAgentError
- **Description**: Raised when there are issues with LLM provider API keys
Example: OpenAI/Anthropic key not configured but model requires it
**Class: `ServerInitializationError`**
- **Inherits from**: MCPAgentError
- **Description**: Raised when a server fails to initialize properly.
**Class: `ModelConfigError`**
- **Inherits from**: MCPAgentError
- **Description**: Raised when there are issues with LLM model configuration
Example: Unknown model name in model specification string
**Class: `CircularDependencyError`**
- **Inherits from**: MCPAgentError
- **Description**: Raised when we detect a Circular Dependency in the workflow
**Class: `PromptExitError`**
- **Inherits from**: MCPAgentError
- **Description**: Raised from enhanced_prompt when the user requests hard exits
**Function:** `MCPAgentError.__init__(self, message: str, details: str = '')`
**Function:** `ServerConfigError.__init__(self, message: str, details: str = '')`
**Function:** `AgentConfigError.__init__(self, message: str, details: str = '')`
**Function:** `ProviderKeyError.__init__(self, message: str, details: str = '')`
**Function:** `ServerInitializationError.__init__(self, message: str, details: str = '')`
**Function:** `ModelConfigError.__init__(self, message: str, details: str = '')`
**Function:** `CircularDependencyError.__init__(self, message: str, details: str = '')`
**Function:** `PromptExitError.__init__(self, message: str, details: str = '')`
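All of these share the `(message, details)` constructor, so call sites can attach context without folding it into the message. A sketch:
```python
from mcp_agent.core.exceptions import ProviderKeyError, ServerConfigError

def check_server(servers: dict, name: str) -> None:
    if name not in servers:
        raise ServerConfigError(
            f"Server '{name}' is not defined",
            details="Referenced in agent.servers[] but missing from mcp.servers",
        )

try:
    check_server({}, "fetch")
except ServerConfigError as exc:
    print(f"config problem: {exc}")
except ProviderKeyError:
    pass  # e.g. surface a 'set your API key' hint instead
```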
### src/mcp_agent/executor/decorator_registry.py
**Module Description**: Keep track of all workflow decorator overloads indexed by executor backend. Different executors may have different ways of configuring workflows.
**Class: `DecoratorRegistry`**
- **Description**: Centralized decorator management with validation and metadata.
**Function:** `DecoratorRegistry.__init__(self)`
**Function:** `DecoratorRegistry.register_workflow_defn_decorator(self, executor_name: str, decorator: Callable[[Type], Type])`
- **Description**: Registers a workflow definition decorator for a given executor. :param executor_name: Unique name of the executor. :param decorator: The decorator to register.
- **Parameters**
- `self`
- `executor_name` (str)
- `decorator` (Callable[[Type], Type])
**Function:** `DecoratorRegistry.get_workflow_defn_decorator(self, executor_name: str) -> Callable[[Type], Type]`
- **Description**: Retrieves a workflow definition decorator for a given executor. :param executor_name: Unique name of the executor. :return: The decorator function.
- **Parameters**
- `self`
- `executor_name` (str)
- **Returns**
- `Callable[[Type], Type]`: Return value
**Function:** `DecoratorRegistry.register_workflow_run_decorator(self, executor_name: str, decorator: Callable[[Callable[..., R]], Callable[..., R]])`
- **Description**: Registers a workflow run decorator for a given executor. :param executor_name: Unique name of the executor. :param decorator: The decorator to register.
- **Parameters**
- `self`
- `executor_name` (str)
- `decorator` (Callable[[Callable[..., R]], Callable[..., R]])
**Function:** `DecoratorRegistry.get_workflow_run_decorator(self, executor_name: str) -> Callable[[Callable[..., R]], Callable[..., R]]`
- **Description**: Retrieves a workflow run decorator for a given executor. :param executor_name: Unique name of the executor. :return: The decorator function.
- **Parameters**
- `self`
- `executor_name` (str)
- **Returns**
- `Callable[[Callable[..., R]], Callable[..., R]]`: Return value
**Function:** `DecoratorRegistry.register_workflow_task_decorator(self, executor_name: str, decorator: Callable[[Callable[..., T]], Callable[..., T]])`
- **Description**: Registers a workflow task decorator for a given executor. :param executor_name: Unique name of the executor. :param decorator: The decorator to register.
- **Parameters**
- `self`
- `executor_name` (str)
- `decorator` (Callable[[Callable[..., T]], Callable[..., T]])
**Function:** `DecoratorRegistry.get_workflow_task_decorator(self, executor_name: str) -> Callable[[Callable[..., T]], Callable[..., T]]`
- **Description**: Retrieves a workflow task decorator for a given executor. :param executor_name: Unique name of the executor. :return: The decorator function.
- **Parameters**
- `self`
- `executor_name` (str)
- **Returns**
- `Callable[[Callable[..., T]], Callable[..., T]]`: Return value
**Function:** `DecoratorRegistry.register_workflow_signal_decorator(self, executor_name: str, decorator: Callable[[Callable[..., S]], Callable[..., S]])`
- **Description**: Registers a workflow signal decorator for a given executor. :param executor_name: Unique name of the executor. :param decorator: The decorator to register.
- **Parameters**
- `self`
- `executor_name` (str)
- `decorator` (Callable[[Callable[..., S]], Callable[..., S]])
**Function:** `DecoratorRegistry.get_workflow_signal_decorator(self, executor_name: str) -> Callable[[Callable[..., S]], Callable[..., S]]`
- **Description**: Retrieves a workflow signal decorator for a given executor. :param executor_name: Unique name of the executor. :return: The decorator function.
- **Parameters**
- `self`
- `executor_name` (str)
- **Returns**
- `Callable[[Callable[..., S]], Callable[..., S]]`: Return value
**Function:** `default_workflow_defn(cls: Type) -> Type`
- **Description**: Default no-op workflow definition decorator.
- **Parameters**
- `cls` (Type)
- **Returns**
- `Type`: Return value
**Function:** `default_workflow_run(fn: Callable[..., R]) -> Callable[..., R]`
- **Description**: Default no-op workflow run decorator.
- **Parameters**
- `fn` (Callable[..., R])
- **Returns**
- `Callable[..., R]`: Return value
**Function:** `wrapper()`
**Function:** `default_workflow_task(fn: Callable[..., T]) -> Callable[..., T]`
- **Description**: Default no-op workflow task decorator.
- **Parameters**
- `fn` (Callable[..., T])
- **Returns**
- `Callable[..., T]`: Return value
**Function:** `wrapper()`
**Function:** `default_workflow_signal(fn: Callable[..., R]) -> Callable[..., R]`
- **Description**: Default no-op workflow signal decorator.
- **Parameters**
- `fn` (Callable[..., R])
- **Returns**
- `Callable[..., R]`: Return value
**Function:** `wrapper()`
**Function:** `register_asyncio_decorators(decorator_registry: DecoratorRegistry)`
- **Description**: Registers default asyncio decorators.
- **Parameters**
- `decorator_registry` (DecoratorRegistry)
**Function:** `register_temporal_decorators(decorator_registry: DecoratorRegistry)`
- **Description**: Registers Temporal decorators if Temporal SDK is available.
- **Parameters**
- `decorator_registry` (DecoratorRegistry)
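The registry is keyed by executor name, so the same user code can be wrapped differently per backend. A sketch using the default no-op decorators from this module, assuming the asyncio defaults are registered under the executor name `"asyncio"`:
```python
from mcp_agent.executor.decorator_registry import (
    DecoratorRegistry,
    register_asyncio_decorators,
)

registry = DecoratorRegistry()
register_asyncio_decorators(registry)  # installs the no-op asyncio defaults

# Resolve the decorators for the active backend and apply them.
workflow_defn = registry.get_workflow_defn_decorator("asyncio")
workflow_run = registry.get_workflow_run_decorator("asyncio")

@workflow_defn
class HelloWorkflow:
    @workflow_run
    async def run(self) -> str:
        return "hello"
```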
### src/mcp_agent/executor/executor.py
**Class: `ExecutorConfig`**
- **Inherits from**: BaseModel
- **Description**: Configuration for executors.
- **Attributes**:
- `max_concurrent_activities` (int | None) = None
- `timeout_seconds` (timedelta | None) = None
- `retry_policy` (Dict[str, Any] | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `Executor`**
- **Inherits from**: ABC, ContextDependent
- **Description**: Abstract base class for different execution backends
**Class: `AsyncioExecutor`**
- **Inherits from**: Executor
- **Description**: Default executor using asyncio
**Function:** `Executor.__init__(self, engine: str, config: ExecutorConfig | None = None, signal_bus: SignalHandler = None, context: Optional['Context'] = None)`
**Function:** `Executor.execution_context(self)`
- **Description**: Context manager for execution setup/teardown.
- **Parameters**
- `self`
**Function:** `Executor.execute(self, task: Callable[..., R] | Coroutine[Any, Any, R]) -> R | BaseException`
- **Description**: Execute a task and return its result
- **Parameters**
- `self`
- `task` (Callable[..., R] | Coroutine[Any, Any, R])
- **Returns**
- `R | BaseException`: Return value
**Function:** `Executor.execute_many(self, tasks: List[Callable[..., R] | Coroutine[Any, Any, R]]) -> List[R | BaseException]`
- **Description**: Execute a list of tasks and return their results
- **Parameters**
- `self`
- `tasks` (List[Callable[..., R] | Coroutine[Any, Any, R]])
- **Returns**
- `List[R | BaseException]`: Return value
**Function:** `Executor.execute_streaming(self, tasks: List[Callable[..., R] | Coroutine[Any, Any, R]]) -> AsyncIterator[R | BaseException]`
- **Description**: Execute tasks and yield results as they complete
- **Parameters**
- `self`
- `tasks` (List[Callable[..., R] | Coroutine[Any, Any, R]])
- **Returns**
- `AsyncIterator[R | BaseException]`: Return value
**Function:** `Executor.map(self, func: Callable[..., R], inputs: List[Any]) -> List[R | BaseException]`
- **Description**: Run `func(item)` for each item in `inputs` with concurrency limit.
- **Parameters**
- `self`
- `func` (Callable[..., R])
- `inputs` (List[Any])
- **Returns**
- `List[R | BaseException]`: Return value
**Function:** `Executor.run(item)`
**Function:** `Executor.validate_task(self, task: Callable[..., R] | Coroutine[Any, Any, R]) -> None`
- **Description**: Validate a task before execution.
- **Parameters**
- `self`
- `task` (Callable[..., R] | Coroutine[Any, Any, R])
- **Returns**
- `None`: Return value
**Function:** `Executor.signal(self, signal_name: str, payload: SignalValueT = None, signal_description: str | None = None, workflow_id: str | None = None, run_id: str | None = None) -> None`
- **Description**: Emit a signal. Args: signal_name: The name of the signal to emit payload: Optional data to include with the signal signal_description: Optional human-readable description workflow_id: Optional ID of the workflow to signal run_id: Optional run ID of the workflow instance to signal
- **Parameters**
- `self`
- `signal_name` (str): The name of the signal to emit
- `payload` (SignalValueT, optional): Optional data to include with the signal
- `signal_description` (str | None, optional): Optional human-readable description
- `workflow_id` (str | None, optional): Optional ID of the workflow to signal
- `run_id` (str | None, optional): Optional run ID of the workflow instance to signal
- **Returns**
- `None`: Return value
**Function:** `Executor.wait_for_signal(self, signal_name: str, request_id: str | None = None, workflow_id: str | None = None, run_id: str | None = None, signal_description: str | None = None, timeout_seconds: int | None = None, signal_type: Type[SignalValueT] = str) -> SignalValueT`
- **Description**: Wait until a signal with signal_name is emitted (or timeout). Return the signal's payload when triggered, or raise on timeout.
- **Parameters**
- `self`
- `signal_name` (str)
- `request_id` (str | None, optional): Default is None
- `workflow_id` (str | None, optional): Default is None
- `run_id` (str | None, optional): Default is None
- `signal_description` (str | None, optional): Default is None
- `timeout_seconds` (int | None, optional): Default is None
- `signal_type` (Type[SignalValueT], optional): Default is str
- **Returns**
- `SignalValueT`: Return value
**Function:** `Executor.uuid(self) -> uuid.UUID`
- **Description**: Generate a UUID. Some executors enforce deterministic UUIDs, so this is an opportunity for an executor to provide its own UUID generation. Defaults to uuid4().
- **Parameters**
- `self`
- **Returns**
- `uuid.UUID`: Return value
**Function:** `Executor.random(self) -> random.Random`
- **Description**: Get a random number generator. Some executors enforce deterministic random number generation, so this is an opportunity for an executor to provide its own random number generator. Defaults to random.Random().
- **Parameters**
- `self`
- **Returns**
- `random.Random`: Return value
**Function:** `AsyncioExecutor.__init__(self, config: ExecutorConfig | None = None, signal_bus: SignalHandler | None = None)`
**Function:** `AsyncioExecutor._execute_task(self, task: Callable[..., R] | Coroutine[Any, Any, R]) -> R | BaseException`
**Function:** `AsyncioExecutor.run_task(task: Callable[..., R] | Coroutine[Any, Any, R]) -> R`
**Function:** `AsyncioExecutor.execute(self, task: Callable[..., R] | Coroutine[Any, Any, R]) -> R | BaseException`
- **Description**: Execute a task and return its result. Args: task: The task to execute *args: Positional arguments to pass to the task **kwargs: Additional arguments to pass to the task Returns: A result or exception
- **Parameters**
- `self`
- `task` (Callable[..., R] | Coroutine[Any, Any, R]): The task to execute
- **Returns**
- `R | BaseException`: A result or exception
**Function:** `AsyncioExecutor.execute_many(self, tasks: List[Callable[..., R] | Coroutine[Any, Any, R]]) -> List[R | BaseException]`
- **Description**: Execute a list of tasks and return their results. Args: tasks: The tasks to execute *args: Positional arguments to pass to each task **kwargs: Additional arguments to pass to the tasks Returns: A list of results or exceptions
- **Parameters**
- `self`
- `tasks` (List[Callable[..., R] | Coroutine[Any, Any, R]]): The tasks to execute
- **Returns**
- `List[R | BaseException]`: A list of results or exceptions
**Function:** `AsyncioExecutor.execute_streaming(self, tasks: List[Callable[..., R] | Coroutine[Any, Any, R]]) -> AsyncIterator[R | BaseException]`
- **Description**: Execute tasks and yield results as they complete. Args: tasks: The tasks to execute *args: Positional arguments to pass to each task **kwargs: Additional arguments to pass to the tasks Yields: Results or exceptions as tasks complete
- **Parameters**
- `self`
- `tasks` (List[Callable[..., R] | Coroutine[Any, Any, R]]): The tasks to execute
- **Returns**
- `AsyncIterator[R | BaseException]`: Return value
- **Yields**: Results or exceptions as tasks complete
**Function:** `AsyncioExecutor.signal(self, signal_name: str, payload: SignalValueT = None, signal_description: str | None = None, workflow_id: str | None = None, run_id: str | None = None) -> None`
**Function:** `AsyncioExecutor.wait_for_signal(self, signal_name: str, request_id: str | None = None, workflow_id: str | None = None, run_id: str | None = None, signal_description: str | None = None, timeout_seconds: int | None = None, signal_type: Type[SignalValueT] = str) -> SignalValueT`
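A sketch of the asyncio backend in isolation, assuming coroutine objects can be passed directly as tasks (per the `Callable | Coroutine` signatures above):
```python
import asyncio

from mcp_agent.executor.executor import AsyncioExecutor

async def add(a: int, b: int) -> int:
    return a + b

async def main() -> None:
    executor = AsyncioExecutor()

    single = await executor.execute(add(1, 2))                   # -> 3 (or an exception)
    many = await executor.execute_many([add(3, 4), add(5, 6)])   # -> [7, 11]
    print(single, many)

    # execute_streaming yields results in completion order.
    async for result in executor.execute_streaming([add(7, 8)]):
        print(result)

asyncio.run(main())
```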
### src/mcp_agent/executor/signal_registry.py
**Class: `SignalRegistry`**
- **Description**: Centralized signals management
**Function:** `SignalRegistry.__init__(self)`
**Function:** `SignalRegistry.register(self, name: str, func: Callable, state: Dict[str, Any] | None = None)`
**Function:** `SignalRegistry.get_signal(self, name: str) -> Callable`
**Function:** `SignalRegistry.get_state(self, name: str) -> Dict[str, Any]`
**Function:** `SignalRegistry.list_signals(self) -> List[str]`
**Function:** `SignalRegistry.is_registered(self, name: str) -> bool`
- **Description**: Check if a signal handler is already registered with the given name.
- **Parameters**
- `self`
- `name` (str)
- **Returns**
- `bool`: Return value
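Usage mirrors the activity registry below, with per-signal `state` in place of metadata. A sketch:
```python
from mcp_agent.executor.signal_registry import SignalRegistry

registry = SignalRegistry()

async def on_resume(value):
    print("resumed with", value)

registry.register("resume", on_resume, state={"received": False})

assert registry.is_registered("resume")
handler = registry.get_signal("resume")
print(registry.get_state("resume"))   # {'received': False}
print(registry.list_signals())        # ['resume']
```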
### src/mcp_agent/executor/task_registry.py
**Module Description**: Keep track of all activities/tasks that the executor needs to run. This is used by the workflow engine to dynamically orchestrate a workflow graph. The user just writes standard functions annotated with @workflow_task, but behind the scenes a workflow graph is built.
**Class: `ActivityRegistry`**
- **Description**: Centralized task/activity management with validation and metadata.
**Function:** `ActivityRegistry.__init__(self)`
**Function:** `ActivityRegistry.register(self, name: str, func: Callable, metadata: Dict[str, Any] | None = None)`
**Function:** `ActivityRegistry.get_activity(self, name: str) -> Callable`
**Function:** `ActivityRegistry.get_metadata(self, name: str) -> Dict[str, Any]`
**Function:** `ActivityRegistry.list_activities(self) -> List[str]`
**Function:** `ActivityRegistry.is_registered(self, name: str) -> bool`
- **Description**: Check if an activity is already registered with the given name.
- **Parameters**
- `self`
- `name` (str)
- **Returns**
- `bool`: Return value
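Activities are plain functions registered under a unique name, with optional metadata the workflow engine can consult. A sketch:
```python
from mcp_agent.executor.task_registry import ActivityRegistry

registry = ActivityRegistry()

async def fetch_url(url: str) -> str:
    return f"fetched {url}"  # stand-in body

registry.register("fetch_url", fetch_url, metadata={"max_retries": 3})

assert registry.is_registered("fetch_url")
assert registry.list_activities() == ["fetch_url"]
activity = registry.get_activity("fetch_url")
print(registry.get_metadata("fetch_url"))  # {'max_retries': 3}
```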
### src/mcp_agent/executor/temporal/__init__.py
**Module Description**: Temporal based orchestrator for the MCP Agent. Temporal provides durable execution and robust workflow orchestration, as well as dynamic control flow, making it a good choice for an AI agent orchestrator. Read more: https://docs.temporal.io/develop/python/core-application
**Class: `TemporalExecutorConfig`**
- **Inherits from**: ExecutorConfig, TemporalSettings
- **Description**: Configuration for Temporal executors.
- **Attributes**:
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `TemporalExecutor`**
- **Inherits from**: Executor
- **Description**: Executor that runs @workflows as Temporal workflows, with @workflow_tasks as Temporal activities
**Function:** `TemporalExecutor.__init__(self, config: TemporalExecutorConfig | None = None, signal_bus: SignalHandler | None = None, client: TemporalClient | None = None, context: Optional['Context'] = None)`
**Function:** `TemporalExecutor.wrap_as_activity(activity_name: str, func: Callable[..., R] | Coroutine[Any, Any, R]) -> Coroutine[Any, Any, R]`
- **Description**: Convert a function into a Temporal activity and return its info.
- **Parameters**
- `activity_name` (str)
- `func` (Callable[..., R] | Coroutine[Any, Any, R])
- **Returns**
- `Coroutine[Any, Any, R]`: Return value
**Function:** `TemporalExecutor.wrapped_activity()`
**Function:** `TemporalExecutor._execute_task_as_async(self, task: Callable[..., R] | Coroutine[Any, Any, R]) -> R | BaseException`
**Function:** `TemporalExecutor.run_task(task: Callable[..., R] | Coroutine[Any, Any, R]) -> R`
**Function:** `TemporalExecutor._execute_task(self, task: Callable[..., R] | Coroutine[Any, Any, R]) -> R | BaseException`
**Function:** `TemporalExecutor.execute(self, task: Callable[..., R] | Coroutine[Any, Any, R]) -> R | BaseException`
- **Description**: Execute a single task (activity) and return its result.
- **Parameters**
- `self`
- `task` (Callable[..., R] | Coroutine[Any, Any, R])
- **Returns**
- `R | BaseException`: Return value
**Function:** `TemporalExecutor.execute_many(self, tasks: List[Callable[..., R] | Coroutine[Any, Any, R]]) -> List[R | BaseException]`
- **Description**: Execute multiple tasks (activities) in parallel.
- **Parameters**
- `self`
- `tasks` (List[Callable[..., R] | Coroutine[Any, Any, R]])
- **Returns**
- `List[R | BaseException]`: Return value
**Function:** `TemporalExecutor.execute_streaming(self, tasks: List[Callable[..., R] | Coroutine[Any, Any, R]]) -> AsyncIterator[R | BaseException]`
**Function:** `TemporalExecutor.ensure_client(self)`
- **Description**: Ensure we have a connected Temporal client.
- **Parameters**
- `self`
**Function:** `TemporalExecutor.start_workflow(self, workflow_id: str) -> WorkflowHandle`
- **Description**: Starts a workflow with the given workflow ID and arguments. Args: workflow_id (str): Identifier of the workflow to be started. *workflow_args: Positional arguments to pass to the workflow. wait_for_result: Whether to wait for the workflow to complete and return the result. **workflow_kwargs: Keyword arguments to pass to the workflow. Returns: If wait_for_result is True, returns the workflow result. Otherwise, returns a WorkflowHandle for the started workflow.
- **Parameters**
- `self`
- `workflow_id` (str)
- **Returns**
- `WorkflowHandle`: If wait_for_result is True, returns the workflow result. Otherwise, returns a WorkflowHandle for the started workflow.
**Function:** `TemporalExecutor.execute_workflow(self, workflow_id: str) -> Any`
- **Description**: Execute a workflow and wait for its result. This is a convenience wrapper around start_workflow with wait_for_result=True.
- **Parameters**
- `self`
- `workflow_id` (str)
- **Returns**
- `Any`: Return value
**Function:** `TemporalExecutor.terminate_workflow(self, workflow_id: str, run_id: str | None = None, reason: str | None = 'Cancellation') -> None`
- **Description**: Terminate a workflow execution. Args: workflow_id (str): Identifier of the workflow to terminate. run_id (Optional[str]): If provided, terminates the specific run. Otherwise terminates the latest run. reason (Optional[str]): A reason for the termination.
- **Parameters**
- `self`
- `workflow_id` (str)
- `run_id` (str | None, optional): Default is None
- `reason` (str | None, optional): Default is 'Cancellation'
- **Returns**
- `None`: Return value
**Function:** `TemporalExecutor.uuid(self) -> 'UUID'`
- **Description**: Generate a UUID using Temporal's deterministic UUID generator.
- **Parameters**
- `self`
- **Returns**
- `'UUID'`: Return value
**Function:** `TemporalExecutor.random(self) -> 'Random'`
- **Description**: Get an instance of Temporal's deterministic pseudo-random number generator. Note, this random number generator is not cryptographically safe and should not be used for security purposes. Returns: The deterministically-seeded pseudo-random number generator.
- **Parameters**
- `self`
- **Returns**
- `'Random'`: The deterministically-seeded pseudo-random number generator.
**Function:** `create_temporal_worker_for_app(app: 'MCPApp')`
- **Description**: Create a Temporal worker for the given app.
- **Parameters**
- `app` ('MCPApp')
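Switching an app to this backend is a configuration change rather than a code change; the worker itself is then created via `create_temporal_worker_for_app`. A sketch using the settings models documented earlier:
```python
from mcp_agent.config import Settings, TemporalSettings

settings = Settings(
    execution_engine="temporal",   # instead of the default "asyncio"
    temporal=TemporalSettings(
        host="localhost:7233",     # address of the Temporal frontend
        namespace="default",
        task_queue="mcp-agent",
        max_concurrent_activities=10,
    ),
)
```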
### src/mcp_agent/executor/temporal/workflow_registry.py
**Class: `TemporalWorkflowRegistry`**
- **Inherits from**: WorkflowRegistry
- **Description**: Registry for tracking workflow instances in Temporal.
This implementation queries Temporal for workflow status and manages workflows.
**Function:** `TemporalWorkflowRegistry.__init__(self, executor: 'TemporalExecutor')`
**Function:** `TemporalWorkflowRegistry.register(self, workflow: 'Workflow', run_id: str | None = None, workflow_id: str | None = None, task: Optional['asyncio.Task'] = None) -> None`
**Function:** `TemporalWorkflowRegistry.unregister(self, run_id: str, workflow_id: str | None = None) -> None`
**Function:** `TemporalWorkflowRegistry.get_workflow(self, run_id: str, workflow_id: str | None = None) -> Optional['Workflow']`
**Function:** `TemporalWorkflowRegistry.resume_workflow(self, run_id: str, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`
**Function:** `TemporalWorkflowRegistry.cancel_workflow(self, run_id: str, workflow_id: str | None = None) -> bool`
**Function:** `TemporalWorkflowRegistry.get_workflow_status(self, run_id: str, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`
**Function:** `TemporalWorkflowRegistry.list_workflow_statuses(self) -> List[Dict[str, Any]]`
**Function:** `TemporalWorkflowRegistry.list_workflows(self) -> List['Workflow']`
- **Description**: List all registered workflow instances. Returns: A list of workflow instances
- **Parameters**
- `self`
- **Returns**
- `List['Workflow']`: A list of workflow instances
**Function:** `TemporalWorkflowRegistry._get_temporal_workflow_status(self, workflow_id: str, run_id: str) -> Dict[str, Any]`
- **Description**: Get the status of a workflow directly from Temporal. Args: workflow_id: The workflow ID run_id: The run ID Returns: A dictionary with workflow status information from Temporal
- **Parameters**
- `self`
- `workflow_id` (str): The workflow ID
- `run_id` (str): The run ID
- **Returns**
- `Dict[str, Any]`: A dictionary with workflow status information from Temporal
### src/mcp_agent/executor/temporal/workflow_signal.py
**Class: `_Record`**
- **Inherits from**: Generic[SignalValueT]
- **Description**: Record for tracking signal values with versioning for broadcast semantics
- **Attributes**:
- `value` (Optional[SignalValueT]) = None
- `version` (int) = 0
**Class: `SignalMailbox`**
- **Inherits from**: Generic[SignalValueT]
- **Description**: Deterministic broadcast mailbox that stores signal values with versioning.
Each workflow run has its own mailbox instance.
**Class: `TemporalSignalHandler`**
- **Inherits from**: BaseSignalHandler[SignalValueT]
- **Description**: Temporal-based signal handling using workflow signals.
This implementation uses a mailbox to store signal values and version counters
to track new signals. It allows for dynamic signal handling and supports
waiting for signals.
**Function:** `SignalMailbox.__init__(self) -> None`
**Function:** `SignalMailbox.push(self, name: str, value: SignalValueT) -> None`
- **Description**: Store a signal value and increment its version counter. This enables broadcast semantics where all waiters see the same value.
- **Parameters**
- `self`
- `name` (str)
- `value` (SignalValueT)
- **Returns**
- `None`: Return value
**Function:** `SignalMailbox.version(self, name: str) -> int`
- **Description**: Get the current version counter for a signal name
- **Parameters**
- `self`
- `name` (str)
- **Returns**
- `int`: Return value
**Function:** `SignalMailbox.value(self, name: str) -> SignalValueT`
- **Description**: Get the current value for a signal name Returns: The signal value Raises: ValueError: If no value exists for the signal
- **Parameters**
- `self`
- `name` (str)
- **Returns**
- `SignalValueT`: The signal value
- **Raises**: ValueError: If no value exists for the signal
**Function:** `TemporalSignalHandler.__init__(self, executor: Optional['TemporalExecutor'] = None) -> None`
**Function:** `TemporalSignalHandler.attach_to_workflow(self, wf_instance: 'Workflow') -> None`
- **Description**: Attach this signal handler to a workflow instance. Registers a single dynamic signal handler for all signals. Args: wf_instance: The workflow instance to attach to Note: If the workflow already has a dynamic signal handler registered through @workflow.signal(dynamic=True), a Temporal runtime error will occur.
- **Parameters**
- `self`
- `wf_instance` ('Workflow'): The workflow instance to attach to
- **Returns**
- `None`: Return value
- **Note**: If the workflow already has a dynamic signal handler registered through @workflow.signal(dynamic=True), a Temporal runtime error will occur.
**Function:** `TemporalSignalHandler.wait_for_signal(self, signal: Signal[SignalValueT], timeout_seconds: int | None = None, min_version: int | None = None) -> SignalValueT`
- **Description**: Wait for a signal to be received. Args: signal: The signal to wait for timeout_seconds: Optional timeout in seconds min_version: Optional minimum version to wait for (defaults to current version). This is useful for waiting for a new signal even if one with the same name was already received. Returns: The emitted signal payload. Raises: RuntimeError: If called outside a workflow or mailbox not initialized TimeoutError: If timeout is reached ValueError: If no value exists for the signal after waiting
- **Parameters**
- `self`
- `signal` (Signal[SignalValueT]): The signal to wait for
- `timeout_seconds` (int | None, optional): Optional timeout in seconds
- `min_version` (int | None, optional): Optional minimum version to wait for (defaults to current version). This is useful for waiting for a new signal even if one with the same name was already received.
- **Returns**
- `SignalValueT`: The emitted signal payload.
- **Raises**: RuntimeError: If called outside a workflow or mailbox not initialized TimeoutError: If timeout is reached ValueError: If no value exists for the signal after waiting
**Function:** `TemporalSignalHandler.on_signal(self, signal_name: str)`
- **Description**: Decorator that registers a callback for a signal. The callback will be invoked when the signal is received. Args: signal_name: The name of the signal to handle
- **Parameters**
- `self`
- `signal_name` (str): The name of the signal to handle
**Function:** `TemporalSignalHandler.decorator(user_cb: Callable[[Signal[SignalValueT]], Any])`
**Function:** `TemporalSignalHandler.signal(self, signal: Signal[SignalValueT]) -> None`
- **Description**: Send a signal to a running workflow. Args: signal: The signal to send Raises: ValueError: If validation fails RuntimeError: If executor is missing when called outside a workflow
- **Parameters**
- `self`
- `signal` (Signal[SignalValueT]): The signal to send
- **Returns**
- `None`: Return value
- **Raises**: ValueError: If validation fails RuntimeError: If executor is missing when called outside a workflow
**Function:** `TemporalSignalHandler.validate_signal(self, signal)`
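The version counter is what gives the mailbox its broadcast semantics: every waiter compares against a version snapshot rather than consuming the value. A sketch of the contract implied above (version numbers assumed to start at 0 for unseen signal names):
```python
from mcp_agent.executor.temporal.workflow_signal import SignalMailbox

mailbox = SignalMailbox()
baseline = mailbox.version("resume")        # snapshot before waiting (assumed 0)

mailbox.push("resume", {"approved": True})  # bumps the version, stores the value

# All waiters observe the same value; none of them "consumes" it.
assert mailbox.version("resume") == baseline + 1
assert mailbox.value("resume") == {"approved": True}
```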
### src/mcp_agent/executor/workflow.py
**Class: `WorkflowState`**
- **Inherits from**: BaseModel
- **Description**: Simple container for persistent workflow state.
This can hold fields that should persist across tasks.
- **Attributes**:
- `status` (str) = 'initialized'
- `metadata` (Dict[str, Any]) = Field(default_factory=dict)
- `updated_at` (float | None) = None
- `error` (Dict[str, Any] | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `WorkflowResult`**
- **Inherits from**: BaseModel, Generic[T]
- **Attributes**:
- `value` (Optional[T]) = None
- `metadata` (Dict[str, Any]) = Field(default_factory=dict)
- `start_time` (float | None) = None
- `end_time` (float | None) = None
**Class: `Workflow`**
- **Inherits from**: ABC, Generic[T], ContextDependent
- **Description**: Base class for user-defined workflows.
Handles execution and state management.
Workflows represent user-defined application logic modules that can use Agents and AugmentedLLMs.
Typically, workflows are registered with an MCPApp and can be exposed as MCP tools via app_server.py.
Some key notes:
- The class MUST be decorated with @app.workflow.
- Persistent state: Provides a simple `state` object for storing data across tasks.
- Lifecycle management: Provides run_async, pause, resume, cancel, and get_status methods.
**Function:** `WorkflowState.record_error(self, error: Exception) -> None`
**Function:** `Workflow.__init__(self, name: str | None = None, metadata: Dict[str, Any] | None = None, context: Optional['Context'] = None)`
**Function:** `Workflow.executor(self)`
- **Description**: Get the workflow executor from the context.
- **Parameters**
- `self`
**Function:** `Workflow.id(self) -> str | None`
- **Description**: Get the workflow ID for this workflow.
- **Parameters**
- `self`
- **Returns**
- `str | None`: Return value
**Function:** `Workflow.run_id(self) -> str | None`
- **Description**: Get the workflow run ID if it has been assigned. NOTE: The run() method will assign a new run ID on every run.
- **Parameters**
- `self`
- **Returns**
- `str | None`: Return value
**Function:** `Workflow.create(cls, name: str | None = None, context: Optional['Context'] = None) -> 'Workflow'`
- **Description**: Factory method to create and initialize a workflow instance. This default implementation creates a workflow instance and calls initialize(). Subclasses can override this method for custom initialization logic. Args: name: Optional name for the workflow (defaults to class name) context: Optional context to use (falls back to global context if not provided) **kwargs: Additional parameters to pass to the workflow constructor Returns: An initialized workflow instance
- **Parameters**
- `cls`
- `name` (str | None, optional): Optional name for the workflow (defaults to class name)
- `context` (Optional['Context'], optional): Optional context to use (falls back to global context if not provided)
- **Returns**
- `'Workflow'`: An initialized workflow instance
**Function:** `Workflow.run(self) -> 'WorkflowResult[T]'`
- **Description**: Main workflow implementation. Must be overridden by subclasses. This is where the user-defined application logic goes. Typically, this involves: 1. Setting up Agents and attaching LLMs to them 2. Executing operations using the Agents and their LLMs 3. Processing results and returning them Returns: WorkflowResult containing the output of the workflow
- **Parameters**
- `self`
- **Returns**
- `'WorkflowResult[T]'`: WorkflowResult containing the output of the workflow
**Function:** `Workflow._cancel_task(self)`
- **Description**: Wait for a cancel signal and cancel the workflow task.
- **Parameters**
- `self`
**Function:** `Workflow.run_async(self) -> str`
- **Description**: Run the workflow asynchronously and return a workflow ID. This creates an async task that will be executed through the executor and returns immediately with a workflow run ID that can be used to check status, resume, or cancel. Args: *args: Positional arguments to pass to the run method **kwargs: Keyword arguments to pass to the run method Returns: str: A unique workflow ID that can be used to reference this workflow instance
- **Parameters**
- `self`
- **Returns**
- `str`: A unique workflow ID that can be used to reference this workflow instance
**Function:** `Workflow._execute_workflow()`
**Function:** `Workflow.resume(self, signal_name: str | None = 'resume', payload: str | None = None) -> bool`
- **Description**: Send a resume signal to the workflow. Args: signal_name: The name of the signal to send (default: "resume") payload: Optional data to provide to the workflow upon resuming Returns: bool: True if the resume signal was sent successfully, False otherwise
- **Parameters**
- `self`
- `signal_name` (str | None, optional): The name of the signal to send (default: "resume")
- `payload` (str | None, optional): Optional data to provide to the workflow upon resuming
- **Returns**
- `bool`: True if the resume signal was sent successfully, False otherwise
**Function:** `Workflow.cancel(self) -> bool`
- **Description**: Cancel the workflow by sending a cancel signal and cancelling its task. Returns: bool: True if the workflow was cancelled successfully, False otherwise
- **Parameters**
- `self`
- **Returns**
- `bool`: True if the workflow was cancelled successfully, False otherwise
**Function:** `Workflow._signal_receiver(self, name: str, args: Sequence[RawValue])`
- **Description**: Dynamic signal handler for Temporal workflows.
- **Parameters**
- `self`
- `name` (str)
- `args` (Sequence[RawValue])
**Function:** `Workflow.get_status(self) -> Dict[str, Any]`
- **Description**: Get the current status of the workflow. Returns: Dict[str, Any]: A dictionary with workflow status information
- **Parameters**
- `self`
- **Returns**
- `Dict[str, Any]`: A dictionary with workflow status information
**Function:** `Workflow.update_status(self, status: str) -> None`
- **Description**: Update the workflow status. Args: status: The new status to set
- **Parameters**
- `self`
- `status` (str): The new status to set
- **Returns**
- `None`: Return value
**Function:** `Workflow.update_state(self)`
- **Description**: Syntactic sugar to update workflow state.
- **Parameters**
- `self`
**Function:** `Workflow.initialize(self)`
- **Description**: Initialization method that will be called before run. Override this to set up any resources needed by the workflow. This checks the _initialized flag to prevent double initialization.
- **Parameters**
- `self`
**Function:** `Workflow.cleanup(self)`
- **Description**: Cleanup method that will be called after run. Override this to clean up any resources used by the workflow. This checks the _initialized flag to ensure cleanup is only done on initialized workflows.
- **Parameters**
- `self`
**Function:** `Workflow.__aenter__(self)`
- **Description**: Support for async context manager pattern.
- **Parameters**
- `self`
**Function:** `Workflow.__aexit__(self, exc_type, exc_val, exc_tb)`
- **Description**: Support for async context manager pattern.
- **Parameters**
- `self`
- `exc_type`
- `exc_val`
- `exc_tb`
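Putting the notes above together, a user-defined workflow might look like the following sketch. It assumes an `MCPApp` instance exposing the required `@app.workflow` decorator, plus an `@app.workflow_run` counterpart for the run method (both names assumed here):
```python
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult

app = MCPApp(name="demo")

@app.workflow  # required: registers the class with the app/executor
class GreetingWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, name: str = "world") -> WorkflowResult[str]:
        self.state.status = "running"  # persistent state container
        # ... set up Agents, attach LLMs, execute operations ...
        return WorkflowResult(value=f"Hello, {name}!")
```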
### src/mcp_agent/executor/workflow_registry.py
**Class: `WorkflowRegistry`**
- **Inherits from**: ABC
- **Description**: Abstract base class for registry tracking workflow instances.
Provides a central place to register, look up, and manage workflow instances.
**Class: `InMemoryWorkflowRegistry`**
- **Inherits from**: WorkflowRegistry
- **Description**: Registry for tracking workflow instances in memory for AsyncioExecutor.
**Function:** `WorkflowRegistry.__init__(self)`
**Function:** `WorkflowRegistry.register(self, workflow: 'Workflow', run_id: str | None = None, workflow_id: str | None = None, task: Optional['asyncio.Task'] = None) -> None`
- **Description**: Register a workflow instance (i.e. a workflow run). Args: workflow: The workflow instance run_id: The unique ID for this specific workflow run. If unspecified, it will be retrieved from the workflow instance. workflow_id: The unique ID for the workflow type. If unspecified, it will be retrieved from the workflow instance. task: The asyncio task running the workflow
- **Parameters**
- `self`
- `workflow` ('Workflow'): The workflow instance
- `run_id` (str | None, optional): The unique ID for this specific workflow run. If unspecified, it will be retrieved from the workflow instance.
- `workflow_id` (str | None, optional): The unique ID for the workflow type. If unspecified, it will be retrieved from the workflow instance.
- `task` (Optional['asyncio.Task'], optional): The asyncio task running the workflow
- **Returns**
- `None`: Return value
**Function:** `WorkflowRegistry.unregister(self, run_id: str, workflow_id: str | None = None) -> None`
- **Description**: Remove a workflow instance from the registry. Args: run_id: The unique ID for this specific workflow run. workflow_id: The ID of the workflow.
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run.
- `workflow_id` (str | None, optional): The ID of the workflow.
- **Returns**
- `None`: Return value
**Function:** `WorkflowRegistry.get_workflow(self, run_id: str, workflow_id: str | None = None) -> Optional['Workflow']`
- **Description**: Get a workflow instance by run ID. Args: run_id: The unique ID for this specific workflow run. workflow_id: The ID of the workflow to retrieve Returns: The workflow instance, or None if not found
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run.
- `workflow_id` (str | None, optional): The ID of the workflow to retrieve
- **Returns**
- `Optional['Workflow']`: The workflow instance, or None if not found
**Function:** `WorkflowRegistry.resume_workflow(self, run_id: str, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`
- **Description**: Resume a paused workflow. Args: run_id: The unique ID for this specific workflow run workflow_id: The ID of the workflow to resume signal_name: Name of the signal to send to the workflow (default is "resume") payload: Payload to send with the signal Returns: True if the resume signal was sent successfully, False otherwise
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run
- `workflow_id` (str | None, optional): The ID of the workflow to resume
- `signal_name` (str | None, optional): Name of the signal to send to the workflow (default is "resume")
- `payload` (Any | None, optional): Payload to send with the signal
- **Returns**
- `bool`: True if the resume signal was sent successfully, False otherwise
**Function:** `WorkflowRegistry.cancel_workflow(self, run_id: str, workflow_id: str | None = None) -> bool`
- **Description**: Cancel (terminate) a running workflow. Args: run_id: The unique ID for this specific workflow run workflow_id: The ID of the workflow to cancel Returns: True if the cancel signal was sent successfully, False otherwise
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run
- `workflow_id` (str | None, optional): The ID of the workflow to cancel
- **Returns**
- `bool`: True if the cancel signal was sent successfully, False otherwise
**Function:** `WorkflowRegistry.get_workflow_status(self, run_id: str, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`
- **Description**: Get the status of a workflow run. Args: run_id: The unique ID for this specific workflow run workflow_id: The ID of the workflow to get the status for Returns: The last available workflow status if found, None otherwise
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run
- `workflow_id` (str | None, optional): The ID of the workflow to get the status for
- **Returns**
- `Optional[Dict[str, Any]]`: The last available workflow status if found, None otherwise
**Function:** `WorkflowRegistry.list_workflow_statuses(self) -> List[Dict[str, Any]]`
- **Description**: List all registered workflow instances with their status. Returns: A list of dictionaries with workflow information
- **Parameters**
- `self`
- **Returns**
- `List[Dict[str, Any]]`: A list of dictionaries with workflow information
**Function:** `WorkflowRegistry.list_workflows(self) -> List['Workflow']`
- **Description**: List all registered workflow instances. Returns: A list of workflow instances
- **Parameters**
- `self`
- **Returns**
- `List['Workflow']`: A list of workflow instances
**Function:** `InMemoryWorkflowRegistry.__init__(self)`
**Function:** `InMemoryWorkflowRegistry.register(self, workflow: 'Workflow', run_id: str | None = None, workflow_id: str | None = None, task: Optional['asyncio.Task'] = None) -> None`
**Function:** `InMemoryWorkflowRegistry.unregister(self, run_id: str, workflow_id: str | None = None) -> None`
**Function:** `InMemoryWorkflowRegistry.get_workflow(self, run_id: str, workflow_id: str | None = None) -> Optional['Workflow']`
**Function:** `InMemoryWorkflowRegistry.resume_workflow(self, run_id: str, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`
**Function:** `InMemoryWorkflowRegistry.cancel_workflow(self, run_id: str, workflow_id: str | None = None) -> bool`
**Function:** `InMemoryWorkflowRegistry.get_workflow_status(self, run_id: str, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`
**Function:** `InMemoryWorkflowRegistry.list_workflow_statuses(self) -> List[Dict[str, Any]]`
**Function:** `InMemoryWorkflowRegistry.list_workflows(self) -> List['Workflow']`
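A sketch of the in-memory variant, assuming the registry methods are coroutines (consistent with the asyncio execution model):
```python
from mcp_agent.executor.workflow_registry import InMemoryWorkflowRegistry

async def track_run(workflow, run_id: str) -> None:
    registry = InMemoryWorkflowRegistry()
    await registry.register(workflow, run_id=run_id)

    status = await registry.get_workflow_status(run_id)
    print(status)  # last known status dict, or None if unknown

    if not await registry.resume_workflow(run_id, payload="continue"):
        await registry.cancel_workflow(run_id)
    await registry.unregister(run_id)
```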
### src/mcp_agent/executor/workflow_signal.py
**Class: `Signal`**
- **Inherits from**: BaseModel, Generic[SignalValueT]
- **Description**: Represents a signal that can be sent to a workflow.
- **Attributes**:
- `name` (str): The name of the signal. This is used to identify the signal and route it to the correct handler.
- `description` (str | None) = 'Workflow Signal': A description of the signal. This can be used to provide additional context about the signal.
- `payload` (SignalValueT | None) = None: The payload of the signal. This is the data that will be sent with the signal.
- `metadata` (Dict[str, Any] | None) = None: Additional metadata about the signal. This can be used to provide extra context or information.
- `workflow_id` (str | None) = None: The ID of the workflow that this signal is associated with. This is used in conjunction with the run_id to identify the specific workflow instance.
- `run_id` (str | None) = None: The unique ID for this specific workflow run to signal. This is used to identify the specific instance of the workflow that this signal is associated with.
- `model_config` = ConfigDict(arbitrary_types_allowed=True)
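A minimal sketch constructing a `Signal` from the fields documented above; the name, payload, and IDs are illustrative values:

```python
from mcp_agent.executor.workflow_signal import Signal

approval = Signal(
    name="approval_needed",              # routes the signal to its handler
    description="Ask a human to approve the plan",
    payload={"plan_id": 42},             # arbitrary SignalValueT payload
    workflow_id="my-workflow",           # which workflow this signal targets
    run_id="run-123",                    # which specific run of that workflow
)
print(approval.name, approval.payload)
```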
**Class: `SignalRegistration`**
- **Inherits from**: BaseModel
- **Description**: Tracks registration of a signal handler.
- **Attributes**:
- `signal_name` (str)
- `unique_name` (str)
- `workflow_id` (str | None) = None
- `run_id` (str | None) = None
- `model_config` = ConfigDict(arbitrary_types_allowed=True)
**Class: `SignalHandler`**
- **Inherits from**: Protocol, Generic[SignalValueT]
- **Description**: Protocol for handling signals.
**Class: `PendingSignal`**
- **Inherits from**: BaseModel
- **Description**: Tracks a waiting signal handler and its event.
- **Attributes**:
- `registration` (SignalRegistration)
- `event` (asyncio.Event | None) = None
- `value` (SignalValueT | None) = None
- `model_config` = ConfigDict(arbitrary_types_allowed=True)
**Class: `BaseSignalHandler`**
- **Inherits from**: ABC, Generic[SignalValueT]
- **Description**: Base class implementing common signal handling functionality.
**Class: `ConsoleSignalHandler`**
- **Inherits from**: SignalHandler[SignalValueT]
- **Description**: Simple console-based signal handling (blocks on input).
**Class: `AsyncioSignalHandler`**
- **Inherits from**: BaseSignalHandler[SignalValueT]
- **Description**: Asyncio-based signal handling using an internal dictionary of asyncio Events.
**Class: `LocalSignalStore`**
- **Description**: Simple in-memory structure that allows coroutines to wait for a signal
and triggers them when a signal is emitted.
**Class: `SignalWaitCallback`**
- **Inherits from**: Protocol
- **Description**: Protocol for callbacks that are triggered when a workflow pauses waiting for a given signal.
**Function:** `SignalHandler.signal(self, signal: Signal[SignalValueT]) -> None`
- **Description**: Emit a signal to all waiting handlers and registered callbacks.
- **Parameters**
- `self`
- `signal` (Signal[SignalValueT])
- **Returns**
- `None`: Return value
**Function:** `SignalHandler.wait_for_signal(self, signal: Signal[SignalValueT], timeout_seconds: int | None = None) -> SignalValueT`
- **Description**: Wait for a signal to be emitted.
- **Parameters**
- `self`
- `signal` (Signal[SignalValueT])
- `timeout_seconds` (int | None, optional): Default is None
- **Returns**
- `SignalValueT`: Return value
**Function:** `SignalHandler.on_signal(self, signal_name: str) -> Callable`
- **Description**: Decorator to register a handler for a signal; the docstring's example is reconstructed below.
- **Parameters**
- `self`
- `signal_name` (str)
- **Returns**
- `Callable`: Return value
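The docstring's example, reconstructed as code; `signal_handler` stands for any concrete `SignalHandler` implementation:

```python
@signal_handler.on_signal("approval_needed")
async def handle_approval(value: str):
    print(f"Got approval signal with value: {value}")
```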
**Function:** `BaseSignalHandler.__init__(self)`
**Function:** `BaseSignalHandler.cleanup(self, signal_name: str | None = None)`
- **Description**: Clean up handlers and registrations for a signal or all signals.
- **Parameters**
- `self`
- `signal_name` (str | None, optional): Default is None
**Function:** `BaseSignalHandler.validate_signal(self, signal: Signal[SignalValueT])`
- **Description**: Validate signal properties.
- **Parameters**
- `self`
- `signal` (Signal[SignalValueT])
**Function:** `BaseSignalHandler.on_signal(self, signal_name: str) -> Callable`
- **Description**: Register a handler for a signal.
- **Parameters**
- `self`
- `signal_name` (str)
- **Returns**
- `Callable`: Return value
**Function:** `BaseSignalHandler.decorator(func: Callable) -> Callable`
**Function:** `BaseSignalHandler.wrapped(value: SignalValueT)`
**Function:** `BaseSignalHandler.signal(self, signal: Signal[SignalValueT]) -> None`
- **Description**: Emit a signal to all waiting handlers and registered callbacks.
- **Parameters**
- `self`
- `signal` (Signal[SignalValueT])
- **Returns**
- `None`: Return value
**Function:** `BaseSignalHandler.wait_for_signal(self, signal: Signal[SignalValueT], timeout_seconds: int | None = None) -> SignalValueT`
- **Description**: Wait for a signal to be emitted.
- **Parameters**
- `self`
- `signal` (Signal[SignalValueT])
- `timeout_seconds` (int | None, optional): Default is None
- **Returns**
- `SignalValueT`: Return value
**Function:** `ConsoleSignalHandler.__init__(self)`
**Function:** `ConsoleSignalHandler.wait_for_signal(self, signal, timeout_seconds = None)`
- **Description**: Block and wait for console input.
- **Parameters**
- `self`
- `signal`
- `timeout_seconds` (optional): Default is None
**Function:** `ConsoleSignalHandler.on_signal(self, signal_name)`
**Function:** `ConsoleSignalHandler.decorator(func)`
**Function:** `ConsoleSignalHandler.wrapped(value: SignalValueT)`
**Function:** `ConsoleSignalHandler.signal(self, signal)`
**Function:** `AsyncioSignalHandler.wait_for_signal(self, signal, timeout_seconds: int | None = None) -> SignalValueT`
**Function:** `AsyncioSignalHandler.on_signal(self, signal_name)`
**Function:** `AsyncioSignalHandler.decorator(func)`
**Function:** `AsyncioSignalHandler.wrapped(value: SignalValueT)`
**Function:** `AsyncioSignalHandler.signal(self, signal)`
**Function:** `LocalSignalStore.__init__(self)`
**Function:** `LocalSignalStore.emit(self, signal_name: str, payload: Any)`
**Function:** `LocalSignalStore.wait_for(self, signal_name: str, timeout_seconds: int | None = None) -> Any`
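A minimal sketch of `LocalSignalStore` based on the signatures above: one coroutine waits on a named signal while another emits it. This assumes `emit()` and `wait_for()` are coroutines, consistent with the async handlers in this module:

```python
import asyncio

from mcp_agent.executor.workflow_signal import LocalSignalStore

async def main() -> None:
    store = LocalSignalStore()
    waiter = asyncio.create_task(store.wait_for("ready", timeout_seconds=5))
    await asyncio.sleep(0)                    # let the waiter register first
    await store.emit("ready", {"status": "ok"})
    print(await waiter)                       # -> {'status': 'ok'}

asyncio.run(main())
```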
**Function:** `SignalWaitCallback.__call__(self, signal_name: str, request_id: str | None = None, workflow_id: str | None = None, run_id: str | None = None, metadata: Dict[str, Any] | None = None) -> None`
- **Description**: Receive a notification that a workflow is pausing on a signal.
- **Parameters**
- `self`
- `signal_name` (str): The name of the signal the workflow is pausing on.
- `request_id` (str | None, optional): Default is None
- `workflow_id` (str | None, optional): The ID of the workflow that is pausing (if using a workflow engine).
- `run_id` (str | None, optional): The ID of the workflow run that is pausing (if using a workflow engine).
- `metadata` (Dict[str, Any] | None, optional): Additional metadata about the signal.
- **Returns**
- `None`: Return value
### src/mcp_agent/executor/workflow_task.py
**Module Description**: Static decorator registry for @workflow_task. Wherever possible it is preferred to use @app.workflow_task in MCPApp
**Class: `GlobalWorkflowTaskRegistry`**
- **Attributes**:
- `_instance` = None
**Function:** `GlobalWorkflowTaskRegistry.__new__(cls)`
**Function:** `GlobalWorkflowTaskRegistry.register_task(self, func: Callable, metadata: Dict[str, Any])`
**Function:** `GlobalWorkflowTaskRegistry.get_all_tasks(self) -> List[tuple]`
**Function:** `GlobalWorkflowTaskRegistry.clear(self)`
**Function:** `workflow_task(_fn: Callable[..., R] | None = None) -> Callable[[Callable[..., R]], Callable[..., R]]`
- **Description**: Static decorator to mark a function as a workflow task without requiring direct app access. These tasks are registered with the MCPApp during app initialization. The decorator also accepts keyword options: name (custom name for the activity), schedule_to_close_timeout (maximum time the task can take to complete), retry_policy (retry policy configuration), and additional metadata kwargs passed to the activity registration.
- **Parameters**
- `_fn` (Callable[..., R] | None, optional): Default is None
- **Returns**
- `Callable[[Callable[..., R]], Callable[..., R]]`: Decorated function that preserves async and typing information
**Function:** `decorator(target: Callable[..., R]) -> Callable[..., R]`
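A minimal sketch of the bare-call form shown in the signature above: marking a plain async function as a workflow task with the static decorator. The docstring also names keyword options (name, schedule_to_close_timeout, retry_policy), but their exact types aren't shown here, so this sketch sticks to the bare form; the function body is illustrative:

```python
from mcp_agent.executor.workflow_task import workflow_task

@workflow_task
async def fetch_data(url: str) -> str:
    # Registered with the MCPApp during app initialization.
    return f"fetched {url}"
```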
### src/mcp_agent/human_input/handler.py
**Function:** `console_input_callback(request: HumanInputRequest) -> HumanInputResponse`
- **Description**: Request input from a human user via console using rich panel and prompt.
- **Parameters**
- `request` (HumanInputRequest)
- **Returns**
- `HumanInputResponse`: Return value
### src/mcp_agent/human_input/types.py
**Class: `HumanInputRequest`**
- **Inherits from**: BaseModel
- **Description**: Represents a request for human input.
- **Attributes**:
- `prompt` (str): The prompt to show to the user
- `description` (str | None) = None: Optional description of what the input is for
- `request_id` (str | None) = None: Unique identifier for this request
- `workflow_id` (str | None) = None: Optional workflow ID if using workflow engine
- `timeout_seconds` (int | None) = None: Optional timeout in seconds
- `metadata` (dict | None) = None: Additional request payload
**Class: `HumanInputResponse`**
- **Inherits from**: BaseModel
- **Description**: Represents a response to a human input request
- **Attributes**:
- `request_id` (str): ID of the original request
- `response` (str): The input provided by the human
- `metadata` (dict[str, Any] | None) = None: Additional response payload
**Class: `HumanInputCallback`**
- **Inherits from**: Protocol
- **Description**: Protocol for callbacks that handle human input requests.
**Function:** `HumanInputCallback.__call__(self, request: HumanInputRequest) -> AsyncIterator[HumanInputResponse]`
- **Description**: Handle a human input request.
- **Parameters**
- `self`
- `request` (HumanInputRequest): The input request to handle
- **Returns**
- `AsyncIterator[HumanInputResponse]`: AsyncIterator yielding responses as they come in (TODO saqadri: keep it simple and just return HumanInputResponse?)
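A minimal sketch of a custom human-input callback. The field names come from the `HumanInputRequest`/`HumanInputResponse` models above; since the protocol's return shape is still an open TODO, this sketch returns a single response:

```python
from mcp_agent.human_input.types import HumanInputRequest, HumanInputResponse

async def auto_approve(request: HumanInputRequest) -> HumanInputResponse:
    # A non-interactive handler that answers every request affirmatively.
    return HumanInputResponse(
        request_id=request.request_id or "unknown",
        response="yes",
    )
```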
### src/mcp_agent/logging/event_progress.py
**Module Description**: Module for converting log events to progress events.
**Class: `ProgressAction`**
- **Inherits from**: str, Enum
- **Description**: Progress actions available in the system.
- **Attributes**:
- `STARTING` = 'Starting'
- `LOADED` = 'Loaded'
- `RUNNING` = 'Running'
- `INITIALIZED` = 'Initialized'
- `CHATTING` = 'Chatting'
- `ROUTING` = 'Routing'
- `PLANNING` = 'Planning'
- `READY` = 'Ready'
- `CALLING_TOOL` = 'Calling Tool'
- `FINISHED` = 'Finished'
- `SHUTDOWN` = 'Shutdown'
- `AGGREGATOR_INITIALIZED` = 'Running'
- `FATAL_ERROR` = 'Error'
**Class: `ProgressEvent`**
- **Description**: Represents a progress event converted from a log event.
- **Attributes**:
- `action` (ProgressAction)
- `target` (str)
- `details` (Optional[str]) = None
- `agent_name` (Optional[str]) = None
**Function:** `ProgressEvent.__str__(self) -> str`
- **Description**: Format the progress event for display.
- **Parameters**
- `self`
- **Returns**
- `str`: Return value
**Function:** `convert_log_event(event: Event) -> Optional[ProgressEvent]`
- **Description**: Convert a log event to a progress event if applicable.
- **Parameters**
- `event` (Event)
- **Returns**
- `Optional[ProgressEvent]`: Return value
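A minimal sketch building a progress event by hand with the documented `ProgressAction` values and `ProgressEvent` fields; the target, details, and agent name are illustrative:

```python
from mcp_agent.logging.event_progress import ProgressAction, ProgressEvent

event = ProgressEvent(
    action=ProgressAction.CALLING_TOOL,
    target="fetch.fetch_url",
    details="GET https://example.com",
    agent_name="researcher",
)
print(event)   # __str__ formats the event for display
```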
### src/mcp_agent/logging/events.py
**Module Description**: Events and event filters for the logger module for the MCP Agent
**Class: `EventContext`**
- **Inherits from**: BaseModel
- **Description**: Stores correlation or cross-cutting data (workflow IDs, user IDs, etc.).
Also used for distributed environments or advanced logging.
- **Attributes**:
- `session_id` (str | None) = None
- `workflow_id` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `Event`**
- **Inherits from**: BaseModel
- **Description**: Core event structure. Allows both a broad 'type' (EventType)
and a more specific 'name' string for domain-specific labeling (e.g. "ORDER_PLACED").
- **Attributes**:
- `type` (EventType)
- `name` (str | None) = None
- `namespace` (str)
- `message` (str)
- `timestamp` (datetime) = Field(default_factory=datetime.now)
- `data` (Dict[str, Any]) = Field(default_factory=dict)
- `context` (EventContext | None) = None
- `span_id` (str | None) = None
- `trace_id` (str | None) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `EventFilter`**
- **Inherits from**: BaseModel
- **Description**: Filter events by:
- allowed EventTypes (types)
- allowed event 'names'
- allowed namespace prefixes
- a minimum severity level (DEBUG < INFO < WARNING < ERROR)
- **Attributes**:
- `types` (Set[EventType] | None) = Field(default_factory=set)
- `names` (Set[str] | None) = Field(default_factory=set)
- `namespaces` (Set[str] | None) = Field(default_factory=set)
- `min_level` (EventType | None) = 'debug'
**Class: `SamplingFilter`**
- **Inherits from**: EventFilter
- **Description**: Random sampling on top of base filter.
Only pass an event if it meets the base filter AND random() < sample_rate.
- **Attributes**:
- `sample_rate` (float) = 0.1: Fraction of events to pass through
**Function:** `EventFilter.matches(self, event: Event) -> bool`
- **Description**: Check if an event matches this EventFilter criteria.
- **Parameters**
- `self`
- `event` (Event)
- **Returns**
- `bool`: Return value
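A minimal sketch: an `EventFilter` that passes only warning-and-above events under the "mcp_agent" namespace, per the fields documented above. The event values are illustrative, and the string level literals are assumed to be valid `EventType` values:

```python
from mcp_agent.logging.events import Event, EventFilter

flt = EventFilter(namespaces={"mcp_agent"}, min_level="warning")
evt = Event(type="error", namespace="mcp_agent.executor", message="tool call failed")
print(flt.matches(evt))   # True: level and namespace prefix both match
```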
**Function:** `SamplingFilter.matches(self, event: Event) -> bool`
### src/mcp_agent/logging/json_serializer.py
**Class: `JSONSerializer`**
- **Description**: A robust JSON serializer that handles various Python objects by attempting
different serialization strategies recursively.
- **Attributes**:
- `MAX_DEPTH` = 99
- `SENSITIVE_FIELDS` = {'api_key', 'secret', 'password', 'token', 'auth', 'private_key', 'client_secret', 'access_token', 'refresh_token'}
**Function:** `JSONSerializer.__init__(self)`
**Function:** `JSONSerializer._redact_sensitive_value(self, value: str) -> str`
- **Description**: Redact sensitive values to show only first 10 chars.
- **Parameters**
- `self`
- `value` (str)
- **Returns**
- `str`: Return value
**Function:** `JSONSerializer.serialize(self, obj: Any) -> Any`
- **Description**: Main entry point for serialization.
- **Parameters**
- `self`
- `obj` (Any)
- **Returns**
- `Any`: Return value
**Function:** `JSONSerializer._is_sensitive_key(self, key: str) -> bool`
- **Description**: Check if a key likely contains sensitive information.
- **Parameters**
- `self`
- `key` (str)
- **Returns**
- `bool`: Return value
**Function:** `JSONSerializer._serialize_object(self, obj: Any, depth: int = 0) -> Any`
- **Description**: Recursively serialize an object using various strategies.
- **Parameters**
- `self`
- `obj` (Any)
- `depth` (int, optional): Default is 0
- **Returns**
- `Any`: Return value
**Function:** `JSONSerializer.__call__(self, obj: Any) -> Any`
- **Description**: Make the serializer callable.
- **Parameters**
- `self`
- `obj` (Any)
- **Returns**
- `Any`: Return value
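A minimal sketch of serializing a mixed payload. Keys matching `SENSITIVE_FIELDS` above should come back redacted; the payload itself is illustrative:

```python
from mcp_agent.logging.json_serializer import JSONSerializer

serialize = JSONSerializer()               # instances are callable
payload = {
    "user": "alice",
    "api_key": "sk-1234567890abcdef",      # sensitive: expect redaction
    "settings": object(),                  # exercises the fallback strategies
}
print(serialize(payload))
```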
### src/mcp_agent/logging/listeners.py
**Module Description**: Listeners for the logger module of MCP Agent.
**Class: `EventListener`**
- **Inherits from**: ABC
- **Description**: Base async listener that processes events.
**Class: `LifecycleAwareListener`**
- **Inherits from**: EventListener
- **Description**: Optionally override start()/stop() for setup/teardown.
The event bus calls these at bus start/stop time.
**Class: `FilteredListener`**
- **Inherits from**: LifecycleAwareListener
- **Description**: Only processes events that pass the given filter.
Subclasses override _handle_matched_event().
**Class: `LoggingListener`**
- **Inherits from**: FilteredListener
- **Description**: Routes events to Python's logging facility with appropriate severity level.
**Class: `ProgressListener`**
- **Inherits from**: LifecycleAwareListener
- **Description**: Listens for all events pre-filtering and converts them to progress events
for display. By inheriting directly from LifecycleAwareListener instead of
FilteredListener, we get events before any filtering occurs.
**Class: `BatchingListener`**
- **Inherits from**: FilteredListener
- **Description**: Accumulates events in memory, flushes them in batches.
Here we just print the batch size, but you might store or forward them.
**Function:** `EventListener.handle_event(self, event: Event)`
- **Description**: Process an incoming event.
- **Parameters**
- `self`
- `event` (Event)
**Function:** `LifecycleAwareListener.start(self)`
- **Description**: Start an event listener, usually when the event bus is set up.
- **Parameters**
- `self`
**Function:** `LifecycleAwareListener.stop(self)`
- **Description**: Stop an event listener, usually when the event bus is shutting down.
- **Parameters**
- `self`
**Function:** `FilteredListener.__init__(self, event_filter: EventFilter | None = None)`
- **Description**: Initialize the listener.
- **Parameters**
- `self`
- `event_filter` (EventFilter | None, optional): Event filter to apply to incoming events.
**Function:** `FilteredListener.handle_event(self, event)`
**Function:** `FilteredListener.handle_matched_event(self, event: Event)`
- **Description**: Process an event that matches the filter.
- **Parameters**
- `self`
- `event` (Event)
**Function:** `LoggingListener.__init__(self, event_filter: EventFilter | None = None, logger: logging.Logger | None = None)`
- **Description**: Initialize the listener.
- **Parameters**
- `self`
- `event_filter` (EventFilter | None, optional): Default is None
- `logger` (logging.Logger | None, optional): Logger to use for event processing. Defaults to 'mcp_agent'.
**Function:** `LoggingListener.handle_matched_event(self, event)`
**Function:** `ProgressListener.__init__(self, display = None)`
- **Description**: Initialize the progress listener.
- **Parameters**
- `self`
- `display` (optional): Optional display handler. If None, the shared progress_display will be used.
**Function:** `ProgressListener.start(self)`
- **Description**: Start the progress display.
- **Parameters**
- `self`
**Function:** `ProgressListener.stop(self)`
- **Description**: Stop the progress display.
- **Parameters**
- `self`
**Function:** `ProgressListener.handle_event(self, event: Event)`
- **Description**: Process an incoming event and display progress if relevant.
- **Parameters**
- `self`
- `event` (Event)
**Function:** `BatchingListener.__init__(self, event_filter: EventFilter | None = None, batch_size: int = 5, flush_interval: float = 2.0)`
- **Description**: Initialize the listener.
- **Parameters**
- `self`
- `event_filter` (EventFilter | None, optional): Default is None
- `batch_size` (int, optional): Number of events to accumulate before flushing.
- `flush_interval` (float, optional): Time in seconds to wait before flushing events.
**Function:** `BatchingListener.start(self, loop = None)`
- **Description**: Spawn a periodic flush loop.
- **Parameters**
- `self`
- `loop` (optional): Default is None
**Function:** `BatchingListener.stop(self)`
- **Description**: Stop flush loop and flush any remaining events.
- **Parameters**
- `self`
**Function:** `BatchingListener._periodic_flush(self)`
**Function:** `BatchingListener.handle_matched_event(self, event)`
**Function:** `BatchingListener.flush(self)`
- **Description**: Flush the current batch of events.
- **Parameters**
- `self`
**Function:** `BatchingListener._process_batch(self, events: List[Event])`
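A minimal sketch of a custom listener that only sees events passing its filter, per the `FilteredListener` contract above (subclasses override `handle_matched_event`). The async handler follows the "async listener" description; the filter level is illustrative:

```python
from mcp_agent.logging.events import Event, EventFilter
from mcp_agent.logging.listeners import FilteredListener

class ErrorPrinter(FilteredListener):
    async def handle_matched_event(self, event: Event):
        print(f"[{event.namespace}] {event.message}")

listener = ErrorPrinter(event_filter=EventFilter(min_level="error"))
```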
### src/mcp_agent/logging/logger.py
**Module Description**: Logger module for the MCP Agent, which provides:
- Local + optional remote event transport
- Async event bus
- OpenTelemetry tracing decorators (for distributed tracing)
- Automatic injection of trace_id/span_id into events
- Developer-friendly Logger that can be used anywhere
**Class: `Logger`**
- **Description**: Developer-friendly logger that sends events to the AsyncEventBus.
- `type` is a broad category (INFO, ERROR, etc.).
- `name` can be a custom domain-specific event name, e.g. "ORDER_PLACED".
**Class: `LoggingConfig`**
- **Description**: Global configuration for the logging system.
- **Attributes**:
- `_initialized` = False
**Function:** `Logger.__init__(self, namespace: str, session_id: str | None = None)`
**Function:** `Logger._ensure_event_loop(self)`
- **Description**: Ensure we have an event loop we can use.
- **Parameters**
- `self`
**Function:** `Logger._emit_event(self, event: Event)`
- **Description**: Emit an event by running it in the event loop.
- **Parameters**
- `self`
- `event` (Event)
**Function:** `Logger.event(self, etype: EventType, ename: str | None, message: str, context: EventContext | None, data: dict)`
- **Description**: Create and emit an event.
- **Parameters**
- `self`
- `etype` (EventType)
- `ename` (str | None)
- `message` (str)
- `context` (EventContext | None)
- `data` (dict)
**Function:** `Logger.debug(self, message: str, name: str | None = None, context: EventContext = None)`
- **Description**: Log a debug message.
- **Parameters**
- `self`
- `message` (str)
- `name` (str | None, optional): Default is None
- `context` (EventContext, optional): Default is None
**Function:** `Logger.info(self, message: str, name: str | None = None, context: EventContext = None)`
- **Description**: Log an info message.
- **Parameters**
- `self`
- `message` (str)
- `name` (str | None, optional): Default is None
- `context` (EventContext, optional): Default is None
**Function:** `Logger.warning(self, message: str, name: str | None = None, context: EventContext = None)`
- **Description**: Log a warning message.
- **Parameters**
- `self`
- `message` (str)
- `name` (str | None, optional): Default is None
- `context` (EventContext, optional): Default is None
**Function:** `Logger.error(self, message: str, name: str | None = None, context: EventContext = None)`
- **Description**: Log an error message.
- **Parameters**
- `self`
- `message` (str)
- `name` (str | None, optional): Default is None
- `context` (EventContext, optional): Default is None
**Function:** `Logger.progress(self, message: str, name: str | None = None, percentage: float = None, context: EventContext = None)`
- **Description**: Log a progress message.
- **Parameters**
- `self`
- `message` (str)
- `name` (str | None, optional): Default is None
- `percentage` (float, optional): Default is None
- `context` (EventContext, optional): Default is None
**Function:** `event_context(logger: Logger, message: str, event_type: EventType = 'info', name: str | None = None)`
- **Description**: Times a synchronous block, logs an event after completion. Because logger methods are async, we schedule the final log.
- **Parameters**
- `logger` (Logger)
- `message` (str)
- `event_type` (EventType, optional): Default is 'info'
- `name` (str | None, optional): Default is None
**Function:** `async_event_context(logger: Logger, message: str, event_type: EventType = 'info', name: str | None = None)`
- **Description**: Times an asynchronous block, logs an event after completion. Because logger methods are async, we schedule the final log.
- **Parameters**
- `logger` (Logger)
- `message` (str)
- `event_type` (EventType, optional): Default is 'info'
- `name` (str | None, optional): Default is None
**Function:** `LoggingConfig.configure(cls, event_filter: EventFilter | None = None, transport: EventTransport | None = None, batch_size: int = 100, flush_interval: float = 2.0)`
- **Description**: Configure the logging system. Additional configuration options may be passed as keyword arguments.
- **Parameters**
- `cls`
- `event_filter` (EventFilter | None, optional): Default filter for all loggers
- `transport` (EventTransport | None, optional): Transport for sending events to external systems
- `batch_size` (int, optional): Default batch size for batching listener
- `flush_interval` (float, optional): Default flush interval for batching listener
**Function:** `LoggingConfig.shutdown(cls)`
- **Description**: Shutdown the logging system gracefully.
- **Parameters**
- `cls`
**Function:** `LoggingConfig.managed(cls)`
- **Description**: Context manager for the logging system lifecycle.
- **Parameters**
- `cls`
**Function:** `get_logger(namespace: str, session_id: str | None = None) -> Logger`
- **Description**: Get a logger instance for a given namespace, creating one if it doesn't already exist for that namespace.
- **Parameters**
- `namespace` (str): The namespace for the logger (e.g. "agent.helper", "workflow.demo")
- `session_id` (str | None, optional): Optional session ID to associate with all events from this logger
- **Returns**
- `Logger`: A Logger instance for the given namespace
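A minimal sketch of the documented entry point: fetch a namespaced logger and use the leveled helpers. The namespace, event name, and session ID are illustrative:

```python
from mcp_agent.logging.logger import get_logger

logger = get_logger("workflow.demo", session_id="session-1")
logger.info("Workflow started", name="WORKFLOW_STARTED")
logger.error("Tool call failed")
```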
### src/mcp_agent/logging/rich_progress.py
**Module Description**: Rich-based progress display for MCP Agent.
**Class: `RichProgressDisplay`**
- **Description**: Rich-based display for progress events.
**Function:** `RichProgressDisplay.__init__(self, console: Optional[Console] = None)`
- **Description**: Initialize the progress display.
- **Parameters**
- `self`
- `console` (Optional[Console], optional): Default is None
**Function:** `RichProgressDisplay.start(self)`
- **Description**: Start the progress display.
- **Parameters**
- `self`
**Function:** `RichProgressDisplay.stop(self)`
- **Description**: Stop the progress display.
- **Parameters**
- `self`
**Function:** `RichProgressDisplay.pause(self)`
- **Description**: Pause the progress display.
- **Parameters**
- `self`
**Function:** `RichProgressDisplay.resume(self)`
- **Description**: Resume the progress display.
- **Parameters**
- `self`
**Function:** `RichProgressDisplay.paused(self)`
- **Description**: Context manager for temporarily pausing the display.
- **Parameters**
- `self`
**Function:** `RichProgressDisplay._get_action_style(self, action: ProgressAction) -> str`
- **Description**: Map actions to appropriate styles.
- **Parameters**
- `self`
- `action` (ProgressAction)
- **Returns**
- `str`: Return value
**Function:** `RichProgressDisplay.update(self, event: ProgressEvent) -> None`
- **Description**: Update the progress display with a new event.
- **Parameters**
- `self`
- `event` (ProgressEvent)
- **Returns**
- `None`: Return value
### src/mcp_agent/logging/tracing.py
**Module Description**: Telemetry manager that defines distributed tracing decorators for OpenTelemetry traces/spans for the Logger module for MCP Agent
**Class: `TelemetryManager`**
- **Inherits from**: ContextDependent
- **Description**: Simple manager for creating OpenTelemetry spans automatically.
Decorator usage: @telemetry.traced("SomeSpanName")
**Class: `MCPRequestTrace`**
- **Description**: Helper class for trace context propagation in MCP
**Function:** `TelemetryManager.__init__(self, context: Optional['Context'] = None)`
**Function:** `TelemetryManager.traced(self, name: str | None = None, kind: SpanKind = SpanKind.INTERNAL, attributes: Dict[str, Any] = None) -> Callable`
- **Description**: Decorator that automatically creates and manages a span for a function. Works for both async and sync functions.
- **Parameters**
- `self`
- `name` (str | None, optional): Default is None
- `kind` (SpanKind, optional): Default is SpanKind.INTERNAL
- `attributes` (Dict[str, Any], optional): Default is None
- **Returns**
- `Callable`: Return value
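A minimal sketch of the decorator usage named in the class description: wrap a function in an automatically managed OpenTelemetry span. The span name and attributes are illustrative:

```python
from mcp_agent.logging.tracing import TelemetryManager

telemetry = TelemetryManager()

@telemetry.traced("fetch_page", attributes={"component": "crawler"})
async def fetch_page(url: str) -> str:
    return f"<html from {url}>"
```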
**Function:** `TelemetryManager.decorator(func)`
**Function:** `TelemetryManager.async_wrapper()`
**Function:** `TelemetryManager.sync_wrapper()`
**Function:** `TelemetryManager._record_args(self, span, args, kwargs)`
- **Description**: Optionally record primitive args as span attributes.
- **Parameters**
- `self`
- `span`
- `args`
- `kwargs`
**Function:** `MCPRequestTrace.start_span_from_mcp_request(method: str, params: Dict[str, Any]) -> Tuple[trace.Span, OtelContext]`
- **Description**: Extract trace context from incoming MCP request and start a new span
- **Parameters**
- `method` (str)
- `params` (Dict[str, Any])
- **Returns**
- `Tuple[trace.Span, OtelContext]`: Return value
**Function:** `MCPRequestTrace.inject_trace_context(arguments: Dict[str, Any]) -> Dict[str, Any]`
- **Description**: Inject current trace context into outgoing MCP request arguments
- **Parameters**
- `arguments` (Dict[str, Any])
- **Returns**
- `Dict[str, Any]`: Return value
### src/mcp_agent/logging/transport.py
**Module Description**: Transports for the Logger module for MCP Agent, including:
- Local + optional remote event transport
- Async event bus
**Class: `EventTransport`**
- **Inherits from**: Protocol
- **Description**: Pluggable interface for sending events to a remote or external system
(Kafka, RabbitMQ, REST, etc.).
**Class: `FilteredEventTransport`**
- **Inherits from**: EventTransport, ABC
- **Description**: Event transport that filters events based on a filter before sending.
**Class: `NoOpTransport`**
- **Inherits from**: FilteredEventTransport
- **Description**: Default transport that does nothing (purely local).
**Class: `ConsoleTransport`**
- **Inherits from**: FilteredEventTransport
- **Description**: Simple transport that prints events to console.
**Class: `FileTransport`**
- **Inherits from**: FilteredEventTransport
- **Description**: Transport that writes events to a file with proper formatting.
**Class: `HTTPTransport`**
- **Inherits from**: FilteredEventTransport
- **Description**: Sends events to an HTTP endpoint in batches.
Useful for sending to remote logging services like Elasticsearch, etc.
**Class: `AsyncEventBus`**
- **Description**: Async event bus with local in-process listeners + optional remote transport.
Also injects distributed tracing (trace_id, span_id) if there's a current span.
- **Attributes**:
- `_instance` = None
**Class: `MultiTransport`**
- **Inherits from**: EventTransport
- **Description**: Transport that sends events to multiple configured transports.
**Function:** `EventTransport.send_event(self, event: Event)`
- **Description**: Send an event to the external system.
- **Parameters**
- `self`
- `event` (Event): Event to send.
**Function:** `FilteredEventTransport.__init__(self, event_filter: EventFilter | None = None)`
**Function:** `FilteredEventTransport.send_event(self, event: Event)`
**Function:** `FilteredEventTransport.send_matched_event(self, event: Event)`
- **Description**: Send an event to the external system.
- **Parameters**
- `self`
- `event` (Event)
**Function:** `NoOpTransport.send_matched_event(self, event)`
- **Description**: Do nothing.
- **Parameters**
- `self`
- `event`
**Function:** `ConsoleTransport.__init__(self, event_filter: EventFilter | None = None)`
**Function:** `ConsoleTransport.send_matched_event(self, event: Event)`
**Function:** `FileTransport.__init__(self, filepath: str | Path, event_filter: EventFilter | None = None, mode: str = 'a', encoding: str = 'utf-8')`
- **Description**: Initialize FileTransport.
- **Parameters**
- `self`
- `filepath` (str | Path): Path to the log file. If relative, the current working directory will be used
- `event_filter` (EventFilter | None, optional): Optional filter for events
- `mode` (str, optional): File open mode ('a' for append, 'w' for write)
- `encoding` (str, optional): File encoding to use
**Function:** `FileTransport.send_matched_event(self, event: Event) -> None`
- **Description**: Write matched event to log file asynchronously.
- **Parameters**
- `self`
- `event` (Event): Event to write to file
- **Returns**
- `None`: Return value
**Function:** `FileTransport.close(self) -> None`
- **Description**: Clean up resources if needed.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
**Function:** `FileTransport.is_closed(self) -> bool`
- **Description**: Check if transport is closed.
- **Parameters**
- `self`
- **Returns**
- `bool`: Return value
**Function:** `HTTPTransport.__init__(self, endpoint: str, headers: Dict[str, str] = None, batch_size: int = 100, timeout: float = 5.0, event_filter: EventFilter | None = None)`
**Function:** `HTTPTransport.start(self)`
- **Description**: Initialize HTTP session.
- **Parameters**
- `self`
**Function:** `HTTPTransport.stop(self)`
- **Description**: Close HTTP session and flush any remaining events.
- **Parameters**
- `self`
**Function:** `HTTPTransport.send_matched_event(self, event: Event)`
- **Description**: Add event to batch, flush if batch is full.
- **Parameters**
- `self`
- `event` (Event)
**Function:** `HTTPTransport._flush(self)`
- **Description**: Send batch of events to HTTP endpoint.
- **Parameters**
- `self`
**Function:** `AsyncEventBus.__init__(self, transport: EventTransport | None = None)`
**Function:** `AsyncEventBus.init_queue(self)`
**Function:** `AsyncEventBus.get(cls, transport: EventTransport | None = None) -> 'AsyncEventBus'`
- **Description**: Get the singleton instance of the event bus.
- **Parameters**
- `cls`
- `transport` (EventTransport | None, optional): Default is None
- **Returns**
- `'AsyncEventBus'`: Return value
**Function:** `AsyncEventBus.reset(cls) -> None`
- **Description**: Reset the singleton instance. This is primarily useful for testing scenarios where you need to ensure a clean state between tests.
- **Parameters**
- `cls`
- **Returns**
- `None`: Return value
**Function:** `AsyncEventBus.start(self)`
- **Description**: Start the event bus and all lifecycle-aware listeners.
- **Parameters**
- `self`
**Function:** `AsyncEventBus.stop(self)`
- **Description**: Stop the event bus and all lifecycle-aware listeners.
- **Parameters**
- `self`
**Function:** `AsyncEventBus.emit(self, event: Event)`
- **Description**: Emit an event to all listeners and transport.
- **Parameters**
- `self`
- `event` (Event)
**Function:** `AsyncEventBus.add_listener(self, name: str, listener: EventListener)`
- **Description**: Add a listener to the event bus.
- **Parameters**
- `self`
- `name` (str)
- `listener` (EventListener)
**Function:** `AsyncEventBus.remove_listener(self, name: str)`
- **Description**: Remove a listener from the event bus.
- **Parameters**
- `self`
- `name` (str)
**Function:** `AsyncEventBus._process_events(self)`
- **Description**: Process events from the queue until stopped.
- **Parameters**
- `self`
**Function:** `MultiTransport.__init__(self, transports: List[EventTransport])`
- **Description**: Initialize MultiTransport with a list of transports.
- **Parameters**
- `self`
- `transports` (List[EventTransport]): List of EventTransport instances to use
**Function:** `MultiTransport.send_event(self, event: Event)`
- **Description**: Send event to all configured transports in parallel.
- **Parameters**
- `self`
- `event` (Event): Event to send
**Function:** `MultiTransport.send_with_exception_handling(transport)`
**Function:** `get_log_filename(settings: LoggerSettings, session_id: str | None = None) -> str`
- **Description**: Generate a log filename based on the configuration.
- **Parameters**
- `settings` (LoggerSettings): Logger settings containing path configuration
- `session_id` (str | None, optional): Optional session ID to use in the filename
- **Returns**
- `str`: String path for the log file
**Function:** `create_transport(settings: LoggerSettings, event_filter: EventFilter | None = None, session_id: str | None = None) -> EventTransport`
- **Description**: Create event transport based on settings.
- **Parameters**
- `settings` (LoggerSettings)
- `event_filter` (EventFilter | None, optional): Default is None
- `session_id` (str | None, optional): Default is None
- **Returns**
- `EventTransport`: Return value
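A minimal sketch fanning events out to the console and a JSONL file with the transports documented above. The file path and filter level are illustrative:

```python
from mcp_agent.logging.events import EventFilter
from mcp_agent.logging.transport import (
    ConsoleTransport,
    FileTransport,
    MultiTransport,
)

transport = MultiTransport([
    ConsoleTransport(event_filter=EventFilter(min_level="info")),
    FileTransport("logs/mcp-agent.jsonl"),   # appends by default (mode='a')
])
```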
### src/mcp_agent/mcp/gen_client.py
**Function:** `gen_client(server_name: str, server_registry: ServerRegistry, client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession] = MCPAgentClientSession, session_id: str | None = None) -> AsyncGenerator[ClientSession, None]`
- **Description**: Create a client session to the specified server. Handles server startup, initialization, and message receive loop setup. If required, callers can specify their own message receive loop and ClientSession class constructor to customize further. For persistent connections, use connect() or MCPConnectionManager instead.
- **Parameters**
- `server_name` (str)
- `server_registry` (ServerRegistry)
- `client_session_factory` (Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], optional): Default is MCPAgentClientSession
- `session_id` (str | None, optional): Default is None
- **Returns**
- `AsyncGenerator[ClientSession, None]`: Return value
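A minimal sketch: since `gen_client` yields a `ClientSession`, it is used here as an async context manager (assuming the generator is wrapped as one). The server name and config path are illustrative:

```python
from mcp_agent.mcp.gen_client import gen_client
from mcp_agent.mcp.mcp_server_registry import ServerRegistry

async def list_fetch_tools() -> None:
    registry = ServerRegistry(config_path="mcp_agent.config.yaml")
    async with gen_client("fetch", registry) as session:
        tools = await session.list_tools()
        print([t.name for t in tools.tools])
```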
**Function:** `connect(server_name: str, server_registry: ServerRegistry, client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession] = MCPAgentClientSession, session_id: str | None = None) -> ClientSession`
- **Description**: Create a persistent client session to the specified server. Handles server startup, initialization, and message receive loop setup. If required, callers can specify their own message receive loop and ClientSession class constructor to customize further.
- **Parameters**
- `server_name` (str)
- `server_registry` (ServerRegistry)
- `client_session_factory` (Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], optional): Default is MCPAgentClientSession
- `session_id` (str | None, optional): Default is None
- **Returns**
- `ClientSession`: Return value
**Function:** `disconnect(server_name: str | None, server_registry: ServerRegistry) -> None`
- **Description**: Disconnect from the specified server. If server_name is None, disconnect from all servers.
- **Parameters**
- `server_name` (str | None)
- `server_registry` (ServerRegistry)
- **Returns**
- `None`: Return value
### src/mcp_agent/mcp/mcp_agent_client_session.py
**Module Description**: A derived client session for the MCP Agent framework. It adds logging and supports sampling requests.
**Class: `MCPAgentClientSession`**
- **Inherits from**: ClientSession, ContextDependent
- **Description**: MCP Agent framework acts as a client to the servers providing tools/resources/prompts for the agent workloads.
This is a simple client session for those server connections, and supports
- handling sampling requests
- notifications
- MCP root configuration
Developers can extend this class to add more custom functionality as needed
**Function:** `MCPAgentClientSession.__init__(self, read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception], write_stream: MemoryObjectSendStream[JSONRPCMessage], read_timeout_seconds: timedelta | None = None, sampling_callback: SamplingFnT | None = None, list_roots_callback: ListRootsFnT | None = None, logging_callback: LoggingFnT | None = None, message_handler: MessageHandlerFnT | None = None, client_info: Implementation | None = None, context: Optional['Context'] = None)`
**Function:** `MCPAgentClientSession.set_session_id_callback(self, callback: Callable[[], str | None]) -> None`
- **Description**: Set the callback for retrieving the session ID. This is used by transports that support session IDs, like Streamable HTTP.
- **Parameters**
- `self`
- `callback` (Callable[[], str | None]): A function that returns the current session ID or None
- **Returns**
- `None`: Return value
**Function:** `MCPAgentClientSession.get_session_id(self) -> str | None`
- **Description**: Get the current session ID if available for this session's transport.
- **Parameters**
- `self`
- **Returns**
- `str | None`: The session ID if available, None otherwise
**Function:** `MCPAgentClientSession.send_request(self, request: SendRequestT, result_type: type[ReceiveResultT], request_read_timeout_seconds: timedelta | None = None, metadata: MessageMetadata = None, progress_callback: ProgressFnT | None = None) -> ReceiveResultT`
**Function:** `MCPAgentClientSession.send_notification(self, notification: SendNotificationT, related_request_id: RequestId | None = None) -> None`
**Function:** `MCPAgentClientSession._send_response(self, request_id: RequestId, response: SendResultT | ErrorData) -> None`
**Function:** `MCPAgentClientSession._received_notification(self, notification: ReceiveNotificationT) -> None`
- **Description**: Can be overridden by subclasses to handle a notification without needing to listen on the message stream.
- **Parameters**
- `self`
- `notification` (ReceiveNotificationT)
- **Returns**
- `None`: Return value
**Function:** `MCPAgentClientSession.send_progress_notification(self, progress_token: str | int, progress: float, total: float | None = None) -> None`
- **Description**: Sends a progress notification for a request that is currently being processed.
- **Parameters**
- `self`
- `progress_token` (str | int)
- `progress` (float)
- `total` (float | None, optional): Default is None
- **Returns**
- `None`: Return value
**Function:** `MCPAgentClientSession._handle_sampling_callback(self, context: RequestContext['ClientSession', Any], params: CreateMessageRequestParams) -> CreateMessageResult | ErrorData`
**Function:** `MCPAgentClientSession._handle_list_roots_callback(self, context: RequestContext['ClientSession', Any]) -> ListRootsResult | ErrorData`
### src/mcp_agent/mcp/mcp_aggregator.py
**Class: `NamespacedTool`**
- **Inherits from**: BaseModel
- **Description**: A tool that is namespaced by server name.
- **Attributes**:
- `tool` (Tool)
- `server_name` (str)
- `namespaced_tool_name` (str)
**Class: `NamespacedPrompt`**
- **Inherits from**: BaseModel
- **Description**: A prompt that is namespaced by server name.
- **Attributes**:
- `prompt` (Prompt)
- `server_name` (str)
- `namespaced_prompt_name` (str)
**Class: `MCPAggregator`**
- **Inherits from**: ContextDependent
- **Description**: Aggregates multiple MCP servers. When a developer calls, e.g. call_tool(...),
the aggregator searches all servers in its list for a server that provides that tool.
- **Attributes**:
- `initialized` (bool) = False: Whether the aggregator has been initialized with tools and resources from all servers.
- `connection_persistence` (bool) = False: Whether to maintain a persistent connection to the server.
- `server_names` (List[str]): A list of server names to connect to.
**Class: `MCPCompoundServer`**
- **Inherits from**: Server
- **Description**: A compound server (server-of-servers) that aggregates multiple MCP servers and is itself an MCP server
**Function:** `MCPAggregator.__aenter__(self)`
**Function:** `MCPAggregator.__aexit__(self, exc_type, exc_val, exc_tb)`
**Function:** `MCPAggregator.__init__(self, server_names: List[str], connection_persistence: bool = True, context: Optional['Context'] = None, name: str = None)`
- **Description**: Initialize the aggregator with the servers to connect to.
- **Parameters**
- `self`
- `server_names` (List[str]): A list of server names to connect to.
- `connection_persistence` (bool, optional): Whether to maintain persistent connections to servers. Default is True.
- `context` (Optional['Context'], optional): Default is None
- `name` (str, optional): Default is None
- **Note**: The server names must be resolvable by the gen_client function, and specified in the server registry.
**Function:** `MCPAggregator.initialize(self, force: bool = False)`
- **Description**: Initialize the application.
- **Parameters**
- `self`
- `force` (bool, optional): Default is False
**Function:** `MCPAggregator.close(self)`
- **Description**: Close all persistent connections when the aggregator is deleted.
- **Parameters**
- `self`
**Function:** `MCPAggregator.create(cls, server_names: List[str], connection_persistence: bool = False) -> 'MCPAggregator'`
- **Description**: Factory method to create and initialize an MCPAggregator. Use this instead of constructor since we need async initialization. If connection_persistence is True, the aggregator will maintain a persistent connection to the servers for as long as this aggregator is around. By default we do not maintain a persistent connection.
- **Parameters**
- `cls`
- `server_names` (List[str])
- `connection_persistence` (bool, optional): Default is False
- **Returns**
- `'MCPAggregator'`: Return value
**Function:** `MCPAggregator.load_server(self, server_name: str)`
- **Description**: Load tools and prompts from a single server and update the index of namespaced tool/prompt names for that server.
- **Parameters**
- `self`
- `server_name` (str)
**Function:** `MCPAggregator.load_servers(self, force: bool = False)`
- **Description**: Discover tools and prompts from each server in parallel and build an index of namespaced tool/prompt names.
- **Parameters**
- `self`
- `force` (bool, optional): Default is False
**Function:** `MCPAggregator.get_server(self, server_name: str) -> Optional[ClientSession]`
- **Description**: Get a server connection if available.
- **Parameters**
- `self`
- `server_name` (str)
- **Returns**
- `Optional[ClientSession]`: Return value
**Function:** `MCPAggregator.get_capabilities(self, server_name: str)`
- **Description**: Get server capabilities if available.
- **Parameters**
- `self`
- `server_name` (str)
**Function:** `MCPAggregator.refresh(self, server_name: str | None = None)`
- **Description**: Refresh the tools and prompts from the specified server or all servers.
- **Parameters**
- `self`
- `server_name` (str | None, optional): Default is None
**Function:** `MCPAggregator.list_servers(self) -> List[str]`
- **Description**: Return the list of server names aggregated by this agent.
- **Parameters**
- `self`
- **Returns**
- `List[str]`: Return value
**Function:** `MCPAggregator.list_tools(self, server_name: str | None = None) -> ListToolsResult`
- **Description**: Return tools from all servers, aggregated and renamed to be dot-namespaced by server name.
- **Parameters**
- `self`
- `server_name` (str | None, optional): Default is None
- **Returns**
- `ListToolsResult`: Return value
**Function:** `MCPAggregator.call_tool(self, name: str, arguments: dict | None = None, server_name: str | None = None) -> CallToolResult`
- **Description**: Call a namespaced tool, e.g., 'server_name.tool_name'.
- **Parameters**
- `self`
- `name` (str)
- `arguments` (dict | None, optional): Default is None
- `server_name` (str | None, optional): Default is None
- **Returns**
- `CallToolResult`: Return value
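A minimal sketch: create an aggregator with the documented `create()` factory and call a dot-namespaced tool. The server and tool names are illustrative:

```python
from mcp_agent.mcp.mcp_aggregator import MCPAggregator

async def demo() -> None:
    aggregator = await MCPAggregator.create(["fetch", "filesystem"])
    result = await aggregator.call_tool(
        "fetch.fetch_url",                       # 'server_name.tool_name'
        arguments={"url": "https://example.com"},
    )
    print(result)
    await aggregator.close()
```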
**Function:** `MCPAggregator.try_call_tool(client: ClientSession)`
**Function:** `MCPAggregator.list_prompts(self, server_name: str | None = None) -> ListPromptsResult`
- **Description**: Return prompts from all servers, aggregated and renamed to be dot-namespaced by server name.
- **Parameters**
- `self`
- `server_name` (str | None, optional): Default is None
- **Returns**
- `ListPromptsResult`: Return value
**Function:** `MCPAggregator.get_prompt(self, name: str, arguments: dict[str, str] | None = None, server_name: str | None = None) -> GetPromptResult`
- **Description**: Get a prompt from a server.
- **Parameters**
- `self`
- `name` (str): Name of the prompt, optionally namespaced with server name using the format 'server_name-prompt_name'
- `arguments` (dict[str, str] | None, optional): Optional dictionary of string arguments to pass to the prompt template for prompt template resolution
- `server_name` (str | None, optional): Default is None
- **Returns**
- `GetPromptResult`: Fully resolved prompt returned by the server
**Function:** `MCPAggregator.try_get_prompt(client: ClientSession)`
**Function:** `MCPAggregator._parse_capability_name(self, name: str, capability: Literal['tool', 'prompt']) -> tuple[str, str]`
- **Description**: Parse a capability name into server name and local capability name.
- **Parameters**
- `self`
- `name` (str): The tool or prompt name, possibly namespaced
- `capability` (Literal['tool', 'prompt']): The type of capability, either 'tool' or 'prompt'
- **Returns**
- `tuple[str, str]`: Tuple of (server_name, local_name)
**Function:** `MCPAggregator.getter(item: NamespacedTool)`
**Function:** `MCPAggregator.getter(item: NamespacedPrompt)`
**Function:** `MCPAggregator._start_server(self, server_name: str)`
**Function:** `MCPAggregator._fetch_tools(self, client: ClientSession, server_name: str) -> List[Tool]`
**Function:** `MCPAggregator._fetch_prompts(self, client: ClientSession, server_name: str) -> List[Prompt]`
**Function:** `MCPAggregator._fetch_capabilities(self, server_name: str)`
**Function:** `MCPCompoundServer.__init__(self, server_names: List[str], name: str = 'MCPCompoundServer')`
**Function:** `MCPCompoundServer._list_tools(self) -> List[Tool]`
- **Description**: List all tools aggregated from connected MCP servers.
- **Parameters**
- `self`
- **Returns**
- `List[Tool]`: Return value
**Function:** `MCPCompoundServer._call_tool(self, name: str, arguments: dict | None = None) -> CallToolResult`
- **Description**: Call a specific tool from the aggregated servers.
- **Parameters**
- `self`
- `name` (str)
- `arguments` (dict | None, optional): Default is None
- **Returns**
- `CallToolResult`: Return value
**Function:** `MCPCompoundServer._list_prompts(self) -> List[Prompt]`
- **Description**: List available prompts from the connected MCP servers.
- **Parameters**
- `self`
- **Returns**
- `List[Prompt]`: Return value
**Function:** `MCPCompoundServer._get_prompt(self, name: str, arguments: dict[str, str] | None = None) -> GetPromptResult`
- **Description**: Get a prompt from the aggregated servers.
- **Parameters**
- `self`
- `name` (str): Name of the prompt to get (optionally namespaced)
- `arguments` (dict[str, str] | None, optional): Optional dictionary of string arguments for prompt templating
- **Returns**
- `GetPromptResult`: Return value
**Function:** `MCPCompoundServer.run_stdio_async(self) -> None`
- **Description**: Run the server using stdio transport.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
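A minimal sketch exposing several servers as a single compound MCP server over stdio, per `MCPCompoundServer` above. The server names are illustrative:

```python
import asyncio

from mcp_agent.mcp.mcp_aggregator import MCPCompoundServer

server = MCPCompoundServer(["fetch", "filesystem"], name="compound")
asyncio.run(server.run_stdio_async())
```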
### src/mcp_agent/mcp/mcp_connection_manager.py
**Module Description**: Manages the lifecycle of multiple MCP server connections.
**Class: `ServerConnection`**
- **Description**: Represents a long-lived MCP server connection, including:
- The ClientSession to the server
- The transport streams (via stdio/sse, etc.)
**Class: `MCPConnectionManager`**
- **Inherits from**: ContextDependent
- **Description**: Manages the lifecycle of multiple MCP server connections.
**Function:** `ServerConnection.__init__(self, server_name: str, server_config: MCPServerSettings, transport_context_factory: Callable[[], AsyncGenerator[tuple[MemoryObjectReceiveStream[JSONRPCMessage | Exception], MemoryObjectSendStream[JSONRPCMessage]], None]], client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], init_hook: Optional['InitHookCallable'] = None)`
**Function:** `ServerConnection.is_healthy(self) -> bool`
- **Description**: Check if the server connection is healthy and ready to use.
- **Parameters**
- `self`
- **Returns**
- `bool`: Return value
**Function:** `ServerConnection.reset_error_state(self) -> None`
- **Description**: Reset the error state, allowing reconnection attempts.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
**Function:** `ServerConnection.request_shutdown(self) -> None`
- **Description**: Request the server to shut down. Signals the server lifecycle task to exit.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
**Function:** `ServerConnection.wait_for_shutdown_request(self) -> None`
- **Description**: Wait until the shutdown event is set.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
**Function:** `ServerConnection.initialize_session(self) -> None`
- **Description**: Initializes the server connection and session. Must be called within an async context.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
**Function:** `ServerConnection.wait_for_initialized(self) -> None`
- **Description**: Wait until the session is fully initialized.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
**Function:** `ServerConnection.create_session(self, read_stream: MemoryObjectReceiveStream, send_stream: MemoryObjectSendStream) -> ClientSession`
- **Description**: Create a new session instance for this server connection.
- **Parameters**
- `self`
- `read_stream` (MemoryObjectReceiveStream)
- `send_stream` (MemoryObjectSendStream)
- **Returns**
- `ClientSession`: Return value
**Function:** `_server_lifecycle_task(server_conn: ServerConnection) -> None`
- **Description**: Manage the lifecycle of a single server connection. Runs inside the MCPConnectionManager's shared TaskGroup.
- **Parameters**
- `server_conn` (ServerConnection)
- **Returns**
- `None`: Return value
**Function:** `MCPConnectionManager.__init__(self, server_registry: 'ServerRegistry', context: Optional['Context'] = None)`
**Function:** `MCPConnectionManager.__aenter__(self)`
**Function:** `MCPConnectionManager.__aexit__(self, exc_type, exc_val, exc_tb)`
- **Description**: Ensure clean shutdown of all connections before exiting.
- **Parameters**
- `self`
- `exc_type`
- `exc_val`
- `exc_tb`
**Function:** `MCPConnectionManager.launch_server(self, server_name: str, client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], init_hook: Optional['InitHookCallable'] = None, session_id: str | None = None) -> ServerConnection`
- **Description**: Connect to a server and return a RunningServer instance that will persist until explicitly disconnected.
- **Parameters**
- `self`
- `server_name` (str)
- `client_session_factory` (Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession])
- `init_hook` (Optional['InitHookCallable'], optional): Default is None
- `session_id` (str | None, optional): Default is None
- **Returns**
- `ServerConnection`: Return value
**Function:** `MCPConnectionManager.transport_context_factory()`
**Function:** `MCPConnectionManager.get_server(self, server_name: str, client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession] = MCPAgentClientSession, init_hook: Optional['InitHookCallable'] = None, session_id: str | None = None) -> ServerConnection`
- **Description**: Get a running server instance, launching it if needed.
- **Parameters**
- `self`
- `server_name` (str)
- `client_session_factory` (Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], optional): Default is MCPAgentClientSession
- `init_hook` (Optional['InitHookCallable'], optional): Default is None
- `session_id` (str | None, optional): Default is None
- **Returns**
- `ServerConnection`: Return value
**Function:** `MCPConnectionManager.get_server_capabilities(self, server_name: str, client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession] = MCPAgentClientSession) -> ServerCapabilities | None`
- **Description**: Get the capabilities of a specific server.
- **Parameters**
- `self`
- `server_name` (str)
- `client_session_factory` (Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], optional): Default is MCPAgentClientSession
- **Returns**
- `ServerCapabilities | None`: Return value
**Function:** `MCPConnectionManager.disconnect_server(self, server_name: str) -> None`
- **Description**: Disconnect a specific server if it's running under this connection manager.
- **Parameters**
- `self`
- `server_name` (str)
- **Returns**
- `None`: Return value
**Function:** `MCPConnectionManager.disconnect_all(self) -> None`
- **Description**: Disconnect all servers that are running under this connection manager.
- **Parameters**
- `self`
- **Returns**
- `None`: Return value
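**Example (sketch):** The connection-manager methods above compose into a launch-or-reuse → use → disconnect lifecycle. A minimal sketch, assuming a configured `ServerRegistry` instance (`registry`), a server named `"fetch"` in its configuration, and that these methods are coroutines, as the async context manager protocol here implies:
```python
# Sketch only: `registry` and the "fetch" server name are assumptions.
from mcp_agent.mcp.mcp_connection_manager import MCPConnectionManager

async def main(registry):
    async with MCPConnectionManager(server_registry=registry) as manager:
        # get_server launches the server on first use and reuses it afterwards.
        conn = await manager.get_server("fetch")
        await conn.wait_for_initialized()
        caps = await manager.get_server_capabilities("fetch")
        print(caps)
        # Disconnect one server explicitly; __aexit__ disconnects the rest.
        await manager.disconnect_server("fetch")
```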
### src/mcp_agent/mcp/mcp_server_registry.py
**Module Description**: This module defines a `ServerRegistry` class for managing MCP server configurations and initialization logic. The class loads server configurations from a YAML file, supports dynamic registration of initialization hooks, and provides methods for server initialization.
**Class: `ServerRegistry`**
- **Description**: A registry for managing server configurations and initialization logic.
The `ServerRegistry` class is responsible for loading server configurations
from a YAML file, registering initialization hooks, initializing servers,
and executing post-initialization hooks dynamically.
Attributes:
config_path (str): Path to the YAML configuration file.
registry (Dict[str, MCPServerSettings]): Loaded server configurations.
init_hooks (Dict[str, InitHookCallable]): Registered initialization hooks.
**Function:** `ServerRegistry.__init__(self, config: Settings | None = None, config_path: str | None = None)`
- **Description**: Initialize the ServerRegistry with a Settings object or a path to a YAML configuration file. Args: config (Settings): The Settings object containing the server configurations. config_path (str): Path to the YAML configuration file.
- **Parameters**
- `self`
- `config` (Settings | None, optional): Default is None
- `config_path` (str | None, optional): Default is None
**Function:** `ServerRegistry.load_registry_from_file(self, config_path: str | None = None) -> Dict[str, MCPServerSettings]`
- **Description**: Load the YAML configuration file and validate it. Returns: Dict[str, MCPServerSettings]: A dictionary of server configurations. Raises: ValueError: If the configuration is invalid.
- **Parameters**
- `self`
- `config_path` (str | None, optional): Default is None
- **Returns**
- `Dict[str, MCPServerSettings]`: A dictionary of server configurations.
- **Raises**: ValueError: If the configuration is invalid.
**Function:** `ServerRegistry.start_server(self, server_name: str, client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession] = ClientSession, session_id: str | None = None) -> AsyncGenerator[ClientSession, None]`
- **Description**: Starts the server process based on its configuration and yields a client session connected to it. To also perform MCP initialization, call initialize_server. Args: server_name (str): The name of the server to start. Raises: ValueError: If the server is not found or has an unsupported transport.
- **Parameters**
- `self`
- `server_name` (str)
- `client_session_factory` (Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], optional): Default is ClientSession
- `session_id` (str | None, optional): Default is None
- **Returns**
- `AsyncGenerator[ClientSession, None]`: Yields a ClientSession connected to the running server.
- **Raises**: ValueError: If the server is not found or has an unsupported transport.
**Function:** `ServerRegistry.initialize_server(self, server_name: str, client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession] = ClientSession, init_hook: InitHookCallable = None, session_id: str | None = None) -> AsyncGenerator[ClientSession, None]`
- **Description**: Initialize a server based on its configuration, call any registered or provided initialization hook, and yield the initialized client session. Args: server_name (str): The name of the server to initialize. init_hook (InitHookCallable): Optional initialization hook function to call after initialization. Raises: ValueError: If the server is not found or has an unsupported transport.
- **Parameters**
- `self`
- `server_name` (str)
- `client_session_factory` (Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], ClientSession], optional): Default is ClientSession
- `init_hook` (InitHookCallable, optional): Default is None
- `session_id` (str | None, optional): Default is None
- **Returns**
- `AsyncGenerator[ClientSession, None]`: Yields an initialized ClientSession for the server.
- **Raises**: ValueError: If the server is not found or has an unsupported transport.
**Function:** `ServerRegistry.register_init_hook(self, server_name: str, hook: InitHookCallable) -> None`
- **Description**: Register an initialization hook for a specific server. This will get called after the server is initialized. Args: server_name (str): The name of the server. hook (callable): The initialization function to register.
- **Parameters**
- `self`
- `server_name` (str)
- `hook` (InitHookCallable)
- **Returns**
- `None`: Return value
**Function:** `ServerRegistry.execute_init_hook(self, server_name: str, session = None) -> bool`
- **Description**: Execute the initialization hook for a specific server. Args: server_name (str): The name of the server. session: The session object to pass to the initialization hook.
- **Parameters**
- `self`
- `server_name` (str)
- `session` (optional): The session object to pass to the initialization hook.
- **Returns**
- `bool`: Return value
**Function:** `ServerRegistry.get_server_config(self, server_name: str) -> MCPServerSettings | None`
- **Description**: Get the configuration for a specific server. Args: server_name (str): The name of the server. Returns: MCPServerSettings: The server configuration.
- **Parameters**
- `self`
- `server_name` (str)
- **Returns**
- `MCPServerSettings | None`: The server configuration, or None if not found.
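**Example (sketch):** A typical `ServerRegistry` flow: load configuration, register a post-init hook, then open a session. A minimal sketch, assuming the config path below exists and that `initialize_server` is consumed as an async context manager (its `AsyncGenerator[ClientSession, None]` return type suggests it yields the session):
```python
# Sketch only: the config path and "fetch" server name are assumptions.
from mcp_agent.mcp.mcp_server_registry import ServerRegistry

registry = ServerRegistry(config_path="mcp_agent.config.yaml")

def on_init(session) -> bool:
    # Called once the "fetch" server has been initialized.
    print("fetch server ready")
    return True

registry.register_init_hook("fetch", on_init)

async def use_server():
    async with registry.initialize_server("fetch") as session:
        print(await session.list_tools())
```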
### src/mcp_agent/server/app_server.py
**Module Description**: MCPAgentServer - Exposes MCPApp as MCP server, and mcp-agent workflows and agents as MCP tools.
**Class: `ServerContext`**
- **Inherits from**: ContextDependent
- **Description**: Context object for the MCP App server.
**Function:** `ServerContext.__init__(self, mcp: FastMCP, context: 'Context')`
**Function:** `ServerContext.register_workflow(self, workflow_name: str, workflow_cls: Type[Workflow])`
- **Description**: Register a workflow class.
- **Parameters**
- `self`
- `workflow_name` (str)
- `workflow_cls` (Type[Workflow])
**Function:** `ServerContext.app(self) -> MCPApp`
- **Description**: Get the MCPApp instance associated with this server context.
- **Parameters**
- `self`
- **Returns**
- `MCPApp`: Return value
**Function:** `ServerContext.workflows(self) -> Dict[str, Type[Workflow]]`
- **Description**: Get the workflows registered in this server context.
- **Parameters**
- `self`
- **Returns**
- `Dict[str, Type[Workflow]]`: Return value
**Function:** `ServerContext.workflow_registry(self) -> WorkflowRegistry`
- **Description**: Get the workflow registry for this server context.
- **Parameters**
- `self`
- **Returns**
- `WorkflowRegistry`: Return value
**Function:** `create_mcp_server_for_app(app: MCPApp) -> FastMCP`
- **Description**: Create an MCP server for a given MCPApp instance. Args: app: The MCPApp instance to create a server for Returns: A configured FastMCP server instance
- **Parameters**
- `app` (MCPApp): The MCPApp instance to create a server for
- **Returns**
- `FastMCP`: A configured FastMCP server instance
**Function:** `app_specific_lifespan(mcp: FastMCP) -> AsyncIterator[ServerContext]`
- **Description**: Initialize and manage MCPApp lifecycle.
- **Parameters**
- `mcp` (FastMCP)
- **Returns**
- `AsyncIterator[ServerContext]`: Return value
**Function:** `list_workflows(ctx: MCPContext) -> Dict[str, Dict[str, Any]]`
- **Description**: List all available workflow types with their detailed information. Returns information about each workflow type including name, description, and parameters. This helps in making an informed decision about which workflow to run.
- **Parameters**
- `ctx` (MCPContext)
- **Returns**
- `Dict[str, Dict[str, Any]]`: Return value
**Function:** `list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]`
- **Description**: List all workflow instances (runs) with their detailed status information. This returns information about actual workflow instances (runs), not workflow types. For each workflow run, returns its ID, name, current state, and available operations, which helps in identifying and managing active workflow instances. Returns: A list of workflow run records with detailed status information.
- **Parameters**
- `ctx` (MCPContext)
- **Returns**
- `List[Dict[str, Any]]`: A list of dictionaries, one per workflow run, with detailed status information.
**Function:** `run_workflow(ctx: MCPContext, workflow_name: str, run_parameters: Dict[str, Any] | None = None) -> str`
- **Description**: Run a workflow with the given name. Args: workflow_name: The name of the workflow to run. run_parameters: Arguments to pass to the workflow run. workflows/list method will return the run_parameters schema for each workflow. Returns: The run ID of the started workflow run, which can be passed to workflows/get_status, workflows/resume, and workflows/cancel.
- **Parameters**
- `ctx` (MCPContext)
- `workflow_name` (str): The name of the workflow to run.
- `run_parameters` (Dict[str, Any] | None, optional): Arguments to pass to the workflow run. workflows/list method will return the run_parameters schema for each workflow.
- **Returns**
- `str`: The run ID of the started workflow run, which can be passed to workflows/get_status, workflows/resume, and workflows/cancel.
**Function:** `get_workflow_status(ctx: MCPContext, workflow_name: str, run_id: str) -> Dict[str, Any]`
- **Description**: Get the status of a running workflow. Provides detailed information about a workflow instance including its current state, whether it's running or completed, and any results or errors encountered. Args: workflow_name: The name of the workflow to check. run_id: The ID of the workflow instance to check, received from workflows/run or workflows/runs/list. Returns: A dictionary with comprehensive information about the workflow status.
- **Parameters**
- `ctx` (MCPContext)
- `workflow_name` (str): The name of the workflow to check.
- `run_id` (str): The ID of the workflow instance to check, received from workflows/run or workflows/runs/list.
- **Returns**
- `Dict[str, Any]`: A dictionary with comprehensive information about the workflow status.
**Function:** `resume_workflow(ctx: MCPContext, run_id: str, workflow_name: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool`
- **Description**: Resume a paused workflow. Args: run_id: The ID of the workflow to resume, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to resume. signal_name: Optional name of the signal to send to resume the workflow. This will default to "resume", but can be a custom signal name if the workflow was paused on a specific signal. payload: Optional payload to provide the workflow upon resumption. For example, if a workflow is waiting for human input, this can be the human input. Returns: True if the workflow was resumed, False otherwise.
- **Parameters**
- `ctx` (MCPContext)
- `run_id` (str): The ID of the workflow to resume, received from workflows/run or workflows/runs/list.
- `workflow_name` (str | None, optional): The name of the workflow to resume.
- `signal_name` (str | None, optional): Optional name of the signal to send to resume the workflow. This will default to "resume", but can be a custom signal name if the workflow was paused on a specific signal.
- `payload` (str | None, optional): Optional payload to provide the workflow upon resumption. For example, if a workflow is waiting for human input, this can be the human input.
- **Returns**
- `bool`: True if the workflow was resumed, False otherwise.
**Function:** `cancel_workflow(ctx: MCPContext, run_id: str, workflow_name: str | None = None) -> bool`
- **Description**: Cancel a running workflow. Args: run_id: The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to cancel. Returns: True if the workflow was cancelled, False otherwise.
- **Parameters**
- `ctx` (MCPContext)
- `run_id` (str): The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list.
- `workflow_name` (str | None, optional): The name of the workflow to cancel.
- **Returns**
- `bool`: True if the workflow was cancelled, False otherwise.
**Function:** `create_workflow_tools(mcp: FastMCP, server_context: ServerContext)`
- **Description**: Create workflow-specific tools for registered workflows. This is called at server start to register specific endpoints for each workflow.
- **Parameters**
- `mcp` (FastMCP)
- `server_context` (ServerContext)
**Function:** `create_workflow_specific_tools(mcp: FastMCP, workflow_name: str, workflow_cls: Type['Workflow'])`
- **Description**: Create specific tools for a given workflow.
- **Parameters**
- `mcp` (FastMCP)
- `workflow_name` (str)
- `workflow_cls` (Type['Workflow'])
**Function:** `run(ctx: MCPContext, run_parameters: Dict[str, Any] | None = None) -> Dict[str, Any]`
**Function:** `get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]`
**Function:** `_get_server_descriptions(server_registry: ServerRegistry | None, server_names: List[str]) -> List`
**Function:** `_get_server_descriptions_as_string(server_registry: ServerRegistry | None, server_names: List[str]) -> str`
**Function:** `_workflow_run(ctx: MCPContext, workflow_id: str, run_parameters: Dict[str, Any] | None = None) -> str`
**Function:** `_workflow_status(ctx: MCPContext, run_id: str, workflow_id: str | None = None) -> Dict[str, Any]`
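**Example (sketch):** The workflow tools above form a run lifecycle: start a run, poll its status, then resume or cancel it. A sketch against the documented signatures; in practice these handlers are exposed as MCP tools, so `ctx`, the workflow name, the run parameters, and the status fields checked here are all illustrative:
```python
# Sketch only: `ctx`, the workflow name, and the status keys are assumptions.
async def drive_workflow(ctx):
    run_id = await run_workflow(
        ctx, "ExampleWorkflow", run_parameters={"input": "hello"}
    )
    status = await get_workflow_status(ctx, "ExampleWorkflow", run_id)
    if status.get("waiting_for_input"):        # hypothetical status field
        await resume_workflow(ctx, run_id, payload="approved")
    elif not status.get("completed"):          # hypothetical status field
        await cancel_workflow(ctx, run_id)
```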
### src/mcp_agent/server/app_server_types.py
**Function:** `create_model_from_schema(json_schema: Dict[str, Any]) -> Type[BaseModel]`
- **Description**: Create a Pydantic model from a JSON schema
- **Parameters**
- `json_schema` (Dict[str, Any])
- **Returns**
- `Type[BaseModel]`: Return value
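**Example (sketch):** How a JSON schema might map to a generated model via the function above; the exact set of supported schema features is not documented here, so this sticks to basic object properties:
```python
# Sketch only: assumes basic object/property support in create_model_from_schema.
from mcp_agent.server.app_server_types import create_model_from_schema

schema = {
    "title": "RunParams",
    "type": "object",
    "properties": {
        "query": {"type": "string"},
        "limit": {"type": "integer", "default": 10},
    },
    "required": ["query"],
}

RunParams = create_model_from_schema(schema)
params = RunParams(query="hello")  # `limit` falls back to its schema default
```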
### src/mcp_agent/telemetry/usage_tracking.py
**Function:** `send_usage_data()`
### src/mcp_agent/utils/common.py
**Module Description**: Helper utilities that are commonly used throughout the framework, but which do not belong to any specific module.
**Function:** `unwrap(c: Callable[..., Any]) -> Callable[..., Any]`
- **Description**: Return the underlying function object for any callable.
- **Parameters**
- `c` (Callable[..., Any])
- **Returns**
- `Callable[..., Any]`: Return value
**Function:** `typed_dict_extras(d: dict, exclude: List[str])`
**Function:** `to_string(obj: BaseModel | dict) -> str`
- **Description**: Convert a Pydantic model or dictionary to a JSON string.
- **Parameters**
- `obj` (BaseModel | dict)
- **Returns**
- `str`: Return value
**Function:** `ensure_serializable(data: BaseModel) -> BaseModel`
- **Description**: Workaround for https://github.com/pydantic/pydantic/issues/7713, see https://github.com/pydantic/pydantic/issues/7713#issuecomment-2604574418
- **Parameters**
- `data` (BaseModel)
- **Returns**
- `BaseModel`: Return value
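**Example (sketch):** The helpers above on toy inputs; the `lru_cache` case assumes `unwrap` follows `__wrapped__` chains the way `inspect.unwrap` does:
```python
# Sketch only: illustrates to_string and unwrap on simple values.
import functools
from pydantic import BaseModel
from mcp_agent.utils.common import to_string, unwrap

class Point(BaseModel):
    x: int
    y: int

print(to_string(Point(x=1, y=2)))   # JSON string from a model
print(to_string({"x": 1, "y": 2}))  # JSON string from a plain dict

@functools.lru_cache
def square(n: int) -> int:
    return n * n

print(unwrap(square).__name__)      # "square": the underlying function
```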
### src/mcp_agent/utils/pydantic_type_serializer.py
**Module Description**: Serializer for Pydantic model types. This allows model types to be transmitted between different processes or services, such as in a distributed workflow system like Temporal.
**Class: `PydanticTypeSerializer`**
- **Inherits from**: BaseModel
- **Description**: A utility class for serializing and reconstructing Pydantic model types.
This allows model types to be transmitted between different processes or services,
such as in a distributed workflow system.
**Class: `Config`**
- **Attributes**:
- `arbitrary_types_allowed` = True
**Class: `PydanticTypeEncoder`**
- **Inherits from**: json.JSONEncoder
- **Description**: Custom JSON encoder that can handle Pydantic special types like PydanticUndefinedType.
**Function:** `is_pydantic_undefined(obj: Any) -> bool`
- **Description**: Check if an object is a PydanticUndefinedType instance.
- **Parameters**
- `obj` (Any)
- **Returns**
- `bool`: Return value
**Function:** `make_serializable(value: Any) -> Any`
- **Description**: Make a value serializable by handling PydanticUndefinedType and other special cases.
- **Parameters**
- `value` (Any)
- **Returns**
- `Any`: Return value
**Function:** `PydanticTypeSerializer._get_type_origin_name(origin: Any) -> str`
- **Description**: Get a standardized name for a type origin.
- **Parameters**
- `origin` (Any)
- **Returns**
- `str`: Return value
**Function:** `PydanticTypeSerializer.serialize_type(typ: Any) -> Dict[str, Any]`
- **Description**: Serialize a type object into a JSON-serializable dictionary. Args: typ: The type to serialize Returns: A dictionary representing the serialized type
- **Parameters**
- `typ` (Any): The type to serialize
- **Returns**
- `Dict[str, Any]`: A dictionary representing the serialized type
**Function:** `PydanticTypeSerializer._serialize_validators(model_class: Type[BaseModel]) -> List[Dict[str, Any]]`
- **Description**: Serialize the validators of a model class.
- **Parameters**
- `model_class` (Type[BaseModel])
- **Returns**
- `List[Dict[str, Any]]`: Return value
**Function:** `PydanticTypeSerializer._get_all_fields(model_class: Type[BaseModel]) -> Dict[str, Dict[str, Any]]`
- **Description**: Get all field definitions for a model class, including fields from parent classes. Args: model_class: The Pydantic model class Returns: A dictionary of field definitions
- **Parameters**
- `model_class` (Type[BaseModel]): The Pydantic model class
- **Returns**
- `Dict[str, Dict[str, Any]]`: A dictionary of field definitions
**Function:** `PydanticTypeSerializer._serialize_fields(model_class: Type[BaseModel]) -> Dict[str, Dict[str, Any]]`
- **Description**: Serialize the field definitions of a model class.
- **Parameters**
- `model_class` (Type[BaseModel])
- **Returns**
- `Dict[str, Dict[str, Any]]`: Return value
**Function:** `PydanticTypeSerializer._serialize_config(model_class: Type[BaseModel]) -> Dict[str, Any]`
- **Description**: Serialize the model's config.
- **Parameters**
- `model_class` (Type[BaseModel])
- **Returns**
- `Dict[str, Any]`: Return value
**Function:** `PydanticTypeSerializer.deserialize_type(serialized: Dict[str, Any]) -> Any`
- **Description**: Reconstruct a type from its serialized representation. Args: serialized: The serialized type dictionary Returns: The reconstructed type
- **Parameters**
- `serialized` (Dict[str, Any]): The serialized type dictionary
- **Returns**
- `Any`: The reconstructed type
**Function:** `PydanticTypeSerializer.reconstruct_model(serialized: Dict[str, Any]) -> Type[BaseModel]`
- **Description**: Reconstruct a Pydantic model class from its serialized representation. Args: serialized: The serialized model dictionary Returns: The reconstructed model class
- **Parameters**
- `serialized` (Dict[str, Any]): The serialized model dictionary
- **Returns**
- `Type[BaseModel]`: The reconstructed model class
**Function:** `PydanticTypeSerializer.serialize_model_type(cls, model_class: Type[BaseModel]) -> Dict[str, Any]`
- **Description**: Serialize a Pydantic model class into a JSON-serializable dictionary. Args: model_class: The Pydantic model class to serialize Returns: A dictionary containing the serialized model type
- **Parameters**
- `cls`
- `model_class` (Type[BaseModel]): The Pydantic model class to serialize
- **Returns**
- `Dict[str, Any]`: A dictionary containing the serialized model type
**Function:** `PydanticTypeSerializer.deserialize_model_type(cls, serialized: Dict[str, Any]) -> Type[BaseModel]`
- **Description**: Deserialize a dictionary back into a Pydantic model class. Args: serialized: The serialized model dictionary Returns: The reconstructed Pydantic model class
- **Parameters**
- `cls`
- `serialized` (Dict[str, Any]): The serialized model dictionary
- **Returns**
- `Type[BaseModel]`: The reconstructed Pydantic model class
**Function:** `PydanticTypeEncoder.default(self, obj)`
**Function:** `json_object_hook(obj: Dict[str, Any]) -> Any`
- **Description**: Handle special type markers in deserialized JSON.
- **Parameters**
- `obj` (Dict[str, Any])
- **Returns**
- `Any`: Return value
**Function:** `serialize_model(model_type: Type[BaseModel]) -> str`
- **Description**: Serialize a model type into a JSON string for transmission via Temporal. Args: model_type: The Pydantic model class to serialize Returns: A JSON string representing the serialized model
- **Parameters**
- `model_type` (Type[BaseModel]): The Pydantic model class to serialize
- **Returns**
- `str`: A JSON string representing the serialized model
**Function:** `deserialize_model(serialized_json: str) -> Type[BaseModel]`
- **Description**: Deserialize a JSON string back into a Pydantic model class. Args: serialized_json: The JSON string containing the serialized model Returns: The reconstructed Pydantic model class
- **Parameters**
- `serialized_json` (str): The JSON string containing the serialized model
- **Returns**
- `Type[BaseModel]`: The reconstructed Pydantic model class
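**Example (sketch):** A round trip through the module-level helpers, the shape of use the Temporal note above implies; assumes field defaults survive the round trip (the field serialization described above suggests they do):
```python
# Sketch only: a serialize/deserialize round trip for a toy model type.
from pydantic import BaseModel
from mcp_agent.utils.pydantic_type_serializer import (
    serialize_model,
    deserialize_model,
)

class Order(BaseModel):
    sku: str
    quantity: int = 1

wire = serialize_model(Order)       # JSON string safe to transmit
Rebuilt = deserialize_model(wire)   # an equivalent model class
assert Rebuilt(sku="abc").quantity == 1
```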
### src/mcp_agent/workflows/embedding/embedding_base.py
**Class: `EmbeddingModel`**
- **Inherits from**: ABC, ContextDependent
- **Description**: Abstract interface for embedding models
**Function:** `EmbeddingModel.embed(self, data: List[str]) -> FloatArray`
- **Description**: Generate embeddings for a list of text strings. Args: data: List of text strings to embed Returns: Array of embeddings, shape (len(data), embedding_dim)
- **Parameters**
- `self`
- `data` (List[str]): List of text strings to embed
- **Returns**
- `FloatArray`: Array of embeddings, shape (len(data), embedding_dim)
**Function:** `EmbeddingModel.embedding_dim(self) -> int`
- **Description**: Return the dimensionality of the embeddings
- **Parameters**
- `self`
- **Returns**
- `int`: Return value
**Function:** `compute_similarity_scores(embedding_a: FloatArray, embedding_b: FloatArray) -> Dict[str, float]`
- **Description**: Compute different similarity metrics between embeddings
- **Parameters**
- `embedding_a` (FloatArray)
- `embedding_b` (FloatArray)
- **Returns**
- `Dict[str, float]`: Return value
**Function:** `compute_confidence(similarity_scores: Dict[str, float]) -> float`
- **Description**: Compute overall confidence score from individual similarity metrics
- **Parameters**
- `similarity_scores` (Dict[str, float])
- **Returns**
- `float`: Return value
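**Example (sketch):** A toy `EmbeddingModel` subclass to show the interface; it hashes text instead of calling a real model, assumes `embed` is a coroutine and `embedding_dim` a property, and sidesteps any context wiring `ContextDependent` may require:
```python
# Sketch only: a fake embedding model for interface illustration.
import numpy as np
from mcp_agent.workflows.embedding.embedding_base import (
    EmbeddingModel,
    compute_confidence,
    compute_similarity_scores,
)

class HashEmbeddingModel(EmbeddingModel):
    async def embed(self, data):
        # Deterministic pseudo-embeddings seeded by each string's hash.
        rows = [np.random.default_rng(abs(hash(t))).random(8) for t in data]
        return np.vstack(rows).astype(np.float32)

    @property
    def embedding_dim(self) -> int:
        return 8

async def demo():
    model = HashEmbeddingModel()
    a, b = await model.embed(["hello world", "hello there"])
    scores = compute_similarity_scores(a, b)
    print(scores, compute_confidence(scores))
```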
### src/mcp_agent/workflows/embedding/embedding_cohere.py
**Class: `CohereEmbeddingModel`**
- **Inherits from**: EmbeddingModel
- **Description**: Cohere embedding model implementation
**Function:** `CohereEmbeddingModel.__init__(self, model: str = 'embed-multilingual-v3.0', context: Optional['Context'] = None)`
**Function:** `CohereEmbeddingModel.embed(self, data: List[str]) -> FloatArray`
**Function:** `CohereEmbeddingModel.embedding_dim(self) -> int`
### src/mcp_agent/workflows/embedding/embedding_openai.py
**Class: `OpenAIEmbeddingModel`**
- **Inherits from**: EmbeddingModel
- **Description**: OpenAI embedding model implementation
**Function:** `OpenAIEmbeddingModel.__init__(self, model: str = 'text-embedding-3-small', context: Optional['Context'] = None)`
**Function:** `OpenAIEmbeddingModel.embed(self, data: List[str]) -> FloatArray`
**Function:** `OpenAIEmbeddingModel.embedding_dim(self) -> int`
### src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py
**Class: `QualityRating`**
- **Inherits from**: str, Enum
- **Description**: Enum for evaluation quality ratings
- **Attributes**:
- `POOR` = 0
- `FAIR` = 1
- `GOOD` = 2
- `EXCELLENT` = 3
**Class: `EvaluationResult`**
- **Inherits from**: BaseModel
- **Description**: Model representing the evaluation result from the evaluator LLM
- **Attributes**:
- `rating` (QualityRating) = Field(description='Quality rating of the response')
- `feedback` (str) = Field(description='Specific feedback and suggestions for improvement')
- `needs_improvement` (bool) = Field(description='Whether the output needs further improvement')
- `focus_areas` (List[str]) = Field(default_factory=list, description='Specific areas to focus on in next iteration')
**Class: `EvaluatorOptimizerLLM`**
- **Inherits from**: AugmentedLLM[MessageParamT, MessageT]
- **Description**: Implementation of the evaluator-optimizer workflow where one LLM generates responses
while another provides evaluation and feedback in a refinement loop.
This can be used either:
1. As a standalone workflow with its own optimizer agent
2. As a wrapper around another workflow (Orchestrator, Router, ParallelLLM) to add
evaluation and refinement capabilities
When to use this workflow:
- When you have clear evaluation criteria and iterative refinement provides value
- When LLM responses improve with articulated feedback
- When the task benefits from focused iteration on specific aspects
Examples:
- Literary translation with "expert" refinement
- Complex search tasks needing multiple rounds
- Document writing requiring multiple revisions
**Function:** `EvaluatorOptimizerLLM.__init__(self, optimizer: Agent | AugmentedLLM, evaluator: str | Agent | AugmentedLLM, name: str | None = None, min_rating: QualityRating = QualityRating.GOOD, max_refinements: int = 3, llm_factory: Callable[[Agent], AugmentedLLM] | None = None, context: Optional['Context'] = None)`
- **Description**: Initialize the evaluator-optimizer workflow.
- **Parameters**
- `self`
- `optimizer` (Agent | AugmentedLLM): The agent/LLM/workflow that generates responses. Can be an Agent (which will be converted to an AugmentedLLM), an AugmentedLLM instance, or an Orchestrator/Router/ParallelLLM workflow.
- `evaluator` (str | Agent | AugmentedLLM): The evaluation criteria (as a string), or the agent/LLM that evaluates responses.
- `name` (str | None, optional): Default is None
- `min_rating` (QualityRating, optional): Minimum acceptable quality rating. Default is QualityRating.GOOD
- `max_refinements` (int, optional): Maximum refinement iterations. Default is 3
- `llm_factory` (Callable[[Agent], AugmentedLLM] | None, optional): Optional factory to create LLMs from agents. Default is None
- `context` (Optional['Context'], optional): Default is None
**Function:** `EvaluatorOptimizerLLM.generate(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> List[MessageT]`
- **Description**: Generate an optimized response through evaluation-guided refinement
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `List[MessageT]`: Return value
**Function:** `EvaluatorOptimizerLLM.generate_str(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> str`
- **Description**: Generate an optimized response and return it as a string
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str`: Return value
**Function:** `EvaluatorOptimizerLLM.generate_structured(self, message: str | MessageParamT | List[MessageParamT], response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
- **Description**: Generate an optimized structured response
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `response_model` (Type[ModelT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `ModelT`: Return value
**Function:** `EvaluatorOptimizerLLM._build_eval_prompt(self, original_request: str, current_response: str, iteration: int) -> str`
- **Description**: Build the evaluation prompt for the evaluator
- **Parameters**
- `self`
- `original_request` (str)
- `current_response` (str)
- `iteration` (int)
- **Returns**
- `str`: Return value
**Function:** `EvaluatorOptimizerLLM._build_refinement_prompt(self, original_request: str, current_response: str, feedback: EvaluationResult, iteration: int) -> str`
- **Description**: Build the refinement prompt for the optimizer
- **Parameters**
- `self`
- `original_request` (str)
- `current_response` (str)
- `feedback` (EvaluationResult)
- `iteration` (int)
- **Returns**
- `str`: Return value
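**Example (sketch):** Wiring the constructor documented above; the agent names, instructions, and the OpenAI LLM factory are illustrative:
```python
# Sketch only: agent setup and llm_factory choice are assumptions.
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.evaluator_optimizer.evaluator_optimizer import (
    EvaluatorOptimizerLLM,
    QualityRating,
)
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

eo = EvaluatorOptimizerLLM(
    optimizer=Agent(name="writer", instruction="Draft the requested text."),
    evaluator=Agent(name="critic", instruction="Grade drafts; give feedback."),
    min_rating=QualityRating.EXCELLENT,  # keep refining until rated excellent
    max_refinements=2,
    llm_factory=lambda agent: OpenAIAugmentedLLM(agent=agent),
)

# Inside an async context:
#   result = await eo.generate_str("Write a haiku about MCP servers.")
```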
### src/mcp_agent/workflows/intent_classifier/intent_classifier_base.py
**Class: `Intent`**
- **Inherits from**: BaseModel
- **Description**: A class that represents a single intent category
- **Attributes**:
- `name` (str): The name of the intent
- `description` (str | None) = None: A description of what this intent represents
- `examples` (List[str]) = Field(default_factory=list): Example phrases or requests that match this intent
- `metadata` (Dict[str, str]) = Field(default_factory=dict): Additional metadata about the intent that might be useful for classification
**Class: `IntentClassificationResult`**
- **Inherits from**: BaseModel
- **Description**: A class that represents the result of intent classification
- **Attributes**:
- `intent` (str): The classified intent name
- `p_score` (float | None) = None: The probability score (0 to 1) of the classification. This is optional and may only be provided if the classifier is probabilistic (e.g. a probabilistic binary classifier).
- `extracted_entities` (Optional[Dict[str, str]]) = Field(default_factory=dict): Any entities or parameters extracted from the input request that are relevant to the intent
**Class: `IntentClassifier`**
- **Inherits from**: ABC, ContextDependent
- **Description**: Base class for intent classification. This can be implemented using different approaches
like LLMs, embedding models, traditional ML classification models, or rule-based systems.
When to use this:
- When you need to understand the user's intention before routing or processing
- When you want to extract structured information from natural language inputs
- When you need to handle multiple related but distinct types of requests
Examples:
- Classifying customer service requests (complaint, question, feedback)
- Understanding user commands in a chat interface
- Determining the type of analysis requested for a dataset
**Function:** `IntentClassifier.__init__(self, intents: List[Intent], context: Optional['Context'] = None)`
**Function:** `IntentClassifier.classify(self, request: str, top_k: int = 1) -> List[IntentClassificationResult]`
- **Description**: Classify the input request into one or more intents. Args: request: The input text to classify top_k: Maximum number of top intent matches to return. May return fewer. Returns: List of classification results, ordered by confidence
- **Parameters**
- `self`
- `request` (str): The input text to classify
- `top_k` (int, optional): Maximum number of top intent matches to return. May return fewer.
- **Returns**
- `List[IntentClassificationResult]`: List of classification results, ordered by confidence
**Function:** `IntentClassifier.initialize(self)`
- **Description**: Initialize the classifier. Override this method if needed.
- **Parameters**
- `self`
### src/mcp_agent/workflows/intent_classifier/intent_classifier_embedding.py
**Class: `EmbeddingIntent`**
- **Inherits from**: Intent
- **Description**: An intent with embedding information
- **Attributes**:
- `embedding` (FloatArray | None) = None: Pre-computed embedding for this intent
- `model_config` = ConfigDict(arbitrary_types_allowed=True)
**Class: `EmbeddingIntentClassifier`**
- **Inherits from**: IntentClassifier
- **Description**: An intent classifier that uses embedding similarity for classification.
Supports different embedding models through the EmbeddingModel interface.
Features:
- Semantic similarity based classification
- Support for example-based learning
- Flexible embedding model support
- Multiple similarity computation strategies
**Function:** `EmbeddingIntentClassifier.__init__(self, intents: List[Intent], embedding_model: EmbeddingModel, context: Optional['Context'] = None)`
**Function:** `EmbeddingIntentClassifier.create(cls, intents: List[Intent], embedding_model: EmbeddingModel) -> 'EmbeddingIntentClassifier'`
- **Description**: Factory method to create and initialize a classifier. Use this instead of the constructor, since async initialization is required.
- **Parameters**
- `cls`
- `intents` (List[Intent])
- `embedding_model` (EmbeddingModel)
- **Returns**
- `'EmbeddingIntentClassifier'`: Return value
**Function:** `EmbeddingIntentClassifier.initialize(self)`
- **Description**: Precompute embeddings for all intents by combining their descriptions and examples
- **Parameters**
- `self`
**Function:** `EmbeddingIntentClassifier.classify(self, request: str, top_k: int = 1) -> List[IntentClassificationResult]`
- **Description**: Classify the input text into one or more intents. Args: request: The input text to classify top_k: Maximum number of top matches to return Returns: List of classification results, ordered by confidence
- **Parameters**
- `self`
- `request` (str): The input text to classify
- `top_k` (int, optional): Maximum number of top matches to return
- **Returns**
- `List[IntentClassificationResult]`: List of classification results, ordered by confidence
### src/mcp_agent/workflows/intent_classifier/intent_classifier_embedding_cohere.py
**Class: `CohereEmbeddingIntentClassifier`**
- **Inherits from**: EmbeddingIntentClassifier
- **Description**: An intent classifier that uses Cohere's embedding models for computing semantic-similarity-based classifications.
**Function:** `CohereEmbeddingIntentClassifier.__init__(self, intents: List[Intent], embedding_model: CohereEmbeddingModel | None = None, context: Optional['Context'] = None)`
**Function:** `CohereEmbeddingIntentClassifier.create(cls, intents: List[Intent], embedding_model: CohereEmbeddingModel | None = None, context: Optional['Context'] = None) -> 'CohereEmbeddingIntentClassifier'`
- **Description**: Factory method to create and initialize a classifier. Use this instead of the constructor, since async initialization is required.
- **Parameters**
- `cls`
- `intents` (List[Intent])
- `embedding_model` (CohereEmbeddingModel | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'CohereEmbeddingIntentClassifier'`: Return value
### src/mcp_agent/workflows/intent_classifier/intent_classifier_embedding_openai.py
**Class: `OpenAIEmbeddingIntentClassifier`**
- **Inherits from**: EmbeddingIntentClassifier
- **Description**: An intent classifier that uses OpenAI's embedding models for computing semantic-similarity-based classifications.
**Function:** `OpenAIEmbeddingIntentClassifier.__init__(self, intents: List[Intent], embedding_model: OpenAIEmbeddingModel | None = None, context: Optional['Context'] = None)`
**Function:** `OpenAIEmbeddingIntentClassifier.create(cls, intents: List[Intent], embedding_model: OpenAIEmbeddingModel | None = None, context: Optional['Context'] = None) -> 'OpenAIEmbeddingIntentClassifier'`
- **Description**: Factory method to create and initialize a classifier. Use this instead of the constructor, since async initialization is required.
- **Parameters**
- `cls`
- `intents` (List[Intent])
- `embedding_model` (OpenAIEmbeddingModel | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'OpenAIEmbeddingIntentClassifier'`: Return value
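**Example (sketch):** The async `create` factory pattern shared by these classifiers, using the OpenAI variant with its default embedding model; the intents are toy data:
```python
# Sketch only: intents are illustrative; API keys/config are assumed elsewhere.
from mcp_agent.workflows.intent_classifier.intent_classifier_base import Intent
from mcp_agent.workflows.intent_classifier.intent_classifier_embedding_openai import (
    OpenAIEmbeddingIntentClassifier,
)

intents = [
    Intent(name="greeting", examples=["hi", "hello there"]),
    Intent(name="refund", description="The user wants their money back"),
]

async def classify(text: str):
    classifier = await OpenAIEmbeddingIntentClassifier.create(intents=intents)
    return await classifier.classify(text, top_k=2)
```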
### src/mcp_agent/workflows/intent_classifier/intent_classifier_llm.py
**Class: `LLMIntentClassificationResult`**
- **Inherits from**: IntentClassificationResult
- **Description**: The result of intent classification using an LLM.
- **Attributes**:
- `confidence` (Literal['low', 'medium', 'high']): Confidence level of the classification
- `reasoning` (str | None) = None: Optional explanation of why this intent was chosen
**Class: `StructuredIntentResponse`**
- **Inherits from**: BaseModel
- **Description**: The complete structured response from the LLM
- **Attributes**:
- `classifications` (List[LLMIntentClassificationResult])
**Class: `LLMIntentClassifier`**
- **Inherits from**: IntentClassifier
- **Description**: An intent classifier that uses an LLM to determine the user's intent.
Particularly useful when you need:
- Flexible understanding of natural language
- Detailed reasoning about classifications
- Entity extraction alongside classification
**Function:** `LLMIntentClassifier.__init__(self, llm: AugmentedLLM, intents: List[Intent], classification_instruction: str | None = None, context: Optional['Context'] = None)`
**Function:** `LLMIntentClassifier.create(cls, llm: AugmentedLLM, intents: List[Intent], classification_instruction: str | None = None) -> 'LLMIntentClassifier'`
- **Description**: Factory method to create and initialize a classifier. Use this instead of the constructor, since async initialization is required.
- **Parameters**
- `cls`
- `llm` (AugmentedLLM)
- `intents` (List[Intent])
- `classification_instruction` (str | None, optional): Default is None
- **Returns**
- `'LLMIntentClassifier'`: Return value
**Function:** `LLMIntentClassifier.classify(self, request: str, top_k: int = 1) -> List[LLMIntentClassificationResult]`
**Function:** `LLMIntentClassifier._generate_context(self) -> str`
- **Description**: Generate a formatted context string describing all intents
- **Parameters**
- `self`
- **Returns**
- `str`: Return value
### src/mcp_agent/workflows/intent_classifier/intent_classifier_llm_anthropic.py
**Class: `AnthropicLLMIntentClassifier`**
- **Inherits from**: LLMIntentClassifier
- **Description**: An intent classifier that uses an Anthropic model to classify intents.
**Function:** `AnthropicLLMIntentClassifier.__init__(self, intents: List[Intent], classification_instruction: str | None = None, name: str | None = None, llm: AnthropicAugmentedLLM | None = None, context: Optional['Context'] = None)`
**Function:** `AnthropicLLMIntentClassifier.create(cls, llm: AnthropicAugmentedLLM, intents: List[Intent], classification_instruction: str | None = None, name: str | None = None, context: Optional['Context'] = None) -> 'AnthropicLLMIntentClassifier'`
- **Description**: Factory method to create and initialize a classifier. Use this instead of the constructor, since async initialization is required.
- **Parameters**
- `cls`
- `llm` (AnthropicAugmentedLLM)
- `intents` (List[Intent])
- `classification_instruction` (str | None, optional): Default is None
- `name` (str | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'AnthropicLLMIntentClassifier'`: Return value
### src/mcp_agent/workflows/intent_classifier/intent_classifier_llm_openai.py
**Class: `OpenAILLMIntentClassifier`**
- **Inherits from**: LLMIntentClassifier
- **Description**: An intent classifier that uses an OpenAI model to classify intents.
**Function:** `OpenAILLMIntentClassifier.__init__(self, intents: List[Intent], classification_instruction: str | None = None, name: str | None = None, llm: OpenAIAugmentedLLM | None = None, context: Optional['Context'] = None)`
**Function:** `OpenAILLMIntentClassifier.create(cls, llm: OpenAIAugmentedLLM, intents: List[Intent], classification_instruction: str | None = None, name: str | None = None, context: Optional['Context'] = None) -> 'OpenAILLMIntentClassifier'`
- **Description**: Factory method to create and initialize a classifier. Use this instead of the constructor, since async initialization is required.
- **Parameters**
- `cls`
- `llm` (OpenAIAugmentedLLM)
- `intents` (List[Intent])
- `classification_instruction` (str | None, optional): Default is None
- `name` (str | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'OpenAILLMIntentClassifier'`: Return value
### src/mcp_agent/workflows/llm/augmented_llm.py
**Class: `Memory`**
- **Inherits from**: BaseModel, Generic[MessageParamT]
- **Description**: Simple memory management for storing past interactions in-memory.
- **Attributes**:
- `model_config` = ConfigDict(arbitrary_types_allowed=True, extra='allow')
**Class: `SimpleMemory`**
- **Inherits from**: Memory[MessageParamT]
- **Description**: In-memory implementation that just keeps an ordered list of messages.
- **Attributes**:
- `history` (List[MessageParamT]) = Field(default_factory=list)
**Class: `RequestParams`**
- **Inherits from**: CreateMessageRequestParams
- **Description**: Parameters to configure the AugmentedLLM 'generate' requests.
- **Attributes**:
- `messages` (None) = Field(exclude=True, default=None): Ignored. 'messages' are removed from CreateMessageRequestParams to avoid confusion with the 'message' parameter on 'generate' method.
- `maxTokens` (int) = 2048: The maximum number of tokens to sample, as requested by the server.
- `model` (str | None) = None: The model to use for the LLM generation. If specified, this overrides the 'modelPreferences' selection criteria.
- `use_history` (bool) = True: Include the message history in the generate request.
- `max_iterations` (int) = 10: The maximum number of iterations to run the LLM for.
- `parallel_tool_calls` (bool) = False: Whether to allow multiple tool calls per iteration. Also known as multi-step tool use.
- `temperature` (float) = 0.7: The likelihood of the model selecting higher-probability options while generating a response.
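**Example (sketch):** `RequestParams` can be passed to any `generate*` call to override the defaults listed above for a single request:
```python
# Sketch only: overrides a few documented defaults.
from mcp_agent.workflows.llm.augmented_llm import RequestParams

params = RequestParams(
    maxTokens=512,       # down from the 2048 default
    temperature=0.2,     # more deterministic than the 0.7 default
    use_history=False,   # ignore prior conversation for this call
    max_iterations=3,    # cap the tool-use loop
)
```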
**Class: `AugmentedLLMProtocol`**
- **Inherits from**: Protocol, Generic[MessageParamT, MessageT]
- **Description**: Protocol defining the interface for augmented LLMs
**Class: `ProviderToMCPConverter`**
- **Inherits from**: Protocol, Generic[MessageParamT, MessageT]
- **Description**: Conversions between LLM provider and MCP types
**Class: `AugmentedLLM`**
- **Inherits from**: ContextDependent, AugmentedLLMProtocol[MessageParamT, MessageT]
- **Description**: The basic building block of agentic systems is an LLM enhanced with augmentations
such as retrieval, tools, and memory provided from a collection of MCP servers.
Our current models can actively use these capabilities—generating their own search queries,
selecting appropriate tools, and determining what information to retain.
- **Attributes**:
- `provider` (str | None) = None
- `logger` (Union['Logger', None]) = None
**Function:** `Memory.extend(self, messages: List[MessageParamT]) -> None`
**Function:** `Memory.set(self, messages: List[MessageParamT]) -> None`
**Function:** `Memory.append(self, message: MessageParamT) -> None`
**Function:** `Memory.get(self) -> List[MessageParamT]`
**Function:** `Memory.clear(self) -> None`
**Function:** `SimpleMemory.extend(self, messages: List[MessageParamT])`
**Function:** `SimpleMemory.set(self, messages: List[MessageParamT])`
**Function:** `SimpleMemory.append(self, message: MessageParamT)`
**Function:** `SimpleMemory.get(self) -> List[MessageParamT]`
**Function:** `SimpleMemory.clear(self)`
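**Example (sketch):** `SimpleMemory` behaves like an ordered message list behind the `Memory` interface; the dict-shaped messages below stand in for whatever `MessageParamT` the provider uses:
```python
# Sketch only: message dicts are placeholders for provider-specific types.
from mcp_agent.workflows.llm.augmented_llm import SimpleMemory

mem = SimpleMemory()
mem.append({"role": "user", "content": "hi"})
mem.extend([{"role": "assistant", "content": "hello"}])
assert len(mem.get()) == 2
mem.clear()
```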
**Function:** `AugmentedLLMProtocol.generate(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> List[MessageT]`
- **Description**: Request an LLM generation, which may run multiple iterations, and return the result
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `List[MessageT]`: Return value
**Function:** `AugmentedLLMProtocol.generate_str(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> str`
- **Description**: Request an LLM generation and return the string representation of the result
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str`: Return value
**Function:** `AugmentedLLMProtocol.generate_structured(self, message: str | MessageParamT | List[MessageParamT], response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
- **Description**: Request a structured LLM generation and return the result as a Pydantic model.
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `response_model` (Type[ModelT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `ModelT`: Return value
**Function:** `ProviderToMCPConverter.to_mcp_message_result(cls, result: MessageT) -> MCPMessageResult`
- **Description**: Convert an LLM response to an MCP message result type.
- **Parameters**
- `cls`
- `result` (MessageT)
- **Returns**
- `MCPMessageResult`: Return value
**Function:** `ProviderToMCPConverter.from_mcp_message_result(cls, result: MCPMessageResult) -> MessageT`
- **Description**: Convert an MCP message result to an LLM response type.
- **Parameters**
- `cls`
- `result` (MCPMessageResult)
- **Returns**
- `MessageT`: Return value
**Function:** `ProviderToMCPConverter.to_mcp_message_param(cls, param: MessageParamT) -> MCPMessageParam`
- **Description**: Convert an LLM input to an MCP message (SamplingMessage) type.
- **Parameters**
- `cls`
- `param` (MessageParamT)
- **Returns**
- `MCPMessageParam`: Return value
**Function:** `ProviderToMCPConverter.from_mcp_message_param(cls, param: MCPMessageParam) -> MessageParamT`
- **Description**: Convert an MCP message (SamplingMessage) to an LLM input type.
- **Parameters**
- `cls`
- `param` (MCPMessageParam)
- **Returns**
- `MessageParamT`: Return value
**Function:** `ProviderToMCPConverter.from_mcp_tool_result(cls, result: CallToolResult, tool_use_id: str) -> MessageParamT`
- **Description**: Convert an MCP tool result to an LLM input type
- **Parameters**
- `cls`
- `result` (CallToolResult)
- `tool_use_id` (str)
- **Returns**
- `MessageParamT`: Return value
**Function:** `AugmentedLLM.__init__(self, agent: Optional['Agent'] = None, server_names: List[str] | None = None, instruction: str | None = None, name: str | None = None, default_request_params: RequestParams | None = None, type_converter: Type[ProviderToMCPConverter[MessageParamT, MessageT]] = None, context: Optional['Context'] = None)`
- **Description**: Initialize the LLM with a list of server names and an instruction. If a name is provided, it will be used to identify the LLM. If an agent is provided, all other properties are optional
- **Parameters**
- `self`
- `agent` (Optional['Agent'], optional): Default is None
- `server_names` (List[str] | None, optional): Default is None
- `instruction` (str | None, optional): Default is None
- `name` (str | None, optional): Default is None
- `default_request_params` (RequestParams | None, optional): Default is None
- `type_converter` (Type[ProviderToMCPConverter[MessageParamT, MessageT]], optional): Default is None
- `context` (Optional['Context'], optional): Default is None
**Function:** `AugmentedLLM.generate(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> List[MessageT]`
- **Description**: Request an LLM generation, which may run multiple iterations, and return the result
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `List[MessageT]`: Return value
**Function:** `AugmentedLLM.generate_str(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> str`
- **Description**: Request an LLM generation and return the string representation of the result
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str`: Return value
**Function:** `AugmentedLLM.generate_structured(self, message: str | MessageParamT | List[MessageParamT], response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
- **Description**: Request a structured LLM generation and return the result as a Pydantic model.
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `response_model` (Type[ModelT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `ModelT`: Return value
**Function:** `AugmentedLLM.select_model(self, request_params: RequestParams | None = None) -> str | None`
- **Description**: Select an LLM based on the request parameters. If a model is specified in the request, it will override the model selection criteria.
- **Parameters**
- `self`
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str | None`: Return value
**Function:** `AugmentedLLM.get_request_params(self, request_params: RequestParams | None = None, default: RequestParams | None = None) -> RequestParams`
- **Description**: Get request parameters with merged-in defaults and overrides. Args: request_params: The request parameters to use as overrides. default: The default request parameters to use as the base. If unspecified, self.default_request_params will be used.
- **Parameters**
- `self`
- `request_params` (RequestParams | None, optional): The request parameters to use as overrides.
- `default` (RequestParams | None, optional): The default request parameters to use as the base. If unspecified, self.default_request_params will be used.
- **Returns**
- `RequestParams`: Return value
**Function:** `AugmentedLLM.to_mcp_message_result(self, result: MessageT) -> MCPMessageResult`
- **Description**: Convert an LLM response to an MCP message result type.
- **Parameters**
- `self`
- `result` (MessageT)
- **Returns**
- `MCPMessageResult`: Return value
**Function:** `AugmentedLLM.from_mcp_message_result(self, result: MCPMessageResult) -> MessageT`
- **Description**: Convert an MCP message result to an LLM response type.
- **Parameters**
- `self`
- `result` (MCPMessageResult)
- **Returns**
- `MessageT`: Return value
**Function:** `AugmentedLLM.to_mcp_message_param(self, param: MessageParamT) -> MCPMessageParam`
- **Description**: Convert an LLM input to an MCP message (SamplingMessage) type.
- **Parameters**
- `self`
- `param` (MessageParamT)
- **Returns**
- `MCPMessageParam`: Return value
**Function:** `AugmentedLLM.from_mcp_message_param(self, param: MCPMessageParam) -> MessageParamT`
- **Description**: Convert an MCP message (SamplingMessage) to an LLM input type.
- **Parameters**
- `self`
- `param` (MCPMessageParam)
- **Returns**
- `MessageParamT`: Return value
**Function:** `AugmentedLLM.from_mcp_tool_result(self, result: CallToolResult, tool_use_id: str) -> MessageParamT`
- **Description**: Convert an MCP tool result to an LLM input type
- **Parameters**
- `self`
- `result` (CallToolResult)
- `tool_use_id` (str)
- **Returns**
- `MessageParamT`: Return value
**Function:** `AugmentedLLM.convert_message_to_message_param(cls, message: MessageT) -> MessageParamT`
- **Description**: Convert a response object to an input parameter object to allow LLM calls to be chained.
- **Parameters**
- `cls`
- `message` (MessageT)
- **Returns**
- `MessageParamT`: Return value
**Function:** `AugmentedLLM.get_last_message(self) -> MessageParamT | None`
- **Description**: Return the last message generated by the LLM or None if history is empty. This is useful for prompt chaining workflows where the last message from one LLM is used as input to another.
- **Parameters**
- `self`
- **Returns**
- `MessageParamT | None`: Return value
**Function:** `AugmentedLLM.get_last_message_str(self) -> str | None`
- **Description**: Return the string representation of the last message generated by the LLM or None if history is empty.
- **Parameters**
- `self`
- **Returns**
- `str | None`: Return value
**Function:** `AugmentedLLM.pre_tool_call(self, tool_call_id: str | None, request: CallToolRequest) -> CallToolRequest | bool`
- **Description**: Called before a tool is executed. Return False to prevent execution.
- **Parameters**
- `self`
- `tool_call_id` (str | None)
- `request` (CallToolRequest)
- **Returns**
- `CallToolRequest | bool`: Return value
**Function:** `AugmentedLLM.post_tool_call(self, tool_call_id: str | None, request: CallToolRequest, result: CallToolResult) -> CallToolResult`
- **Description**: Called after a tool execution. Can modify the result before it's returned.
- **Parameters**
- `self`
- `tool_call_id` (str | None)
- `request` (CallToolRequest)
- `result` (CallToolResult)
- **Returns**
- `CallToolResult`: Return value
**Function:** `AugmentedLLM.call_tool(self, request: CallToolRequest, tool_call_id: str | None = None) -> CallToolResult`
- **Description**: Call a tool with the given parameters and optional ID
- **Parameters**
- `self`
- `request` (CallToolRequest)
- `tool_call_id` (str | None, optional): Default is None
- **Returns**
- `CallToolResult`: Return value
**Function:** `AugmentedLLM.message_param_str(self, message: MessageParamT) -> str`
- **Description**: Convert an input message to a string representation.
- **Parameters**
- `self`
- `message` (MessageParamT)
- **Returns**
- `str`: Return value
**Function:** `AugmentedLLM.message_str(self, message: MessageT) -> str`
- **Description**: Convert an output message to a string representation.
- **Parameters**
- `self`
- `message` (MessageT)
- **Returns**
- `str`: Return value
**Function:** `AugmentedLLM._log_chat_progress(self, chat_turn: Optional[int] = None, model: Optional[str] = None)`
- **Description**: Log a chat progress event
- **Parameters**
- `self`
- `chat_turn` (Optional[int], optional): Default is None
- `model` (Optional[str], optional): Default is None
**Function:** `AugmentedLLM._log_chat_finished(self, model: Optional[str] = None)`
- **Description**: Log a chat finished event
- **Parameters**
- `self`
- `model` (Optional[str], optional): Default is None
**Function:** `AugmentedLLM._gen_name(self, name: str | None, prefix: str | None) -> str`
- **Description**: Generate a name for the LLM based on the provided name or the default prefix.
- **Parameters**
- `self`
- `name` (str | None)
- `prefix` (str | None)
- **Returns**
- `str`: Return value
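**Example (sketch):** The attach-and-generate pattern these methods support; `AnthropicAugmentedLLM` (next section) is one concrete implementation, and the agent setup, `attach_llm` helper, and server name are illustrative:
```python
# Sketch only: assumes Agent supports `async with` and an attach_llm helper.
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm import RequestParams
from mcp_agent.workflows.llm.augmented_llm_anthropic import AnthropicAugmentedLLM

async def ask(question: str) -> str:
    agent = Agent(name="assistant", server_names=["fetch"])  # hypothetical server
    async with agent:
        llm = await agent.attach_llm(AnthropicAugmentedLLM)
        return await llm.generate_str(question, RequestParams(maxTokens=512))
```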
### src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
**Class: `AnthropicAugmentedLLM`**
- **Inherits from**: AugmentedLLM[MessageParam, Message]
- **Description**: The basic building block of agentic systems is an LLM enhanced with augmentations
such as retrieval, tools, and memory provided from a collection of MCP servers.
Our current models can actively use these capabilities—generating their own search queries,
selecting appropriate tools, and determining what information to retain.
**Class: `RequestCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (AnthropicSettings)
- `payload` (dict)
**Class: `RequestStructuredCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (AnthropicSettings)
- `params` (RequestParams)
- `response_model` (Type[ModelT] | None) = None
- `serialized_response_model` (str | None) = None
- `response_str` (str)
- `model` (str)
**Class: `AnthropicCompletionTasks`**
**Class: `AnthropicMCPTypeConverter`**
- **Inherits from**: ProviderToMCPConverter[MessageParam, Message]
- **Description**: Convert between Anthropic and MCP types.
**Function:** `AnthropicAugmentedLLM.__init__(self)`
**Function:** `AnthropicAugmentedLLM.generate(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses Claude as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `AnthropicAugmentedLLM.generate_str(self, message, request_params: RequestParams | None = None) -> str`
- **Description**: Process a query using an LLM and available tools. The default implementation uses Claude as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str`: Return value
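A hedged usage sketch for `generate_str` (the `Agent.attach_llm` wiring and the "fetch" server name are assumptions drawn from typical mcp-agent usage, not from this reference):
```python
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_anthropic import AnthropicAugmentedLLM


async def summarize(url: str) -> str:
    agent = Agent(
        name="finder",
        instruction="Fetch URLs and summarize their contents.",
        server_names=["fetch"],  # assumed MCP server name
    )
    async with agent:
        llm = await agent.attach_llm(AnthropicAugmentedLLM)
        return await llm.generate_str(f"Summarize the page at {url}")
```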
**Function:** `AnthropicAugmentedLLM.generate_structured(self, message, response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
**Function:** `AnthropicAugmentedLLM.convert_message_to_message_param(cls, message: Message) -> MessageParam`
- **Description**: Convert a response object to an input parameter object to allow LLM calls to be chained.
- **Parameters**
- `cls`
- `message` (Message)
- **Returns**
- `MessageParam`: Return value
**Function:** `AnthropicAugmentedLLM.message_param_str(self, message: MessageParam) -> str`
- **Description**: Convert an input message to a string representation.
- **Parameters**
- `self`
- `message` (MessageParam)
- **Returns**
- `str`: Return value
**Function:** `AnthropicAugmentedLLM.message_str(self, message: Message) -> str`
- **Description**: Convert an output message to a string representation.
- **Parameters**
- `self`
- `message` (Message)
- **Returns**
- `str`: Return value
**Function:** `AnthropicCompletionTasks.request_completion_task(request: RequestCompletionRequest) -> Message`
- **Description**: Request a completion from Anthropic's API.
- **Parameters**
- `request` (RequestCompletionRequest)
- **Returns**
- `Message`: Return value
**Function:** `AnthropicCompletionTasks.request_structured_completion_task(request: RequestStructuredCompletionRequest)`
- **Description**: Request a structured completion using Instructor's Anthropic API.
- **Parameters**
- `request` (RequestStructuredCompletionRequest)
**Function:** `AnthropicMCPTypeConverter.from_mcp_message_result(cls, result: MCPMessageResult) -> Message`
**Function:** `AnthropicMCPTypeConverter.to_mcp_message_result(cls, result: Message) -> MCPMessageResult`
**Function:** `AnthropicMCPTypeConverter.from_mcp_message_param(cls, param: MCPMessageParam) -> MessageParam`
**Function:** `AnthropicMCPTypeConverter.to_mcp_message_param(cls, param: MessageParam) -> MCPMessageParam`
**Function:** `AnthropicMCPTypeConverter.from_mcp_tool_result(cls, result: CallToolResult, tool_use_id: str) -> MessageParam`
- **Description**: Convert mcp tool result to user MessageParam
- **Parameters**
- `cls`
- `result` (CallToolResult)
- `tool_use_id` (str)
- **Returns**
- `MessageParam`: Return value
**Function:** `mcp_content_to_anthropic_content(content: TextContent | ImageContent | EmbeddedResource, for_message_param: bool = False) -> ContentBlock | MessageParamContent`
- **Description**: Converts MCP content types into Anthropic-compatible content blocks.
- **Parameters**
- `content` (TextContent | ImageContent | EmbeddedResource): The MCP content to convert.
- `for_message_param` (bool, optional): If True, returns Anthropic message param content types; if False, returns Anthropic response message content types. Defaults to False.
- **Returns**
- `ContentBlock | MessageParamContent`: The converted content block in Anthropic format.
**Function:** `anthropic_content_to_mcp_content(content: str | Iterable[TextBlockParam | ImageBlockParam | ToolUseBlockParam | ToolResultBlockParam | DocumentBlockParam | ContentBlock]) -> List[TextContent | ImageContent | EmbeddedResource]`
**Function:** `mcp_stop_reason_to_anthropic_stop_reason(stop_reason: StopReason)`
**Function:** `anthropic_stop_reason_to_mcp_stop_reason(stop_reason: str) -> StopReason`
### src/mcp_agent/workflows/llm/augmented_llm_azure.py
**Class: `ResponseMessage`**
- **Inherits from**: ChatResponseMessage
- **Description**: A subclass of ChatResponseMessage that makes 'content' optional.
This accommodates cases where the assistant response includes tool calls
without a textual message, in which case 'content' may be None.
- **Attributes**:
- `content` (Optional[str])
**Class: `AzureAugmentedLLM`**
- **Inherits from**: AugmentedLLM[MessageParam, ResponseMessage]
- **Description**: The basic building block of agentic systems is an LLM enhanced with augmentations
such as retrieval, tools, and memory provided from a collection of MCP servers.
**Class: `RequestCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (AzureSettings)
- `payload` (dict)
**Class: `AzureCompletionTasks`**
**Class: `MCPAzureTypeConverter`**
- **Inherits from**: ProviderToMCPConverter[MessageParam, ResponseMessage]
- **Description**: Convert between Azure and MCP types.
**Function:** `AzureAugmentedLLM.__init__(self)`
**Function:** `AzureAugmentedLLM.generate(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses Azure OpenAI 4o-mini as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `AzureAugmentedLLM.generate_str(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses Azure OpenAI 4o-mini as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `AzureAugmentedLLM.generate_structured(self, message, response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
**Function:** `AzureAugmentedLLM.convert_message_to_message_param(cls, message: ResponseMessage) -> AssistantMessage`
- **Description**: Convert a response object to an input parameter object to allow LLM calls to be chained.
- **Parameters**
- `cls`
- `message` (ResponseMessage)
- **Returns**
- `AssistantMessage`: Return value
**Function:** `AzureAugmentedLLM.execute_tool_call(self, tool_call: ChatCompletionsToolCall) -> ToolMessage | None`
- **Description**: Execute a single tool call and return the result message. Returns None if there's no content to add to messages.
- **Parameters**
- `self`
- `tool_call` (ChatCompletionsToolCall)
- **Returns**
- `ToolMessage | None`: Return value
**Function:** `AzureAugmentedLLM.message_param_str(self, message: MessageParam) -> str`
- **Description**: Convert an input message to a string representation.
- **Parameters**
- `self`
- `message` (MessageParam)
- **Returns**
- `str`: Return value
**Function:** `AzureAugmentedLLM.message_str(self, message: ResponseMessage) -> str`
- **Description**: Convert an output message to a string representation.
- **Parameters**
- `self`
- `message` (ResponseMessage)
- **Returns**
- `str`: Return value
**Function:** `AzureCompletionTasks.request_completion_task(request: RequestCompletionRequest) -> ChatCompletions`
- **Description**: Request a completion from Azure's API.
- **Parameters**
- `request` (RequestCompletionRequest)
- **Returns**
- `ChatCompletions`: Return value
**Function:** `MCPAzureTypeConverter.from_mcp_message_result(cls, result: MCPMessageResult) -> ResponseMessage`
**Function:** `MCPAzureTypeConverter.to_mcp_message_result(cls, result: ResponseMessage) -> MCPMessageResult`
**Function:** `MCPAzureTypeConverter.from_mcp_message_param(cls, param: MCPMessageParam) -> MessageParam`
**Function:** `MCPAzureTypeConverter.to_mcp_message_param(cls, param: MessageParam) -> MCPMessageParam`
**Function:** `mcp_content_to_azure_content(content: list[TextContent | ImageContent | EmbeddedResource], str_only: bool = True) -> str | list[ContentItem]`
- **Description**: Convert a list of MCP content types (TextContent, ImageContent, EmbeddedResource) into Azure-compatible content types or a string.
- **Parameters**
- `content` (list[TextContent | ImageContent | EmbeddedResource]): The list of MCP content objects to convert.
- `str_only` (bool, optional): If True, returns a string representation of the content; if False, returns a list of Azure ContentItem objects. Defaults to True.
- **Returns**
- `str | list[ContentItem]`: A newline-joined string if str_only is True, otherwise a list of ContentItem.
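A small illustration based on the behavior described above (expected outputs follow from the docstring, not from captured runs):
```python
from mcp.types import TextContent

from mcp_agent.workflows.llm.augmented_llm_azure import mcp_content_to_azure_content

parts = [
    TextContent(type="text", text="first line"),
    TextContent(type="text", text="second line"),
]

# With str_only=True (the default), text content is joined with newlines:
as_text = mcp_content_to_azure_content(parts)  # expected: "first line\nsecond line"

# With str_only=False, a list of Azure ContentItem objects is returned instead:
as_items = mcp_content_to_azure_content(parts, str_only=False)
```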
**Function:** `azure_content_to_mcp_content(content: str | list[ContentItem] | None) -> Iterable[TextContent | ImageContent | EmbeddedResource]`
**Function:** `image_url_to_mime_and_base64(image_url: ImageUrl) -> tuple[str, str]`
- **Description**: Extract mime type and base64 data from ImageUrl
- **Parameters**
- `image_url` (ImageUrl)
- **Returns**
- `tuple[str, str]`: Return value
### src/mcp_agent/workflows/llm/augmented_llm_bedrock.py
**Class: `BedrockAugmentedLLM`**
- **Inherits from**: AugmentedLLM[MessageUnionTypeDef, MessageUnionTypeDef]
- **Description**: The basic building block of agentic systems is an LLM enhanced with augmentations
such as retrieval, tools, and memory provided from a collection of MCP servers.
**Class: `RequestCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (BedrockSettings)
- `payload` (dict)
**Class: `RequestStructuredCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (BedrockSettings)
- `params` (RequestParams)
- `response_model` (Type[ModelT] | None) = None
- `serialized_response_model` (str | None) = None
- `response_str` (str)
- `model` (str)
**Class: `BedrockCompletionTasks`**
**Class: `BedrockMCPTypeConverter`**
- **Inherits from**: ProviderToMCPConverter[MessageUnionTypeDef, MessageUnionTypeDef]
- **Description**: Convert between Bedrock and MCP types.
**Function:** `BedrockAugmentedLLM.__init__(self)`
**Function:** `BedrockAugmentedLLM.generate(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses AWS Nova's ChatCompletion as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `BedrockAugmentedLLM.generate_str(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses AWS Nova's ChatCompletion as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `BedrockAugmentedLLM.generate_structured(self, message, response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
**Function:** `BedrockAugmentedLLM.convert_message_to_message_param(cls, message: MessageOutputTypeDef) -> MessageUnionTypeDef`
- **Description**: Convert a response object to an input parameter object to allow LLM calls to be chained.
- **Parameters**
- `cls`
- `message` (MessageOutputTypeDef)
- **Returns**
- `MessageUnionTypeDef`: Return value
**Function:** `BedrockAugmentedLLM.message_param_str(self, message: MessageUnionTypeDef) -> str`
- **Description**: Convert an input message to a string representation.
- **Parameters**
- `self`
- `message` (MessageUnionTypeDef)
- **Returns**
- `str`: Return value
**Function:** `BedrockAugmentedLLM.message_str(self, message: MessageUnionTypeDef) -> str`
- **Description**: Convert an output message to a string representation.
- **Parameters**
- `self`
- `message` (MessageUnionTypeDef)
- **Returns**
- `str`: Return value
**Function:** `BedrockCompletionTasks.request_completion_task(request: RequestCompletionRequest) -> ConverseResponseTypeDef`
- **Description**: Request a completion from Bedrock's API.
- **Parameters**
- `request` (RequestCompletionRequest)
- **Returns**
- `ConverseResponseTypeDef`: Return value
**Function:** `BedrockCompletionTasks.request_structured_completion_task(request: RequestStructuredCompletionRequest)`
- **Description**: Request a structured completion using Instructor's Bedrock API.
- **Parameters**
- `request` (RequestStructuredCompletionRequest)
**Function:** `BedrockMCPTypeConverter.from_mcp_message_result(cls, result: MCPMessageResult) -> MessageUnionTypeDef`
**Function:** `BedrockMCPTypeConverter.to_mcp_message_result(cls, result: MessageUnionTypeDef) -> MCPMessageResult`
**Function:** `BedrockMCPTypeConverter.from_mcp_message_param(cls, param: MCPMessageParam) -> MessageUnionTypeDef`
**Function:** `BedrockMCPTypeConverter.to_mcp_message_param(cls, param: MessageUnionTypeDef) -> MCPMessageParam`
**Function:** `mcp_content_to_bedrock_content(content: list[TextContent | ImageContent | EmbeddedResource]) -> list[ContentBlockUnionTypeDef]`
**Function:** `bedrock_content_to_mcp_content(content: list[ContentBlockUnionTypeDef]) -> list[TextContent | ImageContent | EmbeddedResource]`
### src/mcp_agent/workflows/llm/augmented_llm_google.py
**Class: `GoogleAugmentedLLM`**
- **Inherits from**: AugmentedLLM[types.Content, types.Content]
- **Description**: The basic building block of agentic systems is an LLM enhanced with augmentations
such as retrieval, tools, and memory provided from a collection of MCP servers.
**Class: `RequestCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (GoogleSettings)
- `payload` (dict)
**Class: `RequestStructuredCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (GoogleSettings)
- `params` (RequestParams)
- `response_model` (Type[ModelT] | None) = None
- `serialized_response_model` (str | None) = None
- `response_str` (str)
- `model` (str)
**Class: `GoogleCompletionTasks`**
**Class: `GoogleMCPTypeConverter`**
- **Inherits from**: ProviderToMCPConverter[types.Content, types.Content]
- **Description**: Convert between Google and MCP types.
**Function:** `GoogleAugmentedLLM.__init__(self)`
**Function:** `GoogleAugmentedLLM.generate(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses gemini-2.0-flash as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `GoogleAugmentedLLM.generate_str(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses gemini-2.0-flash as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `GoogleAugmentedLLM.generate_structured(self, message, response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
**Function:** `GoogleAugmentedLLM.convert_message_to_message_param(cls, message)`
- **Description**: Convert a response object to an input parameter object to allow LLM calls to be chained.
- **Parameters**
- `cls`
- `message`
**Function:** `GoogleAugmentedLLM.execute_tool_call(self, function_call: types.FunctionCall) -> types.Content | None`
- **Description**: Execute a single tool call and return the result message. Returns None if there's no content to add to messages.
- **Parameters**
- `self`
- `function_call` (types.FunctionCall)
- **Returns**
- `types.Content | None`: Return value
**Function:** `GoogleAugmentedLLM.message_param_str(self, message) -> str`
- **Description**: Convert an input message to a string representation.
- **Parameters**
- `self`
- `message`
- **Returns**
- `str`: Return value
**Function:** `GoogleAugmentedLLM.message_str(self, message) -> str`
- **Description**: Convert an output message to a string representation.
- **Parameters**
- `self`
- `message`
- **Returns**
- `str`: Return value
**Function:** `GoogleCompletionTasks.request_completion_task(request: RequestCompletionRequest) -> types.GenerateContentResponse`
- **Description**: Request a completion from Google's API.
- **Parameters**
- `request` (RequestCompletionRequest)
- **Returns**
- `types.GenerateContentResponse`: Return value
**Function:** `GoogleCompletionTasks.request_structured_completion_task(request: RequestStructuredCompletionRequest)`
- **Description**: Request a structured completion using Instructor's Google API.
- **Parameters**
- `request` (RequestStructuredCompletionRequest)
**Function:** `GoogleMCPTypeConverter.from_mcp_message_result(cls, result: MCPMessageResult) -> types.Content`
**Function:** `GoogleMCPTypeConverter.from_mcp_message_param(cls, param: MCPMessageParam) -> types.Content`
**Function:** `GoogleMCPTypeConverter.to_mcp_message_result(cls, result: types.Content) -> MCPMessageResult`
**Function:** `GoogleMCPTypeConverter.to_mcp_message_param(cls, param: types.Content) -> MCPMessageParam`
**Function:** `GoogleMCPTypeConverter.from_mcp_tool_result(cls, result: CallToolResult, tool_use_id: str) -> types.Content`
- **Description**: Convert an MCP tool result to an LLM input type
- **Parameters**
- `cls`
- `result` (CallToolResult)
- `tool_use_id` (str)
- **Returns**
- `types.Content`: Return value
**Function:** `transform_mcp_tool_schema(schema: dict) -> dict`
- **Description**: Transform a JSON Schema into an OpenAPI schema format compatible with Gemini.
- **Parameters**
- `schema` (dict): A JSON Schema dictionary
- **Returns**
- `dict`: A cleaned OpenAPI schema dictionary compatible with Gemini
- **Key transformations**:
- Convert camelCase properties to snake_case (e.g., maxLength -> max_length)
- Remove explicitly excluded fields (e.g., "default")
- Recursively process nested structures (properties, items, anyOf)
- Set nullable=true when an anyOf includes type:"null"
- Remove unsupported format values based on data type
- For anyOf fields, use only the first non-null type (true union types are not supported)
- Preserve unsupported keywords by appending them to the description field
- **Note**: Only nullable types (Type | None) are supported for anyOf fields; true union types (e.g., str | int) fall back to the first non-null type. Unsupported fields are preserved in the description so the LLM still sees every constraint.
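Applying the listed transformations to a small schema, one would expect roughly the following (a hand-worked illustration, not captured output):
```python
from mcp_agent.workflows.llm.augmented_llm_google import transform_mcp_tool_schema

schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string", "maxLength": 64, "default": "anon"},
        "age": {"anyOf": [{"type": "integer"}, {"type": "null"}]},
    },
}

cleaned = transform_mcp_tool_schema(schema)
# Expected shape per the rules above:
#   name: maxLength -> max_length, the "default" key removed
#   age:  anyOf with type:"null" collapses to {"type": "integer", "nullable": True}
```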
**Function:** `mcp_content_to_google_parts(content: list[TextContent | ImageContent | EmbeddedResource]) -> list[types.Part]`
**Function:** `google_parts_to_mcp_content(google_parts: list[types.Part]) -> list[TextContent | ImageContent | EmbeddedResource]`
**Function:** `image_url_to_mime_and_base64(url: str) -> tuple[str, str]`
- **Description**: Extract mime type and base64 data from ImageUrl
- **Parameters**
- `url` (str)
- **Returns**
- `tuple[str, str]`: Return value
### src/mcp_agent/workflows/llm/augmented_llm_ollama.py
**Class: `OllamaAugmentedLLM`**
- **Inherits from**: OpenAIAugmentedLLM
- **Description**: The basic building block of agentic systems is an LLM enhanced with augmentations
such as retrieval, tools, and memory provided from a collection of MCP servers.
This implementation uses Ollama's OpenAI-compatible ChatCompletion API.
**Class: `OllamaCompletionTasks`**
**Function:** `OllamaAugmentedLLM.__init__(self)`
**Function:** `OllamaAugmentedLLM.generate_structured(self, message, response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
**Function:** `OllamaCompletionTasks.request_structured_completion_task(request: RequestStructuredCompletionRequest) -> ModelT`
- **Description**: Request a structured completion using Instructor's OpenAI API.
- **Parameters**
- `request` (RequestStructuredCompletionRequest)
- **Returns**
- `ModelT`: Return value
### src/mcp_agent/workflows/llm/augmented_llm_openai.py
**Class: `OpenAIAugmentedLLM`**
- **Inherits from**: AugmentedLLM[ChatCompletionMessageParam, ChatCompletionMessage]
- **Description**: The basic building block of agentic systems is an LLM enhanced with augmentations
such as retrieval, tools, and memory provided from a collection of MCP servers.
This implementation uses OpenAI's ChatCompletion as the LLM.
**Class: `RequestCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (OpenAISettings)
- `payload` (dict)
**Class: `RequestStructuredCompletionRequest`**
- **Inherits from**: BaseModel
- **Attributes**:
- `config` (OpenAISettings)
- `response_model` (Any | None) = None
- `serialized_response_model` (str | None) = None
- `response_str` (str)
- `model` (str)
**Class: `OpenAICompletionTasks`**
**Class: `MCPOpenAITypeConverter`**
- **Inherits from**: ProviderToMCPConverter[ChatCompletionMessageParam, ChatCompletionMessage]
- **Description**: Convert between OpenAI and MCP types.
**Function:** `OpenAIAugmentedLLM.__init__(self)`
**Function:** `OpenAIAugmentedLLM.convert_message_to_message_param(cls, message: ChatCompletionMessage) -> ChatCompletionMessageParam`
- **Description**: Convert a response object to an input parameter object to allow LLM calls to be chained.
- **Parameters**
- `cls`
- `message` (ChatCompletionMessage)
- **Returns**
- `ChatCompletionMessageParam`: Return value
**Function:** `OpenAIAugmentedLLM.generate(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses OpenAI's ChatCompletion as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `OpenAIAugmentedLLM.generate_str(self, message, request_params: RequestParams | None = None)`
- **Description**: Process a query using an LLM and available tools. The default implementation uses OpenAI's ChatCompletion as the LLM. Override this method to use a different LLM.
- **Parameters**
- `self`
- `message`
- `request_params` (RequestParams | None, optional): Default is None
**Function:** `OpenAIAugmentedLLM.generate_structured(self, message, response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
**Function:** `OpenAIAugmentedLLM.pre_tool_call(self, tool_call_id: str | None, request: CallToolRequest)`
**Function:** `OpenAIAugmentedLLM.post_tool_call(self, tool_call_id: str | None, request: CallToolRequest, result: CallToolResult)`
**Function:** `OpenAIAugmentedLLM.execute_tool_call(self, tool_call: ChatCompletionMessageToolCall) -> ChatCompletionToolMessageParam | None`
- **Description**: Execute a single tool call and return the result message. Returns None if there's no content to add to messages.
- **Parameters**
- `self`
- `tool_call` (ChatCompletionMessageToolCall)
- **Returns**
- `ChatCompletionToolMessageParam | None`: Return value
**Function:** `OpenAIAugmentedLLM.message_param_str(self, message: ChatCompletionMessageParam) -> str`
- **Description**: Convert an input message to a string representation.
- **Parameters**
- `self`
- `message` (ChatCompletionMessageParam)
- **Returns**
- `str`: Return value
**Function:** `OpenAIAugmentedLLM.message_str(self, message: ChatCompletionMessage) -> str`
- **Description**: Convert an output message to a string representation.
- **Parameters**
- `self`
- `message` (ChatCompletionMessage)
- **Returns**
- `str`: Return value
**Function:** `OpenAICompletionTasks.request_completion_task(request: RequestCompletionRequest) -> ChatCompletion`
- **Description**: Request a completion from OpenAI's API.
- **Parameters**
- `request` (RequestCompletionRequest)
- **Returns**
- `ChatCompletion`: Return value
**Function:** `OpenAICompletionTasks.request_structured_completion_task(request: RequestStructuredCompletionRequest) -> ModelT`
- **Description**: Request a structured completion using Instructor's OpenAI API.
- **Parameters**
- `request` (RequestStructuredCompletionRequest)
- **Returns**
- `ModelT`: Return value
**Function:** `MCPOpenAITypeConverter.from_mcp_message_result(cls, result: MCPMessageResult) -> ChatCompletionMessage`
**Function:** `MCPOpenAITypeConverter.to_mcp_message_result(cls, result: ChatCompletionMessage) -> MCPMessageResult`
**Function:** `MCPOpenAITypeConverter.from_mcp_message_param(cls, param: MCPMessageParam) -> ChatCompletionMessageParam`
**Function:** `MCPOpenAITypeConverter.to_mcp_message_param(cls, param: ChatCompletionMessageParam) -> MCPMessageParam`
**Function:** `mcp_content_to_openai_content(content: TextContent | ImageContent | EmbeddedResource) -> ChatCompletionContentPartTextParam`
**Function:** `openai_content_to_mcp_content(content: str | Iterable[ChatCompletionContentPartParam | ChatCompletionContentPartRefusalParam]) -> Iterable[TextContent | ImageContent | EmbeddedResource]`
### src/mcp_agent/workflows/llm/llm_selector.py
**Class: `ModelBenchmarks`**
- **Inherits from**: BaseModel
- **Description**: Performance benchmarks for comparing different models.
- **Attributes**:
- `__pydantic_extra__` (dict[str, float]) = Field(init=False)
- `quality_score` (float | None) = None: A blended quality score for the model.
- `mmlu_score` (float | None) = None
- `gsm8k_score` (float | None) = None
- `bbh_score` (float | None) = None
- `model_config` = ConfigDict(extra='allow')
**Class: `ModelLatency`**
- **Inherits from**: BaseModel
- **Description**: Latency benchmarks for comparing different models.
- **Attributes**:
- `time_to_first_token_ms` (float) = Field(gt=0): Median Time to first token in milliseconds.
- `tokens_per_second` (float) = Field(gt=0): Median output tokens per second.
**Class: `ModelCost`**
- **Inherits from**: BaseModel
- **Description**: Cost benchmarks for comparing different models.
- **Attributes**:
- `blended_cost_per_1m` (float | None) = None: Blended cost mixing input/output cost per 1M tokens.
- `input_cost_per_1m` (float | None) = None: Cost per 1M input tokens.
- `output_cost_per_1m` (float | None) = None: Cost per 1M output tokens.
**Class: `ModelMetrics`**
- **Inherits from**: BaseModel
- **Description**: Model metrics for comparing different models.
- **Attributes**:
- `cost` (ModelCost)
- `speed` (ModelLatency)
- `intelligence` (ModelBenchmarks)
**Class: `ModelInfo`**
- **Inherits from**: BaseModel
- **Description**: LLM metadata, including performance benchmarks.
- **Attributes**:
- `name` (str)
- `description` (str | None) = None
- `provider` (str)
- `metrics` (ModelMetrics)
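A construction sketch assembled from the attribute lists above (all values are invented for illustration):
```python
from mcp_agent.workflows.llm.llm_selector import (
    ModelBenchmarks,
    ModelCost,
    ModelInfo,
    ModelLatency,
    ModelMetrics,
)

model = ModelInfo(
    name="example-model",
    provider="ExampleAI",
    metrics=ModelMetrics(
        cost=ModelCost(input_cost_per_1m=3.0, output_cost_per_1m=15.0),
        speed=ModelLatency(time_to_first_token_ms=450.0, tokens_per_second=85.0),
        intelligence=ModelBenchmarks(quality_score=0.72),
    ),
)
```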
**Class: `ModelSelector`**
- **Description**: A heuristic-based selector to choose the best model from a list of models.
Because LLMs can vary along multiple dimensions, choosing the "best" model is
rarely straightforward. Different models excel in different areas—some are
faster but less capable, others are more capable but more expensive, and so
on.
MCP's ModelPreferences interface allows servers to express their priorities across multiple
dimensions to help clients make an appropriate selection for their use case.
**Function:** `ModelSelector.__init__(self, models: List[ModelInfo] = None, benchmark_weights: Dict[str, float] | None = None)`
**Function:** `ModelSelector.select_best_model(self, model_preferences: ModelPreferences, provider: str | None = None) -> ModelInfo`
- **Description**: Select the best model from a given list of models based on the given model preferences.
- **Parameters**
- `self`
- `model_preferences` (ModelPreferences)
- `provider` (str | None, optional): Default is None
- **Returns**
- `ModelInfo`: Return value
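A hedged usage sketch (it assumes that omitting `models` falls back to `load_default_models()`; `ModelPreferences` is the standard MCP type with cost/speed/intelligence priorities, and the provider string is illustrative):
```python
from mcp.types import ModelPreferences

from mcp_agent.workflows.llm.llm_selector import ModelSelector

selector = ModelSelector()  # assumption: None models -> load_default_models()
prefs = ModelPreferences(
    costPriority=0.2, speedPriority=0.3, intelligencePriority=0.5
)
best = selector.select_best_model(prefs, provider="OpenAI")
print(best.name, best.metrics.cost.blended_cost_per_1m)
```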
**Function:** `ModelSelector._models_by_provider(self, models: List[ModelInfo]) -> Dict[str, List[ModelInfo]]`
- **Description**: Group models by provider.
- **Parameters**
- `self`
- `models` (List[ModelInfo])
- **Returns**
- `Dict[str, List[ModelInfo]]`: Return value
**Function:** `ModelSelector._check_model_hint(self, model: ModelInfo, hint: ModelHint) -> bool`
- **Description**: Check if a model matches a specific hint.
- **Parameters**
- `self`
- `model` (ModelInfo)
- `hint` (ModelHint)
- **Returns**
- `bool`: Return value
**Function:** `ModelSelector._calculate_total_cost(self, model: ModelInfo, io_ratio: float = 3.0) -> float`
- **Description**: Calculate a single cost metric for a model from its input/output token costs and an estimated input-to-output token ratio.
- **Parameters**
- `self`
- `model` (ModelInfo): The model to calculate the cost for.
- `io_ratio` (float, optional): The estimated ratio of input to output tokens. Defaults to 3.0.
- **Returns**
- `float`: Return value
**Function:** `ModelSelector._calculate_cost_score(self, model: ModelInfo, model_preferences: ModelPreferences, max_cost: float) -> float`
- **Description**: Normalized 0->1 cost score for a model.
- **Parameters**
- `self`
- `model` (ModelInfo)
- `model_preferences` (ModelPreferences)
- `max_cost` (float)
- **Returns**
- `float`: Return value
**Function:** `ModelSelector._calculate_intelligence_score(self, model: ModelInfo, max_values: Dict[str, float]) -> float`
- **Description**: Return a normalized 0->1 intelligence score for a model based on its benchmark metrics.
- **Parameters**
- `self`
- `model` (ModelInfo)
- `max_values` (Dict[str, float])
- **Returns**
- `float`: Return value
**Function:** `ModelSelector._calculate_speed_score(self, model: ModelInfo, max_tokens_per_second: float, max_time_to_first_token_ms: float) -> float`
- **Description**: Normalized 0->1 speed score for a model.
- **Parameters**
- `self`
- `model` (ModelInfo)
- `max_tokens_per_second` (float)
- `max_time_to_first_token_ms` (float)
- **Returns**
- `float`: Return value
**Function:** `ModelSelector._calculate_max_scores(self, models: List[ModelInfo]) -> Dict[str, float]`
- **Description**: Of all the models, calculate the maximum value for each benchmark metric.
- **Parameters**
- `self`
- `models` (List[ModelInfo])
- **Returns**
- `Dict[str, float]`: Return value
**Function:** `load_default_models() -> List[ModelInfo]`
- **Description**: We use ArtificialAnalysis benchmarks for determining the best model.
- **Returns**
- `List[ModelInfo]`: Return value
**Function:** `_fuzzy_match(str1: str, str2: str, threshold: float = 0.8) -> bool`
- **Description**: Fuzzy match two strings.
- **Parameters**
- `str1` (str): First string to compare
- `str2` (str): Second string to compare
- `threshold` (float, optional): Minimum similarity ratio to consider a match (0.0 to 1.0)
- **Returns**
- `bool`: True if strings match above threshold, False otherwise
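The reference does not show the implementation; a behaviorally similar sketch using the standard library (the exact similarity measure and lowercasing are assumptions):
```python
from difflib import SequenceMatcher


def fuzzy_match(str1: str, str2: str, threshold: float = 0.8) -> bool:
    # Character-level similarity ratio in [0.0, 1.0]; match if at or above threshold.
    return SequenceMatcher(None, str1.lower(), str2.lower()).ratio() >= threshold
```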
### src/mcp_agent/workflows/orchestrator/orchestrator.py
**Class: `Orchestrator`**
- **Inherits from**: AugmentedLLM[MessageParamT, MessageT]
- **Description**: In the orchestrator-workers workflow, a central LLM dynamically breaks down tasks,
delegates them to worker LLMs, and synthesizes their results. It does this
in a loop until the task is complete.
When to use this workflow:
- This workflow is well-suited for complex tasks where you can’t predict the
subtasks needed (in coding, for example, the number of files that need to be
changed and the nature of the change in each file likely depend on the task).
Example where orchestrator-workers is useful:
- Coding products that make complex changes to multiple files each time.
- Search tasks that involve gathering and analyzing information from multiple sources
for possible relevant information.
**Function:** `Orchestrator.__init__(self, llm_factory: Callable[[Agent], AugmentedLLM[MessageParamT, MessageT]], name: str | None = None, planner: AugmentedLLM | None = None, synthesizer: AugmentedLLM | None = None, available_agents: List[Agent | AugmentedLLM] | None = None, plan_type: Literal['full', 'iterative'] = 'full', context: Optional['Context'] = None)`
- **Description**: Initialize the orchestrator with its planner, synthesizer, available worker agents, and planning mode.
- **Parameters**
- `self`
- `llm_factory` (Callable[[Agent], AugmentedLLM[MessageParamT, MessageT]]): Factory function to create an LLM for a given agent
- `name` (str | None, optional): Default is None
- `planner` (AugmentedLLM | None, optional): LLM to use for planning steps (if not provided, a default planner will be used)
- `synthesizer` (AugmentedLLM | None, optional): Default is None
- `available_agents` (List[Agent | AugmentedLLM] | None, optional): List of agents available to tasks executed by this orchestrator
- `plan_type` (Literal['full', 'iterative'], optional): "full" generates the complete plan first, then executes it; "iterative" plans only the next step and loops until success. Default is 'full'.
- `context` (Optional['Context'], optional): Application context
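A hedged construction sketch (the agents, their server names, and the use of `OpenAIAugmentedLLM` directly as the `llm_factory` callable are illustrative assumptions):
```python
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator


async def run(objective: str) -> str:
    researcher = Agent(name="researcher", server_names=["fetch"])
    writer = Agent(name="writer", server_names=["filesystem"])

    orchestrator = Orchestrator(
        llm_factory=OpenAIAugmentedLLM,  # Callable[[Agent], AugmentedLLM]
        available_agents=[researcher, writer],
        plan_type="full",  # plan everything up front; "iterative" plans step by step
    )
    return await orchestrator.generate_str(objective)
```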
**Function:** `Orchestrator.generate(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> List[MessageT]`
- **Description**: Request an LLM generation, which may run multiple iterations, and return the result
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `List[MessageT]`: Return value
**Function:** `Orchestrator.generate_str(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> str`
- **Description**: Request an LLM generation and return the string representation of the result
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str`: Return value
**Function:** `Orchestrator.generate_structured(self, message: str | MessageParamT | List[MessageParamT], response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
- **Description**: Request a structured LLM generation and return the result as a Pydantic model.
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `response_model` (Type[ModelT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `ModelT`: Return value
**Function:** `Orchestrator.execute(self, objective: str, request_params: RequestParams | None = None) -> PlanResult`
- **Description**: Execute task with result chaining between steps
- **Parameters**
- `self`
- `objective` (str)
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `PlanResult`: Return value
**Function:** `Orchestrator._execute_step(self, step: Step, previous_result: PlanResult, request_params: RequestParams | None = None) -> StepResult`
- **Description**: Execute a step's subtasks in parallel and synthesize results
- **Parameters**
- `self`
- `step` (Step)
- `previous_result` (PlanResult)
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `StepResult`: Return value
**Function:** `Orchestrator._get_full_plan(self, objective: str, plan_result: PlanResult, request_params: RequestParams | None = None) -> Plan`
- **Description**: Generate full plan considering previous results
- **Parameters**
- `self`
- `objective` (str)
- `plan_result` (PlanResult)
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `Plan`: Return value
**Function:** `Orchestrator._get_next_step(self, objective: str, plan_result: PlanResult, request_params: RequestParams | None = None) -> NextStep`
- **Description**: Generate just the next needed step
- **Parameters**
- `self`
- `objective` (str)
- `plan_result` (PlanResult)
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `NextStep`: Return value
**Function:** `Orchestrator._format_server_info(self, server_name: str) -> str`
- **Description**: Format server information for display to planners
- **Parameters**
- `self`
- `server_name` (str)
- **Returns**
- `str`: Return value
**Function:** `Orchestrator._format_agent_info(self, agent_name: str) -> str`
- **Description**: Format Agent information for display to planners
- **Parameters**
- `self`
- `agent_name` (str)
- **Returns**
- `str`: Return value
### src/mcp_agent/workflows/orchestrator/orchestrator_models.py
**Class: `Task`**
- **Inherits from**: BaseModel
- **Description**: An individual task that needs to be executed
- **Attributes**:
- `description` (str) = Field(description='Description of the task')
**Class: `ServerTask`**
- **Inherits from**: Task
- **Description**: An individual task that can be accomplished by one or more MCP servers
- **Attributes**:
- `servers` (List[str]) = Field(description='Names of MCP servers that the LLM has access to for this task', default_factory=list)
**Class: `AgentTask`**
- **Inherits from**: Task
- **Description**: An individual task that can be accomplished by an Agent.
- **Attributes**:
- `agent` (str) = Field(description='Name of Agent from given list of agents that the LLM has access to for this task')
**Class: `Step`**
- **Inherits from**: BaseModel
- **Description**: A step containing independent tasks that can be executed in parallel
- **Attributes**:
- `description` (str) = Field(description='Description of the step')
- `tasks` (List[AgentTask]) = Field(description='Subtasks that can be executed in parallel', default_factory=list)
**Class: `Plan`**
- **Inherits from**: BaseModel
- **Description**: Plan generated by the orchestrator planner.
- **Attributes**:
- `steps` (List[Step]) = Field(description='List of steps to execute sequentially', default_factory=list)
- `is_complete` (bool) = Field(description='Whether the overall plan objective is complete')
**Class: `TaskWithResult`**
- **Inherits from**: Task
- **Description**: An individual task with its result
- **Attributes**:
- `result` (str) = Field(description='Result of executing the task', default='Task completed')
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `StepResult`**
- **Inherits from**: BaseModel
- **Description**: Result of executing a step
- **Attributes**:
- `step` (Step) = Field(description='The step that was executed', default_factory=Step)
- `task_results` (List[TaskWithResult]) = Field(description='Results of executing each task', default_factory=list)
- `result` (str) = Field(description='Result of executing the step', default='Step completed')
**Class: `PlanResult`**
- **Inherits from**: BaseModel
- **Description**: Results of executing a plan
- **Attributes**:
- `objective` (str): Objective of the plan
- `plan` (Plan | None) = None: The plan that was executed
- `step_results` (List[StepResult]): Results of executing each step
- `is_complete` (bool) = False: Whether the overall plan objective is complete
- `result` (str | None) = None: Result of executing the plan
**Class: `NextStep`**
- **Inherits from**: Step
- **Description**: Single next step in iterative planning
- **Attributes**:
- `is_complete` (bool) = Field(description='Whether the overall plan objective is complete')
**Function:** `StepResult.add_task_result(self, task_result: TaskWithResult)`
- **Description**: Add a task result to this step
- **Parameters**
- `self`
- `task_result` (TaskWithResult)
**Function:** `PlanResult.add_step_result(self, step_result: StepResult)`
- **Description**: Add a step result to this plan
- **Parameters**
- `self`
- `step_result` (StepResult)
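A sketch tying these models together (descriptions and results are invented):
```python
from mcp_agent.workflows.orchestrator.orchestrator_models import (
    AgentTask,
    Plan,
    PlanResult,
    Step,
    StepResult,
    TaskWithResult,
)

step = Step(
    description="Gather sources",
    tasks=[AgentTask(description="Find three articles on X", agent="researcher")],
)
plan = Plan(steps=[step], is_complete=False)

step_result = StepResult(step=step)
step_result.add_task_result(
    TaskWithResult(description="Find three articles on X", result="Found 3 articles")
)

plan_result = PlanResult(objective="Summarize topic X", step_results=[])
plan_result.add_step_result(step_result)
```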
**Function:** `format_task_result(task_result: TaskWithResult) -> str`
- **Description**: Format a task result for display to planners
- **Parameters**
- `task_result` (TaskWithResult)
- **Returns**
- `str`: Return value
**Function:** `format_step_result(step_result: StepResult) -> str`
- **Description**: Format a step result for display to planners
- **Parameters**
- `step_result` (StepResult)
- **Returns**
- `str`: Return value
**Function:** `format_plan_result(plan_result: PlanResult) -> str`
- **Description**: Format the full plan execution state for display to planners
- **Parameters**
- `plan_result` (PlanResult)
- **Returns**
- `str`: Return value
### src/mcp_agent/workflows/parallel/fan_in.py
**Class: `FanIn`**
- **Inherits from**: ContextDependent
- **Description**: Aggregate results from multiple parallel tasks into a single result.
This is a building block of the Parallel workflow, which can be used to fan out
work to multiple agents or other parallel tasks, and then aggregate the results.
For example, you can use FanIn to combine the results of multiple agents into a single response,
such as a Summarization Fan-In agent that combines the outputs of multiple language models.
**Function:** `FanIn.__init__(self, aggregator_agent: Agent | AugmentedLLM[MessageParamT, MessageT], llm_factory: Callable[[Agent], AugmentedLLM[MessageParamT, MessageT]] = None, context: Optional['Context'] = None)`
- **Description**: Initialize the FanIn with an Agent responsible for processing multiple responses into a single aggregated one.
- **Parameters**
- `self`
- `aggregator_agent` (Agent | AugmentedLLM[MessageParamT, MessageT])
- `llm_factory` (Callable[[Agent], AugmentedLLM[MessageParamT, MessageT]], optional): Default is None
- `context` (Optional['Context'], optional): Default is None
**Function:** `FanIn.generate(self, messages: FanInInput, request_params: RequestParams | None = None) -> List[MessageT]`
- **Description**: Request fan-in agent generation from a list of messages from multiple sources/agents. Internally aggregates the messages and then calls the aggregator agent to generate a response.
- **Parameters**
- `self`
- `messages` (FanInInput)
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `List[MessageT]`: Return value
**Function:** `FanIn.generate_str(self, messages: FanInInput, request_params: RequestParams | None = None) -> str`
- **Description**: Request fan-in agent generation from a list of messages from multiple sources/agents. Internally aggregates the messages and then calls the aggregator agent to generate a response, which is returned as a string.
- **Parameters**
- `self`
- `messages` (FanInInput)
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str`: Return value
**Function:** `FanIn.generate_structured(self, messages: FanInInput, response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
- **Description**: Request a structured fan-in agent generation from a list of messages from multiple sources/agents. Internally aggregates the messages and then calls the aggregator agent to generate a response, which is returned as a Pydantic model.
- **Parameters**
- `self`
- `messages` (FanInInput)
- `response_model` (Type[ModelT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `ModelT`: Return value
**Function:** `FanIn.aggregate_messages(self, messages: FanInInput) -> str | MessageParamT | List[MessageParamT]`
- **Description**: Aggregate messages from multiple sources/agents into a single message to use with the aggregator agent generation. The input can be a dictionary mapping agent/source names to the messages each generated, or unattributed lists of messages.
- **Parameters**
- `self`
- `messages` (FanInInput): One of: Dict[str, List[MessageT] | List[MessageParamT]] (agent names to message lists); Dict[str, str] (agent names to message strings); List[List[MessageT] | List[MessageParamT]] (message lists from agents); List[str] (message strings from agents). A usage sketch follows below.
- **Returns**
- `str | MessageParamT | List[MessageParamT]`: Aggregated message as a string, MessageParamT, or List[MessageParamT]
- **Raises**: ValueError: If input is empty or contains empty/invalid elements
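A quick illustration of one accepted input form (agent names and strings are invented; `fan_in` is assumed to be an already-constructed `FanIn`):
```python
from mcp_agent.workflows.parallel.fan_in import FanIn


async def combine(fan_in: FanIn) -> str:
    # Dict[str, str] form: agent names mapped to their string outputs.
    by_agent = {
        "security_reviewer": "Possible SQL injection in the query builder.",
        "style_reviewer": "Naming is inconsistent across modules.",
    }
    return await fan_in.generate_str(by_agent)
```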
**Function:** `FanIn.aggregate_agent_messages(self, messages: Dict[str, List[MessageT] | List[MessageParamT]]) -> str | MessageParamT | List[MessageParamT]`
- **Description**: Aggregate message lists keyed by agent name.
- **Parameters**
- `self`
- `messages` (Dict[str, List[MessageT] | List[MessageParamT]]): Dictionary mapping agent names to their message lists
- **Returns**
- `str | MessageParamT | List[MessageParamT]`: Messages formatted with agent attribution
**Function:** `FanIn.aggregate_agent_message_strings(self, messages: Dict[str, str]) -> str`
- **Description**: Aggregate string outputs keyed by agent name.
- **Parameters**
- `self`
- `messages` (Dict[str, str]): Dictionary mapping agent names to their string outputs
- **Returns**
- `str`: Combined string with agent attributions
**Function:** `FanIn.aggregate_message_lists(self, messages: List[List[MessageT] | List[MessageParamT]]) -> str | MessageParamT | List[MessageParamT]`
- **Description**: Aggregate message lists without agent names.
- **Parameters**
- `self`
- `messages` (List[List[MessageT] | List[MessageParamT]]): List of message lists from different agents
- **Returns**
- `str | MessageParamT | List[MessageParamT]`: List of formatted messages
**Function:** `FanIn.aggregate_message_strings(self, messages: List[str]) -> str`
- **Description**: Aggregate string outputs without agent names.
- **Parameters**
- `self`
- `messages` (List[str]): List of string outputs from different agents
- **Returns**
- `str`: Combined string with source attributions
### src/mcp_agent/workflows/parallel/fan_out.py
**Class: `FanOut`**
- **Inherits from**: ContextDependent
- **Description**: Distribute work to multiple parallel tasks.
This is a building block of the Parallel workflow, which can be used to fan out
work to multiple agents or other parallel tasks, and then aggregate the results.
**Function:** `FanOut.__init__(self, agents: List[Agent | AugmentedLLM[MessageParamT, MessageT]] | None = None, functions: List[Callable[[MessageParamT], List[MessageT]]] | None = None, llm_factory: Callable[[Agent], AugmentedLLM[MessageParamT, MessageT]] = None, context: Optional['Context'] = None)`
- **Description**: Initialize the FanOut with a list of agents, functions, or LLMs. Agents are wrapped in an AugmentedLLM via llm_factory if they are not already; functions are invoked directly in parallel.
- **Parameters**
- `self`
- `agents` (List[Agent | AugmentedLLM[MessageParamT, MessageT]] | None, optional): Default is None
- `functions` (List[Callable[[MessageParamT], List[MessageT]]] | None, optional): Default is None
- `llm_factory` (Callable[[Agent], AugmentedLLM[MessageParamT, MessageT]], optional): Default is None
- `context` (Optional['Context'], optional): Default is None
**Function:** `FanOut.generate(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> Dict[str, List[MessageT]]`
- **Description**: Request fan-out agent/function generations, and return the results as a dictionary. The keys are the names of the agents or functions that generated the results.
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `Dict[str, List[MessageT]]`: Return value
**Function:** `FanOut.generate_str(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> Dict[str, str]`
- **Description**: Request fan-out agent/function generations and return the string results as a dictionary. The keys are the names of the agents or functions that generated the results.
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `Dict[str, str]`: Return value
**Function:** `FanOut.fn_result_to_string(fn, message)`
**Function:** `FanOut.generate_structured(self, message: str | MessageParamT | List[MessageParamT], response_model: Type[ModelT], request_params: RequestParams | None = None) -> Dict[str, ModelT]`
- **Description**: Request a structured fan-out agent/function generation and return the result as a Pydantic model. The keys are the names of the agents or functions that generated the results.
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `response_model` (Type[ModelT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `Dict[str, ModelT]`: Return value
### src/mcp_agent/workflows/parallel/parallel_llm.py
**Class: `ParallelLLM`**
- **Inherits from**: AugmentedLLM[MessageParamT, MessageT]
- **Description**: LLMs can sometimes work simultaneously on a task (fan-out)
and have their outputs aggregated programmatically (fan-in).
This workflow performs both the fan-out and fan-in operations using LLMs.
From the user's perspective, an input is specified and the output is returned.
When to use this workflow:
Parallelization is effective when the divided subtasks can be parallelized
for speed (sectioning), or when multiple perspectives or attempts are needed for
higher confidence results (voting).
Examples:
Sectioning:
- Implementing guardrails where one model instance processes user queries
while another screens them for inappropriate content or requests.
- Automating evals for evaluating LLM performance, where each LLM call
evaluates a different aspect of the model’s performance on a given prompt.
Voting:
- Reviewing a piece of code for vulnerabilities, where several different
agents review and flag the code if they find a problem.
- Evaluating whether a given piece of content is inappropriate,
with multiple agents evaluating different aspects or requiring different
vote thresholds to balance false positives and negatives.
**Function:** `ParallelLLM.__init__(self, fan_in_agent: Agent | AugmentedLLM | Callable[[FanInInput], Any], fan_out_agents: List[Agent | AugmentedLLM] | None = None, fan_out_functions: List[Callable] | None = None, name: str | None = None, llm_factory: Callable[[Agent], AugmentedLLM] = None, context: Optional['Context'] = None)`
- **Description**: Initialize the LLM with a list of server names and an instruction. If a name is provided, it will be used to identify the LLM. If an agent is provided, all other properties are optional
- **Parameters**
- `self`
- `fan_in_agent` (Agent | AugmentedLLM | Callable[[FanInInput], Any])
- `fan_out_agents` (List[Agent | AugmentedLLM] | None, optional): Default is None
- `fan_out_functions` (List[Callable] | None, optional): Default is None
- `name` (str | None, optional): Default is None
- `llm_factory` (Callable[[Agent], AugmentedLLM], optional): Default is None
- `context` (Optional['Context'], optional): Default is None
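A hedged wiring sketch for the voting example in the class description (the agents, their instructions, and the use of `OpenAIAugmentedLLM` as the factory callable are illustrative assumptions):
```python
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.parallel.parallel_llm import ParallelLLM


async def review(code: str) -> str:
    security = Agent(name="security", instruction="Flag vulnerabilities.")
    style = Agent(name="style", instruction="Flag style issues.")
    aggregator = Agent(name="aggregator", instruction="Merge the reviews.")

    parallel = ParallelLLM(
        fan_in_agent=aggregator,
        fan_out_agents=[security, style],
        llm_factory=OpenAIAugmentedLLM,  # assumed usable as the factory callable
    )
    return await parallel.generate_str(f"Review this code:\n{code}")
```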
**Function:** `ParallelLLM.generate(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> List[MessageT] | Any`
**Function:** `ParallelLLM.generate_str(self, message: str | MessageParamT | List[MessageParamT], request_params: RequestParams | None = None) -> str`
- **Description**: Request an LLM generation and return the string representation of the result
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `str`: Return value
**Function:** `ParallelLLM.generate_structured(self, message: str | MessageParamT | List[MessageParamT], response_model: Type[ModelT], request_params: RequestParams | None = None) -> ModelT`
- **Description**: Request a structured LLM generation and return the result as a Pydantic model.
- **Parameters**
- `self`
- `message` (str | MessageParamT | List[MessageParamT])
- `response_model` (Type[ModelT])
- `request_params` (RequestParams | None, optional): Default is None
- **Returns**
- `ModelT`: Return value
### src/mcp_agent/workflows/router/router_base.py
**Class: `RouterResult`**
- **Inherits from**: BaseModel, Generic[ResultT]
- **Description**: A class that represents the result of a Router.route request
- **Attributes**:
- `result` (ResultT): The router returns an MCP server name, an Agent, or a function to route the input to.
- `p_score` (float | None) = None: The probability score (i.e. 0->1) of the routing decision. This is optional and may only be provided if the router is probabilistic (e.g. a probabilistic binary classifier).
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `RouterCategory`**
- **Inherits from**: BaseModel
- **Description**: A class that represents a category of routing.
Used to collect information the router needs to decide.
- **Attributes**:
- `name` (str): The name of the category
- `description` (str | None) = None: A description of the category
- `category` (str | Agent | Callable): The server name, Agent, or function to route to
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `ServerRouterCategory`**
- **Inherits from**: RouterCategory
- **Description**: A class that represents a category of routing to an MCP server
- **Attributes**:
- `tools` (List[FastTool]) = Field(default_factory=list)
**Class: `AgentRouterCategory`**
- **Inherits from**: RouterCategory
- **Description**: A class that represents a category of routing to an agent
- **Attributes**:
- `servers` (List[ServerRouterCategory]) = Field(default_factory=list)
**Class: `Router`**
- **Inherits from**: ABC, ContextDependent
- **Description**: Routing classifies an input and directs it to one or more specialized followup tasks.
This class helps to route an input to a specific MCP server,
an Agent (an aggregation of MCP servers), or a function (any Callable).
When to use this workflow:
- This workflow allows for separation of concerns, and building more specialized prompts.
- Routing works well for complex tasks where there are distinct categories that
are better handled separately, and where classification can be handled accurately,
either by an LLM or a more traditional classification model/algorithm.
Examples where routing is useful:
- Directing different types of customer service queries
(general questions, refund requests, technical support)
into different downstream processes, prompts, and tools.
- Routing easy/common questions to smaller models like Claude 3.5 Haiku
and hard/unusual questions to more capable models like Claude 3.5 Sonnet
to optimize cost and speed.
Args:
routing_instruction: A string that tells the router how to route the input.
server_names: A list of server names to route the input to.
agents: A list of agents to route the input to.
functions: A list of functions to route the input to.
**Function:** `Router.__init__(self, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, routing_instruction: str | None = None, context: Optional['Context'] = None)`
**Function:** `Router.route(self, request: str, top_k: int = 1) -> List[RouterResult[str | Agent | Callable]]`
- **Description**: Route the input request to one or more MCP servers, agents, or functions. If no routing decision can be made, returns an empty list.
- **Parameters**
- `self`
- `request` (str): The input to route.
- `top_k` (int, optional): The maximum number of top routing results to return. May return fewer.
- **Returns**
- `List[RouterResult[str | Agent | Callable]]`: Return value
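Since `RouterResult.result` can be a server name, an `Agent`, or a `Callable`, callers typically dispatch on its type. A hedged sketch (`router` is assumed to be an already-initialized concrete `Router` subclass):

```python
from mcp_agent.agents.agent import Agent

request = "I want a refund for my last order"
results = await router.route(request, top_k=1)
if not results:
    raise RuntimeError("no routing decision could be made")

top = results[0]
if isinstance(top.result, str):
    server_name = top.result        # route to an MCP server by name
elif isinstance(top.result, Agent):
    agent = top.result              # route to an agent
else:
    response = top.result(request)  # route to a plain function
```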
**Function:** `Router.route_to_server(self, request: str, top_k: int = 1) -> List[RouterResult[str]]`
- **Description**: Route the input to one or more MCP servers.
- **Parameters**
- `self`
- `request` (str)
- `top_k` (int, optional): Default is 1
- **Returns**
- `List[RouterResult[str]]`: Return value
**Function:** `Router.route_to_agent(self, request: str, top_k: int = 1) -> List[RouterResult[Agent]]`
- **Description**: Route the input to one or more agents.
- **Parameters**
- `self`
- `request` (str)
- `top_k` (int, optional): Default is 1
- **Returns**
- `List[RouterResult[Agent]]`: Return value
**Function:** `Router.route_to_function(self, request: str, top_k: int = 1) -> List[RouterResult[Callable]]`
- **Description**: Route the input to one or more functions.
- **Parameters**
- `self`
- `request` (str): The input to route.
- `top_k` (int, optional): Default is 1
- **Returns**
- `List[RouterResult[Callable]]`: Return value
**Function:** `Router.initialize(self)`
- **Description**: Initialize the router categories.
- **Parameters**
- `self`
**Function:** `Router.get_server_category(self, server_name: str) -> ServerRouterCategory`
**Function:** `Router.get_agent_category(self, agent: Agent) -> AgentRouterCategory`
**Function:** `Router.get_function_category(self, function: Callable) -> RouterCategory`
**Function:** `Router.format_category(self, category: RouterCategory, index: int | None = None) -> str`
- **Description**: Format a category into a readable string.
- **Parameters**
- `self`
- `category` (RouterCategory)
- `index` (int | None, optional): Default is None
- **Returns**
- `str`: Return value
**Function:** `Router._format_tools(self, tools: List[FastTool]) -> str`
- **Description**: Format a list of tools into a readable string.
- **Parameters**
- `self`
- `tools` (List[FastTool])
- **Returns**
- `str`: Return value
**Function:** `Router._format_server_category(self, category: ServerRouterCategory) -> str`
- **Description**: Format a server category into a readable string.
- **Parameters**
- `self`
- `category` (ServerRouterCategory)
- **Returns**
- `str`: Return value
**Function:** `Router._format_agent_category(self, category: AgentRouterCategory) -> str`
- **Description**: Format an agent category into a readable string.
- **Parameters**
- `self`
- `category` (AgentRouterCategory)
- **Returns**
- `str`: Return value
**Function:** `Router._format_function_category(self, category: RouterCategory) -> str`
- **Description**: Format a function category into a readable string.
- **Parameters**
- `self`
- `category` (RouterCategory)
- **Returns**
- `str`: Return value
### src/mcp_agent/workflows/router/router_embedding.py
**Class: `EmbeddingRouterCategory`**
- **Inherits from**: RouterCategory
- **Description**: A category for embedding-based routing
- **Attributes**:
- `embedding` (FloatArray | None) = None: Pre-computed embedding for this category
**Class: `EmbeddingRouter`**
- **Inherits from**: Router
- **Description**: A router that uses embedding similarity to route requests to appropriate categories.
This class helps to route an input to a specific MCP server, an Agent (an aggregation of MCP servers),
or a function (any Callable).
Features:
- Semantic similarity based routing using embeddings
- Flexible embedding model support
- Support for formatting and combining category metadata
Example usage:
    # Initialize router with embedding model
    router = EmbeddingRouter(
        embedding_model=OpenAIEmbeddingModel(model="text-embedding-3-small"),
        server_names=["customer_service", "tech_support"],
    )
    # Route a request
    results = await router.route("My laptop keeps crashing")
**Function:** `EmbeddingRouter.__init__(self, embedding_model: EmbeddingModel, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, context: Optional['Context'] = None)`
**Function:** `EmbeddingRouter.create(cls, embedding_model: EmbeddingModel, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, context: Optional['Context'] = None) -> 'EmbeddingRouter'`
- **Description**: Factory method to create and initialize a router. Use this instead of the constructor, since initialization is async.
- **Parameters**
- `cls`
- `embedding_model` (EmbeddingModel)
- `server_names` (List[str] | None, optional): Default is None
- `agents` (List[Agent] | None, optional): Default is None
- `functions` (List[Callable] | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'EmbeddingRouter'`: Return value
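A minimal sketch of the recommended factory flow (the model choice mirrors the docstring example above; assumes an async context):

```python
from mcp_agent.workflows.embedding.embedding_openai import OpenAIEmbeddingModel
from mcp_agent.workflows.router.router_embedding import EmbeddingRouter

# create() runs the async initialization that precomputes one embedding
# per category, which the plain constructor cannot do.
router = await EmbeddingRouter.create(
    embedding_model=OpenAIEmbeddingModel(model="text-embedding-3-small"),
    server_names=["customer_service", "tech_support"],
)
results = await router.route("My laptop keeps crashing", top_k=1)
```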
**Function:** `EmbeddingRouter.initialize(self)`
- **Description**: Initialize by computing embeddings for all categories
- **Parameters**
- `self`
**Function:** `EmbeddingRouter.create_category_with_embedding(category: RouterCategory) -> EmbeddingRouterCategory`
**Function:** `EmbeddingRouter.route(self, request: str, top_k: int = 1) -> List[RouterResult[str | Agent | Callable]]`
- **Description**: Route the request based on embedding similarity
- **Parameters**
- `self`
- `request` (str)
- `top_k` (int, optional): Default is 1
- **Returns**
- `List[RouterResult[str | Agent | Callable]]`: Return value
**Function:** `EmbeddingRouter.route_to_server(self, request: str, top_k: int = 1) -> List[RouterResult[str]]`
- **Description**: Route specifically to server categories
- **Parameters**
- `self`
- `request` (str)
- `top_k` (int, optional): Default is 1
- **Returns**
- `List[RouterResult[str]]`: Return value
**Function:** `EmbeddingRouter.route_to_agent(self, request: str, top_k: int = 1) -> List[RouterResult[Agent]]`
- **Description**: Route specifically to agent categories
- **Parameters**
- `self`
- `request` (str)
- `top_k` (int, optional): Default is 1
- **Returns**
- `List[RouterResult[Agent]]`: Return value
**Function:** `EmbeddingRouter.route_to_function(self, request: str, top_k: int = 1) -> List[RouterResult[Callable]]`
- **Description**: Route specifically to function categories
- **Parameters**
- `self`
- `request` (str)
- `top_k` (int, optional): Default is 1
- **Returns**
- `List[RouterResult[Callable]]`: Return value
**Function:** `EmbeddingRouter._route_with_embedding(self, request: str, top_k: int = 1, include_servers: bool = True, include_agents: bool = True, include_functions: bool = True) -> List[RouterResult]`
**Function:** `EmbeddingRouter.create_result(category: RouterCategory, request_embedding)`
**Function:** `EmbeddingRouter._compute_embedding(self, data: List[str])`
### src/mcp_agent/workflows/router/router_embedding_cohere.py
**Class: `CohereEmbeddingRouter`**
- **Inherits from**: EmbeddingRouter
- **Description**: A router that uses Cohere embedding similarity to route requests to appropriate categories.
This class helps to route an input to a specific MCP server, an Agent (an aggregation of MCP servers),
or a function (any Callable).
**Function:** `CohereEmbeddingRouter.__init__(self, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, embedding_model: CohereEmbeddingModel | None = None, context: Optional['Context'] = None)`
**Function:** `CohereEmbeddingRouter.create(cls, embedding_model: CohereEmbeddingModel | None = None, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, context: Optional['Context'] = None) -> 'CohereEmbeddingRouter'`
- **Description**: Factory method to create and initialize a router. Use this instead of the constructor, since initialization is async.
- **Parameters**
- `cls`
- `embedding_model` (CohereEmbeddingModel | None, optional): Default is None
- `server_names` (List[str] | None, optional): Default is None
- `agents` (List[Agent] | None, optional): Default is None
- `functions` (List[Callable] | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'CohereEmbeddingRouter'`: Return value
### src/mcp_agent/workflows/router/router_embedding_openai.py
**Class: `OpenAIEmbeddingRouter`**
- **Inherits from**: EmbeddingRouter
- **Description**: A router that uses OpenAI embedding similarity to route requests to appropriate categories.
This class helps to route an input to a specific MCP server, an Agent (an aggregation of MCP servers),
or a function (any Callable).
**Function:** `OpenAIEmbeddingRouter.__init__(self, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, embedding_model: OpenAIEmbeddingModel | None = None, context: Optional['Context'] = None)`
**Function:** `OpenAIEmbeddingRouter.create(cls, embedding_model: OpenAIEmbeddingModel | None = None, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, context: Optional['Context'] = None) -> 'OpenAIEmbeddingRouter'`
- **Description**: Factory method to create and initialize a router. Use this instead of the constructor, since initialization is async.
- **Parameters**
- `cls`
- `embedding_model` (OpenAIEmbeddingModel | None, optional): Default is None
- `server_names` (List[str] | None, optional): Default is None
- `agents` (List[Agent] | None, optional): Default is None
- `functions` (List[Callable] | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'OpenAIEmbeddingRouter'`: Return value
### src/mcp_agent/workflows/router/router_llm.py
**Class: `LLMRouterResult`**
- **Inherits from**: RouterResult[ResultT]
- **Description**: A class that represents the result of an LLMRouter.route request
- **Attributes**:
- `confidence` (Literal['high', 'medium', 'low']): The confidence level of the routing decision.
- `reasoning` (str | None) = None: A brief explanation of the routing decision. This is optional and may only be provided if the router is an LLM.
**Class: `StructuredResponseCategory`**
- **Inherits from**: BaseModel
- **Description**: A class that represents a single category returned by an LLM router
- **Attributes**:
- `category` (str): The name of the category (i.e. MCP server, Agent or function) to route the input to.
- `confidence` (Literal['high', 'medium', 'low']): The confidence level of the routing decision.
- `reasoning` (str | None) = None: A brief explanation of the routing decision.
**Class: `StructuredResponse`**
- **Inherits from**: BaseModel
- **Description**: A class that represents the structured response of an LLM router
- **Attributes**:
- `categories` (List[StructuredResponseCategory]): A list of categories to route the input to.
**Class: `LLMRouter`**
- **Inherits from**: Router
- **Description**: A router that uses an LLM to route an input to a specific category.
**Function:** `LLMRouter.__init__(self, llm: AugmentedLLM, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, routing_instruction: str | None = None, context: Optional['Context'] = None)`
**Function:** `LLMRouter.create(cls, llm: AugmentedLLM, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, routing_instruction: str | None = None, context: Optional['Context'] = None) -> 'LLMRouter'`
- **Description**: Factory method to create and initialize a router. Use this instead of the constructor, since initialization is async.
- **Parameters**
- `cls`
- `llm` (AugmentedLLM)
- `server_names` (List[str] | None, optional): Default is None
- `agents` (List[Agent] | None, optional): Default is None
- `functions` (List[Callable] | None, optional): Default is None
- `routing_instruction` (str | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'LLMRouter'`: Return value
**Function:** `LLMRouter.route(self, request: str, top_k: int = 1) -> List[LLMRouterResult[str | Agent | Callable]]`
**Function:** `LLMRouter.route_to_server(self, request: str, top_k: int = 1) -> List[LLMRouterResult[str]]`
**Function:** `LLMRouter.route_to_agent(self, request: str, top_k: int = 1) -> List[LLMRouterResult[Agent]]`
**Function:** `LLMRouter.route_to_function(self, request: str, top_k: int = 1) -> List[LLMRouterResult[Callable]]`
**Function:** `LLMRouter._route_with_llm(self, request: str, top_k: int = 1, include_servers: bool = True, include_agents: bool = True, include_functions: bool = True) -> List[LLMRouterResult]`
**Function:** `LLMRouter._generate_context(self, include_servers: bool = True, include_agents: bool = True, include_functions: bool = True) -> str`
- **Description**: Generate a formatted context list of categories.
- **Parameters**
- `self`
- `include_servers` (bool, optional): Default is True
- `include_agents` (bool, optional): Default is True
- `include_functions` (bool, optional): Default is True
- **Returns**
- `str`: Return value
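A hedged sketch of LLM-based routing; the no-argument `OpenAIAugmentedLLM()` construction and the confidence filter are illustrative assumptions:

```python
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.router.router_llm import LLMRouter

llm = OpenAIAugmentedLLM()  # assumes a default-configured LLM
router = await LLMRouter.create(
    llm=llm,
    server_names=["billing", "tech_support"],
    routing_instruction="Pick the single best category for the request.",
)

results = await router.route("Why was I charged twice?", top_k=3)
# LLMRouterResult carries a confidence literal; keep the stronger matches.
confident = [r for r in results if r.confidence in ("high", "medium")]
```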
### src/mcp_agent/workflows/router/router_llm_anthropic.py
**Class: `AnthropicLLMRouter`**
- **Inherits from**: LLMRouter
- **Description**: An LLM router that uses an Anthropic model to make routing decisions.
**Function:** `AnthropicLLMRouter.__init__(self, name: str | None = None, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, routing_instruction: str | None = None, context: Optional['Context'] = None)`
**Function:** `AnthropicLLMRouter.create(cls, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, routing_instruction: str | None = None, context: Optional['Context'] = None) -> 'AnthropicLLMRouter'`
- **Description**: Factory method to create and initialize a router. Use this instead of the constructor, since initialization is async.
- **Parameters**
- `cls`
- `server_names` (List[str] | None, optional): Default is None
- `agents` (List[Agent] | None, optional): Default is None
- `functions` (List[Callable] | None, optional): Default is None
- `routing_instruction` (str | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'AnthropicLLMRouter'`: Return value
### src/mcp_agent/workflows/router/router_llm_openai.py
**Class: `OpenAILLMRouter`**
- **Inherits from**: LLMRouter
- **Description**: An LLM router that uses an OpenAI model to make routing decisions.
**Function:** `OpenAILLMRouter.__init__(self, name: str | None = None, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, routing_instruction: str | None = None, context: Optional['Context'] = None)`
**Function:** `OpenAILLMRouter.create(cls, server_names: List[str] | None = None, agents: List[Agent] | None = None, functions: List[Callable] | None = None, routing_instruction: str | None = None, context: Optional['Context'] = None) -> 'OpenAILLMRouter'`
- **Description**: Factory method to create and initialize a router. Use this instead of the constructor, since initialization is async.
- **Parameters**
- `cls`
- `server_names` (List[str] | None, optional): Default is None
- `agents` (List[Agent] | None, optional): Default is None
- `functions` (List[Callable] | None, optional): Default is None
- `routing_instruction` (str | None, optional): Default is None
- `context` (Optional['Context'], optional): Default is None
- **Returns**
- `'OpenAILLMRouter'`: Return value
### src/mcp_agent/workflows/swarm/swarm.py
**Class: `AgentResource`**
- **Inherits from**: EmbeddedResource
- **Description**: A resource that returns an agent. Meant for use with tool calls that want to return an Agent for further processing.
- **Attributes**:
- `agent` (Optional['Agent']) = None
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `AgentFunctionResultResource`**
- **Inherits from**: EmbeddedResource
- **Description**: A resource that returns an AgentFunctionResult.
Meant for use with tool calls that return an AgentFunctionResult for further processing.
- **Attributes**:
- `result` ('AgentFunctionResult')
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `SwarmAgent`**
- **Inherits from**: Agent
- **Description**: A SwarmAgent is an Agent that can spawn other agents and interactively resolve a task.
Based on OpenAI Swarm: https://github.com/openai/swarm.
SwarmAgents have access to tools available on the servers they are connected to, but additionally
have a list of (possibly local) functions that can be called as tools.
**Class: `AgentFunctionResult`**
- **Inherits from**: BaseModel
- **Description**: Encapsulates the possible return values for a Swarm agent function.
Attributes:
value (str): The result value as a string.
agent (Agent): The agent instance, if applicable.
context_variables (dict): A dictionary of context variables.
- **Attributes**:
- `value` (str) = ''
- `agent` (Agent | None) = None
- `context_variables` (dict) = {}
- `model_config` = ConfigDict(extra='allow', arbitrary_types_allowed=True)
**Class: `Swarm`**
- **Inherits from**: AugmentedLLM[MessageParamT, MessageT], Generic[MessageParamT, MessageT]
- **Description**: Handles orchestrating agents that can use tools via MCP servers.
MCP version of the OpenAI Swarm class (https://github.com/openai/swarm).
**Class: `DoneAgent`**
- **Inherits from**: SwarmAgent
- **Description**: A special agent that represents the end of a Swarm workflow.
**Function:** `create_agent_resource(agent: 'Agent') -> AgentResource`
**Function:** `create_agent_function_result_resource(result: 'AgentFunctionResult') -> AgentFunctionResultResource`
**Function:** `SwarmAgent.__init__(self, name: str, instruction: str | Callable[[Dict], str] = 'You are a helpful agent.', server_names: list[str] = None, functions: List['AgentFunctionCallable'] = None, parallel_tool_calls: bool = False, human_input_callback: HumanInputCallback = None, context: Optional['Context'] = None)`
**Function:** `SwarmAgent.call_tool(self, name: str, arguments: dict | None = None) -> CallToolResult`
**Function:** `create_transfer_to_agent_tool(agent: 'Agent', agent_function: Callable[[], None]) -> Tool`
**Function:** `create_agent_function_tool(agent_function: 'AgentFunctionCallable') -> Tool`
**Function:** `Swarm.__init__(self, agent: SwarmAgent, context_variables: Dict[str, str] = None)`
- **Description**: Initialize the LLM planner with an agent, which will be used as the starting point for the workflow.
- **Parameters**
- `self`
- `agent` (SwarmAgent)
- `context_variables` (Dict[str, str], optional): Default is None
**Function:** `Swarm.get_tool(self, tool_name: str) -> Tool | None`
- **Description**: Get the schema for a tool by name.
- **Parameters**
- `self`
- `tool_name` (str)
- **Returns**
- `Tool | None`: Return value
**Function:** `Swarm.pre_tool_call(self, tool_call_id: str | None, request: CallToolRequest) -> CallToolRequest | bool`
**Function:** `Swarm.post_tool_call(self, tool_call_id: str | None, request: CallToolRequest, result: CallToolResult) -> CallToolResult`
**Function:** `Swarm.set_agent(self, agent: SwarmAgent)`
**Function:** `Swarm.should_continue(self) -> bool`
- **Description**: Returns True if the workflow should continue, False otherwise.
- **Parameters**
- `self`
- **Returns**
- `bool`: Return value
**Function:** `DoneAgent.__init__(self)`
**Function:** `DoneAgent.call_tool(self, _name: str, _arguments: dict | None = None) -> CallToolResult`
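A hypothetical sketch of the handoff mechanics: a SwarmAgent tool function may return another `Agent` (transferring control) or an `AgentFunctionResult` (carrying updated context variables). All names here are illustrative:

```python
from mcp_agent.workflows.swarm.swarm import AgentFunctionResult, SwarmAgent

refunds_agent = SwarmAgent(
    name="refunds_agent",
    instruction="Handle refund requests.",
)

def transfer_to_refunds():
    # Returning an Agent signals the swarm to hand the conversation off.
    return refunds_agent

def record_note(note: str):
    # Returning an AgentFunctionResult can update shared context variables.
    return AgentFunctionResult(value="noted", context_variables={"last_note": note})

triage_agent = SwarmAgent(
    name="triage_agent",
    instruction="Triage the user request and transfer when appropriate.",
    functions=[transfer_to_refunds, record_note],
)
```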
### src/mcp_agent/workflows/swarm/swarm_anthropic.py
**Class: `AnthropicSwarm`**
- **Inherits from**: Swarm, AnthropicAugmentedLLM
- **Description**: MCP version of the OpenAI Swarm class (https://github.com/openai/swarm),
using Anthropic's API as the LLM.
**Function:** `AnthropicSwarm.generate(self, message, request_params: RequestParams | None = None)`
### src/mcp_agent/workflows/swarm/swarm_openai.py
**Class: `OpenAISwarm`**
- **Inherits from**: Swarm, OpenAIAugmentedLLM
- **Description**: MCP version of the OpenAI Swarm class (https://github.com/openai/swarm), using OpenAI's ChatCompletion as the LLM.
**Function:** `OpenAISwarm.generate(self, message, request_params: RequestParams | None = None)`
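A minimal end-to-end sketch (the agent and prompt are illustrative; `generate` is assumed to drive the conversation until `should_continue()` reports a terminal `DoneAgent` state):

```python
from mcp_agent.workflows.swarm.swarm import SwarmAgent
from mcp_agent.workflows.swarm.swarm_openai import OpenAISwarm

triage_agent = SwarmAgent(
    name="triage_agent",
    instruction="Triage the user request and transfer when appropriate.",
)

# context_variables seeds the shared state visible to agent functions.
swarm = OpenAISwarm(agent=triage_agent, context_variables={"user_id": "42"})
result = await swarm.generate("I want a refund for order #1234")
```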