coldfire-x/beijing-car-quota-draw

mcp_server.py (46.8 kB)
""" MCP server implementation for Beijing car quota lottery results. Uses fastapi_mcp to expose FastAPI endpoints as MCP tools for AI agents. """ import asyncio import logging from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any from fastapi import FastAPI, HTTPException, Query from fastapi_mcp import FastApiMCP from pydantic import BaseModel, Field import uvicorn import aiofiles import re import json from ..models.quota_result import QuotaResult, QuotaType from ..parsers.pdf_parser import PDFParser from ..scrapers.web_scraper import WebScraper from ..scrapers.policy_scraper import PolicyScraper from ..storage.data_store import DataStore from ..analysis.analyzer import LotteryAnalyzer from ..utils.celebration_generator import CelebrationGenerator logger = logging.getLogger(__name__) class QuotaSearchRequest(BaseModel): """Request model for quota searches.""" application_code: str = Field(..., description="申请编码") class IDSearchRequest(BaseModel): """Request model for ID number searches.""" id_prefix: Optional[str] = Field(None, description="身份证号前6位 (可选)") id_suffix: Optional[str] = Field(None, description="身份证号后4位 (可选)") class CelebrationRequest(BaseModel): """Request model for celebration page generation.""" application_code: str = Field(..., description="申请编码") name: Optional[str] = Field("恭喜您", description="获奖者姓名 (可选)") save_to_file: Optional[bool] = Field(False, description="是否保存为文件 (可选)") class PolicyExplanationRequest(BaseModel): """Request model for policy explanations.""" question: str = Field(..., description="关于小客车指标政策的问题") detail_level: Optional[str] = Field("medium", description="详细程度: basic, medium, detailed") category: Optional[str] = Field(None, description="政策类别: 个人申请, 家庭申请, 单位申请, 新能源, 更新指标, 申请材料, 等等") class MCPServer: """MCP server for Beijing car quota lottery results.""" def __init__( self, data_dir: Path = Path("data"), downloads_dir: Path = Path("downloads"), host: str = "127.0.0.1", port: int = 8000 ): """ Initialize the MCP server. Args: data_dir: Directory for storing processed data downloads_dir: Directory for downloaded PDF files host: Server host port: Server port """ self.data_dir = data_dir self.downloads_dir = downloads_dir self.host = host self.port = port # Initialize components self.data_store = DataStore(data_dir) self.pdf_parser = PDFParser() self.web_scraper = WebScraper(downloads_dir) self.policy_scraper = PolicyScraper() self.analyzer = LotteryAnalyzer(self.data_store) self.celebration_generator = CelebrationGenerator() # Create FastAPI app self.app = FastAPI( title="Beijing Car Quota Lottery MCP Server", description="MCP server for querying Beijing car quota lottery results", version="0.1.0" ) # Setup routes self._setup_routes() # Create MCP server self.mcp = FastApiMCP( self.app, name="Beijing Car Quota Lottery API", description="API for querying Beijing car quota lottery results" ) # Mount MCP server self.mcp.mount() def _setup_routes(self): """Setup FastAPI routes that will be exposed as MCP tools.""" @self.app.get("/health", operation_id="health_check") async def health_check(): """Check server health and data status.""" stats = await self.data_store.get_statistics() return { "status": "healthy", "server": "Beijing Car Quota Lottery MCP Server", "version": "0.1.0", "data_stats": stats } @self.app.post("/search/application-code", operation_id="search_by_application_code") async def search_by_application_code(request: QuotaSearchRequest): """ Search for quota lottery results by application code (申请编码). 
This tool searches across all loaded PDF files to find entries matching the provided application code. """ try: results = await self.data_store.find_by_application_code(request.application_code) if not results: return { "found": False, "message": f"No results found for application code: {request.application_code}", "application_code": request.application_code } # Check if this might be a winner (not just waiting list) winner_results = [r for r in results if r.get("type") != "waiting_list"] is_potential_winner = len(winner_results) > 0 # Also check for high-priority waiting list (early sequence numbers) if not is_potential_winner: high_priority_results = [r for r in results if r.get("sequence_number") and r.get("sequence_number") <= 50000] is_potential_winner = len(high_priority_results) > 0 response = { "found": True, "application_code": request.application_code, "results": results, "count": len(results) } # Add celebration suggestion for potential winners if is_potential_winner: response["winner_detected"] = True response["celebration_suggestion"] = { "message": "🎉 Congratulations! You appear to have won the lottery!", "action": "Generate a celebration page to share the good news!", "endpoint": "/celebration/generate", "parameters": { "application_code": request.application_code, "name": "您的姓名", # User can customize "save_to_file": True } } else: response["winner_detected"] = False response["status_info"] = "Currently on waiting list - keep checking for updates!" return response except Exception as e: logger.error(f"Error searching by application code {request.application_code}: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.post("/search/id-number", operation_id="search_by_id_number") async def search_by_id_number(request: IDSearchRequest): """ Search for quota lottery results by partial ID number. This tool searches for entries using the first 6 and/or last 4 digits of an ID number (身份证号). 
You can provide either: - Only id_prefix (first 6 digits) - Only id_suffix (last 4 digits) - Both id_prefix and id_suffix for exact match """ try: # Validation: at least one field must be provided if not request.id_prefix and not request.id_suffix: raise HTTPException( status_code=400, detail="At least one of id_prefix or id_suffix must be provided" ) # Validate field formats if provided if request.id_prefix and len(request.id_prefix) != 6: raise HTTPException( status_code=400, detail="id_prefix must be exactly 6 digits" ) if request.id_suffix and len(request.id_suffix) != 4: raise HTTPException( status_code=400, detail="id_suffix must be exactly 4 digits" ) # Search using the new flexible method results = await self.data_store.find_by_id_prefix_or_suffix( request.id_prefix, request.id_suffix ) # Create search pattern description if request.id_prefix and request.id_suffix: search_pattern = f"{request.id_prefix}****{request.id_suffix}" elif request.id_prefix: search_pattern = f"{request.id_prefix}****XXXX" else: search_pattern = f"XXXXXX****{request.id_suffix}" if not results: return { "found": False, "message": f"No results found for ID pattern: {search_pattern}", "search_pattern": search_pattern, "search_type": "prefix_and_suffix" if (request.id_prefix and request.id_suffix) else "prefix_only" if request.id_prefix else "suffix_only" } # Check if any results indicate winners winner_results = [r for r in results if r.get("type") != "waiting_list"] high_priority_results = [r for r in results if r.get("sequence_number") and r.get("sequence_number") <= 50000] is_potential_winner = len(winner_results) > 0 or len(high_priority_results) > 0 response = { "found": True, "search_pattern": search_pattern, "search_type": "prefix_and_suffix" if (request.id_prefix and request.id_suffix) else "prefix_only" if request.id_prefix else "suffix_only", "results": results, "count": len(results) } # Add celebration suggestion for potential winners if is_potential_winner and len(results) == 1: # Only suggest for single match result = results[0] response["winner_detected"] = True response["celebration_suggestion"] = { "message": "🎉 Congratulations! You appear to have won the lottery!", "action": "Generate a celebration page to share the good news!", "endpoint": "/celebration/generate", "parameters": { "application_code": result.get("application_code", ""), "name": result.get("name", "您的姓名"), "save_to_file": True } } elif is_potential_winner: response["winner_detected"] = True response["multiple_winners"] = True response["celebration_note"] = "Multiple winning entries found! You can generate celebration pages for each application code." else: response["winner_detected"] = False response["status_info"] = "Currently on waiting list - keep checking for updates!" return response except HTTPException: raise except Exception as e: logger.error(f"Error searching by ID {request.id_prefix}/{request.id_suffix}: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.post("/celebration/generate", operation_id="generate_celebration_page") async def generate_celebration_page(request: CelebrationRequest): """ Generate a celebration HTML page for lottery winners. This tool creates a beautiful, animated HTML page with splash effects to celebrate users who have won the car quota lottery. The page includes fireworks, confetti, sparkles, and sharing functionality. 
""" try: # First, verify that the application code actually won results = await self.data_store.find_by_application_code(request.application_code) if not results: return { "success": False, "message": f"No lottery results found for application code: {request.application_code}", "celebration_generated": False } # Check if the person actually won (not just in waiting list) winner_results = [r for r in results if r.get("type") != "waiting_list"] if not winner_results: # Check if they're in a prioritized waiting list (which might indicate winning) prioritized_results = [r for r in results if r.get("sequence_number") and r.get("sequence_number") <= 50000] if not prioritized_results: return { "success": False, "message": "Application code found but appears to be on waiting list only. Celebration pages are for confirmed winners.", "celebration_generated": False, "waiting_list_info": results[:3] # Show first 3 waiting list entries } else: # Use prioritized waiting list results as potential winners winner_results = prioritized_results # Prepare winner information winner_info = { "application_code": request.application_code, "name": request.name or "恭喜您", "id_info": "" # We don't have ID info from application code search } # Generate celebration page save_path = None if request.save_to_file: celebrations_dir = self.data_dir / "celebrations" celebrations_dir.mkdir(exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") save_path = celebrations_dir / f"celebration_{request.application_code}_{timestamp}.html" html_content = self.celebration_generator.generate_celebration_page( winner_info=winner_info, lottery_results=winner_results, save_path=save_path ) # Create sharing links sharing_links = self.celebration_generator.create_sharing_links(winner_info) response = { "success": True, "message": "🎉 Celebration page generated successfully!", "celebration_generated": True, "application_code": request.application_code, "winner_name": winner_info["name"], "lottery_results_count": len(winner_results), "sharing_links": sharing_links } if request.save_to_file and save_path: response["saved_file"] = str(save_path) else: # Return HTML content directly if not saving to file response["html_content"] = html_content return response except Exception as e: logger.error(f"Error generating celebration page for {request.application_code}: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/data/statistics", operation_id="get_data_statistics") async def get_data_statistics(): """ Get statistics about the loaded quota lottery data. Returns information about the number of files loaded, total entries, last update time, and breakdown by data type. """ try: stats = await self.data_store.get_statistics() return stats except Exception as e: logger.error(f"Error getting statistics: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.post("/data/refresh", operation_id="refresh_data") async def refresh_data(max_pages: int = Query(5, description="Maximum pages to scrape")): """ Refresh quota lottery data by scraping the latest PDFs. This tool scrapes the Beijing Transportation Commission website for new PDF files and processes them to update the searchable data. 
""" try: logger.info("Starting data refresh...") # Use WebScraper as context manager to ensure proper resource cleanup async with WebScraper(self.downloads_dir) as scraper: # Scrape and download new PDFs downloaded_files = await scraper.scrape_and_download(max_pages) if not downloaded_files: return { "success": True, "message": "No new PDF files found", "files_processed": 0 } # Process downloaded PDFs processed_count = 0 errors = [] for file_info in downloaded_files: try: pdf_path = self.downloads_dir / file_info["filename"] source_url = file_info.get("source_page", "") # Parse PDF result = self.pdf_parser.parse_pdf(pdf_path, source_url) # Validate parsed data validation_report = self.pdf_parser.validate_parsed_data(result) if validation_report["is_valid"]: # Add to data store await self.data_store.add_quota_result(result) processed_count += 1 logger.info(f"Successfully processed {file_info['filename']}") else: errors.append(f"Validation failed for {file_info['filename']}: {validation_report['errors']}") except Exception as e: error_msg = f"Error processing {file_info['filename']}: {str(e)}" errors.append(error_msg) logger.error(error_msg) return { "success": True, "message": f"Data refresh completed", "files_downloaded": len(downloaded_files), "files_processed": processed_count, "errors": errors } except Exception as e: logger.error(f"Error during data refresh: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.post("/data/scrape-policy", operation_id="scrape_policy_documents") async def scrape_policy_documents(): """ Scrape policy documents from the Beijing Transportation Commission website. This tool scrapes policy documents from the Beijing Transportation Commission website and processes them to update the searchable data. """ try: logger.info("Starting policy document scraping...") # Scrape policy documents scraped_documents = await self.policy_scraper.scrape_policy_content() return { "success": True, "message": f"Policy document scraping completed", "total_documents": len(scraped_documents), "documents": scraped_documents } except Exception as e: logger.error(f"Error during policy scraping: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.post("/policy/explain", operation_id="explain_car_quota_policy") async def explain_car_quota_policy(request: PolicyExplanationRequest): """ Explain Beijing car quota policy based on the knowledge base. This tool analyzes the user's question about car quota policy and provides detailed explanations based on the scraped policy documents. It can answer questions about application procedures, requirements, materials needed, timeframes, and specific policy details. 
""" try: logger.info(f"Processing policy question: {request.question}") # Search relevant policy documents relevant_docs = await self._find_relevant_policy_documents(request.question, request.category) if not relevant_docs: return { "question": request.question, "answer": "很抱歉,在现有的政策知识库中没有找到相关信息。建议您访问北京市交通委员会官方网站或致电12328咨询热线获取最新政策信息。", "confidence": "low", "sources": [], "suggestions": [ "请检查问题的关键词是否正确", "尝试使用更具体的政策术语", "访问官方网站: https://xkczb.jtw.beijing.gov.cn", "致电咨询热线: 12328" ] } # Generate explanation based on relevant documents explanation = await self._generate_policy_explanation( request.question, relevant_docs, request.detail_level ) return { "question": request.question, "answer": explanation["answer"], "confidence": explanation["confidence"], "sources": explanation["sources"], "related_topics": explanation["related_topics"], "actionable_steps": explanation.get("actionable_steps", []), "important_notes": explanation.get("important_notes", []) } except Exception as e: logger.error(f"Error explaining policy: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/data/files", operation_id="list_data_files") async def list_data_files(): """ List all loaded quota lottery data files. Returns information about each PDF file that has been processed and is available for searching. """ try: stats = await self.data_store.get_statistics() return { "total_files": stats["total_files"], "files": stats["files"] } except Exception as e: logger.error(f"Error listing files: {e}") raise HTTPException(status_code=500, detail=str(e)) # Analysis endpoints @self.app.get("/analysis/comprehensive", operation_id="get_comprehensive_analysis") async def get_comprehensive_analysis(): """ Get comprehensive lottery analysis including success rates, waiting times, and trends. This tool provides a complete analysis of Beijing car quota lottery data, including success rates by year, estimated waiting times, trend analysis, and personalized recommendations. """ try: analysis = await self.analyzer.get_comprehensive_analysis() return analysis except Exception as e: logger.error(f"Error getting comprehensive analysis: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/analysis/success-rates", operation_id="get_success_rates_analysis") async def get_success_rates_analysis(): """ Get lottery success rate analysis by year. This tool calculates and returns the success rates for each year, including estimated number of applicants, winners, and rejection rates. """ try: analysis = await self.analyzer.get_success_rates() return analysis except Exception as e: logger.error(f"Error getting success rates analysis: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/analysis/waiting-time", operation_id="get_waiting_time_analysis") async def get_waiting_time_analysis(): """ Get waiting time analysis for queue participants. This tool estimates average waiting times for people in the queue based on historical data and annual quotas. """ try: analysis = await self.analyzer.get_waiting_time_analysis() return analysis except Exception as e: logger.error(f"Error getting waiting time analysis: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/analysis/trends", operation_id="get_trend_analysis") async def get_trend_analysis(): """ Get trend analysis comparing different years. This tool analyzes trends in success rates, competition levels, and changes in the lottery system over time. 
""" try: analysis = await self.analyzer.get_trend_analysis() return analysis except Exception as e: logger.error(f"Error getting trend analysis: {e}") raise HTTPException(status_code=500, detail=str(e)) async def _find_relevant_policy_documents(self, question: str, category: Optional[str] = None) -> List[Dict[str, Any]]: """Find policy documents relevant to the user's question.""" try: policies_dir = self.data_dir / "policies" if not policies_dir.exists(): logger.warning("No policies directory found") return [] # Get all policy files policy_files = list(policies_dir.glob("*.md")) if not policy_files: logger.warning("No policy documents found") return [] # Keywords from the question question_lower = question.lower() question_keywords = self._extract_keywords_from_question(question_lower) relevant_docs = [] for policy_file in policy_files: try: async with aiofiles.open(policy_file, 'r', encoding='utf-8') as f: content = await f.read() # Calculate relevance score relevance_score = self._calculate_document_relevance( content, question_keywords, question_lower, category ) if relevance_score > 0: # Extract key sections related to the question relevant_sections = self._extract_relevant_sections(content, question_keywords) relevant_docs.append({ "filename": policy_file.name, "path": str(policy_file), "relevance_score": relevance_score, "content": content, "relevant_sections": relevant_sections }) except Exception as e: logger.error(f"Error reading policy file {policy_file}: {e}") continue # Sort by relevance score and return top results relevant_docs.sort(key=lambda x: x["relevance_score"], reverse=True) return relevant_docs[:5] # Return top 5 most relevant documents except Exception as e: logger.error(f"Error finding relevant policy documents: {e}") return [] def _extract_keywords_from_question(self, question: str) -> List[str]: """Extract keywords from the user's question.""" # Common policy-related keywords policy_keywords = [ # Application types "个人申请", "家庭申请", "单位申请", "个人", "家庭", "单位", "企业", # Vehicle types "新能源", "普通", "燃油车", "电动车", "纯电动", # Process terms "申请", "摇号", "轮候", "更新", "复核", "审核", "材料", "条件", "流程", "程序", # Requirements "资格", "条件", "要求", "证件", "证明", "驾驶证", "身份证", "居住证", "工作居住证", # Time-related "时间", "期限", "有效期", "申报期", "截止", "多久", "什么时候", "几月", # Results and status "结果", "中签", "获得", "成功", "失败", "等待", "排队", # Documents and procedures "材料", "文件", "手续", "窗口", "办理", "提交", "打印", "下载", # Specific processes "转让", "变更", "过户", "注销", "登记", "被盗", "抢夺" ] # Find keywords present in the question found_keywords = [] for keyword in policy_keywords: if keyword in question: found_keywords.append(keyword) # Also extract potential question words question_words = ["如何", "怎么", "什么", "哪些", "多少", "几", "是否", "能否", "可以", "需要"] for word in question_words: if word in question: found_keywords.append(word) return found_keywords def _calculate_document_relevance(self, content: str, keywords: List[str], question: str, category: Optional[str]) -> float: """Calculate how relevant a document is to the user's question.""" score = 0.0 content_lower = content.lower() # Check for direct keyword matches for keyword in keywords: count = content_lower.count(keyword) score += count * 2 # Each keyword occurrence adds 2 points # Bonus for category match in filename or content if category: category_lower = category.lower() if category_lower in content_lower: score += 10 # Check for related terms in content related_terms = { "申请": ["提交", "填报", "办理", "登记"], "材料": ["证件", "文件", "证明", "资料"], "条件": ["要求", "资格", "规定"], "流程": ["程序", 
"步骤", "操作", "办理"], "时间": ["期限", "截止", "有效期", "申报期"] } for main_term, related in related_terms.items(): if main_term in question: for related_term in related: score += content_lower.count(related_term) * 1 # Penalty for very short documents (likely not comprehensive) if len(content) < 1000: score *= 0.5 return score def _extract_relevant_sections(self, content: str, keywords: List[str]) -> List[str]: """Extract sections of the document that are most relevant to the question.""" relevant_sections = [] # Split content into paragraphs paragraphs = content.split('\n\n') for paragraph in paragraphs: paragraph_lower = paragraph.lower() # Count keyword matches in this paragraph keyword_count = sum(1 for keyword in keywords if keyword in paragraph_lower) # If paragraph has relevant keywords and is substantial, include it if keyword_count >= 1 and len(paragraph.strip()) > 100: # Clean up the paragraph cleaned = paragraph.strip() if cleaned and not cleaned.startswith('*') and not cleaned.startswith('---'): relevant_sections.append(cleaned) # Return top 5 most relevant sections return relevant_sections[:5] async def _generate_policy_explanation(self, question: str, relevant_docs: List[Dict], detail_level: str) -> Dict[str, Any]: """Generate a comprehensive explanation based on relevant documents.""" try: # Combine relevant sections from all documents all_relevant_content = [] sources = [] for doc in relevant_docs: sources.append({ "filename": doc["filename"], "relevance_score": doc["relevance_score"] }) # Add the most relevant sections for section in doc["relevant_sections"]: all_relevant_content.append(section) if not all_relevant_content: return { "answer": "在现有政策文档中未找到相关信息。", "confidence": "low", "sources": sources, "related_topics": [] } # Generate answer based on detail level answer = self._construct_answer(question, all_relevant_content, detail_level) # Extract actionable steps and important notes actionable_steps = self._extract_actionable_steps(all_relevant_content) important_notes = self._extract_important_notes(all_relevant_content) related_topics = self._suggest_related_topics(question, relevant_docs) # Determine confidence level confidence = "high" if len(relevant_docs) >= 2 else "medium" if len(relevant_docs) == 1 else "low" return { "answer": answer, "confidence": confidence, "sources": sources, "related_topics": related_topics, "actionable_steps": actionable_steps, "important_notes": important_notes } except Exception as e: logger.error(f"Error generating policy explanation: {e}") return { "answer": "生成解释时发生错误,请稍后重试。", "confidence": "low", "sources": [], "related_topics": [] } def _construct_answer(self, question: str, content_sections: List[str], detail_level: str) -> str: """Construct a comprehensive answer from relevant content sections.""" # Basic structure for the answer answer_parts = [] # Identify the type of question question_lower = question.lower() if any(word in question_lower for word in ["如何", "怎么", "怎样"]): answer_parts.append("根据政策规定,具体步骤如下:\n") elif any(word in question_lower for word in ["什么", "哪些"]): answer_parts.append("根据政策文件,相关信息如下:\n") elif any(word in question_lower for word in ["需要", "要", "应该"]): answer_parts.append("根据政策要求:\n") else: answer_parts.append("根据相关政策规定:\n") # Process content based on detail level if detail_level == "basic": # Provide a concise summary key_points = [] for section in content_sections[:2]: # Use top 2 sections # Extract key sentences sentences = section.split('。') for sentence in sentences[:2]: # First 2 sentences per section if 
len(sentence.strip()) > 20: key_points.append(sentence.strip() + '。') answer_parts.extend(key_points[:3]) # Max 3 key points for basic elif detail_level == "detailed": # Provide comprehensive information for i, section in enumerate(content_sections[:4]): # Use top 4 sections if i > 0: answer_parts.append(f"\n{i+1}. ") # Clean and format the section cleaned_section = self._clean_content_section(section) answer_parts.append(cleaned_section) else: # medium (default) # Balanced approach for i, section in enumerate(content_sections[:3]): # Use top 3 sections if i > 0: answer_parts.append(f"\n\n{i+1}. ") # Extract important parts cleaned_section = self._clean_content_section(section) # Limit length for medium detail if len(cleaned_section) > 300: cleaned_section = cleaned_section[:300] + "..." answer_parts.append(cleaned_section) # Add closing note answer_parts.append("\n\n*以上信息基于现有政策文档,具体要求请以官方最新政策为准。如有疑问,请咨询12328热线或访问官方网站。*") return "".join(answer_parts) def _clean_content_section(self, section: str) -> str: """Clean up a content section for presentation.""" # Remove markdown formatting cleaned = re.sub(r'[#*_]', '', section) # Remove URLs and links cleaned = re.sub(r'http[s]?://\S+', '', cleaned) cleaned = re.sub(r'\[([^\]]*)\]\([^\)]*\)', r'\1', cleaned) # Remove extra whitespace cleaned = re.sub(r'\s+', ' ', cleaned) # Remove system notes cleaned = re.sub(r'\*[^*]*系统.*?\*', '', cleaned) return cleaned.strip() def _extract_actionable_steps(self, content_sections: List[str]) -> List[str]: """Extract actionable steps from the content.""" steps = [] # Look for numbered lists, step indicators, etc. step_patterns = [ r'(\d+)\.?\s*([^。]*申请[^。]*)', r'(\d+)\.?\s*([^。]*提交[^。]*)', r'(\d+)\.?\s*([^。]*办理[^。]*)', r'(\d+)\.?\s*([^。]*填报[^。]*)', r'第\s*([一二三四五六七八九十]+)\s*[步条]?\s*[::]?\s*([^。]*)', ] for section in content_sections: for pattern in step_patterns: matches = re.findall(pattern, section) for match in matches: if len(match) >= 2: step_text = match[1].strip() if len(step_text) > 10: # Ensure meaningful content steps.append(step_text) return steps[:5] # Return top 5 steps def _extract_important_notes(self, content_sections: List[str]) -> List[str]: """Extract important notes and warnings from the content.""" notes = [] # Look for important indicators note_patterns = [ r'注意[::]?\s*([^。]*)', r'重要[::]?\s*([^。]*)', r'注[::]?\s*([^。]*)', r'备注[::]?\s*([^。]*)', r'特别提醒[::]?\s*([^。]*)', ] for section in content_sections: for pattern in note_patterns: matches = re.findall(pattern, section) for match in matches: note_text = match.strip() if len(note_text) > 15: # Ensure meaningful content notes.append(note_text) return notes[:3] # Return top 3 notes def _suggest_related_topics(self, question: str, relevant_docs: List[Dict]) -> List[str]: """Suggest related topics based on the question and available documents.""" topics = [] # Analyze question to suggest related topics question_lower = question.lower() # Topic mapping topic_suggestions = { "个人申请": ["家庭申请政策", "申请材料清单", "申请条件说明", "审核流程"], "家庭申请": ["个人申请区别", "家庭积分规则", "申请材料要求", "轮候时间"], "新能源": ["普通指标对比", "轮候排队规则", "车辆类型要求", "更新指标"], "申请材料": ["窗口办理流程", "在线申请步骤", "证件要求", "代办规定"], "摇号": ["中签率分析", "阶梯中签规则", "申请次数统计", "结果查询"], "更新": ["车辆过户手续", "指标有效期", "更新条件", "申请流程"] } # Find matching topics based on question content for key, suggestions in topic_suggestions.items(): if key in question_lower: topics.extend(suggestions) # Also suggest based on available document types doc_types = set() for doc in relevant_docs: filename = doc["filename"] if "个人申请" in filename: 
doc_types.add("个人申请相关政策") if "家庭申请" in filename: doc_types.add("家庭申请相关政策") if "材料" in filename: doc_types.add("申请材料清单") if "更新" in filename: doc_types.add("更新指标政策") topics.extend(list(doc_types)) # Remove duplicates and limit return list(set(topics))[:5] async def initialize(self): """Initialize the server by loading existing data.""" logger.info("Initializing MCP server...") # Create directories self.data_dir.mkdir(exist_ok=True) self.downloads_dir.mkdir(exist_ok=True) # Load existing data from disk await self.data_store.load_from_disk() # If no data exists, try to load example PDFs if self.data_store.total_entries == 0: await self._load_example_pdfs() logger.info(f"MCP server initialized with {self.data_store.total_entries} total entries") async def _load_example_pdfs(self): """Load example PDF files if they exist.""" examples_dir = Path("examples") if not examples_dir.exists(): logger.info("No examples directory found, skipping example PDF loading") return # Load example PDFs for pdf_file in examples_dir.glob("*.pdf"): try: logger.info(f"Loading example PDF: {pdf_file}") # Parse PDF result = self.pdf_parser.parse_pdf(pdf_file, f"example://{pdf_file.name}") # Validate and add to store validation_report = self.pdf_parser.validate_parsed_data(result) if validation_report["is_valid"]: await self.data_store.add_quota_result(result) logger.info(f"Successfully loaded example PDF: {pdf_file.name}") else: logger.warning(f"Validation failed for {pdf_file.name}: {validation_report['errors']}") except Exception as e: logger.error(f"Error loading example PDF {pdf_file}: {e}") def run(self, **kwargs): """Run the MCP server.""" # Set default uvicorn parameters run_kwargs = { "host": self.host, "port": self.port, "log_level": "info" } run_kwargs.update(kwargs) # Initialize server before running asyncio.run(self.initialize()) logger.info(f"Starting MCP server at http://{self.host}:{self.port}") logger.info(f"MCP endpoint available at http://{self.host}:{self.port}/mcp") logger.info(f"API documentation at http://{self.host}:{self.port}/docs") uvicorn.run(self.app, **run_kwargs) # Convenience function for creating and running the server def create_server(**kwargs) -> MCPServer: """Create an MCP server instance.""" return MCPServer(**kwargs) def run_server(**kwargs): """Create and run an MCP server.""" server = create_server(**kwargs) server.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/coldfire-x/beijing-car-quota-draw-mcp'
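The same request as a minimal Python sketch (assumes the requests package; the URL is the one shown in the curl command above):

import requests

# Fetch this server's listing from the Glama MCP directory API.
url = "https://glama.ai/api/mcp/v1/servers/coldfire-x/beijing-car-quota-draw-mcp"
response = requests.get(url, timeout=30)
response.raise_for_status()
print(response.json())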

If you have feedback or need assistance with the MCP directory API, please join our Discord server.