
Documentation Search MCP Server

content_enhancer.py (21.1 kB)
""" Content enhancement features for documentation-search-enhanced MCP server. Adds smart parsing, code extraction, version awareness, and contextual recommendations. """ import re import os import sys from typing import Dict, List, Any, Optional from dataclasses import dataclass import httpx from datetime import datetime, timedelta @dataclass class CodeSnippet: """Represents an extracted code snippet""" language: str code: str description: str line_number: Optional[int] = None is_complete: bool = False imports: List[str] = None def __post_init__(self): if self.imports is None: self.imports = [] @dataclass class DocumentationSection: """Represents a section of documentation""" title: str content: str code_snippets: List[CodeSnippet] cross_references: List[str] section_type: str # "tutorial", "reference", "example", "guide" difficulty_level: str # "beginner", "intermediate", "advanced" SUMMARIZER_ENDPOINT = os.getenv("SUMMARY_API_URL") SUMMARIZER_KEY = os.getenv("SUMMARY_API_KEY") class ContentEnhancer: """Enhances documentation content with smart parsing and features""" def __init__(self): self.version_cache = {} self.cross_ref_cache = {} async def enhance_content( self, content: str, library: str, query: str ) -> Dict[str, Any]: """Main content enhancement pipeline""" enhanced = { "original_content": content, "library": library, "query": query, "enhanced_at": datetime.utcnow().isoformat(), "enhancements": {}, } # Extract and enhance code snippets code_snippets = self.extract_code_snippets(content) enhanced["enhancements"]["code_snippets"] = [ { "language": snippet.language, "code": snippet.code, "description": snippet.description, "is_complete": snippet.is_complete, "imports": snippet.imports, } for snippet in code_snippets ] # Parse into structured sections sections = self.parse_sections(content) enhanced["enhancements"]["sections"] = [ { "title": section.title, "content": ( section.content[:500] + "..." if len(section.content) > 500 else section.content ), "section_type": section.section_type, "difficulty_level": section.difficulty_level, "code_count": len(section.code_snippets), } for section in sections ] # Add contextual recommendations enhanced["enhancements"]["recommendations"] = ( await self.get_contextual_recommendations(library, query) ) # Extract and resolve cross-references enhanced["enhancements"]["cross_references"] = self.extract_cross_references( content, library ) # Add version information enhanced["enhancements"]["version_info"] = await self.get_version_info(library) # Generate quick summary enhanced["enhancements"]["summary"] = self.generate_summary(content, query) return enhanced def extract_code_snippets(self, content: str) -> List[CodeSnippet]: """Extract and analyze code snippets from content""" snippets = [] # Patterns for different code block formats patterns = [ r"```(\w+)?\n(.*?)```", # Markdown code blocks r"<code[^>]*>(.*?)</code>", # HTML code tags r"<pre[^>]*><code[^>]*>(.*?)</code></pre>", # HTML pre+code r".. 
code-block:: (\w+)\n\n(.*?)(?=\n\S|\Z)", # reStructuredText ] for pattern in patterns: matches = re.finditer(pattern, content, re.DOTALL | re.IGNORECASE) for match in matches: if len(match.groups()) == 2: language = match.group(1) or "text" code = match.group(2).strip() else: language = "text" code = match.group(1).strip() if len(code) > 10: # Filter out very short snippets snippet = self.analyze_code_snippet(code, language) snippets.append(snippet) return snippets def analyze_code_snippet(self, code: str, language: str) -> CodeSnippet: """Analyze a code snippet for completeness and imports""" description = self.generate_code_description(code, language) imports = self.extract_imports(code, language) is_complete = self.is_code_complete(code, language) return CodeSnippet( language=language.lower(), code=code, description=description, is_complete=is_complete, imports=imports, ) def generate_code_description(self, code: str, language: str) -> str: """Generate a description for a code snippet""" # Common patterns and descriptions patterns = { # Python patterns r"def\s+(\w+)": "Function definition: {}", r"class\s+(\w+)": "Class definition: {}", r"import\s+(\w+)": "Import: {}", r"from\s+(\w+)\s+import": "Import from: {}", r"@\w+": "Decorator usage", r"async\s+def": "Async function definition", r'if\s+__name__\s*==\s*["\']__main__["\']': "Main execution block", # JavaScript patterns r"function\s+(\w+)": "Function: {}", r"const\s+(\w+)": "Constant: {}", r"let\s+(\w+)": "Variable: {}", r"export\s+": "Export statement", r"import\s+.*from": "Import statement", # FastAPI/web patterns r"@app\.(get|post|put|delete)": "API endpoint definition", r"FastAPI\(\)": "FastAPI application initialization", r"app\s*=\s*FastAPI": "FastAPI app creation", } descriptions = [] for pattern, desc in patterns.items(): matches = re.finditer(pattern, code, re.IGNORECASE) for match in matches: if "{}" in desc and len(match.groups()) > 0: descriptions.append(desc.format(match.group(1))) else: descriptions.append(desc) if descriptions: return "; ".join(descriptions[:3]) # Limit to first 3 descriptions else: return f"{language.title()} code snippet" def extract_imports(self, code: str, language: str) -> List[str]: """Extract import statements from code""" imports = [] if language.lower() in ["python", "py"]: # Python imports import_patterns = [r"import\s+([^\s,\n]+)", r"from\s+([^\s,\n]+)\s+import"] for pattern in import_patterns: matches = re.finditer(pattern, code, re.MULTILINE) imports.extend([match.group(1) for match in matches]) elif language.lower() in ["javascript", "js", "typescript", "ts"]: # JavaScript/TypeScript imports import_patterns = [ r'import\s+.*from\s+["\']([^"\']+)["\']', r'require\(["\']([^"\']+)["\']\)', ] for pattern in import_patterns: matches = re.finditer(pattern, code) imports.extend([match.group(1) for match in matches]) return list(set(imports)) # Remove duplicates def is_code_complete(self, code: str, language: str) -> bool: """Determine if a code snippet is complete/runnable""" code = code.strip() # Check for common completeness indicators completeness_indicators = { "python": [ r'if\s+__name__\s*==\s*["\']__main__["\']', # Main block r"def\s+\w+.*:\s*\n.*return", # Function with return r"class\s+\w+.*:\s*\n.*def\s+__init__", # Class with constructor ], "javascript": [ r"function\s+\w+.*{.*}", # Complete function r".*\.exports\s*=", # Module export r"export\s+default", # ES6 export ], } lang_key = language.lower() if lang_key in completeness_indicators: for pattern in 
completeness_indicators[lang_key]: if re.search(pattern, code, re.DOTALL): return True # Basic completeness checks if language.lower() in ["python", "py"]: # Check for balanced brackets and basic structure return ( code.count("(") == code.count(")") and code.count("[") == code.count("]") and code.count("{") == code.count("}") and len(code.split("\n")) >= 3 ) return len(code) > 50 # Fallback: assume longer snippets are more complete def parse_sections(self, content: str) -> List[DocumentationSection]: """Parse content into structured sections""" sections = [] # Split by headers (markdown and HTML) header_patterns = [ r"^#{1,6}\s+(.+)$", # Markdown headers r"<h[1-6][^>]*>(.*?)</h[1-6]>", # HTML headers ] current_section = "" current_title = "Introduction" lines = content.split("\n") for line in lines: is_header = False for pattern in header_patterns: match = re.match(pattern, line, re.IGNORECASE) if match: # Save previous section if it has content if current_section.strip(): section = self.create_section(current_title, current_section) sections.append(section) # Start new section current_title = re.sub(r"<[^>]+>", "", match.group(1)).strip() current_section = "" is_header = True break if not is_header: current_section += line + "\n" # Add final section if current_section.strip(): section = self.create_section(current_title, current_section) sections.append(section) return sections def create_section(self, title: str, content: str) -> DocumentationSection: """Create a DocumentationSection with analysis""" code_snippets = self.extract_code_snippets(content) cross_refs = self.extract_cross_references(content, "") section_type = self.classify_section_type(title, content) difficulty = self.assess_difficulty(title, content, code_snippets) return DocumentationSection( title=title, content=content, code_snippets=code_snippets, cross_references=cross_refs, section_type=section_type, difficulty_level=difficulty, ) def classify_section_type(self, title: str, content: str) -> str: """Classify the type of documentation section""" title_lower = title.lower() content_lower = content.lower() # Classification patterns if any( word in title_lower for word in ["tutorial", "guide", "walkthrough", "step"] ): return "tutorial" elif any(word in title_lower for word in ["example", "demo", "sample"]): return "example" elif any(word in title_lower for word in ["api", "reference", "documentation"]): return "reference" elif any( word in content_lower for word in ["first", "getting started", "quickstart", "introduction"] ): return "guide" else: return "guide" # Default def assess_difficulty( self, title: str, content: str, code_snippets: List[CodeSnippet] ) -> str: """Assess the difficulty level of a section""" difficulty_score = 0 # Title indicators title_lower = title.lower() if any( word in title_lower for word in ["advanced", "expert", "deep", "complex"] ): difficulty_score += 3 elif any(word in title_lower for word in ["intermediate", "moderate"]): difficulty_score += 2 elif any(word in title_lower for word in ["basic", "simple", "intro", "quick"]): difficulty_score += 1 # Content complexity indicators content_lower = content.lower() # Advanced concepts advanced_terms = [ "async", "concurrent", "threading", "multiprocessing", "decorator", "metaclass", "inheritance", "polymorphism", "dependency injection", ] difficulty_score += ( sum(1 for term in advanced_terms if term in content_lower) * 0.5 ) # Code complexity if code_snippets: avg_code_length = sum(len(snippet.code) for snippet in code_snippets) / len( 
code_snippets ) if avg_code_length > 200: difficulty_score += 2 elif avg_code_length > 100: difficulty_score += 1 # Return difficulty level if difficulty_score >= 4: return "advanced" elif difficulty_score >= 2: return "intermediate" else: return "beginner" def extract_cross_references(self, content: str, library: str) -> List[str]: """Extract cross-references to other libraries or concepts""" cross_refs = [] # Common library mentions library_patterns = [ r"\b(fastapi|django|flask|express|react|vue|angular)\b", r"\b(numpy|pandas|matplotlib|scikit-learn)\b", r"\b(tensorflow|pytorch|keras)\b", r"\b(docker|kubernetes|aws|azure|gcp)\b", ] for pattern in library_patterns: matches = re.finditer(pattern, content, re.IGNORECASE) cross_refs.extend([match.group(1).lower() for match in matches]) # Remove the current library from cross-references cross_refs = [ref for ref in cross_refs if ref != library.lower()] return list(set(cross_refs)) # Remove duplicates async def get_contextual_recommendations( self, library: str, query: str ) -> List[Dict[str, str]]: """Get contextual recommendations based on library and query""" recommendations = [] # Library-specific recommendations lib_recommendations = { "fastapi": [ { "type": "related_library", "name": "pydantic", "reason": "Data validation and settings", }, { "type": "related_library", "name": "uvicorn", "reason": "ASGI server for FastAPI", }, { "type": "concept", "name": "async/await", "reason": "Essential for FastAPI performance", }, ], "react": [ { "type": "related_library", "name": "typescript", "reason": "Type safety for React applications", }, { "type": "related_library", "name": "tailwind", "reason": "Utility-first CSS framework", }, {"type": "concept", "name": "hooks", "reason": "Modern React pattern"}, ], "django": [ { "type": "related_library", "name": "django-rest-framework", "reason": "API development", }, { "type": "related_library", "name": "celery", "reason": "Background tasks", }, {"type": "concept", "name": "orm", "reason": "Database abstraction"}, ], } if library.lower() in lib_recommendations: recommendations.extend(lib_recommendations[library.lower()]) # Query-specific recommendations query_lower = query.lower() if "auth" in query_lower: recommendations.append( { "type": "security", "name": "JWT tokens", "reason": "Secure authentication method", } ) elif "database" in query_lower: recommendations.append( { "type": "related_library", "name": "sqlalchemy", "reason": "Python SQL toolkit and ORM", } ) elif "api" in query_lower: recommendations.append( { "type": "concept", "name": "REST principles", "reason": "API design best practices", } ) return recommendations[:5] # Limit to 5 recommendations async def get_version_info(self, library: str) -> Dict[str, Any]: """Get version information for a library""" if library in self.version_cache: cached_time, version_info = self.version_cache[library] if datetime.now() - cached_time < timedelta(hours=24): return version_info version_info = { "current_version": "unknown", "release_date": "unknown", "is_latest": True, "changelog_url": None, } try: # Try to get version info from PyPI for Python packages if library in ["fastapi", "django", "flask", "pandas", "numpy"]: async with httpx.AsyncClient() as client: response = await client.get( f"https://pypi.org/pypi/{library}/json", timeout=5.0 ) if response.status_code == 200: data = response.json() version_info.update( { "current_version": data["info"]["version"], "release_date": data["releases"][ data["info"]["version"] ][0]["upload_time"][:10], 
"changelog_url": data["info"] .get("project_urls", {}) .get("Changelog"), } ) except Exception: pass # Fallback to unknown version # Cache the result self.version_cache[library] = (datetime.now(), version_info) return version_info def generate_summary(self, content: str, query: str) -> str: """Generate a concise summary of the content""" if SUMMARIZER_ENDPOINT and SUMMARIZER_KEY: try: payload = {"query": query, "context": content[:4000]} headers = { "Authorization": f"Bearer {SUMMARIZER_KEY}", "Content-Type": "application/json", } response = httpx.post( SUMMARIZER_ENDPOINT, json=payload, headers=headers, timeout=15 ) response.raise_for_status() data = response.json() summary = data.get("summary") or data.get("result") if summary: return summary except Exception as exc: print(f"⚠️ LLM summarization failed: {exc}", file=sys.stderr) sentences = re.split(r"[.!?]+", content) query_words = set(query.lower().split()) scored_sentences: List[tuple[int, str]] = [] for sentence in sentences[:10]: sentence = sentence.strip() if len(sentence) > 20: words = set(sentence.lower().split()) score = len(query_words.intersection(words)) scored_sentences.append((score, sentence)) scored_sentences.sort(key=lambda x: x[0], reverse=True) top_sentences = [sent for score, sent in scored_sentences[:3] if score > 0] if top_sentences: return ". ".join(top_sentences)[:300] + "..." for sentence in sentences: sentence = sentence.strip() if len(sentence) > 20: return sentence[:300] + "..." return "Documentation content for " + query # Global content enhancer instance content_enhancer = ContentEnhancer()
