Skip to main content
Glama
by OpaqueGlass
index.ts6.94 kB
import { debugPush } from "@/logger"; import { getJSONFile, putJSONFile, removeFileAPI } from "@/syapi"; // 这是一个简化的锁实现,用于防止异步函数重入。 // 在单线程的JS环境中,它通过一个Promise队列来确保操作按顺序执行。 class AsyncLock { private promise = Promise.resolve(); public async acquire<T>(fn: () => Promise<T>): Promise<T> { return new Promise((resolve, reject) => { this.promise = this.promise .then(() => fn().then(resolve, reject)) .catch(() => {}); // 捕获之前的错误,不影响新任务 }); } } export class CacheQueue<T> { private readonly cacheDir: string; private readonly readFilePath: string; private readonly writeFilePath: string; private readCache: T[] = []; private isRotating = false; // 使用锁来确保对文件的读写和轮换操作是原子性的 private readonly readLock = new AsyncLock(); private readonly writeLock = new AsyncLock(); constructor(cacheDir: string) { this.cacheDir = cacheDir; // 为简单起见,我们使用 .json 而不是 .jsonl,因为 syapi 的函数似乎是为完整的JSON数组设计的 this.readFilePath = `${this.cacheDir}/cache_a.json`; this.writeFilePath = `${this.cacheDir}/cache_b.json`; } /** * 初始化缓存队列,确保缓存文件存在。 * 应该在使用队列前调用一次。 */ public async init(): Promise<void> { const readData = await getJSONFile(this.readFilePath); if (readData == null) { await putJSONFile(this.readFilePath, [], false); } const writeData = await getJSONFile(this.writeFilePath); if (writeData == null) { await putJSONFile(this.writeFilePath, [], false); } } /** * 将一个新项目添加到写队列(文件b)中。 * 此操作是线程安全的。 * @param item 要添加的项目。 */ public async addToQueue(item: T): Promise<void> { debugPush("item", item); return this.writeLock.acquire(async () => { let queue: T[] = []; try { // 读取文件b的现有内容 queue = await getJSONFile(this.writeFilePath); if (!Array.isArray(queue)) { queue = []; // 如果文件内容不是数组,则重置 } } catch (error) { // 文件可能为空或不存在,这没关系 queue = []; } // 添加新项目并写回 queue.push(item); await putJSONFile(this.writeFilePath, queue, false); }); } /** * 将一个新项目添加到写队列(文件b)中。 * 此操作是线程安全的。 * @param item 要添加的项目。 */ public async batchAddToQueue(items: T[]): Promise<void> { return this.writeLock.acquire(async () => { let queue: T[] = []; try { // 读取文件b的现有内容 queue = await getJSONFile(this.writeFilePath); if (!Array.isArray(queue)) { queue = []; // 如果文件内容不是数组,则重置 } } catch (error) { // 文件可能为空或不存在,这没关系 queue = []; } // 添加新项目并写回 queue.push(...items); await putJSONFile(this.writeFilePath, queue, false); }); } /** * 从读队列(文件a)中消费指定数量的项目。 * 如果文件a消费完毕,它会自动轮换文件b为新的文件a。 * 此操作是线程安全的。 * @param count 希望消费的项目的数量。 * @returns 一个包含已消费项目的数组。 */ public async consume(count: number): Promise<T[]> { return this.readLock.acquire(async () => { // 如果内存缓存为空,则从文件a加载 if (this.readCache.length === 0) { await this.preloadCacheFromFile(); } // 如果缓存仍然为空,说明文件a是空的,尝试进行文件轮换 if (this.readCache.length === 0) { await this.rotateFiles(); // 轮换后再次尝试从新的文件a加载 await this.preloadCacheFromFile(); } // 如果在所有尝试之后缓存仍然为空,则说明整个队列都是空的 if (this.readCache.length === 0) { return []; } // 从内存缓存中取出所需数量的项目 const consumedCount = Math.min(count, this.readCache.length); const consumedItems = this.readCache.splice(0, consumedCount); return consumedItems; }); } /** * 内部方法:从读文件(文件a)加载内容到内存缓存。 * 加载后,文件a将被清空。 */ private async preloadCacheFromFile(): Promise<void> { try { const data = await getJSONFile(this.readFilePath); if (Array.isArray(data) && data.length > 0) { this.readCache = data; // 清空文件a,因为内容已在内存中 await putJSONFile(this.readFilePath, [], false); } } catch (e) { // 文件为空或不存在,缓存保持为空 this.readCache = []; } } /** * 轮换文件:将文件b的内容移动到文件a,然后清空文件b。 * 这是一个原子操作。 */ private async rotateFiles(): Promise<void> { // 防止重入 if (this.isRotating) return; this.isRotating = true; try { // 使用写锁来确保在轮换期间没有新的写入操作干扰 await this.writeLock.acquire(async () => { let itemsToMove: T[] = []; try { itemsToMove = await getJSONFile(this.writeFilePath); } catch (e) { // 文件b为空或不存在,无需移动 } if (!Array.isArray(itemsToMove) || itemsToMove.length === 0) { // 如果文件b没有内容,则无需轮换 return; } // 将文件b的内容写入文件a await putJSONFile(this.readFilePath, itemsToMove, false); // 清空文件b await removeFileAPI(this.writeFilePath).catch(() => {}); // 忽略删除错误 await putJSONFile(this.writeFilePath, [], false); // 创建一个新的空文件b }); } finally { this.isRotating = false; } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/OpaqueGlass/syplugin-anMCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server