markdown_processor.py (39.4 kB)
""" Enhanced Markdown processor for MCP Memory Server. Handles reading, cleaning, chunking, and optimizing markdown files with AI integration. """ import logging import re import hashlib from typing import Optional, List, Dict, Tuple, Union from pathlib import Path import aiofiles from bs4 import BeautifulSoup import markdown logger = logging.getLogger(__name__) class MarkdownProcessor: """Processes markdown files for memory storage with AI integration hooks.""" def __init__(self, chunk_size: int = 900, chunk_overlap: int = 200) -> None: """Initialize the Markdown Processor. Args: chunk_size: Maximum tokens per chunk (default 900) chunk_overlap: Token overlap between chunks (default 200) """ self.markdown_processor = markdown.Markdown( extensions=['extra', 'codehilite', 'toc'], extension_configs={ 'codehilite': {'css_class': 'highlight'}, 'extra': {} } ) self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap async def read_markdown_file(self, file_path: str) -> str: """Read a markdown file from disk.""" try: path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if not path.suffix.lower() in ['.md', '.markdown']: raise ValueError(f"Not a markdown file: {file_path}") async with aiofiles.open(file_path, 'r', encoding='utf-8') as file: content = await file.read() logger.info(f"📖 Read markdown file: {file_path} " f"({len(content)} chars)") return content except Exception as e: logger.error(f"❌ Failed to read markdown file {file_path}: {e}") raise def clean_content(self, content: str) -> str: """Clean and optimize markdown content.""" try: # Remove excessive whitespace content = self._normalize_whitespace(content) # Clean up markdown formatting content = self._clean_markdown_formatting(content) # Remove empty sections content = self._remove_empty_sections(content) # Normalize line endings content = content.replace('\r\n', '\n').replace('\r', '\n') # Ensure content ends with single newline content = content.rstrip() + '\n' logger.debug(f"🧹 Cleaned content ({len(content)} chars)") return content except Exception as e: logger.error(f"❌ Failed to clean content: {e}") return content # Return original content if cleaning fails def _normalize_whitespace(self, content: str) -> str: """Normalize whitespace in content.""" # Replace multiple spaces with single space # (except at line start for indentation) lines = content.split('\n') cleaned_lines = [] for line in lines: # Preserve leading whitespace for code blocks and lists stripped = line.lstrip() if stripped.startswith(('```', ' ', '\t', '-', '*', '+')): # Keep original line for code blocks and lists cleaned_lines.append(line.rstrip()) else: # Normalize spaces in regular text leading_spaces = len(line) - len(stripped) cleaned_text = re.sub(r' +', ' ', stripped) cleaned_lines.append(' ' * leading_spaces + cleaned_text) return '\n'.join(cleaned_lines) def _clean_markdown_formatting(self, content: str) -> str: """Clean up markdown formatting issues.""" # Fix heading spacing content = re.sub(r'^(#+)\s*(.+)', r'\1 \2', content, flags=re.MULTILINE) # Fix list formatting content = re.sub(r'^(\s*)([*+-])\s*(.+)', r'\1\2 \3', content, flags=re.MULTILINE) content = re.sub(r'^(\s*)(\d+\.)\s*(.+)', r'\1\2 \3', content, flags=re.MULTILINE) # Clean up emphasis content = re.sub(r'\*{3,}', '***', content) content = re.sub(r'_{3,}', '___', content) # Fix link formatting content = re.sub(r'\[\s*([^\]]+)\s*\]\s*\(\s*([^)]+)\s*\)', r'[\1](\2)', content) # Remove HTML comments content = re.sub(r'<!--.*?-->', '', 
content, flags=re.DOTALL) return content def _remove_empty_sections(self, content: str) -> str: """Remove empty sections and excessive line breaks.""" # Remove multiple consecutive empty lines content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content) # Remove empty sections (headings with no content) content = re.sub(r'^(#+\s*.+)\n\s*\n(#+\s*.+)', r'\1\n\n\2', content, flags=re.MULTILINE) return content def extract_metadata(self, content: str) -> tuple[str, dict]: """Extract YAML front matter if present.""" metadata = {} # Check for YAML front matter yaml_pattern = r'^---\s*\n(.*?)\n---\s*\n' match = re.match(yaml_pattern, content, re.DOTALL) if match: try: import yaml metadata = yaml.safe_load(match.group(1)) or {} content = content[match.end():] logger.debug(f"📋 Extracted metadata: {list(metadata.keys())}") except ImportError: logger.warning("⚠️ YAML library not available " "for front matter parsing") except Exception as e: logger.warning(f"⚠️ Failed to parse YAML front matter: {e}") return content, metadata def extract_sections(self, content: str) -> list[dict]: """Extract sections from markdown content.""" sections = [] # Split by headings heading_pattern = r'^(#+)\s*(.+)$' lines = content.split('\n') current_section = { 'level': 0, 'title': 'Introduction', 'content': [] } for line in lines: heading_match = re.match(heading_pattern, line) if heading_match: # Save previous section if it has content if current_section['content']: current_section['content'] = '\n'.join( current_section['content']).strip() if current_section['content']: sections.append(current_section.copy()) # Start new section level = len(heading_match.group(1)) title = heading_match.group(2).strip() current_section = { 'level': level, 'title': title, 'content': [] } else: current_section['content'].append(line) # Add final section if current_section['content']: current_section['content'] = '\n'.join( current_section['content']).strip() if current_section['content']: sections.append(current_section) logger.debug(f"📄 Extracted {len(sections)} sections") return sections def to_plain_text(self, content: str) -> str: """Convert markdown to plain text.""" try: # Convert markdown to HTML first html = self.markdown_processor.convert(content) # Parse HTML and extract text soup = BeautifulSoup(html, 'html.parser') # Remove code blocks content (keep structure but simplify) for code_block in soup.find_all(['pre', 'code']): code_block.string = '[CODE]' # Get text content plain_text = soup.get_text() # Clean up whitespace plain_text = re.sub(r'\n\s*\n\s*\n+', '\n\n', plain_text) plain_text = plain_text.strip() logger.debug(f"📝 Converted to plain text ({len(plain_text)} chars)") return plain_text except Exception as e: logger.error(f"❌ Failed to convert to plain text: {e}") # Return cleaned markdown as fallback return self.clean_content(content) def get_word_count(self, content: str) -> int: """Get word count of content.""" plain_text = self.to_plain_text(content) words = re.findall(r'\b\w+\b', plain_text) return len(words) def get_summary(self, content: str, max_length: int = 200) -> str: """Get a summary of the content.""" plain_text = self.to_plain_text(content) if len(plain_text) <= max_length: return plain_text # Find good break point (sentence end) summary = plain_text[:max_length] last_period = summary.rfind('.') last_question = summary.rfind('?') last_exclamation = summary.rfind('!') break_point = max(last_period, last_question, last_exclamation) if break_point > max_length * 0.7: # If we found a good break point summary = 
summary[:break_point + 1] else: # Break at word boundary summary = summary[:summary.rfind(' ')] + '...' return summary.strip() # New Methods for Step 1 Implementation async def scan_directory_for_markdown( self, directory: str = "./", recursive: bool = True ) -> List[Dict[str, Union[str, int]]]: """Scan directory for markdown files. Args: directory: Directory path to scan (default current directory) recursive: Whether to scan subdirectories Returns: List of dictionaries containing file info """ try: directory_path = Path(directory).resolve() if not directory_path.exists(): raise FileNotFoundError(f"Directory not found: {directory}") if not directory_path.is_dir(): raise ValueError(f"Path is not a directory: {directory}") markdown_files = [] pattern = "**/*.md" if recursive else "*.md" for file_path in directory_path.glob(pattern): if (file_path.is_file() and file_path.suffix.lower() in ['.md', '.markdown']): file_info = { 'path': str(file_path), 'name': file_path.name, 'relative_path': str( file_path.relative_to(directory_path)), 'size': file_path.stat().st_size, 'directory': str(file_path.parent) } markdown_files.append(file_info) # Also check for .markdown extension if recursive if recursive: for file_path in directory_path.glob("**/*.markdown"): if file_path.is_file(): # Check if we already have this file if not any(f['path'] == str(file_path) for f in markdown_files): file_info = { 'path': str(file_path), 'name': file_path.name, 'relative_path': str( file_path.relative_to(directory_path)), 'size': file_path.stat().st_size, 'directory': str(file_path.parent) } markdown_files.append(file_info) logger.info( f"📂 Found {len(markdown_files)} markdown files in " f"{directory}" + (" (recursive)" if recursive else "") ) return sorted(markdown_files, key=lambda x: x['path']) except Exception as e: logger.error(f"❌ Failed to scan directory {directory}: {e}") raise def analyze_content_for_memory_type( self, content: str, file_path: Optional[str] = None, suggest_memory_type: bool = True ) -> Dict[str, Union[str, float, Dict, bool]]: """Analyze markdown content and suggest appropriate memory type. This method provides AI integration hooks for Cursor AI to enhance analysis. Args: content: Markdown content to analyze file_path: Optional file path for context suggest_memory_type: Whether to suggest memory type Returns: Dictionary containing analysis results and memory type suggestion """ try: analysis = { 'content_length': len(content), 'word_count': self.get_word_count(content), 'sections': len(self.extract_sections(content)), 'has_code_blocks': '```' in content, 'has_links': '[' in content and '](' in content, 'has_tables': '|' in content, 'suggested_memory_type': 'global', # Default suggestion 'confidence': 0.7, # Default confidence 'reasoning': '', 'ai_enhanced': False # Flag for AI enhancement } if suggest_memory_type: # Basic heuristic-based suggestion (AI can enhance this) suggested_type, confidence, reasoning = ( self._suggest_memory_type_heuristic(content, file_path) ) analysis.update({ 'suggested_memory_type': suggested_type, 'confidence': confidence, 'reasoning': reasoning }) # AI Integration Hook: Cursor AI can enhance this analysis # The AI can improve memory type suggestions, reasoning, and # content optimization. 
This provides a foundation for AI-driven # content categorization logger.debug( f"🧠 Content analysis complete: " f"{analysis['suggested_memory_type']} " f"(confidence: {analysis['confidence']:.2f})" ) return analysis except Exception as e: logger.error(f"❌ Failed to analyze content: {e}") return { 'suggested_memory_type': 'global', 'confidence': 0.5, 'reasoning': 'Analysis failed, defaulting to global', 'error': str(e) } def _suggest_memory_type_heuristic( self, content: str, file_path: Optional[str] = None ) -> Tuple[str, float, str]: """Basic heuristic for memory type suggestion (AI can enhance this). Args: content: Content to analyze file_path: Optional file path for additional context Returns: Tuple of (suggested_type, confidence, reasoning) """ content_lower = content.lower() # File path based hints if file_path: path_lower = file_path.lower() if any(term in path_lower for term in ['readme', 'doc', 'guide', 'manual', 'spec']): return ('global', 0.8, 'Documentation or reference material') if any(term in path_lower for term in ['lesson', 'learn', 'pattern', 'best', 'practice']): return ('learned', 0.8, 'Lessons learned or best practices') if any(term in path_lower for term in ['personal', 'agent', 'task', 'todo', 'scratch']): return ('agent', 0.8, 'Agent-specific or personal content') # Content-based analysis global_indicators = [ 'documentation', 'specification', 'standard', 'reference', 'api', 'protocol' ] learned_indicators = [ 'lesson', 'pattern', 'insight', 'mistake', 'experience', 'learned', 'practice' ] agent_indicators = [ 'todo', 'task', 'personal', 'scratch', 'note', 'draft' ] global_score = sum(1 for term in global_indicators if term in content_lower) learned_score = sum(1 for term in learned_indicators if term in content_lower) agent_score = sum(1 for term in agent_indicators if term in content_lower) if learned_score > global_score and learned_score > agent_score: return ('learned', 0.7, f'Contains learning indicators: {learned_score}') elif agent_score > global_score and agent_score > learned_score: return ('agent', 0.7, f'Contains agent-specific indicators: {agent_score}') else: return ('global', 0.6, 'Default to global for reference material') def optimize_content_for_storage( self, content: str, memory_type: str, ai_optimization: bool = True, suggested_type: Optional[str] = None ) -> Dict[str, Union[str, bool, int]]: """Optimize content for database storage with AI enhancement hooks. This method provides integration points for Cursor AI to enhance content optimization based on the target memory type. 
Args: content: Content to optimize memory_type: Target memory type (global, learned, agent) ai_optimization: Whether to apply AI-driven optimizations suggested_type: Originally suggested memory type for comparison Returns: Dictionary with optimized content and metadata """ try: # Start with cleaned content optimized_content = self.clean_content(content) # Basic optimization based on memory type if memory_type == 'learned': # For learned memory, emphasize insights and patterns optimized_content = self._optimize_for_learned_memory( optimized_content) elif memory_type == 'agent': # For agent memory, preserve personal context and tasks optimized_content = self._optimize_for_agent_memory( optimized_content) else: # global # For global memory, focus on reference and documentation optimized_content = self._optimize_for_global_memory( optimized_content) result = { 'optimized_content': optimized_content, 'memory_type': memory_type, 'original_length': len(content), 'optimized_length': len(optimized_content), 'ai_enhanced': ai_optimization, 'optimization_applied': f'{memory_type}_optimization', 'suggested_type_override': ( suggested_type != memory_type if suggested_type else False ) } # AI Integration Hook: Cursor AI can enhance content optimization # here. The AI can improve content structure, add semantic tags, # enhance searchability while preserving the original meaning # and context logger.debug( f"🔧 Content optimized for {memory_type} storage " f"({len(content)} → {len(optimized_content)} chars)" ) return result except Exception as e: logger.error(f"❌ Failed to optimize content: {e}") return { 'optimized_content': content, # Return original on failure 'memory_type': memory_type, 'error': str(e) } def _optimize_for_learned_memory(self, content: str) -> str: """Optimize content for learned memory storage.""" # Emphasize lessons, insights, and patterns # AI can enhance this to better identify and highlight learning points return content def _optimize_for_agent_memory(self, content: str) -> str: """Optimize content for agent-specific memory storage.""" # Preserve personal context and actionable items # AI can enhance this to better identify tasks and personal references return content def _optimize_for_global_memory(self, content: str) -> str: """Optimize content for global memory storage.""" # Focus on reference value and documentation clarity # AI can enhance this to improve searchability and reference utility return content # Chunking Methods def chunk_content( self, content: str, preserve_headers: bool = True ) -> List[Dict[str, Union[str, int]]]: """Chunk content into smaller pieces with AI-optimized boundaries. 
Args: content: Content to chunk preserve_headers: Whether to preserve header context in chunks Returns: List of chunk dictionaries with content, index, and metadata """ try: chunks = [] # Handle empty content if not content or not content.strip(): return [{ 'content': '', 'chunk_index': 0, 'token_count': 0 }] if preserve_headers: # Header-aware chunking sections = self.extract_sections(content) for section in sections: section_content = f"# {section['title']}\n\n{section['content']}" section_chunks = self._split_text_by_tokens(section_content) for i, chunk_text in enumerate(section_chunks): chunks.append({ 'content': chunk_text, 'chunk_index': len(chunks), 'section_title': section['title'], 'section_level': section['level'], 'section_chunk_index': i, 'token_count': self._estimate_tokens(chunk_text) }) else: # Simple text chunking chunk_texts = self._split_text_by_tokens(content) for i, chunk_text in enumerate(chunk_texts): chunks.append({ 'content': chunk_text, 'chunk_index': i, 'token_count': self._estimate_tokens(chunk_text) }) logger.debug(f"📄 Created {len(chunks)} chunks from content") return chunks except Exception as e: logger.error(f"❌ Failed to chunk content: {e}") # Return single chunk on failure return [{ 'content': content, 'chunk_index': 0, 'token_count': self._estimate_tokens(content), 'error': str(e) }] def _estimate_tokens(self, text: str) -> int: """Rough estimation of token count (approximately 4 chars per token).""" return len(text) // 4 def _split_text_by_tokens(self, text: str) -> List[str]: """Split text into chunks based on estimated token count.""" estimated_tokens = self._estimate_tokens(text) if estimated_tokens <= self.chunk_size: return [text] chunks = [] sentences = re.split(r'(?<=[.!?])\s+', text) current_chunk = "" current_tokens = 0 for sentence in sentences: sentence_tokens = self._estimate_tokens(sentence) if (current_tokens + sentence_tokens > self.chunk_size and current_chunk): # Add current chunk and start new one with overlap chunks.append(current_chunk.strip()) # Create overlap by keeping last part of current chunk overlap_text = self._get_overlap_text( current_chunk, self.chunk_overlap) current_chunk = overlap_text + " " + sentence current_tokens = self._estimate_tokens(current_chunk) else: current_chunk += " " + sentence if current_chunk else sentence current_tokens += sentence_tokens # Add final chunk if current_chunk.strip(): chunks.append(current_chunk.strip()) return chunks def _get_overlap_text(self, text: str, max_overlap_tokens: int) -> str: """Get overlap text from the end of a chunk.""" overlap_chars = max_overlap_tokens * 4 # Rough estimation if len(text) <= overlap_chars: return text # Find good break point from the end overlap_text = text[-overlap_chars:] first_space = overlap_text.find(' ') if first_space > 0: return overlap_text[first_space:].strip() return overlap_text.strip() # Policy Processing Methods async def scan_policy_directory( self, directory: str = "./policy" ) -> List[Dict[str, Union[str, int, List[str], bool]]]: """Scan policy directory for markdown files with rule validation. 
Args: directory: Policy directory path (default ./policy) Returns: List of policy file information with rule counts """ try: policy_files = await self.scan_directory_for_markdown( directory, recursive=False) for file_info in policy_files: # Add policy-specific metadata content = await self.read_markdown_file(file_info['path']) rules = self.extract_policy_rules(content) file_info.update({ 'rule_count': len(rules), 'rule_ids': [rule['rule_id'] for rule in rules], 'is_policy_file': True }) logger.info(f"📋 Found {len(policy_files)} policy files with rules") return policy_files except Exception as e: logger.error(f"❌ Failed to scan policy directory {directory}: {e}") raise def extract_policy_rules(self, content: str) -> List[Dict[str, str]]: """Extract rule IDs and content from policy markdown. Args: content: Policy markdown content Returns: List of rules with IDs, sections, and content """ try: rules = [] sections = self.extract_sections(content) # Rule ID pattern: [ANY-FORMAT] to capture all potential rules rule_pattern = r'\[([A-Z]+(?:-\d+)?[^\]]*)\]\s*(.+?)(?=\n|$)' for section in sections: section_rules = re.finditer( rule_pattern, section['content'], re.MULTILINE) for match in section_rules: rule_id = match.group(1) rule_text = match.group(2).strip() # Extract full rule context full_content = self._extract_rule_context( section['content'], match.start()) rules.append({ 'rule_id': rule_id, 'section': section['title'], 'section_level': section['level'], 'rule_text': rule_text, 'full_content': full_content, 'position': len(rules) }) logger.debug(f"📋 Extracted {len(rules)} policy rules") return rules except Exception as e: logger.error(f"❌ Failed to extract policy rules: {e}") return [] def _extract_rule_context(self, content: str, rule_start_pos: int) -> str: """Extract full context for a policy rule.""" lines = content[rule_start_pos:].split('\n') context_lines = [] for line in lines: # Stop at next rule or empty lines that might indicate section break if (len(context_lines) > 0 and (re.match(r'\[([A-Z]+-\d+)\]', line) or (line.strip() == '' and len(context_lines) > 3))): break context_lines.append(line) return '\n'.join(context_lines).strip() def validate_policy_rules( self, rules: List[Dict[str, str]], policy_version: Optional[str] = None ) -> Dict[str, Union[bool, List[str], int, str]]: """Validate policy rules for uniqueness and format compliance. Args: rules: List of extracted rules policy_version: Optional policy version for tracking Returns: Validation results with success status and any errors """ try: validation_result = { 'valid': True, 'errors': [], 'warnings': [], 'rule_count': len(rules), 'unique_rules': len(set(rule['rule_id'] for rule in rules)), 'policy_version': policy_version } # Check for duplicate rule IDs rule_ids = [rule['rule_id'] for rule in rules] duplicates = [rule_id for rule_id in set(rule_ids) if rule_ids.count(rule_id) > 1] if duplicates: validation_result['valid'] = False validation_result['errors'].append( f"Duplicate rule IDs found: {duplicates}") # Check rule ID format (P-001, F-101, R-201, etc.) 
valid_pattern = re.compile(r'^[A-Z]+-\d+$') invalid_ids = [rule['rule_id'] for rule in rules if not valid_pattern.match(rule['rule_id'])] if invalid_ids: validation_result['valid'] = False validation_result['errors'].append( f"Invalid rule ID format: {invalid_ids}") # Check for empty rules empty_rules = [rule['rule_id'] for rule in rules if not rule['rule_text'].strip()] if empty_rules: validation_result['warnings'].append( f"Empty rules found: {empty_rules}") logger.debug( f"📋 Policy validation: " f"{validation_result['unique_rules']}/" f"{validation_result['rule_count']} unique rules" ) return validation_result except Exception as e: logger.error(f"❌ Failed to validate policy rules: {e}") return { 'valid': False, 'errors': [f"Validation failed: {str(e)}"], 'rule_count': len(rules) if rules else 0 } def generate_policy_hash( self, rules: List[Dict[str, str]], policy_version: str ) -> str: """Generate SHA-256 hash for policy version. Args: rules: List of policy rules policy_version: Version identifier Returns: SHA-256 hash of policy content """ try: # Create deterministic string from rules rule_strings = [] for rule in sorted(rules, key=lambda x: x['rule_id']): rule_string = (f"{rule['rule_id']}:{rule['section']}:" f"{rule['rule_text']}") rule_strings.append(rule_string) combined_content = f"{policy_version}:{'|'.join(rule_strings)}" policy_hash = hashlib.sha256( combined_content.encode('utf-8')).hexdigest() logger.debug( f"📋 Generated policy hash: {policy_hash[:8]}... " f"for {len(rules)} rules" ) return policy_hash except Exception as e: logger.error(f"❌ Failed to generate policy hash: {e}") return "" # Batch Processing Methods async def process_directory_batch( self, directory: str = "./", memory_type: Optional[str] = None, auto_suggest: bool = True, ai_enhance: bool = True, recursive: bool = True ) -> Dict[str, Union[List[Dict], int, str, Dict[str, int], bool]]: """Process entire directory with batch AI-enhanced analysis. 
Args: directory: Directory to process memory_type: Fixed memory type (None for auto-suggestion) auto_suggest: Whether to auto-suggest memory types ai_enhance: Whether to apply AI enhancements recursive: Whether to scan subdirectories Returns: Dictionary with processing results and file analysis """ try: # Scan directory for files files = await self.scan_directory_for_markdown(directory, recursive) processing_results = { 'total_files': len(files), 'processed_files': [], 'failed_files': [], 'memory_type_suggestions': {}, 'directory': directory, 'recursive': recursive, 'ai_enhanced': ai_enhance } for file_info in files: try: # Read and analyze file content = await self.read_markdown_file(file_info['path']) analysis = self.analyze_content_for_memory_type( content, file_info['path'], auto_suggest ) # Use fixed memory type or suggested type final_memory_type = (memory_type or analysis.get('suggested_memory_type', 'global')) # Optimize content for storage optimization = self.optimize_content_for_storage( content, final_memory_type, ai_enhance, analysis.get('suggested_memory_type') ) # Create file processing result file_result = { **file_info, 'analysis': analysis, 'optimization': optimization, 'final_memory_type': final_memory_type, 'processing_status': 'success' } processing_results['processed_files'].append(file_result) # Track memory type suggestions suggested_type = analysis.get('suggested_memory_type', 'global') if suggested_type not in processing_results['memory_type_suggestions']: processing_results['memory_type_suggestions'][suggested_type] = 0 processing_results['memory_type_suggestions'][suggested_type] += 1 except Exception as file_error: logger.error( f"❌ Failed to process file {file_info['path']}: " f"{file_error}" ) processing_results['failed_files'].append({ **file_info, 'error': str(file_error), 'processing_status': 'failed' }) success_count = len(processing_results['processed_files']) logger.info( f"📁 Batch processing complete: {success_count}/" f"{len(files)} files processed" ) return processing_results except Exception as e: logger.error(f"❌ Failed to process directory batch {directory}: {e}") return { 'total_files': 0, 'processed_files': [], 'failed_files': [], 'error': str(e) } # Utility Methods def calculate_content_hash(self, content: str) -> str: """Calculate SHA-256 hash of content for deduplication.""" return hashlib.sha256(content.encode('utf-8')).hexdigest() def get_file_metadata( self, file_path: str, content: str ) -> Dict[str, Union[str, int, Optional[float]]]: """Generate comprehensive file metadata. Args: file_path: Path to the file content: File content Returns: Dictionary with file metadata """ try: path = Path(file_path) metadata = { 'file_path': str(path.resolve()), 'file_name': path.name, 'file_size': len(content.encode('utf-8')), 'content_hash': self.calculate_content_hash(content), 'word_count': self.get_word_count(content), 'section_count': len(self.extract_sections(content)), 'last_modified': path.stat().st_mtime if path.exists() else None, 'file_extension': path.suffix, 'directory': str(path.parent) } return metadata except Exception as e: logger.error(f"❌ Failed to generate file metadata: {e}") return { 'file_path': file_path, 'error': str(e) }
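For orientation, here is a minimal usage sketch of the class above. It is an illustration only: it assumes the module can be imported as markdown_processor and that a ./docs directory with markdown files exists; both names are hypothetical and the real import path inside the MCP Memory Server may differ. Only methods defined in the file are called.

# Minimal usage sketch (assumptions: importable as markdown_processor,
# and ./docs plus ./docs/example.md exist; both paths are hypothetical).
import asyncio

from markdown_processor import MarkdownProcessor


async def main() -> None:
    processor = MarkdownProcessor(chunk_size=900, chunk_overlap=200)

    # Batch-process every markdown file found under a directory.
    results = await processor.process_directory_batch(
        directory="./docs", auto_suggest=True, recursive=True
    )
    print(f"Processed {len(results['processed_files'])} files")

    # Or handle a single file step by step: read, analyze, then chunk.
    content = await processor.read_markdown_file("./docs/example.md")
    analysis = processor.analyze_content_for_memory_type(
        content, "./docs/example.md"
    )
    chunks = processor.chunk_content(content, preserve_headers=True)
    print(analysis["suggested_memory_type"], len(chunks), "chunks")


if __name__ == "__main__":
    asyncio.run(main())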
