from datetime import datetime
from enum import Enum
from typing import Annotated, Any, Literal, Self
from uuid import uuid4
from pydantic import (
BaseModel,
ConfigDict,
Field,
TypeAdapter,
field_serializer,
model_validator,
)
class TaskState(str, Enum):
SUBMITTED = 'submitted'
WORKING = 'working'
INPUT_REQUIRED = 'input-required'
COMPLETED = 'completed'
CANCELED = 'canceled'
FAILED = 'failed'
UNKNOWN = 'unknown'
class TextPart(BaseModel):
type: Literal['text'] = 'text'
text: str
metadata: dict[str, Any] | None = None
class FileContent(BaseModel):
name: str | None = None
mimeType: str | None = None
bytes: str | None = None
uri: str | None = None
@model_validator(mode='after')
def check_content(self) -> Self:
if not (self.bytes or self.uri):
raise ValueError(
"Either 'bytes' or 'uri' must be present in the file data"
)
if self.bytes and self.uri:
raise ValueError(
"Only one of 'bytes' or 'uri' can be present in the file data"
)
return self
class FilePart(BaseModel):
type: Literal['file'] = 'file'
file: FileContent
metadata: dict[str, Any] | None = None
class DataPart(BaseModel):
type: Literal['data'] = 'data'
data: dict[str, Any]
metadata: dict[str, Any] | None = None
Part = Annotated[TextPart | FilePart | DataPart, Field(discriminator='type')]
class Message(BaseModel):
role: Literal['user', 'agent']
parts: list[Part]
metadata: dict[str, Any] | None = None
class TaskStatus(BaseModel):
state: TaskState
message: Message | None = None
timestamp: datetime = Field(default_factory=datetime.now)
@field_serializer('timestamp')
def serialize_dt(self, dt: datetime, _info):
return dt.isoformat()
class Artifact(BaseModel):
name: str | None = None
description: str | None = None
parts: list[Part]
metadata: dict[str, Any] | None = None
index: int = 0
append: bool | None = None
lastChunk: bool | None = None
class Task(BaseModel):
id: str
sessionId: str | None = None
status: TaskStatus
artifacts: list[Artifact] | None = None
history: list[Message] | None = None
metadata: dict[str, Any] | None = None
class TaskStatusUpdateEvent(BaseModel):
id: str
status: TaskStatus
final: bool = False
metadata: dict[str, Any] | None = None
class TaskArtifactUpdateEvent(BaseModel):
id: str
artifact: Artifact
metadata: dict[str, Any] | None = None
class AuthenticationInfo(BaseModel):
model_config = ConfigDict(extra='allow')
schemes: list[str]
credentials: str | None = None
class PushNotificationConfig(BaseModel):
url: str
token: str | None = None
authentication: AuthenticationInfo | None = None
class TaskIdParams(BaseModel):
id: str
metadata: dict[str, Any] | None = None
class TaskQueryParams(TaskIdParams):
historyLength: int | None = None
class TaskSendParams(BaseModel):
id: str
sessionId: str = Field(default_factory=lambda: uuid4().hex)
message: Message
acceptedOutputModes: list[str] | None = None
pushNotification: PushNotificationConfig | None = None
historyLength: int | None = None
metadata: dict[str, Any] | None = None
class TaskPushNotificationConfig(BaseModel):
id: str
pushNotificationConfig: PushNotificationConfig
## RPC Messages
class JSONRPCMessage(BaseModel):
jsonrpc: Literal['2.0'] = '2.0'
id: int | str | None = Field(default_factory=lambda: uuid4().hex)
class JSONRPCRequest(JSONRPCMessage):
method: str
params: dict[str, Any] | None = None
class JSONRPCError(BaseModel):
code: int
message: str
data: Any | None = None
class JSONRPCResponse(JSONRPCMessage):
result: Any | None = None
error: JSONRPCError | None = None
class SendTaskRequest(JSONRPCRequest):
method: Literal['tasks/send'] = 'tasks/send'
params: TaskSendParams
class SendTaskResponse(JSONRPCResponse):
result: Task | None = None
class SendTaskStreamingRequest(JSONRPCRequest):
method: Literal['tasks/sendSubscribe'] = 'tasks/sendSubscribe'
params: TaskSendParams
class SendTaskStreamingResponse(JSONRPCResponse):
result: TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None = None
class GetTaskRequest(JSONRPCRequest):
method: Literal['tasks/get'] = 'tasks/get'
params: TaskQueryParams
class GetTaskResponse(JSONRPCResponse):
result: Task | None = None
class CancelTaskRequest(JSONRPCRequest):
method: Literal['tasks/cancel',] = 'tasks/cancel'
params: TaskIdParams
class CancelTaskResponse(JSONRPCResponse):
result: Task | None = None
class SetTaskPushNotificationRequest(JSONRPCRequest):
method: Literal['tasks/pushNotification/set',] = (
'tasks/pushNotification/set'
)
params: TaskPushNotificationConfig
class SetTaskPushNotificationResponse(JSONRPCResponse):
result: TaskPushNotificationConfig | None = None
class GetTaskPushNotificationRequest(JSONRPCRequest):
method: Literal['tasks/pushNotification/get',] = (
'tasks/pushNotification/get'
)
params: TaskIdParams
class GetTaskPushNotificationResponse(JSONRPCResponse):
result: TaskPushNotificationConfig | None = None
class TaskResubscriptionRequest(JSONRPCRequest):
method: Literal['tasks/resubscribe',] = 'tasks/resubscribe'
params: TaskIdParams
A2ARequest = TypeAdapter(
Annotated[
SendTaskRequest
| GetTaskRequest
| CancelTaskRequest
| SetTaskPushNotificationRequest
| GetTaskPushNotificationRequest
| TaskResubscriptionRequest
| SendTaskStreamingRequest,
Field(discriminator='method'),
]
)
## Error types
class JSONParseError(JSONRPCError):
code: int = -32700
message: str = 'Invalid JSON payload'
data: Any | None = None
class InvalidRequestError(JSONRPCError):
code: int = -32600
message: str = 'Request payload validation error'
data: Any | None = None
class MethodNotFoundError(JSONRPCError):
code: int = -32601
message: str = 'Method not found'
data: None = None
class InvalidParamsError(JSONRPCError):
code: int = -32602
message: str = 'Invalid parameters'
data: Any | None = None
class InternalError(JSONRPCError):
code: int = -32603
message: str = 'Internal error'
data: Any | None = None
class TaskNotFoundError(JSONRPCError):
code: int = -32001
message: str = 'Task not found'
data: None = None
class TaskNotCancelableError(JSONRPCError):
code: int = -32002
message: str = 'Task cannot be canceled'
data: None = None
class PushNotificationNotSupportedError(JSONRPCError):
code: int = -32003
message: str = 'Push Notification is not supported'
data: None = None
class UnsupportedOperationError(JSONRPCError):
code: int = -32004
message: str = 'This operation is not supported'
data: None = None
class ContentTypeNotSupportedError(JSONRPCError):
code: int = -32005
message: str = 'Incompatible content types'
data: None = None
class AgentProvider(BaseModel):
organization: str
url: str | None = None
class AgentCapabilities(BaseModel):
streaming: bool = False
pushNotifications: bool = False
stateTransitionHistory: bool = False
class AgentAuthentication(BaseModel):
schemes: list[str]
credentials: str | None = None
class AgentSkill(BaseModel):
id: str
name: str
description: str | None = None
tags: list[str] | None = None
examples: list[str] | None = None
inputModes: list[str] | None = None
outputModes: list[str] | None = None
class AgentCard(BaseModel):
name: str
description: str | None = None
url: str
provider: AgentProvider | None = None
version: str
documentationUrl: str | None = None
capabilities: AgentCapabilities
authentication: AgentAuthentication | None = None
defaultInputModes: list[str] = ['text']
defaultOutputModes: list[str] = ['text']
skills: list[AgentSkill]
class A2AClientError(Exception):
pass
class A2AClientHTTPError(A2AClientError):
def __init__(self, status_code: int, message: str):
self.status_code = status_code
self.message = message
super().__init__(f'HTTP Error {status_code}: {message}')
class A2AClientJSONError(A2AClientError):
def __init__(self, message: str):
self.message = message
super().__init__(f'JSON Error: {message}')
class MissingAPIKeyError(Exception):
"""Exception for missing API key."""