Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
shrimp_lifecycle.py30.3 kB
#!/usr/bin/env python3 """ Shrimp Task Manager - Task Lifecycle Management 로컬 AI의 핵심 사고 과정: 작업 계획, 분석, 분할 이 모듈은 작업의 생명주기를 관리하는 핵심 기능들을 제공합니다: - plan_task: 지능형 작업 계획 수립 - analyze_task: 심층 기술 분석 - split_tasks: 지능형 작업 분할 이 기능들은 로컬 AI가 복잡한 문제를 체계적으로 접근하고 해결할 수 있도록 하는 사고 구조의 핵심을 형성합니다. """ import os import re import json import asyncio import logging from typing import Dict, List, Optional, Any, Tuple, Set from datetime import datetime, timedelta from dataclasses import dataclass, field # 로컬 모듈 임포트 from .shrimp_base import ( Task, TaskStatus, TaskPriority, TaskCategory, RelatedFile, FileRelationType, Evidence, TaskDependency, ThoughtProcess, ThoughtStage, generate_task_id, validate_task_data ) # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ============================================================================= # 작업 분석 결과 클래스 (Task Analysis Results) # ============================================================================= @dataclass class TechnicalAnalysis: """기술적 분석 결과""" feasibility_score: float # 실현 가능성 점수 (0.0-1.0) complexity_score: float # 복잡도 점수 (0.0-1.0) risk_factors: List[str] # 위험 요소들 recommended_approach: str # 권장 접근 방법 required_skills: List[str] # 필요한 기술들 estimated_effort: float # 예상 노력 (시간) dependencies_identified: List[str] # 식별된 의존성들 potential_blockers: List[str] # 잠재적 차단 요소들 implementation_options: List[Dict[str, Any]] # 구현 옵션들 quality_considerations: List[str] # 품질 고려사항들 def to_dict(self) -> Dict[str, Any]: """딕셔너리로 변환""" return { 'feasibility_score': self.feasibility_score, 'complexity_score': self.complexity_score, 'risk_factors': self.risk_factors, 'recommended_approach': self.recommended_approach, 'required_skills': self.required_skills, 'estimated_effort': self.estimated_effort, 'dependencies_identified': self.dependencies_identified, 'potential_blockers': self.potential_blockers, 'implementation_options': self.implementation_options, 'quality_considerations': self.quality_considerations } @dataclass class TaskPlan: """작업 계획 결과""" main_objective: str # 주요 목표 success_criteria: List[str] # 성공 기준들 deliverables: List[str] # 산출물들 milestones: List[Dict[str, Any]] # 마일스톤들 resource_requirements: Dict[str, Any] # 리소스 요구사항 timeline_estimate: Dict[str, float] # 일정 예상 (단계별 시간) quality_gates: List[str] # 품질 게이트들 communication_plan: List[str] # 커뮤니케이션 계획 def to_dict(self) -> Dict[str, Any]: """딕셔너리로 변환""" return { 'main_objective': self.main_objective, 'success_criteria': self.success_criteria, 'deliverables': self.deliverables, 'milestones': self.milestones, 'resource_requirements': self.resource_requirements, 'timeline_estimate': self.timeline_estimate, 'quality_gates': self.quality_gates, 'communication_plan': self.communication_plan } # ============================================================================= # Task Lifecycle Manager - 핵심 클래스 # ============================================================================= class TaskLifecycleManager: """작업 생명주기 관리자""" def __init__(self, working_directory: str = "/home/skyki/qwen2.5"): """초기화""" self.working_directory = working_directory self.tasks: Dict[str, Task] = {} self.task_plans: Dict[str, TaskPlan] = {} self.technical_analyses: Dict[str, TechnicalAnalysis] = {} # 컨텍스트 관리 (무한 컨텍스트 활용) self.context_layers = { 'immediate': {}, # 즉시 필요한 정보 'relevant': {}, # 관련 작업 정보 'background': {}, # 프로젝트 전체 맥락 'historical': {} # 학습된 패턴과 경험 } logger.info("TaskLifecycleManager 초기화 완료") # ========================================================================= # plan_task: 지능형 작업 계획 수립 # ========================================================================= async def plan_task(self, description: str, requirements: str = "", existing_tasks_reference: bool = False) -> Tuple[Task, TaskPlan]: """ 지능형 작업 계획 수립 Args: description: 작업 설명 requirements: 기술적 요구사항 existing_tasks_reference: 기존 작업 참조 여부 Returns: Tuple[Task, TaskPlan]: 생성된 작업과 계획 """ logger.info(f"🎯 작업 계획 수립 시작: {description[:50]}...") try: # 1. 요구사항 분석 analysis_result = await self._analyze_requirements(description, requirements) # 2. 프로젝트 컨텍스트 수집 project_context = await self._gather_project_context(existing_tasks_reference) # 3. 작업 설계 task_design = await self._design_task_structure(description, requirements, analysis_result, project_context) # 4. 완료 기준 정의 completion_criteria = await self._define_completion_criteria(task_design) # 5. Task 객체 생성 task = self._create_task_from_design(task_design, completion_criteria) # 6. TaskPlan 생성 task_plan = self._create_task_plan(task_design, completion_criteria) # 7. 증거 기반 의사결정 추가 self._add_planning_evidence(task, analysis_result, project_context) # 8. 저장 self.tasks[task.id] = task self.task_plans[task.id] = task_plan logger.info(f"✅ 작업 계획 완료: {task.id}") return task, task_plan except Exception as e: logger.error(f"❌ 작업 계획 실패: {e}") raise async def _analyze_requirements(self, description: str, requirements: str) -> Dict[str, Any]: """요구사항 분석""" logger.info("📊 요구사항 분석 중...") # 키워드 추출 keywords = self._extract_keywords(description + " " + requirements) # 복잡도 평가 complexity = self._assess_complexity(description, requirements, keywords) # 카테고리 분류 category = self._classify_task_category(keywords) # 우선순위 결정 priority = self._determine_priority(description, requirements, complexity) # 기술 스택 식별 tech_stack = self._identify_tech_stack(description, requirements, keywords) return { 'keywords': keywords, 'complexity': complexity, 'category': category, 'priority': priority, 'tech_stack': tech_stack, 'scope': self._determine_scope(description, requirements) } async def _gather_project_context(self, reference_existing: bool) -> Dict[str, Any]: """프로젝트 컨텍스트 수집""" logger.info("🔍 프로젝트 컨텍스트 수집 중...") context = { 'project_structure': {}, 'existing_tasks': {}, 'codebase_patterns': {}, 'constraints': {} } try: # 프로젝트 구조 분석 context['project_structure'] = await self._analyze_project_structure() # 기존 작업 분석 (요청 시) if reference_existing: context['existing_tasks'] = await self._analyze_existing_tasks() # 코드베이스 패턴 인식 context['codebase_patterns'] = await self._recognize_codebase_patterns() # 제약사항 식별 context['constraints'] = await self._identify_constraints() except Exception as e: logger.warning(f"⚠️ 컨텍스트 수집 부분 실패: {e}") return context async def _design_task_structure(self, description: str, requirements: str, analysis: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: """작업 구조 설계""" logger.info("🏗️ 작업 구조 설계 중...") # 작업 이름 생성 task_name = self._generate_task_name(description, analysis) # 구현 가이드 생성 implementation_guide = self._generate_implementation_guide(description, requirements, analysis, context) # 관련 파일 식별 related_files = self._identify_related_files(description, requirements, context) # 의존성 식별 dependencies = self._identify_dependencies(description, requirements, context) # 예상 소요 시간 계산 estimated_hours = self._calculate_estimated_hours(analysis, context) return { 'name': task_name, 'description': description, 'requirements': requirements, 'category': analysis['category'], 'priority': analysis['priority'], 'implementation_guide': implementation_guide, 'related_files': related_files, 'dependencies': dependencies, 'estimated_hours': estimated_hours, 'complexity_score': analysis['complexity'], 'tech_stack': analysis['tech_stack'] } async def _define_completion_criteria(self, task_design: Dict[str, Any]) -> Dict[str, Any]: """완료 기준 정의""" logger.info("✅ 완료 기준 정의 중...") # 기능적 기준 functional_criteria = self._define_functional_criteria(task_design) # 품질 기준 quality_criteria = self._define_quality_criteria(task_design) # 성능 기준 performance_criteria = self._define_performance_criteria(task_design) # 테스트 기준 test_criteria = self._define_test_criteria(task_design) # 문서화 기준 documentation_criteria = self._define_documentation_criteria(task_design) return { 'functional': functional_criteria, 'quality': quality_criteria, 'performance': performance_criteria, 'testing': test_criteria, 'documentation': documentation_criteria } # ========================================================================= # analyze_task: 심층 기술 분석 # ========================================================================= async def analyze_task(self, summary: str, initial_concept: str, previous_analysis: str = "") -> TechnicalAnalysis: """ 심층 기술 분석 Args: summary: 작업 요약 initial_concept: 초기 구상 previous_analysis: 이전 분석 결과 Returns: TechnicalAnalysis: 분석 결과 """ logger.info(f"🔬 심층 기술 분석 시작: {summary[:50]}...") try: # 1. 기술 조사 tech_research = await self._conduct_technical_research(summary, initial_concept) # 2. 위험 평가 risk_assessment = await self._assess_technical_risks(summary, initial_concept, tech_research) # 3. 구현 전략 수립 implementation_strategy = await self._develop_implementation_strategy( summary, initial_concept, tech_research, risk_assessment ) # 4. 의사코드 생성 pseudocode = await self._generate_pseudocode(summary, initial_concept, implementation_strategy) # 5. 분석 결과 종합 analysis = TechnicalAnalysis( feasibility_score=self._calculate_feasibility_score(tech_research, risk_assessment), complexity_score=self._calculate_complexity_score(initial_concept, tech_research), risk_factors=risk_assessment.get('risk_factors', []), recommended_approach=implementation_strategy.get('recommended_approach', ''), required_skills=tech_research.get('required_skills', []), estimated_effort=implementation_strategy.get('estimated_effort', 0.0), dependencies_identified=tech_research.get('dependencies', []), potential_blockers=risk_assessment.get('potential_blockers', []), implementation_options=implementation_strategy.get('options', []), quality_considerations=implementation_strategy.get('quality_considerations', []) ) # 6. 저장 analysis_id = f"ANALYSIS-{datetime.now().strftime('%Y%m%d-%H%M%S')}" self.technical_analyses[analysis_id] = analysis logger.info(f"✅ 기술 분석 완료: 실현가능성={analysis.feasibility_score:.2f}") return analysis except Exception as e: logger.error(f"❌ 기술 분석 실패: {e}") raise async def _conduct_technical_research(self, summary: str, concept: str) -> Dict[str, Any]: """기술 조사 수행""" logger.info("📚 기술 조사 수행 중...") # 기술 키워드 추출 tech_keywords = self._extract_tech_keywords(summary + " " + concept) # 코드베이스 조사 codebase_analysis = await self._analyze_codebase_for_tech(tech_keywords) # 외부 의존성 조사 external_deps = await self._research_external_dependencies(tech_keywords) # 기술 스택 호환성 검증 compatibility = await self._verify_tech_stack_compatibility(tech_keywords, codebase_analysis) return { 'tech_keywords': tech_keywords, 'codebase_analysis': codebase_analysis, 'external_dependencies': external_deps, 'compatibility': compatibility, 'required_skills': self._identify_required_skills(tech_keywords), 'dependencies': self._extract_dependencies_from_research(codebase_analysis, external_deps) } async def _assess_technical_risks(self, summary: str, concept: str, research: Dict[str, Any]) -> Dict[str, Any]: """기술적 위험 평가""" logger.info("⚠️ 기술적 위험 평가 중...") risk_factors = [] potential_blockers = [] mitigation_strategies = [] # 복잡도 기반 위험 complexity_risks = self._assess_complexity_risks(concept, research) risk_factors.extend(complexity_risks) # 의존성 기반 위험 dependency_risks = self._assess_dependency_risks(research) risk_factors.extend(dependency_risks) # 기술 스택 위험 tech_stack_risks = self._assess_tech_stack_risks(research) risk_factors.extend(tech_stack_risks) # 리소스 위험 resource_risks = self._assess_resource_risks(summary, concept, research) risk_factors.extend(resource_risks) # 차단 요소 식별 potential_blockers = self._identify_potential_blockers(research, risk_factors) # 완화 전략 수립 mitigation_strategies = self._develop_mitigation_strategies(risk_factors, potential_blockers) return { 'risk_factors': risk_factors, 'potential_blockers': potential_blockers, 'mitigation_strategies': mitigation_strategies, 'overall_risk_score': self._calculate_overall_risk_score(risk_factors, potential_blockers) } # ========================================================================= # split_tasks: 지능형 작업 분할 # ========================================================================= async def split_tasks(self, global_analysis_result: str, tasks_raw: str, update_mode: str = "clearAllTasks") -> List[Task]: """ 지능형 작업 분할 Args: global_analysis_result: 전역 분석 결과 tasks_raw: 원시 작업 데이터 (JSON 문자열) update_mode: 업데이트 모드 (append, overwrite, selective, clearAllTasks) Returns: List[Task]: 분할된 작업 목록 """ logger.info(f"🔄 지능형 작업 분할 시작 (모드: {update_mode})") try: # 1. 원시 작업 데이터 파싱 raw_tasks = self._parse_raw_tasks(tasks_raw) # 2. 작업 분할 전 백업 (clearAllTasks 모드인 경우) if update_mode == "clearAllTasks": self._backup_existing_tasks() self._clear_incomplete_tasks() # 3. 의존성 분석 및 병렬 처리 기회 식별 dependency_analysis = self._analyze_task_dependencies(raw_tasks, global_analysis_result) # 4. 작업 크기 최적화 (1-2일 완료 가능 단위) optimized_tasks = await self._optimize_task_sizes(raw_tasks, dependency_analysis) # 5. 우선순위 결정 prioritized_tasks = self._prioritize_tasks(optimized_tasks, dependency_analysis) # 6. Task 객체 생성 created_tasks = [] for task_data in prioritized_tasks: task = await self._create_task_from_split_data(task_data, global_analysis_result) created_tasks.append(task) # 7. 의존성 관계 설정 self._establish_task_relationships(created_tasks, dependency_analysis) # 8. 업데이트 모드에 따른 처리 final_tasks = self._apply_update_mode(created_tasks, update_mode) # 9. 저장 for task in final_tasks: self.tasks[task.id] = task logger.info(f"✅ 작업 분할 완료: {len(final_tasks)}개 작업 생성") return final_tasks except Exception as e: logger.error(f"❌ 작업 분할 실패: {e}") raise def _parse_raw_tasks(self, tasks_raw: str) -> List[Dict[str, Any]]: """원시 작업 데이터 파싱""" logger.info("📝 원시 작업 데이터 파싱 중...") try: # JSON 문자열 파싱 if isinstance(tasks_raw, str): raw_data = json.loads(tasks_raw) else: raw_data = tasks_raw # 데이터 유효성 검증 if not isinstance(raw_data, list): raise ValueError("작업 데이터는 리스트 형태여야 합니다") validated_tasks = [] for task_data in raw_data: errors = validate_task_data(task_data) if errors: logger.warning(f"⚠️ 작업 데이터 유효성 오류: {errors}") continue validated_tasks.append(task_data) return validated_tasks except json.JSONDecodeError as e: logger.error(f"❌ JSON 파싱 오류: {e}") raise ValueError(f"Invalid JSON format: {e}") except Exception as e: logger.error(f"❌ 데이터 파싱 오류: {e}") raise def _analyze_task_dependencies(self, raw_tasks: List[Dict[str, Any]], global_analysis: str) -> Dict[str, Any]: """작업 의존성 분석""" logger.info("🔗 작업 의존성 분석 중...") dependency_graph = {} parallel_opportunities = [] bottlenecks = [] critical_path = [] # 명시적 의존성 추출 explicit_deps = {} for task in raw_tasks: task_name = task.get('name', '') dependencies = task.get('dependencies', []) explicit_deps[task_name] = dependencies # 암시적 의존성 추론 implicit_deps = self._infer_implicit_dependencies(raw_tasks, global_analysis) # 의존성 그래프 구성 for task_name in explicit_deps: all_deps = explicit_deps[task_name] + implicit_deps.get(task_name, []) dependency_graph[task_name] = list(set(all_deps)) # 중복 제거 # 병렬 처리 기회 식별 parallel_opportunities = self._identify_parallel_opportunities(dependency_graph) # 병목 지점 식별 bottlenecks = self._identify_bottlenecks(dependency_graph) # 크리티컬 패스 계산 critical_path = self._calculate_critical_path(dependency_graph, raw_tasks) return { 'dependency_graph': dependency_graph, 'explicit_dependencies': explicit_deps, 'implicit_dependencies': implicit_deps, 'parallel_opportunities': parallel_opportunities, 'bottlenecks': bottlenecks, 'critical_path': critical_path } # ========================================================================= # 유틸리티 및 헬퍼 메서드들 # ========================================================================= def _extract_keywords(self, text: str) -> List[str]: """텍스트에서 키워드 추출""" # 기술 키워드, 동작 키워드, 도메인 키워드 추출 keywords = [] # 기본 전처리 text = text.lower() # 기술 키워드 패턴 tech_patterns = [ r'\b(python|javascript|typescript|react|vue|angular|django|flask|fastapi)\b', r'\b(api|rest|graphql|database|sql|nosql|mongodb|postgresql)\b', r'\b(docker|kubernetes|aws|azure|gcp|ci/cd|devops)\b', r'\b(test|testing|unit|integration|e2e|tdd|bdd)\b', r'\b(security|auth|oauth|jwt|encryption|ssl|https)\b' ] for pattern in tech_patterns: matches = re.findall(pattern, text) keywords.extend(matches) # 동작 키워드 action_patterns = [ r'\b(create|build|implement|develop|design|refactor|optimize|fix|debug)\b', r'\b(analyze|research|investigate|review|validate|verify|test)\b', r'\b(deploy|configure|setup|install|migrate|update|upgrade)\b' ] for pattern in action_patterns: matches = re.findall(pattern, text) keywords.extend(matches) return list(set(keywords)) # 중복 제거 def _assess_complexity(self, description: str, requirements: str, keywords: List[str]) -> float: """복잡도 평가 (0.0-1.0)""" complexity_score = 0.0 # 텍스트 길이 기반 (10%) text_length = len(description) + len(requirements) complexity_score += min(text_length / 1000, 0.1) # 키워드 수 기반 (20%) complexity_score += min(len(keywords) * 0.02, 0.2) # 기술적 복잡성 키워드 (30%) complex_keywords = ['integration', 'microservice', 'distributed', 'scaling', 'performance', 'security', 'encryption', 'algorithm', 'optimization', 'concurrent'] complex_count = sum(1 for kw in keywords if kw in complex_keywords) complexity_score += min(complex_count * 0.1, 0.3) # 다중 도메인 (20%) domain_keywords = { 'frontend': ['ui', 'react', 'vue', 'css', 'html'], 'backend': ['api', 'server', 'database', 'service'], 'devops': ['docker', 'kubernetes', 'deploy', 'ci/cd'], 'data': ['analytics', 'ml', 'ai', 'processing'] } involved_domains = 0 for domain, domain_kws in domain_keywords.items(): if any(kw in keywords for kw in domain_kws): involved_domains += 1 complexity_score += min(involved_domains * 0.05, 0.2) # 요구사항 복잡성 (20%) if any(word in requirements.lower() for word in ['integrate', 'multiple', 'complex', 'advanced']): complexity_score += 0.2 return min(complexity_score, 1.0) def _classify_task_category(self, keywords: List[str]) -> TaskCategory: """작업 카테고리 분류""" category_mapping = { TaskCategory.DEVELOPMENT: ['create', 'build', 'implement', 'develop', 'code'], TaskCategory.ANALYSIS: ['analyze', 'research', 'investigate', 'review'], TaskCategory.TESTING: ['test', 'testing', 'unit', 'integration', 'e2e'], TaskCategory.DOCUMENTATION: ['document', 'documentation', 'guide', 'manual'], TaskCategory.BUGFIX: ['fix', 'debug', 'bug', 'error', 'issue'], TaskCategory.REFACTORING: ['refactor', 'optimize', 'improve', 'clean'], TaskCategory.RESEARCH: ['research', 'investigate', 'explore', 'study'], TaskCategory.INFRASTRUCTURE: ['deploy', 'configure', 'setup', 'devops', 'docker'], TaskCategory.PLANNING: ['plan', 'design', 'architect', 'strategy'], TaskCategory.VALIDATION: ['validate', 'verify', 'check', 'audit'] } category_scores = {} for category, category_keywords in category_mapping.items(): score = sum(1 for kw in keywords if kw in category_keywords) if score > 0: category_scores[category] = score if category_scores: return max(category_scores, key=category_scores.get) else: return TaskCategory.DEVELOPMENT # 기본값 def _determine_priority(self, description: str, requirements: str, complexity: float) -> TaskPriority: """우선순위 결정""" priority_score = 0.0 # 긴급성 키워드 urgent_keywords = ['urgent', 'critical', 'immediate', 'asap', 'emergency'] high_keywords = ['important', 'priority', 'blocking', 'needed'] text = (description + " " + requirements).lower() if any(kw in text for kw in urgent_keywords): return TaskPriority.URGENT elif any(kw in text for kw in high_keywords): priority_score += 0.4 # 복잡도 기반 priority_score += complexity * 0.3 # 의존성 키워드 if any(word in text for word in ['depends', 'blocks', 'prerequisite']): priority_score += 0.3 if priority_score >= 0.8: return TaskPriority.CRITICAL elif priority_score >= 0.6: return TaskPriority.HIGH elif priority_score >= 0.3: return TaskPriority.MEDIUM else: return TaskPriority.LOW if __name__ == "__main__": # 기본 테스트 print("🚀 Task Lifecycle Management 테스트") async def test_lifecycle(): manager = TaskLifecycleManager() # plan_task 테스트 task, plan = await manager.plan_task( "사용자 인증 시스템 구현", "JWT 토큰 기반 인증, OAuth 연동, 보안 강화" ) print(f"✅ plan_task 성공: {task.name}") # analyze_task 테스트 analysis = await manager.analyze_task( "사용자 인증 시스템", "JWT와 OAuth를 활용한 보안 인증 시스템 구현", "" ) print(f"✅ analyze_task 성공: 실현가능성={analysis.feasibility_score:.2f}") # split_tasks 테스트 sample_tasks = [ { "name": "JWT 토큰 시스템 구현", "description": "JWT 토큰 생성 및 검증 로직 구현", "dependencies": [] }, { "name": "OAuth 연동 구현", "description": "Google, GitHub OAuth 연동", "dependencies": ["JWT 토큰 시스템 구현"] } ] split_tasks = await manager.split_tasks( "사용자 인증 시스템 전체 구현", json.dumps(sample_tasks), "clearAllTasks" ) print(f"✅ split_tasks 성공: {len(split_tasks)}개 작업 생성") print("🎯 모든 Lifecycle 테스트 완료!") # 비동기 테스트 실행 import asyncio asyncio.run(test_lifecycle())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server