import { StreamingManager } from '../../streaming/StreamingManager.js';
export class MemoryStreamingTools {
constructor(factStore) {
this.factStore = factStore;
this.streamingManager = new StreamingManager();
}
registerTools(server) {
this.registerStreamQueryTool(server);
this.registerStreamNextTool(server);
this.registerStreamStatusTool(server);
this.registerStreamCancelTool(server);
this.registerStreamPauseTool(server);
this.registerStreamResumeTool(server);
}
registerStreamQueryTool(server) {
server.registerTool(
'memory_stream_query',
'Start a streaming query for large result sets',
{
type: 'object',
properties: {
query: {
type: 'string',
description: 'Search query text',
},
type: {
type: 'string',
description: 'Filter by fact type',
},
domain: {
type: 'string',
description: 'Filter by domain',
},
chunkSize: {
type: 'integer',
description: 'Number of facts per chunk (default: 10)',
minimum: 1,
maximum: 100,
},
maxResults: {
type: 'integer',
description: 'Maximum total results (default: 1000)',
minimum: 1,
maximum: 10000,
},
},
},
async (args) => {
return await this.handleStreamQuery(args);
}
);
}
registerStreamNextTool(server) {
server.registerTool(
'memory_stream_next',
'Get the next chunk from an active stream',
{
type: 'object',
properties: {
streamId: {
type: 'string',
description: 'The stream ID returned from memory_stream_query',
},
},
required: ['streamId'],
},
async (args) => {
return await this.handleStreamNext(args);
}
);
}
registerStreamStatusTool(server) {
server.registerTool(
'memory_stream_status',
'Get the status of a streaming query',
{
type: 'object',
properties: {
streamId: {
type: 'string',
description: 'The stream ID',
},
},
required: ['streamId'],
},
async (args) => {
return await this.handleStreamStatus(args);
}
);
}
registerStreamCancelTool(server) {
server.registerTool(
'memory_stream_cancel',
'Cancel an active streaming query',
{
type: 'object',
properties: {
streamId: {
type: 'string',
description: 'The stream ID to cancel',
},
},
required: ['streamId'],
},
async (args) => {
return await this.handleStreamCancel(args);
}
);
}
registerStreamPauseTool(server) {
server.registerTool(
'memory_stream_pause',
'Pause an active streaming query',
{
type: 'object',
properties: {
streamId: {
type: 'string',
description: 'The stream ID to pause',
},
},
required: ['streamId'],
},
async (args) => {
return await this.handleStreamPause(args);
}
);
}
registerStreamResumeTool(server) {
server.registerTool(
'memory_stream_resume',
'Resume a paused streaming query',
{
type: 'object',
properties: {
streamId: {
type: 'string',
description: 'The stream ID to resume',
},
},
required: ['streamId'],
},
async (args) => {
return await this.handleStreamResume(args);
}
);
}
async handleStreamQuery(args) {
try {
const streamId = await this.streamingManager.createBatchStream(args, this.factStore, {
chunkSize: args.chunkSize || 10,
maxResults: args.maxResults || 1000,
});
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: true,
streamId,
message: 'Streaming query started',
chunkSize: args.chunkSize || 10,
maxResults: args.maxResults || 1000,
}),
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: false,
error: error.message,
}),
},
],
isError: true,
};
}
}
async handleStreamNext(args) {
try {
const chunk = await this.streamingManager.getNextChunk(args.streamId);
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: true,
...chunk,
}),
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: false,
error: error.message,
}),
},
],
isError: true,
};
}
}
async handleStreamStatus(args) {
try {
const status = await this.streamingManager.getStreamStatus(args.streamId);
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: true,
...status,
}),
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: false,
error: error.message,
}),
},
],
isError: true,
};
}
}
async handleStreamCancel(args) {
try {
const result = await this.streamingManager.cancelStream(args.streamId);
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: true,
...result,
}),
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: false,
error: error.message,
}),
},
],
isError: true,
};
}
}
async handleStreamPause(args) {
try {
const result = await this.streamingManager.pauseStream(args.streamId);
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: true,
streamId: args.streamId,
status: result.status,
message: 'Stream paused successfully',
}),
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: false,
error: error.message,
}),
},
],
isError: true,
};
}
}
async handleStreamResume(args) {
try {
const result = await this.streamingManager.resumeStream(args.streamId);
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: true,
streamId: args.streamId,
status: result.status,
message: 'Stream resumed successfully',
}),
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: JSON.stringify({
success: false,
error: error.message,
}),
},
],
isError: true,
};
}
}
async createBatchStream(query, options = {}) {
try {
const streamId = await this.streamingManager.createBatchStream(query, this.factStore, options);
return {
success: true,
streamId,
options,
};
} catch (error) {
throw new Error(`Failed to create batch stream: ${error.message}`);
}
}
async getStreamStats() {
try {
const stats = this.streamingManager.getStats();
return {
success: true,
...stats,
};
} catch (error) {
throw new Error(`Failed to get stream stats: ${error.message}`);
}
}
async cleanupCompletedStreams() {
try {
const result = await this.streamingManager.cleanupCompletedStreams();
return {
success: true,
cleaned: result.cleaned,
remaining: result.remaining,
};
} catch (error) {
throw new Error(`Failed to cleanup completed streams: ${error.message}`);
}
}
async getAllActiveStreams() {
try {
const streams = this.streamingManager.getAllActiveStreams();
return {
success: true,
streams,
count: streams.length,
};
} catch (error) {
throw new Error(`Failed to get active streams: ${error.message}`);
}
}
getStreamingManager() {
return this.streamingManager;
}
}