Skip to main content
Glama
semantic_memory.py30.7 kB
""" Semantic Memory - Hierarchical Navigation System Provides persistent, queryable memory with hierarchical structure and provenance tracking. Uses ChromaDB for embedding storage and semantic search. This module enables: - Technical continuity (decisions, lessons, patterns) - Personal memory (private notes, reflections, and confidential content) - Reference library (books, papers, documentation) Each domain maintains hierarchical structure with query cascading from general to specific. """ # Black Orchid module metadata __black_orchid_metadata__ = { "category": "memory", "description": "Semantic memory and knowledge base with ChromaDB", "aliases": { "mem": "query_memory", "remember": "add_to_memory", "recall": "query_memory", "library": "query_library", }, "priority": 1, # Core module - high priority } import chromadb from chromadb.config import Settings from pathlib import Path from typing import Dict, List, Any, Optional import re from datetime import datetime import tempfile # E-book parsing libraries (optional imports with graceful fallback) try: import pymupdf4llm PYMUPDF_AVAILABLE = True except ImportError: PYMUPDF_AVAILABLE = False try: import ebooklib from ebooklib import epub from bs4 import BeautifulSoup EPUB_AVAILABLE = True except ImportError: EPUB_AVAILABLE = False try: import mobi MOBI_AVAILABLE = True except ImportError: MOBI_AVAILABLE = False # Import config manager for domain configuration try: from modules.config_manager import get_enabled_domains, is_domain_enabled except ImportError: # Fallback if config_manager not available def get_enabled_domains(): """ Get list of enabled semantic memory domains from configuration. Returns: list: Enabled domain names (e.g., ['personal', 'technical', 'library']) """ return ['technical', 'library'] def is_domain_enabled(domain): """ Check if a semantic memory domain is enabled in configuration. Args: domain: Domain name to check (e.g., 'personal', 'technical', 'library') Returns: bool: True if domain is enabled, False otherwise """ return domain in get_enabled_domains() # Database paths DB_PATH = Path("./db/chroma") SOURCES_PATH = Path("./sources") def initialize_semantic_memory() -> Dict[str, Any]: """ Initialize ChromaDB client and create memory collections. Creates persistent client and sets up collections for each memory domain. Safe to call multiple times - will reuse existing collections. Domains are loaded from config (public + private configs merged). Returns: dict: Status and available collections Example: >>> initialize_semantic_memory() {'success': True, 'collections': ['technical', 'library']} """ try: # Create db directory if needed DB_PATH.mkdir(parents=True, exist_ok=True) # Initialize persistent client client = chromadb.PersistentClient(path=str(DB_PATH)) # Get enabled domains from config domains = get_enabled_domains() collections = {} for domain in domains: collections[domain] = client.get_or_create_collection( name=f"memory_{domain}", metadata={"domain": domain, "version": "1.0"} ) return { 'success': True, 'collections': domains, 'db_path': str(DB_PATH), 'note': 'Collections initialized or loaded successfully' } except Exception as e: return { 'success': False, 'error': f'Failed to initialize semantic memory: {e}' } def get_collections() -> Dict[str, Any]: """ List available memory collections and their statistics. Returns: dict: Collection info including document counts Example: >>> get_collections() {'success': True, 'collections': {'technical': 42, 'library': 128}} """ try: client = chromadb.PersistentClient(path=str(DB_PATH)) collections_info = {} for collection in client.list_collections(): coll_obj = client.get_collection(collection.name) collections_info[collection.name] = { 'count': coll_obj.count(), 'metadata': collection.metadata } return { 'success': True, 'collections': collections_info } except Exception as e: return { 'success': False, 'error': f'Failed to get collections: {e}' } def _parse_markdown_hierarchy(file_path: str) -> Dict[str, Any]: """ Parse markdown file into hierarchical structure based on headers. Extracts: - L0: Document (file itself) - L1: Major sections (# or ## headers) - L2: Subsections/paragraphs Args: file_path: Path to markdown file Returns: dict: Hierarchical structure with nodes at each level Example: >>> _parse_markdown_hierarchy("sources/technical/decisions.md") {'success': True, 'hierarchy': {...}, 'levels': 2} """ try: path = Path(file_path) if not path.exists(): return {'success': False, 'error': f'File not found: {file_path}'} content = path.read_text(encoding='utf-8') # L0: Document level source_id = path.stem # filename without extension hierarchy = { 'source_id': source_id, 'file_path': str(path), 'L0': { 'node_id': source_id, 'title': source_id.replace('_', ' ').title(), 'level': 0, 'parent_id': None, 'content': content[:500], # First 500 chars as summary 'path': source_id }, 'L1': [], 'L2': [] } # Split into sections by headers # Match # Header or ## Header header_pattern = re.compile(r'^(#{1,2})\s+(.+)$', re.MULTILINE) sections = [] last_pos = 0 current_section = None for match in header_pattern.finditer(content): # Save previous section's content if current_section: current_section['content'] = content[last_pos:match.start()].strip() sections.append(current_section) # Start new section level = len(match.group(1)) # Number of # symbols title = match.group(2).strip() current_section = { 'level': 1 if level <= 2 else 2, # Collapse to L1 or L2 'title': title, 'position': match.start() } last_pos = match.end() # Don't forget the last section if current_section: current_section['content'] = content[last_pos:].strip() sections.append(current_section) # Build L1 nodes for idx, section in enumerate([s for s in sections if s['level'] == 1]): node_id = f"{source_id}_L1_{idx}" hierarchy['L1'].append({ 'node_id': node_id, 'title': section['title'], 'level': 1, 'parent_id': source_id, 'content': section['content'][:1000], # First 1000 chars 'path': f"{source_id} > {section['title']}" }) # Build L2 nodes (paragraphs within sections or all paragraphs if no L1) if hierarchy['L1']: # L2 from subsections for idx, section in enumerate([s for s in sections if s['level'] == 2]): node_id = f"{source_id}_L2_{idx}" # Find parent L1 by position parent_L1 = None for l1 in hierarchy['L1']: if section['position'] > sections[[s for s in sections if s.get('title') == l1['title']][0] if any(s.get('title') == l1['title'] for s in sections) else None].get('position', 0): parent_L1 = l1['node_id'] hierarchy['L2'].append({ 'node_id': node_id, 'title': section['title'], 'level': 2, 'parent_id': parent_L1 or source_id, 'content': section['content'], 'path': f"{source_id} > ... > {section['title']}" }) else: # No headers - chunk into paragraphs as L2 paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()] for idx, para in enumerate(paragraphs[:20]): # Limit to first 20 paragraphs node_id = f"{source_id}_L2_{idx}" hierarchy['L2'].append({ 'node_id': node_id, 'title': para[:50] + '...', # First 50 chars as title 'level': 2, 'parent_id': source_id, 'content': para, 'path': f"{source_id} > para_{idx}" }) return { 'success': True, 'hierarchy': hierarchy, 'levels': 2 if hierarchy['L2'] else (1 if hierarchy['L1'] else 0), 'source_id': source_id } except Exception as e: return { 'success': False, 'error': f'Failed to parse hierarchy: {e}' } def _detect_document_format(file_path: str) -> str: """ Detect document format from file extension. Args: file_path: Path to document Returns: str: Format type ('pdf', 'epub', 'mobi', 'markdown', 'unknown') """ path = Path(file_path) extension = path.suffix.lower() format_map = { '.md': 'markdown', '.markdown': 'markdown', '.pdf': 'pdf', '.epub': 'epub', '.mobi': 'mobi', '.azw': 'mobi', # Kindle format, similar to MOBI '.azw3': 'mobi' } return format_map.get(extension, 'unknown') def _parse_pdf_document(file_path: str) -> Dict[str, Any]: """ Parse PDF document into hierarchical structure using pymupdf4llm. Converts PDF to markdown and extracts hierarchy from headings. Args: file_path: Path to PDF file Returns: dict: Hierarchical structure with nodes at each level """ try: if not PYMUPDF_AVAILABLE: return { 'success': False, 'error': 'pymupdf4llm not installed. Run: pip install pymupdf4llm' } path = Path(file_path) if not path.exists(): return {'success': False, 'error': f'File not found: {file_path}'} # Convert PDF to markdown md_text = pymupdf4llm.to_markdown(str(path)) # Parse markdown structure source_id = path.stem hierarchy = { 'source_id': source_id, 'file_path': str(path), 'format': 'pdf', 'L0': { 'node_id': source_id, 'title': source_id.replace('_', ' ').title(), 'level': 0, 'parent_id': None, 'content': md_text[:500], 'path': source_id, 'format': 'pdf' }, 'L1': [], 'L2': [], 'L3': [] } # Extract hierarchy from markdown headings lines = md_text.split('\n') current_l1 = None current_l2 = None l1_idx = 0 l2_idx = 0 l3_idx = 0 for i, line in enumerate(lines): # Detect headings if line.startswith('# ') and not line.startswith('## '): # L1: Chapter level (single #) title = line.replace('# ', '').strip() node_id = f"{source_id}_L1_{l1_idx}" current_l1 = { 'node_id': node_id, 'title': title, 'level': 1, 'parent_id': source_id, 'content': '', 'path': f"{source_id} > {title}", 'format': 'pdf' } hierarchy['L1'].append(current_l1) current_l2 = None l1_idx += 1 elif line.startswith('## '): # L2: Section level (##) title = line.replace('## ', '').strip() node_id = f"{source_id}_L2_{l2_idx}" parent_id = current_l1['node_id'] if current_l1 else source_id current_l2 = { 'node_id': node_id, 'title': title, 'level': 2, 'parent_id': parent_id, 'content': '', 'path': f"{source_id} > ... > {title}", 'format': 'pdf' } hierarchy['L2'].append(current_l2) l2_idx += 1 elif line.startswith('### '): # L3: Subsection level (###) title = line.replace('### ', '').strip() node_id = f"{source_id}_L3_{l3_idx}" parent_id = current_l2['node_id'] if current_l2 else (current_l1['node_id'] if current_l1 else source_id) hierarchy['L3'].append({ 'node_id': node_id, 'title': title, 'level': 3, 'parent_id': parent_id, 'content': '', 'path': f"{source_id} > ... > ... > {title}", 'format': 'pdf' }) l3_idx += 1 else: # Add content to current section if line.strip(): if current_l2: current_l2['content'] += line + '\n' elif current_l1: current_l1['content'] += line + '\n' # Limit content length for each node for node in hierarchy['L1']: node['content'] = node['content'][:1000] for node in hierarchy['L2']: node['content'] = node['content'][:1000] for node in hierarchy['L3']: node['content'] = node['content'][:1000] max_level = 3 if hierarchy['L3'] else (2 if hierarchy['L2'] else (1 if hierarchy['L1'] else 0)) return { 'success': True, 'hierarchy': hierarchy, 'levels': max_level, 'source_id': source_id } except Exception as e: return { 'success': False, 'error': f'Failed to parse PDF: {e}' } def _parse_epub_document(file_path: str) -> Dict[str, Any]: """ Parse EPUB document into hierarchical structure. Extracts chapters and builds hierarchy from TOC and content. Args: file_path: Path to EPUB file Returns: dict: Hierarchical structure with nodes at each level """ try: if not EPUB_AVAILABLE: return { 'success': False, 'error': 'ebooklib not installed. Run: pip install ebooklib beautifulsoup4' } path = Path(file_path) if not path.exists(): return {'success': False, 'error': f'File not found: {file_path}'} # Read EPUB book = epub.read_epub(str(path)) source_id = path.stem hierarchy = { 'source_id': source_id, 'file_path': str(path), 'format': 'epub', 'L0': { 'node_id': source_id, 'title': source_id.replace('_', ' ').title(), 'level': 0, 'parent_id': None, 'content': '', 'path': source_id, 'format': 'epub' }, 'L1': [], 'L2': [] } # Extract chapters chapter_idx = 0 for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT): try: content_html = item.get_body_content() soup = BeautifulSoup(content_html, 'lxml') # Extract text from paragraphs text = ' '.join([p.get_text() for p in soup.find_all(['p', 'div'])]) if not text.strip(): continue # Try to get chapter title title_elem = soup.find(['h1', 'h2', 'title']) title = title_elem.get_text() if title_elem else f"Chapter {chapter_idx + 1}" node_id = f"{source_id}_L1_{chapter_idx}" hierarchy['L1'].append({ 'node_id': node_id, 'title': title, 'level': 1, 'parent_id': source_id, 'content': text[:1000], 'path': f"{source_id} > {title}", 'format': 'epub' }) chapter_idx += 1 except Exception as e: # Skip problematic items continue # Set L0 content to first chapter preview if hierarchy['L1']: hierarchy['L0']['content'] = hierarchy['L1'][0]['content'][:500] return { 'success': True, 'hierarchy': hierarchy, 'levels': 1 if hierarchy['L1'] else 0, 'source_id': source_id } except Exception as e: return { 'success': False, 'error': f'Failed to parse EPUB: {e}' } def _parse_mobi_document(file_path: str) -> Dict[str, Any]: """ Parse MOBI document by converting to EPUB first. Uses mobi library to unpack, then parses as EPUB. Args: file_path: Path to MOBI file Returns: dict: Hierarchical structure with nodes at each level """ try: if not MOBI_AVAILABLE: return { 'success': False, 'error': 'mobi library not installed. Run: pip install mobi' } path = Path(file_path) if not path.exists(): return {'success': False, 'error': f'File not found: {file_path}'} # Create temporary directory for conversion with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) # Extract MOBI to EPUB tempfile_path, epub_path = mobi.extract(str(path)) if not epub_path: return { 'success': False, 'error': 'Failed to convert MOBI to EPUB' } # Parse as EPUB result = _parse_epub_document(epub_path) # Update format in result if result['success']: result['hierarchy']['format'] = 'mobi' result['hierarchy']['L0']['format'] = 'mobi' for node in result['hierarchy'].get('L1', []): node['format'] = 'mobi' for node in result['hierarchy'].get('L2', []): node['format'] = 'mobi' return result except Exception as e: return { 'success': False, 'error': f'Failed to parse MOBI: {e}' } def ingest_document(file_path: str, domain: str = 'library') -> Dict[str, Any]: """ Ingest a document (markdown, PDF, EPUB, or MOBI) into semantic memory. Detects format and parses hierarchical structure, then embeds each level in ChromaDB. Args: file_path: Path to document file domain: Memory domain (e.g., 'technical', 'library', 'personal') Returns: dict: Ingestion status and node counts Example: >>> ingest_document("sources/library/power_electronics.pdf", domain="library") {'success': True, 'nodes_added': 245, 'levels': 3, 'format': 'pdf'} """ try: # Validate domain against config valid_domains = get_enabled_domains() if domain not in valid_domains: return { 'success': False, 'error': f'Invalid domain. Must be one of: {valid_domains}' } # Detect format and route to appropriate parser doc_format = _detect_document_format(file_path) if doc_format == 'markdown': parse_result = _parse_markdown_hierarchy(file_path) elif doc_format == 'pdf': parse_result = _parse_pdf_document(file_path) elif doc_format == 'epub': parse_result = _parse_epub_document(file_path) elif doc_format == 'mobi': parse_result = _parse_mobi_document(file_path) else: return { 'success': False, 'error': f'Unsupported format: {doc_format}. Supported: markdown, pdf, epub, mobi' } if not parse_result['success']: return parse_result hierarchy = parse_result['hierarchy'] # Get collection client = chromadb.PersistentClient(path=str(DB_PATH)) collection = client.get_or_create_collection( name=f"memory_{domain}", metadata={"domain": domain} ) # Prepare documents for embedding ids = [] documents = [] metadatas = [] # Add L0 l0 = hierarchy['L0'] ids.append(l0['node_id']) documents.append(l0['content']) metadatas.append({ 'level': l0['level'], 'parent_id': '', 'node_id': l0['node_id'], 'title': l0['title'], 'path': l0['path'], 'domain': domain, 'source_file': file_path }) # Add L1 for node in hierarchy['L1']: ids.append(node['node_id']) documents.append(node['content']) metadatas.append({ 'level': node['level'], 'parent_id': node['parent_id'], 'node_id': node['node_id'], 'title': node['title'], 'path': node['path'], 'domain': domain, 'source_file': file_path }) # Add L2 for node in hierarchy['L2']: ids.append(node['node_id']) documents.append(node['content']) metadatas.append({ 'level': node['level'], 'parent_id': node['parent_id'], 'node_id': node['node_id'], 'title': node['title'], 'path': node['path'], 'domain': domain, 'source_file': file_path, 'format': hierarchy.get('format', 'markdown') }) # Add L3 (for PDFs and deep hierarchies) if 'L3' in hierarchy: for node in hierarchy['L3']: ids.append(node['node_id']) documents.append(node['content']) metadatas.append({ 'level': node['level'], 'parent_id': node['parent_id'], 'node_id': node['node_id'], 'title': node['title'], 'path': node['path'], 'domain': domain, 'source_file': file_path, 'format': hierarchy.get('format', 'markdown') }) # Upsert to collection (adds or updates) collection.upsert( ids=ids, documents=documents, metadatas=metadatas ) return { 'success': True, 'nodes_added': len(ids), 'levels': parse_result['levels'], 'source_id': hierarchy['source_id'], 'domain': domain } except Exception as e: return { 'success': False, 'error': f'Failed to ingest document: {e}' } def query_memory(text: str, domain: str = 'technical', n_results: int = 5) -> Dict[str, Any]: """ Query semantic memory with hierarchical cascading. Searches embeddings and returns results with full provenance. Args: text: Search query domain: Memory domain to search ('technical', 'library', or custom domain) n_results: Number of results to return Returns: dict: Search results with provenance chains Example: >>> query_memory("how did we solve REPL persistence?", domain="technical") {'success': True, 'results': [...], 'count': 3} """ try: # Get collection client = chromadb.PersistentClient(path=str(DB_PATH)) try: collection = client.get_collection(name=f"memory_{domain}") except Exception: return { 'success': False, 'error': f'Collection for domain "{domain}" not found. Try initializing first.' } # Query results = collection.query( query_texts=[text], n_results=n_results, include=['documents', 'metadatas', 'distances'] ) # Format results formatted_results = [] for idx in range(len(results['ids'][0])): formatted_results.append({ 'node_id': results['ids'][0][idx], 'content': results['documents'][0][idx], 'metadata': results['metadatas'][0][idx], 'similarity_score': 1 - results['distances'][0][idx], # Convert distance to similarity 'path': results['metadatas'][0][idx].get('path', ''), 'title': results['metadatas'][0][idx].get('title', ''), 'level': results['metadatas'][0][idx].get('level', 0) }) return { 'success': True, 'results': formatted_results, 'count': len(formatted_results), 'query': text, 'domain': domain } except Exception as e: return { 'success': False, 'error': f'Query failed: {e}' } def rebuild_domain(domain: str, source_paths: Optional[List[str]] = None) -> Dict[str, Any]: """ Rebuild entire memory domain from sources directory or custom paths. Scans sources/{domain}/ for markdown files and re-ingests all, or uses provided source_paths list for custom locations. Args: domain: Domain to rebuild ('technical', 'library', 'personal', or custom domain) source_paths: Optional list of custom paths to scan instead of sources/{domain}/ Returns: dict: Rebuild status with file counts Example: >>> rebuild_domain('technical') {'success': True, 'files_processed': 12, 'nodes_added': 157} >>> rebuild_domain('personal', source_paths=['private/notes']) {'success': True, 'files_processed': 20, 'nodes_added': 42} """ try: # Determine source paths if source_paths: # Custom paths provided paths_to_scan = [Path(p) for p in source_paths] else: # Default to sources/{domain}/ domain_path = SOURCES_PATH / domain if not domain_path.exists(): return { 'success': False, 'error': f'Domain directory not found: {domain_path}' } paths_to_scan = [domain_path] # Clear existing collection client = chromadb.PersistentClient(path=str(DB_PATH)) try: client.delete_collection(name=f"memory_{domain}") except Exception: pass # Collection might not exist yet # Recreate collection client.get_or_create_collection( name=f"memory_{domain}", metadata={"domain": domain, "rebuilt_at": str(datetime.now())} ) # Find all markdown files across all paths md_files = [] for path in paths_to_scan: if path.is_file() and path.suffix == '.md': md_files.append(path) elif path.is_dir(): md_files.extend(list(path.rglob("*.md"))) if not md_files: return { 'success': True, 'files_processed': 0, 'nodes_added': 0, 'note': f'No markdown files found in specified paths' } # Ingest each file total_nodes = 0 processed = 0 errors = [] for md_file in md_files: result = ingest_document(str(md_file), domain) if result['success']: total_nodes += result['nodes_added'] processed += 1 else: errors.append(f"{md_file.name}: {result['error']}") return { 'success': True, 'files_processed': processed, 'nodes_added': total_nodes, 'domain': domain, 'errors': errors if errors else None } except Exception as e: return { 'success': False, 'error': f'Rebuild failed: {e}' } def rebuild_personal_domain() -> Dict[str, Any]: """ Rebuild personal memory domain from private/ directory. Scans and embeds private notes and confidential content from various private/ subdirectories including personal history, notes, and reference materials. Returns: dict: Rebuild status with file counts Example: >>> rebuild_personal_domain() {'success': True, 'files_processed': 30, 'nodes_added': 280, 'domain': 'personal'} """ # Build list of paths that exist personal_paths = [] potential_paths = [ 'private/story', 'private/notes', 'private/reference' ] for path_str in potential_paths: if Path(path_str).exists(): personal_paths.append(path_str) if not personal_paths: return { 'success': False, 'error': 'No personal domain directories found. Create at least one of: private/story, private/notes, private/reference' } return rebuild_domain('personal', source_paths=personal_paths) def rebuild_technical_domain() -> Dict[str, Any]: """ Rebuild technical memory domain from private/design_docs/. Scans and embeds all design documents and technical notes. Returns: dict: Rebuild status with file counts Example: >>> rebuild_technical_domain() {'success': True, 'files_processed': 8, 'nodes_added': 124, 'domain': 'technical'} """ return rebuild_domain('technical', source_paths=['private/design_docs'])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AJ-Gonzalez/black-orchid'

If you have feedback or need assistance with the MCP directory API, please join our Discord server