anthropic-client.ts
/**
 * Copyright 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import type Anthropic from '@anthropic-ai/sdk';
import type {
  BetaMessage,
  BetaRawMessageStreamEvent,
} from '@anthropic-ai/sdk/resources/beta/messages.mjs';
import type {
  Message,
  MessageStreamEvent,
} from '@anthropic-ai/sdk/resources/messages.mjs';
import { mock } from 'node:test';

export interface MockAnthropicClientOptions {
  messageResponse?: Partial<Message>;
  sequentialResponses?: Partial<Message>[]; // For tool calling - multiple responses
  streamChunks?: MessageStreamEvent[];
  modelList?: Array<{ id: string; display_name?: string }>;
  shouldError?: Error;
  streamErrorAfterChunk?: number; // Throw error after this many chunks
  streamError?: Error; // Error to throw during streaming
  abortSignal?: AbortSignal; // Abort signal to check
}

/**
 * Creates a mock Anthropic client for testing
 */
export function createMockAnthropicClient(
  options: MockAnthropicClientOptions = {}
): Anthropic {
  const messageResponse = {
    ...mockDefaultMessage(),
    ...options.messageResponse,
  };
  const betaMessageResponse = toBetaMessage(messageResponse);

  // Support sequential responses for tool calling workflows
  let callCount = 0;
  const createStub = options.shouldError
    ? mock.fn(async () => {
        throw options.shouldError;
      })
    : options.sequentialResponses
      ? mock.fn(async () => {
          const response =
            options.sequentialResponses![callCount] || messageResponse;
          callCount++;
          return {
            ...mockDefaultMessage(),
            ...response,
          };
        })
      : mock.fn(async () => messageResponse);

  let betaCallCount = 0;
  const betaCreateStub = options.shouldError
    ? mock.fn(async () => {
        throw options.shouldError;
      })
    : options.sequentialResponses
      ? mock.fn(async () => {
          const response =
            options.sequentialResponses![betaCallCount] || messageResponse;
          betaCallCount++;
          return toBetaMessage({
            ...mockDefaultMessage(),
            ...response,
          });
        })
      : mock.fn(async () => betaMessageResponse);

  const streamStub = options.shouldError
    ? mock.fn(() => {
        throw options.shouldError;
      })
    : mock.fn((_body: any, opts?: { signal?: AbortSignal }) => {
        // Check abort signal before starting stream
        if (opts?.signal?.aborted) {
          const error = new Error('AbortError');
          error.name = 'AbortError';
          throw error;
        }
        return createMockStream(
          options.streamChunks || [],
          messageResponse as Message,
          options.streamErrorAfterChunk,
          options.streamError,
          opts?.signal
        );
      });

  const betaStreamStub = options.shouldError
    ? mock.fn(() => {
        throw options.shouldError;
      })
    : mock.fn((_body: any, opts?: { signal?: AbortSignal }) => {
        if (opts?.signal?.aborted) {
          const error = new Error('AbortError');
          error.name = 'AbortError';
          throw error;
        }
        const betaChunks = (options.streamChunks || []).map((chunk) =>
          toBetaStreamEvent(chunk)
        );
        return createMockStream(
          betaChunks,
          toBetaMessage(messageResponse),
          options.streamErrorAfterChunk,
          options.streamError,
          opts?.signal
        );
      });

  const listStub = options.shouldError
    ? mock.fn(async () => {
        throw options.shouldError;
      })
    : mock.fn(async () => ({
        data: options.modelList || mockDefaultModels(),
      }));

  return {
    messages: {
      create: createStub,
      stream: streamStub,
    },
    models: {
      list: listStub,
    },
    beta: {
      messages: {
        create: betaCreateStub,
        stream: betaStreamStub,
      },
    },
  } as unknown as Anthropic;
}
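/**
 * Usage sketch (illustrative addition, not part of the upstream file): how a
 * test might drive the mock client above from `node:test`. The request body
 * follows the real Anthropic SDK shape, but the stubs ignore it, so any
 * SDK-shaped params work here.
 *
 * @example
 * import { test } from 'node:test';
 * import assert from 'node:assert';
 *
 * test('returns the configured mock message', async () => {
 *   const client = createMockAnthropicClient({
 *     messageResponse: { id: 'msg_custom' },
 *   });
 *   const message = await client.messages.create({
 *     model: 'claude-3-5-sonnet-20241022',
 *     max_tokens: 100,
 *     messages: [{ role: 'user', content: 'Hello' }],
 *   });
 *   assert.strictEqual(message.id, 'msg_custom');
 * });
 */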
/**
 * Creates a mock async iterable stream for streaming responses
 */
function createMockStream<TMessageType, TEventType>(
  chunks: TEventType[],
  finalMsg: TMessageType,
  errorAfterChunk?: number,
  streamError?: Error,
  abortSignal?: AbortSignal
) {
  let index = 0;
  return {
    [Symbol.asyncIterator]() {
      return {
        async next() {
          // Check abort signal
          if (abortSignal?.aborted) {
            const error = new Error('AbortError');
            error.name = 'AbortError';
            throw error;
          }
          // Check if we should throw an error after this chunk
          if (
            errorAfterChunk !== undefined &&
            streamError &&
            index >= errorAfterChunk
          ) {
            throw streamError;
          }
          if (index < chunks.length) {
            return { value: chunks[index++] as TEventType, done: false };
          }
          return { value: undefined as unknown as TEventType, done: true };
        },
      };
    },
    async finalMessage() {
      // Check abort signal before returning final message
      if (abortSignal?.aborted) {
        const error = new Error('AbortError');
        error.name = 'AbortError';
        throw error;
      }
      return finalMsg as TMessageType;
    },
  };
}

export interface CreateMockAnthropicMessageOptions {
  id?: string;
  text?: string;
  toolUse?: {
    id?: string;
    name: string;
    input: any;
  };
  stopReason?: Message['stop_reason'];
  usage?: Partial<Message['usage']>;
}

/**
 * Creates a customizable mock Anthropic Message response
 *
 * @example
 * // Simple text response
 * createMockAnthropicMessage({ text: 'Hi there!' })
 *
 * // Tool use response
 * createMockAnthropicMessage({
 *   toolUse: { name: 'get_weather', input: { city: 'NYC' } }
 * })
 *
 * // Custom usage
 * createMockAnthropicMessage({ usage: { input_tokens: 5, output_tokens: 15 } })
 */
export function createMockAnthropicMessage(
  options: CreateMockAnthropicMessageOptions = {}
): Message {
  const content: Message['content'] = [];

  if (options.toolUse) {
    content.push({
      type: 'tool_use',
      id: options.toolUse.id || 'toolu_test123',
      name: options.toolUse.name,
      input: options.toolUse.input,
    });
  } else {
    content.push({
      type: 'text',
      text: options.text || 'Hello! How can I help you today?',
      citations: null,
    });
  }

  const usage: Message['usage'] = {
    cache_creation: null,
    cache_creation_input_tokens: 0,
    cache_read_input_tokens: 0,
    input_tokens: 10,
    output_tokens: 20,
    server_tool_use: null,
    service_tier: null,
    ...(options.usage ?? {}),
  };

  return {
    id: options.id || 'msg_test123',
    type: 'message',
    role: 'assistant',
    model: 'claude-3-5-sonnet-20241022',
    content,
    stop_reason:
      options.stopReason || (options.toolUse ? 'tool_use' : 'end_turn'),
    stop_sequence: null,
    usage,
  };
}
/**
 * Creates a default mock Message response
 */
export function mockDefaultMessage(): Message {
  return createMockAnthropicMessage();
}

/**
 * Creates a mock text content block chunk event
 */
export function mockTextChunk(text: string): MessageStreamEvent {
  return {
    type: 'content_block_delta',
    index: 0,
    delta: {
      type: 'text_delta',
      text,
    },
  } as MessageStreamEvent;
}

/**
 * Creates a mock content block start event with text
 */
export function mockContentBlockStart(text: string): MessageStreamEvent {
  return {
    type: 'content_block_start',
    index: 0,
    content_block: {
      type: 'text',
      text,
    },
  } as MessageStreamEvent;
}

/**
 * Creates a mock tool use content block
 */
export function mockToolUseChunk(
  id: string,
  name: string,
  input: any
): MessageStreamEvent {
  return {
    type: 'content_block_start',
    index: 0,
    content_block: {
      type: 'tool_use',
      id,
      name,
      input,
    },
  } as MessageStreamEvent;
}

/**
 * Creates a default list of mock models
 */
export function mockDefaultModels() {
  return [
    { id: 'claude-3-5-sonnet-20241022', display_name: 'Claude 3.5 Sonnet' },
    { id: 'claude-3-5-haiku-20241022', display_name: 'Claude 3.5 Haiku' },
    { id: 'claude-3-opus-20240229', display_name: 'Claude 3 Opus' },
  ];
}

/**
 * Creates a mock Message with tool use
 */
export function mockMessageWithToolUse(
  toolName: string,
  toolInput: any
): Partial<Message> {
  return {
    content: [
      {
        type: 'tool_use',
        id: 'toolu_test123',
        name: toolName,
        input: toolInput,
      },
    ],
    stop_reason: 'tool_use',
  };
}

/**
 * Creates a mock Message with custom content
 */
export function mockMessageWithContent(
  content: Message['content']
): Partial<Message> {
  return {
    content,
    stop_reason: 'end_turn',
  };
}

function toBetaMessage(message: Message): BetaMessage {
  return {
    ...message,
    container: null,
    context_management: null,
    usage: {
      cache_creation: message.usage.cache_creation,
      cache_creation_input_tokens: message.usage.cache_creation_input_tokens,
      cache_read_input_tokens: message.usage.cache_read_input_tokens,
      input_tokens: message.usage.input_tokens,
      output_tokens: message.usage.output_tokens,
      server_tool_use: message.usage.server_tool_use as any,
      service_tier: message.usage.service_tier,
    },
  };
}

function toBetaStreamEvent(
  event: MessageStreamEvent
): BetaRawMessageStreamEvent {
  return event as unknown as BetaRawMessageStreamEvent;
}
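/**
 * Streaming sketch (illustrative addition, not part of the upstream file):
 * combining the chunk helpers above with the mock stream. The object returned
 * by `createMockStream` is async-iterable and also exposes `finalMessage()`,
 * mirroring the SDK's MessageStream surface.
 *
 * @example
 * const client = createMockAnthropicClient({
 *   streamChunks: [mockTextChunk('Hel'), mockTextChunk('lo!')],
 * });
 * const stream = client.messages.stream({
 *   model: 'claude-3-5-sonnet-20241022',
 *   max_tokens: 100,
 *   messages: [{ role: 'user', content: 'Hi' }],
 * });
 * let text = '';
 * for await (const event of stream) {
 *   if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
 *     text += event.delta.text;
 *   }
 * }
 * const finalMessage = await stream.finalMessage(); // the configured messageResponse
 */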
