chess_client.pyā¢9.24 kB
"""
Chess.com API client with caching, rate limiting, and retry logic.
"""
import asyncio
import json
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
import httpx
from mxcp.runtime import db, config, on_init, on_shutdown
import logging
logger = logging.getLogger(__name__)
# Global HTTP client for connection pooling
http_client: Optional[httpx.AsyncClient] = None
rate_limiter: Optional['RateLimiter'] = None
class RateLimiter:
"""Token bucket rate limiter for API calls."""
def __init__(self, requests_per_minute: int = 60, burst_size: int = 10):
self.requests_per_minute = requests_per_minute
self.burst_size = burst_size
self.tokens = burst_size
self.last_refill = time.time()
self.lock = asyncio.Lock()
async def acquire(self):
"""Acquire a token for making an API request."""
async with self.lock:
# Refill tokens based on time passed
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = elapsed * (self.requests_per_minute / 60)
self.tokens = min(self.burst_size, self.tokens + tokens_to_add)
self.last_refill = now
if self.tokens < 1:
# Need to wait for a token
wait_time = (1 - self.tokens) / (self.requests_per_minute / 60)
await asyncio.sleep(wait_time)
self.tokens = 1
self.tokens -= 1
@on_init
def initialize_client():
"""Initialize HTTP client and rate limiter on startup."""
global http_client, rate_limiter
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from chess_config import API_CONFIG
# Initialize HTTP client with connection pooling
http_client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
headers={"User-Agent": "MXCP-Chess/1.0"}
)
# Initialize rate limiter
rate_limit = API_CONFIG.get("rate_limit", {})
rate_limiter = RateLimiter(
requests_per_minute=rate_limit.get("requests_per_minute", 60),
burst_size=rate_limit.get("burst_size", 10)
)
logger.info("Chess client initialized with connection pooling and rate limiting")
def ensure_tables_exist():
"""Ensure cache tables exist, creating them if necessary."""
try:
# Check if tables exist by trying to query them
db.execute("SELECT 1 FROM chess_api_cache LIMIT 1")
db.execute("SELECT 1 FROM chess_games_cache LIMIT 1")
except Exception:
# Tables don't exist, create them
logger.info("Creating chess cache tables")
db.execute("""
CREATE TABLE IF NOT EXISTS chess_api_cache (
cache_key VARCHAR PRIMARY KEY,
endpoint VARCHAR NOT NULL,
response_data JSON NOT NULL,
cached_at TIMESTAMP NOT NULL,
expires_at TIMESTAMP NOT NULL
)
""")
db.execute("""
CREATE TABLE IF NOT EXISTS chess_games_cache (
game_id VARCHAR PRIMARY KEY,
username VARCHAR NOT NULL,
game_data JSON NOT NULL,
pgn_data TEXT,
cached_at TIMESTAMP NOT NULL,
year INTEGER,
month INTEGER
)
""")
# Create indexes
try:
db.execute("CREATE INDEX IF NOT EXISTS idx_expires ON chess_api_cache(expires_at)")
db.execute("CREATE INDEX IF NOT EXISTS idx_player_date ON chess_games_cache(username, year, month)")
db.execute("CREATE INDEX IF NOT EXISTS idx_cached ON chess_games_cache(cached_at)")
except Exception as e:
logger.debug(f"Index creation warning: {e}")
@on_shutdown
async def cleanup_client():
"""Clean up resources on shutdown."""
global http_client
if http_client:
await http_client.aclose()
logger.info("HTTP client closed")
async def make_api_request(
endpoint: str,
cache_key: Optional[str] = None,
cache_ttl: Optional[int] = None,
accept_json: bool = True
) -> Union[Dict[str, Any], str]:
"""
Make a request to the Chess.com API with caching and retry logic.
Args:
endpoint: API endpoint path (without base URL)
cache_key: Optional cache key (defaults to endpoint)
cache_ttl: Cache TTL in seconds (defaults from config)
accept_json: Whether to request JSON response (False for PGN)
Returns:
API response data (dict for JSON, str for PGN)
"""
# Ensure client is initialized
global http_client, rate_limiter
if http_client is None or rate_limiter is None:
initialize_client()
if not cache_key:
cache_key = endpoint
# Check cache first (only for JSON responses)
if accept_json and cache_ttl is not None:
cached = get_from_cache(cache_key)
if cached:
logger.debug(f"Cache hit for {cache_key}")
return cached
# Make API request with rate limiting and retry
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from chess_config import API_CONFIG
base_url = API_CONFIG.get("base_url", "https://api.chess.com/pub")
retry_attempts = API_CONFIG.get("retry_attempts", 3)
retry_delay = API_CONFIG.get("retry_delay", 1.0)
url = f"{base_url}/{endpoint}"
headers = {
"Accept": "application/json" if accept_json else "application/x-chess-pgn"
}
# Acquire rate limit token
await rate_limiter.acquire()
# Retry logic
last_error = None
for attempt in range(retry_attempts):
try:
response = await http_client.get(url, headers=headers)
response.raise_for_status()
# Log API call for audit
# Note: get_username() might not be available in standalone mode
try:
ensure_tables_exist()
db.execute("""
INSERT INTO audit_log (timestamp, event_type, endpoint, status_code, username)
VALUES (CURRENT_TIMESTAMP, 'api_call', $endpoint, $status, NULL)
""", {"endpoint": endpoint, "status": response.status_code})
except Exception as e:
logger.debug(f"Could not log to audit: {e}")
if accept_json:
data = response.json()
# Cache the response
if cache_ttl is not None:
cache_response(cache_key, endpoint, data, cache_ttl)
return data
else:
return response.text
except httpx.HTTPStatusError as e:
last_error = e
if e.response.status_code == 429: # Rate limited
logger.warning(f"Rate limited by Chess.com API, waiting longer")
await asyncio.sleep(retry_delay * (attempt + 2))
elif e.response.status_code >= 500: # Server error, retry
logger.warning(f"Server error {e.response.status_code}, retrying...")
await asyncio.sleep(retry_delay * (attempt + 1))
else:
# Client error, don't retry
raise
except Exception as e:
last_error = e
logger.error(f"Request error on attempt {attempt + 1}: {str(e)}")
if attempt < retry_attempts - 1:
await asyncio.sleep(retry_delay * (attempt + 1))
# All retries failed
raise Exception(f"API request failed after {retry_attempts} attempts: {str(last_error)}")
def get_from_cache(cache_key: str) -> Optional[Dict[str, Any]]:
"""Get data from cache if not expired."""
ensure_tables_exist()
result = db.execute("""
SELECT response_data
FROM chess_api_cache
WHERE cache_key = $key
AND expires_at > CURRENT_TIMESTAMP
""", {"key": cache_key})
if result:
return result[0]["response_data"]
return None
def cache_response(cache_key: str, endpoint: str, data: Dict[str, Any], ttl: int):
"""Cache API response with TTL."""
from datetime import datetime, timedelta
ensure_tables_exist()
now = datetime.now()
expires = now + timedelta(seconds=ttl)
db.execute("""
INSERT OR REPLACE INTO chess_api_cache
(cache_key, endpoint, response_data, cached_at, expires_at)
VALUES ($key, $endpoint, $data, $now, $expires)
""", {
"key": cache_key,
"endpoint": endpoint,
"data": data,
"now": now,
"expires": expires
})
def get_cache_ttl(cache_type: str) -> int:
"""Get cache TTL from config for a specific cache type."""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from chess_config import CACHE_TTL
return CACHE_TTL.get(cache_type, 3600)