Skip to main content
Glama

AnyDocs MCP Server

by funky1688
helpers.pyโ€ข17.3 kB
#!/usr/bin/env python3 """ Helper Utilities Provides common utility functions and helpers. """ import os import re import json import hashlib import secrets import asyncio import functools from pathlib import Path from typing import Any, Dict, List, Optional, Union, Callable, TypeVar, Awaitable from datetime import datetime, timezone from urllib.parse import urlparse, urljoin, quote, unquote from dataclasses import dataclass, asdict T = TypeVar('T') F = TypeVar('F', bound=Callable[..., Any]) # String utilities def slugify(text: str, max_length: int = 50) -> str: """Convert text to URL-friendly slug. Args: text: Input text max_length: Maximum length of slug Returns: URL-friendly slug """ # Convert to lowercase and replace spaces/special chars with hyphens slug = re.sub(r'[^\w\s-]', '', text.lower()) slug = re.sub(r'[-\s]+', '-', slug) slug = slug.strip('-') # Truncate if too long if len(slug) > max_length: slug = slug[:max_length].rstrip('-') return slug or 'untitled' def truncate_text(text: str, max_length: int = 100, suffix: str = '...') -> str: """Truncate text to specified length. Args: text: Input text max_length: Maximum length suffix: Suffix to append when truncated Returns: Truncated text """ if len(text) <= max_length: return text return text[:max_length - len(suffix)] + suffix def extract_domain(url: str) -> Optional[str]: """Extract domain from URL. Args: url: Input URL Returns: Domain name or None if invalid """ try: parsed = urlparse(url) return parsed.netloc.lower() except Exception: return None def is_valid_url(url: str) -> bool: """Check if URL is valid. Args: url: URL to validate Returns: True if valid URL """ try: result = urlparse(url) return all([result.scheme, result.netloc]) except Exception: return False def normalize_url(url: str, base_url: Optional[str] = None) -> str: """Normalize URL. Args: url: URL to normalize base_url: Base URL for relative URLs Returns: Normalized URL """ if not url: return '' # Handle relative URLs if base_url and not url.startswith(('http://', 'https://')): url = urljoin(base_url, url) # Parse and reconstruct parsed = urlparse(url) # Normalize path path = parsed.path or '/' if not path.startswith('/'): path = '/' + path # Remove default ports netloc = parsed.netloc if ':80' in netloc and parsed.scheme == 'http': netloc = netloc.replace(':80', '') elif ':443' in netloc and parsed.scheme == 'https': netloc = netloc.replace(':443', '') return f"{parsed.scheme}://{netloc}{path}" # Hash utilities def generate_hash(data: Union[str, bytes], algorithm: str = 'sha256') -> str: """Generate hash for data. Args: data: Data to hash algorithm: Hash algorithm Returns: Hex digest of hash """ if isinstance(data, str): data = data.encode('utf-8') hasher = hashlib.new(algorithm) hasher.update(data) return hasher.hexdigest() def generate_id(length: int = 16) -> str: """Generate random ID. Args: length: Length of ID Returns: Random ID string """ return secrets.token_urlsafe(length)[:length] def generate_api_key(prefix: str = 'ak', length: int = 32) -> str: """Generate API key. Args: prefix: Key prefix length: Key length (excluding prefix) Returns: API key string """ key = secrets.token_urlsafe(length)[:length] return f"{prefix}_{key}" # File utilities def ensure_directory(path: Union[str, Path]) -> Path: """Ensure directory exists. Args: path: Directory path Returns: Path object """ path = Path(path) path.mkdir(parents=True, exist_ok=True) return path def get_file_size(path: Union[str, Path]) -> int: """Get file size in bytes. Args: path: File path Returns: File size in bytes """ return Path(path).stat().st_size def get_file_extension(path: Union[str, Path]) -> str: """Get file extension. Args: path: File path Returns: File extension (without dot) """ return Path(path).suffix.lstrip('.') def is_text_file(path: Union[str, Path]) -> bool: """Check if file is text file. Args: path: File path Returns: True if text file """ text_extensions = { 'txt', 'md', 'rst', 'py', 'js', 'ts', 'html', 'css', 'json', 'xml', 'yaml', 'yml', 'toml', 'ini', 'cfg', 'conf', 'log' } extension = get_file_extension(path).lower() return extension in text_extensions def read_file_safe(path: Union[str, Path], encoding: str = 'utf-8') -> Optional[str]: """Safely read text file. Args: path: File path encoding: File encoding Returns: File content or None if error """ try: return Path(path).read_text(encoding=encoding) except Exception: return None def write_file_safe(path: Union[str, Path], content: str, encoding: str = 'utf-8') -> bool: """Safely write text file. Args: path: File path content: File content encoding: File encoding Returns: True if successful """ try: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding=encoding) return True except Exception: return False # JSON utilities def load_json_safe(path: Union[str, Path]) -> Optional[Dict[str, Any]]: """Safely load JSON file. Args: path: JSON file path Returns: Parsed JSON data or None if error """ try: content = Path(path).read_text(encoding='utf-8') return json.loads(content) except Exception: return None def save_json_safe(path: Union[str, Path], data: Any, indent: int = 2) -> bool: """Safely save JSON file. Args: path: JSON file path data: Data to save indent: JSON indentation Returns: True if successful """ try: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) content = json.dumps(data, indent=indent, ensure_ascii=False) path.write_text(content, encoding='utf-8') return True except Exception: return False def serialize_dataclass(obj: Any) -> Dict[str, Any]: """Serialize dataclass to dictionary. Args: obj: Dataclass instance Returns: Dictionary representation """ if hasattr(obj, '__dataclass_fields__'): return asdict(obj) elif isinstance(obj, dict): return obj else: return {'value': obj} # Date/time utilities def utc_now() -> datetime: """Get current UTC datetime. Returns: Current UTC datetime """ return datetime.now(timezone.utc) def format_datetime(dt: datetime, format_str: str = '%Y-%m-%d %H:%M:%S') -> str: """Format datetime to string. Args: dt: Datetime object format_str: Format string Returns: Formatted datetime string """ return dt.strftime(format_str) def parse_datetime(dt_str: str, format_str: str = '%Y-%m-%d %H:%M:%S') -> Optional[datetime]: """Parse datetime from string. Args: dt_str: Datetime string format_str: Format string Returns: Parsed datetime or None if error """ try: return datetime.strptime(dt_str, format_str) except Exception: return None def timestamp_to_datetime(timestamp: float) -> datetime: """Convert timestamp to datetime. Args: timestamp: Unix timestamp Returns: Datetime object """ return datetime.fromtimestamp(timestamp, tz=timezone.utc) # Validation utilities def validate_email(email: str) -> bool: """Validate email address. Args: email: Email address Returns: True if valid email """ pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(pattern, email)) def validate_username(username: str, min_length: int = 3, max_length: int = 30) -> bool: """Validate username. Args: username: Username min_length: Minimum length max_length: Maximum length Returns: True if valid username """ if not (min_length <= len(username) <= max_length): return False # Allow alphanumeric, underscore, hyphen pattern = r'^[a-zA-Z0-9_-]+$' return bool(re.match(pattern, username)) def sanitize_filename(filename: str) -> str: """Sanitize filename for filesystem. Args: filename: Original filename Returns: Sanitized filename """ # Remove/replace invalid characters filename = re.sub(r'[<>:"/\\|?*]', '_', filename) filename = re.sub(r'[\x00-\x1f]', '', filename) # Remove control characters filename = filename.strip('. ') # Remove leading/trailing dots and spaces # Ensure not empty if not filename: filename = 'untitled' return filename # Async utilities def run_sync(coro: Awaitable[T]) -> T: """Run async function synchronously. Args: coro: Coroutine to run Returns: Result of coroutine """ try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(coro) async def run_with_timeout(coro: Awaitable[T], timeout: float) -> Optional[T]: """Run coroutine with timeout. Args: coro: Coroutine to run timeout: Timeout in seconds Returns: Result or None if timeout """ try: return await asyncio.wait_for(coro, timeout=timeout) except asyncio.TimeoutError: return None # Decorators def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0): """Retry decorator. Args: max_attempts: Maximum retry attempts delay: Initial delay between retries backoff: Backoff multiplier """ def decorator(func: F) -> F: @functools.wraps(func) def wrapper(*args, **kwargs): last_exception = None current_delay = delay for attempt in range(max_attempts): try: return func(*args, **kwargs) except Exception as e: last_exception = e if attempt < max_attempts - 1: import time time.sleep(current_delay) current_delay *= backoff raise last_exception return wrapper return decorator def async_retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0): """Async retry decorator. Args: max_attempts: Maximum retry attempts delay: Initial delay between retries backoff: Backoff multiplier """ def decorator(func: F) -> F: @functools.wraps(func) async def wrapper(*args, **kwargs): last_exception = None current_delay = delay for attempt in range(max_attempts): try: return await func(*args, **kwargs) except Exception as e: last_exception = e if attempt < max_attempts - 1: await asyncio.sleep(current_delay) current_delay *= backoff raise last_exception return wrapper return decorator def cache_result(ttl: Optional[float] = None): """Simple result caching decorator. Args: ttl: Time to live in seconds (None for no expiration) """ def decorator(func: F) -> F: cache = {} @functools.wraps(func) def wrapper(*args, **kwargs): import time # Create cache key key = str(args) + str(sorted(kwargs.items())) # Check cache if key in cache: result, timestamp = cache[key] if ttl is None or time.time() - timestamp < ttl: return result # Call function and cache result result = func(*args, **kwargs) cache[key] = (result, time.time()) return result return wrapper return decorator # Data structures class LRUCache: """Simple LRU cache implementation.""" def __init__(self, max_size: int = 128): """Initialize LRU cache. Args: max_size: Maximum cache size """ self.max_size = max_size self.cache = {} self.access_order = [] def get(self, key: str) -> Optional[Any]: """Get value from cache. Args: key: Cache key Returns: Cached value or None """ if key in self.cache: # Move to end (most recently used) self.access_order.remove(key) self.access_order.append(key) return self.cache[key] return None def put(self, key: str, value: Any) -> None: """Put value in cache. Args: key: Cache key value: Value to cache """ if key in self.cache: # Update existing self.cache[key] = value self.access_order.remove(key) self.access_order.append(key) else: # Add new if len(self.cache) >= self.max_size: # Remove least recently used oldest = self.access_order.pop(0) del self.cache[oldest] self.cache[key] = value self.access_order.append(key) def clear(self) -> None: """Clear cache.""" self.cache.clear() self.access_order.clear() def size(self) -> int: """Get cache size. Returns: Number of items in cache """ return len(self.cache) # Environment utilities def get_env_bool(key: str, default: bool = False) -> bool: """Get boolean from environment variable. Args: key: Environment variable name default: Default value Returns: Boolean value """ value = os.getenv(key, '').lower() return value in ('true', '1', 'yes', 'on') def get_env_int(key: str, default: int = 0) -> int: """Get integer from environment variable. Args: key: Environment variable name default: Default value Returns: Integer value """ try: return int(os.getenv(key, str(default))) except ValueError: return default def get_env_float(key: str, default: float = 0.0) -> float: """Get float from environment variable. Args: key: Environment variable name default: Default value Returns: Float value """ try: return float(os.getenv(key, str(default))) except ValueError: return default def get_env_list(key: str, separator: str = ',', default: Optional[List[str]] = None) -> List[str]: """Get list from environment variable. Args: key: Environment variable name separator: List separator default: Default value Returns: List of strings """ value = os.getenv(key, '') if not value: return default or [] return [item.strip() for item in value.split(separator) if item.strip()] # Memory utilities def get_memory_usage() -> Dict[str, float]: """Get current memory usage. Returns: Memory usage information """ import psutil import os process = psutil.Process(os.getpid()) memory_info = process.memory_info() return { 'rss_mb': memory_info.rss / 1024 / 1024, # Resident Set Size 'vms_mb': memory_info.vms / 1024 / 1024, # Virtual Memory Size 'percent': process.memory_percent(), } def format_bytes(bytes_value: int) -> str: """Format bytes to human readable string. Args: bytes_value: Number of bytes Returns: Formatted string """ for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if bytes_value < 1024.0: return f"{bytes_value:.1f} {unit}" bytes_value /= 1024.0 return f"{bytes_value:.1f} PB"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server