PolyMarket MCP Server

by berlinbra
Verified
  • src
import * as fs from 'fs/promises';
import { watch, FSWatcher } from 'fs';
import * as path from 'path';
import { FileProcessor } from './fileProcessor.js';
import { ProcessedDocument } from './types.js';

/** Kind of change reported to the {@link FileChangeCallback}. */
type FileChangeType = 'add' | 'change' | 'unlink';

/** Invoked once per detected change; the watcher awaits it before releasing the path. */
type FileChangeCallback = (type: FileChangeType, filePath: string) => Promise<void>;

/**
 * Watches directories recursively and forwards file changes to a callback.
 *
 * Two independent operations are offered:
 * - {@link FileWatcher.setupDirectoryWatch} installs a live watcher and returns at once;
 * - {@link FileWatcher.processDirectory} walks the files already on disk and reports
 *   each of them as an `'add'`.
 *
 * While the callback for a given path is still pending, further events for that
 * same path are dropped (see {@link FileWatcher.isProcessing}).
 */
export class FileWatcher {
  /** Active watchers keyed by the directory path they were created for. */
  private watchers: Map<string, FSWatcher> = new Map();
  private fileProcessor: FileProcessor;
  /** Paths whose most recent callback completed with `'add'` or `'change'`. */
  private processedPaths: Set<string> = new Set();
  /** Paths whose callback is currently in flight. */
  private processingQueue: Set<string> = new Set();
  private onFileChange: FileChangeCallback;

  /**
   * @param fileProcessor Stored on the instance; not otherwise used by this class.
   * @param onFileChange Callback that receives every detected change.
   */
  constructor(fileProcessor: FileProcessor, onFileChange: FileChangeCallback) {
    this.fileProcessor = fileProcessor;
    this.onFileChange = onFileChange;
  }

  /**
   * Starts watching `dirPath` without processing the files that already exist.
   *
   * @param dirPath Directory to watch recursively.
   */
  setupDirectoryWatch(dirPath: string): void {
    this.setupWatcher(dirPath);
  }

  /**
   * Reports every file currently under `dirPath` as an `'add'`.
   *
   * @param dirPath Root directory of the walk.
   * @returns A promise that settles once the whole tree has been visited. It rejects
   *   if a directory cannot be listed.
   */
  async processDirectory(dirPath: string): Promise<void> {
    await this.processExistingFiles(dirPath);
  }

  /**
   * Installs the watcher and then waits for the initial walk to finish.
   * Kept for backwards compatibility and for callers that want to block.
   *
   * @param dirPath Directory to watch and process.
   */
  async watchDirectory(dirPath: string): Promise<void> {
    this.setupDirectoryWatch(dirPath);
    await this.processDirectory(dirPath);
  }

  /**
   * Returns whether a callback for `filePath` is currently in flight.
   *
   * @param filePath Path exactly as it was reported to the callback.
   */
  isProcessing(filePath: string): boolean {
    return this.processingQueue.has(filePath);
  }

  /** Closes every watcher and resets all internal bookkeeping. */
  async close(): Promise<void> {
    for (const watcher of this.watchers.values()) {
      watcher.close();
    }
    this.watchers.clear();
    this.processedPaths.clear();
    this.processingQueue.clear();
  }

  /** True for the errors the initial walk deliberately ignores. */
  private static isUnsupportedFileError(error: unknown): boolean {
    return error instanceof Error && error.message.startsWith('Unsupported file type');
  }

  /** Depth-first walk of `dirPath`, reporting each regular file as an `'add'`. */
  private async processExistingFiles(dirPath: string): Promise<void> {
    const items = await fs.readdir(dirPath, { withFileTypes: true });

    for (const item of items) {
      const fullPath = path.join(dirPath, item.name);

      if (item.isDirectory()) {
        await this.processExistingFiles(fullPath);
        continue;
      }
      if (!item.isFile()) {
        continue;
      }

      try {
        await this.handleFileChange('add', fullPath);
      } catch (error) {
        // Unsupported files are skipped quietly. Everything else is logged,
        // including throwables that are not Error instances.
        if (!FileWatcher.isUnsupportedFileError(error)) {
          console.error(`Error processing file ${fullPath}:`, error);
        }
      }
    }
  }

  /**
   * Creates the recursive watcher for `dirPath`.
   *
   * Calling this again for a directory that is already watched is a no-op, so the
   * existing watcher is never orphaned without being closed.
   */
  private setupWatcher(dirPath: string): void {
    if (this.watchers.has(dirPath)) {
      return;
    }

    // NOTE(review): availability of `recursive: true` depends on the platform and
    // Node.js version - confirm for the deployment targets.
    const watcher = watch(dirPath, { recursive: true }, (eventType, filename) => {
      // Fire-and-forget: handleWatchEvent catches its own errors.
      void this.handleWatchEvent(dirPath, eventType, filename);
    });

    // Without a listener an 'error' event would be thrown as an uncaught exception.
    // An errored watcher is not usable any more, so drop it and allow re-registration.
    watcher.on('error', (error) => {
      console.error(`File watcher error for ${dirPath}:`, error);
      watcher.close();
      if (this.watchers.get(dirPath) === watcher) {
        this.watchers.delete(dirPath);
      }
    });

    this.watchers.set(dirPath, watcher);
  }

  /**
   * Translates one raw watch event into an `'add'`, `'change'` or `'unlink'`.
   *
   * A path that cannot be stat'ed is reported as `'unlink'`. An existing regular file
   * is reported as `'add'` for a `'rename'` event and as `'change'` otherwise.
   * Existing non-file entries (e.g. directories) are ignored.
   */
  private async handleWatchEvent(
    dirPath: string,
    eventType: string,
    filename: string | null,
  ): Promise<void> {
    if (!filename) {
      return;
    }
    const fullPath = path.join(dirPath, filename);

    try {
      // One stat call answers both "does it exist?" and "is it a file?", so the
      // path cannot disappear between two separate checks.
      const stats = await fs.stat(fullPath).catch(() => null);

      if (stats === null) {
        await this.handleFileChange('unlink', fullPath);
      } else if (stats.isFile()) {
        await this.handleFileChange(eventType === 'rename' ? 'add' : 'change', fullPath);
      }
    } catch (error) {
      console.error(`Error handling file change for ${fullPath}:`, error);
    }
  }

  /**
   * Runs the callback for one change and updates the bookkeeping sets.
   *
   * Returns without doing anything when a callback for the same path is still
   * pending. Errors thrown by the callback propagate to the caller; the path is
   * released from the in-flight set either way.
   */
  private async handleFileChange(type: FileChangeType, filePath: string): Promise<void> {
    if (this.processingQueue.has(filePath)) {
      return;
    }

    this.processingQueue.add(filePath);
    try {
      await this.onFileChange(type, filePath);

      if (type === 'unlink') {
        this.processedPaths.delete(filePath);
      } else {
        this.processedPaths.add(filePath);
      }
    } finally {
      this.processingQueue.delete(filePath);
    }
  }
}