Skip to main content
Glama

mcp-run-python

Official
by pydantic
abstract.py55.7 kB
from __future__ import annotations as _annotations import asyncio import inspect from abc import ABC, abstractmethod from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterator, Mapping, Sequence from contextlib import AbstractAsyncContextManager, asynccontextmanager, contextmanager from types import FrameType from typing import TYPE_CHECKING, Any, Generic, TypeAlias, cast, overload import anyio from typing_extensions import Self, TypeIs, TypeVar from pydantic_graph import End from pydantic_graph._utils import get_event_loop from .. import ( _agent_graph, _system_prompt, _utils, exceptions, messages as _messages, models, result, usage as _usage, ) from .._tool_manager import ToolManager from ..builtin_tools import AbstractBuiltinTool from ..output import OutputDataT, OutputSpec from ..result import AgentStream, FinalResult, StreamedRunResult from ..run import AgentRun, AgentRunResult, AgentRunResultEvent from ..settings import ModelSettings from ..tools import ( AgentDepsT, DeferredToolResults, RunContext, Tool, ToolFuncEither, ) from ..toolsets import AbstractToolset from ..usage import RunUsage, UsageLimits if TYPE_CHECKING: from fasta2a.applications import FastA2A from fasta2a.broker import Broker from fasta2a.schema import AgentProvider, Skill from fasta2a.storage import Storage from starlette.middleware import Middleware from starlette.routing import BaseRoute, Route from starlette.types import ExceptionHandler, Lifespan from ..ag_ui import AGUIApp T = TypeVar('T') S = TypeVar('S') NoneType = type(None) RunOutputDataT = TypeVar('RunOutputDataT') """Type variable for the result data of a run where `output_type` was customized on the run call.""" EventStreamHandler: TypeAlias = Callable[ [RunContext[AgentDepsT], AsyncIterable[_messages.AgentStreamEvent]], Awaitable[None] ] """A function that receives agent [`RunContext`][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools.""" Instructions = ( str | _system_prompt.SystemPromptFunc[AgentDepsT] | Sequence[str | _system_prompt.SystemPromptFunc[AgentDepsT]] | None ) class AbstractAgent(Generic[AgentDepsT, OutputDataT], ABC): """Abstract superclass for [`Agent`][pydantic_ai.agent.Agent], [`WrapperAgent`][pydantic_ai.agent.WrapperAgent], and your own custom agent implementations.""" @property @abstractmethod def model(self) -> models.Model | models.KnownModelName | str | None: """The default model configured for this agent.""" raise NotImplementedError @property @abstractmethod def name(self) -> str | None: """The name of the agent, used for logging. If `None`, we try to infer the agent name from the call frame when the agent is first run. """ raise NotImplementedError @name.setter @abstractmethod def name(self, value: str | None) -> None: """Set the name of the agent, used for logging.""" raise NotImplementedError @property @abstractmethod def deps_type(self) -> type: """The type of dependencies used by the agent.""" raise NotImplementedError @property @abstractmethod def output_type(self) -> OutputSpec[OutputDataT]: """The type of data output by agent runs, used to validate the data returned by the model, defaults to `str`.""" raise NotImplementedError @property @abstractmethod def event_stream_handler(self) -> EventStreamHandler[AgentDepsT] | None: """Optional handler for events from the model's streaming response and the agent's execution of tools.""" raise NotImplementedError @property @abstractmethod def toolsets(self) -> Sequence[AbstractToolset[AgentDepsT]]: """All toolsets registered on the agent. Output tools are not included. """ raise NotImplementedError @overload async def run( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AgentRunResult[OutputDataT]: ... @overload async def run( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT], message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AgentRunResult[RunOutputDataT]: ... async def run( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT] | None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AgentRunResult[Any]: """Run the agent with a user prompt in async mode. This method builds an internal agent graph (using system prompts, tools and output schemas) and then runs the graph to completion. The result of the run is returned. Example: ```python from pydantic_ai import Agent agent = Agent('openai:gpt-4o') async def main(): agent_run = await agent.run('What is the capital of France?') print(agent_run.output) #> The capital of France is Paris. ``` Args: user_prompt: User input to start/continue the conversation. output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no output validators since output validators would expect an argument that matches the agent's output type. message_history: History of the conversation so far. deferred_tool_results: Optional results for deferred tool calls in the message history. model: Optional model to use for this run, required if `model` was not set when creating the agent. deps: Optional dependencies to use for this run. model_settings: Optional settings to use for this model's request. usage_limits: Optional limits on model request count or token usage. usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. builtin_tools: Optional additional builtin tools for this run. Returns: The result of the run. """ if infer_name and self.name is None: self._infer_name(inspect.currentframe()) event_stream_handler = event_stream_handler or self.event_stream_handler async with self.iter( user_prompt=user_prompt, output_type=output_type, message_history=message_history, deferred_tool_results=deferred_tool_results, model=model, deps=deps, model_settings=model_settings, usage_limits=usage_limits, usage=usage, toolsets=toolsets, builtin_tools=builtin_tools, ) as agent_run: async for node in agent_run: if event_stream_handler is not None and ( self.is_model_request_node(node) or self.is_call_tools_node(node) ): async with node.stream(agent_run.ctx) as stream: await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream) assert agent_run.result is not None, 'The graph run did not finish properly' return agent_run.result @overload def run_sync( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AgentRunResult[OutputDataT]: ... @overload def run_sync( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT], message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AgentRunResult[RunOutputDataT]: ... def run_sync( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT] | None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AgentRunResult[Any]: """Synchronously run the agent with a user prompt. This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. Example: ```python from pydantic_ai import Agent agent = Agent('openai:gpt-4o') result_sync = agent.run_sync('What is the capital of Italy?') print(result_sync.output) #> The capital of Italy is Rome. ``` Args: user_prompt: User input to start/continue the conversation. output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no output validators since output validators would expect an argument that matches the agent's output type. message_history: History of the conversation so far. deferred_tool_results: Optional results for deferred tool calls in the message history. model: Optional model to use for this run, required if `model` was not set when creating the agent. deps: Optional dependencies to use for this run. model_settings: Optional settings to use for this model's request. usage_limits: Optional limits on model request count or token usage. usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. builtin_tools: Optional additional builtin tools for this run. Returns: The result of the run. """ if infer_name and self.name is None: self._infer_name(inspect.currentframe()) return get_event_loop().run_until_complete( self.run( user_prompt, output_type=output_type, message_history=message_history, deferred_tool_results=deferred_tool_results, model=model, deps=deps, model_settings=model_settings, usage_limits=usage_limits, usage=usage, infer_name=False, toolsets=toolsets, builtin_tools=builtin_tools, event_stream_handler=event_stream_handler, ) ) @overload def run_stream( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AbstractAsyncContextManager[result.StreamedRunResult[AgentDepsT, OutputDataT]]: ... @overload def run_stream( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT], message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AbstractAsyncContextManager[result.StreamedRunResult[AgentDepsT, RunOutputDataT]]: ... @asynccontextmanager async def run_stream( # noqa C901 self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT] | None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[result.StreamedRunResult[AgentDepsT, Any]]: """Run the agent with a user prompt in async streaming mode. This method builds an internal agent graph (using system prompts, tools and output schemas) and then runs the graph until the model produces output matching the `output_type`, for example text or structured data. At this point, a streaming run result object is yielded from which you can stream the output as it comes in, and -- once this output has completed streaming -- get the complete output, message history, and usage. As this method will consider the first output matching the `output_type` to be the final output, it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output. If you want to always run the agent graph to completion and stream events and output at the same time, use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead. Example: ```python from pydantic_ai import Agent agent = Agent('openai:gpt-4o') async def main(): async with agent.run_stream('What is the capital of the UK?') as response: print(await response.get_output()) #> The capital of the UK is London. ``` Args: user_prompt: User input to start/continue the conversation. output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no output validators since output validators would expect an argument that matches the agent's output type. message_history: History of the conversation so far. deferred_tool_results: Optional results for deferred tool calls in the message history. model: Optional model to use for this run, required if `model` was not set when creating the agent. deps: Optional dependencies to use for this run. model_settings: Optional settings to use for this model's request. usage_limits: Optional limits on model request count or token usage. usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. builtin_tools: Optional additional builtin tools for this run. event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager. Note that it does _not_ receive any events after the final result is found. Returns: The result of the run. """ if infer_name and self.name is None: # f_back because `asynccontextmanager` adds one frame if frame := inspect.currentframe(): # pragma: no branch self._infer_name(frame.f_back) event_stream_handler = event_stream_handler or self.event_stream_handler yielded = False async with self.iter( user_prompt, output_type=output_type, message_history=message_history, deferred_tool_results=deferred_tool_results, model=model, deps=deps, model_settings=model_settings, usage_limits=usage_limits, usage=usage, infer_name=False, toolsets=toolsets, builtin_tools=builtin_tools, ) as agent_run: first_node = agent_run.next_node # start with the first node assert isinstance(first_node, _agent_graph.UserPromptNode) # the first node should be a user prompt node node = first_node while True: graph_ctx = agent_run.ctx if self.is_model_request_node(node): async with node.stream(graph_ctx) as stream: final_result_event = None async def stream_to_final( stream: AgentStream, ) -> AsyncIterator[_messages.ModelResponseStreamEvent]: nonlocal final_result_event async for event in stream: yield event if isinstance(event, _messages.FinalResultEvent): final_result_event = event break if event_stream_handler is not None: await event_stream_handler( _agent_graph.build_run_context(graph_ctx), stream_to_final(stream) ) else: async for _ in stream_to_final(stream): pass if final_result_event is not None: final_result = FinalResult( None, final_result_event.tool_name, final_result_event.tool_call_id ) if yielded: raise exceptions.AgentRunError('Agent run produced final results') # pragma: no cover yielded = True messages = graph_ctx.state.message_history.copy() async def on_complete() -> None: """Called when the stream has completed. The model response will have been added to messages by now by `StreamedRunResult._marked_completed`. """ nonlocal final_result final_result = FinalResult( await stream.get_output(), final_result.tool_name, final_result.tool_call_id ) # When we get here, the `ModelRequestNode` has completed streaming after the final result was found. # When running an agent with `agent.run`, we'd then move to `CallToolsNode` to execute the tool calls and # find the final result. # We also want to execute tool calls (in case `agent.end_strategy == 'exhaustive'`) here, but # we don't want to use run the `CallToolsNode` logic to determine the final output, as it would be # wasteful and could produce a different result (e.g. when text output is followed by tool calls). # So we call `process_tool_calls` directly and then end the run with the found final result. parts: list[_messages.ModelRequestPart] = [] async for _event in _agent_graph.process_tool_calls( tool_manager=graph_ctx.deps.tool_manager, tool_calls=stream.response.tool_calls, tool_call_results=None, final_result=final_result, ctx=graph_ctx, output_parts=parts, ): pass # For backwards compatibility, append a new ModelRequest using the tool returns and retries if parts: messages.append(_messages.ModelRequest(parts)) await agent_run.next(_agent_graph.SetFinalResult(final_result)) yield StreamedRunResult( messages, graph_ctx.deps.new_message_index, stream, on_complete, ) break elif self.is_call_tools_node(node) and event_stream_handler is not None: async with node.stream(agent_run.ctx) as stream: await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream) next_node = await agent_run.next(node) if isinstance(next_node, End) and agent_run.result is not None: # A final output could have been produced by the CallToolsNode rather than the ModelRequestNode, # if a tool function raised CallDeferred or ApprovalRequired. # In this case there's no response to stream, but we still let the user access the output etc as normal. yield StreamedRunResult( graph_ctx.state.message_history, graph_ctx.deps.new_message_index, run_result=agent_run.result, ) yielded = True break if not isinstance(next_node, _agent_graph.AgentNode): raise exceptions.AgentRunError( # pragma: no cover 'Should have produced a StreamedRunResult before getting here' ) node = cast(_agent_graph.AgentNode[Any, Any], next_node) if not yielded: raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover @overload def run_stream_events( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... @overload def run_stream_events( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT], message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... def run_stream_events( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT] | None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: """Run the agent with a user prompt in async mode and stream events from the run. This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] and uses the `event_stream_handler` kwarg to get a stream of events from the run. Example: ```python from pydantic_ai import Agent, AgentRunResultEvent, AgentStreamEvent agent = Agent('openai:gpt-4o') async def main(): events: list[AgentStreamEvent | AgentRunResultEvent] = [] async for event in agent.run_stream_events('What is the capital of France?'): events.append(event) print(events) ''' [ PartStartEvent(index=0, part=TextPart(content='The capital of ')), FinalResultEvent(tool_name=None, tool_call_id=None), PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='France is Paris. ')), AgentRunResultEvent( result=AgentRunResult(output='The capital of France is Paris. ') ), ] ''' ``` Arguments are the same as for [`self.run`][pydantic_ai.agent.AbstractAgent.run], except that `event_stream_handler` is now allowed. Args: user_prompt: User input to start/continue the conversation. output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no output validators since output validators would expect an argument that matches the agent's output type. message_history: History of the conversation so far. deferred_tool_results: Optional results for deferred tool calls in the message history. model: Optional model to use for this run, required if `model` was not set when creating the agent. deps: Optional dependencies to use for this run. model_settings: Optional settings to use for this model's request. usage_limits: Optional limits on model request count or token usage. usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. builtin_tools: Optional additional builtin tools for this run. Returns: An async iterable of stream events `AgentStreamEvent` and finally a `AgentRunResultEvent` with the final run result. """ # unfortunately this hack of returning a generator rather than defining it right here is # required to allow overloads of this method to work in python's typing system, or at least with pyright # or at least I couldn't make it work without return self._run_stream_events( user_prompt, output_type=output_type, message_history=message_history, deferred_tool_results=deferred_tool_results, model=model, deps=deps, model_settings=model_settings, usage_limits=usage_limits, usage=usage, infer_name=infer_name, toolsets=toolsets, builtin_tools=builtin_tools, ) async def _run_stream_events( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT] | None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: send_stream, receive_stream = anyio.create_memory_object_stream[ _messages.AgentStreamEvent | AgentRunResultEvent[Any] ]() async def event_stream_handler( _: RunContext[AgentDepsT], events: AsyncIterable[_messages.AgentStreamEvent] ) -> None: async for event in events: await send_stream.send(event) async def run_agent() -> AgentRunResult[Any]: async with send_stream: return await self.run( user_prompt, output_type=output_type, message_history=message_history, deferred_tool_results=deferred_tool_results, model=model, deps=deps, model_settings=model_settings, usage_limits=usage_limits, usage=usage, infer_name=infer_name, toolsets=toolsets, builtin_tools=builtin_tools, event_stream_handler=event_stream_handler, ) task = asyncio.create_task(run_agent()) async with receive_stream: async for message in receive_stream: yield message result = await task yield AgentRunResultEvent(result) @overload def iter( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, OutputDataT]]: ... @overload def iter( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT], message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, RunOutputDataT]]: ... @asynccontextmanager @abstractmethod async def iter( self, user_prompt: str | Sequence[_messages.UserContent] | None = None, *, output_type: OutputSpec[RunOutputDataT] | None = None, message_history: Sequence[_messages.ModelMessage] | None = None, deferred_tool_results: DeferredToolResults | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[AgentRun[AgentDepsT, Any]]: """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed. This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the stream of events coming from the execution of tools. The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics, and the final result of the run once it has completed. For more details, see the documentation of `AgentRun`. Example: ```python from pydantic_ai import Agent agent = Agent('openai:gpt-4o') async def main(): nodes = [] async with agent.iter('What is the capital of France?') as agent_run: async for node in agent_run: nodes.append(node) print(nodes) ''' [ UserPromptNode( user_prompt='What is the capital of France?', instructions_functions=[], system_prompts=(), system_prompt_functions=[], system_prompt_dynamic_functions={}, ), ModelRequestNode( request=ModelRequest( parts=[ UserPromptPart( content='What is the capital of France?', timestamp=datetime.datetime(...), ) ] ) ), CallToolsNode( model_response=ModelResponse( parts=[TextPart(content='The capital of France is Paris.')], usage=RequestUsage(input_tokens=56, output_tokens=7), model_name='gpt-4o', timestamp=datetime.datetime(...), ) ), End(data=FinalResult(output='The capital of France is Paris.')), ] ''' print(agent_run.result.output) #> The capital of France is Paris. ``` Args: user_prompt: User input to start/continue the conversation. output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no output validators since output validators would expect an argument that matches the agent's output type. message_history: History of the conversation so far. deferred_tool_results: Optional results for deferred tool calls in the message history. model: Optional model to use for this run, required if `model` was not set when creating the agent. deps: Optional dependencies to use for this run. model_settings: Optional settings to use for this model's request. usage_limits: Optional limits on model request count or token usage. usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. builtin_tools: Optional additional builtin tools for this run. Returns: The result of the run. """ raise NotImplementedError yield @contextmanager @abstractmethod def override( self, *, name: str | _utils.Unset = _utils.UNSET, deps: AgentDepsT | _utils.Unset = _utils.UNSET, model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET, toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET, tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET, instructions: Instructions[AgentDepsT] | _utils.Unset = _utils.UNSET, ) -> Iterator[None]: """Context manager to temporarily override agent name, dependencies, model, toolsets, tools, or instructions. This is particularly useful when testing. You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures). Args: name: The name to use instead of the name passed to the agent constructor and agent run. deps: The dependencies to use instead of the dependencies passed to the agent run. model: The model to use instead of the model passed to the agent run. toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run. tools: The tools to use instead of the tools registered with the agent. instructions: The instructions to use instead of the instructions registered with the agent. """ raise NotImplementedError yield def _infer_name(self, function_frame: FrameType | None) -> None: """Infer the agent name from the call frame. RunUsage should be `self._infer_name(inspect.currentframe())`. """ assert self.name is None, 'Name already set' if function_frame is not None: # pragma: no branch if parent_frame := function_frame.f_back: # pragma: no branch for name, item in parent_frame.f_locals.items(): if item is self: self.name = name return if parent_frame.f_locals != parent_frame.f_globals: # pragma: no branch # if we couldn't find the agent in locals and globals are a different dict, try globals for name, item in parent_frame.f_globals.items(): if item is self: self.name = name return @staticmethod @contextmanager def sequential_tool_calls() -> Iterator[None]: """Run tool calls sequentially during the context.""" with ToolManager.sequential_tool_calls(): yield @staticmethod def is_model_request_node( node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]], ) -> TypeIs[_agent_graph.ModelRequestNode[T, S]]: """Check if the node is a `ModelRequestNode`, narrowing the type if it is. This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`. """ return isinstance(node, _agent_graph.ModelRequestNode) @staticmethod def is_call_tools_node( node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]], ) -> TypeIs[_agent_graph.CallToolsNode[T, S]]: """Check if the node is a `CallToolsNode`, narrowing the type if it is. This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`. """ return isinstance(node, _agent_graph.CallToolsNode) @staticmethod def is_user_prompt_node( node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]], ) -> TypeIs[_agent_graph.UserPromptNode[T, S]]: """Check if the node is a `UserPromptNode`, narrowing the type if it is. This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`. """ return isinstance(node, _agent_graph.UserPromptNode) @staticmethod def is_end_node( node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]], ) -> TypeIs[End[result.FinalResult[S]]]: """Check if the node is a `End`, narrowing the type if it is. This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`. """ return isinstance(node, End) @abstractmethod async def __aenter__(self) -> AbstractAgent[AgentDepsT, OutputDataT]: raise NotImplementedError @abstractmethod async def __aexit__(self, *args: Any) -> bool | None: raise NotImplementedError def to_ag_ui( self, *, # Agent.iter parameters output_type: OutputSpec[OutputDataT] | None = None, model: models.Model | models.KnownModelName | str | None = None, deps: AgentDepsT = None, model_settings: ModelSettings | None = None, usage_limits: UsageLimits | None = None, usage: RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, # Starlette debug: bool = False, routes: Sequence[BaseRoute] | None = None, middleware: Sequence[Middleware] | None = None, exception_handlers: Mapping[Any, ExceptionHandler] | None = None, on_startup: Sequence[Callable[[], Any]] | None = None, on_shutdown: Sequence[Callable[[], Any]] | None = None, lifespan: Lifespan[AGUIApp[AgentDepsT, OutputDataT]] | None = None, ) -> AGUIApp[AgentDepsT, OutputDataT]: """Returns an ASGI application that handles every AG-UI request by running the agent. Note that the `deps` will be the same for each request, with the exception of the AG-UI state that's injected into the `state` field of a `deps` object that implements the [`StateHandler`][pydantic_ai.ag_ui.StateHandler] protocol. To provide different `deps` for each request (e.g. based on the authenticated user), use [`pydantic_ai.ag_ui.run_ag_ui`][pydantic_ai.ag_ui.run_ag_ui] or [`pydantic_ai.ag_ui.handle_ag_ui_request`][pydantic_ai.ag_ui.handle_ag_ui_request] instead. Example: ```python from pydantic_ai import Agent agent = Agent('openai:gpt-4o') app = agent.to_ag_ui() ``` The `app` is an ASGI application that can be used with any ASGI server. To run the application, you can use the following command: ```bash uvicorn app:app --host 0.0.0.0 --port 8000 ``` See [AG-UI docs](../ag-ui.md) for more information. Args: output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no output validators since output validators would expect an argument that matches the agent's output type. model: Optional model to use for this run, required if `model` was not set when creating the agent. deps: Optional dependencies to use for this run. model_settings: Optional settings to use for this model's request. usage_limits: Optional limits on model request count or token usage. usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. debug: Boolean indicating if debug tracebacks should be returned on errors. routes: A list of routes to serve incoming HTTP and WebSocket requests. middleware: A list of middleware to run for every request. A starlette application will always automatically include two middleware classes. `ServerErrorMiddleware` is added as the very outermost middleware, to handle any uncaught errors occurring anywhere in the entire stack. `ExceptionMiddleware` is added as the very innermost middleware, to deal with handled exception cases occurring in the routing or endpoints. exception_handlers: A mapping of either integer status codes, or exception class types onto callables which handle the exceptions. Exception handler callables should be of the form `handler(request, exc) -> response` and may be either standard functions, or async functions. on_startup: A list of callables to run on application startup. Startup handler callables do not take any arguments, and may be either standard functions, or async functions. on_shutdown: A list of callables to run on application shutdown. Shutdown handler callables do not take any arguments, and may be either standard functions, or async functions. lifespan: A lifespan context function, which can be used to perform startup and shutdown tasks. This is a newer style that replaces the `on_startup` and `on_shutdown` handlers. Use one or the other, not both. Returns: An ASGI application for running Pydantic AI agents with AG-UI protocol support. """ from ..ag_ui import AGUIApp return AGUIApp( agent=self, # Agent.iter parameters output_type=output_type, model=model, deps=deps, model_settings=model_settings, usage_limits=usage_limits, usage=usage, infer_name=infer_name, toolsets=toolsets, # Starlette debug=debug, routes=routes, middleware=middleware, exception_handlers=exception_handlers, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan, ) def to_a2a( self, *, storage: Storage | None = None, broker: Broker | None = None, # Agent card name: str | None = None, url: str = 'http://localhost:8000', version: str = '1.0.0', description: str | None = None, provider: AgentProvider | None = None, skills: list[Skill] | None = None, # Starlette debug: bool = False, routes: Sequence[Route] | None = None, middleware: Sequence[Middleware] | None = None, exception_handlers: dict[Any, ExceptionHandler] | None = None, lifespan: Lifespan[FastA2A] | None = None, ) -> FastA2A: """Convert the agent to a FastA2A application. Example: ```python from pydantic_ai import Agent agent = Agent('openai:gpt-4o') app = agent.to_a2a() ``` The `app` is an ASGI application that can be used with any ASGI server. To run the application, you can use the following command: ```bash uvicorn app:app --host 0.0.0.0 --port 8000 ``` """ from .._a2a import agent_to_a2a return agent_to_a2a( self, storage=storage, broker=broker, name=name, url=url, version=version, description=description, provider=provider, skills=skills, debug=debug, routes=routes, middleware=middleware, exception_handlers=exception_handlers, lifespan=lifespan, ) async def to_cli( self: Self, deps: AgentDepsT = None, prog_name: str = 'pydantic-ai', message_history: Sequence[_messages.ModelMessage] | None = None, ) -> None: """Run the agent in a CLI chat interface. Args: deps: The dependencies to pass to the agent. prog_name: The name of the program to use for the CLI. Defaults to 'pydantic-ai'. message_history: History of the conversation so far. Example: ```python {title="agent_to_cli.py" test="skip"} from pydantic_ai import Agent agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.') async def main(): await agent.to_cli() ``` """ from rich.console import Console from pydantic_ai._cli import run_chat await run_chat( stream=True, agent=self, deps=deps, console=Console(), code_theme='monokai', prog_name=prog_name, message_history=message_history, ) def to_cli_sync( self: Self, deps: AgentDepsT = None, prog_name: str = 'pydantic-ai', message_history: Sequence[_messages.ModelMessage] | None = None, ) -> None: """Run the agent in a CLI chat interface with the non-async interface. Args: deps: The dependencies to pass to the agent. prog_name: The name of the program to use for the CLI. Defaults to 'pydantic-ai'. message_history: History of the conversation so far. ```python {title="agent_to_cli_sync.py" test="skip"} from pydantic_ai import Agent agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.') agent.to_cli_sync() agent.to_cli_sync(prog_name='assistant') ``` """ return get_event_loop().run_until_complete( self.to_cli(deps=deps, prog_name=prog_name, message_history=message_history) )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pydantic/pydantic-ai'

If you have feedback or need assistance with the MCP directory API, please join our Discord server