import fetch from "node-fetch";
import { TextDecoder } from "util";
/* helper: return JSON, even when reply is SSE ------------------- */
export async function safeJson(res) {
const ctype = (res.headers.get("content-type") || "").toLowerCase();
/* 1) Plain JSON */
if (ctype.includes("application/json")) return res.json();
/* 2) Assume SSE for anything else (incl. empty header) */
const decoder = new TextDecoder();
let buffer = "";
// Works for Node.js Readable (fetch v2) & ReadableStream (v3)
for await (const chunk of res.body) {
buffer += typeof chunk === "string" ? chunk : decoder.decode(chunk);
const eventEnd = buffer.indexOf("\n\n"); // end of first event
if (eventEnd !== -1) {
const event = buffer.slice(0, eventEnd);
const dataRow = event.split("\n").find((l) => l.startsWith("data:"));
if (dataRow) {
const jsonText = dataRow.replace(/^data:\s*/, "");
try {
return JSON.parse(jsonText);
} catch {
/* fall through */
}
}
break;
}
}
/* 3) If we get here the stream wasn't JSON — return empty object */
console.warn("↩ SSE stream contained no JSON payload");
return {};
}
export class McpClient {
constructor(endpoint, auth) {
this.endpoint = endpoint;
this.auth = auth;
this.sessionId = null;
this.nextId = 1;
this.catalog = { tools: [], prompts: [], resources: [] };
}
/* ────────── helper: generic MCP call with logging ────────── */
async callMcp(method, params = {}, id = this.nextId++) {
console.log(
JSON.stringify({
jsonrpc: "2.0",
id,
method,
...(params ? { params } : {}),
}),
"\n"
);
const headers = {
Authorization: this.auth,
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
};
// Only add session ID if we have one
if (this.sessionId) {
headers["Mcp-Session-Id"] = this.sessionId;
}
const res = await fetch(this.endpoint, {
method: "POST",
headers,
body: JSON.stringify({
jsonrpc: "2.0",
id,
method,
...(Object.keys(params).length ? { params } : {}),
}),
});
console.log("→ status", res.status);
return safeJson(res); // ← instead of res.json()
}
/* ────────── helper: fetch an entire paged list ────────── */
async listAll(method) {
let cursor;
const out = [];
do {
const res = await this.callMcp(method, cursor ? { cursor } : {});
if (res.error) {
console.warn(`⚠️ ${method} failed:`, res.error.message);
return out;
}
out.push(...(res.result?.tools ?? res.result?.prompts ?? res.result?.resources ?? []));
cursor = res.result?.nextCursor ?? null;
} while (cursor);
return out;
}
/* ────────── helper: read session memory ────────── */
async readMemory(callerContext = "unknown") {
try {
console.log(`🧠 [MEMORY READ] Called by: ${callerContext}`);
const memoryResource = await this.readResource("internal://session/memory");
if (memoryResource && memoryResource.contents && memoryResource.contents[0]) {
const memoryData = JSON.parse(memoryResource.contents[0].text);
// Log detailed memory state
console.log(`🧠 [MEMORY READ SUCCESS] ${callerContext}:`);
console.log(` 📝 Conversation thread: ${memoryData.fullContext?.conversationThread?.length || 0} messages`);
console.log(` 🎯 Key topics: ${memoryData.fullContext?.insights?.keyTopics?.length || 0} topics`);
console.log(` ✅ Completed tasks: ${memoryData.fullContext?.currentSession?.completedTasks || 0}`);
console.log(` 🔍 Current focus: ${memoryData.fullContext?.currentSession?.currentFocus || "None"}`);
console.log(` 📊 I/O stats: R:${memoryData.ioStatus?.reads || 0} W:${memoryData.ioStatus?.writes || 0}`);
// Log recent conversation thread if it exists
if (memoryData.fullContext?.conversationThread?.length > 0) {
const recent = memoryData.fullContext.conversationThread.slice(-3);
console.log(` 💬 Recent conversation:`);
recent.forEach((entry, i) => {
console.log(` ${i + 1}. [${entry.role}] ${entry.content.substring(0, 80)}...`);
});
} else {
console.log(` 💬 Conversation thread is empty or missing`);
console.log(` 🔍 Debug - Full memory structure keys:`, Object.keys(memoryData.fullContext || {}));
if (memoryData.fullContext?.conversationThread) {
console.log(` 🔍 Debug - conversationThread type:`, typeof memoryData.fullContext.conversationThread, 'length:', memoryData.fullContext.conversationThread.length);
}
}
return memoryData;
} else {
console.log(`🧠 [MEMORY READ EMPTY] ${callerContext}: No memory data found`);
return null;
}
} catch (error) {
console.warn(`🧠 [MEMORY READ ERROR] ${callerContext}: ${error.message}`);
return null;
}
}
/* ────────── helper: update session memory ────────── */
async updateMemory(conversationContext = [], callerContext = "unknown") {
try {
console.log(`🧠 [MEMORY WRITE] Called by: ${callerContext} with ${conversationContext.length} entries`);
// Add conversation entries to memory buffer
for (const entry of conversationContext) {
console.log(`🧠 [MEMORY WRITE] ${callerContext}: Adding ${entry.role} message: "${entry.content.substring(0, 100)}..."`);
const result = await this.callMcp("tools/call", {
name: "update_session_memory",
arguments: {
type: entry.role === "user" ? "user_message" : "assistant_message",
content: entry.content,
timestamp: Date.now(),
metadata: { source: "conversation_update", caller: callerContext }
}
});
if (result.error) {
console.warn(`🧠 [MEMORY WRITE ERROR] ${callerContext}: ${result.error.message}`);
} else {
console.log(`🧠 [MEMORY WRITE SUCCESS] ${callerContext}: ${entry.role} message stored`);
}
}
return true;
} catch (error) {
console.warn(`🧠 [MEMORY WRITE ERROR] ${callerContext}: ${error.message}`);
return false;
}
}
/* ────────── helper: silent memory update (for background updates) ────────── */
async silentMemoryUpdate(type, content, metadata = {}, callerContext = "unknown") {
try {
console.log(`🧠 [SILENT UPDATE] ${callerContext}: ${type} - "${content.substring(0, 50)}..."`);
const result = await this.callMcp("tools/call", {
name: "update_session_memory",
arguments: {
type,
content,
timestamp: Date.now(),
metadata: { ...metadata, source: "silent_update", caller: callerContext }
}
});
if (result.error) {
console.warn(`🧠 [SILENT UPDATE ERROR] ${callerContext}: ${result.error.message}`);
return false;
} else {
console.log(`🧠 [SILENT UPDATE SUCCESS] ${callerContext}: ${type} stored`);
return true;
}
} catch (error) {
// Silent update - don't log errors unless in debug mode
if (process.env.DEBUG_MEMORY) {
console.warn(`🧠 [SILENT UPDATE ERROR] ${callerContext}: ${error.message}`);
}
return false;
}
}
/* ────────── helper: read MCP resource ────────── */
async readResource(uri) {
const res = await this.callMcp("resources/read", { uri });
if (res.error) {
throw new Error(`⚠️ reading resource ${uri} failed: ${res.error.message}`);
}
return res.result;
}
/* ────────── open MCP session & cache catalogs ────────── */
async openSession() {
/* 1️⃣ try initialize */
console.log("=== STEP 1: initialize ===");
const initRes = await fetch(this.endpoint, {
method: "POST",
headers: {
Authorization: this.auth,
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
},
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: {
protocolVersion: "2025-03-26",
capabilities: {
roots: {},
sampling: {},
tools: { call: true },
resources: { read: true },
prompts: { list: true },
memory: { read: true, write: true },
conversations: { manage: true },
sessions: { manage: true },
files: { upload: true },
embeddings: { create: true },
chatCompletions: { create: true },
completions: { create: true },
edits: { create: true },
images: { create: true },
audio: { create: true },
fineTunes: { create: true },
},
clientInfo: {
name: "MCP Burst Hub",
version: "1.0.0",
},
},
}),
});
console.log("→ status", initRes.status);
const initBody = await safeJson(initRes);
// Always get the session ID from the response header
this.sessionId = initRes.headers.get("Mcp-Session-Id");
console.log("Session ID from response:", this.sessionId);
if (initBody.error?.message?.includes("already initialized")) {
console.warn("⚠️ server says already initialized — this session is already active");
// The session is already initialized, but we have the session ID, so we can continue
} else if (initBody.error) {
console.error("Initialize failed:", initBody.error);
process.exit(1);
} else {
console.log("← initialize OK:", initBody);
console.log(" Session-ID:", this.sessionId, "\n");
/* 2️⃣ notifications/initialized (only on fresh session) */
await fetch(this.endpoint, {
method: "POST",
headers: {
Authorization: this.auth,
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
"Mcp-Session-Id": this.sessionId,
},
body: JSON.stringify({
jsonrpc: "2.0",
method: "notifications/initialized",
}),
});
}
/* 3️⃣ full catalogs */
console.log("=== STEP 3: list catalogs ===");
this.catalog.tools = await this.listAll("tools/list");
this.catalog.resources = await this.listAll("resources/list");
//this.catalog.prompts = await this.listAll("prompts/list");
console.log(
`✅ MCP session ${this.sessionId}\n` +
`🛠 tools: ${this.catalog.tools.map((t) => t.title ? t.title : t.name).join(", ") || "(none)"}\n` +
`📄 resources: ${this.catalog.resources.map((r) => r.title ? r.title : r.name).join(", ") || "(none)"}\n`
);
}
}