# cache_service.py
"""Cache service for storing and retrieving aggregated time tracking data."""
import hashlib
import json
import logging
import os
import sqlite3
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class CacheService:
    """Manages caching with MD5 hash-based keys, SQLite index, and JSON files.

    Layout on disk:
        <cache_dir>/index.db     SQLite table mapping key hash -> JSON filename
        <cache_dir>/data/*.json  one file per cached payload

    All timestamps are UTC: SQLite's ``CURRENT_TIMESTAMP`` default emits UTC,
    so expiry checks must compare against UTC as well (comparing against local
    ``datetime.now()`` mis-expires entries in any non-UTC timezone).
    """

    def __init__(self, cache_dir: str = "./cache", ttl_hours: int = 1):
        """
        Initialize cache service.

        Args:
            cache_dir: Directory to store cache files
            ttl_hours: Time-to-live for cache entries in hours
        """
        self.cache_dir = cache_dir
        self.ttl_hours = ttl_hours
        self.data_dir = os.path.join(cache_dir, "data")
        self.index_db = os.path.join(cache_dir, "index.db")
        # Create directories (idempotent)
        Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
        Path(self.data_dir).mkdir(parents=True, exist_ok=True)
        # Initialize database
        self._init_db()

    def _init_db(self) -> None:
        """Initialize SQLite database schema (idempotent)."""
        conn = sqlite3.connect(self.index_db)
        try:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS cache (
                    id INTEGER PRIMARY KEY,
                    hash TEXT NOT NULL UNIQUE,
                    filename TEXT NOT NULL,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
            conn.commit()
        finally:
            # try/finally ensures the connection is released even if the
            # schema statement raises (the original leaked it on error).
            conn.close()

    def _get_cache_key(self, start_date: str, end_date: str, user_id: Optional[str] = None) -> str:
        """
        Generate MD5 hash-based cache key.

        MD5 is used only as a stable fingerprint, not for security; it must
        stay MD5 so existing persisted index entries remain addressable.

        Args:
            start_date: Start date in ISO 8601 format
            end_date: End date in ISO 8601 format
            user_id: Optional user ID for filtering

        Returns:
            MD5 hash string (32 hex chars)
        """
        filter_str = f":{user_id}" if user_id else ""
        key = f"{start_date}:{end_date}{filter_str}"
        return hashlib.md5(key.encode()).hexdigest()

    def _is_expired(self, created_at_str: str) -> bool:
        """Check if a cache entry timestamp has passed its TTL.

        Naive timestamps are interpreted as UTC because SQLite's
        ``CURRENT_TIMESTAMP`` default stores UTC wall-clock time.

        Args:
            created_at_str: Timestamp string from the index DB.

        Returns:
            True if the entry is older than ``ttl_hours``; False if fresh
            or if the timestamp cannot be parsed (fail open — an
            unparseable entry is kept rather than evicted).
        """
        try:
            # fromisoformat also accepts SQLite's "YYYY-MM-DD HH:MM:SS"
            created_at = datetime.fromisoformat(created_at_str)
        except ValueError:
            try:
                created_at = datetime.strptime(created_at_str, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                logger.warning("Could not parse timestamp: %s", created_at_str)
                return False
        if created_at.tzinfo is None:
            # BUGFIX: treat stored timestamps as UTC and compare in UTC;
            # comparing against local datetime.now() skewed expiry by the
            # local UTC offset.
            created_at = created_at.replace(tzinfo=timezone.utc)
        expiry = created_at + timedelta(hours=self.ttl_hours)
        return datetime.now(timezone.utc) > expiry

    def get(
        self, start_date: str, end_date: str, user_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get cached data if available and not expired.

        Args:
            start_date: Start date in ISO 8601 format
            end_date: End date in ISO 8601 format
            user_id: Optional user ID for filtering

        Returns:
            Cached data dict or None if not found, expired, or unreadable
        """
        cache_key = self._get_cache_key(start_date, end_date, user_id)
        conn = sqlite3.connect(self.index_db)
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT filename, created_at FROM cache WHERE hash = ?", (cache_key,)
            )
            result = cursor.fetchone()
        finally:
            conn.close()
        if not result:
            return None
        filename, created_at = result
        # Lazily evict expired entries on read
        if self._is_expired(created_at):
            self.delete(cache_key)
            return None
        # Read JSON payload from disk
        file_path = os.path.join(self.data_dir, filename)
        try:
            with open(file_path, "r") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # Missing/corrupt file is treated as a cache miss, not an error
            logger.warning(f"Failed to read cache file: {file_path}")
            return None

    def set(
        self,
        start_date: str,
        end_date: str,
        data: Dict[str, Any],
        user_id: Optional[str] = None,
    ) -> None:
        """
        Store data in cache, replacing any existing entry for the same key.

        Args:
            start_date: Start date in ISO 8601 format
            end_date: End date in ISO 8601 format
            data: JSON-serializable data to cache
            user_id: Optional user ID for filtering
        """
        cache_key = self._get_cache_key(start_date, end_date, user_id)
        filename = f"{uuid.uuid4()}.json"
        file_path = os.path.join(self.data_dir, filename)
        # Write JSON payload first; if that fails, the index is untouched.
        try:
            with open(file_path, "w") as f:
                json.dump(data, f)
        except Exception as e:
            logger.error(f"Failed to write cache file: {e}")
            return
        conn = sqlite3.connect(self.index_db)
        try:
            cursor = conn.cursor()
            # BUGFIX: remove the previous payload file for this key, if any.
            # The original left it orphaned on disk on every overwrite.
            cursor.execute(
                "SELECT filename FROM cache WHERE hash = ?", (cache_key,)
            )
            row = cursor.fetchone()
            if row:
                old_path = os.path.join(self.data_dir, row[0])
                try:
                    if os.path.exists(old_path):
                        os.remove(old_path)
                except OSError as e:
                    logger.error(f"Failed to delete cache file: {e}")
            # BUGFIX: INSERT OR REPLACE resets created_at, so a refreshed
            # entry gets a full TTL. The original UPDATE kept the stale
            # timestamp, expiring rewritten entries prematurely.
            cursor.execute(
                "INSERT OR REPLACE INTO cache (hash, filename, created_at) "
                "VALUES (?, ?, CURRENT_TIMESTAMP)",
                (cache_key, filename),
            )
            conn.commit()
        finally:
            conn.close()

    def delete(self, cache_key: str) -> None:
        """
        Delete cache entry (index row and payload file) by key.

        Args:
            cache_key: MD5 hash key
        """
        conn = sqlite3.connect(self.index_db)
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT filename FROM cache WHERE hash = ?", (cache_key,)
            )
            result = cursor.fetchone()
            if result:
                file_path = os.path.join(self.data_dir, result[0])
                # Best-effort file removal; the index row is removed either way
                try:
                    if os.path.exists(file_path):
                        os.remove(file_path)
                except OSError as e:
                    logger.error(f"Failed to delete cache file: {e}")
            cursor.execute("DELETE FROM cache WHERE hash = ?", (cache_key,))
            conn.commit()
        finally:
            conn.close()

    def clear_expired(self) -> None:
        """Remove all expired cache entries (index rows and payload files)."""
        conn = sqlite3.connect(self.index_db)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT id, filename, created_at FROM cache")
            for cache_id, filename, created_at in cursor.fetchall():
                if not self._is_expired(created_at):
                    continue
                file_path = os.path.join(self.data_dir, filename)
                try:
                    if os.path.exists(file_path):
                        os.remove(file_path)
                except OSError as e:
                    logger.error(f"Failed to delete cache file: {e}")
                cursor.execute("DELETE FROM cache WHERE id = ?", (cache_id,))
            # Single commit for the whole sweep (was one commit per row)
            conn.commit()
        finally:
            conn.close()