Skip to main content
Glama

chesscom-mxcp

chess_client.py•9.24 kB
"""
Chess.com API client with caching, rate limiting, and retry logic.
"""

import asyncio
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union

import httpx
from mxcp.runtime import db, config, on_init, on_shutdown

logger = logging.getLogger(__name__)

# Global HTTP client for connection pooling
http_client: Optional[httpx.AsyncClient] = None
rate_limiter: Optional['RateLimiter'] = None


def _load_chess_config():
    """Import and return the ``chess_config`` module.

    The directory two levels above this file is put on ``sys.path`` so the
    module can be found. It is inserted only when missing, so repeated calls
    do not keep growing ``sys.path``.
    """
    import os
    import sys

    config_dir = os.path.dirname(os.path.dirname(__file__))
    if config_dir not in sys.path:
        sys.path.insert(0, config_dir)

    import chess_config

    return chess_config


class RateLimiter:
    """Token bucket rate limiter for API calls.

    The bucket starts with ``burst_size`` tokens and is refilled at
    ``requests_per_minute / 60`` tokens per second, never exceeding
    ``burst_size``.
    """

    def __init__(self, requests_per_minute: int = 60, burst_size: int = 10):
        """Create a limiter.

        Args:
            requests_per_minute: Sustained refill rate; must be positive.
            burst_size: Maximum number of tokens the bucket can hold.

        Raises:
            ValueError: If ``requests_per_minute`` is not positive (the
                refill rate is used as a divisor when computing waits).
        """
        if requests_per_minute <= 0:
            raise ValueError("requests_per_minute must be positive")
        self.requests_per_minute = requests_per_minute
        self.burst_size = burst_size
        self.tokens = burst_size
        self.last_refill = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Acquire a token for making an API request.

        Sleeps until a token is available. The lock is held while sleeping,
        so concurrent callers are served one after another.
        """
        async with self.lock:
            refill_rate = self.requests_per_minute / 60  # tokens per second

            # Refill tokens based on time passed. The elapsed time is clamped
            # so a wall clock that steps backwards cannot drain the bucket.
            now = time.time()
            elapsed = max(0.0, now - self.last_refill)
            self.tokens = min(self.burst_size, self.tokens + elapsed * refill_rate)
            self.last_refill = now

            if self.tokens < 1:
                # Need to wait for a token
                wait_time = (1 - self.tokens) / refill_rate
                await asyncio.sleep(wait_time)
                self.tokens = 1
                # The time just slept has been converted into the token that
                # is consumed below; advance the refill mark so the next call
                # does not credit that interval a second time.
                self.last_refill = now + wait_time

            self.tokens -= 1


@on_init
def initialize_client():
    """Initialize HTTP client and rate limiter on startup.

    Rate limit settings are read from ``API_CONFIG["rate_limit"]`` in
    ``chess_config``; missing keys fall back to 60 requests per minute with
    a burst size of 10.
    """
    global http_client, rate_limiter

    api_config = _load_chess_config().API_CONFIG

    # Initialize HTTP client with connection pooling
    http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(30.0),
        limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
        headers={"User-Agent": "MXCP-Chess/1.0"},
    )

    # Initialize rate limiter
    rate_limit = api_config.get("rate_limit", {})
    rate_limiter = RateLimiter(
        requests_per_minute=rate_limit.get("requests_per_minute", 60),
        burst_size=rate_limit.get("burst_size", 10),
    )

    logger.info("Chess client initialized with connection pooling and rate limiting")


def ensure_tables_exist():
    """Ensure cache tables exist, creating them if necessary.

    Both cache tables are probed with a trivial query; if either probe
    raises, the tables and their indexes are created. Every statement uses
    ``IF NOT EXISTS``, so re-running the creation path is harmless.
    """
    try:
        # Check if tables exist by trying to query them
        db.execute("SELECT 1 FROM chess_api_cache LIMIT 1")
        db.execute("SELECT 1 FROM chess_games_cache LIMIT 1")
    except Exception:
        # Tables don't exist, create them
        logger.info("Creating chess cache tables")

        db.execute("""
            CREATE TABLE IF NOT EXISTS chess_api_cache (
                cache_key VARCHAR PRIMARY KEY,
                endpoint VARCHAR NOT NULL,
                response_data JSON NOT NULL,
                cached_at TIMESTAMP NOT NULL,
                expires_at TIMESTAMP NOT NULL
            )
        """)

        db.execute("""
            CREATE TABLE IF NOT EXISTS chess_games_cache (
                game_id VARCHAR PRIMARY KEY,
                username VARCHAR NOT NULL,
                game_data JSON NOT NULL,
                pgn_data TEXT,
                cached_at TIMESTAMP NOT NULL,
                year INTEGER,
                month INTEGER
            )
        """)

        # Create indexes; a failure here is logged and otherwise ignored
        # because the cache works without them.
        try:
            db.execute("CREATE INDEX IF NOT EXISTS idx_expires ON chess_api_cache(expires_at)")
            db.execute("CREATE INDEX IF NOT EXISTS idx_player_date ON chess_games_cache(username, year, month)")
            db.execute("CREATE INDEX IF NOT EXISTS idx_cached ON chess_games_cache(cached_at)")
        except Exception as e:
            logger.debug(f"Index creation warning: {e}")


@on_shutdown
async def cleanup_client():
    """Clean up resources on shutdown.

    Closes the shared HTTP client and clears the global reference, so a
    later ``make_api_request`` call re-initializes instead of reusing a
    closed client.
    """
    global http_client
    if http_client is not None:
        await http_client.aclose()
        http_client = None
        logger.info("HTTP client closed")


def _audit_api_call(endpoint: str, status_code: int) -> None:
    """Best-effort insert of one row into ``audit_log`` for an API call.

    Any failure is logged at debug level and swallowed so auditing can
    never break a request.
    """
    # Note: get_username() might not be available in standalone mode
    try:
        ensure_tables_exist()
        db.execute("""
            INSERT INTO audit_log (timestamp, event_type, endpoint, status_code, username)
            VALUES (CURRENT_TIMESTAMP, 'api_call', $endpoint, $status, NULL)
        """, {"endpoint": endpoint, "status": status_code})
    except Exception as e:
        logger.debug(f"Could not log to audit: {e}")


async def make_api_request(
    endpoint: str,
    cache_key: Optional[str] = None,
    cache_ttl: Optional[int] = None,
    accept_json: bool = True
) -> Union[Dict[str, Any], str]:
    """
    Make a request to the Chess.com API with caching and retry logic.

    Args:
        endpoint: API endpoint path (without base URL)
        cache_key: Optional cache key (defaults to endpoint)
        cache_ttl: Cache TTL in seconds; when None the cache is neither
            read nor written
        accept_json: Whether to request JSON response (False for PGN)

    Returns:
        API response data (dict for JSON, str for PGN)

    Raises:
        httpx.HTTPStatusError: For HTTP error statuses other than 429 and
            5xx, which are not retried.
        Exception: When every attempt failed; the last underlying error is
            attached as ``__cause__``.
    """
    # Ensure client is initialized
    global http_client, rate_limiter
    if http_client is None or rate_limiter is None:
        initialize_client()

    if not cache_key:
        cache_key = endpoint

    # Check cache first (only for JSON responses)
    if accept_json and cache_ttl is not None:
        cached = get_from_cache(cache_key)
        # Compare against None so an empty cached payload still counts as a hit.
        if cached is not None:
            logger.debug(f"Cache hit for {cache_key}")
            return cached

    api_config = _load_chess_config().API_CONFIG
    base_url = api_config.get("base_url", "https://api.chess.com/pub")
    # Always make at least one attempt, even if the config says 0.
    retry_attempts = max(1, api_config.get("retry_attempts", 3))
    retry_delay = api_config.get("retry_delay", 1.0)

    url = f"{base_url}/{endpoint}"
    headers = {
        "Accept": "application/json" if accept_json else "application/x-chess-pgn"
    }

    last_error = None
    for attempt in range(retry_attempts):
        is_last_attempt = attempt == retry_attempts - 1

        # Every HTTP request, including retries, consumes a rate-limit token.
        await rate_limiter.acquire()

        try:
            response = await http_client.get(url, headers=headers)
            response.raise_for_status()

            # Log API call for audit
            _audit_api_call(endpoint, response.status_code)

            if not accept_json:
                return response.text

            data = response.json()

            # Cache the response. A cache failure must not discard data that
            # was fetched successfully or trigger another HTTP request.
            if cache_ttl is not None:
                try:
                    cache_response(cache_key, endpoint, data, cache_ttl)
                except Exception as e:
                    logger.warning(f"Could not cache response for {cache_key}: {e}")

            return data

        except httpx.HTTPStatusError as e:
            last_error = e
            status = e.response.status_code
            if status == 429:
                # Rate limited: back off one step longer than for server errors
                logger.warning("Rate limited by Chess.com API, waiting longer")
                backoff = retry_delay * (attempt + 2)
            elif status >= 500:
                # Server error, retry
                logger.warning(f"Server error {status}, retrying...")
                backoff = retry_delay * (attempt + 1)
            else:
                # Client error, don't retry
                raise
            # No point sleeping when there is no attempt left to wait for.
            if not is_last_attempt:
                await asyncio.sleep(backoff)

        except Exception as e:
            last_error = e
            logger.error(f"Request error on attempt {attempt + 1}: {str(e)}")
            if not is_last_attempt:
                await asyncio.sleep(retry_delay * (attempt + 1))

    # All retries failed
    raise Exception(
        f"API request failed after {retry_attempts} attempts: {str(last_error)}"
    ) from last_error


def get_from_cache(cache_key: str) -> Optional[Dict[str, Any]]:
    """Get data from cache if not expired.

    Args:
        cache_key: Key of the entry in ``chess_api_cache``.

    Returns:
        The cached response, or None when there is no unexpired entry or
        the stored text cannot be decoded as JSON.
    """
    ensure_tables_exist()

    result = db.execute("""
        SELECT response_data
        FROM chess_api_cache
        WHERE cache_key = $key
          AND expires_at > CURRENT_TIMESTAMP
    """, {"key": cache_key})

    if not result:
        return None

    payload = result[0]["response_data"]
    # NOTE(review): the JSON column may come back as text rather than a
    # parsed object depending on the database backend - confirm. Decode it
    # so callers get the dict the signature promises.
    if isinstance(payload, str):
        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            logger.debug(f"Ignoring undecodable cache entry for {cache_key}")
            return None
    return payload


def cache_response(cache_key: str, endpoint: str, data: Dict[str, Any], ttl: int):
    """Cache API response with TTL.

    Args:
        cache_key: Primary key of the cache row; an existing row is replaced.
        endpoint: Endpoint the data was fetched from.
        data: Response payload to store.
        ttl: Lifetime of the entry in seconds.
    """
    ensure_tables_exist()

    # NOTE(review): these are naive local timestamps, while reads compare
    # expires_at against the database's CURRENT_TIMESTAMP - confirm both
    # sides agree on the time zone.
    now = datetime.now()
    expires = now + timedelta(seconds=ttl)

    db.execute("""
        INSERT OR REPLACE INTO chess_api_cache
        (cache_key, endpoint, response_data, cached_at, expires_at)
        VALUES ($key, $endpoint, $data, $now, $expires)
    """, {
        "key": cache_key,
        "endpoint": endpoint,
        "data": data,
        "now": now,
        "expires": expires
    })


def get_cache_ttl(cache_type: str) -> int:
    """Get cache TTL from config for a specific cache type.

    Args:
        cache_type: Key to look up in ``chess_config.CACHE_TTL``.

    Returns:
        The configured TTL in seconds, or 3600 when the type is not listed.
    """
    return _load_chess_config().CACHE_TTL.get(cache_type, 3600)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/datYori/chesscom-mxcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.