Skip to main content
Glama

SOAR MCP Server

by flagify-com
config_manager.py10.5 kB
#!/usr/bin/env python3 """ SOAR MCP 配置管理器 统一管理系统配置,支持缓存和实时更新 """ import time from typing import Any, Dict, List, Optional from threading import Lock from models import db_manager, SystemConfigData from logger_config import logger class ConfigManager: """系统配置管理器""" def __init__(self): self._config_cache = {} self._cache_timestamp = 0 self._cache_ttl = 60 # 缓存有效期60秒 self._lock = Lock() def _refresh_cache(self, force: bool = False): """刷新配置缓存""" current_time = time.time() with self._lock: if not force and (current_time - self._cache_timestamp < self._cache_ttl): return # 缓存仍有效 try: self._config_cache = db_manager.get_all_system_configs() self._cache_timestamp = current_time logger.info("配置缓存已刷新") except Exception as e: logger.error(f"刷新配置缓存失败: {e}") def get(self, key: str, default_value: Any = None) -> Any: """获取配置值""" self._refresh_cache() with self._lock: return self._config_cache.get(key, default_value) def set(self, key: str, value: Any, description: str = None) -> bool: """设置配置值""" try: success = db_manager.set_system_config(key, value, description) if success: # 立即刷新缓存 self._refresh_cache(force=True) return success except Exception as e: logger.error(f"设置配置失败 {key}: {e}") return False def get_all(self) -> Dict[str, Any]: """获取所有配置""" self._refresh_cache() with self._lock: return self._config_cache.copy() def get_soar_config(self) -> SystemConfigData: """获取SOAR相关配置""" config = self.get_all() return SystemConfigData( soar_api_url=config.get("soar_api_url", "https://hg.wuzhi-ai.com"), soar_api_token=config.get("soar_api_token", ""), soar_timeout=config.get("soar_timeout", 30), sync_interval=config.get("sync_interval", 14400), soar_labels=config.get("soar_labels", ["MCP"]) ) def update_soar_config(self, config_data: SystemConfigData) -> bool: """更新SOAR配置""" try: success = True success &= self.set("soar_api_url", config_data.soar_api_url, "SOAR服务器API地址") success &= self.set("soar_api_token", config_data.soar_api_token, "SOAR API Token") success &= self.set("soar_timeout", config_data.soar_timeout, "API超时时间(秒)") success &= self.set("sync_interval", config_data.sync_interval, "同步周期(秒)") success &= self.set("soar_labels", config_data.soar_labels, "剧本抓取标签列表") if success: logger.info("SOAR配置更新成功") else: logger.error("SOAR配置更新失败") return success except Exception as e: logger.error(f"更新SOAR配置失败: {e}") return False def get_api_url(self) -> str: """获取API地址""" return self.get("soar_api_url", "https://hg.wuzhi-ai.com") def get_api_token(self) -> str: """获取API Token""" return self.get("soar_api_token", "") def get_timeout(self) -> int: """获取超时时间""" return self.get("soar_timeout", 30) def get_labels(self) -> List[str]: """获取标签列表""" return self.get("soar_labels", ["MCP"]) def get_ssl_verify(self) -> bool: """获取SSL验证设置""" value = self.get("ssl_verify", True) # 处理字符串类型的布尔值 if isinstance(value, str): return value.lower() in ("true", "1", "yes", "on") return bool(value) def validate_config(self) -> Dict[str, Any]: """验证配置完整性""" config = self.get_soar_config() issues = [] # 检查必填项 if not config.soar_api_url: issues.append("API地址不能为空") if not config.soar_api_token: issues.append("API Token不能为空") if config.soar_timeout <= 0: issues.append("超时时间必须大于0") if config.sync_interval <= 0: issues.append("同步周期必须大于0") # 标签列表可以为空(表示同步所有标签) # URL格式检查 if config.soar_api_url and not (config.soar_api_url.startswith('http://') or config.soar_api_url.startswith('https://')): issues.append("API地址格式不正确,需要以http://或https://开头") return { "valid": len(issues) == 0, "issues": issues, "config": config.model_dump() } def is_first_run(self) -> bool: """ 检测是否为首次运行 当SOAR API地址和Token都为空时认为是首次运行 """ try: config = self.get_soar_config() # 检查关键配置是否为空 api_url_empty = not config.soar_api_url or config.soar_api_url.strip() == "" token_empty = not config.soar_api_token or config.soar_api_token.strip() == "" # 如果API地址和Token都为空,则认为是首次运行 is_first = api_url_empty and token_empty if is_first: logger.info("检测到首次运行:SOAR配置未完成") else: logger.debug("检测到已配置运行:SOAR配置已存在") return is_first except Exception as e: logger.warning(f"检测首次运行状态失败: {e}") return True # 异常时默认认为是首次运行,确保安全启动 def get_missing_required_configs(self) -> List[str]: """ 获取缺失的必需配置项列表 """ try: config = self.get_soar_config() missing = [] if not config.soar_api_url or config.soar_api_url.strip() == "": missing.append("SOAR API地址") if not config.soar_api_token or config.soar_api_token.strip() == "": missing.append("SOAR API Token") return missing except Exception as e: logger.error(f"检查配置完整性失败: {e}") return ["配置检查异常"] def test_connection(self) -> Dict[str, Any]: """测试API连接""" try: import requests config = self.get_soar_config() headers = { "hg-token": config.soar_api_token, "Content-Type": "application/json" } # 测试连接 test_url = f"{config.soar_api_url.rstrip('/')}/odp/core/v1/api/playbook/findAll" test_data = { "publishStatus": "ONLINE" } # 只有当标签列表不为空时才添加标签过滤(测试时使用第一个标签) if config.soar_labels: test_data["labelList"] = [{"name": label} for label in config.soar_labels[:1]] response = requests.post( test_url, json=test_data, headers=headers, timeout=config.soar_timeout, verify=self.get_ssl_verify() ) if response.status_code == 200: data = response.json() # 检查SOAR API的业务状态码 if data.get("code") == 200: result_data = data.get("result", []) return { "success": True, "message": "连接成功", "response_code": response.status_code, "api_code": data.get("code"), "data_count": len(result_data) if isinstance(result_data, list) else 0 } else: return { "success": False, "message": f"API业务错误: {data.get('message', '未知错误')}", "response_code": response.status_code, "api_code": data.get("code"), "error": str(data) } else: return { "success": False, "message": f"API返回错误: {response.status_code}", "response_code": response.status_code, "error": response.text[:200] } except requests.exceptions.Timeout: return { "success": False, "message": "连接超时", "error": "请检查网络连接和超时设置" } except requests.exceptions.ConnectionError: return { "success": False, "message": "连接失败", "error": "请检查API地址是否正确" } except Exception as e: return { "success": False, "message": "测试连接时发生错误", "error": str(e) } def init(self): """初始化配置管理器""" try: # 初始化数据库默认配置 db_manager.init_default_configs() # 刷新缓存 self._refresh_cache(force=True) logger.info("配置管理器初始化完成") # 记录当前配置状态 validation_result = self.validate_config() if validation_result["valid"]: logger.info("系统配置验证通过") else: logger.warning(f"系统配置存在问题: {', '.join(validation_result['issues'])}") except Exception as e: logger.error(f"配置管理器初始化失败: {e}") # 全局配置管理器实例 config_manager = ConfigManager()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/flagify-com/soar-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server