"""
Adaptive window search.

Speeds up sliding-window similarity analysis by scanning coarsely first and
then refining only the promising regions, improving efficiency without
sacrificing accuracy.
"""
import numpy as np
from typing import Callable, Dict, List
def adaptive_window_search(
    data1: np.ndarray,
    data2: np.ndarray,
    base_window_size: int,
    similarity_func: Callable[[np.ndarray, np.ndarray], Dict[str, float]],
    min_similarity: float = 0.6,
    max_segments: int = 20,
) -> List[Dict]:
    """
    Adaptive window search.

    Args:
        data1, data2: Input data sequences.
        base_window_size: Base window size.
        similarity_func: Similarity function; returns a dict of per-metric scores.
        min_similarity: Minimum similarity threshold.
        max_segments: Maximum number of segments to return.

    Returns:
        List of similar time segments.
    """
if len(data1) < base_window_size or len(data2) < base_window_size:
return []
    # Stage 1: coarse, fast scan
    coarse_step = max(1, base_window_size // 5)
    coarse_candidates = []
    for i in range(0, len(data1) - base_window_size + 1, coarse_step):
        window1 = data1[i : i + base_window_size]
        window2 = data2[i : i + base_window_size]
        # Cheap similarity estimate (Pearson correlation + simplified shape metric)
        quick_similarity = _quick_similarity_estimate(window1, window2)
        if quick_similarity >= min_similarity * 0.8:  # relaxed threshold for prefiltering
            coarse_candidates.append(
                {
                    "start": i,
                    "end": i + base_window_size - 1,
                    "quick_score": quick_similarity,
                }
            )
    # Stage 2: refine the candidate regions
    # Per-metric weights for the combined score (they sum to 1.0); defined once
    # here rather than rebuilt on every iteration
    weights = {
        "dtw": 0.35,
        "shape": 0.25,
        "frequency": 0.15,
        "peaks": 0.10,
        "change_rate": 0.10,
        "curvature": 0.05,
    }
    refined_results = []
    for candidate in coarse_candidates:
        # Fine search within a quarter window on either side of the candidate start
        search_start = max(0, candidate["start"] - base_window_size // 4)
        search_end = min(
            len(data1) - base_window_size + 1,
            candidate["start"] + base_window_size // 4 + 1,
        )
        fine_step = max(1, base_window_size // 20)
        for i in range(search_start, search_end, fine_step):
            window1 = data1[i : i + base_window_size]
            window2 = data2[i : i + base_window_size]
            # Full similarity computation
            similarities = similarity_func(window1, window2)
            # Weighted combined score
            similarity = sum(similarities.get(key, 0) * weights[key] for key in weights)
            if similarity >= min_similarity:
                refined_results.append(
                    {
                        "start_index": i,
                        "end_index": i + base_window_size - 1,
                        "similarity": float(similarity),
                        "similarity_details": {
                            k: float(v) for k, v in similarities.items()
                        },
                        "window_size": base_window_size,
                    }
                )
    # Stage 3: deduplicate overlaps and rank
    return _remove_overlaps_and_rank(refined_results, max_segments)
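

# A minimal sketch of the similarity_func contract assumed above: any callable
# works as long as it accepts two equal-length windows and returns a dict whose
# keys match the weight table ("dtw", "shape", "frequency", "peaks",
# "change_rate", "curvature"). example_similarity is a hypothetical stand-in,
# not part of the original module: it fills every key from two cheap proxies
# instead of computing real DTW, spectral, peak, or curvature metrics.
def example_similarity(window1: np.ndarray, window2: np.ndarray) -> Dict[str, float]:
    keys = ("dtw", "shape", "frequency", "peaks", "change_rate", "curvature")
    if np.std(window1) < 1e-8 or np.std(window2) < 1e-8:
        # Degenerate (near-constant) windows: all-or-nothing score
        return {k: (1.0 if np.allclose(window1, window2) else 0.0) for k in keys}
    # Proxy 1: RMS distance between z-scored windows, mapped to (0, 1]
    norm1 = (window1 - np.mean(window1)) / np.std(window1)
    norm2 = (window2 - np.mean(window2)) / np.std(window2)
    rmse_sim = 1.0 / (1.0 + np.sqrt(np.mean((norm1 - norm2) ** 2)))
    # Proxy 2: absolute Pearson correlation
    corr = abs(np.corrcoef(window1, window2)[0, 1])
    corr = 0.0 if np.isnan(corr) else float(corr)
    return {
        "dtw": rmse_sim,
        "shape": rmse_sim,
        "frequency": corr,
        "peaks": corr,
        "change_rate": corr,
        "curvature": rmse_sim,
    }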


def _quick_similarity_estimate(window1: np.ndarray, window2: np.ndarray) -> float:
    """Cheap similarity estimate using only low-cost metrics."""
    try:
        # Pearson correlation coefficient
        if len(window1) > 1 and np.std(window1) > 1e-8 and np.std(window2) > 1e-8:
            pearson = abs(np.corrcoef(window1, window2)[0, 1])
            if np.isnan(pearson):
                pearson = 0.0
        else:
            pearson = 0.0
        # Simplified shape similarity (RMS distance between z-scored windows)
        if np.std(window1) > 1e-8 and np.std(window2) > 1e-8:
            norm1 = (window1 - np.mean(window1)) / np.std(window1)
            norm2 = (window2 - np.mean(window2)) / np.std(window2)
            shape_dist = np.sqrt(np.mean((norm1 - norm2) ** 2))
            shape_sim = 1 / (1 + shape_dist)
        else:
            # A (near-)constant window cannot be z-scored; fall back to equality
            shape_sim = 1.0 if np.allclose(window1, window2) else 0.0
        # Weighted average of the two estimates
        return 0.6 * pearson + 0.4 * shape_sim
    except Exception:
        return 0.0
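

# Sanity check for the weighting above: two identical, non-constant windows
# give pearson = 1 and shape_sim = 1, hence 0.6 * 1 + 0.4 * 1 = 1.0, while
# uncorrelated windows with large shape distance score near 0.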


def _remove_overlaps_and_rank(results: List[Dict], max_segments: int) -> List[Dict]:
    """Remove overlapping segments and rank by similarity."""
    if not results:
        return []
    # Sort by similarity, best first
    results.sort(key=lambda x: x["similarity"], reverse=True)
    # Greedy deduplication: keep a segment only if it overlaps every already
    # kept segment by at most 40% of the shorter of the two
    filtered_results = []
    for result in results:
        is_overlapping = False
        for existing in filtered_results:
            overlap_start = max(result["start_index"], existing["start_index"])
            overlap_end = min(result["end_index"], existing["end_index"])
            overlap_length = max(0, overlap_end - overlap_start + 1)
            # Measure overlap against the shorter segment so that mixed
            # window sizes (e.g. multi-scale results) are handled correctly
            shorter = min(
                result["end_index"] - result["start_index"] + 1,
                existing["end_index"] - existing["start_index"] + 1,
            )
            overlap_ratio = overlap_length / shorter
            if overlap_ratio > 0.4:  # 40% overlap threshold
                is_overlapping = True
                break
        if not is_overlapping:
            filtered_results.append(result)
            if len(filtered_results) >= max_segments:
                break
    return filtered_results
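

# Illustration of the dedup rule above (hypothetical inputs): given segments
# [0, 9] with similarity 0.9 and [4, 13] with similarity 0.8, the overlap is
# 6 samples against a shorter length of 10 (60% > 40%), so only [0, 9] is kept.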


def multi_scale_window_analysis(
    data1: np.ndarray,
    data2: np.ndarray,
    base_window_size: int,
    similarity_func: Callable[[np.ndarray, np.ndarray], Dict[str, float]],
    min_similarity: float = 0.6,
) -> List[Dict]:
    """
    Multi-scale window analysis: run the search at several window sizes to
    capture similarity at different time scales.

    Args:
        data1, data2: Input data sequences.
        base_window_size: Base window size.
        similarity_func: Similarity function; returns a dict of per-metric scores.
        min_similarity: Minimum similarity threshold.

    Returns:
        List of similar time segments across all scales.
    """
    all_results = []
    # Window scales, as multiples of the base window size
    scales = [0.5, 1.0, 1.5, 2.0]
    for scale in scales:
        window_size = int(base_window_size * scale)
        if window_size < 3 or window_size >= len(data1):
            continue
        # Adaptive search at this scale
        scale_results = adaptive_window_search(
            data1, data2, window_size, similarity_func, min_similarity, max_segments=10
        )
        # Tag each result with its scale
        for result in scale_results:
            result["scale"] = scale
            result["scale_description"] = f"{scale}x base window"
        all_results.extend(scale_results)
    # Global deduplication and ranking
    return _remove_overlaps_and_rank(all_results, max_segments=20)
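

if __name__ == "__main__":
    # Usage sketch on synthetic data, using the illustrative example_similarity
    # defined above. Two noisy sine waves share the same phase, so several
    # windows should clear the threshold. The sizes, seed, and thresholds here
    # are arbitrary demo values, not tuned recommendations.
    rng = np.random.default_rng(42)
    t = np.linspace(0, 8 * np.pi, 400)
    series_a = np.sin(t) + 0.1 * rng.standard_normal(t.size)
    series_b = np.sin(t) + 0.1 * rng.standard_normal(t.size)
    segments = multi_scale_window_analysis(
        series_a,
        series_b,
        base_window_size=50,
        similarity_func=example_similarity,
        min_similarity=0.6,
    )
    for seg in segments[:5]:
        print(
            f"[{seg['start_index']:4d}, {seg['end_index']:4d}] "
            f"scale={seg['scale']:.2f} similarity={seg['similarity']:.3f}"
        )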