anthropic.py (68.2 kB)
from __future__ import annotations as _annotations

import io
from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass, field, replace
from datetime import datetime
from typing import Any, Literal, cast, overload

from pydantic import TypeAdapter
from typing_extensions import assert_never

from .. import ModelHTTPError, UnexpectedModelBehavior, _utils, usage
from .._run_context import RunContext
from .._utils import guard_tool_call_id as _guard_tool_call_id
from ..builtin_tools import CodeExecutionTool, MCPServerTool, MemoryTool, WebFetchTool, WebSearchTool
from ..exceptions import ModelAPIError, UserError
from ..messages import (
    BinaryContent,
    BuiltinToolCallPart,
    BuiltinToolReturnPart,
    CachePoint,
    DocumentUrl,
    FilePart,
    FinishReason,
    ImageUrl,
    ModelMessage,
    ModelRequest,
    ModelResponse,
    ModelResponsePart,
    ModelResponseStreamEvent,
    RetryPromptPart,
    SystemPromptPart,
    TextPart,
    ThinkingPart,
    ToolCallPart,
    ToolReturnPart,
    UserPromptPart,
)
from ..profiles import ModelProfileSpec
from ..providers import Provider, infer_provider
from ..providers.anthropic import AsyncAnthropicClient
from ..settings import ModelSettings, merge_model_settings
from ..tools import ToolDefinition
from . import (
    Model,
    ModelRequestParameters,
    StreamedResponse,
    check_allow_model_requests,
    download_item,
    get_user_agent,
)

_FINISH_REASON_MAP: dict[BetaStopReason, FinishReason] = {
    'end_turn': 'stop',
    'max_tokens': 'length',
    'stop_sequence': 'stop',
    'tool_use': 'tool_call',
    'pause_turn': 'stop',
    'refusal': 'content_filter',
}

try:
    from anthropic import (
        NOT_GIVEN,
        APIConnectionError,
        APIStatusError,
        AsyncAnthropicBedrock,
        AsyncStream,
        omit as OMIT,
    )
    from anthropic.types.beta import (
        BetaBase64PDFBlockParam,
        BetaBase64PDFSourceParam,
        BetaCacheControlEphemeralParam,
        BetaCitationsConfigParam,
        BetaCitationsDelta,
        BetaCodeExecutionTool20250522Param,
        BetaCodeExecutionToolResultBlock,
        BetaCodeExecutionToolResultBlockContent,
        BetaCodeExecutionToolResultBlockParam,
        BetaCodeExecutionToolResultBlockParamContentParam,
        BetaContainerParams,
        BetaContentBlock,
        BetaContentBlockParam,
        BetaImageBlockParam,
        BetaInputJSONDelta,
        BetaJSONOutputFormatParam,
        BetaMCPToolResultBlock,
        BetaMCPToolUseBlock,
        BetaMCPToolUseBlockParam,
        BetaMemoryTool20250818Param,
        BetaMessage,
        BetaMessageParam,
        BetaMessageTokensCount,
        BetaMetadataParam,
        BetaPlainTextSourceParam,
        BetaRawContentBlockDeltaEvent,
        BetaRawContentBlockStartEvent,
        BetaRawContentBlockStopEvent,
        BetaRawMessageDeltaEvent,
        BetaRawMessageStartEvent,
        BetaRawMessageStopEvent,
        BetaRawMessageStreamEvent,
        BetaRedactedThinkingBlock,
        BetaRedactedThinkingBlockParam,
        BetaRequestMCPServerToolConfigurationParam,
        BetaRequestMCPServerURLDefinitionParam,
        BetaServerToolUseBlock,
        BetaServerToolUseBlockParam,
        BetaSignatureDelta,
        BetaStopReason,
        BetaTextBlock,
        BetaTextBlockParam,
        BetaTextDelta,
        BetaThinkingBlock,
        BetaThinkingBlockParam,
        BetaThinkingConfigParam,
        BetaThinkingDelta,
        BetaToolChoiceParam,
        BetaToolParam,
        BetaToolResultBlockParam,
        BetaToolUnionParam,
        BetaToolUseBlock,
        BetaToolUseBlockParam,
        BetaWebFetchTool20250910Param,
        BetaWebFetchToolResultBlock,
        BetaWebFetchToolResultBlockParam,
        BetaWebSearchTool20250305Param,
        BetaWebSearchToolResultBlock,
        BetaWebSearchToolResultBlockContent,
        BetaWebSearchToolResultBlockParam,
        BetaWebSearchToolResultBlockParamContentParam,
    )
    from anthropic.types.beta.beta_web_fetch_tool_result_block_param import (
        Content as WebFetchToolResultBlockParamContent,
    )
    from anthropic.types.beta.beta_web_search_tool_20250305_param import UserLocation
    from anthropic.types.model_param import ModelParam
except ImportError as _import_error:
    raise ImportError(
        'Please install `anthropic` to use the Anthropic model, '
        'you can use the `anthropic` optional group — `pip install "pydantic-ai-slim[anthropic]"`'
    ) from _import_error

LatestAnthropicModelNames = ModelParam
"""Latest Anthropic models."""

AnthropicModelName = str | LatestAnthropicModelNames
"""Possible Anthropic model names.

Since Anthropic supports a variety of date-stamped models, we explicitly list the latest models
but allow any name in the type hints.
See [the Anthropic docs](https://docs.anthropic.com/en/docs/about-claude/models) for a full list.
"""


class AnthropicModelSettings(ModelSettings, total=False):
    """Settings used for an Anthropic model request."""

    # ALL FIELDS MUST BE `anthropic_` PREFIXED SO YOU CAN MERGE THEM WITH OTHER MODELS.

    anthropic_metadata: BetaMetadataParam
    """An object describing metadata about the request.

    Contains `user_id`, an external identifier for the user who is associated with the request.
    """

    anthropic_thinking: BetaThinkingConfigParam
    """Determine whether the model should generate a thinking block.

    See [the Anthropic docs](https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking)
    for more information.
    """

    anthropic_cache_tool_definitions: bool | Literal['5m', '1h']
    """Whether to add `cache_control` to the last tool definition.

    When enabled, the last tool in the `tools` array will have `cache_control` set,
    allowing Anthropic to cache tool definitions and reduce costs.
    If `True`, uses TTL='5m'. You can also specify '5m' or '1h' directly.
    TTL is automatically omitted for Bedrock, as it does not support explicit TTL.

    See https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching for more information.
    """

    anthropic_cache_instructions: bool | Literal['5m', '1h']
    """Whether to add `cache_control` to the last system prompt block.

    When enabled, the last system prompt will have `cache_control` set,
    allowing Anthropic to cache system instructions and reduce costs.
    If `True`, uses TTL='5m'. You can also specify '5m' or '1h' directly.
    TTL is automatically omitted for Bedrock, as it does not support explicit TTL.

    See https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching for more information.
    """

    anthropic_cache_messages: bool | Literal['5m', '1h']
    """Convenience setting to enable caching for the last user message.

    When enabled, this automatically adds a cache point to the last content block in the final
    user message, which is useful for caching conversation history or context in multi-turn
    conversations.
    If `True`, uses TTL='5m'. You can also specify '5m' or '1h' directly.
    TTL is automatically omitted for Bedrock, as it does not support explicit TTL.

    Note: Uses 1 of Anthropic's 4 available cache points per request. Any additional CachePoint
    markers in messages will be automatically limited to respect the 4-cache-point maximum.

    See https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching for more information.
    """

    anthropic_container: BetaContainerParams | Literal[False]
    """Container configuration for multi-turn conversations.

    By default, if previous messages contain a container_id (from a prior response), it will be
    reused automatically.
    Set to `False` to force a fresh container (ignore any `container_id` from history).
    Set to a dict (e.g. `{'id': 'container_xxx'}`) to explicitly specify a container.
    """
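

# Illustrative usage sketch (not part of the upstream module): one way the `anthropic_*`
# cache settings above might be combined. The TTL choices are assumptions, and actually
# sending a request would additionally require `ANTHROPIC_API_KEY` to be set.
def _example_settings() -> AnthropicModelSettings:
    return AnthropicModelSettings(
        anthropic_cache_instructions='1h',  # cache the system prompt with a 1h TTL
        anthropic_cache_tool_definitions=True,  # `True` selects the default '5m' TTL
        anthropic_cache_messages=True,  # cache point on the last user message
    )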
""" @dataclass(init=False) class AnthropicModel(Model): """A model that uses the Anthropic API. Internally, this uses the [Anthropic Python client](https://github.com/anthropics/anthropic-sdk-python) to interact with the API. Apart from `__init__`, all methods are private or match those of the base class. """ client: AsyncAnthropicClient = field(repr=False) _model_name: AnthropicModelName = field(repr=False) _provider: Provider[AsyncAnthropicClient] = field(repr=False) def __init__( self, model_name: AnthropicModelName, *, provider: Literal['anthropic', 'gateway'] | Provider[AsyncAnthropicClient] = 'anthropic', profile: ModelProfileSpec | None = None, settings: ModelSettings | None = None, ): """Initialize an Anthropic model. Args: model_name: The name of the Anthropic model to use. List of model names available [here](https://docs.anthropic.com/en/docs/about-claude/models). provider: The provider to use for the Anthropic API. Can be either the string 'anthropic' or an instance of `Provider[AsyncAnthropicClient]`. Defaults to 'anthropic'. profile: The model profile to use. Defaults to a profile picked by the provider based on the model name. The default 'anthropic' provider will use the default `..profiles.anthropic.anthropic_model_profile`. settings: Default model settings for this model instance. """ self._model_name = model_name if isinstance(provider, str): provider = infer_provider('gateway/anthropic' if provider == 'gateway' else provider) self._provider = provider self.client = provider.client super().__init__(settings=settings, profile=profile or provider.model_profile) @property def base_url(self) -> str: return str(self.client.base_url) @property def model_name(self) -> AnthropicModelName: """The model name.""" return self._model_name @property def system(self) -> str: """The model provider.""" return self._provider.name async def request( self, messages: list[ModelMessage], model_settings: ModelSettings | None, model_request_parameters: ModelRequestParameters, ) -> ModelResponse: check_allow_model_requests() model_settings, model_request_parameters = self.prepare_request( model_settings, model_request_parameters, ) response = await self._messages_create( messages, False, cast(AnthropicModelSettings, model_settings or {}), model_request_parameters ) model_response = self._process_response(response) return model_response async def count_tokens( self, messages: list[ModelMessage], model_settings: ModelSettings | None, model_request_parameters: ModelRequestParameters, ) -> usage.RequestUsage: model_settings, model_request_parameters = self.prepare_request( model_settings, model_request_parameters, ) response = await self._messages_count_tokens( messages, cast(AnthropicModelSettings, model_settings or {}), model_request_parameters ) return usage.RequestUsage(input_tokens=response.input_tokens) @asynccontextmanager async def request_stream( self, messages: list[ModelMessage], model_settings: ModelSettings | None, model_request_parameters: ModelRequestParameters, run_context: RunContext[Any] | None = None, ) -> AsyncIterator[StreamedResponse]: check_allow_model_requests() model_settings, model_request_parameters = self.prepare_request( model_settings, model_request_parameters, ) response = await self._messages_create( messages, True, cast(AnthropicModelSettings, model_settings or {}), model_request_parameters ) async with response: yield await self._process_streamed_response(response, model_request_parameters) def prepare_request( self, model_settings: ModelSettings | None, 
model_request_parameters: ModelRequestParameters ) -> tuple[ModelSettings | None, ModelRequestParameters]: settings = merge_model_settings(self.settings, model_settings) if ( model_request_parameters.output_tools and settings and (thinking := settings.get('anthropic_thinking')) and thinking.get('type') == 'enabled' ): if model_request_parameters.output_mode == 'auto': output_mode = 'native' if self.profile.supports_json_schema_output else 'prompted' model_request_parameters = replace(model_request_parameters, output_mode=output_mode) elif ( model_request_parameters.output_mode == 'tool' and not model_request_parameters.allow_text_output ): # pragma: no branch # This would result in `tool_choice=required`, which Anthropic does not support with thinking. suggested_output_type = 'NativeOutput' if self.profile.supports_json_schema_output else 'PromptedOutput' raise UserError( f'Anthropic does not support thinking and output tools at the same time. Use `output_type={suggested_output_type}(...)` instead.' ) if model_request_parameters.output_mode == 'native': assert model_request_parameters.output_object is not None if model_request_parameters.output_object.strict is False: raise UserError( 'Setting `strict=False` on `output_type=NativeOutput(...)` is not allowed for Anthropic models.' ) model_request_parameters = replace( model_request_parameters, output_object=replace(model_request_parameters.output_object, strict=True) ) return super().prepare_request(model_settings, model_request_parameters) @overload async def _messages_create( self, messages: list[ModelMessage], stream: Literal[True], model_settings: AnthropicModelSettings, model_request_parameters: ModelRequestParameters, ) -> AsyncStream[BetaRawMessageStreamEvent]: pass @overload async def _messages_create( self, messages: list[ModelMessage], stream: Literal[False], model_settings: AnthropicModelSettings, model_request_parameters: ModelRequestParameters, ) -> BetaMessage: pass async def _messages_create( self, messages: list[ModelMessage], stream: bool, model_settings: AnthropicModelSettings, model_request_parameters: ModelRequestParameters, ) -> BetaMessage | AsyncStream[BetaRawMessageStreamEvent]: """Calls the Anthropic API to create a message. This is the last step before sending the request to the API. Most preprocessing has happened in `prepare_request()`. 
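
    # Illustrative note (not part of the upstream module): `prepare_request` above switches
    # `auto` output to native/prompted when extended thinking is enabled, and rejects pure
    # tool-based output. A sketch of a working combination, assuming the public
    # `Agent`/`NativeOutput` API and a hypothetical Pydantic model `MySchema`:
    #
    #     from pydantic_ai import Agent, NativeOutput
    #
    #     agent = Agent(
    #         'anthropic:claude-sonnet-4-0',
    #         output_type=NativeOutput(MySchema),
    #         model_settings={'anthropic_thinking': {'type': 'enabled', 'budget_tokens': 1024}},
    #     )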
""" tools = self._get_tools(model_request_parameters, model_settings) tools, mcp_servers, builtin_tool_betas = self._add_builtin_tools(tools, model_request_parameters) tool_choice = self._infer_tool_choice(tools, model_settings, model_request_parameters) system_prompt, anthropic_messages = await self._map_message(messages, model_request_parameters, model_settings) self._limit_cache_points(system_prompt, anthropic_messages, tools) output_format = self._native_output_format(model_request_parameters) betas, extra_headers = self._get_betas_and_extra_headers(tools, model_request_parameters, model_settings) betas.update(builtin_tool_betas) container = self._get_container(messages, model_settings) try: return await self.client.beta.messages.create( max_tokens=model_settings.get('max_tokens', 4096), system=system_prompt or OMIT, messages=anthropic_messages, model=self._model_name, tools=tools or OMIT, tool_choice=tool_choice or OMIT, mcp_servers=mcp_servers or OMIT, output_format=output_format or OMIT, betas=sorted(betas) or OMIT, stream=stream, thinking=model_settings.get('anthropic_thinking', OMIT), stop_sequences=model_settings.get('stop_sequences', OMIT), temperature=model_settings.get('temperature', OMIT), top_p=model_settings.get('top_p', OMIT), timeout=model_settings.get('timeout', NOT_GIVEN), metadata=model_settings.get('anthropic_metadata', OMIT), container=container or OMIT, extra_headers=extra_headers, extra_body=model_settings.get('extra_body'), ) except APIStatusError as e: if (status_code := e.status_code) >= 400: raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e raise ModelAPIError(model_name=self.model_name, message=e.message) from e # pragma: lax no cover except APIConnectionError as e: raise ModelAPIError(model_name=self.model_name, message=e.message) from e def _get_betas_and_extra_headers( self, tools: list[BetaToolUnionParam], model_request_parameters: ModelRequestParameters, model_settings: AnthropicModelSettings, ) -> tuple[set[str], dict[str, str]]: """Prepare beta features list and extra headers for API request. Handles merging custom `anthropic-beta` header from `extra_headers` into betas set and ensuring `User-Agent` is set. 
""" extra_headers = dict(model_settings.get('extra_headers', {})) extra_headers.setdefault('User-Agent', get_user_agent()) betas: set[str] = set() has_strict_tools = any(tool.get('strict') for tool in tools) if has_strict_tools or model_request_parameters.output_mode == 'native': betas.add('structured-outputs-2025-11-13') if beta_header := extra_headers.pop('anthropic-beta', None): betas.update({stripped_beta for beta in beta_header.split(',') if (stripped_beta := beta.strip())}) return betas, extra_headers def _get_container( self, messages: list[ModelMessage], model_settings: AnthropicModelSettings ) -> BetaContainerParams | None: """Get container config for the API request.""" if (container := model_settings.get('anthropic_container')) is not None: return None if container is False else container for m in reversed(messages): if isinstance(m, ModelResponse) and m.provider_name == self.system and m.provider_details: if cid := m.provider_details.get('container_id'): return BetaContainerParams(id=cid) return None async def _messages_count_tokens( self, messages: list[ModelMessage], model_settings: AnthropicModelSettings, model_request_parameters: ModelRequestParameters, ) -> BetaMessageTokensCount: if isinstance(self.client, AsyncAnthropicBedrock): raise UserError('AsyncAnthropicBedrock client does not support `count_tokens` api.') # standalone function to make it easier to override tools = self._get_tools(model_request_parameters, model_settings) tools, mcp_servers, builtin_tool_betas = self._add_builtin_tools(tools, model_request_parameters) tool_choice = self._infer_tool_choice(tools, model_settings, model_request_parameters) system_prompt, anthropic_messages = await self._map_message(messages, model_request_parameters, model_settings) self._limit_cache_points(system_prompt, anthropic_messages, tools) output_format = self._native_output_format(model_request_parameters) betas, extra_headers = self._get_betas_and_extra_headers(tools, model_request_parameters, model_settings) betas.update(builtin_tool_betas) try: return await self.client.beta.messages.count_tokens( system=system_prompt or OMIT, messages=anthropic_messages, model=self._model_name, tools=tools or OMIT, tool_choice=tool_choice or OMIT, mcp_servers=mcp_servers or OMIT, betas=sorted(betas) or OMIT, output_format=output_format or OMIT, thinking=model_settings.get('anthropic_thinking', OMIT), timeout=model_settings.get('timeout', NOT_GIVEN), extra_headers=extra_headers, extra_body=model_settings.get('extra_body'), ) except APIStatusError as e: if (status_code := e.status_code) >= 400: raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e raise ModelAPIError(model_name=self.model_name, message=e.message) from e # pragma: lax no cover except APIConnectionError as e: raise ModelAPIError(model_name=self.model_name, message=e.message) from e def _process_response(self, response: BetaMessage) -> ModelResponse: """Process a non-streamed response, and prepare a message to return.""" items: list[ModelResponsePart] = [] builtin_tool_calls: dict[str, BuiltinToolCallPart] = {} for item in response.content: if isinstance(item, BetaTextBlock): items.append(TextPart(content=item.text)) elif isinstance(item, BetaServerToolUseBlock): call_part = _map_server_tool_use_block(item, self.system) builtin_tool_calls[call_part.tool_call_id] = call_part items.append(call_part) elif isinstance(item, BetaWebSearchToolResultBlock): items.append(_map_web_search_tool_result_block(item, self.system)) elif isinstance(item, 

    def _process_response(self, response: BetaMessage) -> ModelResponse:
        """Process a non-streamed response, and prepare a message to return."""
        items: list[ModelResponsePart] = []
        builtin_tool_calls: dict[str, BuiltinToolCallPart] = {}
        for item in response.content:
            if isinstance(item, BetaTextBlock):
                items.append(TextPart(content=item.text))
            elif isinstance(item, BetaServerToolUseBlock):
                call_part = _map_server_tool_use_block(item, self.system)
                builtin_tool_calls[call_part.tool_call_id] = call_part
                items.append(call_part)
            elif isinstance(item, BetaWebSearchToolResultBlock):
                items.append(_map_web_search_tool_result_block(item, self.system))
            elif isinstance(item, BetaCodeExecutionToolResultBlock):
                items.append(_map_code_execution_tool_result_block(item, self.system))
            elif isinstance(item, BetaWebFetchToolResultBlock):
                items.append(_map_web_fetch_tool_result_block(item, self.system))
            elif isinstance(item, BetaRedactedThinkingBlock):
                items.append(
                    ThinkingPart(id='redacted_thinking', content='', signature=item.data, provider_name=self.system)
                )
            elif isinstance(item, BetaThinkingBlock):
                items.append(ThinkingPart(content=item.thinking, signature=item.signature, provider_name=self.system))
            elif isinstance(item, BetaMCPToolUseBlock):
                call_part = _map_mcp_server_use_block(item, self.system)
                builtin_tool_calls[call_part.tool_call_id] = call_part
                items.append(call_part)
            elif isinstance(item, BetaMCPToolResultBlock):
                call_part = builtin_tool_calls.get(item.tool_use_id)
                items.append(_map_mcp_server_result_block(item, call_part, self.system))
            else:
                assert isinstance(item, BetaToolUseBlock), f'unexpected item type {type(item)}'
                items.append(
                    ToolCallPart(
                        tool_name=item.name,
                        args=cast(dict[str, Any], item.input),
                        tool_call_id=item.id,
                    )
                )

        finish_reason: FinishReason | None = None
        provider_details: dict[str, Any] | None = None
        if raw_finish_reason := response.stop_reason:  # pragma: no branch
            provider_details = {'finish_reason': raw_finish_reason}
            finish_reason = _FINISH_REASON_MAP.get(raw_finish_reason)

        if response.container:
            provider_details = provider_details or {}
            provider_details['container_id'] = response.container.id

        return ModelResponse(
            parts=items,
            usage=_map_usage(response, self._provider.name, self._provider.base_url, self._model_name),
            model_name=response.model,
            provider_response_id=response.id,
            provider_name=self._provider.name,
            provider_url=self._provider.base_url,
            finish_reason=finish_reason,
            provider_details=provider_details,
        )

    async def _process_streamed_response(
        self, response: AsyncStream[BetaRawMessageStreamEvent], model_request_parameters: ModelRequestParameters
    ) -> StreamedResponse:
        peekable_response = _utils.PeekableAsyncStream(response)
        first_chunk = await peekable_response.peek()
        if isinstance(first_chunk, _utils.Unset):
            raise UnexpectedModelBehavior('Streamed response ended without content or tool calls')  # pragma: no cover

        assert isinstance(first_chunk, BetaRawMessageStartEvent)

        return AnthropicStreamedResponse(
            model_request_parameters=model_request_parameters,
            _model_name=first_chunk.message.model,
            _response=peekable_response,
            _timestamp=_utils.now_utc(),
            _provider_name=self._provider.name,
            _provider_url=self._provider.base_url,
        )

    def _get_tools(
        self, model_request_parameters: ModelRequestParameters, model_settings: AnthropicModelSettings
    ) -> list[BetaToolUnionParam]:
        tools: list[BetaToolUnionParam] = [
            self._map_tool_definition(r) for r in model_request_parameters.tool_defs.values()
        ]

        # Add cache_control to the last tool if enabled
        if tools and (cache_tool_defs := model_settings.get('anthropic_cache_tool_definitions')):
            # If True, use '5m'; otherwise use the specified ttl value
            ttl: Literal['5m', '1h'] = '5m' if cache_tool_defs is True else cache_tool_defs
            last_tool = tools[-1]
            last_tool['cache_control'] = self._build_cache_control(ttl)

        return tools

    def _add_builtin_tools(
        self, tools: list[BetaToolUnionParam], model_request_parameters: ModelRequestParameters
    ) -> tuple[list[BetaToolUnionParam], list[BetaRequestMCPServerURLDefinitionParam], set[str]]:
        beta_features: set[str] = set()
        mcp_servers: list[BetaRequestMCPServerURLDefinitionParam] = []
        for tool in model_request_parameters.builtin_tools:
            if isinstance(tool, WebSearchTool):
                user_location = UserLocation(type='approximate', **tool.user_location) if tool.user_location else None
                tools.append(
                    BetaWebSearchTool20250305Param(
                        name='web_search',
                        type='web_search_20250305',
                        max_uses=tool.max_uses,
                        allowed_domains=tool.allowed_domains,
                        blocked_domains=tool.blocked_domains,
                        user_location=user_location,
                    )
                )
            elif isinstance(tool, CodeExecutionTool):  # pragma: no branch
                tools.append(BetaCodeExecutionTool20250522Param(name='code_execution', type='code_execution_20250522'))
                beta_features.add('code-execution-2025-05-22')
            elif isinstance(tool, WebFetchTool):  # pragma: no branch
                citations = BetaCitationsConfigParam(enabled=tool.enable_citations) if tool.enable_citations else None
                tools.append(
                    BetaWebFetchTool20250910Param(
                        name='web_fetch',
                        type='web_fetch_20250910',
                        max_uses=tool.max_uses,
                        allowed_domains=tool.allowed_domains,
                        blocked_domains=tool.blocked_domains,
                        citations=citations,
                        max_content_tokens=tool.max_content_tokens,
                    )
                )
                beta_features.add('web-fetch-2025-09-10')
            elif isinstance(tool, MemoryTool):  # pragma: no branch
                if 'memory' not in model_request_parameters.tool_defs:
                    raise UserError("Built-in `MemoryTool` requires a 'memory' tool to be defined.")
                # Replace the memory tool definition with the built-in memory tool
                tools = [tool for tool in tools if tool.get('name') != 'memory']
                tools.append(BetaMemoryTool20250818Param(name='memory', type='memory_20250818'))
                beta_features.add('context-management-2025-06-27')
            elif isinstance(tool, MCPServerTool) and tool.url:
                mcp_server_url_definition_param = BetaRequestMCPServerURLDefinitionParam(
                    type='url',
                    name=tool.id,
                    url=tool.url,
                )
                if tool.allowed_tools is not None:  # pragma: no branch
                    mcp_server_url_definition_param['tool_configuration'] = BetaRequestMCPServerToolConfigurationParam(
                        enabled=bool(tool.allowed_tools),
                        allowed_tools=tool.allowed_tools,
                    )
                if tool.authorization_token:  # pragma: no cover
                    mcp_server_url_definition_param['authorization_token'] = tool.authorization_token
                mcp_servers.append(mcp_server_url_definition_param)
                beta_features.add('mcp-client-2025-04-04')
            else:  # pragma: no cover
                raise UserError(
                    f'`{tool.__class__.__name__}` is not supported by `AnthropicModel`. If it should be, please file an issue.'
                )
        return tools, mcp_servers, beta_features
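
    # Illustrative sketch (not part of the upstream module): the builtin tools mapped above
    # are enabled through the public `Agent` API (assumed here), e.g.:
    #
    #     from pydantic_ai import Agent
    #     from pydantic_ai.builtin_tools import CodeExecutionTool, WebSearchTool
    #
    #     agent = Agent(
    #         'anthropic:claude-sonnet-4-0',
    #         builtin_tools=[WebSearchTool(max_uses=3), CodeExecutionTool()],
    #     )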

    def _infer_tool_choice(
        self,
        tools: list[BetaToolUnionParam],
        model_settings: AnthropicModelSettings,
        model_request_parameters: ModelRequestParameters,
    ) -> BetaToolChoiceParam | None:
        if not tools:
            return None
        else:
            tool_choice: BetaToolChoiceParam

            if not model_request_parameters.allow_text_output:
                tool_choice = {'type': 'any'}
            else:
                tool_choice = {'type': 'auto'}

            if 'parallel_tool_calls' in model_settings:
                tool_choice['disable_parallel_tool_use'] = not model_settings['parallel_tool_calls']

            return tool_choice

    async def _map_message(  # noqa: C901
        self,
        messages: list[ModelMessage],
        model_request_parameters: ModelRequestParameters,
        model_settings: AnthropicModelSettings,
    ) -> tuple[str | list[BetaTextBlockParam], list[BetaMessageParam]]:
        """Just maps a `pydantic_ai.Message` to a `anthropic.types.MessageParam`."""
        system_prompt_parts: list[str] = []
        anthropic_messages: list[BetaMessageParam] = []
        for m in messages:
            if isinstance(m, ModelRequest):
                user_content_params: list[BetaContentBlockParam] = []
                for request_part in m.parts:
                    if isinstance(request_part, SystemPromptPart):
                        system_prompt_parts.append(request_part.content)
                    elif isinstance(request_part, UserPromptPart):
                        async for content in self._map_user_prompt(request_part):
                            if isinstance(content, CachePoint):
                                self._add_cache_control_to_last_param(user_content_params, ttl=content.ttl)
                            else:
                                user_content_params.append(content)
                    elif isinstance(request_part, ToolReturnPart):
                        tool_result_block_param = BetaToolResultBlockParam(
                            tool_use_id=_guard_tool_call_id(t=request_part),
                            type='tool_result',
                            content=request_part.model_response_str(),
                            is_error=False,
                        )
                        user_content_params.append(tool_result_block_param)
                    elif isinstance(request_part, RetryPromptPart):  # pragma: no branch
                        if request_part.tool_name is None:
                            text = request_part.model_response()  # pragma: no cover
                            retry_param = BetaTextBlockParam(type='text', text=text)  # pragma: no cover
                        else:
                            retry_param = BetaToolResultBlockParam(
                                tool_use_id=_guard_tool_call_id(t=request_part),
                                type='tool_result',
                                content=request_part.model_response(),
                                is_error=True,
                            )
                        user_content_params.append(retry_param)
                if len(user_content_params) > 0:
                    anthropic_messages.append(BetaMessageParam(role='user', content=user_content_params))
            elif isinstance(m, ModelResponse):
                assistant_content_params: list[
                    BetaTextBlockParam
                    | BetaToolUseBlockParam
                    | BetaServerToolUseBlockParam
                    | BetaWebSearchToolResultBlockParam
                    | BetaCodeExecutionToolResultBlockParam
                    | BetaWebFetchToolResultBlockParam
                    | BetaThinkingBlockParam
                    | BetaRedactedThinkingBlockParam
                    | BetaMCPToolUseBlockParam
                    | BetaMCPToolResultBlock
                ] = []
                for response_part in m.parts:
                    if isinstance(response_part, TextPart):
                        if response_part.content:
                            assistant_content_params.append(
                                BetaTextBlockParam(text=response_part.content, type='text')
                            )
                    elif isinstance(response_part, ToolCallPart):
                        tool_use_block_param = BetaToolUseBlockParam(
                            id=_guard_tool_call_id(t=response_part),
                            type='tool_use',
                            name=response_part.tool_name,
                            input=response_part.args_as_dict(),
                        )
                        assistant_content_params.append(tool_use_block_param)
                    elif isinstance(response_part, ThinkingPart):
                        if (
                            response_part.provider_name == self.system and response_part.signature is not None
                        ):  # pragma: no branch
                            if response_part.id == 'redacted_thinking':
                                assistant_content_params.append(
                                    BetaRedactedThinkingBlockParam(
                                        data=response_part.signature,
                                        type='redacted_thinking',
                                    )
                                )
                            else:
                                assistant_content_params.append(
                                    BetaThinkingBlockParam(
                                        thinking=response_part.content,
                                        signature=response_part.signature,
                                        type='thinking',
                                    )
                                )
                        elif response_part.content:  # pragma: no branch
                            start_tag, end_tag = self.profile.thinking_tags
                            assistant_content_params.append(
                                BetaTextBlockParam(
                                    text='\n'.join([start_tag, response_part.content, end_tag]), type='text'
                                )
                            )
                    elif isinstance(response_part, BuiltinToolCallPart):
                        if response_part.provider_name == self.system:
                            tool_use_id = _guard_tool_call_id(t=response_part)
                            if response_part.tool_name == WebSearchTool.kind:
                                server_tool_use_block_param = BetaServerToolUseBlockParam(
                                    id=tool_use_id,
                                    type='server_tool_use',
                                    name='web_search',
                                    input=response_part.args_as_dict(),
                                )
                                assistant_content_params.append(server_tool_use_block_param)
                            elif response_part.tool_name == CodeExecutionTool.kind:
                                server_tool_use_block_param = BetaServerToolUseBlockParam(
                                    id=tool_use_id,
                                    type='server_tool_use',
                                    name='code_execution',
                                    input=response_part.args_as_dict(),
                                )
                                assistant_content_params.append(server_tool_use_block_param)
                            elif response_part.tool_name == WebFetchTool.kind:
                                server_tool_use_block_param = BetaServerToolUseBlockParam(
                                    id=tool_use_id,
                                    type='server_tool_use',
                                    name='web_fetch',
                                    input=response_part.args_as_dict(),
                                )
                                assistant_content_params.append(server_tool_use_block_param)
                            elif (
                                response_part.tool_name.startswith(MCPServerTool.kind)
                                and (server_id := response_part.tool_name.split(':', 1)[1])
                                and (args := response_part.args_as_dict())
                                and (tool_name := args.get('tool_name'))
                                and (tool_args := args.get('tool_args'))
                            ):  # pragma: no branch
                                mcp_tool_use_block_param = BetaMCPToolUseBlockParam(
                                    id=tool_use_id,
                                    type='mcp_tool_use',
                                    server_name=server_id,
                                    name=tool_name,
                                    input=tool_args,
                                )
                                assistant_content_params.append(mcp_tool_use_block_param)
                    elif isinstance(response_part, BuiltinToolReturnPart):
                        if response_part.provider_name == self.system:
                            tool_use_id = _guard_tool_call_id(t=response_part)
                            if response_part.tool_name in (
                                WebSearchTool.kind,
                                'web_search_tool_result',  # Backward compatibility
                            ) and isinstance(response_part.content, dict | list):
                                assistant_content_params.append(
                                    BetaWebSearchToolResultBlockParam(
                                        tool_use_id=tool_use_id,
                                        type='web_search_tool_result',
                                        content=cast(
                                            BetaWebSearchToolResultBlockParamContentParam,
                                            response_part.content,  # pyright: ignore[reportUnknownMemberType]
                                        ),
                                    )
                                )
                            elif response_part.tool_name in (  # pragma: no branch
                                CodeExecutionTool.kind,
                                'code_execution_tool_result',  # Backward compatibility
                            ) and isinstance(response_part.content, dict):
                                assistant_content_params.append(
                                    BetaCodeExecutionToolResultBlockParam(
                                        tool_use_id=tool_use_id,
                                        type='code_execution_tool_result',
                                        content=cast(
                                            BetaCodeExecutionToolResultBlockParamContentParam,
                                            response_part.content,  # pyright: ignore[reportUnknownMemberType]
                                        ),
                                    )
                                )
                            elif response_part.tool_name == WebFetchTool.kind and isinstance(
                                response_part.content, dict
                            ):
                                assistant_content_params.append(
                                    BetaWebFetchToolResultBlockParam(
                                        tool_use_id=tool_use_id,
                                        type='web_fetch_tool_result',
                                        content=cast(
                                            WebFetchToolResultBlockParamContent,
                                            response_part.content,  # pyright: ignore[reportUnknownMemberType]
                                        ),
                                    )
                                )
                            elif response_part.tool_name.startswith(MCPServerTool.kind) and isinstance(
                                response_part.content, dict
                            ):  # pragma: no branch
                                assistant_content_params.append(
                                    BetaMCPToolResultBlock(
                                        tool_use_id=tool_use_id,
                                        type='mcp_tool_result',
                                        **cast(dict[str, Any], response_part.content),  # pyright: ignore[reportUnknownMemberType]
                                    )
                                )
                    elif isinstance(response_part, FilePart):  # pragma: no cover
                        # Files generated by models are not sent back to models that don't themselves generate files.
                        pass
                    else:
                        assert_never(response_part)
                if len(assistant_content_params) > 0:
                    anthropic_messages.append(BetaMessageParam(role='assistant', content=assistant_content_params))
            else:
                assert_never(m)
        if instructions := self._get_instructions(messages, model_request_parameters):
            system_prompt_parts.insert(0, instructions)
        system_prompt = '\n\n'.join(system_prompt_parts)

        # Add cache_control to the last message content if anthropic_cache_messages is enabled
        if anthropic_messages and (cache_messages := model_settings.get('anthropic_cache_messages')):
            ttl: Literal['5m', '1h'] = '5m' if cache_messages is True else cache_messages
            m = anthropic_messages[-1]
            content = m['content']
            if isinstance(content, str):
                # Convert string content to list format with cache_control
                m['content'] = [  # pragma: no cover
                    BetaTextBlockParam(
                        text=content,
                        type='text',
                        cache_control=self._build_cache_control(ttl),
                    )
                ]
            else:
                # Add cache_control to the last content block
                content = cast(list[BetaContentBlockParam], content)
                self._add_cache_control_to_last_param(content, ttl)

        # If anthropic_cache_instructions is enabled, return system prompt as a list with cache_control
        if system_prompt and (cache_instructions := model_settings.get('anthropic_cache_instructions')):
            # If True, use '5m'; otherwise use the specified ttl value
            ttl: Literal['5m', '1h'] = '5m' if cache_instructions is True else cache_instructions
            system_prompt_blocks = [
                BetaTextBlockParam(
                    type='text',
                    text=system_prompt,
                    cache_control=self._build_cache_control(ttl),
                )
            ]
            return system_prompt_blocks, anthropic_messages

        return system_prompt, anthropic_messages
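
    # Illustrative sketch (not part of the upstream module): the `CachePoint` handling in
    # `_map_message` above is driven by markers placed inside user content; `agent` and the
    # prompt below are assumptions:
    #
    #     from pydantic_ai.messages import CachePoint
    #
    #     result = agent.run_sync(['<large reference document>', CachePoint(ttl='1h'), 'Question?'])
    #
    # Everything before the marker becomes a cacheable prefix, subject to the 4-cache-point
    # budget enforced by `_limit_cache_points` below.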

    @staticmethod
    def _limit_cache_points(
        system_prompt: str | list[BetaTextBlockParam],
        anthropic_messages: list[BetaMessageParam],
        tools: list[BetaToolUnionParam],
    ) -> None:
        """Limit the number of cache points in the request to Anthropic's maximum.

        Anthropic enforces a maximum of 4 cache points per request. This method ensures compliance
        by counting existing cache points and removing excess ones from messages.

        Strategy:
        1. Count cache points in system_prompt (can be multiple if list of blocks)
        2. Count cache points in tools (can be in any position, not just last)
        3. Raise UserError if system + tools already exceed MAX_CACHE_POINTS
        4. Calculate remaining budget for message cache points
        5. Traverse messages from newest to oldest, keeping the most recent cache points within the remaining budget
        6. Remove excess cache points from older messages to stay within limit

        Cache point priority (always preserved):
        - System prompt cache points
        - Tool definition cache points
        - Message cache points (newest first, oldest removed if needed)

        Raises:
            UserError: If system_prompt and tools combined already exceed MAX_CACHE_POINTS (4).
                This indicates a configuration error that cannot be auto-fixed.
        """
        MAX_CACHE_POINTS = 4

        # Count existing cache points in system prompt
        used_cache_points = (
            sum(1 for block in system_prompt if 'cache_control' in cast(dict[str, Any], block))
            if isinstance(system_prompt, list)
            else 0
        )

        # Count existing cache points in tools (any tool may have cache_control)
        # Note: cache_control can be in the middle of tools list if builtin tools are added after
        for tool in tools:
            if 'cache_control' in tool:
                used_cache_points += 1

        # Calculate remaining cache points budget for messages
        remaining_budget = MAX_CACHE_POINTS - used_cache_points
        if remaining_budget < 0:  # pragma: no cover
            raise UserError(
                f'Too many cache points for Anthropic request. '
                f'System prompt and tool definitions already use {used_cache_points} cache points, '
                f'which exceeds the maximum of {MAX_CACHE_POINTS}.'
            )

        # Remove excess cache points from messages (newest to oldest)
        for message in reversed(anthropic_messages):
            content = message['content']
            if isinstance(content, str):  # pragma: no cover
                continue

            # Process content blocks in reverse order (newest first)
            for block in reversed(cast(list[BetaContentBlockParam], content)):
                block_dict = cast(dict[str, Any], block)
                if 'cache_control' in block_dict:
                    if remaining_budget > 0:
                        remaining_budget -= 1
                    else:
                        # Exceeded limit, remove this cache point
                        del block_dict['cache_control']

    def _build_cache_control(self, ttl: Literal['5m', '1h'] = '5m') -> BetaCacheControlEphemeralParam:
        """Build cache control dict, automatically omitting TTL for Bedrock clients.

        Args:
            ttl: The cache time-to-live ('5m' or '1h'). Ignored for Bedrock clients.

        Returns:
            A cache control dict suitable for the current client type.
        """
        if isinstance(self.client, AsyncAnthropicBedrock):
            # Bedrock doesn't support TTL, use cast to satisfy type checker
            return cast(BetaCacheControlEphemeralParam, {'type': 'ephemeral'})
        return BetaCacheControlEphemeralParam(type='ephemeral', ttl=ttl)

    def _add_cache_control_to_last_param(
        self, params: list[BetaContentBlockParam], ttl: Literal['5m', '1h'] = '5m'
    ) -> None:
        """Add cache control to the last content block param.

        See https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching for more information.

        Args:
            params: List of content block params to modify.
            ttl: The cache time-to-live ('5m' or '1h'). This is automatically ignored for Bedrock clients,
                which don't support explicit TTL parameters.
        """
        if not params:
            raise UserError(
                'CachePoint cannot be the first content in a user message - there must be previous content to attach the CachePoint to. '
                'To cache system instructions or tool definitions, use the `anthropic_cache_instructions` or `anthropic_cache_tool_definitions` settings instead.'
            )

        # Only certain types support cache_control
        # See https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching#what-can-be-cached
        cacheable_types = {'text', 'tool_use', 'server_tool_use', 'image', 'tool_result', 'document'}
        # Cast needed because BetaContentBlockParam is a union including response Block types (Pydantic models)
        # that don't support dict operations, even though at runtime we only have request Param types (TypedDicts).
        last_param = cast(dict[str, Any], params[-1])
        if last_param['type'] not in cacheable_types:
            raise UserError(f'Cache control not supported for param type: {last_param["type"]}')

        # Add cache_control to the last param
        last_param['cache_control'] = self._build_cache_control(ttl)

    @staticmethod
    async def _map_user_prompt(
        part: UserPromptPart,
    ) -> AsyncGenerator[BetaContentBlockParam | CachePoint]:
        if isinstance(part.content, str):
            if part.content:  # Only yield non-empty text
                yield BetaTextBlockParam(text=part.content, type='text')
        else:
            for item in part.content:
                if isinstance(item, str):
                    if item:  # Only yield non-empty text
                        yield BetaTextBlockParam(text=item, type='text')
                elif isinstance(item, CachePoint):
                    yield item
                elif isinstance(item, BinaryContent):
                    if item.is_image:
                        yield BetaImageBlockParam(
                            source={'data': io.BytesIO(item.data), 'media_type': item.media_type, 'type': 'base64'},  # type: ignore
                            type='image',
                        )
                    elif item.media_type == 'application/pdf':
                        yield BetaBase64PDFBlockParam(
                            source=BetaBase64PDFSourceParam(
                                data=io.BytesIO(item.data),
                                media_type='application/pdf',
                                type='base64',
                            ),
                            type='document',
                        )
                    else:
                        raise RuntimeError('Only images and PDFs are supported for binary content')
                elif isinstance(item, ImageUrl):
                    yield BetaImageBlockParam(source={'type': 'url', 'url': item.url}, type='image')
                elif isinstance(item, DocumentUrl):
                    if item.media_type == 'application/pdf':
                        yield BetaBase64PDFBlockParam(source={'url': item.url, 'type': 'url'}, type='document')
                    elif item.media_type == 'text/plain':
                        downloaded_item = await download_item(item, data_format='text')
                        yield BetaBase64PDFBlockParam(
                            source=BetaPlainTextSourceParam(
                                data=downloaded_item['data'], media_type=item.media_type, type='text'
                            ),
                            type='document',
                        )
                    else:  # pragma: no cover
                        raise RuntimeError(f'Unsupported media type: {item.media_type}')
                else:
                    raise RuntimeError(f'Unsupported content type: {type(item)}')  # pragma: no cover

    def _map_tool_definition(self, f: ToolDefinition) -> BetaToolParam:
        """Maps a `ToolDefinition` dataclass to an Anthropic `BetaToolParam` dictionary."""
        tool_param: BetaToolParam = {
            'name': f.name,
            'description': f.description or '',
            'input_schema': f.parameters_json_schema,
        }
        if f.strict and self.profile.supports_json_schema_output:
            tool_param['strict'] = f.strict
        return tool_param

    @staticmethod
    def _native_output_format(model_request_parameters: ModelRequestParameters) -> BetaJSONOutputFormatParam | None:
        if model_request_parameters.output_mode != 'native':
            return None
        assert model_request_parameters.output_object is not None
        return {'type': 'json_schema', 'schema': model_request_parameters.output_object.json_schema}
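

# Illustrative usage sketch (not part of the upstream module): a complete non-streamed run
# through the public `Agent` API (assumed here); requires `ANTHROPIC_API_KEY` and uses the
# `_example_settings` sketch defined above.
def _example_run() -> None:
    from pydantic_ai import Agent  # local import to keep the sketch self-contained

    agent = Agent(AnthropicModel('claude-sonnet-4-0'), model_settings=_example_settings())
    result = agent.run_sync('What is prompt caching?')
    print(result.output)
    print(result.usage())  # cache read/write tokens are surfaced via `_map_usage` below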


def _map_usage(
    message: BetaMessage | BetaRawMessageStartEvent | BetaRawMessageDeltaEvent,
    provider: str,
    provider_url: str,
    model: str,
    existing_usage: usage.RequestUsage | None = None,
) -> usage.RequestUsage:
    if isinstance(message, BetaMessage):
        response_usage = message.usage
    elif isinstance(message, BetaRawMessageStartEvent):
        response_usage = message.message.usage
    elif isinstance(message, BetaRawMessageDeltaEvent):
        response_usage = message.usage
    else:
        assert_never(message)

    # In streaming, usage appears in different events.
    # The values are cumulative, meaning new values should replace existing ones entirely.
    details: dict[str, int] = (existing_usage.details if existing_usage else {}) | {
        key: value for key, value in response_usage.model_dump().items() if isinstance(value, int)
    }

    # Note: genai-prices already extracts cache_creation_input_tokens and cache_read_input_tokens
    # from the Anthropic response and maps them to cache_write_tokens and cache_read_tokens
    return usage.RequestUsage.extract(
        dict(model=model, usage=details),
        provider=provider,
        provider_url=provider_url,
        provider_fallback='anthropic',
        details=details,
    )


@dataclass
class AnthropicStreamedResponse(StreamedResponse):
    """Implementation of `StreamedResponse` for Anthropic models."""

    _model_name: AnthropicModelName
    _response: AsyncIterable[BetaRawMessageStreamEvent]
    _timestamp: datetime
    _provider_name: str
    _provider_url: str

    async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:  # noqa: C901
        current_block: BetaContentBlock | None = None
        builtin_tool_calls: dict[str, BuiltinToolCallPart] = {}

        async for event in self._response:
            if isinstance(event, BetaRawMessageStartEvent):
                self._usage = _map_usage(event, self._provider_name, self._provider_url, self._model_name)
                self.provider_response_id = event.message.id
                if event.message.container:
                    self.provider_details = self.provider_details or {}
                    self.provider_details['container_id'] = event.message.container.id

            elif isinstance(event, BetaRawContentBlockStartEvent):
                current_block = event.content_block
                if isinstance(current_block, BetaTextBlock) and current_block.text:
                    for event_ in self._parts_manager.handle_text_delta(
                        vendor_part_id=event.index, content=current_block.text
                    ):
                        yield event_
                elif isinstance(current_block, BetaThinkingBlock):
                    for event_ in self._parts_manager.handle_thinking_delta(
                        vendor_part_id=event.index,
                        content=current_block.thinking,
                        signature=current_block.signature,
                        provider_name=self.provider_name,
                    ):
                        yield event_
                elif isinstance(current_block, BetaRedactedThinkingBlock):
                    for event_ in self._parts_manager.handle_thinking_delta(
                        vendor_part_id=event.index,
                        id='redacted_thinking',
                        signature=current_block.data,
                        provider_name=self.provider_name,
                    ):
                        yield event_
                elif isinstance(current_block, BetaToolUseBlock):
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=event.index,
                        tool_name=current_block.name,
                        args=cast(dict[str, Any], current_block.input) or None,
                        tool_call_id=current_block.id,
                    )
                    if maybe_event is not None:  # pragma: no branch
                        yield maybe_event
                elif isinstance(current_block, BetaServerToolUseBlock):
                    call_part = _map_server_tool_use_block(current_block, self.provider_name)
                    builtin_tool_calls[call_part.tool_call_id] = call_part
                    yield self._parts_manager.handle_part(
                        vendor_part_id=event.index,
                        part=call_part,
                    )
                elif isinstance(current_block, BetaWebSearchToolResultBlock):
                    yield self._parts_manager.handle_part(
                        vendor_part_id=event.index,
                        part=_map_web_search_tool_result_block(current_block, self.provider_name),
                    )
                elif isinstance(current_block, BetaCodeExecutionToolResultBlock):
                    yield self._parts_manager.handle_part(
                        vendor_part_id=event.index,
                        part=_map_code_execution_tool_result_block(current_block, self.provider_name),
                    )
                elif isinstance(current_block, BetaWebFetchToolResultBlock):  # pragma: lax no cover
                    yield self._parts_manager.handle_part(
                        vendor_part_id=event.index,
                        part=_map_web_fetch_tool_result_block(current_block, self.provider_name),
                    )
                elif isinstance(current_block, BetaMCPToolUseBlock):
                    call_part = _map_mcp_server_use_block(current_block, self.provider_name)
                    builtin_tool_calls[call_part.tool_call_id] = call_part

                    args_json = call_part.args_as_json_str()
                    # Drop the final `{}}` so that we can add tool args deltas
                    args_json_delta = args_json[:-3]
                    assert args_json_delta.endswith('"tool_args":'), (
                        f'Expected {args_json_delta!r} to end in `"tool_args":`'
                    )

                    yield self._parts_manager.handle_part(
                        vendor_part_id=event.index, part=replace(call_part, args=None)
                    )
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=event.index,
                        args=args_json_delta,
                    )
                    if maybe_event is not None:  # pragma: no branch
                        yield maybe_event
                elif isinstance(current_block, BetaMCPToolResultBlock):
                    call_part = builtin_tool_calls.get(current_block.tool_use_id)
                    yield self._parts_manager.handle_part(
                        vendor_part_id=event.index,
                        part=_map_mcp_server_result_block(current_block, call_part, self.provider_name),
                    )

            elif isinstance(event, BetaRawContentBlockDeltaEvent):
                if isinstance(event.delta, BetaTextDelta):
                    for event_ in self._parts_manager.handle_text_delta(
                        vendor_part_id=event.index, content=event.delta.text
                    ):
                        yield event_
                elif isinstance(event.delta, BetaThinkingDelta):
                    for event_ in self._parts_manager.handle_thinking_delta(
                        vendor_part_id=event.index,
                        content=event.delta.thinking,
                        provider_name=self.provider_name,
                    ):
                        yield event_
                elif isinstance(event.delta, BetaSignatureDelta):
                    for event_ in self._parts_manager.handle_thinking_delta(
                        vendor_part_id=event.index,
                        signature=event.delta.signature,
                        provider_name=self.provider_name,
                    ):
                        yield event_
                elif isinstance(event.delta, BetaInputJSONDelta):
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=event.index,
                        args=event.delta.partial_json,
                    )
                    if maybe_event is not None:  # pragma: no branch
                        yield maybe_event
                # TODO(Marcelo): We need to handle citations.
                elif isinstance(event.delta, BetaCitationsDelta):
                    pass

            elif isinstance(event, BetaRawMessageDeltaEvent):
                self._usage = _map_usage(event, self._provider_name, self._provider_url, self._model_name, self._usage)
                if raw_finish_reason := event.delta.stop_reason:  # pragma: no branch
                    self.provider_details = self.provider_details or {}
                    self.provider_details['finish_reason'] = raw_finish_reason
                    self.finish_reason = _FINISH_REASON_MAP.get(raw_finish_reason)

            elif isinstance(event, BetaRawContentBlockStopEvent):  # pragma: no branch
                if isinstance(current_block, BetaMCPToolUseBlock):
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=event.index,
                        args='}',
                    )
                    if maybe_event is not None:  # pragma: no branch
                        yield maybe_event
                current_block = None

            elif isinstance(event, BetaRawMessageStopEvent):  # pragma: no branch
                current_block = None

    @property
    def model_name(self) -> AnthropicModelName:
        """Get the model name of the response."""
        return self._model_name

    @property
    def provider_name(self) -> str:
        """Get the provider name."""
        return self._provider_name

    @property
    def provider_url(self) -> str:
        """Get the provider base URL."""
        return self._provider_url

    @property
    def timestamp(self) -> datetime:
        """Get the timestamp of the response."""
        return self._timestamp
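

# Illustrative streaming sketch (not part of the upstream module): the events produced by
# `AnthropicStreamedResponse` above surface as text deltas through the public `Agent` API
# (assumed here); requires `ANTHROPIC_API_KEY`.
async def _example_stream() -> None:
    from pydantic_ai import Agent  # local import to keep the sketch self-contained

    agent = Agent(AnthropicModel('claude-sonnet-4-0'))
    async with agent.run_stream('Tell me a short joke.') as stream:
        async for text in stream.stream_text():
            print(text)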


def _map_server_tool_use_block(item: BetaServerToolUseBlock, provider_name: str) -> BuiltinToolCallPart:
    if item.name == 'web_search':
        return BuiltinToolCallPart(
            provider_name=provider_name,
            tool_name=WebSearchTool.kind,
            args=cast(dict[str, Any], item.input) or None,
            tool_call_id=item.id,
        )
    elif item.name == 'code_execution':
        return BuiltinToolCallPart(
            provider_name=provider_name,
            tool_name=CodeExecutionTool.kind,
            args=cast(dict[str, Any], item.input) or None,
            tool_call_id=item.id,
        )
    elif item.name == 'web_fetch':
        return BuiltinToolCallPart(
            provider_name=provider_name,
            tool_name=WebFetchTool.kind,
            args=cast(dict[str, Any], item.input) or None,
            tool_call_id=item.id,
        )
    elif item.name in ('bash_code_execution', 'text_editor_code_execution'):  # pragma: no cover
        raise NotImplementedError(f'Anthropic built-in tool {item.name!r} is not currently supported.')
    elif item.name in ('tool_search_tool_regex', 'tool_search_tool_bm25'):  # pragma: no cover
        # NOTE this is being implemented in https://github.com/pydantic/pydantic-ai/pull/3550
        raise NotImplementedError(f'Anthropic built-in tool {item.name!r} is not currently supported.')
    else:
        assert_never(item.name)


web_search_tool_result_content_ta: TypeAdapter[BetaWebSearchToolResultBlockContent] = TypeAdapter(
    BetaWebSearchToolResultBlockContent
)


def _map_web_search_tool_result_block(item: BetaWebSearchToolResultBlock, provider_name: str) -> BuiltinToolReturnPart:
    return BuiltinToolReturnPart(
        provider_name=provider_name,
        tool_name=WebSearchTool.kind,
        content=web_search_tool_result_content_ta.dump_python(item.content, mode='json'),
        tool_call_id=item.tool_use_id,
    )


code_execution_tool_result_content_ta: TypeAdapter[BetaCodeExecutionToolResultBlockContent] = TypeAdapter(
    BetaCodeExecutionToolResultBlockContent
)


def _map_code_execution_tool_result_block(
    item: BetaCodeExecutionToolResultBlock, provider_name: str
) -> BuiltinToolReturnPart:
    return BuiltinToolReturnPart(
        provider_name=provider_name,
        tool_name=CodeExecutionTool.kind,
        content=code_execution_tool_result_content_ta.dump_python(item.content, mode='json'),
        tool_call_id=item.tool_use_id,
    )


def _map_web_fetch_tool_result_block(item: BetaWebFetchToolResultBlock, provider_name: str) -> BuiltinToolReturnPart:
    return BuiltinToolReturnPart(
        provider_name=provider_name,
        tool_name=WebFetchTool.kind,
        # Store just the content field (BetaWebFetchBlock) which has {content, type, url, retrieved_at}
        content=item.content.model_dump(mode='json'),
        tool_call_id=item.tool_use_id,
    )


def _map_mcp_server_use_block(item: BetaMCPToolUseBlock, provider_name: str) -> BuiltinToolCallPart:
    return BuiltinToolCallPart(
        provider_name=provider_name,
        tool_name=':'.join([MCPServerTool.kind, item.server_name]),
        args={
            'action': 'call_tool',
            'tool_name': item.name,
            'tool_args': cast(dict[str, Any], item.input),
        },
        tool_call_id=item.id,
    )


def _map_mcp_server_result_block(
    item: BetaMCPToolResultBlock, call_part: BuiltinToolCallPart | None, provider_name: str
) -> BuiltinToolReturnPart:
    return BuiltinToolReturnPart(
        provider_name=provider_name,
        tool_name=call_part.tool_name if call_part else MCPServerTool.kind,
        content=item.model_dump(mode='json', include={'content', 'is_error'}),
        tool_call_id=item.tool_use_id,
    )
