Skip to main content
Glama
hybrid_tools.py16.9 kB
""" Hybrid tools module that can switch between API and local database modes. Provides graceful degradation and performance comparison capabilities. """ import os import asyncio import logging from typing import Dict, Any, Optional, Union, List from enum import Enum from datetime import datetime, timezone from .context import Context from .utils import create_response from . import tools as local_tools from . import api_tools from .api_client import MadnessAPIClient logger = logging.getLogger(__name__) class OmnispindleMode(Enum): """Available operation modes for Omnispindle""" LOCAL = "local" # Direct MongoDB access API = "api" # HTTP API calls only HYBRID = "hybrid" # Try API first, fallback to local AUTO = "auto" # Automatically choose best mode class HybridConfig: """Configuration for hybrid mode operations""" def __init__(self): self.mode = self._get_mode_from_env() self.api_timeout = float(os.getenv("OMNISPINDLE_API_TIMEOUT", "10.0")) self.fallback_enabled = os.getenv("OMNISPINDLE_FALLBACK_ENABLED", "true").lower() == "true" self.performance_logging = os.getenv("OMNISPINDLE_PERFORMANCE_LOGGING", "false").lower() == "true" # Performance thresholds self.api_failure_threshold = int(os.getenv("OMNISPINDLE_API_FAILURE_THRESHOLD", "3")) self.api_timeout_threshold = float(os.getenv("OMNISPINDLE_API_TIMEOUT_THRESHOLD", "5.0")) # Performance tracking self.api_failures = 0 self.local_failures = 0 self.api_response_times = [] self.local_response_times = [] def _get_mode_from_env(self) -> OmnispindleMode: """Get operation mode from environment variable""" mode_str = os.getenv("OMNISPINDLE_MODE", "hybrid").lower() try: return OmnispindleMode(mode_str) except ValueError: logger.warning(f"Invalid OMNISPINDLE_MODE '{mode_str}', defaulting to hybrid") return OmnispindleMode.HYBRID def should_use_api(self) -> bool: """Determine if API should be used based on current state""" if self.mode == OmnispindleMode.LOCAL: return False elif self.mode == OmnispindleMode.API: return True elif self.mode in [OmnispindleMode.HYBRID, OmnispindleMode.AUTO]: # Use API unless it's consistently failing return self.api_failures < self.api_failure_threshold return True def record_api_success(self, response_time: float): """Record successful API operation""" self.api_failures = 0 # Reset failure count on success if self.performance_logging: self.api_response_times.append(response_time) # Keep only recent measurements if len(self.api_response_times) > 100: self.api_response_times = self.api_response_times[-50:] def record_api_failure(self): """Record failed API operation""" self.api_failures += 1 logger.warning(f"API failure count: {self.api_failures}/{self.api_failure_threshold}") def record_local_success(self, response_time: float): """Record successful local operation""" self.local_failures = 0 if self.performance_logging: self.local_response_times.append(response_time) if len(self.local_response_times) > 100: self.local_response_times = self.local_response_times[-50:] def record_local_failure(self): """Record failed local operation""" self.local_failures += 1 logger.warning(f"Local failure count: {self.local_failures}") def get_performance_stats(self) -> Dict[str, Any]: """Get performance statistics""" stats = { "mode": self.mode.value, "api_failures": self.api_failures, "local_failures": self.local_failures, "should_use_api": self.should_use_api() } if self.api_response_times: stats["api_avg_response_time"] = sum(self.api_response_times) / len(self.api_response_times) stats["api_recent_calls"] = len(self.api_response_times) if self.local_response_times: stats["local_avg_response_time"] = sum(self.local_response_times) / len(self.local_response_times) stats["local_recent_calls"] = len(self.local_response_times) return stats # Global configuration instance _hybrid_config = HybridConfig() def get_hybrid_config() -> HybridConfig: """Get the global hybrid configuration""" return _hybrid_config async def _execute_with_fallback(operation_name: str, api_func, local_func, *args, ctx: Optional[Context] = None, **kwargs): """ Execute a function with hybrid mode support - API first, fallback to local if needed. """ config = get_hybrid_config() # Record start time for performance tracking start_time = datetime.now(timezone.utc) # Determine primary and fallback methods use_api_first = config.should_use_api() if use_api_first: primary_func = api_func fallback_func = local_func primary_name = "API" fallback_name = "Local" else: primary_func = local_func fallback_func = api_func primary_name = "Local" fallback_name = "API" # Try primary method try: logger.debug(f"Executing {operation_name} via {primary_name}") result = await primary_func(*args, ctx=ctx, **kwargs) # Record success response_time = (datetime.now(timezone.utc) - start_time).total_seconds() if use_api_first: config.record_api_success(response_time) else: config.record_local_success(response_time) # Check if result indicates failure if isinstance(result, str) and '"success": false' in result: raise Exception(f"{primary_name} returned failure response") logger.debug(f"{operation_name} succeeded via {primary_name} in {response_time:.2f}s") return result except Exception as primary_error: logger.warning(f"{operation_name} failed via {primary_name}: {str(primary_error)}") # Record failure if use_api_first: config.record_api_failure() else: config.record_local_failure() # Try fallback if enabled and in hybrid/auto mode if config.fallback_enabled and config.mode in [OmnispindleMode.HYBRID, OmnispindleMode.AUTO]: try: logger.info(f"Falling back to {fallback_name} for {operation_name}") fallback_start = datetime.now(timezone.utc) result = await fallback_func(*args, ctx=ctx, **kwargs) # Record fallback success response_time = (datetime.now(timezone.utc) - fallback_start).total_seconds() if not use_api_first: config.record_api_success(response_time) else: config.record_local_success(response_time) logger.info(f"{operation_name} succeeded via {fallback_name} fallback in {response_time:.2f}s") return result except Exception as fallback_error: logger.error(f"{operation_name} failed via both {primary_name} and {fallback_name}") logger.error(f"Primary error: {str(primary_error)}") logger.error(f"Fallback error: {str(fallback_error)}") # Record fallback failure if not use_api_first: config.record_api_failure() else: config.record_local_failure() return create_response(False, message=f"Both {primary_name} and {fallback_name} failed. Primary: {str(primary_error)}, Fallback: {str(fallback_error)}") else: # No fallback, return primary error return create_response(False, message=f"{primary_name} failed: {str(primary_error)}") # Hybrid tool implementations async def add_todo(description: str, project: str, priority: str = "Medium", target_agent: str = "user", metadata: Optional[Dict[str, Any]] = None, ctx: Optional[Context] = None) -> str: """Create a todo using hybrid mode""" return await _execute_with_fallback( "add_todo", api_tools.add_todo, local_tools.add_todo, description, project, priority, target_agent, metadata, ctx=ctx ) async def query_todos(filter: Optional[Dict[str, Any]] = None, projection: Optional[Dict[str, Any]] = None, limit: int = 100, ctx: Optional[Context] = None) -> str: """Query todos using hybrid mode""" return await _execute_with_fallback( "query_todos", api_tools.query_todos, local_tools.query_todos, filter, projection, limit, ctx=ctx ) async def update_todo(todo_id: str, updates: dict, ctx: Optional[Context] = None) -> str: """Update todo using hybrid mode""" return await _execute_with_fallback( "update_todo", api_tools.update_todo, local_tools.update_todo, todo_id, updates, ctx=ctx ) async def delete_todo(todo_id: str, ctx: Optional[Context] = None) -> str: """Delete todo using hybrid mode""" return await _execute_with_fallback( "delete_todo", api_tools.delete_todo, local_tools.delete_todo, todo_id, ctx=ctx ) async def get_todo(todo_id: str, ctx: Optional[Context] = None) -> str: """Get todo using hybrid mode""" return await _execute_with_fallback( "get_todo", api_tools.get_todo, local_tools.get_todo, todo_id, ctx=ctx ) async def mark_todo_complete(todo_id: str, comment: Optional[str] = None, ctx: Optional[Context] = None) -> str: """Complete todo using hybrid mode""" return await _execute_with_fallback( "mark_todo_complete", api_tools.mark_todo_complete, local_tools.mark_todo_complete, todo_id, comment, ctx=ctx ) async def list_todos_by_status(status: str, limit: int = 100, ctx: Optional[Context] = None) -> str: """List todos by status using hybrid mode""" return await _execute_with_fallback( "list_todos_by_status", api_tools.list_todos_by_status, local_tools.list_todos_by_status, status, limit, ctx=ctx ) async def search_todos(query: str, fields: Optional[list] = None, limit: int = 100, ctx: Optional[Context] = None) -> str: """Search todos using hybrid mode""" return await _execute_with_fallback( "search_todos", api_tools.search_todos, local_tools.search_todos, query, fields, limit, ctx=ctx ) async def list_project_todos(project: str, limit: int = 5, ctx: Optional[Context] = None) -> str: """List project todos using hybrid mode""" return await _execute_with_fallback( "list_project_todos", api_tools.list_project_todos, local_tools.list_project_todos, project, limit, ctx=ctx ) async def list_projects(include_details: Union[bool, str] = False, madness_root: str = "/Users/d.edens/lab/madness_interactive", ctx: Optional[Context] = None) -> str: """List projects using hybrid mode""" return await _execute_with_fallback( "list_projects", api_tools.list_projects, local_tools.list_projects, include_details, madness_root, ctx=ctx ) # For non-todo operations, prefer local mode since they're not yet available via API async def add_lesson(language: str, topic: str, lesson_learned: str, tags: Optional[list] = None, ctx: Optional[Context] = None) -> str: """Add lesson - local only for now""" return await local_tools.add_lesson(language, topic, lesson_learned, tags, ctx=ctx) async def get_lesson(lesson_id: str, ctx: Optional[Context] = None) -> str: """Get lesson - local only for now""" return await local_tools.get_lesson(lesson_id, ctx=ctx) async def update_lesson(lesson_id: str, updates: dict, ctx: Optional[Context] = None) -> str: """Update lesson - local only for now""" return await local_tools.update_lesson(lesson_id, updates, ctx=ctx) async def delete_lesson(lesson_id: str, ctx: Optional[Context] = None) -> str: """Delete lesson - local only for now""" return await local_tools.delete_lesson(lesson_id, ctx=ctx) async def search_lessons(query: str, fields: Optional[list] = None, limit: int = 100, brief: bool = False, ctx: Optional[Context] = None) -> str: """Search lessons - local only for now""" return await local_tools.search_lessons(query, fields, limit, brief, ctx=ctx) async def grep_lessons(pattern: str, limit: int = 20, ctx: Optional[Context] = None) -> str: """Grep lessons - local only for now""" return await local_tools.grep_lessons(pattern, limit, ctx=ctx) async def list_lessons(limit: int = 100, brief: bool = False, ctx: Optional[Context] = None) -> str: """List lessons - local only for now""" return await local_tools.list_lessons(limit, brief, ctx=ctx) async def query_todo_logs(filter_type: str = 'all', project: str = 'all', page: int = 1, page_size: int = 20, ctx: Optional[Context] = None) -> str: """Query todo logs - local only for now""" return await local_tools.query_todo_logs(filter_type, project, page, page_size, ctx=ctx) async def add_explanation(topic: str, content: str, kind: str = "concept", author: str = "system", ctx: Optional[Context] = None) -> str: """Add explanation - local only for now""" return await local_tools.add_explanation(topic, content, kind, author, ctx=ctx) async def explain_tool(topic: str, brief: bool = False, ctx: Optional[Context] = None) -> str: """Explain tool - local only for now""" return await local_tools.explain_tool(topic, brief, ctx=ctx) async def point_out_obvious(observation: str, sarcasm_level: int = 5, ctx: Optional[Context] = None) -> str: """Point out obvious - local only for now""" return await local_tools.point_out_obvious(observation, sarcasm_level, ctx=ctx) async def bring_your_own(tool_name: str, code: str, runtime: str = "python", timeout: int = 30, args: Optional[Dict[str, Any]] = None, persist: bool = False, ctx: Optional[Context] = None) -> str: """Bring your own tool - local only for now""" return await local_tools.bring_your_own(tool_name, code, runtime, timeout, args, persist, ctx=ctx) # Utility functions for monitoring and configuration async def get_hybrid_status(ctx: Optional[Context] = None) -> str: """Get current hybrid mode status and performance stats""" config = get_hybrid_config() stats = config.get_performance_stats() return create_response(True, { "hybrid_status": stats, "configuration": { "mode": config.mode.value, "api_timeout": config.api_timeout, "fallback_enabled": config.fallback_enabled, "performance_logging": config.performance_logging, "api_failure_threshold": config.api_failure_threshold } }, message=f"Hybrid mode: {config.mode.value}, API preferred: {config.should_use_api()}") async def test_api_connectivity(ctx: Optional[Context] = None) -> str: """Test API connectivity and response times""" try: auth_token, api_key = api_tools._get_auth_from_context(ctx) start_time = datetime.now(timezone.utc) async with MadnessAPIClient(auth_token=auth_token, api_key=api_key) as client: health_response = await client.health_check() response_time = (datetime.now(timezone.utc) - start_time).total_seconds() if health_response.success: return create_response(True, { "api_status": "healthy", "response_time": response_time, "api_data": health_response.data }, message=f"API connectivity OK ({response_time:.2f}s)") else: return create_response(False, { "api_status": "unhealthy", "response_time": response_time, "error": health_response.error }, message=f"API connectivity failed: {health_response.error}") except Exception as e: return create_response(False, { "api_status": "error", "error": str(e) }, message=f"API connectivity test failed: {str(e)}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MadnessEngineering/fastmcp-todo-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server