
Documentation Search MCP Server

site_search.py (41.8 kB)
#!/usr/bin/env python3 """ Serper-free documentation site search. This module provides a Serper-free fallback for `site:` queries by: 1) Preferring docs-native search indexes when available (MkDocs / Sphinx) 2) Falling back to sitemap discovery (robots.txt + sitemap.xml) 3) Optionally using a Playwright-backed fetcher to score/snippet page content """ from __future__ import annotations import asyncio import gzip import json import os import re import xml.etree.ElementTree as ET from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import ( Any, Awaitable, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, ) from urllib.parse import urljoin, urlparse import httpx from bs4 import BeautifulSoup _SITEMAP_CACHE_TTL = timedelta(hours=24) _INDEX_CACHE_TTL = timedelta(hours=24) _MAX_SITEMAP_URLS = 50_000 _MAX_SITEMAPS_TO_FETCH = 12 _MAX_INDEX_BYTES = 10_000_000 _MAX_INDEX_DOC_TEXT_CHARS = 5_000 _DEFAULT_CONTENT_FETCH_CONCURRENCY = 3 @dataclass(frozen=True) class _SitemapCacheEntry: fetched_at: datetime urls: Tuple[str, ...] _sitemap_cache: Dict[str, _SitemapCacheEntry] = {} _sitemap_locks: Dict[str, asyncio.Lock] = {} @dataclass(frozen=True) class _IndexCacheEntry: fetched_at: datetime kind: str payload: Any _index_cache: Dict[str, _IndexCacheEntry] = {} _index_locks: Dict[str, asyncio.Lock] = {} def _parse_iso_datetime(value: Any) -> Optional[datetime]: if not isinstance(value, str): return None raw = value.strip() if not raw: return None try: parsed = datetime.fromisoformat(raw) except ValueError: return None if parsed.tzinfo is not None: try: parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None) except Exception: parsed = parsed.replace(tzinfo=None) return parsed def export_preindexed_state() -> Dict[str, Any]: """Export in-memory sitemap/index caches for persistence.""" now = datetime.now() sitemaps: Dict[str, Dict[str, Any]] = {} for origin, entry in _sitemap_cache.items(): sitemaps[origin] = { "fetched_at": entry.fetched_at.isoformat(), "urls": list(entry.urls), } indexes: Dict[str, Dict[str, Any]] = {} for index_url, entry in _index_cache.items(): payload = entry.payload if isinstance(payload, tuple): payload = list(payload) indexes[index_url] = { "fetched_at": entry.fetched_at.isoformat(), "kind": entry.kind, "payload": payload, } return { "schema_version": 1, "generated_at": now.isoformat(), "sitemaps": sitemaps, "indexes": indexes, } def import_preindexed_state(state: Any) -> None: """Import previously persisted sitemap/index caches.""" if not isinstance(state, dict): return imported_at = datetime.now() max_future_skew = timedelta(days=1) sitemaps = state.get("sitemaps") if isinstance(sitemaps, dict): for origin, entry in sitemaps.items(): if not isinstance(origin, str) or not isinstance(entry, dict): continue urls_raw = entry.get("urls") if not isinstance(urls_raw, list): continue urls = tuple(str(url).strip() for url in urls_raw if str(url).strip()) if not urls: continue fetched_at = _parse_iso_datetime(entry.get("fetched_at")) if fetched_at is None or fetched_at > imported_at + max_future_skew: fetched_at = imported_at _sitemap_cache[origin] = _SitemapCacheEntry( fetched_at=fetched_at, urls=urls ) indexes = state.get("indexes") if isinstance(indexes, dict): for index_url, entry in indexes.items(): if not isinstance(index_url, str) or not isinstance(entry, dict): continue kind = entry.get("kind") payload_raw = entry.get("payload") if not isinstance(kind, str): continue if kind == "mkdocs": if not isinstance(payload_raw, list): 
continue prepared = [] for doc in payload_raw: if not isinstance(doc, dict): continue location = str(doc.get("location") or "").strip() if not location: continue title = str(doc.get("title") or "").strip() text = str(doc.get("text") or "").strip() if len(text) > _MAX_INDEX_DOC_TEXT_CHARS: text = text[:_MAX_INDEX_DOC_TEXT_CHARS] prepared.append( {"location": location, "title": title, "text": text} ) if not prepared: continue payload: Any = tuple(prepared) elif kind == "sphinx": if not isinstance(payload_raw, dict): continue payload = payload_raw else: continue fetched_at = _parse_iso_datetime(entry.get("fetched_at")) if fetched_at is None or fetched_at > imported_at + max_future_skew: fetched_at = imported_at _index_cache[index_url] = _IndexCacheEntry( fetched_at=fetched_at, kind=kind, payload=payload ) def load_preindexed_state(path: str) -> bool: """Load a persisted index cache from disk into memory.""" if not path: return False if not os.path.exists(path): return False try: with open(path, "r", encoding="utf-8") as fh: raw = json.load(fh) except Exception: return False import_preindexed_state(raw) return True def save_preindexed_state(path: str) -> None: """Persist current in-memory sitemap/index caches to disk.""" if not path: raise ValueError("persist path must be non-empty") state = export_preindexed_state() tmp_path = f"{path}.tmp" with open(tmp_path, "w", encoding="utf-8") as fh: json.dump(state, fh) os.replace(tmp_path, path) def _get_cached_index_from_memory(index_url: str, *, kind: str) -> Optional[Any]: cache_entry = _index_cache.get(index_url) if cache_entry and cache_entry.kind == kind: return cache_entry.payload return None def _get_cached_sitemap_urls_from_memory( origin: str, *, allow_stale: bool ) -> Optional[List[str]]: cache_entry = _sitemap_cache.get(origin) if not cache_entry: return None if not cache_entry.urls: return None if allow_stale: return list(cache_entry.urls) if datetime.now() - cache_entry.fetched_at <= _SITEMAP_CACHE_TTL: return list(cache_entry.urls) return None async def preindex_site( site_url: str, client: httpx.AsyncClient, *, user_agent: str, include_sitemap: bool = False, ) -> Dict[str, Any]: """Fetch and cache on-site search indexes for a docs site.""" parsed = urlparse(site_url) if not parsed.scheme or not parsed.netloc: return {"site_url": site_url, "status": "invalid_url"} origin = f"{parsed.scheme}://{parsed.netloc}" results: Dict[str, Any] = { "site_url": site_url, "origin": origin, "mkdocs_index": None, "sphinx_index": None, "sitemap": None, "errors": [], } for index_url in _mkdocs_index_candidates(site_url): try: docs = await _get_cached_index( client, index_url, user_agent=user_agent, kind="mkdocs", timeout_seconds=20.0, ) except Exception as e: results["errors"].append(f"mkdocs:{index_url}: {e}") continue if docs: results["mkdocs_index"] = {"index_url": index_url, "documents": len(docs)} break for index_url in _sphinx_index_candidates(site_url): try: index = await _get_cached_index( client, index_url, user_agent=user_agent, kind="sphinx", timeout_seconds=20.0, ) except Exception as e: results["errors"].append(f"sphinx:{index_url}: {e}") continue if isinstance(index, dict): filenames = index.get("filenames") results["sphinx_index"] = { "index_url": index_url, "documents": len(filenames) if isinstance(filenames, list) else None, } break if include_sitemap: try: urls = await _load_site_sitemap_urls( client, site_url, user_agent=user_agent ) if urls: _sitemap_cache[origin] = _SitemapCacheEntry( fetched_at=datetime.now(), urls=tuple(urls) ) 
results["sitemap"] = {"urls": len(urls)} except Exception as e: results["errors"].append(f"sitemap:{origin}: {e}") results["status"] = ( "ok" if results.get("mkdocs_index") or results.get("sphinx_index") or results.get("sitemap") else "no_index_found" ) return results def _parse_site_query(query: str) -> Tuple[Optional[str], str]: match = re.search(r"\bsite:(\S+)", query) if not match: return None, query.strip() site_token = match.group(1).strip().strip('"').strip("'") remaining = (query[: match.start()] + query[match.end() :]).strip() return site_token, remaining _STOPWORDS = { "a", "an", "and", "are", "as", "at", "be", "for", "from", "how", "in", "into", "is", "it", "of", "on", "or", "that", "the", "these", "this", "to", "using", "what", "when", "where", "why", "with", } def _tokenize_query(text: str) -> List[str]: tokens = [t for t in re.findall(r"[a-z0-9]+", text.lower()) if t] filtered: List[str] = [] for token in tokens: if token in _STOPWORDS: continue if len(token) <= 1: continue if token not in filtered: filtered.append(token) return filtered[:12] def _matches_site_prefix(candidate_url: str, site_url: str) -> bool: try: candidate = urlparse(candidate_url) site = urlparse(site_url) except Exception: return False if not candidate.netloc or candidate.netloc.lower() != site.netloc.lower(): return False site_path = site.path or "/" candidate_path = candidate.path or "/" site_path_norm = site_path.rstrip("/") candidate_path_norm = candidate_path.rstrip("/") if site_path_norm in ("", "/"): return True return candidate_path_norm == site_path_norm or candidate_path_norm.startswith( f"{site_path_norm}/" ) def _sitemap_candidates(site_url: str) -> List[str]: parsed = urlparse(site_url) if not parsed.scheme or not parsed.netloc: return [] origin = f"{parsed.scheme}://{parsed.netloc}" site_base = site_url.rstrip("/") candidates = [ f"{origin}/sitemap.xml", f"{origin}/sitemap_index.xml", f"{origin}/sitemap-index.xml", ] if site_base != origin: candidates.extend( [ f"{site_base}/sitemap.xml", f"{site_base}/sitemap_index.xml", ] ) # Deduplicate while preserving order. 
seen = set() unique: List[str] = [] for item in candidates: if item not in seen: unique.append(item) seen.add(item) return unique def _mkdocs_index_candidates(site_url: str) -> List[str]: parsed = urlparse(site_url) if not parsed.scheme or not parsed.netloc: return [] origin = f"{parsed.scheme}://{parsed.netloc}/" base = site_url.rstrip("/") + "/" candidates = [ urljoin(base, "search/search_index.json"), urljoin(base, "search_index.json"), ] if base != origin: candidates.extend( [ urljoin(origin, "search/search_index.json"), urljoin(origin, "search_index.json"), ] ) seen: set[str] = set() unique: List[str] = [] for item in candidates: if item not in seen: unique.append(item) seen.add(item) return unique def _mkdocs_base_from_index_url(index_url: str) -> str: suffixes = ("/search/search_index.json", "/search_index.json") for suffix in suffixes: if index_url.endswith(suffix): return index_url[: -len(suffix)] + "/" return urljoin(index_url, "./") def _sphinx_index_candidates(site_url: str) -> List[str]: parsed = urlparse(site_url) if not parsed.scheme or not parsed.netloc: return [] origin = f"{parsed.scheme}://{parsed.netloc}/" base = site_url.rstrip("/") + "/" candidates = [urljoin(base, "searchindex.js")] if base != origin: candidates.append(urljoin(origin, "searchindex.js")) seen: set[str] = set() unique: List[str] = [] for item in candidates: if item not in seen: unique.append(item) seen.add(item) return unique def _sphinx_base_from_index_url(index_url: str) -> str: if index_url.endswith("/searchindex.js"): return index_url[: -len("/searchindex.js")] + "/" if index_url.endswith("searchindex.js"): return index_url[: -len("searchindex.js")] return urljoin(index_url, "./") async def _fetch_bytes( client: httpx.AsyncClient, url: str, *, user_agent: str, timeout_seconds: float ) -> Optional[bytes]: try: response = await client.get( url, headers={"User-Agent": user_agent}, timeout=httpx.Timeout(timeout_seconds), follow_redirects=True, ) if response.status_code >= 400: return None return response.content except Exception: return None def _maybe_decompress_gzip(blob: bytes) -> bytes: # Some sitemaps are served as *.gz without Content-Encoding headers. if len(blob) >= 2 and blob[0] == 0x1F and blob[1] == 0x8B: try: return gzip.decompress(blob) except Exception: return blob return blob def _xml_root_tag(root: ET.Element) -> str: if "}" in root.tag: return root.tag.split("}", 1)[1] return root.tag def _parse_sitemap_xml(blob: bytes) -> Tuple[List[str], List[str]]: """ Returns (urls, child_sitemaps). 
""" try: root = ET.fromstring(blob) except Exception: return [], [] tag = _xml_root_tag(root) if tag == "urlset": urls = [ (loc.text or "").strip() for loc in root.findall(".//{*}url/{*}loc") if (loc.text or "").strip() ] return urls, [] if tag == "sitemapindex": sitemaps = [ (loc.text or "").strip() for loc in root.findall(".//{*}sitemap/{*}loc") if (loc.text or "").strip() ] return [], sitemaps return [], [] async def _discover_sitemaps_from_robots( client: httpx.AsyncClient, site_url: str, *, user_agent: str ) -> List[str]: parsed = urlparse(site_url) if not parsed.scheme or not parsed.netloc: return [] robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt" try: response = await client.get( robots_url, headers={"User-Agent": user_agent}, timeout=httpx.Timeout(10.0), follow_redirects=True, ) if response.status_code >= 400: return [] sitemaps = [] for line in response.text.splitlines(): if line.lower().startswith("sitemap:"): sitemap = line.split(":", 1)[1].strip() if sitemap: sitemaps.append(sitemap) return sitemaps except Exception: return [] async def _load_site_sitemap_urls( client: httpx.AsyncClient, site_url: str, *, user_agent: str, allow_html_fallback: bool = True, ) -> List[str]: sitemap_urls = await _discover_sitemaps_from_robots( client, site_url, user_agent=user_agent ) sitemap_urls.extend(_sitemap_candidates(site_url)) visited_sitemaps = set() sitemap_queue = [] for sitemap_url in sitemap_urls: if sitemap_url and sitemap_url not in visited_sitemaps: sitemap_queue.append(sitemap_url) visited_sitemaps.add(sitemap_url) discovered_urls: List[str] = [] seen_urls = set() while sitemap_queue and len(visited_sitemaps) <= _MAX_SITEMAPS_TO_FETCH: sitemap_url = sitemap_queue.pop(0) blob = await _fetch_bytes( client, sitemap_url, user_agent=user_agent, timeout_seconds=15.0 ) if not blob: continue blob = _maybe_decompress_gzip(blob) urls, child_sitemaps = _parse_sitemap_xml(blob) for url in urls: if url in seen_urls: continue seen_urls.add(url) discovered_urls.append(url) if len(discovered_urls) >= _MAX_SITEMAP_URLS: return discovered_urls for child in child_sitemaps: if child in visited_sitemaps: continue if len(visited_sitemaps) >= _MAX_SITEMAPS_TO_FETCH: break visited_sitemaps.add(child) sitemap_queue.append(child) if discovered_urls: return discovered_urls if not allow_html_fallback: return [] return await _discover_urls_from_html_links(client, site_url, user_agent=user_agent) async def _discover_urls_from_html_links( client: httpx.AsyncClient, site_url: str, *, user_agent: str ) -> List[str]: """Discover internal links from the site's HTML when no sitemap is available.""" parsed = urlparse(site_url) if not parsed.scheme or not parsed.netloc: return [] origin = f"{parsed.scheme}://{parsed.netloc}" base_for_join = site_url.rstrip("/") + "/" html_blob = await _fetch_bytes( client, site_url, user_agent=user_agent, timeout_seconds=15.0 ) if not html_blob: return [] try: html_text = html_blob.decode("utf-8", errors="ignore") except Exception: return [] soup = BeautifulSoup(html_text, "html.parser") discovered_from_html: List[str] = [] seen_html_urls: set[str] = set() def _is_asset_path(path: str) -> bool: return bool( re.search( r"\.(?:png|jpe?g|gif|svg|webp|css|js|map|ico|woff2?|ttf|otf|eot|pdf|zip|gz)$", path.lower(), ) ) for anchor in soup.find_all("a", href=True): href = str(anchor.get("href") or "").strip() if not href: continue lower = href.lower() if lower.startswith(("#", "mailto:", "javascript:", "tel:")): continue absolute = urljoin(base_for_join, href) parsed_link = 
urlparse(absolute) if parsed_link.scheme not in {"http", "https"}: continue if parsed_link.netloc.lower() != parsed.netloc.lower(): continue if _is_asset_path(parsed_link.path or ""): continue sanitized = parsed_link._replace(query="", fragment="").geturl() if not sanitized.startswith(origin): continue if sanitized in seen_html_urls: continue seen_html_urls.add(sanitized) discovered_from_html.append(sanitized) if len(discovered_from_html) >= _MAX_SITEMAP_URLS: break return discovered_from_html def _extract_text_snippet(text: str, tokens: Sequence[str]) -> str: cleaned = re.sub(r"\s+", " ", text).strip() if not cleaned: return "" if not tokens: return cleaned[:240] lower = cleaned.lower() best_idx: Optional[int] = None for token in tokens: idx = lower.find(token) if idx == -1: continue if best_idx is None or idx < best_idx: best_idx = idx if best_idx is None: return cleaned[:240] start = max(0, best_idx - 80) end = min(len(cleaned), best_idx + 160) return cleaned[start:end].strip()[:240] def _score_urls(urls: Iterable[str], tokens: Sequence[str]) -> List[Tuple[int, str]]: scored: List[Tuple[int, str]] = [] if not tokens: return [(1, url) for url in urls] for url in urls: url_lower = url.lower() score = 0 for token in tokens: if token not in url_lower: continue score += 1 # Boost for segment-level matches. path = urlparse(url).path.lower() segments = [seg for seg in re.split(r"[/._-]+", path) if seg] if token in segments: score += 6 else: score += 2 if score > 0: scored.append((score, url)) scored.sort(key=lambda item: (-item[0], len(item[1]))) return scored def _fallback_title_from_url(url: str) -> str: parsed = urlparse(url) segment = (parsed.path or "/").rstrip("/").split("/")[-1] segment = re.sub(r"\.[a-z0-9]+$", "", segment, flags=re.IGNORECASE) segment = segment.replace("-", " ").replace("_", " ").strip() if not segment: return url return segment[:1].upper() + segment[1:] def _extract_page_snippet(soup: BeautifulSoup, tokens: Sequence[str]) -> str: for meta_name in ("description", "og:description"): meta = soup.find("meta", attrs={"name": meta_name}) or soup.find( "meta", attrs={"property": meta_name} ) if meta and meta.get("content"): return str(meta["content"]).strip()[:240] text = soup.get_text(" ", strip=True) if not text: return "" if not tokens: return text[:240] text_lower = text.lower() for token in tokens: idx = text_lower.find(token) if idx == -1: continue start = max(0, idx - 80) end = min(len(text), idx + 160) snippet = text[start:end].strip() return snippet[:240] return text[:240] async def _fetch_result_metadata( client: httpx.AsyncClient, url: str, *, user_agent: str, tokens: Sequence[str] ) -> Dict[str, str]: try: response = await client.get( url, headers={"User-Agent": user_agent}, timeout=httpx.Timeout(12.0), follow_redirects=True, ) if response.status_code >= 400: return {"title": _fallback_title_from_url(url), "snippet": ""} soup = BeautifulSoup(response.text, "html.parser") title = (soup.title.string or "").strip() if soup.title else "" if not title: title = _fallback_title_from_url(url) snippet = _extract_page_snippet(soup, tokens) return {"title": title, "snippet": snippet} except Exception: return {"title": _fallback_title_from_url(url), "snippet": ""} async def _get_cached_index( client: httpx.AsyncClient, index_url: str, *, user_agent: str, kind: str, timeout_seconds: float, ) -> Optional[Any]: now = datetime.now() cache_entry = _index_cache.get(index_url) stale_payload: Optional[Any] = None if cache_entry and cache_entry.kind == kind: stale_payload = 
cache_entry.payload if ( cache_entry and cache_entry.kind == kind and now - cache_entry.fetched_at <= _INDEX_CACHE_TTL ): return cache_entry.payload lock = _index_locks.setdefault(index_url, asyncio.Lock()) async with lock: cache_entry = _index_cache.get(index_url) if ( cache_entry and cache_entry.kind == kind and now - cache_entry.fetched_at <= _INDEX_CACHE_TTL ): return cache_entry.payload if cache_entry and cache_entry.kind == kind: stale_payload = cache_entry.payload blob = await _fetch_bytes( client, index_url, user_agent=user_agent, timeout_seconds=timeout_seconds ) if (not blob or len(blob) > _MAX_INDEX_BYTES) and stale_payload is not None: _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=stale_payload ) return stale_payload if not blob or len(blob) > _MAX_INDEX_BYTES: return None payload: Any if kind == "mkdocs": try: raw = json.loads(blob.decode("utf-8")) except Exception: if stale_payload is not None: _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=stale_payload ) return stale_payload docs = raw.get("docs") if not isinstance(docs, list): if stale_payload is not None: _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=stale_payload ) return stale_payload prepared = [] for doc in docs: if not isinstance(doc, dict): continue location = str(doc.get("location") or "").strip() title = str(doc.get("title") or "").strip() text = str(doc.get("text") or "").strip() if len(text) > _MAX_INDEX_DOC_TEXT_CHARS: text = text[:_MAX_INDEX_DOC_TEXT_CHARS] if not location: continue prepared.append({"location": location, "title": title, "text": text}) payload = tuple(prepared) elif kind == "sphinx": try: text = blob.decode("utf-8", errors="ignore") except Exception: if stale_payload is not None: _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=stale_payload ) return stale_payload marker = "Search.setIndex(" idx = text.find(marker) if idx == -1: if stale_payload is not None: _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=stale_payload ) return stale_payload start = text.find("{", idx) end = text.rfind("}") if start == -1 or end == -1 or end <= start: if stale_payload is not None: _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=stale_payload ) return stale_payload json_text = text[start : end + 1] try: payload = json.loads(json_text) except Exception: if stale_payload is not None: _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=stale_payload ) return stale_payload else: return None _index_cache[index_url] = _IndexCacheEntry( fetched_at=datetime.now(), kind=kind, payload=payload ) return payload def _score_document(url: str, title: str, text: str, tokens: Sequence[str]) -> int: if not tokens: return 1 url_lower = url.lower() title_lower = title.lower() text_lower = text.lower() score = 0 for token in tokens: if token in title_lower: score += 25 if token in url_lower: score += 6 occurrences = text_lower.count(token) if occurrences: score += 8 + min(occurrences, 20) return score async def _gather_with_limit( coros: Sequence[Awaitable[Any]], *, concurrency: int ) -> List[Any]: if concurrency <= 1: results: List[Any] = [] for coro in coros: results.append(await coro) return results semaphore = asyncio.Semaphore(concurrency) async def _runner(coro: Awaitable[Any]) -> Any: async with semaphore: return await coro return await 
asyncio.gather( *[_runner(coro) for coro in coros], return_exceptions=True ) async def _search_via_mkdocs_index( site_url: str, tokens: Sequence[str], client: httpx.AsyncClient, *, user_agent: str, num_results: int, allow_network: bool, ) -> Optional[List[Dict[str, str]]]: for index_url in _mkdocs_index_candidates(site_url): if allow_network: docs = await _get_cached_index( client, index_url, user_agent=user_agent, kind="mkdocs", timeout_seconds=20.0, ) else: docs = _get_cached_index_from_memory(index_url, kind="mkdocs") if not docs: continue base_url = _mkdocs_base_from_index_url(index_url) scored: List[Tuple[int, Dict[str, str]]] = [] for doc in docs: location = str(doc.get("location") or "") url = urljoin(base_url, location) if not _matches_site_prefix(url, site_url): continue title = str(doc.get("title") or "") or _fallback_title_from_url(url) text = str(doc.get("text") or "") score = _score_document(url, title, text, tokens) if score <= 0: continue snippet = _extract_text_snippet(text, tokens) scored.append((score, {"link": url, "title": title, "snippet": snippet})) scored.sort(key=lambda item: (-item[0], len(item[1]["link"]))) organic = [item[1] for item in scored[: max(num_results, 1)]] if organic: return organic return None def _coerce_sphinx_doc_hits(entry: Any) -> List[Tuple[int, int]]: """Return a list of (doc_id, weight) pairs.""" if not isinstance(entry, list): return [] if not entry: return [] if all(isinstance(item, int) for item in entry): return [(item, 1) for item in entry] hits: List[Tuple[int, int]] = [] for item in entry: if isinstance(item, int): hits.append((item, 1)) continue if isinstance(item, (list, tuple)) and item and isinstance(item[0], int): weight = 1 if len(item) > 1 and isinstance(item[1], int): weight = max(item[1], 1) hits.append((item[0], weight)) return hits async def _search_via_sphinx_index( site_url: str, tokens: Sequence[str], client: httpx.AsyncClient, *, user_agent: str, num_results: int, fetch_text: Optional[Callable[[str], Awaitable[str]]], fetch_text_concurrency: int, allow_network: bool, ) -> Optional[List[Dict[str, str]]]: for index_url in _sphinx_index_candidates(site_url): if allow_network: index = await _get_cached_index( client, index_url, user_agent=user_agent, kind="sphinx", timeout_seconds=20.0, ) else: index = _get_cached_index_from_memory(index_url, kind="sphinx") if not isinstance(index, dict): continue filenames = index.get("filenames") titles = index.get("titles") terms = index.get("terms") titleterms = index.get("titleterms") if not ( isinstance(filenames, list) and isinstance(titles, list) and isinstance(terms, dict) ): continue base_url = _sphinx_base_from_index_url(index_url) scores: Dict[int, int] = {} for token in tokens: for doc_id, weight in _coerce_sphinx_doc_hits(terms.get(token)): scores[doc_id] = scores.get(doc_id, 0) + (10 * weight) if isinstance(titleterms, dict): for doc_id, weight in _coerce_sphinx_doc_hits(titleterms.get(token)): scores[doc_id] = scores.get(doc_id, 0) + (20 * weight) ranked_doc_ids = sorted(scores.items(), key=lambda item: -item[1]) hits: List[Tuple[int, str]] = [] for doc_id, _ in ranked_doc_ids: if not isinstance(doc_id, int) or doc_id < 0 or doc_id >= len(filenames): continue url = urljoin(base_url, str(filenames[doc_id])) if not _matches_site_prefix(url, site_url): continue hits.append((doc_id, url)) if len(hits) >= max(num_results, 1): break if not hits: continue urls = [url for _, url in hits] snippets_by_url: Dict[str, str] = {url: "" for url in urls} if allow_network and fetch_text and 
tokens: texts = await _gather_with_limit( [fetch_text(url) for url in urls], concurrency=fetch_text_concurrency ) for url, text in zip(urls, texts): if isinstance(text, Exception): continue snippets_by_url[url] = _extract_text_snippet(str(text), tokens) elif allow_network and tokens: metadatas = await asyncio.gather( *[ _fetch_result_metadata( client, url, user_agent=user_agent, tokens=tokens ) for url in urls ], return_exceptions=True, ) for url, metadata in zip(urls, metadatas): if isinstance(metadata, Exception): continue snippets_by_url[url] = metadata.get("snippet", "") organic: List[Dict[str, str]] = [] for doc_id, url in hits: title = _fallback_title_from_url(url) if doc_id < len(titles) and titles[doc_id]: title = str(titles[doc_id]) organic.append( { "link": url, "title": title, "snippet": snippets_by_url.get(url, ""), } ) if organic: return organic return None async def search_site_via_sitemap( query: str, client: httpx.AsyncClient, *, user_agent: str, num_results: int = 5, fetch_text: Optional[Callable[[str], Awaitable[str]]] = None, fetch_text_concurrency: int = _DEFAULT_CONTENT_FETCH_CONCURRENCY, allow_network: bool = True, ) -> Dict[str, Any]: """ Perform a Serper-free search for `site:` queries. Returns a Serper-like payload: {"organic": [{"link","title","snippet"}, ...]}. """ site_url, terms = _parse_site_query(query) if not site_url: return {"organic": []} parsed = urlparse(site_url) if not parsed.scheme or not parsed.netloc: return {"organic": []} origin = f"{parsed.scheme}://{parsed.netloc}" tokens = _tokenize_query(terms) # 1) Prefer docs-native search indexes when present. organic = await _search_via_mkdocs_index( site_url, tokens, client, user_agent=user_agent, num_results=num_results, allow_network=allow_network, ) if not organic: organic = await _search_via_sphinx_index( site_url, tokens, client, user_agent=user_agent, num_results=num_results, fetch_text=fetch_text, fetch_text_concurrency=fetch_text_concurrency, allow_network=allow_network, ) if organic: return {"organic": organic} # 2) Fallback: sitemap discovery + ranking. 
cached_urls = _get_cached_sitemap_urls_from_memory(origin, allow_stale=False) if cached_urls is not None: all_urls = cached_urls else: if not allow_network: stale_cached = _get_cached_sitemap_urls_from_memory( origin, allow_stale=True ) if stale_cached is None: return {"organic": []} all_urls = stale_cached else: lock = _sitemap_locks.setdefault(origin, asyncio.Lock()) async with lock: cached_urls = _get_cached_sitemap_urls_from_memory( origin, allow_stale=False ) if cached_urls is not None: all_urls = cached_urls else: loaded = await _load_site_sitemap_urls( client, site_url, user_agent=user_agent, allow_html_fallback=False, ) if loaded: _sitemap_cache[origin] = _SitemapCacheEntry( fetched_at=datetime.now(), urls=tuple(loaded) ) all_urls = loaded else: stale_cached = _get_cached_sitemap_urls_from_memory( origin, allow_stale=True ) if stale_cached is not None: existing = _sitemap_cache.get(origin) if existing and existing.urls: _sitemap_cache[origin] = _SitemapCacheEntry( fetched_at=datetime.now(), urls=existing.urls ) all_urls = stale_cached else: discovered = await _discover_urls_from_html_links( client, site_url, user_agent=user_agent ) if not discovered: return {"organic": []} _sitemap_cache[origin] = _SitemapCacheEntry( fetched_at=datetime.now(), urls=tuple(discovered) ) all_urls = discovered candidates = [u for u in all_urls if _matches_site_prefix(u, site_url)] scored = _score_urls(candidates, tokens) # Preselect candidates (URL-based), then optionally rescore using page text. preselect_limit = min(12, max(6, max(num_results, 1) * 2)) if scored: preselect_urls = [url for _, url in scored[:preselect_limit]] url_scores = {url: score for score, url in scored[:preselect_limit]} else: preselect_urls = sorted(candidates, key=len)[:preselect_limit] url_scores = {url: 0 for url in preselect_urls} if fetch_text and tokens and preselect_urls: texts = await _gather_with_limit( [fetch_text(url) for url in preselect_urls], concurrency=fetch_text_concurrency, ) rescored: List[Tuple[int, str, str, str]] = [] for url, text in zip(preselect_urls, texts): title = _fallback_title_from_url(url) if isinstance(text, Exception): rescored.append((url_scores.get(url, 0), url, title, "")) continue snippet = _extract_text_snippet(str(text), tokens) content_score = _score_document(url, title, str(text), tokens) total = url_scores.get(url, 0) + content_score rescored.append((total, url, title, snippet)) rescored.sort(key=lambda item: (-item[0], len(item[1]))) organic = [ {"link": url, "title": title, "snippet": snippet} for _, url, title, snippet in rescored[: max(num_results, 1)] ] return {"organic": organic} top_urls = preselect_urls[: max(num_results, 1)] if not top_urls: return {"organic": []} if not allow_network: return { "organic": [ {"link": url, "title": _fallback_title_from_url(url), "snippet": ""} for url in top_urls ] } tasks = [ _fetch_result_metadata(client, url, user_agent=user_agent, tokens=tokens) for url in top_urls ] metadatas = await asyncio.gather(*tasks, return_exceptions=True) organic: List[Dict[str, str]] = [] for url, metadata in zip(top_urls, metadatas): if isinstance(metadata, Exception): organic.append( { "link": url, "title": _fallback_title_from_url(url), "snippet": "", } ) else: organic.append( { "link": url, "title": metadata.get("title", _fallback_title_from_url(url)), "snippet": metadata.get("snippet", ""), } ) return {"organic": organic}
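Below is a minimal usage sketch (not part of the file above). It assumes the module is importable as site_search and that httpx is installed; the docs URL and the user-agent string are illustrative. Note that the site: token must carry an explicit scheme, because _parse_site_query hands it straight to urlparse.

import asyncio

import httpx

import site_search


async def main() -> None:
    async with httpx.AsyncClient() as client:
        # Optionally warm the MkDocs/Sphinx index caches (and the sitemap cache) up front.
        report = await site_search.preindex_site(
            "https://www.mkdocs.org/",
            client,
            user_agent="docs-search-example/0.1",
            include_sitemap=True,
        )
        print(report["status"])  # "ok" or "no_index_found" for a valid URL

        # Serper-free `site:` search; the site: token needs an explicit scheme.
        payload = await site_search.search_site_via_sitemap(
            "site:https://www.mkdocs.org navigation configuration",
            client,
            user_agent="docs-search-example/0.1",
            num_results=5,
        )
        for hit in payload["organic"]:
            print(hit["link"], "|", hit["title"], "|", hit["snippet"])


asyncio.run(main())

An optional Playwright-backed fetch_text coroutine (as mentioned in the module docstring) can also be passed to search_site_via_sitemap to score and snippet pages from their rendered text rather than from URL heuristics alone.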

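The warmed caches can be persisted between runs. A short sketch under the same assumptions (the path is illustrative): save_preindexed_state writes the JSON produced by export_preindexed_state (schema_version 1, with "sitemaps" and "indexes" maps), and load_preindexed_state returns False when the file is missing or unreadable.

import site_search

STATE_PATH = "preindex_cache.json"  # illustrative location

# After preindex_site() has populated the in-memory caches, persist them to disk.
site_search.save_preindexed_state(STATE_PATH)

# On a later run, restore the caches before serving queries.
if site_search.load_preindexed_state(STATE_PATH):
    # With warm caches, search_site_via_sitemap(..., allow_network=False) can answer
    # `site:` queries from the cached MkDocs/Sphinx indexes or sitemap URLs alone.
    print("preindexed cache restored")

Combined with allow_network=False, this allows site: queries to be answered entirely from the persisted indexes.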

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anton-prosterity/documentation-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.