Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
auto_compact_main.py44.1 kB
#!/usr/bin/env python3 """ Auto Compact System - 무제한 컨텍스트 압축 시스템 Claude Code 기반의 지능형 컨텍스트 관리 및 압축 도구 주요 기능: - 지능형 컨텍스트 압축 (중요 정보 보존) - 자동 대화 히스토리 관리 및 SSD 저장 - 실시간 컨텍스트 사용량 모니터링 - 적응형 압축률 조정 - 무중단 작업 보장 (seamless operation) - 컨텍스트 복구 및 검색 시스템 """ import asyncio import json import time import uuid import logging import os import sqlite3 import hashlib import zlib import re from typing import Dict, List, Optional, Any, Union, Tuple from dataclasses import dataclass, asdict from pathlib import Path from enum import Enum from datetime import datetime, timedelta # import tiktoken # Optional dependency # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CompressionLevel(Enum): """압축 레벨""" MINIMAL = "minimal" # 10-20% 압축 MODERATE = "moderate" # 30-50% 압축 AGGRESSIVE = "aggressive" # 60-80% 압축 EXTREME = "extreme" # 80-95% 압축 class ContentType(Enum): """컨텍스트 내용 유형""" CODE = "code" CONVERSATION = "conversation" DOCUMENTATION = "documentation" ERROR_LOG = "error_log" SYSTEM_MESSAGE = "system_message" USER_INPUT = "user_input" AI_RESPONSE = "ai_response" @dataclass class ContextChunk: """컨텍스트 청크""" id: str content: str content_type: str importance_score: float token_count: int timestamp: str compression_level: str metadata: Dict[str, Any] @dataclass class CompressionResult: """압축 결과""" original_tokens: int compressed_tokens: int compression_ratio: float time_taken: float preserved_elements: List[str] removed_elements: List[str] compression_summary: str @dataclass class ContextSession: """컨텍스트 세션""" session_id: str created_at: str last_updated: str total_tokens: int compressed_tokens: int chunk_count: int compression_history: List[Dict] class TokenCounter: """토큰 계산기""" def __init__(self, model_name: str = "gpt-4"): try: # tiktoken이 있으면 사용 import tiktoken self.encoding = tiktoken.encoding_for_model(model_name) self.use_tiktoken = True except ImportError: # tiktoken이 없으면 근사치 사용 self.encoding = None self.use_tiktoken = False logger.info("📝 tiktoken 없음 - 근사치 토큰 계산 사용") except: # 기본 인코딩 사용 try: import tiktoken self.encoding = tiktoken.get_encoding("cl100k_base") self.use_tiktoken = True except: self.encoding = None self.use_tiktoken = False def count_tokens(self, text: str) -> int: """텍스트의 토큰 수 계산""" if self.use_tiktoken and self.encoding: try: return len(self.encoding.encode(text)) except: pass # 백업 방법: 대략적인 계산 (1 토큰 ≈ 4 문자) # 영어: ~4문자/토큰, 한국어: ~1.5문자/토큰, 코드: ~3문자/토큰 # 간단한 휴리스틱 if re.search(r'[가-힣]', text): # 한국어 포함 return len(text) // 2 elif re.search(r'(def |class |import |function)', text): # 코드 패턴 return len(text) // 3 else: # 일반 영어 텍스트 return len(text) // 4 class ContextAnalyzer: """컨텍스트 분석기 - 중요도 및 압축 가능성 분석""" def __init__(self): self.token_counter = TokenCounter() # 중요도 키워드 (가중치 포함) self.importance_keywords = { # 높은 중요도 (유지해야 할 내용) "error": 0.9, "exception": 0.9, "fail": 0.9, "bug": 0.9, "critical": 0.95, "important": 0.8, "warning": 0.7, "todo": 0.8, "fixme": 0.9, "hack": 0.7, "class": 0.7, "function": 0.7, "def": 0.7, "async": 0.7, "import": 0.6, "from": 0.6, "config": 0.8, # 중간 중요도 "test": 0.6, "debug": 0.5, "log": 0.4, "comment": 0.3, "note": 0.4, "info": 0.4, # 낮은 중요도 (압축 가능) "hello": 0.2, "thanks": 0.2, "please": 0.2, "output": 0.3, "result": 0.3, "response": 0.3 } def analyze_importance(self, content: str, content_type: ContentType) -> float: """컨텍스트 중요도 분석 (0.0 - 1.0)""" # 기본 중요도 (콘텐츠 타입별) base_importance = { ContentType.CODE: 0.8, ContentType.ERROR_LOG: 0.9, ContentType.SYSTEM_MESSAGE: 0.7, ContentType.USER_INPUT: 0.6, ContentType.AI_RESPONSE: 0.5, ContentType.CONVERSATION: 0.4, ContentType.DOCUMENTATION: 0.6 } importance = base_importance.get(content_type, 0.5) # 키워드 기반 중요도 조정 content_lower = content.lower() keyword_score = 0.0 keyword_count = 0 for keyword, weight in self.importance_keywords.items(): if keyword in content_lower: keyword_score += weight keyword_count += 1 if keyword_count > 0: keyword_score /= keyword_count importance = (importance + keyword_score) / 2 # 길이 기반 조정 (너무 짧거나 긴 내용은 중요도 감소) length = len(content) if length < 50: importance *= 0.8 # 너무 짧음 elif length > 5000: importance *= 0.9 # 너무 김 # 코드 구조 탐지 if self._contains_code_structure(content): importance += 0.1 # 최종 값 제한 return min(max(importance, 0.0), 1.0) def _contains_code_structure(self, content: str) -> bool: """코드 구조 포함 여부 확인""" code_patterns = [ r'def\s+\w+\s*\(', # Python 함수 r'class\s+\w+\s*[:\(]', # Python 클래스 r'function\s+\w+\s*\(', # JavaScript 함수 r'import\s+\w+', # Import 문 r'from\s+\w+\s+import', # From import r'if\s+__name__\s*==\s*["\']__main__["\']', # Python main r'try\s*:\s*\n', # Try-except r'async\s+def\s+\w+', # Async 함수 ] return any(re.search(pattern, content, re.IGNORECASE) for pattern in code_patterns) class ContextCompressor: """컨텍스트 압축기""" def __init__(self): self.analyzer = ContextAnalyzer() self.token_counter = TokenCounter() def compress_content(self, content: str, content_type: ContentType, target_compression: CompressionLevel) -> Tuple[str, CompressionResult]: """컨텍스트 압축 실행""" start_time = time.time() original_tokens = self.token_counter.count_tokens(content) # 압축 레벨별 타겟 비율 compression_targets = { CompressionLevel.MINIMAL: 0.8, # 20% 압축 CompressionLevel.MODERATE: 0.5, # 50% 압축 CompressionLevel.AGGRESSIVE: 0.3, # 70% 압축 CompressionLevel.EXTREME: 0.15 # 85% 압축 } target_ratio = compression_targets[target_compression] target_tokens = int(original_tokens * target_ratio) # 압축 전략 적용 compressed_content, preserved, removed = self._apply_compression_strategy( content, content_type, target_tokens ) compressed_tokens = self.token_counter.count_tokens(compressed_content) actual_ratio = compressed_tokens / original_tokens if original_tokens > 0 else 1.0 time_taken = time.time() - start_time # 압축 결과 생성 result = CompressionResult( original_tokens=original_tokens, compressed_tokens=compressed_tokens, compression_ratio=actual_ratio, time_taken=time_taken, preserved_elements=preserved, removed_elements=removed, compression_summary=f"압축: {original_tokens}→{compressed_tokens} 토큰 ({actual_ratio:.1%})" ) return compressed_content, result def _apply_compression_strategy(self, content: str, content_type: ContentType, target_tokens: int) -> Tuple[str, List[str], List[str]]: """압축 전략 적용""" preserved = [] removed = [] # 압축 전략별 처리 if content_type == ContentType.CODE: return self._compress_code(content, target_tokens) elif content_type == ContentType.CONVERSATION: return self._compress_conversation(content, target_tokens) elif content_type == ContentType.ERROR_LOG: return self._compress_error_log(content, target_tokens) else: return self._compress_generic(content, target_tokens) def _compress_code(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]: """코드 압축""" lines = content.split('\n') preserved = [] removed = [] # 중요한 라인 식별 및 보존 important_lines = [] for i, line in enumerate(lines): stripped = line.strip() # 보존해야 할 코드 라인 if any(pattern in stripped.lower() for pattern in [ 'def ', 'class ', 'import ', 'from ', 'async def', 'if __name__', 'try:', 'except:', 'raise', 'return' ]): important_lines.append((i, line)) preserved.append(f"Line {i+1}: {stripped[:50]}") # 주석은 선택적 보존 elif stripped.startswith('#') and len(stripped) > 10: if self.analyzer.analyze_importance(stripped, ContentType.CODE) > 0.7: important_lines.append((i, line)) preserved.append(f"Comment {i+1}: {stripped[:30]}") else: removed.append(f"Comment {i+1}: {stripped[:30]}") # 빈 라인은 제거 elif not stripped: removed.append(f"Empty line {i+1}") # 일반 라인은 중요도에 따라 결정 else: importance = self.analyzer.analyze_importance(stripped, ContentType.CODE) if importance > 0.6: important_lines.append((i, line)) preserved.append(f"Line {i+1}: {stripped[:50]}") else: removed.append(f"Line {i+1}: {stripped[:50]}") # 압축된 코드 재구성 compressed_lines = [line for _, line in important_lines] compressed_content = '\n'.join(compressed_lines) # 타겟 토큰에 맞춰 추가 압축 if self.token_counter.count_tokens(compressed_content) > target_tokens: # 더 적극적인 압축 (함수 시그니처만 보존 등) signature_lines = [] for line_num, line in important_lines: stripped = line.strip() if any(keyword in stripped for keyword in ['def ', 'class ', 'async def']): signature_lines.append(line) if signature_lines: compressed_content = '\n'.join(signature_lines) compressed_content += '\n# ... (compressed content) ...' return compressed_content, preserved, removed def _compress_conversation(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]: """대화 압축""" # 대화를 턴으로 분할 turns = re.split(r'\n(?=(?:User|Assistant|Human|AI)[::])', content) preserved = [] removed = [] important_turns = [] for i, turn in enumerate(turns): if not turn.strip(): removed.append(f"Empty turn {i}") continue importance = self.analyzer.analyze_importance(turn, ContentType.CONVERSATION) if importance > 0.5: # 중요한 대화만 보존 # 긴 대화는 요약 if len(turn) > 300: summary = self._summarize_turn(turn) important_turns.append(summary) preserved.append(f"Turn {i} (summarized): {turn[:50]}...") else: important_turns.append(turn) preserved.append(f"Turn {i}: {turn[:50]}...") else: removed.append(f"Turn {i}: {turn[:50]}...") compressed_content = '\n'.join(important_turns) return compressed_content, preserved, removed def _compress_error_log(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]: """에러 로그 압축""" lines = content.split('\n') preserved = [] removed = [] # 에러 관련 키워드가 있는 라인 우선 보존 error_keywords = ['error', 'exception', 'traceback', 'failed', 'critical'] important_lines = [] for i, line in enumerate(lines): stripped = line.strip() if any(keyword in stripped.lower() for keyword in error_keywords): important_lines.append(line) preserved.append(f"Error line {i+1}: {stripped[:50]}") elif stripped and not re.match(r'^\s*at\s+', stripped): # 스택 트레이스 제외 if len(important_lines) < 10: # 최대 10라인만 보존 important_lines.append(line) preserved.append(f"Context line {i+1}: {stripped[:50]}") else: removed.append(f"Line {i+1}: {stripped[:50]}") else: removed.append(f"Stack trace {i+1}: {stripped[:30]}") compressed_content = '\n'.join(important_lines) if not compressed_content.strip(): compressed_content = "# Error log compressed - no critical errors found" return compressed_content, preserved, removed def _compress_generic(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]: """일반 컨텍스트 압축""" # 문장별로 분할 sentences = re.split(r'[.!?]\s+', content) preserved = [] removed = [] # 중요도별로 정렬 sentence_scores = [] for i, sentence in enumerate(sentences): if sentence.strip(): importance = self.analyzer.analyze_importance(sentence, ContentType.CONVERSATION) sentence_scores.append((importance, i, sentence)) # 중요도 순으로 정렬 sentence_scores.sort(reverse=True) # 타겟 토큰에 맞춰 선택 selected_sentences = [] current_tokens = 0 for importance, i, sentence in sentence_scores: sentence_tokens = self.token_counter.count_tokens(sentence) if current_tokens + sentence_tokens <= target_tokens: selected_sentences.append((i, sentence)) current_tokens += sentence_tokens preserved.append(f"Sentence {i}: {sentence[:50]}...") else: removed.append(f"Sentence {i}: {sentence[:50]}...") # 원래 순서대로 재정렬 selected_sentences.sort() compressed_content = '. '.join([sentence for _, sentence in selected_sentences]) return compressed_content, preserved, removed def _summarize_turn(self, turn: str) -> str: """대화 턴 요약""" # 간단한 요약 로직 (실제로는 더 정교한 요약 모델 사용 가능) lines = turn.split('\n') if len(lines) <= 3: return turn # 첫 줄과 마지막 줄 보존, 중간은 요약 summary_parts = [ lines[0], f"... ({len(lines)-2} lines compressed) ...", lines[-1] if len(lines) > 1 else "" ] return '\n'.join(filter(None, summary_parts)) class AutoCompactDatabase: """Auto Compact 데이터베이스 관리자""" def __init__(self, db_path: str = "auto_compact.db"): self.db_path = db_path self.init_database() def init_database(self): """데이터베이스 초기화""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS context_sessions ( session_id TEXT PRIMARY KEY, created_at TEXT NOT NULL, last_updated TEXT NOT NULL, total_tokens INTEGER NOT NULL, compressed_tokens INTEGER NOT NULL, chunk_count INTEGER NOT NULL, compression_history TEXT, metadata TEXT ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS context_chunks ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, content TEXT NOT NULL, content_type TEXT NOT NULL, importance_score REAL NOT NULL, token_count INTEGER NOT NULL, timestamp TEXT NOT NULL, compression_level TEXT NOT NULL, metadata TEXT, original_content TEXT, FOREIGN KEY (session_id) REFERENCES context_sessions (session_id) ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS compression_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, operation_type TEXT NOT NULL, original_tokens INTEGER NOT NULL, compressed_tokens INTEGER NOT NULL, compression_ratio REAL NOT NULL, time_taken REAL NOT NULL, timestamp TEXT NOT NULL, details TEXT, FOREIGN KEY (session_id) REFERENCES context_sessions (session_id) ) """) def store_context_session(self, session: ContextSession): """컨텍스트 세션 저장""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT OR REPLACE INTO context_sessions (session_id, created_at, last_updated, total_tokens, compressed_tokens, chunk_count, compression_history, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( session.session_id, session.created_at, session.last_updated, session.total_tokens, session.compressed_tokens, session.chunk_count, json.dumps(session.compression_history), "{}" )) def store_context_chunk(self, chunk: ContextChunk): """컨텍스트 청크 저장""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT OR REPLACE INTO context_chunks (id, session_id, content, content_type, importance_score, token_count, timestamp, compression_level, metadata, original_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( chunk.id, chunk.metadata.get('session_id', ''), chunk.content, chunk.content_type, chunk.importance_score, chunk.token_count, chunk.timestamp, chunk.compression_level, json.dumps(chunk.metadata), chunk.metadata.get('original_content', chunk.content) )) def store_compression_history(self, session_id: str, result: CompressionResult, operation_type: str): """압축 히스토리 저장""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT INTO compression_history (session_id, operation_type, original_tokens, compressed_tokens, compression_ratio, time_taken, timestamp, details) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( session_id, operation_type, result.original_tokens, result.compressed_tokens, result.compression_ratio, result.time_taken, datetime.now().isoformat(), json.dumps(asdict(result)) )) class AutoCompactSystem: """Auto Compact System 메인 클래스""" def __init__(self): self.database = AutoCompactDatabase() self.compressor = ContextCompressor() self.analyzer = ContextAnalyzer() self.token_counter = TokenCounter() self.current_session_id = str(uuid.uuid4()) # 설정 self.max_context_tokens = 32000 # 최대 컨텍스트 토큰 self.compression_threshold = 0.8 # 80% 사용시 압축 시작 self.auto_compression_enabled = True logger.info("🔧 Auto Compact System 초기화 완료") # 1. 컨텍스트 추가 및 관리 async def add_context(self, content: str, content_type: ContentType, metadata: Optional[Dict] = None) -> Dict[str, Any]: """새로운 컨텍스트 추가""" try: if metadata is None: metadata = {} # 토큰 수 계산 token_count = self.token_counter.count_tokens(content) # 중요도 분석 importance_score = self.analyzer.analyze_importance(content, content_type) # 컨텍스트 청크 생성 chunk_id = str(uuid.uuid4()) chunk = ContextChunk( id=chunk_id, content=content, content_type=content_type.value, importance_score=importance_score, token_count=token_count, timestamp=datetime.now().isoformat(), compression_level=CompressionLevel.MINIMAL.value, metadata={**metadata, 'session_id': self.current_session_id, 'original_content': content} ) # 데이터베이스에 저장 self.database.store_context_chunk(chunk) # 자동 압축 확인 if self.auto_compression_enabled: await self._check_auto_compression() logger.info(f"➕ 컨텍스트 추가: {token_count} 토큰, 중요도 {importance_score:.2f}") return { "success": True, "chunk_id": chunk_id, "token_count": token_count, "importance_score": importance_score, "auto_compression_triggered": False # 실제 구현에서는 확인 필요 } except Exception as e: logger.error(f"❌ 컨텍스트 추가 실패: {e}") return {"success": False, "error": str(e)} # 2. 수동 압축 async def compress_context(self, target_compression: CompressionLevel, content_filter: Optional[List[str]] = None) -> Dict[str, Any]: """수동 컨텍스트 압축""" try: # 현재 세션의 모든 청크 조회 chunks = await self._get_session_chunks(self.current_session_id) if not chunks: return {"success": False, "error": "No context to compress"} # 필터 적용 if content_filter: chunks = [chunk for chunk in chunks if chunk.content_type in content_filter] total_original_tokens = sum(chunk.token_count for chunk in chunks) compressed_chunks = [] total_compressed_tokens = 0 compression_results = [] for chunk in chunks: # 이미 충분히 압축된 청크는 건너뛰기 if chunk.compression_level == target_compression.value: compressed_chunks.append(chunk) total_compressed_tokens += chunk.token_count continue # 압축 실행 compressed_content, result = self.compressor.compress_content( chunk.content, ContentType(chunk.content_type), target_compression ) # 압축된 청크 생성 compressed_chunk = ContextChunk( id=chunk.id, content=compressed_content, content_type=chunk.content_type, importance_score=chunk.importance_score, token_count=result.compressed_tokens, timestamp=chunk.timestamp, compression_level=target_compression.value, metadata={**chunk.metadata, 'compressed_at': datetime.now().isoformat()} ) compressed_chunks.append(compressed_chunk) total_compressed_tokens += result.compressed_tokens compression_results.append(result) # 데이터베이스 업데이트 self.database.store_context_chunk(compressed_chunk) self.database.store_compression_history( self.current_session_id, result, "manual_compression" ) # 전체 압축 결과 계산 overall_ratio = total_compressed_tokens / total_original_tokens if total_original_tokens > 0 else 1.0 tokens_saved = total_original_tokens - total_compressed_tokens logger.info(f"🗜️ 수동 압축 완료: {total_original_tokens}→{total_compressed_tokens} 토큰 ({overall_ratio:.1%})") return { "success": True, "compression_level": target_compression.value, "original_tokens": total_original_tokens, "compressed_tokens": total_compressed_tokens, "compression_ratio": overall_ratio, "tokens_saved": tokens_saved, "chunks_compressed": len(compression_results), "compression_results": [asdict(result) for result in compression_results] } except Exception as e: logger.error(f"❌ 수동 압축 실패: {e}") return {"success": False, "error": str(e)} # 3. 자동 압축 확인 async def _check_auto_compression(self) -> bool: """자동 압축 필요성 확인 및 실행""" try: current_tokens = await self._get_current_token_count() if current_tokens >= self.max_context_tokens * self.compression_threshold: logger.info(f"🤖 자동 압축 트리거: {current_tokens}/{self.max_context_tokens} 토큰") # 적응형 압축 레벨 결정 if current_tokens >= self.max_context_tokens * 0.95: compression_level = CompressionLevel.EXTREME elif current_tokens >= self.max_context_tokens * 0.9: compression_level = CompressionLevel.AGGRESSIVE else: compression_level = CompressionLevel.MODERATE # 자동 압축 실행 result = await self.compress_context(compression_level) return result.get("success", False) return False except Exception as e: logger.error(f"❌ 자동 압축 확인 실패: {e}") return False # 4. 컨텍스트 검색 async def search_context(self, query: str, limit: int = 10) -> Dict[str, Any]: """컨텍스트 검색""" try: chunks = await self._get_session_chunks(self.current_session_id) # 단순한 텍스트 검색 (실제로는 더 정교한 검색 가능) matches = [] query_lower = query.lower() for chunk in chunks: if query_lower in chunk.content.lower(): # 관련성 점수 계산 (간단한 버전) relevance = chunk.content.lower().count(query_lower) * chunk.importance_score matches.append((relevance, chunk)) # 관련성 순으로 정렬 matches.sort(reverse=True) matches = matches[:limit] logger.info(f"🔍 컨텍스트 검색: '{query}' - {len(matches)}개 결과") return { "success": True, "query": query, "results": [ { "chunk_id": chunk.id, "content_preview": chunk.content[:200] + "..." if len(chunk.content) > 200 else chunk.content, "content_type": chunk.content_type, "importance_score": chunk.importance_score, "relevance_score": relevance, "timestamp": chunk.timestamp } for relevance, chunk in matches ], "total_results": len(matches) } except Exception as e: logger.error(f"❌ 컨텍스트 검색 실패: {e}") return {"success": False, "error": str(e)} # 5. 세션 관리 async def create_new_session(self) -> Dict[str, Any]: """새로운 세션 생성""" try: # 현재 세션 저장 await self._save_current_session() # 새 세션 ID 생성 self.current_session_id = str(uuid.uuid4()) logger.info(f"🆕 새 세션 생성: {self.current_session_id}") return { "success": True, "session_id": self.current_session_id, "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"❌ 새 세션 생성 실패: {e}") return {"success": False, "error": str(e)} # 6. 통계 및 모니터링 async def get_context_stats(self) -> Dict[str, Any]: """컨텍스트 사용 통계""" try: chunks = await self._get_session_chunks(self.current_session_id) if not chunks: return { "success": True, "session_id": self.current_session_id, "total_chunks": 0, "total_tokens": 0, "usage_percentage": 0.0 } total_tokens = sum(chunk.token_count for chunk in chunks) usage_percentage = (total_tokens / self.max_context_tokens) * 100 # 컨텐츠 타입별 통계 type_stats = {} for chunk in chunks: content_type = chunk.content_type if content_type not in type_stats: type_stats[content_type] = {"count": 0, "tokens": 0} type_stats[content_type]["count"] += 1 type_stats[content_type]["tokens"] += chunk.token_count # 압축 레벨별 통계 compression_stats = {} for chunk in chunks: level = chunk.compression_level if level not in compression_stats: compression_stats[level] = {"count": 0, "tokens": 0} compression_stats[level]["count"] += 1 compression_stats[level]["tokens"] += chunk.token_count logger.info(f"📊 컨텍스트 통계: {len(chunks)}개 청크, {total_tokens} 토큰 ({usage_percentage:.1f}%)") return { "success": True, "session_id": self.current_session_id, "total_chunks": len(chunks), "total_tokens": total_tokens, "max_tokens": self.max_context_tokens, "usage_percentage": usage_percentage, "compression_threshold": self.compression_threshold * 100, "auto_compression_enabled": self.auto_compression_enabled, "content_type_stats": type_stats, "compression_level_stats": compression_stats, "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"❌ 컨텍스트 통계 조회 실패: {e}") return {"success": False, "error": str(e)} # 헬퍼 메서드들 async def _get_session_chunks(self, session_id: str) -> List[ContextChunk]: """세션의 모든 청크 조회""" chunks = [] with sqlite3.connect(self.database.db_path) as conn: cursor = conn.execute(""" SELECT id, content, content_type, importance_score, token_count, timestamp, compression_level, metadata FROM context_chunks WHERE metadata LIKE ? ORDER BY timestamp """, (f'%"session_id": "{session_id}"%',)) for row in cursor.fetchall(): metadata = json.loads(row[7]) if row[7] else {} chunk = ContextChunk( id=row[0], content=row[1], content_type=row[2], importance_score=row[3], token_count=row[4], timestamp=row[5], compression_level=row[6], metadata=metadata ) chunks.append(chunk) return chunks async def _get_current_token_count(self) -> int: """현재 세션의 총 토큰 수""" chunks = await self._get_session_chunks(self.current_session_id) return sum(chunk.token_count for chunk in chunks) async def _save_current_session(self): """현재 세션 저장""" chunks = await self._get_session_chunks(self.current_session_id) if chunks: total_tokens = sum(chunk.token_count for chunk in chunks) session = ContextSession( session_id=self.current_session_id, created_at=min(chunk.timestamp for chunk in chunks), last_updated=datetime.now().isoformat(), total_tokens=total_tokens, compressed_tokens=total_tokens, # 실제 압축 토큰 계산 필요 chunk_count=len(chunks), compression_history=[] ) self.database.store_context_session(session) class AutoCompactMCPServer: """Auto Compact MCP 서버 클래스""" def __init__(self): self.auto_compact = AutoCompactSystem() logger.info("🔧 Auto Compact MCP 서버 초기화 완료") async def handle_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """MCP 요청 처리""" if params is None: params = {} # 메서드 매핑 method_map = { "addContext": self._add_context_handler, "compressContext": self._compress_context_handler, "searchContext": self.auto_compact.search_context, "getContextStats": self.auto_compact.get_context_stats, "createNewSession": self.auto_compact.create_new_session, "enableAutoCompression": self._enable_auto_compression, "disableAutoCompression": self._disable_auto_compression, "setCompressionThreshold": self._set_compression_threshold } if method not in method_map: return { "success": False, "error": f"Unknown method: {method}" } try: return await method_map[method](**params) except Exception as e: logger.error(f"❌ 메서드 실행 실패 ({method}): {e}") return { "success": False, "error": str(e) } async def _add_context_handler(self, content: str, content_type: str, metadata: Dict = None): """컨텍스트 추가 핸들러""" try: content_type_enum = ContentType(content_type) except ValueError: content_type_enum = ContentType.CONVERSATION return await self.auto_compact.add_context(content, content_type_enum, metadata) async def _compress_context_handler(self, compression_level: str, content_filter: List[str] = None): """컨텍스트 압축 핸들러""" try: level_enum = CompressionLevel(compression_level) except ValueError: level_enum = CompressionLevel.MODERATE return await self.auto_compact.compress_context(level_enum, content_filter) async def _enable_auto_compression(self): """자동 압축 활성화""" self.auto_compact.auto_compression_enabled = True return {"success": True, "auto_compression_enabled": True} async def _disable_auto_compression(self): """자동 압축 비활성화""" self.auto_compact.auto_compression_enabled = False return {"success": True, "auto_compression_enabled": False} async def _set_compression_threshold(self, threshold: float): """압축 임계값 설정""" if 0.1 <= threshold <= 1.0: self.auto_compact.compression_threshold = threshold return {"success": True, "compression_threshold": threshold} else: return {"success": False, "error": "Threshold must be between 0.1 and 1.0"} # 테스트 함수 async def test_auto_compact_system(): """Auto Compact System 종합 테스트""" try: print("🧪 Auto Compact System 테스트 시작...\n") # MCP 서버 초기화 print("🔧 Auto Compact MCP 서버 초기화...") server = AutoCompactMCPServer() print(f"초기화 결과: True\n") # 1. 컨텍스트 추가 (코드) print("➕ 컨텍스트 추가 (코드)...") code_content = """ def calculate_fibonacci(n): if n <= 1: return n else: return calculate_fibonacci(n-1) + calculate_fibonacci(n-2) # 이것은 중요한 주석입니다 def main(): print("Hello World") for i in range(10): print(f"Fibonacci({i}) = {calculate_fibonacci(i)}") if __name__ == "__main__": main() """ add_result = await server.handle_request("addContext", { "content": code_content, "content_type": "code", "metadata": {"language": "python", "file": "fibonacci.py"} }) print(f"코드 추가: {add_result.get('success', False)}") if add_result.get('success'): print(f" 토큰 수: {add_result.get('token_count', 0)}") print(f" 중요도: {add_result.get('importance_score', 0):.2f}") # 2. 컨텍스트 추가 (대화) print("\n➕ 컨텍스트 추가 (대화)...") conversation = """ User: 안녕하세요! 오늘 날씨가 정말 좋네요. Assistant: 안녕하세요! 네, 정말 좋은 날씨입니다. 무엇을 도와드릴까요? User: Python으로 피보나치 수열을 구현하는 방법을 알려주세요. Assistant: 피보나치 수열은 재귀함수나 반복문으로 구현할 수 있습니다. 재귀적 방법은 이해하기 쉽지만 성능이 떨어질 수 있습니다. """ add_result2 = await server.handle_request("addContext", { "content": conversation, "content_type": "conversation" }) print(f"대화 추가: {add_result2.get('success', False)}") # 3. 컨텍스트 통계 조회 print("\n📊 컨텍스트 통계 조회...") stats_result = await server.handle_request("getContextStats") print(f"통계 조회: {stats_result.get('success', False)}") if stats_result.get('success'): print(f" 총 청크: {stats_result.get('total_chunks', 0)}") print(f" 총 토큰: {stats_result.get('total_tokens', 0)}") print(f" 사용률: {stats_result.get('usage_percentage', 0):.1f}%") # 4. 수동 압축 실행 print("\n🗜️ 수동 압축 실행...") compress_result = await server.handle_request("compressContext", { "compression_level": "moderate" }) print(f"압축 실행: {compress_result.get('success', False)}") if compress_result.get('success'): print(f" 압축 전: {compress_result.get('original_tokens', 0)} 토큰") print(f" 압축 후: {compress_result.get('compressed_tokens', 0)} 토큰") print(f" 압축률: {compress_result.get('compression_ratio', 0):.1%}") # 5. 컨텍스트 검색 print("\n🔍 컨텍스트 검색...") search_result = await server.handle_request("searchContext", { "query": "fibonacci", "limit": 5 }) print(f"검색 실행: {search_result.get('success', False)}") if search_result.get('success'): print(f" 검색 결과: {search_result.get('total_results', 0)}개") # 6. 자동 압축 설정 print("\n🤖 자동 압축 설정...") auto_result = await server.handle_request("setCompressionThreshold", { "threshold": 0.7 }) print(f"임계값 설정: {auto_result.get('success', False)}") # 7. 새 세션 생성 print("\n🆕 새 세션 생성...") session_result = await server.handle_request("createNewSession") print(f"세션 생성: {session_result.get('success', False)}") print("\n🎯 Auto Compact System 테스트 완료!") # 테스트 결과 통계 test_results = [ add_result.get('success', False), add_result2.get('success', False), stats_result.get('success', False), compress_result.get('success', False), search_result.get('success', False), auto_result.get('success', False), session_result.get('success', False) ] success_count = sum(test_results) total_count = len(test_results) success_rate = (success_count / total_count) * 100 print(f"\n📊 테스트 결과: {success_count}/{total_count} 성공 ({success_rate:.1f}%)") except Exception as e: print(f"❌ 테스트 중 오류: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(test_auto_compact_system())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server