Skip to main content
Glama
bridge_server.py38.3 kB
# # MCP Foxxy Bridge - Bridge Server # # Copyright (C) 2024 Billy Bryant # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # """Create an MCP server that bridges multiple MCP servers. This server aggregates capabilities from multiple MCP servers and provides a unified interface for AI tools to interact with all of them. """ import asyncio import logging import time from typing import Any, Protocol from mcp import server, types from mcp.shared.exceptions import McpError from pydantic import BaseModel from mcp_foxxy_bridge.config.config_loader import BridgeConfiguration, BridgeServerConfig from mcp_foxxy_bridge.utils.logging import get_logger from .server_manager import ServerManager logger = get_logger(__name__, facility="BRIDGE") class ServerManagerProtocol(Protocol): """Protocol for server manager interface used by bridge server.""" capability_change_notifier: Any bridge_config: Any def get_active_servers(self) -> list[Any]: """Get list of currently active servers.""" ... def get_aggregated_tools(self) -> list[types.Tool]: """Get aggregated tools from all active servers.""" ... def get_aggregated_resources(self) -> list[types.Resource]: """Get aggregated resources from all active servers.""" ... def get_aggregated_prompts(self) -> list[types.Prompt]: """Get aggregated prompts from all active servers.""" ... async def call_tool(self, name: str, arguments: dict[str, Any]) -> types.CallToolResult: """Call a tool on the appropriate server.""" ... async def read_resource(self, uri: str) -> types.ReadResourceResult: """Read a resource from the appropriate server.""" ... async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) -> types.GetPromptResult: """Get a prompt from the appropriate server.""" ... async def set_logging_level(self, level: types.LoggingLevel) -> None: """Set logging level on all servers.""" ... async def get_completions( self, ref: types.ResourceReference | types.PromptReference, argument: types.CompletionArgument ) -> list[str]: """Get completions for a resource or prompt.""" ... async def subscribe_resource(self, uri: str) -> None: """Subscribe to resource updates.""" ... async def unsubscribe_resource(self, uri: str) -> None: """Unsubscribe from resource updates.""" ... # Registry to store server manager instances for proper cleanup _server_manager_registry: dict[Any, ServerManager] = {} # Custom notification types for capability changes class CapabilitiesChangedParams(BaseModel): """Parameters for capabilities changed notification.""" tools_added: list[str] = [] tools_removed: list[str] = [] resources_added: list[str] = [] resources_removed: list[str] = [] prompts_added: list[str] = [] prompts_removed: list[str] = [] message: str = "Server capabilities have changed" class CapabilitiesChangedNotification(BaseModel): """Notification sent when server capabilities change.""" method: str = "notifications/capabilities_changed" params: CapabilitiesChangedParams # Store connected clients for capability change notifications _connected_clients: set[Any] = set() def _configure_prompts_capability( app: server.Server[object], server_manager: ServerManagerProtocol, ) -> None: """Configure prompts capability for the bridge server.""" logger.debug("Configuring prompts aggregation...") async def _list_prompts(_: types.ListPromptsRequest) -> types.ServerResult: try: prompts = server_manager.get_aggregated_prompts() result = types.ListPromptsResult(prompts=prompts) return types.ServerResult(result) except Exception: logger.exception("Error listing prompts") return types.ServerResult(types.ListPromptsResult(prompts=[])) app.request_handlers[types.ListPromptsRequest] = _list_prompts async def _get_prompt(req: types.GetPromptRequest) -> types.ServerResult: try: result = await server_manager.get_prompt( req.params.name, req.params.arguments, ) return types.ServerResult(result) except McpError as e: # Re-raise MCP errors so they're properly returned to the client logger.warning("MCP error getting prompt '%s': %s", req.params.name, e.error.message) raise except Exception: logger.exception("Error getting prompt '%s'", req.params.name) return types.ServerResult( types.GetPromptResult( description=f"Error retrieving prompt: {req.params.name}", messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text="Error occurred while retrieving prompt", ), ), ], ), ) app.request_handlers[types.GetPromptRequest] = _get_prompt def _configure_resources_capability( app: server.Server[object], server_manager: ServerManagerProtocol, ) -> None: """Configure resources capability for the bridge server.""" logger.debug("Configuring resources aggregation...") async def _list_resources(_: types.ListResourcesRequest) -> types.ServerResult: try: resources = server_manager.get_aggregated_resources() result = types.ListResourcesResult(resources=resources) return types.ServerResult(result) except Exception: logger.exception("Error listing resources") return types.ServerResult(types.ListResourcesResult(resources=[])) app.request_handlers[types.ListResourcesRequest] = _list_resources async def _list_resource_templates( _: types.ListResourceTemplatesRequest, ) -> types.ServerResult: # For now, return empty templates as we don't aggregate templates yet result = types.ListResourceTemplatesResult(resourceTemplates=[]) return types.ServerResult(result) app.request_handlers[types.ListResourceTemplatesRequest] = _list_resource_templates async def _read_resource(req: types.ReadResourceRequest) -> types.ServerResult: try: result = await server_manager.read_resource(str(req.params.uri)) return types.ServerResult(result) except McpError as e: # Re-raise MCP errors so they're properly returned to the client logger.warning("MCP error reading resource '%s': %s", req.params.uri, e.error.message) raise except Exception: logger.exception("Error reading resource '%s'", req.params.uri) return types.ServerResult( types.ReadResourceResult( contents=[ types.TextResourceContents( uri=req.params.uri, mimeType="text/plain", text="Error occurred while reading resource", ), ], ), ) app.request_handlers[types.ReadResourceRequest] = _read_resource async def _subscribe_resource(req: types.SubscribeRequest) -> types.ServerResult: try: await server_manager.subscribe_resource(str(req.params.uri)) logger.debug("Successfully subscribed to resource: %s", req.params.uri) return types.ServerResult(types.EmptyResult()) except Exception: logger.exception("Error subscribing to resource: %s", req.params.uri) return types.ServerResult(types.EmptyResult()) app.request_handlers[types.SubscribeRequest] = _subscribe_resource async def _unsubscribe_resource( req: types.UnsubscribeRequest, ) -> types.ServerResult: try: await server_manager.unsubscribe_resource(str(req.params.uri)) logger.debug("Successfully unsubscribed from resource: %s", req.params.uri) return types.ServerResult(types.EmptyResult()) except Exception: logger.exception("Error unsubscribing from resource: %s", req.params.uri) return types.ServerResult(types.EmptyResult()) app.request_handlers[types.UnsubscribeRequest] = _unsubscribe_resource def _configure_tools_capability( app: server.Server[object], server_manager: ServerManagerProtocol, ) -> None: """Configure tools capability for the bridge server.""" logger.debug("Configuring tools aggregation...") async def _list_tools(_: types.ListToolsRequest) -> types.ServerResult: try: logger.debug("Listing tools from server manager") # Get active servers first to diagnose active_servers = server_manager.get_active_servers() logger.debug("Active servers for tool listing: %d", len(active_servers)) for server in active_servers: logger.debug("Active server: '%s' with %d tools", server.name, len(server.tools)) tools = server_manager.get_aggregated_tools() logger.info("Found %d aggregated tools total", len(tools)) if len(tools) == 0: logger.warning("No tools available from any servers") if len(active_servers) == 0: logger.error("No active servers available") else: logger.warning("Active servers exist but no tools aggregated") else: for tool in tools: logger.debug("Aggregated tool: %s", tool.name) result = types.ListToolsResult(tools=tools) return types.ServerResult(result) except Exception: logger.exception("Error listing tools") return types.ServerResult(types.ListToolsResult(tools=[])) app.request_handlers[types.ListToolsRequest] = _list_tools async def _call_tool(req: types.CallToolRequest) -> types.ServerResult: tool_start_time = time.time() try: logger.debug("Calling tool '%s'", req.params.name) result = await server_manager.call_tool( req.params.name, req.params.arguments or {}, ) elapsed = time.time() - tool_start_time logger.debug("Tool '%s' completed in %.3f seconds", req.params.name, elapsed) return types.ServerResult(result) except TimeoutError: elapsed = time.time() - tool_start_time logger.exception("Tool '%s' timed out after %.3f seconds", req.params.name, elapsed) raise except McpError as e: elapsed = time.time() - tool_start_time logger.warning( "MCP error calling tool '%s' after %.3f seconds: %s", req.params.name, elapsed, e.error.message ) raise except Exception: elapsed = time.time() - tool_start_time logger.exception("Error calling tool '%s' after %.3f seconds", req.params.name, elapsed) return types.ServerResult( types.CallToolResult( content=[ types.TextContent( type="text", text=f"Error occurred while calling tool: {req.params.name}", ), ], ), ) app.request_handlers[types.CallToolRequest] = _call_tool def _configure_logging_capability( app: server.Server[object], server_manager: ServerManagerProtocol, ) -> None: """Configure logging capability for the bridge server.""" async def _set_logging_level(req: types.SetLevelRequest) -> types.ServerResult: try: level = req.params.level bridge_logger = logging.getLogger("mcp_foxxy_bridge") level_str = str(level).lower() if level_str == "debug": bridge_logger.setLevel(logging.DEBUG) elif level_str == "info": bridge_logger.setLevel(logging.INFO) elif level_str == "warning": bridge_logger.setLevel(logging.WARNING) elif level_str == "error": bridge_logger.setLevel(logging.ERROR) # Forward logging level to all managed servers await server_manager.set_logging_level(level) logger.info( "Set logging level to %s", str(level), ) return types.ServerResult(types.EmptyResult()) except Exception: logger.exception("Error setting logging level") return types.ServerResult(types.EmptyResult()) app.request_handlers[types.SetLevelRequest] = _set_logging_level def _configure_notifications_and_completion( app: server.Server[object], server_manager: ServerManagerProtocol, ) -> None: """Configure progress notifications and completion for the bridge server.""" # Add progress notification handler async def _send_progress_notification(req: types.ProgressNotification) -> None: logger.debug("Progress notification: %s/%s", req.params.progress, req.params.total) # Bridge typically receives progress notifications from managed servers # and relays them to clients transparently. The MCP framework handles # the actual forwarding to connected clients automatically. # Log the progress for debugging purposes if req.params.total and req.params.total > 0: percentage = (req.params.progress / req.params.total) * 100 logger.info( "Progress update: %.1f%% (%s/%s)", percentage, req.params.progress, req.params.total, ) else: logger.info("Progress update: %s", req.params.progress) app.notification_handlers[types.ProgressNotification] = _send_progress_notification # Add capability change notification handler async def _send_capability_change_notification(notification_data: dict[str, Any]) -> None: """Send capability change notification to all connected clients.""" try: # Create notification params params = CapabilitiesChangedParams(**notification_data) # Create a custom notification using the MCP types system # We'll use a generic notification structure that MCP supports _ = types.JSONRPCNotification( jsonrpc="2.0", method="notifications/capabilities_changed", params=params.model_dump(), ) # The MCP framework should handle sending this to all connected clients # For now, we'll log the notification logger.info( "Capability change notification: %d tools added, %d removed, " "%d resources added, %d removed, %d prompts added, %d removed", len(params.tools_added), len(params.tools_removed), len(params.resources_added), len(params.resources_removed), len(params.prompts_added), len(params.prompts_removed), ) # Note: The actual client notification sending needs to be done through # the MCP server framework's session management system except Exception: logger.exception("Error sending capability change notification") # Store the notification sender in the server manager for use during updates server_manager.capability_change_notifier = _send_capability_change_notification # Add completion handler async def _complete(req: types.CompleteRequest) -> types.ServerResult: try: # Aggregate completions from all managed servers completions = await server_manager.get_completions( req.params.ref, req.params.argument, ) result = types.CompleteResult(completion=types.Completion(values=completions)) logger.debug("Returning %d aggregated completions", len(completions)) return types.ServerResult(result) except Exception: logger.exception("Error handling completion") return types.ServerResult(types.CompleteResult(completion=types.Completion(values=[]))) app.request_handlers[types.CompleteRequest] = _complete async def create_bridge_server( bridge_config: BridgeConfiguration, ) -> server.Server[object]: """Create a bridge server that aggregates multiple MCP servers. Args: bridge_config: Configuration for the bridge and all MCP servers. Returns: A configured MCP server that bridges to multiple backends. """ logger.info("Creating bridge server with %d configured servers", len(bridge_config.servers)) # Create the server manager without starting it yet server_manager = ServerManager(bridge_config) # Create the bridge server first bridge_name = "MCP Foxxy Bridge" app: server.Server[object] = server.Server(name=bridge_name) # Store server manager for cleanup using registry _server_manager_registry[id(app)] = server_manager # Configure capabilities based on aggregation settings if bridge_config.bridge and bridge_config.bridge.aggregation and bridge_config.bridge.aggregation.prompts: _configure_prompts_capability(app, server_manager) if bridge_config.bridge and bridge_config.bridge.aggregation and bridge_config.bridge.aggregation.resources: _configure_resources_capability(app, server_manager) if bridge_config.bridge and bridge_config.bridge.aggregation and bridge_config.bridge.aggregation.tools: _configure_tools_capability(app, server_manager) # Add logging capability logger.debug("Configuring logging...") _configure_logging_capability(app, server_manager) # Add notifications and completion capabilities _configure_notifications_and_completion(app, server_manager) # Start server manager asynchronously in the background # This allows the bridge server to start immediately without waiting for all servers start_task = asyncio.create_task(server_manager.start()) # Store task reference to prevent garbage collection if not hasattr(app, "background_tasks"): app.background_tasks = set() # type: ignore[attr-defined] app.background_tasks.add(start_task) # type: ignore[attr-defined] start_task.add_done_callback(app.background_tasks.discard) # type: ignore[attr-defined] logger.info("Bridge server created successfully, servers connecting in background...") return app async def shutdown_bridge_server(app: server.Server[object]) -> None: """Shutdown the bridge server and clean up resources. Args: app: The bridge server to shutdown. """ logger.info("Shutting down bridge server...") # Stop the server manager if it exists in registry app_id = id(app) if app_id in _server_manager_registry: server_manager = _server_manager_registry.pop(app_id) if server_manager: await server_manager.stop() logger.info("Bridge server shutdown complete") async def create_tag_filtered_bridge( servers: dict[str, BridgeServerConfig], tags: list[str], tag_mode: str = "intersection", bridge_name_suffix: str = "", ) -> server.Server[object]: """Create a bridge server with servers filtered by tags. Args: servers: Dictionary of all available servers tags: List of tags to filter by tag_mode: "intersection" (servers must have ALL tags) or "union" (servers must have ANY tag) bridge_name_suffix: Optional suffix for the bridge name (e.g., tag names) Returns: A configured MCP server that bridges to tag-filtered servers """ def matches_tag_filter(server_config: BridgeServerConfig) -> bool: if not server_config.tags: return False server_tags = set(server_config.tags) filter_tags = set(tags) if tag_mode == "intersection": return filter_tags.issubset(server_tags) if tag_mode == "union": return bool(filter_tags.intersection(server_tags)) return False # Filter servers by tag criteria filtered_servers = { name: config for name, config in servers.items() if config.enabled and matches_tag_filter(config) } logger.info( "Creating tag-filtered bridge for tags %s (%s mode) - %d servers match", tags, tag_mode, len(filtered_servers), ) if not filtered_servers: logger.warning("No servers match the tag filter: %s (%s)", tags, tag_mode) # Create bridge configuration with filtered servers tag_bridge_config = BridgeConfiguration( servers=filtered_servers, bridge=None, # Use default bridge config ) # Create server manager with filtered servers server_manager = ServerManager(tag_bridge_config) await server_manager.start() # Create the bridge server tag_display = "+".join(tags) if tag_mode == "intersection" else ",".join(tags) bridge_name = f"MCP Foxxy Bridge - Tags: {tag_display}{bridge_name_suffix}" app: server.Server[object] = server.Server(name=bridge_name) # Store server manager for cleanup _server_manager_registry[id(app)] = server_manager # Configure capabilities with aggregation (since we may have multiple servers) # Use default aggregation settings - tools, resources, and prompts enabled _configure_prompts_capability(app, server_manager) _configure_resources_capability(app, server_manager) _configure_tools_capability(app, server_manager) _configure_logging_capability(app, server_manager) _configure_notifications_and_completion(app, server_manager) active_servers = server_manager.get_active_servers() logger.info( "Tag-filtered bridge created successfully for tags %s - %d active servers", tags, len(active_servers), ) return app async def create_server_filtered_bridge( servers: dict[str, BridgeServerConfig], server_name: str, bridge_name_suffix: str = "", ) -> server.Server[object]: """Create a bridge server with a single server filtered by name. This creates a filtered view of servers containing only the specified server, allowing individual server access while maintaining connection to the main server registry for proper tool aggregation. Args: servers: Dictionary of all available servers server_name: Name of the server to include bridge_name_suffix: Optional suffix for the bridge name Returns: Bridge server instance filtered to the specified server """ # Filter servers to include only the specified server filtered_servers = {name: config for name, config in servers.items() if name == server_name and config.enabled} if not filtered_servers: logger.warning("No enabled server found for filtered bridge") # Create empty bridge to avoid errors filtered_servers = {} bridge_name = f"mcp-foxxy-bridge-server-{server_name.lower()}" if bridge_name_suffix: bridge_name += f"-{bridge_name_suffix}" logger.info(f"Creating server-filtered bridge '{bridge_name}' with {len(filtered_servers)} server(s)") # Create filtered configuration filtered_config = BridgeConfiguration( servers=filtered_servers, bridge=None, # Use default bridge config ) return await create_bridge_server(filtered_config) class FilteredServerManager: """Wrapper around ServerManager that filters results based on criteria.""" def __init__(self, main_server_manager: ServerManagerProtocol, filter_criteria: dict[str, Any]) -> None: self.main_server_manager = main_server_manager self.filter_criteria = filter_criteria self.capability_change_notifier = None # Will be set by bridge server self.bridge_config = main_server_manager.bridge_config # Delegate bridge_config def _should_include_server(self, server_info: Any) -> bool: """Check if a server should be included based on filter criteria.""" filter_type = self.filter_criteria.get("type") if filter_type == "server_name": # Include only servers matching the specified name target_name = self.filter_criteria.get("server_name") if target_name and hasattr(server_info, "config") and hasattr(server_info.config, "name"): return bool(server_info.config.name == target_name) return False if filter_type == "tags": # Include servers matching tag criteria tags = self.filter_criteria.get("tags", []) tag_mode = self.filter_criteria.get("tag_mode", "union") if not server_info.config.tags: return False server_tags = set(server_info.config.tags) filter_tags = set(tags) if tag_mode == "intersection": return filter_tags.issubset(server_tags) if tag_mode == "union": return bool(filter_tags.intersection(server_tags)) return True def get_active_servers(self) -> list[Any]: """Get filtered active servers.""" all_servers = self.main_server_manager.get_active_servers() return [server for server in all_servers if self._should_include_server(server)] def get_aggregated_tools(self) -> list[types.Tool]: """Get tools from filtered servers only.""" tools = [] seen_names = set() # Get filtered active servers active_servers = sorted(self.get_active_servers(), key=lambda s: s.config.priority) for mcp_server in active_servers: namespace = mcp_server.get_effective_namespace("tools", self.main_server_manager.bridge_config.bridge) for tool in mcp_server.tools: tool_name = tool.name if namespace: tool_name = f"{namespace}__{tool.name}" # Handle name conflicts if tool_name in seen_names: bridge_config = self.main_server_manager.bridge_config.bridge if bridge_config and bridge_config.conflict_resolution == "error": msg = f"Tool name conflict: {tool_name}" raise ValueError(msg) if bridge_config and bridge_config.conflict_resolution == "first": continue # Create tool with potentially namespaced name filtered_tool = tool.model_copy() filtered_tool.name = tool_name tools.append(filtered_tool) seen_names.add(tool_name) return tools def get_aggregated_resources(self) -> list[types.Resource]: """Get resources from filtered servers only.""" resources = [] seen_uris = set() active_servers = sorted(self.get_active_servers(), key=lambda s: s.config.priority) for mcp_server in active_servers: namespace = mcp_server.get_effective_namespace("resources", self.main_server_manager.bridge_config.bridge) for resource in mcp_server.resources: resource_uri = str(resource.uri) if namespace: resource_uri = f"{namespace}://{resource.uri}" if resource_uri in seen_uris: bridge_config = self.main_server_manager.bridge_config.bridge if bridge_config and bridge_config.conflict_resolution == "error": msg = f"Resource URI conflict: {resource_uri}" raise ValueError(msg) if bridge_config and bridge_config.conflict_resolution == "first": continue filtered_resource = resource.model_copy() filtered_resource.uri = resource_uri resources.append(filtered_resource) seen_uris.add(resource_uri) return resources def get_aggregated_prompts(self) -> list[types.Prompt]: """Get prompts from filtered servers only.""" prompts = [] seen_names = set() active_servers = sorted(self.get_active_servers(), key=lambda s: s.config.priority) for mcp_server in active_servers: namespace = mcp_server.get_effective_namespace("prompts", self.main_server_manager.bridge_config.bridge) for prompt in mcp_server.prompts: prompt_name = prompt.name if namespace: prompt_name = f"{namespace}__{prompt.name}" if prompt_name in seen_names: bridge_config = self.main_server_manager.bridge_config.bridge if bridge_config and bridge_config.conflict_resolution == "error": msg = f"Prompt name conflict: {prompt_name}" raise ValueError(msg) if bridge_config and bridge_config.conflict_resolution == "first": continue filtered_prompt = prompt.model_copy() filtered_prompt.name = prompt_name prompts.append(filtered_prompt) seen_names.add(prompt_name) return prompts async def call_tool(self, name: str, arguments: dict[str, Any]) -> types.CallToolResult: """Delegate tool calls to main server manager.""" return await self.main_server_manager.call_tool(name, arguments) async def read_resource(self, uri: str) -> types.ReadResourceResult: """Delegate resource reads to main server manager.""" return await self.main_server_manager.read_resource(uri) async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) -> types.GetPromptResult: """Delegate prompt requests to main server manager.""" return await self.main_server_manager.get_prompt(name, arguments) async def set_logging_level(self, level: types.LoggingLevel) -> None: """Delegate logging level changes to main server manager.""" await self.main_server_manager.set_logging_level(level) async def get_completions( self, ref: types.ResourceReference | types.PromptReference, argument: types.CompletionArgument ) -> list[str]: """Delegate completions to main server manager.""" return await self.main_server_manager.get_completions(ref, argument) async def subscribe_resource(self, uri: str) -> None: """Delegate resource subscriptions to main server manager.""" await self.main_server_manager.subscribe_resource(uri) async def unsubscribe_resource(self, uri: str) -> None: """Delegate resource unsubscriptions to main server manager.""" await self.main_server_manager.unsubscribe_resource(uri) async def create_server_filtered_bridge_view( servers: dict[str, BridgeServerConfig], server_name: str, main_bridge_server: server.Server[object] | None = None, ) -> server.Server[object]: """Create a filtered view showing only tools from a specific server.""" if main_bridge_server is not None: # Get the main server manager from the registry main_server_manager = _server_manager_registry.get(id(main_bridge_server)) if main_server_manager: # Create filtered server manager filter_criteria = {"type": "server_name", "server_name": server_name} filtered_manager = FilteredServerManager(main_server_manager, filter_criteria) # Create a new bridge server with the filtered manager bridge_name = f"MCP Foxxy Bridge - {server_name}" app: server.Server[object] = server.Server(name=bridge_name) # Configure capabilities with filtered manager _configure_prompts_capability(app, filtered_manager) _configure_resources_capability(app, filtered_manager) _configure_tools_capability(app, filtered_manager) _configure_logging_capability(app, filtered_manager) _configure_notifications_and_completion(app, filtered_manager) logger.debug("Created server-filtered bridge view") return app # Fallback to creating a separate instance return await create_single_server_bridge(server_name, servers[server_name]) async def create_tag_filtered_bridge_view( servers: dict[str, BridgeServerConfig], tags: list[str], tag_mode: str = "intersection", main_bridge_server: server.Server[object] | None = None, ) -> server.Server[object]: """Create a filtered view of the main bridge server showing only tag-filtered servers. This creates a view that filters tools/resources/prompts to only show those from servers that match the specified tag criteria, using the shared server instances from the main bridge. Args: servers: Dictionary of all available servers tags: List of tags to filter by tag_mode: "intersection" (servers must have ALL tags) or "union" (servers must have ANY tag) main_bridge_server: The main bridge server instance to create a filtered view of Returns: Bridge server instance filtered to the specified tag criteria """ if main_bridge_server is not None: # Get the main server manager from the registry main_server_manager = _server_manager_registry.get(id(main_bridge_server)) if main_server_manager: # Create filtered server manager filter_criteria = {"type": "tags", "tags": tags, "tag_mode": tag_mode} filtered_manager = FilteredServerManager(main_server_manager, filter_criteria) # Create a new bridge server with the filtered manager tag_display = "+".join(tags) if tag_mode == "intersection" else ",".join(tags) bridge_name = f"MCP Foxxy Bridge - Tags: {tag_display}" app: server.Server[object] = server.Server(name=bridge_name) # Configure capabilities with filtered manager _configure_prompts_capability(app, filtered_manager) _configure_resources_capability(app, filtered_manager) _configure_tools_capability(app, filtered_manager) _configure_logging_capability(app, filtered_manager) _configure_notifications_and_completion(app, filtered_manager) logger.debug( "Created tag-filtered bridge view for tags: %s (%s mode)", tags, tag_mode, ) return app # Fallback to creating a separate instance (legacy approach) return await create_tag_filtered_bridge(servers, tags, tag_mode) async def create_single_server_bridge(server_name: str, server_config: BridgeServerConfig) -> server.Server[object]: """Create a bridge server that exposes only a single MCP server. This creates an MCP server instance that connects to only one backend server, without any aggregation or namespacing. Tools, resources, and prompts are exposed directly with their original names. Args: server_name: The name of the server (for logging/identification) server_config: Configuration for the single MCP server Returns: A configured MCP server that bridges to a single backend server """ logger.info("Creating single-server bridge") # Create a minimal bridge configuration with just this one server single_server_config = BridgeConfiguration( servers={server_name: server_config}, bridge=None, # Use default bridge config ) # Create a server manager with just this one server server_manager = ServerManager(single_server_config) await server_manager.start() # Create the bridge server bridge_name = f"MCP Foxxy Bridge - {server_name}" app: server.Server[object] = server.Server(name=bridge_name) # Store server manager for cleanup _server_manager_registry[id(app)] = server_manager # For single server bridges, we want to expose capabilities directly # without namespacing, so we configure all capabilities regardless of # aggregation settings (there's no aggregation conflict with one server) # Configure all capabilities (no aggregation conflicts with single server) _configure_prompts_capability(app, server_manager) _configure_resources_capability(app, server_manager) _configure_tools_capability(app, server_manager) _configure_logging_capability(app, server_manager) _configure_notifications_and_completion(app, server_manager) active_servers = server_manager.get_active_servers() if active_servers: logger.info( "Single-server bridge created successfully for '%s' (%d tools, %d resources, %d prompts)", server_name, len(active_servers[0].tools), len(active_servers[0].resources), len(active_servers[0].prompts), ) else: logger.warning("Single-server bridge created but server is not active") return app

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyjbryant/mcp-foxxy-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server