
Toggl MCP Server

by ikido
cache_service.py (7.2 kB)
"""Cache service for storing and retrieving aggregated time tracking data.""" import os import json import sqlite3 import hashlib import uuid from datetime import datetime, timedelta from typing import Optional, Dict, Any from pathlib import Path import logging logger = logging.getLogger(__name__) class CacheService: """Manages caching with MD5 hash-based keys, SQLite index, and JSON files.""" def __init__(self, cache_dir: str = "./cache", ttl_hours: int = 1): """ Initialize cache service. Args: cache_dir: Directory to store cache files ttl_hours: Time-to-live for cache entries in hours """ self.cache_dir = cache_dir self.ttl_hours = ttl_hours self.data_dir = os.path.join(cache_dir, "data") self.index_db = os.path.join(cache_dir, "index.db") # Create directories Path(self.cache_dir).mkdir(parents=True, exist_ok=True) Path(self.data_dir).mkdir(parents=True, exist_ok=True) # Initialize database self._init_db() def _init_db(self) -> None: """Initialize SQLite database schema.""" conn = sqlite3.connect(self.index_db) cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS cache ( id INTEGER PRIMARY KEY, hash TEXT NOT NULL UNIQUE, filename TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ ) conn.commit() conn.close() def _get_cache_key(self, start_date: str, end_date: str, user_id: Optional[str] = None) -> str: """ Generate MD5 hash-based cache key. Args: start_date: Start date in ISO 8601 format end_date: End date in ISO 8601 format user_id: Optional user ID for filtering Returns: MD5 hash string """ filter_str = "" if user_id: filter_str = f":{user_id}" key = f"{start_date}:{end_date}{filter_str}" return hashlib.md5(key.encode()).hexdigest() def _is_expired(self, created_at_str: str) -> bool: """Check if cache entry is expired.""" try: # Try parsing ISO format first created_at = datetime.fromisoformat(created_at_str) except ValueError: # Try parsing SQLite datetime format (YYYY-MM-DD HH:MM:SS) try: created_at = datetime.strptime(created_at_str, "%Y-%m-%d %H:%M:%S") except ValueError: logger.warning(f"Could not parse timestamp: {created_at_str}") return False expiry = created_at + timedelta(hours=self.ttl_hours) return datetime.now() > expiry def get( self, start_date: str, end_date: str, user_id: Optional[str] = None ) -> Optional[Dict[str, Any]]: """ Get cached data if available and not expired. Args: start_date: Start date in ISO 8601 format end_date: End date in ISO 8601 format user_id: Optional user ID for filtering Returns: Cached data dict or None if not found or expired """ cache_key = self._get_cache_key(start_date, end_date, user_id) conn = sqlite3.connect(self.index_db) cursor = conn.cursor() cursor.execute( "SELECT filename, created_at FROM cache WHERE hash = ?", (cache_key,) ) result = cursor.fetchone() conn.close() if not result: return None filename, created_at = result # Check expiry if self._is_expired(created_at): self.delete(cache_key) return None # Read JSON file file_path = os.path.join(self.data_dir, filename) try: with open(file_path, "r") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): logger.warning(f"Failed to read cache file: {file_path}") return None def set( self, start_date: str, end_date: str, data: Dict[str, Any], user_id: Optional[str] = None, ) -> None: """ Store data in cache. 
Args: start_date: Start date in ISO 8601 format end_date: End date in ISO 8601 format data: Data to cache user_id: Optional user ID for filtering """ cache_key = self._get_cache_key(start_date, end_date, user_id) filename = f"{uuid.uuid4()}.json" file_path = os.path.join(self.data_dir, filename) # Write JSON file try: with open(file_path, "w") as f: json.dump(data, f) except Exception as e: logger.error(f"Failed to write cache file: {e}") return # Update database conn = sqlite3.connect(self.index_db) cursor = conn.cursor() try: cursor.execute( "INSERT INTO cache (hash, filename) VALUES (?, ?)", (cache_key, filename), ) conn.commit() except sqlite3.IntegrityError: # Hash already exists, update the filename cursor.execute( "UPDATE cache SET filename = ? WHERE hash = ?", (filename, cache_key), ) conn.commit() finally: conn.close() def delete(self, cache_key: str) -> None: """ Delete cache entry by key. Args: cache_key: MD5 hash key """ conn = sqlite3.connect(self.index_db) cursor = conn.cursor() cursor.execute("SELECT filename FROM cache WHERE hash = ?", (cache_key,)) result = cursor.fetchone() if result: filename = result[0] file_path = os.path.join(self.data_dir, filename) # Delete file try: if os.path.exists(file_path): os.remove(file_path) except Exception as e: logger.error(f"Failed to delete cache file: {e}") # Delete from database cursor.execute("DELETE FROM cache WHERE hash = ?", (cache_key,)) conn.commit() conn.close() def clear_expired(self) -> None: """Remove all expired cache entries.""" conn = sqlite3.connect(self.index_db) cursor = conn.cursor() cursor.execute("SELECT id, filename, created_at FROM cache") results = cursor.fetchall() for cache_id, filename, created_at in results: if self._is_expired(created_at): file_path = os.path.join(self.data_dir, filename) try: if os.path.exists(file_path): os.remove(file_path) except Exception as e: logger.error(f"Failed to delete cache file: {e}") cursor.execute("DELETE FROM cache WHERE id = ?", (cache_id,)) conn.commit() conn.close()
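For orientation, here is a minimal usage sketch of CacheService (not part of the repository). The import path, date range, user ID, and payload are illustrative stand-ins; in the actual server the cache-miss branch would fetch real data from the Toggl API.

from cache_service import CacheService  # hypothetical import path

# One-hour TTL; JSON files land under ./cache/data, index in ./cache/index.db
cache = CacheService(cache_dir="./cache", ttl_hours=1)

start, end = "2024-01-01T00:00:00Z", "2024-01-07T23:59:59Z"  # illustrative range

# Cache miss: fall back to the real data source (a Toggl fetch in this server)
report = cache.get(start, end, user_id="12345")
if report is None:
    report = {"total_hours": 42.5, "entries": []}  # stand-in payload
    cache.set(start, end, report, user_id="12345")

# Within the TTL window, the same query is served from the JSON file on disk
assert cache.get(start, end, user_id="12345") == report

# Periodic housekeeping: remove entries older than ttl_hours
cache.clear_expired()

A note on the design: keeping payloads in per-entry JSON files and only the MD5 key, filename, and timestamp in SQLite keeps the index small. One trade-off visible in the code is that set() updates the index row on a key collision but never removes the superseded JSON file, so overwritten entries leave orphaned files until they are cleaned up by other means.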
