
Data Analysis MCP Server

by boyzhang666
adaptive_window_search.py (6.53 kB)
"""
Adaptive window search algorithm.

Optimizes the efficiency and accuracy of sliding-window similarity analysis.
"""

import numpy as np
from typing import List, Dict


def adaptive_window_search(
    data1: np.ndarray,
    data2: np.ndarray,
    base_window_size: int,
    similarity_func,
    min_similarity: float = 0.6,
    max_segments: int = 20,
) -> List[Dict]:
    """
    Adaptive window search.

    Args:
        data1, data2: Input data sequences.
        base_window_size: Base window size.
        similarity_func: Similarity computation function.
        min_similarity: Minimum similarity threshold.
        max_segments: Maximum number of segments to return.

    Returns:
        List of similar time segments.
    """
    if len(data1) < base_window_size or len(data2) < base_window_size:
        return []

    # Stage 1: coarse-grained fast scan.
    coarse_step = max(1, base_window_size // 5)
    coarse_candidates = []

    for i in range(0, len(data1) - base_window_size + 1, coarse_step):
        window1 = data1[i : i + base_window_size]
        window2 = data2[i : i + base_window_size]

        # Quick similarity estimate (Pearson correlation plus a normalized
        # shape distance; see _quick_similarity_estimate below).
        quick_similarity = _quick_similarity_estimate(window1, window2)

        if quick_similarity >= min_similarity * 0.8:  # relaxed screening threshold
            coarse_candidates.append(
                {
                    "start": i,
                    "end": i + base_window_size - 1,
                    "quick_score": quick_similarity,
                }
            )

    # Stage 2: fine-grained analysis of the candidate regions.
    refined_results = []

    for candidate in coarse_candidates:
        # Fine search in a neighborhood around the candidate window.
        search_start = max(0, candidate["start"] - base_window_size // 4)
        search_end = min(
            len(data1) - base_window_size + 1,
            candidate["end"] - base_window_size + base_window_size // 4,
        )

        fine_step = max(1, base_window_size // 20)

        for i in range(search_start, search_end, fine_step):
            window1 = data1[i : i + base_window_size]
            window2 = data2[i : i + base_window_size]

            # Full similarity computation.
            similarities = similarity_func(window1, window2)

            # Weighted composite score across all metrics.
            weights = {
                "dtw": 0.35,
                "shape": 0.25,
                "frequency": 0.15,
                "peaks": 0.10,
                "change_rate": 0.10,
                "curvature": 0.05,
            }
            similarity = sum(similarities.get(key, 0) * weights[key] for key in weights)

            if similarity >= min_similarity:
                refined_results.append(
                    {
                        "start_index": i,
                        "end_index": i + base_window_size - 1,
                        "similarity": float(similarity),
                        "similarity_details": {
                            k: float(v) for k, v in similarities.items()
                        },
                        "window_size": base_window_size,
                    }
                )

    # Stage 3: de-duplicate and rank.
    return _remove_overlaps_and_rank(refined_results, base_window_size, max_segments)


def _quick_similarity_estimate(window1: np.ndarray, window2: np.ndarray) -> float:
    """Quick similarity estimate using only computationally cheap metrics."""
    try:
        # Pearson correlation coefficient.
        if len(window1) > 1 and np.std(window1) > 1e-8 and np.std(window2) > 1e-8:
            pearson = abs(np.corrcoef(window1, window2)[0, 1])
            if np.isnan(pearson):
                pearson = 0.0
        else:
            pearson = 0.0

        # Simplified shape similarity (based on normalized Euclidean distance).
        if np.std(window1) > 1e-8 and np.std(window2) > 1e-8:
            norm1 = (window1 - np.mean(window1)) / np.std(window1)
            norm2 = (window2 - np.mean(window2)) / np.std(window2)
            shape_dist = np.sqrt(np.mean((norm1 - norm2) ** 2))
            shape_sim = 1 / (1 + shape_dist)
        else:
            shape_sim = 1.0 if np.allclose(window1, window2) else 0.0

        # Weighted average of the two estimates.
        return 0.6 * pearson + 0.4 * shape_sim
    except Exception:
        return 0.0


def _remove_overlaps_and_rank(
    results: List[Dict], window_size: int, max_segments: int
) -> List[Dict]:
    """Remove overlapping segments and rank the rest by similarity."""
    if not results:
        return []

    # Sort by similarity, descending.
    results.sort(key=lambda x: x["similarity"], reverse=True)

    # Greedily keep segments that do not overlap an already-kept one.
    filtered_results = []
    for result in results:
        is_overlapping = False
        for existing in filtered_results:
            overlap_start = max(result["start_index"], existing["start_index"])
            overlap_end = min(result["end_index"], existing["end_index"])
            overlap_length = max(0, overlap_end - overlap_start + 1)
            overlap_ratio = overlap_length / window_size

            if overlap_ratio > 0.4:  # 40% overlap threshold
                is_overlapping = True
                break

        if not is_overlapping:
            filtered_results.append(result)

        if len(filtered_results) >= max_segments:
            break

    return filtered_results
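
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): adaptive_window_search
# expects `similarity_func(window1, window2)` to return a dict keyed by the
# metric names weighted above ("dtw", "shape", "frequency", "peaks",
# "change_rate", "curvature"); any missing key contributes 0 via
# `similarities.get(key, 0)`. The stand-in below reuses the cheap estimate for
# every metric slot and is an assumption for demonstration only, not the
# project's real similarity suite (which would compute DTW, frequency-domain
# similarity, peak matching, etc. separately).
# ---------------------------------------------------------------------------
def _toy_similarity(window1: np.ndarray, window2: np.ndarray) -> Dict[str, float]:
    score = _quick_similarity_estimate(window1, window2)
    return {
        key: score
        for key in ("dtw", "shape", "frequency", "peaks", "change_rate", "curvature")
    }
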
def multi_scale_window_analysis(
    data1: np.ndarray,
    data2: np.ndarray,
    base_window_size: int,
    similarity_func,
    min_similarity: float = 0.6,
) -> List[Dict]:
    """
    Multi-scale window analysis: windows of several sizes capture similarity
    at different time scales.

    Args:
        data1, data2: Input data sequences.
        base_window_size: Base window size.
        similarity_func: Similarity computation function.
        min_similarity: Minimum similarity threshold.

    Returns:
        List of similar time segments across scales.
    """
    all_results = []

    # Window scales, as multiples of the base window size.
    scales = [0.5, 1.0, 1.5, 2.0]

    for scale in scales:
        window_size = int(base_window_size * scale)
        if window_size < 3 or window_size >= len(data1):
            continue

        # Run the adaptive search at this scale.
        scale_results = adaptive_window_search(
            data1, data2, window_size, similarity_func, min_similarity, max_segments=10
        )

        # Tag each result with its scale.
        for result in scale_results:
            result["scale"] = scale
            result["scale_description"] = f"{scale}x base window"

        all_results.extend(scale_results)

    # Global de-duplication and ranking.
    return _remove_overlaps_and_rank(all_results, base_window_size, max_segments=20)
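
A minimal usage sketch, assuming the `_toy_similarity` stand-in defined above and synthetic sine-wave inputs; a real caller would pass the server's full similarity function instead:

if __name__ == "__main__":
    # Synthetic demo data: two phase-shifted, lightly noised sine waves.
    rng = np.random.default_rng(0)
    t = np.linspace(0, 20, 500)
    series_a = np.sin(t)
    series_b = np.sin(t + 0.1) + 0.05 * rng.standard_normal(len(t))

    segments = multi_scale_window_analysis(
        series_a,
        series_b,
        base_window_size=50,
        similarity_func=_toy_similarity,
        min_similarity=0.6,
    )
    for seg in segments[:5]:
        print(
            seg["scale_description"],
            seg["start_index"],
            seg["end_index"],
            round(seg["similarity"], 3),
        )

Because the weights sum to 1.0, the toy function's composite score equals the cheap estimate itself, so near-identical windows comfortably clear the 0.6 threshold in this demo.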
