Skip to main content
Glama

Mnemosyne MCP

by MumuTW
atlassian_loader.py (10.7 kB)
""" Atlassian 資料載入器 (Atlassian Data Loader) 擴展 ECL 管線以支援 Atlassian 知識圖譜資料載入。 """ import time from dataclasses import dataclass from typing import List, Optional from ..core.logging import get_logger from ..drivers.falkordb_driver import FalkorDBDriver from ..schemas.atlassian import AtlassianEntity, AtlassianRelationship logger = get_logger(__name__) @dataclass class AtlassianLoadResult: """Atlassian 資料載入結果""" jira_issues_loaded: int confluence_pages_loaded: int relationships_loaded: int errors: List[str] processing_time_ms: int class AtlassianGraphLoader: """ Atlassian 圖資料庫載入器 負責將 Atlassian 知識圖譜資料載入到 FalkorDB 中。 支援 Jira Issues, Confluence Pages 和它們之間的關係。 """ def __init__(self, driver: FalkorDBDriver): """ 初始化 Atlassian 載入器 Args: driver: FalkorDB 驅動器 """ self.driver = driver self.logger = get_logger(f"{__name__}.{self.__class__.__name__}") async def load_atlassian_data( self, entities: List[AtlassianEntity], relationships: List[AtlassianRelationship], ) -> AtlassianLoadResult: """ 載入 Atlassian 知識圖譜資料 Args: entities: Atlassian 實體列表 (Jira Issues + Confluence Pages) relationships: Atlassian 關係列表 Returns: AtlassianLoadResult: 載入結果 """ start_time = time.time() errors = [] jira_issues_loaded = 0 confluence_pages_loaded = 0 relationships_loaded = 0 try: # 確保連接 if not self.driver.is_connected: await self.driver.connect() # 載入實體節點 for entity in entities: try: if entity.entity_type == "jira_issue": await self._load_jira_issue_node(entity) jira_issues_loaded += 1 self.logger.debug(f"載入 Jira Issue: {entity.id}") elif entity.entity_type == "confluence_page": await self._load_confluence_page_node(entity) confluence_pages_loaded += 1 self.logger.debug(f"載入 Confluence Page: {entity.id}") else: self.logger.warning(f"未知實體類型: {entity.entity_type}") except Exception as e: error_msg = f"載入實體節點失敗 {entity.id}: {str(e)}" errors.append(error_msg) self.logger.error(error_msg) # 載入關係 for relationship in relationships: try: await self._load_relationship(relationship) relationships_loaded += 1 
self.logger.debug( f"載入關係: {relationship.source_id} -> {relationship.target_id}" ) except Exception as e: error_msg = ( f"載入關係失敗 {relationship.source_id} -> " f"{relationship.target_id}: {str(e)}" ) errors.append(error_msg) self.logger.error(error_msg) processing_time_ms = int((time.time() - start_time) * 1000) self.logger.info( f"Atlassian 資料載入完成: {jira_issues_loaded} Jira Issues, " f"{confluence_pages_loaded} Confluence Pages, {relationships_loaded} 關係" ) except Exception as e: error_msg = f"Atlassian 資料載入過程發生錯誤: {str(e)}" errors.append(error_msg) self.logger.error(error_msg) processing_time_ms = int((time.time() - start_time) * 1000) return AtlassianLoadResult( jira_issues_loaded=jira_issues_loaded, confluence_pages_loaded=confluence_pages_loaded, relationships_loaded=relationships_loaded, errors=errors, processing_time_ms=processing_time_ms, ) async def _load_jira_issue_node(self, entity: AtlassianEntity) -> None: """載入 Jira Issue 節點""" properties = entity.to_graph_properties() # 使用 MERGE 確保節點唯一性 query = """ MERGE (issue:JiraIssue {id: $id}) SET issue += $properties RETURN issue """ await self.driver.execute_query( query, parameters={"id": entity.id, "properties": properties} ) # 如果有 project_key,建立與 Project 的關係 if hasattr(entity, "project_key") and entity.project_key: await self._ensure_project_node(entity.project_key) await self._create_project_relationship(entity.id, entity.project_key) async def _load_confluence_page_node(self, entity: AtlassianEntity) -> None: """載入 Confluence Page 節點""" properties = entity.to_graph_properties() # 使用 MERGE 確保節點唯一性 query = """ MERGE (page:ConfluencePage {id: $id}) SET page += $properties RETURN page """ await self.driver.execute_query( query, parameters={"id": entity.id, "properties": properties} ) # 如果有 space_key,建立與 Space 的關係 if hasattr(entity, "space_key") and entity.space_key: await self._ensure_space_node( entity.space_key, getattr(entity, "space_name", "") ) await self._create_space_relationship(entity.id, 
entity.space_key) async def _load_relationship(self, relationship: AtlassianRelationship) -> None: """載入關係""" properties = relationship.to_graph_properties() # 建立關係 query = """ MATCH (source) WHERE source.id = $source_id MATCH (target) WHERE target.id = $target_id MERGE (source)-[r:ATLASSIAN_RELATION {type: $relationship_type}]->(target) SET r += $properties RETURN r """ await self.driver.execute_query( query, parameters={ "source_id": relationship.source_id, "target_id": relationship.target_id, "relationship_type": str(relationship.relationship_type), "properties": properties, }, ) async def _ensure_project_node(self, project_key: str) -> None: """確保 Project 節點存在""" query = """ MERGE (project:JiraProject {key: $project_key}) ON CREATE SET project.name = $project_key, project.entity_type = 'jira_project', project.created_at = datetime() RETURN project """ await self.driver.execute_query(query, parameters={"project_key": project_key}) async def _ensure_space_node(self, space_key: str, space_name: str) -> None: """確保 Space 節點存在""" query = """ MERGE (space:ConfluenceSpace {key: $space_key}) ON CREATE SET space.name = $space_name, space.entity_type = 'confluence_space', space.created_at = datetime() RETURN space """ await self.driver.execute_query( query, parameters={"space_key": space_key, "space_name": space_name} ) async def _create_project_relationship( self, issue_id: str, project_key: str ) -> None: """建立 Issue 與 Project 的關係""" query = """ MATCH (issue:JiraIssue {id: $issue_id}) MATCH (project:JiraProject {key: $project_key}) MERGE (issue)-[:BELONGS_TO]->(project) """ await self.driver.execute_query( query, parameters={"issue_id": issue_id, "project_key": project_key} ) async def _create_space_relationship(self, page_id: str, space_key: str) -> None: """建立 Page 與 Space 的關係""" query = """ MATCH (page:ConfluencePage {id: $page_id}) MATCH (space:ConfluenceSpace {key: $space_key}) MERGE (page)-[:BELONGS_TO]->(space) """ await self.driver.execute_query( query, 
parameters={"page_id": page_id, "space_key": space_key} ) async def clear_atlassian_data(self, source_filter: Optional[str] = None) -> None: """ 清除 Atlassian 資料 Args: source_filter: 資料源過濾器,如果提供則只清除該來源的資料 """ if source_filter: # 清除特定來源的資料 query = """ MATCH (n) WHERE n.entity_type IN [ 'jira_issue', 'confluence_page', 'jira_project', 'confluence_space' ] AND (n.source_info IS NULL OR n.source_info CONTAINS $source_filter) OPTIONAL MATCH (n)-[r]-() DELETE r, n """ await self.driver.execute_query( query, parameters={"source_filter": source_filter} ) else: # 清除所有 Atlassian 資料 query = """ MATCH (n) WHERE n.entity_type IN [ 'jira_issue', 'confluence_page', 'jira_project', 'confluence_space' ] OPTIONAL MATCH (n)-[r]-() DELETE r, n """ await self.driver.execute_query(query) self.logger.info(f"清除 Atlassian 資料完成: {source_filter or '全部'}") async def get_atlassian_stats(self) -> dict: """ 獲取 Atlassian 資料統計 Returns: dict: 統計資訊 """ stats = {} # 統計各類型節點數量 count_queries = { "jira_issues": "MATCH (n:JiraIssue) RETURN count(n) as count", "confluence_pages": "MATCH (n:ConfluencePage) RETURN count(n) as count", "jira_projects": "MATCH (n:JiraProject) RETURN count(n) as count", "confluence_spaces": "MATCH (n:ConfluenceSpace) RETURN count(n) as count", "relationships": ( "MATCH ()-[r:ATLASSIAN_RELATION]-() RETURN count(r) as count" ), } for stat_name, query in count_queries.items(): result = await self.driver.execute_query(query) stats[stat_name] = result.data[0]["count"] if result.data else 0 return stats

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MumuTW/Mnemosyne-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.