
extract_data

Extract structured data from web pages, including tables, lists, JSON-LD, and specific fields selected via CSS selectors, for efficient web research.

Instructions

Extract structured data from web pages. Extracts tables, lists, or specific fields from HTML pages and returns structured data. Much more efficient than parsing full page text.

Extract Types:

- "table": Extract HTML tables as a list of dicts
- "list": Extract lists (ul/ol/dl) as a structured list
- "fields": Extract specific elements using CSS selectors
- "json-ld": Extract JSON-LD structured data
- "auto": Automatically detect and extract structured content

Examples:

- extract_data("https://pypi.org/project/fastapi/", reasoning="Get package info")
- extract_data("https://github.com/user/repo/releases", reasoning="Get releases", extract_type="list")
- extract_data("https://example.com/product", reasoning="Extract product details", extract_type="fields", selectors={"price": ".price", "title": "h1.product-name"})
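For orientation, the tool returns a JSON string whose shape depends on extract_type. Here is a sketch of the result of a "table" extraction, mirroring the extracted_data dict built by the handler shown below; the concrete row values are invented for illustration:

```python
# Illustrative only: the shape follows the handler's `extracted_data`
# for extract_type="table"; the values below are made up.
example_result = {
    "type": "table",
    "tables": [
        {
            "caption": None,                  # <caption> text, when present
            "headers": ["Package", "Version"],
            "rows": [                         # one dict per row, keyed by header
                {"Package": "fastapi", "Version": "0.1.0"},
            ],
        }
    ],
    "source": "https://pypi.org/project/fastapi/",
}
```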

Input Schema

| Name | Required | Description | Default |
| ------------ | -------- | -------------------------------------------------------------------------- | ------- |
| url | Yes | HTTP(S) URL to extract structured data from | |
| reasoning | Yes | Why you're extracting data from this URL (required for analytics) | |
| extract_type | No | Extraction type: "table", "list", "fields", "json-ld", or "auto" | auto |
| selectors | No | CSS selectors for field extraction (only used with extract_type='fields') | |
| max_items | No | Maximum number of items to extract | 100 |
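When calling the tool from an MCP client, the arguments object follows this schema. A hedged sketch (the values are taken from the examples above, plus an illustrative max_items; the surrounding JSON-RPC envelope is standard MCP and not specific to this server):

```python
# Hypothetical tools/call arguments for a "fields" extraction.
# Only url and reasoning are required; the rest have defaults.
arguments = {
    "url": "https://example.com/product",
    "reasoning": "Extract product details",
    "extract_type": "fields",
    "selectors": {"price": ".price", "title": "h1.product-name"},
    "max_items": 50,
}
```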

Implementation Reference

  • The primary handler function for the 'extract_data' MCP tool. It fetches raw HTML from the URL using CrawlerClient, then uses DataExtractor to perform the specified extraction (tables, lists, fields, JSON-LD, or auto), formats the result as JSON, and returns the structured data.
```python
async def extract_data(
    url: Annotated[str, "HTTP(S) URL to extract structured data from"],
    reasoning: Annotated[str, "Why you're extracting data from this URL (required for analytics)"],
    extract_type: Annotated[
        Literal["table", "list", "fields", "json-ld", "auto"],
        'Extraction type: "table", "list", "fields", "json-ld", or "auto"',
    ] = "auto",
    selectors: Annotated[
        dict[str, str] | None,
        "CSS selectors for field extraction (only used with extract_type='fields')",
    ] = None,
    max_items: Annotated[int, "Maximum number of items to extract"] = 100,
) -> str:
    """
    Extract structured data from web pages.

    Extracts tables, lists, or specific fields from HTML pages and returns
    structured data. Much more efficient than parsing full page text.

    Extract Types:
    - "table": Extract HTML tables as list of dicts
    - "list": Extract lists (ul/ol/dl) as structured list
    - "fields": Extract specific elements using CSS selectors
    - "json-ld": Extract JSON-LD structured data
    - "auto": Automatically detect and extract structured content

    Examples:
    - extract_data("https://pypi.org/project/fastapi/", reasoning="Get package info")
    - extract_data("https://github.com/user/repo/releases", reasoning="Get releases", extract_type="list")
    - extract_data(
          "https://example.com/product",
          reasoning="Extract product details",
          extract_type="fields",
          selectors={"price": ".price", "title": "h1.product-name"}
      )
    """
    import json

    start_time = time.time()
    success = False
    error_msg = None
    result = ""

    try:
        # Fetch raw HTML
        html = await crawler_client.fetch_raw(url)

        # Extract based on type
        if extract_type == "table":
            tables = data_extractor.extract_tables(html, max_tables=max_items)
            extracted_data = {
                "type": "table",
                "tables": [
                    {
                        "caption": t.caption,
                        "headers": t.headers,
                        "rows": t.rows[:max_items],
                    }
                    for t in tables
                ],
                "source": url,
            }
        elif extract_type == "list":
            lists = data_extractor.extract_lists(html, max_lists=max_items)
            extracted_data = {
                "type": "list",
                "lists": [{"title": li.title, "items": li.items[:max_items]} for li in lists],
                "source": url,
            }
        elif extract_type == "fields":
            if not selectors:
                raise ValueError("selectors parameter is required for extract_type='fields'")
            fields = data_extractor.extract_fields(html, selectors)
            extracted_data = {"type": "fields", "data": fields, "source": url}
        elif extract_type == "json-ld":
            json_ld = data_extractor.extract_json_ld(html)
            extracted_data = {"type": "json-ld", "data": json_ld, "source": url}
        else:  # auto
            auto_data = data_extractor.auto_extract(html)
            extracted_data = {"type": "auto", "data": auto_data, "source": url}

        # Format result
        result = json.dumps(extracted_data, indent=2, ensure_ascii=False)
        result = clamp_text(result, MAX_RESPONSE_CHARS)
        success = True
    except Exception as exc:  # noqa: BLE001
        error_msg = str(exc)
        result = f"Data extraction failed for {url}: {exc}"
    finally:
        # Track usage
        response_time = (time.time() - start_time) * 1000
        tracker.track_usage(
            tool_name="extract_data",
            reasoning=reasoning,
            parameters={
                "url": url,
                "extract_type": extract_type,
                "has_selectors": selectors is not None,
                "max_items": max_items,
            },
            response_time_ms=response_time,
            success=success,
            error_message=error_msg,
            response_size=len(result.encode("utf-8")),
        )

    return result
```
  • Core helper class implementing the extraction logic called by the extract_data handler. Provides methods for extracting tables, lists, custom fields via CSS selectors, and JSON-LD, as well as for automatically detecting structured content.
```python
class DataExtractor:
    """Extract structured data from HTML."""

    def extract_tables(self, html: str, max_tables: int = 5) -> list[TableData]:
        """Extract HTML tables.

        Args:
            html: Raw HTML content
            max_tables: Maximum number of tables to extract

        Returns:
            List of TableData objects with caption, headers, and rows
        """
        soup = BeautifulSoup(html, "html.parser")
        tables = []

        for table in soup.find_all("table")[:max_tables]:
            # Extract caption
            caption_elem = table.find("caption")
            caption = _sanitize_text(caption_elem.get_text(strip=True)) if caption_elem else None

            # Extract headers
            headers = []
            header_row = table.find("thead")
            if header_row:
                headers = [
                    _sanitize_text(th.get_text(strip=True)) for th in header_row.find_all("th")
                ]
            else:
                # Try first row
                first_row = table.find("tr")
                if first_row:
                    headers = [
                        _sanitize_text(th.get_text(strip=True)) for th in first_row.find_all("th")
                    ]

            # If no headers found, use generic column names
            if not headers:
                first_row = table.find("tr")
                if first_row:
                    num_cols = len(first_row.find_all(["td", "th"]))
                    headers = [f"Column {i + 1}" for i in range(num_cols)]

            if not headers:
                continue

            # Extract rows
            rows = []
            tbody = table.find("tbody")
            row_elements = (
                tbody.find_all("tr") if tbody else table.find_all("tr")[1:]
            )  # Skip header row if no tbody

            for tr in row_elements:
                cells = tr.find_all(["td", "th"])
                if cells and len(cells) == len(headers):
                    row_dict = {}
                    for i, cell in enumerate(cells):
                        row_dict[headers[i]] = _sanitize_text(cell.get_text(strip=True))
                    rows.append(row_dict)

            if rows:
                tables.append(TableData(caption=caption, headers=headers, rows=rows))

        return tables

    def extract_lists(self, html: str, max_lists: int = 5) -> list[ListData]:
        """Extract HTML lists (ul, ol, dl).

        Args:
            html: Raw HTML content
            max_lists: Maximum number of lists to extract

        Returns:
            List of ListData objects with title and items
        """
        soup = BeautifulSoup(html, "html.parser")
        lists = []

        for list_elem in soup.find_all(["ul", "ol", "dl"])[:max_lists]:
            # Try to find a title (preceding heading)
            title = None
            prev = list_elem.find_previous(["h1", "h2", "h3", "h4", "h5", "h6"])
            if prev:
                title = _sanitize_text(prev.get_text(strip=True))

            # Extract items
            items = []
            if list_elem.name in ["ul", "ol"]:
                for li in list_elem.find_all("li", recursive=False):
                    items.append(_sanitize_text(li.get_text(strip=True)))
            else:  # dl
                for dt in list_elem.find_all("dt"):
                    dd = dt.find_next_sibling("dd")
                    if dd:
                        items.append(
                            f"{_sanitize_text(dt.get_text(strip=True))}: {_sanitize_text(dd.get_text(strip=True))}"
                        )

            if items:
                lists.append(
                    ListData(
                        title=title,
                        items=items,
                        nested=False,  # TODO: detect nested lists
                    )
                )

        return lists

    def extract_fields(self, html: str, selectors: dict[str, str]) -> dict[str, str | list[str]]:
        """Extract specific fields using CSS selectors.

        Args:
            html: Raw HTML content
            selectors: Dict mapping field names to CSS selectors

        Returns:
            Dict with extracted field values (single string or list of strings)
        """
        soup = BeautifulSoup(html, "html.parser")
        data: dict[str, str | list[str]] = {}

        for field_name, selector in selectors.items():
            elements = soup.select(selector)
            if elements:
                if len(elements) == 1:
                    data[field_name] = _sanitize_text(elements[0].get_text(strip=True))
                else:
                    data[field_name] = [
                        _sanitize_text(el.get_text(strip=True)) for el in elements
                    ]

        return data

    def extract_json_ld(self, html: str) -> list[dict[str, Any]]:
        """Extract JSON-LD structured data.

        Args:
            html: Raw HTML content

        Returns:
            List of JSON-LD objects found in the page
        """
        soup = BeautifulSoup(html, "html.parser")
        json_ld_scripts = soup.find_all("script", type="application/ld+json")

        data = []
        for script in json_ld_scripts:
            try:
                if script.string:
                    parsed = json.loads(script.string)
                    data.append(parsed)
            except json.JSONDecodeError:
                pass

        return data

    def auto_extract(self, html: str) -> dict[str, Any]:
        """Automatically detect and extract structured content.

        Args:
            html: Raw HTML content

        Returns:
            Dict containing all detected structured data (tables, lists, json_ld)
        """
        results: dict[str, Any] = {"tables": [], "lists": [], "json_ld": []}

        # Try JSON-LD first (highest quality)
        json_ld = self.extract_json_ld(html)
        if json_ld:
            results["json_ld"] = json_ld

        # Extract tables
        tables = self.extract_tables(html, max_tables=3)
        if tables:
            results["tables"] = [
                {"caption": t.caption, "headers": t.headers, "rows": t.rows} for t in tables
            ]

        # Extract lists
        lists = self.extract_lists(html, max_lists=3)
        if lists:
            results["lists"] = [{"title": li.title, "items": li.items} for li in lists]

        return results
```
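As a quick illustration of the same parsing approach in isolation, here is a minimal, self-contained sketch using BeautifulSoup directly (the sample HTML is invented; the real class above adds sanitization, header fallbacks, and limits):

```python
from bs4 import BeautifulSoup

html = """
<table>
  <thead><tr><th>Package</th><th>License</th></tr></thead>
  <tbody>
    <tr><td>fastapi</td><td>MIT</td></tr>
    <tr><td>httpx</td><td>BSD</td></tr>
  </tbody>
</table>
"""

soup = BeautifulSoup(html, "html.parser")
table = soup.find("table")

# Headers from <thead>, as in DataExtractor.extract_tables
headers = [th.get_text(strip=True) for th in table.find("thead").find_all("th")]

# One dict per <tbody> row, keyed by header
rows = [
    dict(zip(headers, (td.get_text(strip=True) for td in tr.find_all("td"))))
    for tr in table.find("tbody").find_all("tr")
]

print(rows)  # [{'Package': 'fastapi', 'License': 'MIT'}, {'Package': 'httpx', 'License': 'BSD'}]
```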
  • Uses CrawlerClient.fetch_raw() to obtain raw HTML content before extraction. CrawlerClient is imported from .crawler.
```python
from .registry import PackageInfo, PackageRegistryClient
from .search import SearxSearcher
from .service_health import ServiceHealthChecker
from .tracking import get_tracker

mcp = FastMCP("web-research-assistant")

searcher = SearxSearcher()
crawler_client = CrawlerClient()
registry_client = PackageRegistryClient()
github_client = GitHubClient()
pixabay_client = PixabayClient()
error_parser = ErrorParser()
api_docs_detector = APIDocsDetector()
api_docs_extractor = APIDocsExtractor()
data_extractor = DataExtractor()
tech_comparator = TechComparator(searcher, github_client, registry_client)
changelog_fetcher = ChangelogFetcher(github_client, registry_client)
service_health_checker = ServiceHealthChecker(crawler_client)
tracker = get_tracker()

# ... helper formatters and the module's other MCP tools (web_search,
# crawl_url, package_info, search_examples, search_images, package_search,
# github_repo, translate_error, api_docs) are defined here but omitted
# for brevity ...

@mcp.tool()
async def extract_data(
    # ... full signature and docstring as shown above ...
) -> str:
    # ... setup as shown above ...
    try:
        # Fetch raw HTML
        html = await crawler_client.fetch_raw(url)
```
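CrawlerClient itself is not shown on this page. As a rough idea of what a raw-HTML fetcher along these lines could look like, here is a hedged sketch using httpx (which the server already uses elsewhere); the real client in `.crawler` may differ substantially:

```python
import httpx


class CrawlerClientSketch:
    """Hypothetical stand-in for the real CrawlerClient in `.crawler`."""

    async def fetch_raw(self, url: str) -> str:
        # Return the unparsed HTML body; all extraction happens downstream
        # in DataExtractor, so no text cleanup is done here.
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.text
```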
  • The input schema is defined by Annotated type hints and descriptions in the function signature; FastMCP uses these to generate the tool schema.
```python
url: Annotated[str, "HTTP(S) URL to extract structured data from"],
reasoning: Annotated[str, "Why you're extracting data from this URL (required for analytics)"],
extract_type: Annotated[
    Literal["table", "list", "fields", "json-ld", "auto"],
    'Extraction type: "table", "list", "fields", "json-ld", or "auto"',
] = "auto",
selectors: Annotated[
    dict[str, str] | None,
    "CSS selectors for field extraction (only used with extract_type='fields')",
] = None,
max_items: Annotated[int, "Maximum number of items to extract"] = 100,
) -> str:
```
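The schema FastMCP derives from these hints should correspond to the Input Schema table above. A hand-written approximation of what that generated schema might look like (not FastMCP's verbatim output):

```python
# Approximate JSON Schema derived from the Annotated signature above.
input_schema = {
    "type": "object",
    "properties": {
        "url": {"type": "string", "description": "HTTP(S) URL to extract structured data from"},
        "reasoning": {"type": "string", "description": "Why you're extracting data from this URL (required for analytics)"},
        "extract_type": {
            "type": "string",
            "enum": ["table", "list", "fields", "json-ld", "auto"],
            "default": "auto",
        },
        "selectors": {"type": "object", "additionalProperties": {"type": "string"}},
        "max_items": {"type": "integer", "default": 100},
    },
    "required": ["url", "reasoning"],
}
```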
  • The @mcp.tool() decorator registers this function as an MCP tool named 'extract_data' (inferred from function name).
```python
@mcp.tool()
async def extract_data(
```

MCP directory API

We provide all the information about MCP servers via our MCP API.

```bash
curl -X GET 'https://glama.ai/api/mcp/v1/servers/elad12390/web-research-assistant'
```
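The same lookup from Python, assuming the endpoint returns JSON (a sketch using httpx):

```python
import httpx

# Fetch this server's directory entry; the response is assumed to be JSON.
response = httpx.get("https://glama.ai/api/mcp/v1/servers/elad12390/web-research-assistant")
response.raise_for_status()
print(response.json())
```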

If you have feedback or need assistance with the MCP directory API, please join our Discord server.