storage.py•7.18 kB
"""
JSONL storage system for task persistence.
Follows Clean Code principles:
- Single Responsibility: TaskStorage handles only file I/O operations
- Open/Closed: Easy to extend with new storage backends
- Encapsulation: File I/O details are hidden behind private helper methods
"""
import json
import logging
import os
from pathlib import Path
from threading import Lock
from typing import Dict, List, Optional
from uuid import UUID
# Cross-platform file locking
try:
import fcntl
HAS_FCNTL = True
except ImportError:
# Windows doesn't have fcntl, use threading lock as fallback
HAS_FCNTL = False
from models import Task, TaskStatus
logger = logging.getLogger(__name__)
class TaskStorageError(Exception):
    """Raised when a task persistence operation (read or append) fails."""
class TaskStorage:
    """
    Thread-safe, append-only JSONL storage for tasks.

    Follows SRP: handles only task persistence operations.
    Uses fcntl advisory file locking where available to guard against
    concurrent cross-process write corruption, plus a threading lock to
    serialize writers within this process.
    Maintains an in-memory index for O(1) lookups; when the log is
    replayed, the latest record for a given task id wins.
    """

    def __init__(self, file_path: str = "data/tasks.jsonl"):
        """
        Initialize storage and build the in-memory index.

        Args:
            file_path: Path to the JSONL storage file. Missing parent
                directories are created.

        Raises:
            TaskStorageError: If an existing storage file cannot be read.
        """
        self.file_path = Path(file_path)
        self._write_lock = Lock()  # Serializes writers within this process
        self._task_index: Dict[str, Task] = {}  # task id (str) -> latest Task
        # Ensure the directory exists before any read/append is attempted
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        # Replay the append-only log into the index
        self._rebuild_index()

    def _rebuild_index(self) -> None:
        """
        Rebuild the in-memory index by replaying the JSONL file.

        Malformed or unparsable lines are skipped with a warning rather
        than aborting the load. Later records for the same task id
        overwrite earlier ones (last-write-wins).

        Raises:
            TaskStorageError: If the storage file cannot be read.
        """
        self._task_index.clear()
        if not self.file_path.exists():
            return
        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                for line_num, line in enumerate(file, 1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        task_data = json.loads(line)
                        task = Task.model_validate(task_data)
                        self._task_index[str(task.id)] = task
                    except (json.JSONDecodeError, ValueError) as e:
                        # ValueError also covers pydantic's ValidationError
                        logger.warning("Skipping malformed line %d: %s", line_num, e)
        except OSError as e:
            # Chain the cause so the original traceback is preserved
            raise TaskStorageError(f"Failed to read storage file: {e}") from e

    def _append_task_to_file(self, task: Task) -> None:
        """
        Append one task record to the JSONL file with cross-platform locking.

        Uses fcntl advisory locking on Unix; on platforms without fcntl
        (e.g. Windows) cross-process locking is unavailable and we rely on
        the threading lock held by save_task. Both paths flush and fsync so
        an OS crash cannot silently lose an acknowledged write.

        Raises:
            TaskStorageError: If the write fails.
        """
        task_json = task.model_dump_json() + '\n'
        try:
            with open(self.file_path, 'a', encoding='utf-8') as file:
                if HAS_FCNTL:
                    # Unix-style advisory file locking across processes
                    fcntl.flock(file.fileno(), fcntl.LOCK_EX)
                    try:
                        file.write(task_json)
                        file.flush()  # Push Python buffers to the OS
                        os.fsync(file.fileno())  # Force OS to persist to disk
                    finally:
                        fcntl.flock(file.fileno(), fcntl.LOCK_UN)
                else:
                    # Windows fallback: no cross-process lock, but keep the
                    # same flush+fsync durability guarantee as the Unix path.
                    file.write(task_json)
                    file.flush()
                    os.fsync(file.fileno())
        except OSError as e:
            raise TaskStorageError(f"Failed to append task to file: {e}") from e

    def save_task(self, task: Task) -> Task:
        """
        Save a task (create or update) to the append-only log and index.

        Thread-safe: the write lock serializes file append + index update
        so the index can never observe a half-applied write.

        Returns:
            The saved task, for caller confirmation.

        Raises:
            TaskStorageError: If persistence fails.
        """
        with self._write_lock:
            try:
                self._append_task_to_file(task)
                self._task_index[str(task.id)] = task
                logger.debug("Saved task %s with status %s", task.id, task.status)
                return task
            except TaskStorageError:
                # Already a storage error with context — don't double-wrap it
                logger.error("Failed to save task %s", task.id)
                raise
            except Exception as e:
                logger.error("Failed to save task %s: %s", task.id, e)
                raise TaskStorageError(f"Failed to save task: {e}") from e

    def get_task_by_id(self, task_id: str) -> Optional[Task]:
        """
        Retrieve a task by its string id from the in-memory index (O(1)).

        Returns:
            The task, or None if no task with that id exists.
        """
        return self._task_index.get(task_id)

    def list_tasks(self, limit: int = 10, status_filter: Optional[TaskStatus] = None) -> List[Task]:
        """
        Get recent tasks, optionally filtered by status.

        Args:
            limit: Maximum number of tasks returned (applied after filtering
                for consistent behavior).
            status_filter: If given, only tasks with this exact status.

        Returns:
            Tasks sorted by updated_at descending (newest first).
        """
        tasks = list(self._task_index.values())
        # Explicit None check: even a falsy enum member must still filter
        if status_filter is not None:
            tasks = [task for task in tasks if task.status == status_filter]
        tasks.sort(key=lambda t: t.updated_at, reverse=True)
        return tasks[:limit]

    def search_tasks(self, query: str, limit: int = 50) -> List[Task]:
        """
        Search tasks by query string.

        Delegates matching to Task.matches_query for consistency.
        A blank/whitespace-only query returns no results.

        Returns:
            Matching tasks sorted by updated_at descending, capped at limit.
        """
        if not query.strip():
            return []
        matching_tasks = [
            task for task in self._task_index.values()
            if task.matches_query(query)
        ]
        matching_tasks.sort(key=lambda t: t.updated_at, reverse=True)
        return matching_tasks[:limit]

    def get_tasks_by_status(self, status: TaskStatus) -> List[Task]:
        """
        Get all tasks with a specific status (no limit applied).

        Useful for analysis and reporting.

        Returns:
            Tasks sorted by updated_at descending.
        """
        tasks = [task for task in self._task_index.values() if task.status == status]
        tasks.sort(key=lambda t: t.updated_at, reverse=True)
        return tasks

    def get_total_count(self) -> int:
        """Get the total number of distinct tasks in storage."""
        return len(self._task_index)

    def get_storage_stats(self) -> Dict[str, int]:
        """
        Get storage statistics for monitoring.

        Returns:
            A dict mapping each status value to its task count, plus a
            'total' key with the overall count.
        """
        stats = {status.value: 0 for status in TaskStatus}
        for task in self._task_index.values():
            stats[task.status.value] += 1
        stats['total'] = len(self._task_index)
        return stats