Skip to main content
Glama

Documentation Search MCP Server

main.py101 kB
import json import os import hashlib import time from datetime import datetime, timedelta from typing import Dict, Any, Optional, List, Union import asyncio import httpx from mcp.server.fastmcp import FastMCP from dotenv import load_dotenv from importlib import resources from .smart_search import smart_search, SearchResult from .web_scraper import scraper from .config_validator import validate_config, Config as AppConfig from .content_enhancer import content_enhancer import sys import atexit # Load the environment variables load_dotenv() # Initialize the MCP server mcp = FastMCP("docs") USER_AGENT = "docs-app/1.0" SERPER_URL = "https://google.serper.dev/search" # Environment variables (removing API key exposure) SERPER_API_KEY = os.getenv("SERPER_API_KEY") # Simple in-memory cache with TTL class SimpleCache: def __init__( self, ttl_hours: int = 24, max_entries: int = 1000, persistence_enabled: bool = False, persist_path: Optional[str] = None, ): self.cache: Dict[str, Dict[str, Any]] = {} self.ttl_hours = ttl_hours self.max_entries = max_entries self.persistence_enabled = persistence_enabled self.persist_path = persist_path self._lock = asyncio.Lock() if self.persistence_enabled and self.persist_path: self._load_from_disk() def _is_expired(self, timestamp: datetime) -> bool: return datetime.now() - timestamp > timedelta(hours=self.ttl_hours) async def get(self, key: str) -> Optional[str]: async with self._lock: if key in self.cache: entry = self.cache[key] if not self._is_expired(entry["timestamp"]): return entry["data"] del self.cache[key] return None async def set(self, key: str, data: str) -> None: async with self._lock: await self._cleanup_locked() if len(self.cache) >= self.max_entries: oldest_key = min( self.cache.keys(), key=lambda k: self.cache[k]["timestamp"] ) del self.cache[oldest_key] self.cache[key] = {"data": data, "timestamp": datetime.now()} await self._persist_locked() async def clear_expired(self) -> None: async with self._lock: await self._cleanup_locked() await self._persist_locked() async def stats(self) -> Dict[str, Any]: async with self._lock: expired_count = sum( 1 for entry in self.cache.values() if self._is_expired(entry["timestamp"]) ) return { "total_entries": len(self.cache), "expired_entries": expired_count, "active_entries": len(self.cache) - expired_count, "max_entries": self.max_entries, "ttl_hours": self.ttl_hours, "memory_usage_estimate": f"{len(str(self.cache)) / 1024:.2f} KB", } async def clear(self) -> int: async with self._lock: removed = len(self.cache) self.cache.clear() await self._persist_locked() return removed async def _cleanup_locked(self) -> None: expired_keys = [ k for k, v in self.cache.items() if self._is_expired(v["timestamp"]) ] for key in expired_keys: del self.cache[key] def _load_from_disk(self) -> None: try: if not os.path.exists(self.persist_path or ""): return with open(self.persist_path, "r", encoding="utf-8") as fh: raw = json.load(fh) for key, entry in raw.items(): try: timestamp = datetime.fromisoformat(entry["timestamp"]) if not self._is_expired(timestamp): self.cache[key] = { "data": entry["data"], "timestamp": timestamp, } except Exception: continue except Exception as exc: print(f"⚠️ Failed to load cache persistence: {exc}", file=sys.stderr) async def _persist_locked(self) -> None: if not (self.persistence_enabled and self.persist_path): return try: serialisable = { key: { "data": value["data"], "timestamp": value["timestamp"].isoformat(), } for key, value in self.cache.items() if not self._is_expired(value["timestamp"]) } tmp_path = f"{self.persist_path}.tmp" with open(tmp_path, "w", encoding="utf-8") as fh: json.dump(serialisable, fh) os.replace(tmp_path, self.persist_path) except Exception as exc: print(f"⚠️ Failed to persist cache: {exc}", file=sys.stderr) class TokenBucketRateLimiter: def __init__(self, requests_per_minute: int, burst: int): self.capacity = max(burst, requests_per_minute, 1) self.refill_rate = requests_per_minute / 60 if requests_per_minute > 0 else 0 self.tokens: Dict[str, float] = {} self.last_refill: Dict[str, float] = {} self._lock = asyncio.Lock() async def acquire(self, key: str = "global") -> None: if self.refill_rate == 0: return while True: async with self._lock: now = time.monotonic() tokens = self.tokens.get(key, float(self.capacity)) last = self.last_refill.get(key, now) elapsed = now - last if elapsed > 0: tokens = min(self.capacity, tokens + elapsed * self.refill_rate) if tokens >= 1: self.tokens[key] = tokens - 1 self.last_refill[key] = now return wait_time = (1 - tokens) / self.refill_rate if self.refill_rate else 0 self.tokens[key] = tokens self.last_refill[key] = now await asyncio.sleep(wait_time) def load_config() -> AppConfig: """Load and validate the configuration file. Priority: 1. Looks for `config.json` in the current working directory. 2. Falls back to the `config.json` bundled with the package. """ config_data = None local_config_path = os.path.join(os.getcwd(), "config.json") try: # 1. Prioritize local config file if os.path.exists(local_config_path): print("📝 Found local config.json. Loading...", file=sys.stderr) with open(local_config_path, "r") as f: config_data = json.load(f) else: # 2. Fallback to packaged config try: config_text = resources.read_text( "documentation_search_enhanced", "config.json" ) config_data = json.loads(config_text) except (FileNotFoundError, ModuleNotFoundError): # This is a critical failure if the package is broken print("FATAL: Packaged config.json not found.", file=sys.stderr) raise except Exception as e: print(f"FATAL: Could not read config.json. Error: {e}", file=sys.stderr) raise if not config_data: raise FileNotFoundError("Could not find or load config.json") try: validated_config = validate_config(config_data) print("✅ Configuration successfully loaded and validated.", file=sys.stderr) return validated_config except Exception as e: # Pydantic's ValidationError print( "❌ FATAL: Configuration validation failed. Please check your config.json.", file=sys.stderr, ) print(e, file=sys.stderr) raise # Load configuration config_model = load_config() config = config_model.model_dump() # Use the dict version for existing logic docs_urls = {} # Handle both old simple URL format and new enhanced format for lib_name, lib_data in config.get("docs_urls", {}).items(): if isinstance(lib_data, dict): docs_urls[lib_name] = lib_data.get("url", "") else: docs_urls[lib_name] = lib_data cache_config = config.get("cache", {"enabled": False}) cache_persistence_enabled = cache_config.get("persistence_enabled", False) cache_persist_path = cache_config.get("persist_path") if cache_persistence_enabled and not cache_persist_path: cache_persist_path = os.path.join(os.getcwd(), ".docs_cache.json") # Initialize cache if enabled cache = ( SimpleCache( ttl_hours=cache_config.get("ttl_hours", 24), max_entries=cache_config.get("max_entries", 1000), persistence_enabled=cache_persistence_enabled, persist_path=cache_persist_path, ) if cache_config.get("enabled", False) else None ) http_client: Optional[httpx.AsyncClient] = None scrape_semaphore = asyncio.Semaphore( config.get("server_config", {}).get("max_concurrent_requests", 10) ) rate_limit_config = config.get("rate_limiting", {"enabled": False}) rate_limiter = ( TokenBucketRateLimiter( requests_per_minute=rate_limit_config.get("requests_per_minute", 60), burst=rate_limit_config.get("burst_requests", 10), ) if rate_limit_config.get("enabled", False) else None ) async def enforce_rate_limit(tool_name: str) -> None: if rate_limiter: await rate_limiter.acquire(tool_name) async def search_web_with_retry( query: str, max_retries: int = 3, num_results: int = 3 ) -> dict: """Search web with exponential backoff retry logic""" if not SERPER_API_KEY: print( "⚠️ SERPER_API_KEY not set - web search functionality will be limited", file=sys.stderr, ) return {"organic": []} global http_client if http_client is None: http_client = httpx.AsyncClient(timeout=httpx.Timeout(30.0, read=60.0)) payload = json.dumps({"q": query, "num": num_results}) headers = { "X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json", "User-Agent": USER_AGENT, } for attempt in range(max_retries): try: response = await http_client.post( SERPER_URL, headers=headers, content=payload, ) response.raise_for_status() return response.json() except httpx.TimeoutException: if attempt == max_retries - 1: print( f"Timeout after {max_retries} attempts for query: {query}", file=sys.stderr, ) return {"organic": []} await asyncio.sleep(2**attempt) # Exponential backoff except httpx.HTTPStatusError as e: if e.response.status_code == 429: # Rate limited if attempt == max_retries - 1: print(f"Rate limited after {max_retries} attempts", file=sys.stderr) return {"organic": []} await asyncio.sleep(2 ** (attempt + 2)) # Longer wait for rate limits else: print(f"HTTP error {e.response.status_code}: {e}", file=sys.stderr) return {"organic": []} except Exception as e: if attempt == max_retries - 1: print( f"Unexpected error after {max_retries} attempts: {e}", file=sys.stderr, ) return {"organic": []} await asyncio.sleep(2**attempt) return {"organic": []} async def fetch_url_with_cache(url: str, max_retries: int = 3) -> str: """Fetch URL content with caching and a Playwright-based scraper.""" cache_key = hashlib.md5(url.encode()).hexdigest() if cache: cached_content = await cache.get(cache_key) if cached_content: return cached_content # Use the new Playwright scraper async with scrape_semaphore: content = await scraper.scrape_url(url) if cache and "Error:" not in content: await cache.set(cache_key, content) return content # Backward compatibility aliases async def search_web(query: str, num_results: int = 3) -> dict: return await search_web_with_retry(query, num_results=num_results) async def fetch_url(url: str) -> str: return await fetch_url_with_cache(url) # Configure smart search now that the helpers are in place smart_search.configure(docs_urls, search_web) async def shutdown_resources() -> None: global http_client if http_client: await http_client.aclose() http_client = None await scraper.close() def _cleanup_sync() -> None: try: loop = asyncio.get_running_loop() except RuntimeError: try: asyncio.run(shutdown_resources()) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(shutdown_resources()) loop.close() else: loop.create_task(shutdown_resources()) atexit.register(_cleanup_sync) @mcp.tool() async def get_docs(query: str, libraries: Union[str, List[str]]): """ Search the latest docs for a given query and one or more libraries. Args: query: The query to search for (e.g., "Chroma DB") libraries: A single library or a list of libraries to search in (e.g., "langchain" or ["fastapi", "django"]) Returns: Dictionary with structured summaries and supporting metadata """ await enforce_rate_limit("get_docs") if isinstance(libraries, str): libraries = [libraries] config_dict = config_model.model_dump() library_summaries: List[Dict[str, Any]] = [] summary_sections: List[str] = [] for library in libraries: lib_entry: Dict[str, Any] = { "library": library, "requested_query": query, "status": "searched", "results": [], } lib_config = config_dict.get("docs_urls", {}).get(library, {}) auto_approve = lib_config.get("auto_approve", True) if not auto_approve: print( f"⚠️ Requesting approval to search {library} documentation...", file=sys.stderr, ) docs_root = docs_urls.get(library) if not docs_root: lib_entry.update( { "status": "unsupported", "message": f"Library '{library}' not supported by this tool", } ) library_summaries.append(lib_entry) summary_sections.append( f"### {library}\n- Unsupported library; no documentation root configured." ) continue search_query = f"site:{docs_root} {query}" search_results = await search_web(search_query, num_results=5) organic_results = (search_results.get("organic") or [])[:3] if not organic_results: lib_entry.update( { "status": "no_results", "message": "No indexed documentation results returned", } ) library_summaries.append(lib_entry) summary_sections.append(f"### {library}\n- No results for query '{query}'.") continue fetch_tasks = [fetch_url(result["link"]) for result in organic_results] fetched_contents = await asyncio.gather(*fetch_tasks, return_exceptions=True) library_lines = [f"### {library}"] for result, content in zip(organic_results, fetched_contents): entry: Dict[str, Any] = { "title": result.get("title") or result.get("link"), "url": result.get("link"), "source_snippet": result.get("snippet", ""), } if isinstance(content, Exception): error_message = str(content) entry["status"] = "error" entry["error"] = error_message library_lines.append( f"- {entry['title']}: failed to fetch ({error_message})" ) else: content_str = str(content) summary = content_enhancer.generate_summary(content_str, query) code_snippet_count = len( content_enhancer.extract_code_snippets(content_str) ) entry.update( { "status": "ok", "summary": summary, "code_snippet_count": code_snippet_count, } ) bullet_summary = summary if summary else "No summary extracted." library_lines.append( f"- {entry['title']}: {bullet_summary} (code snippets: {code_snippet_count})" ) lib_entry["results"].append(entry) lib_entry["total_results"] = len(lib_entry["results"]) library_summaries.append(lib_entry) summary_sections.append("\n".join(library_lines)) if cache: await cache.clear_expired() return { "query": query, "libraries": library_summaries, "summary_markdown": "\n\n".join(summary_sections), } @mcp.tool() async def suggest_libraries(partial_name: str): """ Suggest libraries based on partial input for auto-completion. Args: partial_name: Partial library name to search for (e.g. "lang" -> ["langchain"]) Returns: List of matching library names """ if not partial_name: return list(sorted(docs_urls.keys())) partial_lower = partial_name.lower() suggestions = [] # Exact matches first for lib in docs_urls.keys(): if lib.lower() == partial_lower: suggestions.append(lib) # Starts with matches for lib in docs_urls.keys(): if lib.lower().startswith(partial_lower) and lib not in suggestions: suggestions.append(lib) # Contains matches for lib in docs_urls.keys(): if partial_lower in lib.lower() and lib not in suggestions: suggestions.append(lib) return sorted(suggestions[:10]) # Limit to top 10 suggestions @mcp.tool() async def health_check(): """ Check the health and availability of documentation sources. Returns: Dictionary with health status of each library's documentation site """ results = {} # Test a sample of libraries to avoid overwhelming servers sample_libraries = list(docs_urls.items())[:5] for library, url in sample_libraries: start_time = time.time() try: async with httpx.AsyncClient() as client: response = await client.head( url, timeout=httpx.Timeout(10.0), headers={"User-Agent": USER_AGENT}, follow_redirects=True, ) response_time = time.time() - start_time results[library] = { "status": "healthy", "status_code": response.status_code, "response_time_ms": round(response_time * 1000, 2), "url": url, } except httpx.TimeoutException: results[library] = { "status": "timeout", "error": "Request timed out", "url": url, } except Exception as e: results[library] = {"status": "error", "error": str(e), "url": url} # Add cache stats if caching is enabled if cache: cache_stats = await cache.stats() results["_cache_stats"] = {"enabled": True, **cache_stats} else: results["_cache_stats"] = {"enabled": False} return results @mcp.tool() async def clear_cache(): """ Clear the documentation cache to force fresh fetches. Returns: Status message about cache clearing """ if cache: entries_cleared = await cache.clear() return f"Cache cleared. Removed {entries_cleared} cached entries." else: return "Caching is not enabled." @mcp.tool() async def get_cache_stats(): """ Get statistics about the current cache usage. Returns: Dictionary with cache statistics """ if not cache: return {"enabled": False, "message": "Caching is not enabled"} stats = await cache.stats() details = { "enabled": True, **stats, } details["persistence"] = { "enabled": cache.persistence_enabled, "path": cache.persist_path, } return details @mcp.tool() async def semantic_search( query: str, libraries: Union[str, List[str]], context: Optional[str] = None ): """ Enhanced semantic search across one or more libraries with relevance ranking. Args: query: The search query. libraries: A single library or a list of libraries to search in. context: Optional context about your project or use case. Returns: Enhanced search results with relevance scores and metadata, ranked across all libraries. """ await enforce_rate_limit("semantic_search") if isinstance(libraries, str): libraries = [libraries] search_tasks = [ smart_search.semantic_search(query, lib, context) for lib in libraries ] try: results_by_library = await asyncio.gather(*search_tasks, return_exceptions=True) all_results: List[SearchResult] = [] for res_list in results_by_library: if not isinstance(res_list, Exception): all_results.extend(res_list) # type: ignore # Sort all results from all libraries by relevance score all_results.sort(key=lambda r: r.relevance_score, reverse=True) return { "query": query, "libraries_searched": libraries, "total_results": len(all_results), "results": [ { "source_library": result.source_library, "title": result.title, "url": result.url, "snippet": ( result.snippet[:300] + "..." if len(result.snippet) > 300 else result.snippet ), "relevance_score": result.relevance_score, "content_type": result.content_type, "difficulty_level": result.difficulty_level, "estimated_read_time": f"{result.estimated_read_time} min", "has_code_examples": result.code_snippets_count > 0, } for result in all_results[:10] # Top 10 combined results ], } except Exception as e: return {"error": f"Search failed: {str(e)}", "results": []} @mcp.tool() async def filtered_search( query: str, library: str, content_type: Optional[str] = None, difficulty_level: Optional[str] = None, has_code_examples: Optional[bool] = None, ): """ Search with advanced filtering options. Args: query: The search query library: The library to search in content_type: Filter by content type ("tutorial", "reference", "example", "guide") difficulty_level: Filter by difficulty ("beginner", "intermediate", "advanced") has_code_examples: Filter for content with code examples (true/false) Returns: Filtered search results matching specified criteria """ from .smart_search import filtered_search, SearchFilters await enforce_rate_limit("filtered_search") filters = SearchFilters( content_type=content_type, difficulty_level=difficulty_level, has_code_examples=has_code_examples, ) try: results = await filtered_search.search_with_filters(query, library, filters) return { "query": query, "library": library, "filters_applied": { "content_type": content_type, "difficulty_level": difficulty_level, "has_code_examples": has_code_examples, }, "total_results": len(results), "results": [ { "title": result.title, "url": result.url, "snippet": ( result.snippet[:200] + "..." if len(result.snippet) > 200 else result.snippet ), "relevance_score": result.relevance_score, "content_type": result.content_type, "difficulty_level": result.difficulty_level, "estimated_read_time": f"{result.estimated_read_time} min", } for result in results[:10] ], } except Exception as e: return {"error": f"Filtered search failed: {str(e)}", "results": []} @mcp.tool() async def get_learning_path(library: str, experience_level: str = "beginner"): """ Get a structured learning path for a library based on experience level. Args: library: The library to create a learning path for experience_level: Your current level ("beginner", "intermediate", "advanced") Returns: Structured learning path with progressive topics and resources """ learning_paths = { # Frontend Development Paths "frontend-development": { "beginner": [ { "topic": "HTML Fundamentals", "query": "html elements semantic structure", "type": "tutorial", "library": "html", }, { "topic": "CSS Basics", "query": "css selectors properties styling", "type": "tutorial", "library": "css", }, { "topic": "JavaScript Essentials", "query": "javascript variables functions DOM", "type": "tutorial", "library": "javascript", }, { "topic": "Responsive Design", "query": "css responsive design flexbox", "type": "example", "library": "css", }, { "topic": "Modern CSS Layout", "query": "css grid layout modern", "type": "tutorial", "library": "css", }, ], "intermediate": [ { "topic": "React Introduction", "query": "react components JSX props", "type": "tutorial", "library": "react", }, { "topic": "State Management", "query": "react useState hooks", "type": "example", "library": "react", }, { "topic": "CSS Frameworks", "query": "tailwind utility classes responsive", "type": "tutorial", "library": "tailwind", }, { "topic": "Build Tools", "query": "vite development server bundling", "type": "tutorial", "library": "vite", }, { "topic": "Version Control", "query": "git workflow branches commits", "type": "tutorial", "library": "git", }, ], "advanced": [ { "topic": "Advanced React", "query": "react performance optimization", "type": "tutorial", "library": "react", }, { "topic": "State Management", "query": "redux state management", "type": "tutorial", "library": "redux", }, { "topic": "Next.js Framework", "query": "nextjs SSR routing", "type": "tutorial", "library": "nextjs", }, { "topic": "Testing", "query": "jest testing components", "type": "tutorial", "library": "jest", }, { "topic": "CI/CD Deployment", "query": "github actions frontend deployment", "type": "tutorial", "library": "github-actions", }, ], }, # Backend Development Paths "backend-development": { "beginner": [ { "topic": "Python Basics", "query": "python functions classes basics", "type": "tutorial", "library": "python", }, { "topic": "API Fundamentals", "query": "fastapi hello world first api", "type": "tutorial", "library": "fastapi", }, { "topic": "Database Basics", "query": "postgresql tables queries", "type": "tutorial", "library": "postgresql", }, { "topic": "Request Handling", "query": "fastapi request body validation", "type": "example", "library": "fastapi", }, { "topic": "Git Version Control", "query": "git commit push pull basics", "type": "tutorial", "library": "git", }, ], "intermediate": [ { "topic": "Database Integration", "query": "fastapi sqlalchemy database", "type": "tutorial", "library": "fastapi", }, { "topic": "NoSQL Databases", "query": "mongodb collections documents", "type": "tutorial", "library": "mongodb", }, { "topic": "Authentication", "query": "fastapi JWT authentication", "type": "tutorial", "library": "fastapi", }, { "topic": "API Testing", "query": "pytest fastapi testing", "type": "tutorial", "library": "pytest", }, { "topic": "API Documentation", "query": "openapi swagger specification", "type": "tutorial", "library": "openapi", }, ], "advanced": [ { "topic": "Microservices", "query": "fastapi microservices architecture", "type": "tutorial", "library": "fastapi", }, { "topic": "Containerization", "query": "docker python application", "type": "tutorial", "library": "docker", }, { "topic": "CI/CD Pipelines", "query": "github actions python deployment", "type": "tutorial", "library": "github-actions", }, { "topic": "Monitoring", "query": "fastapi logging monitoring", "type": "tutorial", "library": "fastapi", }, { "topic": "Cloud Deployment", "query": "aws deployment docker", "type": "tutorial", "library": "aws", }, ], }, # Full-Stack Development Paths "fullstack-development": { "beginner": [ { "topic": "Web Fundamentals", "query": "html css javascript basics", "type": "tutorial", "library": "html", }, { "topic": "Frontend Framework", "query": "react components state", "type": "tutorial", "library": "react", }, { "topic": "Backend API", "query": "fastapi rest api", "type": "tutorial", "library": "fastapi", }, { "topic": "Database Setup", "query": "mongodb setup collections", "type": "tutorial", "library": "mongodb", }, { "topic": "Version Control", "query": "git workflow collaboration", "type": "tutorial", "library": "git", }, ], "intermediate": [ { "topic": "MERN Stack Setup", "query": "mongodb express react nodejs", "type": "tutorial", "library": "mongodb", }, { "topic": "State Management", "query": "redux react state", "type": "tutorial", "library": "redux", }, { "topic": "API Design", "query": "openapi rest specification", "type": "tutorial", "library": "openapi", }, { "topic": "Authentication Flow", "query": "JWT react fastapi authentication", "type": "tutorial", "library": "fastapi", }, { "topic": "Testing Strategy", "query": "jest pytest testing", "type": "tutorial", "library": "jest", }, ], "advanced": [ { "topic": "Advanced Architecture", "query": "microservices react fastapi", "type": "tutorial", "library": "fastapi", }, { "topic": "Performance Optimization", "query": "react performance database optimization", "type": "tutorial", "library": "react", }, { "topic": "DevOps Integration", "query": "docker kubernetes deployment", "type": "tutorial", "library": "docker", }, { "topic": "CI/CD Pipeline", "query": "github actions fullstack deployment", "type": "tutorial", "library": "github-actions", }, { "topic": "Production Deployment", "query": "aws production deployment", "type": "tutorial", "library": "aws", }, ], }, # DevOps & Deployment Paths "devops": { "beginner": [ { "topic": "Git Workflows", "query": "git branches merging collaboration", "type": "tutorial", "library": "git", }, { "topic": "Docker Basics", "query": "docker containers images", "type": "tutorial", "library": "docker", }, { "topic": "CI/CD Concepts", "query": "github actions workflow automation", "type": "tutorial", "library": "github-actions", }, { "topic": "Testing Automation", "query": "pytest jest automated testing", "type": "tutorial", "library": "pytest", }, { "topic": "Basic Deployment", "query": "docker deployment simple", "type": "tutorial", "library": "docker", }, ], "intermediate": [ { "topic": "Advanced Git", "query": "git rebase cherry-pick advanced", "type": "tutorial", "library": "git", }, { "topic": "Container Orchestration", "query": "kubernetes pods services", "type": "tutorial", "library": "kubernetes", }, { "topic": "Infrastructure as Code", "query": "terraform infrastructure automation", "type": "tutorial", "library": "terraform", }, { "topic": "Cloud Platforms", "query": "aws services deployment", "type": "tutorial", "library": "aws", }, { "topic": "Monitoring Setup", "query": "monitoring logging production", "type": "tutorial", "library": "docker", }, ], "advanced": [ { "topic": "Advanced Kubernetes", "query": "kubernetes advanced deployment", "type": "tutorial", "library": "kubernetes", }, { "topic": "Multi-Cloud Strategy", "query": "aws google-cloud deployment", "type": "tutorial", "library": "aws", }, { "topic": "Security Practices", "query": "docker security kubernetes", "type": "tutorial", "library": "docker", }, { "topic": "Scaling Strategies", "query": "kubernetes scaling performance", "type": "tutorial", "library": "kubernetes", }, { "topic": "Production Operations", "query": "terraform production management", "type": "tutorial", "library": "terraform", }, ], }, # Enhanced Individual Library Paths "react": { "beginner": [ { "topic": "HTML/CSS Foundation", "query": "html semantic elements", "type": "tutorial", "library": "html", }, { "topic": "JavaScript Essentials", "query": "javascript fundamentals", "type": "tutorial", "library": "javascript", }, { "topic": "React Basics", "query": "react components JSX props", "type": "tutorial", "library": "react", }, { "topic": "State Management", "query": "react useState hooks", "type": "example", "library": "react", }, { "topic": "Styling with CSS", "query": "css styling react components", "type": "example", "library": "css", }, ], "intermediate": [ { "topic": "Advanced Hooks", "query": "react useEffect custom hooks", "type": "tutorial", "library": "react", }, { "topic": "Routing", "query": "react router navigation", "type": "tutorial", "library": "react", }, { "topic": "CSS Frameworks", "query": "tailwind react integration", "type": "tutorial", "library": "tailwind", }, { "topic": "State Libraries", "query": "zustand react state", "type": "example", "library": "zustand", }, { "topic": "Build Tools", "query": "vite react development", "type": "tutorial", "library": "vite", }, ], "advanced": [ { "topic": "Redux Integration", "query": "redux react complex state", "type": "tutorial", "library": "redux", }, { "topic": "Next.js Framework", "query": "nextjs react SSR", "type": "tutorial", "library": "nextjs", }, { "topic": "Testing Strategy", "query": "jest react testing", "type": "tutorial", "library": "jest", }, { "topic": "Performance", "query": "react performance optimization", "type": "tutorial", "library": "react", }, { "topic": "Deployment", "query": "github actions react deployment", "type": "tutorial", "library": "github-actions", }, ], }, "fastapi": { "beginner": [ { "topic": "Python Foundation", "query": "python basics functions", "type": "tutorial", "library": "python", }, { "topic": "FastAPI Setup", "query": "fastapi installation first api", "type": "tutorial", "library": "fastapi", }, { "topic": "Request Handling", "query": "fastapi path parameters", "type": "example", "library": "fastapi", }, { "topic": "Data Validation", "query": "pydantic models validation", "type": "tutorial", "library": "pydantic", }, { "topic": "Git Basics", "query": "git commit push basics", "type": "tutorial", "library": "git", }, ], "intermediate": [ { "topic": "Database Integration", "query": "fastapi sqlalchemy postgresql", "type": "tutorial", "library": "fastapi", }, { "topic": "NoSQL with MongoDB", "query": "fastapi mongodb integration", "type": "tutorial", "library": "mongodb", }, { "topic": "Authentication", "query": "fastapi JWT authentication", "type": "tutorial", "library": "fastapi", }, { "topic": "API Documentation", "query": "openapi swagger fastapi", "type": "tutorial", "library": "openapi", }, { "topic": "Testing", "query": "pytest fastapi testing", "type": "tutorial", "library": "pytest", }, ], "advanced": [ { "topic": "Advanced Features", "query": "fastapi dependency injection", "type": "reference", "library": "fastapi", }, { "topic": "Containerization", "query": "docker fastapi deployment", "type": "tutorial", "library": "docker", }, { "topic": "CI/CD Pipeline", "query": "github actions fastapi", "type": "tutorial", "library": "github-actions", }, { "topic": "Cloud Deployment", "query": "aws fastapi production", "type": "tutorial", "library": "aws", }, { "topic": "Monitoring", "query": "fastapi monitoring logging", "type": "tutorial", "library": "fastapi", }, ], }, # New Technology-Specific Paths "git": { "beginner": [ { "topic": "Git Basics", "query": "git init commit status", "type": "tutorial", "library": "git", }, { "topic": "Working with Files", "query": "git add commit push", "type": "tutorial", "library": "git", }, { "topic": "Branching", "query": "git branch checkout merge", "type": "tutorial", "library": "git", }, { "topic": "Remote Repositories", "query": "git remote origin push pull", "type": "tutorial", "library": "git", }, { "topic": "Basic Collaboration", "query": "git clone fork collaboration", "type": "tutorial", "library": "git", }, ], "intermediate": [ { "topic": "Merge Conflicts", "query": "git merge conflicts resolution", "type": "tutorial", "library": "git", }, { "topic": "Git Workflows", "query": "git workflow gitflow", "type": "tutorial", "library": "git", }, { "topic": "Rebasing", "query": "git rebase interactive", "type": "tutorial", "library": "git", }, { "topic": "Stashing", "query": "git stash temporary changes", "type": "tutorial", "library": "git", }, { "topic": "Tags and Releases", "query": "git tag versioning", "type": "tutorial", "library": "git", }, ], "advanced": [ { "topic": "Advanced Rebasing", "query": "git rebase cherry-pick", "type": "tutorial", "library": "git", }, { "topic": "Git Hooks", "query": "git hooks automation", "type": "tutorial", "library": "git", }, { "topic": "Submodules", "query": "git submodules management", "type": "reference", "library": "git", }, { "topic": "Performance", "query": "git performance large repos", "type": "tutorial", "library": "git", }, { "topic": "CI/CD Integration", "query": "git github actions integration", "type": "tutorial", "library": "github-actions", }, ], }, "mongodb": { "beginner": [ { "topic": "MongoDB Basics", "query": "mongodb documents collections", "type": "tutorial", "library": "mongodb", }, { "topic": "CRUD Operations", "query": "mongodb insert find update delete", "type": "tutorial", "library": "mongodb", }, { "topic": "Data Modeling", "query": "mongodb schema design", "type": "tutorial", "library": "mongodb", }, { "topic": "Querying", "query": "mongodb queries filtering", "type": "example", "library": "mongodb", }, { "topic": "Indexes", "query": "mongodb indexes performance", "type": "tutorial", "library": "mongodb", }, ], "intermediate": [ { "topic": "Aggregation Framework", "query": "mongodb aggregation pipeline", "type": "tutorial", "library": "mongodb", }, { "topic": "Relationships", "query": "mongodb references embedding", "type": "tutorial", "library": "mongodb", }, { "topic": "Transactions", "query": "mongodb transactions ACID", "type": "tutorial", "library": "mongodb", }, { "topic": "Atlas Cloud", "query": "mongodb atlas setup", "type": "tutorial", "library": "mongodb", }, { "topic": "Integration", "query": "mongodb fastapi python", "type": "tutorial", "library": "mongodb", }, ], "advanced": [ { "topic": "Advanced Aggregation", "query": "mongodb complex aggregation", "type": "tutorial", "library": "mongodb", }, { "topic": "Sharding", "query": "mongodb sharding scaling", "type": "tutorial", "library": "mongodb", }, { "topic": "Replication", "query": "mongodb replica sets", "type": "tutorial", "library": "mongodb", }, { "topic": "Performance Tuning", "query": "mongodb optimization performance", "type": "tutorial", "library": "mongodb", }, { "topic": "Security", "query": "mongodb security authentication", "type": "tutorial", "library": "mongodb", }, ], }, "github-actions": { "beginner": [ { "topic": "GitHub Actions Basics", "query": "github actions workflow syntax", "type": "tutorial", "library": "github-actions", }, { "topic": "First Workflow", "query": "github actions hello world", "type": "tutorial", "library": "github-actions", }, { "topic": "Triggers and Events", "query": "github actions push pull request", "type": "tutorial", "library": "github-actions", }, { "topic": "Basic CI", "query": "github actions continuous integration", "type": "tutorial", "library": "github-actions", }, { "topic": "Job Configuration", "query": "github actions jobs steps", "type": "tutorial", "library": "github-actions", }, ], "intermediate": [ { "topic": "Testing Automation", "query": "github actions testing workflows", "type": "tutorial", "library": "github-actions", }, { "topic": "Build and Deploy", "query": "github actions deployment pipeline", "type": "tutorial", "library": "github-actions", }, { "topic": "Environment Variables", "query": "github actions secrets environment", "type": "tutorial", "library": "github-actions", }, { "topic": "Matrix Builds", "query": "github actions matrix strategy", "type": "tutorial", "library": "github-actions", }, { "topic": "Caching", "query": "github actions cache dependencies", "type": "tutorial", "library": "github-actions", }, ], "advanced": [ { "topic": "Custom Actions", "query": "github actions custom action", "type": "tutorial", "library": "github-actions", }, { "topic": "Complex Workflows", "query": "github actions advanced workflows", "type": "tutorial", "library": "github-actions", }, { "topic": "Security", "query": "github actions security best practices", "type": "tutorial", "library": "github-actions", }, { "topic": "Self-Hosted Runners", "query": "github actions self hosted runners", "type": "tutorial", "library": "github-actions", }, { "topic": "Integration", "query": "github actions docker kubernetes", "type": "tutorial", "library": "github-actions", }, ], }, "openapi": { "beginner": [ { "topic": "API Documentation Basics", "query": "openapi specification intro", "type": "tutorial", "library": "openapi", }, { "topic": "Swagger UI", "query": "swagger ui documentation", "type": "tutorial", "library": "openapi", }, { "topic": "Basic Specification", "query": "openapi yaml json structure", "type": "tutorial", "library": "openapi", }, { "topic": "Paths and Operations", "query": "openapi paths methods", "type": "tutorial", "library": "openapi", }, { "topic": "Request/Response", "query": "openapi request response models", "type": "tutorial", "library": "openapi", }, ], "intermediate": [ { "topic": "Data Models", "query": "openapi schemas components", "type": "tutorial", "library": "openapi", }, { "topic": "Authentication", "query": "openapi security schemes", "type": "tutorial", "library": "openapi", }, { "topic": "Validation", "query": "openapi validation constraints", "type": "tutorial", "library": "openapi", }, { "topic": "Code Generation", "query": "openapi code generation", "type": "tutorial", "library": "openapi", }, { "topic": "API Testing", "query": "openapi testing tools", "type": "tutorial", "library": "openapi", }, ], "advanced": [ { "topic": "Advanced Schemas", "query": "openapi advanced schemas", "type": "tutorial", "library": "openapi", }, { "topic": "API Versioning", "query": "openapi versioning strategies", "type": "tutorial", "library": "openapi", }, { "topic": "Integration", "query": "openapi fastapi integration", "type": "tutorial", "library": "openapi", }, { "topic": "Documentation Automation", "query": "openapi automated documentation", "type": "tutorial", "library": "openapi", }, { "topic": "Best Practices", "query": "openapi design best practices", "type": "tutorial", "library": "openapi", }, ], }, } if library not in learning_paths: return {"error": f"Learning path not available for {library}"} if experience_level not in learning_paths[library]: return {"error": f"Experience level {experience_level} not supported"} path = learning_paths[library][experience_level] # Calculate enhanced metadata total_topics = len(path) estimated_time = f"{total_topics * 2}-{total_topics * 4} hours" # Build learning path with library references learning_steps = [] for i, item in enumerate(path): step = { "step": i + 1, "topic": item["topic"], "content_type": item["type"], "search_query": item["query"], "target_library": item.get("library", library), "estimated_time": "2-4 hours", } learning_steps.append(step) return { "library": library, "experience_level": experience_level, "total_topics": total_topics, "estimated_total_time": estimated_time, "learning_path": learning_steps, "next_level": { "beginner": "intermediate", "intermediate": "advanced", "advanced": "Consider specializing in specific areas or exploring related technologies", }.get(experience_level, ""), "related_paths": _get_related_learning_paths(library), "prerequisites": _get_prerequisites(library, experience_level), } def _get_related_learning_paths(library: str) -> List[str]: """Get related learning paths for cross-skill development""" relationships = { "react": [ "frontend-development", "fullstack-development", "javascript", "nextjs", ], "fastapi": [ "backend-development", "fullstack-development", "python", "mongodb", ], "git": [ "devops", "github-actions", "frontend-development", "backend-development", ], "mongodb": ["backend-development", "fullstack-development", "fastapi"], "github-actions": ["devops", "git", "docker", "kubernetes"], "openapi": ["fastapi", "backend-development", "fullstack-development"], "frontend-development": ["react", "nextjs", "javascript", "css"], "backend-development": ["fastapi", "python", "mongodb", "postgresql"], "fullstack-development": [ "frontend-development", "backend-development", "react", "fastapi", ], "devops": ["git", "docker", "kubernetes", "github-actions"], } return relationships.get(library, []) def _get_prerequisites(library: str, experience_level: str) -> List[str]: """Get prerequisites for learning paths""" if experience_level == "beginner": return [] prerequisites = { "react": ["HTML basics", "CSS fundamentals", "JavaScript essentials"], "fastapi": ["Python basics", "HTTP fundamentals", "API concepts"], "fullstack-development": [ "HTML/CSS basics", "JavaScript fundamentals", "Python basics", ], "devops": [ "Command line basics", "Git fundamentals", "Basic development experience", ], "mongodb": ["Database concepts", "JSON understanding"], "github-actions": ["Git basics", "YAML syntax", "CI/CD concepts"], "openapi": ["API concepts", "HTTP methods", "JSON/YAML syntax"], } return prerequisites.get(library, []) @mcp.tool() async def get_code_examples(library: str, topic: str, language: str = "python"): """ Get curated code examples for a specific topic and library. Args: library: The library to search for examples topic: The specific topic or feature language: Programming language for examples Returns: Curated code examples with explanations """ await enforce_rate_limit("get_code_examples") # Enhanced query for code-specific search code_query = f"{library} {topic} example code {language}" try: # Use filtered search to find examples with code from .smart_search import filtered_search, SearchFilters filters = SearchFilters(content_type="example", has_code_examples=True) results = await filtered_search.search_with_filters( code_query, library, filters ) if not results: # Fallback to regular search if library not in docs_urls: return {"error": f"Library {library} not supported"} query = f"site:{docs_urls[library]} {code_query}" search_results = await search_web(query) if not search_results.get("organic"): return {"error": "No code examples found"} # Process first result for code extraction first_result = search_results["organic"][0] content = await fetch_url(first_result["link"]) # Extract code snippets (simplified) code_blocks = [] import re code_pattern = r"```(?:python|javascript|typescript|js)?\n(.*?)```" matches = re.finditer(code_pattern, content, re.DOTALL) for i, match in enumerate(matches): if i >= 3: # Limit to 3 examples break code_blocks.append( { "example": i + 1, "code": match.group(1).strip(), "language": language, "source_url": first_result["link"], } ) return { "library": library, "topic": topic, "language": language, "total_examples": len(code_blocks), "examples": code_blocks, } else: # Process enhanced results examples = [] for i, result in enumerate(results[:3]): examples.append( { "example": i + 1, "title": result.title, "description": ( result.snippet[:200] + "..." if len(result.snippet) > 200 else result.snippet ), "url": result.url, "difficulty": result.difficulty_level, "estimated_read_time": f"{result.estimated_read_time} min", } ) return { "library": library, "topic": topic, "language": language, "total_examples": len(examples), "examples": examples, } except Exception as e: return {"error": f"Failed to get code examples: {str(e)}"} @mcp.tool() async def get_environment_config(): """ Get current environment configuration and settings. Returns: Current environment configuration details """ from .config_manager import config_manager config = config_manager.get_config() return { "environment": config_manager.environment, "server_config": { "logging_level": config["server_config"]["logging_level"], "max_concurrent_requests": config["server_config"][ "max_concurrent_requests" ], "request_timeout_seconds": config["server_config"][ "request_timeout_seconds" ], }, "cache_config": { "enabled": config["cache"]["enabled"], "ttl_hours": config["cache"]["ttl_hours"], "max_entries": config["cache"]["max_entries"], }, "rate_limiting": { "enabled": config["rate_limiting"]["enabled"], "requests_per_minute": config["rate_limiting"]["requests_per_minute"], }, "features": config["server_config"]["features"], "total_libraries": len(config_manager.get_docs_urls()), "available_libraries": list(config_manager.get_docs_urls().keys())[ :10 ], # Show first 10 } @mcp.tool() async def scan_library_vulnerabilities(library_name: str, ecosystem: str = "PyPI"): """ Comprehensive vulnerability scan using OSINT sources (OSV, GitHub Advisories, Safety DB). Args: library_name: Name of the library to scan (e.g., "fastapi", "react") ecosystem: Package ecosystem ("PyPI", "npm", "Maven", "Go", etc.) Returns: Detailed security report with vulnerabilities, severity levels, and recommendations """ await enforce_rate_limit("scan_library_vulnerabilities") from .vulnerability_scanner import vulnerability_scanner try: # Perform comprehensive scan security_report = await vulnerability_scanner.scan_library( library_name, ecosystem ) return { "scan_results": security_report.to_dict(), "summary": { "library": security_report.library_name, "ecosystem": security_report.ecosystem, "security_score": security_report.security_score, "risk_level": ( "🚨 High Risk" if security_report.security_score < 50 else ( "⚠️ Medium Risk" if security_report.security_score < 70 else ( "✅ Low Risk" if security_report.security_score < 90 else "🛡️ Excellent" ) ) ), "critical_vulnerabilities": security_report.critical_count, "total_vulnerabilities": security_report.total_vulnerabilities, "primary_recommendation": ( security_report.recommendations[0] if security_report.recommendations else "No specific recommendations" ), }, "scan_timestamp": security_report.scan_date, "sources": [ "OSV Database", "GitHub Security Advisories", "Safety DB (PyPI only)", ], } except Exception as e: return { "error": f"Vulnerability scan failed: {str(e)}", "library": library_name, "ecosystem": ecosystem, "scan_timestamp": datetime.now().isoformat(), } @mcp.tool() async def get_security_summary(library_name: str, ecosystem: str = "PyPI"): """ Get quick security overview for a library without detailed vulnerability list. Args: library_name: Name of the library ecosystem: Package ecosystem (default: PyPI) Returns: Concise security summary with score and basic recommendations """ await enforce_rate_limit("get_security_summary") from .vulnerability_scanner import security_integration try: summary = await security_integration.get_security_summary( library_name, ecosystem ) # Add security badge score = summary.get("security_score", 50) if score >= 90: badge = "🛡️ EXCELLENT" elif score >= 70: badge = "✅ SECURE" elif score >= 50: badge = "⚠️ CAUTION" else: badge = "🚨 HIGH RISK" return { "library": library_name, "ecosystem": ecosystem, "security_badge": badge, "security_score": score, "status": summary.get("status", "unknown"), "vulnerabilities": { "total": summary.get("total_vulnerabilities", 0), "critical": summary.get("critical_vulnerabilities", 0), }, "recommendation": summary.get( "primary_recommendation", "No recommendations available" ), "last_scanned": datetime.now().isoformat(), } except Exception as e: return { "library": library_name, "ecosystem": ecosystem, "security_badge": "❓ UNKNOWN", "security_score": None, "status": "scan_failed", "error": str(e), } @mcp.tool() async def compare_library_security(libraries: List[str], ecosystem: str = "PyPI"): """ Compare security scores across multiple libraries to help with selection. Args: libraries: List of library names to compare ecosystem: Package ecosystem for all libraries Returns: Security comparison with rankings and recommendations """ await enforce_rate_limit("compare_library_security") from .vulnerability_scanner import security_integration if len(libraries) > 10: return {"error": "Maximum 10 libraries allowed for comparison"} results = [] # Scan all libraries in parallel for faster comparison scan_tasks = [ security_integration.get_security_summary(lib, ecosystem) for lib in libraries ] try: summaries = await asyncio.gather(*scan_tasks, return_exceptions=True) for i, (library, summary_item) in enumerate(zip(libraries, summaries)): if isinstance(summary_item, Exception): results.append( { "library": library, "security_score": 0, "status": "scan_failed", "error": str(summary_item), } ) else: summary = summary_item results.append( { "library": library, "security_score": summary.get("security_score", 0), # type: ignore "status": summary.get("status", "unknown"), # type: ignore "vulnerabilities": summary.get("total_vulnerabilities", 0), # type: ignore "critical_vulnerabilities": summary.get("critical_vulnerabilities", 0), # type: ignore "recommendation": summary.get("primary_recommendation", ""), # type: ignore } ) # Sort by security score (highest first) results.sort(key=lambda x: x.get("security_score", 0), reverse=True) # Add rankings for i, result in enumerate(results): result["rank"] = i + 1 score = result.get("security_score", 0) if score >= 90: result["rating"] = "🛡️ Excellent" elif score >= 70: result["rating"] = "✅ Secure" elif score >= 50: result["rating"] = "⚠️ Caution" else: result["rating"] = "🚨 High Risk" # Generate overall recommendation if results: best_lib = results[0] if best_lib.get("security_score", 0) >= 80: overall_rec = ( f"✅ Recommended: {best_lib['library']} has excellent security" ) elif best_lib.get("security_score", 0) >= 60: overall_rec = f"⚠️ Proceed with caution: {best_lib['library']} is the most secure option" else: overall_rec = "🚨 Security concerns: All libraries have significant vulnerabilities" else: overall_rec = "Unable to generate recommendation" return { "comparison_results": results, "total_libraries": len(libraries), "scan_timestamp": datetime.now().isoformat(), "overall_recommendation": overall_rec, "ecosystem": ecosystem, } except Exception as e: return { "error": f"Security comparison failed: {str(e)}", "libraries": libraries, "ecosystem": ecosystem, } @mcp.tool() async def suggest_secure_libraries( partial_name: str, include_security_score: bool = True ): """ Enhanced library suggestions that include security scores for informed decisions. Args: partial_name: Partial library name to search for include_security_score: Whether to include security scores (slower but more informative) Returns: Library suggestions with optional security information """ await enforce_rate_limit("suggest_secure_libraries") # Get basic suggestions first basic_suggestions = await suggest_libraries(partial_name) if not include_security_score or not basic_suggestions: return { "suggestions": basic_suggestions, "partial_name": partial_name, "security_info_included": False, } # Add security information for top 5 suggestions from .vulnerability_scanner import security_integration enhanced_suggestions = [] top_suggestions = basic_suggestions[:5] # Limit to avoid too many API calls # Get security scores in parallel security_tasks = [ security_integration.get_security_summary(lib, "PyPI") for lib in top_suggestions ] try: security_results = await asyncio.gather(*security_tasks, return_exceptions=True) for lib, sec_res_item in zip(top_suggestions, security_results): suggestion = {"library": lib} if isinstance(sec_res_item, Exception): suggestion.update( { "security_score": None, "security_status": "unknown", "security_badge": "❓", } ) else: security_result = sec_res_item score = security_result.get("security_score", 50) suggestion.update( { "security_score": score, "security_status": security_result.get("status", "unknown"), # type: ignore "security_badge": ( "🛡️" if score >= 90 else "✅" if score >= 70 else "⚠️" if score >= 50 else "🚨" ), "vulnerabilities": security_result.get("total_vulnerabilities", 0), # type: ignore } ) enhanced_suggestions.append(suggestion) # Add remaining suggestions without security info for lib in basic_suggestions[5:]: enhanced_suggestions.append( { "library": lib, "security_score": None, "security_status": "not_scanned", "note": "Use scan_library_vulnerabilities for security details", } ) # Sort by security score for enhanced suggestions enhanced_suggestions.sort( key=lambda x: x.get("security_score") or 0, reverse=True ) return { "suggestions": enhanced_suggestions, "partial_name": partial_name, "security_info_included": True, "total_suggestions": len(enhanced_suggestions), "note": "Libraries with security scores are sorted by security rating", } except Exception as e: return { "suggestions": [{"library": lib} for lib in basic_suggestions], "partial_name": partial_name, "security_info_included": False, "error": f"Security enhancement failed: {str(e)}", } @mcp.tool() async def scan_project_dependencies(project_path: str = "."): """ Scans project dependencies from files like pyproject.toml or requirements.txt for vulnerabilities. Args: project_path: The path to the project directory (defaults to current directory). Returns: A comprehensive security report of all project dependencies. """ from .vulnerability_scanner import vulnerability_scanner from .project_scanner import find_and_parse_dependencies parsed_info = find_and_parse_dependencies(project_path) if not parsed_info: return { "error": "No dependency file found.", "message": "Supported files are pyproject.toml, requirements.txt, or package.json.", } filename, ecosystem, dependencies = parsed_info if not dependencies: return { "summary": { "dependency_file": filename, "ecosystem": ecosystem, "total_dependencies": 0, "vulnerable_count": 0, "overall_project_risk": "None", "message": "No dependencies found to scan.", }, "vulnerable_packages": [], } total_deps = len(dependencies) print( f"🔎 Found {total_deps} dependencies in '{filename}'. Scanning for vulnerabilities...", file=sys.stderr, ) scan_tasks = [ vulnerability_scanner.scan_library(name, ecosystem) for name in dependencies.keys() ] results = await asyncio.gather(*scan_tasks, return_exceptions=True) vulnerable_deps = [] for i, report_item in enumerate(results): dep_name = list(dependencies.keys())[i] if isinstance(report_item, Exception): # Could log this error continue else: report = report_item if report.vulnerabilities: # type: ignore vulnerable_deps.append( { "library": dep_name, "version": dependencies[dep_name], "vulnerability_count": report.total_vulnerabilities, # type: ignore "security_score": report.security_score, "summary": ( report.recommendations[0] if report.recommendations else "Update to the latest version." ), "details": [ vuln.to_dict() for vuln in report.vulnerabilities[:3] ], # Top 3 } ) vulnerable_deps.sort(key=lambda x: x["security_score"]) return { "summary": { "dependency_file": filename, "ecosystem": ecosystem, "total_dependencies": total_deps, "vulnerable_count": len(vulnerable_deps), "overall_project_risk": ( "High" if any(d["security_score"] < 50 for d in vulnerable_deps) else ( "Medium" if any(d["security_score"] < 70 for d in vulnerable_deps) else "Low" ) ), }, "vulnerable_packages": vulnerable_deps, } @mcp.tool() async def generate_project_starter(project_name: str, template: str): """ Generates a starter project from a template (e.g., 'fastapi', 'react-vite'). Args: project_name: The name for the new project directory. template: The project template to use. Returns: A summary of the created project structure. """ from .project_generator import generate_project try: result = generate_project(project_name, template) # Provide a more user-friendly summary summary = f"✅ Successfully created '{result['project_name']}' using the '{result['template_used']}' template.\n" summary += f"Location: {result['project_path']}\n\n" summary += "Next steps:\n" if template == "fastapi": summary += f"1. cd {result['project_name']}\n" summary += "2. uv pip sync\n" summary += "3. uv run uvicorn main:app --reload\n" elif template == "react-vite": summary += f"1. cd {result['project_name']}\n" summary += "2. npm install\n" summary += "3. npm run dev\n" result["user_summary"] = summary return result except (ValueError, FileExistsError) as e: return {"error": str(e)} @mcp.tool() async def manage_dev_environment(service: str, project_path: str = "."): """ Manages local development environments using Docker Compose. Args: service: The service to set up (e.g., 'postgres', 'redis'). project_path: The path to the project directory. Returns: A confirmation message with the next steps. """ from .docker_manager import create_docker_compose, TEMPLATES try: if service not in TEMPLATES: return { "error": f"Service '{service}' not supported.", "available_services": list(TEMPLATES.keys()), } compose_file = create_docker_compose(service, project_path) return { "status": "success", "message": f"✅ Successfully created 'docker-compose.yml' for '{service}' in '{project_path}'.", "next_steps": [ f"1. Review the generated file: {compose_file}", "2. Run the service: docker-compose up -d", "3. To stop the service: docker-compose down", ], "service_details": TEMPLATES[service]["services"], } except (ValueError, FileExistsError) as e: return {"error": str(e)} except Exception as e: return {"error": f"An unexpected error occurred: {str(e)}"} @mcp.tool() async def get_current_config(): """ Returns the current, active configuration of the MCP server. This allows users to view the default config and use it as a template for local overrides. """ try: # The `config` global is a dictionary created from the Pydantic model # at startup, so it represents the active configuration. return config except Exception as e: return {"error": f"Could not retrieve configuration: {str(e)}"} @mcp.tool() async def snyk_scan_library(library_name: str, version: str = "latest", ecosystem: str = "pypi"): """ Scan a library using Snyk for comprehensive security analysis. Args: library_name: Name of the library to scan version: Version of the library (default: "latest") ecosystem: Package ecosystem ("pypi", "npm", "maven", etc.) Returns: Detailed security report from Snyk including vulnerabilities, licenses, and remediation advice """ from .snyk_integration import snyk_integration try: # Test connection first connection_test = await snyk_integration.test_connection() if connection_test["status"] != "connected": return { "error": "Snyk integration not configured", "details": connection_test.get("error", "Unknown error"), "setup_instructions": [ "1. Sign up for Snyk account at https://snyk.io", "2. Get API token from your Snyk account settings", "3. Set SNYK_API_KEY environment variable", "4. Optionally set SNYK_ORG_ID for organization-specific scans" ] } # Perform the scan package_info = await snyk_integration.scan_package(library_name, version, ecosystem) return { "library": library_name, "version": version, "ecosystem": ecosystem, "scan_timestamp": datetime.now().isoformat(), "vulnerability_summary": { "total": len(package_info.vulnerabilities), "critical": package_info.severity_counts.get("critical", 0), "high": package_info.severity_counts.get("high", 0), "medium": package_info.severity_counts.get("medium", 0), "low": package_info.severity_counts.get("low", 0) }, "vulnerabilities": [ { "id": vuln.id, "title": vuln.title, "severity": vuln.severity.value, "cvss_score": vuln.cvss_score, "cve": vuln.cve, "is_patchable": vuln.is_patchable, "upgrade_path": vuln.upgrade_path[:3] if vuln.upgrade_path else [], "snyk_url": f"https://snyk.io/vuln/{vuln.id}" } for vuln in package_info.vulnerabilities[:10] # Limit to top 10 ], "license_info": [ { "name": license.name, "type": license.type, "spdx_id": license.spdx_id, "is_deprecated": license.is_deprecated } for license in package_info.licenses ], "recommendations": [ "🔍 Review all critical and high severity vulnerabilities", "📦 Update to latest secure version if available", "⚖️ Ensure license compliance with your organization's policies", "🔄 Set up continuous monitoring for this package" ] } except Exception as e: return { "error": f"Snyk scan failed: {str(e)}", "library": library_name, "version": version } @mcp.tool() async def snyk_scan_project(project_path: str = "."): """ Scan entire project dependencies using Snyk. Args: project_path: Path to the project directory (default: current directory) Returns: Comprehensive security report for all project dependencies """ from .snyk_integration import snyk_integration from .project_scanner import find_and_parse_dependencies try: # Find project dependencies dep_result = find_and_parse_dependencies(project_path) if not dep_result: return { "error": "No supported dependency files found", "supported_files": ["pyproject.toml", "requirements.txt", "package.json"], "project_path": project_path } filename, ecosystem, dependencies = dep_result manifest_path = os.path.join(project_path, filename) # Test Snyk connection connection_test = await snyk_integration.test_connection() if connection_test["status"] != "connected": return { "error": "Snyk integration not configured", "details": connection_test.get("error", "Unknown error") } # Scan the project manifest scan_result = await snyk_integration.scan_project_manifest(manifest_path, ecosystem) if "error" in scan_result: return scan_result # Enhance with additional analysis high_priority_vulns = [ vuln for vuln in scan_result["vulnerabilities"] if vuln.get("severity") in ["critical", "high"] ] return { "project_path": project_path, "manifest_file": filename, "ecosystem": ecosystem, "scan_timestamp": scan_result["scan_timestamp"], "summary": { **scan_result["summary"], "high_priority_vulnerabilities": len(high_priority_vulns), "security_score": max(0, 100 - ( len([v for v in scan_result["vulnerabilities"] if v.get("severity") == "critical"]) * 25 + len([v for v in scan_result["vulnerabilities"] if v.get("severity") == "high"]) * 15 + len([v for v in scan_result["vulnerabilities"] if v.get("severity") == "medium"]) * 5 + len([v for v in scan_result["vulnerabilities"] if v.get("severity") == "low"]) * 1 )) }, "high_priority_vulnerabilities": high_priority_vulns[:10], "license_issues": scan_result["license_issues"], "remediation_summary": { "patches_available": len([v for v in scan_result["vulnerabilities"] if v.get("is_patchable")]), "upgrades_available": len([v for v in scan_result["vulnerabilities"] if v.get("upgrade_path")]), "total_fixable": len([v for v in scan_result["vulnerabilities"] if v.get("is_patchable") or v.get("upgrade_path")]) }, "next_steps": [ "🚨 Address all critical vulnerabilities immediately", "📦 Update packages with available security patches", "🔍 Review medium and low priority issues", "⚖️ Check license compliance for flagged packages", "🔄 Set up continuous monitoring with Snyk" ] } except Exception as e: return { "error": f"Project scan failed: {str(e)}", "project_path": project_path } @mcp.tool() async def snyk_license_check(project_path: str = ".", policy: str = "permissive"): """ Check license compliance for project dependencies using Snyk. Args: project_path: Path to the project directory policy: License policy to apply ("permissive", "copyleft-limited", "strict") Returns: License compliance report with risk assessment """ from .snyk_integration import snyk_integration from .project_scanner import find_and_parse_dependencies try: # Find project dependencies dep_result = find_and_parse_dependencies(project_path) if not dep_result: return {"error": "No supported dependency files found"} filename, ecosystem, dependencies = dep_result # Convert dependencies to list of tuples packages = [(name, version) for name, version in dependencies.items()] # Get license compliance report compliance_report = await snyk_integration.get_license_compliance(packages, ecosystem) # Apply policy-specific analysis policy_rules = { "permissive": { "allowed": {"MIT", "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause", "ISC"}, "restricted": {"GPL-2.0", "GPL-3.0", "LGPL-2.1", "LGPL-3.0", "AGPL-3.0"}, "name": "Permissive Policy" }, "copyleft-limited": { "allowed": {"MIT", "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause", "ISC", "LGPL-2.1", "LGPL-3.0"}, "restricted": {"GPL-2.0", "GPL-3.0", "AGPL-3.0"}, "name": "Limited Copyleft Policy" }, "strict": { "allowed": {"MIT", "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause"}, "restricted": {"GPL-2.0", "GPL-3.0", "LGPL-2.1", "LGPL-3.0", "AGPL-3.0"}, "name": "Strict Policy" } } selected_policy = policy_rules.get(policy, policy_rules["permissive"]) # Risk assessment risk_assessment = { "policy_applied": selected_policy["name"], "overall_compliance": "compliant" if compliance_report["non_compliant_packages"] == 0 else "non-compliant", "risk_level": "low" if compliance_report["non_compliant_packages"] == 0 else "high" if compliance_report["non_compliant_packages"] > 5 else "medium", "action_required": compliance_report["non_compliant_packages"] > 0 } return { "project_path": project_path, "policy": selected_policy["name"], "scan_timestamp": datetime.now().isoformat(), "compliance_summary": compliance_report, "risk_assessment": risk_assessment, "recommendations": [ "📋 Review all non-compliant packages", "🔄 Find alternative packages with compatible licenses", "⚖️ Consult legal team for high-risk licenses", "📝 Document license decisions for audit trail" ] } except Exception as e: return { "error": f"License check failed: {str(e)}", "project_path": project_path } @mcp.tool() async def snyk_monitor_project(project_path: str = "."): """ Set up continuous monitoring for a project with Snyk. Args: project_path: Path to the project directory Returns: Status of monitoring setup and project details """ from .snyk_integration import snyk_integration try: # Test connection and get organization info connection_test = await snyk_integration.test_connection() if connection_test["status"] != "connected": return { "error": "Snyk integration not configured", "details": connection_test.get("error", "Unknown error"), "setup_required": [ "Set SNYK_API_KEY environment variable", "Set SNYK_ORG_ID environment variable", "Ensure you have organization admin privileges" ] } # Set up monitoring monitor_result = await snyk_integration.monitor_project(project_path) if "error" in monitor_result: return monitor_result return { "status": "success", "monitoring_enabled": True, "project_details": monitor_result, "organization": connection_test.get("organizations", []), "next_steps": [ "🔔 Configure alert preferences in Snyk dashboard", "📊 Review security reports regularly", "🔄 Enable automatic PRs for security updates", "📈 Set up integration with CI/CD pipeline" ], "dashboard_url": "https://app.snyk.io/org/your-org/projects" } except Exception as e: return { "error": f"Monitoring setup failed: {str(e)}", "project_path": project_path } def main(): """Main entry point for the MCP server.""" mcp.run(transport="stdio") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anton-prosterity/documentation-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server