Skip to main content
Glama
message_bridge.py5.64 kB
# message_bridge.py from typing import List, Dict, Any, Union, Optional from pydantic import BaseModel from fastmcp import FastMCP import aiohttp import base64 class ACPMessagePart(BaseModel): name: Optional[str] = None content_type: str content: Optional[str] = None content_encoding: Optional[str] = "plain" content_url: Optional[str] = None class ACPMessage(BaseModel): parts: List[ACPMessagePart] class MCPContent(BaseModel): type: str # "text", "image", "resource" text: str = None data: str = None mimeType: str = None class MessageBridge: def __init__(self): self.supported_content_types = { "text/plain": "text", "text/markdown": "text", "image/jpeg": "image", "image/png": "image", "image/gif": "image", "application/json": "text" } async def acp_to_mcp(self, acp_messages: List[ACPMessage]) -> List[MCPContent]: """Convert ACP messages to MCP content format""" mcp_content = [] for message in acp_messages: for part in message.parts: content = await self._convert_message_part(part) if content: mcp_content.append(content) return mcp_content async def mcp_to_acp(self, text_content: str) -> List[ACPMessage]: """Convert MCP text to ACP message format""" # Create properly formatted ACP message message_part = ACPMessagePart( name=None, content_type="text/plain", content=text_content, content_encoding="plain", content_url=None ) return [ACPMessage(parts=[message_part])] async def _convert_message_part(self, part: ACPMessagePart) -> MCPContent: """Convert a single ACP message part to MCP content""" content_type = part.content_type if content_type.startswith("text/"): return MCPContent( type="text", text=part.content, mimeType=content_type ) elif content_type.startswith("image/"): # Handle image content if part.content_url: # Download image from URL image_data = await self._download_content(part.content_url) if image_data: return MCPContent( type="image", data=base64.b64encode(image_data).decode(), mimeType=content_type ) else: # Direct image data return MCPContent( type="image", data=part.content, mimeType=content_type ) elif content_type == "application/json": return MCPContent( type="text", text=part.content, mimeType=content_type ) else: # Fallback to text representation return MCPContent( type="text", text=str(part.content), mimeType="text/plain" ) async def _download_content(self, url: str) -> bytes: """Download content from URL""" try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: return await response.read() except Exception as e: print(f"Error downloading content from {url}: {e}") return None # Integration with FastMCP def register_bridge_tools(mcp: FastMCP, bridge: MessageBridge): @mcp.tool() async def convert_acp_message(acp_message_json: str) -> str: """Convert ACP message format to MCP-compatible format""" try: import json message_data = json.loads(acp_message_json) acp_message = ACPMessage(**message_data) mcp_content = await bridge.acp_to_mcp([acp_message]) return json.dumps([content.dict() for content in mcp_content], indent=2) except Exception as e: return f"Error: {e}" @mcp.tool() async def analyze_message_content(acp_message_json: str) -> str: """Analyze the content types and structure of an ACP message""" try: import json message_data = json.loads(acp_message_json) acp_message = ACPMessage(**message_data) analysis = { "total_parts": len(acp_message.parts), "content_types": {}, "has_urls": False, "encodings": set(), "total_size": 0 } for part in acp_message.parts: content_type = part.content_type analysis["content_types"][content_type] = analysis["content_types"].get(content_type, 0) + 1 analysis["encodings"].add(part.content_encoding) if part.content_url: analysis["has_urls"] = True if part.content: analysis["total_size"] += len(part.content) analysis["encodings"] = list(analysis["encodings"]) return json.dumps(analysis, indent=2) except Exception as e: return f"Error: {e}"

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GongRzhe/ACP-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server