rbt_chunker.py
""" RBT document chunker. @REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker """ import re from typing import Any, Dict, List, Optional from ..converter import MarkdownConverter from .models import ChunkMetadata class RBTChunker: """ Chunks RBT documents based on their structure. @REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker The chunker parses RBT documents and creates chunks for each section, generating stable chunk IDs based on the document structure. """ def __init__(self): """ Initialize RBTChunker. @REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker """ self.converter = MarkdownConverter() def chunk( self, document_content: str, project_id: str, feature_id: str, doc_type: str ) -> List[ChunkMetadata]: """ Chunk an RBT document into sections. @REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker Args: document_content: The raw markdown content of the RBT document project_id: Project identifier feature_id: Feature identifier doc_type: Document type (REQ/BP/TASK) Returns: List of ChunkMetadata objects, one for each section Example: >>> chunker = RBTChunker() >>> chunks = chunker.chunk( ... document_content="# Title\\n\\n## Section\\n...", ... project_id="knowledge-smith", ... feature_id="graphiti-chunk-mcp", ... doc_type="TASK" ... ) """ # Store original content for raw extraction self._document_content = document_content # Parse document to JSON structure doc_json = self.converter.to_json(document_content) # Generate parent document ID parent_document_id = f"{project_id}+{feature_id}+{doc_type}" # Extract metadata and info for chunk metadata document_metadata = doc_json.get("metadata", {}) document_info = doc_json.get("info", {}) # Start collecting chunks chunks: List[ChunkMetadata] = [] # Create a chunk for info section if it exists and has content if document_info: info_chunk_id = f"{project_id}+{feature_id}+sec-info" info_content = self._generate_info_content(document_info) info_chunk_metadata = { "chunk_id": info_chunk_id, "parent_document_id": parent_document_id, "project_id": project_id, "feature_id": feature_id, "doc_type": doc_type, "section_id": "sec-info", "section_title": "Info Section", "section_summary": None, "info": document_info # Only info chunk has info field } info_chunk = ChunkMetadata( metadata=info_chunk_metadata, content=info_content ) chunks.append(info_chunk) # Process all sections recursively sections = doc_json.get("sections", []) for section in sections: self._process_section( section=section, project_id=project_id, feature_id=feature_id, doc_type=doc_type, parent_document_id=parent_document_id, chunks=chunks ) return chunks def _process_section( self, section: Dict[str, Any], project_id: str, feature_id: str, doc_type: str, parent_document_id: str, chunks: List[ChunkMetadata], depth: int = 0 ) -> None: """ Recursively process a section and its nested sections. 
@REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker Args: section: Section data from JSON project_id: Project identifier feature_id: Feature identifier doc_type: Document type parent_document_id: Parent document identifier chunks: List to append chunks to depth: Current nesting depth """ section_id = section.get("id") section_title = section.get("title") section_summary = section.get("summary") # Can be None # Generate chunk_id chunk_id = f"{project_id}+{feature_id}+{section_id}" # Generate content for this section content = self._generate_section_content(section) # Build metadata dict with all required fields chunk_metadata = { "chunk_id": chunk_id, "parent_document_id": parent_document_id, "project_id": project_id, "feature_id": feature_id, "doc_type": doc_type, "section_id": section_id, "section_title": section_title, "section_summary": section_summary # Will be None if not present } # Create chunk with new structure chunk = ChunkMetadata( metadata=chunk_metadata, content=content ) chunks.append(chunk) # Process nested sections recursively nested_sections = section.get("sections", []) for nested_section in nested_sections: self._process_section( section=nested_section, project_id=project_id, feature_id=feature_id, doc_type=doc_type, parent_document_id=parent_document_id, chunks=chunks, depth=depth + 1 ) def _generate_section_content(self, section: Dict[str, Any]) -> str: """ Generate markdown content for a section. @REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker Args: section: Section data from JSON Returns: Markdown content string for the section """ section_id = section.get("id") # Extract raw section content from original markdown raw_content = self._extract_raw_section_content(section_id) if raw_content: return raw_content # Fallback to JSON-based generation if raw extraction fails lines = [] # Add section title title = section.get("title", "") if title: lines.append(f"## {title}") lines.append("") # Add section summary if present summary = section.get("summary") if summary: lines.append(summary) lines.append("") # Add blocks blocks = section.get("blocks", []) for block in blocks: block_content = self._generate_block_content(block) if block_content: lines.append(block_content) lines.append("") return "\n".join(lines).strip() def _generate_info_content(self, info: Dict[str, Any]) -> str: """ Generate markdown content for info section. @REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker Args: info: Info section data from JSON Returns: Markdown content string for the info section """ lines = ["<!-- info-section -->"] # Add each field from info for key, value in info.items(): lines.append(f"> {key}: {value}") return "\n".join(lines) def _generate_block_content(self, block: Dict[str, Any]) -> str: """ Generate markdown content for a block. 
@REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker Args: block: Block data from JSON Returns: Markdown content string for the block """ block_type = block.get("type", "") if block_type == "paragraph": return block.get("content", "") elif block_type == "code": language = block.get("language", "") content = block.get("content", "") return f"```{language}\n{content}\n```" elif block_type == "list": lines = [] title = block.get("title") if title: lines.append(f"**{title}**") lines.append("") items = block.get("items", []) for item in items: lines.append(f"- {item}") return "\n".join(lines) elif block_type == "table": lines = [] header = block.get("header", []) rows = block.get("rows", []) if header: # Header row lines.append("| " + " | ".join(header) + " |") # Separator row lines.append("| " + " | ".join(["---"] * len(header)) + " |") # Data rows for row in rows: lines.append("| " + " | ".join(row) + " |") return "\n".join(lines) else: # Unknown block type, return empty string return "" def _extract_raw_section_content(self, section_id: str) -> Optional[str]: """ Extract raw section content from original markdown. This method finds the section by its ID comment and extracts all content until the next section ID comment or end of document. @REQ: REQ-graphiti-chunk-mcp @BP: BP-graphiti-chunk-mcp @TASK: TASK-002-RBTChunker Args: section_id: Section ID to extract (e.g., "sec-data-structures") Returns: Raw markdown content for the section, or None if not found """ if not hasattr(self, '_document_content'): return None content = self._document_content # Pattern to match section ID comment: <!-- id: sec-xxx --> section_pattern = rf'<!--\s*id:\s*{re.escape(section_id)}\s*-->' # Find the section start match = re.search(section_pattern, content) if not match: return None start_pos = match.end() # Find the next section ID comment or end of document # Pattern: <!-- id: sec-xxx --> or <!-- id: blk-xxx --> next_section_pattern = r'<!--\s*id:\s*sec-[\w-]+\s*-->' next_match = re.search(next_section_pattern, content[start_pos:]) if next_match: end_pos = start_pos + next_match.start() else: end_pos = len(content) # Extract section content section_content = content[start_pos:end_pos].strip() return section_content
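
For orientation, a minimal usage sketch follows. It is not part of the source file: the sample document, and the assumption that `MarkdownConverter.to_json()` maps `<!-- id: sec-... -->` comments to section IDs and that `ChunkMetadata` exposes `metadata` and `content` attributes, are illustrative guesses; only the `{project_id}+{feature_id}+{section_id}` chunk-ID scheme is confirmed by the code above.

# Usage sketch (hypothetical, not from rbt_chunker.py)
sample_doc = """\
<!-- id: sec-overview -->
## Overview
High-level description of the feature.

<!-- id: sec-details -->
## Details
Implementation notes.
"""

chunker = RBTChunker()
chunks = chunker.chunk(
    document_content=sample_doc,
    project_id="knowledge-smith",
    feature_id="graphiti-chunk-mcp",
    doc_type="TASK",
)

for chunk in chunks:
    # chunk_id follows the "{project_id}+{feature_id}+{section_id}" scheme,
    # e.g. "knowledge-smith+graphiti-chunk-mcp+sec-overview", assuming the
    # converter recognizes the ID comments in sample_doc as section IDs
    print(chunk.metadata["chunk_id"])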
