# helpers.py • 17.3 kB
#!/usr/bin/env python3
"""
Helper Utilities
Provides common utility functions and helpers.
"""
import os
import re
import json
import hashlib
import secrets
import asyncio
import functools
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, Callable, TypeVar, Awaitable
from datetime import datetime, timezone
from urllib.parse import urlparse, urljoin, quote, unquote
from dataclasses import dataclass, asdict
T = TypeVar('T')
F = TypeVar('F', bound=Callable[..., Any])
# String utilities
def slugify(text: str, max_length: int = 50) -> str:
    """Build a URL-friendly slug from arbitrary text.

    The text is lower-cased, every character that is not a word character,
    whitespace or a hyphen is dropped, and each run of whitespace/hyphens is
    collapsed into a single hyphen.

    Args:
        text: Input text
        max_length: Maximum length of slug

    Returns:
        The slug, or 'untitled' when nothing usable remains
    """
    cleaned = re.sub(r'[^\w\s-]', '', text.lower())
    candidate = re.sub(r'[-\s]+', '-', cleaned).strip('-')

    if len(candidate) > max_length:
        # Cutting can leave a dangling hyphen at the end; trim it off.
        candidate = candidate[:max_length].rstrip('-')

    return candidate if candidate else 'untitled'
def truncate_text(text: str, max_length: int = 100, suffix: str = '...') -> str:
    """Truncate text to specified length.

    The result never exceeds ``max_length`` characters. When the text has to
    be shortened, ``suffix`` is appended and counts towards ``max_length``.
    If ``max_length`` is too small to hold the suffix at all, the text is
    hard-cut to ``max_length`` characters without a suffix.

    Args:
        text: Input text
        max_length: Maximum length
        suffix: Suffix to append when truncated

    Returns:
        Truncated text
    """
    if len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice bound would count from the END of the text and
        # produce a result longer than max_length, so cut plainly instead.
        return text[:max(max_length, 0)]

    return text[:keep] + suffix
def extract_domain(url: str) -> Optional[str]:
    """Extract domain from URL.

    The value returned is the lower-cased network location of the URL, so it
    still contains a port or user-info part when the URL has one.

    Args:
        url: Input URL

    Returns:
        Domain name or None if invalid (unparsable, or no network location)
    """
    try:
        netloc = urlparse(url).netloc
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError for malformed input such as an
        # unterminated IPv6 literal; the other two cover non-string input.
        return None

    # An empty netloc means no domain was found (e.g. "not a url"); report
    # that as None, as documented, rather than as an empty string.
    return netloc.lower() or None
def is_valid_url(url: str) -> bool:
    """Tell whether a URL has both a scheme and a network location.

    Args:
        url: URL to validate

    Returns:
        True if valid URL; False otherwise, including when parsing raises
    """
    try:
        parts = urlparse(url)
    except Exception:
        return False
    return bool(parts.scheme and parts.netloc)
def normalize_url(url: str, base_url: Optional[str] = None) -> str:
    """Normalize URL.

    The result is rebuilt from scheme, network location and path only; any
    query string or fragment of the input is not included.

    Args:
        url: URL to normalize
        base_url: Base URL for relative URLs

    Returns:
        Normalized URL, or '' when ``url`` is empty
    """
    if not url:
        return ''

    # Handle relative URLs
    if base_url and not url.startswith(('http://', 'https://')):
        url = urljoin(base_url, url)

    # Parse and reconstruct
    parsed = urlparse(url)

    # Normalize path
    path = parsed.path or '/'
    if not path.startswith('/'):
        path = '/' + path

    # Remove default ports. Only an exact trailing ":80" / ":443" is removed;
    # a substring replace would turn "host:8080" into "host80".
    netloc = parsed.netloc
    default_port = {'http': ':80', 'https': ':443'}.get(parsed.scheme)
    if default_port and netloc.endswith(default_port):
        netloc = netloc[:-len(default_port)]

    return f"{parsed.scheme}://{netloc}{path}"
# Hash utilities
def generate_hash(data: Union[str, bytes], algorithm: str = 'sha256') -> str:
    """Compute the hex digest of some data.

    Args:
        data: Data to hash; text is encoded as UTF-8 first
        algorithm: Hash algorithm name understood by ``hashlib.new``

    Returns:
        Hex digest of hash
    """
    payload = data.encode('utf-8') if isinstance(data, str) else data
    return hashlib.new(algorithm, payload).hexdigest()
def generate_id(length: int = 16) -> str:
    """Produce a random URL-safe identifier.

    Args:
        length: Length of ID

    Returns:
        Random ID string, cut to ``length`` characters
    """
    token = secrets.token_urlsafe(length)
    return token[:length]
def generate_api_key(prefix: str = 'ak', length: int = 32) -> str:
    """Produce a random API key of the form ``<prefix>_<random part>``.

    Args:
        prefix: Key prefix
        length: Key length (excluding prefix)

    Returns:
        API key string
    """
    token = secrets.token_urlsafe(length)
    key = token[:length]
    return f"{prefix}_{key}"
# File utilities
def ensure_directory(path: Union[str, Path]) -> Path:
    """Create a directory, and any missing parents, if it is not there yet.

    Args:
        path: Directory path

    Returns:
        The directory as a Path object
    """
    directory = Path(path)
    # exist_ok makes the call a no-op when the directory already exists.
    directory.mkdir(parents=True, exist_ok=True)
    return directory
def get_file_size(path: Union[str, Path]) -> int:
    """Report how large a file is.

    Args:
        path: File path

    Returns:
        File size in bytes
    """
    file_stat = Path(path).stat()
    return file_stat.st_size
def get_file_extension(path: Union[str, Path]) -> str:
    """Return the final extension of a path.

    Args:
        path: File path

    Returns:
        File extension (without dot); '' when the path has no suffix
    """
    dotted_suffix = Path(path).suffix
    return dotted_suffix.lstrip('.')
def is_text_file(path: Union[str, Path]) -> bool:
    """Decide from the extension alone whether a path is a text file.

    The file is not opened; its extension is compared, case-insensitively,
    with a fixed set of known text extensions.

    Args:
        path: File path

    Returns:
        True if text file
    """
    known_text_suffixes = {
        'txt', 'md', 'rst', 'py', 'js', 'ts', 'html', 'css', 'json',
        'xml', 'yaml', 'yml', 'toml', 'ini', 'cfg', 'conf', 'log'
    }
    return get_file_extension(path).lower() in known_text_suffixes
def read_file_safe(path: Union[str, Path], encoding: str = 'utf-8') -> Optional[str]:
    """Read a text file without ever raising.

    Args:
        path: File path
        encoding: File encoding

    Returns:
        File content or None if error
    """
    source = None
    try:
        source = Path(path)
        text = source.read_text(encoding=encoding)
    except Exception:
        # Best effort by design: any failure is reported as None.
        return None
    return text
def write_file_safe(path: Union[str, Path], content: str, encoding: str = 'utf-8') -> bool:
    """Write a text file without ever raising.

    Missing parent directories are created before the file is written.

    Args:
        path: File path
        content: File content
        encoding: File encoding

    Returns:
        True if successful, False if any step failed
    """
    try:
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding=encoding)
    except Exception:
        # Best effort by design: any failure is reported as False.
        return False
    else:
        return True
# JSON utilities
def load_json_safe(path: Union[str, Path]) -> Optional[Dict[str, Any]]:
    """Load a UTF-8 JSON file without ever raising.

    Args:
        path: JSON file path

    Returns:
        Parsed JSON data or None if error
    """
    try:
        raw_text = Path(path).read_text(encoding='utf-8')
        parsed = json.loads(raw_text)
    except Exception:
        # Covers unreadable files and malformed JSON alike.
        return None
    return parsed
def save_json_safe(path: Union[str, Path], data: Any, indent: int = 2) -> bool:
    """Save data as a UTF-8 JSON file without ever raising.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is rather than escaped.

    Args:
        path: JSON file path
        data: Data to save
        indent: JSON indentation

    Returns:
        True if successful, False if any step failed
    """
    try:
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Serialise fully before touching the file, so a serialisation error
        # cannot leave a half-written file behind.
        serialized = json.dumps(data, indent=indent, ensure_ascii=False)
        target.write_text(serialized, encoding='utf-8')
    except Exception:
        return False
    else:
        return True
def serialize_dataclass(obj: Any) -> Dict[str, Any]:
    """Serialize dataclass to dictionary.

    Args:
        obj: Dataclass instance (dicts and other values are accepted too)

    Returns:
        Dictionary representation: the ``asdict`` conversion for a dataclass
        instance, the object itself for a dict, and ``{'value': obj}`` for
        anything else
    """
    # A dataclass *class* carries __dataclass_fields__ as well, but asdict()
    # only accepts instances and raises TypeError for a class, so classes
    # are excluded here and handled by the generic fallback below.
    if hasattr(obj, '__dataclass_fields__') and not isinstance(obj, type):
        return asdict(obj)
    if isinstance(obj, dict):
        return obj
    return {'value': obj}
# Date/time utilities
def utc_now() -> datetime:
    """Return the present moment as a timezone-aware datetime in UTC.

    Returns:
        Current UTC datetime
    """
    return datetime.now(tz=timezone.utc)
def format_datetime(dt: datetime, format_str: str = '%Y-%m-%d %H:%M:%S') -> str:
    """Render a datetime as text using a ``strftime`` format.

    Args:
        dt: Datetime object
        format_str: Format string

    Returns:
        Formatted datetime string
    """
    rendered = dt.strftime(format_str)
    return rendered
def parse_datetime(dt_str: str, format_str: str = '%Y-%m-%d %H:%M:%S') -> Optional[datetime]:
    """Parse text into a datetime using a ``strptime`` format.

    Args:
        dt_str: Datetime string
        format_str: Format string

    Returns:
        Parsed datetime or None if error
    """
    try:
        parsed = datetime.strptime(dt_str, format_str)
    except Exception:
        # Mismatching text and non-string input both end up here.
        return None
    return parsed
def timestamp_to_datetime(timestamp: float) -> datetime:
    """Turn a Unix timestamp into a timezone-aware UTC datetime.

    Args:
        timestamp: Unix timestamp

    Returns:
        Datetime object with its timezone set to UTC
    """
    utc = timezone.utc
    return datetime.fromtimestamp(timestamp, utc)
# Validation utilities
def validate_email(email: str) -> bool:
    """Validate email address.

    This is a syntactic check only: a local part of letters, digits and
    ``._%+-``, an ``@``, a domain of letters, digits, dots and hyphens, and a
    final dot followed by at least two letters.

    Args:
        email: Email address

    Returns:
        True if valid email
    """
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch instead of match with a "$" anchor: "$" also matches just
    # before a trailing newline, which would accept "user@example.com\n".
    return re.fullmatch(pattern, email) is not None
def validate_username(username: str, min_length: int = 3, max_length: int = 30) -> bool:
    """Validate username.

    A username is valid when its length lies within the given bounds and it
    consists solely of ASCII letters, digits, underscores and hyphens.

    Args:
        username: Username
        min_length: Minimum length
        max_length: Maximum length

    Returns:
        True if valid username
    """
    if not (min_length <= len(username) <= max_length):
        return False

    # Allow alphanumeric, underscore, hyphen. fullmatch is used because a
    # "$" anchor also matches just before a trailing newline, which would
    # let "name\n" through.
    return re.fullmatch(r'[a-zA-Z0-9_-]+', username) is not None
def sanitize_filename(filename: str) -> str:
    """Make a filename safe to use on a filesystem.

    Args:
        filename: Original filename

    Returns:
        Sanitized filename, or 'untitled' when nothing is left
    """
    # The characters <>:"/\|?* each become an underscore.
    replaced = re.sub(r'[<>:"/\\|?*]', '_', filename)
    # Control characters are dropped entirely.
    printable = re.sub(r'[\x00-\x1f]', '', replaced)
    # Leading/trailing dots and spaces are trimmed.
    trimmed = printable.strip('. ')
    return trimmed or 'untitled'
# Async utilities
def run_sync(coro: Awaitable[T]) -> T:
    """Run async function synchronously.

    The current event loop is used when one is available and still open;
    otherwise a new loop is created and installed as the current loop.
    Must not be called while an event loop is already running in this
    thread: ``run_until_complete`` raises RuntimeError in that case.

    Args:
        coro: Coroutine to run

    Returns:
        Result of coroutine
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            # A closed loop cannot run anything; handle it exactly like a
            # missing loop instead of failing in run_until_complete.
            raise RuntimeError("current event loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(coro)
async def run_with_timeout(coro: Awaitable[T], timeout: float) -> Optional[T]:
    """Await a coroutine, giving up once the timeout elapses.

    Args:
        coro: Coroutine to run
        timeout: Timeout in seconds

    Returns:
        Result or None if timeout
    """
    try:
        outcome = await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return None
    return outcome
# Decorators
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Retry decorator.

    The wrapped function is called until it returns without raising an
    ``Exception``, at most ``max_attempts`` times. Between attempts the
    wrapper sleeps; the pause starts at ``delay`` seconds and is multiplied
    by ``backoff`` after every failure. The exception of the final attempt
    propagates to the caller.

    Args:
        max_attempts: Maximum retry attempts; values below 1 are treated as
            1 so the function is always called at least once
        delay: Initial delay between retries
        backoff: Backoff multiplier
    """
    import time

    # With zero attempts there would be neither a result nor an exception
    # to hand back to the caller.
    attempts = max(1, max_attempts)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        # Out of attempts: re-raise with the original
                        # traceback intact.
                        raise
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
def async_retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Async retry decorator.

    The wrapped coroutine function is called and awaited until it finishes
    without raising an ``Exception``, at most ``max_attempts`` times.
    Between attempts the wrapper awaits ``asyncio.sleep``; the pause starts
    at ``delay`` seconds and is multiplied by ``backoff`` after every
    failure. The exception of the final attempt propagates to the caller.

    Args:
        max_attempts: Maximum retry attempts; values below 1 are treated as
            1 so the function is always called at least once
        delay: Initial delay between retries
        backoff: Backoff multiplier
    """
    # With zero attempts there would be neither a result nor an exception
    # to hand back to the caller.
    attempts = max(1, max_attempts)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        # Out of attempts: re-raise with the original
                        # traceback intact.
                        raise
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
def cache_result(ttl: Optional[float] = None):
    """Simple result caching decorator.

    Each decorated function gets its own cache. Entries are keyed by the
    string form of the positional arguments plus the sorted keyword
    arguments, so arguments only need a ``str()`` form, not a hash; calls
    whose arguments have identical string forms share one entry. The cache
    is unbounded.

    Args:
        ttl: Time to live in seconds (None for no expiration)
    """
    import time

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        cache: Dict[str, Any] = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key
            key = str(args) + str(sorted(kwargs.items()))

            # Check cache. Ages are measured with the monotonic clock so a
            # system clock adjustment can neither expire an entry early nor
            # keep it alive past its TTL.
            entry = cache.get(key)
            if entry is not None:
                result, stored_at = entry
                if ttl is None or time.monotonic() - stored_at < ttl:
                    return result

            # Call function and cache result
            result = func(*args, **kwargs)
            cache[key] = (result, time.monotonic())
            return result
        return wrapper
    return decorator
# Data structures
class LRUCache:
    """Simple LRU cache implementation.

    Entries live in a single dict whose insertion order doubles as the
    recency order: the first key is the least recently used, the last key
    the most recently used. Reading or updating an entry moves it to the
    end, so every operation runs in constant time.
    """

    def __init__(self, max_size: int = 128):
        """Initialize LRU cache.

        Args:
            max_size: Maximum cache size; with a value of 0 or less nothing
                is ever stored
        """
        self.max_size = max_size
        self.cache: Dict[str, Any] = {}

    @property
    def access_order(self) -> List[str]:
        """Keys from least to most recently used (a fresh list, read-only)."""
        return list(self.cache)

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache.

        Args:
            key: Cache key

        Returns:
            Cached value or None
        """
        if key not in self.cache:
            return None
        # Re-insert to move the key to the end (most recently used).
        value = self.cache.pop(key)
        self.cache[key] = value
        return value

    def put(self, key: str, value: Any) -> None:
        """Put value in cache.

        Args:
            key: Cache key
            value: Value to cache
        """
        if self.max_size <= 0:
            # No capacity: there is nothing to evict and nowhere to store.
            return

        if key in self.cache:
            # Drop the old entry so the re-insert below lands at the end.
            del self.cache[key]
        elif len(self.cache) >= self.max_size:
            # Remove least recently used (the first key in the dict).
            del self.cache[next(iter(self.cache))]

        self.cache[key] = value

    def clear(self) -> None:
        """Clear cache."""
        self.cache.clear()

    def size(self) -> int:
        """Get cache size.

        Returns:
            Number of items in cache
        """
        return len(self.cache)
# Environment utilities
def get_env_bool(key: str, default: bool = False) -> bool:
    """Get boolean from environment variable.

    The values 'true', '1', 'yes' and 'on' (in any letter case, surrounding
    whitespace ignored) count as True; any other non-empty value counts as
    False.

    Args:
        key: Environment variable name
        default: Default value, returned when the variable is unset or empty

    Returns:
        Boolean value
    """
    raw = os.getenv(key)
    if raw is None or not raw.strip():
        # Unset or blank: fall back to the caller's default instead of
        # always reporting False.
        return default
    return raw.strip().lower() in ('true', '1', 'yes', 'on')
def get_env_int(key: str, default: int = 0) -> int:
    """Read an environment variable as an integer.

    Args:
        key: Environment variable name
        default: Default value, used when the variable is unset or its text
            cannot be converted to an integer

    Returns:
        Integer value
    """
    raw = os.getenv(key, str(default))
    try:
        number = int(raw)
    except ValueError:
        return default
    return number
def get_env_float(key: str, default: float = 0.0) -> float:
    """Read an environment variable as a float.

    Args:
        key: Environment variable name
        default: Default value, used when the variable is unset or its text
            cannot be converted to a float

    Returns:
        Float value
    """
    raw = os.getenv(key, str(default))
    try:
        number = float(raw)
    except ValueError:
        return default
    return number
def get_env_list(key: str, separator: str = ',', default: Optional[List[str]] = None) -> List[str]:
    """Read an environment variable as a list of strings.

    The value is split on ``separator``; every piece is stripped of
    surrounding whitespace and empty pieces are dropped.

    Args:
        key: Environment variable name
        separator: List separator
        default: Default value, used when the variable is unset or empty

    Returns:
        List of strings
    """
    raw = os.getenv(key, '')
    if raw:
        pieces = (chunk.strip() for chunk in raw.split(separator))
        return [piece for piece in pieces if piece]
    return default or []
# Memory utilities
def get_memory_usage() -> Dict[str, float]:
    """Report the memory usage of the current process.

    Requires the ``psutil`` package, which is imported on each call.

    Returns:
        Memory usage information with the keys 'rss_mb', 'vms_mb' and
        'percent'
    """
    import psutil
    import os

    current_process = psutil.Process(os.getpid())
    snapshot = current_process.memory_info()

    resident_mb = snapshot.rss / 1024 / 1024  # Resident Set Size
    virtual_mb = snapshot.vms / 1024 / 1024  # Virtual Memory Size
    share_of_memory = current_process.memory_percent()

    return {
        'rss_mb': resident_mb,
        'vms_mb': virtual_mb,
        'percent': share_of_memory,
    }
def format_bytes(bytes_value: int) -> str:
    """Format bytes to human readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (PB) is reached, and is shown with one decimal place. Negative
    values are scaled by their magnitude and keep their sign.

    Args:
        bytes_value: Number of bytes

    Returns:
        Formatted string
    """
    # Scale the magnitude only: a negative number is always "< 1024", so
    # comparing the signed value would never leave the 'B' unit.
    sign = '-' if bytes_value < 0 else ''
    magnitude = float(abs(bytes_value))

    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if magnitude < 1024.0:
            return f"{sign}{magnitude:.1f} {unit}"
        magnitude /= 1024.0

    return f"{sign}{magnitude:.1f} PB"