MCP Memory Server with Qdrant Persistence
by delorenj
Verified
- mcp-qdrant-memory
- persistence
import { createHash } from 'crypto';
import http from 'http';
import type { OutgoingHttpHeaders, RequestOptions } from 'http';
import https from 'https';
import { QdrantClient } from '@qdrant/js-client-rest';
import OpenAI from 'openai';
import { Entity, Relation } from '../types.js';
import { QDRANT_URL, COLLECTION_NAME, OPENAI_API_KEY, QDRANT_API_KEY } from '../config.js';
// Custom fetch implementation using Node's HTTP/HTTPS modules

/** Socket and request timeout, in milliseconds, applied to every customFetch call. */
const CUSTOM_FETCH_TIMEOUT_MS = 60000;

// One keep-alive agent per protocol, created once. Creating a fresh agent for
// every request (as was done before) defeats keepAlive: no socket is ever
// reused and each idle socket lingers in its own private pool.
const customFetchHttpAgent = new http.Agent({
  keepAlive: true,
  timeout: CUSTOM_FETCH_TIMEOUT_MS
});
const customFetchHttpsAgent = new https.Agent({
  // NOTE(review): TLS certificate verification is disabled here, which exposes
  // every HTTPS request to man-in-the-middle attacks. It is kept only to
  // preserve existing behaviour (e.g. self-signed Qdrant deployments); confirm
  // whether it is really required and remove it if not.
  rejectUnauthorized: false,
  keepAlive: true,
  timeout: CUSTOM_FETCH_TIMEOUT_MS
});

/**
 * Minimal `fetch` replacement built on Node's `http`/`https` modules.
 *
 * The whole response body is buffered, and the returned object implements only
 * the subset of `Response` used here: `ok`, `status`, `statusText`, `headers`,
 * `json()` and `text()`.
 *
 * @param url - Absolute `http://` or `https://` URL to request.
 * @param options - Standard fetch init; `method`, `headers` and `body` are honoured.
 *   Header names are sent lower-cased. String and binary bodies are sent as-is,
 *   anything else is JSON-serialised.
 * @returns A promise resolving to the partial `Response` described above.
 * @throws Rejects on an invalid URL, an unsupported protocol, a network or
 *   response-stream error, or when the request times out.
 */
async function customFetch(url: string, options: RequestInit = {}): Promise<Response> {
  return new Promise((resolve, reject) => {
    const urlObj = new URL(url);
    const isHttps = urlObj.protocol === 'https:';
    if (!isHttps && urlObj.protocol !== 'http:') {
      reject(new Error(`Unsupported protocol: ${urlObj.protocol}`));
      return;
    }

    // Defaults first; caller-supplied headers override them. Keys are
    // lower-cased so that "Content-Type" and "content-type" cannot coexist.
    const headers: OutgoingHttpHeaders = {
      accept: 'application/json',
      'content-type': 'application/json'
    };
    const setHeader = (key: string, value: unknown): void => {
      if (value) headers[key.toLowerCase()] = String(value);
    };
    // RequestInit.headers may be a Headers instance, an array of pairs or a
    // plain record; Object.entries only works for the last of these.
    const supplied = options.headers;
    if (supplied instanceof Headers) {
      supplied.forEach((value, key) => setHeader(key, value));
    } else if (Array.isArray(supplied)) {
      for (const [key, value] of supplied) setHeader(key, value);
    } else if (supplied) {
      for (const [key, value] of Object.entries(supplied)) setHeader(key, value);
    }

    const requestOptions: RequestOptions = {
      method: options.method || 'GET',
      // URL keeps the brackets around IPv6 literals; the resolver needs them stripped.
      hostname: urlObj.hostname.replace(/^\[|\]$/g, ''),
      // Parenthesised on purpose: `a || b ? x : y` parses as `(a || b) ? x : y`,
      // which previously replaced every explicit port with 443.
      port: urlObj.port || (isHttps ? 443 : 80),
      path: `${urlObj.pathname}${urlObj.search}`,
      headers,
      timeout: CUSTOM_FETCH_TIMEOUT_MS,
      agent: isHttps ? customFetchHttpsAgent : customFetchHttpAgent
    };

    const onResponse = (res: http.IncomingMessage): void => {
      const chunks: Buffer[] = [];
      res.on('data', (chunk: Buffer) => chunks.push(chunk));
      // Without this, a response aborted mid-body would leave the promise pending.
      res.on('error', reject);
      res.on('end', () => {
        const body = Buffer.concat(chunks).toString();
        const statusCode = res.statusCode;
        const responseHeaders: Record<string, string> = {};
        for (const [key, value] of Object.entries(res.headers)) {
          if (key && value) {
            responseHeaders[key] = Array.isArray(value) ? value.join(', ') : value;
          }
        }
        const response = {
          ok: statusCode !== undefined && statusCode >= 200 && statusCode < 300,
          status: statusCode || 500,
          statusText: res.statusMessage || '',
          headers: new Headers(responseHeaders),
          json: async () => JSON.parse(body),
          text: async () => body
        } as Response;
        resolve(response);
      });
    };

    // Plain-HTTP URLs must go through the http module; https.request would
    // attempt a TLS handshake against a non-TLS port.
    const req = isHttps
      ? https.request(requestOptions, onResponse)
      : http.request(requestOptions, onResponse);

    req.on('error', reject);
    req.on('timeout', () => {
      const timeoutError = new Error('Request timed out');
      req.destroy(timeoutError);
      reject(timeoutError);
    });

    const body = options.body;
    if (body) {
      const isRaw = typeof body === 'string' || body instanceof Uint8Array;
      req.write(isRaw ? body : JSON.stringify(body));
    }
    req.end();
  });
}
// Install customFetch as the global fetch so the Qdrant client picks it up
if (typeof globalThis !== 'undefined') {
  Object.assign(globalThis, { fetch: customFetch });
}
/**
 * An Entity carrying the literal discriminator `type: 'entity'`.
 * NOTE(review): presumably describes the payload written by persistEntity;
 * it is not referenced elsewhere in the visible code — confirm usage.
 */
interface EntityPayload extends Entity {
type: 'entity';
}
/**
 * A Relation carrying the literal discriminator `type: 'relation'`.
 * NOTE(review): presumably describes the payload written by persistRelation;
 * it is not referenced elsewhere in the visible code — confirm usage.
 */
interface RelationPayload extends Relation {
type: 'relation';
}
/** Discriminated union of the two payload shapes, keyed on `type`. */
type QdrantPayload = EntityPayload | RelationPayload;
/**
 * Type guard: reports whether a raw payload has the shape of an Entity,
 * i.e. string `name`, string `entityType` and an `observations` array
 * containing only strings.
 *
 * @param payload - Untyped key/value payload to inspect.
 * @returns `true` when every required field is present with the expected type.
 */
function isEntity(payload: Record<string, unknown>): payload is Entity {
  const { name, entityType, observations } = payload;
  if (typeof name !== 'string' || typeof entityType !== 'string') {
    return false;
  }
  if (!Array.isArray(observations)) {
    return false;
  }
  return observations.every((item) => typeof item === 'string');
}
/**
 * Type guard: reports whether a raw payload has the shape of a Relation,
 * i.e. `from`, `to` and `relationType` are all strings.
 *
 * @param payload - Untyped key/value payload to inspect.
 * @returns `true` when all three fields are strings.
 */
function isRelation(payload: Record<string, unknown>): payload is Relation {
  const requiredStringKeys = ['from', 'to', 'relationType'] as const;
  return requiredStringKeys.every((key) => typeof payload[key] === 'string');
}
/**
 * Stores entities and relations as points in a Qdrant collection and searches
 * them by vector similarity.
 *
 * Each point carries an OpenAI embedding of a text rendering of the item, a
 * numeric id derived from a hash of its identifying string, and a payload made
 * of the item's own fields plus a `type` discriminator (`'entity'` or
 * `'relation'`).
 */
export class QdrantPersistence {
  /** Number of connection attempts made by connect() before giving up. */
  private static readonly MAX_CONNECT_ATTEMPTS = 3;
  /** Delay before the second attempt; doubled after every further failure. */
  private static readonly INITIAL_RETRY_DELAY_MS = 2000;
  /** OpenAI model used to embed text. */
  private static readonly EMBEDDING_MODEL = 'text-embedding-ada-002';
  /** OpenAI embedding dimension used when creating the collection. */
  private static readonly EMBEDDING_DIMENSION = 1536;

  private readonly client: QdrantClient;
  private readonly openai: OpenAI;
  private initialized = false;

  /**
   * Builds the Qdrant and OpenAI clients from the module configuration.
   *
   * @throws Error when QDRANT_URL does not start with `http://` or `https://`.
   */
  constructor() {
    // Validate QDRANT_URL format and protocol
    if (!QDRANT_URL.startsWith('http://') && !QDRANT_URL.startsWith('https://')) {
      throw new Error('QDRANT_URL must start with http:// or https://');
    }
    this.client = new QdrantClient({
      url: QDRANT_URL,
      timeout: 60000,
      apiKey: QDRANT_API_KEY,
      checkCompatibility: false
    });
    this.openai = new OpenAI({
      apiKey: OPENAI_API_KEY,
    });
  }

  /**
   * Checks that Qdrant is reachable by listing its collections. Retries with
   * exponential backoff (2s, then 4s) and is a no-op once it has succeeded.
   *
   * @throws Error after three failed attempts, wrapping the last error message.
   */
  async connect(): Promise<void> {
    if (this.initialized) return;

    let delay = QdrantPersistence.INITIAL_RETRY_DELAY_MS;
    for (let attempt = 1; ; attempt++) {
      try {
        await this.client.getCollections();
        this.initialized = true;
        return;
      } catch (error: unknown) {
        console.error('Connection attempt failed:', error instanceof Error ? error.message : error);
        console.error('Full error:', error);
        if (attempt >= QdrantPersistence.MAX_CONNECT_ATTEMPTS) {
          const errorMessage = error instanceof Error ? error.message : 'Unknown error';
          throw new Error(
            `Failed to connect to Qdrant after ${QdrantPersistence.MAX_CONNECT_ATTEMPTS} attempts: ${errorMessage}`
          );
        }
        await new Promise(resolve => setTimeout(resolve, delay));
        delay *= 2; // Exponential backoff
      }
    }
  }

  /**
   * Connects, then makes sure the configured collection exists, creating it
   * with cosine distance when looking it up fails.
   *
   * @throws Whatever createCollection throws (after logging it).
   */
  async initialize(): Promise<void> {
    await this.connect();
    try {
      await this.client.getCollection(COLLECTION_NAME);
    } catch {
      // Any lookup failure is treated as "collection doesn't exist". If the
      // real cause is something else, the creation attempt below surfaces it.
      try {
        await this.client.createCollection(COLLECTION_NAME, {
          vectors: {
            size: QdrantPersistence.EMBEDDING_DIMENSION,
            distance: 'Cosine'
          }
        });
      } catch (error) {
        console.error('Error creating collection:', error);
        throw error;
      }
    }
  }

  /**
   * Requests an embedding vector for `text` from OpenAI.
   *
   * @throws Error when the response contains no embedding.
   */
  private async generateEmbedding(text: string): Promise<number[]> {
    const response = await this.openai.embeddings.create({
      model: QdrantPersistence.EMBEDDING_MODEL,
      input: text
    });
    const embedding = response.data[0]?.embedding;
    if (!embedding) {
      throw new Error('OpenAI returned no embedding for the supplied text');
    }
    return embedding;
  }

  /**
   * Maps a string to a point id: the first four bytes of the SHA-256 digest of
   * its UTF-8 encoding, read as a big-endian unsigned 32-bit integer.
   *
   * Uses `createHash` rather than the global `crypto.subtle`, which is not
   * available without a flag on every Node release that ships global fetch.
   * NOTE(review): with only 32 bits, two different strings can map to the same
   * id, in which case the later upsert overwrites the earlier point.
   */
  private async hashString(str: string): Promise<number> {
    return createHash('sha256').update(str, 'utf8').digest().readUInt32BE(0);
  }

  /** Identifying string of a relation; used for both persisting and deleting. */
  private relationKey(relation: Relation): string {
    return `${relation.from}-${relation.relationType}-${relation.to}`;
  }

  /** Upserts a single point into the configured collection. */
  private async upsertPoint(
    id: number,
    vector: number[],
    payload: Record<string, unknown>
  ): Promise<void> {
    await this.client.upsert(COLLECTION_NAME, {
      points: [{
        id,
        vector,
        payload
      }]
    });
  }

  /** Deletes a single point, by id, from the configured collection. */
  private async deletePoint(id: number): Promise<void> {
    await this.client.delete(COLLECTION_NAME, {
      points: [id]
    });
  }

  /**
   * Embeds and upserts an entity. The point id is derived from `entity.name`,
   * so persisting an entity with the same name replaces the stored point.
   */
  async persistEntity(entity: Entity): Promise<void> {
    await this.connect();
    const text = `${entity.name} (${entity.entityType}): ${entity.observations.join('. ')}`;
    const vector = await this.generateEmbedding(text);
    const id = await this.hashString(entity.name);
    // The discriminator goes last so a stray `type` field on the input cannot
    // override it; searchSimilar relies on it to recognise the point.
    const payload: Record<string, unknown> = {
      ...entity,
      type: 'entity' as const
    };
    await this.upsertPoint(id, vector, payload);
  }

  /**
   * Embeds and upserts a relation. The point id is derived from the
   * `from-relationType-to` string.
   */
  async persistRelation(relation: Relation): Promise<void> {
    await this.connect();
    const text = `${relation.from} ${relation.relationType} ${relation.to}`;
    const vector = await this.generateEmbedding(text);
    const id = await this.hashString(this.relationKey(relation));
    // Discriminator last, for the same reason as in persistEntity.
    const payload: Record<string, unknown> = {
      ...relation,
      type: 'relation' as const
    };
    await this.upsertPoint(id, vector, payload);
  }

  /**
   * Returns the stored entities and relations most similar to `query`.
   *
   * @param query - Free text to embed and search with.
   * @param limit - Maximum number of points requested from Qdrant (default 10).
   * @returns Matching items with the `type` discriminator stripped. Points
   *   with a missing or malformed payload are skipped, so fewer than `limit`
   *   items may be returned.
   */
  async searchSimilar(query: string, limit: number = 10): Promise<Array<Entity | Relation>> {
    await this.connect();
    const queryVector = await this.generateEmbedding(query);
    const results = await this.client.search(COLLECTION_NAME, {
      vector: queryVector,
      limit,
      with_payload: true
    });
    const validResults: Array<Entity | Relation> = [];
    for (const result of results) {
      const payload = result.payload as Record<string, unknown> | null | undefined;
      // A point can come back without a payload; reading `.type` on it would throw.
      if (!payload) continue;
      if (payload.type === 'entity' && isEntity(payload)) {
        const { type: _type, ...entity } = payload;
        validResults.push(entity as Entity);
      } else if (payload.type === 'relation' && isRelation(payload)) {
        const { type: _type, ...relation } = payload;
        validResults.push(relation as Relation);
      }
    }
    return validResults;
  }

  /** Deletes the point whose id is derived from `entityName`. */
  async deleteEntity(entityName: string): Promise<void> {
    await this.connect();
    const id = await this.hashString(entityName);
    await this.deletePoint(id);
  }

  /** Deletes the point whose id is derived from the relation's `from-relationType-to` string. */
  async deleteRelation(relation: Relation): Promise<void> {
    await this.connect();
    const id = await this.hashString(this.relationKey(relation));
    await this.deletePoint(id);
  }
}