Skip to main content
Glama
document_store.py (18.1 kB)
# cmcp/kb/document_store.py
# container-mcp © 2025 by Martin Bukowski is licensed under Apache 2.0

"""Document store for the knowledge base."""

import os
import json
import re
import time
import shutil
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
from pathlib import Path

from .models import DocumentIndex, DocumentFragment
from .path import PathComponents, PartialPathComponents


class DocumentStore:
    """Handles storage and retrieval of documents in the knowledge base.

    Every document lives in its own directory at ``base_path / components.path``.
    That directory holds an ``index.json`` file plus one or more
    ``content*.txt`` files.
    """

    DEFAULT_FRAGMENT_SIZE = 4096  # 4KB chunks by default

    def __init__(self, base_path: str):
        """Initialize the document store with a base path.

        The base directory is created if it does not exist yet.

        Args:
            base_path: Base path for document storage
        """
        self.base_path = Path(base_path).resolve()
        os.makedirs(self.base_path, exist_ok=True)

    def validate_path(self, path: str) -> str:
        """Validate and normalize a document path.

        Args:
            path: Document path to validate (namespace/collection/name)

        Returns:
            Normalized path (surrounding whitespace and slashes removed)

        Raises:
            ValueError: If path is empty, does not have exactly three
                components, or a component contains invalid characters
        """
        # Remove leading/trailing whitespace and slashes
        normalized = path.strip().strip('/')

        if not normalized:
            raise ValueError("Path cannot be empty")

        # Ensure the path follows the namespace/collection/name format
        parts = normalized.split('/')
        if len(parts) != 3:
            raise ValueError(f"Invalid path format: {path}. Path must be in the form 'namespace/collection/name'")

        # Validate each part. fullmatch() is used instead of match() with
        # '^...$' because '$' also matches just before a trailing newline,
        # which would let a component such as "name\n" slip through.
        # Note that \w also accepts underscores and Unicode word characters.
        for part in parts:
            if not re.fullmatch(r'[\w\-]+', part):
                raise ValueError(f"Invalid path component: {part}. Components must be alphanumeric with hyphens.")

        return normalized

    def generate_name(self, title: Optional[str] = None) -> str:
        """Generate a name for a document.

        Args:
            title: Optional title to base the name on

        Returns:
            Slug of the title, or ``doc-<unix timestamp>`` when no title
            is given
        """
        if title:
            # Create a slug from the title
            return self._slugify(title)

        # Create a timestamp-based name
        timestamp = int(time.time())
        return f"doc-{timestamp}"

    def _slugify(self, text: str) -> str:
        """Convert text to a URL-friendly slug.

        Args:
            text: Text to convert

        Returns:
            Lowercase slug of at most 64 characters made of ``a-z``, ``0-9``
            and single hyphens, never starting or ending with a hyphen.
            Falls back to ``doc-<unix timestamp>`` when nothing is left.
        """
        # Convert to lowercase and replace spaces with hyphens
        slug = text.lower().replace(' ', '-')

        # Remove special characters
        slug = re.sub(r'[^a-z0-9\-]', '', slug)

        # Remove consecutive hyphens
        slug = re.sub(r'\-+', '-', slug)

        # Trim hyphens, limit the length, then trim again: cutting at 64
        # characters can expose a hyphen at the new end of the slug.
        slug = slug.strip('-')[:64].rstrip('-')

        # Ensure we have something
        if not slug:
            timestamp = int(time.time())
            slug = f"doc-{timestamp}"

        return slug

    def ensure_directory(self, components: PathComponents) -> Path:
        """Ensure the directory for a document exists.

        Args:
            components: PathComponents with namespace, collection, and name

        Returns:
            Path to the document directory
        """
        document_path = self.base_path / components.path
        os.makedirs(document_path, exist_ok=True)
        return document_path

    def write_content(self, components: PathComponents, content: str) -> str:
        """Write content to a document file.

        The document directory is created if needed and an existing file
        with the same name is overwritten.

        Args:
            components: PathComponents with namespace, collection, name
                and optional fragment
            content: Content to write

        Returns:
            Name of the written file (relative to the document directory,
            not a full path)
        """
        document_path = self.ensure_directory(components)
        filename = components.get_fragment_name(prefix="content", default="0000", ext="txt")
        file_path = document_path / filename

        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)

        return filename

    def read_content(self, components: PathComponents) -> str:
        """Read content from a document.

        When ``components.fragment`` is set, only that fragment file is
        considered. Otherwise the first of these that exists is returned:
        ``content.txt``, ``content.0000.txt``, then the fragment files
        listed in the index in ascending ``sequence_num`` order.

        Args:
            components: PathComponents with namespace, collection, and name
                Can include an optional fragment to specify a file

        Returns:
            Document content

        Raises:
            FileNotFoundError: If the requested fragment, the index, or any
                content file cannot be found
        """
        document_path = self.base_path / components.path

        # If fragment is specified, read exactly that file or fail
        if components.fragment:
            # Use the same naming convention as write_content
            fragment_filename = components.get_fragment_name(prefix="content", default="0000", ext="txt")
            fragment_path = document_path / fragment_filename
            if not fragment_path.exists():
                raise FileNotFoundError(f"Fragment file {fragment_filename} not found for document {components.path}")
            with open(fragment_path, 'r', encoding='utf-8') as f:
                return f.read()

        # No fragment requested: try the standard content file, then the
        # first fragment (content.0000.txt)
        for candidate in ("content.txt", "content.0000.txt"):
            candidate_path = document_path / candidate
            if candidate_path.exists():
                with open(candidate_path, 'r', encoding='utf-8') as f:
                    return f.read()

        # If not found, read the index and return the existing fragment
        # with the lowest sequence number
        index = self.read_index(components)
        if index.fragments:
            ordered = sorted(index.fragments.items(), key=lambda item: int(item[1].sequence_num))
            for fragment, _fragment_info in ordered:
                chunk_path = document_path / f"content.{fragment}.txt"
                if chunk_path.exists():
                    with open(chunk_path, 'r', encoding='utf-8') as f:
                        return f.read()

        raise FileNotFoundError(f"Document content not found for path: {components.path}")

    def check_content(self, components: PathComponents) -> bool:
        """Check if content exists for a document.

        Args:
            components: PathComponents with namespace, collection, and name

        Returns:
            True if the content file named by ``components`` exists
        """
        document_path = self.base_path / components.path
        filename = components.get_fragment_name(prefix="content", default="0000", ext="txt")
        return (document_path / filename).exists()

    def chunk_content(self, content: str, max_fragment_size: Optional[int] = None) -> List[str]:
        """Split content into fragments.

        Args:
            content: Content to split
            max_fragment_size: Maximum fragment size in characters;
                defaults to DEFAULT_FRAGMENT_SIZE

        Returns:
            List of content chunks (empty list for empty content)

        Raises:
            ValueError: If max_fragment_size is not positive
        """
        if max_fragment_size is None:
            max_fragment_size = self.DEFAULT_FRAGMENT_SIZE

        # A negative step would make range() yield nothing and silently
        # drop the whole content; a zero step is rejected by range() with
        # an unhelpful message. Fail early and clearly in both cases.
        if max_fragment_size <= 0:
            raise ValueError(f"max_fragment_size must be a positive integer, got {max_fragment_size}")

        # Simple character-based chunking
        return [
            content[start:start + max_fragment_size]
            for start in range(0, len(content), max_fragment_size)
        ]

    def write_index(self, components: PathComponents, index: DocumentIndex) -> Path:
        """Write document index file.

        The namespace, collection and name stored in the file always come
        from ``components``, not from ``index``.

        Args:
            components: PathComponents with namespace, collection, and name
            index: Document index

        Returns:
            Path to the index file
        """
        document_path = self.ensure_directory(components)
        index_path = document_path / "index.json"

        index_dict = index.model_dump()

        # Create document index using components.
        # NOTE(review): 'created_at' is deliberately not copied from `index`,
        # so the written index gets whatever DocumentIndex uses when the
        # field is omitted -- confirm that resetting it on every write
        # (including moves) is intended.
        excluded = ('namespace', 'collection', 'name', 'fragments', 'created_at')
        doc_index = DocumentIndex(
            namespace=components.namespace,
            collection=components.collection,
            name=components.name,
            **{k: v for k, v in index_dict.items() if k not in excluded}
        )

        # Add fragments information if provided
        if index.fragments:
            doc_index.chunked = True
            doc_index.fragments = index.fragments

        # Write to index file
        with open(index_path, 'w', encoding='utf-8') as f:
            f.write(doc_index.model_dump_json(indent=2))

        return index_path

    def check_index(self, components: PathComponents) -> bool:
        """Check if document index file exists.

        Args:
            components: PathComponents with namespace, collection, and name

        Returns:
            True if ``index.json`` exists in the document directory
        """
        document_path = self.base_path / components.path
        index_path = document_path / "index.json"
        return index_path.exists()

    def read_index(self, components: PathComponents) -> DocumentIndex:
        """Read document index file.

        Args:
            components: PathComponents with namespace, collection, and name

        Returns:
            Document index built from the contents of ``index.json``

        Raises:
            FileNotFoundError: If the index file doesn't exist
        """
        document_path = self.base_path / components.path
        index_path = document_path / "index.json"

        if not index_path.exists():
            raise FileNotFoundError(f"Index file not found for document {components.urn}")

        with open(index_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Convert loaded data to DocumentIndex
        return DocumentIndex(**data)

    def update_index(self, components: PathComponents, updates: Dict[str, Any]) -> DocumentIndex:
        """Update document index file.

        Args:
            components: PathComponents with namespace, collection, and name
            updates: Dictionary of fields to update. Values given for
                namespace, collection, name and updated_at are overridden.

        Returns:
            Updated document index

        Raises:
            FileNotFoundError: If index file doesn't exist
        """
        # Read existing index
        index = self.read_index(components)

        # Update fields
        index_dict = index.model_dump()
        index_dict.update(updates)
        new_index = DocumentIndex(**index_dict)
        new_index.updated_at = datetime.now(timezone.utc)

        # Don't allow changing path components through updates
        new_index.namespace = index.namespace
        new_index.collection = index.collection
        new_index.name = index.name

        # Write back
        document_path = self.base_path / components.path
        index_path = document_path / "index.json"
        with open(index_path, 'w', encoding='utf-8') as f:
            f.write(new_index.model_dump_json(indent=2))

        return new_index

    def find_documents_recursive(self, components: PartialPathComponents) -> List[str]:
        """Find all documents recursively under a namespace/collection.

        A directory counts as a document when it contains ``index.json``.

        Args:
            components: PartialPathComponents with namespace, collection,
                and name. A collection is only applied when a namespace is
                given, and a name only when a collection is given.

        Returns:
            List of document paths relative to the base path, using
            forward slashes
        """
        search_path = self.base_path

        # Build the search path based on provided filters
        if components.namespace:
            search_path = search_path / components.namespace
            if components.collection:
                search_path = search_path / components.collection
                if components.name:
                    search_path = search_path / components.name

        # Search for index.json files; each one marks a document directory
        return [
            str(index_file.parent.relative_to(self.base_path)).replace('\\', '/')
            for index_file in search_path.glob("**/index.json")
        ]

    def find_documents_shallow(self, components: PartialPathComponents) -> List[str]:
        """Find documents in the knowledge base (non-recursive).

        Lists the directories one level below the most specific component
        given: namespaces when nothing is given, collections for a
        namespace, documents for a namespace and collection.

        Args:
            components: PartialPathComponents with namespace, collection, and name

        Returns:
            List of paths. Depending on the components given these are
            ``namespace``, ``namespace/collection`` or
            ``namespace/collection/name`` entries. Empty if a given
            component does not exist.
        """
        if components.namespace is None:
            # List all namespaces (shallow mode)
            return [d.name for d in self.base_path.iterdir() if d.is_dir()]

        namespace_path = self.base_path / components.namespace
        if not namespace_path.exists():
            return []

        if components.collection is None:
            # List collections in the namespace
            return [f"{components.namespace}/{d.name}"
                    for d in namespace_path.iterdir() if d.is_dir()]

        collection_path = namespace_path / components.collection
        if not collection_path.exists():
            return []

        if components.name is None:
            # List documents in the collection
            return [f"{components.namespace}/{components.collection}/{d.name}"
                    for d in collection_path.iterdir() if d.is_dir()]

        # A fully specified path yields itself when it exists
        document_path = collection_path / components.name
        if not document_path.exists():
            return []
        return [f"{components.namespace}/{components.collection}/{components.name}"]

    def delete_document(self, components: PathComponents) -> None:
        """Delete a document from the knowledge base.

        Collection and namespace directories left empty by the deletion
        are removed as well, on a best-effort basis.

        Args:
            components: PathComponents with namespace, collection, and name

        Raises:
            FileNotFoundError: If document doesn't exist
        """
        document_path = self.base_path / components.path

        if not document_path.exists():
            raise FileNotFoundError(f"Document not found: {components.urn}")

        # Delete all files in the document directory.
        # NOTE(review): unlink() only removes files; a subdirectory inside
        # the document directory would make this raise part-way through.
        for file_path in document_path.glob("*"):
            file_path.unlink()

        # Remove the directory
        document_path.rmdir()

        # Check if parent directories are empty and remove them if they are
        collection_path = document_path.parent
        namespace_path = collection_path.parent

        try:
            # Try to remove collection directory if empty
            if collection_path.exists() and not any(collection_path.iterdir()):
                collection_path.rmdir()

            # Try to remove namespace directory if empty
            if namespace_path.exists() and not any(namespace_path.iterdir()):
                namespace_path.rmdir()
        except OSError:
            # If we can't remove the directories, that's ok
            pass

    def move_document(self, components: PathComponents, new_components: PathComponents) -> None:
        """Move a document to a new location.

        The document directory is moved and its index is rewritten with the
        new namespace, collection and name.

        Args:
            components: PathComponents with namespace, collection, and name
            new_components: PathComponents with namespace, collection, and name

        Raises:
            FileNotFoundError: If the source document doesn't exist, or if
                the destination already exists
        """
        document_path = self.base_path / components.path
        new_document_path = self.base_path / new_components.path

        if not document_path.exists():
            raise FileNotFoundError(f"Document not found: {components.urn}")

        # NOTE(review): FileExistsError would describe this condition better;
        # FileNotFoundError is kept so existing callers' handlers still match.
        if new_document_path.exists():
            raise FileNotFoundError(f"New document path already exists: {new_components.urn}")

        # Create the destination's parent directories up front so the move
        # can be a plain rename instead of falling back to copy-and-delete.
        new_document_path.parent.mkdir(parents=True, exist_ok=True)

        # Move the document directory
        shutil.move(document_path, new_document_path)

        # Update the index
        index = self.read_index(new_components)
        index.namespace = new_components.namespace
        index.collection = new_components.collection
        index.name = new_components.name
        self.write_index(new_components, index)

    def validate_index(self, components: PathComponents) -> None:
        """Validate the index of a document.

        Checks that the document directory and index file exist, that the
        index's namespace, collection and name match ``components``, and
        that every fragment listed in the index has a content file.

        Args:
            components: PathComponents with namespace, collection, and name

        Raises:
            FileNotFoundError: If the document directory, the index file or
                a fragment file is missing
            ValueError: If the index's namespace, collection or name does
                not match ``components``
        """
        document_path = self.base_path / components.path

        if not document_path.exists():
            raise FileNotFoundError(f"Document not found: {components.path}")

        index_path = document_path / "index.json"
        if not index_path.exists():
            raise FileNotFoundError(f"Index file not found: {components.path}")

        # Check if our namespace, collection, and name are correct
        index = self.read_index(components)
        if index.namespace != components.namespace:
            raise ValueError(f"Namespace mismatch: {index.namespace} != {components.namespace}")
        if index.collection != components.collection:
            raise ValueError(f"Collection mismatch: {index.collection} != {components.collection}")
        if index.name != components.name:
            raise ValueError(f"Name mismatch: {index.name} != {components.name}")

        # Check if the fragments exist. An index without fragments is
        # treated as having none, matching the guard used in read_content.
        for fragment in index.fragments or {}:
            fragment_path = document_path / f"content.{fragment}.txt"
            if not fragment_path.exists():
                raise FileNotFoundError(f"Fragment file not found: {fragment_path}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/54rt1n/container-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.