Skip to main content
Glama
api.py (9.6 kB)
import logging
from typing import Any

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from ..config import get_config
from ..storage.jsonl_storage import JSONLStorage
from ..storage.models import GraphData, GraphFilter, Memory, MemoryStatus
from ..storage.sqlite_storage import SQLiteStorage
from ..vault.markdown_writer import MarkdownWriter
from .services.graph_service import get_graph_data

logger = logging.getLogger(__name__)
router = APIRouter()


def get_storage():
    """Yield a connected storage backend and close it when the caller is done.

    The backend class is picked from ``config.storage_backend``: ``"sqlite"``
    selects ``SQLiteStorage``; any other value selects ``JSONLStorage``.
    """
    backend_cls: Any = (
        SQLiteStorage if get_config().storage_backend == "sqlite" else JSONLStorage
    )
    storage = backend_cls()
    storage.connect()
    try:
        yield storage
    finally:
        # Runs whether the consumer finished normally or raised.
        storage.close()


# API representation of a single memory. Fields that live on ``memory.meta``
# in the storage model (tags, source, context) are flattened to the top level.
class MemoryResponse(BaseModel):
    id: str
    content: str
    created_at: int
    last_used: int
    use_count: int
    status: str
    tags: list[str]
    strength: float
    entities: list[str]
    source: str | None
    context: str | None
    promoted_at: int | None
    promoted_to: str | None
    review_count: int
    last_review_at: int | None
    review_priority: float

    @classmethod
    def from_memory(cls, memory: Memory):
        """Build a response from a stored ``Memory``.

        A missing ``memory.meta`` yields no tags and ``None`` for source and
        context; falsy tags on an existing meta are reported as an empty list.
        """
        meta = memory.meta
        if meta:
            tags = meta.tags or []
            source = meta.source
            context = meta.context
        else:
            tags, source, context = [], None, None

        return cls(
            id=memory.id,
            content=memory.content,
            created_at=memory.created_at,
            last_used=memory.last_used,
            use_count=memory.use_count,
            status=memory.status.value,
            tags=tags,
            strength=memory.strength,
            entities=memory.entities,
            source=source,
            context=context,
            promoted_at=memory.promoted_at,
            promoted_to=memory.promoted_to,
            review_count=memory.review_count,
            last_review_at=memory.last_review_at,
            review_priority=memory.review_priority,
        )


# One page of memories plus the total number of matching memories.
class MemoryListResponse(BaseModel):
    items: list[MemoryResponse]
    total: int


# A single relation as seen from one memory; ``target_memory_id`` is the
# memory at the other end of the relation.
class RelationshipItem(BaseModel):
    id: str
    target_memory_id: str
    relation_type: str
    strength: float
    direction: str  # "outgoing" or "incoming"


# Wrapper for the list of relations returned for a memory.
class RelationshipsResponse(BaseModel):
    relationships: list[RelationshipItem]
def _open_storage(config: Any | None = None) -> Any:
    """Return a not-yet-entered storage backend selected by configuration.

    ``config.storage_backend == "sqlite"`` selects ``SQLiteStorage``; any other
    value selects ``JSONLStorage``. The result is intended to be used as a
    context manager (``with _open_storage() as storage:``).

    Args:
        config: Already-loaded configuration; fetched via ``get_config()``
            when omitted.
    """
    if config is None:
        config = get_config()
    backend_cls: Any = SQLiteStorage if config.storage_backend == "sqlite" else JSONLStorage
    return backend_cls()


def _is_safe_vault_path(value: str, *, allow_subdirs: bool) -> bool:
    """Return True if *value* is a relative path that stays inside the vault.

    Rejects absolute paths, Windows drive prefixes (``C:...``) and any ``..``
    segment. With ``allow_subdirs=False`` path separators are rejected too, so
    the value must be a bare name.
    """
    normalized = value.replace("\\", "/")
    if normalized.startswith("/"):
        return False
    if len(normalized) >= 2 and normalized[0].isalpha() and normalized[1] == ":":
        return False
    parts = normalized.split("/")
    if not allow_subdirs and len(parts) > 1:
        return False
    return ".." not in parts


@router.get("/memories/{memory_id}/relationships", response_model=RelationshipsResponse)
def get_memory_relationships(memory_id: str) -> RelationshipsResponse:
    """Get all relationships for a specific memory.

    Each relation is reported from the point of view of ``memory_id``:
    "outgoing" when the memory is the source, "incoming" when it is the
    target. Responds with 404 if the memory does not exist.
    """
    with _open_storage() as storage:
        if not storage.get_memory(memory_id):
            raise HTTPException(status_code=404, detail="Memory not found")

        # TODO: Optimize this by adding get_relations_by_memory_id to storage backends.
        # Until then every relation is scanned and filtered here.
        relationships = []
        for rel in storage.get_all_relations():
            if rel.from_memory_id == memory_id:
                other_id, direction = rel.to_memory_id, "outgoing"
            elif rel.to_memory_id == memory_id:
                other_id, direction = rel.from_memory_id, "incoming"
            else:
                continue
            relationships.append(
                RelationshipItem(
                    id=rel.id,
                    target_memory_id=other_id,
                    relation_type=rel.relation_type,
                    strength=rel.strength,
                    direction=direction,
                )
            )

        return RelationshipsResponse(relationships=relationships)


# Optional overrides for where a memory is written inside the vault.
class SaveToVaultRequest(BaseModel):
    filename: str | None = None
    folder: str | None = None


@router.get("/memories", response_model=MemoryListResponse)
def list_memories(
    limit: int = 50, offset: int = 0, status: str | None = None, search: str | None = None
) -> MemoryListResponse:
    """List memories with pagination and filtering.

    An unrecognized ``status`` value is ignored (no status filter is applied).
    When ``search`` is given, matches are fetched in full and paginated here;
    ``total`` is always the number of matches before pagination.
    """
    memory_status = None
    if status:
        try:
            memory_status = MemoryStatus(status)
        except ValueError:
            # Deliberately lenient: keep serving unfiltered results, but leave a trace.
            logger.warning("Ignoring unrecognized memory status filter: %r", status)

    with _open_storage() as storage:
        if search:
            # The storage search call is not paginated, so fetch all matches
            # and slice in memory.
            matches = storage.search_memories(query=search, limit=None)
            if memory_status is not None:
                matches = [m for m in matches if m.status == memory_status]
            page = matches[offset : offset + limit]
            return MemoryListResponse(
                items=[MemoryResponse.from_memory(m) for m in page], total=len(matches)
            )

        memories = storage.list_memories(status=memory_status, limit=limit, offset=offset)
        total = storage.count_memories(status=memory_status)
        return MemoryListResponse(
            items=[MemoryResponse.from_memory(m) for m in memories], total=total
        )


@router.get("/memories/{memory_id}", response_model=MemoryResponse)
def get_memory(memory_id: str) -> MemoryResponse:
    """Get a single memory by ID. Responds with 404 if it does not exist."""
    with _open_storage() as storage:
        memory = storage.get_memory(memory_id)
        if not memory:
            raise HTTPException(status_code=404, detail="Memory not found")
        return MemoryResponse.from_memory(memory)


@router.post("/memories/{memory_id}/save-to-vault")
def save_to_vault(memory_id: str, request: SaveToVaultRequest):
    """Save a memory to the Obsidian vault.

    Responds with 400 if no vault path is configured or if the requested
    filename/folder is not a safe relative path, 404 if the memory does not
    exist, and 500 if writing the note fails. On success returns
    ``{"success": True, "path": <written path>}``.
    """
    config = get_config()
    if not config.ltm_vault_path:
        raise HTTPException(status_code=400, detail="LTM Vault path not configured")

    # filename/folder come straight from the request body; refuse anything
    # that could resolve outside the vault directory.
    if request.filename and not _is_safe_vault_path(request.filename, allow_subdirs=False):
        raise HTTPException(status_code=400, detail="Invalid filename")
    if request.folder and not _is_safe_vault_path(request.folder, allow_subdirs=True):
        raise HTTPException(status_code=400, detail="Invalid folder")

    with _open_storage(config) as storage:
        memory = storage.get_memory(memory_id)
        if not memory:
            raise HTTPException(status_code=404, detail="Memory not found")

        title = request.filename or f"memory-{memory.id[:8]}"
        folder = request.folder or "Memories"
        # meta may be absent; mirror the guard used by MemoryResponse.from_memory.
        tags = memory.meta.tags if memory.meta and memory.meta.tags else []
        metadata = {
            "id": memory.id,
            "created_at": memory.created_at,
            "source": "cortexgraph",
            "type": "memory",
        }

        try:
            writer = MarkdownWriter(vault_path=config.ltm_vault_path)
            file_path = writer.write_note(
                title=title,
                content=memory.content,
                folder=folder,
                tags=tags,
                metadata=metadata,
                created_at=memory.created_at,
                modified_at=memory.last_used,
            )
        except Exception as e:
            # Boundary handler: log with traceback and surface as a 500.
            logger.exception("Failed to save to vault: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e

        return {"success": True, "path": str(file_path)}


@router.get("/graph", response_model=GraphData)
def get_graph(limit: int = 1000) -> GraphData:
    """Get graph data for visualization, constrained only by ``limit``."""
    with _open_storage() as storage:
        graph_filter = GraphFilter(limit=limit)
        return get_graph_data(storage, graph_filter)


@router.post("/graph/filtered", response_model=GraphData)
def get_filtered_graph(filter: GraphFilter) -> GraphData:
    """Get filtered graph data using the filter supplied in the request body."""
    with _open_storage() as storage:
        return get_graph_data(storage, filter)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prefrontalsys/mnemex'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.