from .registry import PackageInfo, PackageRegistryClient
from .search import SearxSearcher
from .service_health import ServiceHealthChecker
from .tracking import get_tracker
mcp = FastMCP("web-research-assistant")
searcher = SearxSearcher()
crawler_client = CrawlerClient()
registry_client = PackageRegistryClient()
github_client = GitHubClient()
pixabay_client = PixabayClient()
error_parser = ErrorParser()
api_docs_detector = APIDocsDetector()
api_docs_extractor = APIDocsExtractor()
data_extractor = DataExtractor()
tech_comparator = TechComparator(searcher, github_client, registry_client)
changelog_fetcher = ChangelogFetcher(github_client, registry_client)
service_health_checker = ServiceHealthChecker(crawler_client)
tracker = get_tracker()
def _format_search_hits(hits):
lines = []
for idx, hit in enumerate(hits, 1):
snippet = f"\n{hit.snippet}" if hit.snippet else ""
lines.append(f"{idx}. {hit.title} — {hit.url}{snippet}")
body = "\n\n".join(lines)
return clamp_text(body, MAX_RESPONSE_CHARS)
@mcp.tool()
async def web_search(
query: Annotated[str, "Natural-language web query"],
reasoning: Annotated[str, "Why you're using this tool (required for analytics)"],
category: Annotated[
str, "Optional SearXNG category (general, images, news, it, science, etc.)"
] = DEFAULT_CATEGORY,
max_results: Annotated[int, "How many ranked hits to return (1-10)"] = DEFAULT_MAX_RESULTS,
) -> str:
"""Use this first to gather fresh web search results via the local SearXNG instance."""
start_time = time.time()
success = False
error_msg = None
result = ""
try:
hits = await searcher.search(query, category=category, max_results=max_results)
if not hits:
result = f"No results for '{query}' in category '{category}'."
else:
result = _format_search_hits(hits)
success = True
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Search failed: {exc}"
finally:
# Track usage
response_time = (time.time() - start_time) * 1000 # Convert to ms
tracker.track_usage(
tool_name="web_search",
reasoning=reasoning,
parameters={
"query": query,
"category": category,
"max_results": max_results,
},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
@mcp.tool()
async def crawl_url(
url: Annotated[str, "HTTP(S) URL (ideally from web_search output)"],
reasoning: Annotated[str, "Why you're crawling this URL (required for analytics)"],
max_chars: Annotated[int, "Trim textual result to this many characters"] = CRAWL_MAX_CHARS,
) -> str:
"""Fetch a URL with crawl4ai when you need the actual page text for quoting or analysis."""
start_time = time.time()
success = False
error_msg = None
result = ""
try:
text = await crawler_client.fetch(url, max_chars=max_chars)
result = clamp_text(text, MAX_RESPONSE_CHARS)
success = True
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Crawl failed for {url}: {exc}"
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="crawl_url",
reasoning=reasoning,
parameters={"url": url, "max_chars": max_chars},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
def _format_package_info(info: PackageInfo) -> str:
"""Format PackageInfo into readable text response."""
lines = [
f"Package: {info.name} ({info.registry})",
"─" * 50,
f"Version: {info.version}",
]
if info.license:
lines.append(f"License: {info.license}")
if info.downloads:
lines.append(f"Downloads: {info.downloads}")
lines.append(f"Last Updated: {info.last_updated}")
if info.dependencies_count is not None:
lines.append(f"Dependencies: {info.dependencies_count}")
if info.security_issues > 0:
lines.append(f"⚠️ Security Issues: {info.security_issues}")
else:
lines.append("Security: ✅ No known vulnerabilities")
lines.append("") # blank line
if info.repository:
lines.append(f"Repository: {info.repository}")
if info.homepage and info.homepage != info.repository:
lines.append(f"Homepage: {info.homepage}")
if info.description:
lines.append(f"\nDescription: {info.description}")
return "\n".join(lines)
@mcp.tool()
async def package_info(
name: Annotated[str, "Package or module name to look up"],
reasoning: Annotated[str, "Why you're looking up this package (required for analytics)"],
registry: Annotated[
Literal["npm", "pypi", "crates", "go"],
"Package registry to search (npm, pypi, crates, go)",
] = "npm",
) -> str:
"""
Look up package information from npm, PyPI, crates.io, or Go modules.
Returns version, downloads, license, dependencies, security status, and repository links.
Use this to quickly evaluate libraries before adding them to your project.
Examples:
- package_info("express", reasoning="Need web framework", registry="npm")
- package_info("requests", reasoning="HTTP client for API", registry="pypi")
- package_info("serde", reasoning="JSON serialization", registry="crates")
"""
start_time = time.time()
success = False
error_msg = None
result = ""
try:
if registry == "npm":
info = await registry_client.search_npm(name)
elif registry == "pypi":
info = await registry_client.search_pypi(name)
elif registry == "crates":
info = await registry_client.search_crates(name)
else: # go
info = await registry_client.search_go(name)
result = clamp_text(_format_package_info(info), MAX_RESPONSE_CHARS)
success = True
except httpx.HTTPStatusError as exc:
error_msg = f"HTTP {exc.response.status_code}"
if exc.response.status_code == 404:
result = f"Package '{name}' not found on {registry}.\n\nDouble-check the package name and try again."
else:
result = f"Failed to fetch {registry} package '{name}': HTTP {exc.response.status_code}"
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Failed to fetch {registry} package '{name}': {exc}"
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="package_info",
reasoning=reasoning,
parameters={"name": name, "registry": registry},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
@mcp.tool()
async def search_examples(
query: Annotated[
str,
"What you're looking for (e.g., 'Python async await examples', 'React hooks tutorial', 'Rust error handling patterns')",
],
reasoning: Annotated[str, "Why you're searching for examples (required for analytics)"],
content_type: Annotated[
Literal["code", "articles", "both"],
"Type of content to find: 'code' for code examples, 'articles' for tutorials/blogs, 'both' for mixed results",
] = "both",
time_range: Annotated[
Literal["day", "week", "month", "year", "all"],
"How recent the content should be (use 'all' for best results, filter down if too many results)",
] = "all",
max_results: Annotated[int, "How many results to return (1-10)"] = DEFAULT_MAX_RESULTS,
) -> str:
"""
Search for code examples, tutorials, and technical articles.
Optimized for finding practical examples and learning resources. Can optionally filter by
time range for the most recent content. Perfect for learning new APIs, finding usage patterns,
or discovering how others solve specific technical problems.
Content Types:
- 'code': GitHub repos, code snippets, gists, Stack Overflow code examples
- 'articles': Blog posts, tutorials, documentation, technical articles
- 'both': Mix of code and written content (default)
Time Ranges:
- 'all': Search all available content (default, recommended for best results)
- 'year', 'month', 'week', 'day': Filter to recent content only
Examples:
- search_examples("FastAPI dependency injection examples", content_type="code")
- search_examples("React hooks tutorial", content_type="articles", time_range="year")
- search_examples("Rust lifetime examples", content_type="both")
"""
start_time = time.time()
success = False
error_msg = None
result = ""
try:
max_results = max(1, min(max_results, 10))
# Build optimized search query based on content type
enhanced_query = query
if content_type == "code":
# Prioritize code-heavy sources
enhanced_query = f"{query} (site:github.com OR site:stackoverflow.com OR site:gist.github.com OR example OR snippet)"
elif content_type == "articles":
# Prioritize articles and tutorials
enhanced_query = (
f"{query} (tutorial OR guide OR article OR blog OR how to OR documentation)"
)
else: # both
enhanced_query = f"{query} (example OR tutorial OR guide)"
# Use 'it' category for better tech content
hits = await searcher.search(
enhanced_query,
category="it", # IT/tech category for better results
max_results=max_results,
time_range=time_range if time_range != "all" else None,
)
if not hits:
result = (
f"No examples found for '{query}' in the {time_range if time_range != 'all' else 'all time'} range.\n\n"
"Try:\n"
"- Broader search terms\n"
"- Different time range\n"
"- Removing version numbers or very specific constraints\n\n"
"Note: Results depend on your SearXNG instance configuration. If you're only seeing MDN docs,\n"
"your SearXNG may need additional search engines enabled (GitHub, Stack Overflow, dev.to, etc.)."
)
else:
# Format results with additional context
lines = [
f"Code Examples & Articles for: {query}",
f"Time Range: {time_range.title() if time_range != 'all' else 'All time'} | Content Type: {content_type.title()}",
"─" * 50,
"",
]
for idx, hit in enumerate(hits, 1):
# Add source indicator
source = ""
if "github.com" in hit.url:
source = "[GitHub] "
elif "stackoverflow.com" in hit.url:
source = "[Stack Overflow] "
elif "medium.com" in hit.url or "dev.to" in hit.url:
source = "[Article] "
snippet = f"\n {hit.snippet}" if hit.snippet else ""
lines.append(f"{idx}. {source}{hit.title}")
lines.append(f" {hit.url}{snippet}")
lines.append("")
result_text = "\n".join(lines)
# Add note if results seem limited (all from same domain)
domains = set()
for hit in hits:
if "://" in hit.url:
domain = hit.url.split("://")[1].split("/")[0]
domains.add(domain)
if len(domains) == 1 and len(hits) > 2:
result_text += (
"\n\n──────────────────────────────────────────────────\n"
"ℹ️ Note: All results are from the same source. Your SearXNG instance may need\n"
" additional search engines configured (GitHub, Stack Overflow, dev.to, Medium)\n"
" to get diverse code examples and tutorials."
)
result = clamp_text(result_text, MAX_RESPONSE_CHARS)
success = True
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Search failed for '{query}': {exc}"
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="search_examples",
reasoning=reasoning,
parameters={
"query": query,
"content_type": content_type,
"time_range": time_range,
"max_results": max_results,
},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
@mcp.tool()
async def search_images(
query: Annotated[
str,
"What to search for (e.g., 'sunset beach', 'office workspace', 'technology abstract')",
],
reasoning: Annotated[str, "Why you're searching for images (required for analytics)"],
image_type: Annotated[
Literal["all", "photo", "illustration", "vector"],
"Type of image to search for",
] = "all",
orientation: Annotated[
Literal["all", "horizontal", "vertical"],
"Image orientation preference",
] = "all",
max_results: Annotated[int, "Maximum number of results (1-20)"] = 10,
) -> str:
"""
Search for high-quality stock images using Pixabay.
Returns royalty-free images that are safe to use. Perfect for finding photos,
illustrations, and vector graphics for projects, presentations, or design work.
Image Types:
- 'photo': Real photographs
- 'illustration': Digital illustrations and artwork
- 'vector': Vector graphics (SVG format available)
- 'all': All types (default)
Examples:
- search_images("mountain landscape", image_type="photo")
- search_images("business icons", image_type="vector")
- search_images("technology background", orientation="horizontal")
"""
start_time = time.time()
success = False
error_msg = None
result = ""
try:
# Check if API key is configured
if not pixabay_client.has_api_key():
# Fallback: Use web search for images instead
fallback_results = await searcher.search(
f"{query} stock photo free",
category="images",
max_results=max_results,
)
if fallback_results:
lines = [
f"Image Search Results for: {query}",
"(Using web search - configure PIXABAY_API_KEY for better stock photo results)",
"─" * 70,
"",
]
for idx, hit in enumerate(fallback_results, 1):
lines.append(f"{idx}. {hit.title}")
lines.append(f" {hit.url}")
if hit.snippet:
lines.append(f" {hit.snippet[:100]}")
lines.append("")
lines.extend(
[
"─" * 70,
"For better stock photo results with resolution info:",
"1. Get a free API key from: https://pixabay.com/api/docs/",
"2. Set: PIXABAY_API_KEY=your_key_here",
]
)
result = clamp_text("\n".join(lines), MAX_RESPONSE_CHARS)
success = True # Mark as success since we provided useful results
else:
result = (
"⚠️ Pixabay API key not configured and web search returned no results.\n\n"
"To enable full image search:\n"
"1. Get a free API key from: https://pixabay.com/api/docs/\n"
"2. Set the environment variable: PIXABAY_API_KEY=your_key_here\n"
"3. Restart the MCP server"
)
error_msg = "API key not configured"
success = False
else:
max_results = max(1, min(max_results, 20))
images = await pixabay_client.search_images(
query=query,
image_type=image_type,
orientation=orientation,
per_page=max_results,
)
if not images:
result = f"No images found for '{query}'.\n\nTry:\n- Different search terms\n- Broader keywords\n- Different image type or orientation"
else:
# Format results
lines = [
f"Stock Images for: {query}",
f"Type: {image_type.title()} | Orientation: {orientation.title()} | Found: {len(images)} images",
"─" * 70,
"",
]
for idx, img in enumerate(images, 1):
lines.append(f"{idx}. {img.tags}")
lines.append(
f" Resolution: {img.width}x{img.height} | 👁️ {img.views:,} | ⬇️ {img.downloads:,} | ❤️ {img.likes:,}"
)
lines.append(f" By: {img.user}")
lines.append(f" Preview: {img.preview_url}")
lines.append(f" Large: {img.large_url}")
if img.full_url != img.large_url:
lines.append(f" Full HD: {img.full_url}")
lines.append("")
result = clamp_text("\n".join(lines), MAX_RESPONSE_CHARS)
success = True
except ValueError as exc:
# API key not configured
error_msg = str(exc)
result = f"⚠️ {exc}\n\nPlease configure your Pixabay API key as described above."
except httpx.HTTPStatusError as exc:
error_msg = f"HTTP {exc.response.status_code}"
if exc.response.status_code == 400:
result = f"Invalid search parameters for '{query}'. Please check your search terms and filters."
elif exc.response.status_code == 429:
result = "Rate limit exceeded. Please wait a moment and try again."
else:
result = f"Failed to search images: HTTP {exc.response.status_code}"
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Image search failed for '{query}': {exc}"
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="search_images",
reasoning=reasoning,
parameters={
"query": query,
"image_type": image_type,
"orientation": orientation,
"max_results": max_results,
},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
def _format_package_search_results(packages: list[PackageInfo], query: str, registry: str) -> str:
"""Format package search results into readable text."""
if not packages:
return f"No packages found for '{query}' on {registry}.\n\nTry different keywords or check another registry."
lines = [
f"Search Results for '{query}' on {registry}:",
"─" * 50,
]
for i, pkg in enumerate(packages, 1):
lines.append(f"{i}. {pkg.name}")
if pkg.version != "unknown":
lines.append(f" Version: {pkg.version}")
if pkg.downloads:
lines.append(f" Downloads: {pkg.downloads}")
if pkg.description:
# Truncate long descriptions
desc = pkg.description[:100] + "..." if len(pkg.description) > 100 else pkg.description
lines.append(f" Description: {desc}")
lines.append("") # blank line between results
return "\n".join(lines)
@mcp.tool()
async def package_search(
query: Annotated[
str,
"Search keywords (e.g., 'web framework', 'json parser', 'machine learning')",
],
reasoning: Annotated[str, "Why you're searching for packages (required for analytics)"],
registry: Annotated[
Literal["npm", "pypi", "crates", "go"],
"Package registry to search (npm, pypi, crates, go)",
] = "npm",
max_results: Annotated[int, "Maximum number of results to return (1-10)"] = 5,
) -> str:
"""
Search for packages by keywords or description across registries.
Use this to find packages that solve a specific problem or provide certain functionality.
Perfect for discovering libraries when you know what you need but not the package name.
Examples:
- package_search("web framework", reasoning="Need backend framework", registry="npm")
- package_search("json parsing", reasoning="Data processing", registry="pypi")
"""
start_time = time.time()
success = False
error_msg = None
result = ""
try:
max_results = max(1, min(max_results, 10)) # Clamp to 1-10
packages = await registry_client.search_packages(query, registry, max_results)
result = _format_package_search_results(packages, query, registry)
result = clamp_text(result, MAX_RESPONSE_CHARS)
success = True
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Search failed for '{query}' on {registry}: {exc}"
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="package_search",
reasoning=reasoning,
parameters={
"query": query,
"registry": registry,
"max_results": max_results,
},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
def _format_repo_info(info: RepoInfo, commits=None) -> str:
"""Format RepoInfo into readable text response."""
lines = [
f"Repository: {info.full_name}",
"─" * 50,
f"⭐ {info.stars:,} | 🍴 {info.forks:,} | 👁️ {info.watchers:,}",
]
if info.language:
lines.append(f"Language: {info.language}")
if info.license:
lines.append(f"License: {info.license}")
lines.append(f"Last Updated: {info.last_updated}")
# Issues and PRs
issue_line = f"Open Issues: {info.open_issues}"
if info.open_prs is not None:
issue_line += f" | Open PRs: {info.open_prs}"
lines.append(issue_line)
if info.archived:
lines.append("⚠️ This repository is archived (read-only)")
if info.topics:
lines.append(f"Topics: {', '.join(info.topics[:5])}") # Show first 5 topics
lines.append("") # blank line
if info.homepage:
lines.append(f"Homepage: {info.homepage}")
if commits:
lines.append("Recent Commits:")
for commit in commits[:3]: # Show top 3
lines.append(f"- [{commit.date}] {commit.message} ({commit.author})")
lines.append(f"\nDescription: {info.description}")
return "\n".join(lines)
@mcp.tool()
async def github_repo(
repo: Annotated[str, "GitHub repository (owner/repo format or full URL)"],
reasoning: Annotated[str, "Why you're checking this repository (required for analytics)"],
include_commits: Annotated[bool, "Include recent commit history"] = True,
) -> str:
"""
Fetch GitHub repository information and health metrics.
Returns stars, forks, issues, recent activity, language, license, and description.
Use this to evaluate open source projects before using them.
Examples:
- github_repo("microsoft/vscode", reasoning="Evaluate editor project")
- github_repo("https://github.com/facebook/react", reasoning="Research UI framework")
"""
start_time = time.time()
success = False
error_msg = None
result = ""
try:
owner, repo_name = github_client.parse_repo_url(repo)
# Get repo info
repo_info = await github_client.get_repo_info(owner, repo_name)
# Optionally get recent commits
commits = None
if include_commits:
try:
commits = await github_client.get_recent_commits(owner, repo_name, count=3)
except Exception: # noqa: BLE001, S110
# Don't fail the whole request if commits fail
pass
result = clamp_text(_format_repo_info(repo_info, commits), MAX_RESPONSE_CHARS)
success = True
except httpx.HTTPStatusError as exc:
error_msg = f"HTTP {exc.response.status_code}"
if exc.response.status_code == 404:
# Try to provide helpful suggestions
suggestions = []
if "/" in repo:
parts = repo.replace("https://github.com/", "").split("/")
if len(parts) >= 2:
owner_guess = parts[0]
suggestions.append(
f"- Check if '{owner_guess}' is the correct organization/user"
)
suggestions.append(f"- The repository may have been renamed or deleted")
suggestions.append(f"- Try searching: https://github.com/search?q={parts[1]}")
result = (
f"Repository '{repo}' not found (HTTP 404).\n\n"
f"Possible reasons:\n"
f"- The repository doesn't exist or was deleted\n"
f"- The repository is private\n"
f"- There's a typo in the owner or repository name\n"
)
if suggestions:
result += f"\nSuggestions:\n" + "\n".join(suggestions)
elif exc.response.status_code == 403:
result = (
f"Access denied to repository '{repo}' (HTTP 403).\n\n"
f"Possible reasons:\n"
f"- The repository is private\n"
f"- GitHub API rate limit exceeded\n"
f"- Set GITHUB_TOKEN environment variable for higher rate limits"
)
elif exc.response.status_code == 301:
# This shouldn't happen anymore with our redirect handling, but just in case
result = (
f"Repository '{repo}' has moved (HTTP 301).\n\n"
f"The repository may have been renamed or transferred.\n"
f"Try searching for the new location on GitHub."
)
else:
result = f"Failed to fetch repository '{repo}': HTTP {exc.response.status_code}"
except ValueError as exc:
error_msg = str(exc)
result = str(exc) # Invalid repo format - already has good error message
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Failed to fetch repository '{repo}': {exc}"
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="github_repo",
reasoning=reasoning,
parameters={"repo": repo, "include_commits": include_commits},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
@mcp.tool()
async def translate_error(
error_message: Annotated[str, "The error message or stack trace to investigate"],
reasoning: Annotated[str, "Why you're investigating this error (required for analytics)"],
language: Annotated[str | None, "Programming language (auto-detected if not provided)"] = None,
framework: Annotated[
str | None, "Framework context (e.g., 'React', 'FastAPI', 'Django')"
] = None,
max_results: Annotated[int, "Number of solutions to return (1-10)"] = 5,
) -> str:
"""
Find solutions for error messages and stack traces from Stack Overflow and GitHub.
Takes an error message or stack trace and finds relevant solutions with code examples.
Automatically detects language and framework, extracts key error information, and
searches for the best solutions ranked by votes and relevance.
Perfect for:
- Debugging production errors
- Understanding cryptic error messages
- Finding working code fixes
- Learning from similar issues
Examples:
- translate_error("TypeError: Cannot read property 'map' of undefined", reasoning="Debugging React app crash")
- translate_error("CORS policy: No 'Access-Control-Allow-Origin' header", reasoning="Fixing API integration", framework="FastAPI")
- translate_error("error[E0382]: borrow of moved value", reasoning="Learning Rust ownership", language="rust")
"""
start_time = time.time()
success = False
error_msg = None
result = ""
parsed = None
try:
max_results = max(1, min(max_results, 10))
# Parse the error message
parsed = error_parser.parse(error_message, language=language, framework=framework)
# Build search query
search_query = error_parser.build_search_query(parsed)
# Search for solutions (request more to allow for filtering)
hits = await searcher.search(
search_query,
category="it",
max_results=max_results * 2, # Get extra results for filtering
)
# Filter and prioritize results
if hits:
# Filter out irrelevant sources
irrelevant_domains = {
"hub.docker.com",
"crates.io",
"npmjs.com",
"pypi.org",
"pkg.go.dev",
}
filtered_hits = [
hit
for hit in hits
if not any(domain in hit.url.lower() for domain in irrelevant_domains)
]
# Prioritize Stack Overflow results
so_hits = [hit for hit in filtered_hits if "stackoverflow.com" in hit.url]
other_hits = [hit for hit in filtered_hits if "stackoverflow.com" not in hit.url]
# Combine: Stack Overflow first, then others
hits = (so_hits + other_hits)[:max_results]
if not hits:
result = (
f"No solutions found for this error.\n\n"
f"Parsed Error Info:\n"
f"- Type: {parsed.error_type}\n"
f"- Language: {parsed.language or 'Unknown'}\n"
f"- Framework: {parsed.framework or 'None detected'}\n\n"
f"Try:\n"
f"- Providing more context with language/framework parameters\n"
f"- Searching for just the error type: {parsed.error_type}\n"
f"- Using web_search with broader terms"
)
else:
# Format results
lines = [
"Error Translation Results",
"═" * 70,
"",
"📋 Parsed Error Information:",
f" Error Type: {parsed.error_type}",
]
if parsed.language:
lines.append(f" Language: {parsed.language.title()}")
if parsed.framework:
lines.append(f" Framework: {parsed.framework.title()}")
if parsed.file_path:
location = f"{parsed.file_path}"
if parsed.line_number:
location += f":{parsed.line_number}"
lines.append(f" Location: {location}")
lines.extend(
[
"",
f"🔍 Search Query: {search_query}",
"",
"💡 Top Solutions (Stack Overflow prioritized):",
"─" * 70,
"",
]
)
for idx, hit in enumerate(hits, 1):
# Try to extract vote count from snippet (if available)
votes = "N/A"
if hit.snippet:
vote_match = re.search(r"(\d+)\s*votes?", hit.snippet, re.IGNORECASE)
if vote_match:
votes = vote_match.group(1)
# Check if it's a Stack Overflow link
is_so = "stackoverflow.com" in hit.url
source_icon = "[SO]" if is_so else "[Web]"
lines.append(f"{idx}. {source_icon} {hit.title}")
if votes != "N/A":
lines.append(f" Votes: {votes}")
lines.append(f" {hit.url}")
if hit.snippet:
# Clean and truncate snippet
snippet = hit.snippet.replace("\n", " ").strip()
if len(snippet) > 200:
snippet = snippet[:200] + "..."
lines.append(f" {snippet}")
lines.append("")
lines.extend(
[
"─" * 70,
"",
"💡 Tips:",
"- Check accepted answers (marked with ✓) first",
"- Look for solutions with high vote counts",
"- Verify the solution matches your exact error",
"- Use crawl_url to get full answer details",
]
)
result = clamp_text("\n".join(lines), MAX_RESPONSE_CHARS)
success = True
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Error translation failed: {exc}\n\nTry simplifying the error message or provide language/framework context."
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="translate_error",
reasoning=reasoning,
parameters={
"error_type": parsed.error_type if parsed else "unknown",
"language": language or (parsed.language if parsed else None),
"framework": framework or (parsed.framework if parsed else None),
"max_results": max_results,
},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
@mcp.tool()
async def api_docs(
api_name: Annotated[str, "API name (e.g., 'stripe', 'github') or base docs URL"],
reasoning: Annotated[str, "Why you're looking up this API (required for analytics)"],
topic: Annotated[
str,
"What to search for (e.g., 'create customer', 'webhooks', 'authentication')",
],
max_results: Annotated[int, "Number of doc pages to fetch (1-3)"] = 2,
) -> str:
"""
Search and fetch official API documentation with examples and explanations.
Documentation-first approach: fetches human-written docs with context, examples,
and best practices. Much more useful than OpenAPI specs alone.
Discovery strategy:
1. Try common URL patterns (docs.{api}.com, {api}.com/docs, etc.)
2. If patterns fail, search for "{api} API official documentation"
3. Crawl discovered docs and extract relevant content
No hardcoded URLs - works for ANY API by discovering docs dynamically.
Examples:
- api_docs("stripe", "create customer", reasoning="Setting up payments")
- api_docs("github", "create repository", reasoning="Automating repo creation")
- api_docs("spartan", "button component", reasoning="Learning UI library")
"""
start_time = time.time()
success = False
error_msg = None
result = ""
docs_url = None
try:
max_results = max(1, min(max_results, 3)) # Clamp to 1-3
# Get alternative search terms for the API
search_terms = api_docs_detector.get_search_terms(api_name)
# Find documentation URL
docs_url = await api_docs_detector.find_docs_url(api_name)
if not docs_url:
# Fallback: Search for official docs using multiple search terms
for search_term in search_terms:
search_results = await searcher.search(
f"{search_term} official documentation",
category="general",
max_results=5,
)
# Filter for docs-like URLs
for hit in search_results:
if any(
indicator in hit.url.lower()
for indicator in ["docs", "developer", "api", "reference"]
):
docs_url = hit.url
# Extract base URL (remove path beyond /docs or /api)
if "/docs" in docs_url:
docs_url = docs_url.split("/docs")[0] + "/docs"
elif "/api" in docs_url:
docs_url = docs_url.split("/api")[0] + "/api"
break
if docs_url:
break
if not docs_url:
result = (
f"Could not find official documentation for '{api_name}'.\n\n"
f"Try:\n"
f"- Checking the API name spelling\n"
f"- Providing the docs URL directly (e.g., 'https://docs.example.com')\n"
f"- Using web_search to find the documentation manually"
)
error_msg = "Documentation URL not found"
success = False
else:
# Get the domain for site-specific search
docs_domain = api_docs_detector.get_docs_domain(docs_url)
# Search within the docs site
search_query = f"site:{docs_domain} {topic}"
doc_results = await searcher.search(
search_query, category="general", max_results=max_results * 2
)
# Fallback 1: If site-specific search fails, try broader search with API name + topic
if not doc_results:
# Try searching with the API name + topic directly
for search_term in search_terms[:2]: # Try first 2 terms
doc_results = await searcher.search(
f"{search_term} {topic}",
category="it",
max_results=max_results * 2,
)
if doc_results:
break
# Fallback 2: Try without site restriction but filter results
if not doc_results:
doc_results = await searcher.search(
f"{api_name} API {topic} documentation",
category="general",
max_results=max_results * 2,
)
# Fallback 3: Simplify the topic (extract key terms) and try again
if not doc_results and len(topic.split()) > 3:
# Extract key terms from complex topics
# e.g., "v3 audio tags SSML break tags emotional delivery" -> "audio SSML"
import re as re_module
# Remove version numbers, common filler words
simplified = re_module.sub(r"\bv\d+\b", "", topic, flags=re_module.IGNORECASE)
simplified = re_module.sub(
r"\b(api|the|a|an|and|or|for|to|with|using)\b",
"",
simplified,
flags=re_module.IGNORECASE,
)
key_terms = [w for w in simplified.split() if len(w) > 2][:3]
if key_terms:
simplified_topic = " ".join(key_terms)
doc_results = await searcher.search(
f"{api_name} {simplified_topic}",
category="it",
max_results=max_results * 2,
)
# Fallback 4: If all searches fail, try crawling the base docs URL directly
if not doc_results and docs_url:
try:
# Crawl the base docs URL - it often has navigation/overview
base_content = await crawler_client.fetch(docs_url, max_chars=15000)
if base_content and len(base_content) > 200:
# Create a minimal result from the base docs
doc = APIDocumentation(
api_name=api_name,
topic=topic,
docs_url=docs_url,
overview=api_docs_extractor.extract_overview(base_content),
parameters=[],
examples=api_docs_extractor.extract_examples(base_content),
related_links=api_docs_extractor.extract_links(base_content, docs_url),
notes=[
f"Note: Could not find specific docs for '{topic}'. Showing general documentation."
],
source_urls=[docs_url],
)
result = api_docs_extractor.format_documentation(doc)
result = clamp_text(result, MAX_RESPONSE_CHARS)
success = True
except Exception: # noqa: BLE001
pass # Fall through to error
if not doc_results and not success:
result = (
f"Found documentation site: {docs_url}\n"
f"But no results for topic: '{topic}'\n\n"
f"Try:\n"
f"- Broader search terms\n"
f"- Different wording (e.g., 'customers' instead of 'create customer')\n"
f"- Browse the docs directly: {docs_url}"
)
error_msg = "No results for topic"
success = False
elif doc_results:
# Crawl the top results
source_urls = []
all_content = []
for doc_result in doc_results[:max_results]:
try:
content = await crawler_client.fetch(doc_result.url, max_chars=12000)
all_content.append(content)
source_urls.append(doc_result.url)
except Exception: # noqa: BLE001
# Skip failed crawls
pass
if not all_content:
result = (
f"Failed to fetch documentation pages. Try browsing directly: {docs_url}"
)
error_msg = "Failed to crawl docs"
success = False
else:
# Combine content for extraction
combined_content = "\n\n".join(all_content)
# Extract structured information
overview = api_docs_extractor.extract_overview(combined_content)
parameters = api_docs_extractor.extract_parameters(combined_content)
examples = api_docs_extractor.extract_examples(combined_content)
notes = api_docs_extractor.extract_notes(combined_content)
related_links = api_docs_extractor.extract_links(combined_content, docs_url)
# Create documentation object
doc = APIDocumentation(
api_name=api_name,
topic=topic,
docs_url=docs_url,
overview=overview,
parameters=parameters,
examples=examples,
related_links=related_links,
notes=notes,
source_urls=source_urls,
)
# Format and return
result = api_docs_extractor.format_documentation(doc)
result = clamp_text(result, MAX_RESPONSE_CHARS)
success = True
except Exception as exc: # noqa: BLE001
error_msg = str(exc)
result = f"Failed to fetch API documentation: {exc}\n\nTry using web_search or crawl_url directly."
finally:
# Track usage
response_time = (time.time() - start_time) * 1000
tracker.track_usage(
tool_name="api_docs",
reasoning=reasoning,
parameters={
"api_name": api_name,
"topic": topic,
"docs_url": docs_url or "not_found",
"max_results": max_results,
},
response_time_ms=response_time,
success=success,
error_message=error_msg,
response_size=len(result.encode("utf-8")),
)
return result
@mcp.tool()
async def extract_data(
url: Annotated[str, "HTTP(S) URL to extract structured data from"],
reasoning: Annotated[str, "Why you're extracting data from this URL (required for analytics)"],
extract_type: Annotated[
Literal["table", "list", "fields", "json-ld", "auto"],
'Extraction type: "table", "list", "fields", "json-ld", or "auto"',
] = "auto",
selectors: Annotated[
dict[str, str] | None,
"CSS selectors for field extraction (only used with extract_type='fields')",
] = None,
max_items: Annotated[int, "Maximum number of items to extract"] = 100,
) -> str:
"""
Extract structured data from web pages.
Extracts tables, lists, or specific fields from HTML pages and returns
structured data. Much more efficient than parsing full page text.
Extract Types:
- "table": Extract HTML tables as list of dicts
- "list": Extract lists (ul/ol/dl) as structured list
- "fields": Extract specific elements using CSS selectors
- "json-ld": Extract JSON-LD structured data
- "auto": Automatically detect and extract structured content
Examples:
- extract_data("https://pypi.org/project/fastapi/", reasoning="Get package info")
- extract_data("https://github.com/user/repo/releases", reasoning="Get releases", extract_type="list")
- extract_data(
"https://example.com/product",
reasoning="Extract product details",
extract_type="fields",
selectors={"price": ".price", "title": "h1.product-name"}
)
"""
import json
start_time = time.time()
success = False
error_msg = None
result = ""
try:
# Fetch raw HTML
html = await crawler_client.fetch_raw(url)