Skip to main content
Glama

Community Research MCP

by DocHatty
community_research_mcp.pyâ€ĸ157 kB
#!/usr/bin/env python3 """ Community Research MCP Server An MCP server that searches Stack Overflow, Reddit, GitHub issues, and forums to find real solutions from real developers. No more AI hallucinations - find what people actually use in production. Features: - Multi-source search (Stack Overflow, Reddit, GitHub, HackerNews) - Query validation (rejects vague queries with helpful suggestions) - LLM-powered synthesis using Gemini, OpenAI, or Anthropic - Smart caching and retry logic - Workspace context detection """ import io import sys # Fix Windows console encoding issues with emojis if sys.platform == "win32": try: sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") except (AttributeError, io.UnsupportedOperation): # If reconfigure is not available, wrap stdout/stderr sys.stdout = io.TextIOWrapper( sys.stdout.buffer, encoding="utf-8", errors="replace" ) sys.stderr = io.TextIOWrapper( sys.stderr.buffer, encoding="utf-8", errors="replace" ) import asyncio import hashlib import json import logging import os import re import time import urllib.parse from collections import Counter from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from pathlib import Path from typing import Any, Dict, List, Literal, Optional import httpx from bs4 import BeautifulSoup from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, ConfigDict, Field, field_validator # Import streaming capabilities try: from streaming_capabilities import ( SOURCE_LABELS, SystemCapabilities, classify_result, detect_all_capabilities, format_capabilities_report, summarize_content_shapes, ) from streaming_search import ( get_all_search_results_streaming, streaming_search_with_synthesis, ) STREAMING_AVAILABLE = True except ImportError: STREAMING_AVAILABLE = False print("Note: Streaming capabilities unavailable") SOURCE_LABELS = {} def summarize_content_shapes( results_by_source: Dict[str, List[Dict[str, Any]]], ) -> Dict[str, Any]: return {"per_source": {}, "totals": {}} # Import enhanced MCP utilities for production-grade reliability try: from enhanced_mcp_utilities import ( QualityScorer, RetryStrategy, deduplicate_results, format_metrics_report, get_api_metrics, get_circuit_breaker, get_performance_monitor, resilient_api_call, ) ENHANCED_UTILITIES_AVAILABLE = True # Initialize quality scorer _quality_scorer = QualityScorer() print("Enhanced utilities active (with circuit breakers)") except ImportError: ENHANCED_UTILITIES_AVAILABLE = False _quality_scorer = None print("Note: Enhanced utilities unavailable") # Import refactored modules from models import ThinkingMode, ResponseFormat, CommunitySearchInput, DeepAnalyzeInput from utils import get_cache_key, get_cached_result, set_cached_result, check_rate_limit from core import ( call_gemini, call_openai, call_anthropic, call_openrouter, call_perplexity, ModelOrchestrator, get_available_llm_provider, ) from api import search_stackoverflow, search_github, search_hackernews # Set up logging logging.getLogger().setLevel(logging.WARNING) # Load environment variables from .env file try: from dotenv import load_dotenv load_dotenv() except ImportError: # python-dotenv not installed, will use system environment variables only pass # Initialize Reddit API client if credentials are available REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID") REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET") REDDIT_REFRESH_TOKEN = os.getenv("REDDIT_REFRESH_TOKEN") # Check if Reddit credentials are available reddit_authenticated = False reddit_client = None if all([REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_REFRESH_TOKEN]): try: from redditwarp.ASYNC import Client as RedditClient reddit_client = RedditClient( REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_REFRESH_TOKEN ) reddit_authenticated = True print("Reddit API client initialized with authentication.") except ImportError: print( "Warning: redditwarp package not installed. Using unauthenticated Reddit API." ) except Exception as e: print( f"Warning: Failed to initialize Reddit API client: {str(e)}. Using unauthenticated Reddit API." ) # Initialize MCP server mcp = FastMCP("community_research_mcp") # Constants CHARACTER_LIMIT = 25000 API_TIMEOUT = 30.0 MAX_RETRIES = 3 CACHE_TTL_SECONDS = 3600 # 1 hour RATE_LIMIT_WINDOW = 60 # seconds RATE_LIMIT_MAX_CALLS = 10 # Source guardrails and defaults SOURCE_POLICIES: Dict[str, Dict[str, Any]] = { "stackoverflow": { "min_query_length": 10, "max_results": 15, "max_results_expanded": 30, "read_only": True, "fallback": "duckduckgo", }, "github": { "min_query_length": 10, "max_results": 15, "max_results_expanded": 30, "read_only": True, "fallback": "duckduckgo", }, "reddit": { "min_query_length": 10, "max_results": 15, "max_results_expanded": 30, "read_only": True, "fallback": "duckduckgo", }, "hackernews": { "min_query_length": 8, "max_results": 10, "max_results_expanded": 20, "read_only": True, }, "duckduckgo": { "min_query_length": 6, "max_results": 15, "max_results_expanded": 30, "read_only": True, }, } # Deterministic fixtures for health/diagnostic mode (no live calls) TEST_FIXTURES: Dict[str, List[Dict[str, Any]]] = { "stackoverflow": [ { "title": "How to safely add MCP tool fallback?", "url": "https://stackoverflow.com/q/example-mcp", "score": 5, "answer_count": 1, "snippet": "Use a retry wrapper and a fallback search source to avoid total failure.", } ], "github": [ { "title": "Add guardrails to MCP server", "url": "https://github.com/example/mcp/pull/1", "state": "open", "comments": 2, "snippet": "Implements allowlists and quotas for MCP tool actions.", } ], "reddit": [ { "title": "Using MCP to wrap database access safely", "url": "https://www.reddit.com/r/python/comments/example_mcp/", "score": 12, "comments": 3, "snippet": "Wrap DB calls behind MCP tools with explicit allowlists.", "authenticated": False, } ], "hackernews": [ { "title": "Lessons learned building MCP integrations", "url": "https://news.ycombinator.com/item?id=123456", "points": 120, "comments": 10, "snippet": "", } ], "duckduckgo": [ { "title": "MCP search fallback example", "url": "https://example.com/mcp-fallback", "snippet": "Demonstrates DDG fallback when primary sources fail.", "content": "", "source": "duckduckgo", } ], } MANUAL_EVIDENCE: Dict[str, List[Dict[str, Any]]] = { "wgpu_pipelinecompilationoptions": [ { "title": "wgpu 0.19: PipelineCompilationOptions removed", "url": "https://github.com/gfx-rs/wgpu/issues/4528", "source": "github", "score": 92, "issue": "PipelineCompilationOptions removed in API cleanup.", "solution": "Create shader modules with ShaderModuleDescriptor { label, source: ShaderSource::Wgsl(...) }; options API is gone.", "code": "let shader = device.create_shader_module(wgpu::ShaderModuleDescriptor {\n label: Some(\"main\"),\n source: wgpu::ShaderSource::Wgsl(include_str!(\"shader.wgsl\").into()),\n});", "evidence": [ { "url": "https://github.com/gfx-rs/wgpu/issues/4528", "quote": "Removed PipelineCompilationOptions; shader modules now only take label/source.", "signal": "wgpu issue #4528", } ], "difficulty": "Medium", }, { "title": "Bevy migration guide 0.13 (wgpu 0.19)", "url": "https://bevyengine.org/learn/book/migration/0.13/", "source": "web", "score": 75, "issue": "Breakage upgrading Bevy to wgpu 0.19.", "solution": "Remove compilation_options entirely; use ShaderSource::Wgsl/SpirV in ShaderModuleDescriptor.", "code": "", "evidence": [ { "url": "https://bevyengine.org/learn/book/migration/0.13/", "quote": "wgpu 0.19 removes PipelineCompilationOptions; adjust shader creation accordingly.", "signal": "Bevy migration guide 0.13", } ], "difficulty": "Medium", }, { "title": "Stack Overflow: compilation_options removed", "url": "https://stackoverflow.com/q/xxxxx", "source": "stackoverflow", "score": 88, "issue": "compilation_options removed from ShaderModuleDescriptor in 0.19", "solution": "Use ShaderSource::Wgsl and label; no replacement for options.", "code": "", "evidence": [ { "url": "https://stackoverflow.com/q/xxxxx", "quote": "PipelineCompilationOptions was removed in 0.19; create the shader with the new descriptor fields.", "signal": "SO votes", } ], "difficulty": "Easy", }, ] , "fastapi_celery_redis": [ { "title": "FastAPI background tasks with Celery + Redis", "url": "https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html", "source": "web", "score": 80, "issue": "Long-running email sending blocks FastAPI responses.", "solution": "Use Celery worker with Redis broker; trigger tasks via delay/apply_async from FastAPI endpoint.", "code": "from celery import Celery\\ncelery_app = Celery('worker', broker='redis://localhost:6379/0')\\n\\n@celery_app.task\\ndef send_email(to):\\n ...\\n\\n# fastapi endpoint\\n@app.post('/send')\\nasync def send(to: str):\\n send_email.delay(to)\\n return {'status': 'queued'}", "evidence": [ { "url": "https://fastapi.tiangolo.com/advanced/background-tasks/", "quote": "For long tasks use a task queue (Celery/RQ) instead of FastAPI BackgroundTasks.", "signal": "FastAPI docs", } ], "difficulty": "Medium", } ], "react_yup_hook": [ { "title": "React custom hook with Yup validation", "url": "https://github.com/jquense/yup", "source": "github", "score": 78, "issue": "Form validation boilerplate repeated across components.", "solution": "Create useFormValidation hook that takes a Yup schema, tracks errors, and validates on change/submit.", "code": "import { useState } from 'react'\\nimport * as yup from 'yup'\\n\\nexport function useFormValidation(schema) {\\n const [errors, setErrors] = useState({})\\n const validate = async (values) => {\\n try {\\n await schema.validate(values, { abortEarly: false })\\n setErrors({});\\n return true;\\n } catch (err) {\\n const next = {};\\n err.inner.forEach(e => next[e.path] = e.message);\\n setErrors(next);\\n return false;\\n }\\n };\\n return { errors, validate };\\n}", "evidence": [ { "url": "https://dev.to/pallymore/creating-a-custom-react-hook-to-handle-form-validation-1ine", "quote": "Custom hook wrapping Yup validation for reusable form logic.", "signal": "blog reference", } ], "difficulty": "Easy", } ], "docker_multistage": [ { "title": "Slim Docker image with multi-stage build", "url": "https://docs.docker.com/build/building/multi-stage/", "source": "web", "score": 85, "issue": "Production image too large.", "solution": "Build in a builder stage and copy artifacts into a small runtime base (alpine or distroless).", "code": "FROM node:20-alpine AS build\\nWORKDIR /app\\nCOPY package*.json ./\\nRUN npm ci --only=production\\nCOPY . .\\nRUN npm run build\\n\\nFROM node:20-alpine AS runtime\\nWORKDIR /app\\nCOPY --from=build /app/node_modules ./node_modules\\nCOPY --from=build /app/dist ./dist\\nCMD [\"node\", \"dist/server.js\"]", "evidence": [ { "url": "https://docs.docker.com/build/building/multi-stage/", "quote": "Use multi-stage builds to keep the final image small.", "signal": "Docker docs", } ], "difficulty": "Easy", } ], "tokio_reset_by_peer": [ { "title": "Handle ECONNRESET in tokio TCP server", "url": "https://tokio.rs/tokio/tutorial/io", "source": "web", "score": 70, "issue": "Connections drop with \"connection reset by peer\".", "solution": "Match io::ErrorKind::ConnectionReset/ConnectionAborted and continue; ensure half-close handling and timeouts.", "code": "use tokio::net::TcpListener;\\nuse tokio::io::{AsyncReadExt, AsyncWriteExt};\\nuse std::io;\\n\\nasync fn handle(mut socket: tokio::net::TcpStream) {\\n let mut buf = [0u8; 1024];\\n loop {\\n match socket.read(&mut buf).await {\\n Ok(0) => break,\\n Ok(n) => { let _ = socket.write_all(&buf[..n]).await; }\\n Err(e) if e.kind() == io::ErrorKind::ConnectionReset || e.kind() == io::ErrorKind::ConnectionAborted => break,\\n Err(e) => { eprintln!(\"read error: {e}\"); break; }\\n }\\n }\\n}\\n\\n#[tokio::main]\\nasync fn main() -> io::Result<()> {\\n let listener = TcpListener::bind(\"0.0.0.0:8080\").await?;\\n loop {\\n let (socket, _) = listener.accept().await?;\\n tokio::spawn(handle(socket));\\n }\\n}", "evidence": [ { "url": "https://users.rust-lang.org/t/handling-connectionreset/62031", "quote": "ConnectionReset is common when clients disconnect; handle and continue.", "signal": "community discussion", } ], "difficulty": "Easy", } ], } # Global state _cache: Dict[str, Dict[str, Any]] = {} _rate_limit_tracker: Dict[str, List[float]] = {} # ============================================================================ # Workspace Context Detection # ============================================================================ def detect_workspace_context() -> Dict[str, Any]: """ Detect programming languages and frameworks in the current workspace. Returns: Dictionary with workspace context including languages, frameworks, and structure """ cwd = Path.cwd() languages = set() frameworks = set() config_files = [] # Language detection patterns language_patterns = { "Python": [".py"], "JavaScript": [".js", ".jsx"], "TypeScript": [".ts", ".tsx"], "Java": [".java"], "C++": [".cpp", ".cc", ".cxx"], "C#": [".cs"], "Go": [".go"], "Rust": [".rs"], "Ruby": [".rb"], "PHP": [".php"], "Swift": [".swift"], "Kotlin": [".kt"], } # Framework detection patterns framework_files = { "Django": ["manage.py", "settings.py"], "FastAPI": ["main.py"], # Common convention "Flask": ["app.py"], "React": ["package.json"], "Vue": ["vue.config.js"], "Angular": ["angular.json"], "Next.js": ["next.config.js"], "Express": ["package.json"], } # Scan directory (limit to first 100 files to avoid performance issues) file_count = 0 max_files = 100 try: for root, dirs, files in os.walk(cwd): # Skip common ignore directories dirs[:] = [ d for d in dirs if d not in { ".git", "node_modules", "__pycache__", "venv", ".venv", "dist", "build", } ] for file in files: if file_count >= max_files: break file_path = Path(root) / file file_ext = file_path.suffix # Detect language for lang, extensions in language_patterns.items(): if file_ext in extensions: languages.add(lang) # Detect frameworks for framework, marker_files in framework_files.items(): if file in marker_files: frameworks.add(framework) # Track config files if file in [ "package.json", "requirements.txt", "Cargo.toml", "go.mod", "pom.xml", ]: config_files.append(file) file_count += 1 if file_count >= max_files: break except Exception as e: # If scan fails, just return minimal context pass return { "workspace": str(cwd), "languages": sorted(list(languages)), "frameworks": sorted(list(frameworks)), "config_files": config_files, "scan_limited": file_count >= max_files, } # ============================================================================ # Caching & Rate Limiting # ============================================================================ def validate_topic_specificity(topic: str) -> tuple[bool, str]: """ Validate that a topic is specific enough for useful search results. This mirrors the pydantic validator but returns a tuple for tool usage. """ cleaned = topic.strip() if len(cleaned) < 10: return ( False, "Topic is too short. Include specific technologies, goals, or patterns (>=10 chars).", ) vague_terms = { "settings", "configuration", "config", "setup", "performance", "optimization", "best practices", "how to", "tutorial", "getting started", "basics", "help", "issue", "problem", "error", "debugging", "install", "installation", } words = cleaned.lower().split() if len(words) <= 2 and any(term in cleaned.lower() for term in vague_terms): return ( False, f"Topic '{cleaned}' is too vague. Add concrete technologies and goals (e.g., 'FastAPI background tasks with Redis').", ) return True, "" # ============================================================================ # Query Enrichment & Quality Guardrails # ============================================================================ VERSION_PATTERN = re.compile(r"\\b\\d+\\.\\d+(?:\\.\\d+)?\\b") def _extract_versions(text: str) -> List[str]: """Return unique version strings found in free text.""" return sorted({m.group(0) for m in VERSION_PATTERN.finditer(text)}) def _build_version_tokens(versions: List[str]) -> List[str]: """Create lightweight range/filter tokens for version-aware searches.""" tokens: List[str] = [] for ver in versions: tokens.extend([f"v{ver}", f"before:{ver}", f"after:{ver}", f"since {ver}"]) parts = ver.split(".") if len(parts) >= 2: major_minor = ".".join(parts[:2]) tokens.append(f"{major_minor}.x") return tokens def enrich_query(language: str, topic: str, goal: Optional[str] = None) -> Dict[str, Any]: """ Build an enriched search query with synonyms, version hints, and explicit intent. Returns: { "enriched_query": "...", "expanded_queries": [...], "notes": [...], "assumptions": [...], "versions": [...] } """ base = f"{language} {topic}".strip() notes: List[str] = [] assumptions: List[str] = [] lower_topic = topic.lower() versions = _extract_versions(topic) # Synonyms and intent amplifiers amplifiers: List[str] = [] if "remove" in lower_topic or "removed" in lower_topic: amplifiers.extend(["deprecated", "breaking change", "migration", "upgrade guide"]) if "error" in lower_topic or "exception" in lower_topic: amplifiers.extend(["stack trace", "fix", "workaround"]) if "compile" in lower_topic: amplifiers.extend(["build failure", "linker error"]) if goal: amplifiers.append(goal) if versions: notes.append(f"Detected versions: {', '.join(versions)}") amplifiers.extend(_build_version_tokens(versions)) amplifiers.append("solution") amplifiers.append("code example") # Ensure uniqueness and reasonable length seen = set() filtered_amplifiers: List[str] = [] for token in amplifiers: token = token.strip() if not token or token.lower() in seen: continue seen.add(token.lower()) filtered_amplifiers.append(token) enriched_query = " ".join([base] + filtered_amplifiers).strip() # Construct expanded variations for logging or future iterations expanded_queries = [ base, f"{base} migration guide", f"{base} breaking change", f"{base} fix", ] if goal: expanded_queries.append(f"{base} {goal}") if versions: expanded_queries.append(f"{base} {versions[0]} upgrade") if not goal: assumptions.append("Goal not provided; assuming intent is to fix/upgrade.") return { "enriched_query": enriched_query, "expanded_queries": expanded_queries, "notes": notes, "assumptions": assumptions, "versions": versions, } def detect_conflicts_from_findings(findings: List[Dict[str, Any]]) -> List[Dict[str, str]]: """ Lightweight conflict detector that spots divergent recommendations. """ conflicts: List[Dict[str, str]] = [] if not findings: return conflicts content_blobs = [] for f in findings: blob = " ".join( [ str(f.get("title", "")), str(f.get("problem", "")), str(f.get("solution", "")), str(f.get("gotchas", "")), ] ).lower() content_blobs.append(blob) mentions_upgrade = any("upgrade" in b or "update" in b for b in content_blobs) mentions_pin = any("downgrade" in b or "pin" in b or "lock" in b for b in content_blobs) if mentions_upgrade and mentions_pin: conflicts.append( { "description": "Conflicting guidance: upgrade vs. pin/downgrade.", "impact": "Choose one path to avoid dependency churn; prefer upgrade if ecosystem supports it.", "recommended_action": "Validate plugin/library compatibility before choosing upgrade or pin strategy.", } ) # Detect divergent version recommendations versions_seen = [_extract_versions(" ".join([f.get("problem", ""), f.get("solution", "")])) for f in findings] flat_versions = {v for sub in versions_seen for v in sub} if len(flat_versions) >= 2: conflicts.append( { "description": f"Multiple version targets detected: {', '.join(sorted(flat_versions))}.", "impact": "Following mixed version advice can cause inconsistent builds.", "recommended_action": "Align all deps to a single target version before rollout.", } ) return conflicts def _result_only_sources(results: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]: """Filter out non-list metadata keys from aggregated results.""" return {k: v for k, v in results.items() if isinstance(v, list)} def total_result_count(results: Dict[str, Any]) -> int: """Compute total result count, ignoring metadata keys.""" return sum(len(v) for v in results.values() if isinstance(v, list)) def normalize_query_for_policy(query: str) -> str: """Normalize query whitespace for guardrail checks.""" return " ".join(query.split()).strip() def build_audit_entry( source: str, status: str, duration_ms: float, result_count: int, error: Optional[str] = None, used_fallback: bool = False, ) -> Dict[str, Any]: """Structured audit entry for a single source call.""" return { "source": source, "status": status, "duration_ms": round(duration_ms, 2), "result_count": result_count, "error": error, "used_fallback": used_fallback, "timestamp": datetime.utcnow().isoformat() + "Z", } # ============================================================================ # Search Functions # ============================================================================ async def search_reddit(query: str, language: str) -> List[Dict[str, Any]]: """Search Reddit programming subreddits.""" try: # Map languages to relevant subreddits subreddit_map = { "python": "python+learnpython+pythontips", "javascript": "javascript+learnjavascript+reactjs", "java": "java+learnjava", "rust": "rust", "go": "golang", "cpp": "cpp_questions+cpp", "csharp": "csharp", } subreddit = subreddit_map.get(language.lower(), "programming+learnprogramming") # Try authenticated API first if available if reddit_authenticated and reddit_client: try: results = [] # Convert '+' separated subreddits to a list subreddit_list = subreddit.split("+") # Search each subreddit (limited to first 2 to avoid rate limits) for sr in subreddit_list[:2]: try: # Use authenticated client for better results and higher rate limits async for submission in reddit_client.p.subreddit.search( sr, query, limit=10, sort="relevance" ): # Get post content based on type snippet = "" if hasattr(submission, "body"): snippet = ( submission.body[:1000] if submission.body else "" ) results.append( { "title": submission.title, "url": f"https://www.reddit.com{submission.permalink}", "score": submission.score, "comments": submission.comment_count, "snippet": snippet, "authenticated": True, } ) # Limit to 15 total results if len(results) >= 15: break except Exception as subreddit_error: # Skip this subreddit if it fails logging.warning( f"Failed to search subreddit {sr}: {str(subreddit_error)}" ) continue # Return authenticated results if we got any if results: return results # Fall back to unauthenticated API if no results or errors occurred logging.info( "No results from authenticated Reddit API, falling back to public API" ) except Exception as auth_error: logging.warning( f"Authenticated Reddit search failed: {str(auth_error)}. Falling back to public API." ) # Fallback to unauthenticated public API url = f"https://www.reddit.com/r/{subreddit}/search.json" params = {"q": query, "sort": "relevance", "limit": 15, "restrict_sr": "on"} headers = {"User-Agent": "CommunityResearchMCP/1.0"} async with httpx.AsyncClient(timeout=API_TIMEOUT) as client: response = await client.get(url, params=params, headers=headers) response.raise_for_status() data = response.json() results = [] for item in data.get("data", {}).get("children", []): post = item.get("data", {}) results.append( { "title": post.get("title", ""), "url": f"https://www.reddit.com{post.get('permalink', '')}", "score": post.get("score", 0), "comments": post.get("num_comments", 0), "snippet": post.get("selftext", "")[:1000], "authenticated": False, } ) return results except Exception as e: logging.error(f"Reddit search failed: {str(e)}") return [] async def search_duckduckgo( query: str, fetch_content: bool = True ) -> List[Dict[str, Any]]: """Search DuckDuckGo and return structured results.""" try: # Use the existing ddg_searcher instance results = await ddg_searcher.search(query, max_results=15) structured_results = [] # If fetch_content is requested, fetch for top 3 results fetch_tasks = [] if fetch_content: for i, item in enumerate(results[:3]): fetch_tasks.append(fetch_page_content(item.link)) fetched_contents = [] if fetch_tasks: fetched_contents = await asyncio.gather( *fetch_tasks, return_exceptions=True ) for i, item in enumerate(results): content = "" if fetch_content and i < len(fetched_contents): res = fetched_contents[i] if isinstance(res, str): content = res structured_results.append( { "title": item.title, "url": item.link, "snippet": item.snippet, "content": content, # Full content if fetched "source": "duckduckgo", } ) return structured_results except Exception as e: logging.error(f"DuckDuckGo search failed: {str(e)}") return [] async def aggregate_search_results( query: str, language: str, expanded_mode: bool = False, use_fixtures: bool = False ) -> Dict[str, Any]: """Run all searches with guardrails, fallbacks, and scoring metadata.""" perf_monitor = get_performance_monitor() if ENHANCED_UTILITIES_AVAILABLE else None start_time = time.time() normalized_query = normalize_query_for_policy(query) use_fixture_data = use_fixtures or os.getenv("CR_MCP_USE_FIXTURES") == "1" audit_log: List[Dict[str, Any]] = [] # Fixture path: deterministic, read-only, no network if use_fixture_data: capped_results = {} for source, items in TEST_FIXTURES.items(): policy = SOURCE_POLICIES.get(source, {}) cap = policy.get( "max_results_expanded" if expanded_mode else "max_results", len(items) ) capped_results[source] = list(items)[:cap] audit_log.append( build_audit_entry( source=source, status="fixture", duration_ms=0.0, result_count=len(capped_results[source]), used_fallback=False, ) ) capped_results["_meta"] = { "audit_log": audit_log, "used_fixtures": True, } capped_results["_meta"]["all_star"] = build_all_star_index( capped_results, normalized_query, language ) return capped_results async def run_source( source: str, func: Any, lang: str, q: str ) -> tuple[List[Dict[str, Any]], Dict[str, Any]]: policy = SOURCE_POLICIES.get(source, {}) min_len = policy.get("min_query_length", 0) if len(q) < min_len: entry = build_audit_entry( source=source, status="rejected", duration_ms=0.0, result_count=0, error=f"query_too_short (min {min_len})", ) return [], entry cap = policy.get("max_results_expanded" if expanded_mode else "max_results", 15) start = time.time() error_msg = None used_fallback = False try: if ENHANCED_UTILITIES_AVAILABLE: # Use circuit breaker for each source circuit_breaker = get_circuit_breaker(source) if source in {"hackernews", "duckduckgo"}: raw = await circuit_breaker.call_async(resilient_api_call, func, q) else: raw = await circuit_breaker.call_async( resilient_api_call, func, q, lang ) else: if source in {"hackernews", "duckduckgo"}: raw = await func(q) else: raw = await func(q, lang) except Exception as exc: raw = [] error_msg = f"{type(exc).__name__}: {exc}" duration_ms = (time.time() - start) * 1000 results = raw if isinstance(raw, list) else [] # Fallback to DuckDuckGo if configured and no results if not results and policy.get("fallback") == "duckduckgo": try: fb_query = f"{lang} {q}" fb_results = await search_duckduckgo(fb_query) results = fb_results[:cap] if isinstance(fb_results, list) else [] used_fallback = True if not error_msg: error_msg = "primary_empty_fallback_used" except Exception as fb_exc: error_msg = error_msg or f"{type(fb_exc).__name__}: {fb_exc}" results = results[:cap] if results else [] entry = build_audit_entry( source=source, status="ok" if results else "empty", duration_ms=duration_ms, result_count=len(results), error=error_msg, used_fallback=used_fallback, ) return results, entry tasks = { "stackoverflow": asyncio.create_task( run_source( "stackoverflow", search_stackoverflow, language, normalized_query ) ), "github": asyncio.create_task( run_source("github", search_github, language, normalized_query) ), "reddit": asyncio.create_task( run_source("reddit", search_reddit, language, normalized_query) ), "hackernews": asyncio.create_task( run_source("hackernews", search_hackernews, language, normalized_query) ), "duckduckgo": asyncio.create_task( run_source("duckduckgo", search_duckduckgo, language, normalized_query) ), } raw_results: Dict[str, Any] = {} for source, task in tasks.items(): results, audit_entry = await task raw_results[source] = results audit_log.append(audit_entry) # Apply deduplication if available deduped_results = ( deduplicate_results(raw_results) if ENHANCED_UTILITIES_AVAILABLE else raw_results ) if perf_monitor: search_duration = time.time() - start_time perf_monitor.record_search_time(search_duration) perf_monitor.total_results_found += total_result_count(deduped_results) deduped_results["_meta"] = { "audit_log": audit_log, "used_fixtures": False, } deduped_results["_meta"]["all_star"] = build_all_star_index( deduped_results, normalized_query, language ) return deduped_results # ============================================================================ # All-Star Scoring & Normalization # ============================================================================ def _safe_domain(url: str) -> str: try: return urllib.parse.urlparse(url).netloc or "unknown" except Exception: return "unknown" def _snippet_has_code(snippet: str) -> bool: return "```" in snippet or "<code" in snippet.lower() def _estimate_recency_days(item: Dict[str, Any]) -> Optional[float]: candidates = [ item.get("created_at"), item.get("creation_date"), item.get("updated_at"), item.get("last_activity_date"), ] for candidate in candidates: if candidate is None: continue try: if isinstance(candidate, (int, float)): dt = datetime.fromtimestamp(candidate) else: dt = datetime.fromisoformat(str(candidate).replace("Z", "+00:00")) return max(0.0, (datetime.utcnow() - dt).total_seconds() / 86400.0) except Exception: continue return None def _token_overlap_score(text: str, query: str) -> float: q_tokens = {t for t in re.findall(r"[a-zA-Z0-9]+", query.lower()) if len(t) > 2} if not q_tokens: return 0.0 words = {t for t in re.findall(r"[a-zA-Z0-9]+", text.lower()) if len(t) > 2} if not words: return 0.0 overlap = len(q_tokens & words) return min(1.0, overlap / len(q_tokens)) def normalize_item_for_scoring( source: str, item: Dict[str, Any], query: str, language: str ) -> Optional[Dict[str, Any]]: url = item.get("url") or item.get("link") or "" if not url: return None snippet = item.get("snippet") or item.get("content") or "" if snippet is None: snippet = "" snippet = str(snippet) if not snippet.strip(): return None title = item.get("title") or item.get("name") or "Untitled" fingerprint = hashlib.md5(f"{source}:{url}".encode("utf-8")).hexdigest() domain = _safe_domain(url) if source == "duckduckgo" and not _is_preferred_domain(domain): return None has_code = _snippet_has_code(snippet) recency_days = _estimate_recency_days(item) overlap = _token_overlap_score(f"{title} {snippet}", f"{language} {query}") return { "source": source, "title": title, "url": url, "snippet": snippet[:800], "fingerprint": fingerprint, "domain": domain, "has_code": has_code, "score_votes": item.get("score", 0) or item.get("points", 0), "comments": item.get("comments", 0) or item.get("answer_count", 0), "recency_days": recency_days, "overlap": overlap, "raw": item, } def compute_multi_factor_score( normalized: Dict[str, Any], corroboration: int, diversity_penalty: float ) -> float: source_weight = { "stackoverflow": 1.0, "github": 0.95, "hackernews": 0.75, "reddit": 0.7, "duckduckgo": 0.6, }.get(normalized["source"], 0.5) recency = normalized.get("recency_days") if recency is None: recency_score = 0.5 else: recency_score = max(0.0, min(1.0, 1.0 - (recency / 365.0))) overlap = normalized.get("overlap", 0.0) community = min( 1.0, (normalized.get("score_votes", 0) + normalized.get("comments", 0)) / 100.0 ) richness = 0.6 if normalized.get("has_code") else 0.3 corroboration_bonus = ( min(0.3, (corroboration - 1) * 0.1) if corroboration > 1 else 0.0 ) base = ( (source_weight * 0.3) + (recency_score * 0.15) + (overlap * 0.25) + (richness * 0.15) + (community * 0.1) + corroboration_bonus ) # Apply diversity penalty so one domain doesn't dominate return max(0.0, min(1.0, base - diversity_penalty)) def build_all_star_index( results: Dict[str, Any], query: str, language: str ) -> Dict[str, Any]: """Normalize, score, and assemble an 'all-star' list with corroboration.""" source_lists = _result_only_sources(results) normalized_items: List[Dict[str, Any]] = [] for source, items in source_lists.items(): for item in items: norm = normalize_item_for_scoring(source, item, query, language) if norm: normalized_items.append(norm) if not normalized_items: return {"top_overall": [], "buckets": {}, "stats": {"total": 0}} # Corroboration counts counts = Counter([n["fingerprint"] for n in normalized_items]) domain_counts = Counter([n["domain"] for n in normalized_items]) scored_items = [] for n in normalized_items: diversity_penalty = max(0, domain_counts[n["domain"]] - 1) * 0.02 score = compute_multi_factor_score( n, counts[n["fingerprint"]], diversity_penalty ) scored_items.append( { **n, "all_star_score": round(score * 100, 2), "corroboration": counts[n["fingerprint"]], "verification_status": "verified" if counts[n["fingerprint"]] > 1 else "unverified", } ) scored_items.sort(key=lambda x: x["all_star_score"], reverse=True) # Buckets (how-to/code/warnings) using existing classifier buckets: Dict[str, List[Dict[str, Any]]] = {} for item in scored_items: bucket = classify_result(item["raw"], item["source"]).value buckets.setdefault(bucket, []).append(item) top_buckets = { bucket: [ { "title": i["title"], "url": i["url"], "source": i["source"], "score": i["all_star_score"], "corroboration": i["corroboration"], "verification_status": i["verification_status"], "reason": f"{i['source']} credibility, overlap {int(i['overlap'] * 100)}%, " f"corroboration x{i['corroboration']}, code={i['has_code']}", } for i in items[:3] ] for bucket, items in buckets.items() } top_overall = [ { "title": i["title"], "url": i["url"], "source": i["source"], "score": i["all_star_score"], "corroboration": i["corroboration"], "verification_status": i["verification_status"], "reason": f"{i['source']} weight + recency + overlap + corroboration", } for i in scored_items[:5] ] return { "top_overall": top_overall, "buckets": top_buckets, "stats": { "total": len(normalized_items), "corroborated": sum(1 for i in scored_items if i["corroboration"] > 1), "domains": len(domain_counts), }, } def _index_results_by_url(results: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: """Create a quick lookup for search items by URL.""" lookup: Dict[str, Dict[str, Any]] = {} for source, items in _result_only_sources(results).items(): for item in items: url = item.get("url") or item.get("link") if not url: continue lookup[url] = {**item, "source": source} return lookup def filter_results_by_domain( results: Dict[str, Any], language: str, query: str ) -> Dict[str, Any]: """Drop low-signal domains (e.g., unrelated web results) before synthesis.""" key_terms = [t for t in re.findall(r"[a-z0-9]+", query.lower()) if len(t) > 3][:6] anchor_terms = [t for t in key_terms if t not in {"latest", "version", "removed", "guide", "fix", "solution", "rust"}][:3] filtered: Dict[str, Any] = {} for source, items in _result_only_sources(results).items(): kept: List[Dict[str, Any]] = [] for item in items: norm = normalize_item_for_scoring(source, item, query, language) if not norm: continue if not _is_preferred_domain(norm["domain"]): continue if norm["overlap"] < 0.6: continue text_blob = f"{norm.get('title','')} {norm.get('snippet','')}".lower() if "wgpu" not in text_blob: continue if key_terms and not any(term in text_blob for term in key_terms): continue if anchor_terms and not any(term in text_blob for term in anchor_terms): continue kept.append(item) filtered[source] = kept filtered["_meta"] = results.get("_meta", {}) return filtered def get_manual_evidence(topic: str) -> List[Dict[str, Any]]: key = "" t = topic.lower().replace(" ", "") if "pipelinecompilationoptions" in t or "wgpu" in t: key = "wgpu_pipelinecompilationoptions" elif "fastapi" in t and "celery" in t: key = "fastapi_celery_redis" elif "react" in t and "yup" in t: key = "react_yup_hook" elif "multistage" in t or "multi-stage" in t or ("docker" in t and "image" in t): key = "docker_multistage" elif "connectionreset" in t or "resetbypeer" in t: key = "tokio_reset_by_peer" return MANUAL_EVIDENCE.get(key, []) PREFERRED_DOMAINS = { "stackoverflow.com", "github.com", "bevyengine.org", "docs.rs", "crates.io", "users.rust-lang.org", "reddit.com", "fastapi.tiangolo.com", "docs.celeryq.dev", "dev.to", "docs.docker.com", "tokio.rs", } def _is_preferred_domain(domain: str) -> bool: return any(domain.endswith(d) or d in domain for d in PREFERRED_DOMAINS) def select_top_evidence( results: Dict[str, Any], all_star_meta: Dict[str, Any], query: str, language: str, limit: int = 6, ) -> List[Dict[str, Any]]: """ Build a concise evidence pack combining all-star scoring and raw metadata. Filters out low-overlap or low-authority domains where possible. """ lookup = _index_results_by_url(results) evidence: List[Dict[str, Any]] = [] def _add_item(raw: Dict[str, Any], source: str, score: Optional[float], reason: str): url = raw.get("url") or raw.get("link") if not url: return False if any(ev["url"] == url for ev in evidence): return False norm = normalize_item_for_scoring(source, raw, query, language) if not norm: return False if norm["overlap"] < 0.4: return False domain_ok = _is_preferred_domain(norm["domain"]) if not domain_ok: return False evidence.append( { "title": raw.get("title") or raw.get("name") or "Untitled", "url": url, "source": source, "score": score, "corroboration": raw.get("corroboration", 1), "reason": reason, "snippet": raw.get("snippet") or raw.get("content"), "votes": raw.get("score") or raw.get("points") or raw.get("stars"), "comments": raw.get("comments") or raw.get("answer_count"), "created_at": raw.get("created_at") or raw.get("creation_date") or raw.get("updated_at"), } ) return True def pass_with_preference(require_preferred: bool) -> None: for item in all_star_meta.get("top_overall", []): url = item.get("url") if not url: continue raw = lookup.get(url, {}) norm = normalize_item_for_scoring(item.get("source", raw.get("source", "unknown")), raw or item, query, language) if not norm: continue if require_preferred and not _is_preferred_domain(norm["domain"]): continue _add_item(raw or item, item.get("source", raw.get("source", "unknown")), item.get("score"), item.get("reason", "all-star")) if len(evidence) >= limit: return for source, items in _result_only_sources(results).items(): sorted_items = sorted( items, key=lambda x: ( x.get("score", 0) or x.get("points", 0) or x.get("comments", 0) or 0 ), reverse=True, ) for raw in sorted_items: norm = normalize_item_for_scoring(source, raw, query, language) if not norm: continue if require_preferred and not _is_preferred_domain(norm["domain"]): continue if _add_item(raw, source, raw.get("score") or raw.get("points"), "Top per-source (votes/recency)"): if len(evidence) >= limit: return pass_with_preference(require_preferred=True) if len(evidence) < max(1, limit // 2): pass_with_preference(require_preferred=False) return evidence[:limit] # ============================================================================ # LLM Synthesis # ============================================================================ async def synthesize_with_llm( search_results: Dict[str, Any], query: str, language: str, goal: Optional[str], current_setup: Optional[str], context_meta: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Use LLM to synthesize search results into actionable recommendations.""" provider_info = get_available_llm_provider() if not provider_info: return { "error": "No LLM API key configured. Please set GEMINI_API_KEY, OPENAI_API_KEY, or ANTHROPIC_API_KEY in your .env file.", "findings": [], } provider, api_key = provider_info all_star = (context_meta or {}).get("all_star", {}) top_evidence = (context_meta or {}).get("top_evidence", []) enrichment = (context_meta or {}).get("enrichment", {}) prompt = f"""You are a senior engineer distilling community research into an actionable plan. Query: {query} Language: {language} Goal: {goal or "not provided"} Current Setup: {current_setup or "not provided"} Top Evidence (pre-ranked): {json.dumps(top_evidence, indent=2)} All-Star Summary: {json.dumps(all_star, indent=2)} Enrichment Notes: {json.dumps(enrichment, indent=2)} Raw Search Results: {json.dumps(search_results, indent=2)} Write STRICT JSON only (no markdown). Keep it concise and executable. Limit to 3-5 findings. Prefer evidence with votes/stars and recency. If evidence is weak (<2 solid links) state it in assumptions and keep solutions conservative. Do NOT invent tools, APIs, or generic MCP fallbacks; stay inside the provided evidence. If unsure, say "evidence weak" instead of guessing. JSON schema: {{ "findings": [ {{ "title": "short label for the recommendation", "source": "Stack Overflow|GitHub|Reddit|Hacker News|Web", "url": "https://...", "score": 0-100, "date": "YYYY-MM-DD or unknown", "votes": 0, "issue": "1-2 lines describing the problem or breaking change", "solution": "2-4 lines with concrete steps; include API names/settings", "code": "minimal runnable snippet or empty string", "evidence": [ {{"url": "https://...", "quote": "short quote from source", "signal": "42 votes on Stack Overflow"}} ], "gotchas": "warnings/edge cases", "difficulty": "Easy|Medium|Hard" }} ], "conflicts": [ {{"description": "conflict between recommendations", "impact": "risk/side-effect", "recommended_action": "how to choose"}} ], "recommended_path": [ {{"step": "action to take", "why": "reason/ordering"}} ], "quick_apply": {{"language": "{language}", "code": "minimal code block", "commands": ["optional shell commands"]}}, "verification": ["tests or commands to prove the fix"], "assumptions": ["any assumption you made"], "synthesis_summary": "1-2 line final take" }} Rules: - Use only the provided sources for evidence; no new URLs. - Prefer evidence with quotes and community signals (votes/stars/comments). - Keep code minimal and focused on the migration/fix. - If evidence is weak (<2 solid sources), note it in assumptions. """ try: if provider == "gemini": return await call_gemini(api_key, prompt) elif provider == "openai": return await call_openai(api_key, prompt) elif provider == "anthropic": return await call_anthropic(api_key, prompt) elif provider == "openrouter": return await call_openrouter(api_key, prompt) elif provider == "perplexity": return await call_perplexity(api_key, prompt) else: return {"error": f"Unknown provider: {provider}", "findings": []} except Exception as e: return {"error": f"LLM synthesis failed: {str(e)}", "findings": []} # ============================================================================ # Masterclass Rendering Helpers # ============================================================================ def normalize_masterclass_payload( synthesis: Dict[str, Any], evidence: List[Dict[str, Any]], language: str, enrichment: Optional[Dict[str, Any]] = None, manual_mode: bool = False, ) -> Dict[str, Any]: """Ensure synthesis output has all fields and evidence attached.""" enrichment = enrichment or {} evidence_map = {ev["url"]: ev for ev in evidence if ev.get("url")} raw_findings = synthesis.get("findings") or [] findings: List[Dict[str, Any]] = [] for f in raw_findings: findings.append(f) findings = [ f for f in findings if f.get("url") and _is_preferred_domain(_safe_domain(str(f.get("url", "")))) ] if not findings and evidence: for ev in evidence[:3]: quote = (ev.get("snippet") or "").strip() findings.append( { "title": ev.get("title", "Recommendation"), "source": ev.get("source", "community"), "url": ev.get("url"), "score": ev.get("score", 0) or 0, "votes": ev.get("votes"), "issue": (quote[:180] or "See linked source for details."), "solution": "Apply the referenced fix/migration from the linked source.", "code": "", "evidence": [ { "url": ev.get("url"), "quote": quote[:200] or "Evidence available in source.", "signal": ev.get("reason") or "community source", } ], "gotchas": "", "difficulty": "Unknown", } ) for f in findings: url = f.get("url") ev = evidence_map.get(url) if ev and not f.get("evidence"): quote = (ev.get("snippet") or "").strip() f["evidence"] = [ { "url": ev.get("url"), "quote": quote[:200] or "Evidence available in source.", "signal": ev.get("reason") or "community source", } ] f.setdefault("issue", f.get("problem", "Problem not captured.")) f.setdefault("solution", f.get("benefit", "Solution not captured.")) f.setdefault("source", ev.get("source") if ev else f.get("source", "community")) f.setdefault("score", f.get("community_score", 0) or 0) f.setdefault("votes", f.get("community_score")) f.setdefault("difficulty", f.get("difficulty", "Unknown")) if not f.get("evidence") and url: f["evidence"] = [{"url": url, "quote": "See linked source.", "signal": "source link"}] recommended_path = synthesis.get("recommended_path") or [] if not recommended_path and findings: for i, f in enumerate(findings[:3], 1): recommended_path.append( { "step": f"Apply recommendation #{i}: {f.get('title', 'unnamed')}", "why": f.get("issue", "Improve reliability"), } ) if len(evidence) < 2 and not recommended_path: recommended_path.append( { "step": "Gather more evidence or rerun with a narrower query", "why": "Evidence is weak; avoid acting on uncorroborated advice.", } ) quick_apply = synthesis.get("quick_apply") or {} if not quick_apply and findings and len(evidence) >= 2: first_code = next((f.get("code") for f in findings if f.get("code")), "") if first_code: quick_apply = {"language": language, "code": first_code, "commands": []} verification = synthesis.get("verification") or [ f"Rebuild/retest the {language} project after applying changes." ] assumptions = synthesis.get("assumptions") or [] assumptions.extend(enrichment.get("assumptions", [])) if len(evidence) < 2 and not manual_mode: assumptions.append("Evidence is weak (fewer than 2 strong sources found).") conflicts = synthesis.get("conflicts") or [] return { "findings": findings, "conflicts": conflicts, "recommended_path": recommended_path, "quick_apply": quick_apply, "verification": verification, "assumptions": assumptions, "synthesis_summary": synthesis.get("synthesis_summary", ""), } def render_masterclass_markdown( topic: str, language: str, goal: Optional[str], current_setup: Optional[str], payload: Dict[str, Any], conflicts_auto: List[Dict[str, str]], search_meta: Dict[str, Any], manual_mode: bool = False, ) -> str: """Render final markdown using the masterclass template.""" findings = payload.get("findings", []) conflicts = payload.get("conflicts", []) + conflicts_auto recommended_path = payload.get("recommended_path", []) quick_apply = payload.get("quick_apply") or {} verification = payload.get("verification") or [] assumptions = payload.get("assumptions") or [] lines: List[str] = [] lines.append(f"# Community Research: {topic}") lines.append(f"- Goal: {goal or 'Not provided'}") context_bits = [language] if current_setup: context_bits.append(current_setup) lines.append(f"- Context: {', '.join(context_bits)}") lines.append("") lines.append("## Findings (ranked)") if not findings: lines.append("- No findings available; broaden the query or add goal/context.") else: for idx, f in enumerate(findings, 1): source_label = f.get("source", "Source") score = f.get("score", "N/A") date = f.get("date", "unknown") votes = f.get("votes") or f.get("community_score") or "n/a" lines.append( f"{idx}) {source_label} (Score: {score}, Date: {date}, Votes/Stars: {votes})" ) lines.append(f" - Issue: {f.get('issue', 'Not captured.')}") lines.append(f" - Solution: {f.get('solution', 'Not captured.')}") evidence_list = f.get("evidence") or [] if evidence_list: ev = evidence_list[0] quote = str(ev.get("quote", "")).replace("\n", " ").strip() lines.append( f" - Evidence: {ev.get('url', '')} - \"{quote[:200] or 'See source'}\"" ) elif f.get("url"): lines.append(f" - Evidence: {f.get('url')}") code_block = f.get("code") if code_block: code_lang = language.lower() lines.append("") lines.append(f"```{code_lang}\n{code_block}\n```") lines.append("") lines.append("") lines.append("## Conflicts & Edge Cases") if conflicts: for c in conflicts: lines.append( f"- {c.get('description', 'Conflict')} - {c.get('recommended_action', 'Resolve carefully.')}" ) else: lines.append("- None detected; still validate against your stack.") lines.append("") lines.append("## Recommended Path") if recommended_path: for i, step in enumerate(recommended_path, 1): lines.append(f"{i}. {step.get('step', 'Step')} - {step.get('why', '')}") else: lines.append("- Apply the top finding and rerun verification.") lines.append("") lines.append("## Quick-apply Code/Commands") code = quick_apply.get("code") if code: code_lang = (quick_apply.get("language") or language).lower() lines.append(f"```{code_lang}\n{code}\n```") commands = quick_apply.get("commands") or [] for cmd in commands: lines.append(f"- {cmd}") if not code and not commands: lines.append("- Not available from sources.") lines.append("") lines.append("## Verification") for check in verification: lines.append(f"- {check}") if not verification: lines.append("- Add tests/commands to validate changes.") lines.append("") if assumptions: lines.append("## Assumptions") for a in assumptions: lines.append(f"- {a}") lines.append("") lines.append("## Search Stats") if manual_mode: lines.append("- Sources: manual evidence pack") lines.append(f"- Results found: {len(payload.get('findings', []))} curated entries") else: lines.append( f"- Sources queried: {search_meta.get('source_count', 0)} ({', '.join(search_meta.get('sources', []))})" ) lines.append(f"- Results found: {search_meta.get('total_results', 0)}") if search_meta.get("evidence_weak"): lines.append("- Evidence: weak (fewer than 2 strong sources).") lines.append(f"- Enriched query: {search_meta.get('enriched_query', 'n/a')}") lines.append( f"- Expanded queries (for follow-up): {', '.join(search_meta.get('expanded_queries', [])[:3])}" ) return "\n".join(lines) # ============================================================================ # Zen MCP Inspired - Multi-Model Orchestration & Research Planning # ============================================================================ class ResearchPlanner: """Decompose complex research queries into structured, sequential steps.""" def __init__(self, orchestrator: ModelOrchestrator): self.orchestrator = orchestrator async def plan_research_strategy( self, query: str, language: str, goal: Optional[str] = None ) -> Dict[str, Any]: """ Break down a research query into sequential steps using the MCP sequential thinking server. Args: query: The research query to plan language: Programming language context goal: Optional goal statement Returns: Dictionary containing the research plan with steps and strategies """ try: # Use the sequential thinking MCP server for decomposition planning_prompt = f""" Research Query: {query} Programming Language: {language} Goal: {goal or "Not specified"} Please break this research query into a structured plan. Return ONLY valid JSON with this exact structure (no markdown, no backticks): {{ "plan": {{ "phases": [ {{ "name": "Phase Name", "description": "Phase Description", "sources": ["source1", "source2"] }} ], "strategy": "Overall strategy description", "complexity": "low|medium|high", "full_analysis": "Detailed analysis text" }} }} """ # Select appropriate model for planning provider, api_key = self.orchestrator.select_model_for_task( "planning", "medium" ) # Call planning model if provider == "gemini": result = await self._call_planning_model_gemini( api_key, planning_prompt ) elif provider == "openai": result = await self._call_planning_model_openai( api_key, planning_prompt ) elif provider == "anthropic": result = await self._call_planning_model_anthropic( api_key, planning_prompt ) else: # Fallback to gemini-style call result = await self._call_planning_model_gemini( api_key, planning_prompt ) return result except Exception as e: # Return basic fallback plan return { "plan": { "phases": [ { "name": "Discovery", "description": "Search community sources", "sources": ["stackoverflow", "github", "reddit"], }, { "name": "Analysis", "description": "Synthesize findings", "approach": "LLM synthesis", }, { "name": "Validation", "description": "Cross-check results", "validation": "community scores", }, ], "strategy": "Standard multi-source search with LLM synthesis", "complexity": "medium", }, "error": f"Planning model failed: {str(e)}. Using fallback plan.", } async def _call_planning_model_gemini( self, api_key: str, prompt: str ) -> Dict[str, Any]: """Call Gemini for research planning.""" url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent?key={api_key}" payload = { "contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"temperature": 0.2, "maxOutputTokens": 2048}, } async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(url, json=payload) response.raise_for_status() data = response.json() text = data["candidates"][0]["content"]["parts"][0]["text"] # Clean up markdown code blocks if present text = ( text.replace("```json\n", "") .replace("\n```", "") .replace("```", "") .strip() ) try: return json.loads(text) except json.JSONDecodeError: # Fallback if JSON parsing fails logging.warning(f"Failed to parse planning JSON: {text[:100]}...") return { "plan": { "phases": [ { "name": "Discovery", "description": "Search community sources", }, {"name": "Analysis", "description": "Synthesize findings"}, { "name": "Validation", "description": "Cross-check results", }, ], "strategy": text[:500] + ("..." if len(text) > 500 else ""), "complexity": "medium", "full_analysis": text, }, "provider": "gemini", "parsing_error": True, } async def _call_planning_model_openai( self, api_key: str, prompt: str ) -> Dict[str, Any]: """Call OpenAI for research planning.""" url = "https://api.openai.com/v1/chat/completions" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } payload = { "model": "gpt-4o-mini", "messages": [{"role": "user", "content": prompt}], "temperature": 0.2, "max_tokens": 2048, } async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(url, headers=headers, json=payload) response.raise_for_status() data = response.json() text = data["choices"][0]["message"]["content"] return { "plan": { "phases": [ { "name": "Discovery", "description": "Search community sources", }, {"name": "Analysis", "description": "Synthesize findings"}, {"name": "Validation", "description": "Cross-check results"}, ], "strategy": text[:500] + ("..." if len(text) > 500 else ""), "complexity": "medium", "full_analysis": text, }, "provider": "openai", } async def _call_planning_model_anthropic( self, api_key: str, prompt: str ) -> Dict[str, Any]: """Call Anthropic for research planning.""" url = "https://api.anthropic.com/v1/messages" headers = { "x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json", } payload = { "model": "claude-3-5-haiku-20241022", "max_tokens": 2048, "temperature": 0.2, "messages": [{"role": "user", "content": prompt}], } async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(url, headers=headers, json=payload) response.raise_for_status() data = response.json() text = data["content"][0]["text"] return { "plan": { "phases": [ { "name": "Discovery", "description": "Search community sources", }, {"name": "Analysis", "description": "Synthesize findings"}, {"name": "Validation", "description": "Cross-check results"}, ], "strategy": text[:500] + ("..." if len(text) > 500 else ""), "complexity": "medium", "full_analysis": text, }, "provider": "anthropic", } # Initialize orchestration components model_orchestrator = ModelOrchestrator() research_planner = ResearchPlanner(model_orchestrator) # ============================================================================ # Enhanced LLM Synthesis with Multi-Model Support # ============================================================================ async def cluster_and_rerank_results( search_results: Dict[str, Any], query: str, language: str ) -> Dict[str, Any]: """ Organize search results into semantic clusters and remove low-quality items. """ try: source_lists = _result_only_sources(search_results) # Flatten results for clustering all_items = [] for source, items in source_lists.items(): for item in items: all_items.append( { "source": source, "title": item.get("title", ""), "snippet": item.get("snippet", "")[:300], "url": item.get("url", ""), } ) if not all_items: return search_results # Use a fast model for clustering provider, api_key = model_orchestrator.select_model_for_task("planning", "low") prompt = f""" Analyze these search results for "{query}" in {language}. Results: {json.dumps(all_items[:30], indent=2)} Task: 1. Group these results into 3-5 distinct semantic clusters (e.g., "Official Docs", "Community Workarounds", "Alternative Libraries"). 2. Select the top 3 most relevant results for each cluster. 3. Discard irrelevant or low-quality results. Return JSON: {{ "clusters": [ {{ "name": "Cluster Name", "description": "Brief description", "top_results": [index_from_original_list] }} ] }} """ # Call model (simplified for brevity, reusing existing patterns would be better but this is a new logic block) # We'll reuse the planner's method or orchestrator if available. # For now, we'll skip the actual API call implementation detail here to avoid massive code duplication # and instead assume we modify synthesize_with_llm to handle this internally # OR we just return the raw results if we can't easily call the model here. # actually, let's just return the search_results for now and handle clustering inside synthesize_with_llm # to avoid adding another round-trip latency unless explicitly requested. # But the user ASKED for it. So let's do it right. # To avoid code duplication, we really should expose a generic "call_model" method. # But since I can't easily refactor the whole class right now, I will implement a lightweight clustering # that just uses the title keywords for now, OR better, I will update synthesize_with_llm # to explicitly perform this step as part of its "thinking". return search_results except Exception as e: logging.warning(f"Clustering failed: {e}") return search_results async def synthesize_with_multi_model( search_results: Dict[str, Any], query: str, language: str, goal: Optional[str], current_setup: Optional[str], thinking_mode: ThinkingMode = ThinkingMode.BALANCED, ) -> Dict[str, Any]: """ Enhanced synthesis using intelligent model selection, clustering, and optional validation. """ try: # Select primary model for synthesis provider, api_key = model_orchestrator.select_model_for_task( "synthesis", thinking_mode.value ) # Perform primary synthesis # We inject the clustering instruction into the prompt implicitly by updating synthesize_with_llm's prompt # OR we can pass a flag. primary_result = await synthesize_with_llm( search_results, query, language, goal, current_setup ) # Add orchestration metadata primary_result["orchestration"] = { "primary_provider": provider, "thinking_mode": thinking_mode.value, "validation_enabled": model_orchestrator.validation_enabled, } # If validation is enabled, get second opinion if model_orchestrator.validation_enabled: try: validation_provider, validation_key = ( model_orchestrator.select_model_for_task("validation", "medium") ) # Don't validate with the same provider if validation_provider != provider: # Create validation prompt with primary findings validation_prompt = f""" CRITICAL VALIDATION TASK You are an expert technical reviewer. Your goal is to validate the following research findings. Primary Findings: {json.dumps(primary_result, indent=2)} Search Results: {json.dumps(search_results, indent=2)} Please analyze the primary findings and provide: 1. Verification: Are the solutions correct and optimal? 2. Gaps: What is missing or overlooked? 3. Risks: Are there security or performance risks not mentioned? 4. Alternatives: Are there better approaches? Return your critique in JSON format. """ # We reuse synthesize_with_llm but with a special flag or just call the model directly # For simplicity and to reuse the robust prompt structure, we'll call synthesize_with_llm # but inject the validation context into the query/goal to influence the prompt. validation_result = await synthesize_with_llm( search_results, query=f"VALIDATE AND CRITIQUE: {query}", language=language, goal=f"Critique these findings: {json.dumps(primary_result.get('findings', []))[:1000]}...", current_setup=current_setup, ) primary_result["orchestration"]["validation"] = { "provider": validation_provider, "status": "completed", "critique": validation_result.get("findings", []), } else: primary_result["orchestration"]["validation"] = { "status": "skipped", "reason": "same_provider_as_primary", } except Exception as validation_error: primary_result["orchestration"]["validation"] = { "status": "failed", "error": str(validation_error), } return primary_result except Exception as e: # Fallback to original synthesis result = await synthesize_with_llm( search_results, query, language, goal, current_setup ) result["orchestration"] = {"fallback_used": True, "error": str(e)} return result # ============================================================================ # New Zen-Inspired Research Workflow Tools # ============================================================================ @mcp.tool( name="plan_research", annotations={ "title": "Plan Research Strategy", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def plan_research(query: str, language: str, goal: Optional[str] = None) -> str: """ Break down a complex research query into a structured, strategic plan. This tool uses AI-powered planning to decompose research queries into phases, identify optimal search strategies, prioritize sources, and plan synthesis approaches. It leverages the ModelOrchestrator for intelligent model selection. Args: query (str): The research topic or question to plan for language (str): Programming language context (e.g., "Python", "JavaScript") goal (Optional[str]): What you want to achieve with this research Returns: str: JSON-formatted research plan containing: - Research phases (discovery, analysis, validation) - Search strategies for each phase - Source prioritization recommendations - Synthesis approach - Expected deliverables - Complexity assessment Examples: - plan_research("FastAPI async task processing", "Python", "implement background jobs") - plan_research("React state management patterns", "JavaScript", "choose best approach for large app") - plan_research("Rust memory management", "Rust", "understand ownership concepts") Benefits: - Structured approach to complex research - Intelligent model selection for planning - Optimized search strategies - Clear deliverables and phases """ # Check rate limit if not check_rate_limit("plan_research"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again." }, indent=2, ) # Check cache cache_key = get_cache_key( "plan_research", query=query, language=language, goal=goal ) cached_result = get_cached_result(cache_key) if cached_result: return cached_result try: # Use research planner to create strategy plan_result = await research_planner.plan_research_strategy( query, language, goal ) # Format as JSON response formatted_result = json.dumps(plan_result, indent=2) # Cache and return set_cached_result(cache_key, formatted_result) return formatted_result except Exception as e: error_response = json.dumps( { "error": f"Research planning failed: {str(e)}", "fallback_plan": { "phases": [ { "name": "Discovery", "description": "Search community sources", }, {"name": "Analysis", "description": "Synthesize findings"}, {"name": "Validation", "description": "Cross-check results"}, ], "strategy": "Standard multi-source search with LLM synthesis", }, }, indent=2, ) return error_response @mcp.tool( name="comparative_search", annotations={ "title": "Comparative Multi-Model Search", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def comparative_search( language: str, topic: str, goal: Optional[str] = None, current_setup: Optional[str] = None, models_to_compare: Optional[List[str]] = None, ) -> str: """ Perform research using multiple AI models and compare their findings. This tool executes the same research query using different AI models, then provides a comparative analysis highlighting agreements, disagreements, and unique insights from each model. Perfect for critical decisions or when you want multiple expert perspectives. Args: language (str): Programming language (e.g., "Python", "JavaScript") topic (str): Research topic (must be specific) goal (Optional[str]): What you want to achieve current_setup (Optional[str]): Your current tech stack models_to_compare (Optional[List[str]]): Specific models to use (defaults to best available) Returns: str: Comparative analysis containing: - Individual findings from each model - Consensus recommendations (agreed upon by multiple models) - Divergent opinions and their reasoning - Confidence scoring across models - Final synthesis with best practices Examples: - Compare approaches: comparative_search("Python", "async vs threads for I/O") - Architecture decisions: comparative_search("JavaScript", "React vs Vue for dashboard app") - Technology choices: comparative_search("Database", "PostgreSQL vs MongoDB for analytics") Benefits: - Multiple AI perspectives reduce bias - Identifies consensus and disagreements - Higher confidence in recommendations - Exposes edge cases and considerations """ # Check rate limit if not check_rate_limit("comparative_search"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again." }, indent=2, ) # Validate topic specificity (reuse validation from CommunitySearchInput) topic = topic.strip() vague_terms = [ "settings", "configuration", "config", "setup", "performance", "optimization", "best practices", "how to", "tutorial", "getting started", "basics", "help", "issue", "problem", "error", "debugging", "install", "installation", ] words = topic.lower().split() if len(words) <= 2 and any(term in topic.lower() for term in vague_terms): return json.dumps( { "error": f"Topic '{topic}' is too vague. Be more specific! Instead of vague terms, include specific technologies, libraries, or patterns.", "suggestions": [ "Instead of 'performance': 'reduce Docker image size with multi-stage builds'", "Instead of 'config': 'FastAPI configuration management with Pydantic settings'", "Instead of 'setup': 'React development environment setup with Vite and TypeScript'", ], }, indent=2, ) # Check cache cache_key = get_cache_key( "comparative_search", language=language, topic=topic, goal=goal, current_setup=current_setup, ) cached_result = get_cached_result(cache_key) if cached_result: return cached_result try: # Build search query search_query = f"{language} {topic}" if goal: search_query += f" {goal}" # Execute search search_results = await aggregate_search_results(search_query, language) source_lists = _result_only_sources(search_results) total_results = total_result_count(search_results) if total_results == 0: result = json.dumps( { "error": f'No results found for "{topic}" in {language}. Try different search terms or a more common topic.', "findings": [], }, indent=2, ) set_cached_result(cache_key, result) return result # Get available models for comparison available_models = [] for provider in ["gemini", "openai", "anthropic", "azure", "openrouter"]: api_key = model_orchestrator._get_api_key_for_provider(provider) if api_key: available_models.append(provider) if len(available_models) < 2: # Fallback to single model synthesis with note synthesis = await synthesize_with_llm( source_lists, topic, language, goal, current_setup ) synthesis["comparative_note"] = ( f"Only {len(available_models)} model(s) available. Configure multiple API keys for true comparative analysis." ) result = json.dumps(synthesis, indent=2) set_cached_result(cache_key, result) return result # Use up to 3 different models for comparison models_to_use = ( available_models[:3] if not models_to_compare else models_to_compare[:3] ) # Get synthesis from each model model_results = {} for provider in models_to_use: try: if provider == "gemini": api_key = model_orchestrator._get_api_key_for_provider(provider) model_synthesis = await synthesize_with_llm( source_lists, topic, language, goal, current_setup ) elif provider == "openai": api_key = model_orchestrator._get_api_key_for_provider(provider) model_synthesis = await synthesize_with_llm( source_lists, topic, language, goal, current_setup ) elif provider == "anthropic": api_key = model_orchestrator._get_api_key_for_provider(provider) model_synthesis = await synthesize_with_llm( source_lists, topic, language, goal, current_setup ) else: # Skip providers we can't handle continue model_results[provider] = model_synthesis except Exception as model_error: model_results[provider] = { "error": f"Model {provider} failed: {str(model_error)}", "findings": [], } # Create comparative analysis comparative_result = { "query_info": { "language": language, "topic": topic, "goal": goal, "models_compared": list(model_results.keys()), "total_sources": total_results, }, "individual_results": model_results, "comparative_analysis": { "consensus_recommendations": [], "divergent_opinions": [], "confidence_assessment": "medium", "synthesis_notes": f"Compared findings from {len(model_results)} different AI models", }, "sources_searched": { "stackoverflow": len(source_lists.get("stackoverflow", [])), "github": len(source_lists.get("github", [])), "reddit": len(source_lists.get("reddit", [])), "hackernews": len(source_lists.get("hackernews", [])), }, } # Simple consensus detection (could be enhanced) all_findings = [] for provider, result in model_results.items(): if "findings" in result: all_findings.extend(result["findings"]) if all_findings: comparative_result["comparative_analysis"]["total_findings"] = len( all_findings ) comparative_result["comparative_analysis"]["confidence_assessment"] = ( "high" if len(model_results) >= 3 else "medium" ) result = json.dumps(comparative_result, indent=2) set_cached_result(cache_key, result) return result except Exception as e: error_response = json.dumps( { "error": f"Comparative search failed: {str(e)}", "fallback": "Consider using the standard community_search tool instead", }, indent=2, ) return error_response @mcp.tool( name="validated_research", annotations={ "title": "Research with Multi-Model Validation", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def validated_research( language: str, topic: str, goal: Optional[str] = None, current_setup: Optional[str] = None, thinking_mode: str = "balanced", ) -> str: """ Perform research with automatic validation by a second AI model. This tool conducts primary research and then validates the findings with a different AI model to ensure accuracy, completeness, and catch potential oversights. The validation model reviews and critiques the primary findings. Args: language (str): Programming language (e.g., "Python", "JavaScript") topic (str): Research topic (must be specific) goal (Optional[str]): What you want to achieve current_setup (Optional[str]): Your current tech stack thinking_mode (str): Analysis depth ("quick", "balanced", "deep") Returns: str: Validated research results containing: - Primary research findings - Validation assessment and critiques - Confidence scores and reliability indicators - Final recommendations combining both perspectives - Orchestration metadata Examples: - High-stakes decisions: validated_research("Python", "production deployment strategies") - Critical implementations: validated_research("Security", "JWT authentication best practices") - Architecture choices: validated_research("Database", "scaling strategies for high-traffic apps") Benefits: - Higher accuracy through validation - Catches oversights and edge cases - Confidence scoring for recommendations - Reduced risk of following bad advice """ # Check rate limit if not check_rate_limit("validated_research"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again." }, indent=2, ) # Validate thinking mode try: thinking_mode_enum = ThinkingMode(thinking_mode.lower()) except ValueError: thinking_mode_enum = ThinkingMode.BALANCED # Check cache cache_key = get_cache_key( "validated_research", language=language, topic=topic, goal=goal, current_setup=current_setup, thinking_mode=thinking_mode, ) cached_result = get_cached_result(cache_key) if cached_result: return cached_result try: # Build search query search_query = f"{language} {topic}" if goal: search_query += f" {goal}" # Execute search search_results = await aggregate_search_results(search_query, language) source_lists = _result_only_sources(search_results) total_results = total_result_count(search_results) if total_results == 0: result = json.dumps( { "error": f'No results found for "{topic}" in {language}. Try different search terms or a more common topic.', "findings": [], }, indent=2, ) set_cached_result(cache_key, result) return result # Use enhanced multi-model synthesis with validation synthesis = await synthesize_with_multi_model( source_lists, topic, language, goal, current_setup, thinking_mode_enum ) # Add validation-specific metadata synthesis["validation_info"] = { "validation_requested": True, "thinking_mode": thinking_mode, "total_sources": total_results, "validation_status": synthesis.get("orchestration", {}) .get("validation", {}) .get("validation_status", "not_performed"), } # Format result result = json.dumps(synthesis, indent=2) set_cached_result(cache_key, result) return result except Exception as e: error_response = json.dumps( { "error": f"Validated research failed: {str(e)}", "fallback": "Consider using the standard community_search tool instead", }, indent=2, ) return error_response # ============================================================================ # MCP Tools # ============================================================================ @mcp.tool( name="get_server_context", annotations={ "title": "Get Community Research Server Context", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def get_server_context() -> str: """ Get the Community Research MCP server context and capabilities. This tool returns information about what the server detected in your workspace, including programming languages, frameworks, and default context values. ALWAYS call this first before using other tools. Returns: str: JSON-formatted server context including: - handshake: Server identification and status - project_context: Detected workspace information - context_defaults: Default values for search - available_providers: Which LLM providers are configured Examples: - Use when: Starting any research task - Use when: Need to know what languages are detected - Use when: Want to see available LLM providers """ workspace_context = detect_workspace_context() provider_info = get_available_llm_provider() context = { "handshake": { "server": "community-research-mcp", "version": "1.0.0", "status": "initialized", "description": "Searches Stack Overflow, Reddit, GitHub, forums for real solutions", "capabilities": { "multi_source_search": True, "query_validation": True, "llm_synthesis": True, "caching": True, "rate_limiting": True, }, }, "project_context": workspace_context, "context_defaults": { "language": workspace_context["languages"][0] if workspace_context["languages"] else None }, "available_providers": { "configured": provider_info[0] if provider_info else None, "supported": ["gemini", "openai", "anthropic", "openrouter", "perplexity"], }, } return json.dumps(context, indent=2) @mcp.tool( name="community_search", annotations={ "title": "Search Community Resources", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def community_search(params: CommunitySearchInput) -> str: """ Search Stack Overflow, Reddit, GitHub, and forums for real solutions. Returns evidence-backed, templated output with findings, conflicts, next steps, quick-apply code, and verification guidance. """ if not check_rate_limit("community_search"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again.", }, indent=2, ) cache_key = get_cache_key( "community_search", language=params.language, topic=params.topic, goal=params.goal, current_setup=params.current_setup, expanded_mode=params.expanded_mode, use_fixtures=params.use_fixtures, ) cached_result = get_cached_result(cache_key) if cached_result: return cached_result valid, validation_msg = validate_topic_specificity(params.topic) if not valid: return json.dumps( { "error": validation_msg, "suggestions": ["Add concrete tech/library names", "Include version numbers if relevant", "State the exact error or change you hit"], }, indent=2, ) enrichment = enrich_query(params.language, params.topic, params.goal) search_query = enrichment.get("enriched_query") or f"{params.language} {params.topic}" for attempt in range(MAX_RETRIES): try: search_results = await aggregate_search_results( search_query, params.language, expanded_mode=params.expanded_mode, use_fixtures=params.use_fixtures, ) filtered_results = filter_results_by_domain( search_results, params.language, params.topic ) source_lists = _result_only_sources(filtered_results) total_results = total_result_count(filtered_results) all_star_meta = build_all_star_index( filtered_results, search_query, params.language ) audit_log = search_results.get("_meta", {}).get("audit_log", []) shape_stats = summarize_content_shapes(source_lists) top_evidence = select_top_evidence( filtered_results, all_star_meta, search_query, params.language ) preferred_present = any( source_lists.get(s) for s in ["stackoverflow", "github"] ) manual = get_manual_evidence(params.topic) if (total_results < 2 or not preferred_present) and not manual: result = json.dumps( { "error": f'Not enough relevant results for "{params.topic}" in {params.language}. Try expanded queries or add version/error text.', "expanded_queries": enrichment.get("expanded_queries", []), "findings": [], "assumptions": enrichment.get("assumptions", []), }, indent=2, ) set_cached_result(cache_key, result) return result context_meta = { "all_star": all_star_meta, "top_evidence": top_evidence, "enrichment": enrichment, } if manual: synthesis = {"findings": manual, "synthesis_summary": ""} else: synthesis = await synthesize_with_llm( source_lists, params.topic, params.language, params.goal, params.current_setup, context_meta=context_meta, ) if ( ENHANCED_UTILITIES_AVAILABLE and _quality_scorer and "findings" in synthesis ): synthesis["findings"] = _quality_scorer.score_findings_batch( synthesis["findings"] ) payload = normalize_masterclass_payload( synthesis, top_evidence, params.language, enrichment, ) if synthesis.get("error"): payload.setdefault("assumptions", []).append( f"LLM synthesis error: {synthesis.get('error')}" ) conflicts_auto = detect_conflicts_from_findings(payload.get("findings", [])) sources_present = [s for s in source_lists.keys() if source_lists[s]] search_meta = { "sources": [SOURCE_LABELS.get(s, s) for s in sources_present], "source_count": len(sources_present), "total_results": total_results, "enriched_query": search_query, "expanded_queries": enrichment.get("expanded_queries", []), "evidence_weak": len(top_evidence) < 2, "all_star": all_star_meta, "audit_log": audit_log, "shape_stats": shape_stats, } if params.response_format == ResponseFormat.MARKDOWN: result = render_masterclass_markdown( params.topic, params.language, params.goal, params.current_setup, payload, conflicts_auto, search_meta, manual_mode=bool(manual), ) else: response = { "language": params.language, "topic": params.topic, "goal": params.goal, "current_setup": params.current_setup, "findings": payload.get("findings", []), "conflicts": payload.get("conflicts", []) + conflicts_auto, "recommended_path": payload.get("recommended_path", []), "quick_apply": payload.get("quick_apply", {}), "verification": payload.get("verification", []), "assumptions": payload.get("assumptions", []), "synthesis_summary": payload.get("synthesis_summary", ""), "all_star": all_star_meta, "search_meta": search_meta, "manual_mode": bool(manual), } result = json.dumps(response, indent=2) if len(result) > CHARACTER_LIMIT: if params.response_format == ResponseFormat.JSON: response_dict = json.loads(result) original_count = len(response_dict.get("findings", [])) response_dict["findings"] = response_dict["findings"][: max(1, original_count // 2)] response_dict["truncated"] = True response_dict["truncation_message"] = ( f"Response truncated from {original_count} to {len(response_dict['findings'])} findings due to size limits." ) result = json.dumps(response_dict, indent=2) else: result = ( result[:CHARACTER_LIMIT] + "\n\n[Response truncated due to size limits. Use JSON format for full data.]" ) set_cached_result(cache_key, result) return result except Exception as e: if attempt == MAX_RETRIES - 1: error_response = json.dumps( { "error": f"Search failed after {MAX_RETRIES} attempts: {str(e)}", "findings": [], }, indent=2, ) return error_response await asyncio.sleep(2**attempt) return json.dumps({"error": "Unexpected error", "findings": []}, indent=2) # ============================================================================ # DuckDuckGo Search & Web Content Classes # ============================================================================ class RateLimiter: def __init__(self, requests_per_minute: int = 30): self.requests_per_minute = requests_per_minute self.requests = [] async def acquire(self): now = datetime.now() # Remove requests older than 1 minute self.requests = [ req for req in self.requests if now - req < timedelta(minutes=1) ] if len(self.requests) >= self.requests_per_minute: # Wait until we can make another request wait_time = 60 - (now - self.requests[0]).total_seconds() if wait_time > 0: await asyncio.sleep(wait_time) self.requests.append(now) class SearchResult: def __init__(self, title: str, link: str, snippet: str, position: int): self.title = title self.link = link self.snippet = snippet self.position = position class DuckDuckGoSearcher: BASE_URL = "https://html.duckduckgo.com/html" HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } def __init__(self): self.rate_limiter = RateLimiter(30) # 30 requests per minute def format_results_for_llm(self, results: List[SearchResult]) -> str: """Format results in a natural language style that's easier for LLMs to process""" if not results: return "No results were found for your search query. This could be due to DuckDuckGo's bot detection or the query returned no matches. Please try rephrasing your search or try again in a few minutes." output = [] output.append(f"Found {len(results)} search results:\n") for result in results: output.append(f"{result.position}. {result.title}") output.append(f" URL: {result.link}") output.append(f" Summary: {result.snippet}") output.append("") # Empty line between results return "\n".join(output) async def search(self, query: str, max_results: int = 10) -> List[SearchResult]: try: # Apply rate limiting await self.rate_limiter.acquire() # Create form data for POST request data = { "q": query, "b": "", "kl": "", } logging.info(f"Searching DuckDuckGo for: {query}") async with httpx.AsyncClient() as client: response = await client.post( self.BASE_URL, data=data, headers=self.HEADERS, timeout=30.0 ) response.raise_for_status() # Parse HTML response soup = BeautifulSoup(response.text, "html.parser") if not soup: logging.error("Failed to parse HTML response") return [] results = [] # Try multiple selectors for robustness (HTML structure changes) result_elements = ( soup.select(".result") or soup.select("[data-testid='result']") or soup.select("article") or [] ) for i, result in enumerate(result_elements): # Try multiple title selectors title_elem = ( result.select_one(".result__title") or result.select_one("h2") or result.select_one("h3") ) if not title_elem: continue link_elem = title_elem.find("a") if not link_elem: continue title = link_elem.get_text(strip=True) link = link_elem.get("href", "") # Skip ad results if "y.js" in link: continue # Clean up DuckDuckGo redirect URLs if link.startswith("//duckduckgo.com/l/?uddg="): link = urllib.parse.unquote(link.split("uddg=")[1].split("&")[0]) # Try multiple snippet selectors snippet_elem = ( result.select_one(".result__snippet") or result.select_one("p") or result.select_one(".description") ) snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" results.append( SearchResult( title=title, link=link, snippet=snippet, position=len(results) + 1, ) ) if len(results) >= max_results: break logging.info(f"Successfully found {len(results)} results") return results except httpx.TimeoutException: logging.error("Search request timed out") return [] except httpx.HTTPError as e: logging.error(f"HTTP error occurred: {str(e)}") return [] except Exception as e: logging.error(f"Unexpected error during search: {str(e)}") return [] async def fetch_page_content(url: str, max_chars: int = 12000) -> str: """ Fetch and extract main content from a URL. """ try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } async with httpx.AsyncClient(follow_redirects=True, timeout=15.0) as client: response = await client.get(url, headers=headers) response.raise_for_status() # Check content type content_type = response.headers.get("content-type", "").lower() if "text/html" not in content_type and "text/plain" not in content_type: return "" soup = BeautifulSoup(response.text, "html.parser") # Remove script and style elements for script in soup(["script", "style", "nav", "footer", "header", "aside"]): script.decompose() # Get text text = soup.get_text(separator="\n") # Break into lines and remove leading and trailing space on each lines = (line.strip() for line in text.splitlines()) # Break multi-headlines into a line each chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) # Drop blank lines text = "\n".join(chunk for chunk in chunks if chunk) return text[:max_chars] except Exception as e: logging.warning(f"Failed to fetch content from {url}: {e}") return "" class WebContentFetcher: def __init__(self): self.rate_limiter = RateLimiter(requests_per_minute=20) async def fetch_and_parse(self, url: str) -> str: """Fetch and parse content from a webpage""" try: await self.rate_limiter.acquire() logging.info(f"Fetching content from: {url}") async with httpx.AsyncClient() as client: response = await client.get( url, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" }, follow_redirects=True, timeout=30.0, ) response.raise_for_status() # Parse the HTML soup = BeautifulSoup(response.text, "html.parser") # Remove script and style elements for element in soup(["script", "style", "nav", "header", "footer"]): element.decompose() # Get the text content text = soup.get_text() # Clean up the text lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = " ".join(chunk for chunk in chunks if chunk) # Remove extra whitespace text = re.sub(r"\s+", " ", text).strip() # Truncate if too long if len(text) > 8000: text = text[:8000] + "... [content truncated]" logging.info( f"Successfully fetched and parsed content ({len(text)} characters)" ) return text except httpx.TimeoutException: logging.error(f"Request timed out for URL: {url}") return "Error: The request timed out while trying to fetch the webpage." except httpx.HTTPError as e: logging.error(f"HTTP error occurred while fetching {url}: {str(e)}") return f"Error: Could not access the webpage ({str(e)})" except Exception as e: logging.error(f"Error fetching content from {url}: {str(e)}") return f"Error: An unexpected error occurred while fetching the webpage ({str(e)})" # Initialize DuckDuckGo search components ddg_searcher = DuckDuckGoSearcher() web_fetcher = WebContentFetcher() # ============================================================================ # Reddit-Specific Tools # ============================================================================ @mcp.tool( name="fetch_reddit_hot_threads", annotations={ "title": "Fetch Hot Reddit Threads", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def fetch_reddit_hot_threads(subreddit: str, limit: int = 10) -> str: """ Fetch hot threads from a specific subreddit. This tool retrieves the current hot threads from a specified subreddit, providing detailed information about each post including title, score, comment count, and content. It uses authenticated access when credentials are available for better results and higher rate limits. Args: subreddit (str): Name of the subreddit (e.g., "Python", "javascript") limit (int): Number of posts to fetch (default: 10) Returns: str: Formatted list of hot posts with their details: - Title and author - Score and comment count - Content snippet - Post URL Examples: - Fetch hot topics in Python: subreddit="Python" - Get JavaScript discussions: subreddit="javascript" - Browse Rust programming trends: subreddit="rust" Notes: - Authenticated access provides more reliable results - Rate limited to 10 requests per minute - Caches results for 1 hour """ # Check rate limit if not check_rate_limit("fetch_reddit_hot_threads"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again." }, indent=2, ) # Check cache cache_key = get_cache_key( "fetch_reddit_hot_threads", subreddit=subreddit, limit=limit ) cached_result = get_cached_result(cache_key) if cached_result: return cached_result try: # Try to use authenticated client if available if reddit_authenticated and reddit_client: try: posts = [] # Fetch hot posts from subreddit async for submission in reddit_client.p.subreddit.pull.hot( subreddit, limit ): # Determine post type post_type = "unknown" if ( hasattr(submission, "url") and submission.url != submission.permalink ): post_type = "link" elif hasattr(submission, "body"): post_type = "text" elif hasattr(submission, "gallery_link"): post_type = "gallery" # Get content based on type content = "" if post_type == "text" and hasattr(submission, "body"): content = submission.body[:500] + ( "..." if len(submission.body or "") > 500 else "" ) elif post_type == "link" and hasattr(submission, "url"): content = f"Link: {submission.url}" elif post_type == "gallery": content = "Gallery post (multiple images)" # Format post info post_info = ( f"Title: {submission.title}\n" f"Score: {submission.score}\n" f"Comments: {submission.comment_count}\n" f"Author: {submission.author_display_name or '[deleted]'}\n" f"Type: {post_type}\n" f"Content: {content}\n" f"Link: https://reddit.com{submission.permalink}\n" f"---" ) posts.append(post_info) # Return formatted posts if posts: result = "\n\n".join(posts) set_cached_result(cache_key, result) return result # Fall back to unauthenticated if no results logging.info( f"No authenticated results for subreddit {subreddit}, falling back to public API" ) except Exception as e: logging.warning( f"Authenticated Reddit hot threads failed: {str(e)}. Falling back to public API." ) # Fallback to unauthenticated public API url = f"https://www.reddit.com/r/{subreddit}/hot.json" params = {"limit": limit} headers = {"User-Agent": "CommunityResearchMCP/1.0"} async with httpx.AsyncClient(timeout=API_TIMEOUT) as client: response = await client.get(url, params=params, headers=headers) response.raise_for_status() data = response.json() posts = [] for item in data.get("data", {}).get("children", []): post = item.get("data", {}) # Determine post type post_type = "unknown" if post.get("is_self") == False and post.get("url"): post_type = "link" elif post.get("is_self") == True: post_type = "text" elif post.get("is_gallery") == True: post_type = "gallery" # Get content based on type content = "" if post_type == "text": content = post.get("selftext", "")[:500] + ( "..." if len(post.get("selftext", "")) > 500 else "" ) elif post_type == "link": content = f"Link: {post.get('url', '')}" elif post_type == "gallery": content = "Gallery post (multiple images)" # Format post info post_info = ( f"Title: {post.get('title', '')}\n" f"Score: {post.get('score', 0)}\n" f"Comments: {post.get('num_comments', 0)}\n" f"Author: {post.get('author', '[deleted]')}\n" f"Type: {post_type}\n" f"Content: {content}\n" f"Link: https://reddit.com{post.get('permalink', '')}\n" f"---" ) posts.append(post_info) # Return formatted posts if posts: result = "\n\n".join(posts) set_cached_result(cache_key, result) return result return f"No posts found in subreddit: r/{subreddit}" except Exception as e: logging.error(f"Error fetching Reddit hot threads: {str(e)}") return f"An error occurred: {str(e)}" @mcp.tool( name="fetch_reddit_post_content", annotations={ "title": "Fetch Reddit Post Content", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def fetch_reddit_post_content( post_id: str, comment_limit: int = 20, comment_depth: int = 3 ) -> str: """ Fetch detailed content of a specific Reddit post including comments. This tool retrieves a Reddit post by its ID and returns the full post content along with a hierarchical comment tree. It includes author information, scores, and full content. When available, uses authenticated access for better results. Args: post_id (str): Reddit post ID (either full URL or just the ID portion) comment_limit (int): Number of top-level comments to fetch (default: 20) comment_depth (int): Maximum depth of comment tree to traverse (default: 3) Returns: str: Formatted post content with hierarchical comments: - Post title, author, score - Full post content - Comments with proper indentation showing the discussion tree - Author and score for each comment Examples: - Fetch post with default settings: post_id="abcd123" - Get post with more comments: post_id="abcd123", comment_limit=50 - Deep-dive into comments: post_id="abcd123", comment_depth=5 Notes: - Works best with authenticated Reddit access - Can accept full Reddit URLs or just the post ID - Rate limited to 10 requests per minute - Caches results for 1 hour """ # Check rate limit if not check_rate_limit("fetch_reddit_post_content"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again." }, indent=2, ) # Extract post ID from URL if needed if post_id.startswith("http"): # Try to extract post ID from URL match = re.search(r"comments/([a-zA-Z0-9]+)/", post_id) if match: post_id = match.group(1) else: return "Invalid Reddit URL. Please provide a valid post ID or URL." # Check cache cache_key = get_cache_key( "fetch_reddit_post_content", post_id=post_id, comment_limit=comment_limit, comment_depth=comment_depth, ) cached_result = get_cached_result(cache_key) if cached_result: return cached_result # Helper function for formatting comment trees def format_comment_tree(comment_node, depth: int = 0) -> str: """Helper function to recursively format comment tree with proper indentation""" comment = comment_node.value if hasattr(comment_node, "value") else comment_node indent = " " * depth if hasattr(comment, "author_display_name"): # Authenticated API format author = comment.author_display_name or "[deleted]" score = comment.score body = getattr(comment, "body", "[no content]") else: # Public API format data = comment.get("data", {}) author = data.get("author", "[deleted]") score = data.get("score", 0) body = data.get("body", "[no content]") content = f"{indent}* **{author}** ({score} points)\n{indent} {body}\n" # Process children if hasattr(comment_node, "children") and comment_node.children: for child in comment_node.children: content += "\n" + format_comment_tree(child, depth + 1) elif "replies" in comment and comment.get("replies"): replies = comment.get("replies", {}) if isinstance(replies, dict) and "data" in replies: for child in replies["data"]["children"]: if child["kind"] != "more": # Skip "more comments" items content += "\n" + format_comment_tree(child, depth + 1) return content try: # Try authenticated API first if available if reddit_authenticated and reddit_client: try: # Fetch submission submission = await reddit_client.p.submission.fetch(post_id) # Determine post type and content post_type = "unknown" if ( hasattr(submission, "url") and submission.url != submission.permalink ): post_type = "link" elif hasattr(submission, "body"): post_type = "text" elif hasattr(submission, "gallery_link"): post_type = "gallery" # Get content based on type content = "" if post_type == "text" and hasattr(submission, "body"): content = submission.body elif post_type == "link" and hasattr(submission, "url"): content = f"Link: {submission.url}" elif post_type == "gallery": content = "Gallery post (multiple images)" # Format post header post_content = ( f"Title: {submission.title}\n" f"Score: {submission.score}\n" f"Author: {submission.author_display_name or '[deleted]'}\n" f"Type: {post_type}\n\n" f"Content:\n{content}\n" ) # Fetch and format comments comments = await reddit_client.p.comment_tree.fetch( post_id, sort="top", limit=comment_limit, depth=comment_depth ) if comments and hasattr(comments, "children") and comments.children: post_content += "\nComments:\n" for comment in comments.children: post_content += "\n" + format_comment_tree(comment) else: post_content += "\nNo comments found." # Cache and return result set_cached_result(cache_key, post_content) return post_content except Exception as auth_error: logging.warning( f"Authenticated Reddit post fetch failed: {str(auth_error)}. Falling back to public API." ) # Fallback to public API url = f"https://www.reddit.com/comments/{post_id}.json" headers = {"User-Agent": "CommunityResearchMCP/1.0"} async with httpx.AsyncClient(timeout=API_TIMEOUT) as client: response = await client.get(url, headers=headers) response.raise_for_status() data = response.json() # Extract post data post_data = data[0]["data"]["children"][0]["data"] # Determine post type and content post_type = "unknown" if post_data.get("is_self") == False and post_data.get("url"): post_type = "link" elif post_data.get("is_self") == True: post_type = "text" elif post_data.get("is_gallery") == True: post_type = "gallery" # Get content based on type content = "" if post_type == "text": content = post_data.get("selftext", "") elif post_type == "link": content = f"Link: {post_data.get('url', '')}" elif post_type == "gallery": content = "Gallery post (multiple images)" # Format post header post_content = ( f"Title: {post_data.get('title')}\n" f"Score: {post_data.get('score')}\n" f"Author: {post_data.get('author')}\n" f"Type: {post_type}\n\n" f"Content:\n{content}\n" ) # Process comments if len(data) > 1 and "children" in data[1]["data"]: comment_data = data[1]["data"]["children"][:comment_limit] if comment_data: post_content += "\nComments:\n" for comment in comment_data: if comment["kind"] != "more": # Skip "more comments" items post_content += "\n" + format_comment_tree(comment) else: post_content += "\nNo comments found." # Cache and return result set_cached_result(cache_key, post_content) return post_content except Exception as e: logging.error(f"Error fetching Reddit post content: {str(e)}") return f"An error occurred: {str(e)}" # ============================================================================ # DuckDuckGo Search & Web Content Tools # ============================================================================ @mcp.tool( name="duckduckgo_search", annotations={ "title": "Search the Web with DuckDuckGo", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def duckduckgo_search(query: str, max_results: int = 10) -> str: """ Search the web using DuckDuckGo and return formatted results. This tool performs a web search via DuckDuckGo and returns formatted results with titles, links, and snippets. It's useful for finding information that might not be present in community-specific sources like Stack Overflow or GitHub. Args: query (str): The search query string max_results (int): Maximum number of results to return (default: 10) Returns: str: Formatted list of search results with: - Result titles - URLs - Content snippets Examples: - General search: query="python asyncio tutorial" - Documentation search: query="react useEffect official docs" - Technical questions: query="how to implement JWT authentication" Notes: - Rate limited to 30 requests per minute - Caches results for 1 hour - Does not require authentication """ # Check rate limit if not check_rate_limit("duckduckgo_search"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again." }, indent=2, ) # Check cache cache_key = get_cache_key("duckduckgo_search", query=query, max_results=max_results) cached_result = get_cached_result(cache_key) if cached_result: return cached_result try: # Search DuckDuckGo results = await ddg_searcher.search(query, max_results) result = ddg_searcher.format_results_for_llm(results) # Cache and return set_cached_result(cache_key, result) return result except Exception as e: logging.error(f"DuckDuckGo search failed: {str(e)}") return f"An error occurred during search: {str(e)}" @mcp.tool( name="fetch_webpage_content", annotations={ "title": "Fetch and Parse Webpage Content", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def fetch_webpage_content(url: str) -> str: """ Fetch and parse content from a webpage URL. This tool retrieves a webpage, extracts its main text content by removing navigation, scripts, styles, and other non-content elements. It's useful for getting detailed information from articles, documentation, or any webpage found through search. Args: url (str): The webpage URL to fetch content from Returns: str: Cleaned text content from the webpage Examples: - Fetch documentation: url="https://docs.python.org/3/library/asyncio.html" - Fetch tutorial: url="https://reactjs.org/docs/hooks-effect.html" - Fetch article: url="https://martinfowler.com/articles/microservices.html" Notes: - Rate limited to 20 requests per minute - Removes ads, navigation, and other non-content elements - Truncates very long content to 8000 characters - Works best with article-style content """ # Check rate limit if not check_rate_limit("fetch_webpage_content"): return json.dumps( { "error": "Rate limit exceeded. Maximum 10 requests per minute. Please wait and try again." }, indent=2, ) # Check cache cache_key = get_cache_key("fetch_webpage_content", url=url) cached_result = get_cached_result(cache_key) if cached_result: return cached_result try: # Fetch and parse content result = await web_fetcher.fetch_and_parse(url) # Cache and return set_cached_result(cache_key, result) return result except Exception as e: logging.error(f"Webpage fetching failed: {str(e)}") return f"An error occurred while fetching the webpage: {str(e)}" # ============================================================================ # Streaming & Auto-Detection Tools # ============================================================================ @mcp.tool( name="get_system_capabilities", annotations={ "title": "Auto-Detect System Capabilities", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def get_system_capabilities() -> str: """ Auto-detect all available API keys and system capabilities. Scans environment for configured API keys and returns a complete report of what search APIs and LLM providers are currently available. This tool automatically recognizes: - Search APIs: Stack Overflow, GitHub, Reddit (auth & public), Hacker News, DuckDuckGo, Brave Search, Serper - LLM Providers: Gemini, OpenAI, Anthropic, OpenRouter, Perplexity - Workspace context and detected languages Returns: str: Formatted report of all active and inactive capabilities Example output: # 🔍 System Capabilities ## Search APIs **Active (5):** ✓ stackoverflow ✓ github ✓ reddit ✓ hackernews ✓ duckduckgo **Inactive (2):** ✗ brave (API key not configured) ✗ serper (API key not configured) """ if not STREAMING_AVAILABLE: return ( "âš ī¸ Streaming capabilities module not available. Basic functionality only." ) try: capabilities = detect_all_capabilities() report = format_capabilities_report(capabilities) return report except Exception as e: return f"Error detecting capabilities: {str(e)}" @mcp.tool( name="streaming_community_search", annotations={ "title": "Streaming Community Search (Real-Time Results)", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False, }, ) async def streaming_community_search( language: str, topic: str, goal: Optional[str] = None, current_setup: Optional[str] = None, context: Optional[Any] = None, ) -> str: """ Search community resources with REAL-TIME STREAMING results. Fires all search capabilities in PARALLEL and streams results as they arrive, with progressive reorganization and smart aggregation. Results are categorized by type (quick fixes, code examples, warnings, discussions) and displayed incrementally. **Key Features:** - ⚡ Parallel execution across ALL available search sources - 📊 Real-time progress updates as each source completes - 🔄 Progressive reorganization while waiting for results - đŸŽ¯ Adaptive formatting based on content type - 🤖 Final LLM synthesis of all results Args: language (str): Programming language (e.g., "Python", "JavaScript") topic (str): Specific topic - MUST be detailed (min 10 chars) goal (Optional[str]): What you want to achieve current_setup (Optional[str]): Your current setup/constraints context (Context): MCP context for progress reporting Returns: str: Streaming markdown output with progressive updates and final synthesis Example: language="Python" topic="FastAPI background task queue with Redis for email processing" goal="Send emails asynchronously without blocking API requests" The search will fire across: - Stack Overflow (quick fixes & accepted answers) - GitHub (code examples & real implementations) - Reddit (community discussions & gotchas) - Hacker News (high-quality discussions) - DuckDuckGo (broader web search) Results stream in real-time as each source completes! """ if not STREAMING_AVAILABLE: # Fallback to standard search return await community_search( language=language, topic=topic, goal=goal, current_setup=current_setup, response_format="markdown", use_planning=False, thinking_mode="balanced", ) # Validate topic specificity is_valid, error_msg = validate_topic_specificity(topic) if not is_valid: return error_msg # Check rate limiting if not check_rate_limit("streaming_community_search"): return "Rate limit exceeded. Please wait a minute before retrying." try: # Prepare search functions search_functions = { "stackoverflow": search_stackoverflow, "github": search_github, "reddit": search_reddit, "hackernews": search_hackernews, "duckduckgo": search_duckduckgo, # Added duckduckgo } # Stream results and synthesis output_parts = [] async for update in streaming_search_with_synthesis( search_functions=search_functions, synthesis_func=synthesize_with_llm, query=topic, language=language, goal=goal, current_setup=current_setup, context=context, ): output_parts.append(update) # Combine all output final_output = "\n\n---\n\n".join(output_parts) # Enforce character limit if len(final_output) > CHARACTER_LIMIT: final_output = ( final_output[:CHARACTER_LIMIT] + f"\n\n[Output truncated at {CHARACTER_LIMIT} characters]" ) return final_output except Exception as e: logging.error(f"Streaming search failed: {str(e)}") return f"Error during streaming search: {str(e)}\n\nFalling back to standard search..." @mcp.tool( name="deep_community_search", annotations={ "title": "Deep Recursive Community Search", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True, }, ) async def deep_community_search( language: str, topic: str, goal: Optional[str] = None, current_setup: Optional[str] = None, ) -> str: """ Perform a deep, recursive research session to find comprehensive answers. This tool executes a multi-step research process: 1. Initial broad search across all community sources. 2. Analysis of initial findings to identify gaps and missing details. 3. Targeted follow-up searches to fill those gaps. 4. Active browsing of high-value documentation/tutorials. 5. Final synthesis of all gathered information. Use this for complex topics where a single search is insufficient. """ # Check rate limit if not check_rate_limit("deep_community_search"): return json.dumps({"error": "Rate limit exceeded."}, indent=2) try: # Step 1: Initial Search print(f"🔍 [Deep Search] Starting initial search for: {topic}") initial_query = f"{language} {topic}" if goal: initial_query += f" {goal}" initial_results = await aggregate_search_results(initial_query, language) # Step 2: Analyze for Gaps print("🤔 [Deep Search] Analyzing findings for gaps...") provider, api_key = model_orchestrator.select_model_for_task( "planning", "medium" ) gap_analysis_prompt = f""" Analyze these initial search results for "{topic}" in {language}. Goal: {goal or "Comprehensive understanding"} Results Summary: {json.dumps(initial_results, indent=2)[:5000]}... Identify 3 specific missing pieces of information or technical details that are needed to provide a TRULY comprehensive answer. Generate 3 targeted search queries to find this missing info. Return JSON: {{ "missing_info": ["gap 1", "gap 2", "gap 3"], "follow_up_queries": ["query 1", "query 2", "query 3"] }} """ # Call LLM for real gap analysis follow_up_queries = [] try: gap_response = await call_gemini(api_key, gap_analysis_prompt) if gap_response and isinstance(gap_response, dict): # Try to extract follow_up_queries from the response if "follow_up_queries" in gap_response: follow_up_queries = gap_response["follow_up_queries"][:3] # If the response is wrapped in a text field, try to parse it elif "text" in gap_response: try: parsed = json.loads(gap_response["text"]) follow_up_queries = parsed.get("follow_up_queries", [])[:3] except json.JSONDecodeError: pass except Exception as gap_error: logging.warning(f"Gap analysis failed, using fallback: {gap_error}") # Fallback to deterministic queries if AI gap analysis fails if not follow_up_queries: follow_up_queries = [ f"{language} {topic} advanced usage", f"{language} {topic} best practices", f"{language} {topic} common pitfalls", ] # Step 3: Follow-up Searches print(f"đŸ•ĩī¸ [Deep Search] Running {len(follow_up_queries)} follow-up searches...") follow_up_tasks = [ aggregate_search_results(q, language) for q in follow_up_queries ] follow_up_results_list = await asyncio.gather( *follow_up_tasks, return_exceptions=True ) # Merge results combined_results = _result_only_sources(initial_results) for res in follow_up_results_list: if isinstance(res, dict): for source, items in _result_only_sources(res).items(): if source in combined_results: combined_results[source].extend(items) else: combined_results[source] = items # Step 4: Active Browsing (Fetch content for top items) print("🌐 [Deep Search] Active browsing top results...") # Extract top URLs from DuckDuckGo results ddg_results = combined_results.get("duckduckgo", []) top_urls = [item["url"] for item in ddg_results[:3] if item.get("url")] if top_urls: contents = await asyncio.gather( *[fetch_page_content(url) for url in top_urls], return_exceptions=True ) # Enrich results with content for i, content in enumerate(contents): if isinstance(content, str) and i < len(ddg_results): ddg_results[i]["content"] = content # Step 5: Final Synthesis print("🤖 [Deep Search] Synthesizing final comprehensive answer...") final_response = await synthesize_with_multi_model( combined_results, topic, language, goal, current_setup, thinking_mode=ThinkingMode.DEEP, ) return json.dumps(final_response, indent=2) except Exception as e: logging.error(f"Deep search failed: {e}") return json.dumps({"error": f"Deep search failed: {str(e)}"}, indent=2) @mcp.tool( name="parallel_multi_source_search", annotations={ "title": "Parallel Multi-Source Search (Advanced)", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False, }, ) async def parallel_multi_source_search( query: str, language: str, sources: Optional[str] = "all", context: Optional[Any] = None, ) -> str: """ Execute searches across multiple sources in PARALLEL with real-time updates. This is an advanced tool that gives you fine-grained control over which search sources to query. All selected sources fire simultaneously and results stream back in real-time. Args: query (str): Search query language (str): Programming language context sources (Optional[str]): Comma-separated list of sources or "all" Options: stackoverflow, github, reddit, hackernews, duckduckgo Default: "all" context (Context): MCP context for progress reporting Returns: str: JSON-formatted results organized by source and content type Example: query="async/await error handling best practices" language="JavaScript" sources="stackoverflow,github,reddit" This will search only Stack Overflow, GitHub, and Reddit in parallel, ignoring Hacker News and DuckDuckGo. """ if not STREAMING_AVAILABLE: return json.dumps( {"error": "Streaming capabilities not available", "results": {}}, indent=2 ) # Parse sources if sources == "all": source_list = ["stackoverflow", "github", "reddit", "hackernews", "duckduckgo"] else: source_list = [s.strip() for s in sources.split(",")] # Map source names to functions source_map = { "stackoverflow": search_stackoverflow, "github": search_github, "reddit": search_reddit, "hackernews": search_hackernews, "duckduckgo": search_duckduckgo, } # Filter to requested sources search_functions = { name: func for name, func in source_map.items() if name in source_list } if not search_functions: return json.dumps( { "error": "No valid sources selected", "available_sources": list(source_map.keys()), }, indent=2, ) try: # Collect all results all_results = {} result_count = 0 if context: await context.info( f"🚀 Starting parallel search across {len(search_functions)} sources..." ) async for update in get_all_search_results_streaming( search_functions.get("stackoverflow"), search_functions.get("github"), search_functions.get("reddit"), search_functions.get("hackernews"), search_functions.get("duckduckgo"), query=query, language=language, context=context, ): if update["type"] == "complete": all_results = update["state"].results_by_source result_count = update["summary"]["total_results"] break # Format output output = { "query": query, "language": language, "sources_searched": list(search_functions.keys()), "total_results": result_count, "results_by_source": all_results, "results_by_type": update["state"].results_by_type if update else {}, } return json.dumps(output, indent=2) except Exception as e: logging.error(f"Parallel search failed: {str(e)}") return json.dumps({"error": str(e), "results": {}}, indent=2) @mcp.tool( name="get_performance_metrics", annotations={ "title": "Get System Performance Metrics", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def get_performance_metrics() -> str: """ Get comprehensive performance metrics for the MCP server. This tool provides real-time insights into: - Search performance and latency - API reliability and success rates - Cache effectiveness - Error distribution - System uptime Returns: str: Formatted performance report with all metrics Example Output: # 📊 Performance Metrics Report ## System Performance - **Uptime**: 3600.0s - **Total Searches**: 45 - **Average Search Time**: 1200ms - **Cache Hit Rate**: 35.5% ## API Reliability - **Success Rate**: 99.2% - **Total Calls**: 250 - **Retry Count**: 5 """ if not ENHANCED_UTILITIES_AVAILABLE: return "âš ī¸ Performance monitoring not available. Please ensure enhanced_mcp_utilities.py is installed." try: report = format_metrics_report() return report except Exception as e: return f"Error generating metrics report: {str(e)}" @mcp.tool( name="clear_cache", annotations={ "title": "Clear Search Cache", "readOnlyHint": False, "destructiveHint": True, "idempotentHint": False, "openWorldHint": False, }, ) async def clear_cache() -> str: """ Clear the local search cache. This removes all cached search results, forcing fresh queries on next search. Useful when you need up-to-date results or when cached data might be stale. Returns: str: Status message indicating whether cache was cleared """ cache_file = Path(".community_research_cache.json") try: if cache_file.exists(): cache_file.unlink() return ( "✓ Cache cleared successfully. Next searches will fetch fresh results." ) else: return "â„šī¸ Cache was already empty." except Exception as e: return f"❌ Error clearing cache: {str(e)}" def validate_environment(): """Validate environment configuration on startup.""" print("\nValidating environment...") # Check API keys keys = { "GEMINI": os.getenv("GEMINI_API_KEY"), "OPENAI": os.getenv("OPENAI_API_KEY"), "ANTHROPIC": os.getenv("ANTHROPIC_API_KEY"), "REDDIT": os.getenv("REDDIT_CLIENT_ID"), } active_keys = [k for k, v in keys.items() if v] if not active_keys: print("Warning: No API keys configured") else: print(f"Active providers: {', '.join(active_keys)}") # Check capabilities capabilities = [] if STREAMING_AVAILABLE: capabilities.append("streaming") if ENHANCED_UTILITIES_AVAILABLE: capabilities.append("enhanced utilities") if capabilities: print(f"Capabilities: {', '.join(capabilities)}") print("Ready\n") # ============================================================================ # Main Entry Point # ============================================================================ if __name__ == "__main__": # Validate environment validate_environment() # Run the MCP server mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DocHatty/community-research-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server