Skip to main content
Glama
server.py5.27 kB
import sys import os sys.path.append(os.path.dirname(__file__)) import json import yaml import asyncio import uuid from typing import Dict import mcp.server.stdio from mcp.server import Server from mcp.types import TextContent from config import logger, BEARER_TOKEN from tools import get_tools from data_processor import DataProcessor from agent_manager import AgentManager class AgentCraftServer(Server): def __init__(self): super().__init__(name="agentcraft-server") self.bearer_token = BEARER_TOKEN self.data_processor = DataProcessor() self.agent_manager = AgentManager() logger.info("AgentCraftServer initialized with agents: %s", list(self.data_processor.agents.keys())) @self.list_tools() async def handle_tools(): return get_tools() @self.call_tool() async def handle_call_tool(name: str, arguments: dict): try: logger.debug(f"Tool called: {name} with arguments: {arguments}") if name in ["send_agent_data"]: input_data = self.data_processor.translate_prompt_to_input_data(arguments["prompt"]) tracking_key = await self.agent_manager.send_agent_data(input_data) return [TextContent(type="text", text=f"Tracking key: {tracking_key}")] elif name in ["receive_agent_data"]: tracking_key = self.get_tracking_key(arguments) logger.info(f"Resolved tracking_key: {tracking_key}") if not tracking_key: return [TextContent(type="text", text="Error: No valid tracking_key found")] updates = [] async for update in self.agent_manager.receive_agent_data( tracking_key, arguments.get("query", "fetch"), arguments.get("response_type", "markdown") ): updates.append(update) if not updates: return [TextContent(type="text", text="No data received")] return [TextContent(type="text", text=json.dumps(updates[-1]))] elif name in ["status_agent_data"]: tracking_key = self.get_tracking_key(arguments) if not tracking_key: return [TextContent(type="text", text="Error: Please provide a tracking_key")] update = await self.agent_manager.get_agent_status(tracking_key) return [TextContent(type="text", text=json.dumps(update))] elif name in ["get_available_agents"]: return [TextContent(type="text", text=yaml.dump(self.data_processor.agents))] raise ValueError(f"Unknown tool name: {name}") except ValueError as e: logger.error(f"ValueError in tool {name}: {str(e)}") return [TextContent(type="text", text=f"Error: {str(e)}")] except Exception as e: logger.error(f"Unexpected error in tool {name}: {str(e)}", exc_info=True) return [TextContent(type="text", text="Internal server error")] # Tracking key def get_tracking_key(self, arguments: dict) -> str | None: with self.agent_manager.lock: # Step 1: Check self.agent_manager.data_store first if self.agent_manager.data_store: tracking_key = next(iter(self.agent_manager.data_store), None) if tracking_key: try: uuid.UUID(tracking_key) # Validate UUID format logger.debug(f"Using tracking_key from data_store: {tracking_key}") return tracking_key except ValueError: logger.error(f"Invalid UUID in data_store: {tracking_key}") # Step 2: Fall back to arguments if data_store doesn’t provide a valid UUID tracking_key = arguments.get("tracking_key") if tracking_key: try: uuid.UUID(tracking_key) # Validate UUID format logger.debug(f"Using tracking_key from arguments: {tracking_key}") return tracking_key except ValueError: logger.error(f"Invalid tracking_key in arguments: {tracking_key}") return None # Return None if no valid tracking_key is found return None async def main(): try: server = AgentCraftServer() initialization_options = server.create_initialization_options() logger.info("Starting server") async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run(read_stream, write_stream, initialization_options) except Exception as e: logger.critical(f"Server failed: {str(e)}", exc_info=True) raise finally: logger.info("Server shutting down") if __name__ == "__main__": print("AGENTCRAFT_BEARER_TOKEN:", os.getenv("AGENTCRAFT_BEARER_TOKEN")) print("ENVIRONMENT:", os.getenv("ENVIRONMENT")) asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seyhunak/agentcraft-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server