
TAPD Data Fetcher

data_preprocessor.py (16.5 kB)
""" data_preprocessor.py 功能: - 清理 description 字段中的 HTML 样式信息,保留有意义的文字内容、超链接和图片地址 - 使用 DeepSeek API 对内容进行准确复述 - 处理腾讯文档链接和图片内容(预留功能) - 优化 TAPD 数据的 description 字段 依赖:aiohttp, beautifulsoup4, python-docx """ import os import json import re import aiohttp import asyncio from typing import Dict, List, Optional, Any from bs4 import BeautifulSoup from mcp.server.fastmcp import FastMCP from docx import Document import csv import shutil import uuid from .common_utils import get_api_manager, get_file_manager mcp = FastMCP("data_preprocessor") async def call_deepseek_api(content: str, session: aiohttp.ClientSession) -> str: """调用 DeepSeek API 对内容进行复述""" api_manager = get_api_manager() prompt = f"""适当简化以下文本,保留关键信息: {content} 只输出简化结果,不要解释。""" return await api_manager.call_llm(prompt, session, max_tokens=2000) def clean_html_styles(html_content: str) -> str: """清理HTML样式信息,保留有意义的文字内容、超链接和图片地址""" if not html_content: return "" # 先用正则表达式清理样式属性 import re # 移除 style 属性 html_content = re.sub(r'\s*style="[^"]*"', '', html_content) html_content = re.sub(r"\s*style='[^']*'", '', html_content) # 移除其他无用属性,但保留 href、src、alt、title # 移除 data-* 属性 html_content = re.sub(r'\s*data-[^=]*="[^"]*"', '', html_content) html_content = re.sub(r"\s*data-[^=]*='[^']*'", '', html_content) # 移除常见的无用属性 useless_attrs = ['margin', 'padding', 'line-height', 'color', 'font-family', 'font-size', 'font-style', 'font-variant', 'font-weight', 'letter-spacing', 'orphans', 'text-align', 'text-indent', 'text-transform', 'widows', 'word-spacing', 'webkit-text-stroke-width', 'white-space', 'background-color', 'text-decoration-thickness', 'text-decoration-style', 'text-decoration-color'] for attr in useless_attrs: html_content = re.sub(f'\\s*{attr}="[^"]*"', '', html_content) html_content = re.sub(f"\\s*{attr}='[^']*'", '', html_content) # 使用 BeautifulSoup 解析清理后的 HTML soup = BeautifulSoup(html_content, 'html.parser') # 提取有用信息 result_parts = [] # 处理文本内容 text_content = soup.get_text(separator=' ', strip=True) # 清理多余的空格和无用文本 text_content = re.sub(r'\s+', ' ', text_content) text_content = text_content.replace('...', '').strip() if text_content: result_parts.append(text_content) # 提取超链接 - 使用正则表达式直接提取 link_pattern = r'<a[^>]*href=[\'"](https?://[^\'">]+)[\'"][^>]*>([^<]*)</a>' links = re.findall(link_pattern, html_content, re.IGNORECASE) for href, text in links: if text.strip(): result_parts.append(f"链接: {text.strip()} ({href})") else: result_parts.append(f"链接: {href}") # 提取图片信息 - 使用正则表达式直接提取 img_pattern = r'<img[^>]*src=[\'"](/?[^\'">]+)[\'"][^>]*(?:alt=[\'"]([^\'">]*)[\'"])?[^>]*>' images = re.findall(img_pattern, html_content, re.IGNORECASE) for src, alt in images: if alt: result_parts.append(f"图片: {alt} ({src})") else: result_parts.append(f"图片: {src}") return ' '.join(result_parts) def extract_document_links(content: str) -> List[str]: """提取腾讯文档链接""" # 匹配腾讯文档链接模式 tencent_doc_pattern = r'https://docs\.qq\.com/[^\s\)"<>]+|https://doc\.weixin\.qq\.com/[^\s\)"<>]+' links = re.findall(tencent_doc_pattern, content) # 清理链接末尾的引号等字符 cleaned_links = [] for link in links: # 移除末尾的引号、尖括号等 link = re.sub(r'["\'>]+$', '', link) if link and link not in cleaned_links: cleaned_links.append(link) return cleaned_links def extract_image_paths(content: str) -> List[str]: """提取图片路径""" # 匹配图片路径模式 img_pattern = r'/tfl/pictures/[^\s\)"]+\.(?:png|jpg|jpeg|gif|bmp)' paths = re.findall(img_pattern, content) return paths def extract_docx_content(docx_path: str) -> str: """提取 docx 文档内容为纯文本(复制自 docx_summarizer.py)""" try: if not os.path.exists(docx_path): return f"文档文件不存在: 
{docx_path}" doc = Document(docx_path) paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()] # 提取表格内容 table_content = [] for table in doc.tables: for row in table.rows: row_data = [cell.text.strip() for cell in row.cells] table_content.append(" | ".join(row_data)) # 合并内容 all_content = [] if paragraphs: all_content.extend(paragraphs) if table_content: all_content.append("表格内容:") all_content.extend(table_content) return "\n".join(all_content) if all_content else "文档无有效内容" except Exception as e: return f"文档解析失败: {str(e)}" @mcp.tool() async def preprocess_description_field( data_file_path: str = "local_data/msg_from_fetcher.json", output_file_path: str = "local_data/msg_from_fetcher.json", use_api: bool = False, process_documents: bool = False, process_images: bool = False ) -> str: """ 预处理 TAPD 数据的 description 字段 参数: data_file_path (str): 输入数据文件路径 output_file_path (str): 输出文件路径 use_api (bool): 是否使用 DeepSeek API 进行内容复述 process_documents (bool): 是否处理腾讯文档链接(预留功能) process_images (bool): 是否处理图片内容(预留功能) 返回: str: 处理结果的 JSON 字符串 """ try: # 读取数据文件 file_manager = get_file_manager() if not os.path.exists(data_file_path): return json.dumps({"status": "error", "message": f"数据文件不存在: {data_file_path}"}, ensure_ascii=False) data = file_manager.load_tapd_data(data_file_path) processed_count = 0 api_call_count = 0 error_count = 0 results = {"stories": [], "bugs": []} # 初始化 HTTP 会话 async with aiohttp.ClientSession() as session: # 处理需求数据 if 'stories' in data: for story in data['stories']: try: original_desc = story.get('description', '') if original_desc: # 1. 清理 HTML 样式 cleaned_content = clean_html_styles(original_desc) # 2. 提取文档链接和图片路径 doc_links = extract_document_links(original_desc) img_paths = extract_image_paths(original_desc) # 3. 使用 API 进行内容复述 if use_api and cleaned_content.strip(): try: processed_content = await call_deepseek_api(cleaned_content, session) api_call_count += 1 except Exception as e: processed_content = cleaned_content error_count += 1 print(f"API调用失败,使用清理后的内容: {str(e)}") else: processed_content = cleaned_content # 4. 处理文档内容(预留功能) if process_documents and doc_links: for link in doc_links: processed_content += f"\n\n腾讯文档链接: {link}" # TODO: 实现文档下载和内容提取 # 5. 
处理图片内容(预留功能) if process_images and img_paths: for img_path in img_paths: processed_content += f"\n\n图片路径: {img_path}" # TODO: 实现图片 OCR 识别 # 更新 description 字段 story['description'] = processed_content processed_count += 1 results['stories'].append(story) except Exception as e: print(f"处理需求 {story.get('id', 'unknown')} 时出错: {str(e)}") error_count += 1 results['stories'].append(story) # 处理缺陷数据 if 'bugs' in data: for bug in data['bugs']: try: original_desc = bug.get('description', '') if original_desc: # 与需求处理逻辑相同 cleaned_content = clean_html_styles(original_desc) doc_links = extract_document_links(original_desc) img_paths = extract_image_paths(original_desc) if use_api and cleaned_content.strip(): try: processed_content = await call_deepseek_api(cleaned_content, session) api_call_count += 1 except Exception as e: processed_content = cleaned_content error_count += 1 print(f"API调用失败,使用清理后的内容: {str(e)}") else: processed_content = cleaned_content if process_documents and doc_links: for link in doc_links: processed_content += f"\n\n腾讯文档链接: {link}" if process_images and img_paths: for img_path in img_paths: processed_content += f"\n\n图片路径: {img_path}" bug['description'] = processed_content processed_count += 1 results['bugs'].append(bug) except Exception as e: print(f"处理缺陷 {bug.get('id', 'unknown')} 时出错: {str(e)}") error_count += 1 results['bugs'].append(bug) # 保存处理后的数据 file_manager.save_json_data(results, output_file_path) # 返回处理结果 result_summary = { "status": "success", "message": "数据预处理完成", "statistics": { "processed_items": processed_count, "api_calls": api_call_count, "errors": error_count, "output_file": output_file_path } } return json.dumps(result_summary, ensure_ascii=False, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"数据预处理失败: {str(e)}"}, ensure_ascii=False) @mcp.tool() def preview_description_cleaning( data_file_path: str = "local_data/msg_from_fetcher.json", item_count: int = 3 ) -> str: """ 预览 description 字段清理效果,不实际修改数据 参数: data_file_path (str): 数据文件路径 item_count (int): 预览的条目数量 返回: str: 预览结果的 JSON 字符串 """ try: file_manager = get_file_manager() if not os.path.exists(data_file_path): return json.dumps({"status": "error", "message": f"数据文件不存在: {data_file_path}"}, ensure_ascii=False) data = file_manager.load_tapd_data(data_file_path) preview_results = [] count = 0 # 预览需求数据 if 'stories' in data and count < item_count: for story in data['stories']: if count >= item_count: break original_desc = story.get('description', '') if original_desc: cleaned_desc = clean_html_styles(original_desc) doc_links = extract_document_links(original_desc) img_paths = extract_image_paths(original_desc) preview_results.append({ "id": story.get('id', 'unknown'), "type": "story", "name": story.get('name', ''), "original_length": len(original_desc), "cleaned_length": len(cleaned_desc), "original_preview": original_desc[:200] + "..." if len(original_desc) > 200 else original_desc, "cleaned_preview": cleaned_desc[:200] + "..." 
if len(cleaned_desc) > 200 else cleaned_desc, "document_links": doc_links, "image_paths": img_paths }) count += 1 # 预览缺陷数据 if 'bugs' in data and count < item_count: for bug in data['bugs']: if count >= item_count: break original_desc = bug.get('description', '') if original_desc: cleaned_desc = clean_html_styles(original_desc) doc_links = extract_document_links(original_desc) img_paths = extract_image_paths(original_desc) preview_results.append({ "id": bug.get('id', 'unknown'), "type": "bug", "title": bug.get('title', ''), "original_length": len(original_desc), "cleaned_length": len(cleaned_desc), "original_preview": original_desc[:200] + "..." if len(original_desc) > 200 else original_desc, "cleaned_preview": cleaned_desc[:200] + "..." if len(cleaned_desc) > 200 else cleaned_desc, "document_links": doc_links, "image_paths": img_paths }) count += 1 return json.dumps({ "status": "success", "preview_count": len(preview_results), "results": preview_results }, ensure_ascii=False, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"预览失败: {str(e)}"}, ensure_ascii=False) if __name__ == "__main__": # 测试代码 async def test(): result = await preprocess_description_field( data_file_path="local_data/msg_from_fetcher.json", output_file_path="local_data/test_preprocessed.json", use_api=False # 测试时不使用API ) print(result) # 运行测试 # asyncio.run(test()) # 预览清理效果 preview_result = preview_description_cleaning() print("预览结果:") print(preview_result)
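
For reference, a minimal sketch of how the cleaning helpers above behave on a single description string, assuming the module is importable from its package (the package name, sample HTML, Tencent Docs URL, and image path below are hypothetical, not taken from the project):

# Illustrative only: exercises clean_html_styles, extract_document_links and
# extract_image_paths on a made-up TAPD description.
from mcp_agent_re.data_preprocessor import (  # hypothetical import path
    clean_html_styles, extract_document_links, extract_image_paths,
)

sample_desc = (
    '<div style="color:#333;font-size:14px">Background of the requirement</div>'
    '<a href="https://docs.qq.com/doc/EXAMPLE">Design doc</a>'
    '<img src="/tfl/pictures/2024-01/example.png" alt="screenshot">'
)

print(clean_html_styles(sample_desc))       # plain text plus "Link: ..." / "Image: ..." parts
print(extract_document_links(sample_desc))  # ['https://docs.qq.com/doc/EXAMPLE']
print(extract_image_paths(sample_desc))     # ['/tfl/pictures/2024-01/example.png']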

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/OneCuriousLearner/MCPAgentRE'
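
The same lookup can be done from Python; a minimal sketch using aiohttp (the shape of the returned metadata is not shown here and depends on the Glama API):

import asyncio
import aiohttp

async def fetch_server_info() -> dict:
    # Query the Glama MCP directory API for this server's metadata
    url = "https://glama.ai/api/mcp/v1/servers/OneCuriousLearner/MCPAgentRE"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.json()

if __name__ == "__main__":
    print(asyncio.run(fetch_server_info()))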

If you have feedback or need assistance with the MCP directory API, please join our Discord server.