#!/usr/bin/env python3
"""
Auto Compact System - 무제한 컨텍스트 압축 시스템
Claude Code 기반의 지능형 컨텍스트 관리 및 압축 도구
주요 기능:
- 지능형 컨텍스트 압축 (중요 정보 보존)
- 자동 대화 히스토리 관리 및 SSD 저장
- 실시간 컨텍스트 사용량 모니터링
- 적응형 압축률 조정
- 무중단 작업 보장 (seamless operation)
- 컨텍스트 복구 및 검색 시스템
"""
import asyncio
import json
import time
import uuid
import logging
import sqlite3
import re
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
from datetime import datetime
# import tiktoken  # Optional dependency
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CompressionLevel(Enum):
"""압축 레벨"""
MINIMAL = "minimal" # 10-20% 압축
MODERATE = "moderate" # 30-50% 압축
AGGRESSIVE = "aggressive" # 60-80% 압축
EXTREME = "extreme" # 80-95% 압축
class ContentType(Enum):
"""컨텍스트 내용 유형"""
CODE = "code"
CONVERSATION = "conversation"
DOCUMENTATION = "documentation"
ERROR_LOG = "error_log"
SYSTEM_MESSAGE = "system_message"
USER_INPUT = "user_input"
AI_RESPONSE = "ai_response"
@dataclass
class ContextChunk:
"""컨텍스트 청크"""
id: str
content: str
content_type: str
importance_score: float
token_count: int
timestamp: str
compression_level: str
metadata: Dict[str, Any]
@dataclass
class CompressionResult:
"""압축 결과"""
original_tokens: int
compressed_tokens: int
compression_ratio: float
time_taken: float
preserved_elements: List[str]
removed_elements: List[str]
compression_summary: str
@dataclass
class ContextSession:
"""컨텍스트 세션"""
session_id: str
created_at: str
last_updated: str
total_tokens: int
compressed_tokens: int
chunk_count: int
compression_history: List[Dict]
class TokenCounter:
"""토큰 계산기"""
def __init__(self, model_name: str = "gpt-4"):
try:
# tiktoken이 있으면 사용
import tiktoken
self.encoding = tiktoken.encoding_for_model(model_name)
self.use_tiktoken = True
except ImportError:
# tiktoken이 없으면 근사치 사용
self.encoding = None
self.use_tiktoken = False
logger.info("📝 tiktoken 없음 - 근사치 토큰 계산 사용")
except:
# 기본 인코딩 사용
try:
import tiktoken
self.encoding = tiktoken.get_encoding("cl100k_base")
self.use_tiktoken = True
except:
self.encoding = None
self.use_tiktoken = False
def count_tokens(self, text: str) -> int:
"""텍스트의 토큰 수 계산"""
if self.use_tiktoken and self.encoding:
try:
return len(self.encoding.encode(text))
except:
pass
# 백업 방법: 대략적인 계산 (1 토큰 ≈ 4 문자)
# 영어: ~4문자/토큰, 한국어: ~1.5문자/토큰, 코드: ~3문자/토큰
# 간단한 휴리스틱
if re.search(r'[가-힣]', text): # 한국어 포함
return len(text) // 2
elif re.search(r'(def |class |import |function)', text): # 코드 패턴
return len(text) // 3
else: # 일반 영어 텍스트
return len(text) // 4
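# Worked example of the fallback heuristic above (illustrative only, not part
# of the API):
#   TokenCounter().count_tokens("hello world")      # 11 chars // 4 -> 2 (English path)
#   TokenCounter().count_tokens("def foo(): pass")  # 15 chars // 3 -> 5 (code path)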
class ContextAnalyzer:
"""컨텍스트 분석기 - 중요도 및 압축 가능성 분석"""
def __init__(self):
self.token_counter = TokenCounter()
# 중요도 키워드 (가중치 포함)
self.importance_keywords = {
# 높은 중요도 (유지해야 할 내용)
"error": 0.9, "exception": 0.9, "fail": 0.9, "bug": 0.9,
"critical": 0.95, "important": 0.8, "warning": 0.7,
"todo": 0.8, "fixme": 0.9, "hack": 0.7,
"class": 0.7, "function": 0.7, "def": 0.7, "async": 0.7,
"import": 0.6, "from": 0.6, "config": 0.8,
# 중간 중요도
"test": 0.6, "debug": 0.5, "log": 0.4,
"comment": 0.3, "note": 0.4, "info": 0.4,
# 낮은 중요도 (압축 가능)
"hello": 0.2, "thanks": 0.2, "please": 0.2,
"output": 0.3, "result": 0.3, "response": 0.3
}
def analyze_importance(self, content: str, content_type: ContentType) -> float:
"""컨텍스트 중요도 분석 (0.0 - 1.0)"""
# 기본 중요도 (콘텐츠 타입별)
base_importance = {
ContentType.CODE: 0.8,
ContentType.ERROR_LOG: 0.9,
ContentType.SYSTEM_MESSAGE: 0.7,
ContentType.USER_INPUT: 0.6,
ContentType.AI_RESPONSE: 0.5,
ContentType.CONVERSATION: 0.4,
ContentType.DOCUMENTATION: 0.6
}
importance = base_importance.get(content_type, 0.5)
# 키워드 기반 중요도 조정
content_lower = content.lower()
keyword_score = 0.0
keyword_count = 0
for keyword, weight in self.importance_keywords.items():
if keyword in content_lower:
keyword_score += weight
keyword_count += 1
if keyword_count > 0:
keyword_score /= keyword_count
importance = (importance + keyword_score) / 2
# 길이 기반 조정 (너무 짧거나 긴 내용은 중요도 감소)
length = len(content)
if length < 50:
importance *= 0.8 # 너무 짧음
elif length > 5000:
importance *= 0.9 # 너무 김
# 코드 구조 탐지
if self._contains_code_structure(content):
importance += 0.1
# 최종 값 제한
return min(max(importance, 0.0), 1.0)
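    # Worked example (illustrative): for the short CODE chunk "def handle(): raise Error",
    # base importance is 0.8; keywords "def" (0.7) and "error" (0.9) average to 0.8,
    # giving (0.8 + 0.8) / 2 = 0.8; under 50 chars -> * 0.8 = 0.64; code structure
    # detected -> + 0.1 = 0.74 final score.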
def _contains_code_structure(self, content: str) -> bool:
"""코드 구조 포함 여부 확인"""
code_patterns = [
r'def\s+\w+\s*\(', # Python 함수
r'class\s+\w+\s*[:\(]', # Python 클래스
r'function\s+\w+\s*\(', # JavaScript 함수
r'import\s+\w+', # Import 문
r'from\s+\w+\s+import', # From import
r'if\s+__name__\s*==\s*["\']__main__["\']', # Python main
r'try\s*:\s*\n', # Try-except
r'async\s+def\s+\w+', # Async 함수
]
return any(re.search(pattern, content, re.IGNORECASE) for pattern in code_patterns)
class ContextCompressor:
"""컨텍스트 압축기"""
def __init__(self):
self.analyzer = ContextAnalyzer()
self.token_counter = TokenCounter()
def compress_content(self, content: str, content_type: ContentType,
target_compression: CompressionLevel) -> Tuple[str, CompressionResult]:
"""컨텍스트 압축 실행"""
start_time = time.time()
original_tokens = self.token_counter.count_tokens(content)
# 압축 레벨별 타겟 비율
compression_targets = {
CompressionLevel.MINIMAL: 0.8, # 20% 압축
CompressionLevel.MODERATE: 0.5, # 50% 압축
CompressionLevel.AGGRESSIVE: 0.3, # 70% 압축
CompressionLevel.EXTREME: 0.15 # 85% 압축
}
target_ratio = compression_targets[target_compression]
target_tokens = int(original_tokens * target_ratio)
# 압축 전략 적용
compressed_content, preserved, removed = self._apply_compression_strategy(
content, content_type, target_tokens
)
compressed_tokens = self.token_counter.count_tokens(compressed_content)
actual_ratio = compressed_tokens / original_tokens if original_tokens > 0 else 1.0
time_taken = time.time() - start_time
# 압축 결과 생성
result = CompressionResult(
original_tokens=original_tokens,
compressed_tokens=compressed_tokens,
compression_ratio=actual_ratio,
time_taken=time_taken,
preserved_elements=preserved,
removed_elements=removed,
compression_summary=f"압축: {original_tokens}→{compressed_tokens} 토큰 ({actual_ratio:.1%})"
)
return compressed_content, result
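    # Worked example of the targets above (illustrative): a 1,000-token chunk at
    # CompressionLevel.MODERATE aims for 1,000 * 0.5 = 500 tokens; the actual
    # ratio recorded in CompressionResult may differ, since the strategies below
    # are heuristic rather than exact.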
def _apply_compression_strategy(self, content: str, content_type: ContentType,
target_tokens: int) -> Tuple[str, List[str], List[str]]:
"""압축 전략 적용"""
preserved = []
removed = []
# 압축 전략별 처리
if content_type == ContentType.CODE:
return self._compress_code(content, target_tokens)
elif content_type == ContentType.CONVERSATION:
return self._compress_conversation(content, target_tokens)
elif content_type == ContentType.ERROR_LOG:
return self._compress_error_log(content, target_tokens)
else:
return self._compress_generic(content, target_tokens)
def _compress_code(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]:
"""코드 압축"""
lines = content.split('\n')
preserved = []
removed = []
# 중요한 라인 식별 및 보존
important_lines = []
for i, line in enumerate(lines):
stripped = line.strip()
# 보존해야 할 코드 라인
if any(pattern in stripped.lower() for pattern in [
'def ', 'class ', 'import ', 'from ', 'async def',
'if __name__', 'try:', 'except:', 'raise', 'return'
]):
important_lines.append((i, line))
preserved.append(f"Line {i+1}: {stripped[:50]}")
# 주석은 선택적 보존
elif stripped.startswith('#') and len(stripped) > 10:
if self.analyzer.analyze_importance(stripped, ContentType.CODE) > 0.7:
important_lines.append((i, line))
preserved.append(f"Comment {i+1}: {stripped[:30]}")
else:
removed.append(f"Comment {i+1}: {stripped[:30]}")
# 빈 라인은 제거
elif not stripped:
removed.append(f"Empty line {i+1}")
# 일반 라인은 중요도에 따라 결정
else:
importance = self.analyzer.analyze_importance(stripped, ContentType.CODE)
if importance > 0.6:
important_lines.append((i, line))
preserved.append(f"Line {i+1}: {stripped[:50]}")
else:
removed.append(f"Line {i+1}: {stripped[:50]}")
# 압축된 코드 재구성
compressed_lines = [line for _, line in important_lines]
compressed_content = '\n'.join(compressed_lines)
# 타겟 토큰에 맞춰 추가 압축
if self.token_counter.count_tokens(compressed_content) > target_tokens:
# 더 적극적인 압축 (함수 시그니처만 보존 등)
signature_lines = []
for line_num, line in important_lines:
stripped = line.strip()
if any(keyword in stripped for keyword in ['def ', 'class ', 'async def']):
signature_lines.append(line)
if signature_lines:
compressed_content = '\n'.join(signature_lines)
compressed_content += '\n# ... (compressed content) ...'
return compressed_content, preserved, removed
def _compress_conversation(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]:
"""대화 압축"""
# 대화를 턴으로 분할
turns = re.split(r'\n(?=(?:User|Assistant|Human|AI)[::])', content)
preserved = []
removed = []
important_turns = []
for i, turn in enumerate(turns):
if not turn.strip():
removed.append(f"Empty turn {i}")
continue
importance = self.analyzer.analyze_importance(turn, ContentType.CONVERSATION)
if importance > 0.5: # 중요한 대화만 보존
# 긴 대화는 요약
if len(turn) > 300:
summary = self._summarize_turn(turn)
important_turns.append(summary)
preserved.append(f"Turn {i} (summarized): {turn[:50]}...")
else:
important_turns.append(turn)
preserved.append(f"Turn {i}: {turn[:50]}...")
else:
removed.append(f"Turn {i}: {turn[:50]}...")
compressed_content = '\n'.join(important_turns)
return compressed_content, preserved, removed
def _compress_error_log(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]:
"""에러 로그 압축"""
lines = content.split('\n')
preserved = []
removed = []
# 에러 관련 키워드가 있는 라인 우선 보존
error_keywords = ['error', 'exception', 'traceback', 'failed', 'critical']
important_lines = []
for i, line in enumerate(lines):
stripped = line.strip()
if any(keyword in stripped.lower() for keyword in error_keywords):
important_lines.append(line)
preserved.append(f"Error line {i+1}: {stripped[:50]}")
elif stripped and not re.match(r'^\s*at\s+', stripped): # 스택 트레이스 제외
if len(important_lines) < 10: # 최대 10라인만 보존
important_lines.append(line)
preserved.append(f"Context line {i+1}: {stripped[:50]}")
else:
removed.append(f"Line {i+1}: {stripped[:50]}")
else:
removed.append(f"Stack trace {i+1}: {stripped[:30]}")
compressed_content = '\n'.join(important_lines)
if not compressed_content.strip():
compressed_content = "# Error log compressed - no critical errors found"
return compressed_content, preserved, removed
def _compress_generic(self, content: str, target_tokens: int) -> Tuple[str, List[str], List[str]]:
"""일반 컨텍스트 압축"""
# 문장별로 분할
sentences = re.split(r'[.!?]\s+', content)
preserved = []
removed = []
# 중요도별로 정렬
sentence_scores = []
for i, sentence in enumerate(sentences):
if sentence.strip():
importance = self.analyzer.analyze_importance(sentence, ContentType.CONVERSATION)
sentence_scores.append((importance, i, sentence))
# 중요도 순으로 정렬
sentence_scores.sort(reverse=True)
# 타겟 토큰에 맞춰 선택
selected_sentences = []
current_tokens = 0
for importance, i, sentence in sentence_scores:
sentence_tokens = self.token_counter.count_tokens(sentence)
if current_tokens + sentence_tokens <= target_tokens:
selected_sentences.append((i, sentence))
current_tokens += sentence_tokens
preserved.append(f"Sentence {i}: {sentence[:50]}...")
else:
removed.append(f"Sentence {i}: {sentence[:50]}...")
# 원래 순서대로 재정렬
selected_sentences.sort()
compressed_content = '. '.join([sentence for _, sentence in selected_sentences])
return compressed_content, preserved, removed
def _summarize_turn(self, turn: str) -> str:
"""대화 턴 요약"""
# 간단한 요약 로직 (실제로는 더 정교한 요약 모델 사용 가능)
lines = turn.split('\n')
if len(lines) <= 3:
return turn
# 첫 줄과 마지막 줄 보존, 중간은 요약
summary_parts = [
lines[0],
f"... ({len(lines)-2} lines compressed) ...",
lines[-1] if len(lines) > 1 else ""
]
return '\n'.join(filter(None, summary_parts))
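# Illustrative use of the compressor above (a minimal sketch, not executed at
# import time):
#   compressor = ContextCompressor()
#   text, result = compressor.compress_content(source_code, ContentType.CODE,
#                                              CompressionLevel.MODERATE)
#   print(result.compression_summary)  # e.g. "Compressed: 812→401 tokens (49.4%)"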
class AutoCompactDatabase:
"""Auto Compact 데이터베이스 관리자"""
def __init__(self, db_path: str = "auto_compact.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""데이터베이스 초기화"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS context_sessions (
session_id TEXT PRIMARY KEY,
created_at TEXT NOT NULL,
last_updated TEXT NOT NULL,
total_tokens INTEGER NOT NULL,
compressed_tokens INTEGER NOT NULL,
chunk_count INTEGER NOT NULL,
compression_history TEXT,
metadata TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
content TEXT NOT NULL,
content_type TEXT NOT NULL,
importance_score REAL NOT NULL,
token_count INTEGER NOT NULL,
timestamp TEXT NOT NULL,
compression_level TEXT NOT NULL,
metadata TEXT,
original_content TEXT,
FOREIGN KEY (session_id) REFERENCES context_sessions (session_id)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS compression_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
operation_type TEXT NOT NULL,
original_tokens INTEGER NOT NULL,
compressed_tokens INTEGER NOT NULL,
compression_ratio REAL NOT NULL,
time_taken REAL NOT NULL,
timestamp TEXT NOT NULL,
details TEXT,
FOREIGN KEY (session_id) REFERENCES context_sessions (session_id)
)
""")
def store_context_session(self, session: ContextSession):
"""컨텍스트 세션 저장"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO context_sessions
(session_id, created_at, last_updated, total_tokens, compressed_tokens,
chunk_count, compression_history, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
session.session_id, session.created_at, session.last_updated,
session.total_tokens, session.compressed_tokens, session.chunk_count,
json.dumps(session.compression_history), "{}"
))
def store_context_chunk(self, chunk: ContextChunk):
"""컨텍스트 청크 저장"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO context_chunks
(id, session_id, content, content_type, importance_score, token_count,
timestamp, compression_level, metadata, original_content)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
chunk.id, chunk.metadata.get('session_id', ''), chunk.content,
chunk.content_type, chunk.importance_score, chunk.token_count,
chunk.timestamp, chunk.compression_level, json.dumps(chunk.metadata),
chunk.metadata.get('original_content', chunk.content)
))
def store_compression_history(self, session_id: str, result: CompressionResult, operation_type: str):
"""압축 히스토리 저장"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO compression_history
(session_id, operation_type, original_tokens, compressed_tokens,
compression_ratio, time_taken, timestamp, details)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
session_id, operation_type, result.original_tokens, result.compressed_tokens,
result.compression_ratio, result.time_taken, datetime.now().isoformat(),
json.dumps(asdict(result))
))
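# Illustrative read-back query against the schema above (assumes auto_compact.db
# already exists on disk):
#   with sqlite3.connect("auto_compact.db") as conn:
#       rows = conn.execute(
#           "SELECT session_id, total_tokens FROM context_sessions").fetchall()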
class AutoCompactSystem:
"""Auto Compact System 메인 클래스"""
def __init__(self):
self.database = AutoCompactDatabase()
self.compressor = ContextCompressor()
self.analyzer = ContextAnalyzer()
self.token_counter = TokenCounter()
self.current_session_id = str(uuid.uuid4())
# 설정
self.max_context_tokens = 32000 # 최대 컨텍스트 토큰
self.compression_threshold = 0.8 # 80% 사용시 압축 시작
self.auto_compression_enabled = True
logger.info("🔧 Auto Compact System 초기화 완료")
    # 1. Context ingestion and management
    async def add_context(self, content: str, content_type: ContentType,
                          metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """Add a new piece of context"""
        try:
            if metadata is None:
                metadata = {}
            # Count tokens
            token_count = self.token_counter.count_tokens(content)
            # Score importance
            importance_score = self.analyzer.analyze_importance(content, content_type)
            # Build the context chunk
            chunk_id = str(uuid.uuid4())
            chunk = ContextChunk(
                id=chunk_id,
                content=content,
                content_type=content_type.value,
                importance_score=importance_score,
                token_count=token_count,
                timestamp=datetime.now().isoformat(),
                compression_level=CompressionLevel.MINIMAL.value,
                metadata={**metadata, 'session_id': self.current_session_id, 'original_content': content}
            )
            # Persist to the database
            self.database.store_context_chunk(chunk)
            # Check whether auto compression should run
            auto_compression_triggered = False
            if self.auto_compression_enabled:
                auto_compression_triggered = await self._check_auto_compression()
            logger.info(f"➕ Context added: {token_count} tokens, importance {importance_score:.2f}")
            return {
                "success": True,
                "chunk_id": chunk_id,
                "token_count": token_count,
                "importance_score": importance_score,
                "auto_compression_triggered": auto_compression_triggered
            }
        except Exception as e:
            logger.error(f"❌ Failed to add context: {e}")
            return {"success": False, "error": str(e)}
    # 2. Manual compression
    async def compress_context(self, target_compression: CompressionLevel,
                               content_filter: Optional[List[str]] = None) -> Dict[str, Any]:
        """Manually compress the current context"""
        try:
            # Fetch every chunk in the current session
            chunks = await self._get_session_chunks(self.current_session_id)
            if not chunks:
                return {"success": False, "error": "No context to compress"}
            # Apply the content-type filter
            if content_filter:
                chunks = [chunk for chunk in chunks if chunk.content_type in content_filter]
            total_original_tokens = sum(chunk.token_count for chunk in chunks)
            compressed_chunks = []
            total_compressed_tokens = 0
            compression_results = []
            for chunk in chunks:
                # Skip chunks already at the target compression level
                if chunk.compression_level == target_compression.value:
                    compressed_chunks.append(chunk)
                    total_compressed_tokens += chunk.token_count
                    continue
                # Run compression
                compressed_content, result = self.compressor.compress_content(
                    chunk.content,
                    ContentType(chunk.content_type),
                    target_compression
                )
                # Build the compressed chunk
                compressed_chunk = ContextChunk(
                    id=chunk.id,
                    content=compressed_content,
                    content_type=chunk.content_type,
                    importance_score=chunk.importance_score,
                    token_count=result.compressed_tokens,
                    timestamp=chunk.timestamp,
                    compression_level=target_compression.value,
                    metadata={**chunk.metadata, 'compressed_at': datetime.now().isoformat()}
                )
                compressed_chunks.append(compressed_chunk)
                total_compressed_tokens += result.compressed_tokens
                compression_results.append(result)
                # Update the database
                self.database.store_context_chunk(compressed_chunk)
                self.database.store_compression_history(
                    self.current_session_id, result, "manual_compression"
                )
            # Compute the overall result
            overall_ratio = total_compressed_tokens / total_original_tokens if total_original_tokens > 0 else 1.0
            tokens_saved = total_original_tokens - total_compressed_tokens
            logger.info(f"🗜️ Manual compression done: {total_original_tokens}→{total_compressed_tokens} tokens ({overall_ratio:.1%})")
            return {
                "success": True,
                "compression_level": target_compression.value,
                "original_tokens": total_original_tokens,
                "compressed_tokens": total_compressed_tokens,
                "compression_ratio": overall_ratio,
                "tokens_saved": tokens_saved,
                "chunks_compressed": len(compression_results),
                "compression_results": [asdict(result) for result in compression_results]
            }
        except Exception as e:
            logger.error(f"❌ Manual compression failed: {e}")
            return {"success": False, "error": str(e)}
    # 3. Auto-compression check
    async def _check_auto_compression(self) -> bool:
        """Check whether auto compression is needed and run it if so"""
        try:
            current_tokens = await self._get_current_token_count()
            if current_tokens >= self.max_context_tokens * self.compression_threshold:
                logger.info(f"🤖 Auto compression triggered: {current_tokens}/{self.max_context_tokens} tokens")
                # Pick an adaptive compression level
                if current_tokens >= self.max_context_tokens * 0.95:
                    compression_level = CompressionLevel.EXTREME
                elif current_tokens >= self.max_context_tokens * 0.9:
                    compression_level = CompressionLevel.AGGRESSIVE
                else:
                    compression_level = CompressionLevel.MODERATE
                # Run auto compression
                result = await self.compress_context(compression_level)
                return result.get("success", False)
            return False
        except Exception as e:
            logger.error(f"❌ Auto-compression check failed: {e}")
            return False
    # 4. Context search
    async def search_context(self, query: str, limit: int = 10) -> Dict[str, Any]:
        """Search the current context"""
        try:
            chunks = await self._get_session_chunks(self.current_session_id)
            # Simple substring search (a more sophisticated search could be used)
            matches = []
            query_lower = query.lower()
            for chunk in chunks:
                if query_lower in chunk.content.lower():
                    # Simple relevance score
                    relevance = chunk.content.lower().count(query_lower) * chunk.importance_score
                    matches.append((relevance, chunk))
            # Sort by relevance (the key avoids comparing chunks on tied scores)
            matches.sort(key=lambda m: m[0], reverse=True)
            matches = matches[:limit]
            logger.info(f"🔍 Context search: '{query}' - {len(matches)} result(s)")
            return {
                "success": True,
                "query": query,
                "results": [
                    {
                        "chunk_id": chunk.id,
                        "content_preview": chunk.content[:200] + "..." if len(chunk.content) > 200 else chunk.content,
                        "content_type": chunk.content_type,
                        "importance_score": chunk.importance_score,
                        "relevance_score": relevance,
                        "timestamp": chunk.timestamp
                    }
                    for relevance, chunk in matches
                ],
                "total_results": len(matches)
            }
        except Exception as e:
            logger.error(f"❌ Context search failed: {e}")
            return {"success": False, "error": str(e)}
    # 5. Session management
    async def create_new_session(self) -> Dict[str, Any]:
        """Create a new session"""
        try:
            # Persist the current session first
            await self._save_current_session()
            # Generate a fresh session id
            self.current_session_id = str(uuid.uuid4())
            logger.info(f"🆕 New session created: {self.current_session_id}")
            return {
                "success": True,
                "session_id": self.current_session_id,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Failed to create a new session: {e}")
            return {"success": False, "error": str(e)}
    # 6. Statistics and monitoring
    async def get_context_stats(self) -> Dict[str, Any]:
        """Report context usage statistics"""
        try:
            chunks = await self._get_session_chunks(self.current_session_id)
            if not chunks:
                return {
                    "success": True,
                    "session_id": self.current_session_id,
                    "total_chunks": 0,
                    "total_tokens": 0,
                    "usage_percentage": 0.0
                }
            total_tokens = sum(chunk.token_count for chunk in chunks)
            usage_percentage = (total_tokens / self.max_context_tokens) * 100
            # Per-content-type statistics
            type_stats = {}
            for chunk in chunks:
                content_type = chunk.content_type
                if content_type not in type_stats:
                    type_stats[content_type] = {"count": 0, "tokens": 0}
                type_stats[content_type]["count"] += 1
                type_stats[content_type]["tokens"] += chunk.token_count
            # Per-compression-level statistics
            compression_stats = {}
            for chunk in chunks:
                level = chunk.compression_level
                if level not in compression_stats:
                    compression_stats[level] = {"count": 0, "tokens": 0}
                compression_stats[level]["count"] += 1
                compression_stats[level]["tokens"] += chunk.token_count
            logger.info(f"📊 Context stats: {len(chunks)} chunks, {total_tokens} tokens ({usage_percentage:.1f}%)")
            return {
                "success": True,
                "session_id": self.current_session_id,
                "total_chunks": len(chunks),
                "total_tokens": total_tokens,
                "max_tokens": self.max_context_tokens,
                "usage_percentage": usage_percentage,
                "compression_threshold": self.compression_threshold * 100,
                "auto_compression_enabled": self.auto_compression_enabled,
                "content_type_stats": type_stats,
                "compression_level_stats": compression_stats,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Failed to fetch context stats: {e}")
            return {"success": False, "error": str(e)}
    # Helper methods
    async def _get_session_chunks(self, session_id: str) -> List[ContextChunk]:
        """Fetch every chunk for a session"""
        chunks = []
        with sqlite3.connect(self.database.db_path) as conn:
            # Match on the JSON metadata column; this relies on json.dumps using
            # its default ": " separator, so the pattern below stays in sync with it
            cursor = conn.execute("""
                SELECT id, content, content_type, importance_score, token_count,
                       timestamp, compression_level, metadata
                FROM context_chunks
                WHERE metadata LIKE ?
                ORDER BY timestamp
            """, (f'%"session_id": "{session_id}"%',))
for row in cursor.fetchall():
metadata = json.loads(row[7]) if row[7] else {}
chunk = ContextChunk(
id=row[0],
content=row[1],
content_type=row[2],
importance_score=row[3],
token_count=row[4],
timestamp=row[5],
compression_level=row[6],
metadata=metadata
)
chunks.append(chunk)
return chunks
    async def _get_current_token_count(self) -> int:
        """Total token count for the current session"""
        chunks = await self._get_session_chunks(self.current_session_id)
        return sum(chunk.token_count for chunk in chunks)
    async def _save_current_session(self):
        """Persist the current session"""
        chunks = await self._get_session_chunks(self.current_session_id)
        if chunks:
            total_tokens = sum(chunk.token_count for chunk in chunks)
            session = ContextSession(
                session_id=self.current_session_id,
                created_at=min(chunk.timestamp for chunk in chunks),
                last_updated=datetime.now().isoformat(),
                total_tokens=total_tokens,
                compressed_tokens=total_tokens,  # TODO: track actual compressed token counts
                chunk_count=len(chunks),
                compression_history=[]
            )
            self.database.store_context_session(session)
class AutoCompactMCPServer:
"""Auto Compact MCP 서버 클래스"""
def __init__(self):
self.auto_compact = AutoCompactSystem()
logger.info("🔧 Auto Compact MCP 서버 초기화 완료")
async def handle_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""MCP 요청 처리"""
if params is None:
params = {}
# 메서드 매핑
method_map = {
"addContext": self._add_context_handler,
"compressContext": self._compress_context_handler,
"searchContext": self.auto_compact.search_context,
"getContextStats": self.auto_compact.get_context_stats,
"createNewSession": self.auto_compact.create_new_session,
"enableAutoCompression": self._enable_auto_compression,
"disableAutoCompression": self._disable_auto_compression,
"setCompressionThreshold": self._set_compression_threshold
}
if method not in method_map:
return {
"success": False,
"error": f"Unknown method: {method}"
}
try:
return await method_map[method](**params)
except Exception as e:
logger.error(f"❌ 메서드 실행 실패 ({method}): {e}")
return {
"success": False,
"error": str(e)
}
    async def _add_context_handler(self, content: str, content_type: str, metadata: Optional[Dict] = None):
        """Handler for adding context"""
        try:
            content_type_enum = ContentType(content_type)
        except ValueError:
            content_type_enum = ContentType.CONVERSATION
        return await self.auto_compact.add_context(content, content_type_enum, metadata)
    async def _compress_context_handler(self, compression_level: str, content_filter: Optional[List[str]] = None):
        """Handler for compressing context"""
        try:
            level_enum = CompressionLevel(compression_level)
        except ValueError:
            level_enum = CompressionLevel.MODERATE
        return await self.auto_compact.compress_context(level_enum, content_filter)
    async def _enable_auto_compression(self):
        """Enable auto compression"""
        self.auto_compact.auto_compression_enabled = True
        return {"success": True, "auto_compression_enabled": True}
    async def _disable_auto_compression(self):
        """Disable auto compression"""
        self.auto_compact.auto_compression_enabled = False
        return {"success": True, "auto_compression_enabled": False}
    async def _set_compression_threshold(self, threshold: float):
        """Set the compression threshold"""
        if 0.1 <= threshold <= 1.0:
            self.auto_compact.compression_threshold = threshold
            return {"success": True, "compression_threshold": threshold}
        else:
            return {"success": False, "error": "Threshold must be between 0.1 and 1.0"}
# Test function
async def test_auto_compact_system():
    """End-to-end test of the Auto Compact System"""
    try:
        print("🧪 Starting Auto Compact System tests...\n")
        # Initialize the MCP server
        print("🔧 Initializing the Auto Compact MCP server...")
        server = AutoCompactMCPServer()
        print("Initialization: True\n")
        # 1. Add context (code)
        print("➕ Adding context (code)...")
        code_content = """
def calculate_fibonacci(n):
    if n <= 1:
        return n
    else:
        return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)
# This is an important comment
def main():
    print("Hello World")
    for i in range(10):
        print(f"Fibonacci({i}) = {calculate_fibonacci(i)}")
if __name__ == "__main__":
    main()
"""
        add_result = await server.handle_request("addContext", {
            "content": code_content,
            "content_type": "code",
            "metadata": {"language": "python", "file": "fibonacci.py"}
        })
        print(f"Code added: {add_result.get('success', False)}")
        if add_result.get('success'):
            print(f"  Token count: {add_result.get('token_count', 0)}")
            print(f"  Importance: {add_result.get('importance_score', 0):.2f}")
        # 2. Add context (conversation)
        print("\n➕ Adding context (conversation)...")
        conversation = """
User: Hello! The weather is really nice today.
Assistant: Hello! Yes, it is a lovely day. How can I help you?
User: Please show me how to implement the Fibonacci sequence in Python.
Assistant: The Fibonacci sequence can be implemented recursively or iteratively. The recursive approach is easy to understand but can perform poorly.
"""
        add_result2 = await server.handle_request("addContext", {
            "content": conversation,
            "content_type": "conversation"
        })
        print(f"Conversation added: {add_result2.get('success', False)}")
        # 3. Fetch context statistics
        print("\n📊 Fetching context statistics...")
        stats_result = await server.handle_request("getContextStats")
        print(f"Stats fetched: {stats_result.get('success', False)}")
        if stats_result.get('success'):
            print(f"  Total chunks: {stats_result.get('total_chunks', 0)}")
            print(f"  Total tokens: {stats_result.get('total_tokens', 0)}")
            print(f"  Usage: {stats_result.get('usage_percentage', 0):.1f}%")
        # 4. Run manual compression
        print("\n🗜️ Running manual compression...")
        compress_result = await server.handle_request("compressContext", {
            "compression_level": "moderate"
        })
        print(f"Compression run: {compress_result.get('success', False)}")
        if compress_result.get('success'):
            print(f"  Before: {compress_result.get('original_tokens', 0)} tokens")
            print(f"  After: {compress_result.get('compressed_tokens', 0)} tokens")
            print(f"  Ratio: {compress_result.get('compression_ratio', 0):.1%}")
        # 5. Search the context
        print("\n🔍 Searching the context...")
        search_result = await server.handle_request("searchContext", {
            "query": "fibonacci",
            "limit": 5
        })
        print(f"Search run: {search_result.get('success', False)}")
        if search_result.get('success'):
            print(f"  Results: {search_result.get('total_results', 0)}")
        # 6. Configure auto compression
        print("\n🤖 Configuring auto compression...")
        auto_result = await server.handle_request("setCompressionThreshold", {
            "threshold": 0.7
        })
        print(f"Threshold set: {auto_result.get('success', False)}")
        # 7. Create a new session
        print("\n🆕 Creating a new session...")
        session_result = await server.handle_request("createNewSession")
        print(f"Session created: {session_result.get('success', False)}")
        print("\n🎯 Auto Compact System tests finished!")
        # Test summary
        test_results = [
            add_result.get('success', False),
            add_result2.get('success', False),
            stats_result.get('success', False),
            compress_result.get('success', False),
            search_result.get('success', False),
            auto_result.get('success', False),
            session_result.get('success', False)
        ]
        success_count = sum(test_results)
        total_count = len(test_results)
        success_rate = (success_count / total_count) * 100
        print(f"\n📊 Test results: {success_count}/{total_count} passed ({success_rate:.1f}%)")
    except Exception as e:
        print(f"❌ Error during tests: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    asyncio.run(test_auto_compact_system())