Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
shrimp_data.py (37.7 kB)
#!/usr/bin/env python3
"""
Shrimp Task Manager - Data Management System

Data management layer of the local AI: intelligent lookup, search and analysis.

This module is responsible for managing task data:
- list_tasks: task list management and presentation
- query_task: task search and semantic matching
- get_task_detail: detailed lookup with surrounding context

The stated goal of the data management system is to organise large volumes
of task data efficiently and surface the required information immediately.
"""

import os
import re
import json
import sqlite3
import hashlib
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple, Set, Union
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import difflib

# Local module imports
from .shrimp_base import (
    Task, TaskStatus, TaskPriority, TaskCategory,
    RelatedFile, Evidence, ThoughtProcess, generate_task_id
)

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# =============================================================================
# Data Management Classes
# =============================================================================

class SortOrder(Enum):
    """Sort direction used when listing tasks."""
    ASC = "asc"
    DESC = "desc"


class SearchScope(Enum):
    """Which task fields a search looks at."""
    ALL = "all"                  # every field
    NAME = "name"                # name only
    DESCRIPTION = "description"  # description only
    TAGS = "tags"                # tags only
    CONTENT = "content"          # content (implementation guide, notes, ...)


@dataclass
class TaskFilter:
    """Optional filter criteria for task listings.

    Every field defaults to ``None``, meaning "do not filter on this field".
    """
    status: Optional[List[TaskStatus]] = None
    priority: Optional[List[TaskPriority]] = None
    category: Optional[List[TaskCategory]] = None
    tags: Optional[List[str]] = None
    date_range: Optional[Tuple[datetime, datetime]] = None
    completion_range: Optional[Tuple[float, float]] = None
    quality_range: Optional[Tuple[float, float]] = None
    has_dependencies: Optional[bool] = None
    has_evidence: Optional[bool] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert the filter to a plain dictionary.

        Enum members are replaced by their ``value``, datetimes by ISO-8601
        strings and tuples by lists. Falsy list/tuple fields become ``None``.
        """
        return {
            'status': [s.value for s in self.status] if self.status else None,
            'priority': [p.value for p in self.priority] if self.priority else None,
            'category': [c.value for c in self.category] if self.category else None,
            'tags': self.tags,
            'date_range': [d.isoformat() for d in self.date_range] if self.date_range else None,
            'completion_range': list(self.completion_range) if self.completion_range else None,
            'quality_range': list(self.quality_range) if self.quality_range else None,
            'has_dependencies': self.has_dependencies,
            'has_evidence': self.has_evidence
        }


@dataclass
class TaskSummary:
    """Condensed view of a task as shown in list results."""
    id: str
    name: str
    status: TaskStatus
    priority: TaskPriority
    category: TaskCategory
    completion_percentage: float
    quality_score: Optional[float]
    created_at: datetime
    updated_at: datetime
    tags: Set[str]
    dependency_count: int
    evidence_count: int

    def to_dict(self) -> Dict[str, Any]:
        """Convert the summary to a plain dictionary.

        Tags are emitted as a sorted list so that the serialised form is
        deterministic (iteration order of a set is not).
        """
        return {
            'id': self.id,
            'name': self.name,
            'status': self.status.value,
            'priority': self.priority.value,
            'category': self.category.value,
            'completion_percentage': self.completion_percentage,
            'quality_score': self.quality_score,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat(),
            'tags': sorted(self.tags),
            'dependency_count': self.dependency_count,
            'evidence_count': self.evidence_count
        }


@dataclass
class SearchResult:
    """A single search hit."""
    task: Task
    relevance_score: float          # relevance score (0.0-1.0)
    match_details: Dict[str, Any]   # details about how the task matched
    highlighted_text: str           # text with highlighting applied

    def to_dict(self) -> Dict[str, Any]:
        """Convert the result to a plain dictionary (the task via ``Task.to_dict``)."""
        return {
            'task': self.task.to_dict(),
            'relevance_score': self.relevance_score,
            'match_details': self.match_details,
            'highlighted_text': self.highlighted_text
        }


@dataclass
class TaskListResult:
    """One page of task summaries together with paging and statistics data."""
    tasks: List[TaskSummary]
    total_count: int
    filtered_count: int
    page: int
    page_size: int
    total_pages: int
    summary_stats: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Convert the result to a plain dictionary."""
        return {
            'tasks': [task.to_dict() for task in self.tasks],
            'total_count': self.total_count,
            'filtered_count': self.filtered_count,
            'page': self.page,
            'page_size': self.page_size,
            'total_pages': self.total_pages,
            'summary_stats': self.summary_stats
        }
# =============================================================================
# Data Management Manager - core class
# =============================================================================

class DataManagementManager:
    """Task data manager backed by a SQLite database.

    Exposes the three public entry points of the module -- ``list_tasks``,
    ``query_task`` and ``get_task_detail`` -- and keeps an in-memory,
    time-limited cache of loaded ``Task`` objects.

    NOTE(review): many private helpers called below (for example
    ``_get_total_task_count``, ``_get_filtered_task_count``,
    ``_execute_list_query``, ``_create_task_summary``,
    ``_execute_search_query``, ``_deduplicate_and_sort_results``,
    ``_apply_highlighting``, ``_find_similar_ids``,
    ``_calculate_semantic_score``, ``_increment_access_count``) are not
    defined in the visible source. Confirm they are provided elsewhere
    before relying on the public entry points.
    """

    def __init__(self, working_directory: str = "/home/skyki/qwen2.5"):
        """Set up caches and make sure the database schema exists.

        Args:
            working_directory: Directory holding ``shrimp_tasks.db``.
        """
        self.working_directory = working_directory
        self.db_path = os.path.join(working_directory, "shrimp_tasks.db")

        # In-memory cache
        self.task_cache: Dict[str, Task] = {}
        self.cache_timestamps: Dict[str, datetime] = {}
        self.cache_ttl = timedelta(minutes=30)

        # Search indexes
        self.search_index: Dict[str, Set[str]] = {}  # keyword -> set of task ids
        self.fuzzy_index: Dict[str, List[str]] = {}  # for approximate matching

        # Statistics
        self.stats_cache: Dict[str, Any] = {}
        self.stats_last_update: Optional[datetime] = None

        # Database initialisation
        self._initialize_database()

        logger.info("DataManagementManager 초기화 완료")

    def _initialize_database(self):
        """Create the tables and indexes if they do not exist yet."""
        logger.info("🗄️ 데이터베이스 초기화 중...")

        # sqlite3's connection context manager only commits/rolls back; it
        # does not close the connection, so close it explicitly.
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Tasks table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    description TEXT,
                    status TEXT NOT NULL,
                    priority TEXT NOT NULL,
                    category TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    due_date TEXT,
                    estimated_hours REAL,
                    actual_hours REAL,
                    completion_percentage REAL DEFAULT 0.0,
                    quality_score REAL,
                    implementation_guide TEXT,
                    verification_criteria TEXT,
                    notes TEXT,
                    decision_rationale TEXT,
                    parent_task TEXT,
                    task_data TEXT  -- JSON 형태의 전체 작업 데이터
                )
            ''')

            # Dependencies table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS task_dependencies (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    dependency_task_id TEXT NOT NULL,
                    dependency_type TEXT DEFAULT 'requires',
                    description TEXT,
                    FOREIGN KEY (task_id) REFERENCES tasks (id),
                    FOREIGN KEY (dependency_task_id) REFERENCES tasks (id)
                )
            ''')

            # Tags table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS task_tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    tag TEXT NOT NULL,
                    FOREIGN KEY (task_id) REFERENCES tasks (id)
                )
            ''')

            # Related files table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS task_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    file_path TEXT NOT NULL,
                    file_type TEXT NOT NULL,
                    description TEXT,
                    line_start INTEGER,
                    line_end INTEGER,
                    FOREIGN KEY (task_id) REFERENCES tasks (id)
                )
            ''')

            # Evidence table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS task_evidence (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    source TEXT NOT NULL,
                    content TEXT NOT NULL,
                    reliability REAL NOT NULL,
                    evidence_type TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    metadata TEXT,  -- JSON
                    FOREIGN KEY (task_id) REFERENCES tasks (id)
                )
            ''')

            # Indexes
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_updated ON tasks(updated_at)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_task_tags_tag ON task_tags(tag)')

            conn.commit()
        finally:
            conn.close()

        logger.info("✅ 데이터베이스 초기화 완료")

    # =========================================================================
    # list_tasks: task list management
    # =========================================================================

    async def list_tasks(self,
                         status: str = "all",
                         filter_criteria: Optional[TaskFilter] = None,
                         sort_by: str = "updated_at",
                         sort_order: SortOrder = SortOrder.DESC,
                         page: int = 1,
                         page_size: int = 20) -> TaskListResult:
        """Return one page of task summaries.

        Args:
            status: Status filter ("all", "pending", "in_progress", "completed").
            filter_criteria: Additional filter conditions.
            sort_by: Column to sort by.
            sort_order: Sort direction.
            page: Page number, starting at 1. Values below 1 are treated as 1.
            page_size: Page size. Values below 1 are treated as 1.

        Returns:
            TaskListResult: The requested page with paging and statistics data.

        Raises:
            Exception: Any error from the underlying helpers is logged and
                re-raised unchanged.
        """
        logger.info(f"📋 작업 목록 조회: 상태={status}, 정렬={sort_by}, 페이지={page}")

        try:
            # Normalise paging: page_size == 0 would divide by zero below and
            # page < 1 would give a negative OFFSET. int() also guarantees
            # that only integers are interpolated into the SQL text.
            page = max(1, int(page))
            page_size = max(1, int(page_size))

            # 1. Base filter from the status argument
            base_filter = self._create_base_filter(status)

            # 2. Merge with the additional filter
            combined_filter = self._combine_filters(base_filter, filter_criteria)

            # Both filters constrain the status but share no value: nothing
            # can match. The empty list would otherwise be read as "no
            # status filter" by the query builder and return every task.
            status_conflict = bool(
                base_filter.status
                and filter_criteria
                and filter_criteria.status
                and not combined_filter.status
            )

            # 3. Build the database query
            query, params = self._build_list_query(combined_filter, sort_by, sort_order)

            # 4. Counts
            total_count = await self._get_total_task_count()

            if status_conflict:
                filtered_count = 0
                tasks_data = []
            else:
                filtered_count = await self._get_filtered_task_count(combined_filter)

                # 5. Paging
                offset = (page - 1) * page_size
                paginated_query = f"{query} LIMIT {page_size} OFFSET {offset}"

                # 6. Fetch rows
                tasks_data = await self._execute_list_query(paginated_query, params)

            # 7. Build TaskSummary objects
            task_summaries = []
            for task_data in tasks_data:
                summary = await self._create_task_summary(task_data)
                task_summaries.append(summary)

            # 8. Statistics
            # NOTE(review): on a status conflict the helper still receives the
            # empty status list; confirm how it interprets that.
            summary_stats = await self._generate_summary_statistics(combined_filter)

            # 9. Paging info (ceiling division)
            total_pages = (filtered_count + page_size - 1) // page_size

            # 10. Result
            result = TaskListResult(
                tasks=task_summaries,
                total_count=total_count,
                filtered_count=filtered_count,
                page=page,
                page_size=page_size,
                total_pages=total_pages,
                summary_stats=summary_stats
            )

            logger.info(f"✅ 작업 목록 조회 완료: {len(task_summaries)}개 작업")
            return result

        except Exception as e:
            logger.error(f"❌ 작업 목록 조회 실패: {e}")
            raise

    def _create_base_filter(self, status: str) -> TaskFilter:
        """Build a filter from the ``status`` argument of ``list_tasks``.

        "all" and unknown status strings yield an unconstrained filter; an
        unknown status additionally logs a warning.
        """
        base_filter = TaskFilter()

        if status != "all":
            try:
                task_status = TaskStatus(status)
                base_filter.status = [task_status]
            except ValueError:
                logger.warning(f"⚠️ 알 수 없는 상태: {status}")

        return base_filter

    def _combine_filters(self, base_filter: TaskFilter,
                         additional_filter: Optional[TaskFilter]) -> TaskFilter:
        """Merge the base filter with an optional additional filter.

        When both constrain the status, the result is their intersection
        (possibly an empty list). All other fields come from
        ``additional_filter``.
        """
        if not additional_filter:
            return base_filter

        combined = TaskFilter()

        # Status: intersect when both sides constrain it
        if base_filter.status and additional_filter.status:
            combined.status = list(set(base_filter.status) & set(additional_filter.status))
        else:
            combined.status = base_filter.status or additional_filter.status

        # Every other field: the additional filter wins
        combined.priority = additional_filter.priority
        combined.category = additional_filter.category
        combined.tags = additional_filter.tags
        combined.date_range = additional_filter.date_range
        combined.completion_range = additional_filter.completion_range
        combined.quality_range = additional_filter.quality_range
        combined.has_dependencies = additional_filter.has_dependencies
        combined.has_evidence = additional_filter.has_evidence

        return combined

    def _build_list_query(self, filter_criteria: TaskFilter,
                          sort_by: str, sort_order: SortOrder) -> Tuple[str, List]:
        """Build the parameterised SELECT used by ``list_tasks``.

        Returns:
            Tuple of the SQL text (without LIMIT/OFFSET) and the list of
            bound parameters, in placeholder order.

        NOTE(review): ``tags``, ``has_dependencies`` and ``has_evidence`` of
        the filter are not translated into SQL here.
        """
        query = "SELECT * FROM tasks WHERE 1=1"
        params = []

        # Enum-valued IN filters (status, priority, category)
        enum_filters = (
            ('status', filter_criteria.status),
            ('priority', filter_criteria.priority),
            ('category', filter_criteria.category),
        )
        for column, members in enum_filters:
            if members:
                placeholders = ','.join('?' for _ in members)
                query += f" AND {column} IN ({placeholders})"
                params.extend(member.value for member in members)

        # Creation date range
        if filter_criteria.date_range:
            start_date, end_date = filter_criteria.date_range
            query += " AND created_at BETWEEN ? AND ?"
            params.extend([start_date.isoformat(), end_date.isoformat()])

        # Completion percentage range
        if filter_criteria.completion_range:
            min_completion, max_completion = filter_criteria.completion_range
            query += " AND completion_percentage BETWEEN ? AND ?"
            params.extend([min_completion, max_completion])

        # Quality score range
        if filter_criteria.quality_range:
            min_quality, max_quality = filter_criteria.quality_range
            query += " AND quality_score BETWEEN ? AND ?"
            params.extend([min_quality, max_quality])

        # Sorting: only whitelisted columns are interpolated into the SQL
        valid_sort_fields = ['created_at', 'updated_at', 'name', 'priority',
                             'completion_percentage', 'quality_score']
        order_column = sort_by if sort_by in valid_sort_fields else 'updated_at'
        query += f" ORDER BY {order_column} {sort_order.value.upper()}"

        return query, params

    # =========================================================================
    # query_task: task search
    # =========================================================================

    async def query_task(self,
                         query: str,
                         is_id: bool = False,
                         scope: SearchScope = SearchScope.ALL,
                         page: int = 1,
                         page_size: int = 5,
                         min_relevance: float = 0.1) -> List[SearchResult]:
        """Search tasks by id or by free-text query.

        Args:
            query: Search query.
            is_id: Treat ``query`` as a task id.
            scope: Fields to search.
            page: Page number, starting at 1. Values below 1 are treated as 1.
            page_size: Page size. Values below 1 are treated as 1.
            min_relevance: Minimum relevance score a result must reach.

        Returns:
            List[SearchResult]: The requested page of results.

        Raises:
            Exception: Any error from the underlying helpers is logged and
                re-raised unchanged.
        """
        logger.info(f"🔍 작업 검색: '{query}' (ID모드: {is_id}, 범위: {scope.value})")

        try:
            # 1. Id lookup mode
            if is_id:
                return await self._search_by_id(query)

            # A page below 1 would produce a negative slice start.
            page = max(1, int(page))
            page_size = max(1, int(page_size))

            # 2. Query preprocessing
            processed_query = self._preprocess_search_query(query)

            # 3. Run every search strategy
            search_results = []

            # Exact matching
            exact_matches = await self._exact_search(processed_query, scope)
            search_results.extend(exact_matches)

            # Fuzzy matching
            fuzzy_matches = await self._fuzzy_search(processed_query, scope)
            search_results.extend(fuzzy_matches)

            # Semantic matching
            semantic_matches = await self._semantic_search(processed_query, scope)
            search_results.extend(semantic_matches)

            # 4. De-duplicate and sort by relevance
            unique_results = self._deduplicate_and_sort_results(search_results)

            # 5. Relevance threshold
            filtered_results = [r for r in unique_results if r.relevance_score >= min_relevance]

            # 6. Paging
            start_idx = (page - 1) * page_size
            end_idx = start_idx + page_size
            paginated_results = filtered_results[start_idx:end_idx]

            # 7. Highlighting
            for result in paginated_results:
                result.highlighted_text = self._apply_highlighting(result, processed_query)

            logger.info(f"✅ 검색 완료: {len(paginated_results)}개 결과 (전체 {len(filtered_results)}개)")
            return paginated_results

        except Exception as e:
            logger.error(f"❌ 작업 검색 실패: {e}")
            raise

    async def _search_by_id(self, task_id: str) -> List[SearchResult]:
        """Look a task up by id, falling back to similar ids.

        An exact hit is returned alone with relevance 1.0; otherwise every
        loadable task from ``_find_similar_ids`` is returned with its
        similarity as the relevance score.
        """
        logger.info(f"🆔 ID 검색: {task_id}")

        # Exact id match
        task = await self._load_task_from_db(task_id)
        if task:
            return [SearchResult(
                task=task,
                relevance_score=1.0,
                match_details={'type': 'exact_id', 'field': 'id'},
                highlighted_text=task.name
            )]

        # Partial id match
        similar_ids = await self._find_similar_ids(task_id)
        results = []

        for similar_id, similarity in similar_ids:
            task = await self._load_task_from_db(similar_id)
            if task:
                results.append(SearchResult(
                    task=task,
                    relevance_score=similarity,
                    match_details={'type': 'partial_id', 'field': 'id', 'similarity': similarity},
                    highlighted_text=task.name
                ))

        return results

    def _preprocess_search_query(self, query: str) -> Dict[str, Any]:
        """Split a raw query into keywords, phrases, filters and operators.

        Returns:
            Dict with the keys ``original``, ``keywords`` (lower-cased words
            longer than two characters), ``phrases`` (double-quoted spans),
            ``filters`` (``status:``/``priority:``/``category:`` pairs) and
            ``operators`` (subset of AND/OR/NOT found in the query).
        """
        processed = {
            'original': query,
            'keywords': [],
            'phrases': [],
            'filters': {},
            'operators': []
        }

        # Boolean operators (detected case-insensitively, space-delimited)
        upper_query = query.upper()
        for operator in ('AND', 'OR', 'NOT'):
            if f' {operator} ' in upper_query:
                processed['operators'].append(operator)

        # Keywords. Words recognised as boolean operators are left out:
        # otherwise "a AND b" would only match tasks that also contain the
        # literal word "and".
        operator_words = {operator.lower() for operator in processed['operators']}
        keywords = re.findall(r'\b\w+\b', query.lower())
        processed['keywords'] = [
            kw for kw in keywords
            if len(kw) > 2 and kw not in operator_words
        ]

        # Phrases (text enclosed in double quotes)
        phrases = re.findall(r'"([^"]*)"', query)
        processed['phrases'] = phrases

        # Special filters (e.g. status:pending, priority:high)
        filters = re.findall(r'(\w+):(\w+)', query)
        for key, value in filters:
            if key in ['status', 'priority', 'category']:
                processed['filters'][key] = value

        return processed

    def _scope_condition(self, scope: SearchScope) -> Tuple[str, int]:
        """Return the LIKE clause for ``scope`` and its placeholder count."""
        if scope == SearchScope.TAGS:
            # assumes task_tags is kept in sync with the tasks -- TODO confirm
            return "id IN (SELECT task_id FROM task_tags WHERE tag LIKE ?)", 1

        columns_by_scope = {
            SearchScope.NAME: ('name',),
            SearchScope.DESCRIPTION: ('description',),
            SearchScope.CONTENT: ('implementation_guide', 'notes'),
        }
        # SearchScope.ALL (and anything unrecognised) searches every text column
        columns = columns_by_scope.get(
            scope, ('name', 'description', 'implementation_guide', 'notes'))

        if len(columns) == 1:
            return f"{columns[0]} LIKE ?", 1
        clause = ' OR '.join(f"{column} LIKE ?" for column in columns)
        return f"({clause})", len(columns)

    async def _exact_search(self, processed_query: Dict[str, Any],
                            scope: SearchScope) -> List[SearchResult]:
        """Substring (SQL LIKE) search; every keyword and phrase must match.

        Every ``SearchScope`` member maps to a clause, so TAGS and CONTENT no
        longer hit an unassigned ``condition``.
        """
        logger.info("🎯 정확 매칭 검색 중...")

        results = []
        keywords = processed_query['keywords']
        phrases = processed_query['phrases']

        # One identical clause per term; keywords first, then phrases
        clause, placeholder_count = self._scope_condition(scope)
        conditions = []
        params = []
        for term in [*keywords, *phrases]:
            conditions.append(clause)
            params.extend([f'%{term}%'] * placeholder_count)

        if conditions:
            query = f"SELECT * FROM tasks WHERE {' AND '.join(conditions)}"
            tasks_data = await self._execute_search_query(query, params)

            for task_data in tasks_data:
                task = await self._create_task_from_db_data(task_data)
                relevance = self._calculate_exact_relevance(task, processed_query)

                results.append(SearchResult(
                    task=task,
                    relevance_score=relevance,
                    match_details={'type': 'exact', 'matches': len(keywords) + len(phrases)},
                    highlighted_text=""
                ))

        return results

    async def _fuzzy_search(self, processed_query: Dict[str, Any],
                            scope: SearchScope) -> List[SearchResult]:
        """Approximate search over all tasks; keeps scores above 0.3."""
        logger.info("🔍 퍼지 매칭 검색 중...")

        results = []
        keywords = processed_query['keywords']

        # Load every task
        all_tasks = await self._load_all_tasks()

        for task in all_tasks:
            fuzzy_score = self._calculate_fuzzy_score(task, keywords, scope)

            if fuzzy_score > 0.3:  # threshold
                results.append(SearchResult(
                    task=task,
                    relevance_score=fuzzy_score,
                    match_details={'type': 'fuzzy', 'score': fuzzy_score},
                    highlighted_text=""
                ))

        return results

    async def _semantic_search(self, processed_query: Dict[str, Any],
                               scope: SearchScope) -> List[SearchResult]:
        """Search with synonym-expanded keywords; keeps scores above 0.2."""
        logger.info("🧠 의미적 매칭 검색 중...")

        results = []
        keywords = processed_query['keywords']

        # Expand keywords with related terms
        expanded_keywords = self._expand_keywords_semantically(keywords)

        # Score every task against the expanded keywords
        all_tasks = await self._load_all_tasks()

        for task in all_tasks:
            semantic_score = self._calculate_semantic_score(task, expanded_keywords, scope)

            if semantic_score > 0.2:  # threshold
                results.append(SearchResult(
                    task=task,
                    relevance_score=semantic_score,
                    match_details={'type': 'semantic', 'expanded_keywords': expanded_keywords},
                    highlighted_text=""
                ))

        return results

    # =========================================================================
    # get_task_detail: detailed lookup
    # =========================================================================

    async def get_task_detail(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the task together with related context.

        Args:
            task_id: Id of the task to look up.

        Returns:
            Optional[Dict[str, Any]]: Detail dictionary, or ``None`` when the
            task does not exist.

        Raises:
            Exception: Any error from the underlying helpers is logged and
                re-raised unchanged.
        """
        logger.info(f"📄 작업 상세 정보 조회: {task_id}")

        try:
            # 1. Load the task itself
            task = await self._load_task_from_db(task_id)
            if not task:
                logger.warning(f"⚠️ 작업을 찾을 수 없음: {task_id}")
                return None

            # 2. Related tasks
            related_info = await self._gather_related_task_info(task)

            # 3. Dependencies
            dependency_info = await self._gather_dependency_info(task)

            # 4. Execution history
            execution_history = await self._gather_execution_history(task_id)

            # 5. Quality metrics
            quality_metrics = await self._calculate_quality_metrics(task)

            # 6. Context information
            context_info = await self._gather_context_information(task)

            # 7. Assemble the detail record
            detail_info = {
                'task': task.to_dict(),
                'related_tasks': related_info,
                'dependencies': dependency_info,
                'execution_history': execution_history,
                'quality_metrics': quality_metrics,
                'context_info': context_info,
                'metadata': {
                    'last_accessed': datetime.now().isoformat(),
                    'access_count': await self._increment_access_count(task_id),
                    'estimated_read_time': self._estimate_read_time(task),
                    'complexity_indicators': self._analyze_complexity_indicators(task)
                }
            }

            logger.info(f"✅ 상세 정보 조회 완료: {task.name}")
            return detail_info

        except Exception as e:
            logger.error(f"❌ 상세 정보 조회 실패: {e}")
            raise

    async def _gather_related_task_info(self, task: Task) -> Dict[str, Any]:
        """Collect parent, child, sibling and similar tasks of ``task``."""
        logger.info("🔗 관련 작업 정보 수집 중...")

        related_info = {
            'parent_tasks': [],
            'child_tasks': [],
            'sibling_tasks': [],
            'similar_tasks': []
        }

        # Parent task
        if task.parent_task:
            parent = await self._load_task_from_db(task.parent_task)
            if parent:
                related_info['parent_tasks'].append({
                    'id': parent.id,
                    'name': parent.name,
                    'status': parent.status.value
                })

        # Child tasks
        for subtask_id in task.subtasks:
            subtask = await self._load_task_from_db(subtask_id)
            if subtask:
                related_info['child_tasks'].append({
                    'id': subtask.id,
                    'name': subtask.name,
                    'status': subtask.status.value
                })

        # Sibling tasks (tasks sharing the same parent)
        if task.parent_task:
            siblings = await self._find_sibling_tasks(task.parent_task, task.id)
            related_info['sibling_tasks'] = siblings

        # Similar tasks; only the first five are kept
        similar_tasks = await self._find_similar_tasks(task)
        related_info['similar_tasks'] = similar_tasks[:5]

        return related_info

    async def _gather_dependency_info(self, task: Task) -> Dict[str, Any]:
        """Collect dependency data for ``task``.

        Returns a dict with ``depends_on``, ``blocks``, ``dependency_graph``
        and ``critical_path``.
        """
        logger.info("📊 의존성 정보 수집 중...")

        dependency_info = {
            'depends_on': [],
            'blocks': [],
            'dependency_graph': {},
            'critical_path': []
        }

        # Direct dependencies
        for dep in task.dependencies:
            dep_task = await self._load_task_from_db(dep.task_id)
            if dep_task:
                dependency_info['depends_on'].append({
                    'id': dep_task.id,
                    'name': dep_task.name,
                    'status': dep_task.status.value,
                    'type': dep.dependency_type,
                    'description': dep.description
                })

        # Tasks that depend on this one
        blocking_tasks = await self._find_tasks_depending_on(task.id)
        dependency_info['blocks'] = blocking_tasks

        # Dependency graph
        dependency_info['dependency_graph'] = await self._build_dependency_graph(task.id)

        # Critical path
        dependency_info['critical_path'] = await self._calculate_critical_path(task.id)

        return dependency_info

    # =========================================================================
    # Utilities and helpers
    # =========================================================================

    async def _load_task_from_db(self, task_id: str) -> Optional[Task]:
        """Load one task, using the in-memory cache while it is fresh.

        Returns:
            The task, or ``None`` when no row exists or the row carries no
            ``task_data`` payload.
        """
        # Cache check
        if task_id in self.task_cache:
            cache_time = self.cache_timestamps.get(task_id)
            if cache_time and datetime.now() - cache_time < self.cache_ttl:
                return self.task_cache[task_id]

        # Load from the database; close the connection explicitly because
        # sqlite3's context manager does not.
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT task_data FROM tasks WHERE id = ?", (task_id,))
            row = cursor.fetchone()
        finally:
            conn.close()

        # task_data is a nullable column; json.loads(None) would raise.
        if not row or row[0] is None:
            return None

        task = Task.from_dict(json.loads(row[0]))

        # Store in the cache
        self.task_cache[task_id] = task
        self.cache_timestamps[task_id] = datetime.now()

        return task

    async def _load_all_tasks(self) -> List[Task]:
        """Load every task that has a ``task_data`` payload (bypasses the cache)."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT task_data FROM tasks")
            rows = cursor.fetchall()
        finally:
            conn.close()

        # Rows without a payload (nullable column) are skipped.
        return [
            Task.from_dict(json.loads(row[0]))
            for row in rows
            if row[0] is not None
        ]

    def _calculate_fuzzy_score(self, task: Task, keywords: List[str],
                               scope: SearchScope) -> float:
        """Score how well ``keywords`` approximately match the task text.

        A keyword found as a substring counts 1.0; otherwise the best
        ``difflib`` ratio against any single word counts if it exceeds 0.6.
        The result is the mean over all keywords, capped at 1.0, and 0.0
        when there are no keywords.
        """
        if scope == SearchScope.ALL:
            parts = [task.name, task.description, task.implementation_guide, task.notes]
        elif scope == SearchScope.NAME:
            parts = [task.name]
        elif scope == SearchScope.DESCRIPTION:
            parts = [task.description]
        elif scope == SearchScope.CONTENT:
            parts = [task.implementation_guide, task.notes]
        else:
            parts = []

        # Skip missing fields so they neither crash .lower() nor show up as
        # the literal text "None".
        text_to_search = " ".join(str(part) for part in parts if part is not None).lower()
        words = text_to_search.split()  # loop-invariant, computed once

        total_score = 0.0

        for keyword in keywords:
            # Exact (substring) match
            if keyword in text_to_search:
                total_score += 1.0
                continue

            # Similarity match against individual words
            best_match = 0.0
            for word in words:
                similarity = difflib.SequenceMatcher(None, keyword, word).ratio()
                best_match = max(best_match, similarity)

            if best_match > 0.6:  # threshold
                total_score += best_match

        return min(total_score / len(keywords), 1.0) if keywords else 0.0

    def _expand_keywords_semantically(self, keywords: List[str]) -> Dict[str, List[str]]:
        """Map each keyword to its related terms.

        Keywords found in the built-in table map to that table's term list;
        any other keyword maps to a one-element list containing itself.
        """
        expansion_map = {
            'implement': ['create', 'build', 'develop', 'code', 'construct'],
            'fix': ['repair', 'correct', 'solve', 'debug', 'resolve'],
            'test': ['verify', 'validate', 'check', 'examine', 'assess'],
            'optimize': ['improve', 'enhance', 'refine', 'speed up', 'efficient'],
            'design': ['plan', 'architect', 'structure', 'layout', 'blueprint'],
            'api': ['interface', 'endpoint', 'service', 'rest', 'graphql'],
            'database': ['db', 'storage', 'data', 'sql', 'nosql'],
            'frontend': ['ui', 'interface', 'client', 'web', 'react', 'vue'],
            'backend': ['server', 'api', 'service', 'database', 'logic'],
            'security': ['auth', 'authentication', 'authorization', 'encryption', 'ssl']
        }

        return {
            keyword: expansion_map.get(keyword.lower(), [keyword])
            for keyword in keywords
        }


if __name__ == "__main__":
    # Basic smoke test
    print("🚀 Data Management System 테스트")

    async def test_data_management():
        manager = DataManagementManager()

        # list_tasks
        task_list = await manager.list_tasks(status="all", page=1, page_size=10)
        print(f"✅ list_tasks 성공: {len(task_list.tasks)}개 작업, "
              f"총 {task_list.total_count}개 중 {task_list.filtered_count}개 필터링")

        # query_task
        search_results = await manager.query_task("Python 개발", page=1, page_size=5)
        print(f"✅ query_task 성공: {len(search_results)}개 검색 결과")

        # get_task_detail
        # NOTE(review): the original layout was lost; the else branch is
        # attached to the outer check (no tasks in a test environment).
        if task_list.tasks:
            detail = await manager.get_task_detail(task_list.tasks[0].id)
            if detail:
                print(f"✅ get_task_detail 성공: {detail['task']['name']}")
        else:
            print("ℹ️ get_task_detail: 작업을 찾을 수 없음 (정상 - 테스트 환경)")

        print("🎯 모든 Data Management 테스트 완료!")

    # Run the async smoke test
    import asyncio
    asyncio.run(test_data_management())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.