async_helpers.py
#!/usr/bin/env python3
"""
Async Safety Helpers for Carla MCP Server

Provides reusable utilities for wrapping blocking operations in async-safe
wrappers. Prevents event loop blocking that causes timeout crashes.
"""

import asyncio
import functools
import logging
from typing import Callable, Any, Optional, TypeVar, ParamSpec, Awaitable

logger = logging.getLogger(__name__)

P = ParamSpec('P')
T = TypeVar('T')


def async_safe(timeout: float = 30.0, description: str = "operation"):
    """
    Decorator to wrap blocking functions for async-safe execution.

    Automatically runs the decorated function in a thread pool with
    timeout protection.

    Args:
        timeout: Maximum execution time in seconds (default: 30.0)
        description: Human-readable description for logging

    Usage:
        @async_safe(timeout=5.0, description="load project file")
        def _blocking_load(path):
            return self.carla.load_project(path)

        result = await _blocking_load(path)

    Example:
        # Before (BLOCKS event loop):
        async def load_session(self, path):
            success = self.carla.load_project(path)  # BLOCKING!

        # After (async-safe):
        async def load_session(self, path):
            @async_safe(timeout=10.0, description="load project")
            def _load():
                return self.carla.load_project(path)

            success = await _load()
    """
    def decorator(func: Callable[P, T]) -> Callable[P, Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            try:
                # Run blocking function in thread pool with timeout
                result = await asyncio.wait_for(
                    asyncio.to_thread(func, *args, **kwargs),
                    timeout=timeout
                )
                return result
            except asyncio.TimeoutError:
                error_msg = f"{description} exceeded {timeout}s timeout"
                logger.error(error_msg)
                raise TimeoutError(error_msg)
            except Exception as e:
                logger.error(f"{description} failed: {e}")
                raise

        return wrapper
    return decorator


async def run_blocking(
    func: Callable[P, T],
    *args: P.args,
    timeout: float = 30.0,
    description: str = "blocking operation",
    **kwargs: P.kwargs
) -> T:
    """
    Run a blocking function safely in async context.

    This is a functional alternative to the @async_safe decorator.
    Use it when you can't use a decorator (e.g., calling existing methods).

    Args:
        func: Blocking function to execute
        *args: Positional arguments for func
        timeout: Maximum execution time in seconds
        description: Human-readable description for logging
        **kwargs: Keyword arguments for func

    Returns:
        Result from the blocking function

    Raises:
        TimeoutError: If operation exceeds timeout

    Usage:
        # Before (BLOCKS):
        result = self.carla.load_project(path)

        # After (async-safe):
        result = await run_blocking(
            self.carla.load_project, path,
            timeout=10.0,
            description="load project"
        )

    Example:
        # File I/O
        content = await run_blocking(
            json.load, open(path),
            timeout=5.0,
            description=f"load JSON from {path}"
        )

        # Carla API
        plugin_id = await run_blocking(
            self.carla.load_plugin, plugin_path, plugin_type,
            timeout=15.0,
            description="load plugin"
        )
    """
    try:
        result = await asyncio.wait_for(
            asyncio.to_thread(func, *args, **kwargs),
            timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        error_msg = f"{description} exceeded {timeout}s timeout"
        logger.error(error_msg)
        raise TimeoutError(error_msg)
    except Exception as e:
        logger.error(f"{description} failed: {e}")
        raise


async def batch_blocking(
    operations: list[tuple[Callable, tuple, dict]],
    batch_size: int = 20,
    timeout_per_batch: float = 30.0,
    description: str = "batch operations"
) -> list[Any]:
    """
    Execute multiple blocking operations in batches with async safety.

    Processes operations in batches to prevent long blocking periods
    and yields control to the event loop between batches.

    Args:
        operations: List of (func, args, kwargs) tuples
        batch_size: Number of operations per batch
        timeout_per_batch: Timeout for each batch in seconds
        description: Human-readable description

    Returns:
        List of results from all operations

    Usage:
        # Prepare operations
        ops = [
            (self.carla.get_plugin_info, (i,), {}),
            (self.carla.get_plugin_info, (i+1,), {}),
            ...
        ]

        # Execute in batches
        results = await batch_blocking(
            ops,
            batch_size=20,
            timeout_per_batch=30.0,
            description="get plugin info"
        )

    Example:
        # Get info for 100 plugins (batched to prevent long blocks)
        plugin_ops = [
            (self.carla.host.get_plugin_info, (i,), {})
            for i in range(100)
        ]
        plugin_infos = await batch_blocking(
            plugin_ops,
            batch_size=20,
            description="fetch plugin metadata"
        )
    """
    results = []

    for batch_start in range(0, len(operations), batch_size):
        batch_end = min(batch_start + batch_size, len(operations))
        batch = operations[batch_start:batch_end]

        # Execute batch in thread pool
        def execute_batch():
            batch_results = []
            for func, args, kwargs in batch:
                try:
                    result = func(*args, **kwargs)
                    batch_results.append(result)
                except Exception as e:
                    logger.warning(f"Batch operation failed: {e}")
                    batch_results.append(None)
            return batch_results

        # Run batch with timeout
        try:
            batch_results = await asyncio.wait_for(
                asyncio.to_thread(execute_batch),
                timeout=timeout_per_batch
            )
            results.extend(batch_results)
        except asyncio.TimeoutError:
            logger.error(f"{description} batch {batch_start}-{batch_end} timed out")
            results.extend([None] * len(batch))

        # Yield control to event loop between batches
        await asyncio.sleep(0)

    return results


class AsyncFileIO:
    """
    Async-safe file I/O operations.

    All file operations are automatically wrapped for safe async execution.
    """

    @staticmethod
    async def read_text(path: str, encoding: str = 'utf-8', timeout: float = 10.0) -> str:
        """Read text file async-safely"""
        def _read():
            with open(path, 'r', encoding=encoding) as f:
                return f.read()

        return await run_blocking(
            _read,
            timeout=timeout,
            description=f"read {path}"
        )

    @staticmethod
    async def write_text(path: str, content: str, encoding: str = 'utf-8', timeout: float = 10.0):
        """Write text file async-safely"""
        def _write():
            with open(path, 'w', encoding=encoding) as f:
                f.write(content)

        await run_blocking(
            _write,
            timeout=timeout,
            description=f"write {path}"
        )

    @staticmethod
    async def read_json(path: str, timeout: float = 10.0) -> dict:
        """Read JSON file async-safely"""
        import json

        def _read():
            with open(path, 'r') as f:
                return json.load(f)

        return await run_blocking(
            _read,
            timeout=timeout,
            description=f"read JSON {path}"
        )

    @staticmethod
    async def write_json(path: str, data: dict, timeout: float = 10.0):
        """Write JSON file async-safely"""
        import json

        def _write():
            with open(path, 'w') as f:
                json.dump(data, f, indent=2)

        await run_blocking(
            _write,
            timeout=timeout,
            description=f"write JSON {path}"
        )

    @staticmethod
    async def copy_file(src: str, dst: str, timeout: float = 30.0):
        """Copy file async-safely"""
        import shutil
        await run_blocking(
            shutil.copy2, src, dst,
            timeout=timeout,
            description=f"copy {src} → {dst}"
        )

    @staticmethod
    async def remove_tree(path: str, timeout: float = 30.0):
        """Remove directory tree async-safely"""
        import shutil
        await run_blocking(
            shutil.rmtree, path,
            timeout=timeout,
            description=f"remove tree {path}"
        )

    @staticmethod
    async def get_size(path: str, timeout: float = 5.0) -> int:
        """Get file size async-safely"""
        import os
        return await run_blocking(
            os.path.getsize, path,
            timeout=timeout,
            description=f"stat {path}"
        )


# Convenience exports
__all__ = [
    'async_safe',
    'run_blocking',
    'batch_blocking',
    'AsyncFileIO'
]
