
BiliMind-MCP

by jiaohuix
bilimind_mcp.py (15.1 kB)
import os
import sys
import time
import shutil
import requests
from typing import Dict
import yt_dlp
from faster_whisper import WhisperModel
from dotenv import load_dotenv
from datetime import datetime
from mcp.server.fastmcp import FastMCP

# Load environment variables
load_dotenv()

# Constants
API_BASE = "https://api.siliconflow.cn/v1"
API_KEY = os.getenv("OPENAI_API_KEY", "sk-")  # placeholder default; set OPENAI_API_KEY in .env
MCP_PORT = int(os.getenv("MCP_PORT", 8001))
MODEL_NAME = "Qwen/Qwen3-8B"
DEFAULT_OUTPUT_DIR = "downloads"
DEFAULT_MODEL_DIR = "models"
WHISPER_MODEL_SIZE = "tiny"  # always use the tiny model

# Initialize the FastMCP server
mcp = FastMCP("bili_note_generator", port=MCP_PORT)


class BilibiliDownloader:
    """Bilibili video downloader."""

    def __init__(self, output_dir: str = DEFAULT_OUTPUT_DIR):
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    def download_audio(self, video_url: str) -> dict:
        """Download the audio track of a Bilibili video."""
        print(f"Downloading audio for: {video_url}")
        output_path = os.path.join(self.output_dir, "%(id)s.%(ext)s")
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
            }],
            'quiet': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=True)
            video_id = info.get("id")
            # The FFmpeg postprocessor converts the download to mp3
            audio_path = os.path.join(self.output_dir, f"{video_id}.mp3")
            print(f"Audio downloaded: {audio_path}")
            return {
                'file_path': audio_path,
                'title': info.get("title"),
                'duration': info.get("duration", 0),
                'cover_url': info.get("thumbnail"),
                'video_id': video_id,
            }


class WhisperTranscriber:
    """Transcribe audio with Faster-Whisper."""

    def __init__(self, model_dir: str = DEFAULT_MODEL_DIR):
        self.model_dir = model_dir
        os.makedirs(model_dir, exist_ok=True)

    def check_model_files(self) -> bool:
        """Check whether all model files are already present."""
        model_path = os.path.join(self.model_dir, WHISPER_MODEL_SIZE)
        required_files = [
            "model.bin",
            "config.json",
            "tokenizer.json",
            "vocabulary.txt",
        ]
        for filename in required_files:
            if not os.path.exists(os.path.join(model_path, filename)):
                print(f"Missing model file: {filename}")
                return False
        return True

    def download_model(self, use_mirror: bool = True) -> bool:
        """Download the model files, optionally from a mirror site."""
        model_size = WHISPER_MODEL_SIZE
        print(f"Downloading the {model_size} model...")
        base_url = "https://hf-mirror.com" if use_mirror else "https://huggingface.co"
        repo_id = f"guillaumekln/faster-whisper-{model_size}"

        model_dir = os.path.join(self.model_dir, model_size)
        os.makedirs(model_dir, exist_ok=True)

        files_to_download = [
            "model.bin",
            "config.json",
            "tokenizer.json",
            "vocabulary.txt",
        ]
        for filename in files_to_download:
            file_path = os.path.join(model_dir, filename)
            if os.path.exists(file_path):
                print(f"{filename} already exists, skipping download")
                continue
            url = f"{base_url}/{repo_id}/resolve/main/{filename}"
            print(f"Downloading {filename} from {url}...")
            try:
                response = requests.get(url, stream=True)
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                block_size = 1024  # 1 KB
                downloaded = 0
                with open(file_path, 'wb') as f:
                    for data in response.iter_content(block_size):
                        downloaded += len(data)
                        f.write(data)
                        # Render a simple progress bar
                        done = int(50 * downloaded / total_size) if total_size > 0 else 0
                        sys.stdout.write(
                            f"\r[{'=' * done}{' ' * (50 - done)}] "
                            f"{downloaded / 1024 / 1024:.2f}/{total_size / 1024 / 1024:.2f} MB"
                        )
                        sys.stdout.flush()
                print(f"\n{filename} downloaded")
            except Exception as e:
                print(f"Failed to download {filename}: {e}")
                if os.path.exists(file_path):
                    os.remove(file_path)  # remove any partially downloaded file
                return False
        return True

    def transcribe(self, audio_path: str) -> Dict:
        """Transcribe an audio file."""
        model_size = WHISPER_MODEL_SIZE
        model_path = os.path.join(self.model_dir, model_size)

        # Prefer the global model directory if it already holds a model
        global_model_path = os.path.join(DEFAULT_MODEL_DIR, model_size)
        if os.path.exists(os.path.join(global_model_path, "model.bin")):
            print(f"Found global model directory {global_model_path}, using it")
            model_path = global_model_path

        # Load the local model if present; otherwise download it first
        if os.path.exists(os.path.join(model_path, "model.bin")):
            print(f"Found local model {model_size}, loading it...")
            try:
                model = WhisperModel(
                    model_path,
                    device="cpu",
                    compute_type="int8",
                    local_files_only=True,
                )
            except Exception as e:
                print(f"Failed to load the local model: {e}")
                print("Retrying the download...")
                if not self.download_model():
                    raise Exception("Unable to download the model")
                model = WhisperModel(
                    model_path,
                    device="cpu",
                    compute_type="int8",
                    local_files_only=True,
                )
        else:
            print(f"No local model {model_size} found, downloading...")
            if self.download_model():
                model = WhisperModel(
                    model_path,
                    device="cpu",
                    compute_type="int8",
                    local_files_only=True,
                )
            else:
                raise Exception("Unable to download the model")

        # Run the transcription
        print(f"Transcribing: {audio_path}")
        segments, info = model.transcribe(audio_path, language="zh", beam_size=5)
        print(f"Detected language: '{info.language}' (probability: {info.language_probability:.2f})")

        # Collect all text segments (materialize the generator first)
        segments_list = list(segments)
        full_text = " ".join(segment.text for segment in segments_list)

        return {
            "full_text": full_text.strip(),
            "segments": segments_list,
            "language": info.language,
        }


class NotesGenerator:
    """Generate notes with an LLM."""

    def __init__(self, api_base: str = API_BASE, api_key: str = API_KEY, model: str = MODEL_NAME):
        self.api_base = api_base
        self.api_key = api_key
        self.model = model

    def generate_notes(self, transcript_text: str, video_title: str = "", tags: str = "") -> str:
        """Generate notes from a transcript."""
        print("Generating notes...")

        # Build the prompt
        prompt = f"""
You are a professional note-taking assistant who turns video transcripts into clear, well-organized, information-rich notes.

Language requirements:
- The notes must be written in **Chinese**.
- Proper nouns, technical terms, brand names, and personal names should be kept in **English** where appropriate.

Video title: {video_title}
Video tags: {tags}

Output instructions:
- Return only the final **Markdown content**.
- Do **not** wrap the output in a code block.
- To bold a numbered item while keeping its number, write `1\\. **content**` (with a backslash) so it is not parsed as an ordered list.
- Alternatively, use a heading of the form `## 1. content`.

Video transcript:
---
{transcript_text}
---

Your task:
Based on the transcript above, produce structured notes following these principles:
1. **Complete information**: capture as many relevant details as possible so the notes are comprehensive.
2. **Remove irrelevant content**: omit ads, filler words, greetings, and off-topic remarks.
3. **Keep key details**: preserve important facts, examples, conclusions, and recommendations.
4. **Readable layout**: use bullet points where appropriate and keep paragraphs short for readability.
5. Any mathematical formulas mentioned in the video must be kept, written in LaTeX syntax suitable for Markdown rendering.

Additional tasks:
1. Add a time marker to every main heading (`##`), in the form `*Content-[mm:ss]`.
2. If a section involves a visual demonstration, code demo, or UI interaction, insert a screenshot hint at the end of that section, in the form `*Screenshot-[mm:ss]`.
3. Append a professional AI summary at the end of the notes that briefly recaps the whole video.

Please provide the complete notes.
"""
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        data = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a professional note-taking assistant who turns video transcripts into notes.",
                },
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.7,
        }
        try:
            response = requests.post(f"{self.api_base}/chat/completions", headers=headers, json=data)
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"]
        except Exception as e:
            print(f"API call failed: {e}")
            # A Response object is falsy for 4xx/5xx status codes, so compare against None explicitly
            if 'response' in locals() and response is not None:
                print(f"Response status code: {response.status_code}")
                print(f"Response body: {response.text}")
            return ""


# MCP tools
@mcp.tool()
async def generate_bilibili_notes(video_url: str) -> str:
    """
    Generate notes from a Bilibili video. The tool downloads the video's audio,
    transcribes it to text, and then generates structured notes.

    Args:
        video_url: Bilibili video link, e.g. https://www.bilibili.com/video/BV1z65TzuE94

    Returns:
        str: The generated notes in Markdown format.
    """
    start_time = time.time()

    # Create a per-run temporary download directory
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    output_dir = f"downloads_{timestamp}"

    # Use the global model directory if it is complete; otherwise fall back to a
    # temporary one that is cleaned up afterwards
    global_model_dir = DEFAULT_MODEL_DIR
    model_path = os.path.join(global_model_dir, WHISPER_MODEL_SIZE)
    model_dir = global_model_dir
    if not (os.path.exists(model_path) and os.path.exists(os.path.join(model_path, "model.bin"))):
        model_dir = f"models_{timestamp}"

    os.makedirs(output_dir, exist_ok=True)

    try:
        # Step 1: download the video's audio
        downloader = BilibiliDownloader(output_dir=output_dir)
        audio_info = downloader.download_audio(video_url)

        # Step 2: transcribe the audio
        transcriber = WhisperTranscriber(model_dir=model_dir)
        transcript = transcriber.transcribe(audio_info['file_path'])

        # Step 3: generate the notes
        notes_generator = NotesGenerator()
        notes = notes_generator.generate_notes(
            transcript["full_text"],
            video_title=audio_info['title'],
            tags="",
        )

        # Delete the audio file
        if os.path.exists(audio_info['file_path']):
            os.remove(audio_info['file_path'])
            print(f"Deleted audio file: {audio_info['file_path']}")

        processing_time = time.time() - start_time

        # Append processing metadata
        processing_info = f"""
---
**Processing info**:
- Video title: {audio_info['title']}
- Video ID: {audio_info['video_id']}
- Processing time: {processing_time:.2f} s
- Model used: faster-whisper-{WHISPER_MODEL_SIZE}
- Generated at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
---
"""
        return notes + processing_info
    except Exception as e:
        error_message = f"Failed to generate notes: {str(e)}"
        print(error_message)
        return error_message
    finally:
        # Clean up temporary files
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir, ignore_errors=True)
        # Also clean up the model directory if a temporary one was used
        if model_dir != global_model_dir and os.path.exists(model_dir):
            shutil.rmtree(model_dir, ignore_errors=True)


@mcp.tool()
async def get_current_time() -> str:
    """
    Get the current date and time as a formatted string that includes the day of the week.

    Returns:
        str: The current time in the form "YYYY年MM月DD日 HH:MM:SS 星期X".
    """
    current_time = datetime.now()
    formatted_time = current_time.strftime("%Y年%m月%d日 %H:%M:%S")
    # Map the English weekday name to Chinese
    weekday_chinese = {
        "Monday": "星期一",
        "Tuesday": "星期二",
        "Wednesday": "星期三",
        "Thursday": "星期四",
        "Friday": "星期五",
        "Saturday": "星期六",
        "Sunday": "星期日",
    }.get(current_time.strftime("%A"), "未知")
    return f"{formatted_time} {weekday_chinese}"


if __name__ == "__main__":
    # Initialize and run the server over Server-Sent Events
    mcp.run(transport='sse')
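Below is a minimal client sketch (not part of the repository) showing how the generate_bilibili_notes tool could be called once the server is running (python bilimind_mcp.py, with OPENAI_API_KEY set for the SiliconFlow endpoint). It assumes the official MCP Python SDK is installed and that FastMCP's SSE transport is served at /sse on the default port 8001; the video URL is purely illustrative.

import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def main() -> None:
    # FastMCP's SSE transport is assumed to be served at /sse on MCP_PORT (8001)
    async with sse_client("http://localhost:8001/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Call the note-generation tool; the URL is illustrative
            result = await session.call_tool(
                "generate_bilibili_notes",
                {"video_url": "https://www.bilibili.com/video/BV1z65TzuE94"},
            )
            # Print whatever text content blocks the server returned
            for block in result.content:
                text = getattr(block, "text", None)
                if text:
                    print(text)


asyncio.run(main())

For this server the notes should arrive as text content, followed by the appended processing-info footer.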

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jiaohuix/BiliMind-MCP'
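The same lookup can be scripted in Python (a sketch using requests; the response shape is whatever the API returns, so it is simply printed here):

import requests

# Fetch the directory entry for this server from the Glama MCP API
resp = requests.get("https://glama.ai/api/mcp/v1/servers/jiaohuix/BiliMind-MCP")
resp.raise_for_status()
print(resp.json())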

If you have feedback or need assistance with the MCP directory API, please join our Discord server.