Skip to main content
Glama

MCP Brain Service

by jomapps
payload_client.py•4.55 kB
""" PayloadCMS Client for Brain Service Fetches department configurations from the main application """ import os import logging from typing import List, Dict, Any, Optional import aiohttp logger = logging.getLogger(__name__) class PayloadCMSClient: """Client for PayloadCMS API""" def __init__(self): self.api_url = os.getenv("MAIN_APP_PAYLOAD_API_URL") self.api_key = os.getenv("MAIN_APP_PAYLOAD_API_KEY") if not self.api_url: raise ValueError("MAIN_APP_PAYLOAD_API_URL environment variable is required") if not self.api_key: raise ValueError("MAIN_APP_PAYLOAD_API_KEY environment variable is required") # Remove trailing slash if present self.api_url = self.api_url.rstrip('/') async def get_departments(self) -> List[Dict[str, Any]]: """ Fetch all departments from PayloadCMS Returns: List of department dicts with 'slug', 'name', 'description' keys """ url = f"{self.api_url}/departments" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } try: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: if response.status != 200: error_text = await response.text() logger.error(f"PayloadCMS API error: {response.status} - {error_text}") # Return empty list on error instead of raising return [] result = await response.json() # PayloadCMS typically returns {docs: [...], ...} departments = result.get("docs", []) return [ { "slug": dept.get("slug", ""), "name": dept.get("name", ""), "description": dept.get("description", "") } for dept in departments ] except Exception as e: logger.error(f"Failed to fetch departments from PayloadCMS: {e}") return [] async def get_department_by_slug(self, slug: str) -> Optional[Dict[str, Any]]: """ Fetch a specific department by slug Args: slug: Department slug (e.g., 'story', 'character') Returns: Department dict or None if not found """ departments = await self.get_departments() for dept in departments: if dept.get("slug") == slug: return dept return None async def validate_department(self, slug: str) -> bool: """ Check if a department slug is valid Args: slug: Department slug to validate Returns: True if valid, False otherwise """ dept = await self.get_department_by_slug(slug) return dept is not None async def health_check(self) -> Dict[str, Any]: """Check if PayloadCMS API is accessible""" try: url = f"{self.api_url}/departments" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=5)) as response: if response.status == 200: return {"status": "healthy", "service": "PayloadCMS"} else: return { "status": "unhealthy", "service": "PayloadCMS", "error": f"HTTP {response.status}" } except Exception as e: return {"status": "unhealthy", "service": "PayloadCMS", "error": str(e)} # Singleton instance _payload_client: Optional[PayloadCMSClient] = None def get_payload_client() -> PayloadCMSClient: """Get or create PayloadCMS client singleton""" global _payload_client if _payload_client is None: _payload_client = PayloadCMSClient() return _payload_client

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jomapps/mcp-brain-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server