get_current_config
Retrieve current system configuration settings for TrendRadar's crawler, push notifications, keywords, or weight parameters to monitor and adjust data collection and analysis operations.
Instructions
获取当前系统配置
Args: section: 配置节,可选值: - "all": 所有配置(默认) - "crawler": 爬虫配置 - "push": 推送配置 - "keywords": 关键词配置 - "weights": 权重配置
Returns: JSON格式的配置信息
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| section | No | Configuration section to return: "all", "crawler", "push", "keywords", or "weights" | all |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Configuration information in JSON format | |
Implementation Reference
- The actual business logic implementation for getting current system configuration.
def get_current_config(self, section: str = "all") -> Dict:
    """
    Return the current system configuration, whole or for a single section.

    Args:
        section: Which part of the configuration to return. One of
            "all" (default), "crawler", "push", "keywords" or "weights".
            Any other value yields an empty dict.

    Returns:
        For "all", a dict with the keys "crawler", "push", "keywords" and
        "weights", each holding that section's settings. For a single
        section, only that section's settings dict.

    Raises:
        FileParseError: A configuration file could not be parsed
            (propagated from the parser; never raised directly here).
    """
    # Try the cache first; entries are looked up with a 1-hour TTL.
    cache_key = f"config:{section}"
    cached = self.cache.get(cache_key, ttl=3600)
    if cached:
        return cached

    if section == "all":
        wanted = ("crawler", "push", "keywords", "weights")
    else:
        wanted = (section,)

    # Parse only the files the requested sections need, so that a problem
    # in the keyword file cannot break e.g. a "crawler" query.
    config_data = {}
    if any(name in wanted for name in ("crawler", "push", "weights")):
        config_data = self.parser.parse_yaml_config() or {}

    sections = {}

    if "crawler" in wanted:
        # `or {}` / `or []`: a YAML key with an empty value loads as None.
        crawler = config_data.get("crawler") or {}
        sections["crawler"] = {
            "enable_crawler": crawler.get("enable_crawler", True),
            "use_proxy": crawler.get("use_proxy", False),
            "request_interval": crawler.get("request_interval", 1),
            "retry_times": 3,  # fixed value, not read from the config file
            "platforms": [p["id"] for p in config_data.get("platforms") or []],
        }

    if "push" in wanted:
        notification = config_data.get("notification") or {}
        webhooks = notification.get("webhooks") or {}
        # A notification channel counts as enabled when its webhook URL is set.
        channel_keys = (
            ("feishu", "feishu_url"),
            ("dingtalk", "dingtalk_url"),
            ("wework", "wework_url"),
        )
        sections["push"] = {
            "enable_notification": notification.get("enable_notification", True),
            "enabled_channels": [
                channel for channel, url_key in channel_keys if webhooks.get(url_key)
            ],
            "message_batch_size": notification.get("message_batch_size", 20),
            "push_window": notification.get("push_window", {}),
        }

    if "keywords" in wanted:
        word_groups = self.parser.parse_frequency_words()
        sections["keywords"] = {
            "word_groups": word_groups,
            "total_groups": len(word_groups),
        }

    if "weights" in wanted:
        weight = config_data.get("weight") or {}
        sections["weights"] = {
            "rank_weight": weight.get("rank_weight", 0.6),
            "frequency_weight": weight.get("frequency_weight", 0.3),
            "hotness_weight": weight.get("hotness_weight", 0.1),
        }

    # Assemble the result: everything for "all", otherwise the one section
    # (an unrecognised section name yields an empty dict).
    result = sections if section == "all" else sections.get(section, {})

    # Cache the result for subsequent calls.
    self.cache.set(cache_key, result)
    return result

# Next snippet - mcp_server/tools/config_mgmt.py:26-58 (handler): the tool wrapper implementation that calls DataService and handles exceptions/responses.
def get_current_config(self, section: Optional[str] = None) -> Dict:
    """
    Fetch the current system configuration and wrap it in a response dict.

    Args:
        section: Configuration section to fetch - all/crawler/push/keywords/weights.
            Defaults to all.

    Returns:
        On success: a dict with "config" (the configuration data),
        "section" (the validated section name) and "success" set to True.
        On an MCPError: a dict with "success" set to False and "error"
        holding the error's dict form.

    Example:
        >>> tools = ConfigManagementTools()
        >>> result = tools.get_current_config(section="crawler")
        >>> print(result['config']['platforms'])
    """
    try:
        # Validate the argument first, then ask the data service for the data.
        section = validate_config_section(section)
        config = self.data_service.get_current_config(section=section)
    except MCPError as err:
        # Report known errors as a structured failure response instead of raising.
        return {
            "success": False,
            "error": err.to_dict()
        }

    return {
        "config": config,
        "section": section,
        "success": True
    }