Skip to main content
Glama
agent.py38.9 kB
""" Agent Manager for Shannon MCP Server. This module manages AI agents with: - Agent registry and discovery - Task assignment and routing - Execution tracking - Agent collaboration - Metrics collection - GitHub agent import """ import asyncio from typing import Dict, List, Optional, Any, Tuple, Set from datetime import datetime, timedelta from dataclasses import dataclass, field import json import uuid from pathlib import Path import structlog import aiohttp import yaml from packaging import version from ..managers.base import BaseManager, ManagerConfig, HealthStatus from ..models.agent import ( Agent, AgentCategory, AgentStatus, AgentCapability, AgentExecution, ExecutionStatus, AgentMessage, create_default_agents ) from ..utils.config import AgentManagerConfig from ..utils.errors import ( SystemError, ValidationError, ConfigurationError, handle_errors, error_context ) from ..utils.notifications import emit, EventCategory, EventPriority, event_handler from ..utils.logging import get_logger logger = get_logger("shannon-mcp.agent") @dataclass class TaskRequest: """Request for agent task execution.""" id: str description: str required_capabilities: List[str] priority: str = "medium" context: Dict[str, Any] = field(default_factory=dict) timeout: Optional[int] = None dependencies: List[str] = field(default_factory=list) created_at: datetime = field(default_factory=datetime.utcnow) def __post_init__(self): """Initialize task ID if not provided.""" if not self.id: self.id = f"task_{uuid.uuid4().hex[:12]}" @dataclass class TaskAssignment: """Assignment of task to agent.""" task_id: str agent_id: str score: float # Suitability score 0-1 estimated_duration: Optional[int] = None # seconds confidence: float = 0.5 # Confidence level 0-1 class AgentManager(BaseManager[Agent]): """Manages AI agents for collaborative building.""" def __init__(self, config: AgentManagerConfig): """Initialize agent manager.""" manager_config = ManagerConfig( name="agent_manager", db_path=Path.home() / ".shannon-mcp" / "agents.db", custom_config=config.dict() ) super().__init__(manager_config) self.agent_config = config self._agents: Dict[str, Agent] = {} self._executions: Dict[str, AgentExecution] = {} self._message_queue: asyncio.Queue = asyncio.Queue() self._assignment_lock = asyncio.Lock() # Agent collaboration graph self._collaboration_graph: Dict[str, Set[str]] = {} # Performance tracking self._performance_cache: Dict[str, float] = {} async def _initialize(self) -> None: """Initialize agent manager.""" logger.info("initializing_agent_manager") # Load default agents if self.agent_config.enable_default_agents: await self._load_default_agents() # Import GitHub agents if self.agent_config.github_org: await self._import_github_agents() # Start message processor self._tasks.append( asyncio.create_task(self._process_messages()) ) async def _start(self) -> None: """Start agent manager operations.""" # Start monitoring self._tasks.append( asyncio.create_task(self._monitor_agents()) ) async def _stop(self) -> None: """Stop agent manager operations.""" # Mark all agents as offline for agent in self._agents.values(): agent.status = AgentStatus.OFFLINE await self._save_agent(agent) async def _health_check(self) -> Dict[str, Any]: """Perform health check.""" total_agents = len(self._agents) available_agents = sum( 1 for a in self._agents.values() if a.status == AgentStatus.AVAILABLE ) active_executions = sum( 1 for e in self._executions.values() if e.status == ExecutionStatus.RUNNING ) return { "total_agents": total_agents, "available_agents": available_agents, "active_executions": active_executions, "message_queue_size": self._message_queue.qsize(), "categories": self._get_category_stats() } async def _create_schema(self) -> None: """Create database schema.""" # Agents table await self.db.execute(""" CREATE TABLE IF NOT EXISTS agents ( id TEXT PRIMARY KEY, name TEXT UNIQUE NOT NULL, description TEXT, category TEXT NOT NULL, status TEXT NOT NULL, github_url TEXT, version TEXT DEFAULT '1.0.0', config TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_agents_category ON agents(category) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_agents_status ON agents(status) """) # Agent capabilities table await self.db.execute(""" CREATE TABLE IF NOT EXISTS agent_capabilities ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, name TEXT NOT NULL, description TEXT, expertise_level INTEGER NOT NULL, tools TEXT, FOREIGN KEY (agent_id) REFERENCES agents(id), UNIQUE(agent_id, name) ) """) # Agent metrics table await self.db.execute(""" CREATE TABLE IF NOT EXISTS agent_metrics ( agent_id TEXT PRIMARY KEY, tasks_completed INTEGER DEFAULT 0, tasks_failed INTEGER DEFAULT 0, total_execution_time REAL DEFAULT 0.0, average_task_time REAL DEFAULT 0.0, success_rate REAL DEFAULT 0.0, last_active TEXT, FOREIGN KEY (agent_id) REFERENCES agents(id) ) """) # Agent executions table await self.db.execute(""" CREATE TABLE IF NOT EXISTS agent_executions ( id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, task_id TEXT NOT NULL, task_description TEXT, input_data TEXT, output_data TEXT, status TEXT NOT NULL, started_at TEXT, completed_at TEXT, error TEXT, logs TEXT, FOREIGN KEY (agent_id) REFERENCES agents(id) ) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_executions_agent ON agent_executions(agent_id) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_executions_status ON agent_executions(status) """) # Agent messages table await self.db.execute(""" CREATE TABLE IF NOT EXISTS agent_messages ( id TEXT PRIMARY KEY, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL, message_type TEXT NOT NULL, content TEXT NOT NULL, priority TEXT DEFAULT 'medium', timestamp TEXT NOT NULL, processed BOOLEAN DEFAULT 0 ) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_messages_to ON agent_messages(to_agent) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_messages_processed ON agent_messages(processed) """) # Agent collaboration table await self.db.execute(""" CREATE TABLE IF NOT EXISTS agent_collaborations ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent1_id TEXT NOT NULL, agent2_id TEXT NOT NULL, collaboration_count INTEGER DEFAULT 0, success_rate REAL DEFAULT 0.0, last_collaboration TEXT, FOREIGN KEY (agent1_id) REFERENCES agents(id), FOREIGN KEY (agent2_id) REFERENCES agents(id), UNIQUE(agent1_id, agent2_id) ) """) async def _load_default_agents(self) -> None: """Load default specialized agents.""" agents = create_default_agents() for agent in agents: await self.register_agent(agent) logger.info( "default_agents_loaded", count=len(agents) ) async def _import_github_agents(self) -> None: """Import agent definitions from GitHub.""" if not self.agent_config.github_org: return try: async with aiohttp.ClientSession() as session: # Fetch agent definitions from GitHub url = f"https://api.github.com/repos/{self.agent_config.github_org}/shannon-agents/contents/agents" headers = {} if self.agent_config.github_token: headers["Authorization"] = f"token {self.agent_config.github_token}" async with session.get(url, headers=headers) as resp: if resp.status != 200: logger.error( "github_import_failed", status=resp.status, org=self.agent_config.github_org ) return files = await resp.json() # Import each agent definition imported = 0 for file in files: if file["name"].endswith(".yaml") or file["name"].endswith(".yml"): try: # Fetch file content async with session.get(file["download_url"]) as resp: content = await resp.text() # Parse agent definition agent_def = yaml.safe_load(content) # Create agent from definition agent = self._create_agent_from_github(agent_def, file["html_url"]) if agent: await self.register_agent(agent) imported += 1 except Exception as e: logger.error( "agent_import_error", file=file["name"], error=str(e) ) logger.info( "github_agents_imported", count=imported, org=self.agent_config.github_org ) except Exception as e: logger.error( "github_import_error", error=str(e), exc_info=True ) def _create_agent_from_github(self, definition: Dict[str, Any], github_url: str) -> Optional[Agent]: """Create agent from GitHub definition.""" try: # Parse capabilities capabilities = [] for cap_def in definition.get("capabilities", []): capability = AgentCapability( name=cap_def["name"], description=cap_def.get("description", ""), expertise_level=cap_def.get("expertise_level", 5), tools=cap_def.get("tools", []) ) capabilities.append(capability) # Create agent agent = Agent( name=definition["name"], description=definition.get("description", ""), category=AgentCategory(definition.get("category", "specialized")), capabilities=capabilities, github_url=github_url, version=definition.get("version", "1.0.0"), dependencies=definition.get("dependencies", []), config=definition.get("config", {}) ) return agent except Exception as e: logger.error( "agent_creation_error", definition=definition, error=str(e) ) return None async def register_agent(self, agent: Agent) -> None: """ Register a new agent. Args: agent: Agent to register Raises: ValidationError: If agent is invalid SystemError: If registration fails """ with error_context("agent_manager", "register_agent", agent_id=agent.id): # Validate agent if not agent.name: raise ValidationError("name", agent.name, "Agent name is required") if not agent.capabilities: raise ValidationError("capabilities", [], "Agent must have capabilities") # Check for duplicates if agent.id in self._agents: raise ValidationError("id", agent.id, "Agent already registered") # Save to database await self._save_agent(agent) # Add to registry self._agents[agent.id] = agent # Initialize metrics await self._initialize_agent_metrics(agent.id) # Emit event await emit( "agent_registered", EventCategory.AGENT, { "agent_id": agent.id, "name": agent.name, "category": agent.category.value } ) logger.info( "agent_registered", agent_id=agent.id, name=agent.name, category=agent.category.value, capabilities=len(agent.capabilities) ) async def get_agent(self, agent_id: str) -> Optional[Agent]: """Get agent by ID.""" return self._agents.get(agent_id) async def list_agents( self, category: Optional[AgentCategory] = None, status: Optional[AgentStatus] = None, capability: Optional[str] = None ) -> List[Agent]: """ List agents with optional filtering. Args: category: Filter by category status: Filter by status capability: Filter by capability name Returns: List of matching agents """ agents = list(self._agents.values()) if category: agents = [a for a in agents if a.category == category] if status: agents = [a for a in agents if a.status == status] if capability: agents = [ a for a in agents if any(c.name == capability for c in a.capabilities) ] return agents async def assign_task(self, request: TaskRequest) -> TaskAssignment: """ Assign a task to the best available agent. Args: request: Task request Returns: Task assignment Raises: SystemError: If no suitable agent found """ async with self._assignment_lock: with error_context("agent_manager", "assign_task", task_id=request.id): # Find suitable agents candidates = [] for agent in self._agents.values(): if agent.status != AgentStatus.AVAILABLE: continue # Check capabilities if agent.can_handle_task(request.required_capabilities): score = self._calculate_agent_score(agent, request) candidates.append((agent, score)) if not candidates: raise SystemError( f"No available agents with required capabilities: {request.required_capabilities}" ) # Sort by score candidates.sort(key=lambda x: x[1], reverse=True) # Select best agent best_agent, score = candidates[0] # Create assignment assignment = TaskAssignment( task_id=request.id, agent_id=best_agent.id, score=score, estimated_duration=self._estimate_duration(best_agent, request), confidence=self._calculate_confidence(best_agent, request) ) # Mark agent as busy best_agent.status = AgentStatus.BUSY await self._save_agent(best_agent) # Create execution record execution = AgentExecution( agent_id=best_agent.id, task_id=request.id, task_description=request.description, input_data=request.context ) self._executions[execution.id] = execution await self._save_execution(execution) # Emit event await emit( "task_assigned", EventCategory.AGENT, { "task_id": request.id, "agent_id": best_agent.id, "agent_name": best_agent.name, "score": score }, priority=EventPriority.HIGH ) logger.info( "task_assigned", task_id=request.id, agent_id=best_agent.id, agent_name=best_agent.name, score=score ) return assignment def _calculate_agent_score(self, agent: Agent, request: TaskRequest) -> float: """Calculate agent suitability score for task.""" score = 0.0 # Base score from capabilities capability_scores = [] for req_cap in request.required_capabilities: cap = agent.get_capability(req_cap) if cap: # Expertise level contributes to score capability_scores.append(cap.expertise_level / 10.0) if capability_scores: score = sum(capability_scores) / len(capability_scores) # Adjust for agent performance performance = self._get_agent_performance(agent.id) score *= performance # Adjust for current workload workload_factor = self._get_workload_factor(agent.id) score *= workload_factor # Priority boost if request.priority == "critical": score *= 1.5 elif request.priority == "high": score *= 1.2 elif request.priority == "low": score *= 0.8 return min(score, 1.0) def _estimate_duration(self, agent: Agent, request: TaskRequest) -> int: """Estimate task duration in seconds.""" # Base estimate base_duration = 300 # 5 minutes default # Adjust based on agent's average task time if agent.metrics.average_task_time > 0: base_duration = int(agent.metrics.average_task_time) # Adjust for task complexity complexity_factor = len(request.required_capabilities) * 0.5 return int(base_duration * (1 + complexity_factor)) def _calculate_confidence(self, agent: Agent, request: TaskRequest) -> float: """Calculate confidence level for assignment.""" confidence = 0.5 # Success rate contributes to confidence if agent.metrics.success_rate > 0: confidence = agent.metrics.success_rate # Adjust for capability match capability_match = sum( 1 for cap in request.required_capabilities if agent.get_capability(cap) is not None ) / len(request.required_capabilities) confidence *= capability_match return min(confidence, 1.0) def _get_agent_performance(self, agent_id: str) -> float: """Get agent performance score.""" # Check cache if agent_id in self._performance_cache: return self._performance_cache[agent_id] agent = self._agents.get(agent_id) if not agent: return 0.5 # Calculate performance if agent.metrics.success_rate > 0: performance = agent.metrics.success_rate else: performance = 0.8 # Default for new agents # Cache for 5 minutes self._performance_cache[agent_id] = performance return performance def _get_workload_factor(self, agent_id: str) -> float: """Get workload factor (lower when busy).""" # Count active executions active_count = sum( 1 for e in self._executions.values() if e.agent_id == agent_id and e.status == ExecutionStatus.RUNNING ) # More active tasks = lower score if active_count == 0: return 1.0 elif active_count == 1: return 0.8 elif active_count == 2: return 0.5 else: return 0.2 async def start_execution(self, execution_id: str) -> None: """Start task execution.""" execution = self._executions.get(execution_id) if not execution: raise ValidationError("execution_id", execution_id, "Execution not found") execution.start() await self._save_execution(execution) logger.info( "execution_started", execution_id=execution_id, agent_id=execution.agent_id, task_id=execution.task_id ) async def complete_execution( self, execution_id: str, output_data: Dict[str, Any] ) -> None: """Complete task execution.""" execution = self._executions.get(execution_id) if not execution: raise ValidationError("execution_id", execution_id, "Execution not found") execution.complete(output_data) await self._save_execution(execution) # Update agent metrics agent = self._agents.get(execution.agent_id) if agent and execution.duration: agent.metrics.update_metrics(True, execution.duration) agent.status = AgentStatus.AVAILABLE await self._save_agent(agent) await self._update_agent_metrics(agent) # Clear performance cache self._performance_cache.pop(execution.agent_id, None) logger.info( "execution_completed", execution_id=execution_id, agent_id=execution.agent_id, task_id=execution.task_id, duration=execution.duration ) async def fail_execution(self, execution_id: str, error: str) -> None: """Mark execution as failed.""" execution = self._executions.get(execution_id) if not execution: raise ValidationError("execution_id", execution_id, "Execution not found") execution.fail(error) await self._save_execution(execution) # Update agent metrics agent = self._agents.get(execution.agent_id) if agent and execution.duration: agent.metrics.update_metrics(False, execution.duration) agent.status = AgentStatus.AVAILABLE await self._save_agent(agent) await self._update_agent_metrics(agent) # Clear performance cache self._performance_cache.pop(execution.agent_id, None) logger.error( "execution_failed", execution_id=execution_id, agent_id=execution.agent_id, task_id=execution.task_id, error=error ) async def send_message( self, from_agent: str, to_agent: str, message_type: str, content: Dict[str, Any], priority: str = "medium" ) -> str: """ Send message between agents. Args: from_agent: Sender agent ID or "orchestrator" to_agent: Recipient agent ID or "all" message_type: Type of message content: Message content priority: Message priority Returns: Message ID """ message = AgentMessage( id=f"msg_{uuid.uuid4().hex[:12]}", from_agent=from_agent, to_agent=to_agent, message_type=message_type, content=content, priority=priority ) # Save to database await self._save_message(message) # Add to queue for processing await self._message_queue.put(message) logger.debug( "message_sent", message_id=message.id, from_agent=from_agent, to_agent=to_agent, message_type=message_type ) return message.id async def _process_messages(self) -> None: """Process agent messages.""" while True: try: # Get message with timeout message = await asyncio.wait_for( self._message_queue.get(), timeout=1.0 ) # Process based on type if message.message_type == "request": await self._handle_request_message(message) elif message.message_type == "response": await self._handle_response_message(message) elif message.message_type == "notification": await self._handle_notification_message(message) elif message.message_type == "review": await self._handle_review_message(message) # Mark as processed await self._mark_message_processed(message.id) except asyncio.TimeoutError: continue except asyncio.CancelledError: break except Exception as e: logger.error( "message_processing_error", error=str(e), exc_info=True ) async def _handle_request_message(self, message: AgentMessage) -> None: """Handle request message.""" # Route to appropriate agent if message.to_agent == "all": # Broadcast to all available agents for agent in self._agents.values(): if agent.status == AgentStatus.AVAILABLE: # Create task request from message request = TaskRequest( description=message.content.get("description", ""), required_capabilities=message.content.get("capabilities", []), priority=message.priority, context=message.content ) # Try to assign try: await self.assign_task(request) except Exception: pass else: # Direct to specific agent agent = self._agents.get(message.to_agent) if agent and agent.status == AgentStatus.AVAILABLE: # Process request logger.info( "agent_request_received", agent_id=agent.id, from_agent=message.from_agent ) async def _handle_response_message(self, message: AgentMessage) -> None: """Handle response message.""" # Update execution if applicable execution_id = message.content.get("execution_id") if execution_id: execution = self._executions.get(execution_id) if execution: execution.add_log(f"Response from {message.from_agent}") async def _handle_notification_message(self, message: AgentMessage) -> None: """Handle notification message.""" # Emit as event await emit( "agent_notification", EventCategory.AGENT, { "from_agent": message.from_agent, "to_agent": message.to_agent, "content": message.content } ) async def _handle_review_message(self, message: AgentMessage) -> None: """Handle code review message.""" # Track collaboration if message.from_agent != "orchestrator" and message.to_agent != "all": await self._track_collaboration(message.from_agent, message.to_agent) async def _track_collaboration(self, agent1_id: str, agent2_id: str) -> None: """Track collaboration between agents.""" # Ensure consistent ordering if agent1_id > agent2_id: agent1_id, agent2_id = agent2_id, agent1_id # Update collaboration graph if agent1_id not in self._collaboration_graph: self._collaboration_graph[agent1_id] = set() self._collaboration_graph[agent1_id].add(agent2_id) # Update database await self.db.execute(""" INSERT INTO agent_collaborations (agent1_id, agent2_id, collaboration_count, last_collaboration) VALUES (?, ?, 1, ?) ON CONFLICT(agent1_id, agent2_id) DO UPDATE SET collaboration_count = collaboration_count + 1, last_collaboration = ? """, (agent1_id, agent2_id, datetime.utcnow().isoformat(), datetime.utcnow().isoformat())) await self.db.commit() async def _monitor_agents(self) -> None: """Monitor agent health and performance.""" while True: try: await asyncio.sleep(30) # Check every 30 seconds # Check for stuck executions now = datetime.utcnow() for execution in self._executions.values(): if execution.status == ExecutionStatus.RUNNING: if execution.started_at: duration = (now - execution.started_at).total_seconds() # Check timeout agent = self._agents.get(execution.agent_id) if agent: estimated = self._estimate_duration(agent, None) if duration > estimated * 3: # 3x estimate logger.warning( "execution_timeout", execution_id=execution.id, agent_id=execution.agent_id, duration=duration ) # Mark as failed await self.fail_execution( execution.id, f"Execution timeout after {duration}s" ) # Update agent statuses for agent in self._agents.values(): if agent.status == AgentStatus.ERROR: # Try to recover agent.status = AgentStatus.AVAILABLE await self._save_agent(agent) # Clear old performance cache self._performance_cache.clear() except asyncio.CancelledError: break except Exception as e: logger.error("agent_monitor_error", error=str(e)) def _get_category_stats(self) -> Dict[str, int]: """Get agent statistics by category.""" stats = {} for category in AgentCategory: count = sum( 1 for a in self._agents.values() if a.category == category ) stats[category.value] = count return stats # Database operations async def _save_agent(self, agent: Agent) -> None: """Save agent to database.""" await self.db.execute(""" INSERT OR REPLACE INTO agents (id, name, description, category, status, github_url, version, config, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( agent.id, agent.name, agent.description, agent.category.value, agent.status.value, agent.github_url, agent.version, json.dumps(agent.config), agent.created_at.isoformat(), agent.updated_at.isoformat() )) # Save capabilities await self.db.execute(""" DELETE FROM agent_capabilities WHERE agent_id = ? """, (agent.id,)) for cap in agent.capabilities: await self.db.execute(""" INSERT INTO agent_capabilities (agent_id, name, description, expertise_level, tools) VALUES (?, ?, ?, ?, ?) """, ( agent.id, cap.name, cap.description, cap.expertise_level, json.dumps(cap.tools) )) await self.db.commit() async def _initialize_agent_metrics(self, agent_id: str) -> None: """Initialize agent metrics.""" await self.db.execute(""" INSERT OR IGNORE INTO agent_metrics (agent_id) VALUES (?) """, (agent_id,)) await self.db.commit() async def _update_agent_metrics(self, agent: Agent) -> None: """Update agent metrics in database.""" await self.db.execute(""" UPDATE agent_metrics SET tasks_completed = ?, tasks_failed = ?, total_execution_time = ?, average_task_time = ?, success_rate = ?, last_active = ? WHERE agent_id = ? """, ( agent.metrics.tasks_completed, agent.metrics.tasks_failed, agent.metrics.total_execution_time, agent.metrics.average_task_time, agent.metrics.success_rate, agent.metrics.last_active.isoformat() if agent.metrics.last_active else None, agent.id )) await self.db.commit() async def _save_execution(self, execution: AgentExecution) -> None: """Save execution to database.""" await self.db.execute(""" INSERT OR REPLACE INTO agent_executions (id, agent_id, task_id, task_description, input_data, output_data, status, started_at, completed_at, error, logs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( execution.id, execution.agent_id, execution.task_id, execution.task_description, json.dumps(execution.input_data), json.dumps(execution.output_data) if execution.output_data else None, execution.status.value, execution.started_at.isoformat() if execution.started_at else None, execution.completed_at.isoformat() if execution.completed_at else None, execution.error, json.dumps(execution.logs) )) await self.db.commit() async def _save_message(self, message: AgentMessage) -> None: """Save message to database.""" await self.db.execute(""" INSERT INTO agent_messages (id, from_agent, to_agent, message_type, content, priority, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( message.id, message.from_agent, message.to_agent, message.message_type, json.dumps(message.content), message.priority, message.timestamp.isoformat() )) await self.db.commit() async def _mark_message_processed(self, message_id: str) -> None: """Mark message as processed.""" await self.db.execute(""" UPDATE agent_messages SET processed = 1 WHERE id = ? """, (message_id,)) await self.db.commit() # Event handlers @event_handler(categories=EventCategory.AGENT, event_names="agent_status_change") async def _handle_status_change(self, event) -> None: """Handle agent status change.""" agent_id = event.data.get("agent_id") new_status = event.data.get("status") agent = self._agents.get(agent_id) if agent and new_status: agent.status = AgentStatus(new_status) await self._save_agent(agent) # Export public API __all__ = [ 'AgentManager', 'TaskRequest', 'TaskAssignment', ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server