# two_similarity_analysis.py
import logging
import asyncio
import numpy as np
from pydantic import BaseModel, Field
from scipy.spatial.distance import euclidean
from typing import List, Optional, Dict, Any
from fastapi import HTTPException, APIRouter
from config.config import *
from routers.utils.openplant import OpenPlant
from routers.utils.adaptive_window_search import (
adaptive_window_search,
multi_scale_window_analysis,
)
# Global configuration
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)
router = APIRouter()
logger = logging.getLogger("two_similarity_analysis")
class SimilarityAnalysisRequest(BaseModel):
"""相似度分析请求模型
用于分析两个时间序列数据之间的相似度。
典型应用场景:比较不同传感器数据的相似性、分析时间序列的匹配程度等。
"""
point1: str = Field(
...,
description="第一个数据点名称",
example="W3.NODE1.XX",
)
point2: str = Field(
...,
description="第二个数据点名称",
example="W3.NODE1.XX",
)
start_time: str = Field(
...,
description="开始时间,格式:YYYY-MM-DD HH:MM:SS",
example="2024-01-01 00:00:00",
)
end_time: str = Field(
...,
description="结束时间,格式:YYYY-MM-DD HH:MM:SS",
example="2024-01-01 23:59:59",
)
interval: str = Field(
default="1m",
description="采样间隔,建议根据分析时间范围选择:1小时内用'1m',1天内用'3m',1周内用'10m',1月内用'1h'。支持'1m', '3m', '10m', '15m', '30m', '1h', '1d'等",
example="1m",
)
fill_method: str = Field(
default="outer",
description="缺失值填充方法",
example="outer",
)
window_size: Optional[int] = Field(
default=None,
description="滑动窗口大小,用于区间相似度分析,默认为数据长度的1/4",
ge=3,
)
normalize: Optional[bool] = Field(
default=True, description="是否对数据进行标准化处理"
)
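# Illustrative request construction (a sketch: values are taken from the field
# examples above; the point names are placeholders, not real points):
#
#   req = SimilarityAnalysisRequest(
#       point1="W3.NODE1.XX",
#       point2="W3.NODE1.XX",
#       start_time="2024-01-01 00:00:00",
#       end_time="2024-01-01 23:59:59",
#       interval="1m",
#   )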
class SimilarityAnalysisResponse(BaseModel):
"""相似度分析响应模型"""
similarity_score: float = Field(
..., description="综合相似度评分(0到1之间)", ge=0.0, le=1.0
)
method: str = Field(..., description="使用的分析方法")
data_length: int = Field(..., description="数据点数量")
interpretation: str = Field(..., description="相似度解释")
best_match_info: Optional[Dict[str, Any]] = Field(
None, description="最佳匹配区间信息"
)
def dtw_distance(x, y, max_length=1000, window_constraint=None):
    """Improved DTW distance, tuned for waveform-similarity detection.

    Args:
        x, y: input time series
        max_length: maximum length before falling back to the fast approximation
        window_constraint: Sakoe-Chiba band width limiting path deviation
    """
    # Fall back to the fast approximation when the data is too large.
    if len(x) > max_length or len(y) > max_length:
        return fast_dtw_distance(x, y)
    n, m = len(x), len(y)
    # Large cost matrices are also delegated to the fast approximation.
    if n * m > 500000:
        return fast_dtw_distance(x, y)
    # Tight band constraint (about 3% of the series length) for waveform matching.
    if window_constraint is None:
        window_constraint = max(2, min(n, m) // 30)
    # Preprocessing: remove the linear trend to emphasize waveform shape.
    x_detrended = x - np.linspace(x[0], x[-1], len(x))
    y_detrended = y - np.linspace(y[0], y[-1], len(y))
    # Normalize while preserving the waveform shape.
    x_std = np.std(x_detrended)
    y_std = np.std(y_detrended)
    x_norm = x_detrended / x_std if x_std > 1e-8 else x_detrended
    y_norm = y_detrended / y_std if y_std > 1e-8 else y_detrended
    # Build the DTW cost matrix.
    dtw_matrix = np.full((n + 1, m + 1), np.inf)
    dtw_matrix[0, 0] = 0
    for i in range(1, n + 1):
        # Apply the Sakoe-Chiba band constraint.
        j_start = max(1, i - window_constraint)
        j_end = min(m + 1, i + window_constraint + 1)
        for j in range(j_start, j_end):
            # Multi-level distance measure:
            # 1. Point-wise distance.
            point_cost = abs(x_norm[i - 1] - y_norm[j - 1])
            # 2. First-derivative distance (similarity of rate of change).
            derivative_cost = 0
            if i > 1 and j > 1:
                dx1 = x_norm[i - 1] - x_norm[i - 2]
                dy1 = y_norm[j - 1] - y_norm[j - 2]
                derivative_cost = abs(dx1 - dy1) * 0.5
            # 3. Second-derivative distance (similarity of curvature).
            curvature_cost = 0
            if i > 2 and j > 2:
                dx2 = (x_norm[i - 1] - x_norm[i - 2]) - (x_norm[i - 2] - x_norm[i - 3])
                dy2 = (y_norm[j - 1] - y_norm[j - 2]) - (y_norm[j - 2] - y_norm[j - 3])
                curvature_cost = abs(dx2 - dy2) * 0.3
            # Combined step cost.
            total_cost = point_cost + derivative_cost + curvature_cost
            # DTW recurrence; off-diagonal moves are penalized to favor alignment.
            dtw_matrix[i, j] = total_cost + min(
                dtw_matrix[i - 1, j] + 0.1,  # insertion penalty
                dtw_matrix[i, j - 1] + 0.1,  # deletion penalty
                dtw_matrix[i - 1, j - 1],  # match (preferred)
            )
    return dtw_matrix[n, m]
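# Illustrative usage (a sketch, assuming numpy as imported above): two
# phase-shifted sine waves should yield a much smaller DTW cost than a sine
# compared against a ramp of the same length.
#
#   t = np.linspace(0, 2 * np.pi, 200)
#   dtw_distance(np.sin(t), np.sin(t + 0.3))          # small: same shape
#   dtw_distance(np.sin(t), np.linspace(-1, 1, 200))  # larger: different shape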
def fast_dtw_distance(x, y):
    """Fast DTW approximation: resample both series to a common length and
    use plain Euclidean distance as a stand-in."""
    min_len = min(len(x), len(y))
    x_resized = np.interp(np.linspace(0, 1, min_len), np.linspace(0, 1, len(x)), x)
    y_resized = np.interp(np.linspace(0, 1, min_len), np.linspace(0, 1, len(y)), y)
    return euclidean(x_resized, y_resized)
def calculate_similarity_metrics(x, y):
    """Improved multi-metric similarity computation, tuned for waveform matching."""
    similarities = {}
    # Preprocessing: remove the linear trend to emphasize waveform shape.
    x_detrended = x - np.linspace(x[0], x[-1], len(x))
    y_detrended = y - np.linspace(y[0], y[-1], len(y))
    # 1. Improved DTW similarity - best suited for waveform matching.
    dtw_dist = dtw_distance(x, y)
    # Normalize the DTW distance into a [0, 1] similarity.
    max_possible_dtw = np.sqrt(len(x)) * (
        np.std(x_detrended) + np.std(y_detrended) + 1e-8
    )
    similarities["dtw"] = max(0, 1 - (dtw_dist / max_possible_dtw))
    # 2. Shape similarity - standardized Euclidean distance on detrended data.
    if np.std(x_detrended) > 1e-8 and np.std(y_detrended) > 1e-8:
        x_shape = x_detrended / np.std(x_detrended)
        y_shape = y_detrended / np.std(y_detrended)
        shape_dist = euclidean(x_shape, y_shape)
        max_shape_dist = np.sqrt(2 * len(x))
        similarities["shape"] = max(0, 1 - (shape_dist / max_shape_dist))
    else:
        similarities["shape"] = 0.0
    # 3. Frequency-domain similarity - captures periodicity and spectral features.
    try:
        # Compare amplitude spectra via FFT.
        fft_x = np.abs(np.fft.fft(x_detrended))[: len(x) // 2]
        fft_y = np.abs(np.fft.fft(y_detrended))[: len(y) // 2]
        # Normalize the spectra.
        if np.sum(fft_x) > 1e-8 and np.sum(fft_y) > 1e-8:
            fft_x_norm = fft_x / np.sum(fft_x)
            fft_y_norm = fft_y / np.sum(fft_y)
            # Correlation between the normalized spectra.
            freq_corr = np.corrcoef(fft_x_norm, fft_y_norm)[0, 1]
            similarities["frequency"] = (
                abs(freq_corr) if not np.isnan(freq_corr) else 0.0
            )
        else:
            similarities["frequency"] = 0.0
    except Exception:
        similarities["frequency"] = 0.0

    # 4. Peak-pattern similarity - compares peak/valley structure.
    def find_peaks_valleys(data, prominence=0.1):
        """Find peaks and valleys whose deviation from the mean is significant."""
        peaks = []
        valleys = []
        for i in range(1, len(data) - 1):
            if data[i] > data[i - 1] and data[i] > data[i + 1]:
                if abs(data[i] - np.mean(data)) > prominence * np.std(data):
                    peaks.append(i)
            elif data[i] < data[i - 1] and data[i] < data[i + 1]:
                if abs(data[i] - np.mean(data)) > prominence * np.std(data):
                    valleys.append(i)
        return peaks, valleys

    peaks_x, valleys_x = find_peaks_valleys(x_detrended)
    peaks_y, valleys_y = find_peaks_valleys(y_detrended)
    # Ratio of peak/valley counts as a rough structural similarity.
    total_features_x = len(peaks_x) + len(valleys_x)
    total_features_y = len(peaks_y) + len(valleys_y)
    if total_features_x > 0 and total_features_y > 0:
        feature_ratio = min(total_features_x, total_features_y) / max(
            total_features_x, total_features_y
        )
        similarities["peaks"] = feature_ratio
    else:
        similarities["peaks"] = (
            1.0 if total_features_x == total_features_y == 0 else 0.0
        )
    # 5. Rate-of-change similarity - consistency of the change patterns.
    if len(x) > 1 and len(y) > 1:
        diff_x = np.diff(x_detrended)
        diff_y = np.diff(y_detrended)
        if (
            len(diff_x) > 1
            and len(diff_y) > 1
            and np.std(diff_x) > 1e-8
            and np.std(diff_y) > 1e-8
        ):
            # Standardize the first differences.
            diff_x_norm = diff_x / np.std(diff_x)
            diff_y_norm = diff_y / np.std(diff_y)
            # Correlation of the rates of change.
            rate_corr = np.corrcoef(diff_x_norm, diff_y_norm)[0, 1]
            similarities["change_rate"] = (
                abs(rate_corr) if not np.isnan(rate_corr) else 0.0
            )
        else:
            similarities["change_rate"] = 0.0
    else:
        similarities["change_rate"] = 0.0
    # 6. Curvature similarity - how similarly the waveforms bend.
    if len(x) > 2 and len(y) > 2:
        # Second differences approximate curvature.
        curvature_x = np.diff(x_detrended, n=2)
        curvature_y = np.diff(y_detrended, n=2)
        if (
            len(curvature_x) > 1
            and len(curvature_y) > 1
            and np.std(curvature_x) > 1e-8
            and np.std(curvature_y) > 1e-8
        ):
            curv_corr = np.corrcoef(curvature_x, curvature_y)[0, 1]
            similarities["curvature"] = (
                abs(curv_corr) if not np.isnan(curv_corr) else 0.0
            )
        else:
            similarities["curvature"] = 0.0
    else:
        similarities["curvature"] = 0.0
    return similarities
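# Illustrative use: the per-metric scores returned above are combined
# downstream into a single score with fixed weights (the same weight table
# appears in the sliding-window and main analysis paths below):
#
#   metrics = calculate_similarity_metrics(x, y)
#   score = (0.35 * metrics["dtw"] + 0.25 * metrics["shape"]
#            + 0.15 * metrics["frequency"] + 0.10 * metrics["peaks"]
#            + 0.10 * metrics["change_rate"] + 0.05 * metrics["curvature"])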
def sliding_window_analysis(data1, data2, window_size=100, min_similarity=0.6):
    """
    Enhanced sliding-window similarity analysis; returns all similar segments.

    Args:
        data1, data2: input data sequences
        window_size: window size
        min_similarity: minimum similarity threshold

    Returns:
        List of detailed records, one per similar segment.
    """
    if len(data1) < window_size or len(data2) < window_size:
        return []
    results = []
    # Dense step size so that important similar segments are not missed.
    step = max(1, window_size // 10)
    for i in range(0, len(data1) - window_size + 1, step):
        window1 = data1[i : i + window_size]
        window2 = data2[i : i + window_size]
        # Combine several similarity metrics to detect waveform similarity.
        similarities = calculate_similarity_metrics(window1, window2)
        # Weighted overall score - must use the same weights as the main path.
        weights = {
            "dtw": 0.35,
            "shape": 0.25,
            "frequency": 0.15,
            "peaks": 0.10,
            "change_rate": 0.10,
            "curvature": 0.05,
        }
        similarity = sum(similarities.get(key, 0) * weights[key] for key in weights)
        # Skip windows below the threshold.
        if similarity < min_similarity:
            continue
        # Describe the waveform within each window.
        window1_stats = {
            "mean": float(np.mean(window1)),
            "std": float(np.std(window1)),
            "min": float(np.min(window1)),
            "max": float(np.max(window1)),
            "trend": (
                "rising"
                if window1[-1] > window1[0]
                else "falling" if window1[-1] < window1[0] else "flat"
            ),
        }
        window2_stats = {
            "mean": float(np.mean(window2)),
            "std": float(np.std(window2)),
            "min": float(np.min(window2)),
            "max": float(np.max(window2)),
            "trend": (
                "rising"
                if window2[-1] > window2[0]
                else "falling" if window2[-1] < window2[0] else "flat"
            ),
        }
        results.append(
            {
                "start_index": i,
                "end_index": i + window_size - 1,  # inclusive end index
                "similarity": float(similarity),
                "similarity_details": {k: float(v) for k, v in similarities.items()},
                "window1_stats": window1_stats,
                "window2_stats": window2_stats,
                "window_size": window_size,
            }
        )
    # Sort by similarity, most similar first.
    results.sort(key=lambda x: x["similarity"], reverse=True)
    # Drop windows that overlap heavily, keeping the most representative ones.
    filtered_results = []
    for result in results:
        is_overlapping = False
        for existing in filtered_results:
            # Compute the overlap ratio.
            overlap_start = max(result["start_index"], existing["start_index"])
            overlap_end = min(result["end_index"], existing["end_index"])
            overlap_length = max(0, overlap_end - overlap_start + 1)
            overlap_ratio = overlap_length / window_size
            # More than 50% overlap counts as a duplicate.
            if overlap_ratio > 0.5:
                is_overlapping = True
                break
        if not is_overlapping:
            filtered_results.append(result)
    return filtered_results
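# Illustrative use (a sketch; equal-length numpy arrays assumed): scan with a
# 50-point window and keep non-overlapping segments scoring at least 0.6.
#
#   segments = sliding_window_analysis(data1, data2, window_size=50,
#                                      min_similarity=0.6)
#   for seg in segments[:3]:
#       print(seg["start_index"], seg["end_index"], seg["similarity"])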
def get_similarity_interpretation(similarity_score):
    """Map a similarity score to a human-readable interpretation."""
    if similarity_score >= 0.85:
        return "highly similar"
    elif similarity_score >= 0.7:
        return "moderately similar"
    elif similarity_score >= 0.55:
        return "weakly similar"
    else:
        return "not similar"
async def calculate_similarity_with_timeout(request, df_data, timeout_seconds=120):
    """Similarity computation tuned for waveform detection.

    The timeout itself is enforced by ``asyncio.wait_for`` at the call site,
    so ``asyncio.TimeoutError`` is handled there, not here.
    """
    try:
        # Extract the two series.
        data1 = df_data[request.point1].values
        data2 = df_data[request.point2].values
        array1 = np.array(data1)
        array2 = np.array(data2)
        # Reject NaN values.
        if np.any(np.isnan(array1)) or np.any(np.isnan(array2)):
            raise ValueError("data contains NaN values")

        # Data-quality checks and preprocessing.
        # 1. Outlier removal using the 3-sigma rule (defined for optional use;
        #    not applied below).
        def remove_outliers(data, threshold=3):
            mean = np.mean(data)
            std = np.std(data)
            if std > 1e-8:
                mask = np.abs(data - mean) <= threshold * std
                return data[mask] if np.sum(mask) > len(data) * 0.5 else data
            return data

        # 2. Smoothing with a moving average to reduce noise.
        def smooth_data(data, window=3):
            if len(data) < window:
                return data
            smoothed = np.convolve(data, np.ones(window) / window, mode="same")
            # Keep the raw values at the boundaries.
            smoothed[: window // 2] = data[: window // 2]
            smoothed[-(window // 2) :] = data[-(window // 2) :]
            return smoothed

        # Apply smoothing only to reasonably long series.
        if len(array1) > 10 and len(array2) > 10:
            array1 = smooth_data(array1, window=min(3, len(array1) // 5))
            array2 = smooth_data(array2, window=min(3, len(array2) // 5))

        # Normalization (robust variant).
        if request.normalize:
            # Robust standardization based on the median and the MAD.
            def robust_normalize(data):
                median = np.median(data)
                mad = np.median(np.abs(data - median))
                if mad > 1e-8:
                    # 1.4826 converts MAD to a standard-deviation estimate.
                    return (data - median) / (1.4826 * mad)
                else:
                    return data - median

            array1 = robust_normalize(array1)
            array2 = robust_normalize(array2)

        # Overall similarity.
        similarity_metrics = calculate_similarity_metrics(array1, array2)
        # Weights tuned for waveform detection - waveform features dominate.
        weights = {
            "dtw": 0.35,  # DTW matters most for waveform matching
            "shape": 0.25,  # shape similarity
            "frequency": 0.15,  # spectral features
            "peaks": 0.10,  # peak/valley pattern
            "change_rate": 0.10,  # rate of change
            "curvature": 0.05,  # curvature
        }
        similarity_score = float(
            sum(similarity_metrics.get(key, 0) * weights[key] for key in weights)
        )

        # Sliding-window analysis - collect all similar segments.
        similar_segments = []
        window_size = request.window_size
        if window_size is None:
            # Derive a window of roughly 10 minutes from the sampling interval.
            points_per_ten_minutes = {
                "1m": 10,  # 10 minutes = 10 one-minute points
                "3m": 3,  # ~10 minutes = 3 three-minute points
                "5m": 2,  # 10 minutes = 2 five-minute points
                "10m": 1,  # one point already spans 10 minutes or more
                "15m": 1,
                "30m": 1,
                "1h": 1,
                "1d": 1,
            }
            window_size = points_per_ten_minutes.get(
                request.interval,
                # Unknown interval: estimate, clamped to the range 2..10.
                max(2, min(10, len(array1) // 20)),
            )
        # The window must not exceed half of the data length.
        window_size = min(window_size, len(array1) // 2)
        if window_size < len(array1):
            # Adaptive window search improves efficiency and accuracy.
            min_similarity = 0.6  # adjustable threshold
            # Pick a search strategy by data size.
            if len(array1) > 1000:  # large data sets: adaptive search
                window_results = adaptive_window_search(
                    array1,
                    array2,
                    window_size,
                    calculate_similarity_metrics,
                    min_similarity,
                    max_segments=15,
                )
            elif len(array1) > 200:  # medium data sets: multi-scale analysis
                window_results = multi_scale_window_analysis(
                    array1,
                    array2,
                    window_size,
                    calculate_similarity_metrics,
                    min_similarity,
                )
            else:  # small data sets: plain sliding window
                window_results = sliding_window_analysis(
                    array1, array2, window_size, min_similarity
                )
            # Time index of the DataFrame.
            time_index = df_data.index
            # Convert every similar segment into a response record.
            for result in window_results:
                start_idx = result["start_index"]
                end_idx = result["end_index"]
                # Map indices back to timestamps.
                start_time = (
                    time_index[start_idx] if start_idx < len(time_index) else None
                )
                end_time = time_index[end_idx] if end_idx < len(time_index) else None
                # Segment duration in minutes.
                duration_minutes = None
                if start_time is not None and end_time is not None:
                    duration = end_time - start_time
                    duration_minutes = duration.total_seconds() / 60
                segment_info = {
                    "start_position": int(start_idx),
                    "end_position": int(end_idx),
                    "start_time": str(start_time) if start_time is not None else None,
                    "end_time": str(end_time) if end_time is not None else None,
                    "duration_minutes": (
                        float(duration_minutes)
                        if duration_minutes is not None
                        else None
                    ),
                    "similarity_score": result["similarity"],
                    "similarity_details": result["similarity_details"],
                    "window_size": result["window_size"],
                    "interpretation": (
                        "highly similar"
                        if result["similarity"] >= 0.85
                        else "moderately similar"
                        if result["similarity"] >= 0.7
                        else "weakly similar"
                        if result["similarity"] >= 0.55
                        else "marginally similar"
                    ),
                }
                # Attach per-window statistics when available.
                if "window1_stats" in result:
                    segment_info["point1_stats"] = result["window1_stats"]
                if "window2_stats" in result:
                    segment_info["point2_stats"] = result["window2_stats"]
                if "scale" in result:
                    segment_info["scale"] = result["scale"]
                    segment_info["scale_description"] = result["scale_description"]
                similar_segments.append(segment_info)
        # Best match (kept for backward compatibility).
        best_match_info = similar_segments[0] if similar_segments else None
        # Interpretation of the overall score.
        interpretation = get_similarity_interpretation(similarity_score)
        return {
            "similarity_score": similarity_score,
            "interpretation": interpretation,
            "data_length": len(array1),
            "best_match": best_match_info,
            "similar_segments": similar_segments,  # all similar segments
            "segments_count": len(similar_segments),  # number of segments
            "detailed_metrics": similarity_metrics,
        }
    except Exception as e:
        logger.error(f"Similarity computation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Computation error: {str(e)}")
@router.post(
    "/api/similarity_analysis",
    response_model=SimilarityAnalysisResponse,
    operation_id="two_points_similarity_analysis",
    tags=["Similarity Analysis"],
)
async def similarity_analysis(request: SimilarityAnalysisRequest):
    """
    Analyze the similarity between two time series using a combination of
    similarity measures.

    **Parameters:**
    - **point1**: name of the first data point, e.g. "W3.NODE1.XX"
    - **point2**: name of the second data point, e.g. "W3.NODE1.XX"
    - **start_time**: start time, format: YYYY-MM-DD HH:MM:SS
    - **end_time**: end time, format: YYYY-MM-DD HH:MM:SS
    - **interval**: sampling interval. **Important**: choose it to match the analysis time range
        - up to 1 hour: use '1m' (1 minute)
        - up to 1 day: use '1m' (1 minute, the default)
        - up to 1 week: use '10m' (10 minutes)
        - up to 1 month: use '30m' (30 minutes)
    - **fill_method**: fill method for missing values
    - **window_size**: sliding-window size for segment-level similarity analysis
        - if omitted, a window of roughly 10 minutes is derived from the interval:
        - 1-minute interval: 10 points (10 minutes)
        - 5-minute interval: 2 points (10 minutes)
        - 15-minute interval: 1 point (15 minutes)
        - 1-hour interval: 1 point (1 hour)
    - **normalize**: whether to normalize the data

    **Response:**
    - similarity_score: overall similarity score (between 0 and 1)
    - method: analysis method used
    - data_length: number of data points
    - interpretation: interpretation of the similarity
    - best_match_info: information about the best-matching segment (optional)

    **Example request:**
    ```json
    {
        "point1": "W3.NODE1.XX",
        "point2": "W3.NODE1.XX",
        "start_time": "2024-01-01 00:00:00",
        "end_time": "2024-01-01 23:59:59",
        "interval": "1m",
        "normalize": true
    }
    ```
    """
    try:
        # Fetch the data through OpenPlant.
        df_data = opt.api_select_to_frame(
            point_list=[request.point1, request.point2],
            start_time=request.start_time,
            end_time=request.end_time,
            interval=request.interval,
            fill_method=request.fill_method,
        )
        if df_data is None or df_data.empty:
            raise HTTPException(
                status_code=404,
                detail={
                    "error_type": "data retrieval failed",
                    "message": "No data available for the requested time range",
                    "suggestions": [
                        "Check that the data point names are correct",
                        "Confirm that the time range contains data",
                        "Verify that the data source connection is healthy",
                    ],
                },
            )
        # Compute the similarity with a timeout.
        result = await asyncio.wait_for(
            calculate_similarity_with_timeout(request, df_data), timeout=120
        )
        # Build the response.
        return SimilarityAnalysisResponse(
            similarity_score=result["similarity_score"],
            method="combined similarity analysis",
            data_length=result["data_length"],
            interpretation=result["interpretation"],
            best_match_info=result["best_match"],
        )
    except HTTPException:
        # Re-raise HTTP errors (the 404 above, the helper's 500) unchanged
        # instead of letting the generic handler rewrap them as 500.
        raise
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=408,
            detail={
                "error_type": "computation timeout",
                "message": "Similarity analysis did not finish within 120 seconds",
                "suggestions": [
                    "Try a coarser sampling interval (such as '15m' or '1h')",
                    "Shorten the analysis time range",
                    "Reduce the sliding-window size",
                ],
            },
        )
    except ValueError as e:
        # Validation errors.
        raise HTTPException(
            status_code=422,
            detail={
                "error_type": "parameter validation error",
                "message": str(e),
                "suggestions": [
                    "Check that the data point names are correct",
                    "Confirm that the time format is correct",
                    "Verify that the time range is reasonable",
                ],
            },
        )
    except Exception as e:
        # Any other computation error.
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "computation error",
                "message": f"Similarity analysis failed: {str(e)}",
                "suggestions": [
                    "Check that the data points exist",
                    "Confirm that the time range contains enough data",
                    "Try adjusting the window-size parameter",
                ],
            },
        )
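# Minimal smoke test for the pure-numpy helpers (a sketch: it exercises only
# the local functions, and assumes the module-level imports, including the
# OpenPlant client construction, succeed in your environment).
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    t = np.linspace(0, 4 * np.pi, 300)
    # Two noisy, slightly phase-shifted sine waves should score as similar.
    a = np.sin(t) + 0.05 * rng.standard_normal(t.size)
    b = np.sin(t + 0.2) + 0.05 * rng.standard_normal(t.size)
    metrics = calculate_similarity_metrics(a, b)
    print("per-metric similarities:", {k: round(float(v), 3) for k, v in metrics.items()})
    segments = sliding_window_analysis(a, b, window_size=50, min_similarity=0.6)
    print(f"{len(segments)} similar segment(s) found")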