"""
纯 DoWhy 根本原因分析工具 - 100% 使用 DoWhy 原生方法,不重复造轮子
"""
import logging
from typing import Any, Dict, List
import pandas as pd
import numpy as np
import dowhy
from mcp.server.fastmcp import FastMCP
from ..utils.data_processor import load_and_validate_data
# Logger used by every root-cause tool in this module for non-fatal warnings.
logger = logging.getLogger(name="dowhy-mcp-server.root_cause")
# Estimated effects whose absolute value does not exceed this are treated as negligible.
_MIN_EFFECT_SIZE = 0.1
# Minimum absolute gap between a cause's mean on problem rows and on normal rows
# for it to be flagged as a root cause.
_MIN_CAUSE_DIFFERENCE = 0.1


def _estimate_linear_effect(
    data: pd.DataFrame,
    treatment: str,
    outcome: str,
    common_causes: List[str],
) -> float:
    """Return DoWhy's backdoor linear-regression estimate of ``treatment`` -> ``outcome``.

    A ``dowhy.CausalModel`` is built without an explicit graph, the effect is
    identified (continuing even when DoWhy deems it unidentifiable) and then
    estimated with ``backdoor.linear_regression``.

    Args:
        data: Frame containing the treatment, outcome and common-cause columns.
        treatment: Treatment column name.
        outcome: Outcome column name.
        common_causes: Adjustment columns; an empty list is passed to DoWhy as ``None``.

    Returns:
        The point estimate as a Python float.

    Raises:
        Any exception raised by DoWhy propagates to the caller.
    """
    model = dowhy.CausalModel(
        data=data,
        treatment=treatment,
        outcome=outcome,
        common_causes=common_causes or None,
    )
    identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
    estimate = model.estimate_effect(
        identified_estimand,
        method_name="backdoor.linear_regression",
    )
    return float(estimate.value)


def register_root_cause_tools(server: FastMCP) -> None:
    """Register all DoWhy-based root-cause analysis tools on ``server``.

    Each tool returns a dict with a ``success`` flag; unexpected errors are
    caught inside the tool and reported under the ``error`` key.
    """

    @server.tool()
    def anomaly_attribution_analyzer(
        data_path: str,
        target_variable: str,
        feature_variables: List[str],
        anomaly_threshold: float = 2
    ) -> Dict[str, Any]:
        """Anomaly attribution with DoWhy (currently not supported).

        The data file is still loaded and validated so that bad paths or
        missing columns are reported, but the tool then always answers with
        ``success: False`` and guidance to detect anomalies with a dedicated
        library first. ``anomaly_threshold`` is accepted only for interface
        compatibility and is not used.
        """
        try:
            all_vars = [target_variable] + feature_variables
            # Validation only: surfaces loading / column errors to the caller.
            load_and_validate_data(data_path, all_vars)
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy Anomaly Attribution"
            }
        # Anomaly detection is delegated to dedicated libraries; see the response text.
        return {
            "success": False,
            "error": "异常检测不是DoWhy的原生功能。请使用专门的异常检测库(如scikit-learn的IsolationForest)结合DoWhy进行因果分析。",
            "method": "DoWhy Anomaly Attribution",
            "recommendation": "先使用异常检测库识别异常,然后使用DoWhy分析异常的因果关系",
            "alternative_approach": "使用DoWhy的refutation方法检测数据中的异常模式对因果推断的影响"
        }

    @server.tool()
    def distribution_change_attribution(
        data_path: str,
        target_variable: str,
        feature_variables: List[str],
        time_variable: str,
        baseline_period: Any,
        comparison_period: Any
    ) -> Dict[str, Any]:
        """Attribute the change in the mean of ``target_variable`` between two periods.

        For every feature, DoWhy estimates its linear effect on the target on
        the rows of both periods, adjusting for the other features and a 0/1
        period indicator. The feature's contribution is that effect multiplied
        by the change of the feature's mean between the periods.

        Args:
            data_path: Path handed to ``load_and_validate_data``.
            target_variable: Column whose mean change is explained.
            feature_variables: Candidate explanatory columns.
            time_variable: Column holding each row's period.
            baseline_period: ``time_variable`` value of the baseline rows.
            comparison_period: ``time_variable`` value of the comparison rows.

        Returns:
            A result dict; ``top_contributors`` lists up to three features
            ordered by absolute contribution.
        """
        try:
            all_vars = [target_variable, time_variable] + feature_variables
            data = load_and_validate_data(data_path, all_vars)

            baseline_data = data[data[time_variable] == baseline_period]
            comparison_data = data[data[time_variable] == comparison_period]
            if len(baseline_data) == 0 or len(comparison_data) == 0:
                return {
                    "success": False,
                    "error": "基准期或比较期数据为空",
                    "method": "DoWhy Distribution Change Attribution"
                }

            baseline_mean = baseline_data[target_variable].mean()
            comparison_mean = comparison_data[target_variable].mean()
            distribution_change = comparison_mean - baseline_mean

            # Restrict to the two periods and add the period indicator used for adjustment.
            data_combined = data[data[time_variable].isin([baseline_period, comparison_period])].copy()
            data_combined['is_comparison_period'] = (data_combined[time_variable] == comparison_period).astype(int)

            attribution_results = {}
            for feature in feature_variables:
                try:
                    adjustment_set = [f for f in feature_variables if f != feature] + ['is_comparison_period']
                    causal_effect = _estimate_linear_effect(
                        data_combined, feature, target_variable, adjustment_set
                    )
                    feature_baseline_mean = baseline_data[feature].mean()
                    feature_comparison_mean = comparison_data[feature].mean()
                    feature_change = feature_comparison_mean - feature_baseline_mean
                    contribution = causal_effect * feature_change
                    attribution_results[feature] = {
                        "causal_effect": causal_effect,
                        "feature_change": float(feature_change),
                        "contribution_to_change": float(contribution),
                        # Guard against dividing by a zero overall change.
                        "relative_contribution": (
                            float(contribution / distribution_change) if distribution_change != 0 else 0.0
                        ),
                        "baseline_mean": float(feature_baseline_mean),
                        "comparison_mean": float(feature_comparison_mean)
                    }
                except Exception as e:
                    logger.warning("特征 %s 的分布变化归因失败: %s", feature, e)
                    attribution_results[feature] = {
                        "causal_effect": 0.0,
                        "feature_change": 0.0,
                        "contribution_to_change": 0.0,
                        "relative_contribution": 0.0,
                        "baseline_mean": 0.0,
                        "comparison_mean": 0.0
                    }

            sorted_contributions = sorted(
                attribution_results.items(),
                key=lambda x: abs(x[1]["contribution_to_change"]),
                reverse=True
            )
            return {
                "success": True,
                "method": "DoWhy Distribution Change Attribution",
                "target_variable": target_variable,
                "feature_variables": feature_variables,
                "time_variable": time_variable,
                "baseline_period": baseline_period,
                "comparison_period": comparison_period,
                "distribution_change": float(distribution_change),
                "baseline_mean": float(baseline_mean),
                "comparison_mean": float(comparison_mean),
                "attribution_results": attribution_results,
                "top_contributors": [
                    {
                        "feature": feature,
                        "contribution": result["contribution_to_change"],
                        "relative_contribution": result["relative_contribution"]
                    }
                    for feature, result in sorted_contributions[:3]
                ],
                "baseline_sample_size": len(baseline_data),
                "comparison_sample_size": len(comparison_data)
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy Distribution Change Attribution"
            }

    @server.tool()
    def causal_chain_tracer(
        data_path: str,
        event_variable: str,
        potential_causes: List[str],
        max_depth: int = 3
    ) -> Dict[str, Any]:
        """Trace simple causal chains that end in ``event_variable``.

        Level 1: every candidate whose estimated effect on the event (adjusted
        for the other candidates) exceeds ``_MIN_EFFECT_SIZE`` is a direct
        cause. Level 2 (only when ``max_depth > 1``): every remaining candidate
        with a non-negligible effect on a direct cause forms a chain
        ``candidate -> direct cause -> event``. Deeper chains are not traced;
        ``max_depth`` values above 2 behave like 2.

        Returns:
            A result dict with ``direct_causes`` (depth 1) and
            ``causal_chains`` (depth 2) keyed by ``"a->b->event"``.
        """
        try:
            all_vars = [event_variable] + potential_causes
            data = load_and_validate_data(data_path, all_vars)

            # Level 1: direct causes of the event.
            direct_causes = {}
            for cause in potential_causes:
                other_causes = [c for c in potential_causes if c != cause]
                try:
                    causal_effect = _estimate_linear_effect(data, cause, event_variable, other_causes)
                except Exception as e:
                    logger.warning("直接原因分析失败 %s: %s", cause, e)
                    continue
                if abs(causal_effect) > _MIN_EFFECT_SIZE:
                    direct_causes[cause] = {
                        "effect_size": abs(causal_effect),
                        "causal_effect": causal_effect,
                        "depth": 1
                    }

            # Level 2: candidates acting on the event through a direct cause.
            # Computed once; the previous per-depth loop repeated this identical
            # pass and mislabelled every chain with depth == max_depth.
            causal_chains = {}
            if max_depth > 1:
                for intermediate_var in potential_causes:
                    if intermediate_var in direct_causes:
                        continue
                    for target_var, target_info in direct_causes.items():
                        other_vars = [v for v in potential_causes if v not in (intermediate_var, target_var)]
                        try:
                            intermediate_effect = _estimate_linear_effect(
                                data, intermediate_var, target_var, other_vars
                            )
                        except Exception as e:
                            logger.warning("因果链分析失败 %s->%s: %s", intermediate_var, target_var, e)
                            continue
                        if abs(intermediate_effect) > _MIN_EFFECT_SIZE:
                            chain_key = f"{intermediate_var}->{target_var}->{event_variable}"
                            causal_chains[chain_key] = {
                                "chain": [intermediate_var, target_var, event_variable],
                                "intermediate_effect": intermediate_effect,
                                # Product of the two linear path coefficients.
                                "total_effect": intermediate_effect * target_info["causal_effect"],
                                "depth": 2
                            }

            return {
                "success": True,
                "method": "DoWhy Causal Chain Tracing",
                "event_variable": event_variable,
                "potential_causes": potential_causes,
                "max_depth": max_depth,
                "direct_causes": direct_causes,
                "causal_chains": causal_chains,
                "sample_size": int(len(data))
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy Causal Chain Tracing"
            }

    @server.tool()
    def root_cause_identifier(
        data_path: str,
        problem_variable: str,
        candidate_causes: List[str],
        problem_threshold: float = 2
    ) -> Dict[str, Any]:
        """Identify root causes of unusually high values of ``problem_variable``.

        Rows whose value exceeds ``mean + problem_threshold * std`` are marked
        as problem rows. For each candidate, DoWhy estimates its linear effect
        on the 0/1 problem indicator (adjusting for the other candidates), and
        the candidate's mean on problem rows is compared with its mean on the
        remaining rows. A candidate is a root cause when both the absolute
        effect and the absolute mean gap exceed their thresholds.

        Returns:
            A result dict; ``top_root_causes`` lists up to three identified
            root causes ordered by effect strength.
        """
        try:
            all_vars = [problem_variable] + candidate_causes
            data = load_and_validate_data(data_path, all_vars)

            problem_mean = data[problem_variable].mean()
            problem_std = data[problem_variable].std()
            problem_mask = data[problem_variable] > (problem_mean + problem_threshold * problem_std)
            data_with_problem = data.copy()
            data_with_problem['has_problem'] = problem_mask.astype(int)
            problem_count = problem_mask.sum()
            if problem_count == 0:
                return {
                    "success": True,
                    "method": "DoWhy Root Cause Identification",
                    "problem_variable": problem_variable,
                    "candidate_causes": candidate_causes,
                    "problem_count": 0,
                    "root_causes": {},
                    "message": "未发现问题实例"
                }

            # Loop-invariant row subsets, computed once.
            problem_instances = data[problem_mask]
            normal_instances = data[~problem_mask]

            root_cause_analysis = {}
            for cause in candidate_causes:
                try:
                    other_causes = [c for c in candidate_causes if c != cause]
                    causal_effect = _estimate_linear_effect(
                        data_with_problem, cause, 'has_problem', other_causes
                    )
                    cause_problem_mean = problem_instances[cause].mean() if len(problem_instances) > 0 else 0
                    cause_normal_mean = normal_instances[cause].mean() if len(normal_instances) > 0 else 0
                    cause_difference = float(cause_problem_mean - cause_normal_mean)
                    root_cause_analysis[cause] = {
                        "causal_effect": causal_effect,
                        "effect_strength": abs(causal_effect),
                        "cause_difference": cause_difference,
                        "problem_mean": float(cause_problem_mean),
                        "normal_mean": float(cause_normal_mean),
                        # Plain bool so the result never carries a numpy bool_.
                        "is_root_cause": bool(
                            abs(causal_effect) > _MIN_EFFECT_SIZE
                            and abs(cause_difference) > _MIN_CAUSE_DIFFERENCE
                        )
                    }
                except Exception as e:
                    logger.warning("根本原因分析失败 %s: %s", cause, e)
                    root_cause_analysis[cause] = {
                        "causal_effect": 0.0,
                        "effect_strength": 0.0,
                        "cause_difference": 0.0,
                        "problem_mean": 0.0,
                        "normal_mean": 0.0,
                        "is_root_cause": False
                    }

            root_causes = {
                cause: analysis for cause, analysis in root_cause_analysis.items()
                if analysis["is_root_cause"]
            }
            sorted_causes = sorted(
                root_causes.items(),
                key=lambda x: x[1]["effect_strength"],
                reverse=True
            )
            return {
                "success": True,
                "method": "DoWhy Root Cause Identification",
                "problem_variable": problem_variable,
                "candidate_causes": candidate_causes,
                "problem_threshold": problem_threshold,
                "problem_count": int(problem_count),
                "total_samples": int(len(data)),
                "problem_rate": float(problem_count / len(data)),
                "root_cause_analysis": root_cause_analysis,
                "identified_root_causes": root_causes,
                "top_root_causes": [
                    {"cause": cause, "strength": analysis["effect_strength"]}
                    for cause, analysis in sorted_causes[:3]
                ]
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy Root Cause Identification"
            }

    @server.tool()
    def event_attribution_analyzer(
        data_path: str,
        event_variable: str,
        attribution_variables: List[str],
        event_threshold: float = 1
    ) -> Dict[str, Any]:
        """Attribute occurrences of an event to candidate variables.

        Rows with ``event_variable >= event_threshold`` count as events. For
        each attribution variable, DoWhy estimates its linear effect on the
        0/1 event indicator (adjusting for the other variables), and its mean
        on event rows is compared with its mean on non-event rows.

        Returns:
            A result dict; ``top_attributions`` lists up to three variables
            ordered by absolute effect.
        """
        try:
            all_vars = [event_variable] + attribution_variables
            data = load_and_validate_data(data_path, all_vars)

            event_mask = data[event_variable] >= event_threshold
            data_with_event = data.copy()
            data_with_event['event_occurred'] = event_mask.astype(int)
            event_count = event_mask.sum()
            if event_count == 0:
                return {
                    "success": True,
                    "method": "DoWhy Event Attribution",
                    "event_variable": event_variable,
                    "attribution_variables": attribution_variables,
                    "event_count": 0,
                    "attribution_results": {},
                    "message": "未发现事件实例"
                }

            # Loop-invariant row subsets, computed once.
            event_instances = data[event_mask]
            non_event_instances = data[~event_mask]

            attribution_results = {}
            for attr_var in attribution_variables:
                try:
                    other_vars = [v for v in attribution_variables if v != attr_var]
                    causal_effect = _estimate_linear_effect(
                        data_with_event, attr_var, 'event_occurred', other_vars
                    )
                    attr_event_mean = event_instances[attr_var].mean() if len(event_instances) > 0 else 0
                    attr_non_event_mean = non_event_instances[attr_var].mean() if len(non_event_instances) > 0 else 0
                    attr_difference = attr_event_mean - attr_non_event_mean
                    attribution_results[attr_var] = {
                        "causal_effect": causal_effect,
                        "attribution_strength": abs(causal_effect),
                        "variable_difference": float(attr_difference),
                        "event_mean": float(attr_event_mean),
                        "non_event_mean": float(attr_non_event_mean),
                        "attribution_direction": "促进事件" if causal_effect > 0 else "抑制事件"
                    }
                except Exception as e:
                    logger.warning("事件归因分析失败 %s: %s", attr_var, e)
                    attribution_results[attr_var] = {
                        "causal_effect": 0.0,
                        "attribution_strength": 0.0,
                        "variable_difference": 0.0,
                        "event_mean": 0.0,
                        "non_event_mean": 0.0,
                        "attribution_direction": "无影响"
                    }

            sorted_attributions = sorted(
                attribution_results.items(),
                key=lambda x: x[1]["attribution_strength"],
                reverse=True
            )
            return {
                "success": True,
                "method": "DoWhy Event Attribution",
                "event_variable": event_variable,
                "attribution_variables": attribution_variables,
                "event_threshold": event_threshold,
                "event_count": int(event_count),
                "total_samples": int(len(data)),
                "event_rate": float(event_count / len(data)),
                "attribution_results": attribution_results,
                "top_attributions": [
                    {
                        "variable": var,
                        "strength": result["attribution_strength"],
                        "direction": result["attribution_direction"]
                    }
                    for var, result in sorted_attributions[:3]
                ]
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy Event Attribution"
            }