Skip to main content
Glama

SuperDataAnalysis - DataMaster_MCP

by szqshan
main.py28.1 kB
#!/usr/bin/env python3 """ DataMaster MCP - 超级数据分析工具 为AI提供强大的数据分析能力 核心理念:工具专注数据获取和计算,AI专注智能分析和洞察 模块化架构: - core/database.py: 数据库管理和连接 - core/data_analysis.py: 数据分析和统计 - core/data_processing.py: 数据处理和导出 - core/api_manager.py: API管理和数据获取 """ import json import logging from pathlib import Path from mcp.server.fastmcp import FastMCP # 设置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger("DataMaster_MCP") # 导入核心模块 try: from .core.database import ( connect_data_source_impl, execute_sql_impl, query_external_database_impl, list_data_sources_impl, manage_database_config_impl, init_database_module ) from .core.data_analysis import ( analyze_data_impl, get_data_info_impl, init_data_analysis_module ) from .core.data_processing import ( process_data_impl, export_data_impl, init_data_processing_module ) from .core.api_manager import ( manage_api_config_impl, fetch_api_data_impl, api_data_preview_impl, create_api_storage_session_impl, list_api_storage_sessions_impl, init_api_manager_module ) except ImportError: # 如果相对导入失败,尝试绝对导入 import sys current_dir = Path(__file__).parent.parent if str(current_dir) not in sys.path: sys.path.insert(0, str(current_dir)) from datamaster_mcp.core.database import ( connect_data_source_impl, execute_sql_impl, query_external_database_impl, list_data_sources_impl, manage_database_config_impl, init_database_module ) from datamaster_mcp.core.data_analysis import ( analyze_data_impl, get_data_info_impl, init_data_analysis_module ) from datamaster_mcp.core.data_processing import ( process_data_impl, export_data_impl, init_data_processing_module ) from datamaster_mcp.core.api_manager import ( manage_api_config_impl, fetch_api_data_impl, api_data_preview_impl, create_api_storage_session_impl, list_api_storage_sessions_impl, init_api_manager_module ) # ================================ # 配置和初始化 # ================================ TOOL_NAME = "DataMaster_MCP" DATA_DIR = "data" EXPORTS_DIR = "exports" # 创建MCP服务器 mcp = FastMCP(TOOL_NAME) # 确保目录存在 for directory in [DATA_DIR, EXPORTS_DIR]: Path(directory).mkdir(exist_ok=True) # 初始化所有核心模块 try: init_database_module() init_data_analysis_module() init_data_processing_module() init_api_manager_module() logger.info("所有核心模块初始化完成") except Exception as e: logger.error(f"模块初始化失败: {e}") # ================================ # 数据库管理工具 # ================================ @mcp.tool() def connect_data_source( source_type: str, config: dict, target_table: str = None, target_database: str = None ) -> str: """ 🔗 数据源连接路由器 - AI必读使用指南 ⚠️ 重要:数据库连接采用"两步连接法"设计模式! 📋 支持的数据源类型: - "excel" - Excel文件导入到数据库 - "csv" - CSV文件导入到数据库 - "json" - JSON文件导入到数据库(支持嵌套结构自动扁平化) - "sqlite" - SQLite数据库文件连接 - "mysql" - MySQL数据库连接(第一步:创建临时配置) - "postgresql" - PostgreSQL数据库连接(第一步:创建临时配置) - "mongodb" - MongoDB数据库连接(第一步:创建临时配置) - "database_config" - 使用已有配置连接(第二步:实际连接) 🎯 AI使用流程: 1️⃣ 数据库连接第一步: connect_data_source(source_type="mysql", config={host, port, user, database, password}) → 返回临时配置名称(如:temp_mysql_20250724_173102) 2️⃣ 数据库连接第二步: connect_data_source(source_type="database_config", config={"database_name": "配置名称"}) → 建立可查询的数据库连接 3️⃣ 查询数据: 使用 query_external_database(database_name="配置名称", query="SQL") 💡 参数兼容性: - 支持 "user" 或 "username" 参数 - 端口号使用数字类型(如:3306) - 密码使用字符串类型 Args: source_type: 数据源类型,必须是上述支持的类型之一 config: 配置参数字典,格式根据source_type不同 target_table: 目标表名(文件导入时可选) target_database: 目标数据库名称(文件导入到外部数据库时可选) Returns: str: JSON格式的连接结果,包含状态、消息和配置信息 ⚡ AI快速上手: 记住"两步连接法":先创建配置 → 再使用配置 → 最后查询数据 """ return connect_data_source_impl(source_type, config, target_table, target_database) @mcp.tool() def execute_sql( query: str, params: dict = None, limit: int = 1000, data_source: str = None ) -> str: """ 📊 SQL执行工具 - 本地数据库查询专用 🎯 使用场景: - 查询本地SQLite数据库(默认) - 查询已导入的Excel/CSV数据 - 查询指定的本地数据源 ⚠️ 重要区别: - 本地数据查询 → 使用此工具 (execute_sql) - 外部数据库查询 → 使用 query_external_database 🔒 安全特性: - 自动添加LIMIT限制防止大量数据返回 - 支持参数化查询防止SQL注入 - 只允许SELECT查询,拒绝危险操作 Args: query: SQL查询语句(推荐使用SELECT语句) params: 查询参数字典,用于参数化查询(可选) limit: 结果行数限制,默认1000行(可选) data_source: 数据源名称,默认本地SQLite(可选) Returns: str: JSON格式查询结果,包含列名、数据行和统计信息 💡 AI使用提示: - 查询本地数据时优先使用此工具 - 查询外部数据库时使用 query_external_database - 使用 get_data_info 先了解表结构 """ return execute_sql_impl(query, params, limit, data_source) @mcp.tool() def query_external_database( database_name: str, query: str, limit: int = 1000 ) -> str: """ 🌐 外部数据库查询工具 - 专门查询外部数据库 🎯 使用场景: - 查询MySQL数据库 - 查询PostgreSQL数据库 - 查询MongoDB数据库 - 查询所有通过connect_data_source连接的外部数据库 ⚠️ 前置条件: 必须先使用connect_data_source建立数据库连接并获得配置名称 🔄 完整流程示例: 1️⃣ connect_data_source(source_type="mysql", config={...}) → 获得配置名 2️⃣ connect_data_source(source_type="database_config", config={"database_name": "配置名"}) → 建立连接 3️⃣ query_external_database(database_name="配置名", query="SELECT * FROM table") → 查询数据 💡 查询语法支持: - MySQL/PostgreSQL: 标准SQL语法 - MongoDB: 支持多种查询格式(JSON、JavaScript风格等) Args: database_name: 数据库配置名称(从connect_data_source获得) query: 查询语句,SQL或MongoDB查询语法 limit: 结果行数限制,默认1000行 Returns: str: JSON格式查询结果,包含数据行、统计信息和元数据 🚀 AI使用建议: - 这是查询外部数据库的首选工具 - 使用list_data_sources查看可用的数据库配置 - 配置名称通常格式为:temp_mysql_20250724_173102 """ return query_external_database_impl(database_name, query, limit) @mcp.tool() def list_data_sources() -> str: """ 📋 数据源列表工具 - 查看所有可用的数据源 🎯 功能说明: - 显示本地SQLite数据库状态 - 列出所有外部数据库配置 - 显示每个数据源的连接状态和基本信息 - 区分临时配置和永久配置 📊 返回信息包括: - 数据源名称和类型 - 连接状态(可用/已配置/已禁用) - 主机地址和数据库名 - 是否为默认数据源 - 配置创建时间(临时配置) 💡 使用场景: - 不确定有哪些数据源时查看 - 检查数据库连接状态 - 查找临时配置名称 - 了解可用的查询目标 Returns: str: JSON格式的数据源列表,包含详细的配置信息 🚀 AI使用建议: - 在查询数据前先调用此工具了解可用数据源 - 用于获取正确的database_name参数 - 检查临时配置是否还存在 """ return list_data_sources_impl() @mcp.tool() def manage_database_config( action: str, config: dict = None ) -> str: """ ⚙️ 数据库配置管理工具 - 管理所有数据库连接配置 🎯 支持的操作类型: - "list" - 列出所有数据库配置(包括临时和永久) - "test" - 测试指定配置的连接状态 - "add" - 添加永久数据库配置 - "remove" - 删除指定配置 - "reload" - 重新加载配置文件 - "list_temp" - 仅列出临时配置 - "cleanup_temp" - 清理所有临时配置 📋 常用操作示例: 1️⃣ 查看所有配置: manage_database_config(action="list") 2️⃣ 测试连接: manage_database_config(action="test", config={"database_name": "配置名"}) 3️⃣ 添加永久配置: manage_database_config(action="add", config={ "database_name": "my_mysql", "database_config": { "host": "localhost", "port": 3306, "type": "mysql", "user": "root", "database": "test_db", "password": "password" } }) 4️⃣ 清理临时配置: manage_database_config(action="cleanup_temp") Args: action: 操作类型,必须是上述支持的操作之一 config: 配置参数字典,根据action类型提供不同参数 Returns: str: JSON格式操作结果,包含状态、消息和相关数据 💡 AI使用建议: - 不确定有哪些配置时,先用action="list"查看 - 连接问题时,用action="test"检查配置状态 - 临时配置过多时,用action="cleanup_temp"清理 """ return manage_database_config_impl(action, config) # ================================ # 数据分析工具 # ================================ @mcp.tool() def get_data_info( info_type: str = "tables", table_name: str = None, data_source: str = None ) -> str: """ 📊 数据信息获取工具 - 查看数据库结构和统计信息 功能说明: - 获取数据库表列表、表结构、数据统计等信息 - 支持本地SQLite和外部数据库 - 提供详细的表结构和数据概览 - 智能数据库清理管理功能 Args: info_type: 信息类型 - "tables": 获取所有表/集合列表(默认) - "schema": 获取指定表的结构信息(需要table_name) - "stats": 获取指定表的统计信息(需要table_name) - "cleanup": 智能检测过时数据和表,提供清理建议 table_name: 表名(当info_type为schema或stats时必需) data_source: 数据源名称 - None: 使用本地SQLite数据库(默认) - 配置名称: 使用外部数据库(需先通过manage_database_config创建配置) Returns: str: JSON格式的数据库信息,包含状态、数据和元数据 🤖 AI使用建议: 1. 数据探索:先用info_type="tables"查看所有表 2. 结构分析:用info_type="schema"了解表结构 3. 数据概览:用info_type="stats"获取统计信息 4. 数据库维护:用info_type="cleanup"检测并清理过时数据 5. 外部数据库:确保data_source配置已存在 💡 最佳实践: - 在查询数据前先了解表结构 - 使用stats了解数据分布和质量 - 定期使用cleanup功能维护数据库整洁 - 结合analyze_data工具进行深度分析 ⚠️ 常见错误避免: - schema和stats必须指定table_name - 外部数据库需要有效的data_source配置 - 表名区分大小写 - cleanup功能仅适用于本地SQLite数据库 📈 高效使用流程: 1. get_data_info(info_type="tables") → 查看所有表 2. get_data_info(info_type="schema", table_name="表名") → 了解结构 3. get_data_info(info_type="stats", table_name="表名") → 查看统计 4. get_data_info(info_type="cleanup") → 检测过时数据 5. analyze_data() → 深度分析 🎯 关键理解点: - 这是数据探索的第一步工具 - 为后续分析提供基础信息 - 支持本地和远程数据源 - 智能维护数据库整洁性 🧹 数据库清理功能(info_type="cleanup"): - 自动检测测试表、临时表、过时表 - 识别空表和重复表 - 分析表的创建时间和最后访问时间 - 提供智能清理建议,询问用户是否执行清理 - 支持批量清理和选择性清理 """ return get_data_info_impl(info_type, table_name, data_source) @mcp.tool() def analyze_data( analysis_type: str, table_name: str, columns: list = None, options: dict = None ) -> str: """ 🔍 数据分析工具 - 执行各种统计分析和数据质量检查 功能说明: - 提供5种核心数据分析功能 - 支持指定列分析或全表分析 - 自动处理数据类型和缺失值 - 返回详细的分析结果和可视化建议 Args: analysis_type: 分析类型 - "basic_stats": 基础统计分析(均值、中位数、标准差等) - "correlation": 相关性分析(数值列之间的相关系数) - "outliers": 异常值检测(IQR、Z-score方法) - "missing_values": 缺失值分析(缺失率、分布模式) - "duplicates": 重复值检测(完全重复、部分重复) table_name: 要分析的数据表名 columns: 分析的列名列表(可选) - None: 分析所有适用列 - ["col1", "col2"]: 只分析指定列 options: 分析选项(可选字典) - outliers: {"method": "iqr|zscore", "threshold": 1.5} - correlation: {"method": "pearson|spearman"} - basic_stats: {"percentiles": [25, 50, 75, 90, 95]} Returns: str: JSON格式的分析结果,包含统计数据、图表建议和洞察 🤖 AI使用建议: 1. 数据概览:先用"basic_stats"了解数据分布 2. 质量检查:用"missing_values"和"duplicates"检查数据质量 3. 关系探索:用"correlation"发现变量关系 4. 异常检测:用"outliers"识别异常数据 5. 逐步深入:从基础统计到高级分析 💡 最佳实践: - 先进行basic_stats了解数据概况 - 数值列用correlation分析关系 - 大数据集指定columns提高效率 - 结合get_data_info了解表结构 ⚠️ 常见错误避免: - 确保table_name存在 - correlation只适用于数值列 - columns名称必须准确匹配 - 空表或单列表某些分析会失败 📈 高效使用流程: 1. get_data_info() → 了解表结构 2. analyze_data("basic_stats") → 基础统计 3. analyze_data("missing_values") → 质量检查 4. analyze_data("correlation") → 关系分析 5. analyze_data("outliers") → 异常检测 🎯 关键理解点: - 每种分析类型有特定适用场景 - 结果包含统计数据和业务洞察 - 支持参数化定制分析行为 """ return analyze_data_impl(analysis_type, table_name, columns, options) # ================================ # 数据处理工具 # ================================ @mcp.tool() def process_data( operation_type: str, data_source: str, config: dict, target_table: str = None ) -> str: """ ⚙️ 数据处理工具 - 执行数据清洗、转换、筛选等操作 功能说明: - 提供6种核心数据处理功能 - 支持表和SQL查询作为数据源 - 灵活的配置参数系统 - 可指定目标表或覆盖原表 Args: operation_type: 处理操作类型 - "clean": 数据清洗(去重、填充缺失值、数据类型转换) - "transform": 数据转换(列重命名、标准化、新列计算) - "filter": 数据筛选(条件过滤、列选择、数据采样) - "aggregate": 数据聚合(分组统计、汇总计算) - "merge": 数据合并(表连接、数据拼接) - "reshape": 数据重塑(透视表、宽长转换) data_source: 数据源 - 表名: 处理整个表 - SQL查询: 处理查询结果 config: 操作配置字典(必需) - clean: {"remove_duplicates": True, "fill_missing": {"col": {"method": "mean"}}} - transform: {"rename_columns": {"old": "new"}, "normalize": {"columns": ["col1"]}} - filter: {"filter_condition": "age > 18", "select_columns": ["name", "age"]} - aggregate: {"group_by": {"columns": ["dept"], "agg": {"salary": "mean"}}} - merge: {"right_table": "table2", "on": "id", "how": "inner"} - reshape: {"pivot": {"index": "date", "columns": "product", "values": "sales"}} target_table: 目标表名(可选) - None: 覆盖原表(默认) - 表名: 保存到新表 Returns: str: JSON格式的处理结果,包含操作详情、影响行数和新表信息 🤖 AI使用建议: 1. 数据清洗:先用"clean"处理数据质量问题 2. 数据转换:用"transform"标准化和计算新字段 3. 数据筛选:用"filter"获取目标数据子集 4. 数据聚合:用"aggregate"生成汇总报表 5. 数据合并:用"merge"关联多个数据源 6. 数据重塑:用"reshape"改变数据结构 💡 最佳实践: - 处理前先备份重要数据 - 使用target_table避免覆盖原数据 - 复杂操作分步骤执行 - 结合analyze_data验证处理结果 ⚠️ 常见错误避免: - config参数必须符合operation_type要求 - 确保引用的列名存在 - merge操作需要确保关联键存在 - 大数据量操作注意性能 📈 高效使用流程: 1. analyze_data() → 了解数据质量 2. process_data("clean") → 清洗数据 3. process_data("transform") → 转换数据 4. process_data("filter") → 筛选数据 5. analyze_data() → 验证处理结果 🎯 关键理解点: - 每种操作类型有特定的config格式 - 支持链式处理(上一步输出作为下一步输入) - 提供详细的操作日志和统计信息 📋 配置示例: ```python # 数据清洗 config = { "remove_duplicates": True, "fill_missing": { "age": {"method": "mean"}, "name": {"method": "mode"} } } # 数据筛选 config = { "filter_condition": "age > 18 and salary > 5000", "select_columns": ["name", "age", "department"] } # 数据聚合 config = { "group_by": { "columns": ["department"], "agg": { "salary": "mean", "age": "count" } } } ``` """ return process_data_impl(operation_type, data_source, config, target_table) @mcp.tool() def export_data( export_type: str, data_source: str, file_path: str = None, options: dict = None ) -> str: """ 📤 数据导出工具 - 将数据导出为各种格式文件 功能说明: - 支持多种导出格式:Excel、CSV、JSON - 可导出表数据或SQL查询结果 - 自动生成文件路径或使用指定路径 - 支持导出选项自定义 Args: export_type: 导出格式类型 - "excel": Excel文件(.xlsx) - "csv": CSV文件(.csv) - "json": JSON文件(.json) data_source: 数据源 - 表名: 直接导出整个表 - SQL查询: 导出查询结果(以SELECT开头) file_path: 导出文件路径(可选) - None: 自动生成路径到exports/目录 - 指定路径: 使用自定义路径 options: 导出选项(可选字典) - Excel: {"sheet_name": "工作表名", "auto_adjust_columns": True} - CSV: {"encoding": "utf-8", "separator": ","} - JSON: {"orient": "records", "indent": 2} Returns: str: JSON格式的导出结果,包含文件路径、大小、记录数等信息 🤖 AI使用建议: 1. 表导出:export_data("excel", "table_name") 2. 查询导出:export_data("csv", "SELECT * FROM table WHERE condition") 3. 自定义格式:使用options参数调整导出格式 4. 批量导出:结合循环导出多个表或查询 💡 最佳实践: - Excel适合报表和可视化 - CSV适合数据交换和导入其他系统 - JSON适合API和程序处理 - 大数据量优先使用CSV ⚠️ 常见错误避免: - 确保data_source存在(表名)或语法正确(SQL) - 文件路径目录必须存在或可创建 - 注意文件权限和磁盘空间 📈 高效使用流程: 1. 确定导出需求(格式、内容) 2. 选择合适的export_type 3. 准备data_source(表名或SQL) 4. 设置options(如需要) 5. 执行导出并检查结果 🎯 关键理解点: - 支持表和查询两种数据源 - 自动处理文件路径和格式 - 提供详细的导出统计信息 """ return export_data_impl(export_type, data_source, file_path, options) # ================================ # API管理工具 # ================================ @mcp.tool() def manage_api_config( action: str, api_name: str = None, config_data: dict = None ) -> str: """ 管理API配置 Args: action: 操作类型 (list|test|add|remove|reload|get_endpoints) api_name: API名称 config_data: API配置数据 Returns: str: 操作结果 """ return manage_api_config_impl(action, api_name, config_data) @mcp.tool() def fetch_api_data( api_name: str, endpoint_name: str, params: dict = None, data: dict = None, method: str = None, transform_config: dict = None, storage_session_id: str = None ) -> str: """ 从API获取数据并自动存储到数据库(方式二:自动持久化流程) 注意:已删除方式一(手动流程),所有API数据默认直接存储到数据库 Args: api_name: API名称 endpoint_name: 端点名称 params: 请求参数 data: 请求数据(POST/PUT) method: HTTP方法 transform_config: 数据转换配置 storage_session_id: 存储会话ID(可选,不提供时自动创建) Returns: str: 数据存储结果和会话信息 """ return fetch_api_data_impl(api_name, endpoint_name, params, data, method, transform_config, storage_session_id) @mcp.tool() def api_data_preview( api_name: str, endpoint_name: str, params: dict = None, max_rows: int = 10, max_cols: int = 10, preview_fields: list = None, preview_depth: int = 3, show_data_types: bool = True, show_summary: bool = True, truncate_length: int = 100 ) -> str: """ 🔍 API数据预览工具 - 灵活预览API返回数据 功能说明: - 支持灵活的数据预览配置 - 可指定预览字段和深度 - 提供数据类型和摘要信息 - 避免数据截断问题 Args: api_name: API名称 endpoint_name: 端点名称 params: 请求参数 max_rows: 最大显示行数 (默认10) max_cols: 最大显示列数 (默认10) preview_fields: 指定预览的字段列表 (可选) preview_depth: JSON嵌套预览深度 (默认3) show_data_types: 是否显示数据类型信息 (默认True) show_summary: 是否显示数据摘要 (默认True) truncate_length: 字段值截断长度 (默认100) Returns: str: 数据预览结果 📋 使用示例: ```python # 基本预览 api_data_preview( api_name="alpha_vantage", endpoint_name="news_sentiment", params={"topics": "technology"} ) # 指定字段预览 api_data_preview( api_name="alpha_vantage", endpoint_name="news_sentiment", params={"topics": "technology"}, preview_fields=["title", "summary", "sentiment_score"], max_rows=5 ) # 深度预览嵌套数据 api_data_preview( api_name="complex_api", endpoint_name="nested_data", preview_depth=5, truncate_length=200 ) ``` 🎯 关键理解点: - preview_fields可以精确控制显示内容 - preview_depth控制JSON嵌套显示层级 - truncate_length避免超长字段影响显示 - 提供完整的数据结构分析 """ return api_data_preview_impl(api_name, endpoint_name, params, max_rows, max_cols, preview_fields, preview_depth, show_data_types, show_summary, truncate_length) @mcp.tool() def create_api_storage_session( session_name: str, api_name: str, endpoint_name: str, description: str = None ) -> str: """ 创建API数据存储会话 Args: session_name: 存储会话名称 api_name: API名称 endpoint_name: 端点名称 description: 会话描述 Returns: str: 创建结果 """ return create_api_storage_session_impl(session_name, api_name, endpoint_name, description) @mcp.tool() def list_api_storage_sessions() -> str: """ 📋 API存储会话列表工具 - 查看所有API数据存储会话 功能说明: - 列出所有API数据存储会话 - 显示会话详细信息和数据统计 - 为API数据导入提供会话选择 Returns: str: JSON格式的会话列表,包含会话信息和数据统计 🤖 AI使用建议: - 在导入API数据前先查看可用会话 - 选择合适的会话进行数据导入 - 了解每个会话的数据量和结构 """ return list_api_storage_sessions_impl() # ================================ # 服务器启动 # ================================ def main(): """主函数 - 启动MCP服务器""" logger.info("DataMaster MCP 服务器启动中...") logger.info("模块化架构已加载:") logger.info(" - core/database.py: 数据库管理") logger.info(" - core/data_analysis.py: 数据分析") logger.info(" - core/data_processing.py: 数据处理") logger.info(" - core/api_manager.py: API管理") mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/szqshan/DataMaster'

If you have feedback or need assistance with the MCP directory API, please join our Discord server