Skip to main content
Glama
anton-prosterity

Documentation Search MCP Server

main.py74.5 kB
import json import os import hashlib import time import logging from contextlib import asynccontextmanager from datetime import datetime, timedelta from pathlib import Path from typing import Annotated, Any, Dict, List, Optional import asyncio import anyio import httpx from mcp.server.fastmcp import FastMCP from dotenv import load_dotenv from pydantic import BeforeValidator from .smart_search import smart_search, SearchResult from .web_scraper import scraper from .site_search import ( load_preindexed_state, preindex_site, save_preindexed_state, search_site_via_sitemap, ) from .site_index_downloader import ( ensure_site_index_file, load_site_index_settings_from_env, ) from .config_validator import validate_config, Config as AppConfig from .content_enhancer import content_enhancer from .version_resolver import version_resolver import sys import atexit # Load the environment variables load_dotenv() logger = logging.getLogger(__name__) USER_AGENT = "docs-app/1.0" SERPER_URL = "https://google.serper.dev/search" # Environment variables (removing API key exposure) SERPER_API_KEY = os.getenv("SERPER_API_KEY") @asynccontextmanager async def mcp_lifespan(_: FastMCP): async def async_heartbeat() -> None: # Work around environments where asyncio's cross-thread wakeups can be delayed. # The MCP stdio transport uses AnyIO worker threads for stdin/stdout; without # periodic loop wake-ups, those thread completions may not be processed. while True: await anyio.sleep(0.1) async with anyio.create_task_group() as tg: tg.start_soon(async_heartbeat) try: global http_client if http_client is None: http_client = httpx.AsyncClient(timeout=httpx.Timeout(30.0, read=60.0)) settings = load_site_index_settings_from_env(cwd=os.getcwd()) try: result = await ensure_site_index_file( http_client, settings=settings, user_agent=USER_AGENT, ) status = result.get("status") if status == "downloaded": logger.debug( "Downloaded docs search index: %s (%s)", result.get("path"), result.get("url"), ) elif status == "error": print( f"⚠️ Docs search index download failed: {result.get('errors') or result.get('error')}", file=sys.stderr, ) except Exception as e: print(f"⚠️ Docs search index download failed: {e}", file=sys.stderr) if load_preindexed_state(settings.path): logger.debug("Loaded docs search index: %s", settings.path) yield {} finally: tg.cancel_scope.cancel() await shutdown_resources() # Initialize the MCP server mcp = FastMCP("documentation_search_enhanced", lifespan=mcp_lifespan) def _normalize_libraries(value: Any) -> List[str]: if value is None: return [] if isinstance(value, str): parts = [part.strip() for part in value.split(",")] return [part for part in parts if part] if isinstance(value, (list, tuple, set)): libraries: List[str] = [] for item in value: if item is None: continue item_str = str(item).strip() if item_str: libraries.append(item_str) return libraries return [str(value).strip()] LibrariesParam = Annotated[List[str], BeforeValidator(_normalize_libraries)] # Simple in-memory cache with TTL class SimpleCache: def __init__( self, ttl_hours: int = 24, max_entries: int = 1000, persistence_enabled: bool = False, persist_path: Optional[str] = None, ): self.cache: Dict[str, Dict[str, Any]] = {} self.ttl_hours = ttl_hours self.max_entries = max_entries self.persistence_enabled = persistence_enabled self.persist_path = persist_path self._lock = asyncio.Lock() if self.persistence_enabled and self.persist_path: self._load_from_disk() def _is_expired(self, timestamp: datetime) -> bool: return datetime.now() - timestamp > timedelta(hours=self.ttl_hours) async def get(self, key: str) -> Optional[str]: async with self._lock: if key in self.cache: entry = self.cache[key] if not self._is_expired(entry["timestamp"]): return entry["data"] del self.cache[key] return None async def set(self, key: str, data: str) -> None: async with self._lock: await self._cleanup_locked() if len(self.cache) >= self.max_entries: oldest_key = min( self.cache.keys(), key=lambda k: self.cache[k]["timestamp"] ) del self.cache[oldest_key] self.cache[key] = {"data": data, "timestamp": datetime.now()} await self._persist_locked() async def clear_expired(self) -> None: async with self._lock: await self._cleanup_locked() await self._persist_locked() async def stats(self) -> Dict[str, Any]: async with self._lock: expired_count = sum( 1 for entry in self.cache.values() if self._is_expired(entry["timestamp"]) ) return { "total_entries": len(self.cache), "expired_entries": expired_count, "active_entries": len(self.cache) - expired_count, "max_entries": self.max_entries, "ttl_hours": self.ttl_hours, "memory_usage_estimate": f"{len(str(self.cache)) / 1024:.2f} KB", } async def clear(self) -> int: async with self._lock: removed = len(self.cache) self.cache.clear() await self._persist_locked() return removed async def _cleanup_locked(self) -> None: expired_keys = [ k for k, v in self.cache.items() if self._is_expired(v["timestamp"]) ] for key in expired_keys: del self.cache[key] def _load_from_disk(self) -> None: try: if not os.path.exists(self.persist_path or ""): return with open(self.persist_path, "r", encoding="utf-8") as fh: raw = json.load(fh) for key, entry in raw.items(): try: timestamp = datetime.fromisoformat(entry["timestamp"]) if not self._is_expired(timestamp): self.cache[key] = { "data": entry["data"], "timestamp": timestamp, } except Exception: continue except Exception as exc: print(f"⚠️ Failed to load cache persistence: {exc}", file=sys.stderr) async def _persist_locked(self) -> None: if not (self.persistence_enabled and self.persist_path): return try: serialisable = { key: { "data": value["data"], "timestamp": value["timestamp"].isoformat(), } for key, value in self.cache.items() if not self._is_expired(value["timestamp"]) } tmp_path = f"{self.persist_path}.tmp" with open(tmp_path, "w", encoding="utf-8") as fh: json.dump(serialisable, fh) os.replace(tmp_path, self.persist_path) except Exception as exc: print(f"⚠️ Failed to persist cache: {exc}", file=sys.stderr) class TokenBucketRateLimiter: def __init__(self, requests_per_minute: int, burst: int): self.capacity = max(burst, requests_per_minute, 1) self.refill_rate = requests_per_minute / 60 if requests_per_minute > 0 else 0 self.tokens: Dict[str, float] = {} self.last_refill: Dict[str, float] = {} self._lock = asyncio.Lock() async def acquire(self, key: str = "global") -> None: if self.refill_rate == 0: return while True: async with self._lock: now = time.monotonic() tokens = self.tokens.get(key, float(self.capacity)) last = self.last_refill.get(key, now) elapsed = now - last if elapsed > 0: tokens = min(self.capacity, tokens + elapsed * self.refill_rate) if tokens >= 1: self.tokens[key] = tokens - 1 self.last_refill[key] = now return wait_time = (1 - tokens) / self.refill_rate if self.refill_rate else 0 self.tokens[key] = tokens self.last_refill[key] = now await asyncio.sleep(wait_time) def load_config() -> AppConfig: """Load and validate the configuration file. Priority: 1. Looks for `config.json` in the current working directory. 2. Falls back to the `config.json` bundled with the package. """ config_data = None local_config_path = os.path.join(os.getcwd(), "config.json") try: # 1. Prioritize local config file if os.path.exists(local_config_path): logger.debug("Found local config.json. Loading...") with open(local_config_path, "r") as f: config_data = json.load(f) else: # 2. Fallback to packaged config try: packaged_config_path = Path(__file__).with_name("config.json") config_data = json.loads( packaged_config_path.read_text(encoding="utf-8") ) except (FileNotFoundError, json.JSONDecodeError): # This is a critical failure if the package is broken print("FATAL: Packaged config.json not found.", file=sys.stderr) raise except Exception as e: print(f"FATAL: Could not read config.json. Error: {e}", file=sys.stderr) raise if not config_data: raise FileNotFoundError("Could not find or load config.json") try: validated_config = validate_config(config_data) logger.debug("Configuration successfully loaded and validated.") return validated_config except Exception as e: # Pydantic's ValidationError print( "❌ FATAL: Configuration validation failed. Please check your config.json.", file=sys.stderr, ) print(e, file=sys.stderr) raise # Load configuration config_model = load_config() config = config_model.model_dump() # Use the dict version for existing logic real_time_search_enabled = ( config.get("server_config", {}).get("features", {}).get("real_time_search", True) ) docs_urls = {} # Handle both old simple URL format and new enhanced format for lib_name, lib_data in config.get("docs_urls", {}).items(): if isinstance(lib_data, dict): docs_urls[lib_name] = str(lib_data.get("url") or "").strip() else: docs_urls[lib_name] = str(lib_data or "").strip() cache_config = config.get("cache", {"enabled": False}) cache_persistence_enabled = cache_config.get("persistence_enabled", False) cache_persist_path = cache_config.get("persist_path") if cache_persistence_enabled and not cache_persist_path: cache_persist_path = os.path.join(os.getcwd(), ".docs_cache.json") # Initialize cache if enabled cache = ( SimpleCache( ttl_hours=cache_config.get("ttl_hours", 24), max_entries=cache_config.get("max_entries", 1000), persistence_enabled=cache_persistence_enabled, persist_path=cache_persist_path, ) if cache_config.get("enabled", False) else None ) site_index_settings = load_site_index_settings_from_env(cwd=os.getcwd()) site_index_path = site_index_settings.path http_client: Optional[httpx.AsyncClient] = None scrape_semaphore = asyncio.Semaphore( config.get("server_config", {}).get("max_concurrent_requests", 10) ) rate_limit_config = config.get("rate_limiting", {"enabled": False}) rate_limiter = ( TokenBucketRateLimiter( requests_per_minute=rate_limit_config.get("requests_per_minute", 60), burst=rate_limit_config.get("burst_requests", 10), ) if rate_limit_config.get("enabled", False) else None ) async def enforce_rate_limit(tool_name: str) -> None: if rate_limiter: await rate_limiter.acquire(tool_name) async def search_web_with_retry( query: str, max_retries: int = 3, num_results: int = 3 ) -> dict: """Search documentation pages, with retries. Uses Serper when configured; otherwise falls back to on-site docs search (MkDocs/Sphinx indexes when available, otherwise sitemap discovery). """ global http_client if http_client is None: http_client = httpx.AsyncClient(timeout=httpx.Timeout(30.0, read=60.0)) if not SERPER_API_KEY: try: return await search_site_via_sitemap( query, http_client, user_agent=USER_AGENT, num_results=num_results, allow_network=real_time_search_enabled, ) except Exception as e: print(f"Fallback site search failed: {e}", file=sys.stderr) return {"organic": []} payload = json.dumps({"q": query, "num": num_results}) headers = { "X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json", "User-Agent": USER_AGENT, } for attempt in range(max_retries): try: response = await http_client.post( SERPER_URL, headers=headers, content=payload, ) response.raise_for_status() return response.json() except httpx.TimeoutException: if attempt == max_retries - 1: print( f"Timeout after {max_retries} attempts for query: {query}", file=sys.stderr, ) break await asyncio.sleep(2**attempt) # Exponential backoff except httpx.HTTPStatusError as e: if e.response.status_code == 429: # Rate limited if attempt == max_retries - 1: print(f"Rate limited after {max_retries} attempts", file=sys.stderr) break await asyncio.sleep(2 ** (attempt + 2)) # Longer wait for rate limits else: print(f"HTTP error {e.response.status_code}: {e}", file=sys.stderr) break except Exception as e: if attempt == max_retries - 1: print( f"Unexpected error after {max_retries} attempts: {e}", file=sys.stderr, ) break await asyncio.sleep(2**attempt) # Serper is optional; fall back to sitemap search if it fails. try: return await search_site_via_sitemap( query, http_client, user_agent=USER_AGENT, num_results=num_results, allow_network=real_time_search_enabled, ) except Exception as e: print(f"Fallback site search failed: {e}", file=sys.stderr) return {"organic": []} async def fetch_url_with_cache(url: str, max_retries: int = 3) -> str: """Fetch URL content with caching and a Playwright-based scraper.""" cache_key = hashlib.md5(url.encode()).hexdigest() if cache: cached_content = await cache.get(cache_key) if cached_content: return cached_content # Use the new Playwright scraper async with scrape_semaphore: content = await scraper.scrape_url(url) if cache and "Error:" not in content: await cache.set(cache_key, content) return content # Backward compatibility aliases async def search_web(query: str, num_results: int = 3) -> dict: return await search_web_with_retry(query, num_results=num_results) async def fetch_url(url: str) -> str: return await fetch_url_with_cache(url) # Configure smart search now that the helpers are in place smart_search.configure(docs_urls, search_web) async def shutdown_resources() -> None: global http_client if http_client: await http_client.aclose() http_client = None await scraper.close() def _cleanup_sync() -> None: try: loop = asyncio.get_running_loop() except RuntimeError: try: asyncio.run(shutdown_resources()) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(shutdown_resources()) loop.close() else: loop.create_task(shutdown_resources()) atexit.register(_cleanup_sync) def get_versioned_docs_url(library: str, version: str, lib_config: Dict) -> str: """ Build version-specific documentation URL. Args: library: Library name version: Requested version (e.g., "4.2", "stable", "latest") lib_config: Library configuration from config.json Returns: Versioned documentation URL """ base_url = str(lib_config.get("url") or "") # If version is "latest", return base URL as-is if version == "latest": return base_url # Check if library supports version templates template = lib_config.get("version_url_template") if template: return template.format(version=version) # Handle common patterns by replacing stable/latest in URL versioned_url = base_url.replace("/stable/", f"/{version}/") versioned_url = versioned_url.replace("/latest/", f"/{version}/") return versioned_url @mcp.tool() async def get_docs( query: str, libraries: LibrariesParam, version: str = "latest", auto_detect_version: bool = False, ): """ Search documentation for a given query and one or more libraries. Args: query: The query to search for (e.g., "Chroma DB") libraries: A single library or a list of libraries to search in (e.g., "langchain" or ["fastapi", "django"]) version: Library version to search (e.g., "4.2", "stable", "latest"). Default: "latest" auto_detect_version: Automatically detect installed package version. Default: False Returns: Dictionary with structured summaries and supporting metadata """ await enforce_rate_limit("get_docs") if isinstance(libraries, str): libraries = [lib.strip() for lib in libraries.split(",") if lib.strip()] config_dict = config_model.model_dump() library_summaries: List[Dict[str, Any]] = [] summary_sections: List[str] = [] for library in libraries: # Resolve version (with auto-detection if enabled) resolved_version = await version_resolver.resolve_version( library=library, requested_version=version, auto_detect=auto_detect_version, project_path=".", ) lib_entry: Dict[str, Any] = { "library": library, "requested_query": query, "status": "searched", "results": [], } lib_config = config_dict.get("docs_urls", {}).get(library, {}) auto_approve = lib_config.get("auto_approve", True) if not auto_approve: print( f"⚠️ Requesting approval to search {library} documentation...", file=sys.stderr, ) docs_root = docs_urls.get(library) if not docs_root: lib_entry.update( { "status": "unsupported", "message": f"Library '{library}' not supported by this tool", } ) library_summaries.append(lib_entry) summary_sections.append( f"### {library}\n- Unsupported library; no documentation root configured." ) continue # Get version-specific URL versioned_url = get_versioned_docs_url(library, resolved_version, lib_config) # Build search query with version context search_query = f"site:{versioned_url} {query}" if resolved_version != "latest" and not lib_config.get("version_url_template"): # Add version to query if URL doesn't support versioning search_query += f" version {resolved_version}" search_results = await search_web(search_query, num_results=5) organic_results = (search_results.get("organic") or [])[:3] if not organic_results: lib_entry.update( { "status": "no_results", "message": "No indexed documentation results returned", } ) library_summaries.append(lib_entry) summary_sections.append(f"### {library}\n- No results for query '{query}'.") continue fetch_tasks = [fetch_url(result["link"]) for result in organic_results] fetched_contents = await asyncio.gather(*fetch_tasks, return_exceptions=True) library_lines = [f"### {library}"] for result, content in zip(organic_results, fetched_contents): entry: Dict[str, Any] = { "title": result.get("title") or result.get("link"), "url": result.get("link"), "source_snippet": result.get("snippet", ""), } if isinstance(content, Exception): error_message = str(content) entry["status"] = "error" entry["error"] = error_message library_lines.append( f"- {entry['title']}: failed to fetch ({error_message})" ) else: content_str = str(content) summary = content_enhancer.generate_summary(content_str, query) code_snippet_count = len( content_enhancer.extract_code_snippets(content_str) ) entry.update( { "status": "ok", "summary": summary, "code_snippet_count": code_snippet_count, } ) bullet_summary = summary if summary else "No summary extracted." library_lines.append( f"- {entry['title']}: {bullet_summary} (code snippets: {code_snippet_count})" ) lib_entry["results"].append(entry) lib_entry["total_results"] = len(lib_entry["results"]) library_summaries.append(lib_entry) summary_sections.append("\n".join(library_lines)) if cache: await cache.clear_expired() return { "query": query, "libraries": library_summaries, "summary_markdown": "\n\n".join(summary_sections), } @mcp.tool() async def suggest_libraries(partial_name: str): """ Suggest libraries based on partial input for auto-completion. Args: partial_name: Partial library name to search for (e.g. "lang" -> ["langchain"]) Returns: List of matching library names """ if not partial_name: return list(sorted(docs_urls.keys())) partial_lower = partial_name.lower() suggestions = [] # Exact matches first for lib in docs_urls.keys(): if lib.lower() == partial_lower: suggestions.append(lib) # Starts with matches for lib in docs_urls.keys(): if lib.lower().startswith(partial_lower) and lib not in suggestions: suggestions.append(lib) # Contains matches for lib in docs_urls.keys(): if partial_lower in lib.lower() and lib not in suggestions: suggestions.append(lib) return sorted(suggestions[:10]) # Limit to top 10 suggestions @mcp.tool() async def health_check(): """ Check the health and availability of documentation sources. Returns: Dictionary with health status of each library's documentation site """ results = {} # Test a sample of libraries to avoid overwhelming servers sample_libraries = list(docs_urls.items())[:5] for library, url in sample_libraries: start_time = time.time() try: async with httpx.AsyncClient() as client: response = await client.head( str(url), timeout=httpx.Timeout(10.0), headers={"User-Agent": USER_AGENT}, follow_redirects=True, ) response_time = time.time() - start_time results[library] = { "status": "healthy", "status_code": response.status_code, "response_time_ms": round(response_time * 1000, 2), "url": url, } except httpx.TimeoutException: results[library] = { "status": "timeout", "error": "Request timed out", "url": url, } except Exception as e: results[library] = {"status": "error", "error": str(e), "url": url} # Add cache stats if caching is enabled if cache: cache_stats = await cache.stats() results["_cache_stats"] = {"enabled": True, **cache_stats} else: results["_cache_stats"] = {"enabled": False} return results @mcp.tool() async def clear_cache(): """ Clear the documentation cache to force fresh fetches. Returns: Status message about cache clearing """ if cache: entries_cleared = await cache.clear() return f"Cache cleared. Removed {entries_cleared} cached entries." else: return "Caching is not enabled." @mcp.tool() async def get_cache_stats(): """ Get statistics about the current cache usage. Returns: Dictionary with cache statistics """ if not cache: return {"enabled": False, "message": "Caching is not enabled"} stats = await cache.stats() details = { "enabled": True, **stats, } details["persistence"] = { "enabled": cache.persistence_enabled, "path": cache.persist_path, } return details @mcp.tool() async def preindex_docs( libraries: LibrariesParam, include_sitemap: bool = False, persist_path: Optional[str] = None, max_concurrent_sites: int = 3, ): """ Pre-download and persist docs site indexes for Serper-free search. This caches MkDocs/Sphinx search indexes (and optionally sitemaps) to disk so the server can search supported documentation sites without requiring Serper. """ await enforce_rate_limit("preindex_docs") targets = libraries or sorted(docs_urls.keys()) if not targets: return { "status": "no_targets", "message": "No libraries configured to preindex", } global http_client if http_client is None: http_client = httpx.AsyncClient(timeout=httpx.Timeout(30.0, read=60.0)) concurrency = max(1, min(int(max_concurrent_sites), 10)) semaphore = asyncio.Semaphore(concurrency) async def _run_one(library: str) -> Dict[str, Any]: docs_root = docs_urls.get(library) if not docs_root: return {"library": library, "status": "unsupported"} async with semaphore: summary = await preindex_site( docs_root, http_client, user_agent=USER_AGENT, include_sitemap=include_sitemap, ) summary["library"] = library return summary results = await asyncio.gather(*[_run_one(lib) for lib in targets]) path = persist_path or site_index_path try: save_preindexed_state(path) persisted: Dict[str, Any] = {"status": "ok", "path": path} except Exception as e: persisted = {"status": "error", "path": path, "error": str(e)} return { "status": "ok", "persist": persisted, "real_time_search_enabled": real_time_search_enabled, "include_sitemap": include_sitemap, "max_concurrent_sites": concurrency, "total_libraries": len(targets), "results": results, } @mcp.tool() async def semantic_search( query: str, libraries: LibrariesParam, context: Optional[str] = None, version: str = "latest", auto_detect_version: bool = False, use_vector_rerank: bool = True, ): """ Enhanced semantic search across one or more libraries with AI-powered relevance ranking. Uses hybrid search combining: - Vector embeddings for semantic similarity (50% weight) - Keyword matching for precise results (30% weight) - Source authority and metadata (20% weight) Args: query: The search query. libraries: A single library or a list of libraries to search in. context: Optional context about your project or use case. version: Library version to search (e.g., "4.2", "stable", "latest"). Default: "latest" auto_detect_version: Automatically detect installed package version. Default: False use_vector_rerank: Enable vector-based semantic reranking for better relevance. Default: True Returns: Enhanced search results with AI-powered relevance scores and metadata, ranked across all libraries. """ from .reranker import get_reranker await enforce_rate_limit("semantic_search") if isinstance(libraries, str): libraries = [lib.strip() for lib in libraries.split(",") if lib.strip()] search_tasks = [ smart_search.semantic_search(query, lib, context) for lib in libraries ] try: results_by_library = await asyncio.gather(*search_tasks, return_exceptions=True) all_results: List[SearchResult] = [] for res_list in results_by_library: if not isinstance(res_list, Exception): all_results.extend(res_list) # type: ignore # Apply vector-based reranking for better semantic relevance if use_vector_rerank and all_results: try: reranker = get_reranker() all_results = await reranker.rerank( all_results, query, use_semantic=True ) except ImportError: logger.warning( "Vector search dependencies not installed. " "Falling back to basic relevance sorting. " "Install with: pip install documentation-search-enhanced[vector]" ) all_results.sort(key=lambda r: r.relevance_score, reverse=True) else: # Fallback to basic relevance score sorting all_results.sort(key=lambda r: r.relevance_score, reverse=True) return { "query": query, "libraries_searched": libraries, "total_results": len(all_results), "vector_rerank_enabled": use_vector_rerank, "results": [ { "source_library": result.source_library, "title": result.title, "url": result.url, "snippet": ( result.snippet[:300] + "..." if len(result.snippet) > 300 else result.snippet ), "relevance_score": result.relevance_score, "content_type": result.content_type, "difficulty_level": result.difficulty_level, "estimated_read_time": f"{result.estimated_read_time} min", "has_code_examples": result.code_snippets_count > 0, } for result in all_results[:10] # Top 10 combined results ], } except Exception as e: return {"error": f"Search failed: {str(e)}", "results": []} @mcp.tool() async def filtered_search( query: str, library: str, content_type: Optional[str] = None, difficulty_level: Optional[str] = None, has_code_examples: Optional[bool] = None, version: str = "latest", auto_detect_version: bool = False, ): """ Search with advanced filtering options. Args: query: The search query library: The library to search in content_type: Filter by content type ("tutorial", "reference", "example", "guide") difficulty_level: Filter by difficulty ("beginner", "intermediate", "advanced") has_code_examples: Filter for content with code examples (true/false) version: Library version to search (e.g., "4.2", "stable", "latest"). Default: "latest" auto_detect_version: Automatically detect installed package version. Default: False Returns: Filtered search results matching specified criteria """ from .smart_search import filtered_search, SearchFilters await enforce_rate_limit("filtered_search") filters = SearchFilters( content_type=content_type, difficulty_level=difficulty_level, has_code_examples=has_code_examples, ) try: results = await filtered_search.search_with_filters(query, library, filters) return { "query": query, "library": library, "filters_applied": { "content_type": content_type, "difficulty_level": difficulty_level, "has_code_examples": has_code_examples, }, "total_results": len(results), "results": [ { "title": result.title, "url": result.url, "snippet": ( result.snippet[:200] + "..." if len(result.snippet) > 200 else result.snippet ), "relevance_score": result.relevance_score, "content_type": result.content_type, "difficulty_level": result.difficulty_level, "estimated_read_time": f"{result.estimated_read_time} min", } for result in results[:10] ], } except Exception as e: return {"error": f"Filtered search failed: {str(e)}", "results": []} @mcp.tool() async def get_learning_path(library: str, experience_level: str = "beginner"): """ Get a structured learning path for a library based on experience level. Args: library: The library to create a learning path for experience_level: Your current level ("beginner", "intermediate", "advanced") Returns: Structured learning path with progressive topics and resources """ # Dynamic learning path generation based on difficulty level_topics = { "beginner": [ "Getting Started", "Basic Concepts", "First Examples", "Common Patterns", ], "intermediate": [ "Advanced Features", "Best Practices", "Integration", "Testing", ], "advanced": [ "Performance Optimization", "Advanced Architecture", "Production Deployment", "Monitoring", ], } if experience_level not in level_topics: return {"error": f"Experience level {experience_level} not supported"} learning_steps = [] for i, topic in enumerate(level_topics[experience_level]): learning_steps.append( { "step": i + 1, "topic": f"{library.title()} - {topic}", "content_type": "tutorial", "search_query": f"{library} {topic.lower()}", "target_library": library, "estimated_time": "2-4 hours", } ) return { "library": library, "experience_level": experience_level, "total_topics": len(learning_steps), "estimated_total_time": f"{len(learning_steps) * 2}-{len(learning_steps) * 4} hours", "learning_path": learning_steps, "next_level": { "beginner": "intermediate", "intermediate": "advanced", "advanced": "Consider specializing in specific areas or exploring related technologies", }.get(experience_level, ""), } # Removed 1000+ lines of hardcoded learning path data @mcp.tool() async def get_code_examples( library: str, topic: str, language: str = "python", version: str = "latest", auto_detect_version: bool = False, ): """ Get curated code examples for a specific topic and library. Args: library: The library to search for examples topic: The specific topic or feature language: Programming language for examples version: Library version to search (e.g., "4.2", "stable", "latest"). Default: "latest" auto_detect_version: Automatically detect installed package version. Default: False Returns: Curated code examples with explanations """ await enforce_rate_limit("get_code_examples") # Enhanced query for code-specific search code_query = f"{library} {topic} example code {language}" try: # Use filtered search to find examples with code from .smart_search import filtered_search, SearchFilters filters = SearchFilters(content_type="example", has_code_examples=True) results = await filtered_search.search_with_filters( code_query, library, filters ) if not results: # Fallback to regular search if library not in docs_urls: return {"error": f"Library {library} not supported"} query = f"site:{docs_urls[library]} {code_query}" search_results = await search_web(query) if not search_results.get("organic"): return {"error": "No code examples found"} # Process first result for code extraction first_result = search_results["organic"][0] content = await fetch_url(first_result["link"]) # Extract code snippets (simplified) code_blocks = [] import re code_pattern = r"```(?:python|javascript|typescript|js)?\n(.*?)```" matches = re.finditer(code_pattern, content, re.DOTALL) for i, match in enumerate(matches): if i >= 3: # Limit to 3 examples break code_blocks.append( { "example": i + 1, "code": match.group(1).strip(), "language": language, "source_url": first_result["link"], } ) return { "library": library, "topic": topic, "language": language, "total_examples": len(code_blocks), "examples": code_blocks, } else: # Process enhanced results examples = [] for i, result in enumerate(results[:3]): examples.append( { "example": i + 1, "title": result.title, "description": ( result.snippet[:200] + "..." if len(result.snippet) > 200 else result.snippet ), "url": result.url, "difficulty": result.difficulty_level, "estimated_read_time": f"{result.estimated_read_time} min", } ) return { "library": library, "topic": topic, "language": language, "total_examples": len(examples), "examples": examples, } except Exception as e: return {"error": f"Failed to get code examples: {str(e)}"} @mcp.tool() async def get_environment_config(): """ Get current environment configuration and settings. Returns: Current environment configuration details """ from .config_manager import config_manager config = config_manager.get_config() return { "environment": config_manager.environment, "server_config": { "logging_level": config["server_config"]["logging_level"], "max_concurrent_requests": config["server_config"][ "max_concurrent_requests" ], "request_timeout_seconds": config["server_config"][ "request_timeout_seconds" ], }, "cache_config": { "enabled": config["cache"]["enabled"], "ttl_hours": config["cache"]["ttl_hours"], "max_entries": config["cache"]["max_entries"], }, "rate_limiting": { "enabled": config["rate_limiting"]["enabled"], "requests_per_minute": config["rate_limiting"]["requests_per_minute"], }, "features": config["server_config"]["features"], "total_libraries": len(config_manager.get_docs_urls()), "available_libraries": list(config_manager.get_docs_urls().keys())[ :10 ], # Show first 10 } @mcp.tool() async def scan_library_vulnerabilities(library_name: str, ecosystem: str = "PyPI"): """ Comprehensive vulnerability scan using OSINT sources (OSV, GitHub Advisories, Safety DB). Args: library_name: Name of the library to scan (e.g., "fastapi", "react") ecosystem: Package ecosystem ("PyPI", "npm", "Maven", "Go", etc.) Returns: Detailed security report with vulnerabilities, severity levels, and recommendations """ await enforce_rate_limit("scan_library_vulnerabilities") from .vulnerability_scanner import vulnerability_scanner try: # Perform comprehensive scan security_report = await vulnerability_scanner.scan_library( library_name, ecosystem ) return { "scan_results": security_report.to_dict(), "summary": { "library": security_report.library_name, "ecosystem": security_report.ecosystem, "security_score": security_report.security_score, "risk_level": ( "🚨 High Risk" if security_report.security_score < 50 else ( "⚠️ Medium Risk" if security_report.security_score < 70 else ( "✅ Low Risk" if security_report.security_score < 90 else "🛡️ Excellent" ) ) ), "critical_vulnerabilities": security_report.critical_count, "total_vulnerabilities": security_report.total_vulnerabilities, "primary_recommendation": ( security_report.recommendations[0] if security_report.recommendations else "No specific recommendations" ), }, "scan_timestamp": security_report.scan_date, "sources": [ "OSV Database", "GitHub Security Advisories", "Safety DB (PyPI only)", ], } except Exception as e: return { "error": f"Vulnerability scan failed: {str(e)}", "library": library_name, "ecosystem": ecosystem, "scan_timestamp": datetime.now().isoformat(), } @mcp.tool() async def get_security_summary(library_name: str, ecosystem: str = "PyPI"): """ Get quick security overview for a library without detailed vulnerability list. Args: library_name: Name of the library ecosystem: Package ecosystem (default: PyPI) Returns: Concise security summary with score and basic recommendations """ await enforce_rate_limit("get_security_summary") from .vulnerability_scanner import security_integration try: summary = await security_integration.get_security_summary( library_name, ecosystem ) # Add security badge score = summary.get("security_score", 50) if score >= 90: badge = "🛡️ EXCELLENT" elif score >= 70: badge = "✅ SECURE" elif score >= 50: badge = "⚠️ CAUTION" else: badge = "🚨 HIGH RISK" return { "library": library_name, "ecosystem": ecosystem, "security_badge": badge, "security_score": score, "status": summary.get("status", "unknown"), "vulnerabilities": { "total": summary.get("total_vulnerabilities", 0), "critical": summary.get("critical_vulnerabilities", 0), }, "recommendation": summary.get( "primary_recommendation", "No recommendations available" ), "last_scanned": datetime.now().isoformat(), } except Exception as e: return { "library": library_name, "ecosystem": ecosystem, "security_badge": "❓ UNKNOWN", "security_score": None, "status": "scan_failed", "error": str(e), } @mcp.tool() async def compare_library_security(libraries: List[str], ecosystem: str = "PyPI"): """ Compare security scores across multiple libraries to help with selection. Args: libraries: List of library names to compare ecosystem: Package ecosystem for all libraries Returns: Security comparison with rankings and recommendations """ await enforce_rate_limit("compare_library_security") from .vulnerability_scanner import security_integration if len(libraries) > 10: return {"error": "Maximum 10 libraries allowed for comparison"} results = [] # Scan all libraries in parallel for faster comparison scan_tasks = [ security_integration.get_security_summary(lib, ecosystem) for lib in libraries ] try: summaries = await asyncio.gather(*scan_tasks, return_exceptions=True) for i, (library, summary_item) in enumerate(zip(libraries, summaries)): if isinstance(summary_item, Exception): results.append( { "library": library, "security_score": 0, "status": "scan_failed", "error": str(summary_item), } ) else: summary = summary_item results.append( { "library": library, "security_score": summary.get("security_score", 0), # type: ignore "status": summary.get("status", "unknown"), # type: ignore "vulnerabilities": summary.get("total_vulnerabilities", 0), # type: ignore "critical_vulnerabilities": summary.get( "critical_vulnerabilities", 0 ), # type: ignore "recommendation": summary.get("primary_recommendation", ""), # type: ignore } ) # Sort by security score (highest first) results.sort(key=lambda x: x.get("security_score", 0), reverse=True) # Add rankings for i, result in enumerate(results): result["rank"] = i + 1 score = result.get("security_score", 0) if score >= 90: result["rating"] = "🛡️ Excellent" elif score >= 70: result["rating"] = "✅ Secure" elif score >= 50: result["rating"] = "⚠️ Caution" else: result["rating"] = "🚨 High Risk" # Generate overall recommendation if results: best_lib = results[0] if best_lib.get("security_score", 0) >= 80: overall_rec = ( f"✅ Recommended: {best_lib['library']} has excellent security" ) elif best_lib.get("security_score", 0) >= 60: overall_rec = f"⚠️ Proceed with caution: {best_lib['library']} is the most secure option" else: overall_rec = "🚨 Security concerns: All libraries have significant vulnerabilities" else: overall_rec = "Unable to generate recommendation" return { "comparison_results": results, "total_libraries": len(libraries), "scan_timestamp": datetime.now().isoformat(), "overall_recommendation": overall_rec, "ecosystem": ecosystem, } except Exception as e: return { "error": f"Security comparison failed: {str(e)}", "libraries": libraries, "ecosystem": ecosystem, } @mcp.tool() async def suggest_secure_libraries( partial_name: str, include_security_score: bool = True ): """ Enhanced library suggestions that include security scores for informed decisions. Args: partial_name: Partial library name to search for include_security_score: Whether to include security scores (slower but more informative) Returns: Library suggestions with optional security information """ await enforce_rate_limit("suggest_secure_libraries") # Get basic suggestions first basic_suggestions = await suggest_libraries(partial_name) if not include_security_score or not basic_suggestions: return { "suggestions": basic_suggestions, "partial_name": partial_name, "security_info_included": False, } # Add security information for top 5 suggestions from .vulnerability_scanner import security_integration enhanced_suggestions = [] top_suggestions = basic_suggestions[:5] # Limit to avoid too many API calls # Get security scores in parallel security_tasks = [ security_integration.get_security_summary(lib, "PyPI") for lib in top_suggestions ] try: security_results = await asyncio.gather(*security_tasks, return_exceptions=True) for lib, sec_res_item in zip(top_suggestions, security_results): suggestion = {"library": lib} if isinstance(sec_res_item, Exception): suggestion.update( { "security_score": None, "security_status": "unknown", "security_badge": "❓", } ) else: security_result = sec_res_item score = security_result.get("security_score", 50) suggestion.update( { "security_score": score, "security_status": security_result.get("status", "unknown"), # type: ignore "security_badge": ( "🛡️" if score >= 90 else "✅" if score >= 70 else "⚠️" if score >= 50 else "🚨" ), "vulnerabilities": security_result.get( "total_vulnerabilities", 0 ), # type: ignore } ) enhanced_suggestions.append(suggestion) # Add remaining suggestions without security info for lib in basic_suggestions[5:]: enhanced_suggestions.append( { "library": lib, "security_score": None, "security_status": "not_scanned", "note": "Use scan_library_vulnerabilities for security details", } ) # Sort by security score for enhanced suggestions enhanced_suggestions.sort( key=lambda x: x.get("security_score") or 0, reverse=True ) return { "suggestions": enhanced_suggestions, "partial_name": partial_name, "security_info_included": True, "total_suggestions": len(enhanced_suggestions), "note": "Libraries with security scores are sorted by security rating", } except Exception as e: return { "suggestions": [{"library": lib} for lib in basic_suggestions], "partial_name": partial_name, "security_info_included": False, "error": f"Security enhancement failed: {str(e)}", } @mcp.tool() async def scan_project_dependencies(project_path: str = "."): """ Scans project dependencies from files like pyproject.toml or requirements.txt for vulnerabilities. Args: project_path: The path to the project directory (defaults to current directory). Returns: A comprehensive security report of all project dependencies. """ from .vulnerability_scanner import vulnerability_scanner from .project_scanner import find_and_parse_dependencies parsed_info = find_and_parse_dependencies(project_path) if not parsed_info: return { "error": "No dependency file found.", "message": "Supported files are pyproject.toml, requirements.txt, or package.json.", } filename, ecosystem, dependencies = parsed_info if not dependencies: return { "summary": { "dependency_file": filename, "ecosystem": ecosystem, "total_dependencies": 0, "vulnerable_count": 0, "overall_project_risk": "None", "message": "No dependencies found to scan.", }, "vulnerable_packages": [], } total_deps = len(dependencies) logger.debug( "Found %s dependencies in %s. Scanning for vulnerabilities...", total_deps, filename, ) scan_tasks = [ vulnerability_scanner.scan_library(name, ecosystem) for name in dependencies.keys() ] results = await asyncio.gather(*scan_tasks, return_exceptions=True) vulnerable_deps = [] for i, report_item in enumerate(results): dep_name = list(dependencies.keys())[i] if isinstance(report_item, Exception): # Could log this error continue else: report = report_item if report.vulnerabilities: # type: ignore vulnerable_deps.append( { "library": dep_name, "version": dependencies[dep_name], "vulnerability_count": report.total_vulnerabilities, # type: ignore "security_score": report.security_score, "summary": ( report.recommendations[0] if report.recommendations else "Update to the latest version." ), "details": [ vuln.to_dict() for vuln in report.vulnerabilities[:3] ], # Top 3 } ) vulnerable_deps.sort(key=lambda x: x["security_score"]) return { "summary": { "dependency_file": filename, "ecosystem": ecosystem, "total_dependencies": total_deps, "vulnerable_count": len(vulnerable_deps), "overall_project_risk": ( "High" if any(d["security_score"] < 50 for d in vulnerable_deps) else ( "Medium" if any(d["security_score"] < 70 for d in vulnerable_deps) else "Low" ) ), }, "vulnerable_packages": vulnerable_deps, } @mcp.tool() async def generate_project_starter(project_name: str, template: str): """ Generates a starter project from a template (e.g., 'fastapi', 'react-vite'). Args: project_name: The name for the new project directory. template: The project template to use. Returns: A summary of the created project structure. """ from .project_generator import generate_project try: result = generate_project(project_name, template) # Provide a more user-friendly summary summary = f"✅ Successfully created '{result['project_name']}' using the '{result['template_used']}' template.\n" summary += f"Location: {result['project_path']}\n\n" summary += "Next steps:\n" if template == "fastapi": summary += f"1. cd {result['project_name']}\n" summary += "2. uv pip sync\n" summary += "3. uv run uvicorn main:app --reload\n" elif template == "react-vite": summary += f"1. cd {result['project_name']}\n" summary += "2. npm install\n" summary += "3. npm run dev\n" result["user_summary"] = summary return result except (ValueError, FileExistsError) as e: return {"error": str(e)} @mcp.tool() async def manage_dev_environment(service: str, project_path: str = "."): """ Manages local development environments using Docker Compose. Args: service: The service to set up (e.g., 'postgres', 'redis'). project_path: The path to the project directory. Returns: A confirmation message with the next steps. """ from .docker_manager import create_docker_compose, TEMPLATES try: if service not in TEMPLATES: return { "error": f"Service '{service}' not supported.", "available_services": list(TEMPLATES.keys()), } compose_file = create_docker_compose(service, project_path) return { "status": "success", "message": f"✅ Successfully created 'docker-compose.yml' for '{service}' in '{project_path}'.", "next_steps": [ f"1. Review the generated file: {compose_file}", "2. Run the service: docker-compose up -d", "3. To stop the service: docker-compose down", ], "service_details": TEMPLATES[service]["services"], } except (ValueError, FileExistsError) as e: return {"error": str(e)} except Exception as e: return {"error": f"An unexpected error occurred: {str(e)}"} @mcp.tool() async def get_current_config(): """ Returns the current, active configuration of the MCP server. This allows users to view the default config and use it as a template for local overrides. """ try: # The `config` global is a dictionary created from the Pydantic model # at startup, so it represents the active configuration. return config except Exception as e: return {"error": f"Could not retrieve configuration: {str(e)}"} @mcp.tool() async def snyk_scan_library( library_name: str, version: str = "latest", ecosystem: str = "pypi" ): """ Scan a library using Snyk for comprehensive security analysis. Args: library_name: Name of the library to scan version: Version of the library (default: "latest") ecosystem: Package ecosystem ("pypi", "npm", "maven", etc.) Returns: Detailed security report from Snyk including vulnerabilities, licenses, and remediation advice """ from .snyk_integration import snyk_integration try: # Test connection first connection_test = await snyk_integration.test_connection() if connection_test["status"] != "connected": return { "error": "Snyk integration not configured", "details": connection_test.get("error", "Unknown error"), "setup_instructions": [ "1. Sign up for Snyk account at https://snyk.io", "2. Get API token from your Snyk account settings", "3. Set SNYK_API_KEY environment variable", "4. Optionally set SNYK_ORG_ID for organization-specific scans", ], } # Perform the scan package_info = await snyk_integration.scan_package( library_name, version, ecosystem ) return { "library": library_name, "version": version, "ecosystem": ecosystem, "scan_timestamp": datetime.now().isoformat(), "vulnerability_summary": { "total": len(package_info.vulnerabilities), "critical": package_info.severity_counts.get("critical", 0), "high": package_info.severity_counts.get("high", 0), "medium": package_info.severity_counts.get("medium", 0), "low": package_info.severity_counts.get("low", 0), }, "vulnerabilities": [ { "id": vuln.id, "title": vuln.title, "severity": vuln.severity.value, "cvss_score": vuln.cvss_score, "cve": vuln.cve, "is_patchable": vuln.is_patchable, "upgrade_path": vuln.upgrade_path[:3] if vuln.upgrade_path else [], "snyk_url": f"https://snyk.io/vuln/{vuln.id}", } for vuln in package_info.vulnerabilities[:10] # Limit to top 10 ], "license_info": [ { "name": license.name, "type": license.type, "spdx_id": license.spdx_id, "is_deprecated": license.is_deprecated, } for license in package_info.licenses ], "recommendations": [ "🔍 Review all critical and high severity vulnerabilities", "📦 Update to latest secure version if available", "⚖️ Ensure license compliance with your organization's policies", "🔄 Set up continuous monitoring for this package", ], } except Exception as e: return { "error": f"Snyk scan failed: {str(e)}", "library": library_name, "version": version, } @mcp.tool() async def snyk_scan_project(project_path: str = "."): """ Scan entire project dependencies using Snyk. Args: project_path: Path to the project directory (default: current directory) Returns: Comprehensive security report for all project dependencies """ from .snyk_integration import snyk_integration from .project_scanner import find_and_parse_dependencies try: # Find project dependencies dep_result = find_and_parse_dependencies(project_path) if not dep_result: return { "error": "No supported dependency files found", "supported_files": [ "pyproject.toml", "requirements.txt", "package.json", ], "project_path": project_path, } filename, ecosystem, dependencies = dep_result manifest_path = os.path.join(project_path, filename) # Test Snyk connection connection_test = await snyk_integration.test_connection() if connection_test["status"] != "connected": return { "error": "Snyk integration not configured", "details": connection_test.get("error", "Unknown error"), } # Scan the project manifest scan_result = await snyk_integration.scan_project_manifest( manifest_path, ecosystem ) if "error" in scan_result: return scan_result # Enhance with additional analysis high_priority_vulns = [ vuln for vuln in scan_result["vulnerabilities"] if vuln.get("severity") in ["critical", "high"] ] return { "project_path": project_path, "manifest_file": filename, "ecosystem": ecosystem, "scan_timestamp": scan_result["scan_timestamp"], "summary": { **scan_result["summary"], "high_priority_vulnerabilities": len(high_priority_vulns), "security_score": max( 0, 100 - ( len( [ v for v in scan_result["vulnerabilities"] if v.get("severity") == "critical" ] ) * 25 + len( [ v for v in scan_result["vulnerabilities"] if v.get("severity") == "high" ] ) * 15 + len( [ v for v in scan_result["vulnerabilities"] if v.get("severity") == "medium" ] ) * 5 + len( [ v for v in scan_result["vulnerabilities"] if v.get("severity") == "low" ] ) * 1 ), ), }, "high_priority_vulnerabilities": high_priority_vulns[:10], "license_issues": scan_result["license_issues"], "remediation_summary": { "patches_available": len( [v for v in scan_result["vulnerabilities"] if v.get("is_patchable")] ), "upgrades_available": len( [v for v in scan_result["vulnerabilities"] if v.get("upgrade_path")] ), "total_fixable": len( [ v for v in scan_result["vulnerabilities"] if v.get("is_patchable") or v.get("upgrade_path") ] ), }, "next_steps": [ "🚨 Address all critical vulnerabilities immediately", "📦 Update packages with available security patches", "🔍 Review medium and low priority issues", "⚖️ Check license compliance for flagged packages", "🔄 Set up continuous monitoring with Snyk", ], } except Exception as e: return {"error": f"Project scan failed: {str(e)}", "project_path": project_path} @mcp.tool() async def snyk_license_check(project_path: str = ".", policy: str = "permissive"): """ Check license compliance for project dependencies using Snyk. Args: project_path: Path to the project directory policy: License policy to apply ("permissive", "copyleft-limited", "strict") Returns: License compliance report with risk assessment """ from .snyk_integration import snyk_integration from .project_scanner import find_and_parse_dependencies try: # Find project dependencies dep_result = find_and_parse_dependencies(project_path) if not dep_result: return {"error": "No supported dependency files found"} filename, ecosystem, dependencies = dep_result # Convert dependencies to list of tuples packages = [(name, version) for name, version in dependencies.items()] # Get license compliance report compliance_report = await snyk_integration.get_license_compliance( packages, ecosystem ) # Apply policy-specific analysis policy_rules = { "permissive": { "allowed": {"MIT", "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause", "ISC"}, "restricted": { "GPL-2.0", "GPL-3.0", "LGPL-2.1", "LGPL-3.0", "AGPL-3.0", }, "name": "Permissive Policy", }, "copyleft-limited": { "allowed": { "MIT", "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause", "ISC", "LGPL-2.1", "LGPL-3.0", }, "restricted": {"GPL-2.0", "GPL-3.0", "AGPL-3.0"}, "name": "Limited Copyleft Policy", }, "strict": { "allowed": {"MIT", "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause"}, "restricted": { "GPL-2.0", "GPL-3.0", "LGPL-2.1", "LGPL-3.0", "AGPL-3.0", }, "name": "Strict Policy", }, } selected_policy = policy_rules.get(policy, policy_rules["permissive"]) # Risk assessment risk_assessment = { "policy_applied": selected_policy["name"], "overall_compliance": ( "compliant" if compliance_report["non_compliant_packages"] == 0 else "non-compliant" ), "risk_level": ( "low" if compliance_report["non_compliant_packages"] == 0 else ( "high" if compliance_report["non_compliant_packages"] > 5 else "medium" ) ), "action_required": compliance_report["non_compliant_packages"] > 0, } return { "project_path": project_path, "policy": selected_policy["name"], "scan_timestamp": datetime.now().isoformat(), "compliance_summary": compliance_report, "risk_assessment": risk_assessment, "recommendations": [ "📋 Review all non-compliant packages", "🔄 Find alternative packages with compatible licenses", "⚖️ Consult legal team for high-risk licenses", "📝 Document license decisions for audit trail", ], } except Exception as e: return { "error": f"License check failed: {str(e)}", "project_path": project_path, } @mcp.tool() async def snyk_monitor_project(project_path: str = "."): """ Set up continuous monitoring for a project with Snyk. Args: project_path: Path to the project directory Returns: Status of monitoring setup and project details """ from .snyk_integration import snyk_integration try: # Test connection and get organization info connection_test = await snyk_integration.test_connection() if connection_test["status"] != "connected": return { "error": "Snyk integration not configured", "details": connection_test.get("error", "Unknown error"), "setup_required": [ "Set SNYK_API_KEY environment variable", "Set SNYK_ORG_ID environment variable", "Ensure you have organization admin privileges", ], } # Set up monitoring monitor_result = await snyk_integration.monitor_project(project_path) if "error" in monitor_result: return monitor_result return { "status": "success", "monitoring_enabled": True, "project_details": monitor_result, "organization": connection_test.get("organizations", []), "next_steps": [ "🔔 Configure alert preferences in Snyk dashboard", "📊 Review security reports regularly", "🔄 Enable automatic PRs for security updates", "📈 Set up integration with CI/CD pipeline", ], "dashboard_url": "https://app.snyk.io/org/your-org/projects", } except Exception as e: return { "error": f"Monitoring setup failed: {str(e)}", "project_path": project_path, } def main(): """Main entry point for the MCP server.""" mcp.run(transport="stdio") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anton-prosterity/documentation-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server