Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
shrimp_main_part2.py20.7 kB
#!/usr/bin/env python3 """ Shrimp Task Manager - 메인 클래스 Part 2 나머지 기능들 구현 """ # 나머지 15개 핵심 기능 중 12개 기능 구현 async def process_thought( self, thought: str, thought_number: int, total_thoughts: int, next_thought_needed: bool, stage: str, tags: Optional[List[str]] = None, axioms_used: Optional[List[str]] = None, assumptions_challenged: Optional[List[str]] = None ) -> Dict[str, Any]: """ 구조화된 사고 처리 복잡한 문제에 대한 체계적 사고 과정을 관리하고 다각도 정보 수집 및 종합적 판단을 수행합니다. Args: thought: 사고 내용 thought_number: 현재 사고 번호 total_thoughts: 예상 총 사고 수량 next_thought_needed: 다음 사고 필요 여부 stage: 사고 단계 tags: 사고 태그 axioms_used: 사용된 공리 assumptions_challenged: 도전된 가정 Returns: 사고 처리 결과 및 다음 단계 가이드 """ logger.info(f"💭 사고 처리 시작: 단계 {thought_number}/{total_thoughts} - {stage}") try: # 메타인지 엔진을 통한 사고 처리 thought_result = await self.metacognition.process_structured_thought( thought=thought, thought_number=thought_number, total_thoughts=total_thoughts, next_thought_needed=next_thought_needed, stage=stage, tags=tags or [], axioms_used=axioms_used or [], assumptions_challenged=assumptions_challenged or [] ) return thought_result except Exception as e: logger.error(f"❌ 사고 처리 실패: {e}") return { "success": False, "error": str(e), "thought_number": thought_number } async def verify_task( self, task_id: str, summary: str, score: int ) -> Dict[str, Any]: """ 엄격한 품질 검증 완료된 작업의 품질과 요구사항 충족도를 검증하고 프로젝트 품질 기준과 업계 표준을 적용합니다. Args: task_id: 검증할 작업 ID summary: 작업 완성 요약 또는 수정 필요 사항 score: 작업 평점 (0-100) Returns: 검증 결과 및 품질 점수 """ logger.info(f"✅ 품질 검증 시작: {task_id}") try: # 실행 엔진을 통한 검증 verification_result = await self.execution_engine.verify_task_completion( task_id=task_id, summary=summary, score=score ) # 라이프사이클 이벤트 기록 await self.lifecycle.record_event( event_type=LifecycleEvent.VERIFICATION_COMPLETED, description=f"품질 검증 완료: {task_id} (점수: {score})" ) return verification_result except Exception as e: logger.error(f"❌ 품질 검증 실패: {e}") return { "success": False, "error": str(e), "task_id": task_id } async def execute_task( self, task_id: str ) -> Dict[str, Any]: """ 지능형 작업 실행 계획된 작업을 체계적으로 실행하고 모니터링하며 실시간 문제 해결 및 적응적 실행을 수행합니다. Args: task_id: 실행할 작업의 고유 ID Returns: 작업 실행 결과 및 실행 가이드 """ logger.info(f"⚡ 작업 실행 시작: {task_id}") try: # 실행 엔진을 통한 작업 실행 execution_result = await self.execution_engine.execute_planned_task( task_id=task_id ) # 라이프사이클 이벤트 기록 await self.lifecycle.record_event( event_type=LifecycleEvent.EXECUTION_STARTED, description=f"작업 실행 시작: {task_id}" ) return execution_result except Exception as e: logger.error(f"❌ 작업 실행 실패: {e}") return { "success": False, "error": str(e), "task_id": task_id } async def research_mode( self, topic: str, current_state: str, next_steps: str, previous_state: str = "" ) -> Dict[str, Any]: """ 심층 연구 모드 복잡한 기술 문제에 대한 체계적 연구를 수행하고 다양한 정보원을 활용한 종합적 연구를 진행합니다. Args: topic: 연구할 프로그래밍 주제 current_state: 현재 수행할 내용 next_steps: 후속 계획 및 연구 방향 previous_state: 이전 연구 상태 및 내용 요약 Returns: 연구 결과 및 통찰 """ logger.info(f"🔬 심층 연구 모드 시작: {topic}") try: # 메타인지 엔진을 통한 연구 research_result = await self.metacognition.conduct_systematic_research( topic=topic, current_state=current_state, next_steps=next_steps, previous_state=previous_state ) # 라이프사이클 이벤트 기록 await self.lifecycle.record_event( event_type=LifecycleEvent.RESEARCH_COMPLETED, description=f"연구 완료: {topic}" ) return research_result except Exception as e: logger.error(f"❌ 연구 모드 실패: {e}") return { "success": False, "error": str(e), "topic": topic } async def list_tasks( self, status: str = "all" ) -> Dict[str, Any]: """ 지능형 작업 목록 관리 작업 현황을 체계적으로 관리하고 시각화하며 프로젝트 전체 맥락에서 작업 우선순위를 결정합니다. Args: status: 조회할 작업 상태 (all, pending, in_progress, completed) Returns: 구조화된 작업 목록 및 통계 """ logger.info(f"📋 작업 목록 조회: {status}") try: # 데이터 매니저를 통한 목록 조회 list_result = await self.data_manager.get_organized_task_list( status_filter=status ) return list_result except Exception as e: logger.error(f"❌ 작업 목록 조회 실패: {e}") return { "success": False, "error": str(e), "status": status } async def query_task( self, query: str, is_id: bool = False, page: int = 1, page_size: int = 5 ) -> Dict[str, Any]: """ 스마트 작업 검색 자연어로 작업을 검색하고 관련 정보를 제공하며 작업 간 관계성 및 히스토리를 고려합니다. Args: query: 검색 쿼리 (키워드 또는 작업 ID) is_id: ID 검색 모드 여부 page: 분페이지 페이지 번호 page_size: 페이지당 작업 수 Returns: 검색 결과 및 관련 작업 정보 """ logger.info(f"🔍 작업 검색: {query}") try: # 데이터 매니저를 통한 검색 search_result = await self.data_manager.search_tasks_intelligently( query=query, is_id=is_id, page=page, page_size=page_size ) return search_result except Exception as e: logger.error(f"❌ 작업 검색 실패: {e}") return { "success": False, "error": str(e), "query": query } async def get_task_detail( self, task_id: str ) -> Dict[str, Any]: """ 상세 정보 조회 작업의 모든 세부사항과 진행 맥락을 제공하고 연관 작업 및 의존성 정보를 포함합니다. Args: task_id: 조회할 작업 ID Returns: 완전한 작업 상세 정보 """ logger.info(f"📖 작업 상세 정보 조회: {task_id}") try: # 데이터 매니저를 통한 상세 조회 detail_result = await self.data_manager.get_comprehensive_task_detail( task_id=task_id ) return detail_result except Exception as e: logger.error(f"❌ 작업 상세 정보 조회 실패: {e}") return { "success": False, "error": str(e), "task_id": task_id } async def init_project_rules(self) -> Dict[str, Any]: """ 프로젝트 규칙 초기화 프로젝트별 코딩 표준과 작업 규칙을 설정하고 프로젝트 특성과 팀 선호도를 반영합니다. Returns: 초기화된 프로젝트 규칙 정보 """ logger.info("⚙️ 프로젝트 규칙 초기화 시작") try: # 거버넌스 시스템을 통한 초기화 init_result = await self.governance.initialize_project_standards() # 라이프사이클 이벤트 기록 await self.lifecycle.record_event( event_type=LifecycleEvent.PROJECT_INITIALIZED, description="프로젝트 규칙 초기화 완료" ) return init_result except Exception as e: logger.error(f"❌ 프로젝트 규칙 초기화 실패: {e}") return { "success": False, "error": str(e) } async def update_task( self, task_id: str, name: Optional[str] = None, description: Optional[str] = None, notes: Optional[str] = None, dependencies: Optional[List[str]] = None, related_files: Optional[List[Dict[str, Any]]] = None, implementation_guide: Optional[str] = None, verification_criteria: Optional[str] = None ) -> Dict[str, Any]: """ 동적 작업 갱신 작업 진행 중 발생하는 변경사항을 지능적으로 반영하고 전체 프로젝트 영향도를 고려합니다. Args: task_id: 업데이트할 작업 ID name: 새로운 작업 이름 description: 새로운 작업 설명 notes: 새로운 보충 설명 dependencies: 새로운 의존성 목록 related_files: 새로운 관련 파일 목록 implementation_guide: 새로운 구현 가이드 verification_criteria: 새로운 검증 기준 Returns: 업데이트 결과 및 영향 분석 """ logger.info(f"📝 작업 업데이트: {task_id}") try: # 거버넌스 시스템을 통한 업데이트 update_result = await self.governance.update_task_intelligently( task_id=task_id, name=name, description=description, notes=notes, dependencies=dependencies, related_files=related_files, implementation_guide=implementation_guide, verification_criteria=verification_criteria ) # 라이프사이클 이벤트 기록 await self.lifecycle.record_event( event_type=LifecycleEvent.TASK_UPDATED, description=f"작업 업데이트 완료: {task_id}" ) return update_result except Exception as e: logger.error(f"❌ 작업 업데이트 실패: {e}") return { "success": False, "error": str(e), "task_id": task_id } async def delete_task( self, task_id: str ) -> Dict[str, Any]: """ 안전한 작업 삭제 불필요한 작업을 안전하게 제거하고 정리하며 연관 작업 보호 및 데이터 무결성을 유지합니다. Args: task_id: 삭제할 작업 ID Returns: 삭제 결과 및 영향 분석 """ logger.info(f"🗑️ 작업 삭제: {task_id}") try: # 거버넌스 시스템을 통한 안전 삭제 delete_result = await self.governance.safely_delete_task( task_id=task_id ) # 라이프사이클 이벤트 기록 await self.lifecycle.record_event( event_type=LifecycleEvent.TASK_DELETED, description=f"작업 삭제 완료: {task_id}" ) return delete_result except Exception as e: logger.error(f"❌ 작업 삭제 실패: {e}") return { "success": False, "error": str(e), "task_id": task_id } async def clear_all_tasks( self, confirm: bool ) -> Dict[str, Any]: """ 전체 작업 초기화 프로젝트 재시작 시 모든 작업을 안전하게 초기화하고 프로젝트 히스토리 보존 및 학습 데이터를 유지합니다. Args: confirm: 삭제 확인 (True: 실행, False: 확인 필요) Returns: 초기화 결과 및 백업 정보 """ logger.info(f"🧹 전체 작업 초기화: confirm={confirm}") try: # 거버넌스 시스템을 통한 전체 초기화 clear_result = await self.governance.clear_all_tasks_safely( confirm=confirm ) # 라이프사이클 이벤트 기록 if confirm: await self.lifecycle.record_event( event_type=LifecycleEvent.PROJECT_RESET, description="전체 작업 초기화 완료" ) return clear_result except Exception as e: logger.error(f"❌ 전체 작업 초기화 실패: {e}") return { "success": False, "error": str(e), "confirm": confirm } # ========================================================================= # MCP 서버 인터페이스 구현 (MCP Server Interface) # ========================================================================= async def get_system_status(self) -> Dict[str, Any]: """시스템 전체 상태 조회""" try: # 각 컴포넌트 상태 수집 data_status = await self.data_manager.get_status() execution_status = await self.execution_engine.get_status() governance_status = await self.governance.get_status() lifecycle_status = await self.lifecycle.get_status() metacognition_status = await self.metacognition.get_status() return { "success": True, "session_id": self.session_id, "uptime": str(datetime.now() - self.start_time), "components": { "data_manager": data_status, "execution_engine": execution_status, "governance": governance_status, "lifecycle": lifecycle_status, "metacognition": metacognition_status }, "database": { "path": self.db_path, "connected": self.db_connection is not None } } except Exception as e: logger.error(f"❌ 시스템 상태 조회 실패: {e}") return { "success": False, "error": str(e) } async def close(self) -> Dict[str, Any]: """시스템 종료 및 정리""" try: logger.info("🔄 ShrimpTaskManager 종료 시작...") # 세션 종료 기록 if self.db_connection: cursor = self.db_connection.cursor() cursor.execute(""" UPDATE sessions SET end_time = ?, status = 'closed' WHERE session_id = ? """, (datetime.now().isoformat(), self.session_id)) self.db_connection.commit() self.db_connection.close() # 각 컴포넌트 정리 await self.data_manager.close() await self.execution_engine.close() await self.governance.close() await self.lifecycle.close() await self.metacognition.close() logger.info("✅ ShrimpTaskManager 종료 완료") return { "success": True, "session_id": self.session_id, "end_time": datetime.now().isoformat() } except Exception as e: logger.error(f"❌ 시스템 종료 실패: {e}") return { "success": False, "error": str(e) } # ============================================================================= # MCP 서버 테스트 함수 (MCP Server Test Functions) # ============================================================================= async def test_shrimp_task_manager(): """ShrimpTaskManager 통합 테스트""" print("🎯 ShrimpTaskManager 통합 테스트 시작") # 매니저 초기화 manager = ShrimpTaskManager() init_result = await manager.initialize() print(f"✅ 초기화: {init_result['success']}") if not init_result['success']: print(f"❌ 초기화 실패: {init_result['error']}") return try: # 1. 프로젝트 규칙 초기화 테스트 rules_result = await manager.init_project_rules() print(f"✅ 프로젝트 규칙 초기화: {rules_result['success']}") # 2. 작업 계획 테스트 plan_result = await manager.plan_task( description="테스트 작업 생성 및 관리 시스템 구현", requirements="Python 기반, SQLite 데이터베이스 사용" ) print(f"✅ 작업 계획: {plan_result.get('success', False)}") # 3. 작업 목록 조회 테스트 list_result = await manager.list_tasks() print(f"✅ 작업 목록 조회: {list_result.get('success', False)}") # 4. 시스템 상태 조회 status_result = await manager.get_system_status() print(f"✅ 시스템 상태: {status_result['success']}") print("🎉 모든 테스트 완료!") except Exception as e: print(f"❌ 테스트 중 오류: {e}") finally: # 시스템 종료 close_result = await manager.close() print(f"✅ 시스템 종료: {close_result['success']}") if __name__ == "__main__": # 비동기 테스트 실행 import asyncio asyncio.run(test_shrimp_task_manager())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server