Skip to main content
Glama

QueryNest

by niuzaishu
semantic_storage_factory.py9.56 kB
# -*- coding: utf-8 -*- """ 语义存储工厂 提供统一的语义存储工厂,支持多种存储后端 """ import os import importlib import inspect from typing import Dict, Any, Optional, Type from pathlib import Path import structlog from storage.semantic_storage_interface import ( SemanticStorageInterface, SemanticStorageType, SemanticStorageConfig ) logger = structlog.get_logger(__name__) class SemanticStorageFactory: """语义存储工厂""" _instance = None _storage_implementations = {} @classmethod def get_instance(cls): """获取单例实例""" if cls._instance is None: cls._instance = SemanticStorageFactory() return cls._instance def __init__(self): """初始化存储工厂""" self._discover_storage_implementations() def _discover_storage_implementations(self): """发现所有存储实现类""" logger.info("开始发现语义存储实现") # 注册内置存储实现 self._register_builtin_implementations() # 尝试导入其他可能的存储实现 self._try_import_storage("semantic_mongodb_storage", SemanticStorageType.MONGODB) self._try_import_storage("semantic_redis_storage", SemanticStorageType.REDIS) self._try_import_storage("semantic_elasticsearch_storage", SemanticStorageType.ELASTICSEARCH) self._try_import_storage("semantic_memory_storage", SemanticStorageType.MEMORY) logger.info("语义存储实现发现完成", implementations=list(self._storage_implementations.keys())) def _register_builtin_implementations(self): """注册内置存储实现""" try: # 注册文件存储实现 from storage.enhanced_local_semantic_storage import EnhancedLocalSemanticStorage self._storage_implementations[SemanticStorageType.FILE] = EnhancedLocalSemanticStorage logger.info("已注册文件存储实现", class_name="EnhancedLocalSemanticStorage") except ImportError: # 回退到原有的本地存储 try: from storage.local_semantic_storage import LocalSemanticStorage self._storage_implementations[SemanticStorageType.FILE] = LocalSemanticStorage logger.info("已注册本地文件存储实现", class_name="LocalSemanticStorage") except ImportError: logger.warning("无法导入本地文件存储实现") def _try_import_storage(self, module_name: str, storage_type: SemanticStorageType): """尝试导入存储模块""" try: module_path = f"storage.{module_name}" module = importlib.import_module(module_path) # 查找模块中的SemanticStorageInterface子类 for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, SemanticStorageInterface) and obj != SemanticStorageInterface): self._storage_implementations[storage_type] = obj logger.info(f"已注册存储实现: {storage_type.value}", class_name=name) break except ImportError: logger.debug(f"未找到存储模块: {module_name}") except Exception as e: logger.warning(f"导入存储模块失败: {module_name}", error=str(e)) def create_storage(self, config: Optional[SemanticStorageConfig] = None) -> SemanticStorageInterface: """创建存储实例""" if config is None: config = SemanticStorageConfig() storage_type = config.storage_type if storage_type not in self._storage_implementations: # 如果指定的存储类型不可用,回退到文件存储 logger.warning(f"不支持的存储类型: {storage_type},回退到文件存储") storage_type = SemanticStorageType.FILE if storage_type not in self._storage_implementations: raise ValueError(f"无可用的存储实现") storage_class = self._storage_implementations[storage_type] try: # 根据存储类型创建实例 if storage_type == SemanticStorageType.FILE: return storage_class( base_path=config.base_path, enable_compression=config.enable_compression, enable_cache=config.enable_cache, cache_ttl=config.cache_ttl, cache_size=config.cache_size ) elif storage_type == SemanticStorageType.MONGODB: return storage_class( mongodb_url=config.mongodb_url, database_name=config.mongodb_database, collection_name=config.mongodb_collection, enable_versioning=config.enable_versioning, max_versions=config.max_versions ) elif storage_type == SemanticStorageType.REDIS: return storage_class( redis_url=config.redis_url, redis_prefix=config.redis_prefix, enable_cache=config.enable_cache, cache_ttl=config.cache_ttl ) elif storage_type == SemanticStorageType.ELASTICSEARCH: return storage_class( elasticsearch_url=config.elasticsearch_url, index_name=config.elasticsearch_index, enable_versioning=config.enable_versioning ) elif storage_type == SemanticStorageType.MEMORY: return storage_class( max_size=config.cache_size, ttl=config.cache_ttl ) else: # 对于自定义存储,尝试使用额外配置 return storage_class(**config.extra) except Exception as e: logger.error(f"创建存储实例失败: {storage_type}", error=str(e)) raise def create_default_storage(self) -> SemanticStorageInterface: """创建默认存储实现""" # 从环境变量获取配置 storage_type_str = os.environ.get("QUERYNEST_SEMANTIC_STORAGE", "file") try: storage_type = SemanticStorageType(storage_type_str.lower()) except ValueError: logger.warning(f"未知的语义存储类型: {storage_type_str},使用默认文件存储") storage_type = SemanticStorageType.FILE # 创建存储配置 config = SemanticStorageConfig( storage_type=storage_type, base_path=os.environ.get("QUERYNEST_SEMANTIC_PATH", "data/semantics"), enable_versioning=os.environ.get("QUERYNEST_SEMANTIC_VERSIONING", "true").lower() == "true", enable_conflict_detection=os.environ.get("QUERYNEST_CONFLICT_DETECTION", "true").lower() == "true", mongodb_url=os.environ.get("QUERYNEST_MONGODB_URL", "mongodb://localhost:27017"), mongodb_database=os.environ.get("QUERYNEST_SEMANTIC_DB", "querynest_semantics"), redis_url=os.environ.get("QUERYNEST_REDIS_URL", "redis://localhost:6379/1"), elasticsearch_url=os.environ.get("QUERYNEST_ES_URL", "http://localhost:9200"), enable_cache=os.environ.get("QUERYNEST_SEMANTIC_CACHE", "true").lower() == "true", cache_ttl=int(os.environ.get("QUERYNEST_SEMANTIC_CACHE_TTL", "3600")), cache_size=int(os.environ.get("QUERYNEST_SEMANTIC_CACHE_SIZE", "1000")) ) try: return self.create_storage(config) except Exception as e: logger.error("创建默认语义存储失败,回退到文件存储", error=str(e)) # 最后的回退方案 fallback_config = SemanticStorageConfig( storage_type=SemanticStorageType.FILE, base_path="data/semantics" ) return self.create_storage(fallback_config) def register_storage(self, storage_type: SemanticStorageType, storage_class: Type[SemanticStorageInterface]) -> bool: """手动注册存储实现""" if not issubclass(storage_class, SemanticStorageInterface): logger.error(f"无效的存储类: {storage_class.__name__},必须是SemanticStorageInterface的子类") return False self._storage_implementations[storage_type] = storage_class logger.info(f"已手动注册语义存储: {storage_type.value}", class_name=storage_class.__name__) return True def get_available_storage_types(self) -> Dict[SemanticStorageType, Type[SemanticStorageInterface]]: """获取可用的存储类型""" return self._storage_implementations.copy() # 便捷函数,用于获取存储实例 def get_semantic_storage(config: Optional[SemanticStorageConfig] = None) -> SemanticStorageInterface: """获取语义存储实例""" factory = SemanticStorageFactory.get_instance() if config is None: return factory.create_default_storage() else: return factory.create_storage(config)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/niuzaishu/QueryNest'

If you have feedback or need assistance with the MCP directory API, please join our Discord server