# MCP2Brave: MCP server exposing Brave Search tools
import os
import logging
import sys
import requests
from dotenv import load_dotenv
from fastmcp import FastMCP
from logging.handlers import RotatingFileHandler
# Force stdout to UTF-8 so non-Latin output prints correctly
if sys.stdout.encoding and sys.stdout.encoding.lower() != 'utf-8':
    try:
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        # reconfigure() is unavailable on older Python versions
        pass
# Load environment variables from .env
load_dotenv()
# Logging setup
def setup_logger(name):
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # Create the logs directory if it does not exist
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # File handler: RotatingFileHandler caps the log file size
    log_file = os.path.join(log_dir, f"{name}.log")
    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=1024 * 1024,  # 1MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)

    # Shared format for both handlers
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # Attach handlers
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger
# Initialize the module logger
logger = setup_logger("mcp2brave")
logger.info("Logger initialized - outputs to both console and file in logs directory")
# Create an MCP server
mcp = FastMCP("mcp2brave", dependencies=["python-dotenv", "requests", "beautifulsoup4"])
# Load the Brave API key
API_KEY = os.getenv("BRAVE_API_KEY")
if not API_KEY:
    logger.error("BRAVE_API_KEY environment variable not found")
    raise ValueError("BRAVE_API_KEY environment variable required")
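
# The key is read from the environment; a .env file in the project root
# works too, e.g. (illustrative value):
#   BRAVE_API_KEY=your-brave-api-key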
def _detect_language(text: str) -> str:
    """Detect the language of the text and return the matching language code"""
    # Unicode-range detection rules. Japanese kana is checked before the
    # CJK block because Japanese text usually mixes kana with kanji, and
    # kanji would otherwise match the Chinese range first.
    LANGUAGE_PATTERNS = {
        # Japanese (hiragana and katakana)
        'jp': ('\u3040', '\u309f', '\u30a0', '\u30ff'),
        # Chinese (Simplified and Traditional share the CJK range)
        'zh-hans': ('\u4e00', '\u9fff'),
        # Korean (Hangul)
        'ko': ('\uac00', '\ud7af'),
        # Russian (Cyrillic)
        'ru': ('\u0400', '\u04ff'),
        # Arabic
        'ar': ('\u0600', '\u06ff'),
        # Hebrew
        'he': ('\u0590', '\u05ff'),
        # Thai
        'th': ('\u0e00', '\u0e7f'),
        # Vietnamese (extended Latin letters; one-element tuple so the string
        # is passed whole instead of being unpacked character by character)
        'vi': ('àáạảãâầấậẩẫăằắặẳẵèéẹẻẽêềếệểễìíịỉĩòóọỏõôồốộổỗơờớợởỡùúụủũưừứựửữỳýỵỷỹđ',),
        # Hindi (Devanagari)
        'hi': ('\u0900', '\u097f'),
        # Tamil
        'ta': ('\u0b80', '\u0bff'),
        # Telugu
        'te': ('\u0c00', '\u0c7f'),
    }

    def contains_chars_in_range(text, *ranges):
        """Check whether the text contains characters in the given Unicode ranges"""
        if len(ranges) % 2 == 0:  # pairs of (start, end) boundaries
            for i in range(0, len(ranges), 2):
                start, end = ranges[i:i + 2]
                if any(start <= char <= end for char in text):
                    return True
        else:  # a single string acting as a character list
            return any(char in ranges[0] for char in text)
        return False

    # Detect common non-Latin scripts
    for lang, pattern in LANGUAGE_PATTERNS.items():
        if contains_chars_in_range(text, *pattern):
            if lang == 'zh-hans':
                # Telling Simplified from Traditional would need a more
                # elaborate heuristic; default to Simplified Chinese.
                return 'zh-hans'
            return lang

    # Detect Latin-script languages by characteristic letters.
    # Note: this is a very rough heuristic; real-world use would
    # need a proper language-detection library.
    LATIN_PATTERNS = {
        'es': ['ñ', 'á', 'é', 'í', 'ó', 'ú', '¿', '¡'],
        'fr': ['é', 'è', 'ê', 'à', 'ç', 'ù', 'û', 'ï'],
        'de': ['ä', 'ö', 'ü', 'ß'],
        'pt-pt': ['ã', 'õ', 'á', 'é', 'í', 'ó', 'ú', 'â', 'ê', 'ô'],
        'it': ['à', 'è', 'é', 'ì', 'ò', 'ó', 'ù'],
    }
    for lang, patterns in LATIN_PATTERNS.items():
        if any(pattern in text.lower() for pattern in patterns):
            return lang

    # Default to English
    return "en"
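
# Illustrative spot checks (expected outputs under the heuristics above;
# not executed by the server):
#   _detect_language("你好世界")      -> 'zh-hans'
#   _detect_language("こんにちは")    -> 'jp'
#   _detect_language("¿Qué hora es?") -> 'es'
#   _detect_language("hello world")   -> 'en'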
def _extract_text_from_html(html_content: str) -> str:
    """Extract the meaningful text from HTML content"""
    try:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove unwanted elements (tag names only; class selectors
        # must go through select() instead)
        for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'iframe']):
            element.decompose()
        for element in soup.select('.advertisement'):
            element.decompose()

        # Prefer the main article body
        article = soup.find('article')
        if article:
            content = article
        else:
            # find() only matches tag names, so CSS selectors use select_one()
            content = soup.find('main') or soup.select_one(
                '.content, #content, .post-content, .article-content'
            )
            if not content:
                content = soup

        # Get the text
        text = content.get_text(separator='\n')

        # Clean the text: skip blank lines and lines that are too short
        lines = []
        for line in text.split('\n'):
            line = line.strip()
            if line and len(line) > 30:
                lines.append(line)

        # Join the lines and cap the result at 1000 characters
        cleaned_text = ' '.join(lines)
        if len(cleaned_text) > 1000:
            # Try to cut at a sentence boundary
            end_pos = cleaned_text.rfind('. ', 0, 1000)
            if end_pos > 0:
                cleaned_text = cleaned_text[:end_pos + 1]
            else:
                cleaned_text = cleaned_text[:1000]

        return cleaned_text
    except Exception as e:
        logger.error(f"Error extracting text from HTML: {str(e)}")
        # If the HTML cannot be parsed, return a slice of the raw content
        text = html_content.replace('<', ' <').replace('>', '> ').split()
        return ' '.join(text)[:500]
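
# Illustrative use (hypothetical HTML, not part of the server flow):
#   html = "<article><p>A sentence comfortably longer than thirty characters.</p></article>"
#   _extract_text_from_html(html)  # -> "A sentence comfortably longer than thirty characters."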
def _do_search_with_summary(query: str) -> str:
    """Internal function to handle the search logic with summary support"""
    try:
        url = "https://api.search.brave.com/res/v1/web/search"
        headers = {
            "Accept": "application/json",
            "X-Subscription-Token": API_KEY
        }
        params = {
            "q": query,
            "count": 5,
            "result_filter": "web",
            "enable_summarizer": True,
            "format": "json"
        }
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        logger.debug("API Response Structure:")
        logger.debug(f"Response Keys: {list(data.keys())}")

        # Process the search results
        summary_text = ""
        search_results = []

        # Web search results
        if 'web' in data and 'results' in data['web']:
            results = data['web']['results']

            # Summary: prefer the official summarizer when present
            if 'summarizer' in data:
                logger.debug("Found official summarizer data")
                summary = data.get('summarizer', {})
                summary_text = summary.get('text', '')
            else:
                logger.debug("No summarizer found, generating summary from top results")
                # Build a summary from the content of the top two results
                try:
                    summaries = []
                    for result in results[:2]:  # only the top two results
                        result_url = result.get('url')
                        if result_url:
                            logger.debug(f"Fetching content from: {result_url}")
                            content = _get_url_content_direct(result_url)
                            # Strip the metadata block prepended by _get_url_content_direct
                            raw_content = content.split('---\n\n')[-1]
                            text_content = _extract_text_from_html(raw_content)
                            if text_content:
                                # Add title and source information
                                title = result.get('title', 'No title')
                                date = result.get('age', '') or result.get('published_time', '')
                                summaries.append(f"### {title}")
                                if date:
                                    summaries.append(f"Published: {date}")
                                summaries.append(text_content)
                    if summaries:
                        summary_text = "\n\n".join([
                            "Generated summary from top results:",
                            *summaries
                        ])
                        logger.debug("Successfully generated summary from content")
                    else:
                        summary_text = results[0].get('description', '')
                except Exception as e:
                    logger.error(f"Error generating summary from content: {str(e)}")
                    summary_text = results[0].get('description', '')

            # Format the result list
            for result in results:
                title = result.get('title', 'No title')
                result_url = result.get('url', 'No URL')
                description = result.get('description', 'No description')
                search_results.append(f"- {title}\n  URL: {result_url}\n  Description: {description}\n")

        # Combine the output
        output = []
        if summary_text:
            output.append(f"Summary:\n{summary_text}\n")
        if search_results:
            output.append("Search Results:\n" + "\n".join(search_results))

        logger.debug(f"Has summary: {bool(summary_text)}")
        logger.debug(f"Number of results: {len(search_results)}")

        return "\n".join(output) if output else "No results found for your query."
    except Exception as e:
        logger.error(f"Search error: {str(e)}")
        logger.exception("Detailed error trace:")
        return f"Error performing search: {str(e)}"
def _get_url_content_direct(url: str) -> str:
    """Internal function to get content directly using requests"""
    try:
        logger.debug(f"Directly fetching content from URL: {url}")
        response = requests.get(url, timeout=10, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        response.raise_for_status()

        # Fall back to the detected encoding when the server does not
        # declare a charset in the Content-Type header
        if 'charset' not in response.headers.get('content-type', '').lower():
            response.encoding = response.apparent_encoding

        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove unwanted elements (tag names only; class selectors
            # must go through select() instead)
            for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'iframe']):
                element.decompose()
            for element in soup.select('.advertisement'):
                element.decompose()

            # Try to locate the main content area
            main_content = None
            possible_content_elements = [
                soup.find('article'),
                soup.find('main'),
                soup.find(class_='content'),
                soup.find(id='content'),
                soup.find(class_='post-content'),
                soup.find(class_='article-content'),
                soup.find(class_='entry-content'),
                soup.find(class_='main-content'),
                soup.select_one('div[class*="content"]'),  # any class containing "content"
            ]
            for element in possible_content_elements:
                if element:
                    main_content = element
                    break
            if not main_content:
                main_content = soup

            text = main_content.get_text(separator='\n')
            lines = []
            for line in text.split('\n'):
                line = line.strip()
                if line and len(line) > 30:
                    lines.append(line)
            cleaned_text = ' '.join(lines)
            if len(cleaned_text) > 1000:
                end_pos = cleaned_text.rfind('. ', 0, 1000)
                if end_pos > 0:
                    cleaned_text = cleaned_text[:end_pos + 1]
                else:
                    cleaned_text = cleaned_text[:1000]

            metadata = f"URL: {url}\n"
            metadata += f"Content Length: {len(response.text)} characters\n"
            metadata += f"Content Type: {response.headers.get('content-type', 'Unknown')}\n"
            metadata += "---\n\n"
            return f"{metadata}{cleaned_text}"
        except Exception as e:
            logger.error(f"Error extracting text from HTML: {str(e)}")
            return f"Error extracting text: {str(e)}"
    except Exception as e:
        logger.error(f"Error fetching URL content directly: {str(e)}")
        return f"Error getting content: {str(e)}"
def _do_news_search(query: str, country: str = "all", search_lang: str = None) -> str:
    """Internal function to handle news search using Brave News API"""
    try:
        # Auto-detect the query language when none is given
        if search_lang is None:
            search_lang = _detect_language(query)
            logger.debug(f"Detected language: {search_lang} for query: {query}")

        url = "https://api.search.brave.com/res/v1/news/search"
        headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip",
            "X-Subscription-Token": API_KEY
        }
        params = {
            "q": query,
            "count": 10,
            "country": country,
            "search_lang": search_lang,
            "spellcheck": 1
        }

        logger.debug(f"Searching news for query: {query}")
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        # Process the news results
        results = []
        if 'results' in data:
            for news in data['results']:
                title = news.get('title', 'No title')
                news_url = news.get('url', 'No URL')
                description = news.get('description', 'No description')
                date = news.get('published_time', 'Unknown date')
                source = news.get('source', {}).get('name', 'Unknown source')
                news_item = [
                    f"- {title}",
                    f"  Source: {source}",
                    f"  Date: {date}",
                    f"  URL: {news_url}",
                    f"  Description: {description}\n"
                ]
                results.append("\n".join(news_item))

        if not results:
            return "No news found for your query."
        return "News Results:\n\n" + "\n".join(results)
    except requests.exceptions.RequestException as e:
        logger.error(f"News API request error: {str(e)}")
        return f"Error searching news: {str(e)}"
    except Exception as e:
        logger.error(f"News search error: {str(e)}")
        logger.exception("Detailed error trace:")
        return f"Error searching news: {str(e)}"
@mcp.tool()
def search_brave_with_summary(query: str) -> str:
    """Search the web using Brave Search API"""
    return _do_search_with_summary(query)

@mcp.tool()
def brave_search_summary(query: str) -> str:
    """Search the web using the Brave search engine"""
    return _do_search_with_summary(query)

@mcp.tool()
def get_url_content_direct(url: str) -> str:
    """Get webpage content directly using HTTP request

    Args:
        url (str): The URL to fetch content from

    Returns:
        str: The webpage content and metadata
    """
    return _get_url_content_direct(url)

@mcp.tool()
def url_content(url: str) -> str:
    """Fetch webpage content directly

    Args:
        url (str): The target webpage address

    Returns:
        str: The webpage content and metadata
    """
    return _get_url_content_direct(url)

@mcp.tool()
def search_news(query: str) -> str:
    """Search news using Brave News API

    Args:
        query (str): The search query for news

    Returns:
        str: News search results including titles, sources, dates and descriptions
    """
    return _do_news_search(query)

@mcp.tool()
def search_news_info(query: str) -> str:
    """Search news using the Brave News API

    Args:
        query (str): The news search keywords

    Returns:
        str: News results including title, source, date and description
    """
    return _do_news_search(query)
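
# Entry point: a minimal sketch assuming the server is launched directly;
# FastMCP's run() defaults to the stdio transport in that case.
if __name__ == "__main__":
    mcp.run()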