Skip to main content
Glama

Baidu Search MCP Server

by Evilran
server.py31.1 kB
from mcp.server.fastmcp import FastMCP, Context import httpx from bs4 import BeautifulSoup from typing import List, Dict, Any, Optional, Set, Tuple from dataclasses import dataclass import sys import traceback import asyncio from datetime import datetime, timedelta import re import readabilipy.simple_json import markdownify import logging from functools import wraps from urllib.parse import urlencode try: from curl_cffi.requests import AsyncSession as CurlAsyncSession from curl_cffi.requests.errors import RequestsError as CurlRequestsError except ImportError: CurlAsyncSession = None # type: ignore[assignment] CurlRequestsError = Exception # type: ignore[assignment] # Track Playwright availability so we can fall back gracefully when missing. _playwright_module = None _playwright_import_error: Optional[BaseException] = None _playwright_import_checked = False # Dynamic import of Playwright to avoid early import errors def _import_playwright(): global _playwright_module, _playwright_import_error, _playwright_import_checked if _playwright_import_checked: return _playwright_module _playwright_import_checked = True try: from playwright.async_api import async_playwright except ModuleNotFoundError as exc: _playwright_import_error = exc logging.getLogger(__name__).warning( "Playwright is not installed; falling back to curl_cffi transport." ) _playwright_module = None except Exception as exc: # pragma: no cover - unexpected import failure _playwright_import_error = exc logging.getLogger(__name__).warning( "Playwright import failed (%s); falling back to curl_cffi transport.", exc, ) _playwright_module = None else: _playwright_module = async_playwright _playwright_import_error = None return _playwright_module _browser = None _browser_context = None _playwright_instance = None _browser_lock: Optional[asyncio.Lock] = None async def _ensure_browser( user_agent: Optional[str] = None, extra_headers: Optional[Dict[str, str]] = None ): """Ensure a browser instance is available with SSL validation disabled""" global _browser, _browser_context, _playwright_instance, _browser_lock if _browser_lock is None: _browser_lock = asyncio.Lock() async with _browser_lock: if _browser is None or _browser_context is None: playwright_module = _import_playwright() if not playwright_module: return None, None _playwright_instance = await playwright_module().start() _browser = await _playwright_instance.chromium.launch() context_kwargs: Dict[str, Any] = {"ignore_https_errors": True} if user_agent: context_kwargs["user_agent"] = user_agent if extra_headers: context_kwargs["extra_http_headers"] = extra_headers _browser_context = await _browser.new_context(**context_kwargs) elif extra_headers: await _browser_context.set_extra_http_headers(extra_headers) return _browser, _browser_context logging.getLogger("httpx").setLevel(logging.WARNING) logger = logging.getLogger(__name__) @dataclass class SearchResult: title: str link: str snippet: str position: int class RateLimiter: def __init__(self, requests_per_minute: int = 30): self.requests_per_minute = requests_per_minute self.window = timedelta(minutes=1) self.requests: List[datetime] = [] async def acquire(self): now = datetime.now() self.requests = [req for req in self.requests if now - req < self.window] if len(self.requests) >= self.requests_per_minute: wait_time = max(0, 60 - (now - self.requests[0]).total_seconds()) if wait_time > 0: await asyncio.sleep(wait_time) self.requests.append(now) class BaiduSearcher: BASE_URL = "https://m.baidu.com/s" WEB_NORMAL = "1599" WEB_VIDEO_NORMAL = "48304" WEB_NOTE_NORMAL = "61570" WEB_KNOWLEDGE = "1529" WEB_WENKU = "1525" HEADERS = { "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Mobile Safari/537.36", "Referer": "https://m.baidu.com", "Accept-Encoding": "gzip, deflate, br", "Accept-Language": "en-US,en;q=0.9,zh;q=0.8,zh-CN;q=0.7", "Sec-Ch-Ua": '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"', "Sec-Ch-Ua-Mobile": "?1", "Sec-ch-Ua-Platform": "Android", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin" } def __init__(self, fetcher: Optional["WebContentFetcher"] = None): self.rate_limiter = RateLimiter() self.fetcher = fetcher async def _log_ctx(self, ctx: Optional[Context], level: str, message: str) -> None: """Safely emit MCP context logs when a context object is available.""" if not ctx: return log_fn = getattr(ctx, level, None) if asyncio.iscoroutinefunction(log_fn): await log_fn(message) elif log_fn: log_fn(message) def _get_fetcher(self) -> "WebContentFetcher": if self.fetcher is None: self.fetcher = WebContentFetcher() return self.fetcher def handle_errors(func): @wraps(func) async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except httpx.HTTPError as e: logger.error("HTTP error occurred: %s", e, exc_info=True) return [] except Exception as e: logger.error("Unexpected error: %s", e, exc_info=True) return [] return wrapper def _extract_text( self, element: Any, selector: Optional[str] = None, class_name: Optional[Any] = None, attrs: Optional[Dict[str, Any]] = None, ) -> str: if element is None: return "" find_kwargs: Dict[str, Any] = {} if class_name is not None: find_kwargs["class_"] = class_name if attrs is not None: find_kwargs["attrs"] = attrs if selector: found = element.find(selector, **find_kwargs) elif find_kwargs: found = element.find(**find_kwargs) else: found = element if not found: return "" return found.get_text(strip=True) def _parse_labels(self, element: Any) -> List[str]: if element is None: return [] labels = [] for tag in element.find_all(["span", "div"], class_=["sc-tag", "cos-tag"]): if tag and tag.text: labels.append(tag.text.strip()) return labels def _extract_url(self, element: Any) -> str: if element is None: return "" # Prefer Baidu provided redirect attribute when present direct = element.attrs.get("rl-link-href") if direct: return direct.strip() link = element.find("a") if not link: return "" href = link.get("href", "").strip() return href def _parse_abstract(self, element: Any) -> str: if element is None: return "" abstract_div = element.find(attrs={"data-module": "abstract"}) if not abstract_div: # Try video abstract format abstract_span = element.find("span", class_=re.compile(r"^abstract-text_")) return abstract_span.text.strip() if abstract_span else "" text_parts = [] for div in abstract_div.findAll("div", role="text"): for span in div.findAll("span"): if not span.find("span") and not span.get("class", ["c-color-gray"]): text_parts.append(span.text.strip()) return "\n".join(filter(None, text_parts)) def _create_search_result(self, data: Dict[str, Any], position: Optional[int] = None) -> SearchResult: return SearchResult( title=data.get("title", ""), link=data.get("url", ""), snippet=self._format_snippet(data), position=position ) def _format_snippet(self, data: Dict[str, Any]) -> str: parts = [] if data.get("abstract"): parts.append(f"# Abstract\n{data['abstract']}") if data.get("labels"): parts.append(f"# Labels\n{','.join(data['labels'])}") if data.get("content"): parts.append(f"# Content Preview\n{data['content']}") return "\n\n".join(parts) def _parse_search_page(self, html: str, seen_urls: Set[str]) -> List[Dict[str, Any]]: results = [] soup = BeautifulSoup(html, "html.parser") if not soup: return [] res_normal_container = soup.findAll( "div", class_="c-result result", new_srcid=self.WEB_NORMAL ) res_normal = [] for res in res_normal_container: _ = res.find("div", class_="c-result-content").find("article") header = _.find("section").find("div") # 标签 labels = [] # 标题 title = header.find(attrs={"data-module": "title"}) if title: title = title.find("span", class_="tts-b-hl").text # 链接 try: url = _["rl-link-href"] except Exception: url = header.find("a")["href"] __ = header.find(attrs={"data-module": "title"}) if __: __ = __.find("span", class_="sc-tag") # “官方”标签 if __: labels.append(__.text) section = _.find("section") # 简介 des = "" # 可能有多个`span`标签,需要依次解析 ___ = _.find(attrs={"data-module": "abstract"}) if ___: for s in ___.findAll("div", role="text"): for t in s.findAll("span"): try: if t.find("span").text: continue except Exception: pass try: if "c-color-gray" in t["class"]: continue except Exception: pass des += t.text des += "\n" des = des.strip("\n") # 来源(作者) origin = section.find("span", class_="cosc-source-text") if origin: origin = origin.text else: if __: origin = __.find("div", class_="single-text") if origin: origin = origin.text res_normal.append( { "title": title, "url": url, "labels": labels, "abstract": des, "origin": origin, "type": "web", } ) res_wenku_container = soup.findAll( "div", class_="c-result result", new_srcid=self.WEB_WENKU ) res_wenku_normal = [] for res in res_wenku_container: _ = res.find("div", class_="c-result-content").find("article") header = _.find("section").find("div") # 标签 labels = [] # 标题 title = header.find(attrs={"data-module": "title"}).find("span", class_="tts-b-hl").text # 链接 try: url = _["rl-link-href"] except Exception: url = header.find("a")["href"] __ = header.find(attrs={"data-module": "title"}).find("span", class_="sc-tag") # “官方”标签 if __: labels.append(__.text) section = _.find("section") # 简介 des = "" # 可能有多个`span`标签,需要依次解析 for s in _.find(attrs={"data-module": "abstract"}).findAll("div", role="text"): for t in s.findAll("span"): try: if t.find("span").text: continue except Exception: pass try: if "c-color-gray" in t["class"]: continue except Exception: pass des += t.text des += "\n" des = des.strip("\n") # 来源(作者) origin = section.find("span", class_="cosc-source-text") if origin: origin = origin.text else: origin = __.find("div", class_="single-text") if origin: origin = origin.text res_wenku_normal.append( { "title": title, "url": url, "labels": labels, "abstract": des, "origin": origin, "type": "doc", } ) res_video_normal_container = soup.findAll( "div", class_="c-result result", new_srcid=self.WEB_VIDEO_NORMAL ) res_video_normal = [] for res in res_video_normal_container: _ = res.find("div", class_="c-result-content").find("article") header = _.find("section").find("div") title = header.find("div", class_="title-container").find("p").find("span").text # 链接 try: url = _["rl-link-href"] except Exception: url = header.find("a")["href"] __ = _.findAll("span", class_="cos-tag") labels = [] for ___ in __: labels.append(___.text) pattern = re.compile(r"^abstract-text_") # 匹配以 "abstract-text_" 开头的类名 des = "" text = _.find("span", class_=pattern) if text: des = text.text.strip() origin = res.find("span", class_="cosc-source-text") if origin: origin = origin.text else: origin = __.find("div", class_="single-text") if origin: origin = origin.text res_video_normal.append( { "title": title, "url": url, "origin": origin, "labels": labels, "abstract": des, "type": "video", } ) res_note_normal_container = soup.findAll( "div", class_="c-result result", new_srcid=self.WEB_NOTE_NORMAL ) res_note_normal = [] for res in res_note_normal_container: _ = res.find("div", class_="c-result-content").find("article") __ = _.find("section").find("div").find("div", attrs={"data-module": "sc_lk"}) try: url = __["rl-link-href"] except Exception: url = __.find("a")["href"] title = __.find(attrs={"data-module": "title"}).find("span", class_="cosc-title-slot").text if not header: continue des = "" labels = [] source = __.find(attrs={"data-module": "source"}) for label in source.findAll("div"): if not label.find("div") and len(label.text) > 0: labels.append(label.text) origin = __.find("div", class_=re.compile(r"^source-name")) if origin: origin = origin.text else: origin = __.find("div", class_="single-text") if origin: origin = origin.text res_note_normal.append( { "title": title, "url": url, "origin": origin, "labels": labels, "abstract": des, "type": "note", } ) res_knowledge_normal_container = soup.findAll( "div", class_="c-result result", new_srcid=self.WEB_KNOWLEDGE ) res_knowledge_normal = [] for res in res_knowledge_normal_container: _ = res.find("div", class_="c-result-content").find("article") __ = _.find("section").find("div", attrs={"data-module": "lgtte"}) try: url = _["rl-link-href"] except Exception: url = __.find("a")["href"] title = __.find("div", class_="c-title").text des = "" labels = [] lgtt = _.find("section").find("div", attrs={"data-module": "lgtt"}) ___ = lgtt.find("div", class_=re.compile(r"^c-line-")) if ___: des = ___.text.strip() origin = _.find("div", class_="c-color-source") if origin: origin = origin.text else: origin = _.find("div", class_="single-text") if origin: origin = origin.text res_knowledge_normal.append( { "title": title, "url": url, "origin": origin, "labels": labels, "abstract": des, "type": "knowledge", } ) results.extend(res_normal) results.extend(res_wenku_normal) results.extend(res_knowledge_normal) results.extend(res_note_normal) results.extend(res_video_normal) return results async def _request_with_retries( self, browser_context: Any, params: Dict[str, Any], max_retries: int, ) -> Optional[str]: if browser_context is None: return await self._request_with_curl_cffi(params, max_retries) last_error: Optional[Exception] = None for attempt in range(1, max_retries + 1): page = None try: await self.rate_limiter.acquire() page = await browser_context.new_page() target_url = f"{self.BASE_URL}?{urlencode(params)}" response = await page.goto( target_url, wait_until="domcontentloaded", timeout=30_000 ) if response and not response.ok: raise RuntimeError( f"Baidu responded with status {response.status}" ) html = await page.content() return html except Exception as exc: last_error = exc logger.warning("Baidu request failed on attempt %d: %s", attempt, exc) finally: if page: await page.close() await asyncio.sleep(min(5 * attempt, 15)) if last_error: logger.error("All retries exhausted when querying Baidu: %s", last_error) return None async def _request_with_curl_cffi( self, params: Dict[str, Any], max_retries: int, ) -> Optional[str]: if CurlAsyncSession is None: logger.error("curl_cffi is not installed; unable to issue HTTP fallback requests.") return None last_error: Optional[Exception] = None target_url = f"{self.BASE_URL}?{urlencode(params)}" for attempt in range(1, max_retries + 1): try: await self.rate_limiter.acquire() async with CurlAsyncSession(timeout=30) as session: response = await session.get(target_url, headers=self.HEADERS) status = getattr(response, "status_code", None) if status is None: status = getattr(response, "status", None) if status and status >= 400: raise RuntimeError(f"Baidu responded with status {status}") html = response.text() if not html: raise ValueError("Received empty response from Baidu") return html except CurlRequestsError as exc: last_error = exc logger.warning( "curl_cffi request failed on attempt %d: %s", attempt, exc ) except Exception as exc: # pragma: no cover - defensive catch-all last_error = exc logger.warning( "Unexpected curl_cffi error on attempt %d: %s", attempt, exc ) await asyncio.sleep(min(5 * attempt, 15)) if last_error: logger.error( "All retries exhausted when querying Baidu via curl_cffi: %s", last_error, ) return None async def _perform_search( self, query: str, max_results: int, deep_mode: bool, max_retries: int, ctx: Optional[Context] = None, ) -> List[SearchResult]: await self._log_ctx(ctx, "info", f"Searching Baidu for: {query}") params = {"word": query} results: List[Dict[str, Any]] = [] seen_urls: Set[str] = set() page = 0 user_agent = self.HEADERS.get("User-Agent") extra_headers = { key: value for key, value in self.HEADERS.items() if key.lower() != "user-agent" } _, browser_context = await _ensure_browser( user_agent=user_agent, extra_headers=extra_headers or None ) if browser_context is None: if CurlAsyncSession is None: await self._log_ctx( ctx, "error", "Playwright is unavailable and curl_cffi is not installed; unable to execute search.", ) return [] await self._log_ctx( ctx, "warning", "Playwright unavailable; using curl_cffi fallback HTTP client.", ) while len(results) < max_results: params["pn"] = page * 10 page += 1 html = await self._request_with_retries(browser_context, params, max_retries) if html is None: await self._log_ctx( ctx, "error", "Failed to fetch search results from Baidu" ) break page_results = self._parse_search_page(html, seen_urls) if not page_results: break results.extend(page_results) limited_results = results[:max_results] if deep_mode and limited_results: tasks = [self.process_result(result, idx + 1) for idx, result in enumerate(limited_results)] enriched_results = await asyncio.gather(*tasks, return_exceptions=True) search_results: List[SearchResult] = [] for item in enriched_results: if isinstance(item, Exception): logger.error("Deep fetch failed for a result: %s", item, exc_info=True) continue if isinstance(item, SearchResult): search_results.append(item) else: search_results = [ self._create_search_result(result, idx + 1) for idx, result in enumerate(limited_results) ] await self._log_ctx(ctx, "info", f"Successfully found {len(search_results)} results") return search_results @handle_errors async def search( self, query: str, ctx: Context, max_results: int = 10, deep_mode: bool = False, max_retries: int = 2, ) -> List[SearchResult]: return await self._perform_search( query=query, max_results=max_results, deep_mode=deep_mode, max_retries=max_retries, ctx=ctx, ) @handle_errors async def search_fire( self, query: str, max_results: int = 10, deep_mode: bool = False, max_retries: int = 2, ) -> List[SearchResult]: return await self._perform_search( query=query, max_results=max_results, deep_mode=deep_mode, max_retries=max_retries, ctx=None, ) async def process_result(self, result: Dict[str, Any], position: int) -> SearchResult: """Process a single search result with deep content fetching""" try: fetcher = self._get_fetcher() text, url = await fetcher.fetch_and_parse(result["url"]) if text: result["content"] = text if url: result["url"] = url except Exception as e: logger.error("Error fetching content for %s: %s", result.get("url"), e, exc_info=True) return self._create_search_result(result, position) def format_results_for_llm(self, results: List[SearchResult]) -> str: """Format results in a natural language style for LLM processing""" if not results: return "No results were found for your search query. This could be due to Baidu's bot detection or the query returned no matches. Please try rephrasing your search or try again in a few minutes." output = [f"Found {len(results)} search results:\n"] for result in results: output.extend([ f"{result.position}. {result.title}", f" URL: {result.link}", f" Summary: {result.snippet}", "" # Empty line between results ]) return "\n".join(output) class WebContentFetcher: HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", } def __init__(self): self.rate_limiter = RateLimiter(requests_per_minute=20) def extract_content_from_html(self, html: str) -> str: """Extract and convert HTML content to Markdown format.""" ret = readabilipy.simple_json.simple_json_from_html_string( html, use_readability=True ) raw_content = ret.get("content") if isinstance(ret, dict) else None if not raw_content: soup = BeautifulSoup(html, "html.parser") for element in soup(["script", "style", "nav", "header", "footer"]): element.decompose() text = soup.get_text(separator=" ") cleaned = re.sub(r"\s+", " ", text).strip() else: cleaned = markdownify.markdownify( raw_content, heading_style=markdownify.ATX, ).strip() if len(cleaned) > 150: cleaned = f"{cleaned[:150].rstrip()}..." return cleaned def _extract_client_redirect(self, html: str) -> Optional[str]: soup = BeautifulSoup(html, "html.parser") script_tag = soup.find("script", string=re.compile(r"window\.location\.replace")) if script_tag and script_tag.string: match = re.search(r'window\.location\.replace\("([^"]+)"\)', script_tag.string) if match: return match.group(1) meta_tag = soup.find("meta", attrs={"http-equiv": re.compile("refresh", re.I)}) if meta_tag: content = meta_tag.get("content", "") if "url=" in content.lower(): return content.split("url=", 1)[-1].strip() return None async def fetch_and_parse(self, url: str, max_redirects: int = 5) -> Tuple[str, str]: """Fetch and parse content from a webpage, returning a text preview and the resolved URL.""" await self.rate_limiter.acquire() visited: Set[str] = set() target_url = url response: Optional[httpx.Response] = None async with httpx.AsyncClient(headers=self.HEADERS, timeout=30.0, follow_redirects=True) as client: for _ in range(max_redirects): if target_url in visited: logger.debug("Detected redirect loop for %s", target_url) break visited.add(target_url) try: response = await client.get(target_url) response.raise_for_status() except httpx.TimeoutException: logger.warning("Timeout while fetching %s", target_url) return "", target_url except httpx.HTTPError as exc: logger.error("HTTP error while fetching %s: %s", target_url, exc) return "", target_url content_type = response.headers.get("Content-Type", "") if "text/html" not in content_type: break redirect_url = self._extract_client_redirect(response.text) if not redirect_url: break target_url = redirect_url if not response: return "", url final_url = str(response.url) text = self.extract_content_from_html(response.text) return text, final_url # Initialize FastMCP server mcp = FastMCP("baidu-search") searcher = BaiduSearcher(WebContentFetcher()) @mcp.tool() async def search(query: str, ctx: Context, max_results: int = 6, deep_mode: bool = False) -> str: """ Search Baidu and return formatted results. Args: query: The search query string max_results: Maximum number of results to return (default: 6) deep_mode: Deep search the web content (default: False) ctx: MCP context for logging """ try: results = await searcher.search(query, ctx, max_results, deep_mode) return searcher.format_results_for_llm(results) except Exception as e: traceback.print_exc(file=sys.stderr) return f"An error occurred while searching: {str(e)}" def main(): mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evilran/baidu-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server