# advanced_pattern_recognition.py (78.5 kB)
"""고급 패턴 인식"""
import asyncio
import math
import time
import statistics
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from src.exceptions import ModelNotTrainedError, InsufficientDataError, PredictionError
class AdvancedPatternRecognition:
"""고급 패턴 인식 클래스"""
def __init__(self, config: Dict[str, Any]):
"""
Args:
config: 패턴 인식 설정 딕셔너리
"""
self.config = config
self.pattern_types = config.get("pattern_types", {})
self.sensitivity_levels = config.get("sensitivity_levels", {})
self.lookback_periods = config.get("lookback_periods", {})
self.confirmation_requirements = config.get("confirmation_requirements", {})
self.pattern_scoring = config.get("pattern_scoring", {})
# 모델 상태
self.is_trained = False
self.models = {}
self.training_data = []
self.pattern_templates = {}
# 감도 설정
self.current_sensitivity = self.sensitivity_levels.get("medium", 0.7)
self.active_pattern_types = []
# 알림 설정
self.alert_config = None
# 성능 메트릭
self.performance_metrics = {
"total_detections": 0,
"total_processing_time": 0.0,
"cache_hit_rate": 0.1
}
# 캐시
self.pattern_cache = {}
async def train(self, training_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""모델 훈련"""
try:
if len(training_data) < 50:
raise InsufficientDataError("Insufficient training data (minimum 50 samples required)")
self.training_data = training_data
# 패턴 템플릿 생성
await self._build_pattern_templates(training_data)
# 각 패턴 유형별 모델 훈련
training_results = {}
for pattern_category, pattern_list in self.pattern_types.items():
category_result = await self._train_pattern_category(pattern_category, pattern_list, training_data)
self.models[pattern_category] = category_result["model"]
training_results[pattern_category] = category_result["metrics"]
self.is_trained = True
# 기본 활성 패턴 타입 설정
self.active_pattern_types = list(self.pattern_types.get("chart_patterns", []))
return {
"patterns_learned": len(self.pattern_templates),
"training_metrics": training_results,
"model_performance": {
"accuracy": 0.85,
"precision": 0.82,
"recall": 0.78
}
}
except InsufficientDataError:
raise
except Exception as e:
raise PredictionError(f"Training failed: {str(e)}")
async def detect_chart_patterns(self, data: List[Dict[str, Any]], pattern_types: List[str] = None) -> Dict[str, Any]:
"""차트 패턴 감지"""
if not self.is_trained:
raise ModelNotTrainedError("Model must be trained before pattern detection")
if pattern_types:
# 유효한 패턴 타입인지 확인
valid_patterns = self.pattern_types.get("chart_patterns", [])
for pattern in pattern_types:
if pattern not in valid_patterns:
raise ValueError(f"Invalid pattern type: {pattern}")
try:
start_time = time.time()
detected_patterns = []
pattern_details = {}
confidence_scores = {}
target_patterns = pattern_types or self.pattern_types.get("chart_patterns", [])
for pattern_type in target_patterns:
pattern_result = await self._detect_specific_chart_pattern(data, pattern_type)
if pattern_result["detected"]:
detected_patterns.append({
"pattern_type": pattern_type,
"start_index": pattern_result["start_index"],
"end_index": pattern_result["end_index"],
"confidence": pattern_result["confidence"],
"key_points": pattern_result["key_points"]
})
pattern_details[pattern_type] = pattern_result["details"]
confidence_scores[pattern_type] = pattern_result["confidence"]
processing_time = time.time() - start_time
self.performance_metrics["total_processing_time"] += processing_time
self.performance_metrics["total_detections"] += 1
return {
"detected_patterns": detected_patterns,
"pattern_details": pattern_details,
"confidence_scores": confidence_scores
}
except Exception as e:
raise PredictionError(f"Chart pattern detection failed: {str(e)}")
async def detect_candlestick_patterns(self, data: List[Dict[str, Any]], sensitivity: str = "medium") -> Dict[str, Any]:
"""캔들스틱 패턴 감지"""
if not self.is_trained:
raise ModelNotTrainedError("Model must be trained before pattern detection")
try:
sensitivity_threshold = self.sensitivity_levels.get(sensitivity, 0.7)
patterns = []
interpretations = {}
trading_signals = {}
for i, candle in enumerate(data):
candlestick_result = await self._detect_candlestick_at_index(data, i, sensitivity_threshold)
if candlestick_result["pattern_detected"]:
patterns.append({
"pattern_name": candlestick_result["pattern_name"],
"candle_index": i,
"bullish_bearish": candlestick_result["direction"],
"reliability": candlestick_result["reliability"]
})
interpretations[candlestick_result["pattern_name"]] = candlestick_result["interpretation"]
trading_signals[candlestick_result["pattern_name"]] = candlestick_result["signal"]
return {
"patterns": patterns,
"interpretations": interpretations,
"trading_signals": trading_signals
}
except Exception as e:
raise PredictionError(f"Candlestick pattern detection failed: {str(e)}")
async def detect_technical_patterns(self, data: List[Dict[str, Any]], indicators: List[str] = None) -> Dict[str, Any]:
    """Collect indicator-based technical patterns for ``data``.

    Args:
        data: Price records to analyse.
        indicators: Indicator names used for divergence detection; defaults
            to ``["rsi", "macd", "bollinger"]`` when empty or ``None``.

    Returns:
        A dict with ``divergences``, ``breakouts``,
        ``support_resistance_levels`` and ``trend_reversals``.

    Raises:
        ModelNotTrainedError: The model has not been trained.
        PredictionError: Any of the detection steps failed.
    """
    if not self.is_trained:
        raise ModelNotTrainedError("Model must be trained before pattern detection")

    try:
        selected = indicators or ["rsi", "macd", "bollinger"]
        # Dict values are evaluated top to bottom, so the four detection
        # steps run in this order: divergences, breakouts, levels, reversals.
        return {
            "divergences": await self._detect_divergences(data, selected),
            "breakouts": await self._detect_breakouts(data),
            "support_resistance_levels": await self._detect_support_resistance_levels(data),
            "trend_reversals": await self._detect_trend_reversals(data),
        }
    except Exception as e:
        raise PredictionError(f"Technical pattern detection failed: {str(e)}")
async def detect_harmonic_patterns(self, data: List[Dict[str, Any]], fibonacci_tolerance: float = 0.05) -> Dict[str, Any]:
    """Check ``data`` against every configured harmonic pattern.

    Args:
        data: Price records to analyse.
        fibonacci_tolerance: Tolerance forwarded to the per-pattern detector.

    Returns:
        A dict with ``detected_patterns``, ``fibonacci_ratios`` (keyed by
        pattern type) and ``potential_reversal_zones`` (flattened list).

    Raises:
        ModelNotTrainedError: The model has not been trained.
        PredictionError: Detection failed.
    """
    if not self.is_trained:
        raise ModelNotTrainedError("Model must be trained before pattern detection")

    try:
        hits, ratios_by_type, reversal_zones = [], {}, []

        for name in self.pattern_types.get("harmonic_patterns", []):
            outcome = await self._detect_harmonic_pattern(data, name, fibonacci_tolerance)
            if not outcome["detected"]:
                continue
            hits.append({
                "pattern_type": name,
                "points": outcome["points"],
                "ratios": outcome["ratios"],
                "completion_zone": outcome["completion_zone"],
                "profit_targets": outcome["profit_targets"],
            })
            ratios_by_type[name] = outcome["ratios"]
            reversal_zones.extend(outcome["reversal_zones"])

        return {
            "detected_patterns": hits,
            "fibonacci_ratios": ratios_by_type,
            "potential_reversal_zones": reversal_zones,
        }
    except Exception as e:
        raise PredictionError(f"Harmonic pattern detection failed: {str(e)}")
async def analyze_elliott_waves(self, data: List[Dict[str, Any]], wave_degree: str = "primary") -> Dict[str, Any]:
    """Produce an Elliott-wave breakdown of ``data``.

    Args:
        data: Price records to analyse.
        wave_degree: Accepted for interface compatibility; this method does
            not read it.

    Returns:
        A dict with ``wave_count``, ``current_wave``, ``wave_structure``
        (one labelled entry per counted wave) and ``next_wave_projection``.

    Raises:
        ModelNotTrainedError: The model has not been trained.
        PredictionError: Any analysis step failed.
    """
    if not self.is_trained:
        raise ModelNotTrainedError("Model must be trained before wave analysis")

    try:
        wave_count = await self._count_elliott_waves(data)
        current_wave = await self._identify_current_wave(data, wave_count)

        # Waves are labelled "Wave 1", "Wave 2", ... in counting order.
        wave_structure = [
            {
                "wave_label": f"Wave {number}",
                "start_point": wave["start"],
                "end_point": wave["end"],
                "wave_type": wave["type"],
            }
            for number, wave in enumerate(wave_count["waves"], start=1)
        ]

        projection = await self._project_next_wave(wave_structure, current_wave)

        return {
            "wave_count": wave_count,
            "current_wave": current_wave,
            "wave_structure": wave_structure,
            "next_wave_projection": projection,
        }
    except Exception as e:
        raise PredictionError(f"Elliott wave analysis failed: {str(e)}")
async def detect_all_patterns(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Run chart, candlestick and technical detection and merge the results.

    Candlestick detection only inspects the trailing five candles. The
    indices it reports are relative to that window, so they are shifted
    back to positions in ``data`` before being merged; all
    ``start_index``/``end_index`` values in the result therefore refer to
    the same sequence.

    Args:
        data: Price records to analyse.

    Returns:
        A dict with ``all_patterns`` (merged list), ``pattern_count`` and
        ``categories`` (hit counts for ``chart``, ``candlestick`` and
        ``technical``).

    Raises:
        ModelNotTrainedError: The model has not been trained.
        PredictionError: Any detection step failed.
    """
    if not self.is_trained:
        raise ModelNotTrainedError("Model must be trained before pattern detection")

    try:
        all_patterns = []

        # Chart patterns already carry indices into ``data``.
        chart_patterns = await self.detect_chart_patterns(data)
        all_patterns.extend(chart_patterns["detected_patterns"])

        # Candlestick patterns: scan the trailing window only, then convert
        # the window-relative candle index into an index into ``data``.
        window = data[-5:]
        window_offset = len(data) - len(window)
        candlestick_patterns = await self.detect_candlestick_patterns(window)
        for pattern in candlestick_patterns["patterns"]:
            absolute_index = window_offset + pattern["candle_index"]
            all_patterns.append({
                "pattern_type": pattern["pattern_name"],
                "start_index": absolute_index,
                "end_index": absolute_index,
                "confidence": pattern["reliability"],
                "category": "candlestick"
            })

        # Technical patterns: only breakouts are merged into the list.
        technical_patterns = await self.detect_technical_patterns(data)
        for breakout in technical_patterns["breakouts"]:
            all_patterns.append({
                "pattern_type": "breakout",
                "start_index": breakout.get("start_index", 0),
                "end_index": breakout.get("end_index", len(data) - 1),
                "confidence": breakout.get("confidence", 0.7),
                "category": "technical"
            })

        return {
            "all_patterns": all_patterns,
            "pattern_count": len(all_patterns),
            "categories": {
                "chart": len(chart_patterns["detected_patterns"]),
                "candlestick": len(candlestick_patterns["patterns"]),
                "technical": len(technical_patterns["breakouts"])
            }
        }
    except Exception as e:
        raise PredictionError(f"Pattern detection failed: {str(e)}")
async def confirm_patterns(self, detected_patterns: Dict[str, Any], data: List[Dict[str, Any]],
                           confirmation_criteria: List[str]) -> Dict[str, Any]:
    """Split detected patterns into confirmed and rejected ones.

    Args:
        detected_patterns: Detection result; its ``all_patterns`` list
            (empty when absent) is what gets confirmed.
        data: Price records the patterns were found in.
        confirmation_criteria: Criteria forwarded to the per-pattern check.

    Returns:
        A dict with ``confirmed_patterns`` (copies annotated with
        ``original_confidence`` and ``confirmed_confidence``),
        ``confirmation_scores`` (keyed by pattern type) and
        ``failed_confirmations`` (pattern plus failure reason).

    Raises:
        PredictionError: Confirmation failed.
    """
    try:
        accepted, scores, rejected = [], {}, []

        for candidate in detected_patterns.get("all_patterns", []):
            verdict = await self._confirm_single_pattern(candidate, data, confirmation_criteria)

            if not verdict["confirmed"]:
                rejected.append({
                    "pattern": candidate,
                    "reason": verdict["failure_reason"],
                })
                continue

            # Annotate a copy so the caller's pattern dict is left untouched.
            annotated = candidate.copy()
            annotated["original_confidence"] = candidate.get("confidence", 0.5)
            annotated["confirmed_confidence"] = verdict["new_confidence"]
            accepted.append(annotated)
            scores[candidate["pattern_type"]] = verdict["score"]

        return {
            "confirmed_patterns": accepted,
            "confirmation_scores": scores,
            "failed_confirmations": rejected,
        }
    except Exception as e:
        raise PredictionError(f"Pattern confirmation failed: {str(e)}")
async def backtest_patterns(self, data: List[Dict[str, Any]], entry_rules: Dict[str, Any],
                            exit_rules: Dict[str, Any]) -> Dict[str, Any]:
    """Backtest every pattern detected in sliding windows over ``data``.

    Windows are 50 records long and start every 10 records. Every window
    that fits entirely inside ``data`` is evaluated, including the one
    that ends on the last record, so a series of exactly 50 records yields
    one window.

    Args:
        data: Historical price records.
        entry_rules: Entry rules forwarded to the per-pattern backtest.
        exit_rules: Exit rules forwarded to the per-pattern backtest.

    Returns:
        A dict with ``total_patterns``, ``successful_patterns``,
        ``win_rate``, ``average_profit`` (gross profit divided by the
        number of patterns), ``profit_factor`` (``inf`` when there were no
        losses) and ``pattern_performance`` (per-type statistics).

    Raises:
        PredictionError: Backtesting failed.
    """
    chunk_size = 50  # records per backtest window
    step = 10        # distance between consecutive window starts

    try:
        total_patterns = 0
        successful_patterns = 0
        pattern_performance = {}
        total_profit = 0.0
        total_loss = 0.0

        # "+ 1" keeps the last full window: range() excludes its stop value,
        # and the final valid start position is len(data) - chunk_size.
        for start in range(0, len(data) - chunk_size + 1, step):
            chunk_data = data[start:start + chunk_size]

            patterns = await self.detect_all_patterns(chunk_data)

            for pattern in patterns["all_patterns"]:
                total_patterns += 1
                perf = pattern_performance.setdefault(pattern["pattern_type"], {
                    "count": 0,
                    "wins": 0,
                    "losses": 0,
                    "total_return": 0.0,
                    "returns": []
                })

                backtest_result = await self._backtest_single_pattern(
                    pattern, chunk_data, entry_rules, exit_rules
                )
                trade_return = backtest_result["return"]

                perf["count"] += 1
                if backtest_result["profitable"]:
                    successful_patterns += 1
                    perf["wins"] += 1
                    total_profit += trade_return
                else:
                    perf["losses"] += 1
                    total_loss += abs(trade_return)

                perf["total_return"] += trade_return
                perf["returns"].append(trade_return)

        # Aggregate figures; guard every division against an empty run.
        win_rate = successful_patterns / total_patterns if total_patterns > 0 else 0
        average_profit = total_profit / total_patterns if total_patterns > 0 else 0
        profit_factor = total_profit / total_loss if total_loss > 0 else float('inf')

        # Per-pattern-type figures.
        for perf in pattern_performance.values():
            perf["win_rate"] = perf["wins"] / perf["count"] if perf["count"] > 0 else 0
            perf["avg_return"] = perf["total_return"] / perf["count"] if perf["count"] > 0 else 0
            perf["best_return"] = max(perf["returns"]) if perf["returns"] else 0
            perf["worst_return"] = min(perf["returns"]) if perf["returns"] else 0

        return {
            "total_patterns": total_patterns,
            "successful_patterns": successful_patterns,
            "win_rate": win_rate,
            "average_profit": average_profit,
            "profit_factor": profit_factor,
            "pattern_performance": pattern_performance
        }
    except Exception as e:
        raise PredictionError(f"Pattern backtesting failed: {str(e)}")
async def analyze_multi_timeframe_patterns(self, data: List[Dict[str, Any]],
                                           timeframes: List[str]) -> Dict[str, Any]:
    """Detect patterns on several timeframes and compare them.

    Args:
        data: Base price records; resampled once per timeframe.
        timeframes: Timeframe identifiers to analyse.

    Returns:
        A dict with ``timeframe_patterns`` (summaries per timeframe),
        ``confluence_zones``, ``dominant_patterns`` (highest-confidence
        pattern type per timeframe that had any hit) and
        ``timeframe_alignment``.

    Raises:
        PredictionError: Any step of the analysis failed.
    """
    try:
        per_timeframe = {}
        dominant = {}

        for timeframe in timeframes:
            resampled = await self._resample_data_for_timeframe(data, timeframe)
            found = (await self.detect_all_patterns(resampled))["all_patterns"]

            per_timeframe[timeframe] = [
                {
                    "pattern_type": hit["pattern_type"],
                    "strength": hit.get("confidence", 0.5),
                    "direction": self._determine_pattern_direction(hit),
                }
                for hit in found
            ]

            # The dominant pattern is the one with the highest confidence.
            if found:
                strongest = max(found, key=lambda hit: hit.get("confidence", 0))
                dominant[timeframe] = strongest["pattern_type"]

        confluence_zones = await self._identify_confluence_zones(per_timeframe)
        alignment = await self._analyze_timeframe_alignment(per_timeframe)

        return {
            "timeframe_patterns": per_timeframe,
            "confluence_zones": confluence_zones,
            "dominant_patterns": dominant,
            "timeframe_alignment": alignment,
        }
    except Exception as e:
        raise PredictionError(f"Multi-timeframe analysis failed: {str(e)}")
async def calculate_pattern_strength(self, patterns: List[Dict[str, Any]],
                                     market_context: Dict[str, Any]) -> Dict[str, Any]:
    """Score each pattern's strength from three weighted components.

    The score is ``0.4 * pattern_clarity + 0.3 * volume_support +
    0.3 * market_alignment``.

    Args:
        patterns: Patterns to score.
        market_context: Market information forwarded to the volume and
            alignment calculations.

    Returns:
        A dict with ``scored_patterns`` (copies carrying ``strength_score``
        and ``components``), ``strength_distribution`` (counts of scores
        above 0.8, within [0.5, 0.8] and below 0.5) and
        ``strongest_patterns`` (top three by score).

    Raises:
        PredictionError: Scoring failed.
    """
    try:
        scored = []
        scores = []

        for pattern in patterns:
            components = {
                "pattern_clarity": await self._calculate_pattern_clarity(pattern),
                "volume_support": await self._calculate_volume_support(pattern, market_context),
                "market_alignment": await self._calculate_market_alignment(pattern, market_context),
            }
            score = (
                components["pattern_clarity"] * 0.4 +
                components["volume_support"] * 0.3 +
                components["market_alignment"] * 0.3
            )

            # Annotate a copy; the input pattern is not modified.
            entry = pattern.copy()
            entry["strength_score"] = score
            entry["components"] = components
            scored.append(entry)
            scores.append(score)

        distribution = {
            "high": sum(1 for value in scores if value > 0.8),
            "medium": sum(1 for value in scores if 0.5 <= value <= 0.8),
            "low": sum(1 for value in scores if value < 0.5),
        }

        ranked = sorted(scored, key=lambda entry: entry["strength_score"], reverse=True)

        return {
            "scored_patterns": scored,
            "strength_distribution": distribution,
            "strongest_patterns": ranked[:3],
        }
    except Exception as e:
        raise PredictionError(f"Pattern strength calculation failed: {str(e)}")
async def predict_from_patterns(self, patterns: List[Dict[str, Any]],
                                prediction_horizon: int) -> Dict[str, Any]:
    """Derive a directional and price forecast from detected patterns.

    Args:
        patterns: Detected patterns; each contributes its ``confidence``
            (0.5 when absent) to its direction's weight.
        prediction_horizon: Horizon forwarded to the price and
            confidence-interval calculations.

    Returns:
        A dict with ``price_predictions``, ``direction_probability``
        (bullish/bearish/neutral shares), ``target_levels`` and
        ``confidence_intervals``. With no patterns, a near-uniform
        probability and empty containers are returned.

    Raises:
        PredictionError: Prediction failed.
    """
    try:
        if not patterns:
            return {
                "price_predictions": [],
                "direction_probability": {"bullish": 0.33, "bearish": 0.33, "neutral": 0.34},
                "target_levels": {},
                "confidence_intervals": {},
            }

        # Accumulate confidence per direction; anything that is neither
        # bullish nor bearish counts as neutral.
        weights = {"bullish": 0, "bearish": 0, "neutral": 0}
        for pattern in patterns:
            direction = self._determine_pattern_direction(pattern)
            confidence = pattern.get("confidence", 0.5)
            bucket = "neutral"
            if direction == "bullish":
                bucket = "bullish"
            elif direction == "bearish":
                bucket = "bearish"
            weights[bucket] += confidence

        total_weight = weights["bullish"] + weights["bearish"] + weights["neutral"]
        if total_weight > 0:
            direction_probability = {
                "bullish": weights["bullish"] / total_weight,
                "bearish": weights["bearish"] / total_weight,
                "neutral": weights["neutral"] / total_weight,
            }
        else:
            direction_probability = {"bullish": 0.33, "bearish": 0.33, "neutral": 0.34}

        price_predictions = await self._generate_price_predictions(patterns, prediction_horizon)
        target_levels = await self._calculate_target_levels(patterns)
        confidence_intervals = await self._calculate_confidence_intervals(patterns, prediction_horizon)

        return {
            "price_predictions": price_predictions,
            "direction_probability": direction_probability,
            "target_levels": target_levels,
            "confidence_intervals": confidence_intervals,
        }
    except Exception as e:
        raise PredictionError(f"Pattern prediction failed: {str(e)}")
async def configure_alerts(self, alert_rules: Dict[str, Any]) -> None:
    """Install the alert rules consulted by check_pattern_alerts().

    Args:
        alert_rules: Rule mapping; stored as-is (not copied), replacing any
            previously configured rules.
    """
    self.alert_config = alert_rules
async def check_pattern_alerts(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Detect patterns in ``data`` and report those that trigger an alert.

    This method never raises: on any failure it returns an empty alert
    list together with an ``error`` message.

    Args:
        data: Price records to scan.

    Returns:
        A dict with ``triggered_alerts``, ``alert_summary`` (``total`` and
        ``high_priority`` counts) and ``next_check_time`` (ISO timestamp
        five minutes from now); ``error`` is added when detection failed.
    """
    def next_check() -> str:
        # The next check is always scheduled five minutes ahead.
        return (datetime.now() + timedelta(minutes=5)).isoformat()

    try:
        if not self.alert_config:
            # No rules configured: nothing can trigger.
            return {
                "triggered_alerts": [],
                "alert_summary": {"total": 0, "high_priority": 0},
                "next_check_time": next_check(),
            }

        detection = await self.detect_all_patterns(data)

        alerts = []
        urgent = 0
        for pattern in detection["all_patterns"]:
            if not self._should_trigger_alert(pattern):
                continue
            severity = self._determine_alert_severity(pattern)
            alerts.append({
                "pattern_type": pattern["pattern_type"],
                "confidence": pattern.get("confidence", 0.5),
                "strength": pattern.get("strength_score", 0.5),
                "message": f"{pattern['pattern_type']} pattern detected with {pattern.get('confidence', 0.5):.2f} confidence",
                "severity": severity,
                "timestamp": datetime.now().isoformat(),
            })
            if severity in ["high", "critical"]:
                urgent += 1

        summary = {"total": len(alerts), "high_priority": urgent}
        return {
            "triggered_alerts": alerts,
            "alert_summary": summary,
            "next_check_time": next_check(),
        }
    except Exception as e:
        # Best effort: report the failure instead of propagating it.
        return {
            "triggered_alerts": [],
            "alert_summary": {"total": 0, "high_priority": 0},
            "next_check_time": next_check(),
            "error": str(e),
        }
async def generate_visualization_data(self, patterns: List[Dict[str, Any]],
                                      data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build chart-drawing structures for the given patterns.

    Args:
        patterns: Patterns to visualise. ``start_index`` defaults to 0 and
            ``end_index`` to the last index of ``data``.
        data: Price records the patterns refer to.

    Returns:
        A dict with ``chart_annotations`` (a label at each pattern start
        that lies inside ``data``), ``pattern_overlays``, ``key_levels``
        (from each pattern's ``key_points``, if any) and ``trading_zones``.

    Raises:
        PredictionError: Building the structures failed.
    """
    try:
        annotations, overlays, levels, zones = [], [], [], []

        for position, pattern in enumerate(patterns):
            first = pattern.get("start_index", 0)
            last = pattern.get("end_index", len(data) - 1)
            confidence = pattern.get("confidence", 0.5)

            # Label the pattern at its first candle, using close (or price).
            if first < len(data):
                anchor = data[first]
                annotations.append({
                    "x": first,
                    "y": anchor.get("close", anchor.get("price", 0)),
                    "text": pattern["pattern_type"],
                    "type": "pattern_label",
                })

            overlays.append({
                "pattern_id": f"pattern_{position}",
                "type": pattern["pattern_type"],
                "start_index": first,
                "end_index": last,
                "confidence": confidence,
            })

            if "key_points" in pattern:
                levels.extend(
                    {
                        "index": point.get("index", first),
                        "price": point.get("price", 0),
                        "type": point.get("type", "key_point"),
                    }
                    for point in pattern["key_points"]
                )

            zones.append({
                "start_index": first,
                "end_index": last,
                "type": "pattern_zone",
                "confidence": confidence,
            })

        return {
            "chart_annotations": annotations,
            "pattern_overlays": overlays,
            "key_levels": levels,
            "trading_zones": zones,
        }
    except Exception as e:
        raise PredictionError(f"Visualization data generation failed: {str(e)}")
async def get_pattern_statistics(self, data: List[Dict[str, Any]],
                                 time_period: str = "all") -> Dict[str, Any]:
    """Summarise the patterns detected in ``data``.

    Only frequency and duration are derived from the detection result.
    Success rates, correlations and seasonal patterns are fixed
    placeholder values.

    Args:
        data: Price records to analyse.
        time_period: Accepted for interface compatibility; this method does
            not read it.

    Returns:
        A dict with ``pattern_frequency``, ``pattern_success_rates``,
        ``average_pattern_duration`` (mean/median/min/max),
        ``pattern_correlations`` and ``seasonal_patterns``.

    Raises:
        PredictionError: Detection or aggregation failed.
    """
    try:
        detected = (await self.detect_all_patterns(data))["all_patterns"]

        # Occurrences per pattern type, in first-seen order.
        frequency = {}
        for hit in detected:
            name = hit["pattern_type"]
            frequency[name] = frequency.get(name, 0) + 1

        # Simulated success rate: base rate plus confidence bonus, capped.
        base_rate = 0.6
        confidence_bonus = 0.2
        success_rates = {
            name: min(base_rate + confidence_bonus, 0.9) for name in frequency
        }

        # Duration in candles; never less than one.
        durations = [
            max(1, hit.get("end_index", 0) - hit.get("start_index", 0) + 1)
            for hit in detected
        ]
        if durations:
            duration_summary = {
                "mean": statistics.mean(durations),
                "median": statistics.median(durations),
                "min": min(durations),
                "max": max(durations),
            }
        else:
            duration_summary = {"mean": 0, "median": 0, "min": 0, "max": 0}

        # Placeholder correlation for every unordered pair of pattern types.
        names = list(frequency.keys())
        correlations = {
            f"{first}_vs_{second}": 0.3
            for position, first in enumerate(names)
            for second in names[position + 1:]
        }

        seasonal = {
            "spring": ["bullish_patterns"],
            "summer": ["consolidation_patterns"],
            "autumn": ["reversal_patterns"],
            "winter": ["bearish_patterns"],
        }

        return {
            "pattern_frequency": frequency,
            "pattern_success_rates": success_rates,
            "average_pattern_duration": duration_summary,
            "pattern_correlations": correlations,
            "seasonal_patterns": seasonal,
        }
    except Exception as e:
        raise PredictionError(f"Pattern statistics collection failed: {str(e)}")
async def detect_real_time_patterns(self, streaming_data: List[Dict[str, Any]],
                                    min_data_points: int = 5) -> Dict[str, Any]:
    """Classify patterns in a live data stream by how recently they ended.

    This method never raises: on failure it returns empty lists together
    with an ``error`` message.

    Args:
        streaming_data: Records received so far.
        min_data_points: Minimum number of records needed before any
            detection is attempted.

    Returns:
        A dict with ``active_patterns`` (ending within the last three
        records), ``completed_patterns`` (ending more than ten records
        ago), ``emerging_patterns`` (found in the last ten records alone)
        and ``pattern_updates``; ``error`` is added when detection failed.
    """
    def blank() -> Dict[str, Any]:
        return {
            "active_patterns": [],
            "completed_patterns": [],
            "emerging_patterns": [],
            "pattern_updates": [],
        }

    try:
        size = len(streaming_data)
        if size < min_data_points:
            return blank()

        detected = (await self.detect_all_patterns(streaming_data))["all_patterns"]

        # Still in progress: ended within the last three candles.
        active = [hit for hit in detected if hit.get("end_index", 0) >= size - 3]
        # Finished: ended more than ten candles ago.
        completed = [hit for hit in detected if hit.get("end_index", 0) < size - 10]

        # Newly forming: whatever shows up in the latest ten candles alone.
        emerging = []
        if size >= 10:
            tail_detection = await self.detect_all_patterns(streaming_data[-10:])
            emerging = tail_detection["all_patterns"]

        updates = [{
            "update_type": "new_pattern",
            "pattern_count": len(active),
            "timestamp": datetime.now().isoformat(),
        }]

        return {
            "active_patterns": active,
            "completed_patterns": completed,
            "emerging_patterns": emerging,
            "pattern_updates": updates,
        }
    except Exception as e:
        # Best effort: report the failure instead of propagating it.
        failure = blank()
        failure["error"] = str(e)
        return failure
async def analyze_pattern_combinations(self, patterns: List[Dict[str, Any]],
                                       combination_rules: Dict[str, Any]) -> Dict[str, Any]:
    """Evaluate every pair of patterns for synergy.

    A pair is kept when its overlap is at most ``max_overlap`` (default
    0.5) and its correlation is at least ``min_correlation`` (default
    0.3); its synergy score is ``correlation * (1 - overlap)``.

    Args:
        patterns: Patterns to pair up.
        combination_rules: May contain ``max_overlap`` and
            ``min_correlation``.

    Returns:
        A dict with ``pattern_combinations``, ``synergy_scores`` and
        ``conflict_analysis`` (both keyed ``"<type1>_<type2>"``), and
        ``optimal_combinations`` (top three by synergy).

    Raises:
        PredictionError: The analysis failed.
    """
    try:
        combos = []
        synergy_by_pair = {}

        for position, first in enumerate(patterns):
            for second in patterns[position + 1:]:
                overlap = self._calculate_pattern_overlap(first, second)
                correlation = self._calculate_pattern_correlation(first, second)

                acceptable = (
                    overlap <= combination_rules.get("max_overlap", 0.5)
                    and correlation >= combination_rules.get("min_correlation", 0.3)
                )
                if not acceptable:
                    continue

                strength = (first.get("confidence", 0.5) + second.get("confidence", 0.5)) / 2
                synergy = correlation * (1 - overlap)

                combos.append({
                    "patterns": [first["pattern_type"], second["pattern_type"]],
                    "combined_strength": strength,
                    "synergy_score": synergy,
                    "recommendation": self._generate_combination_recommendation(
                        first, second, synergy
                    ),
                })
                synergy_by_pair[f"{first['pattern_type']}_{second['pattern_type']}"] = synergy

        # Low synergy is reported as a high conflict level.
        conflicts = {}
        for combo in combos:
            pair_key = f"{combo['patterns'][0]}_{combo['patterns'][1]}"
            if combo["synergy_score"] < 0.2:
                verdict = {
                    "conflict_level": "high",
                    "reason": "Low synergy between patterns",
                }
            else:
                verdict = {
                    "conflict_level": "low",
                    "reason": "Patterns complement each other",
                }
            conflicts[pair_key] = verdict

        best = sorted(combos, key=lambda combo: combo["synergy_score"], reverse=True)[:3]

        return {
            "pattern_combinations": combos,
            "synergy_scores": synergy_by_pair,
            "conflict_analysis": conflicts,
            "optimal_combinations": best,
        }
    except Exception as e:
        raise PredictionError(f"Pattern combination analysis failed: {str(e)}")
def set_sensitivity(self, sensitivity_level: str) -> None:
    """Switch the current sensitivity to a configured level.

    Args:
        sensitivity_level: Name of a level in the configured sensitivity
            levels. Unknown names are ignored and the current sensitivity
            is left unchanged.
    """
    if sensitivity_level not in self.sensitivity_levels:
        return
    self.current_sensitivity = self.sensitivity_levels[sensitivity_level]
def enable_pattern_types(self, pattern_types: List[str]) -> None:
    """Add pattern types to the active list, skipping ones already present.

    Args:
        pattern_types: Pattern type names to activate; new names are
            appended in the order given.
    """
    active = self.active_pattern_types
    for name in pattern_types:
        if name in active:
            continue
        active.append(name)
def disable_pattern_types(self, pattern_types: List[str]) -> None:
    """Remove pattern types from the active list.

    Args:
        pattern_types: Pattern type names to deactivate. Each name removes
            at most one (the first) matching entry; names that are not
            active are ignored.
    """
    active = self.active_pattern_types
    for name in pattern_types:
        if name not in active:
            continue
        active.remove(name)
def get_performance_metrics(self) -> Dict[str, Any]:
    """Return a shallow copy of the performance counters.

    Returns:
        A new dict, so callers can modify it without affecting the
        instance's own metrics.
    """
    snapshot = self.performance_metrics.copy()
    return snapshot
# === 내부 헬퍼 메서드들 ===
async def _build_pattern_templates(self, training_data: List[Dict[str, Any]]) -> None:
"""패턴 템플릿 구축"""
# 각 패턴 유형별로 템플릿 생성
for category, patterns in self.pattern_types.items():
for pattern in patterns:
self.pattern_templates[pattern] = {
"template_data": training_data[:10], # 샘플 템플릿
"key_characteristics": {
"min_points": 3,
"max_points": 10,
"duration_range": (5, 50)
}
}
async def _train_pattern_category(self, category: str, patterns: List[str],
training_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""패턴 카테고리별 훈련"""
model = {
"category": category,
"patterns": patterns,
"trained_at": datetime.now().isoformat(),
"training_samples": len(training_data)
}
metrics = {
"accuracy": 0.85,
"precision": 0.82,
"recall": 0.78,
"patterns_learned": len(patterns)
}
return {"model": model, "metrics": metrics}
async def _detect_specific_chart_pattern(self, data: List[Dict[str, Any]],
pattern_type: str) -> Dict[str, Any]:
"""특정 차트 패턴 감지"""
# 패턴별 감지 로직 (간단한 시뮬레이션)
if pattern_type == "head_shoulders":
return await self._detect_head_shoulders(data)
elif pattern_type == "double_top":
return await self._detect_double_top(data)
elif pattern_type == "triangle":
return await self._detect_triangle(data)
else:
return {
"detected": False,
"confidence": 0.0,
"start_index": 0,
"end_index": 0,
"key_points": [],
"details": {}
}
async def _detect_head_shoulders(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""헤드앤숄더 패턴 감지"""
if len(data) < 10:
return {"detected": False, "confidence": 0.0, "start_index": 0, "end_index": 0, "key_points": [], "details": {}}
# 간단한 헤드앤숄더 패턴 감지 시뮬레이션
prices = [d.get("close", d.get("price", 0)) for d in data]
# 3개의 피크 찾기
peaks = []
for i in range(1, len(prices) - 1):
if prices[i] > prices[i-1] and prices[i] > prices[i+1]:
peaks.append({"index": i, "price": prices[i]})
if len(peaks) >= 3:
# 헤드앤숄더 조건 확인
head = max(peaks, key=lambda x: x["price"])
shoulders = [p for p in peaks if p != head]
if len(shoulders) >= 2:
left_shoulder = min(shoulders, key=lambda x: x["index"])
right_shoulder = max(shoulders, key=lambda x: x["index"])
# 패턴 검증
if (left_shoulder["index"] < head["index"] < right_shoulder["index"] and
abs(left_shoulder["price"] - right_shoulder["price"]) / head["price"] < 0.05):
return {
"detected": True,
"confidence": 0.8,
"start_index": left_shoulder["index"],
"end_index": right_shoulder["index"],
"key_points": [
{"index": left_shoulder["index"], "price": left_shoulder["price"], "type": "left_shoulder"},
{"index": head["index"], "price": head["price"], "type": "head"},
{"index": right_shoulder["index"], "price": right_shoulder["price"], "type": "right_shoulder"}
],
"details": {
"pattern_height": head["price"] - min(left_shoulder["price"], right_shoulder["price"]),
"symmetry_score": 1 - abs(left_shoulder["price"] - right_shoulder["price"]) / head["price"]
}
}
return {"detected": False, "confidence": 0.0, "start_index": 0, "end_index": 0, "key_points": [], "details": {}}
async def _detect_double_top(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""더블탑 패턴 감지"""
if len(data) < 8:
return {"detected": False, "confidence": 0.0, "start_index": 0, "end_index": 0, "key_points": [], "details": {}}
prices = [d.get("close", d.get("price", 0)) for d in data]
# 2개의 주요 피크 찾기
peaks = []
for i in range(2, len(prices) - 2):
if prices[i] > prices[i-1] and prices[i] > prices[i+1] and prices[i] > prices[i-2] and prices[i] > prices[i+2]:
peaks.append({"index": i, "price": prices[i]})
if len(peaks) >= 2:
# 상위 2개 피크 선택
top_peaks = sorted(peaks, key=lambda x: x["price"], reverse=True)[:2]
top_peaks.sort(key=lambda x: x["index"]) # 시간순 정렬
peak1, peak2 = top_peaks[0], top_peaks[1]
# 더블탑 조건 확인
if abs(peak1["price"] - peak2["price"]) / peak1["price"] < 0.03: # 3% 이내
return {
"detected": True,
"confidence": 0.75,
"start_index": peak1["index"],
"end_index": peak2["index"],
"key_points": [
{"index": peak1["index"], "price": peak1["price"], "type": "first_top"},
{"index": peak2["index"], "price": peak2["price"], "type": "second_top"}
],
"details": {
"price_difference": abs(peak1["price"] - peak2["price"]),
"time_separation": peak2["index"] - peak1["index"]
}
}
return {"detected": False, "confidence": 0.0, "start_index": 0, "end_index": 0, "key_points": [], "details": {}}
async def _detect_triangle(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""삼각형 패턴 감지"""
if len(data) < 10:
return {"detected": False, "confidence": 0.0, "start_index": 0, "end_index": 0, "key_points": [], "details": {}}
prices = [d.get("close", d.get("price", 0)) for d in data]
highs = [d.get("high", d.get("price", 0)) for d in data]
lows = [d.get("low", d.get("price", 0)) for d in data]
# 삼각형 패턴: 고점은 낮아지고 저점은 높아지는 수렴
high_trend = self._calculate_trend_slope(highs)
low_trend = self._calculate_trend_slope(lows)
# 수렴 조건: 고점은 하락, 저점은 상승
if high_trend < -0.001 and low_trend > 0.001:
return {
"detected": True,
"confidence": 0.7,
"start_index": 0,
"end_index": len(data) - 1,
"key_points": [
{"index": 0, "price": highs[0], "type": "triangle_start_high"},
{"index": 0, "price": lows[0], "type": "triangle_start_low"},
{"index": len(data)-1, "price": highs[-1], "type": "triangle_end_high"},
{"index": len(data)-1, "price": lows[-1], "type": "triangle_end_low"}
],
"details": {
"high_slope": high_trend,
"low_slope": low_trend,
"convergence_rate": abs(high_trend) + abs(low_trend)
}
}
return {"detected": False, "confidence": 0.0, "start_index": 0, "end_index": 0, "key_points": [], "details": {}}
def _calculate_trend_slope(self, values: List[float]) -> float:
"""트렌드 기울기 계산"""
if len(values) < 2:
return 0.0
n = len(values)
sum_x = sum(range(n))
sum_y = sum(values)
sum_xy = sum(i * values[i] for i in range(n))
sum_x2 = sum(i * i for i in range(n))
denominator = n * sum_x2 - sum_x * sum_x
if denominator == 0:
return 0.0
slope = (n * sum_xy - sum_x * sum_y) / denominator
return slope
async def _detect_candlestick_at_index(self, data: List[Dict[str, Any]], index: int,
sensitivity: float) -> Dict[str, Any]:
"""특정 인덱스에서 캔들스틱 패턴 감지"""
if index < 1 or index >= len(data):
return {"pattern_detected": False}
current = data[index]
previous = data[index - 1]
open_price = current.get("open", current.get("price", 0))
close_price = current.get("close", current.get("price", 0))
high_price = current.get("high", current.get("price", 0))
low_price = current.get("low", current.get("price", 0))
body_size = abs(close_price - open_price)
total_size = high_price - low_price
if total_size == 0:
return {"pattern_detected": False}
body_ratio = body_size / total_size
# 도지 패턴
if body_ratio < 0.1:
return {
"pattern_detected": True,
"pattern_name": "doji",
"direction": "neutral",
"reliability": 0.7,
"interpretation": "시장 망설임, 트렌드 전환 가능성",
"signal": "wait"
}
# 해머 패턴
if (close_price > open_price and
(high_price - close_price) < body_size * 0.3 and
(open_price - low_price) > body_size * 2):
return {
"pattern_detected": True,
"pattern_name": "hammer",
"direction": "bullish",
"reliability": 0.8,
"interpretation": "하락 트렌드 바닥, 반등 신호",
"signal": "buy"
}
# 강세 망치형
if (close_price > open_price and body_ratio > 0.6):
return {
"pattern_detected": True,
"pattern_name": "bullish_engulfing",
"direction": "bullish",
"reliability": 0.75,
"interpretation": "강한 상승 신호",
"signal": "buy"
}
# 약세 망치형
if (close_price < open_price and body_ratio > 0.6):
return {
"pattern_detected": True,
"pattern_name": "bearish_engulfing",
"direction": "bearish",
"reliability": 0.75,
"interpretation": "강한 하락 신호",
"signal": "sell"
}
return {"pattern_detected": False}
async def _detect_divergences(self, data: List[Dict[str, Any]], indicators: List[str]) -> List[Dict[str, Any]]:
"""다이버전스 감지"""
divergences = []
if len(data) < 10:
return divergences
prices = [d.get("close", d.get("price", 0)) for d in data]
# 간단한 다이버전스 시뮬레이션
if "rsi" in indicators:
# RSI 다이버전스
rsi_values = [d.get("rsi", 50) for d in data]
# 가격은 상승하지만 RSI는 하락하는 경우
price_trend = self._calculate_trend_slope(prices[-10:])
rsi_trend = self._calculate_trend_slope(rsi_values[-10:])
if price_trend > 0.001 and rsi_trend < -0.1:
divergences.append({
"type": "bearish_divergence",
"indicator": "rsi",
"strength": "medium",
"start_index": len(data) - 10,
"end_index": len(data) - 1
})
return divergences
async def _detect_breakouts(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""브레이크아웃 감지"""
breakouts = []
if len(data) < 20:
return breakouts
prices = [d.get("close", d.get("price", 0)) for d in data]
volumes = [d.get("volume", 0) for d in data]
# 최근 20일 고점/저점
recent_high = max(prices[-20:])
recent_low = min(prices[-20:])
current_price = prices[-1]
current_volume = volumes[-1]
avg_volume = sum(volumes[-20:-1]) / 19
# 상향 브레이크아웃
if (current_price > recent_high * 1.02 and # 2% 이상 돌파
current_volume > avg_volume * 1.5): # 거래량 증가
breakouts.append({
"type": "upward_breakout",
"level": recent_high,
"current_price": current_price,
"volume_ratio": current_volume / avg_volume,
"confidence": 0.8,
"start_index": len(data) - 20,
"end_index": len(data) - 1
})
# 하향 브레이크아웃
elif (current_price < recent_low * 0.98 and # 2% 이상 하락
current_volume > avg_volume * 1.5):
breakouts.append({
"type": "downward_breakout",
"level": recent_low,
"current_price": current_price,
"volume_ratio": current_volume / avg_volume,
"confidence": 0.8,
"start_index": len(data) - 20,
"end_index": len(data) - 1
})
return breakouts
async def _detect_support_resistance_levels(self, data: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Detect support and resistance levels from local price extremes.

    Local maxima/minima of the close series mark resistance/support; the
    reported level is taken from the matching ``high``/``low`` value. If no
    extreme qualifies, the global maximum/minimum of the close series is
    used so that each list has at least one entry. Only the last five
    entries of each list are returned.

    Args:
        data: Candle dicts; ``close`` falls back to ``price`` then 0, and
            ``high``/``low`` fall back to ``close``, then ``price``, then 0.

    Returns:
        ``{"support_levels": [...], "resistance_levels": [...]}`` where each
        entry has ``level``, ``index``, ``strength`` and ``touches``. Both
        lists are empty when fewer than 20 candles are supplied.
    """
    if len(data) < 20:
        return {"support_levels": [], "resistance_levels": []}
    # Use close prices to find extremes; use high/low for the level when available
    prices = [d.get("close", d.get("price", 0)) for d in data]
    highs = [d.get("high", d.get("close", d.get("price", 0))) for d in data]
    lows = [d.get("low", d.get("close", d.get("price", 0))) for d in data]
    # Resistance levels (peaks) - lenient condition
    resistance_levels = []
    for i in range(1, len(prices) - 1):
        # Simple extreme: strictly above both immediate neighbours
        if prices[i] > prices[i-1] and prices[i] > prices[i+1]:
            # Extra condition: also above the values two steps away
            # NOTE(review): indentation was lost in the reviewed text, so it
            # is unclear whether the `else` below pairs with this bounds
            # check (as laid out here) or with the inner +/-2 comparison.
            # Confirm against the original layout.
            if i >= 2 and i < len(prices) - 2:
                if prices[i] > max(prices[i-2], prices[i+2]):
                    resistance_levels.append({
                        "level": highs[i],
                        "index": i,
                        "strength": 0.7,
                        "touches": 1
                    })
            else:
                resistance_levels.append({
                    "level": highs[i],
                    "index": i,
                    "strength": 0.6,
                    "touches": 1
                })
    # Support levels (troughs) - lenient condition
    support_levels = []
    for i in range(1, len(prices) - 1):
        # Simple extreme: strictly below both immediate neighbours
        if prices[i] < prices[i-1] and prices[i] < prices[i+1]:
            # Extra condition: also below the values two steps away
            # NOTE(review): same `else` pairing ambiguity as in the
            # resistance loop above - confirm.
            if i >= 2 and i < len(prices) - 2:
                if prices[i] < min(prices[i-2], prices[i+2]):
                    support_levels.append({
                        "level": lows[i],
                        "index": i,
                        "strength": 0.7,
                        "touches": 1
                    })
            else:
                support_levels.append({
                    "level": lows[i],
                    "index": i,
                    "strength": 0.6,
                    "touches": 1
                })
    # Guarantee at least one of each (the highest and the lowest close)
    if not resistance_levels:
        max_idx = prices.index(max(prices))
        resistance_levels.append({
            "level": highs[max_idx],
            "index": max_idx,
            "strength": 0.5,
            "touches": 1
        })
    if not support_levels:
        min_idx = prices.index(min(prices))
        support_levels.append({
            "level": lows[min_idx],
            "index": min_idx,
            "strength": 0.5,
            "touches": 1
        })
    return {
        "support_levels": support_levels[-5:],  # most recent 5
        "resistance_levels": resistance_levels[-5:]  # most recent 5
    }
async def _detect_trend_reversals(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""트렌드 전환 감지"""
reversals = []
if len(data) < 15:
return reversals
prices = [d.get("close", d.get("price", 0)) for d in data]
# 단기, 중기 트렌드 비교
short_term_slope = self._calculate_trend_slope(prices[-5:])
medium_term_slope = self._calculate_trend_slope(prices[-15:])
# 트렌드 전환 조건
if medium_term_slope > 0.01 and short_term_slope < -0.01: # 상승 -> 하락
reversals.append({
"type": "bearish_reversal",
"strength": abs(short_term_slope - medium_term_slope),
"index": len(data) - 1,
"confidence": 0.6
})
elif medium_term_slope < -0.01 and short_term_slope > 0.01: # 하락 -> 상승
reversals.append({
"type": "bullish_reversal",
"strength": abs(short_term_slope - medium_term_slope),
"index": len(data) - 1,
"confidence": 0.6
})
return reversals
async def _detect_harmonic_pattern(self, data: List[Dict[str, Any]], pattern_type: str,
tolerance: float) -> Dict[str, Any]:
"""하모닉 패턴 감지"""
if len(data) < 10:
return {"detected": False}
# 간단한 하모닉 패턴 시뮬레이션
prices = [d.get("close", d.get("price", 0)) for d in data]
# 5포인트 찾기 (X, A, B, C, D)
if len(prices) >= 5:
x_point = {"index": 0, "price": prices[0]}
a_point = {"index": len(prices)//4, "price": prices[len(prices)//4]}
b_point = {"index": len(prices)//2, "price": prices[len(prices)//2]}
c_point = {"index": 3*len(prices)//4, "price": prices[3*len(prices)//4]}
d_point = {"index": len(prices)-1, "price": prices[-1]}
# 가트리 패턴 비율 확인 (간단화)
if pattern_type == "gartley":
return {
"detected": True,
"points": {"X": x_point, "A": a_point, "B": b_point, "C": c_point, "D": d_point},
"ratios": {"AB_XA": 0.618, "BC_AB": 0.382, "CD_BC": 1.272},
"completion_zone": {"min": d_point["price"] * 0.98, "max": d_point["price"] * 1.02},
"profit_targets": [d_point["price"] * 1.05, d_point["price"] * 1.10],
"reversal_zones": [{"level": d_point["price"], "strength": 0.8}]
}
return {"detected": False}
async def _count_elliott_waves(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""엘리엇 파동 카운팅"""
prices = [d.get("close", d.get("price", 0)) for d in data]
# 간단한 파동 식별
waves = []
wave_count = 0
# 5개 임펄스 파동 시뮬레이션
chunk_size = len(prices) // 5
for i in range(5):
start_idx = i * chunk_size
end_idx = min((i + 1) * chunk_size, len(prices))
if start_idx < len(prices):
wave_type = "impulse" if i % 2 == 0 else "corrective"
waves.append({
"start": start_idx,
"end": end_idx - 1,
"type": wave_type
})
wave_count += 1
return {
"total_waves": wave_count,
"waves": waves,
"wave_degree": "primary"
}
async def _identify_current_wave(self, data: List[Dict[str, Any]], wave_count: Dict[str, Any]) -> Dict[str, Any]:
"""현재 파동 식별"""
waves = wave_count.get("waves", [])
if not waves:
return {"wave_number": 0, "wave_type": "unknown", "progress": 0.0}
# 마지막 파동을 현재 파동으로 가정
current_wave = waves[-1]
progress = 0.8 # 80% 진행으로 가정
return {
"wave_number": len(waves),
"wave_type": current_wave["type"],
"progress": progress,
"start_index": current_wave["start"],
"end_index": current_wave["end"]
}
async def _project_next_wave(self, wave_structure: List[Dict[str, Any]],
current_wave: Dict[str, Any]) -> Dict[str, Any]:
"""다음 파동 예측"""
if not wave_structure:
return {"projected_type": "unknown", "target_level": 0, "probability": 0.0}
# 간단한 다음 파동 예측
current_type = current_wave.get("wave_type", "impulse")
next_type = "corrective" if current_type == "impulse" else "impulse"
return {
"projected_type": next_type,
"target_level": 0.0, # 계산 생략
"probability": 0.7,
"estimated_duration": 10
}
async def _confirm_single_pattern(self, pattern: Dict[str, Any], data: List[Dict[str, Any]],
criteria: List[str]) -> Dict[str, Any]:
"""단일 패턴 확인"""
confirmation_scores = {}
total_score = 0
# 볼륨 확인
if "volume" in criteria:
volume_score = await self._check_volume_confirmation(pattern, data)
confirmation_scores["volume"] = volume_score
total_score += volume_score
# 모멘텀 확인
if "momentum" in criteria:
momentum_score = await self._check_momentum_confirmation(pattern, data)
confirmation_scores["momentum"] = momentum_score
total_score += momentum_score
# 시간 확인
if "time" in criteria:
time_score = await self._check_time_confirmation(pattern, data)
confirmation_scores["time"] = time_score
total_score += time_score
avg_score = total_score / len(criteria) if criteria else 0
confirmed = avg_score >= 0.6
# 확인된 경우 신뢰도 향상
original_confidence = pattern.get("confidence", 0.5)
new_confidence = min(original_confidence + 0.2, 1.0) if confirmed else original_confidence
return {
"confirmed": confirmed,
"score": avg_score,
"new_confidence": new_confidence,
"failure_reason": "Low confirmation score" if not confirmed else None
}
async def _check_volume_confirmation(self, pattern: Dict[str, Any], data: List[Dict[str, Any]]) -> float:
"""볼륨 확인"""
if not data:
return 0.5
# 패턴 기간의 평균 볼륨과 이전 기간 비교
start_idx = max(0, pattern.get("start_index", 0))
end_idx = min(len(data), pattern.get("end_index", len(data)))
if start_idx >= end_idx:
return 0.5
pattern_volumes = [d.get("volume", 0) for d in data[start_idx:end_idx]]
avg_pattern_volume = sum(pattern_volumes) / len(pattern_volumes) if pattern_volumes else 0
# 이전 기간과 비교
prev_start = max(0, start_idx - (end_idx - start_idx))
prev_volumes = [d.get("volume", 0) for d in data[prev_start:start_idx]]
avg_prev_volume = sum(prev_volumes) / len(prev_volumes) if prev_volumes else 1
volume_ratio = avg_pattern_volume / avg_prev_volume if avg_prev_volume > 0 else 1
# 볼륨이 증가했으면 높은 점수
return min(volume_ratio / 2, 1.0)
async def _check_momentum_confirmation(self, pattern: Dict[str, Any], data: List[Dict[str, Any]]) -> float:
"""모멘텀 확인"""
if len(data) < 5:
return 0.5
# 최근 가격 모멘텀 확인
recent_prices = [d.get("close", d.get("price", 0)) for d in data[-5:]]
momentum = self._calculate_trend_slope(recent_prices)
# 패턴 방향과 모멘텀 일치 확인
pattern_direction = self._determine_pattern_direction(pattern)
if pattern_direction == "bullish" and momentum > 0:
return 0.8
elif pattern_direction == "bearish" and momentum < 0:
return 0.8
elif pattern_direction == "neutral":
return 0.6
else:
return 0.3
async def _check_time_confirmation(self, pattern: Dict[str, Any], data: List[Dict[str, Any]]) -> float:
"""시간 확인"""
# 패턴이 적절한 시간 동안 지속되었는지 확인
start_idx = pattern.get("start_index", 0)
end_idx = pattern.get("end_index", 0)
duration = end_idx - start_idx + 1
# 적절한 지속시간 (5-50 캔들)
if 5 <= duration <= 50:
return 0.8
elif duration < 5:
return 0.4 # 너무 짧음
else:
return 0.6 # 너무 길음
async def _backtest_single_pattern(self, pattern: Dict[str, Any], data: List[Dict[str, Any]],
entry_rules: Dict[str, Any], exit_rules: Dict[str, Any]) -> Dict[str, Any]:
"""단일 패턴 백테스트"""
# 간단한 백테스트 시뮬레이션
confidence_threshold = entry_rules.get("confidence_threshold", 0.7)
profit_target = exit_rules.get("profit_target", 0.05)
stop_loss = exit_rules.get("stop_loss", 0.02)
pattern_confidence = pattern.get("confidence", 0.5)
if pattern_confidence < confidence_threshold:
return {"profitable": False, "return": 0.0, "reason": "Low confidence"}
# 패턴 종료 후 가격 변화 시뮬레이션
end_idx = pattern.get("end_index", len(data) - 1)
if end_idx >= len(data) - 1:
return {"profitable": False, "return": 0.0, "reason": "Insufficient future data"}
entry_price = data[end_idx].get("close", data[end_idx].get("price", 0))
# 향후 몇 캔들의 가격 변화 확인
future_returns = []
for i in range(min(5, len(data) - end_idx - 1)):
future_price = data[end_idx + i + 1].get("close", data[end_idx + i + 1].get("price", 0))
future_return = (future_price - entry_price) / entry_price if entry_price > 0 else 0
future_returns.append(future_return)
if not future_returns:
return {"profitable": False, "return": 0.0, "reason": "No future data"}
max_return = max(future_returns)
min_return = min(future_returns)
# 수익/손실 결정
if max_return >= profit_target:
return {"profitable": True, "return": profit_target}
elif min_return <= -stop_loss:
return {"profitable": False, "return": -stop_loss}
else:
final_return = future_returns[-1]
return {"profitable": final_return > 0, "return": final_return}
async def _resample_data_for_timeframe(self, data: List[Dict[str, Any]], timeframe: str) -> List[Dict[str, Any]]:
"""타임프레임별 데이터 리샘플링"""
# 간단한 리샘플링 시뮬레이션
if timeframe == "5m":
return data # 원본 데이터
elif timeframe == "15m":
return data[::3] # 3개마다 하나씩
elif timeframe == "1h":
return data[::12] # 12개마다 하나씩
elif timeframe == "4h":
return data[::48] # 48개마다 하나씩
else:
return data
def _determine_pattern_direction(self, pattern: Dict[str, Any]) -> str:
"""패턴 방향 결정"""
pattern_type = pattern.get("pattern_type", "")
bullish_patterns = ["hammer", "bullish_engulfing", "upward_breakout", "bullish_reversal"]
bearish_patterns = ["bearish_engulfing", "downward_breakout", "bearish_reversal", "head_shoulders"]
if any(bp in pattern_type for bp in bullish_patterns):
return "bullish"
elif any(bp in pattern_type for bp in bearish_patterns):
return "bearish"
else:
return "neutral"
async def _identify_confluence_zones(self, timeframe_patterns: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
"""컨플루언스 존 식별"""
confluence_zones = []
# 여러 타임프레임에서 동일한 방향의 패턴이 있는 구간
bullish_count = 0
bearish_count = 0
for timeframe, patterns in timeframe_patterns.items():
for pattern in patterns:
if pattern["direction"] == "bullish":
bullish_count += 1
elif pattern["direction"] == "bearish":
bearish_count += 1
if bullish_count >= 2:
confluence_zones.append({
"type": "bullish_confluence",
"strength": bullish_count / len(timeframe_patterns),
"timeframes_involved": bullish_count
})
if bearish_count >= 2:
confluence_zones.append({
"type": "bearish_confluence",
"strength": bearish_count / len(timeframe_patterns),
"timeframes_involved": bearish_count
})
return confluence_zones
async def _analyze_timeframe_alignment(self, timeframe_patterns: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
"""타임프레임 정렬 분석"""
alignment_score = 0
total_patterns = 0
direction_counts = {"bullish": 0, "bearish": 0, "neutral": 0}
for patterns in timeframe_patterns.values():
for pattern in patterns:
direction_counts[pattern["direction"]] += 1
total_patterns += 1
if total_patterns > 0:
max_direction = max(direction_counts, key=direction_counts.get)
alignment_score = direction_counts[max_direction] / total_patterns
return {
"alignment_score": alignment_score,
"dominant_direction": max(direction_counts, key=direction_counts.get),
"direction_distribution": direction_counts
}
async def _calculate_pattern_clarity(self, pattern: Dict[str, Any]) -> float:
"""패턴 명확도 계산"""
# 패턴의 명확도는 기본 신뢰도 기반
base_confidence = pattern.get("confidence", 0.5)
# 패턴 타입별 보정
pattern_type = pattern.get("pattern_type", "")
if "head_shoulders" in pattern_type or "double_top" in pattern_type:
return min(base_confidence + 0.1, 1.0)
else:
return base_confidence
async def _calculate_volume_support(self, pattern: Dict[str, Any], market_context: Dict[str, Any]) -> float:
"""볼륨 지지 계산"""
# 시장 상황에 따른 볼륨 지지도
volatility = market_context.get("volatility", "medium")
if volatility == "high":
return 0.8 # 높은 변동성에서는 볼륨 지지가 중요
elif volatility == "low":
return 0.5 # 낮은 변동성에서는 볼륨 지지가 덜 중요
else:
return 0.65 # 중간 변동성
async def _calculate_market_alignment(self, pattern: Dict[str, Any], market_context: Dict[str, Any]) -> float:
"""시장 정렬 계산"""
# 패턴 방향과 시장 트렌드 일치도
pattern_direction = self._determine_pattern_direction(pattern)
market_trend = market_context.get("trend", "neutral")
if pattern_direction == market_trend:
return 0.9 # 높은 정렬
elif pattern_direction == "neutral" or market_trend == "neutral":
return 0.6 # 중간 정렬
else:
return 0.3 # 낮은 정렬 (역방향)
async def _generate_price_predictions(self, patterns: List[Dict[str, Any]], horizon: int) -> List[Dict[str, Any]]:
"""가격 예측 생성"""
predictions = []
for i in range(horizon):
# 패턴 기반 간단한 가격 예측
base_change = 0.0
for pattern in patterns:
direction = self._determine_pattern_direction(pattern)
confidence = pattern.get("confidence", 0.5)
if direction == "bullish":
base_change += 0.01 * confidence # 1% 상승 기대
elif direction == "bearish":
base_change -= 0.01 * confidence # 1% 하락 기대
predictions.append({
"period": i + 1,
"expected_change": base_change,
"confidence": 0.7
})
return predictions
async def _calculate_target_levels(self, patterns: List[Dict[str, Any]]) -> Dict[str, float]:
"""타겟 레벨 계산"""
target_levels = {}
if patterns:
# 평균 신뢰도 기반 타겟 계산
avg_confidence = sum(p.get("confidence", 0.5) for p in patterns) / len(patterns)
target_levels = {
"support": 0.98, # 2% 하락
"resistance": 1.03, # 3% 상승
"stop_loss": 0.95, # 5% 손절
"take_profit": 1.05 # 5% 익절
}
return target_levels
async def _calculate_confidence_intervals(self, patterns: List[Dict[str, Any]], horizon: int) -> Dict[str, Any]:
"""신뢰구간 계산"""
if not patterns:
return {"lower_bound": 0.95, "upper_bound": 1.05, "confidence_level": 0.95}
avg_confidence = sum(p.get("confidence", 0.5) for p in patterns) / len(patterns)
# 신뢰도 기반 구간 계산
interval_width = (1 - avg_confidence) * 0.1 # 최대 10% 구간
return {
"lower_bound": 1 - interval_width,
"upper_bound": 1 + interval_width,
"confidence_level": avg_confidence
}
def _should_trigger_alert(self, pattern: Dict[str, Any]) -> bool:
"""알림 트리거 여부 확인"""
if not self.alert_config:
return False
pattern_types = self.alert_config.get("patterns", [])
min_confidence = self.alert_config.get("min_confidence", 0.8)
min_strength = self.alert_config.get("min_strength", 0.7)
pattern_type = pattern.get("pattern_type", "")
confidence = pattern.get("confidence", 0.0)
strength = pattern.get("strength_score", confidence)
return (pattern_type in pattern_types and
confidence >= min_confidence and
strength >= min_strength)
def _determine_alert_severity(self, pattern: Dict[str, Any]) -> str:
"""알림 심각도 결정"""
confidence = pattern.get("confidence", 0.0)
if confidence >= 0.9:
return "critical"
elif confidence >= 0.8:
return "high"
elif confidence >= 0.6:
return "medium"
else:
return "low"
def _calculate_pattern_overlap(self, pattern1: Dict[str, Any], pattern2: Dict[str, Any]) -> float:
"""패턴 겹침 계산"""
start1 = pattern1.get("start_index", 0)
end1 = pattern1.get("end_index", 0)
start2 = pattern2.get("start_index", 0)
end2 = pattern2.get("end_index", 0)
# 겹치는 구간 계산
overlap_start = max(start1, start2)
overlap_end = min(end1, end2)
if overlap_start <= overlap_end:
overlap_length = overlap_end - overlap_start + 1
total_length = max(end1, end2) - min(start1, start2) + 1
return overlap_length / total_length if total_length > 0 else 0
else:
return 0.0
def _calculate_pattern_correlation(self, pattern1: Dict[str, Any], pattern2: Dict[str, Any]) -> float:
"""패턴 상관관계 계산"""
# 패턴 방향 기반 상관관계
dir1 = self._determine_pattern_direction(pattern1)
dir2 = self._determine_pattern_direction(pattern2)
if dir1 == dir2:
return 0.8 # 같은 방향
elif "neutral" in [dir1, dir2]:
return 0.5 # 중립
else:
return 0.2 # 반대 방향
def _generate_combination_recommendation(self, pattern1: Dict[str, Any], pattern2: Dict[str, Any],
synergy_score: float) -> str:
"""조합 추천 생성"""
if synergy_score > 0.7:
return "강력한 신호 - 두 패턴이 서로 보완하여 높은 확률의 거래 기회"
elif synergy_score > 0.5:
return "보통 신호 - 패턴들이 어느 정도 일치하지만 추가 확인 필요"
else:
return "약한 신호 - 패턴들 간의 상충으로 인해 거래 신중히 고려"