Codebuddy MCP Server

by jacklatrobe
storage.py (7.18 kB)
""" JSONL storage system for task persistence. Follows Clean Code principles: - Single Responsibility: TaskStorage handles only file I/O operations - Open/Closed: Easy to extend with new storage backends - Dependency Inversion: Depends on abstractions, not concrete file operations """ import json import logging import os from pathlib import Path from threading import Lock from typing import Dict, List, Optional from uuid import UUID # Cross-platform file locking try: import fcntl HAS_FCNTL = True except ImportError: # Windows doesn't have fcntl, use threading lock as fallback HAS_FCNTL = False from models import Task, TaskStatus logger = logging.getLogger(__name__) class TaskStorageError(Exception): """Custom exception for storage-related errors.""" pass class TaskStorage: """ Thread-safe, append-only JSONL storage for tasks. Follows SRP: Handles only task persistence operations. Uses file locking to prevent concurrent write corruption. Maintains in-memory index for fast lookups. """ def __init__(self, file_path: str = "data/tasks.jsonl"): """ Initialize storage with thread-safe operations. Args: file_path: Path to the JSONL storage file """ self.file_path = Path(file_path) self._write_lock = Lock() # Thread safety for writes self._task_index: Dict[str, Task] = {} # In-memory index # Ensure directory exists self.file_path.parent.mkdir(parents=True, exist_ok=True) # Build initial index from existing file self._rebuild_index() def _rebuild_index(self) -> None: """ Rebuild in-memory index from JSONL file. Private method following encapsulation principle. Handles malformed lines gracefully. """ self._task_index.clear() if not self.file_path.exists(): return try: with open(self.file_path, 'r', encoding='utf-8') as file: for line_num, line in enumerate(file, 1): line = line.strip() if not line: continue try: task_data = json.loads(line) task = Task.model_validate(task_data) self._task_index[str(task.id)] = task except (json.JSONDecodeError, ValueError) as e: logger.warning(f"Skipping malformed line {line_num}: {e}") except OSError as e: raise TaskStorageError(f"Failed to read storage file: {e}") def _append_task_to_file(self, task: Task) -> None: """ Append task to JSONL file with cross-platform file locking. Uses fcntl on Unix systems, falls back to threading lock on Windows. Private method to encapsulate file I/O details. """ task_json = task.model_dump_json() + '\n' try: with open(self.file_path, 'a', encoding='utf-8') as file: if HAS_FCNTL: # Unix-style file locking fcntl.flock(file.fileno(), fcntl.LOCK_EX) try: file.write(task_json) file.flush() # Ensure data is written os.fsync(file.fileno()) # Force OS to write to disk finally: fcntl.flock(file.fileno(), fcntl.LOCK_UN) else: # Windows fallback - relies on threading lock in save_task file.write(task_json) file.flush() except OSError as e: raise TaskStorageError(f"Failed to append task to file: {e}") def save_task(self, task: Task) -> Task: """ Save task to storage (create or update). Thread-safe operation that updates both file and index. Returns the saved task for confirmation. """ with self._write_lock: try: self._append_task_to_file(task) self._task_index[str(task.id)] = task logger.debug(f"Saved task {task.id} with status {task.status}") return task except Exception as e: logger.error(f"Failed to save task {task.id}: {e}") raise TaskStorageError(f"Failed to save task: {e}") def get_task_by_id(self, task_id: str) -> Optional[Task]: """ Retrieve task by ID from in-memory index. Fast O(1) lookup using the index. 
Returns None if task doesn't exist. """ return self._task_index.get(task_id) def list_tasks(self, limit: int = 10, status_filter: Optional[TaskStatus] = None) -> List[Task]: """ Get recent tasks, optionally filtered by status. Returns tasks sorted by updated_at in descending order (newest first). Applies limit after filtering for consistent behavior. """ tasks = list(self._task_index.values()) # Apply status filter if provided if status_filter: tasks = [task for task in tasks if task.status == status_filter] # Sort by updated_at descending (newest first) tasks.sort(key=lambda t: t.updated_at, reverse=True) # Apply limit return tasks[:limit] def search_tasks(self, query: str, limit: int = 50) -> List[Task]: """ Search tasks by query string. Uses the Task.matches_query method for consistency. Returns results sorted by relevance (updated_at desc). """ if not query.strip(): return [] matching_tasks = [ task for task in self._task_index.values() if task.matches_query(query) ] # Sort by updated_at descending for relevance matching_tasks.sort(key=lambda t: t.updated_at, reverse=True) return matching_tasks[:limit] def get_tasks_by_status(self, status: TaskStatus) -> List[Task]: """ Get all tasks with specific status. Useful for analysis and reporting. Returns tasks sorted by updated_at descending. """ tasks = [task for task in self._task_index.values() if task.status == status] tasks.sort(key=lambda t: t.updated_at, reverse=True) return tasks def get_total_count(self) -> int: """Get total number of tasks in storage.""" return len(self._task_index) def get_storage_stats(self) -> Dict[str, int]: """ Get storage statistics for monitoring. Returns count breakdown by status. """ stats = {status.value: 0 for status in TaskStatus} for task in self._task_index.values(): stats[task.status.value] += 1 stats['total'] = len(self._task_index) return stats
