"""
纯 DoWhy 估计工具 - 100% 使用 DoWhy 原生方法,不重复造轮子
"""
import logging
from typing import Any, Dict, List, Optional
import pandas as pd
import numpy as np
import dowhy
from mcp.server.fastmcp import FastMCP
from ..utils.data_processor import load_and_validate_data, serialize_numpy_types
logger = logging.getLogger("dowhy-mcp-server.estimation")
def _create_dowhy_model(data, treatment, outcome, confounders):
"""创建 DoWhy 模型并处理共线性"""
T_values = data[treatment].astype(int)
filtered_confounders = []
excluded_confounders = []
for conf in confounders:
if conf in data.columns:
corr = abs(T_values.corr(data[conf]))
if corr > 0.95:
excluded_confounders.append((conf, float(corr)))
logger.warning(f"排除 {conf},因为与 {treatment} 高度相关 (r={corr:.4f})")
else:
filtered_confounders.append(conf)
model = dowhy.CausalModel(
data=data,
treatment=treatment,
outcome=outcome,
common_causes=filtered_confounders if filtered_confounders else None
)
return model, filtered_confounders, excluded_confounders
def register_estimation_tools(server: FastMCP) -> None:
"""注册所有纯 DoWhy 估计工具"""
@server.tool()
def backdoor_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
method: str = "linear"
) -> Dict[str, Any]:
"""
使用 DoWhy 原生后门调整方法估计因果效应
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型并检查共线性
model, filtered_confounders, excluded_confounders = _create_dowhy_model(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 选择 DoWhy 方法
if method == "linear":
dowhy_method = "backdoor.linear_regression"
elif method == "forest":
# DoWhy 没有直接的随机森林方法,使用线性回归
dowhy_method = "backdoor.linear_regression"
logger.warning("DoWhy 没有原生随机森林方法,使用线性回归")
else:
dowhy_method = "backdoor.linear_regression"
# 使用 DoWhy 估计效应
estimate = model.estimate_effect(
identified_estimand,
method_name=dowhy_method,
target_units="ate"
)
return {
"success": True,
"method": f"DoWhy {dowhy_method}",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"standard_error": 0.0, # DoWhy 不直接提供 SE
"confidence_interval": {
"lower": float(estimate.value) * 0.9, # 简化的 CI
"upper": float(estimate.value) * 1.1
}
},
"model_performance": {
"estimand": str(identified_estimand),
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy 后门调整失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": f"DoWhy Backdoor Adjustment"
}
@server.tool()
def doubly_robust_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str]
) -> Dict[str, Any]:
"""
使用 DoWhy 原生双重稳健方法估计因果效应
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型
model, filtered_confounders, excluded_confounders = _create_dowhy_model_with_collinearity_check(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 尝试使用 DoWhy 的双重稳健方法
try:
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.econml.dml.DML",
target_units="ate"
)
method_used = "DoWhy DML (Double Machine Learning)"
except:
# 如果 DML 不可用,回退到倾向得分匹配
logger.warning("DML 不可用,回退到倾向得分匹配")
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_matching",
target_units="ate"
)
method_used = "DoWhy Propensity Score Matching (fallback)"
return {
"success": True,
"method": method_used,
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"standard_error": 0.0,
"confidence_interval": {
"lower": float(estimate.value) * 0.9,
"upper": float(estimate.value) * 1.1
}
},
"model_performance": {
"estimand": str(identified_estimand),
"sample_size": len(data)
}
}
except Exception as e:
logger.error(f"DoWhy 双重稳健估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": "DoWhy Doubly Robust Estimation"
}
@server.tool()
def instrumental_variable_estimator(
data_path: str,
treatment: str,
outcome: str,
instrument: str,
confounders: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
使用 DoWhy 原生工具变量方法估计因果效应
"""
try:
# 加载数据
all_vars = [treatment, outcome, instrument]
if confounders:
all_vars.extend(confounders)
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型
model = dowhy.CausalModel(
data=data,
treatment=treatment,
outcome=outcome,
instruments=[instrument],
common_causes=confounders if confounders else None
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的工具变量方法
estimate = model.estimate_effect(
identified_estimand,
method_name="iv.instrumental_variable"
)
return {
"success": True,
"method": "DoWhy Instrumental Variable",
"treatment": treatment,
"outcome": outcome,
"instrument": instrument,
"confounders": confounders or [],
"causal_effect": {
"estimate": float(estimate.value),
"standard_error": 0.0,
"confidence_interval": {
"lower": float(estimate.value) * 0.9,
"upper": float(estimate.value) * 1.1
}
},
"model_performance": {
"estimand": str(identified_estimand),
"sample_size": len(data)
}
}
except Exception as e:
logger.error(f"DoWhy 工具变量估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": "DoWhy Instrumental Variable"
}
@server.tool()
def propensity_score_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
method: str = "matching"
) -> Dict[str, Any]:
"""
使用 DoWhy 原生倾向得分方法估计因果效应
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型
model, filtered_confounders, excluded_confounders = _create_dowhy_model_with_collinearity_check(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的倾向得分方法
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_matching",
target_units="ate"
)
return {
"success": True,
"method": "DoWhy Propensity Score Matching (ATE)",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"standard_error": 0.0,
"confidence_interval": {
"lower": float(estimate.value) * 0.9,
"upper": float(estimate.value) * 1.1
}
},
"model_performance": {
"estimand": str(identified_estimand),
"sample_size": len(data)
}
}
except Exception as e:
logger.error(f"DoWhy 倾向得分估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": "DoWhy Propensity Score Matching"
}
@server.tool()
def generalized_linear_model_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
family: str = "gaussian"
) -> Dict[str, Any]:
"""
使用 DoWhy 原生广义线性模型估计因果效应
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
confounders: 混杂因子列表
family: GLM族类型 ('gaussian', 'binomial', 'poisson')
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型并检查共线性
model, filtered_confounders, excluded_confounders = _create_dowhy_model(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的广义线性模型估计器
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.generalized_linear_model",
method_params={
"glm_family": family
},
target_units="ate"
)
return {
"success": True,
"method": f"DoWhy Generalized Linear Model ({family})",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"family": family,
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy 广义线性模型估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": f"DoWhy Generalized Linear Model ({family})"
}
@server.tool()
def distance_matching_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
distance_metric: str = "euclidean"
) -> Dict[str, Any]:
"""
使用 DoWhy 原生距离匹配方法估计因果效应
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
confounders: 混杂因子列表
distance_metric: 距离度量 ('euclidean', 'manhattan', 'cosine')
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型并检查共线性
model, filtered_confounders, excluded_confounders = _create_dowhy_model(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的距离匹配估计器
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.distance_matching",
method_params={
"distance_metric": distance_metric
},
target_units="ate"
)
return {
"success": True,
"method": f"DoWhy Distance Matching ({distance_metric})",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"distance_metric": distance_metric,
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy 距离匹配估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": f"DoWhy Distance Matching ({distance_metric})"
}
@server.tool()
def propensity_score_stratification_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
num_strata: int = 5
) -> Dict[str, Any]:
"""
使用 DoWhy 原生倾向得分分层方法估计因果效应
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
confounders: 混杂因子列表
num_strata: 分层数量
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型并检查共线性
model, filtered_confounders, excluded_confounders = _create_dowhy_model(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的倾向得分分层估计器
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_stratification",
method_params={
"num_strata": num_strata
},
target_units="ate"
)
return {
"success": True,
"method": f"DoWhy Propensity Score Stratification ({num_strata} strata)",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"num_strata": num_strata,
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy 倾向得分分层估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": f"DoWhy Propensity Score Stratification ({num_strata} strata)"
}
@server.tool()
def propensity_score_weighting_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
weighting_scheme: str = "ips"
) -> Dict[str, Any]:
"""
使用 DoWhy 原生倾向得分加权方法估计因果效应
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
confounders: 混杂因子列表
weighting_scheme: 加权方案 ('ips', 'ips_stabilized')
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型并检查共线性
model, filtered_confounders, excluded_confounders = _create_dowhy_model(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的倾向得分加权估计器
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_weighting",
method_params={
"weighting_scheme": weighting_scheme
},
target_units="ate"
)
return {
"success": True,
"method": f"DoWhy Propensity Score Weighting ({weighting_scheme})",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"weighting_scheme": weighting_scheme,
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy 倾向得分加权估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": f"DoWhy Propensity Score Weighting ({weighting_scheme})"
}
@server.tool()
def regression_discontinuity_estimator(
data_path: str,
treatment: str,
outcome: str,
running_variable: str,
cutoff: float,
confounders: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
使用 DoWhy 原生回归不连续方法估计因果效应
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
running_variable: 运行变量名
cutoff: 截断点
confounders: 混杂因子列表(可选)
"""
try:
# 加载数据
all_vars = [treatment, outcome, running_variable]
if confounders:
all_vars.extend(confounders)
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型
model = dowhy.CausalModel(
data=data,
treatment=treatment,
outcome=outcome,
common_causes=confounders if confounders else None,
instruments=[running_variable]
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的回归不连续估计器
estimate = model.estimate_effect(
identified_estimand,
method_name="iv.regression_discontinuity",
method_params={
"rd_variable": running_variable,
"rd_threshold": cutoff
}
)
return {
"success": True,
"method": "DoWhy Regression Discontinuity",
"treatment": treatment,
"outcome": outcome,
"running_variable": running_variable,
"cutoff": cutoff,
"confounders": confounders or [],
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy 回归不连续估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": "DoWhy Regression Discontinuity"
}
@server.tool()
def two_stage_regression_estimator(
data_path: str,
treatment: str,
outcome: str,
mediators: List[str],
confounders: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
使用 DoWhy 原生两阶段回归方法估计因果效应(前门调整)
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
mediators: 中介变量列表
confounders: 混杂因子列表(可选)
"""
try:
# 加载数据
all_vars = [treatment, outcome] + mediators
if confounders:
all_vars.extend(confounders)
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型
model = dowhy.CausalModel(
data=data,
treatment=treatment,
outcome=outcome,
mediators=mediators,
common_causes=confounders if confounders else None
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 使用 DoWhy 的两阶段回归估计器
estimate = model.estimate_effect(
identified_estimand,
method_name="frontdoor.two_stage_regression"
)
return {
"success": True,
"method": "DoWhy Two Stage Regression (Frontdoor)",
"treatment": treatment,
"outcome": outcome,
"mediators": mediators,
"confounders": confounders or [],
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy 两阶段回归估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": "DoWhy Two Stage Regression (Frontdoor)"
}
@server.tool()
def econml_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
method: str = "dml.DML",
model_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
使用 DoWhy + EconML 集成方法估计因果效应
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
confounders: 混杂因子列表
method: EconML方法 ('dml.DML', 'dr.DRLearner', 'metalearners.TLearner')
model_params: 模型参数字典
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型并检查共线性
model, filtered_confounders, excluded_confounders = _create_dowhy_model(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 设置默认模型参数
if model_params is None:
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import LassoCV
model_params = {
"init_params": {
"model_y": GradientBoostingRegressor(),
"model_t": GradientBoostingRegressor(),
"model_final": LassoCV(fit_intercept=False)
},
"fit_params": {}
}
# 使用 DoWhy + EconML 估计器
estimate = model.estimate_effect(
identified_estimand,
method_name=f"backdoor.econml.{method}",
method_params=model_params,
target_units="ate"
)
return {
"success": True,
"method": f"DoWhy + EconML {method}",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"econml_method": method,
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy + EconML 估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": f"DoWhy + EconML {method}",
"note": "需要安装 econml: pip install econml"
}
@server.tool()
def causalml_estimator(
data_path: str,
treatment: str,
outcome: str,
confounders: List[str],
method: str = "XGBTRegressor",
model_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
使用 DoWhy + CausalML 集成方法估计因果效应
Args:
data_path: 数据文件路径
treatment: 治疗变量名
outcome: 结果变量名
confounders: 混杂因子列表
method: CausalML方法 ('XGBTRegressor', 'LGBMTRegressor', 'MLPTRegressor')
model_params: 模型参数字典
"""
try:
# 加载数据
all_vars = [treatment, outcome] + confounders
data = load_and_validate_data(data_path, all_vars)
# 创建 DoWhy 模型并检查共线性
model, filtered_confounders, excluded_confounders = _create_dowhy_model(
data, treatment, outcome, confounders
)
# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
# 设置默认模型参数
if model_params is None:
model_params = {
"init_params": {},
"fit_params": {}
}
# 使用 DoWhy + CausalML 估计器
estimate = model.estimate_effect(
identified_estimand,
method_name=f"backdoor.causalml.{method}",
method_params=model_params,
target_units="ate"
)
return {
"success": True,
"method": f"DoWhy + CausalML {method}",
"treatment": treatment,
"outcome": outcome,
"confounders": confounders,
"filtered_confounders": filtered_confounders,
"excluded_confounders": excluded_confounders,
"causal_effect": {
"estimate": float(estimate.value),
"method_details": str(estimate)
},
"model_details": {
"estimand": str(identified_estimand),
"causalml_method": method,
"sample_size": int(len(data))
},
"data_processing": {
"missing_values": serialize_numpy_types({var: data[var].isnull().sum() for var in all_vars})
}
}
except Exception as e:
logger.error(f"DoWhy + CausalML 估计失败: {str(e)}")
return {
"success": False,
"error": str(e),
"method": f"DoWhy + CausalML {method}",
"note": "需要安装 causalml: pip install causalml"
}