/**
* Shared Knowledge Directory Manager
* Handles synchronization of learning data between multiple services
*/
import fs from 'fs/promises';
import path from 'path';
import crypto from 'crypto';
import logger from './logger.js';
export class SharedKnowledgeManager {
constructor(options = {}) {
this.sharedDir = options.sharedDir || '/tmp/mcp-learning-shared';
this.serviceId = options.serviceId || 'mcp-self-learning';
this.syncInterval = options.syncInterval || 60000; // 1 minute
this.lockTimeout = options.lockTimeout || 30000; // 30 seconds
this.knowledgeFile = path.join(this.sharedDir, 'shared-knowledge.json');
this.lockFile = path.join(this.sharedDir, 'knowledge.lock');
this.metadataFile = path.join(this.sharedDir, 'metadata.json');
this.isInitialized = false;
this.syncTimer = null;
}
async initialize() {
if (this.isInitialized) return;
try {
// Create shared directory if it doesn't exist
await fs.mkdir(this.sharedDir, { recursive: true });
// Initialize metadata if it doesn't exist
await this.initializeMetadata();
// Start periodic sync
this.startPeriodicSync();
this.isInitialized = true;
logger.info('Shared knowledge manager initialized', {
sharedDir: this.sharedDir,
serviceId: this.serviceId
});
} catch (error) {
logger.error('Failed to initialize shared knowledge manager', { error: error.message });
throw error;
}
}
async initializeMetadata() {
try {
await fs.access(this.metadataFile);
} catch (error) {
// File doesn't exist, create it
const metadata = {
version: '1.0.0',
created: new Date().toISOString(),
services: {},
lastSync: null,
totalSyncs: 0
};
await fs.writeFile(this.metadataFile, JSON.stringify(metadata, null, 2));
logger.info('Created shared knowledge metadata');
}
}
async acquireLock() {
const maxRetries = 10;
const retryDelay = 1000;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
// Try to create lock file
await fs.writeFile(this.lockFile, JSON.stringify({
serviceId: this.serviceId,
pid: process.pid,
timestamp: new Date().toISOString(),
timeout: Date.now() + this.lockTimeout
}), { flag: 'wx' }); // Exclusive create
return true;
} catch (error) {
if (error.code === 'EEXIST') {
// Lock exists, check if it's expired
try {
const lockData = JSON.parse(await fs.readFile(this.lockFile, 'utf8'));
const now = Date.now();
if (now > lockData.timeout) {
// Lock expired, remove it
await fs.unlink(this.lockFile);
logger.warn('Removed expired lock file', {
previousService: lockData.serviceId,
expired: new Date(lockData.timeout).toISOString()
});
continue; // Try again
}
} catch (readError) {
// Corrupted lock file, remove it
await fs.unlink(this.lockFile);
continue; // Try again
}
if (attempt < maxRetries) {
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
} else {
throw error;
}
}
}
throw new Error('Failed to acquire lock after maximum retries');
}
async releaseLock() {
try {
await fs.unlink(this.lockFile);
} catch (error) {
if (error.code !== 'ENOENT') {
logger.warn('Failed to release lock', { error: error.message });
}
}
}
async exportKnowledgeToShared(localKnowledge) {
try {
await this.acquireLock();
// Read existing shared knowledge
let sharedKnowledge = {};
try {
const data = await fs.readFile(this.knowledgeFile, 'utf8');
sharedKnowledge = JSON.parse(data);
} catch (error) {
if (error.code !== 'ENOENT') {
throw error;
}
// File doesn't exist, start with empty knowledge
sharedKnowledge = {
version: '1.0.0',
services: {},
patterns: {},
knowledge: {},
metadata: {
created: new Date().toISOString(),
lastUpdated: new Date().toISOString(),
totalPatterns: 0,
totalKnowledge: 0
}
};
}
// Merge local knowledge into shared knowledge
const mergedData = this.mergeKnowledge(sharedKnowledge, localKnowledge);
// Update service information
mergedData.services[this.serviceId] = {
lastSync: new Date().toISOString(),
patterns: localKnowledge.patterns?.length || 0,
knowledge: localKnowledge.knowledge?.length || 0,
version: localKnowledge.version || '1.0.0'
};
// Update metadata
mergedData.metadata.lastUpdated = new Date().toISOString();
mergedData.metadata.totalPatterns = Object.keys(mergedData.patterns).length;
mergedData.metadata.totalKnowledge = Object.keys(mergedData.knowledge).length;
// Write merged knowledge back
await fs.writeFile(this.knowledgeFile, JSON.stringify(mergedData, null, 2));
// Update sync metadata
await this.updateSyncMetadata('export');
logger.info('Knowledge exported to shared directory', {
patterns: localKnowledge.patterns?.length || 0,
knowledge: localKnowledge.knowledge?.length || 0,
totalSharedPatterns: mergedData.metadata.totalPatterns
});
return { success: true, mergedData };
} finally {
await this.releaseLock();
}
}
async importKnowledgeFromShared() {
try {
await this.acquireLock();
// Read shared knowledge
const data = await fs.readFile(this.knowledgeFile, 'utf8');
const sharedKnowledge = JSON.parse(data);
// Extract knowledge relevant to this service
const importedKnowledge = this.extractRelevantKnowledge(sharedKnowledge);
// Update sync metadata
await this.updateSyncMetadata('import');
logger.info('Knowledge imported from shared directory', {
patterns: importedKnowledge.patterns?.length || 0,
knowledge: importedKnowledge.knowledge?.length || 0
});
return { success: true, knowledge: importedKnowledge };
} catch (error) {
if (error.code === 'ENOENT') {
// No shared knowledge file yet
return { success: true, knowledge: null };
}
throw error;
} finally {
await this.releaseLock();
}
}
mergeKnowledge(sharedKnowledge, localKnowledge) {
const merged = { ...sharedKnowledge };
// Merge patterns
if (localKnowledge.patterns) {
const patternsArray = Array.isArray(localKnowledge.patterns)
? localKnowledge.patterns
: Object.entries(localKnowledge.patterns);
for (const [key, pattern] of patternsArray) {
const patternKey = typeof key === 'string' ? key : this.generatePatternKey(pattern);
if (merged.patterns[patternKey]) {
// Merge existing pattern
merged.patterns[patternKey] = this.mergePattern(merged.patterns[patternKey], pattern);
} else {
// Add new pattern
merged.patterns[patternKey] = pattern;
}
}
}
// Merge knowledge
if (localKnowledge.knowledge) {
const knowledgeArray = Array.isArray(localKnowledge.knowledge)
? localKnowledge.knowledge
: Object.entries(localKnowledge.knowledge);
for (const [key, knowledge] of knowledgeArray) {
const knowledgeKey = typeof key === 'string' ? key : this.generateKnowledgeKey(knowledge);
if (merged.knowledge[knowledgeKey]) {
// Merge existing knowledge
merged.knowledge[knowledgeKey] = this.mergeKnowledgeItem(merged.knowledge[knowledgeKey], knowledge);
} else {
// Add new knowledge
merged.knowledge[knowledgeKey] = knowledge;
}
}
}
return merged;
}
mergePattern(existingPattern, newPattern) {
return {
...existingPattern,
count: (existingPattern.count || 0) + (newPattern.count || 1),
outcomes: [...(existingPattern.outcomes || []), ...(newPattern.outcomes || [])],
lastSeen: newPattern.lastSeen || new Date().toISOString(),
confidence: Math.max(existingPattern.confidence || 0, newPattern.confidence || 0),
sources: [...new Set([
...(existingPattern.sources || [this.serviceId]),
this.serviceId
])]
};
}
mergeKnowledgeItem(existingKnowledge, newKnowledge) {
return {
...existingKnowledge,
pattern: this.mergePattern(existingKnowledge.pattern || {}, newKnowledge.pattern || {}),
recommendations: [...new Set([
...(existingKnowledge.recommendations || []),
...(newKnowledge.recommendations || [])
])],
optimizations: [...new Set([
...(existingKnowledge.optimizations || []),
...(newKnowledge.optimizations || [])
])],
lastUpdated: new Date().toISOString(),
sources: [...new Set([
...(existingKnowledge.sources || [this.serviceId]),
this.serviceId
])]
};
}
extractRelevantKnowledge(sharedKnowledge) {
// For now, return all shared knowledge
// In the future, could filter based on service needs
return {
patterns: Object.entries(sharedKnowledge.patterns || {}),
knowledge: Object.entries(sharedKnowledge.knowledge || {}),
metadata: sharedKnowledge.metadata,
version: sharedKnowledge.version
};
}
generatePatternKey(pattern) {
const keyData = {
type: pattern.type,
features: pattern.features?.contextualCues
};
return crypto.createHash('md5').update(JSON.stringify(keyData)).digest('hex');
}
generateKnowledgeKey(knowledge) {
const keyData = {
pattern: knowledge.pattern?.type,
domain: knowledge.pattern?.features?.contextualCues?.domain
};
return crypto.createHash('md5').update(JSON.stringify(keyData)).digest('hex');
}
async updateSyncMetadata(operation) {
try {
const metadata = JSON.parse(await fs.readFile(this.metadataFile, 'utf8'));
metadata.lastSync = new Date().toISOString();
metadata.totalSyncs = (metadata.totalSyncs || 0) + 1;
if (!metadata.services[this.serviceId]) {
metadata.services[this.serviceId] = { created: new Date().toISOString() };
}
metadata.services[this.serviceId].lastOperation = operation;
metadata.services[this.serviceId].lastSync = new Date().toISOString();
metadata.services[this.serviceId].totalOperations = (metadata.services[this.serviceId].totalOperations || 0) + 1;
await fs.writeFile(this.metadataFile, JSON.stringify(metadata, null, 2));
} catch (error) {
logger.warn('Failed to update sync metadata', { error: error.message });
}
}
startPeriodicSync() {
if (this.syncTimer) {
clearInterval(this.syncTimer);
}
this.syncTimer = setInterval(async () => {
try {
// This would be called by the parent service with actual data
logger.debug('Periodic sync timer triggered');
} catch (error) {
logger.error('Periodic sync failed', { error: error.message });
}
}, this.syncInterval);
logger.info('Started periodic sync', { interval: this.syncInterval });
}
stopPeriodicSync() {
if (this.syncTimer) {
clearInterval(this.syncTimer);
this.syncTimer = null;
logger.info('Stopped periodic sync');
}
}
async getSharedStatus() {
try {
const [metadata, sharedKnowledge] = await Promise.all([
fs.readFile(this.metadataFile, 'utf8').then(JSON.parse),
fs.readFile(this.knowledgeFile, 'utf8').then(JSON.parse).catch(() => ({}))
]);
return {
sharedDirectory: this.sharedDir,
services: Object.keys(metadata.services),
totalPatterns: sharedKnowledge.metadata?.totalPatterns || 0,
totalKnowledge: sharedKnowledge.metadata?.totalKnowledge || 0,
lastSync: metadata.lastSync,
totalSyncs: metadata.totalSyncs
};
} catch (error) {
logger.error('Failed to get shared status', { error: error.message });
return {
sharedDirectory: this.sharedDir,
error: error.message
};
}
}
async cleanup() {
this.stopPeriodicSync();
await this.releaseLock();
logger.info('Shared knowledge manager cleaned up');
}
}
export default SharedKnowledgeManager;