Skip to main content
Glama
websockets.test.ts22 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import { allOk, ContentType, getReferenceString, Hl7Message, MEDPLUM_VERSION, sleep } from '@medplum/core'; import type { Agent, Bot, Device } from '@medplum/fhirtypes'; import { randomUUID } from 'crypto'; import express from 'express'; import type { Server } from 'http'; import request from 'superwstest'; import { initApp, shutdownApp } from '../app'; import * as executeBotModule from '../bots/execute'; import type { BotExecutionResult } from '../bots/types'; import { loadTestConfig } from '../config/loader'; import type { MedplumServerConfig } from '../config/types'; import { getRedis } from '../redis'; import { initTestAuth } from '../test.setup'; import type { AgentInfo } from './utils'; import { AgentConnectionState } from './utils'; const app = express(); let config: MedplumServerConfig; let server: Server; let accessToken: string; let bot: Bot; let agent: Agent; let device: Device; describe('Agent WebSockets', () => { beforeAll(async () => { config = await loadTestConfig(); config.vmContextBotsEnabled = true; config.heartbeatMilliseconds = 5000; server = await initApp(app, config); accessToken = await initTestAuth({ membership: { admin: true } }); await new Promise<void>((resolve) => { server.listen(0, 'localhost', 8512, resolve); }); // Create a test bot const res1 = await request(server) .post('/admin/projects/projectId/bot') .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ name: 'Test Bot', runtimeVersion: 'vmcontext', }); bot = res1.body as Bot; // Deploy the bot // This is a simple HL7 ack bot // Note that the code is actually run inside a vmcontext await request(server) .post(`/fhir/R4/Bot/${bot.id}/$deploy`) .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ code: ` exports.handler = async function (medplum, event) { return event.input.buildAck().toString(); }; `, }); // Create an agent const res2 = await request(server) .post('/fhir/R4/Agent') .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ resourceType: 'Agent', name: 'Test Agent', status: 'active', channel: [ { name: 'test', endpoint: { reference: 'Endpoint/123' }, targetReference: { reference: 'Bot/' + bot.id }, }, ], }); agent = res2.body as Agent; const res3 = await request(server) .post(`/fhir/R4/Device`) .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ resourceType: 'Device', url: 'mllp://192.168.50.10:56001', }); device = res3.body as Device; }); afterAll(async () => { await shutdownApp(); }); test('Success', async () => { const callback = randomUUID(); await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') // Now transmit, should succeed .sendText( JSON.stringify({ type: 'agent:transmit:request', accessToken, channel: 'test', remote: '0.0.0.0:57000', contentType: ContentType.HL7_V2, body: 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-\r' + 'NK1|1|JONES^BARBARA^K|SPO|||||20011105\r' + 'PV1|1|I|2000^2012^01||||004777^LEBAUER^SIDNEY^J.|||SUR||-||1|A0-', callback, }) ) .expectJson((actual) => { expect(actual).toMatchObject({ type: 'agent:transmit:response', channel: 'test', remote: '0.0.0.0:57000', contentType: ContentType.HL7_V2, statusCode: 200, body: expect.stringMatching(/^MSH[^"]+ACK[^"]+/), callback, }); }) .close() .expectClosed(); }); test('Send non-JSON', async () => { await request(server) .ws('/ws/agent') .sendText('<html></html>') .expectText(/{"type":"agent:error","body":"Unexpected token/) .close() .expectClosed(); }); test('Connect missing access token', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', agentId: agent.id, }) ) .expectText('{"type":"agent:error","body":"Missing access token"}') .close() .expectClosed(); }); test('Connect missing agentId', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, }) ) .expectText('{"type":"agent:error","body":"Missing agent ID"}') .close() .expectClosed(); }); test('Transmit not connected', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'transmit', accessToken, channel: 'test', remote: '0.0.0.0:57000', body: 'MSH|...', }) ) .expectText('{"type":"agent:error","body":"Not connected"}') .close() .expectClosed(); }); test('Transmit missing access token', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .sendText( JSON.stringify({ type: 'transmit', channel: 'test', remote: '0.0.0.0:57000', body: 'MSH|...', }) ) .expectText('{"type":"agent:error","body":"Missing access token"}') .close() .expectClosed(); }); test('Transmit missing channel', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .sendText( JSON.stringify({ type: 'transmit', accessToken, remote: '0.0.0.0:57000', body: 'MSH|...', }) ) .expectText('{"type":"agent:error","body":"Missing channel"}') .close() .expectClosed(); }); test('Transmit channel not found', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .sendText( JSON.stringify({ type: 'transmit', accessToken, channel: 'notfound', remote: '0.0.0.0:57000', body: 'MSH|...', }) ) .expectText('{"type":"agent:error","body":"Channel not found"}') .close() .expectClosed(); }); test('Transmit missing body', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .sendText( JSON.stringify({ type: 'transmit', accessToken, channel: 'test', remote: '0.0.0.0:57000', }) ) .expectText('{"type":"agent:error","body":"Missing body"}') .close() .expectClosed(); }); test('Agent push', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .exec(async () => { const res = await request(server) .post(`/fhir/R4/Agent/${agent.id}/$push`) .set('Content-Type', ContentType.JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ destination: getReferenceString(device), contentType: ContentType.HL7_V2, body: 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-\r' + 'NK1|1|JONES^BARBARA^K|SPO|||||20011105\r' + 'PV1|1|I|2000^2012^01||||004777^LEBAUER^SIDNEY^J.|||SUR||-||1|A0-', }); expect(res.status).toBe(200); expect(res.headers['content-type']).toBe('application/fhir+json; charset=utf-8'); expect(res.body).toMatchObject(allOk); }) .expectText(/{"type":"agent:transmit:request",.+,"body":"MSH[^"]+ADT1[^"]+"}/) .close() .expectClosed(); }); test('Push and wait for response timeout', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .exec(async () => { // Send a message that will never be responded to // Wait for the timeout const res = await request(server) .post(`/fhir/R4/Agent/${agent.id}/$push`) .set('Content-Type', ContentType.JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ waitForResponse: true, waitTimeout: 500, destination: getReferenceString(device), contentType: ContentType.HL7_V2, body: 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-\r' + 'NK1|1|JONES^BARBARA^K|SPO|||||20011105\r' + 'PV1|1|I|2000^2012^01||||004777^LEBAUER^SIDNEY^J.|||SUR||-||1|A0-', }); expect(res.status).toBe(400); expect(res.body.issue[0].details.text).toBe('Timeout'); }) .close() .expectClosed(); }); test('Push and wait for response success', async () => { let pushRequest: any = undefined; let pushResponse: any = undefined; await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .exec(async () => { // Send the request but do not wait for the response pushRequest = request(server) .post(`/fhir/R4/Agent/${agent.id}/$push`) .set('Content-Type', ContentType.JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ waitForResponse: true, channel: 'test', destination: getReferenceString(device), contentType: ContentType.HL7_V2, body: 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-\r' + 'NK1|1|JONES^BARBARA^K|SPO|||||20011105\r' + 'PV1|1|I|2000^2012^01||||004777^LEBAUER^SIDNEY^J.|||SUR||-||1|A0-', }); pushRequest.then(() => undefined); }) .expectText((str) => { const message = JSON.parse(str); expect(message.type).toBe('agent:transmit:request'); expect(message.remote).toBe(device.url); expect(message.callback).toBeDefined(); pushResponse = JSON.stringify({ type: 'agent:transmit:response', callback: message.callback, contentType: ContentType.HL7_V2, body: Hl7Message.parse(message.body).buildAck().toString(), }); return true; }) .exec((ws) => ws.send(pushResponse)) .exec(async () => { const res = await pushRequest; expect(res.status).toBe(200); expect(res.headers['content-type']).toBe('x-application/hl7-v2+er7; charset=utf-8'); expect(res.text).toMatch(/MSH.*ACK.*\r/); }) .close() .expectClosed(); }); test('Heartbeat', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectJson({ type: 'agent:connect:response' }) .expectJson({ type: 'agent:heartbeat:request' }) // Send a ping .sendJson({ type: 'agent:heartbeat:request' }) .expectJson({ type: 'agent:heartbeat:response', version: MEDPLUM_VERSION }) // Simulate a ping response .sendJson({ type: 'agent:heartbeat:response', version: MEDPLUM_VERSION }) .close() .expectClosed(); let info: AgentInfo = { status: AgentConnectionState.UNKNOWN, version: 'unknown' }; for (let i = 0; i < 5; i++) { await sleep(50); const infoStr = (await getRedis().get(`medplum:agent:${agent.id}:info`)) as string; info = JSON.parse(infoStr) as AgentInfo; if (info.status === AgentConnectionState.DISCONNECTED) { break; } } expect(info).toMatchObject<AgentInfo>({ status: AgentConnectionState.DISCONNECTED, version: MEDPLUM_VERSION, lastUpdated: expect.any(String), }); }); test('Ping IP', async () => { let pushRequest: any = undefined; let pushResponse: any = undefined; await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .exec(async () => { // Send the request but do not wait for the response pushRequest = request(server) .post(`/fhir/R4/Agent/${agent.id}/$push`) .set('Content-Type', ContentType.JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ waitForResponse: true, channel: 'test', destination: '8.8.8.8', contentType: ContentType.PING, body: 'PING', }); pushRequest.then(() => undefined); }) .expectText((str) => { const message = JSON.parse(str); expect(message.type).toBe('agent:transmit:request'); expect(message.remote).toBe('8.8.8.8'); expect(message.callback).toBeDefined(); pushResponse = JSON.stringify({ type: 'agent:transmit:response', callback: message.callback, contentType: ContentType.PING, body: 'PING', }); return true; }) .exec((ws) => ws.send(pushResponse)) .exec(async () => { const res = await pushRequest; expect(res.status).toBe(200); expect(res.headers['content-type']).toBe('x-application/ping; charset=utf-8'); }) .close() .expectClosed(); }); test('Unknown message type', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') .sendText(JSON.stringify({ type: 'asdfasdf' })) .expectText('{"type":"agent:error","body":"Unknown message type: asdfasdf"}') .close() .expectClosed(); }); test('Received agent:error without callback', async () => { const originalConsoleLog = console.log; console.log = jest.fn(); await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:error', body: 'Something bad happened', }) ) .close() .expectClosed(); expect(console.log).toHaveBeenLastCalledWith(expect.stringContaining('[Agent]: Error received from agent')); console.log = originalConsoleLog; }); describe('Bot failures', () => { let agent: Agent; let bot: Bot; beforeAll(async () => { // Create a test bot const res1 = await request(server) .post('/admin/projects/projectId/bot') .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ name: 'Test Bot 2', runtimeVersion: 'vmcontext', }); bot = res1.body as Bot; // Deploy the bot // This bot throws an error await request(server) .post(`/fhir/R4/Bot/${bot.id}/$deploy`) .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ code: ` exports.handler = async function (medplum, event) { throw new Error('Something is broken'); }; `, }); // Create an agent const res2 = await request(server) .post('/fhir/R4/Agent') .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ resourceType: 'Agent', name: 'Test Agent 2', status: 'active', channel: [ { name: 'test', endpoint: { reference: 'Endpoint/123' }, targetReference: { reference: 'Bot/' + bot.id }, }, ], }); agent = res2.body as Agent; const res3 = await request(server) .post(`/fhir/R4/Device`) .set('Content-Type', ContentType.FHIR_JSON) .set('Authorization', 'Bearer ' + accessToken) .send({ resourceType: 'Device', url: 'mllp://192.168.50.10:56001', }); device = res3.body as Device; }); test('Bot failure -- no returnValue', async () => { await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') // Now transmit, should succeed .sendText( JSON.stringify({ type: 'agent:transmit:request', accessToken, channel: 'test', remote: '0.0.0.0:57000', contentType: ContentType.HL7_V2, body: 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-\r' + 'NK1|1|JONES^BARBARA^K|SPO|||||20011105\r' + 'PV1|1|I|2000^2012^01||||004777^LEBAUER^SIDNEY^J.|||SUR||-||1|A0-', }) ) .expectJson((actual) => { expect(actual).toMatchObject({ type: 'agent:transmit:response', channel: 'test', remote: '0.0.0.0:57000', contentType: ContentType.JSON, statusCode: 400, body: expect.stringContaining('Something is broken'), }); }) .close() .expectClosed(); }); test('Bot failure -- Error during Lambda execution, error in returnValue', async () => { jest.spyOn(executeBotModule, 'executeBot').mockImplementationOnce( async () => ({ success: false, logResult: '', returnValue: { errorType: 'Error', errorMessage: 'Something is broken', trace: [ 'Error: Something is broken', ' at Object.handler (/var/task/user.js:22:15)', ' at process.processTicksAndRejections (node:internal/process/task_queues:95:5)', ' at async exports.handler (/var/task/index.js:24:18)', ], }, }) satisfies BotExecutionResult ); await request(server) .ws('/ws/agent') .sendText( JSON.stringify({ type: 'agent:connect:request', accessToken, agentId: agent.id, }) ) .expectText('{"type":"agent:connect:response"}') // Now transmit, should succeed .sendText( JSON.stringify({ type: 'agent:transmit:request', accessToken, channel: 'test', remote: '0.0.0.0:57000', contentType: ContentType.HL7_V2, body: 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-\r' + 'NK1|1|JONES^BARBARA^K|SPO|||||20011105\r' + 'PV1|1|I|2000^2012^01||||004777^LEBAUER^SIDNEY^J.|||SUR||-||1|A0-', }) ) .expectJson((actual) => { expect(actual).toMatchObject({ type: 'agent:transmit:response', channel: 'test', remote: '0.0.0.0:57000', contentType: ContentType.JSON, statusCode: 400, body: expect.stringContaining('Something is broken'), }); }) .close() .expectClosed(); }); }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server