class MyChatKitServer(ChatKitServer):
    """ChatKit server that answers each request with one assistant agent.

    The agent is declared once on the class and reused for every call to
    ``respond``.
    """

    # Shared by all instances; ``respond`` reads it via ``self``.
    assistant_agent = Agent[AgentContext](
        name="Assistant",
        model="gpt-4.1",
        instructions="You are a helpful assistant",
    )

    def __init__(self, data_store: Store, file_store: FileStore | None = None):
        """Forward both stores, unchanged, to the base-class constructor."""
        super().__init__(data_store, file_store)

    async def respond(
        self,
        thread: ThreadMetadata,
        input: UserMessageItem | ClientToolCallOutputItem,
        context: Any,
    ) -> AsyncIterator[Event]:
        """Run the assistant agent on ``input`` and stream back its events.

        Args:
            thread: Thread metadata handed to the agent context.
            input: The incoming item, converted with ``to_input_item``.
            context: Passed through as the agent context's
                ``request_context``.

        Yields:
            Every event produced by ``stream_agent_response`` for this run.
        """
        # NOTE(review): ``self.store`` is not assigned in this class;
        # presumably the base constructor sets it -- confirm.
        run_context = AgentContext(
            thread=thread,
            store=self.store,
            request_context=context,
        )

        # Convert after the context is built, matching the original order.
        agent_input = await to_input_item(input, self.to_message_content)
        streamed_run = Runner.run_streamed(
            self.assistant_agent,
            agent_input,
            context=run_context,
        )

        async for streamed_event in stream_agent_response(
            run_context, streamed_run
        ):
            yield streamed_event

    async def to_message_content(
        self, input: FilePart | ImagePart
    ) -> ResponseInputContentParam:
        """Converter handed to ``to_input_item``; not implemented here.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()