Skip to main content
Glama
tinydb_database.py (10.7 kB)
from tinydb import TinyDB, Query
from tinydb.storages import Storage
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
import json
import threading
from cryptography.fernet import Fernet, InvalidToken
import base64
import hashlib

from .database_interface import DatabaseInterface
from .models import (
    UserStatus, Project, Task, Todo, CalendarEvent, Document,
    ProjectStatus, TaskStatus, Priority
)


class EncryptedJSONStorage(Storage):
    """Custom TinyDB storage with optional Fernet encryption.

    TinyDB instantiates the storage class itself, forwarding the positional
    and keyword arguments it was given, so this class must be passed to
    ``TinyDB(..., storage=EncryptedJSONStorage)`` as a class, not an instance.
    Subclassing ``Storage`` provides the no-op ``close()`` that
    ``TinyDB.close()`` calls.
    """

    def __init__(self, path: str, encryption_key: Optional[str] = None):
        """Remember the file path and build a cipher when a key is supplied.

        Args:
            path: Location of the database file.
            encryption_key: Passphrase of any length; ``None`` or an empty
                string disables encryption.
        """
        self.path = path
        self._lock = threading.RLock()

        if encryption_key:
            # Fernet needs a url-safe base64 encoding of exactly 32 bytes;
            # SHA-256 maps an arbitrary passphrase onto that length.
            digest = hashlib.sha256(encryption_key.encode()).digest()
            self.cipher = Fernet(base64.urlsafe_b64encode(digest))
        else:
            self.cipher = None

    def read(self) -> Optional[Dict[str, Any]]:
        """Load the stored document.

        Returns:
            The parsed JSON data, or ``None`` when the file does not exist
            or is empty.
        """
        with self._lock:
            if not Path(self.path).exists():
                return None

            with open(self.path, 'r', encoding='utf-8') as file:
                content = file.read()

            if not content:
                return None
            if self.cipher is None:
                return json.loads(content)

            try:
                decrypted = self.cipher.decrypt(content.encode())
            except InvalidToken:
                # Not a valid token for this key: assume the file holds
                # unencrypted JSON and parse it as such.
                return json.loads(content)
            return json.loads(decrypted.decode())

    def write(self, data: Dict[str, Any]):
        """Serialize ``data`` to JSON, encrypt it if a cipher is set, and save it."""
        with self._lock:
            # default=str stringifies any value json cannot encode natively.
            content = json.dumps(data, indent=2, default=str)

            if self.cipher:
                content = self.cipher.encrypt(content.encode()).decode()

            with open(self.path, 'w', encoding='utf-8') as file:
                file.write(content)


class TinyDBDatabase(DatabaseInterface):
    """TinyDB-backed implementation of ``DatabaseInterface``.

    Every public operation runs under a re-entrant lock held by the instance.
    """

    def __init__(self, db_path: str, encryption_key: Optional[str] = None):
        """Store the configuration and open the database.

        Args:
            db_path: Path of the database file; parent directories are created.
            encryption_key: When truthy, data is stored through
                ``EncryptedJSONStorage`` with this key.
        """
        self.db_path = Path(db_path)
        self.encryption_key = encryption_key
        self._lock = threading.RLock()
        self.initialize()

    def initialize(self):
        """Initialize database with custom storage"""
        with self._lock:
            self.db_path.parent.mkdir(parents=True, exist_ok=True)

            # Use custom encrypted storage if key is provided
            if self.encryption_key:
                # TinyDB constructs the storage itself, so hand it the class
                # plus the constructor arguments rather than an instance.
                self.db = TinyDB(
                    str(self.db_path),
                    encryption_key=self.encryption_key,
                    storage=EncryptedJSONStorage,
                )
            else:
                self.db = TinyDB(self.db_path)

            # Tables
            self.status_table = self.db.table('status')
            self.projects_table = self.db.table('projects')
            self.todos_table = self.db.table('todos')
            self.calendar_table = self.db.table('calendar')
            self.documents_table = self.db.table('documents')

            # Initialize default status if not exists
            if not self.status_table.all():
                default_status = UserStatus(name="User")
                self.status_table.insert(
                    self._serialize_datetime(default_status.model_dump())
                )

    def _serialize_datetime(self, obj: Any) -> Any:
        """Recursively replace ``datetime`` values with ISO-8601 strings.

        Dicts and lists are rebuilt; every other value is returned unchanged.
        """
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, dict):
            return {k: self._serialize_datetime(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._serialize_datetime(item) for item in obj]
        return obj

    def _deserialize_datetime(self, obj: Any) -> Any:
        """Recursively turn ISO-8601 strings back into ``datetime`` objects.

        Strings that ``datetime.fromisoformat`` rejects are returned as-is.
        """
        # NOTE(review): any string that happens to parse as an ISO date
        # (e.g. a title such as "2024-01-01") is converted too, not only
        # values that were originally datetimes -- confirm this is acceptable
        # for the models' plain-text fields.
        if isinstance(obj, str):
            try:
                return datetime.fromisoformat(obj)
            except ValueError:
                return obj
        if isinstance(obj, dict):
            return {k: self._deserialize_datetime(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._deserialize_datetime(item) for item in obj]
        return obj

    # Status operations
    def get_status(self) -> UserStatus:
        """Return the first record of the status table as a ``UserStatus``."""
        with self._lock:
            data = self.status_table.all()[0]
            data = self._deserialize_datetime(data)
            return UserStatus(**data)

    def update_status(self, status: UserStatus) -> UserStatus:
        """Replace the stored status with ``status`` and return it."""
        with self._lock:
            data = self._serialize_datetime(status.model_dump())
            # The table holds a single record: clear it, then insert the new one.
            self.status_table.truncate()
            self.status_table.insert(data)
            return status

    # Project operations
    def create_project(self, project: Project) -> Project:
        """Insert ``project`` into the projects table and return it."""
        with self._lock:
            data = self._serialize_datetime(project.model_dump())
            self.projects_table.insert(data)
            return project

    def get_project(self, project_id: str) -> Optional[Project]:
        """Return the project whose ``id`` matches, or ``None`` if absent."""
        with self._lock:
            result = self.projects_table.get(Query().id == project_id)
            if result:
                result = self._deserialize_datetime(result)
                return Project(**result)
            return None

    def list_projects(self, status: Optional[ProjectStatus] = None) -> List[Project]:
        """Return all projects, optionally only those with the given status."""
        with self._lock:
            if status:
                results = self.projects_table.search(Query().status == status.value)
            else:
                results = self.projects_table.all()
            return [Project(**self._deserialize_datetime(result)) for result in results]

    def update_project(self, project: Project) -> Project:
        """Overwrite the stored fields of the project with a matching ``id``."""
        with self._lock:
            data = self._serialize_datetime(project.model_dump())
            self.projects_table.update(data, Query().id == project.id)
            return project

    def delete_project(self, project_id: str) -> bool:
        """Remove the project; return ``True`` if any record was removed."""
        with self._lock:
            return bool(self.projects_table.remove(Query().id == project_id))

    # Todo operations
    def create_todo(self, todo: Todo) -> Todo:
        """Insert ``todo`` into the todos table and return it."""
        with self._lock:
            data = self._serialize_datetime(todo.model_dump())
            self.todos_table.insert(data)
            return todo

    def get_todo(self, todo_id: str) -> Optional[Todo]:
        """Return the todo whose ``id`` matches, or ``None`` if absent."""
        with self._lock:
            result = self.todos_table.get(Query().id == todo_id)
            if result:
                result = self._deserialize_datetime(result)
                return Todo(**result)
            return None

    def list_todos(self, completed: Optional[bool] = None) -> List[Todo]:
        """Return all todos, optionally filtered by their ``completed`` flag."""
        with self._lock:
            # Compare against None so that completed=False still filters.
            if completed is not None:
                results = self.todos_table.search(Query().completed == completed)
            else:
                results = self.todos_table.all()
            return [Todo(**self._deserialize_datetime(result)) for result in results]

    def update_todo(self, todo: Todo) -> Todo:
        """Overwrite the stored fields of the todo with a matching ``id``."""
        with self._lock:
            data = self._serialize_datetime(todo.model_dump())
            self.todos_table.update(data, Query().id == todo.id)
            return todo

    def delete_todo(self, todo_id: str) -> bool:
        """Remove the todo; return ``True`` if any record was removed."""
        with self._lock:
            return bool(self.todos_table.remove(Query().id == todo_id))

    # Calendar operations
    def create_event(self, event: CalendarEvent) -> CalendarEvent:
        """Insert ``event`` into the calendar table and return it."""
        with self._lock:
            data = self._serialize_datetime(event.model_dump())
            self.calendar_table.insert(data)
            return event

    def get_event(self, event_id: str) -> Optional[CalendarEvent]:
        """Return the event whose ``id`` matches, or ``None`` if absent."""
        with self._lock:
            result = self.calendar_table.get(Query().id == event_id)
            if result:
                result = self._deserialize_datetime(result)
                return CalendarEvent(**result)
            return None

    def list_events(self, start_date: Optional[datetime] = None,
                    end_date: Optional[datetime] = None) -> List[CalendarEvent]:
        """Return events sorted by ``start_time``.

        Args:
            start_date: Keep only events with ``start_time >= start_date``.
            end_date: Keep only events with ``end_time <= end_date``.
        """
        with self._lock:
            results = self.calendar_table.all()
            events = [CalendarEvent(**self._deserialize_datetime(result)) for result in results]

            if start_date:
                events = [e for e in events if e.start_time >= start_date]
            if end_date:
                events = [e for e in events if e.end_time <= end_date]

            return sorted(events, key=lambda x: x.start_time)

    def update_event(self, event: CalendarEvent) -> CalendarEvent:
        """Overwrite the stored fields of the event with a matching ``id``."""
        with self._lock:
            data = self._serialize_datetime(event.model_dump())
            self.calendar_table.update(data, Query().id == event.id)
            return event

    def delete_event(self, event_id: str) -> bool:
        """Remove the event; return ``True`` if any record was removed."""
        with self._lock:
            return bool(self.calendar_table.remove(Query().id == event_id))

    # Document operations
    def create_document(self, document: Document) -> Document:
        """Insert ``document`` into the documents table and return it."""
        with self._lock:
            data = self._serialize_datetime(document.model_dump())
            self.documents_table.insert(data)
            return document

    def get_document(self, document_id: str) -> Optional[Document]:
        """Return the document whose ``id`` matches, or ``None`` if absent."""
        with self._lock:
            result = self.documents_table.get(Query().id == document_id)
            if result:
                result = self._deserialize_datetime(result)
                return Document(**result)
            return None

    def list_documents(self, tags: Optional[List[str]] = None) -> List[Document]:
        """Return all documents, optionally only those sharing a tag with ``tags``."""
        with self._lock:
            results = self.documents_table.all()
            documents = [Document(**self._deserialize_datetime(result)) for result in results]

            if tags:
                # A document matches when at least one requested tag is present.
                documents = [d for d in documents if any(tag in d.tags for tag in tags)]

            return documents

    def update_document(self, document: Document) -> Document:
        """Overwrite the stored fields of the document with a matching ``id``."""
        with self._lock:
            data = self._serialize_datetime(document.model_dump())
            self.documents_table.update(data, Query().id == document.id)
            return document

    def delete_document(self, document_id: str) -> bool:
        """Remove the document; return ``True`` if any record was removed."""
        with self._lock:
            return bool(self.documents_table.remove(Query().id == document_id))

    def close(self):
        """Close database connection"""
        with self._lock:
            self.db.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/swapnilsurdi/mcp-pa'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.