Skip to main content
Glama
firebase
by firebase
model_armor_test.ts9.11 kB
/** * Copyright 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { beforeEach, describe, expect, it } from '@jest/globals'; import { genkit } from 'genkit'; import { modelArmor } from '../src/model-armor.js'; // Mock ModelArmorClient class MockModelArmorClient { sanitizeUserPrompt = async () => [{}]; sanitizeModelResponse = async () => [{}]; } function createEmptyResult() { return { sanitizationResult: {} }; } function createSdpResult(replacementText: string) { return { sanitizationResult: { filterResults: { sdp: { sdpFilterResult: { deidentifyResult: { data: { text: replacementText }, }, }, }, }, }, }; } function createRaiBlockResult() { return { sanitizationResult: { filterMatchState: 'MATCH_FOUND', filterResults: { rai: { raiFilterResult: { matchState: 'MATCH_FOUND' } }, }, }, }; } function createSdpBlockResult(replacementText: string) { const res = createSdpResult(replacementText); (res.sanitizationResult as any).filterMatchState = 'MATCH_FOUND'; return res; } describe('modelArmor', () => { let ai: any; let mockClient: any; beforeEach(() => { ai = genkit({}); ai.defineModel({ name: 'echoModel' }, async (req: any) => { return { message: { role: 'model', content: [{ text: `Echo: ${req.messages[0].content[0].text}` }], }, }; }); mockClient = new MockModelArmorClient(); }); it('passes through when no sanitization triggers', async () => { mockClient.sanitizeUserPrompt = async () => [createEmptyResult()]; mockClient.sanitizeModelResponse = async () => [createEmptyResult()]; const response = await ai.generate({ model: 'echoModel', prompt: 'hello', use: [modelArmor({ templateName: 'test', client: mockClient as any })], }); expect(response.text).toMatch(/Echo: hello/); }); it('replaces user prompt on SDP match', async () => { mockClient.sanitizeUserPrompt = async () => [ createSdpResult('sanitized_hello'), ]; mockClient.sanitizeModelResponse = async () => [createEmptyResult()]; const response = await ai.generate({ model: 'echoModel', prompt: 'hello', use: [ modelArmor({ templateName: 'test', client: mockClient as any, applyDeidentificationResults: true, }), ], }); // The echo model should receive the SANITIZED prompt expect(response.text).toMatch(/Echo: sanitized_hello/); }); it('blocks user prompt on RAI match', async () => { mockClient.sanitizeUserPrompt = async () => [createRaiBlockResult()]; await expect( ai.generate({ model: 'echoModel', prompt: 'bad stuff', use: [modelArmor({ templateName: 'test', client: mockClient as any })], }) ).rejects.toThrow( expect.objectContaining({ status: 'PERMISSION_DENIED', message: expect.stringContaining('Model Armor blocked user prompt.'), }) ); }); it('replaces model response on SDP match', async () => { mockClient.sanitizeUserPrompt = async () => [createEmptyResult()]; mockClient.sanitizeModelResponse = async () => [ createSdpResult('sanitized_response'), ]; const response = await ai.generate({ model: 'echoModel', prompt: 'hello', use: [ modelArmor({ templateName: 'test', client: mockClient as any, applyDeidentificationResults: true, }), ], }); expect(response.text).toBe('sanitized_response'); }); it('blocks model response on RAI match', async () => { mockClient.sanitizeUserPrompt = async () => [createEmptyResult()]; mockClient.sanitizeModelResponse = async () => [createRaiBlockResult()]; await expect( ai.generate({ model: 'echoModel', prompt: 'hello', use: [modelArmor({ templateName: 'test', client: mockClient as any })], }) ).rejects.toThrow( expect.objectContaining({ status: 'PERMISSION_DENIED', message: expect.stringContaining('Model Armor blocked model response.'), }) ); }); it('respects protectionTarget=userPrompt', async () => { // Should sanitize prompt but NOT response mockClient.sanitizeUserPrompt = async () => [createSdpResult('sanitized')]; // This one would block if called mockClient.sanitizeModelResponse = async () => [createRaiBlockResult()]; const response = await ai.generate({ model: 'echoModel', prompt: 'hello', use: [ modelArmor({ templateName: 'test', client: mockClient as any, protectionTarget: 'userPrompt', applyDeidentificationResults: true, }), ], }); expect(response.text).toMatch(/Echo: sanitized/); }); it('strictSdpEnforcement blocks even if remediated', async () => { mockClient.sanitizeUserPrompt = async () => [ createSdpBlockResult('sanitized_hello'), ]; await expect( ai.generate({ model: 'echoModel', prompt: 'sensitive', use: [ modelArmor({ templateName: 'test', client: mockClient as any, strictSdpEnforcement: true, applyDeidentificationResults: true, }), ], }) ).rejects.toThrow( expect.objectContaining({ status: 'PERMISSION_DENIED', message: expect.stringContaining('Model Armor blocked user prompt.'), }) ); }); it('respects filters option', async () => { // RAI match found, but we only filter 'csam' mockClient.sanitizeUserPrompt = async () => [ { sanitizationResult: { filterMatchState: 'MATCH_FOUND', filterResults: { rai: { raiFilterResult: { matchState: 'MATCH_FOUND' } }, }, }, }, ]; const response = await ai.generate({ model: 'echoModel', prompt: 'bad stuff', use: [ modelArmor({ templateName: 'test', client: mockClient as any, filters: ['csam'], }), ], }); expect(response.text).toMatch(/Echo: bad stuff/); }); it('preserves non-text parts when SDP replaces text', async () => { mockClient.sanitizeUserPrompt = async () => [ createSdpResult('sanitized_text'), ]; ai.defineModel({ name: 'inspectionModel' }, async (req: any) => { return { message: { role: 'model', content: [{ text: JSON.stringify(req.messages) }], }, }; }); const response = await ai.generate({ model: 'inspectionModel', messages: [ { role: 'user', content: [{ text: 'old stuff' }], }, { role: 'model', content: [{ text: 'response' }], }, { role: 'user', content: [ { text: 'sensitive info' }, { media: { url: 'http://example.com/image.png' } }, ], }, ], use: [ modelArmor({ templateName: 'test', client: mockClient as any, applyDeidentificationResults: true, }), ], }); const content = JSON.parse(response.text); // content should have preserved media and replaced text expect(content).toEqual([ { role: 'user', content: [{ text: 'old stuff' }], }, { role: 'model', content: [{ text: 'response' }], }, { role: 'user', content: [ { media: { url: 'http://example.com/image.png' } }, { text: 'sanitized_text' }, ], }, ]); }); it('supports custom function for applying SDP', async () => { mockClient.sanitizeUserPrompt = async () => [ createSdpResult('sanitized_text'), ]; const applyFn = ({ messages, sdpResult }: any) => { // Custom logic: replace with "CUSTOM APPLIED" instead of sdpResult data const newContent = [{ text: 'CUSTOM APPLIED' }]; return [{ ...messages[0], content: newContent }]; }; const response = await ai.generate({ model: 'echoModel', prompt: 'hello', use: [ modelArmor({ templateName: 'test', client: mockClient as any, applyDeidentificationResults: applyFn, }), ], }); expect(response.text).toMatch(/Echo: CUSTOM APPLIED/); }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/firebase/genkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server