#!/usr/bin/env python3
"""
Shrimp Task Manager - Project Governance System
로컬 AI의 프로젝트 거버넌스 시스템: 규칙 관리 및 작업 거버넌스
이 모듈은 프로젝트의 거버넌스와 규칙을 관리합니다:
- init_project_rules: 프로젝트 규칙 초기화 및 표준 설정
- update_task: 동적 작업 갱신 및 의존성 관리
- delete_task: 안전한 작업 삭제 및 무결성 검사
- clear_all_tasks: 전체 작업 초기화 및 백업
거버넌스 시스템은 프로젝트의 일관성과 품질을 보장하고,
변경사항의 영향을 분석하여 안전한 작업 관리를 제공합니다.
"""
import os
import re
import json
import sqlite3
import shutil
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple, Set
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
# 로컬 모듈 임포트
from .shrimp_base import (
Task, TaskStatus, TaskPriority, TaskCategory, RelatedFile, ProjectRules,
generate_task_id, calculate_dependency_depth
)
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# =============================================================================
# 거버넌스 관련 클래스 (Governance Classes)
# =============================================================================
class ChangeImpact(Enum):
"""변경 영향도"""
MINIMAL = "minimal" # 최소 영향
LOW = "low" # 낮은 영향
MEDIUM = "medium" # 중간 영향
HIGH = "high" # 높은 영향
CRITICAL = "critical" # 중대 영향
class GovernanceAction(Enum):
"""거버넌스 액션"""
CREATE = "create" # 생성
UPDATE = "update" # 업데이트
DELETE = "delete" # 삭제
ARCHIVE = "archive" # 아카이브
RESTORE = "restore" # 복원
@dataclass
class ImpactAnalysis:
"""영향도 분석 결과"""
target_task_id: str # 대상 작업 ID
action: GovernanceAction # 수행할 액션
impact_level: ChangeImpact # 영향도 수준
affected_tasks: List[str] # 영향받는 작업들
dependency_chain: List[str] # 의존성 체인
risk_factors: List[str] # 위험 요소들
mitigation_strategies: List[str] # 완화 전략들
approval_required: bool # 승인 필요 여부
estimated_effort: float # 예상 소요 시간
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'target_task_id': self.target_task_id,
'action': self.action.value,
'impact_level': self.impact_level.value,
'affected_tasks': self.affected_tasks,
'dependency_chain': self.dependency_chain,
'risk_factors': self.risk_factors,
'mitigation_strategies': self.mitigation_strategies,
'approval_required': self.approval_required,
'estimated_effort': self.estimated_effort
}
@dataclass
class BackupInfo:
"""백업 정보"""
backup_id: str # 백업 ID
backup_type: str # 백업 타입
task_count: int # 백업된 작업 수
backup_path: str # 백업 파일 경로
created_at: datetime # 생성 시간
description: str # 백업 설명
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'backup_id': self.backup_id,
'backup_type': self.backup_type,
'task_count': self.task_count,
'backup_path': self.backup_path,
'created_at': self.created_at.isoformat(),
'description': self.description
}
@dataclass
class ValidationResult:
"""검증 결과"""
is_valid: bool # 유효성 여부
errors: List[str] # 오류 목록
warnings: List[str] # 경고 목록
suggestions: List[str] # 제안 목록
confidence_score: float # 신뢰도 점수
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'is_valid': self.is_valid,
'errors': self.errors,
'warnings': self.warnings,
'suggestions': self.suggestions,
'confidence_score': self.confidence_score
}
# =============================================================================
# Project Governance Manager - 핵심 클래스
# =============================================================================
class ProjectGovernanceManager:
"""프로젝트 거버넌스 관리자"""
def __init__(self, working_directory: str = "/home/skyki/qwen2.5"):
"""초기화"""
self.working_directory = working_directory
self.db_path = os.path.join(working_directory, "shrimp_tasks.db")
self.backup_dir = os.path.join(working_directory, "backups")
self.rules_file = os.path.join(working_directory, "project_rules.json")
# 백업 디렉토리 생성
os.makedirs(self.backup_dir, exist_ok=True)
# 현재 프로젝트 규칙
self.project_rules: Optional[ProjectRules] = None
# 거버넌스 정책
self.governance_policies = {
'max_dependency_depth': 5,
'require_approval_for_critical_changes': True,
'auto_backup_before_major_changes': True,
'enforce_quality_gates': True,
'minimum_documentation_coverage': 0.8,
'maximum_orphaned_tasks': 10
}
# 변경 추적
self.change_history: List[Dict[str, Any]] = []
logger.info("ProjectGovernanceManager 초기화 완료")
# =========================================================================
# init_project_rules: 프로젝트 규칙 초기화
# =========================================================================
async def init_project_rules(self) -> Dict[str, Any]:
"""
프로젝트 규칙 초기화 및 표준 설정
Returns:
Dict[str, Any]: 초기화된 프로젝트 규칙
"""
logger.info("📋 프로젝트 규칙 초기화 시작")
try:
# 1. 기존 프로젝트 분석
project_analysis = await self._analyze_existing_project()
# 2. 프로젝트 타입 식별
project_type = self._identify_project_type(project_analysis)
# 3. 기본 규칙 템플릿 생성
base_rules = self._create_base_rules_template(project_type)
# 4. 프로젝트별 커스터마이징
customized_rules = await self._customize_rules_for_project(base_rules, project_analysis)
# 5. 규칙 검증
validation_result = self._validate_project_rules(customized_rules)
if not validation_result.is_valid:
logger.warning(f"⚠️ 규칙 검증 경고: {validation_result.warnings}")
# 6. ProjectRules 객체 생성
project_rules = ProjectRules(
project_id=f"PROJECT-{datetime.now().strftime('%Y%m%d')}",
coding_standards=customized_rules['coding_standards'],
architecture_patterns=customized_rules['architecture_patterns'],
quality_gates=customized_rules['quality_gates'],
file_naming_conventions=customized_rules['file_naming_conventions'],
dependency_rules=customized_rules['dependency_rules'],
testing_requirements=customized_rules['testing_requirements'],
documentation_standards=customized_rules['documentation_standards']
)
# 7. 규칙 저장
await self._save_project_rules(project_rules)
self.project_rules = project_rules
# 8. 기존 작업들에 규칙 적용
compliance_report = await self._apply_rules_to_existing_tasks(project_rules)
# 9. 결과 생성
result = {
'project_rules': project_rules.to_dict(),
'project_type': project_type,
'validation_result': validation_result.to_dict(),
'compliance_report': compliance_report,
'recommendations': self._generate_rule_recommendations(project_analysis, compliance_report),
'initialized_at': datetime.now().isoformat()
}
logger.info(f"✅ 프로젝트 규칙 초기화 완료: {project_type} 타입")
return result
except Exception as e:
logger.error(f"❌ 프로젝트 규칙 초기화 실패: {e}")
raise
async def _analyze_existing_project(self) -> Dict[str, Any]:
"""기존 프로젝트 분석"""
logger.info("🔍 기존 프로젝트 분석 중...")
analysis = {
'file_structure': {},
'technology_stack': [],
'code_patterns': [],
'existing_standards': {},
'quality_metrics': {},
'dependencies': []
}
# 파일 구조 분석
analysis['file_structure'] = await self._analyze_file_structure()
# 기술 스택 식별
analysis['technology_stack'] = await self._identify_technology_stack()
# 코드 패턴 인식
analysis['code_patterns'] = await self._recognize_existing_code_patterns()
# 기존 표준 추출
analysis['existing_standards'] = await self._extract_existing_standards()
# 품질 메트릭 계산
analysis['quality_metrics'] = await self._calculate_project_quality_metrics()
# 의존성 분석
analysis['dependencies'] = await self._analyze_project_dependencies()
return analysis
def _identify_project_type(self, analysis: Dict[str, Any]) -> str:
"""프로젝트 타입 식별"""
tech_stack = analysis.get('technology_stack', [])
file_structure = analysis.get('file_structure', {})
# 웹 애플리케이션
if any(tech in tech_stack for tech in ['react', 'vue', 'angular', 'django', 'flask', 'express']):
return "web_application"
# 라이브러리/SDK
elif any(pattern in file_structure for pattern in ['src/', 'lib/', 'dist/']):
return "library"
# 데이터 사이언스/ML
elif any(tech in tech_stack for tech in ['jupyter', 'pandas', 'tensorflow', 'pytorch']):
return "data_science"
# 마이크로서비스
elif 'docker' in tech_stack and any('service' in key for key in file_structure.keys()):
return "microservices"
# 모바일 앱
elif any(tech in tech_stack for tech in ['react-native', 'flutter', 'swift', 'kotlin']):
return "mobile_app"
# 기본값
else:
return "general_software"
def _create_base_rules_template(self, project_type: str) -> Dict[str, Any]:
"""기본 규칙 템플릿 생성"""
logger.info(f"📝 {project_type} 타입 기본 규칙 생성 중...")
base_templates = {
"web_application": {
'coding_standards': {
'max_function_length': 50,
'max_file_length': 500,
'naming_convention': 'camelCase',
'indent_style': 'spaces',
'indent_size': 2
},
'architecture_patterns': ['MVC', 'Component-based', 'RESTful API'],
'quality_gates': {
'code_coverage': 80.0,
'complexity_score': 70.0,
'documentation_coverage': 75.0
},
'testing_requirements': {
'unit_test_coverage': 80.0,
'integration_test_coverage': 70.0,
'e2e_test_coverage': 50.0
}
},
"library": {
'coding_standards': {
'max_function_length': 30,
'max_file_length': 300,
'naming_convention': 'snake_case',
'indent_style': 'spaces',
'indent_size': 4
},
'architecture_patterns': ['Modular', 'API-first'],
'quality_gates': {
'code_coverage': 90.0,
'complexity_score': 80.0,
'documentation_coverage': 95.0
}
},
"general_software": {
'coding_standards': {
'max_function_length': 40,
'max_file_length': 400,
'naming_convention': 'snake_case',
'indent_style': 'spaces',
'indent_size': 4
},
'architecture_patterns': ['Layered', 'Modular'],
'quality_gates': {
'code_coverage': 75.0,
'complexity_score': 75.0,
'documentation_coverage': 70.0
}
}
}
template = base_templates.get(project_type, base_templates["general_software"])
# 공통 필드 추가
template.update({
'file_naming_conventions': {
'class_files': 'PascalCase',
'function_files': 'snake_case',
'test_files': 'test_*.py',
'config_files': 'config.*'
},
'dependency_rules': [
'No circular dependencies',
'Maximum dependency depth: 5',
'Prefer composition over inheritance',
'Use semantic versioning'
],
'documentation_standards': {
'function_docstring': 'required',
'class_docstring': 'required',
'module_docstring': 'recommended',
'api_documentation': 'required'
}
})
return template
# =========================================================================
# update_task: 동적 작업 갱신
# =========================================================================
async def update_task(self,
task_id: str,
name: Optional[str] = None,
description: Optional[str] = None,
notes: Optional[str] = None,
dependencies: Optional[List[str]] = None,
related_files: Optional[List[Dict[str, Any]]] = None,
implementation_guide: Optional[str] = None,
verification_criteria: Optional[str] = None) -> Dict[str, Any]:
"""
동적 작업 갱신 및 의존성 관리
Args:
task_id: 업데이트할 작업 ID
name: 새로운 이름
description: 새로운 설명
notes: 새로운 노트
dependencies: 새로운 의존성 목록
related_files: 새로운 관련 파일 목록
implementation_guide: 새로운 구현 가이드
verification_criteria: 새로운 검증 기준
Returns:
Dict[str, Any]: 업데이트 결과
"""
logger.info(f"🔄 작업 업데이트 시작: {task_id}")
try:
# 1. 기존 작업 로드
original_task = await self._load_task_from_db(task_id)
if not original_task:
raise ValueError(f"작업 {task_id}를 찾을 수 없습니다")
# 2. 영향도 분석
impact_analysis = await self._analyze_update_impact(
original_task, name, description, notes, dependencies,
related_files, implementation_guide, verification_criteria
)
# 3. 승인 필요 여부 확인
if impact_analysis.approval_required:
logger.warning(f"⚠️ 중대한 변경으로 승인이 필요합니다: {impact_analysis.impact_level.value}")
# 실제 구현에서는 승인 프로세스 수행
# 4. 백업 생성 (중요한 변경인 경우)
backup_info = None
if impact_analysis.impact_level in [ChangeImpact.HIGH, ChangeImpact.CRITICAL]:
backup_info = await self._create_task_backup(task_id, "before_update")
# 5. 작업 업데이트 수행
updated_task = await self._perform_task_update(
original_task, name, description, notes, dependencies,
related_files, implementation_guide, verification_criteria
)
# 6. 의존성 체인 업데이트
dependency_updates = await self._update_dependency_chain(updated_task, original_task)
# 7. 프로젝트 규칙 준수 검증
compliance_check = await self._check_rule_compliance(updated_task)
# 8. 변경 기록
change_record = self._record_change(
GovernanceAction.UPDATE, task_id, impact_analysis,
original_task.to_dict(), updated_task.to_dict()
)
# 9. 결과 생성
result = {
'success': True,
'updated_task': updated_task.to_dict(),
'impact_analysis': impact_analysis.to_dict(),
'dependency_updates': dependency_updates,
'compliance_check': compliance_check.to_dict(),
'backup_info': backup_info.to_dict() if backup_info else None,
'change_record': change_record,
'updated_at': datetime.now().isoformat()
}
logger.info(f"✅ 작업 업데이트 완료: {updated_task.name}")
return result
except Exception as e:
logger.error(f"❌ 작업 업데이트 실패: {e}")
raise
async def _analyze_update_impact(self, original_task: Task, **update_fields) -> ImpactAnalysis:
"""업데이트 영향도 분석"""
logger.info("📊 업데이트 영향도 분석 중...")
impact_level = ChangeImpact.MINIMAL
affected_tasks = []
risk_factors = []
mitigation_strategies = []
# 의존성 변경 분석
if update_fields.get('dependencies') is not None:
new_deps = update_fields['dependencies']
old_deps = [dep.task_id for dep in original_task.dependencies]
if set(new_deps) != set(old_deps):
impact_level = max(impact_level, ChangeImpact.HIGH)
affected_tasks.extend(await self._find_tasks_in_dependency_chain(original_task.id))
risk_factors.append("Dependency chain modification")
mitigation_strategies.append("Validate dependency integrity")
# 이름 변경 분석
if update_fields.get('name') and update_fields['name'] != original_task.name:
impact_level = max(impact_level, ChangeImpact.MEDIUM)
risk_factors.append("Task name change may affect references")
mitigation_strategies.append("Update all task references")
# 상태가 완료인 작업의 수정
if original_task.status == TaskStatus.COMPLETED:
impact_level = max(impact_level, ChangeImpact.HIGH)
risk_factors.append("Modifying completed task")
mitigation_strategies.append("Require additional approval")
# 중요한 필드 변경
critical_fields = ['implementation_guide', 'verification_criteria']
for field in critical_fields:
if (update_fields.get(field) is not None and
update_fields[field] != getattr(original_task, field, '')):
impact_level = max(impact_level, ChangeImpact.MEDIUM)
return ImpactAnalysis(
target_task_id=original_task.id,
action=GovernanceAction.UPDATE,
impact_level=impact_level,
affected_tasks=affected_tasks,
dependency_chain=await self._build_dependency_chain(original_task.id),
risk_factors=risk_factors,
mitigation_strategies=mitigation_strategies,
approval_required=impact_level in [ChangeImpact.HIGH, ChangeImpact.CRITICAL],
estimated_effort=self._estimate_update_effort(impact_level, len(affected_tasks))
)
# =========================================================================
# delete_task: 안전한 작업 삭제
# =========================================================================
async def delete_task(self, task_id: str, force: bool = False) -> Dict[str, Any]:
"""
안전한 작업 삭제 및 무결성 검사
Args:
task_id: 삭제할 작업 ID
force: 강제 삭제 여부
Returns:
Dict[str, Any]: 삭제 결과
"""
logger.info(f"🗑️ 작업 삭제 시작: {task_id} (강제: {force})")
try:
# 1. 작업 존재 확인
task = await self._load_task_from_db(task_id)
if not task:
raise ValueError(f"작업 {task_id}를 찾을 수 없습니다")
# 2. 삭제 가능성 검증
deletion_check = await self._check_deletion_safety(task, force)
if not deletion_check['safe'] and not force:
return {
'success': False,
'reason': 'unsafe_deletion',
'checks': deletion_check,
'requires_force': True
}
# 3. 영향도 분석
impact_analysis = await self._analyze_deletion_impact(task)
# 4. 백업 생성
backup_info = await self._create_task_backup(task_id, "before_deletion")
# 5. 의존성 정리
dependency_cleanup = await self._cleanup_task_dependencies(task)
# 6. 실제 삭제 수행
deletion_result = await self._perform_task_deletion(task, force)
# 7. 고아 작업 처리
orphan_handling = await self._handle_orphaned_tasks(task.subtasks)
# 8. 변경 기록
change_record = self._record_change(
GovernanceAction.DELETE, task_id, impact_analysis,
task.to_dict(), None
)
# 9. 결과 생성
result = {
'success': True,
'deleted_task_id': task_id,
'deleted_task_name': task.name,
'impact_analysis': impact_analysis.to_dict(),
'backup_info': backup_info.to_dict(),
'dependency_cleanup': dependency_cleanup,
'orphan_handling': orphan_handling,
'change_record': change_record,
'deleted_at': datetime.now().isoformat()
}
logger.info(f"✅ 작업 삭제 완료: {task.name}")
return result
except Exception as e:
logger.error(f"❌ 작업 삭제 실패: {e}")
raise
async def _check_deletion_safety(self, task: Task, force: bool) -> Dict[str, Any]:
"""삭제 안전성 검사"""
logger.info("🔍 삭제 안전성 검사 중...")
checks = {
'safe': True,
'blocking_factors': [],
'warnings': [],
'affected_count': 0
}
# 완료된 작업 삭제 검사
if task.status == TaskStatus.COMPLETED:
checks['warnings'].append("완료된 작업을 삭제하려고 합니다")
if not force:
checks['safe'] = False
checks['blocking_factors'].append("완료된 작업은 강제 삭제만 가능합니다")
# 의존성 검사
dependent_tasks = await self._find_tasks_depending_on(task.id)
if dependent_tasks:
checks['affected_count'] = len(dependent_tasks)
checks['warnings'].append(f"{len(dependent_tasks)}개 작업이 이 작업에 의존합니다")
if not force:
checks['safe'] = False
checks['blocking_factors'].append("의존하는 작업들이 있어 강제 삭제만 가능합니다")
# 자식 작업 검사
if task.subtasks:
checks['warnings'].append(f"{len(task.subtasks)}개 하위 작업이 있습니다")
# 중요도 검사
if task.priority in [TaskPriority.CRITICAL, TaskPriority.URGENT]:
checks['warnings'].append("중요한 작업을 삭제하려고 합니다")
return checks
# =========================================================================
# clear_all_tasks: 전체 작업 초기화
# =========================================================================
async def clear_all_tasks(self, confirm: bool = False) -> Dict[str, Any]:
"""
전체 작업 초기화 및 백업
Args:
confirm: 삭제 확인 여부
Returns:
Dict[str, Any]: 초기화 결과
"""
logger.info(f"🧹 전체 작업 초기화 시작 (확인: {confirm})")
if not confirm:
return {
'success': False,
'reason': 'confirmation_required',
'message': '전체 작업 삭제는 확인이 필요합니다'
}
try:
# 1. 현재 작업 현황 조회
task_statistics = await self._get_task_statistics()
# 2. 전체 백업 생성
backup_info = await self._create_full_backup("before_clear_all")
# 3. 완료된 작업 별도 아카이브
archive_info = await self._archive_completed_tasks()
# 4. 미완료 작업만 삭제
deletion_result = await self._delete_incomplete_tasks()
# 5. 데이터베이스 정리
cleanup_result = await self._cleanup_database()
# 6. 인덱스 재구성
reindex_result = await self._rebuild_indexes()
# 7. 변경 기록
change_record = self._record_change(
GovernanceAction.DELETE, "ALL_TASKS", None,
{'task_count': task_statistics['total_tasks']},
{'task_count': 0}
)
# 8. 결과 생성
result = {
'success': True,
'operation': 'clear_all_tasks',
'before_statistics': task_statistics,
'backup_info': backup_info.to_dict(),
'archive_info': archive_info,
'deletion_result': deletion_result,
'cleanup_result': cleanup_result,
'reindex_result': reindex_result,
'change_record': change_record,
'cleared_at': datetime.now().isoformat()
}
logger.info(f"✅ 전체 작업 초기화 완료: {deletion_result['deleted_count']}개 작업 삭제")
return result
except Exception as e:
logger.error(f"❌ 전체 작업 초기화 실패: {e}")
raise
async def _create_full_backup(self, backup_type: str) -> BackupInfo:
"""전체 백업 생성"""
logger.info("💾 전체 백업 생성 중...")
backup_id = f"BACKUP-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
backup_filename = f"{backup_id}.json"
backup_path = os.path.join(self.backup_dir, backup_filename)
# 모든 데이터 수집
backup_data = {
'backup_info': {
'id': backup_id,
'type': backup_type,
'created_at': datetime.now().isoformat(),
'version': '1.0'
},
'tasks': await self._export_all_tasks(),
'project_rules': self.project_rules.to_dict() if self.project_rules else None,
'governance_policies': self.governance_policies,
'change_history': self.change_history[-100:] # 최근 100개
}
# 백업 파일 저장
with open(backup_path, 'w', encoding='utf-8') as f:
json.dump(backup_data, f, indent=2, ensure_ascii=False)
backup_info = BackupInfo(
backup_id=backup_id,
backup_type=backup_type,
task_count=len(backup_data['tasks']),
backup_path=backup_path,
created_at=datetime.now(),
description=f"Full system backup - {backup_type}"
)
logger.info(f"✅ 백업 생성 완료: {backup_path}")
return backup_info
# =========================================================================
# 유틸리티 및 헬퍼 메서드들
# =========================================================================
async def _load_task_from_db(self, task_id: str) -> Optional[Task]:
"""데이터베이스에서 작업 로드"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT task_data FROM tasks WHERE id = ?", (task_id,))
row = cursor.fetchone()
if row:
task_data = json.loads(row[0])
return Task.from_dict(task_data)
except Exception as e:
logger.error(f"작업 로드 실패: {e}")
return None
async def _find_tasks_depending_on(self, task_id: str) -> List[str]:
"""특정 작업에 의존하는 작업들 찾기"""
dependent_tasks = []
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT task_id FROM task_dependencies
WHERE dependency_task_id = ?
""", (task_id,))
for row in cursor.fetchall():
dependent_tasks.append(row[0])
except Exception as e:
logger.error(f"의존 작업 조회 실패: {e}")
return dependent_tasks
def _record_change(self, action: GovernanceAction, target_id: str,
impact_analysis: Optional[ImpactAnalysis],
before_state: Optional[Dict[str, Any]],
after_state: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""변경 기록"""
change_record = {
'id': f"CHANGE-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
'action': action.value,
'target_id': target_id,
'impact_analysis': impact_analysis.to_dict() if impact_analysis else None,
'before_state': before_state,
'after_state': after_state,
'timestamp': datetime.now().isoformat(),
'user': 'system' # 실제 구현에서는 현재 사용자
}
self.change_history.append(change_record)
# 변경 기록 개수 제한 (메모리 관리)
if len(self.change_history) > 1000:
self.change_history = self.change_history[-500:]
return change_record
async def _get_task_statistics(self) -> Dict[str, Any]:
"""작업 통계 조회"""
stats = {
'total_tasks': 0,
'by_status': {},
'by_priority': {},
'by_category': {},
'completed_tasks': 0,
'incomplete_tasks': 0
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 전체 작업 수
cursor.execute("SELECT COUNT(*) FROM tasks")
stats['total_tasks'] = cursor.fetchone()[0]
# 상태별 통계
cursor.execute("SELECT status, COUNT(*) FROM tasks GROUP BY status")
for status, count in cursor.fetchall():
stats['by_status'][status] = count
if status == 'completed':
stats['completed_tasks'] = count
else:
stats['incomplete_tasks'] += count
# 우선순위별 통계
cursor.execute("SELECT priority, COUNT(*) FROM tasks GROUP BY priority")
for priority, count in cursor.fetchall():
stats['by_priority'][priority] = count
# 카테고리별 통계
cursor.execute("SELECT category, COUNT(*) FROM tasks GROUP BY category")
for category, count in cursor.fetchall():
stats['by_category'][category] = count
except Exception as e:
logger.error(f"통계 조회 실패: {e}")
return stats
# =========================================================================
# 프로젝트 분석 메서드들
# =========================================================================
async def _analyze_file_structure(self) -> Dict[str, Any]:
"""파일 구조 분석"""
structure = {'directories': [], 'files': [], 'patterns': []}
try:
for root, dirs, files in os.walk(self.working_directory):
rel_root = os.path.relpath(root, self.working_directory)
if not rel_root.startswith('.'):
structure['directories'].append(rel_root)
structure['files'].extend([
os.path.join(rel_root, f) for f in files
if not f.startswith('.')
])
except Exception as e:
logger.warning(f"파일 구조 분석 실패: {e}")
return structure
async def _identify_technology_stack(self) -> List[str]:
"""기술 스택 식별"""
tech_stack = []
# package.json 확인
package_json = os.path.join(self.working_directory, 'package.json')
if os.path.exists(package_json):
tech_stack.extend(['node.js', 'npm'])
try:
with open(package_json, 'r') as f:
data = json.load(f)
deps = {**data.get('dependencies', {}), **data.get('devDependencies', {})}
if 'react' in deps: tech_stack.append('react')
if 'vue' in deps: tech_stack.append('vue')
if 'express' in deps: tech_stack.append('express')
except Exception:
pass
# requirements.txt 확인
requirements = os.path.join(self.working_directory, 'requirements.txt')
if os.path.exists(requirements):
tech_stack.extend(['python', 'pip'])
try:
with open(requirements, 'r') as f:
content = f.read().lower()
if 'django' in content: tech_stack.append('django')
if 'flask' in content: tech_stack.append('flask')
if 'fastapi' in content: tech_stack.append('fastapi')
except Exception:
pass
# Dockerfile 확인
if os.path.exists(os.path.join(self.working_directory, 'Dockerfile')):
tech_stack.append('docker')
return list(set(tech_stack))
async def _recognize_existing_code_patterns(self) -> List[str]:
"""기존 코드 패턴 인식"""
patterns = []
# Python 파일 패턴 확인
for root, dirs, files in os.walk(self.working_directory):
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if 'class ' in content: patterns.append('OOP')
if 'async def' in content: patterns.append('Async')
if 'import asyncio' in content: patterns.append('AsyncIO')
if 'from dataclasses' in content: patterns.append('Dataclasses')
except Exception:
continue
return list(set(patterns))
async def _extract_existing_standards(self) -> Dict[str, Any]:
"""기존 표준 추출"""
standards = {'naming': 'snake_case', 'docstrings': False, 'type_hints': False}
# Python 파일에서 패턴 추출
snake_case_count = 0
camel_case_count = 0
docstring_count = 0
type_hint_count = 0
total_functions = 0
for root, dirs, files in os.walk(self.working_directory):
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 함수 패턴 분석
func_patterns = re.findall(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)', content)
for func_name in func_patterns:
total_functions += 1
if '_' in func_name:
snake_case_count += 1
elif any(c.isupper() for c in func_name[1:]):
camel_case_count += 1
# 독스트링 확인
if '"""' in content or "'''" in content:
docstring_count += len(func_patterns)
# 타입 힌트 확인
if '->' in content or ': ' in content:
type_hint_count += len(func_patterns)
except Exception:
continue
if total_functions > 0:
if snake_case_count > camel_case_count:
standards['naming'] = 'snake_case'
else:
standards['naming'] = 'camelCase'
standards['docstrings'] = (docstring_count / total_functions) > 0.5
standards['type_hints'] = (type_hint_count / total_functions) > 0.3
return standards
async def _calculate_project_quality_metrics(self) -> Dict[str, float]:
"""프로젝트 품질 메트릭 계산"""
metrics = {
'code_coverage': 0.0,
'documentation_coverage': 0.0,
'complexity_score': 0.0,
'maintainability_index': 0.0
}
# 간단한 메트릭 계산 (실제로는 더 정교한 도구 사용)
python_files = []
for root, dirs, files in os.walk(self.working_directory):
for file in files:
if file.endswith('.py'):
python_files.append(os.path.join(root, file))
if python_files:
doc_files = 0
total_lines = 0
complex_functions = 0
total_functions = 0
for file_path in python_files:
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.split('\n')
total_lines += len(lines)
if '"""' in content or "'''" in content:
doc_files += 1
# 함수 복잡도 간단 계산
functions = re.findall(r'def\s+[a-zA-Z_][a-zA-Z0-9_]*', content)
total_functions += len(functions)
# if/for/while 문 개수로 복잡도 추정
complexity = len(re.findall(r'\b(if|for|while|try|except)\b', content))
if complexity > 10:
complex_functions += len(functions)
except Exception:
continue
metrics['documentation_coverage'] = (doc_files / len(python_files)) * 100
metrics['complexity_score'] = max(0, 100 - (complex_functions / max(total_functions, 1)) * 100)
metrics['maintainability_index'] = (metrics['documentation_coverage'] + metrics['complexity_score']) / 2
return metrics
async def _analyze_project_dependencies(self) -> List[str]:
"""프로젝트 의존성 분석"""
dependencies = []
# Python 의존성
requirements_file = os.path.join(self.working_directory, 'requirements.txt')
if os.path.exists(requirements_file):
try:
with open(requirements_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
dep = line.split('=')[0].split('>')[0].split('<')[0]
dependencies.append(dep)
except Exception:
pass
# Node.js 의존성
package_file = os.path.join(self.working_directory, 'package.json')
if os.path.exists(package_file):
try:
with open(package_file, 'r') as f:
data = json.load(f)
deps = {**data.get('dependencies', {}), **data.get('devDependencies', {})}
dependencies.extend(deps.keys())
except Exception:
pass
return dependencies
async def _customize_rules_for_project(self, base_rules: Dict[str, Any],
analysis: Dict[str, Any]) -> Dict[str, Any]:
"""프로젝트별 규칙 커스터마이징"""
customized = base_rules.copy()
# 기존 표준 반영
existing_standards = analysis.get('existing_standards', {})
if existing_standards:
customized['coding_standards']['naming_convention'] = existing_standards.get('naming', 'snake_case')
if existing_standards.get('docstrings'):
customized['documentation_standards']['function_docstring'] = 'required'
if existing_standards.get('type_hints'):
customized['coding_standards']['type_hints'] = 'required'
# 기술 스택에 따른 조정
tech_stack = analysis.get('technology_stack', [])
if 'react' in tech_stack or 'vue' in tech_stack:
customized['coding_standards']['naming_convention'] = 'camelCase'
customized['coding_standards']['indent_size'] = 2
if 'django' in tech_stack:
customized['architecture_patterns'].append('Django MVT')
customized['file_naming_conventions']['model_files'] = 'models.py'
customized['file_naming_conventions']['view_files'] = 'views.py'
return customized
def _validate_project_rules(self, rules: Dict[str, Any]) -> ValidationResult:
"""프로젝트 규칙 검증"""
errors = []
warnings = []
suggestions = []
# 필수 필드 확인
required_fields = ['coding_standards', 'architecture_patterns', 'quality_gates']
for field in required_fields:
if field not in rules:
errors.append(f"필수 필드 누락: {field}")
# 품질 게이트 값 확인
if 'quality_gates' in rules:
quality_gates = rules['quality_gates']
if quality_gates.get('code_coverage', 0) < 50:
warnings.append("코드 커버리지 기준이 너무 낮습니다 (50% 미만)")
if quality_gates.get('complexity_score', 0) < 60:
warnings.append("복잡도 점수 기준이 너무 낮습니다 (60% 미만)")
# 제안사항
if 'testing_requirements' not in rules:
suggestions.append("테스트 요구사항을 추가하는 것을 권장합니다")
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
suggestions=suggestions,
confidence_score=1.0 - (len(errors) * 0.3 + len(warnings) * 0.1)
)
async def _save_project_rules(self, rules: ProjectRules):
"""프로젝트 규칙 저장"""
try:
with open(self.rules_file, 'w', encoding='utf-8') as f:
json.dump(rules.to_dict(), f, indent=2, ensure_ascii=False)
logger.info(f"✅ 프로젝트 규칙 저장 완료: {self.rules_file}")
except Exception as e:
logger.error(f"❌ 프로젝트 규칙 저장 실패: {e}")
raise
async def _apply_rules_to_existing_tasks(self, rules: ProjectRules) -> Dict[str, Any]:
"""기존 작업들에 규칙 적용"""
report = {
'total_tasks': 0,
'compliant_tasks': 0,
'non_compliant_tasks': 0,
'compliance_issues': []
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, task_data FROM tasks")
for row in cursor.fetchall():
report['total_tasks'] += 1
task_id, task_data_json = row
task_data = json.loads(task_data_json)
# 규칙 준수 검사 (간단한 예시)
compliant = True
issues = []
# 이름 규칙 검사
task_name = task_data.get('name', '')
if not task_name:
compliant = False
issues.append("작업 이름이 비어있음")
# 설명 요구사항 검사
description = task_data.get('description', '')
if len(description) < 10:
compliant = False
issues.append("설명이 너무 짧음")
if compliant:
report['compliant_tasks'] += 1
else:
report['non_compliant_tasks'] += 1
report['compliance_issues'].append({
'task_id': task_id,
'issues': issues
})
except Exception as e:
logger.error(f"규칙 적용 검사 실패: {e}")
return report
def _generate_rule_recommendations(self, analysis: Dict[str, Any],
compliance_report: Dict[str, Any]) -> List[str]:
"""규칙 권장사항 생성"""
recommendations = []
# 품질 메트릭 기반 권장사항
quality_metrics = analysis.get('quality_metrics', {})
if quality_metrics.get('documentation_coverage', 0) < 50:
recommendations.append("문서화 커버리지가 낮습니다. 독스트링 작성을 의무화하세요.")
if quality_metrics.get('complexity_score', 100) < 70:
recommendations.append("코드 복잡도가 높습니다. 함수 분할을 고려하세요.")
# 준수성 리포트 기반 권장사항
if compliance_report['non_compliant_tasks'] > compliance_report['compliant_tasks']:
recommendations.append("기존 작업들의 규칙 준수율이 낮습니다. 점진적 개선을 계획하세요.")
# 기술 스택 기반 권장사항
tech_stack = analysis.get('technology_stack', [])
if 'docker' in tech_stack:
recommendations.append("Docker 사용 시 컨테이너 보안 규칙을 추가하세요.")
if not recommendations:
recommendations.append("현재 프로젝트 상태가 양호합니다. 정기적인 규칙 검토를 권장합니다.")
return recommendations
# =========================================================================
# 작업 업데이트 관련 메서드들
# =========================================================================
async def _perform_task_update(self, original_task: Task, **update_fields) -> Task:
"""작업 업데이트 수행"""
logger.info("🔄 작업 업데이트 수행 중...")
# 업데이트할 필드들 적용
updated_data = original_task.to_dict()
for field, value in update_fields.items():
if value is not None:
if field == 'related_files' and value:
# RelatedFile 객체로 변환
updated_data[field] = [
RelatedFile(**file_data) if isinstance(file_data, dict) else file_data
for file_data in value
]
else:
updated_data[field] = value
# 수정 시간 업데이트
updated_data['modified_at'] = datetime.now()
# Task 객체 재생성
updated_task = Task.from_dict(updated_data)
# 데이터베이스 업데이트
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE tasks SET task_data = ?, modified_at = ? WHERE id = ?
""", (json.dumps(updated_task.to_dict()), datetime.now().isoformat(), updated_task.id))
conn.commit()
except Exception as e:
logger.error(f"데이터베이스 업데이트 실패: {e}")
raise
return updated_task
async def _update_dependency_chain(self, updated_task: Task, original_task: Task) -> Dict[str, Any]:
"""의존성 체인 업데이트"""
logger.info("🔗 의존성 체인 업데이트 중...")
result = {
'dependencies_added': [],
'dependencies_removed': [],
'affected_tasks': [],
'validation_errors': []
}
# 기존 의존성과 새 의존성 비교
old_deps = set(dep.task_id for dep in original_task.dependencies)
new_deps = set(dep.task_id for dep in updated_task.dependencies)
added_deps = new_deps - old_deps
removed_deps = old_deps - new_deps
result['dependencies_added'] = list(added_deps)
result['dependencies_removed'] = list(removed_deps)
# 순환 의존성 검사
for dep_id in added_deps:
if await self._would_create_circular_dependency(updated_task.id, dep_id):
result['validation_errors'].append(f"순환 의존성 감지: {dep_id}")
# 영향받는 작업들 식별
result['affected_tasks'] = await self._find_tasks_in_dependency_chain(updated_task.id)
return result
async def _check_rule_compliance(self, task: Task) -> ValidationResult:
"""프로젝트 규칙 준수 검증"""
logger.info("📋 규칙 준수 검증 중...")
errors = []
warnings = []
suggestions = []
if not self.project_rules:
return ValidationResult(True, [], ["프로젝트 규칙이 설정되지 않음"], [], 0.5)
# 이름 규칙 검사
if len(task.name) < 5:
errors.append("작업 이름이 너무 짧습니다 (최소 5자)")
# 설명 규칙 검사
if len(task.description) < 20:
warnings.append("작업 설명이 너무 짧습니다 (권장 20자 이상)")
# 구현 가이드 검사
if not task.implementation_guide:
suggestions.append("구현 가이드를 추가하는 것을 권장합니다")
# 검증 기준 검사
if not task.verification_criteria:
suggestions.append("검증 기준을 추가하는 것을 권장합니다")
# 의존성 깊이 검사
depth = await self._calculate_dependency_depth(task.id)
max_depth = self.governance_policies.get('max_dependency_depth', 5)
if depth > max_depth:
errors.append(f"의존성 깊이가 너무 깊습니다 ({depth} > {max_depth})")
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
suggestions=suggestions,
confidence_score=1.0 - (len(errors) * 0.4 + len(warnings) * 0.2)
)
async def _build_dependency_chain(self, task_id: str) -> List[str]:
"""의존성 체인 구축"""
chain = []
visited = set()
async def traverse_dependencies(current_id: str):
if current_id in visited:
return
visited.add(current_id)
chain.append(current_id)
# 현재 작업의 의존성들 조회
task = await self._load_task_from_db(current_id)
if task:
for dep in task.dependencies:
await traverse_dependencies(dep.task_id)
await traverse_dependencies(task_id)
return chain
def _estimate_update_effort(self, impact_level: ChangeImpact, affected_count: int) -> float:
"""업데이트 소요 시간 추정"""
base_effort = {
ChangeImpact.MINIMAL: 0.1,
ChangeImpact.LOW: 0.25,
ChangeImpact.MEDIUM: 0.5,
ChangeImpact.HIGH: 1.0,
ChangeImpact.CRITICAL: 2.0
}
effort = base_effort.get(impact_level, 0.5)
effort += affected_count * 0.1 # 영향받는 작업당 추가 시간
return min(effort, 8.0) # 최대 8시간
# =========================================================================
# 작업 삭제 관련 메서드들
# =========================================================================
async def _analyze_deletion_impact(self, task: Task) -> ImpactAnalysis:
"""삭제 영향도 분석"""
logger.info("📊 삭제 영향도 분석 중...")
affected_tasks = await self._find_tasks_depending_on(task.id)
impact_level = ChangeImpact.LOW
risk_factors = []
mitigation_strategies = []
# 의존성 기반 영향도
if affected_tasks:
impact_level = ChangeImpact.HIGH
risk_factors.append(f"{len(affected_tasks)}개 작업이 이 작업에 의존함")
mitigation_strategies.append("의존 작업들의 의존성 재설정 필요")
# 완료된 작업 삭제
if task.status == TaskStatus.COMPLETED:
impact_level = max(impact_level, ChangeImpact.MEDIUM)
risk_factors.append("완료된 작업 삭제로 인한 기록 손실")
mitigation_strategies.append("아카이브 백업 필수")
# 중요한 작업
if task.priority in [TaskPriority.CRITICAL, TaskPriority.URGENT]:
impact_level = max(impact_level, ChangeImpact.HIGH)
risk_factors.append("중요한 작업 삭제")
mitigation_strategies.append("추가 승인 및 백업 필요")
return ImpactAnalysis(
target_task_id=task.id,
action=GovernanceAction.DELETE,
impact_level=impact_level,
affected_tasks=affected_tasks,
dependency_chain=await self._build_dependency_chain(task.id),
risk_factors=risk_factors,
mitigation_strategies=mitigation_strategies,
approval_required=impact_level in [ChangeImpact.HIGH, ChangeImpact.CRITICAL],
estimated_effort=self._estimate_update_effort(impact_level, len(affected_tasks))
)
async def _create_task_backup(self, task_id: str, backup_type: str) -> BackupInfo:
"""작업 백업 생성"""
logger.info(f"💾 작업 백업 생성: {task_id}")
task = await self._load_task_from_db(task_id)
if not task:
raise ValueError(f"작업 {task_id}를 찾을 수 없습니다")
backup_id = f"TASK-BACKUP-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
backup_filename = f"{backup_id}.json"
backup_path = os.path.join(self.backup_dir, backup_filename)
backup_data = {
'backup_info': {
'id': backup_id,
'type': backup_type,
'task_id': task_id,
'created_at': datetime.now().isoformat()
},
'task_data': task.to_dict()
}
with open(backup_path, 'w', encoding='utf-8') as f:
json.dump(backup_data, f, indent=2, ensure_ascii=False)
return BackupInfo(
backup_id=backup_id,
backup_type=backup_type,
task_count=1,
backup_path=backup_path,
created_at=datetime.now(),
description=f"Task backup - {backup_type}"
)
async def _cleanup_task_dependencies(self, task: Task) -> Dict[str, Any]:
"""작업 의존성 정리"""
logger.info("🧹 작업 의존성 정리 중...")
result = {
'dependencies_removed': 0,
'dependent_tasks_updated': 0,
'orphaned_tasks': []
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 이 작업을 의존하는 다른 작업들 찾기
dependent_tasks = await self._find_tasks_depending_on(task.id)
for dep_task_id in dependent_tasks:
dep_task = await self._load_task_from_db(dep_task_id)
if dep_task:
# 의존성에서 삭제될 작업 제거
updated_deps = [dep for dep in dep_task.dependencies if dep.task_id != task.id]
# 의존성이 모두 제거되면 고아 작업으로 표시
if not updated_deps and dep_task.dependencies:
result['orphaned_tasks'].append(dep_task_id)
# 업데이트된 의존성으로 저장
dep_task.dependencies = updated_deps
cursor.execute("""
UPDATE tasks SET task_data = ? WHERE id = ?
""", (json.dumps(dep_task.to_dict()), dep_task_id))
result['dependent_tasks_updated'] += 1
# 의존성 테이블에서 관련 레코드 삭제
cursor.execute("DELETE FROM task_dependencies WHERE task_id = ? OR dependency_task_id = ?",
(task.id, task.id))
result['dependencies_removed'] = cursor.rowcount
conn.commit()
except Exception as e:
logger.error(f"의존성 정리 실패: {e}")
raise
return result
async def _perform_task_deletion(self, task: Task, force: bool) -> Dict[str, Any]:
"""실제 작업 삭제 수행"""
logger.info(f"🗑️ 작업 삭제 수행: {task.id}")
result = {
'deleted': False,
'archived': False,
'error': None
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 완료된 작업은 아카이브 테이블로 이동
if task.status == TaskStatus.COMPLETED:
cursor.execute("""
INSERT INTO archived_tasks (id, task_data, archived_at)
VALUES (?, ?, ?)
""", (task.id, json.dumps(task.to_dict()), datetime.now().isoformat()))
result['archived'] = True
# 원본 테이블에서 삭제
cursor.execute("DELETE FROM tasks WHERE id = ?", (task.id,))
if cursor.rowcount > 0:
result['deleted'] = True
conn.commit()
except Exception as e:
result['error'] = str(e)
logger.error(f"작업 삭제 실패: {e}")
raise
return result
async def _handle_orphaned_tasks(self, subtask_ids: List[str]) -> Dict[str, Any]:
"""고아 작업 처리"""
logger.info("👨👩👧👦 고아 작업 처리 중...")
result = {
'orphaned_count': 0,
'handled_count': 0,
'actions_taken': []
}
for subtask_id in subtask_ids:
subtask = await self._load_task_from_db(subtask_id)
if subtask:
result['orphaned_count'] += 1
# 고아 작업 처리 전략 결정
if subtask.status == TaskStatus.COMPLETED:
# 완료된 고아 작업은 유지
result['actions_taken'].append(f"{subtask_id}: 완료된 작업으로 유지")
elif subtask.priority in [TaskPriority.CRITICAL, TaskPriority.URGENT]:
# 중요한 고아 작업은 최상위로 승격
subtask.parent_task_id = None
await self._perform_task_update(subtask, parent_task_id=None)
result['actions_taken'].append(f"{subtask_id}: 최상위 작업으로 승격")
result['handled_count'] += 1
else:
# 일반 고아 작업은 삭제 고려
result['actions_taken'].append(f"{subtask_id}: 삭제 후보로 표시")
return result
# =========================================================================
# 전체 초기화 관련 메서드들
# =========================================================================
async def _archive_completed_tasks(self) -> Dict[str, Any]:
"""완료된 작업 아카이브"""
logger.info("📦 완료된 작업 아카이브 중...")
result = {
'archived_count': 0,
'archive_file': None,
'error': None
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 완료된 작업들 조회
cursor.execute("SELECT * FROM tasks WHERE status = 'completed'")
completed_tasks = cursor.fetchall()
if completed_tasks:
# 아카이브 파일 생성
archive_filename = f"completed_tasks_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
archive_path = os.path.join(self.backup_dir, archive_filename)
archive_data = {
'archive_info': {
'created_at': datetime.now().isoformat(),
'task_count': len(completed_tasks)
},
'tasks': [json.loads(task[1]) for task in completed_tasks] # task_data 컬럼
}
with open(archive_path, 'w', encoding='utf-8') as f:
json.dump(archive_data, f, indent=2, ensure_ascii=False)
# 아카이브 테이블로 이동
for task_row in completed_tasks:
cursor.execute("""
INSERT OR REPLACE INTO archived_tasks (id, task_data, archived_at)
VALUES (?, ?, ?)
""", (task_row[0], task_row[1], datetime.now().isoformat()))
conn.commit()
result['archived_count'] = len(completed_tasks)
result['archive_file'] = archive_path
except Exception as e:
result['error'] = str(e)
logger.error(f"아카이브 실패: {e}")
return result
async def _delete_incomplete_tasks(self) -> Dict[str, Any]:
"""미완료 작업 삭제"""
logger.info("🗑️ 미완료 작업 삭제 중...")
result = {
'deleted_count': 0,
'error': None
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 미완료 작업 삭제
cursor.execute("DELETE FROM tasks WHERE status != 'completed'")
result['deleted_count'] = cursor.rowcount
# 관련 의존성 정리
cursor.execute("DELETE FROM task_dependencies")
conn.commit()
except Exception as e:
result['error'] = str(e)
logger.error(f"미완료 작업 삭제 실패: {e}")
return result
async def _cleanup_database(self) -> Dict[str, Any]:
"""데이터베이스 정리"""
logger.info("🧹 데이터베이스 정리 중...")
result = {
'vacuum_completed': False,
'orphaned_records_removed': 0,
'error': None
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 고아 의존성 레코드 정리
cursor.execute("""
DELETE FROM task_dependencies
WHERE task_id NOT IN (SELECT id FROM tasks)
OR dependency_task_id NOT IN (SELECT id FROM tasks)
""")
result['orphaned_records_removed'] = cursor.rowcount
# 데이터베이스 최적화
cursor.execute("VACUUM")
result['vacuum_completed'] = True
conn.commit()
except Exception as e:
result['error'] = str(e)
logger.error(f"데이터베이스 정리 실패: {e}")
return result
async def _rebuild_indexes(self) -> Dict[str, Any]:
"""인덱스 재구성"""
logger.info("🔧 인덱스 재구성 중...")
result = {
'indexes_rebuilt': 0,
'error': None
}
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 인덱스 재구성
indexes = [
"CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)",
"CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)",
"CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category)",
"CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at)",
"CREATE INDEX IF NOT EXISTS idx_dependencies_task ON task_dependencies(task_id)",
"CREATE INDEX IF NOT EXISTS idx_dependencies_dep ON task_dependencies(dependency_task_id)"
]
for index_sql in indexes:
cursor.execute(index_sql)
result['indexes_rebuilt'] += 1
conn.commit()
except Exception as e:
result['error'] = str(e)
logger.error(f"인덱스 재구성 실패: {e}")
return result
async def _export_all_tasks(self) -> List[Dict[str, Any]]:
"""모든 작업 내보내기"""
tasks = []
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT task_data FROM tasks")
for row in cursor.fetchall():
task_data = json.loads(row[0])
tasks.append(task_data)
except Exception as e:
logger.error(f"작업 내보내기 실패: {e}")
return tasks
# =========================================================================
# 유틸리티 메서드들
# =========================================================================
async def _would_create_circular_dependency(self, task_id: str, dependency_id: str) -> bool:
"""순환 의존성 검사"""
if task_id == dependency_id:
return True
visited = set()
async def check_path(current_id: str, target_id: str) -> bool:
if current_id == target_id:
return True
if current_id in visited:
return False
visited.add(current_id)
task = await self._load_task_from_db(current_id)
if task:
for dep in task.dependencies:
if await check_path(dep.task_id, target_id):
return True
return False
return await check_path(dependency_id, task_id)
async def _calculate_dependency_depth(self, task_id: str) -> int:
"""의존성 깊이 계산"""
max_depth = 0
visited = set()
async def calculate_depth(current_id: str, current_depth: int) -> int:
if current_id in visited:
return current_depth
visited.add(current_id)
task = await self._load_task_from_db(current_id)
if not task or not task.dependencies:
return current_depth
max_child_depth = current_depth
for dep in task.dependencies:
child_depth = await calculate_depth(dep.task_id, current_depth + 1)
max_child_depth = max(max_child_depth, child_depth)
return max_child_depth
return await calculate_depth(task_id, 0)
async def _find_tasks_in_dependency_chain(self, task_id: str) -> List[str]:
"""의존성 체인 내 모든 작업 찾기"""
all_tasks = set()
visited = set()
async def traverse(current_id: str):
if current_id in visited:
return
visited.add(current_id)
all_tasks.add(current_id)
# 상위 의존성
task = await self._load_task_from_db(current_id)
if task:
for dep in task.dependencies:
await traverse(dep.task_id)
# 하위 의존성
dependent_tasks = await self._find_tasks_depending_on(current_id)
for dep_task_id in dependent_tasks:
await traverse(dep_task_id)
await traverse(task_id)
all_tasks.discard(task_id) # 자기 자신 제외
return list(all_tasks)
if __name__ == "__main__":
# 기본 테스트
print("🚀 Project Governance System 테스트")
async def test_governance():
manager = ProjectGovernanceManager()
# init_project_rules 테스트
rules_result = await manager.init_project_rules()
print(f"✅ init_project_rules 성공: {rules_result['project_type']} 타입")
# update_task 테스트 (모의 작업 ID)
try:
update_result = await manager.update_task(
"TASK-20250719-TEST001",
name="업데이트된 작업명",
description="업데이트된 설명"
)
print(f"✅ update_task 성공: {update_result['success']}")
except ValueError:
print("ℹ️ update_task: 작업을 찾을 수 없음 (정상 - 테스트 환경)")
# delete_task 테스트
try:
delete_result = await manager.delete_task("TASK-20250719-TEST001")
print(f"✅ delete_task 결과: {delete_result['success']}")
except ValueError:
print("ℹ️ delete_task: 작업을 찾을 수 없음 (정상 - 테스트 환경)")
# clear_all_tasks 테스트 (확인 없이)
clear_result = await manager.clear_all_tasks(confirm=False)
print(f"✅ clear_all_tasks (확인 필요): {clear_result['reason']}")
print("🎯 모든 Project Governance 테스트 완료!")
# 비동기 테스트 실행
import asyncio
asyncio.run(test_governance())