Skip to main content
Glama
api_client.py13.8 kB
import os import json import asyncio import aiohttp import logging from typing import Dict, Any, Optional, List, Union from datetime import datetime, timezone from dataclasses import dataclass from dotenv import load_dotenv load_dotenv() logger = logging.getLogger(__name__) @dataclass class APIResponse: """Structured response from API calls""" success: bool data: Any = None error: Optional[str] = None status_code: Optional[int] = None class MadnessAPIClient: """ HTTP client for madnessinteractive.cc/api endpoints. Handles authentication, retries, and response parsing for MCP tools. """ def __init__(self, base_url: str = None, auth_token: str = None, api_key: str = None): self.base_url = base_url or os.getenv("MADNESS_API_URL", "https://madnessinteractive.cc/api") self.auth_token = auth_token or os.getenv("MADNESS_AUTH_TOKEN") self.api_key = api_key or os.getenv("MADNESS_API_KEY") self.session: Optional[aiohttp.ClientSession] = None self.max_retries = 3 self.timeout = aiohttp.ClientTimeout(total=30) # Authentication priority: JWT token > API key self.auth_headers = {} if self.auth_token: self.auth_headers["Authorization"] = f"Bearer {self.auth_token}" logger.info("Using JWT token authentication") elif self.api_key: self.auth_headers["Authorization"] = f"Bearer {self.api_key}" logger.info("Using API key authentication") else: logger.warning("No authentication configured - API calls may fail") async def __aenter__(self): """Async context manager entry""" await self._ensure_session() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit""" await self.close() async def _ensure_session(self): """Ensure aiohttp session is created""" if not self.session: connector = aiohttp.TCPConnector(limit=10, limit_per_host=5) self.session = aiohttp.ClientSession( timeout=self.timeout, connector=connector, headers={"User-Agent": "Omnispindle-MCP/1.0"} ) async def close(self): """Close the aiohttp session""" if self.session: await self.session.close() self.session = None async def _make_request(self, method: str, endpoint: str, **kwargs) -> APIResponse: """ Make HTTP request with retries and error handling """ await self._ensure_session() url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}" # Merge auth headers with any provided headers headers = {**self.auth_headers} if 'headers' in kwargs: headers.update(kwargs['headers']) kwargs['headers'] = headers # Add Content-Type for requests with data if method.upper() in ['POST', 'PUT', 'PATCH'] and 'json' in kwargs: headers.setdefault('Content-Type', 'application/json') last_error = None for attempt in range(self.max_retries + 1): try: logger.debug(f"API {method.upper()} {url} (attempt {attempt + 1})") async with self.session.request(method, url, **kwargs) as response: response_text = await response.text() # Log response details logger.debug(f"API Response: {response.status} {len(response_text)} bytes") # Try to parse JSON response try: response_data = json.loads(response_text) if response_text else {} except json.JSONDecodeError: response_data = {"raw_response": response_text} # Handle HTTP status codes if response.status == 200 or response.status == 201: return APIResponse( success=True, data=response_data, status_code=response.status ) elif response.status == 401: error_msg = f"Authentication failed (401): {response_data.get('message', 'Invalid credentials')}" logger.error(error_msg) return APIResponse( success=False, error=error_msg, status_code=response.status ) elif response.status == 403: error_msg = f"Access forbidden (403): {response_data.get('message', 'Insufficient permissions')}" logger.error(error_msg) return APIResponse( success=False, error=error_msg, status_code=response.status ) elif response.status == 404: error_msg = f"Resource not found (404): {response_data.get('message', 'Not found')}" return APIResponse( success=False, error=error_msg, status_code=response.status ) elif 400 <= response.status < 500: # Client error - don't retry error_msg = f"Client error ({response.status}): {response_data.get('message', 'Bad request')}" logger.error(error_msg) return APIResponse( success=False, error=error_msg, status_code=response.status ) elif response.status >= 500: # Server error - retry error_msg = f"Server error ({response.status}): {response_data.get('message', 'Internal server error')}" logger.warning(f"{error_msg} - will retry") last_error = error_msg if attempt < self.max_retries: # Exponential backoff wait_time = 2 ** attempt await asyncio.sleep(wait_time) continue else: return APIResponse( success=False, error=error_msg, status_code=response.status ) except aiohttp.ClientError as e: error_msg = f"Network error: {str(e)}" logger.warning(f"{error_msg} - attempt {attempt + 1}") last_error = error_msg if attempt < self.max_retries: # Exponential backoff for network errors wait_time = 2 ** attempt await asyncio.sleep(wait_time) continue else: return APIResponse( success=False, error=error_msg, status_code=None ) except Exception as e: error_msg = f"Unexpected error: {str(e)}" logger.error(error_msg) return APIResponse( success=False, error=error_msg, status_code=None ) # Should not reach here, but just in case return APIResponse( success=False, error=last_error or "Unknown error after retries", status_code=None ) # Health check async def health_check(self) -> APIResponse: """Check API health and connectivity""" return await self._make_request("GET", "/health") # Todo operations async def get_todos(self, project: str = None, status: str = None, priority: str = None, limit: int = 100) -> APIResponse: """Get todos with optional filtering""" params = {} if project: params["project"] = project if status: params["status"] = status if priority: params["priority"] = priority if limit: params["limit"] = limit return await self._make_request("GET", "/todos", params=params) async def get_todo(self, todo_id: str) -> APIResponse: """Get a specific todo by ID""" return await self._make_request("GET", f"/todos/{todo_id}") async def create_todo(self, description: str, project: str, priority: str = "Medium", metadata: Optional[Dict[str, Any]] = None) -> APIResponse: """Create a new todo""" payload = { "description": description, "project": project, "priority": priority } if metadata: payload["metadata"] = metadata logger.info(f"🐛 API client create_todo payload: {payload}") return await self._make_request("POST", "/todos", json=payload) async def update_todo(self, todo_id: str, updates: Dict[str, Any]) -> APIResponse: """Update an existing todo""" return await self._make_request("PUT", f"/todos/{todo_id}", json=updates) async def delete_todo(self, todo_id: str) -> APIResponse: """Delete a todo""" return await self._make_request("DELETE", f"/todos/{todo_id}") async def complete_todo(self, todo_id: str, comment: str = None) -> APIResponse: """Mark a todo as complete""" payload = {} if comment: payload["comment"] = comment return await self._make_request("POST", f"/todos/{todo_id}/complete", json=payload) async def get_todo_stats(self, project: str = None) -> APIResponse: """Get todo statistics""" params = {} if project: params["project"] = project return await self._make_request("GET", "/todos/stats", params=params) async def get_projects(self) -> APIResponse: """Get available projects""" return await self._make_request("GET", "/projects") # Chat session operations async def list_chat_sessions(self, project: Optional[str] = None, limit: int = 50, status: Optional[str] = None) -> APIResponse: """List chat sessions for the authenticated user.""" params: Dict[str, Any] = {} if project: params["project"] = project if limit: params["limit"] = limit if status: params["status"] = status return await self._make_request("GET", "/chat-sessions", params=params or None) async def get_chat_session(self, session_id: str) -> APIResponse: """Fetch a specific chat session by ID.""" return await self._make_request("GET", f"/chat-sessions/{session_id}") async def create_chat_session(self, payload: Dict[str, Any]) -> APIResponse: """Create a chat session.""" return await self._make_request("POST", "/chat-sessions", json=payload) async def update_chat_session(self, session_id: str, updates: Dict[str, Any]) -> APIResponse: """Update chat session metadata.""" return await self._make_request("PATCH", f"/chat-sessions/{session_id}", json=updates) async def append_chat_message(self, session_id: str, message: Dict[str, Any]) -> APIResponse: """Append a message to a chat session.""" return await self._make_request("POST", f"/chat-sessions/{session_id}/messages", json=message) async def fork_chat_session(self, session_id: str, payload: Dict[str, Any]) -> APIResponse: """Fork a chat session to explore alternative paths.""" return await self._make_request("POST", f"/chat-sessions/{session_id}/fork", json=payload) async def spawn_chat_session(self, session_id: str, payload: Dict[str, Any]) -> APIResponse: """Spawn a delegated child chat session.""" return await self._make_request("POST", f"/chat-sessions/{session_id}/spawn", json=payload) async def get_chat_session_genealogy(self, session_id: str) -> APIResponse: """Get genealogy details for a specific session.""" return await self._make_request("GET", f"/chat-sessions/{session_id}/genealogy") async def get_chat_session_tree(self, project: Optional[str] = None, limit: int = 200) -> APIResponse: """Fetch session tree for the authenticated user.""" params: Dict[str, Any] = {} if project: params["project"] = project if limit: params["limit"] = limit return await self._make_request("GET", "/chat-sessions/tree", params=params or None) # Factory function for creating API client instances def create_api_client(auth_token: str = None, api_key: str = None) -> MadnessAPIClient: """Factory function to create API client with authentication""" return MadnessAPIClient(auth_token=auth_token, api_key=api_key) # Singleton instance for module-level usage _default_client: Optional[MadnessAPIClient] = None async def get_default_client() -> MadnessAPIClient: """Get or create default API client instance""" global _default_client if not _default_client: _default_client = create_api_client() return _default_client

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MadnessEngineering/fastmcp-todo-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server