Skip to main content
Glama

BiliMind-MCP

by jiaohuix
bili_to_notes.py12.3 kB
import os import sys import json import time import requests import argparse from typing import Dict, List, Any import yt_dlp from faster_whisper import WhisperModel from dotenv import load_dotenv # 加载环境变量 load_dotenv() # 定义常量 API_KEY = os.getenv("OPENAI_API_KEY", "sk-") API_BASE = os.getenv("API_BASE", "https://api.siliconflow.cn/v1") class BilibiliDownloader: """哔哩哔哩视频下载器""" def __init__(self, output_dir: str = "downloads"): self.output_dir = output_dir os.makedirs(self.output_dir, exist_ok=True) def download_audio(self, video_url: str) -> dict: """下载B站视频的音频""" print(f"开始下载视频音频: {video_url}") output_path = os.path.join(self.output_dir, "%(id)s.%(ext)s") ydl_opts = { 'format': 'bestaudio[ext=m4a]/bestaudio/best', 'outtmpl': output_path, 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', }], 'quiet': True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(video_url, download=True) video_id = info.get("id") audio_path = os.path.join(self.output_dir, f"{video_id}.mp3") print(f"音频下载完成: {audio_path}") return { 'file_path': audio_path, 'title': info.get("title"), 'duration': info.get("duration", 0), 'cover_url': info.get("thumbnail"), 'video_id': video_id, } class WhisperTranscriber: """使用Faster-Whisper转录音频""" def __init__(self, model_dir: str = "./models"): self.model_dir = model_dir os.makedirs(model_dir, exist_ok=True) def download_model(self, model_size: str = "tiny", use_mirror: bool = True) -> bool: """下载模型文件,使用镜像站点""" print(f"开始从镜像站点下载 {model_size} 模型...") # 使用镜像站点 base_url = "https://hf-mirror.com" if use_mirror else "https://huggingface.co" repo_id = f"guillaumekln/faster-whisper-{model_size}" # 创建模型目录 model_dir = os.path.join(self.model_dir, model_size) os.makedirs(model_dir, exist_ok=True) # 需要下载的文件列表 files_to_download = [ "model.bin", "config.json", "tokenizer.json", "vocabulary.txt" ] # 下载文件 for filename in files_to_download: file_path = os.path.join(model_dir, filename) if os.path.exists(file_path): print(f"文件 {filename} 已存在,跳过下载") continue url = f"{base_url}/{repo_id}/resolve/main/{filename}" print(f"下载 {filename} 从 {url}...") try: response = requests.get(url, stream=True) response.raise_for_status() total_size = int(response.headers.get('content-length', 0)) block_size = 1024 # 1 KB downloaded = 0 with open(file_path, 'wb') as f: for data in response.iter_content(block_size): downloaded += len(data) f.write(data) # 显示下载进度 done = int(50 * downloaded / total_size) if total_size > 0 else 0 sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {downloaded/1024/1024:.2f}/{total_size/1024/1024:.2f} MB") sys.stdout.flush() print(f"\n{filename} 下载完成") except Exception as e: print(f"下载 {filename} 失败: {e}") if os.path.exists(file_path): os.remove(file_path) # 删除可能部分下载的文件 return False return True def transcribe(self, audio_path: str, model_size: str = "tiny") -> Dict: """转录音频文件""" model_path = os.path.join(self.model_dir, model_size) # 检查是否已经下载了模型 if os.path.exists(os.path.join(model_path, "model.bin")): print(f"发现本地模型 {model_size},直接加载...") try: model = WhisperModel( model_path, device="cpu", compute_type="int8", local_files_only=True ) except Exception as e: print(f"加载本地模型失败: {e}") print("尝试重新下载...") if not self.download_model(model_size=model_size): raise Exception("无法下载模型") model = WhisperModel( model_path, device="cpu", compute_type="int8", local_files_only=True ) else: print(f"未发现本地模型 {model_size},开始下载...") if self.download_model(model_size=model_size): model = WhisperModel( model_path, device="cpu", compute_type="int8", local_files_only=True ) else: raise Exception("无法下载模型") # 执行转录 print(f"开始转录: {audio_path}") segments, info = model.transcribe(audio_path, language="zh", beam_size=5) # 打印检测到的语言和概率 print(f"检测到语言: '{info.language}' (概率: {info.language_probability:.2f})") # 收集所有文本片段 full_text = "" segments_list = list(segments) # 将生成器转换为列表 for segment in segments_list: full_text += segment.text + " " return { "full_text": full_text.strip(), "segments": segments_list, "language": info.language } # 定义常量 API_KEY = os.getenv("OPENAI_API_KEY", "sk-") API_BASE = os.getenv("API_BASE", "https://api.siliconflow.cn/v1") class NotesGenerator: """使用LLM生成笔记""" def __init__(self, api_base: str = API_BASE, api_key: str = API_KEY, model: str = "Qwen/Qwen3-8B"): self.api_base = api_base self.api_key = api_key self.model = model def generate_notes(self, transcript_text: str, video_title: str = "", tags: str = "") -> str: """根据转录文本生成笔记""" print("开始生成笔记...") # 构建提示词 prompt = f""" 你是一个专业的笔记助手,擅长将视频转录内容整理成清晰、有条理且信息丰富的笔记。 语言要求: - 笔记必须使用 **中文** 撰写。 - 专有名词、技术术语、品牌名称和人名应适当保留 **英文**。 视频标题: {video_title} 视频标签: {tags} 输出说明: - 仅返回最终的 **Markdown 内容**。 - **不要**将输出包裹在代码块中。 - 如果要加粗并保留编号,应使用 `1\\. **内容**`(加反斜杠),防止被误解析为有序列表。 - 或者使用 `## 1. 内容` 的形式作为标题。 视频转录内容: --- {transcript_text} --- 你的任务: 根据上面的转录内容,生成结构化的笔记,遵循以下原则: 1. **完整信息**:记录尽可能多的相关细节,确保内容全面。 2. **去除无关内容**:省略广告、填充词、问候语和不相关的言论。 3. **保留关键细节**:保留重要事实、示例、结论和建议。 4. **可读布局**:必要时使用项目符号,并保持段落简短,增强可读性。 5. 视频中提及的数学公式必须保留,并以 LaTeX 语法形式呈现,适合 Markdown 渲染。 额外任务: 1. 为每个主要标题(`##`)添加时间标记,格式为 `*Content-[mm:ss]`。 2. 如果某个部分涉及视觉演示、代码演示或UI交互,在该部分末尾插入截图提示,格式为 `*Screenshot-[mm:ss]`。 3. 在笔记末尾添加一个专业的AI总结,简要概括整个视频的内容。 请提供完整的笔记内容。 """ headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } data = { "model": self.model, "messages": [ { "role": "system", "content": "你是一个专业的笔记助手,擅长将视频转录内容整理成笔记。" }, { "role": "user", "content": prompt } ], "temperature": 0.7 } try: response = requests.post(f"{self.api_base}/chat/completions", headers=headers, json=data) response.raise_for_status() result = response.json() return result["choices"][0]["message"]["content"] except Exception as e: print(f"调用API失败: {e}") if 'response' in locals() and response: print(f"响应状态码: {response.status_code}") print(f"响应内容: {response.text}") return "" def main(): DEFAULT_VIDEO_URL = "https://www.bilibili.com/video/BV1z65TzuE94" parser = argparse.ArgumentParser(description='从B站视频生成笔记') parser.add_argument('--url', '-u', default=DEFAULT_VIDEO_URL, help=f'B站视频链接 (默认: {DEFAULT_VIDEO_URL})') parser.add_argument('--output', '-o', default='video_notes.md', help='输出笔记文件路径') parser.add_argument('--model-size', '-m', default='tiny', choices=['tiny', 'base', 'small', 'medium', 'large-v3'], help='Whisper模型大小') parser.add_argument('--keep-audio', '-k', action='store_true', help='保留下载的音频文件') args = parser.parse_args() print(f"处理视频: {args.url}") print(f"使用模型: {args.model_size}") print(f"输出文件: {args.output}") # 步骤1: 下载视频音频 downloader = BilibiliDownloader() try: audio_info = downloader.download_audio(args.url) except Exception as e: print(f"下载音频失败: {e}") return # 步骤2: 转录音频 transcriber = WhisperTranscriber() try: transcript = transcriber.transcribe(audio_info['file_path'], model_size=args.model_size) except Exception as e: print(f"转录音频失败: {e}") return # 步骤3: 生成笔记 notes_generator = NotesGenerator() try: notes = notes_generator.generate_notes( transcript["full_text"], video_title=audio_info['title'], tags="" ) except Exception as e: print(f"生成笔记失败: {e}") return # 保存笔记 with open(args.output, 'w', encoding='utf-8') as f: f.write(notes) print(f"笔记已保存到: {args.output}") # 保存转录文本 transcript_file = f"{os.path.splitext(args.output)[0]}_transcript.txt" with open(transcript_file, 'w', encoding='utf-8') as f: f.write(transcript["full_text"]) f.write("\n\n分段详情:\n") for segment in transcript["segments"]: f.write(f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}\n") print(f"转录文本已保存到: {transcript_file}") # 如果不保留音频文件,则删除 if not args.keep_audio and os.path.exists(audio_info['file_path']): os.remove(audio_info['file_path']) print(f"已删除音频文件: {audio_info['file_path']}") else: print(f"音频文件保留在: {audio_info['file_path']}") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jiaohuix/BiliMind-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server