"""
DoWhy MCP Server - 反驳方法工具
实现DoWhy的反驳方法,用于验证因果推理结果的稳健性
"""
import logging
from typing import Any, Dict, List, Optional
import numpy as np
import pandas as pd
import dowhy
from dowhy import CausalModel
from mcp.server.fastmcp import FastMCP
from ..utils.data_processor import load_and_validate_data, serialize_numpy_types
# Module-level logger shared by every refutation tool in this file.
# The logger name is a fixed string rather than being derived from __name__.
logger = logging.getLogger("dowhy-mcp-server.refutation")
def _build_backdoor_graph(treatment: str, outcome: str, confounders: List[str]) -> str:
    """Return a DOT digraph where each confounder points at treatment and outcome.

    Every edge is emitted as its own ``;``-terminated statement. Joining the
    per-confounder fragments with ``" -> "`` instead chains them into one DOT
    path, which for two or more confounders adds a spurious
    ``outcome -> next_confounder`` edge and makes the graph cyclic.

    The result is a single line of text.
    NOTE(review): DoWhy's example notebooks flatten DOT strings (replacing
    newlines with spaces) before handing them to ``CausalModel``; confirm
    whether the installed version still requires that.

    Node names are interpolated as-is, so column names are assumed to be valid
    unquoted DOT identifiers.
    """
    edges = [f"{treatment} -> {outcome};"]
    for confounder in confounders:
        edges.append(f"{confounder} -> {treatment};")
        edges.append(f"{confounder} -> {outcome};")
    return "digraph { " + " ".join(edges) + " }"


def _summarize_refutation(refutation: Any) -> Dict[str, Any]:
    """Reduce a DoWhy refutation result to JSON-friendly scalars.

    Returns a dict with ``new_effect``, ``p_value`` and ``refutation_type``;
    each entry is ``None`` when the corresponding value is unavailable.
    """
    # NOTE(review): dummy_outcome_refuter is understood to return a list of
    # refutation objects rather than a single one; report the first entry.
    if isinstance(refutation, (list, tuple)):
        refutation = refutation[0] if refutation else None

    new_effect = getattr(refutation, "new_effect", None)

    # Prefer a direct ``p_value`` attribute if one exists, otherwise fall back
    # to the ``refutation_result`` dict.
    # NOTE(review): DoWhy's CausalRefutation is understood to keep the
    # significance test in ``refutation_result["p_value"]`` - confirm against
    # the installed version.
    p_value = getattr(refutation, "p_value", None)
    if p_value is None:
        significance = getattr(refutation, "refutation_result", None)
        if isinstance(significance, dict):
            p_value = significance.get("p_value")

    return {
        "new_effect": float(new_effect) if new_effect is not None else None,
        "p_value": float(p_value) if p_value is not None else None,
        "refutation_type": (
            str(refutation.refutation_type)
            if hasattr(refutation, "refutation_type")
            else None
        ),
    }


def _run_refuter(
    method_name: str,
    data_path: str,
    treatment: str,
    outcome: str,
    confounders: List[str],
    causal_estimate: float,
    **refuter_params: Any,
) -> Dict[str, Any]:
    """Estimate the effect with linear regression and run one DoWhy refuter.

    Args:
        method_name: Refuter name passed to ``CausalModel.refute_estimate``.
        data_path: Path handed to ``load_and_validate_data``.
        treatment: Treatment column name.
        outcome: Outcome column name.
        confounders: Confounder column names.
        causal_estimate: Caller-supplied estimate; only echoed back as
            ``original_estimate`` (the effect is re-estimated here).
        **refuter_params: Extra keyword arguments forwarded to
            ``refute_estimate`` and echoed into the payload in the given order.

    Returns:
        The success payload without the ``message`` key, which the calling
        tool appends.

    Raises:
        Whatever the data loader or DoWhy raises; the calling tool converts
        exceptions into an error payload.
    """
    data = load_and_validate_data(data_path, [treatment, outcome] + confounders)

    model = CausalModel(
        data=data,
        treatment=treatment,
        outcome=outcome,
        graph=_build_backdoor_graph(treatment, outcome, confounders),
    )
    identified_estimand = model.identify_effect()
    estimate = model.estimate_effect(
        identified_estimand,
        method_name="backdoor.linear_regression",
    )
    refutation = model.refute_estimate(
        identified_estimand,
        estimate,
        method_name=method_name,
        **refuter_params,
    )

    return {
        "success": True,
        "method": f"DoWhy {method_name}",
        "treatment": treatment,
        "outcome": outcome,
        "confounders": confounders,
        "original_estimate": float(causal_estimate),
        "estimated_effect": float(estimate.value),
        "refutation_results": _summarize_refutation(refutation),
        **refuter_params,
        "sample_size": len(data),
    }


def register_refutation_tools(server: FastMCP) -> None:
    """Register the DoWhy refutation tools on ``server``.

    Each tool loads the data, builds a backdoor graph from the confounders,
    estimates the effect with ``backdoor.linear_regression`` and runs a single
    refuter. Tools never raise: any exception is logged and returned as
    ``{"success": False, "error": ..., "method": ...}``.
    """

    @server.tool()
    def placebo_treatment_refuter(
        data_path: str,
        treatment: str,
        outcome: str,
        confounders: List[str],
        causal_estimate: float,
        num_simulations: int = 100,
    ) -> Dict[str, Any]:
        """Run DoWhy's ``placebo_treatment_refuter``.

        Args:
            data_path: Path to the data file.
            treatment: Treatment variable name.
            outcome: Outcome variable name.
            confounders: Confounder variable names.
            causal_estimate: Original effect estimate (echoed back only).
            num_simulations: Number of simulations.
        """
        try:
            result = _run_refuter(
                "placebo_treatment_refuter",
                data_path,
                treatment,
                outcome,
                confounders,
                causal_estimate,
                num_simulations=num_simulations,
            )
            result["message"] = f"安慰剂治疗反驳完成,进行了{num_simulations}次模拟"
            return result
        except Exception as e:
            logger.error("DoWhy 安慰剂治疗反驳失败: %s", e, exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy placebo_treatment_refuter",
            }

    @server.tool()
    def random_common_cause_refuter(
        data_path: str,
        treatment: str,
        outcome: str,
        confounders: List[str],
        causal_estimate: float,
        num_simulations: int = 100,
    ) -> Dict[str, Any]:
        """Run DoWhy's ``random_common_cause`` refuter.

        Args:
            data_path: Path to the data file.
            treatment: Treatment variable name.
            outcome: Outcome variable name.
            confounders: Confounder variable names.
            causal_estimate: Original effect estimate (echoed back only).
            num_simulations: Number of simulations.
        """
        try:
            result = _run_refuter(
                "random_common_cause",
                data_path,
                treatment,
                outcome,
                confounders,
                causal_estimate,
                num_simulations=num_simulations,
            )
            result["message"] = f"随机共同原因反驳完成,进行了{num_simulations}次模拟"
            return result
        except Exception as e:
            logger.error("DoWhy 随机共同原因反驳失败: %s", e, exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy random_common_cause",
            }

    @server.tool()
    def data_subset_refuter(
        data_path: str,
        treatment: str,
        outcome: str,
        confounders: List[str],
        causal_estimate: float,
        subset_fraction: float = 0.8,
        num_simulations: int = 100,
    ) -> Dict[str, Any]:
        """Run DoWhy's ``data_subset_refuter``.

        Args:
            data_path: Path to the data file.
            treatment: Treatment variable name.
            outcome: Outcome variable name.
            confounders: Confounder variable names.
            causal_estimate: Original effect estimate (echoed back only).
            subset_fraction: Fraction of the data used for each subset.
            num_simulations: Number of simulations.
        """
        try:
            result = _run_refuter(
                "data_subset_refuter",
                data_path,
                treatment,
                outcome,
                confounders,
                causal_estimate,
                subset_fraction=subset_fraction,
                num_simulations=num_simulations,
            )
            result["message"] = (
                f"数据子集反驳完成,使用{subset_fraction*100}%数据进行了{num_simulations}次模拟"
            )
            return result
        except Exception as e:
            logger.error("DoWhy 数据子集反驳失败: %s", e, exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy data_subset_refuter",
            }

    @server.tool()
    def bootstrap_refuter(
        data_path: str,
        treatment: str,
        outcome: str,
        confounders: List[str],
        causal_estimate: float,
        num_simulations: int = 100,
    ) -> Dict[str, Any]:
        """Run DoWhy's ``bootstrap_refuter``.

        Args:
            data_path: Path to the data file.
            treatment: Treatment variable name.
            outcome: Outcome variable name.
            confounders: Confounder variable names.
            causal_estimate: Original effect estimate (echoed back only).
            num_simulations: Number of bootstrap rounds.
        """
        try:
            result = _run_refuter(
                "bootstrap_refuter",
                data_path,
                treatment,
                outcome,
                confounders,
                causal_estimate,
                num_simulations=num_simulations,
            )
            result["message"] = f"自举法反驳完成,进行了{num_simulations}次自举采样"
            return result
        except Exception as e:
            logger.error("DoWhy 自举法反驳失败: %s", e, exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy bootstrap_refuter",
            }

    @server.tool()
    def dummy_outcome_refuter(
        data_path: str,
        treatment: str,
        outcome: str,
        confounders: List[str],
        causal_estimate: float,
        num_simulations: int = 100,
    ) -> Dict[str, Any]:
        """Run DoWhy's ``dummy_outcome_refuter``.

        Args:
            data_path: Path to the data file.
            treatment: Treatment variable name.
            outcome: Outcome variable name.
            confounders: Confounder variable names.
            causal_estimate: Original effect estimate (echoed back only).
            num_simulations: Number of simulations.
        """
        try:
            result = _run_refuter(
                "dummy_outcome_refuter",
                data_path,
                treatment,
                outcome,
                confounders,
                causal_estimate,
                num_simulations=num_simulations,
            )
            result["message"] = f"虚拟结果反驳完成,进行了{num_simulations}次模拟"
            return result
        except Exception as e:
            logger.error("DoWhy 虚拟结果反驳失败: %s", e, exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "method": "DoWhy dummy_outcome_refuter",
            }