Skip to main content
Glama

AI Cognitive Nexus

by SimonUTD
main.py19 kB
#!/usr/bin/env python3 """ Unified MCP """ import sys import json from contextlib import asynccontextmanager from typing import Any, AsyncIterator, Dict, List, Optional # 从新文件中导入核心组件 from mcp_logger import logger from app_context import app, AGNO_AVAILABLE from mcp.server.fastmcp import FastMCP # --- 全局常量 --- mcp = FastMCP("Unified MCP") # 各信息 schema 与 必备字段定义 # 人物类 persona_schema = {"name": str, "role": str, "goals": list, "background": str} persona_required_fields = {"name", "role", "goals"} # 产品类 product_schema = {"product_name": str, "description": str, "knowledge_base": str} product_required_fields = {"product_name", "description", "knowledge_base"} # Agent角色类 agent_schema = {"name": str, "role": str, "description": str, "tools": list, "instructions": str} agent_required_fields = {"name", "role", "description"} # 团队类 team_schema = {"team_name": str, "description": str, "members": list, "instructions": str, "success_criteria": str} team_required_fields = {"team_name", "members"} # ============================================== # 辅助工具 # ============================================== def validate_data(data: dict, schema: dict, required_fields: set) -> Optional[str]: """根据 schema 和必需字段校验数据。如果无效则返回错误信息字符串。""" missing_fields = required_fields - set(data.keys()) if missing_fields: return f"缺少必需字段: {list(missing_fields)}" for key, value in data.items(): if key in schema and not isinstance(value, schema[key]): return f"字段 '{key}' 的类型错误,应为 {schema[key].__name__},但收到了 {type(value).__name__}。" return None # ============================================== # MCP Tools # ============================================== # 会话 mcp 组件 @mcp.tool() def start_session(initial_context: Optional[str] = None) -> str: """创建新的session用于存储和跟踪会话,每个会议或讨论都应该创建一个新的session。""" try: new_session = app.session_manager.create_session(initial_context=initial_context) return json.dumps({ "message": "新session创建成功.", "session_id": new_session["session_id"] }, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"创建session失败,错误信息: {e}", exc_info=True) return json.dumps({"error": f"创建session失败,错误信息: {e}"}) # 人物类 mcp 组件 @mcp.tool() def create_persona(persona_key: str, data: dict) -> str: """ 根据定义的Schema创建一个新的人物。 必需字段: 'name'(str), 'role'(str), 'goals'(list of str). 可选字段: 'background'(str). """ if persona_key in app.personas: return json.dumps({"error": f"人物 '{persona_key}' 已存在..."}) # 使用统一的校验函数 error_msg = validate_data(data, persona_schema, persona_required_fields) if error_msg: return json.dumps({"error": f"创建人物失败,{error_msg}"}) app.personas[persona_key] = data app.save_personas() return json.dumps({"status": "success", "message": f"人物 '{persona_key}' 创建成功。"}, ensure_ascii=False) @mcp.tool() def list_personas() -> str: """ 列出所有可用的人物信息记录,获取人物信息记录的 key 以加载个人信息。 """ try: if not app.personas: return json.dumps({"message": "目前尚未记录任何人物信息。"}, indent=2, ensure_ascii=False) persona_list = [ { "persona_key": key, "name": data.get("name", "N/A"), "role": data.get("role", "N/A"), "goals": data.get("goals", "N/A"), "profile_summary": data.get("background", "N/A") } for key, data in app.personas.items() ] return json.dumps(persona_list, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"Failed to list personas: {e}", exc_info=True) return json.dumps({"error": "An unexpected error occurred while listing personas."}) @mcp.tool() def get_persona(persona_key: str) -> str: """获取指定人物的完整详细信息。""" persona_data = app.personas.get(persona_key) if not persona_data: return json.dumps({"error": f"未找到人物 '{persona_key}'。"}) return json.dumps(persona_data, indent=2, ensure_ascii=False) @mcp.tool() def update_persona(persona_key: str, data: dict) -> str: """ 更新一个已存在人物的部分或全部信息。只提供需要修改的字段。 """ if persona_key not in app.personas: return json.dumps({"error": f"未找到人物 '{persona_key}',无法更新。"}) # 校验传入字段的合法性 for key, value in data.items(): if key not in persona_schema: return json.dumps({"error": f"无效字段 '{key}'。允许的字段为: {list(persona_schema.keys())}"}) # 简单类型校验 if not isinstance(value, persona_schema[key]): return json.dumps({"error": f"字段 '{key}' 的类型错误,应为 {persona_schema[key].__name__},但收到了 {type(value).__name__}。"}) app.personas[persona_key].update(data) app.save_personas() return json.dumps({"status": "success", "message": f"人物 '{persona_key}' 更新成功。"}, ensure_ascii=False) @mcp.tool() def delete_persona(persona_key: str) -> str: """删除一个指定的人物记录。""" if persona_key not in app.personas: return json.dumps({"error": f"未找到人物 '{persona_key}',无法删除。"}) del app.personas[persona_key] app.save_personas() return json.dumps({"status": "success", "message": f"人物 '{persona_key}' 删除成功。"}, ensure_ascii=False) # 产品类 mcp 组件 @mcp.tool() def create_product(product_key: str, data: dict) -> str: """ 根据定义的Schema创建一个新产品。 必需字段: 'product_name'(str), 'description'(str), 'knowledge_base'(str). """ if product_key in app.products: return json.dumps({"error": f"产品 '{product_key}' 已存在,请使用 update_product 进行更新。"}) missing_fields = product_required_fields - set(data.keys()) if missing_fields: return json.dumps({"error": f"创建产品失败,缺少必需字段: {list(missing_fields)}"}) app.products[product_key] = data app.save_products() return json.dumps({"status": "success", "message": f"产品 '{product_key}' 创建成功。"}, ensure_ascii=False) @mcp.tool() def list_products() -> str: """列出所有已记录的产品。""" if not app.products: return json.dumps({"message": "目前尚未记录任何产品。"}, indent=2, ensure_ascii=False) product_list = [ {"product_key": key, "product_name": data.get("product_name", "N/A"), "description": data.get("description", "")} for key, data in app.products.items() ] return json.dumps(product_list, indent=2, ensure_ascii=False) @mcp.tool() def get_product(product_key: str) -> str: """获取指定产品的完整详细信息,包括其知识库。""" product_data = app.products.get(product_key) if not product_data: return json.dumps({"error": f"未找到产品 '{product_key}'。"}) return json.dumps(product_data, indent=2, ensure_ascii=False) @mcp.tool() def update_product(product_key: str, data: dict) -> str: """更新一个已存在产品的信息。只提供需要修改的字段。""" if product_key not in app.products: return json.dumps({"error": f"未找到产品 '{product_key}',无法更新。"}) for key, value in data.items(): if key not in product_schema: return json.dumps({"error": f"无效字段 '{key}'。允许的字段为: {list(product_schema.keys())}"}) if not isinstance(value, product_schema[key]): return json.dumps({"error": f"字段 '{key}' 的类型错误,应为 {product_schema[key].__name__}。"}) app.products[product_key].update(data) app.save_products() return json.dumps({"status": "success", "message": f"产品 '{product_key}' 更新成功。"}, ensure_ascii=False) @mcp.tool() def delete_product(product_key: str) -> str: """删除一个指定的产品记录及其知识库。""" if product_key not in app.products: return json.dumps({"error": f"未找到产品 '{product_key}',无法删除。"}) del app.products[product_key] app.save_products() return json.dumps({"status": "success", "message": f"产品 '{product_key}' 删除成功。"}, ensure_ascii=False) # 人物Agent与团队类 mcp 组件 @mcp.tool() def create_agent(agent_key: str, data: dict) -> str: """ 创建一个新的角色(Agent)。 必需字段: 'name'(str), 'role'(str), 'description'(str). 可选字段: 'tools'(list of str), 'instructions'(str). """ if agent_key in app.agents: return json.dumps({"error": f"角色 '{agent_key}' 已存在。"}) missing = agent_required_fields - set(data.keys()) if missing: return json.dumps({"error": f"缺少必需字段: {list(missing)}"}) app.agents[agent_key] = data app.save_agents() app.reinitialize_teams() # 重新构建团队以识别新角色 return json.dumps({"status": "success", "message": f"角色 '{agent_key}' 创建成功。团队已重新初始化。"}, ensure_ascii=False) @mcp.tool() def get_agent(agent_key: str) -> str: """获取指定角色的配置信息。""" agent_data = app.agents.get(agent_key) if not agent_data: return json.dumps({"error": f"未找到角色 '{agent_key}'。"}) return json.dumps(agent_data, indent=2, ensure_ascii=False) @mcp.tool() def list_agents() -> str: """列出所有已定义的角色(Agent)。""" if not app.agents: return json.dumps({"message": "尚未定义任何角色。"}, indent=2, ensure_ascii=False) return json.dumps([{"agent_key": k, "name": v.get("name"), "role": v.get("role")} for k, v in app.agents.items()], indent=2, ensure_ascii=False) @mcp.tool() def update_agent(agent_key: str, data: dict) -> str: """更新一个已存在角色的信息。""" if agent_key not in app.agents: return json.dumps({"error": f"未找到角色 '{agent_key}'。"}) app.agents[agent_key].update(data) app.save_agents() app.reinitialize_teams() # 角色更新可能影响团队 return json.dumps({"status": "success", "message": f"角色 '{agent_key}' 更新成功。团队已重新初始化。"}, ensure_ascii=False) @mcp.tool() def delete_agent(agent_key: str) -> str: """删除一个角色(Agent)。警告:这可能导致依赖此角色的团队失效。""" if agent_key not in app.agents: return json.dumps({"error": f"未找到角色 '{agent_key}'。"}) del app.agents[agent_key] app.save_agents() app.reinitialize_teams() # 角色删除必须重载团队 return json.dumps({"status": "success", "message": f"角色 '{agent_key}' 已删除。团队已重新初始化。"}, ensure_ascii=False) @mcp.tool() def create_team(team_key: str, data: dict) -> str: """ 创建一个新的团队。 必需字段: 'team_name'(str), 'members'(list of str, 成员key可以是agent或team). 可选字段: 'description'(str), 'instructions'(str), 'success_criteria'(str). """ if team_key in app.teams_config: return json.dumps({"error": f"团队 '{team_key}' 已存在。"}) missing = team_required_fields - set(data.keys()) if missing: return json.dumps({"error": f"缺少必需字段: {list(missing)}"}) app.teams_config[team_key] = data app.save_teams_config() if not app.reinitialize_teams(): # 如果重载失败(比如有循环依赖),则撤销操作 del app.teams_config[team_key] app.save_teams_config() return json.dumps({"error": "团队创建失败,可能是因为引入了循环依赖或无效成员。操作已回滚。"}) return json.dumps({"status": "success", "message": f"团队 '{team_key}' 创建成功并已激活。"}, ensure_ascii=False) @mcp.tool() def get_team_config(team_key: str) -> str: """获取指定团队的配置信息(非运行实例)。""" team_data = app.teams_config.get(team_key) if not team_data: return json.dumps({"error": f"未找到团队配置 '{team_key}'。"}) return json.dumps(team_data, indent=2, ensure_ascii=False) @mcp.tool() def list_teams() -> str: """列出所有已配置的团队。""" if not app.teams_config: return json.dumps({"message": "尚未配置任何团队。"}, indent=2, ensure_ascii=False) team_list = [] for key, config in app.teams_config.items(): is_active = key in app.teams # 检查团队实例是否成功创建 team_list.append({ "team_key": key, "team_name": config.get("team_name"), "status": "active" if is_active else "inactive (config error),请检查成员或环境参数设置,特别是exa_api_key参数不能缺少", }) return json.dumps(team_list, indent=2, ensure_ascii=False) @mcp.tool() def update_team(team_key: str, data: dict) -> str: """更新一个已存在团队的配置。""" if team_key not in app.teams_config: return json.dumps({"error": f"未找到团队配置 '{team_key}'。"}) original_config = app.teams_config[team_key].copy() app.teams_config[team_key].update(data) app.save_teams_config() if not app.reinitialize_teams(): app.teams_config[team_key] = original_config app.save_teams_config() app.reinitialize_teams() return json.dumps({"error": "团队更新失败,新配置可能导致错误。操作已回滚。"}) app.save_teams_config() return json.dumps({"status": "success", "message": f"团队 '{team_key}' 更新成功并已重新激活。"}, ensure_ascii=False) @mcp.tool() def delete_team(team_key: str) -> str: """删除一个团队的配置。""" if team_key not in app.teams_config: return json.dumps({"error": f"未找到团队配置 '{team_key}'。"}) del app.teams_config[team_key] app.save_teams_config() app.reinitialize_teams() # 团队删除后必须重载 return json.dumps({"status": "success", "message": f"团队 '{team_key}' 已删除。"}, ensure_ascii=False) # 团队讨论类 mcp 组件 @mcp.tool() async def run_ai_team(team_name: str, prompt: str, session_id: Optional[str] = None, persona_key: Optional[str] = None, product_key: Optional[str] = None) -> str: """ 运行指定的AI团队开展业务讨论。请注意这个环节需要耗费较长的时间,请告知用户耐心等待 :param team_name: 要运行的团队的key。 :param prompt: 本次任务的核心指令。 :param session_id: (可选) 关联的会话ID,用于加载历史记录和保存结果。 :param persona_key: (可选) 本次任务中AI需要扮演的人物的key。 :param product_key: (可选) 本次任务关联的产品的key,用于加载特定知识库。 """ if not AGNO_AVAILABLE or not app.factory: return json.dumps({"error": "程序未正常安装运行库,AI 无法使用."}) team = app.get_running_team(team_name) # MODIFIED: 使用新的getter if not team: return json.dumps({"error": f"团队 ‘{team_name}’ 不可用或初始化失败。请检查团队配置和依赖关系。"}) context_parts = [] session_data = None if session_id: session_data = app.session_manager.load_session(session_id) if session_data: history_str = "\n".join([f"Previous {entry['role']}: {entry['content']}" for entry in session_data.get("history", [])]) if history_str: context_parts.append(f"## Session History\n{history_str}") else: logger.warning(f"无法加载会话 ‘{session_id}’。将继续操作,但不会保留历史记录。") if app.company_knowledge_base: context_parts.append(f"## Company-Wide Knowledge Base\n{app.company_knowledge_base}") if product_key: # MODIFIED: 使用product_key product_data = app.products.get(product_key) if product_data: context_parts.append(f"## Knowledge Base for Product: '{product_data.get('product_name')}'\n{product_data.get('knowledge_base','')}") if persona_key: # MODIFIED: 使用persona_key persona_data = app.personas.get(persona_key) if persona_data: profile_text = (f"Role: {persona_data.get('role')}\n" f"Goals: {', '.join(persona_data.get('goals',[]))}\n" f"Background: {persona_data.get('background', 'N/A')}") context_parts.append(f"## Persona Profile to Embody: {persona_data.get('name', persona_key)}\n{profile_text}") context_header = "--- CONTEXTUAL BACKGROUND ---\n" + "\n\n".join(context_parts) if context_parts else "" final_prompt = f"{context_header}\n\n--- YOUR CURRENT TASK ---\n{prompt}".strip() try: logger.info(f"Running team '{team_name}' for session '{session_id or 'None'}'...") result = await team.arun(final_prompt) content = result.content if hasattr(result, 'content') else str(result) response_data = {"status": "completed", "team_name": team_name, "result": content} if session_data: session_data["history"].append({"role": "user", "content": prompt}) session_data["history"].append({"role": "assistant", "content": content}) app.session_manager.save_session(session_id, session_data) return json.dumps(response_data, indent=2, ensure_ascii=False) except Exception as e: logger.exception(f"团队 '{team_name}' 执行错误: {e}") return json.dumps({"error": f"发生了一个意外错误: {str(e)}"}) # ============================================== # 5. Application Lifespan and Startup # ============================================== @asynccontextmanager async def app_lifespan(mcp_instance: FastMCP) -> AsyncIterator[None]: """管理应用程序的启动和关闭流程.""" logger.info("服务器正在运行。应用程序上下文已就绪。") yield logger.info("服务器正在关闭.") def main(): mcp.lifespan = app_lifespan logger.info("Unified MCP Server starting...") mcp.run(transport="stdio") if __name__ == "__main__": try: main() except Exception as e: logger.critical(f"服务器执行过程中发生致命错误: {e}", exc_info=True) sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SimonUTD/ai-cognitive-nexus-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server