Skip to main content
Glama
04-tool-proxying.md20.9 kB
# Implementation Step 4: Tool Proxying ## Overview This step implements tool proxying, allowing the hub to forward MCP tool calls to connected instances. Once a session connects to an instance, all that instance's tools become available through the hub. ## Goals 1. Proxy tool calls from hub to connected instances 2. Dynamically expose instance tools after connection 3. Handle streaming responses properly 4. Provide clear error messages for unconnected sessions 5. Support both synchronous and streaming tools ## Technical Design ### Tool Discovery Flow ``` 1. Client calls instances/connect └─> Hub maps session to instance 2. Client calls tools/list └─> Hub checks session mapping └─> If connected: fetch tools from instance └─> Merge with hub tools 3. Client calls tool └─> Hub checks if hub tool or instance tool └─> Route appropriately ``` ### Proxying Architecture ``` MCP Client Hub Instance │ │ │ ├──tools/list───>│ │ │ ├──tools/list──────->│ │ │<──tool list────────┤ │<──merged list──┤ │ │ │ │ ├──tools/call───>│ │ │ ├──tools/call──────->│ │ │<──result───────────┤ │<──result───────┤ │ ``` ## Implementation ### 1. Enhanced Hub Server ```go // internal/mcp/hub_server.go package mcp import ( "context" "encoding/json" "fmt" "strings" "github.com/mark3labs/mcp-go/mcp" ) type HubServer struct { connMgr *ConnectionManager watcher *InstanceWatcher instancesChan chan *discovery.Instance errorsChan chan error // Cache of instance tools (updated periodically) toolsCache map[string][]mcp.Tool // instanceID -> tools resourceCache map[string][]mcp.Resource } // Hub-specific tools that are always available var hubTools = []mcp.Tool{ { Name: "instances/list", Description: "List all running brummer instances", InputSchema: mcp.ToolInputSchema{ Type: "object", Properties: map[string]interface{}{}, }, }, { Name: "instances/connect", Description: "Connect to a specific brummer instance", InputSchema: mcp.ToolInputSchema{ Type: "object", Properties: map[string]interface{}{ "instance_id": map[string]interface{}{ "type": "string", "description": "The ID of the instance to connect to", }, }, Required: []string{"instance_id"}, }, }, { Name: "instances/disconnect", Description: "Disconnect from the current instance", InputSchema: mcp.ToolInputSchema{ Type: "object", Properties: map[string]interface{}{}, }, }, } func (h *HubServer) ListTools(ctx context.Context) (*mcp.ListToolsResult, error) { // Start with hub tools tools := make([]mcp.Tool, len(hubTools)) copy(tools, hubTools) // Get session ID from context sessionID := GetSessionID(ctx) if sessionID == "" { // No session, return hub tools only return &mcp.ListToolsResult{Tools: tools}, nil } // Get connected instance client client := h.connMgr.GetClient(sessionID) if client == nil { // Not connected to instance, return hub tools only return &mcp.ListToolsResult{Tools: tools}, nil } // Fetch tools from instance instanceTools, err := client.ListTools(ctx) if err != nil { // Log error but return hub tools log.Printf("Failed to list instance tools: %v", err) return &mcp.ListToolsResult{Tools: tools}, nil } // Merge instance tools tools = append(tools, instanceTools...) return &mcp.ListToolsResult{Tools: tools}, nil } func (h *HubServer) ListResources(ctx context.Context) (*mcp.ListResourcesResult, error) { // Similar pattern to ListTools resources := []mcp.Resource{} sessionID := GetSessionID(ctx) if sessionID == "" { return &mcp.ListResourcesResult{Resources: resources}, nil } client := h.connMgr.GetClient(sessionID) if client == nil { return &mcp.ListResourcesResult{Resources: resources}, nil } instanceResources, err := client.ListResources(ctx) if err != nil { log.Printf("Failed to list instance resources: %v", err) return &mcp.ListResourcesResult{Resources: resources}, nil } return &mcp.ListResourcesResult{Resources: instanceResources}, nil } func (h *HubServer) CallTool(ctx context.Context, request *mcp.CallToolRequest) (*mcp.CallToolResult, error) { // Check if it's a hub tool if h.isHubTool(request.Name) { return h.callHubTool(ctx, request) } // Must be an instance tool sessionID := GetSessionID(ctx) if sessionID == "" { return nil, fmt.Errorf("not connected to any instance") } client := h.connMgr.GetClient(sessionID) if client == nil { return nil, fmt.Errorf("session not connected to an instance") } // Proxy to instance return client.CallTool(ctx, request.Name, request.Arguments) } func (h *HubServer) isHubTool(name string) bool { for _, tool := range hubTools { if tool.Name == name { return true } } return false } func (h *HubServer) callHubTool(ctx context.Context, request *mcp.CallToolRequest) (*mcp.CallToolResult, error) { switch request.Name { case "instances/list": return h.handleInstancesList(ctx) case "instances/connect": return h.handleInstancesConnect(ctx, request.Arguments) case "instances/disconnect": return h.handleInstancesDisconnect(ctx) default: return nil, fmt.Errorf("unknown hub tool: %s", request.Name) } } func (h *HubServer) handleInstancesList(ctx context.Context) (*mcp.CallToolResult, error) { instances := h.connMgr.ListInstances() var output []map[string]interface{} for _, info := range instances { output = append(output, map[string]interface{}{ "id": info.InstanceID, "name": info.Name, "path": info.Path, "port": info.Port, "pid": info.PID, "state": info.State.String(), "connected_at": info.ConnectedAt, "has_package_json": info.HasPackageJSON, }) } data, _ := json.MarshalIndent(output, "", " ") return &mcp.CallToolResult{ Content: []mcp.Content{{ Type: "text", Text: string(data), }}, }, nil } func (h *HubServer) handleInstancesConnect(ctx context.Context, args json.RawMessage) (*mcp.CallToolResult, error) { var params struct { InstanceID string `json:"instance_id"` } if err := json.Unmarshal(args, &params); err != nil { return nil, fmt.Errorf("invalid arguments: %w", err) } sessionID := GetSessionID(ctx) if sessionID == "" { return nil, fmt.Errorf("no session ID in context") } // Connect session to instance if err := h.connMgr.ConnectSession(sessionID, params.InstanceID); err != nil { return nil, err } // Get instance info instances := h.connMgr.ListInstances() var instanceName string for _, info := range instances { if info.InstanceID == params.InstanceID { instanceName = info.Name break } } return &mcp.CallToolResult{ Content: []mcp.Content{{ Type: "text", Text: fmt.Sprintf("Connected to instance '%s' (%s)", instanceName, params.InstanceID), }}, }, nil } func (h *HubServer) handleInstancesDisconnect(ctx context.Context) (*mcp.CallToolResult, error) { sessionID := GetSessionID(ctx) if sessionID == "" { return nil, fmt.Errorf("no session ID in context") } if err := h.connMgr.DisconnectSession(sessionID); err != nil { return nil, err } return &mcp.CallToolResult{ Content: []mcp.Content{{ Type: "text", Text: "Disconnected from instance", }}, }, nil } // Resource proxying func (h *HubServer) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error) { sessionID := GetSessionID(ctx) if sessionID == "" { return nil, fmt.Errorf("not connected to any instance") } client := h.connMgr.GetClient(sessionID) if client == nil { return nil, fmt.Errorf("session not connected to an instance") } return client.ReadResource(ctx, uri) } // Prompt proxying func (h *HubServer) ListPrompts(ctx context.Context) (*mcp.ListPromptsResult, error) { sessionID := GetSessionID(ctx) if sessionID == "" { return &mcp.ListPromptsResult{Prompts: []mcp.Prompt{}}, nil } client := h.connMgr.GetClient(sessionID) if client == nil { return &mcp.ListPromptsResult{Prompts: []mcp.Prompt{}}, nil } return client.ListPrompts(ctx) } func (h *HubServer) GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error) { sessionID := GetSessionID(ctx) if sessionID == "" { return nil, fmt.Errorf("not connected to any instance") } client := h.connMgr.GetClient(sessionID) if client == nil { return nil, fmt.Errorf("session not connected to an instance") } return client.GetPrompt(ctx, name, args) } ``` ### 2. Enhanced Hub Client ```go // internal/mcp/hub_client.go // Add these methods to HubClient func (c *HubClient) ListTools(ctx context.Context) ([]mcp.Tool, error) { request := map[string]interface{}{ "jsonrpc": "2.0", "id": time.Now().UnixNano(), "method": "tools/list", } result, err := c.sendRequest(ctx, request) if err != nil { return nil, err } var response struct { Tools []mcp.Tool `json:"tools"` } if err := json.Unmarshal(result, &response); err != nil { return nil, err } return response.Tools, nil } func (c *HubClient) CallTool(ctx context.Context, name string, args json.RawMessage) (*mcp.CallToolResult, error) { request := map[string]interface{}{ "jsonrpc": "2.0", "id": time.Now().UnixNano(), "method": "tools/call", "params": map[string]interface{}{ "name": name, "arguments": args, }, } result, err := c.sendRequest(ctx, request) if err != nil { return nil, err } var response mcp.CallToolResult if err := json.Unmarshal(result, &response); err != nil { return nil, err } return &response, nil } func (c *HubClient) ListResources(ctx context.Context) ([]mcp.Resource, error) { request := map[string]interface{}{ "jsonrpc": "2.0", "id": time.Now().UnixNano(), "method": "resources/list", } result, err := c.sendRequest(ctx, request) if err != nil { return nil, err } var response struct { Resources []mcp.Resource `json:"resources"` } if err := json.Unmarshal(result, &response); err != nil { return nil, err } return response.Resources, nil } func (c *HubClient) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error) { request := map[string]interface{}{ "jsonrpc": "2.0", "id": time.Now().UnixNano(), "method": "resources/read", "params": map[string]interface{}{ "uri": uri, }, } result, err := c.sendRequest(ctx, request) if err != nil { return nil, err } var response mcp.ReadResourceResult if err := json.Unmarshal(result, &response); err != nil { return nil, err } return &response, nil } func (c *HubClient) ListPrompts(ctx context.Context) (*mcp.ListPromptsResult, error) { request := map[string]interface{}{ "jsonrpc": "2.0", "id": time.Now().UnixNano(), "method": "prompts/list", } result, err := c.sendRequest(ctx, request) if err != nil { return nil, err } var response mcp.ListPromptsResult if err := json.Unmarshal(result, &response); err != nil { return nil, err } return &response, nil } func (c *HubClient) GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error) { request := map[string]interface{}{ "jsonrpc": "2.0", "id": time.Now().UnixNano(), "method": "prompts/get", "params": map[string]interface{}{ "name": name, "arguments": args, }, } result, err := c.sendRequest(ctx, request) if err != nil { return nil, err } var response mcp.GetPromptResult if err := json.Unmarshal(result, &response); err != nil { return nil, err } return &response, nil } ``` ### 3. Session Context Management ```go // internal/mcp/session.go package mcp import ( "context" ) type contextKey string const sessionIDKey contextKey = "mcp-session-id" // GetSessionID extracts session ID from context func GetSessionID(ctx context.Context) string { if id, ok := ctx.Value(sessionIDKey).(string); ok { return id } return "" } // WithSessionID adds session ID to context func WithSessionID(ctx context.Context, sessionID string) context.Context { return context.WithValue(ctx, sessionIDKey, sessionID) } ``` ### 4. Streaming Support ```go // internal/mcp/streaming.go package mcp import ( "bufio" "context" "encoding/json" "io" "net/http" ) // StreamingResult handles SSE responses type StreamingResult struct { Events <-chan StreamEvent Errors <-chan error Cancel func() } type StreamEvent struct { Type string Data json.RawMessage } func (c *HubClient) CallToolStreaming(ctx context.Context, name string, args json.RawMessage) (*StreamingResult, error) { request := map[string]interface{}{ "jsonrpc": "2.0", "id": time.Now().UnixNano(), "method": "tools/call", "params": map[string]interface{}{ "name": name, "arguments": args, }, } body, _ := json.Marshal(request) req, err := http.NewRequestWithContext(ctx, "POST", c.baseURL, bytes.NewReader(body)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "text/event-stream") resp, err := c.httpClient.Do(req) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() return nil, fmt.Errorf("HTTP %d", resp.StatusCode) } events := make(chan StreamEvent, 100) errors := make(chan error, 1) ctx, cancel := context.WithCancel(ctx) go func() { defer close(events) defer close(errors) defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) var eventType string var eventData []byte for scanner.Scan() { line := scanner.Text() if strings.HasPrefix(line, "event: ") { eventType = strings.TrimPrefix(line, "event: ") } else if strings.HasPrefix(line, "data: ") { eventData = []byte(strings.TrimPrefix(line, "data: ")) } else if line == "" && eventType != "" { // End of event select { case events <- StreamEvent{Type: eventType, Data: eventData}: case <-ctx.Done(): return } eventType = "" eventData = nil } } if err := scanner.Err(); err != nil { select { case errors <- err: case <-ctx.Done(): } } }() return &StreamingResult{ Events: events, Errors: errors, Cancel: cancel, }, nil } ``` ## Testing Plan ### 1. Unit Tests ```go // internal/mcp/hub_server_test.go func TestToolProxying(t *testing.T) { // Setup hub := createTestHub() instance := createTestInstance() // Connect session to instance sessionID := "test-session" err := hub.connMgr.ConnectSession(sessionID, instance.ID) require.NoError(t, err) // List tools should include instance tools ctx := WithSessionID(context.Background(), sessionID) result, err := hub.ListTools(ctx) require.NoError(t, err) // Should have hub tools + instance tools assert.True(t, len(result.Tools) > len(hubTools)) // Should be able to call instance tool toolResult, err := hub.CallTool(ctx, &mcp.CallToolRequest{ Name: "scripts/list", }) require.NoError(t, err) assert.NotNil(t, toolResult) } func TestDisconnectedSession(t *testing.T) { hub := createTestHub() // Without connection, only hub tools available ctx := context.Background() result, err := hub.ListTools(ctx) require.NoError(t, err) assert.Len(t, result.Tools, len(hubTools)) // Calling instance tool should fail _, err = hub.CallTool(ctx, &mcp.CallToolRequest{ Name: "scripts/list", }) assert.Error(t, err) assert.Contains(t, err.Error(), "not connected") } ``` ### 2. Integration Test ```bash #!/bin/bash # test_tool_proxying.sh # Start instance cd test-project brum --no-tui & INSTANCE_PID=$! sleep 2 # Start hub brum --mcp > hub.log 2>&1 & HUB_PID=$! sleep 1 # Connect to instance INSTANCE_ID=$(echo '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"instances/list"}}' | \ nc -q 1 localhost 7777 | jq -r '.result.content[0].text' | jq -r '.[0].id') echo '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"instances/connect","arguments":{"instance_id":"'$INSTANCE_ID'"}}}' | \ nc -q 1 localhost 7777 # Now list tools - should include instance tools TOOLS=$(echo '{"jsonrpc":"2.0","id":3,"method":"tools/list"}' | nc -q 1 localhost 7777) echo "$TOOLS" | grep -q "scripts/list" # Call instance tool through hub echo '{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"scripts/list"}}' | \ nc -q 1 localhost 7777 # Cleanup kill $INSTANCE_PID $HUB_PID ``` ### 3. Streaming Test ```go func TestStreamingProxy(t *testing.T) { // Test that streaming responses are properly proxied // Verify events flow through correctly // Check cancellation works } ``` ## Success Criteria 1. ✅ Hub tools always available 2. ✅ Instance tools available after connection 3. ✅ Tool calls properly proxied 4. ✅ Error messages for unconnected sessions 5. ✅ Resource and prompt proxying works 6. ✅ Streaming responses handled correctly 7. ✅ Session context properly managed 8. ✅ Clean disconnect removes instance tools ## Edge Cases ### 1. Instance Goes Down - Tool calls should fail gracefully - tools/list should update on next call ### 2. Multiple Sessions - Each session can connect to different instance - Tools isolated per session ### 3. Tool Name Conflicts - Hub tools take precedence - Document naming conventions ### 4. Large Responses - Stream if possible - Set reasonable size limits ### 5. Slow Instance - Timeout on tool calls - Return error to client ## Security Considerations 1. **Input Validation** - Validate all tool arguments - Prevent injection attacks 2. **Session Isolation** - Sessions can't access other sessions' instances - Clear session data on disconnect 3. **Resource Limits** - Limit concurrent connections - Timeout long-running operations ## Next Steps 1. Step 5: Add health monitoring with MCP ping 2. Step 6: End-to-end testing ## Code Checklist - [ ] Update hub server with proxying logic - [ ] Add all MCP methods to hub client - [ ] Implement session context management - [ ] Add streaming support - [ ] Create comprehensive tests - [ ] Handle all error cases - [ ] Document tool naming conventions - [ ] Test with real MCP client

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server