Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
shrimp_governance.py75.4 kB
#!/usr/bin/env python3 """ Shrimp Task Manager - Project Governance System 로컬 AI의 프로젝트 거버넌스 시스템: 규칙 관리 및 작업 거버넌스 이 모듈은 프로젝트의 거버넌스와 규칙을 관리합니다: - init_project_rules: 프로젝트 규칙 초기화 및 표준 설정 - update_task: 동적 작업 갱신 및 의존성 관리 - delete_task: 안전한 작업 삭제 및 무결성 검사 - clear_all_tasks: 전체 작업 초기화 및 백업 거버넌스 시스템은 프로젝트의 일관성과 품질을 보장하고, 변경사항의 영향을 분석하여 안전한 작업 관리를 제공합니다. """ import os import re import json import sqlite3 import shutil import asyncio import logging from typing import Dict, List, Optional, Any, Tuple, Set from datetime import datetime, timedelta from dataclasses import dataclass, field from enum import Enum from pathlib import Path # 로컬 모듈 임포트 from .shrimp_base import ( Task, TaskStatus, TaskPriority, TaskCategory, RelatedFile, ProjectRules, generate_task_id, calculate_dependency_depth ) # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ============================================================================= # 거버넌스 관련 클래스 (Governance Classes) # ============================================================================= class ChangeImpact(Enum): """변경 영향도""" MINIMAL = "minimal" # 최소 영향 LOW = "low" # 낮은 영향 MEDIUM = "medium" # 중간 영향 HIGH = "high" # 높은 영향 CRITICAL = "critical" # 중대 영향 class GovernanceAction(Enum): """거버넌스 액션""" CREATE = "create" # 생성 UPDATE = "update" # 업데이트 DELETE = "delete" # 삭제 ARCHIVE = "archive" # 아카이브 RESTORE = "restore" # 복원 @dataclass class ImpactAnalysis: """영향도 분석 결과""" target_task_id: str # 대상 작업 ID action: GovernanceAction # 수행할 액션 impact_level: ChangeImpact # 영향도 수준 affected_tasks: List[str] # 영향받는 작업들 dependency_chain: List[str] # 의존성 체인 risk_factors: List[str] # 위험 요소들 mitigation_strategies: List[str] # 완화 전략들 approval_required: bool # 승인 필요 여부 estimated_effort: float # 예상 소요 시간 def to_dict(self) -> Dict[str, Any]: """딕셔너리로 변환""" return { 'target_task_id': self.target_task_id, 'action': self.action.value, 'impact_level': self.impact_level.value, 'affected_tasks': self.affected_tasks, 'dependency_chain': self.dependency_chain, 'risk_factors': self.risk_factors, 'mitigation_strategies': self.mitigation_strategies, 'approval_required': self.approval_required, 'estimated_effort': self.estimated_effort } @dataclass class BackupInfo: """백업 정보""" backup_id: str # 백업 ID backup_type: str # 백업 타입 task_count: int # 백업된 작업 수 backup_path: str # 백업 파일 경로 created_at: datetime # 생성 시간 description: str # 백업 설명 def to_dict(self) -> Dict[str, Any]: """딕셔너리로 변환""" return { 'backup_id': self.backup_id, 'backup_type': self.backup_type, 'task_count': self.task_count, 'backup_path': self.backup_path, 'created_at': self.created_at.isoformat(), 'description': self.description } @dataclass class ValidationResult: """검증 결과""" is_valid: bool # 유효성 여부 errors: List[str] # 오류 목록 warnings: List[str] # 경고 목록 suggestions: List[str] # 제안 목록 confidence_score: float # 신뢰도 점수 def to_dict(self) -> Dict[str, Any]: """딕셔너리로 변환""" return { 'is_valid': self.is_valid, 'errors': self.errors, 'warnings': self.warnings, 'suggestions': self.suggestions, 'confidence_score': self.confidence_score } # ============================================================================= # Project Governance Manager - 핵심 클래스 # ============================================================================= class ProjectGovernanceManager: """프로젝트 거버넌스 관리자""" def __init__(self, working_directory: str = "/home/skyki/qwen2.5"): """초기화""" self.working_directory = working_directory self.db_path = os.path.join(working_directory, "shrimp_tasks.db") self.backup_dir = os.path.join(working_directory, "backups") self.rules_file = os.path.join(working_directory, "project_rules.json") # 백업 디렉토리 생성 os.makedirs(self.backup_dir, exist_ok=True) # 현재 프로젝트 규칙 self.project_rules: Optional[ProjectRules] = None # 거버넌스 정책 self.governance_policies = { 'max_dependency_depth': 5, 'require_approval_for_critical_changes': True, 'auto_backup_before_major_changes': True, 'enforce_quality_gates': True, 'minimum_documentation_coverage': 0.8, 'maximum_orphaned_tasks': 10 } # 변경 추적 self.change_history: List[Dict[str, Any]] = [] logger.info("ProjectGovernanceManager 초기화 완료") # ========================================================================= # init_project_rules: 프로젝트 규칙 초기화 # ========================================================================= async def init_project_rules(self) -> Dict[str, Any]: """ 프로젝트 규칙 초기화 및 표준 설정 Returns: Dict[str, Any]: 초기화된 프로젝트 규칙 """ logger.info("📋 프로젝트 규칙 초기화 시작") try: # 1. 기존 프로젝트 분석 project_analysis = await self._analyze_existing_project() # 2. 프로젝트 타입 식별 project_type = self._identify_project_type(project_analysis) # 3. 기본 규칙 템플릿 생성 base_rules = self._create_base_rules_template(project_type) # 4. 프로젝트별 커스터마이징 customized_rules = await self._customize_rules_for_project(base_rules, project_analysis) # 5. 규칙 검증 validation_result = self._validate_project_rules(customized_rules) if not validation_result.is_valid: logger.warning(f"⚠️ 규칙 검증 경고: {validation_result.warnings}") # 6. ProjectRules 객체 생성 project_rules = ProjectRules( project_id=f"PROJECT-{datetime.now().strftime('%Y%m%d')}", coding_standards=customized_rules['coding_standards'], architecture_patterns=customized_rules['architecture_patterns'], quality_gates=customized_rules['quality_gates'], file_naming_conventions=customized_rules['file_naming_conventions'], dependency_rules=customized_rules['dependency_rules'], testing_requirements=customized_rules['testing_requirements'], documentation_standards=customized_rules['documentation_standards'] ) # 7. 규칙 저장 await self._save_project_rules(project_rules) self.project_rules = project_rules # 8. 기존 작업들에 규칙 적용 compliance_report = await self._apply_rules_to_existing_tasks(project_rules) # 9. 결과 생성 result = { 'project_rules': project_rules.to_dict(), 'project_type': project_type, 'validation_result': validation_result.to_dict(), 'compliance_report': compliance_report, 'recommendations': self._generate_rule_recommendations(project_analysis, compliance_report), 'initialized_at': datetime.now().isoformat() } logger.info(f"✅ 프로젝트 규칙 초기화 완료: {project_type} 타입") return result except Exception as e: logger.error(f"❌ 프로젝트 규칙 초기화 실패: {e}") raise async def _analyze_existing_project(self) -> Dict[str, Any]: """기존 프로젝트 분석""" logger.info("🔍 기존 프로젝트 분석 중...") analysis = { 'file_structure': {}, 'technology_stack': [], 'code_patterns': [], 'existing_standards': {}, 'quality_metrics': {}, 'dependencies': [] } # 파일 구조 분석 analysis['file_structure'] = await self._analyze_file_structure() # 기술 스택 식별 analysis['technology_stack'] = await self._identify_technology_stack() # 코드 패턴 인식 analysis['code_patterns'] = await self._recognize_existing_code_patterns() # 기존 표준 추출 analysis['existing_standards'] = await self._extract_existing_standards() # 품질 메트릭 계산 analysis['quality_metrics'] = await self._calculate_project_quality_metrics() # 의존성 분석 analysis['dependencies'] = await self._analyze_project_dependencies() return analysis def _identify_project_type(self, analysis: Dict[str, Any]) -> str: """프로젝트 타입 식별""" tech_stack = analysis.get('technology_stack', []) file_structure = analysis.get('file_structure', {}) # 웹 애플리케이션 if any(tech in tech_stack for tech in ['react', 'vue', 'angular', 'django', 'flask', 'express']): return "web_application" # 라이브러리/SDK elif any(pattern in file_structure for pattern in ['src/', 'lib/', 'dist/']): return "library" # 데이터 사이언스/ML elif any(tech in tech_stack for tech in ['jupyter', 'pandas', 'tensorflow', 'pytorch']): return "data_science" # 마이크로서비스 elif 'docker' in tech_stack and any('service' in key for key in file_structure.keys()): return "microservices" # 모바일 앱 elif any(tech in tech_stack for tech in ['react-native', 'flutter', 'swift', 'kotlin']): return "mobile_app" # 기본값 else: return "general_software" def _create_base_rules_template(self, project_type: str) -> Dict[str, Any]: """기본 규칙 템플릿 생성""" logger.info(f"📝 {project_type} 타입 기본 규칙 생성 중...") base_templates = { "web_application": { 'coding_standards': { 'max_function_length': 50, 'max_file_length': 500, 'naming_convention': 'camelCase', 'indent_style': 'spaces', 'indent_size': 2 }, 'architecture_patterns': ['MVC', 'Component-based', 'RESTful API'], 'quality_gates': { 'code_coverage': 80.0, 'complexity_score': 70.0, 'documentation_coverage': 75.0 }, 'testing_requirements': { 'unit_test_coverage': 80.0, 'integration_test_coverage': 70.0, 'e2e_test_coverage': 50.0 } }, "library": { 'coding_standards': { 'max_function_length': 30, 'max_file_length': 300, 'naming_convention': 'snake_case', 'indent_style': 'spaces', 'indent_size': 4 }, 'architecture_patterns': ['Modular', 'API-first'], 'quality_gates': { 'code_coverage': 90.0, 'complexity_score': 80.0, 'documentation_coverage': 95.0 } }, "general_software": { 'coding_standards': { 'max_function_length': 40, 'max_file_length': 400, 'naming_convention': 'snake_case', 'indent_style': 'spaces', 'indent_size': 4 }, 'architecture_patterns': ['Layered', 'Modular'], 'quality_gates': { 'code_coverage': 75.0, 'complexity_score': 75.0, 'documentation_coverage': 70.0 } } } template = base_templates.get(project_type, base_templates["general_software"]) # 공통 필드 추가 template.update({ 'file_naming_conventions': { 'class_files': 'PascalCase', 'function_files': 'snake_case', 'test_files': 'test_*.py', 'config_files': 'config.*' }, 'dependency_rules': [ 'No circular dependencies', 'Maximum dependency depth: 5', 'Prefer composition over inheritance', 'Use semantic versioning' ], 'documentation_standards': { 'function_docstring': 'required', 'class_docstring': 'required', 'module_docstring': 'recommended', 'api_documentation': 'required' } }) return template # ========================================================================= # update_task: 동적 작업 갱신 # ========================================================================= async def update_task(self, task_id: str, name: Optional[str] = None, description: Optional[str] = None, notes: Optional[str] = None, dependencies: Optional[List[str]] = None, related_files: Optional[List[Dict[str, Any]]] = None, implementation_guide: Optional[str] = None, verification_criteria: Optional[str] = None) -> Dict[str, Any]: """ 동적 작업 갱신 및 의존성 관리 Args: task_id: 업데이트할 작업 ID name: 새로운 이름 description: 새로운 설명 notes: 새로운 노트 dependencies: 새로운 의존성 목록 related_files: 새로운 관련 파일 목록 implementation_guide: 새로운 구현 가이드 verification_criteria: 새로운 검증 기준 Returns: Dict[str, Any]: 업데이트 결과 """ logger.info(f"🔄 작업 업데이트 시작: {task_id}") try: # 1. 기존 작업 로드 original_task = await self._load_task_from_db(task_id) if not original_task: raise ValueError(f"작업 {task_id}를 찾을 수 없습니다") # 2. 영향도 분석 impact_analysis = await self._analyze_update_impact( original_task, name, description, notes, dependencies, related_files, implementation_guide, verification_criteria ) # 3. 승인 필요 여부 확인 if impact_analysis.approval_required: logger.warning(f"⚠️ 중대한 변경으로 승인이 필요합니다: {impact_analysis.impact_level.value}") # 실제 구현에서는 승인 프로세스 수행 # 4. 백업 생성 (중요한 변경인 경우) backup_info = None if impact_analysis.impact_level in [ChangeImpact.HIGH, ChangeImpact.CRITICAL]: backup_info = await self._create_task_backup(task_id, "before_update") # 5. 작업 업데이트 수행 updated_task = await self._perform_task_update( original_task, name, description, notes, dependencies, related_files, implementation_guide, verification_criteria ) # 6. 의존성 체인 업데이트 dependency_updates = await self._update_dependency_chain(updated_task, original_task) # 7. 프로젝트 규칙 준수 검증 compliance_check = await self._check_rule_compliance(updated_task) # 8. 변경 기록 change_record = self._record_change( GovernanceAction.UPDATE, task_id, impact_analysis, original_task.to_dict(), updated_task.to_dict() ) # 9. 결과 생성 result = { 'success': True, 'updated_task': updated_task.to_dict(), 'impact_analysis': impact_analysis.to_dict(), 'dependency_updates': dependency_updates, 'compliance_check': compliance_check.to_dict(), 'backup_info': backup_info.to_dict() if backup_info else None, 'change_record': change_record, 'updated_at': datetime.now().isoformat() } logger.info(f"✅ 작업 업데이트 완료: {updated_task.name}") return result except Exception as e: logger.error(f"❌ 작업 업데이트 실패: {e}") raise async def _analyze_update_impact(self, original_task: Task, **update_fields) -> ImpactAnalysis: """업데이트 영향도 분석""" logger.info("📊 업데이트 영향도 분석 중...") impact_level = ChangeImpact.MINIMAL affected_tasks = [] risk_factors = [] mitigation_strategies = [] # 의존성 변경 분석 if update_fields.get('dependencies') is not None: new_deps = update_fields['dependencies'] old_deps = [dep.task_id for dep in original_task.dependencies] if set(new_deps) != set(old_deps): impact_level = max(impact_level, ChangeImpact.HIGH) affected_tasks.extend(await self._find_tasks_in_dependency_chain(original_task.id)) risk_factors.append("Dependency chain modification") mitigation_strategies.append("Validate dependency integrity") # 이름 변경 분석 if update_fields.get('name') and update_fields['name'] != original_task.name: impact_level = max(impact_level, ChangeImpact.MEDIUM) risk_factors.append("Task name change may affect references") mitigation_strategies.append("Update all task references") # 상태가 완료인 작업의 수정 if original_task.status == TaskStatus.COMPLETED: impact_level = max(impact_level, ChangeImpact.HIGH) risk_factors.append("Modifying completed task") mitigation_strategies.append("Require additional approval") # 중요한 필드 변경 critical_fields = ['implementation_guide', 'verification_criteria'] for field in critical_fields: if (update_fields.get(field) is not None and update_fields[field] != getattr(original_task, field, '')): impact_level = max(impact_level, ChangeImpact.MEDIUM) return ImpactAnalysis( target_task_id=original_task.id, action=GovernanceAction.UPDATE, impact_level=impact_level, affected_tasks=affected_tasks, dependency_chain=await self._build_dependency_chain(original_task.id), risk_factors=risk_factors, mitigation_strategies=mitigation_strategies, approval_required=impact_level in [ChangeImpact.HIGH, ChangeImpact.CRITICAL], estimated_effort=self._estimate_update_effort(impact_level, len(affected_tasks)) ) # ========================================================================= # delete_task: 안전한 작업 삭제 # ========================================================================= async def delete_task(self, task_id: str, force: bool = False) -> Dict[str, Any]: """ 안전한 작업 삭제 및 무결성 검사 Args: task_id: 삭제할 작업 ID force: 강제 삭제 여부 Returns: Dict[str, Any]: 삭제 결과 """ logger.info(f"🗑️ 작업 삭제 시작: {task_id} (강제: {force})") try: # 1. 작업 존재 확인 task = await self._load_task_from_db(task_id) if not task: raise ValueError(f"작업 {task_id}를 찾을 수 없습니다") # 2. 삭제 가능성 검증 deletion_check = await self._check_deletion_safety(task, force) if not deletion_check['safe'] and not force: return { 'success': False, 'reason': 'unsafe_deletion', 'checks': deletion_check, 'requires_force': True } # 3. 영향도 분석 impact_analysis = await self._analyze_deletion_impact(task) # 4. 백업 생성 backup_info = await self._create_task_backup(task_id, "before_deletion") # 5. 의존성 정리 dependency_cleanup = await self._cleanup_task_dependencies(task) # 6. 실제 삭제 수행 deletion_result = await self._perform_task_deletion(task, force) # 7. 고아 작업 처리 orphan_handling = await self._handle_orphaned_tasks(task.subtasks) # 8. 변경 기록 change_record = self._record_change( GovernanceAction.DELETE, task_id, impact_analysis, task.to_dict(), None ) # 9. 결과 생성 result = { 'success': True, 'deleted_task_id': task_id, 'deleted_task_name': task.name, 'impact_analysis': impact_analysis.to_dict(), 'backup_info': backup_info.to_dict(), 'dependency_cleanup': dependency_cleanup, 'orphan_handling': orphan_handling, 'change_record': change_record, 'deleted_at': datetime.now().isoformat() } logger.info(f"✅ 작업 삭제 완료: {task.name}") return result except Exception as e: logger.error(f"❌ 작업 삭제 실패: {e}") raise async def _check_deletion_safety(self, task: Task, force: bool) -> Dict[str, Any]: """삭제 안전성 검사""" logger.info("🔍 삭제 안전성 검사 중...") checks = { 'safe': True, 'blocking_factors': [], 'warnings': [], 'affected_count': 0 } # 완료된 작업 삭제 검사 if task.status == TaskStatus.COMPLETED: checks['warnings'].append("완료된 작업을 삭제하려고 합니다") if not force: checks['safe'] = False checks['blocking_factors'].append("완료된 작업은 강제 삭제만 가능합니다") # 의존성 검사 dependent_tasks = await self._find_tasks_depending_on(task.id) if dependent_tasks: checks['affected_count'] = len(dependent_tasks) checks['warnings'].append(f"{len(dependent_tasks)}개 작업이 이 작업에 의존합니다") if not force: checks['safe'] = False checks['blocking_factors'].append("의존하는 작업들이 있어 강제 삭제만 가능합니다") # 자식 작업 검사 if task.subtasks: checks['warnings'].append(f"{len(task.subtasks)}개 하위 작업이 있습니다") # 중요도 검사 if task.priority in [TaskPriority.CRITICAL, TaskPriority.URGENT]: checks['warnings'].append("중요한 작업을 삭제하려고 합니다") return checks # ========================================================================= # clear_all_tasks: 전체 작업 초기화 # ========================================================================= async def clear_all_tasks(self, confirm: bool = False) -> Dict[str, Any]: """ 전체 작업 초기화 및 백업 Args: confirm: 삭제 확인 여부 Returns: Dict[str, Any]: 초기화 결과 """ logger.info(f"🧹 전체 작업 초기화 시작 (확인: {confirm})") if not confirm: return { 'success': False, 'reason': 'confirmation_required', 'message': '전체 작업 삭제는 확인이 필요합니다' } try: # 1. 현재 작업 현황 조회 task_statistics = await self._get_task_statistics() # 2. 전체 백업 생성 backup_info = await self._create_full_backup("before_clear_all") # 3. 완료된 작업 별도 아카이브 archive_info = await self._archive_completed_tasks() # 4. 미완료 작업만 삭제 deletion_result = await self._delete_incomplete_tasks() # 5. 데이터베이스 정리 cleanup_result = await self._cleanup_database() # 6. 인덱스 재구성 reindex_result = await self._rebuild_indexes() # 7. 변경 기록 change_record = self._record_change( GovernanceAction.DELETE, "ALL_TASKS", None, {'task_count': task_statistics['total_tasks']}, {'task_count': 0} ) # 8. 결과 생성 result = { 'success': True, 'operation': 'clear_all_tasks', 'before_statistics': task_statistics, 'backup_info': backup_info.to_dict(), 'archive_info': archive_info, 'deletion_result': deletion_result, 'cleanup_result': cleanup_result, 'reindex_result': reindex_result, 'change_record': change_record, 'cleared_at': datetime.now().isoformat() } logger.info(f"✅ 전체 작업 초기화 완료: {deletion_result['deleted_count']}개 작업 삭제") return result except Exception as e: logger.error(f"❌ 전체 작업 초기화 실패: {e}") raise async def _create_full_backup(self, backup_type: str) -> BackupInfo: """전체 백업 생성""" logger.info("💾 전체 백업 생성 중...") backup_id = f"BACKUP-{datetime.now().strftime('%Y%m%d-%H%M%S')}" backup_filename = f"{backup_id}.json" backup_path = os.path.join(self.backup_dir, backup_filename) # 모든 데이터 수집 backup_data = { 'backup_info': { 'id': backup_id, 'type': backup_type, 'created_at': datetime.now().isoformat(), 'version': '1.0' }, 'tasks': await self._export_all_tasks(), 'project_rules': self.project_rules.to_dict() if self.project_rules else None, 'governance_policies': self.governance_policies, 'change_history': self.change_history[-100:] # 최근 100개 } # 백업 파일 저장 with open(backup_path, 'w', encoding='utf-8') as f: json.dump(backup_data, f, indent=2, ensure_ascii=False) backup_info = BackupInfo( backup_id=backup_id, backup_type=backup_type, task_count=len(backup_data['tasks']), backup_path=backup_path, created_at=datetime.now(), description=f"Full system backup - {backup_type}" ) logger.info(f"✅ 백업 생성 완료: {backup_path}") return backup_info # ========================================================================= # 유틸리티 및 헬퍼 메서드들 # ========================================================================= async def _load_task_from_db(self, task_id: str) -> Optional[Task]: """데이터베이스에서 작업 로드""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT task_data FROM tasks WHERE id = ?", (task_id,)) row = cursor.fetchone() if row: task_data = json.loads(row[0]) return Task.from_dict(task_data) except Exception as e: logger.error(f"작업 로드 실패: {e}") return None async def _find_tasks_depending_on(self, task_id: str) -> List[str]: """특정 작업에 의존하는 작업들 찾기""" dependent_tasks = [] try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT task_id FROM task_dependencies WHERE dependency_task_id = ? """, (task_id,)) for row in cursor.fetchall(): dependent_tasks.append(row[0]) except Exception as e: logger.error(f"의존 작업 조회 실패: {e}") return dependent_tasks def _record_change(self, action: GovernanceAction, target_id: str, impact_analysis: Optional[ImpactAnalysis], before_state: Optional[Dict[str, Any]], after_state: Optional[Dict[str, Any]]) -> Dict[str, Any]: """변경 기록""" change_record = { 'id': f"CHANGE-{datetime.now().strftime('%Y%m%d-%H%M%S')}", 'action': action.value, 'target_id': target_id, 'impact_analysis': impact_analysis.to_dict() if impact_analysis else None, 'before_state': before_state, 'after_state': after_state, 'timestamp': datetime.now().isoformat(), 'user': 'system' # 실제 구현에서는 현재 사용자 } self.change_history.append(change_record) # 변경 기록 개수 제한 (메모리 관리) if len(self.change_history) > 1000: self.change_history = self.change_history[-500:] return change_record async def _get_task_statistics(self) -> Dict[str, Any]: """작업 통계 조회""" stats = { 'total_tasks': 0, 'by_status': {}, 'by_priority': {}, 'by_category': {}, 'completed_tasks': 0, 'incomplete_tasks': 0 } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 전체 작업 수 cursor.execute("SELECT COUNT(*) FROM tasks") stats['total_tasks'] = cursor.fetchone()[0] # 상태별 통계 cursor.execute("SELECT status, COUNT(*) FROM tasks GROUP BY status") for status, count in cursor.fetchall(): stats['by_status'][status] = count if status == 'completed': stats['completed_tasks'] = count else: stats['incomplete_tasks'] += count # 우선순위별 통계 cursor.execute("SELECT priority, COUNT(*) FROM tasks GROUP BY priority") for priority, count in cursor.fetchall(): stats['by_priority'][priority] = count # 카테고리별 통계 cursor.execute("SELECT category, COUNT(*) FROM tasks GROUP BY category") for category, count in cursor.fetchall(): stats['by_category'][category] = count except Exception as e: logger.error(f"통계 조회 실패: {e}") return stats # ========================================================================= # 프로젝트 분석 메서드들 # ========================================================================= async def _analyze_file_structure(self) -> Dict[str, Any]: """파일 구조 분석""" structure = {'directories': [], 'files': [], 'patterns': []} try: for root, dirs, files in os.walk(self.working_directory): rel_root = os.path.relpath(root, self.working_directory) if not rel_root.startswith('.'): structure['directories'].append(rel_root) structure['files'].extend([ os.path.join(rel_root, f) for f in files if not f.startswith('.') ]) except Exception as e: logger.warning(f"파일 구조 분석 실패: {e}") return structure async def _identify_technology_stack(self) -> List[str]: """기술 스택 식별""" tech_stack = [] # package.json 확인 package_json = os.path.join(self.working_directory, 'package.json') if os.path.exists(package_json): tech_stack.extend(['node.js', 'npm']) try: with open(package_json, 'r') as f: data = json.load(f) deps = {**data.get('dependencies', {}), **data.get('devDependencies', {})} if 'react' in deps: tech_stack.append('react') if 'vue' in deps: tech_stack.append('vue') if 'express' in deps: tech_stack.append('express') except Exception: pass # requirements.txt 확인 requirements = os.path.join(self.working_directory, 'requirements.txt') if os.path.exists(requirements): tech_stack.extend(['python', 'pip']) try: with open(requirements, 'r') as f: content = f.read().lower() if 'django' in content: tech_stack.append('django') if 'flask' in content: tech_stack.append('flask') if 'fastapi' in content: tech_stack.append('fastapi') except Exception: pass # Dockerfile 확인 if os.path.exists(os.path.join(self.working_directory, 'Dockerfile')): tech_stack.append('docker') return list(set(tech_stack)) async def _recognize_existing_code_patterns(self) -> List[str]: """기존 코드 패턴 인식""" patterns = [] # Python 파일 패턴 확인 for root, dirs, files in os.walk(self.working_directory): for file in files: if file.endswith('.py'): file_path = os.path.join(root, file) try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() if 'class ' in content: patterns.append('OOP') if 'async def' in content: patterns.append('Async') if 'import asyncio' in content: patterns.append('AsyncIO') if 'from dataclasses' in content: patterns.append('Dataclasses') except Exception: continue return list(set(patterns)) async def _extract_existing_standards(self) -> Dict[str, Any]: """기존 표준 추출""" standards = {'naming': 'snake_case', 'docstrings': False, 'type_hints': False} # Python 파일에서 패턴 추출 snake_case_count = 0 camel_case_count = 0 docstring_count = 0 type_hint_count = 0 total_functions = 0 for root, dirs, files in os.walk(self.working_directory): for file in files: if file.endswith('.py'): file_path = os.path.join(root, file) try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # 함수 패턴 분석 func_patterns = re.findall(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)', content) for func_name in func_patterns: total_functions += 1 if '_' in func_name: snake_case_count += 1 elif any(c.isupper() for c in func_name[1:]): camel_case_count += 1 # 독스트링 확인 if '"""' in content or "'''" in content: docstring_count += len(func_patterns) # 타입 힌트 확인 if '->' in content or ': ' in content: type_hint_count += len(func_patterns) except Exception: continue if total_functions > 0: if snake_case_count > camel_case_count: standards['naming'] = 'snake_case' else: standards['naming'] = 'camelCase' standards['docstrings'] = (docstring_count / total_functions) > 0.5 standards['type_hints'] = (type_hint_count / total_functions) > 0.3 return standards async def _calculate_project_quality_metrics(self) -> Dict[str, float]: """프로젝트 품질 메트릭 계산""" metrics = { 'code_coverage': 0.0, 'documentation_coverage': 0.0, 'complexity_score': 0.0, 'maintainability_index': 0.0 } # 간단한 메트릭 계산 (실제로는 더 정교한 도구 사용) python_files = [] for root, dirs, files in os.walk(self.working_directory): for file in files: if file.endswith('.py'): python_files.append(os.path.join(root, file)) if python_files: doc_files = 0 total_lines = 0 complex_functions = 0 total_functions = 0 for file_path in python_files: try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() lines = content.split('\n') total_lines += len(lines) if '"""' in content or "'''" in content: doc_files += 1 # 함수 복잡도 간단 계산 functions = re.findall(r'def\s+[a-zA-Z_][a-zA-Z0-9_]*', content) total_functions += len(functions) # if/for/while 문 개수로 복잡도 추정 complexity = len(re.findall(r'\b(if|for|while|try|except)\b', content)) if complexity > 10: complex_functions += len(functions) except Exception: continue metrics['documentation_coverage'] = (doc_files / len(python_files)) * 100 metrics['complexity_score'] = max(0, 100 - (complex_functions / max(total_functions, 1)) * 100) metrics['maintainability_index'] = (metrics['documentation_coverage'] + metrics['complexity_score']) / 2 return metrics async def _analyze_project_dependencies(self) -> List[str]: """프로젝트 의존성 분석""" dependencies = [] # Python 의존성 requirements_file = os.path.join(self.working_directory, 'requirements.txt') if os.path.exists(requirements_file): try: with open(requirements_file, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): dep = line.split('=')[0].split('>')[0].split('<')[0] dependencies.append(dep) except Exception: pass # Node.js 의존성 package_file = os.path.join(self.working_directory, 'package.json') if os.path.exists(package_file): try: with open(package_file, 'r') as f: data = json.load(f) deps = {**data.get('dependencies', {}), **data.get('devDependencies', {})} dependencies.extend(deps.keys()) except Exception: pass return dependencies async def _customize_rules_for_project(self, base_rules: Dict[str, Any], analysis: Dict[str, Any]) -> Dict[str, Any]: """프로젝트별 규칙 커스터마이징""" customized = base_rules.copy() # 기존 표준 반영 existing_standards = analysis.get('existing_standards', {}) if existing_standards: customized['coding_standards']['naming_convention'] = existing_standards.get('naming', 'snake_case') if existing_standards.get('docstrings'): customized['documentation_standards']['function_docstring'] = 'required' if existing_standards.get('type_hints'): customized['coding_standards']['type_hints'] = 'required' # 기술 스택에 따른 조정 tech_stack = analysis.get('technology_stack', []) if 'react' in tech_stack or 'vue' in tech_stack: customized['coding_standards']['naming_convention'] = 'camelCase' customized['coding_standards']['indent_size'] = 2 if 'django' in tech_stack: customized['architecture_patterns'].append('Django MVT') customized['file_naming_conventions']['model_files'] = 'models.py' customized['file_naming_conventions']['view_files'] = 'views.py' return customized def _validate_project_rules(self, rules: Dict[str, Any]) -> ValidationResult: """프로젝트 규칙 검증""" errors = [] warnings = [] suggestions = [] # 필수 필드 확인 required_fields = ['coding_standards', 'architecture_patterns', 'quality_gates'] for field in required_fields: if field not in rules: errors.append(f"필수 필드 누락: {field}") # 품질 게이트 값 확인 if 'quality_gates' in rules: quality_gates = rules['quality_gates'] if quality_gates.get('code_coverage', 0) < 50: warnings.append("코드 커버리지 기준이 너무 낮습니다 (50% 미만)") if quality_gates.get('complexity_score', 0) < 60: warnings.append("복잡도 점수 기준이 너무 낮습니다 (60% 미만)") # 제안사항 if 'testing_requirements' not in rules: suggestions.append("테스트 요구사항을 추가하는 것을 권장합니다") return ValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings, suggestions=suggestions, confidence_score=1.0 - (len(errors) * 0.3 + len(warnings) * 0.1) ) async def _save_project_rules(self, rules: ProjectRules): """프로젝트 규칙 저장""" try: with open(self.rules_file, 'w', encoding='utf-8') as f: json.dump(rules.to_dict(), f, indent=2, ensure_ascii=False) logger.info(f"✅ 프로젝트 규칙 저장 완료: {self.rules_file}") except Exception as e: logger.error(f"❌ 프로젝트 규칙 저장 실패: {e}") raise async def _apply_rules_to_existing_tasks(self, rules: ProjectRules) -> Dict[str, Any]: """기존 작업들에 규칙 적용""" report = { 'total_tasks': 0, 'compliant_tasks': 0, 'non_compliant_tasks': 0, 'compliance_issues': [] } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT id, task_data FROM tasks") for row in cursor.fetchall(): report['total_tasks'] += 1 task_id, task_data_json = row task_data = json.loads(task_data_json) # 규칙 준수 검사 (간단한 예시) compliant = True issues = [] # 이름 규칙 검사 task_name = task_data.get('name', '') if not task_name: compliant = False issues.append("작업 이름이 비어있음") # 설명 요구사항 검사 description = task_data.get('description', '') if len(description) < 10: compliant = False issues.append("설명이 너무 짧음") if compliant: report['compliant_tasks'] += 1 else: report['non_compliant_tasks'] += 1 report['compliance_issues'].append({ 'task_id': task_id, 'issues': issues }) except Exception as e: logger.error(f"규칙 적용 검사 실패: {e}") return report def _generate_rule_recommendations(self, analysis: Dict[str, Any], compliance_report: Dict[str, Any]) -> List[str]: """규칙 권장사항 생성""" recommendations = [] # 품질 메트릭 기반 권장사항 quality_metrics = analysis.get('quality_metrics', {}) if quality_metrics.get('documentation_coverage', 0) < 50: recommendations.append("문서화 커버리지가 낮습니다. 독스트링 작성을 의무화하세요.") if quality_metrics.get('complexity_score', 100) < 70: recommendations.append("코드 복잡도가 높습니다. 함수 분할을 고려하세요.") # 준수성 리포트 기반 권장사항 if compliance_report['non_compliant_tasks'] > compliance_report['compliant_tasks']: recommendations.append("기존 작업들의 규칙 준수율이 낮습니다. 점진적 개선을 계획하세요.") # 기술 스택 기반 권장사항 tech_stack = analysis.get('technology_stack', []) if 'docker' in tech_stack: recommendations.append("Docker 사용 시 컨테이너 보안 규칙을 추가하세요.") if not recommendations: recommendations.append("현재 프로젝트 상태가 양호합니다. 정기적인 규칙 검토를 권장합니다.") return recommendations # ========================================================================= # 작업 업데이트 관련 메서드들 # ========================================================================= async def _perform_task_update(self, original_task: Task, **update_fields) -> Task: """작업 업데이트 수행""" logger.info("🔄 작업 업데이트 수행 중...") # 업데이트할 필드들 적용 updated_data = original_task.to_dict() for field, value in update_fields.items(): if value is not None: if field == 'related_files' and value: # RelatedFile 객체로 변환 updated_data[field] = [ RelatedFile(**file_data) if isinstance(file_data, dict) else file_data for file_data in value ] else: updated_data[field] = value # 수정 시간 업데이트 updated_data['modified_at'] = datetime.now() # Task 객체 재생성 updated_task = Task.from_dict(updated_data) # 데이터베이스 업데이트 try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" UPDATE tasks SET task_data = ?, modified_at = ? WHERE id = ? """, (json.dumps(updated_task.to_dict()), datetime.now().isoformat(), updated_task.id)) conn.commit() except Exception as e: logger.error(f"데이터베이스 업데이트 실패: {e}") raise return updated_task async def _update_dependency_chain(self, updated_task: Task, original_task: Task) -> Dict[str, Any]: """의존성 체인 업데이트""" logger.info("🔗 의존성 체인 업데이트 중...") result = { 'dependencies_added': [], 'dependencies_removed': [], 'affected_tasks': [], 'validation_errors': [] } # 기존 의존성과 새 의존성 비교 old_deps = set(dep.task_id for dep in original_task.dependencies) new_deps = set(dep.task_id for dep in updated_task.dependencies) added_deps = new_deps - old_deps removed_deps = old_deps - new_deps result['dependencies_added'] = list(added_deps) result['dependencies_removed'] = list(removed_deps) # 순환 의존성 검사 for dep_id in added_deps: if await self._would_create_circular_dependency(updated_task.id, dep_id): result['validation_errors'].append(f"순환 의존성 감지: {dep_id}") # 영향받는 작업들 식별 result['affected_tasks'] = await self._find_tasks_in_dependency_chain(updated_task.id) return result async def _check_rule_compliance(self, task: Task) -> ValidationResult: """프로젝트 규칙 준수 검증""" logger.info("📋 규칙 준수 검증 중...") errors = [] warnings = [] suggestions = [] if not self.project_rules: return ValidationResult(True, [], ["프로젝트 규칙이 설정되지 않음"], [], 0.5) # 이름 규칙 검사 if len(task.name) < 5: errors.append("작업 이름이 너무 짧습니다 (최소 5자)") # 설명 규칙 검사 if len(task.description) < 20: warnings.append("작업 설명이 너무 짧습니다 (권장 20자 이상)") # 구현 가이드 검사 if not task.implementation_guide: suggestions.append("구현 가이드를 추가하는 것을 권장합니다") # 검증 기준 검사 if not task.verification_criteria: suggestions.append("검증 기준을 추가하는 것을 권장합니다") # 의존성 깊이 검사 depth = await self._calculate_dependency_depth(task.id) max_depth = self.governance_policies.get('max_dependency_depth', 5) if depth > max_depth: errors.append(f"의존성 깊이가 너무 깊습니다 ({depth} > {max_depth})") return ValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings, suggestions=suggestions, confidence_score=1.0 - (len(errors) * 0.4 + len(warnings) * 0.2) ) async def _build_dependency_chain(self, task_id: str) -> List[str]: """의존성 체인 구축""" chain = [] visited = set() async def traverse_dependencies(current_id: str): if current_id in visited: return visited.add(current_id) chain.append(current_id) # 현재 작업의 의존성들 조회 task = await self._load_task_from_db(current_id) if task: for dep in task.dependencies: await traverse_dependencies(dep.task_id) await traverse_dependencies(task_id) return chain def _estimate_update_effort(self, impact_level: ChangeImpact, affected_count: int) -> float: """업데이트 소요 시간 추정""" base_effort = { ChangeImpact.MINIMAL: 0.1, ChangeImpact.LOW: 0.25, ChangeImpact.MEDIUM: 0.5, ChangeImpact.HIGH: 1.0, ChangeImpact.CRITICAL: 2.0 } effort = base_effort.get(impact_level, 0.5) effort += affected_count * 0.1 # 영향받는 작업당 추가 시간 return min(effort, 8.0) # 최대 8시간 # ========================================================================= # 작업 삭제 관련 메서드들 # ========================================================================= async def _analyze_deletion_impact(self, task: Task) -> ImpactAnalysis: """삭제 영향도 분석""" logger.info("📊 삭제 영향도 분석 중...") affected_tasks = await self._find_tasks_depending_on(task.id) impact_level = ChangeImpact.LOW risk_factors = [] mitigation_strategies = [] # 의존성 기반 영향도 if affected_tasks: impact_level = ChangeImpact.HIGH risk_factors.append(f"{len(affected_tasks)}개 작업이 이 작업에 의존함") mitigation_strategies.append("의존 작업들의 의존성 재설정 필요") # 완료된 작업 삭제 if task.status == TaskStatus.COMPLETED: impact_level = max(impact_level, ChangeImpact.MEDIUM) risk_factors.append("완료된 작업 삭제로 인한 기록 손실") mitigation_strategies.append("아카이브 백업 필수") # 중요한 작업 if task.priority in [TaskPriority.CRITICAL, TaskPriority.URGENT]: impact_level = max(impact_level, ChangeImpact.HIGH) risk_factors.append("중요한 작업 삭제") mitigation_strategies.append("추가 승인 및 백업 필요") return ImpactAnalysis( target_task_id=task.id, action=GovernanceAction.DELETE, impact_level=impact_level, affected_tasks=affected_tasks, dependency_chain=await self._build_dependency_chain(task.id), risk_factors=risk_factors, mitigation_strategies=mitigation_strategies, approval_required=impact_level in [ChangeImpact.HIGH, ChangeImpact.CRITICAL], estimated_effort=self._estimate_update_effort(impact_level, len(affected_tasks)) ) async def _create_task_backup(self, task_id: str, backup_type: str) -> BackupInfo: """작업 백업 생성""" logger.info(f"💾 작업 백업 생성: {task_id}") task = await self._load_task_from_db(task_id) if not task: raise ValueError(f"작업 {task_id}를 찾을 수 없습니다") backup_id = f"TASK-BACKUP-{datetime.now().strftime('%Y%m%d-%H%M%S')}" backup_filename = f"{backup_id}.json" backup_path = os.path.join(self.backup_dir, backup_filename) backup_data = { 'backup_info': { 'id': backup_id, 'type': backup_type, 'task_id': task_id, 'created_at': datetime.now().isoformat() }, 'task_data': task.to_dict() } with open(backup_path, 'w', encoding='utf-8') as f: json.dump(backup_data, f, indent=2, ensure_ascii=False) return BackupInfo( backup_id=backup_id, backup_type=backup_type, task_count=1, backup_path=backup_path, created_at=datetime.now(), description=f"Task backup - {backup_type}" ) async def _cleanup_task_dependencies(self, task: Task) -> Dict[str, Any]: """작업 의존성 정리""" logger.info("🧹 작업 의존성 정리 중...") result = { 'dependencies_removed': 0, 'dependent_tasks_updated': 0, 'orphaned_tasks': [] } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 이 작업을 의존하는 다른 작업들 찾기 dependent_tasks = await self._find_tasks_depending_on(task.id) for dep_task_id in dependent_tasks: dep_task = await self._load_task_from_db(dep_task_id) if dep_task: # 의존성에서 삭제될 작업 제거 updated_deps = [dep for dep in dep_task.dependencies if dep.task_id != task.id] # 의존성이 모두 제거되면 고아 작업으로 표시 if not updated_deps and dep_task.dependencies: result['orphaned_tasks'].append(dep_task_id) # 업데이트된 의존성으로 저장 dep_task.dependencies = updated_deps cursor.execute(""" UPDATE tasks SET task_data = ? WHERE id = ? """, (json.dumps(dep_task.to_dict()), dep_task_id)) result['dependent_tasks_updated'] += 1 # 의존성 테이블에서 관련 레코드 삭제 cursor.execute("DELETE FROM task_dependencies WHERE task_id = ? OR dependency_task_id = ?", (task.id, task.id)) result['dependencies_removed'] = cursor.rowcount conn.commit() except Exception as e: logger.error(f"의존성 정리 실패: {e}") raise return result async def _perform_task_deletion(self, task: Task, force: bool) -> Dict[str, Any]: """실제 작업 삭제 수행""" logger.info(f"🗑️ 작업 삭제 수행: {task.id}") result = { 'deleted': False, 'archived': False, 'error': None } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 완료된 작업은 아카이브 테이블로 이동 if task.status == TaskStatus.COMPLETED: cursor.execute(""" INSERT INTO archived_tasks (id, task_data, archived_at) VALUES (?, ?, ?) """, (task.id, json.dumps(task.to_dict()), datetime.now().isoformat())) result['archived'] = True # 원본 테이블에서 삭제 cursor.execute("DELETE FROM tasks WHERE id = ?", (task.id,)) if cursor.rowcount > 0: result['deleted'] = True conn.commit() except Exception as e: result['error'] = str(e) logger.error(f"작업 삭제 실패: {e}") raise return result async def _handle_orphaned_tasks(self, subtask_ids: List[str]) -> Dict[str, Any]: """고아 작업 처리""" logger.info("👨‍👩‍👧‍👦 고아 작업 처리 중...") result = { 'orphaned_count': 0, 'handled_count': 0, 'actions_taken': [] } for subtask_id in subtask_ids: subtask = await self._load_task_from_db(subtask_id) if subtask: result['orphaned_count'] += 1 # 고아 작업 처리 전략 결정 if subtask.status == TaskStatus.COMPLETED: # 완료된 고아 작업은 유지 result['actions_taken'].append(f"{subtask_id}: 완료된 작업으로 유지") elif subtask.priority in [TaskPriority.CRITICAL, TaskPriority.URGENT]: # 중요한 고아 작업은 최상위로 승격 subtask.parent_task_id = None await self._perform_task_update(subtask, parent_task_id=None) result['actions_taken'].append(f"{subtask_id}: 최상위 작업으로 승격") result['handled_count'] += 1 else: # 일반 고아 작업은 삭제 고려 result['actions_taken'].append(f"{subtask_id}: 삭제 후보로 표시") return result # ========================================================================= # 전체 초기화 관련 메서드들 # ========================================================================= async def _archive_completed_tasks(self) -> Dict[str, Any]: """완료된 작업 아카이브""" logger.info("📦 완료된 작업 아카이브 중...") result = { 'archived_count': 0, 'archive_file': None, 'error': None } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 완료된 작업들 조회 cursor.execute("SELECT * FROM tasks WHERE status = 'completed'") completed_tasks = cursor.fetchall() if completed_tasks: # 아카이브 파일 생성 archive_filename = f"completed_tasks_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" archive_path = os.path.join(self.backup_dir, archive_filename) archive_data = { 'archive_info': { 'created_at': datetime.now().isoformat(), 'task_count': len(completed_tasks) }, 'tasks': [json.loads(task[1]) for task in completed_tasks] # task_data 컬럼 } with open(archive_path, 'w', encoding='utf-8') as f: json.dump(archive_data, f, indent=2, ensure_ascii=False) # 아카이브 테이블로 이동 for task_row in completed_tasks: cursor.execute(""" INSERT OR REPLACE INTO archived_tasks (id, task_data, archived_at) VALUES (?, ?, ?) """, (task_row[0], task_row[1], datetime.now().isoformat())) conn.commit() result['archived_count'] = len(completed_tasks) result['archive_file'] = archive_path except Exception as e: result['error'] = str(e) logger.error(f"아카이브 실패: {e}") return result async def _delete_incomplete_tasks(self) -> Dict[str, Any]: """미완료 작업 삭제""" logger.info("🗑️ 미완료 작업 삭제 중...") result = { 'deleted_count': 0, 'error': None } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 미완료 작업 삭제 cursor.execute("DELETE FROM tasks WHERE status != 'completed'") result['deleted_count'] = cursor.rowcount # 관련 의존성 정리 cursor.execute("DELETE FROM task_dependencies") conn.commit() except Exception as e: result['error'] = str(e) logger.error(f"미완료 작업 삭제 실패: {e}") return result async def _cleanup_database(self) -> Dict[str, Any]: """데이터베이스 정리""" logger.info("🧹 데이터베이스 정리 중...") result = { 'vacuum_completed': False, 'orphaned_records_removed': 0, 'error': None } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 고아 의존성 레코드 정리 cursor.execute(""" DELETE FROM task_dependencies WHERE task_id NOT IN (SELECT id FROM tasks) OR dependency_task_id NOT IN (SELECT id FROM tasks) """) result['orphaned_records_removed'] = cursor.rowcount # 데이터베이스 최적화 cursor.execute("VACUUM") result['vacuum_completed'] = True conn.commit() except Exception as e: result['error'] = str(e) logger.error(f"데이터베이스 정리 실패: {e}") return result async def _rebuild_indexes(self) -> Dict[str, Any]: """인덱스 재구성""" logger.info("🔧 인덱스 재구성 중...") result = { 'indexes_rebuilt': 0, 'error': None } try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 인덱스 재구성 indexes = [ "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)", "CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)", "CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category)", "CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at)", "CREATE INDEX IF NOT EXISTS idx_dependencies_task ON task_dependencies(task_id)", "CREATE INDEX IF NOT EXISTS idx_dependencies_dep ON task_dependencies(dependency_task_id)" ] for index_sql in indexes: cursor.execute(index_sql) result['indexes_rebuilt'] += 1 conn.commit() except Exception as e: result['error'] = str(e) logger.error(f"인덱스 재구성 실패: {e}") return result async def _export_all_tasks(self) -> List[Dict[str, Any]]: """모든 작업 내보내기""" tasks = [] try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT task_data FROM tasks") for row in cursor.fetchall(): task_data = json.loads(row[0]) tasks.append(task_data) except Exception as e: logger.error(f"작업 내보내기 실패: {e}") return tasks # ========================================================================= # 유틸리티 메서드들 # ========================================================================= async def _would_create_circular_dependency(self, task_id: str, dependency_id: str) -> bool: """순환 의존성 검사""" if task_id == dependency_id: return True visited = set() async def check_path(current_id: str, target_id: str) -> bool: if current_id == target_id: return True if current_id in visited: return False visited.add(current_id) task = await self._load_task_from_db(current_id) if task: for dep in task.dependencies: if await check_path(dep.task_id, target_id): return True return False return await check_path(dependency_id, task_id) async def _calculate_dependency_depth(self, task_id: str) -> int: """의존성 깊이 계산""" max_depth = 0 visited = set() async def calculate_depth(current_id: str, current_depth: int) -> int: if current_id in visited: return current_depth visited.add(current_id) task = await self._load_task_from_db(current_id) if not task or not task.dependencies: return current_depth max_child_depth = current_depth for dep in task.dependencies: child_depth = await calculate_depth(dep.task_id, current_depth + 1) max_child_depth = max(max_child_depth, child_depth) return max_child_depth return await calculate_depth(task_id, 0) async def _find_tasks_in_dependency_chain(self, task_id: str) -> List[str]: """의존성 체인 내 모든 작업 찾기""" all_tasks = set() visited = set() async def traverse(current_id: str): if current_id in visited: return visited.add(current_id) all_tasks.add(current_id) # 상위 의존성 task = await self._load_task_from_db(current_id) if task: for dep in task.dependencies: await traverse(dep.task_id) # 하위 의존성 dependent_tasks = await self._find_tasks_depending_on(current_id) for dep_task_id in dependent_tasks: await traverse(dep_task_id) await traverse(task_id) all_tasks.discard(task_id) # 자기 자신 제외 return list(all_tasks) if __name__ == "__main__": # 기본 테스트 print("🚀 Project Governance System 테스트") async def test_governance(): manager = ProjectGovernanceManager() # init_project_rules 테스트 rules_result = await manager.init_project_rules() print(f"✅ init_project_rules 성공: {rules_result['project_type']} 타입") # update_task 테스트 (모의 작업 ID) try: update_result = await manager.update_task( "TASK-20250719-TEST001", name="업데이트된 작업명", description="업데이트된 설명" ) print(f"✅ update_task 성공: {update_result['success']}") except ValueError: print("ℹ️ update_task: 작업을 찾을 수 없음 (정상 - 테스트 환경)") # delete_task 테스트 try: delete_result = await manager.delete_task("TASK-20250719-TEST001") print(f"✅ delete_task 결과: {delete_result['success']}") except ValueError: print("ℹ️ delete_task: 작업을 찾을 수 없음 (정상 - 테스트 환경)") # clear_all_tasks 테스트 (확인 없이) clear_result = await manager.clear_all_tasks(confirm=False) print(f"✅ clear_all_tasks (확인 필요): {clear_result['reason']}") print("🎯 모든 Project Governance 테스트 완료!") # 비동기 테스트 실행 import asyncio asyncio.run(test_governance())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server