#!/usr/bin/env python3
"""
Shrimp Task Manager - Data Management System
로컬 AI의 데이터 관리 시스템: 지능형 조회, 검색, 분석
이 모듈은 작업 데이터의 효율적인 관리를 담당합니다:
- list_tasks: 지능형 작업 목록 관리 및 시각화
- query_task: 스마트 작업 검색 및 의미적 매칭
- get_task_detail: 상세 정보 조회 및 컨텍스트 제공
데이터 관리 시스템은 무한 컨텍스트를 활용하여 방대한 작업 데이터를
효율적으로 조직화하고 필요한 정보를 즉시 제공하는 능력을 구현합니다.
"""
import os
import re
import json
import sqlite3
import hashlib
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple, Set, Union
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import difflib
# 로컬 모듈 임포트
from .shrimp_base import (
Task, TaskStatus, TaskPriority, TaskCategory, RelatedFile, Evidence,
ThoughtProcess, generate_task_id
)
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# =============================================================================
# 데이터 관리 관련 클래스 (Data Management Classes)
# =============================================================================
class SortOrder(Enum):
"""정렬 순서"""
ASC = "asc"
DESC = "desc"
class SearchScope(Enum):
"""검색 범위"""
ALL = "all" # 모든 필드
NAME = "name" # 이름만
DESCRIPTION = "description" # 설명만
TAGS = "tags" # 태그만
CONTENT = "content" # 내용 (구현가이드, 노트 등)
@dataclass
class TaskFilter:
"""작업 필터"""
status: Optional[List[TaskStatus]] = None
priority: Optional[List[TaskPriority]] = None
category: Optional[List[TaskCategory]] = None
tags: Optional[List[str]] = None
date_range: Optional[Tuple[datetime, datetime]] = None
completion_range: Optional[Tuple[float, float]] = None
quality_range: Optional[Tuple[float, float]] = None
has_dependencies: Optional[bool] = None
has_evidence: Optional[bool] = None
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'status': [s.value for s in self.status] if self.status else None,
'priority': [p.value for p in self.priority] if self.priority else None,
'category': [c.value for c in self.category] if self.category else None,
'tags': self.tags,
'date_range': [d.isoformat() for d in self.date_range] if self.date_range else None,
'completion_range': list(self.completion_range) if self.completion_range else None,
'quality_range': list(self.quality_range) if self.quality_range else None,
'has_dependencies': self.has_dependencies,
'has_evidence': self.has_evidence
}
@dataclass
class TaskSummary:
"""작업 요약"""
id: str
name: str
status: TaskStatus
priority: TaskPriority
category: TaskCategory
completion_percentage: float
quality_score: Optional[float]
created_at: datetime
updated_at: datetime
tags: Set[str]
dependency_count: int
evidence_count: int
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'id': self.id,
'name': self.name,
'status': self.status.value,
'priority': self.priority.value,
'category': self.category.value,
'completion_percentage': self.completion_percentage,
'quality_score': self.quality_score,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat(),
'tags': list(self.tags),
'dependency_count': self.dependency_count,
'evidence_count': self.evidence_count
}
@dataclass
class SearchResult:
"""검색 결과"""
task: Task
relevance_score: float # 관련도 점수 (0.0-1.0)
match_details: Dict[str, Any] # 매치 세부사항
highlighted_text: str # 하이라이트된 텍스트
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'task': self.task.to_dict(),
'relevance_score': self.relevance_score,
'match_details': self.match_details,
'highlighted_text': self.highlighted_text
}
@dataclass
class TaskListResult:
"""작업 목록 결과"""
tasks: List[TaskSummary]
total_count: int
filtered_count: int
page: int
page_size: int
total_pages: int
summary_stats: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'tasks': [task.to_dict() for task in self.tasks],
'total_count': self.total_count,
'filtered_count': self.filtered_count,
'page': self.page,
'page_size': self.page_size,
'total_pages': self.total_pages,
'summary_stats': self.summary_stats
}
# =============================================================================
# Data Management Manager - 핵심 클래스
# =============================================================================
class DataManagementManager:
"""데이터 관리 관리자"""
def __init__(self, working_directory: str = "/home/skyki/qwen2.5"):
"""초기화"""
self.working_directory = working_directory
self.db_path = os.path.join(working_directory, "shrimp_tasks.db")
# 메모리 캐시
self.task_cache: Dict[str, Task] = {}
self.cache_timestamps: Dict[str, datetime] = {}
self.cache_ttl = timedelta(minutes=30)
# 검색 인덱스
self.search_index: Dict[str, Set[str]] = {} # 키워드 -> 작업 ID 집합
self.fuzzy_index: Dict[str, List[str]] = {} # 유사 매칭용
# 통계 데이터
self.stats_cache: Dict[str, Any] = {}
self.stats_last_update: Optional[datetime] = None
# 데이터베이스 초기화
self._initialize_database()
logger.info("DataManagementManager 초기화 완료")
def _initialize_database(self):
"""데이터베이스 초기화"""
logger.info("🗄️ 데이터베이스 초기화 중...")
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 작업 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
status TEXT NOT NULL,
priority TEXT NOT NULL,
category TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
due_date TEXT,
estimated_hours REAL,
actual_hours REAL,
completion_percentage REAL DEFAULT 0.0,
quality_score REAL,
implementation_guide TEXT,
verification_criteria TEXT,
notes TEXT,
decision_rationale TEXT,
parent_task TEXT,
task_data TEXT -- JSON 형태의 전체 작업 데이터
)
''')
# 의존성 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS task_dependencies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
dependency_task_id TEXT NOT NULL,
dependency_type TEXT DEFAULT 'requires',
description TEXT,
FOREIGN KEY (task_id) REFERENCES tasks (id),
FOREIGN KEY (dependency_task_id) REFERENCES tasks (id)
)
''')
# 태그 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS task_tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
tag TEXT NOT NULL,
FOREIGN KEY (task_id) REFERENCES tasks (id)
)
''')
# 관련 파일 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS task_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
file_path TEXT NOT NULL,
file_type TEXT NOT NULL,
description TEXT,
line_start INTEGER,
line_end INTEGER,
FOREIGN KEY (task_id) REFERENCES tasks (id)
)
''')
# 증거 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS task_evidence (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
source TEXT NOT NULL,
content TEXT NOT NULL,
reliability REAL NOT NULL,
evidence_type TEXT NOT NULL,
timestamp TEXT NOT NULL,
metadata TEXT, -- JSON
FOREIGN KEY (task_id) REFERENCES tasks (id)
)
''')
# 인덱스 생성
cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_tasks_updated ON tasks(updated_at)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_task_tags_tag ON task_tags(tag)')
conn.commit()
logger.info("✅ 데이터베이스 초기화 완료")
# =========================================================================
# list_tasks: 지능형 작업 목록 관리
# =========================================================================
async def list_tasks(self,
status: str = "all",
filter_criteria: Optional[TaskFilter] = None,
sort_by: str = "updated_at",
sort_order: SortOrder = SortOrder.DESC,
page: int = 1,
page_size: int = 20) -> TaskListResult:
"""
지능형 작업 목록 관리 및 시각화
Args:
status: 상태 필터 ("all", "pending", "in_progress", "completed")
filter_criteria: 상세 필터 조건
sort_by: 정렬 기준
sort_order: 정렬 순서
page: 페이지 번호 (1부터 시작)
page_size: 페이지 크기
Returns:
TaskListResult: 작업 목록 결과
"""
logger.info(f"📋 작업 목록 조회: 상태={status}, 정렬={sort_by}, 페이지={page}")
try:
# 1. 기본 필터 설정
base_filter = self._create_base_filter(status)
# 2. 추가 필터 적용
combined_filter = self._combine_filters(base_filter, filter_criteria)
# 3. 데이터베이스 쿼리 생성
query, params = self._build_list_query(combined_filter, sort_by, sort_order)
# 4. 총 개수 조회
total_count = await self._get_total_task_count()
filtered_count = await self._get_filtered_task_count(combined_filter)
# 5. 페이징 적용
offset = (page - 1) * page_size
paginated_query = f"{query} LIMIT {page_size} OFFSET {offset}"
# 6. 데이터 조회
tasks_data = await self._execute_list_query(paginated_query, params)
# 7. TaskSummary 객체 생성
task_summaries = []
for task_data in tasks_data:
summary = await self._create_task_summary(task_data)
task_summaries.append(summary)
# 8. 통계 정보 생성
summary_stats = await self._generate_summary_statistics(combined_filter)
# 9. 페이징 정보 계산
total_pages = (filtered_count + page_size - 1) // page_size
# 10. 결과 생성
result = TaskListResult(
tasks=task_summaries,
total_count=total_count,
filtered_count=filtered_count,
page=page,
page_size=page_size,
total_pages=total_pages,
summary_stats=summary_stats
)
logger.info(f"✅ 작업 목록 조회 완료: {len(task_summaries)}개 작업")
return result
except Exception as e:
logger.error(f"❌ 작업 목록 조회 실패: {e}")
raise
def _create_base_filter(self, status: str) -> TaskFilter:
"""기본 필터 생성"""
base_filter = TaskFilter()
if status != "all":
try:
task_status = TaskStatus(status)
base_filter.status = [task_status]
except ValueError:
logger.warning(f"⚠️ 알 수 없는 상태: {status}")
return base_filter
def _combine_filters(self, base_filter: TaskFilter, additional_filter: Optional[TaskFilter]) -> TaskFilter:
"""필터 조합"""
if not additional_filter:
return base_filter
# 기본 필터와 추가 필터를 조합
combined = TaskFilter()
# 상태 조합
if base_filter.status and additional_filter.status:
combined.status = list(set(base_filter.status) & set(additional_filter.status))
else:
combined.status = base_filter.status or additional_filter.status
# 다른 필터들은 추가 필터 우선
combined.priority = additional_filter.priority
combined.category = additional_filter.category
combined.tags = additional_filter.tags
combined.date_range = additional_filter.date_range
combined.completion_range = additional_filter.completion_range
combined.quality_range = additional_filter.quality_range
combined.has_dependencies = additional_filter.has_dependencies
combined.has_evidence = additional_filter.has_evidence
return combined
def _build_list_query(self, filter_criteria: TaskFilter, sort_by: str, sort_order: SortOrder) -> Tuple[str, List]:
"""목록 조회 쿼리 생성"""
query = "SELECT * FROM tasks WHERE 1=1"
params = []
# 상태 필터
if filter_criteria.status:
status_placeholders = ','.join(['?' for _ in filter_criteria.status])
query += f" AND status IN ({status_placeholders})"
params.extend([s.value for s in filter_criteria.status])
# 우선순위 필터
if filter_criteria.priority:
priority_placeholders = ','.join(['?' for _ in filter_criteria.priority])
query += f" AND priority IN ({priority_placeholders})"
params.extend([p.value for p in filter_criteria.priority])
# 카테고리 필터
if filter_criteria.category:
category_placeholders = ','.join(['?' for _ in filter_criteria.category])
query += f" AND category IN ({category_placeholders})"
params.extend([c.value for c in filter_criteria.category])
# 날짜 범위 필터
if filter_criteria.date_range:
start_date, end_date = filter_criteria.date_range
query += " AND created_at BETWEEN ? AND ?"
params.extend([start_date.isoformat(), end_date.isoformat()])
# 완료율 범위 필터
if filter_criteria.completion_range:
min_completion, max_completion = filter_criteria.completion_range
query += " AND completion_percentage BETWEEN ? AND ?"
params.extend([min_completion, max_completion])
# 품질 점수 범위 필터
if filter_criteria.quality_range:
min_quality, max_quality = filter_criteria.quality_range
query += " AND quality_score BETWEEN ? AND ?"
params.extend([min_quality, max_quality])
# 정렬
valid_sort_fields = ['created_at', 'updated_at', 'name', 'priority', 'completion_percentage', 'quality_score']
if sort_by in valid_sort_fields:
query += f" ORDER BY {sort_by} {sort_order.value.upper()}"
else:
query += f" ORDER BY updated_at {sort_order.value.upper()}"
return query, params
# =========================================================================
# query_task: 스마트 작업 검색
# =========================================================================
async def query_task(self,
query: str,
is_id: bool = False,
scope: SearchScope = SearchScope.ALL,
page: int = 1,
page_size: int = 5,
min_relevance: float = 0.1) -> List[SearchResult]:
"""
스마트 작업 검색 및 의미적 매칭
Args:
query: 검색 쿼리
is_id: ID 검색 모드 여부
scope: 검색 범위
page: 페이지 번호
page_size: 페이지 크기
min_relevance: 최소 관련도 점수
Returns:
List[SearchResult]: 검색 결과 목록
"""
logger.info(f"🔍 작업 검색: '{query}' (ID모드: {is_id}, 범위: {scope.value})")
try:
# 1. ID 검색 모드
if is_id:
return await self._search_by_id(query)
# 2. 키워드 전처리
processed_query = self._preprocess_search_query(query)
# 3. 다중 검색 전략 실행
search_results = []
# 정확 매칭
exact_matches = await self._exact_search(processed_query, scope)
search_results.extend(exact_matches)
# 퍼지 매칭
fuzzy_matches = await self._fuzzy_search(processed_query, scope)
search_results.extend(fuzzy_matches)
# 의미적 매칭
semantic_matches = await self._semantic_search(processed_query, scope)
search_results.extend(semantic_matches)
# 4. 중복 제거 및 관련도 순 정렬
unique_results = self._deduplicate_and_sort_results(search_results)
# 5. 관련도 필터링
filtered_results = [r for r in unique_results if r.relevance_score >= min_relevance]
# 6. 페이징 적용
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
paginated_results = filtered_results[start_idx:end_idx]
# 7. 하이라이팅 적용
for result in paginated_results:
result.highlighted_text = self._apply_highlighting(result, processed_query)
logger.info(f"✅ 검색 완료: {len(paginated_results)}개 결과 (전체 {len(filtered_results)}개)")
return paginated_results
except Exception as e:
logger.error(f"❌ 작업 검색 실패: {e}")
raise
async def _search_by_id(self, task_id: str) -> List[SearchResult]:
"""ID로 검색"""
logger.info(f"🆔 ID 검색: {task_id}")
# 정확한 ID 매칭
task = await self._load_task_from_db(task_id)
if task:
return [SearchResult(
task=task,
relevance_score=1.0,
match_details={'type': 'exact_id', 'field': 'id'},
highlighted_text=task.name
)]
# 부분 ID 매칭
similar_ids = await self._find_similar_ids(task_id)
results = []
for similar_id, similarity in similar_ids:
task = await self._load_task_from_db(similar_id)
if task:
results.append(SearchResult(
task=task,
relevance_score=similarity,
match_details={'type': 'partial_id', 'field': 'id', 'similarity': similarity},
highlighted_text=task.name
))
return results
def _preprocess_search_query(self, query: str) -> Dict[str, Any]:
"""검색 쿼리 전처리"""
processed = {
'original': query,
'keywords': [],
'phrases': [],
'filters': {},
'operators': []
}
# 키워드 추출
keywords = re.findall(r'\b\w+\b', query.lower())
processed['keywords'] = [kw for kw in keywords if len(kw) > 2]
# 구문 추출 (따옴표로 둘러싸인 부분)
phrases = re.findall(r'"([^"]*)"', query)
processed['phrases'] = phrases
# 특수 필터 추출 (예: status:pending, priority:high)
filters = re.findall(r'(\w+):(\w+)', query)
for key, value in filters:
if key in ['status', 'priority', 'category']:
processed['filters'][key] = value
# 불린 연산자 추출
if ' AND ' in query.upper():
processed['operators'].append('AND')
if ' OR ' in query.upper():
processed['operators'].append('OR')
if ' NOT ' in query.upper():
processed['operators'].append('NOT')
return processed
async def _exact_search(self, processed_query: Dict[str, Any], scope: SearchScope) -> List[SearchResult]:
"""정확 매칭 검색"""
logger.info("🎯 정확 매칭 검색 중...")
results = []
keywords = processed_query['keywords']
phrases = processed_query['phrases']
# 데이터베이스 쿼리 생성
conditions = []
params = []
# 키워드 검색
for keyword in keywords:
if scope == SearchScope.ALL:
condition = "(name LIKE ? OR description LIKE ? OR implementation_guide LIKE ? OR notes LIKE ?)"
params.extend([f'%{keyword}%'] * 4)
elif scope == SearchScope.NAME:
condition = "name LIKE ?"
params.append(f'%{keyword}%')
elif scope == SearchScope.DESCRIPTION:
condition = "description LIKE ?"
params.append(f'%{keyword}%')
conditions.append(condition)
# 구문 검색
for phrase in phrases:
if scope == SearchScope.ALL:
condition = "(name LIKE ? OR description LIKE ? OR implementation_guide LIKE ? OR notes LIKE ?)"
params.extend([f'%{phrase}%'] * 4)
elif scope == SearchScope.NAME:
condition = "name LIKE ?"
params.append(f'%{phrase}%')
elif scope == SearchScope.DESCRIPTION:
condition = "description LIKE ?"
params.append(f'%{phrase}%')
conditions.append(condition)
if conditions:
query = f"SELECT * FROM tasks WHERE {' AND '.join(conditions)}"
tasks_data = await self._execute_search_query(query, params)
for task_data in tasks_data:
task = await self._create_task_from_db_data(task_data)
relevance = self._calculate_exact_relevance(task, processed_query)
results.append(SearchResult(
task=task,
relevance_score=relevance,
match_details={'type': 'exact', 'matches': len(keywords) + len(phrases)},
highlighted_text=""
))
return results
async def _fuzzy_search(self, processed_query: Dict[str, Any], scope: SearchScope) -> List[SearchResult]:
"""퍼지 매칭 검색"""
logger.info("🔍 퍼지 매칭 검색 중...")
results = []
keywords = processed_query['keywords']
# 모든 작업 로드 (캐시된 작업들)
all_tasks = await self._load_all_tasks()
for task in all_tasks:
# 퍼지 매칭 점수 계산
fuzzy_score = self._calculate_fuzzy_score(task, keywords, scope)
if fuzzy_score > 0.3: # 임계값
results.append(SearchResult(
task=task,
relevance_score=fuzzy_score,
match_details={'type': 'fuzzy', 'score': fuzzy_score},
highlighted_text=""
))
return results
async def _semantic_search(self, processed_query: Dict[str, Any], scope: SearchScope) -> List[SearchResult]:
"""의미적 매칭 검색"""
logger.info("🧠 의미적 매칭 검색 중...")
results = []
keywords = processed_query['keywords']
# 의미적 관련 키워드 확장
expanded_keywords = self._expand_keywords_semantically(keywords)
# 확장된 키워드로 검색
all_tasks = await self._load_all_tasks()
for task in all_tasks:
semantic_score = self._calculate_semantic_score(task, expanded_keywords, scope)
if semantic_score > 0.2: # 임계값
results.append(SearchResult(
task=task,
relevance_score=semantic_score,
match_details={'type': 'semantic', 'expanded_keywords': expanded_keywords},
highlighted_text=""
))
return results
# =========================================================================
# get_task_detail: 상세 정보 조회
# =========================================================================
async def get_task_detail(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
상세 정보 조회 및 컨텍스트 제공
Args:
task_id: 조회할 작업 ID
Returns:
Optional[Dict[str, Any]]: 상세 작업 정보
"""
logger.info(f"📄 작업 상세 정보 조회: {task_id}")
try:
# 1. 기본 작업 정보 로드
task = await self._load_task_from_db(task_id)
if not task:
logger.warning(f"⚠️ 작업을 찾을 수 없음: {task_id}")
return None
# 2. 관련 작업 정보 수집
related_info = await self._gather_related_task_info(task)
# 3. 의존성 정보 수집
dependency_info = await self._gather_dependency_info(task)
# 4. 실행 히스토리 수집
execution_history = await self._gather_execution_history(task_id)
# 5. 품질 메트릭 계산
quality_metrics = await self._calculate_quality_metrics(task)
# 6. 컨텍스트 정보 수집
context_info = await self._gather_context_information(task)
# 7. 상세 정보 조합
detail_info = {
'task': task.to_dict(),
'related_tasks': related_info,
'dependencies': dependency_info,
'execution_history': execution_history,
'quality_metrics': quality_metrics,
'context_info': context_info,
'metadata': {
'last_accessed': datetime.now().isoformat(),
'access_count': await self._increment_access_count(task_id),
'estimated_read_time': self._estimate_read_time(task),
'complexity_indicators': self._analyze_complexity_indicators(task)
}
}
logger.info(f"✅ 상세 정보 조회 완료: {task.name}")
return detail_info
except Exception as e:
logger.error(f"❌ 상세 정보 조회 실패: {e}")
raise
async def _gather_related_task_info(self, task: Task) -> Dict[str, Any]:
"""관련 작업 정보 수집"""
logger.info("🔗 관련 작업 정보 수집 중...")
related_info = {
'parent_tasks': [],
'child_tasks': [],
'sibling_tasks': [],
'similar_tasks': []
}
# 부모 작업
if task.parent_task:
parent = await self._load_task_from_db(task.parent_task)
if parent:
related_info['parent_tasks'].append({
'id': parent.id,
'name': parent.name,
'status': parent.status.value
})
# 자식 작업들
for subtask_id in task.subtasks:
subtask = await self._load_task_from_db(subtask_id)
if subtask:
related_info['child_tasks'].append({
'id': subtask.id,
'name': subtask.name,
'status': subtask.status.value
})
# 형제 작업들 (같은 부모를 가진 작업들)
if task.parent_task:
siblings = await self._find_sibling_tasks(task.parent_task, task.id)
related_info['sibling_tasks'] = siblings
# 유사한 작업들 (카테고리, 태그 기반)
similar_tasks = await self._find_similar_tasks(task)
related_info['similar_tasks'] = similar_tasks[:5] # 상위 5개
return related_info
async def _gather_dependency_info(self, task: Task) -> Dict[str, Any]:
"""의존성 정보 수집"""
logger.info("📊 의존성 정보 수집 중...")
dependency_info = {
'depends_on': [],
'blocks': [],
'dependency_graph': {},
'critical_path': []
}
# 직접 의존성
for dep in task.dependencies:
dep_task = await self._load_task_from_db(dep.task_id)
if dep_task:
dependency_info['depends_on'].append({
'id': dep_task.id,
'name': dep_task.name,
'status': dep_task.status.value,
'type': dep.dependency_type,
'description': dep.description
})
# 이 작업에 의존하는 작업들
blocking_tasks = await self._find_tasks_depending_on(task.id)
dependency_info['blocks'] = blocking_tasks
# 의존성 그래프 생성
dependency_info['dependency_graph'] = await self._build_dependency_graph(task.id)
# 크리티컬 패스 계산
dependency_info['critical_path'] = await self._calculate_critical_path(task.id)
return dependency_info
# =========================================================================
# 유틸리티 및 헬퍼 메서드들
# =========================================================================
async def _load_task_from_db(self, task_id: str) -> Optional[Task]:
"""데이터베이스에서 작업 로드"""
# 캐시 확인
if task_id in self.task_cache:
cache_time = self.cache_timestamps.get(task_id)
if cache_time and datetime.now() - cache_time < self.cache_ttl:
return self.task_cache[task_id]
# 데이터베이스에서 로드
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT task_data FROM tasks WHERE id = ?", (task_id,))
row = cursor.fetchone()
if row:
task_data = json.loads(row[0])
task = Task.from_dict(task_data)
# 캐시에 저장
self.task_cache[task_id] = task
self.cache_timestamps[task_id] = datetime.now()
return task
return None
async def _load_all_tasks(self) -> List[Task]:
"""모든 작업 로드"""
tasks = []
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT task_data FROM tasks")
for row in cursor.fetchall():
task_data = json.loads(row[0])
task = Task.from_dict(task_data)
tasks.append(task)
return tasks
def _calculate_fuzzy_score(self, task: Task, keywords: List[str], scope: SearchScope) -> float:
"""퍼지 매칭 점수 계산"""
text_to_search = ""
if scope == SearchScope.ALL:
text_to_search = f"{task.name} {task.description} {task.implementation_guide} {task.notes}"
elif scope == SearchScope.NAME:
text_to_search = task.name
elif scope == SearchScope.DESCRIPTION:
text_to_search = task.description
elif scope == SearchScope.CONTENT:
text_to_search = f"{task.implementation_guide} {task.notes}"
text_to_search = text_to_search.lower()
total_score = 0.0
for keyword in keywords:
# 정확 매칭
if keyword in text_to_search:
total_score += 1.0
continue
# 유사도 매칭
words = text_to_search.split()
best_match = 0.0
for word in words:
similarity = difflib.SequenceMatcher(None, keyword, word).ratio()
best_match = max(best_match, similarity)
if best_match > 0.6: # 임계값
total_score += best_match
return min(total_score / len(keywords), 1.0) if keywords else 0.0
def _expand_keywords_semantically(self, keywords: List[str]) -> Dict[str, List[str]]:
"""키워드 의미적 확장"""
expansion_map = {
'implement': ['create', 'build', 'develop', 'code', 'construct'],
'fix': ['repair', 'correct', 'solve', 'debug', 'resolve'],
'test': ['verify', 'validate', 'check', 'examine', 'assess'],
'optimize': ['improve', 'enhance', 'refine', 'speed up', 'efficient'],
'design': ['plan', 'architect', 'structure', 'layout', 'blueprint'],
'api': ['interface', 'endpoint', 'service', 'rest', 'graphql'],
'database': ['db', 'storage', 'data', 'sql', 'nosql'],
'frontend': ['ui', 'interface', 'client', 'web', 'react', 'vue'],
'backend': ['server', 'api', 'service', 'database', 'logic'],
'security': ['auth', 'authentication', 'authorization', 'encryption', 'ssl']
}
expanded = {}
for keyword in keywords:
expanded[keyword] = expansion_map.get(keyword.lower(), [keyword])
return expanded
if __name__ == "__main__":
# 기본 테스트
print("🚀 Data Management System 테스트")
async def test_data_management():
manager = DataManagementManager()
# list_tasks 테스트
task_list = await manager.list_tasks(status="all", page=1, page_size=10)
print(f"✅ list_tasks 성공: {len(task_list.tasks)}개 작업, "
f"총 {task_list.total_count}개 중 {task_list.filtered_count}개 필터링")
# query_task 테스트
search_results = await manager.query_task("Python 개발", page=1, page_size=5)
print(f"✅ query_task 성공: {len(search_results)}개 검색 결과")
# get_task_detail 테스트
if task_list.tasks:
detail = await manager.get_task_detail(task_list.tasks[0].id)
if detail:
print(f"✅ get_task_detail 성공: {detail['task']['name']}")
else:
print("ℹ️ get_task_detail: 작업을 찾을 수 없음 (정상 - 테스트 환경)")
print("🎯 모든 Data Management 테스트 완료!")
# 비동기 테스트 실행
import asyncio
asyncio.run(test_data_management())