"""
DoWhy GCM (图形因果模型) 工具 - 100% 使用 DoWhy 原生 GCM 方法
"""
import logging
from typing import Any, Dict, List, Optional, Callable
import pandas as pd
import numpy as np
import networkx as nx
import dowhy.gcm as gcm
from mcp.server.fastmcp import FastMCP
from ..utils.data_processor import load_and_validate_data, serialize_numpy_types
logger = logging.getLogger("dowhy-mcp-server.gcm")
def _create_causal_graph(edges: List[Dict[str, str]]) -> nx.DiGraph:
    """Build a directed causal graph from a list of edge dictionaries.

    Args:
        edges: Edge descriptions; each entry is read through its ``"from"``
            (source node) and ``"to"`` (target node) keys.

    Returns:
        A ``networkx.DiGraph`` holding one directed edge per entry.

    Raises:
        KeyError: If an entry lacks the ``"from"`` or ``"to"`` key.
    """
    causal_graph = nx.DiGraph()
    causal_graph.add_edges_from((item["from"], item["to"]) for item in edges)
    return causal_graph
def register_gcm_tools(server: FastMCP) -> None:
"""注册所有 DoWhy GCM 工具"""
@server.tool()
def structural_causal_model_builder(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str]
) -> Dict[str, Any]:
    """Create a DoWhy GCM structural causal model from an edge list.

    Only the graph structure is set up here; no causal mechanisms are
    assigned or fitted.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs and a
        ``model_info`` section (nodes, edges, sample size). If anything
        raises, a dict with ``success=False`` and the error text.
    """
    method_name = "DoWhy GCM StructuralCausalModel"
    try:
        frame = load_and_validate_data(data_path, variables)
        dag = _create_causal_graph(graph_edges)
        # The model is instantiated but the instance is not part of the
        # response; only graph metadata is reported back.
        gcm.StructuralCausalModel(dag)
        model_info = {
            "nodes": list(dag.nodes()),
            "edges": list(dag.edges()),
            "sample_size": len(frame),
        }
        return {
            "success": True,
            "method": method_name,
            "variables": variables,
            "edges": graph_edges,
            "model_info": model_info,
            "message": "结构因果模型创建成功,需要进一步分配因果机制",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 结构因果模型构建失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_name}
@server.tool()
def assign_causal_mechanisms(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Automatically assign causal mechanisms to a DoWhy GCM model.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True``, the stringified
        assignment summary and graph metadata. If anything raises, a dict
        with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.StructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        assignment_summary = gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        return {
            "success": True,
            "method": "DoWhy GCM assign_causal_mechanisms",
            "variables": variables,
            "edges": graph_edges,
            "quality": quality,
            "assignment_summary": str(assignment_summary),
            "model_info": {
                "nodes": list(causal_graph.nodes()),
                "edges": list(causal_graph.edges()),
                "sample_size": len(data),
            },
            "message": "因果机制分配成功,模型已准备好进行拟合",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 因果机制分配失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM assign_causal_mechanisms",
        }
@server.tool()
def fit_gcm_model(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Assign causal mechanisms and fit a DoWhy GCM model to the data.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and graph metadata
        flagged as fitted. If anything raises, a dict with
        ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.StructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        return {
            "success": True,
            "method": "DoWhy GCM fit",
            "variables": variables,
            "edges": graph_edges,
            "quality": quality,
            "model_info": {
                "nodes": list(causal_graph.nodes()),
                "edges": list(causal_graph.edges()),
                "sample_size": len(data),
                "fitted": True,
            },
            "message": "GCM模型拟合成功,可以进行推理分析",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 模型拟合失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM fit",
        }
@server.tool()
def draw_samples_from_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    num_samples: int = 1000,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Fit a DoWhy GCM model and draw samples from it.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        num_samples: Number of samples to draw.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and per-column
        mean/std/min/max of the generated samples. If anything raises, a
        dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.StructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        generated_samples = gcm.draw_samples(causal_model, num_samples)
        sample_statistics = {
            col: {
                "mean": float(generated_samples[col].mean()),
                "std": float(generated_samples[col].std()),
                "min": float(generated_samples[col].min()),
                "max": float(generated_samples[col].max()),
            }
            for col in generated_samples.columns
        }
        return {
            "success": True,
            "method": "DoWhy GCM draw_samples",
            "variables": variables,
            "edges": graph_edges,
            "num_samples": num_samples,
            "generated_data": {
                # Emitted as a list so the payload is plain JSON data.
                "shape": list(generated_samples.shape),
                "columns": list(generated_samples.columns),
                "sample_statistics": serialize_numpy_types(sample_statistics),
            },
            "message": f"成功从GCM模型生成{num_samples}个样本",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 采样失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM draw_samples",
        }
@server.tool()
def interventional_samples_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    interventions: Dict[str, Any],
    num_samples: int = 1000,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Fit a DoWhy GCM model and generate interventional samples.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        interventions: Mapping of variable name to intervention value,
            e.g. {"X": 1.0}. Every value is applied as a constant: the
            variable is set to that value for each sample.
        num_samples: Number of samples to draw.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and per-column
        mean/std/min/max of the interventional samples. If anything
        raises, a dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.StructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        # Wrap each value in a constant function. ``val=value`` binds the
        # value at definition time, avoiding the late-binding closure trap.
        processed_interventions = {
            var: (lambda x, val=value: val)
            for var, value in interventions.items()
        }
        interventional_data = gcm.interventional_samples(
            causal_model,
            processed_interventions,
            num_samples_to_draw=num_samples,
        )
        sample_statistics = {
            col: {
                "mean": float(interventional_data[col].mean()),
                "std": float(interventional_data[col].std()),
                "min": float(interventional_data[col].min()),
                "max": float(interventional_data[col].max()),
            }
            for col in interventional_data.columns
        }
        return {
            "success": True,
            "method": "DoWhy GCM interventional_samples",
            "variables": variables,
            "edges": graph_edges,
            "interventions": interventions,
            "num_samples": num_samples,
            "interventional_data": {
                # Emitted as a list so the payload is plain JSON data.
                "shape": list(interventional_data.shape),
                "columns": list(interventional_data.columns),
                "sample_statistics": serialize_numpy_types(sample_statistics),
            },
            "message": f"成功生成{num_samples}个干预样本",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 干预采样失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM interventional_samples",
        }
@server.tool()
def counterfactual_samples_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    interventions: Dict[str, Any],
    observed_data_path: Optional[str] = None,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Fit an invertible DoWhy GCM model and generate counterfactual samples.

    Args:
        data_path: Path of the training data file.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        interventions: Mapping of variable name to intervention value.
            Every value is applied as a constant.
        observed_data_path: Path of the observations for which to compute
            counterfactuals. When omitted, the training data is used.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and per-column
        mean/std/min/max of the counterfactual samples. If anything
        raises, a dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        # Counterfactuals need observations to start from, so fall back to
        # the training data instead of handing ``None`` to DoWhy.
        if observed_data_path:
            observed_data = load_and_validate_data(observed_data_path, variables)
        else:
            observed_data = data
        causal_graph = _create_causal_graph(graph_edges)
        # An invertible model is used so noise can be recovered from the
        # observations.
        causal_model = gcm.InvertibleStructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        # ``val=value`` binds the value at definition time.
        processed_interventions = {
            var: (lambda x, val=value: val)
            for var, value in interventions.items()
        }
        counterfactual_data = gcm.counterfactual_samples(
            causal_model,
            processed_interventions,
            observed_data=observed_data,
        )
        sample_statistics = {
            col: {
                "mean": float(counterfactual_data[col].mean()),
                "std": float(counterfactual_data[col].std()),
                "min": float(counterfactual_data[col].min()),
                "max": float(counterfactual_data[col].max()),
            }
            for col in counterfactual_data.columns
        }
        return {
            "success": True,
            "method": "DoWhy GCM counterfactual_samples",
            "variables": variables,
            "edges": graph_edges,
            "interventions": interventions,
            "counterfactual_data": {
                # Emitted as a list so the payload is plain JSON data.
                "shape": list(counterfactual_data.shape),
                "columns": list(counterfactual_data.columns),
                "sample_statistics": serialize_numpy_types(sample_statistics),
            },
            "message": f"成功生成{len(counterfactual_data)}个反事实样本",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 反事实采样失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM counterfactual_samples",
        }
@server.tool()
def attribute_anomalies_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    anomaly_data_path: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Attribute anomalies in a target node with DoWhy GCM.

    Args:
        data_path: Path of the training data file.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        target_node: Node whose anomalies are attributed.
        anomaly_data_path: Path of the file holding the anomalous samples.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and, per node, summary
        statistics plus the raw attribution scores. If anything raises, a
        dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        anomaly_data = load_and_validate_data(anomaly_data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.InvertibleStructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        attributions = gcm.attribute_anomalies(
            causal_model,
            target_node,
            anomaly_data,
        )
        attribution_results = {
            node: {
                "mean_attribution": float(np.mean(scores)),
                "std_attribution": float(np.std(scores)),
                "min_attribution": float(np.min(scores)),
                "max_attribution": float(np.max(scores)),
                "scores": scores.tolist(),
            }
            for node, scores in attributions.items()
        }
        return {
            "success": True,
            "method": "DoWhy GCM attribute_anomalies",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "anomaly_samples": len(anomaly_data),
            "attributions": serialize_numpy_types(attribution_results),
            "message": f"成功完成{target_node}的异常归因分析",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 异常归因失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM attribute_anomalies",
        }
@server.tool()
def anomaly_scores_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    anomaly_data_path: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Compute per-node anomaly scores with DoWhy GCM.

    Args:
        data_path: Path of the training data file.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        anomaly_data_path: Path of the file holding the samples to score.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and, per node, summary
        statistics plus the raw scores. If anything raises, a dict with
        ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        anomaly_data = load_and_validate_data(anomaly_data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        scores = gcm.anomaly_scores(causal_model, anomaly_data)
        score_results = {
            node: {
                "mean_score": float(np.mean(node_scores)),
                "std_score": float(np.std(node_scores)),
                "min_score": float(np.min(node_scores)),
                "max_score": float(np.max(node_scores)),
                "scores": node_scores.tolist(),
            }
            for node, node_scores in scores.items()
        }
        return {
            "success": True,
            "method": "DoWhy GCM anomaly_scores",
            "variables": variables,
            "edges": graph_edges,
            "anomaly_samples": len(anomaly_data),
            "anomaly_scores": serialize_numpy_types(score_results),
            "message": f"成功计算{len(anomaly_data)}个样本的异常分数",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 异常分数计算失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM anomaly_scores",
        }
@server.tool()
def arrow_strength_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Compute the arrow strength of every edge in the graph with DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and ``arrow_strengths``
        mapping "source -> target" to a float. If anything raises, a dict
        with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        # ``gcm.arrow_strength`` works on one target node at a time, so
        # call it once per node that has incoming edges.
        strength_results = {}
        for target in causal_graph.nodes():
            if causal_graph.in_degree(target) == 0:
                continue  # root nodes have no incoming arrows
            strengths = gcm.arrow_strength(
                causal_model,
                target,
                parent_samples=data,
            )
            for (source, dest), strength in strengths.items():
                strength_results[f"{source} -> {dest}"] = float(strength)
        return {
            "success": True,
            "method": "DoWhy GCM arrow_strength",
            "variables": variables,
            "edges": graph_edges,
            "arrow_strengths": serialize_numpy_types(strength_results),
            "sample_size": len(data),
            "message": f"成功计算{len(strength_results)}条边的箭头强度",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 箭头强度计算失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM arrow_strength",
        }
@server.tool()
def intrinsic_causal_influence_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Compute intrinsic causal influences on a target node with DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        target_node: Node whose upstream influences are quantified.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and ``causal_influences``
        keyed by node. Each entry has ``influence`` plus the
        mean/std/min/max keys, which are kept for compatibility. If
        anything raises, a dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        # A structural causal model is used because the influence
        # estimation works on the noise terms of the fitted mechanisms.
        causal_model = gcm.StructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        # The third positional parameter is the prediction model, not a
        # data set, so only the model and target node are passed.
        influences = gcm.intrinsic_causal_influence(causal_model, target_node)
        influence_results = {
            node: {
                "influence": float(np.mean(influence)),
                "mean_influence": float(np.mean(influence)),
                "std_influence": float(np.std(influence)),
                "min_influence": float(np.min(influence)),
                "max_influence": float(np.max(influence)),
            }
            for node, influence in influences.items()
        }
        return {
            "success": True,
            "method": "DoWhy GCM intrinsic_causal_influence",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "causal_influences": serialize_numpy_types(influence_results),
            "sample_size": len(data),
            "message": f"成功计算对{target_node}的内在因果影响",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 内在因果影响计算失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM intrinsic_causal_influence",
        }
@server.tool()
def evaluate_causal_model_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Fit a DoWhy GCM model and evaluate it against the data.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and ``evaluation_results``
        (overall KL divergence, per-node mechanism performance, and the
        stringified graph-falsification and PNL-assumption results). If
        anything raises, a dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        evaluation_result = gcm.evaluate_causal_model(causal_model, data)
        # Guard against parts of the evaluation that were not computed,
        # so a missing value does not fail the whole response.
        kl_divergence = evaluation_result.overall_kl_divergence
        mechanism_performances = evaluation_result.mechanism_performances or {}
        evaluation_summary = {
            "overall_kl_divergence": (
                float(kl_divergence) if kl_divergence is not None else None
            ),
            "mechanism_performances": {
                node: {
                    "performance_score": (
                        float(performance)
                        if isinstance(performance, (int, float))
                        else str(performance)
                    )
                }
                for node, performance in mechanism_performances.items()
            },
            "graph_falsification": str(evaluation_result.graph_falsification),
            "pnl_assumptions": str(evaluation_result.pnl_assumptions),
        }
        return {
            "success": True,
            "method": "DoWhy GCM evaluate_causal_model",
            "variables": variables,
            "edges": graph_edges,
            "evaluation_results": serialize_numpy_types(evaluation_summary),
            "sample_size": len(data),
            "message": "因果模型评估完成",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 模型评估失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM evaluate_causal_model",
        }
@server.tool()
def falsify_graph_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Test whether the given graph structure is falsified by the data.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        quality: Accepted for interface compatibility with the other
            tools; no causal mechanisms are fitted here, so it is unused.

    Returns:
        On success, a dict with ``success=True`` and
        ``falsification_results`` (significance level, stringified
        summary, suggestions). If anything raises, a dict with
        ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        # ``gcm.falsify_graph`` takes the graph itself, not a fitted
        # causal model, so no mechanism assignment or fitting is needed.
        falsification_result = gcm.falsify_graph(causal_graph, data)
        # ``suggestions`` may be absent or None; report an empty list then.
        suggestions = getattr(falsification_result, "suggestions", None)
        falsification_summary = {
            "significance_level": float(falsification_result.significance_level),
            "summary": str(falsification_result.summary),
            "suggestions": list(suggestions) if suggestions is not None else [],
        }
        return {
            "success": True,
            "method": "DoWhy GCM falsify_graph",
            "variables": variables,
            "edges": graph_edges,
            "falsification_results": serialize_numpy_types(falsification_summary),
            "sample_size": len(data),
            "message": "图结构验证完成",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 图结构验证失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM falsify_graph",
        }
@server.tool()
def distribution_change_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    new_data_path: str,
    quality: str = "GOOD",
    target_node: Optional[str] = None
) -> Dict[str, Any]:
    """Attribute a distribution change of a target node with DoWhy GCM.

    Args:
        data_path: Path of the original data file.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        new_data_path: Path of the new data file.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.
        target_node: Node whose distribution change is attributed. When
            omitted, the graph must have exactly one node without
            outgoing edges, and that node is used.

    Returns:
        On success, a dict with ``success=True`` and
        ``distribution_changes`` mapping each node to a float. If
        anything raises (including an ambiguous target node), a dict with
        ``success=False`` and the error text.
    """
    try:
        original_data = load_and_validate_data(data_path, variables)
        new_data = load_and_validate_data(new_data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        # ``gcm.distribution_change`` needs a target node; infer it only
        # when the graph leaves no ambiguity.
        if target_node is None:
            sink_nodes = [
                node for node in causal_graph.nodes()
                if causal_graph.out_degree(node) == 0
            ]
            if len(sink_nodes) != 1:
                raise ValueError(
                    f"未指定 target_node,且无法从图中唯一确定目标节点: {sink_nodes}"
                )
            target_node = sink_nodes[0]
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            original_data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, original_data)
        change_scores = gcm.distribution_change(
            causal_model,
            original_data,
            new_data,
            target_node,
        )
        change_results = {
            node: float(score) for node, score in change_scores.items()
        }
        return {
            "success": True,
            "method": "DoWhy GCM distribution_change",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "original_samples": len(original_data),
            "new_samples": len(new_data),
            "distribution_changes": serialize_numpy_types(change_results),
            "message": "分布变化分析完成",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 分布变化分析失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM distribution_change",
        }
@server.tool()
def mechanism_change_test_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    new_data_path: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Test for changed causal mechanisms between two data sets.

    Args:
        data_path: Path of the original data file.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        new_data_path: Path of the new data file.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and
        ``mechanism_test_results`` keyed by node. Results exposing a
        ``p_value`` are reported with that value and a 0.05 significance
        flag; anything else is stringified. If anything raises, a dict
        with ``success=False`` and the error text.
    """
    try:
        original_data = load_and_validate_data(data_path, variables)
        new_data = load_and_validate_data(new_data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            original_data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, original_data)
        # NOTE(review): confirm that ``mechanism_change_test`` is exposed
        # at the ``dowhy.gcm`` package level and accepts
        # (model, old_data, new_data) returning a per-node mapping. It may
        # instead live in ``dowhy.gcm.distribution_change`` and operate on
        # the samples of a single node.
        test_results = gcm.mechanism_change_test(
            causal_model,
            original_data,
            new_data,
        )
        mechanism_results = {}
        for node, result in test_results.items():
            if hasattr(result, "p_value"):
                statistic = getattr(result, "test_statistic", None)
                mechanism_results[node] = {
                    "p_value": float(result.p_value),
                    "test_statistic": (
                        float(statistic) if hasattr(result, "test_statistic") else None
                    ),
                    # Coerce to a plain bool so the flag is JSON-native.
                    "significant": bool(result.p_value < 0.05),
                }
            else:
                mechanism_results[node] = {
                    "result": str(result),
                    "significant": None,
                }
        return {
            "success": True,
            "method": "DoWhy GCM mechanism_change_test",
            "variables": variables,
            "edges": graph_edges,
            "original_samples": len(original_data),
            "new_samples": len(new_data),
            "mechanism_test_results": serialize_numpy_types(mechanism_results),
            "message": "机制变化测试完成",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 机制变化测试失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM mechanism_change_test",
        }
@server.tool()
def estimate_shapley_values_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    num_samples: int = 100
) -> Dict[str, Any]:
    """Estimate Shapley values of features for a target with DoWhy GCM.

    The value of a feature subset is the R^2 of an ordinary least-squares
    fit (with intercept) of the target on that subset; the empty subset is
    worth 0. The Shapley values therefore split the explained variance of
    the full linear fit among the features.

    Args:
        data_path: Path of the data file to load.
        target_variable: Name of the target column.
        feature_variables: Names of the feature columns (the players).
        num_samples: Budget handed to the Shapley approximation (number of
            permutations and number of subset samples).

    Returns:
        On success, a dict with ``success=True`` and ``shapley_values``
        keyed by feature. Each entry has ``shapley_value`` plus the
        mean/std/min/max keys, which are kept for compatibility. If
        anything raises, a dict with ``success=False`` and the error text.
    """
    try:
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        if not feature_variables:
            raise ValueError("feature_variables must not be empty")
        X = data[feature_variables].to_numpy(dtype=float)
        y = data[target_variable].to_numpy(dtype=float)
        total_ss = float(np.sum((y - y.mean()) ** 2))

        def explained_variance(subset):
            """Return R^2 of the target regressed on the masked features."""
            mask = np.asarray(subset, dtype=bool)
            if total_ss == 0 or not mask.any():
                return 0.0
            design = np.column_stack([np.ones(len(y)), X[:, mask]])
            coefficients, *_ = np.linalg.lstsq(design, y, rcond=None)
            residual_ss = float(np.sum((y - design @ coefficients) ** 2))
            return 1.0 - residual_ss / total_ss

        # The Shapley estimator lives in ``gcm.shapley``. It takes a set
        # function over binary membership vectors and the player count.
        shapley_values = gcm.shapley.estimate_shapley_values(
            explained_variance,
            len(feature_variables),
            gcm.shapley.ShapleyConfig(
                num_permutations=num_samples,
                num_subset_samples=num_samples,
            ),
        )
        shapley_results = {}
        for i, feature in enumerate(feature_variables):
            if i < len(shapley_values):
                shapley_results[feature] = {
                    "shapley_value": float(np.mean(shapley_values[i])),
                    "mean_shapley": float(np.mean(shapley_values[i])),
                    "std_shapley": float(np.std(shapley_values[i])),
                    "min_shapley": float(np.min(shapley_values[i])),
                    "max_shapley": float(np.max(shapley_values[i])),
                }
        return {
            "success": True,
            "method": "DoWhy GCM estimate_shapley_values",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "num_samples": num_samples,
            "shapley_values": serialize_numpy_types(shapley_results),
            "sample_size": len(data),
            "message": f"成功估计{target_variable}的Shapley值",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM Shapley值估计失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM estimate_shapley_values",
        }
@server.tool()
def unit_change_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Analyse unit-level changes of a target node with DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        target_node: Node whose unit changes are analysed.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and ``unit_changes`` with
        mean/std/min/max per entry. If anything raises, a dict with
        ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        # NOTE(review): confirm this call against the DoWhy API.
        # ``gcm.unit_change`` appears to expect (parents, background_df,
        # foreground_df, background_mechanism, ...), i.e. two data sets
        # and a prediction model rather than (model, target_node, data).
        # This tool may need a second data path to be usable.
        unit_changes = gcm.unit_change(
            causal_model,
            target_node,
            data,
        )
        change_results = {
            node: {
                "mean_change": float(np.mean(change)),
                "std_change": float(np.std(change)),
                "min_change": float(np.min(change)),
                "max_change": float(np.max(change)),
            }
            for node, change in unit_changes.items()
        }
        return {
            "success": True,
            "method": "DoWhy GCM unit_change",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "unit_changes": serialize_numpy_types(change_results),
            "sample_size": len(data),
            "message": f"成功分析{target_node}的单位变化",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 单位变化分析失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM unit_change",
        }
@server.tool()
def refute_causal_structure_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Try to refute the given causal graph structure with DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        quality: Accepted for interface compatibility with the other
            tools; no causal mechanisms are fitted here, so it is unused.

    Returns:
        On success, a dict with ``success=True`` and
        ``refutation_results`` holding ``is_valid`` (True when the
        structure was not rejected), the stringified ``rejection_result``
        and the detailed ``test_results``. If anything raises, a dict with
        ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        refutation_result = gcm.refute_causal_structure(
            causal_graph,
            data,
        )
        if isinstance(refutation_result, tuple):
            rejection = refutation_result[0]
            test_results = refutation_result[1] if len(refutation_result) > 1 else {}
            # Report a real boolean rather than the enum member, which is
            # not a JSON-native value.
            if hasattr(gcm, "RejectionResult"):
                is_valid = rejection == gcm.RejectionResult.NOT_REJECTED
            else:
                is_valid = str(rejection)
        else:
            rejection = refutation_result
            test_results = {}
            is_valid = str(refutation_result)
        refutation_summary = {
            "is_valid": is_valid,
            "rejection_result": str(rejection),
            "test_results": test_results,
        }
        return {
            "success": True,
            "method": "DoWhy GCM refute_causal_structure",
            "variables": variables,
            "edges": graph_edges,
            "refutation_results": serialize_numpy_types(refutation_summary),
            "sample_size": len(data),
            "message": "因果结构反驳测试完成",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 因果结构反驳失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM refute_causal_structure",
        }
@server.tool()
def refute_invertible_model_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Try to refute the invertible-model assumption with DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and
        ``refutation_results`` holding the stringified rejection result
        and a ``model_valid`` flag. If anything raises, a dict with
        ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.InvertibleStructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        refutation_result = gcm.refute_invertible_model(
            causal_model,
            data,
        )
        # Without the enum available the model is reported as valid,
        # matching the previous fallback.
        if hasattr(gcm, "RejectionResult"):
            model_valid = refutation_result != gcm.RejectionResult.REJECTED
        else:
            model_valid = True
        refutation_summary = {
            "rejection_result": str(refutation_result),
            "model_valid": model_valid,
        }
        return {
            "success": True,
            "method": "DoWhy GCM refute_invertible_model",
            "variables": variables,
            "edges": graph_edges,
            "refutation_results": serialize_numpy_types(refutation_summary),
            "sample_size": len(data),
            "message": "可逆模型假设反驳测试完成",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 可逆模型反驳失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM refute_invertible_model",
        }
@server.tool()
def confidence_intervals_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    confidence_level: float = 0.95,
    num_bootstrap_samples: int = 100,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Compute a bootstrap confidence interval for the mean of a node.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        target_node: Column whose mean is estimated.
        confidence_level: Confidence level of the interval.
        num_bootstrap_samples: Number of bootstrap resamples.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True`` and
        ``confidence_intervals`` (bounds, width, point estimate). If
        anything raises, a dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        # NOTE(review): the estimate below is the column mean of the
        # loaded data and does not read the fitted model; the fit is kept
        # from the original flow.
        gcm.fit(causal_model, data)

        def estimation_function(data_sample):
            """Return the mean of the target column in ``data_sample``."""
            return np.mean(data_sample[target_node])

        def bootstrap_estimate():
            """Estimate on one bootstrap resample of the data.

            ``gcm.confidence_intervals`` calls its estimation function
            without arguments, so the resampling happens in here.
            """
            resampled = data.sample(n=len(data), replace=True)
            return np.array([estimation_function(resampled)])

        _, intervals = gcm.confidence_intervals(
            bootstrap_estimate,
            confidence_level=confidence_level,
            num_bootstrap_resamples=num_bootstrap_samples,
        )
        # One estimated quantity, so take the single (lower, upper) row.
        lower_bound, upper_bound = np.asarray(intervals, dtype=float).reshape(-1, 2)[0]
        ci_results = {
            "confidence_level": confidence_level,
            "lower_bound": float(lower_bound),
            "upper_bound": float(upper_bound),
            "interval_width": float(upper_bound - lower_bound),
            "point_estimate": float(estimation_function(data)),
        }
        return {
            "success": True,
            "method": "DoWhy GCM confidence_intervals",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "confidence_intervals": serialize_numpy_types(ci_results),
            "num_bootstrap_samples": num_bootstrap_samples,
            "sample_size": len(data),
            "message": f"成功计算{target_node}的{confidence_level*100}%置信区间",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 置信区间计算失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM confidence_intervals",
        }
@server.tool()
def feature_relevance_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """Estimate the relevance of each parent of a target node.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, format: [{"from": "X", "to": "Y"}].
        variables: Names of the variables to load and validate.
        target_node: Node whose parents are analysed.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'). Any other
            value falls back to GOOD.

    Returns:
        On success, a dict with ``success=True``, ``parent_relevance``
        keyed by parent with mean/std/min/max, and ``noise_relevance``
        (None when unavailable). If anything raises, a dict with
        ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, variables)
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.InvertibleStructuralCausalModel(causal_graph)
        # The auto-assignment API lives in the ``gcm.auto`` submodule.
        quality_mapping = {
            "GOOD": gcm.auto.AssignmentQuality.GOOD,
            "BETTER": gcm.auto.AssignmentQuality.BETTER,
            "BEST": gcm.auto.AssignmentQuality.BEST,
        }
        gcm.auto.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.auto.AssignmentQuality.GOOD),
        )
        gcm.fit(causal_model, data)
        relevance_output = gcm.parent_relevance(
            causal_model,
            target_node,
            data,
        )
        # ``gcm.parent_relevance`` returns a (per-edge relevance,
        # noise relevance) pair; a bare mapping is tolerated as well.
        if isinstance(relevance_output, tuple):
            relevance_by_edge, noise_relevance = relevance_output
        else:
            relevance_by_edge, noise_relevance = relevance_output, None
        relevance_results = {}
        for key, relevance in relevance_by_edge.items():
            # Keys are (parent, target) edges; report them by parent name
            # so the mapping stays JSON-serialisable.
            parent = key[0] if isinstance(key, tuple) else key
            relevance_results[parent] = {
                "mean_relevance": float(np.mean(relevance)),
                "std_relevance": float(np.std(relevance)),
                "min_relevance": float(np.min(relevance)),
                "max_relevance": float(np.max(relevance)),
            }
        return {
            "success": True,
            "method": "DoWhy GCM parent_relevance",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "parent_relevance": serialize_numpy_types(relevance_results),
            "noise_relevance": (
                float(np.mean(noise_relevance)) if noise_relevance is not None else None
            ),
            "sample_size": len(data),
            "message": f"成功分析{target_node}的父节点相关性",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 特征相关性分析失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM parent_relevance",
        }
@server.tool()
def estimate_entropy_gcm(
    data_path: str,
    variable: str,
    method: str = "gaussian"
) -> Dict[str, Any]:
    """Estimate the entropy of a single variable with DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to analyse.
        method: Estimation method ('gaussian', 'discrete', 'kmeans').

    Returns:
        On success, a dict with ``success=True`` and the estimated
        entropy. If anything raises (including an unsupported ``method``),
        a dict with ``success=False`` and the error text.
    """
    try:
        data = load_and_validate_data(data_path, [variable])
        variable_data = data[variable].values
        # The entropy estimators live in ``gcm.uncertainty``. Names are
        # resolved lazily so only the requested estimator has to exist.
        estimator_names = {
            "gaussian": "estimate_gaussian_entropy",
            "discrete": "estimate_entropy_discrete",
            "kmeans": "estimate_entropy_kmeans",
        }
        if method not in estimator_names:
            raise ValueError(f"不支持的熵估计方法: {method}")
        estimator = getattr(gcm.uncertainty, estimator_names[method])
        entropy = estimator(variable_data)
        return {
            "success": True,
            "method": f"DoWhy GCM estimate_{method}_entropy",
            "variable": variable,
            "entropy": float(entropy),
            "estimation_method": method,
            "sample_size": len(data),
            "message": f"成功估计{variable}的{method}熵",
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 熵估计失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": f"DoWhy GCM estimate_{method}_entropy",
        }
@server.tool()
def estimate_variance_gcm(
    data_path: str,
    variable: str
) -> Dict[str, Any]:
    """
    Estimate the variance of one variable via ``gcm.estimate_variance``.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column whose variance is estimated.

    Returns:
        On success, a dict with the variance, its square root and the sample
        size; on any exception, a dict with ``success`` False and the error
        text.
    """
    method_label = "DoWhy GCM estimate_variance"
    try:
        frame = load_and_validate_data(data_path, [variable])
        variance = gcm.estimate_variance(frame[variable].values)
        # The standard deviation is derived from the estimated variance.
        std_dev = np.sqrt(variance)
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "variance": float(variance),
            "standard_deviation": float(std_dev),
            "sample_size": len(frame),
            "message": f"成功估计{variable}的方差"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 方差估计失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def auto_estimate_kl_divergence_gcm(
    data_path: str,
    variable1: str,
    variable2: str
) -> Dict[str, Any]:
    """
    Estimate the KL divergence between two columns of the same data file.

    Args:
        data_path: Path of the data file to load.
        variable1: Name of the first column.
        variable2: Name of the second column.

    Returns:
        On success, a dict with the estimated divergence and the sample size;
        on any exception, a dict with ``success`` False and the error text.
    """
    method_label = "DoWhy GCM auto_estimate_kl_divergence"
    try:
        frame = load_and_validate_data(data_path, [variable1, variable2])
        first_samples = frame[variable1].values
        second_samples = frame[variable2].values
        divergence = gcm.auto_estimate_kl_divergence(first_samples, second_samples)
        return {
            "success": True,
            "method": method_label,
            "variable1": variable1,
            "variable2": variable2,
            "kl_divergence": float(divergence),
            "sample_size": len(frame),
            "message": f"成功估计{variable1}和{variable2}之间的KL散度"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM KL散度估计失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def average_causal_effect_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    interventions_alternative: Dict[str, Any],
    interventions_reference: Dict[str, Any],
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """
    Estimate the average causal effect on a target node with DoWhy GCM.

    A probabilistic causal model is built from the edges, mechanisms are
    auto-assigned and fitted on the data, and the effect of the alternative
    interventions relative to the reference interventions is estimated.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, each item shaped like {"from": "X", "to": "Y"}.
        variables: Columns to load from the data file.
        target_node: Node whose expected value is compared.
        interventions_alternative: Node name -> value to set (alternative).
        interventions_reference: Node name -> value to set (reference).
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'); unknown
            values fall back to GOOD.

    Returns:
        On success, a dict with the effect estimate and the inputs echoed
        back; on any exception, a dict with ``success`` False and the error
        text.
    """
    def _as_constant_interventions(interventions: Dict[str, Any]) -> Dict[str, Callable]:
        # Each value becomes a function that ignores its input and returns
        # that value. The value is bound through a default argument so every
        # lambda keeps its own value rather than the loop's last one.
        return {
            node: (lambda _x, val=value: val)
            for node, value in interventions.items()
        }

    try:
        # Load the data
        data = load_and_validate_data(data_path, variables)
        # Build, assign and fit the model
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.ProbabilisticCausalModel(causal_graph)
        quality_mapping = {
            "GOOD": gcm.AssignmentQuality.GOOD,
            "BETTER": gcm.AssignmentQuality.BETTER,
            "BEST": gcm.AssignmentQuality.BEST
        }
        gcm.assign_causal_mechanisms(
            causal_model,
            data,
            quality=quality_mapping.get(quality, gcm.AssignmentQuality.GOOD)
        )
        gcm.fit(causal_model, data)
        alt_interventions = _as_constant_interventions(interventions_alternative)
        ref_interventions = _as_constant_interventions(interventions_reference)
        # Estimate the average causal effect
        ace = gcm.average_causal_effect(
            causal_model,
            target_node,
            alt_interventions,
            ref_interventions,
            observed_data=data
        )
        return {
            "success": True,
            "method": "DoWhy GCM average_causal_effect",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "interventions_alternative": interventions_alternative,
            "interventions_reference": interventions_reference,
            "average_causal_effect": float(ace),
            "sample_size": len(data),
            "message": f"成功估计{target_node}的平均因果效应"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 平均因果效应估计失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM average_causal_effect"
        }
@server.tool()
def gaussian_mixture_density_estimation_gcm(
    data_path: str,
    variable: str,
    n_components: int = 3
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM Gaussian mixture density estimator to one variable.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to model.
        n_components: Number of Gaussian components.

    Returns:
        On success, a dict with mean/std/min/max of the densities evaluated
        at the observed samples; on any exception, a dict with ``success``
        False and the error text.
    """
    try:
        # Load the data as a single-column 2-D array
        data = load_and_validate_data(data_path, [variable])
        variable_data = data[variable].values.reshape(-1, 1)
        # Passed positionally so the call does not depend on the constructor's
        # keyword name (DoWhy documents it as ``num_components`` — confirm
        # against the installed version).
        density_estimator = gcm.GaussianMixtureDensityEstimator(n_components)
        density_estimator.fit(variable_data)
        # Evaluate the fitted density at the observed samples
        density_values = density_estimator.density(variable_data)
        return {
            "success": True,
            "method": "DoWhy GCM GaussianMixtureDensityEstimator",
            "variable": variable,
            "n_components": n_components,
            "density_statistics": {
                "mean_density": float(np.mean(density_values)),
                "std_density": float(np.std(density_values)),
                "min_density": float(np.min(density_values)),
                "max_density": float(np.max(density_values))
            },
            "sample_size": len(data),
            "message": f"成功完成{variable}的高斯混合密度估计"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 高斯混合密度估计失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM GaussianMixtureDensityEstimator"
        }
@server.tool()
def kernel_density_estimation_gcm(
    data_path: str,
    variable: str,
    bandwidth: Optional[float] = None
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM one-dimensional kernel density estimator to a variable.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to model.
        bandwidth: Bandwidth forwarded to the estimator constructor.

    Returns:
        On success, a dict with mean/std/min/max of the densities evaluated
        at the observed samples; on any exception, a dict with ``success``
        False and the error text.
    """
    method_label = "DoWhy GCM KernelDensityEstimator1D"
    try:
        frame = load_and_validate_data(data_path, [variable])
        samples = frame[variable].values
        # NOTE(review): confirm that KernelDensityEstimator1D accepts a
        # ``bandwidth`` keyword in the installed DoWhy version.
        estimator = gcm.KernelDensityEstimator1D(bandwidth=bandwidth)
        estimator.fit(samples)
        densities = estimator.density(samples)
        density_summary = {
            "mean_density": float(np.mean(densities)),
            "std_density": float(np.std(densities)),
            "min_density": float(np.min(densities)),
            "max_density": float(np.max(densities))
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "bandwidth": bandwidth,
            "density_statistics": density_summary,
            "sample_size": len(frame),
            "message": f"成功完成{variable}的核密度估计"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 核密度估计失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def empirical_distribution_gcm(
    data_path: str,
    variable: str,
    num_samples: int = 1000
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM empirical distribution to a variable and sample from it.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to model.
        num_samples: Number of samples to draw from the fitted distribution.

    Returns:
        On success, a dict with mean/std/min/max of both the drawn samples
        and the original data; on any exception, a dict with ``success``
        False and the error text.
    """
    method_label = "DoWhy GCM EmpiricalDistribution"

    def _summarize(values) -> Dict[str, float]:
        # Same four summary statistics are reported for drawn and observed data.
        return {
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
            "min": float(np.min(values)),
            "max": float(np.max(values))
        }

    try:
        frame = load_and_validate_data(data_path, [variable])
        observed = frame[variable].values
        distribution = gcm.EmpiricalDistribution()
        distribution.fit(observed)
        drawn = distribution.draw_samples(num_samples)
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "original_samples": len(frame),
            "generated_samples": num_samples,
            "sample_statistics": _summarize(drawn),
            "original_statistics": _summarize(observed),
            "message": f"成功创建{variable}的经验分布并生成{num_samples}个样本"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 经验分布建模失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def scipy_distribution_gcm(
    data_path: str,
    variable: str,
    distribution_name: str = "norm",
    num_samples: int = 1000
) -> Dict[str, Any]:
    """
    Fit a named SciPy distribution to a variable through DoWhy GCM and sample
    from it.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to model.
        distribution_name: Name of a distribution in ``scipy.stats``
            ('norm', 'gamma', 'beta', etc.).
        num_samples: Number of samples to draw from the fitted distribution.

    Returns:
        On success, a dict with the fitted parameters and mean/std/min/max of
        both the drawn samples and the original data; on any exception
        (including an unknown distribution name), a dict with ``success``
        False and the error text.
    """
    try:
        # Local import keeps the new dependency inside this tool.
        from scipy import stats as scipy_stats

        # Load the data
        data = load_and_validate_data(data_path, [variable])
        variable_data = data[variable].values
        # ScipyDistribution works on a scipy.stats distribution object, not on
        # its name, so resolve the name first and reject anything in
        # scipy.stats that is not a distribution.
        distribution = getattr(scipy_stats, distribution_name, None)
        if not isinstance(
            distribution, (scipy_stats.rv_continuous, scipy_stats.rv_discrete)
        ):
            raise ValueError(f"不支持的分布名称: {distribution_name}")
        scipy_dist = gcm.ScipyDistribution(distribution)
        scipy_dist.fit(variable_data)
        # Draw samples from the fitted distribution
        samples = scipy_dist.draw_samples(num_samples)
        # Fitted parameters, or an empty dict if the attribute is absent
        parameters = getattr(scipy_dist, "parameters", {})
        return {
            "success": True,
            "method": "DoWhy GCM ScipyDistribution",
            "variable": variable,
            "distribution_name": distribution_name,
            "distribution_parameters": serialize_numpy_types(parameters),
            "original_samples": len(data),
            "generated_samples": num_samples,
            "sample_statistics": {
                "mean": float(np.mean(samples)),
                "std": float(np.std(samples)),
                "min": float(np.min(samples)),
                "max": float(np.max(samples))
            },
            "original_statistics": {
                "mean": float(np.mean(variable_data)),
                "std": float(np.std(variable_data)),
                "min": float(np.min(variable_data)),
                "max": float(np.max(variable_data))
            },
            "message": f"成功拟合{variable}的{distribution_name}分布并生成{num_samples}个样本"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM Scipy分布建模失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM ScipyDistribution"
        }
@server.tool()
def additive_noise_model_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """
    Attach a linear additive noise model to a target node and report its
    estimated noise.

    The target node gets an AdditiveNoiseModel backed by a linear regression;
    every other node gets an auto-assigned mechanism. The model is fitted and
    the target's noise is estimated from the observed data.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, each item shaped like {"from": "X", "to": "Y"}.
        variables: Columns to load from the data file.
        target_node: Node that receives the additive noise model.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'); unknown
            values fall back to GOOD.

    Returns:
        On success, a dict with mean/std/min/max of the estimated noise; on
        any exception, a dict with ``success`` False and the error text.
    """
    try:
        # Load the data
        data = load_and_validate_data(data_path, variables)
        # Build the causal graph
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.InvertibleStructuralCausalModel(causal_graph)
        # Additive noise model for the target node
        from sklearn.linear_model import LinearRegression
        anm = gcm.AdditiveNoiseModel(
            prediction_model=gcm.SklearnRegressionModel(LinearRegression())
        )
        causal_model.set_causal_mechanism(target_node, anm)
        # Assign mechanisms to the remaining nodes. The model was just
        # created, so only the target has a mechanism and no per-node probe
        # is needed (same loop as the sibling tools in this file).
        assignment_quality = {
            "GOOD": gcm.AssignmentQuality.GOOD,
            "BETTER": gcm.AssignmentQuality.BETTER,
            "BEST": gcm.AssignmentQuality.BEST
        }.get(quality, gcm.AssignmentQuality.GOOD)
        for node in causal_graph.nodes():
            if node != target_node:
                gcm.assign_causal_mechanism_node(
                    causal_model,
                    node,
                    data,
                    quality=assignment_quality
                )
        # Fit the model
        gcm.fit(causal_model, data)
        # DoWhy feeds parent columns to a mechanism in sorted node order, so
        # the same order must be used here or the regression coefficients
        # would be applied to the wrong columns.
        parents = sorted(causal_graph.predecessors(target_node))
        if parents:
            parent_samples = data[parents].values
        else:
            parent_samples = np.empty((len(data), 0))
        # estimate_noise takes the target samples first, then the parents.
        noise = anm.estimate_noise(data[target_node].values, parent_samples)
        return {
            "success": True,
            "method": "DoWhy GCM AdditiveNoiseModel",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "noise_statistics": {
                "mean_noise": float(np.mean(noise)),
                "std_noise": float(np.std(noise)),
                "min_noise": float(np.min(noise)),
                "max_noise": float(np.max(noise))
            },
            "sample_size": len(data),
            "message": f"成功为{target_node}创建加性噪声模型"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 加性噪声模型创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM AdditiveNoiseModel"
        }
@server.tool()
def post_nonlinear_model_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """
    Attach a post-nonlinear model to a target node and report its estimated
    noise.

    The target node gets a PostNonlinearModel built from a linear regression,
    an empirical noise distribution and the identity as invertible function;
    every other node gets an auto-assigned mechanism.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, each item shaped like {"from": "X", "to": "Y"}.
        variables: Columns to load from the data file.
        target_node: Node that receives the post-nonlinear model.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'); unknown
            values fall back to GOOD.

    Returns:
        On success, a dict with mean/std/min/max of the estimated noise; on
        any exception, a dict with ``success`` False and the error text.
    """
    try:
        # Load the data
        data = load_and_validate_data(data_path, variables)
        # Build the causal graph
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.InvertibleStructuralCausalModel(causal_graph)
        # Post-nonlinear model for the target node. The noise model is passed
        # explicitly because the constructor expects one alongside the
        # prediction model and the invertible function.
        from sklearn.linear_model import LinearRegression
        pnm = gcm.PostNonlinearModel(
            prediction_model=gcm.SklearnRegressionModel(LinearRegression()),
            noise_model=gcm.EmpiricalDistribution(),
            invertible_function=gcm.InvertibleIdentityFunction()
        )
        causal_model.set_causal_mechanism(target_node, pnm)
        # Assign mechanisms to the remaining nodes. The model was just
        # created, so only the target has a mechanism and no per-node probe
        # is needed (same loop as the sibling tools in this file).
        assignment_quality = {
            "GOOD": gcm.AssignmentQuality.GOOD,
            "BETTER": gcm.AssignmentQuality.BETTER,
            "BEST": gcm.AssignmentQuality.BEST
        }.get(quality, gcm.AssignmentQuality.GOOD)
        for node in causal_graph.nodes():
            if node != target_node:
                gcm.assign_causal_mechanism_node(
                    causal_model,
                    node,
                    data,
                    quality=assignment_quality
                )
        # Fit the model
        gcm.fit(causal_model, data)
        # DoWhy feeds parent columns to a mechanism in sorted node order, so
        # the same order must be used here or the regression coefficients
        # would be applied to the wrong columns.
        parents = sorted(causal_graph.predecessors(target_node))
        if parents:
            parent_samples = data[parents].values
        else:
            parent_samples = np.empty((len(data), 0))
        # estimate_noise takes the target samples first, then the parents.
        noise = pnm.estimate_noise(data[target_node].values, parent_samples)
        return {
            "success": True,
            "method": "DoWhy GCM PostNonlinearModel",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "noise_statistics": {
                "mean_noise": float(np.mean(noise)),
                "std_noise": float(np.std(noise)),
                "min_noise": float(np.min(noise)),
                "max_noise": float(np.max(noise))
            },
            "sample_size": len(data),
            "message": f"成功为{target_node}创建后非线性模型"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 后非线性模型创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM PostNonlinearModel"
        }
@server.tool()
def classifier_fcm_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """
    Attach a classifier-based functional causal model to a categorical
    target node.

    The target node gets a ClassifierFCM backed by a random forest
    classifier; every other node gets an auto-assigned mechanism. After
    fitting, the classes learned by the classifier are reported.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, each item shaped like {"from": "X", "to": "Y"}.
        variables: Columns to load from the data file.
        target_node: Categorical node that receives the classifier FCM.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'); unknown
            values fall back to GOOD.

    Returns:
        On success, a dict with the class names and the sample size; on any
        exception, a dict with ``success`` False and the error text.
    """
    try:
        # Load the data
        data = load_and_validate_data(data_path, variables)
        # Build the causal graph
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.StructuralCausalModel(causal_graph)
        # Classifier FCM for the target node. A handle on the classification
        # model is kept so its classes can be read after fitting.
        from sklearn.ensemble import RandomForestClassifier
        classification_model = gcm.SklearnClassificationModel(RandomForestClassifier())
        classifier_fcm = gcm.ClassifierFCM(classifier_model=classification_model)
        causal_model.set_causal_mechanism(target_node, classifier_fcm)
        # Assign mechanisms to the remaining nodes
        assignment_quality = {
            "GOOD": gcm.AssignmentQuality.GOOD,
            "BETTER": gcm.AssignmentQuality.BETTER,
            "BEST": gcm.AssignmentQuality.BEST
        }.get(quality, gcm.AssignmentQuality.GOOD)
        for node in causal_graph.nodes():
            if node != target_node:
                gcm.assign_causal_mechanism_node(
                    causal_model,
                    node,
                    data,
                    quality=assignment_quality
                )
        # Fit the model
        gcm.fit(causal_model, data)
        # Read the classes from the classification model, as the classifier
        # tools in this file do; ClassifierFCM.get_class_names expects class
        # indices and cannot be called without arguments.
        class_names = classification_model.classes
        return {
            "success": True,
            "method": "DoWhy GCM ClassifierFCM",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            # tolist() yields native Python values instead of numpy scalars.
            "class_names": np.asarray(class_names).tolist() if class_names is not None else [],
            "sample_size": len(data),
            "message": f"成功为{target_node}创建分类器功能因果模型"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 分类器FCM创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM ClassifierFCM"
        }
@server.tool()
def discrete_additive_noise_model_gcm(
    data_path: str,
    graph_edges: List[Dict[str, str]],
    variables: List[str],
    target_node: str,
    quality: str = "GOOD"
) -> Dict[str, Any]:
    """
    Attach a discrete additive noise model to a discrete target node and
    report its estimated noise.

    The target node gets a DiscreteAdditiveNoiseModel backed by a linear
    regression; every other node gets an auto-assigned mechanism.

    Args:
        data_path: Path of the data file to load.
        graph_edges: Edge list, each item shaped like {"from": "X", "to": "Y"}.
        variables: Columns to load from the data file.
        target_node: Discrete node that receives the model.
        quality: Assignment quality ('GOOD', 'BETTER', 'BEST'); unknown
            values fall back to GOOD.

    Returns:
        On success, a dict with the number of distinct noise values and the
        most common one ("N/A" when the noise is not integer-typed); on any
        exception, a dict with ``success`` False and the error text.
    """
    try:
        # Load the data
        data = load_and_validate_data(data_path, variables)
        # Build the causal graph
        causal_graph = _create_causal_graph(graph_edges)
        causal_model = gcm.InvertibleStructuralCausalModel(causal_graph)
        # Discrete additive noise model for the target node
        from sklearn.linear_model import LinearRegression
        danm = gcm.DiscreteAdditiveNoiseModel(
            prediction_model=gcm.SklearnRegressionModel(LinearRegression())
        )
        causal_model.set_causal_mechanism(target_node, danm)
        # Assign mechanisms to the remaining nodes
        assignment_quality = {
            "GOOD": gcm.AssignmentQuality.GOOD,
            "BETTER": gcm.AssignmentQuality.BETTER,
            "BEST": gcm.AssignmentQuality.BEST
        }.get(quality, gcm.AssignmentQuality.GOOD)
        for node in causal_graph.nodes():
            if node != target_node:
                gcm.assign_causal_mechanism_node(
                    causal_model,
                    node,
                    data,
                    quality=assignment_quality
                )
        # Fit the model
        gcm.fit(causal_model, data)
        # DoWhy feeds parent columns to a mechanism in sorted node order, so
        # the same order must be used here or the regression coefficients
        # would be applied to the wrong columns.
        parents = sorted(causal_graph.predecessors(target_node))
        if parents:
            parent_samples = data[parents].values
        else:
            parent_samples = np.empty((len(data), 0))
        # estimate_noise takes the target samples first, then the parents.
        noise = danm.estimate_noise(data[target_node].values, parent_samples)
        if noise.dtype.kind in 'iu':
            # np.unique copes with negative values, which np.bincount rejects;
            # ties resolve to the smallest value, as bincount().argmax() did.
            distinct_values, counts = np.unique(noise, return_counts=True)
            most_common = float(distinct_values[counts.argmax()])
        else:
            most_common = "N/A"
        return {
            "success": True,
            "method": "DoWhy GCM DiscreteAdditiveNoiseModel",
            "variables": variables,
            "edges": graph_edges,
            "target_node": target_node,
            "noise_statistics": {
                "unique_values": len(np.unique(noise)),
                "most_common": most_common
            },
            "sample_size": len(data),
            "message": f"成功为{target_node}创建离散加性噪声模型"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 离散加性噪声模型创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM DiscreteAdditiveNoiseModel"
        }
@server.tool()
def bayesian_gaussian_mixture_gcm(
    data_path: str,
    variable: str,
    n_components: int = 3,
    num_samples: int = 1000
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM Bayesian Gaussian mixture distribution to a variable and
    sample from it.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to model.
        n_components: Number of Gaussian components forwarded to the
            distribution constructor.
        num_samples: Number of samples to draw from the fitted distribution.

    Returns:
        On success, a dict with mean/std/min/max of both the drawn samples
        and the original data; on any exception, a dict with ``success``
        False and the error text.
    """
    method_label = "DoWhy GCM BayesianGaussianMixtureDistribution"

    def _summarize(values) -> Dict[str, float]:
        # Same four summary statistics are reported for drawn and observed data.
        return {
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
            "min": float(np.min(values)),
            "max": float(np.max(values))
        }

    try:
        frame = load_and_validate_data(data_path, [variable])
        observed = frame[variable].values
        # NOTE(review): confirm that BayesianGaussianMixtureDistribution
        # accepts an ``n_components`` keyword in the installed DoWhy version.
        mixture = gcm.BayesianGaussianMixtureDistribution(n_components=n_components)
        mixture.fit(observed)
        drawn = mixture.draw_samples(num_samples)
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "n_components": n_components,
            "original_samples": len(frame),
            "generated_samples": num_samples,
            "sample_statistics": _summarize(drawn),
            "original_statistics": _summarize(observed),
            "message": f"成功创建{variable}的贝叶斯高斯混合分布并生成{num_samples}个样本"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 贝叶斯高斯混合分布失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def independence_test_gcm(
    data_path: str,
    variable1: str,
    variable2: str,
    conditioning_variables: Optional[List[str]] = None,
    test_method: str = "kernel"
) -> Dict[str, Any]:
    """
    Run a DoWhy GCM (conditional) independence test between two variables.

    Args:
        data_path: Path of the data file to load.
        variable1: Name of the first column.
        variable2: Name of the second column.
        conditioning_variables: Optional columns to condition on.
        test_method: Test to run ('kernel', 'regression').

    Returns:
        On success, a dict with the p-value and whether it exceeds the 0.05
        significance level; on any exception (including an unsupported
        ``test_method``), a dict with ``success`` False and the error text.
    """
    significance_level = 0.05
    try:
        # Load the data
        all_vars = [variable1, variable2]
        if conditioning_variables:
            all_vars.extend(conditioning_variables)
        data = load_and_validate_data(data_path, all_vars)
        # The conditioning set is appended only when one was given.
        test_args = [data[variable1].values, data[variable2].values]
        if conditioning_variables:
            test_args.append(data[conditioning_variables].values)
        # Pick the independence test
        if test_method == "kernel":
            raw_p_value = gcm.independence_test(*test_args)
        elif test_method == "regression":
            from dowhy.gcm.independence_test import regression_based
            raw_p_value = regression_based(*test_args)
        else:
            raise ValueError(f"不支持的检验方法: {test_method}")
        # Convert to native Python types so the response carries a plain
        # float and bool rather than numpy scalars.
        p_value = float(raw_p_value)
        is_independent = bool(p_value > significance_level)
        return {
            "success": True,
            "method": f"DoWhy GCM independence_test ({test_method})",
            "variable1": variable1,
            "variable2": variable2,
            "conditioning_variables": conditioning_variables or [],
            "test_method": test_method,
            "p_value": p_value,
            "is_independent": is_independent,
            "significance_level": significance_level,
            "sample_size": len(data),
            "message": f"{variable1}和{variable2}{'独立' if is_independent else '不独立'}(p={p_value:.4f})"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 独立性检验失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": f"DoWhy GCM independence_test ({test_method})"
        }
@server.tool()
def create_linear_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    coefficients: Optional[List[float]] = None
) -> Dict[str, Any]:
    """
    Create a DoWhy GCM linear regressor and evaluate it on the loaded data.

    When ``coefficients`` are given, the regressor is built from them and
    used as is; otherwise a standard linear regressor is fitted on the data.

    Args:
        data_path: Path of the data file to load.
        target_variable: Name of the column to predict.
        feature_variables: Names of the feature columns.
        coefficients: Optional fixed coefficients, one per feature variable.

    Returns:
        On success, a dict with MSE/R2/RMSE and mean/std/min/max of the
        predictions; on any exception (including a coefficient count that
        does not match the features), a dict with ``success`` False and the
        error text.
    """
    try:
        # Load the data
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = np.asarray(data[target_variable].values).reshape(-1)
        if coefficients:
            if len(coefficients) != len(feature_variables):
                raise ValueError(
                    f"系数数量({len(coefficients)})与特征变量数量({len(feature_variables)})不一致"
                )
            # Fitting would replace the caller's coefficients with estimated
            # ones, so this regressor is deliberately not fitted.
            linear_regressor = gcm.create_linear_regressor_with_given_parameters(
                np.asarray(coefficients, dtype=float)
            )
        else:
            linear_regressor = gcm.create_linear_regressor()
            linear_regressor.fit(X, y)
        # Flatten so the residual is element-wise even if predict() returns an
        # (n, 1) column; a 1-D target minus a column would broadcast to n x n.
        predictions = np.asarray(linear_regressor.predict(X)).reshape(-1)
        residuals = y - predictions
        mse = np.mean(residuals ** 2)
        r2 = 1 - (np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2))
        return {
            "success": True,
            "method": "DoWhy GCM create_linear_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "specified_coefficients": coefficients,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "prediction_statistics": {
                "mean_prediction": float(np.mean(predictions)),
                "std_prediction": float(np.std(predictions)),
                "min_prediction": float(np.min(predictions)),
                "max_prediction": float(np.max(predictions))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的线性回归器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 线性回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_linear_regressor"
        }
@server.tool()
def create_random_forest_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    n_estimators: int = 100
) -> Dict[str, Any]:
    """
    Create and fit a DoWhy GCM random forest regressor, then evaluate it on
    the training data.

    Args:
        data_path: Path of the data file to load.
        target_variable: Name of the column to predict.
        feature_variables: Names of the feature columns.
        n_estimators: Number of trees.

    Returns:
        On success, a dict with MSE/R2/RMSE and mean/std/min/max of the
        predictions; on any exception, a dict with ``success`` False and the
        error text.
    """
    try:
        # Load the data
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = np.asarray(data[target_variable].values).reshape(-1)
        # Create and fit the regressor
        rf_regressor = gcm.create_random_forest_regressor(n_estimators=n_estimators)
        rf_regressor.fit(X, y)
        # Flatten so the residual is element-wise even if predict() returns an
        # (n, 1) column; a 1-D target minus a column would broadcast to n x n.
        predictions = np.asarray(rf_regressor.predict(X)).reshape(-1)
        residuals = y - predictions
        mse = np.mean(residuals ** 2)
        r2 = 1 - (np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2))
        return {
            "success": True,
            "method": "DoWhy GCM create_random_forest_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "n_estimators": n_estimators,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "prediction_statistics": {
                "mean_prediction": float(np.mean(predictions)),
                "std_prediction": float(np.std(predictions)),
                "min_prediction": float(np.min(predictions)),
                "max_prediction": float(np.max(predictions))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的随机森林回归器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 随机森林回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_random_forest_regressor"
        }
@server.tool()
def create_logistic_regression_classifier_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str]
) -> Dict[str, Any]:
    """
    Create and fit a DoWhy GCM logistic regression classifier, then evaluate
    it on the training data.

    Args:
        data_path: Path of the data file to load.
        target_variable: Name of the categorical column to predict.
        feature_variables: Names of the feature columns.

    Returns:
        On success, a dict with the class names, training accuracy and
        prediction summary; on any exception, a dict with ``success`` False
        and the error text.
    """
    try:
        # Load the data
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = np.asarray(data[target_variable].values).reshape(-1)
        # Create and fit the classifier
        lr_classifier = gcm.create_logistic_regression_classifier()
        lr_classifier.fit(X, y)
        # Flatten so the comparison is element-wise even if predict() returns
        # an (n, 1) column; comparing it to a 1-D target would broadcast to
        # n x n and distort the accuracy.
        predictions = np.asarray(lr_classifier.predict(X)).reshape(-1)
        probabilities = lr_classifier.predict_probabilities(X)
        accuracy = np.mean(predictions == y)
        class_names = lr_classifier.classes
        return {
            "success": True,
            "method": "DoWhy GCM create_logistic_regression_classifier",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            # tolist() yields native Python values instead of numpy scalars.
            "class_names": np.asarray(class_names).tolist() if class_names is not None else [],
            "performance_metrics": {
                "accuracy": float(accuracy),
                "num_classes": len(np.unique(y))
            },
            "prediction_statistics": {
                "unique_predictions": len(np.unique(predictions)),
                "mean_probability": float(np.mean(probabilities)) if probabilities is not None else None
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的逻辑回归分类器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 逻辑回归分类器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_logistic_regression_classifier"
        }
@server.tool()
def geometric_median_gcm(
    data_path: str,
    variables: List[str]
) -> Dict[str, Any]:
    """
    Compute the geometric median of the selected columns with DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        variables: Names of the columns that form the data matrix.

    Returns:
        On success, a dict with the geometric median and mean/std/min/max of
        the Euclidean distances from each row to it; on any exception, a dict
        with ``success`` False and the error text.
    """
    method_label = "DoWhy GCM geometric_median"
    try:
        frame = load_and_validate_data(data_path, variables)
        matrix = frame[variables].values
        center = gcm.geometric_median(matrix)
        # Row-wise Euclidean distance to the geometric median
        offsets = matrix - center
        row_distances = np.linalg.norm(offsets, axis=1)
        distance_summary = {
            "mean_distance": float(np.mean(row_distances)),
            "std_distance": float(np.std(row_distances)),
            "min_distance": float(np.min(row_distances)),
            "max_distance": float(np.max(row_distances))
        }
        return {
            "success": True,
            "method": method_label,
            "variables": variables,
            "geometric_median": center.tolist(),
            "distance_statistics": distance_summary,
            "sample_size": len(frame),
            "message": f"成功计算{len(variables)}维数据的几何中位数"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 几何中位数计算失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def set_random_seed_gcm(
    seed: int = 42
) -> Dict[str, Any]:
    """
    Set the random seed through ``gcm.set_random_seed``.

    Args:
        seed: Seed value to apply.

    Returns:
        On success, a dict echoing the seed; on any exception, a dict with
        ``success`` False and the error text.
    """
    method_label = "DoWhy GCM set_random_seed"
    try:
        gcm.set_random_seed(seed)
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 随机种子设置失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
    return {
        "success": True,
        "method": method_label,
        "seed": seed,
        "message": f"成功设置随机种子为{seed}"
    }
@server.tool()
def create_polynom_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    degree: int = 2
) -> Dict[str, Any]:
    """
    Create and fit a DoWhy GCM polynomial regressor, then evaluate it on the
    training data.

    Args:
        data_path: Path of the data file to load.
        target_variable: Name of the column to predict.
        feature_variables: Names of the feature columns.
        degree: Degree of the polynomial.

    Returns:
        On success, a dict with MSE/R2/RMSE and mean/std/min/max of the
        predictions; on any exception, a dict with ``success`` False and the
        error text.
    """
    try:
        # Load the data
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = np.asarray(data[target_variable].values).reshape(-1)
        # Create and fit the regressor
        poly_regressor = gcm.create_polynom_regressor(degree=degree)
        poly_regressor.fit(X, y)
        # Flatten so the residual is element-wise even if predict() returns an
        # (n, 1) column; a 1-D target minus a column would broadcast to n x n.
        predictions = np.asarray(poly_regressor.predict(X)).reshape(-1)
        residuals = y - predictions
        mse = np.mean(residuals ** 2)
        r2 = 1 - (np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2))
        return {
            "success": True,
            "method": "DoWhy GCM create_polynom_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "degree": degree,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "prediction_statistics": {
                "mean_prediction": float(np.mean(predictions)),
                "std_prediction": float(np.std(predictions)),
                "min_prediction": float(np.min(predictions)),
                "max_prediction": float(np.max(predictions))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的{degree}次多项式回归器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 多项式回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_polynom_regressor"
        }
@server.tool()
def create_random_forest_classifier_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    n_estimators: int = 100
) -> Dict[str, Any]:
    """
    Create and fit a DoWhy GCM random forest classifier, then evaluate it on
    the training data.

    Args:
        data_path: Path of the data file to load.
        target_variable: Name of the categorical column to predict.
        feature_variables: Names of the feature columns.
        n_estimators: Number of trees.

    Returns:
        On success, a dict with the class names, training accuracy and
        prediction summary; on any exception, a dict with ``success`` False
        and the error text.
    """
    try:
        # Load the data
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = np.asarray(data[target_variable].values).reshape(-1)
        # Create and fit the classifier
        rf_classifier = gcm.create_random_forest_classifier(n_estimators=n_estimators)
        rf_classifier.fit(X, y)
        # Flatten so the comparison is element-wise even if predict() returns
        # an (n, 1) column; comparing it to a 1-D target would broadcast to
        # n x n and distort the accuracy.
        predictions = np.asarray(rf_classifier.predict(X)).reshape(-1)
        probabilities = rf_classifier.predict_probabilities(X)
        accuracy = np.mean(predictions == y)
        class_names = rf_classifier.classes
        return {
            "success": True,
            "method": "DoWhy GCM create_random_forest_classifier",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "n_estimators": n_estimators,
            # tolist() yields native Python values instead of numpy scalars.
            "class_names": np.asarray(class_names).tolist() if class_names is not None else [],
            "performance_metrics": {
                "accuracy": float(accuracy),
                "num_classes": len(np.unique(y))
            },
            "prediction_statistics": {
                "unique_predictions": len(np.unique(predictions)),
                "mean_probability": float(np.mean(probabilities)) if probabilities is not None else None
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的随机森林分类器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 随机森林分类器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_random_forest_classifier"
        }
@server.tool()
def create_histogram_density_estimator_gcm(
    data_path: str,
    variable: str,
    num_bins: int = 50
) -> Dict[str, Any]:
    """
    Fit a histogram density estimator to one variable via DoWhy GCM.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to model.
        num_bins: Number of histogram bins.

    Returns:
        On success, a dict with mean/std/min/max of the densities evaluated
        at the observed samples; on any exception, a dict with ``success``
        False and the error text.
    """
    method_label = "DoWhy GCM create_histogram_density_estimator"
    try:
        frame = load_and_validate_data(data_path, [variable])
        samples = frame[variable].values
        # NOTE(review): confirm that gcm exposes
        # create_histogram_density_estimator in the installed DoWhy version.
        estimator = gcm.create_histogram_density_estimator(num_bins=num_bins)
        estimator.fit(samples)
        densities = estimator.density(samples)
        density_summary = {
            "mean_density": float(np.mean(densities)),
            "std_density": float(np.std(densities)),
            "min_density": float(np.min(densities)),
            "max_density": float(np.max(densities))
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "num_bins": num_bins,
            "density_statistics": density_summary,
            "sample_size": len(frame),
            "message": f"成功创建{variable}的直方图密度估计器({num_bins}个箱)"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 直方图密度估计器创建失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def mean_deviation_scorer_gcm(
    data_path: str,
    variable: str
) -> Dict[str, Any]:
    """
    Score one variable with the DoWhy GCM mean-deviation anomaly scorer.

    The scorer is fitted on the column and then applied to the same samples.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to score.

    Returns:
        On success, a dict with mean/std/min/max of the scores plus the full
        score list; on any exception, a dict with ``success`` False and the
        error text.
    """
    method_label = "DoWhy GCM MeanDeviationScorer"
    try:
        frame = load_and_validate_data(data_path, [variable])
        samples = frame[variable].values
        anomaly_scorer = gcm.MeanDeviationScorer()
        anomaly_scorer.fit(samples)
        score_values = anomaly_scorer.score(samples)
        score_summary = {
            "mean_score": float(np.mean(score_values)),
            "std_score": float(np.std(score_values)),
            "min_score": float(np.min(score_values)),
            "max_score": float(np.max(score_values)),
            "scores": score_values.tolist()
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "anomaly_scores": score_summary,
            "sample_size": len(frame),
            "message": f"成功计算{variable}的均值偏差异常分数"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 均值偏差评分器失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def median_deviation_scorer_gcm(
    data_path: str,
    variable: str
) -> Dict[str, Any]:
    """
    Score one variable with the DoWhy GCM median-deviation anomaly scorer.

    The scorer is fitted on the column and then applied to the same samples.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to score.

    Returns:
        On success, a dict with mean/std/min/max of the scores plus the full
        score list; on any exception, a dict with ``success`` False and the
        error text.
    """
    method_label = "DoWhy GCM MedianDeviationScorer"
    try:
        frame = load_and_validate_data(data_path, [variable])
        samples = frame[variable].values
        anomaly_scorer = gcm.MedianDeviationScorer()
        anomaly_scorer.fit(samples)
        score_values = anomaly_scorer.score(samples)
        score_summary = {
            "mean_score": float(np.mean(score_values)),
            "std_score": float(np.std(score_values)),
            "min_score": float(np.min(score_values)),
            "max_score": float(np.max(score_values)),
            "scores": score_values.tolist()
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "anomaly_scores": score_summary,
            "sample_size": len(frame),
            "message": f"成功计算{variable}的中位数偏差异常分数"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 中位数偏差评分器失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def inverse_density_scorer_gcm(
    data_path: str,
    variable: str
) -> Dict[str, Any]:
    """
    Score one variable with the DoWhy GCM inverse-density anomaly scorer.

    The column is reshaped into a single-column 2-D array, the scorer is
    fitted on it and then applied to the same samples.

    Args:
        data_path: Path of the data file to load.
        variable: Name of the column to score.

    Returns:
        On success, a dict with mean/std/min/max of the scores plus the full
        score list; on any exception, a dict with ``success`` False and the
        error text.
    """
    method_label = "DoWhy GCM InverseDensityScorer"
    try:
        frame = load_and_validate_data(data_path, [variable])
        column = frame[variable].values
        samples = column.reshape(-1, 1)
        anomaly_scorer = gcm.InverseDensityScorer()
        anomaly_scorer.fit(samples)
        score_values = anomaly_scorer.score(samples)
        score_summary = {
            "mean_score": float(np.mean(score_values)),
            "std_score": float(np.std(score_values)),
            "min_score": float(np.min(score_values)),
            "max_score": float(np.max(score_values)),
            "scores": score_values.tolist()
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "anomaly_scores": score_summary,
            "sample_size": len(frame),
            "message": f"成功计算{variable}的逆密度异常分数"
        }
    except Exception as exc:
        reason = str(exc)
        logger.error(f"DoWhy GCM 逆密度评分器失败: {reason}")
        return {
            "success": False,
            "error": reason,
            "method": method_label
        }
@server.tool()
def create_support_vector_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    kernel: str = "rbf"
) -> Dict[str, Any]:
    """
    Create and fit a DoWhy GCM support vector regressor, then evaluate it on
    the training data.

    Args:
        data_path: Path of the data file to load.
        target_variable: Name of the column to predict.
        feature_variables: Names of the feature columns.
        kernel: Kernel type forwarded to the regressor factory.

    Returns:
        On success, a dict with MSE/R2/RMSE; on any exception, a dict with
        ``success`` False and the error text.
    """
    try:
        # Load the data
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = np.asarray(data[target_variable].values).reshape(-1)
        # Create and fit the regressor
        svr = gcm.create_support_vector_regressor(kernel=kernel)
        svr.fit(X, y)
        # Flatten so the residual is element-wise even if predict() returns an
        # (n, 1) column; a 1-D target minus a column would broadcast to n x n.
        predictions = np.asarray(svr.predict(X)).reshape(-1)
        residuals = y - predictions
        mse = np.mean(residuals ** 2)
        r2 = 1 - (np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2))
        return {
            "success": True,
            "method": "DoWhy GCM create_support_vector_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "kernel": kernel,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的支持向量回归器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM 支持向量回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_support_vector_regressor"
        }
@server.tool()
def create_gaussian_process_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str]
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM Gaussian process regressor and report in-sample metrics.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        in-sample ``mse`` / ``r2_score`` / ``rmse`` and ``sample_size``.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        gpr = gcm.create_gaussian_process_regressor()
        gpr.fit(X, y)
        # Flatten both sides before subtracting: if predict() returns a
        # column vector (n, 1), subtracting it from a 1-D target would
        # silently broadcast into an (n, n) matrix and corrupt the metrics.
        predictions = np.asarray(gpr.predict(X)).reshape(-1)
        targets = np.asarray(y).reshape(-1)
        squared_errors = (targets - predictions) ** 2
        mse = np.mean(squared_errors)
        ss_res = np.sum(squared_errors)
        ss_tot = np.sum((targets - np.mean(targets)) ** 2)
        if ss_tot == 0:
            # Constant target: R^2 is undefined; report 1.0 for an exact fit
            # and 0.0 otherwise instead of emitting NaN/inf.
            r2 = 1.0 if ss_res == 0 else 0.0
        else:
            r2 = 1 - ss_res / ss_tot
        return {
            "success": True,
            "method": "DoWhy GCM create_gaussian_process_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的高斯过程回归器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 高斯过程回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_gaussian_process_regressor"
        }
@server.tool()
def create_extra_trees_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    n_estimators: int = 100
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM extra-trees regressor and report in-sample fit metrics.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.
        n_estimators: Number of trees, forwarded to the regressor factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        in-sample ``mse`` / ``r2_score`` / ``rmse`` and ``sample_size``.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        etr = gcm.create_extra_trees_regressor(n_estimators=n_estimators)
        etr.fit(X, y)
        # Flatten both sides before subtracting: if predict() returns a
        # column vector (n, 1), subtracting it from a 1-D target would
        # silently broadcast into an (n, n) matrix and corrupt the metrics.
        predictions = np.asarray(etr.predict(X)).reshape(-1)
        targets = np.asarray(y).reshape(-1)
        squared_errors = (targets - predictions) ** 2
        mse = np.mean(squared_errors)
        ss_res = np.sum(squared_errors)
        ss_tot = np.sum((targets - np.mean(targets)) ** 2)
        if ss_tot == 0:
            # Constant target: R^2 is undefined; report 1.0 for an exact fit
            # and 0.0 otherwise instead of emitting NaN/inf.
            r2 = 1.0 if ss_res == 0 else 0.0
        else:
            r2 = 1 - ss_res / ss_tot
        return {
            "success": True,
            "method": "DoWhy GCM create_extra_trees_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "n_estimators": n_estimators,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的极端随机树回归器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 极端随机树回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_extra_trees_regressor"
        }
@server.tool()
def create_lasso_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    alpha: float = 1.0
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM Lasso regressor and report in-sample fit metrics.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.
        alpha: Regularization strength forwarded to the regressor factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        in-sample ``mse`` / ``r2_score`` / ``rmse`` and ``sample_size``.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        lasso = gcm.create_lasso_regressor(alpha=alpha)
        lasso.fit(X, y)
        # Flatten both sides before subtracting: if predict() returns a
        # column vector (n, 1), subtracting it from a 1-D target would
        # silently broadcast into an (n, n) matrix and corrupt the metrics.
        predictions = np.asarray(lasso.predict(X)).reshape(-1)
        targets = np.asarray(y).reshape(-1)
        squared_errors = (targets - predictions) ** 2
        mse = np.mean(squared_errors)
        ss_res = np.sum(squared_errors)
        ss_tot = np.sum((targets - np.mean(targets)) ** 2)
        if ss_tot == 0:
            # Constant target: R^2 is undefined; report 1.0 for an exact fit
            # and 0.0 otherwise instead of emitting NaN/inf.
            r2 = 1.0 if ss_res == 0 else 0.0
        else:
            r2 = 1 - ss_res / ss_tot
        return {
            "success": True,
            "method": "DoWhy GCM create_lasso_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "alpha": alpha,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的Lasso回归器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM Lasso回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_lasso_regressor"
        }
@server.tool()
def create_ridge_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    alpha: float = 1.0
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM Ridge regressor and report in-sample fit metrics.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.
        alpha: Regularization strength forwarded to the regressor factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        in-sample ``mse`` / ``r2_score`` / ``rmse`` and ``sample_size``.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        ridge = gcm.create_ridge_regressor(alpha=alpha)
        ridge.fit(X, y)
        # Flatten both sides before subtracting: if predict() returns a
        # column vector (n, 1), subtracting it from a 1-D target would
        # silently broadcast into an (n, n) matrix and corrupt the metrics.
        predictions = np.asarray(ridge.predict(X)).reshape(-1)
        targets = np.asarray(y).reshape(-1)
        squared_errors = (targets - predictions) ** 2
        mse = np.mean(squared_errors)
        ss_res = np.sum(squared_errors)
        ss_tot = np.sum((targets - np.mean(targets)) ** 2)
        if ss_tot == 0:
            # Constant target: R^2 is undefined; report 1.0 for an exact fit
            # and 0.0 otherwise instead of emitting NaN/inf.
            r2 = 1.0 if ss_res == 0 else 0.0
        else:
            r2 = 1 - ss_res / ss_tot
        return {
            "success": True,
            "method": "DoWhy GCM create_ridge_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "alpha": alpha,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的Ridge回归器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM Ridge回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_ridge_regressor"
        }
@server.tool()
def create_elastic_net_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    alpha: float = 1.0,
    l1_ratio: float = 0.5
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM elastic-net regressor and report in-sample fit metrics.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.
        alpha: Regularization strength forwarded to the regressor factory.
        l1_ratio: L1 share of the penalty, forwarded to the factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        in-sample ``mse`` / ``r2_score`` / ``rmse`` and ``sample_size``.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        elastic_net = gcm.create_elastic_net_regressor(alpha=alpha, l1_ratio=l1_ratio)
        elastic_net.fit(X, y)
        # Flatten both sides before subtracting: if predict() returns a
        # column vector (n, 1), subtracting it from a 1-D target would
        # silently broadcast into an (n, n) matrix and corrupt the metrics.
        predictions = np.asarray(elastic_net.predict(X)).reshape(-1)
        targets = np.asarray(y).reshape(-1)
        squared_errors = (targets - predictions) ** 2
        mse = np.mean(squared_errors)
        ss_res = np.sum(squared_errors)
        ss_tot = np.sum((targets - np.mean(targets)) ** 2)
        if ss_tot == 0:
            # Constant target: R^2 is undefined; report 1.0 for an exact fit
            # and 0.0 otherwise instead of emitting NaN/inf.
            r2 = 1.0 if ss_res == 0 else 0.0
        else:
            r2 = 1 - ss_res / ss_tot
        return {
            "success": True,
            "method": "DoWhy GCM create_elastic_net_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "alpha": alpha,
            "l1_ratio": l1_ratio,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的弹性网络回归器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 弹性网络回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_elastic_net_regressor"
        }
@server.tool()
def create_support_vector_classifier_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    kernel: str = "rbf"
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM support vector classifier and report in-sample accuracy.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the (categorical) column to predict.
        feature_variables: Names of the columns used as features.
        kernel: Kernel type forwarded to the classifier factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        class names, in-sample ``accuracy``, the number of classes, simple
        prediction statistics and ``sample_size``. On any exception, a dict
        with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        svc = gcm.create_support_vector_classifier(kernel=kernel)
        svc.fit(X, y)
        # Flatten before comparing: if predict() returns a column vector
        # (n, 1), `predictions == y` would broadcast to (n, n) and the
        # resulting "accuracy" would be meaningless.
        predictions = np.asarray(svc.predict(X)).reshape(-1)
        probabilities = svc.predict_probabilities(X)
        accuracy = np.mean(predictions == np.asarray(y).reshape(-1))
        class_names = svc.classes
        return {
            "success": True,
            "method": "DoWhy GCM create_support_vector_classifier",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "kernel": kernel,
            # tolist() converts NumPy scalars to native Python values so the
            # payload stays JSON-serializable.
            "class_names": np.asarray(class_names).tolist() if class_names is not None else [],
            "performance_metrics": {
                "accuracy": float(accuracy),
                "num_classes": len(np.unique(y))
            },
            "prediction_statistics": {
                "unique_predictions": len(np.unique(predictions)),
                # Mean over every entry of the probability output.
                "mean_probability": float(np.mean(probabilities)) if probabilities is not None else None
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的支持向量分类器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 支持向量分类器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_support_vector_classifier"
        }
@server.tool()
def create_gaussian_nb_classifier_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str]
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM Gaussian naive Bayes classifier and report accuracy.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the (categorical) column to predict.
        feature_variables: Names of the columns used as features.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        class names, in-sample ``accuracy``, the number of classes, simple
        prediction statistics and ``sample_size``. On any exception, a dict
        with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        gnb = gcm.create_gaussian_nb_classifier()
        gnb.fit(X, y)
        # Flatten before comparing: if predict() returns a column vector
        # (n, 1), `predictions == y` would broadcast to (n, n) and the
        # resulting "accuracy" would be meaningless.
        predictions = np.asarray(gnb.predict(X)).reshape(-1)
        probabilities = gnb.predict_probabilities(X)
        accuracy = np.mean(predictions == np.asarray(y).reshape(-1))
        class_names = gnb.classes
        return {
            "success": True,
            "method": "DoWhy GCM create_gaussian_nb_classifier",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            # tolist() converts NumPy scalars to native Python values so the
            # payload stays JSON-serializable.
            "class_names": np.asarray(class_names).tolist() if class_names is not None else [],
            "performance_metrics": {
                "accuracy": float(accuracy),
                "num_classes": len(np.unique(y))
            },
            "prediction_statistics": {
                "unique_predictions": len(np.unique(predictions)),
                # Mean over every entry of the probability output.
                "mean_probability": float(np.mean(probabilities)) if probabilities is not None else None
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的高斯朴素贝叶斯分类器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 高斯朴素贝叶斯分类器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_gaussian_nb_classifier"
        }
@server.tool()
def create_knn_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    n_neighbors: int = 5
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM k-nearest-neighbors regressor and report fit metrics.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.
        n_neighbors: Number of neighbors forwarded to the regressor factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        in-sample ``mse`` / ``r2_score`` / ``rmse`` and ``sample_size``.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        knn = gcm.create_knn_regressor(n_neighbors=n_neighbors)
        knn.fit(X, y)
        # Flatten both sides before subtracting: if predict() returns a
        # column vector (n, 1), subtracting it from a 1-D target would
        # silently broadcast into an (n, n) matrix and corrupt the metrics.
        predictions = np.asarray(knn.predict(X)).reshape(-1)
        targets = np.asarray(y).reshape(-1)
        squared_errors = (targets - predictions) ** 2
        mse = np.mean(squared_errors)
        ss_res = np.sum(squared_errors)
        ss_tot = np.sum((targets - np.mean(targets)) ** 2)
        if ss_tot == 0:
            # Constant target: R^2 is undefined; report 1.0 for an exact fit
            # and 0.0 otherwise instead of emitting NaN/inf.
            r2 = 1.0 if ss_res == 0 else 0.0
        else:
            r2 = 1 - ss_res / ss_tot
        return {
            "success": True,
            "method": "DoWhy GCM create_knn_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "n_neighbors": n_neighbors,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的K近邻回归器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM K近邻回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_knn_regressor"
        }
@server.tool()
def rca_scorer_gcm(
    data_path: str,
    variable: str
) -> Dict[str, Any]:
    """
    Score one variable with the DoWhy GCM RCA anomaly scorer.

    Args:
        data_path: Path to the data file.
        variable: Name of the column to score.

    Returns:
        On success, a dict with ``success=True``, summary statistics of the
        scores, the full score list and ``sample_size``. On any exception,
        a dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM RCAScorer"
    try:
        frame = load_and_validate_data(data_path, [variable])
        values = frame[variable].values
        # The scorer is fitted on, and then applied to, the same column.
        rca = gcm.RCAScorer()
        rca.fit(values)
        scores = rca.score(values)
        score_summary = {
            "mean_score": float(np.mean(scores)),
            "std_score": float(np.std(scores)),
            "min_score": float(np.min(scores)),
            "max_score": float(np.max(scores)),
            "scores": scores.tolist()
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "anomaly_scores": score_summary,
            "sample_size": len(frame),
            "message": f"成功计算{variable}的RCA异常分数"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM RCA评分器失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def isolation_forest_scorer_gcm(
    data_path: str,
    variable: str,
    contamination: float = 0.1
) -> Dict[str, Any]:
    """
    Score one variable with the DoWhy GCM isolation-forest anomaly scorer.

    Args:
        data_path: Path to the data file.
        variable: Name of the column to score.
        contamination: Expected anomaly fraction, forwarded to the scorer.

    Returns:
        On success, a dict with ``success=True``, summary statistics of the
        scores, the full score list and ``sample_size``. On any exception,
        a dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM IsolationForestScorer"
    try:
        frame = load_and_validate_data(data_path, [variable])
        # Present the single column as an (n, 1) matrix.
        column = frame[variable].values.reshape(-1, 1)
        forest = gcm.IsolationForestScorer(contamination=contamination)
        forest.fit(column)
        scores = forest.score(column)
        score_summary = {
            "mean_score": float(np.mean(scores)),
            "std_score": float(np.std(scores)),
            "min_score": float(np.min(scores)),
            "max_score": float(np.max(scores)),
            "scores": scores.tolist()
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "contamination": contamination,
            "anomaly_scores": score_summary,
            "sample_size": len(frame),
            "message": f"成功计算{variable}的孤立森林异常分数"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 孤立森林评分器失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def one_class_svm_scorer_gcm(
    data_path: str,
    variable: str,
    nu: float = 0.1
) -> Dict[str, Any]:
    """
    Score one variable with the DoWhy GCM one-class SVM anomaly scorer.

    Args:
        data_path: Path to the data file.
        variable: Name of the column to score.
        nu: Upper bound on the anomaly fraction, forwarded to the scorer.

    Returns:
        On success, a dict with ``success=True``, summary statistics of the
        scores, the full score list and ``sample_size``. On any exception,
        a dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM OneClassSVMScorer"
    try:
        frame = load_and_validate_data(data_path, [variable])
        # Present the single column as an (n, 1) matrix.
        column = frame[variable].values.reshape(-1, 1)
        svm_scorer = gcm.OneClassSVMScorer(nu=nu)
        svm_scorer.fit(column)
        scores = svm_scorer.score(column)
        score_summary = {
            "mean_score": float(np.mean(scores)),
            "std_score": float(np.std(scores)),
            "min_score": float(np.min(scores)),
            "max_score": float(np.max(scores)),
            "scores": scores.tolist()
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "nu": nu,
            "anomaly_scores": score_summary,
            "sample_size": len(frame),
            "message": f"成功计算{variable}的单类SVM异常分数"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 单类SVM评分器失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def create_kernel_density_estimator_2d_gcm(
    data_path: str,
    variable1: str,
    variable2: str,
    bandwidth: Optional[float] = None
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM two-dimensional kernel density estimator on two columns.

    Args:
        data_path: Path to the data file.
        variable1: Name of the first column.
        variable2: Name of the second column.
        bandwidth: Bandwidth forwarded to the estimator (``None`` by default).

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, summary
        statistics of the densities evaluated at the training points and
        ``sample_size``. On any exception, a dict with ``success=False`` and
        the error text.
    """
    method_label = "DoWhy GCM KernelDensityEstimator2D"
    try:
        pair = [variable1, variable2]
        frame = load_and_validate_data(data_path, pair)
        points = frame[pair].values
        estimator = gcm.KernelDensityEstimator2D(bandwidth=bandwidth)
        estimator.fit(points)
        # Evaluate the fitted density at the points it was trained on.
        density_values = estimator.density(points)
        density_summary = {
            "mean_density": float(np.mean(density_values)),
            "std_density": float(np.std(density_values)),
            "min_density": float(np.min(density_values)),
            "max_density": float(np.max(density_values))
        }
        return {
            "success": True,
            "method": method_label,
            "variables": [variable1, variable2],
            "bandwidth": bandwidth,
            "density_statistics": density_summary,
            "sample_size": len(frame),
            "message": f"成功创建{variable1}和{variable2}的二维核密度估计器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 二维核密度估计器创建失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def create_gaussian_density_estimator_gcm(
    data_path: str,
    variable: str
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM Gaussian density estimator on one column.

    Args:
        data_path: Path to the data file.
        variable: Name of the column to model.

    Returns:
        On success, a dict with ``success=True``, the estimated ``mean`` /
        ``std``, summary statistics of the densities evaluated at the
        training points and ``sample_size``. On any exception, a dict with
        ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM GaussianDensityEstimator"
    try:
        frame = load_and_validate_data(data_path, [variable])
        values = frame[variable].values
        estimator = gcm.GaussianDensityEstimator()
        estimator.fit(values)
        density_values = estimator.density(values)
        # Prefer parameters exposed by the estimator; otherwise fall back to
        # the sample statistics of the input column.
        if hasattr(estimator, 'mean'):
            mean = estimator.mean
        else:
            mean = np.mean(values)
        if hasattr(estimator, 'std'):
            std = estimator.std
        else:
            std = np.std(values)
        parameters = {
            "mean": float(mean),
            "std": float(std)
        }
        density_summary = {
            "mean_density": float(np.mean(density_values)),
            "std_density": float(np.std(density_values)),
            "min_density": float(np.min(density_values)),
            "max_density": float(np.max(density_values))
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "parameters": parameters,
            "density_statistics": density_summary,
            "sample_size": len(frame),
            "message": f"成功创建{variable}的高斯密度估计器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 高斯密度估计器创建失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def create_discrete_density_estimator_gcm(
    data_path: str,
    variable: str
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM discrete density estimator on one column.

    Args:
        data_path: Path to the data file.
        variable: Name of the (discrete) column to model.

    Returns:
        On success, a dict with ``success=True``, the number of distinct
        values, the estimated density of each distinct value (keyed by its
        string form), summary statistics of the densities at the training
        points and ``sample_size``. On any exception, a dict with
        ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM DiscreteDensityEstimator"
    try:
        frame = load_and_validate_data(data_path, [variable])
        values = frame[variable].values
        estimator = gcm.DiscreteDensityEstimator()
        estimator.fit(values)
        density_values = estimator.density(values)
        distinct = np.unique(values)
        # Query the estimator once per distinct value.
        per_value = {
            str(item): float(estimator.density(np.array([item]))[0])
            for item in distinct
        }
        density_summary = {
            "mean_density": float(np.mean(density_values)),
            "std_density": float(np.std(density_values)),
            "min_density": float(np.min(density_values)),
            "max_density": float(np.max(density_values))
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "unique_values": len(distinct),
            "value_probabilities": per_value,
            "density_statistics": density_summary,
            "sample_size": len(frame),
            "message": f"成功创建{variable}的离散密度估计器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 离散密度估计器创建失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def fit_and_compute_mse_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    model_type: str = "linear"
) -> Dict[str, Any]:
    """
    Build a DoWhy GCM regressor of the requested type and compute its MSE.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.
        model_type: One of 'linear', 'random_forest' or 'polynomial'.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, ``mse``,
        ``rmse`` and ``sample_size``. On any exception (including an
        unsupported ``model_type``), a dict with ``success=False`` and the
        error text.
    """
    method_label = "DoWhy GCM fit_and_compute_mse"
    # Supported model types mapped to the name of the gcm factory to call.
    factory_names = {
        "linear": "create_linear_regressor",
        "random_forest": "create_random_forest_regressor",
        "polynomial": "create_polynom_regressor",
    }
    try:
        columns = [target_variable] + feature_variables
        frame = load_and_validate_data(data_path, columns)
        features = frame[feature_variables].values
        target = frame[target_variable].values
        if model_type not in factory_names:
            raise ValueError(f"不支持的模型类型: {model_type}")
        # Resolve the factory lazily so only the requested one is looked up.
        model = getattr(gcm, factory_names[model_type])()
        mse = gcm.fit_and_compute_mse(model, features, target)
        return {
            "success": True,
            "method": method_label,
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "model_type": model_type,
            "mse": float(mse),
            "rmse": float(np.sqrt(mse)),
            "sample_size": len(frame),
            "message": f"成功拟合{model_type}模型并计算MSE"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 拟合模型和计算MSE失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def is_root_node_gcm(
    graph_edges: List[Dict[str, str]],
    node: str
) -> Dict[str, Any]:
    """
    Check with DoWhy GCM whether a node is a root of the causal graph.

    Args:
        graph_edges: Edge list, each entry a dict with "from" and "to".
        node: Name of the node to inspect.

    Returns:
        On success, a dict with ``success=True``, the boolean verdict, the
        node's predecessors and successors and its in/out degree. On any
        exception, a dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM is_root_node"
    try:
        graph = _create_causal_graph(graph_edges)
        is_root = gcm.is_root_node(graph, node)
        # Neighborhood details are read straight from the networkx graph.
        parents = list(graph.predecessors(node))
        children = list(graph.successors(node))
        return {
            "success": True,
            "method": method_label,
            "node": node,
            "is_root_node": bool(is_root),
            "predecessors": parents,
            "successors": children,
            "in_degree": graph.in_degree(node),
            "out_degree": graph.out_degree(node),
            "message": f"节点{node}{'是' if is_root else '不是'}根节点"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 根节点检查失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def is_leaf_node_gcm(
    graph_edges: List[Dict[str, str]],
    node: str
) -> Dict[str, Any]:
    """
    Check with DoWhy GCM whether a node is a leaf of the causal graph.

    Args:
        graph_edges: Edge list, each entry a dict with "from" and "to".
        node: Name of the node to inspect.

    Returns:
        On success, a dict with ``success=True``, the boolean verdict, the
        node's predecessors and successors and its in/out degree. On any
        exception, a dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM is_leaf_node"
    try:
        graph = _create_causal_graph(graph_edges)
        is_leaf = gcm.is_leaf_node(graph, node)
        # Neighborhood details are read straight from the networkx graph.
        parents = list(graph.predecessors(node))
        children = list(graph.successors(node))
        return {
            "success": True,
            "method": method_label,
            "node": node,
            "is_leaf_node": bool(is_leaf),
            "predecessors": parents,
            "successors": children,
            "in_degree": graph.in_degree(node),
            "out_degree": graph.out_degree(node),
            "message": f"节点{node}{'是' if is_leaf else '不是'}叶节点"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 叶节点检查失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def topological_sort_gcm(
    graph_edges: List[Dict[str, str]]
) -> Dict[str, Any]:
    """
    Topologically sort the nodes of a causal graph with DoWhy GCM.

    Args:
        graph_edges: Edge list, each entry a dict with "from" and "to".

    Returns:
        On success, a dict with ``success=True``, the echoed edges, the
        sorted node list, the node count and the number of supplied edges.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        causal_graph = _create_causal_graph(graph_edges)
        # Materialize once: the result is measured with len() and listed
        # below, which would fail or come out empty if the sort returned a
        # lazy iterator instead of a sequence.
        sorted_nodes = list(gcm.topological_sort(causal_graph))
        return {
            "success": True,
            "method": "DoWhy GCM topological_sort",
            "edges": graph_edges,
            "sorted_nodes": sorted_nodes,
            "num_nodes": len(sorted_nodes),
            # Counts the edges as supplied by the caller.
            "num_edges": len(graph_edges),
            "message": f"成功对{len(sorted_nodes)}个节点进行拓扑排序"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 拓扑排序失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM topological_sort"
        }
@server.tool()
def validate_causal_dag_gcm(
    graph_edges: List[Dict[str, str]]
) -> Dict[str, Any]:
    """
    Validate a causal graph with DoWhy GCM and report basic graph properties.

    Args:
        graph_edges: Edge list, each entry a dict with "from" and "to".

    Returns:
        On success, a dict with ``success=True``, the echoed edges, the
        validation verdict and ``graph_properties`` (node/edge counts, weak
        connectivity, whether cycles exist and up to five example cycles).
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        from itertools import islice

        causal_graph = _create_causal_graph(graph_edges)
        # NOTE(review): the verdict is the truthiness of the return value;
        # confirm that validate_causal_dag returns a boolean rather than
        # signalling failure by raising.
        is_dag = gcm.validate_causal_dag(causal_graph)
        num_nodes = causal_graph.number_of_nodes()
        num_edges = causal_graph.number_of_edges()
        # networkx refuses to answer connectivity for a graph without nodes,
        # so an empty graph is reported as not connected.
        is_connected = num_nodes > 0 and nx.is_weakly_connected(causal_graph)
        try:
            # Only five cycles are ever reported, so stop enumerating there
            # instead of listing every simple cycle of the graph.
            cycles = list(islice(nx.simple_cycles(causal_graph), 5))
        except Exception as cycle_error:
            # Cycle listing is best-effort diagnostics; keep the main result.
            logger.warning(f"simple_cycles failed: {cycle_error}")
            cycles = []
        has_cycles = bool(cycles)
        return {
            "success": True,
            "method": "DoWhy GCM validate_causal_dag",
            "edges": graph_edges,
            "is_valid_dag": bool(is_dag),
            "graph_properties": {
                "num_nodes": num_nodes,
                "num_edges": num_edges,
                "is_connected": is_connected,
                "has_cycles": has_cycles,
                "cycles": cycles
            },
            "message": f"因果图{'是' if is_dag else '不是'}有效的DAG"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM DAG验证失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM validate_causal_dag"
        }
@server.tool()
def get_ordered_predecessors_gcm(
    graph_edges: List[Dict[str, str]],
    node: str
) -> Dict[str, Any]:
    """
    Fetch the ordered predecessors of a node with DoWhy GCM.

    Args:
        graph_edges: Edge list, each entry a dict with "from" and "to".
        node: Name of the node whose predecessors are requested.

    Returns:
        On success, a dict with ``success=True``, the ordered predecessors,
        the predecessors as listed by the graph itself and their count.
        On any exception, a dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM get_ordered_predecessors"
    try:
        graph = _create_causal_graph(graph_edges)
        ordered_predecessors = gcm.get_ordered_predecessors(graph, node)
        # The graph's own listing is returned alongside for comparison.
        graph_parents = list(graph.predecessors(node))
        return {
            "success": True,
            "method": method_label,
            "node": node,
            "ordered_predecessors": list(ordered_predecessors),
            "all_predecessors": graph_parents,
            "num_predecessors": len(ordered_predecessors),
            "message": f"成功获取节点{node}的{len(ordered_predecessors)}个有序前驱"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 获取有序前驱失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def get_noise_dependent_function_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str]
) -> Dict[str, Any]:
    """
    Obtain a noise-dependent function from DoWhy GCM and summarize the noise.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the target column.
        feature_variables: Names of the feature columns (may be empty).

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, summary
        statistics of the computed noise and ``sample_size``. On any
        exception, a dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM get_noise_dependent_function"
    try:
        columns = [target_variable] + feature_variables
        frame = load_and_validate_data(data_path, columns)
        if feature_variables:
            features = frame[feature_variables].values
        else:
            # No features: use an empty (n_rows, 0) matrix as a placeholder.
            features = np.array([]).reshape(len(frame), 0)
        target = frame[target_variable].values
        noise_fn = gcm.get_noise_dependent_function(features, target)
        noise = noise_fn(features, target)
        noise_summary = {
            "mean_noise": float(np.mean(noise)),
            "std_noise": float(np.std(noise)),
            "min_noise": float(np.min(noise)),
            "max_noise": float(np.max(noise))
        }
        return {
            "success": True,
            "method": method_label,
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "noise_statistics": noise_summary,
            "sample_size": len(frame),
            "message": f"成功获取{target_variable}的噪声依赖函数"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 获取噪声依赖函数失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def create_knn_classifier_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    n_neighbors: int = 5
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM k-nearest-neighbors classifier and report accuracy.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the (categorical) column to predict.
        feature_variables: Names of the columns used as features.
        n_neighbors: Number of neighbors forwarded to the classifier factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        class names, in-sample ``accuracy``, the number of classes, simple
        prediction statistics and ``sample_size``. On any exception, a dict
        with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        knn = gcm.create_knn_classifier(n_neighbors=n_neighbors)
        knn.fit(X, y)
        # Flatten before comparing: if predict() returns a column vector
        # (n, 1), `predictions == y` would broadcast to (n, n) and the
        # resulting "accuracy" would be meaningless.
        predictions = np.asarray(knn.predict(X)).reshape(-1)
        probabilities = knn.predict_probabilities(X)
        accuracy = np.mean(predictions == np.asarray(y).reshape(-1))
        class_names = knn.classes
        return {
            "success": True,
            "method": "DoWhy GCM create_knn_classifier",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "n_neighbors": n_neighbors,
            # tolist() converts NumPy scalars to native Python values so the
            # payload stays JSON-serializable.
            "class_names": np.asarray(class_names).tolist() if class_names is not None else [],
            "performance_metrics": {
                "accuracy": float(accuracy),
                "num_classes": len(np.unique(y))
            },
            "prediction_statistics": {
                "unique_predictions": len(np.unique(predictions)),
                # Mean over every entry of the probability output.
                "mean_probability": float(np.mean(probabilities)) if probabilities is not None else None
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的K近邻分类器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM K近邻分类器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_knn_classifier"
        }
@server.tool()
def create_extra_trees_classifier_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    n_estimators: int = 100
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM extra-trees classifier and report in-sample accuracy.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the (categorical) column to predict.
        feature_variables: Names of the columns used as features.
        n_estimators: Number of trees, forwarded to the classifier factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        class names, in-sample ``accuracy``, the number of classes, simple
        prediction statistics and ``sample_size``. On any exception, a dict
        with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        etc = gcm.create_extra_trees_classifier(n_estimators=n_estimators)
        etc.fit(X, y)
        # Flatten before comparing: if predict() returns a column vector
        # (n, 1), `predictions == y` would broadcast to (n, n) and the
        # resulting "accuracy" would be meaningless.
        predictions = np.asarray(etc.predict(X)).reshape(-1)
        probabilities = etc.predict_probabilities(X)
        accuracy = np.mean(predictions == np.asarray(y).reshape(-1))
        class_names = etc.classes
        return {
            "success": True,
            "method": "DoWhy GCM create_extra_trees_classifier",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "n_estimators": n_estimators,
            # tolist() converts NumPy scalars to native Python values so the
            # payload stays JSON-serializable.
            "class_names": np.asarray(class_names).tolist() if class_names is not None else [],
            "performance_metrics": {
                "accuracy": float(accuracy),
                "num_classes": len(np.unique(y))
            },
            "prediction_statistics": {
                "unique_predictions": len(np.unique(predictions)),
                # Mean over every entry of the probability output.
                "mean_probability": float(np.mean(probabilities)) if probabilities is not None else None
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的极端随机树分类器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 极端随机树分类器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_extra_trees_classifier"
        }
@server.tool()
def create_gradient_boosting_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    n_estimators: int = 100,
    learning_rate: float = 0.1
) -> Dict[str, Any]:
    """
    Fit a DoWhy GCM gradient boosting regressor and report in-sample metrics.

    Args:
        data_path: Path to the data file.
        target_variable: Name of the column to predict.
        feature_variables: Names of the columns used as features.
        n_estimators: Number of boosting stages, forwarded to the factory.
        learning_rate: Learning rate, forwarded to the factory.

    Returns:
        On success, a dict with ``success=True``, the echoed inputs, the
        in-sample ``mse`` / ``r2_score`` / ``rmse`` and ``sample_size``.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # Load only the columns this tool needs.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)
        X = data[feature_variables].values
        y = data[target_variable].values
        gbr = gcm.create_gradient_boosting_regressor(
            n_estimators=n_estimators,
            learning_rate=learning_rate
        )
        gbr.fit(X, y)
        # Flatten both sides before subtracting: if predict() returns a
        # column vector (n, 1), subtracting it from a 1-D target would
        # silently broadcast into an (n, n) matrix and corrupt the metrics.
        predictions = np.asarray(gbr.predict(X)).reshape(-1)
        targets = np.asarray(y).reshape(-1)
        squared_errors = (targets - predictions) ** 2
        mse = np.mean(squared_errors)
        ss_res = np.sum(squared_errors)
        ss_tot = np.sum((targets - np.mean(targets)) ** 2)
        if ss_tot == 0:
            # Constant target: R^2 is undefined; report 1.0 for an exact fit
            # and 0.0 otherwise instead of emitting NaN/inf.
            r2 = 1.0 if ss_res == 0 else 0.0
        else:
            r2 = 1 - ss_res / ss_tot
        return {
            "success": True,
            "method": "DoWhy GCM create_gradient_boosting_regressor",
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "n_estimators": n_estimators,
            "learning_rate": learning_rate,
            "performance_metrics": {
                "mse": float(mse),
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{target_variable}的梯度提升回归器"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 梯度提升回归器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM create_gradient_boosting_regressor"
        }
@server.tool()
def create_uniform_distribution_gcm(
    data_path: str,
    variable: str,
    num_samples: int = 1000
) -> Dict[str, Any]:
    """
    Build a DoWhy GCM uniform distribution over a column's range and sample it.

    Args:
        data_path: Path to the data file.
        variable: Name of the column whose min/max define the range.
        num_samples: Number of samples to draw.

    Returns:
        On success, a dict with ``success=True``, the distribution bounds,
        the original and generated sample counts and summary statistics for
        both the drawn samples and the original column. On any exception, a
        dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM UniformDistribution"
    try:
        frame = load_and_validate_data(data_path, [variable])
        values = frame[variable].values
        # The distribution spans the observed range of the column.
        lower = np.min(values)
        upper = np.max(values)
        distribution = gcm.UniformDistribution(lower, upper)
        drawn = distribution.draw_samples(num_samples)
        bounds = {
            "min_value": float(lower),
            "max_value": float(upper)
        }
        n_original = len(frame)
        drawn_stats = {
            "mean": float(np.mean(drawn)),
            "std": float(np.std(drawn)),
            "min": float(np.min(drawn)),
            "max": float(np.max(drawn))
        }
        source_stats = {
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
            "min": float(lower),
            "max": float(upper)
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "distribution_parameters": bounds,
            "original_samples": n_original,
            "generated_samples": num_samples,
            "sample_statistics": drawn_stats,
            "original_statistics": source_stats,
            "message": f"成功创建{variable}的均匀分布并生成{num_samples}个样本"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 均匀分布创建失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def create_normal_distribution_gcm(
    data_path: str,
    variable: str,
    num_samples: int = 1000
) -> Dict[str, Any]:
    """
    Build a DoWhy GCM normal distribution from a column's moments and sample it.

    Args:
        data_path: Path to the data file.
        variable: Name of the column whose mean/std parameterize the model.
        num_samples: Number of samples to draw.

    Returns:
        On success, a dict with ``success=True``, the estimated parameters,
        the original and generated sample counts and summary statistics for
        both the drawn samples and the original column. On any exception, a
        dict with ``success=False`` and the error text.
    """
    method_label = "DoWhy GCM NormalDistribution"
    try:
        frame = load_and_validate_data(data_path, [variable])
        values = frame[variable].values
        # Parameters are the sample mean and standard deviation of the column.
        mean = np.mean(values)
        std = np.std(values)
        distribution = gcm.NormalDistribution(mean, std)
        drawn = distribution.draw_samples(num_samples)
        parameters = {
            "mean": float(mean),
            "std": float(std)
        }
        n_original = len(frame)
        drawn_stats = {
            "mean": float(np.mean(drawn)),
            "std": float(np.std(drawn)),
            "min": float(np.min(drawn)),
            "max": float(np.max(drawn))
        }
        source_stats = {
            "mean": float(mean),
            "std": float(std),
            "min": float(np.min(values)),
            "max": float(np.max(values))
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "distribution_parameters": parameters,
            "original_samples": n_original,
            "generated_samples": num_samples,
            "sample_statistics": drawn_stats,
            "original_statistics": source_stats,
            "message": f"成功创建{variable}的正态分布并生成{num_samples}个样本"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 正态分布创建失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def create_categorical_distribution_gcm(
    data_path: str,
    variable: str,
    num_samples: int = 1000
) -> Dict[str, Any]:
    """
    Build a DoWhy GCM categorical distribution from a column and sample it.

    Args:
        data_path: Path to the data file.
        variable: Name of the (categorical) column to model.
        num_samples: Number of samples to draw.

    Returns:
        On success, a dict with ``success=True``, the categories with their
        empirical probabilities, the original and generated sample counts
        and the category frequencies of both the original column and the
        drawn samples. On any exception, a dict with ``success=False`` and
        the error text.
    """
    method_label = "DoWhy GCM CategoricalDistribution"
    try:
        frame = load_and_validate_data(data_path, [variable])
        values = frame[variable].values
        # Empirical class frequencies of the original column.
        categories, category_counts = np.unique(values, return_counts=True)
        probabilities = category_counts / len(values)
        distribution = gcm.CategoricalDistribution(categories, probabilities)
        drawn = distribution.draw_samples(num_samples)
        # Same frequency computation for the generated samples.
        drawn_categories, drawn_counts = np.unique(drawn, return_counts=True)
        drawn_probabilities = drawn_counts / len(drawn)
        parameters = {
            "categories": categories.tolist(),
            "probabilities": probabilities.tolist()
        }
        n_original = len(frame)
        original_table = {
            label: share
            for label, share in zip(categories.astype(str), probabilities)
        }
        generated_table = {
            label: share
            for label, share in zip(drawn_categories.astype(str), drawn_probabilities)
        }
        return {
            "success": True,
            "method": method_label,
            "variable": variable,
            "distribution_parameters": parameters,
            "original_samples": n_original,
            "generated_samples": num_samples,
            "category_distribution": {
                "original": original_table,
                "generated": generated_table
            },
            "message": f"成功创建{variable}的分类分布并生成{num_samples}个样本"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 分类分布创建失败: {str(e)}")
        return {"success": False, "error": str(e), "method": method_label}
@server.tool()
def config_disable_progress_bars_gcm(
    disable: bool = True
) -> Dict[str, Any]:
    """
    Turn DoWhy GCM progress bars off or on.

    Args:
        disable: ``True`` to hide progress bars, ``False`` to show them.

    Returns:
        On success, a dict with ``success=True`` and the resulting state.
        On any exception, a dict with ``success=False`` and the error text.
    """
    try:
        # gcm.config exposes two zero-argument toggles rather than a single
        # function taking a flag, so pick the one matching the request.
        if disable:
            gcm.config.disable_progress_bars()
        else:
            gcm.config.enable_progress_bars()
        return {
            "success": True,
            "method": "DoWhy GCM config.disable_progress_bars",
            "progress_bars_disabled": disable,
            "message": f"进度条已{'禁用' if disable else '启用'}"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"DoWhy GCM 进度条配置失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "DoWhy GCM config.disable_progress_bars"
        }
@server.tool()
def create_sklearn_regressor_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    sklearn_model_class: str = "LinearRegression"
) -> Dict[str, Any]:
    """
    Wrap a scikit-learn regressor in DoWhy's SklearnRegressionModel, fit it
    and report in-sample metrics.

    Args:
        data_path: Path of the data file to load.
        target_variable: Column used as regression target.
        feature_variables: Columns used as features.
        sklearn_model_class: One of "LinearRegression",
            "DecisionTreeRegressor" or "AdaBoostRegressor".

    Returns:
        On success, a dict with ``success=True`` and ``performance_metrics``
        (``mse``, ``r2_score``, ``rmse``) computed on the training data.
        On any exception (including an unsupported model name), a dict with
        ``success=False`` and the error text.
    """
    method_name = "DoWhy GCM SklearnRegressionModel"
    try:
        # Load target and features together so they are validated as a set.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)

        X = data[feature_variables].values
        y = data[target_variable].values

        # Import the requested estimator lazily.
        if sklearn_model_class == "LinearRegression":
            from sklearn.linear_model import LinearRegression
            sklearn_model = LinearRegression()
        elif sklearn_model_class == "DecisionTreeRegressor":
            from sklearn.tree import DecisionTreeRegressor
            sklearn_model = DecisionTreeRegressor()
        elif sklearn_model_class == "AdaBoostRegressor":
            from sklearn.ensemble import AdaBoostRegressor
            sklearn_model = AdaBoostRegressor()
        else:
            raise ValueError(f"不支持的sklearn模型: {sklearn_model_class}")

        regressor = gcm.SklearnRegressionModel(sklearn_model)
        regressor.fit(X, y)

        # The wrapper may hand predictions back as an (n, 1) column while y
        # is 1-D; flatten both so residuals are element-wise instead of
        # being broadcast to an (n, n) matrix.
        predictions = np.asarray(regressor.predict(X), dtype=float).reshape(-1)
        y_true = np.asarray(y, dtype=float).reshape(-1)
        residuals = y_true - predictions

        mse = float(np.mean(residuals ** 2))
        ss_res = float(np.sum(residuals ** 2))
        ss_tot = float(np.sum((y_true - np.mean(y_true)) ** 2))
        if ss_tot > 0:
            r2 = 1 - ss_res / ss_tot
        else:
            # Constant target: avoid 0-division (NaN/inf in the payload).
            # A perfect fit scores 1.0, anything else 0.0.
            r2 = 1.0 if ss_res == 0 else 0.0

        return {
            "success": True,
            "method": method_name,
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "sklearn_model_class": sklearn_model_class,
            "performance_metrics": {
                "mse": mse,
                "r2_score": float(r2),
                "rmse": float(np.sqrt(mse))
            },
            "sample_size": len(data),
            "message": f"成功创建{sklearn_model_class}的DoWhy包装器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM Sklearn回归器包装器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": method_name
        }
@server.tool()
def create_sklearn_classifier_gcm(
    data_path: str,
    target_variable: str,
    feature_variables: List[str],
    sklearn_model_class: str = "LogisticRegression"
) -> Dict[str, Any]:
    """
    Wrap a scikit-learn classifier in DoWhy's SklearnClassificationModel,
    fit it and report in-sample statistics.

    Args:
        data_path: Path of the data file to load.
        target_variable: Column used as (categorical) target.
        feature_variables: Columns used as features.
        sklearn_model_class: One of "LogisticRegression",
            "DecisionTreeClassifier" or "AdaBoostClassifier".

    Returns:
        On success, a dict with ``success=True``, the class names, the
        training accuracy and prediction statistics. On any exception
        (including an unsupported model name), a dict with
        ``success=False`` and the error text.
    """
    method_name = "DoWhy GCM SklearnClassificationModel"
    try:
        # Load target and features together so they are validated as a set.
        all_vars = [target_variable] + feature_variables
        data = load_and_validate_data(data_path, all_vars)

        X = data[feature_variables].values
        y = data[target_variable].values

        # Import the requested estimator lazily.
        if sklearn_model_class == "LogisticRegression":
            from sklearn.linear_model import LogisticRegression
            sklearn_model = LogisticRegression()
        elif sklearn_model_class == "DecisionTreeClassifier":
            from sklearn.tree import DecisionTreeClassifier
            sklearn_model = DecisionTreeClassifier()
        elif sklearn_model_class == "AdaBoostClassifier":
            from sklearn.ensemble import AdaBoostClassifier
            sklearn_model = AdaBoostClassifier()
        else:
            raise ValueError(f"不支持的sklearn模型: {sklearn_model_class}")

        classifier = gcm.SklearnClassificationModel(sklearn_model)
        classifier.fit(X, y)

        # The wrapper may hand predictions back as an (n, 1) column while y
        # is 1-D; flatten both so the comparison is element-wise instead of
        # being broadcast to an (n, n) matrix.
        predictions = np.asarray(classifier.predict(X)).reshape(-1)
        y_true = np.asarray(y).reshape(-1)
        probabilities = classifier.predict_probabilities(X)

        accuracy = np.mean(predictions == y_true)

        # Convert labels to native Python values so the response does not
        # carry NumPy scalar types.
        class_names = classifier.classes
        class_name_list = (
            np.asarray(class_names).tolist() if class_names is not None else []
        )

        return {
            "success": True,
            "method": method_name,
            "target_variable": target_variable,
            "feature_variables": feature_variables,
            "sklearn_model_class": sklearn_model_class,
            "class_names": class_name_list,
            "performance_metrics": {
                "accuracy": float(accuracy),
                "num_classes": len(np.unique(y_true))
            },
            "prediction_statistics": {
                "unique_predictions": len(np.unique(predictions)),
                # Mean over every entry of the returned probability array.
                "mean_probability": float(np.mean(probabilities)) if probabilities is not None else None
            },
            "sample_size": len(data),
            "message": f"成功创建{sklearn_model_class}的DoWhy包装器"
        }
    except Exception as e:
        logger.error(f"DoWhy GCM Sklearn分类器包装器创建失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": method_name
        }