
create_streaming_workflow

Create real-time streaming workflows on the AutoGen MCP Server's multi-agent conversation framework by configuring agents and specifying the workflow's name and type.

Instructions

Create a workflow with real-time streaming

Input Schema

Name            Required  Description                   Default
agents          Yes       List of agent configurations
streaming       No        Enable streaming
workflow_name   Yes       Name for the workflow
workflow_type   Yes       Type of workflow
progress_token  No        Progress token
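
A call to create_streaming_workflow might pass arguments like the following. This is an illustrative sketch: the workflow_type value and the agent entries are assumptions, not values taken from the server's source; only the key names come from the schema above.

```typescript
// Illustrative arguments for create_streaming_workflow.
// Field values (workflow_type, agent entries) are examples only;
// consult the server for the actual accepted values.
const args = {
  workflow_name: 'research_pipeline', // required
  workflow_type: 'sequential',        // required (illustrative value)
  agents: [                           // required
    { name: 'researcher', type: 'assistant' },
    { name: 'critic', type: 'assistant' },
  ],
  streaming: true,                    // optional
};

// Minimal client-side check of the schema's required keys.
const required = ['workflow_name', 'workflow_type', 'agents'];
const missing = required.filter((key) => !(key in args));
console.log(missing.length === 0 ? 'args valid' : `missing: ${missing}`);
```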

Implementation Reference

  • Handler function for streaming tools, including 'create_streaming_workflow'. It manages progress notifications, calls a Python subprocess for the core logic, and pushes streaming updates over SSE.
    private async handleStreamingTool(toolName: string, args: any, progressToken?: string): Promise<any> {
      if (progressToken) {
        await this.sendProgressNotification(progressToken, 25, 'Initializing streaming...');
      }
    
      const result = await this.callPythonHandler(toolName, args);
    
      if (args.streaming && this.sseTransports.size > 0) {
        for (const transport of this.sseTransports.values()) {
          try {
            await transport.send({
              jsonrpc: '2.0',
              method: 'notifications/progress',
              params: {
                progressToken: progressToken || 'streaming',
                progress: 75,
                message: 'Streaming updates...',
                data: result,
              },
            });
          } catch (error) {
            console.error('Error sending streaming update:', error);
          }
        }
      }
    
      if (progressToken) {
        await this.sendProgressNotification(progressToken, 100, 'Streaming completed');
      }
    
      return result;
    }
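The handler above broadcasts a JSON-RPC progress notification over each SSE transport. The shape of that message can be sketched as follows; buildProgressNotification is a hypothetical helper for illustration, not part of the server's code, though the field names mirror the snippet above.

```typescript
// Sketch of the JSON-RPC notification the handler broadcasts over SSE.
// buildProgressNotification is a hypothetical helper; the field names
// mirror the transport.send() call in the handler above.
interface ProgressNotification {
  jsonrpc: '2.0';
  method: 'notifications/progress';
  params: {
    progressToken: string;
    progress: number;
    message: string;
    data?: unknown;
  };
}

function buildProgressNotification(
  progressToken: string | undefined,
  progress: number,
  message: string,
  data?: unknown,
): ProgressNotification {
  return {
    jsonrpc: '2.0',
    method: 'notifications/progress',
    params: {
      // Fall back to 'streaming' when no token was supplied,
      // matching the handler's `progressToken || 'streaming'`.
      progressToken: progressToken || 'streaming',
      progress,
      message,
      data,
    },
  };
}

const note = buildProgressNotification(undefined, 75, 'Streaming updates...');
console.log(note.params.progressToken); // 'streaming'
```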
  • Input schema and description for the 'create_streaming_workflow' tool, registered in ListToolsRequestSchema response.
    {
      name: 'create_streaming_workflow',
      description: 'Create a workflow with real-time streaming and progress updates',
      inputSchema: {
        type: 'object',
        properties: {
          workflow_name: { type: 'string', description: 'Name for the workflow' },
          workflow_type: { type: 'string', description: 'Type of workflow' },
          agents: { type: 'array', description: 'List of agent configurations' },
          streaming: { type: 'boolean', description: 'Enable streaming' },
          progress_token: { type: 'string', description: 'Progress token' },
        },
        required: ['workflow_name', 'workflow_type', 'agents'],
      },
    }
  • Registration of the tool in the MCP server's ListToolsRequestSchema handler.
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'create_streaming_workflow',
          description: 'Create a workflow with real-time streaming and progress updates',
          inputSchema: {
            type: 'object',
            properties: {
              workflow_name: { type: 'string', description: 'Name for the workflow' },
              workflow_type: { type: 'string', description: 'Type of workflow' },
              agents: { type: 'array', description: 'List of agent configurations' },
              streaming: { type: 'boolean', description: 'Enable streaming' },
              progress_token: { type: 'string', description: 'Progress token' },
            },
            required: ['workflow_name', 'workflow_type', 'agents'],
          },
        },
        {
          name: 'start_streaming_chat',
          description: 'Start a streaming chat session with real-time updates',
          inputSchema: {
            type: 'object',
            properties: {
              agent_name: { type: 'string', description: 'Name of the agent to chat with' },
              message: { type: 'string', description: 'Initial message' },
              streaming: { type: 'boolean', description: 'Enable real-time streaming' },
              progress_token: { type: 'string', description: 'Token for progress notifications' },
            },
            required: ['agent_name', 'message'],
          },
        },
        {
          name: 'create_agent',
          description: 'Create a new AutoGen agent with enhanced capabilities',
          inputSchema: {
            type: 'object',
            properties: {
              name: { type: 'string', description: 'Unique name for the agent' },
              type: { type: 'string', description: 'Agent type' },
              system_message: { type: 'string', description: 'System message' },
              llm_config: { type: 'object', description: 'LLM configuration' },
            },
            required: ['name', 'type'],
          },
        },
        {
          name: 'execute_workflow',
          description: 'Execute a workflow with streaming support',
          inputSchema: {
            type: 'object',
            properties: {
              workflow_name: { type: 'string', description: 'Workflow name' },
              input_data: { type: 'object', description: 'Input data' },
              streaming: { type: 'boolean', description: 'Enable streaming' },
            },
            required: ['workflow_name', 'input_data'],
          },
        },
      ],
    }));
  • Dispatch logic in CallToolRequestSchema handler that routes 'create_streaming_workflow' to the streaming handler.
    if (toolName === 'create_streaming_workflow' || toolName === 'start_streaming_chat') {
      return await this.handleStreamingTool(toolName, args, progressToken);
    }
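The routing above can be generalized with a lookup set. This is a hedged sketch of the dispatch pattern: the STREAMING_TOOLS set and routeTool function are illustrative names, not the server's actual code, though the two tool names come from the snippet above.

```typescript
// Hypothetical routing table mirroring the dispatch snippet above:
// streaming tools go to handleStreamingTool, everything else to the
// generic tool path.
const STREAMING_TOOLS = new Set([
  'create_streaming_workflow',
  'start_streaming_chat',
]);

function routeTool(toolName: string): 'streaming' | 'generic' {
  return STREAMING_TOOLS.has(toolName) ? 'streaming' : 'generic';
}

console.log(routeTool('create_streaming_workflow')); // 'streaming'
console.log(routeTool('create_agent'));              // 'generic'
```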
  • Helper function called by the handler to execute the tool (here 'create_streaming_workflow') by spawning a Python server.py subprocess.
    private async callPythonHandler(toolName: string, args: any = {}): Promise<any> {
      const scriptPath = join(__dirname, 'autogen_mcp', 'server.py');
      const pythonArgs = [scriptPath, toolName, JSON.stringify(args)];
    
      return new Promise((resolve, reject) => {
        const process = spawn(this.pythonPath, pythonArgs);
        let stdout = '';
        let stderr = '';
    
        process.stdout.on('data', (data) => {
          stdout += data.toString();
        });
    
        process.stderr.on('data', (data) => {
          stderr += data.toString();
        });
    
        process.on('close', (code) => {
          if (code !== 0) {
            reject(new McpError(ErrorCode.InternalError, stderr || 'Python process failed'));
            return;
          }
    
          try {
            const result = JSON.parse(stdout);
            resolve(result);
          } catch (error) {
            reject(new McpError(ErrorCode.InternalError, 'Invalid JSON response from Python'));
          }
        });
    
        process.on('error', (error) => {
          reject(new McpError(ErrorCode.InternalError, error.message));
        });
      });
    }
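callPythonHandler passes the tool name and a JSON-encoded argument object on argv and expects a single JSON document on stdout. That contract can be sketched as a round trip; buildPythonArgs and parseHandlerOutput are hypothetical helpers that mirror the argv construction and the JSON.parse step in the 'close' listener above.

```typescript
// The subprocess contract used by callPythonHandler:
//   argv:   [server.py, toolName, JSON.stringify(args)]
//   stdout: one JSON document with the tool result.
// Both helpers are illustrative, not part of the server's code.
function buildPythonArgs(scriptPath: string, toolName: string, args: object): string[] {
  return [scriptPath, toolName, JSON.stringify(args)];
}

function parseHandlerOutput(stdout: string): unknown {
  try {
    return JSON.parse(stdout);
  } catch {
    // Mirrors the handler's "Invalid JSON response from Python" error.
    throw new Error('Invalid JSON response from Python');
  }
}

const argv = buildPythonArgs('server.py', 'create_streaming_workflow', {
  workflow_name: 'demo',
  workflow_type: 'sequential',
  agents: [],
});

const result = parseHandlerOutput('{"status": "created"}') as { status: string };
console.log(result.status); // 'created'
```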