Skip to main content
Glama
agent.py (5.12 kB)
"""Xiaohongshu ADK Agent implementation."""

import os
import asyncio
from typing import Optional

# ADK imports
# NOTE(review): Runner.run_async(new_message=...) expects google.genai types;
# the legacy `google.generativeai` package is a different API surface.
from google.genai import types
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters

# Local imports
from mcp_client.config import MCPConfig, DEFAULT_CONFIG


class XhsAgent:
    """Xiaohongshu (小红书) ADK agent.

    Wraps an ADK ``LlmAgent`` connected to a Xiaohongshu MCP server over
    stdio, exposing a simple async ``chat()`` API. Also usable as an async
    context manager (``async with XhsAgent() as agent:``).
    """

    def __init__(self, config: Optional[MCPConfig] = None):
        """Initialize the agent (no I/O happens here).

        Args:
            config: MCP client configuration; defaults to ``DEFAULT_CONFIG``.
        """
        self.config = config or DEFAULT_CONFIG
        self._agent: Optional[LlmAgent] = None
        self._exit_stack = None
        self._session_service = InMemorySessionService()
        # Session and runner are created lazily in _init_agent():
        # InMemorySessionService.create_session() is a coroutine in current
        # ADK releases, so it cannot be called from a sync __init__ —
        # doing so would store a coroutine object, and `self._session.id`
        # in chat() would then fail.
        self._session = None
        self._runner: Optional[Runner] = None

    async def _init_agent(self) -> None:
        """Connect to the MCP server and build the agent, session and runner.

        Idempotent: returns immediately if the agent already exists.

        Raises:
            Exception: re-raises any failure from MCP connection or agent
                construction after cleaning up the MCP exit stack.
        """
        if self._agent is not None:
            return

        print("正在连接到小红书 MCP 服务器...")

        try:
            # Acquire the MCP toolset; exit_stack owns the stdio connection.
            tools, exit_stack = await MCPToolset.from_server(
                connection_params=StdioServerParameters(
                    command=self.config.server_command,
                    args=[self.config.server_script],
                    env=self.config.env_vars,
                )
            )
            # Save the cleanup context before anything else can raise, so
            # the except branch (and close()) can always release it.
            self._exit_stack = exit_stack

            print(f"已获取 {len(tools)} 个 MCP 工具")

            # Build the LLM agent around the MCP tools.
            self._agent = LlmAgent(
                model="gemini-2.0-flash",  # 使用您有权限的模型
                name="xhs_assistant",
                instruction="""
你是一个小红书助手,可以帮助用户:
1. 获取笔记内容
2. 搜索笔记和用户
3. 获取用户信息和笔记列表
4. 浏览推荐流
使用可用的工具来完成这些任务。
保持友好和专业的态度。
""",
                tools=tools,
            )

            # create_session() is async in current ADK — await it here
            # instead of calling it from __init__.
            self._session = await self._session_service.create_session(
                state={},
                app_name="xhs_agent",
                user_id="xhs_user",
            )

            self._runner = Runner(
                app_name="xhs_agent",
                agent=self._agent,
                session_service=self._session_service,
            )
            print("Agent 初始化完成")
        except Exception as e:
            print(f"初始化失败: {e}")
            # Release the MCP connection and clear the reference so a later
            # close() cannot attempt a second aclose().
            if self._exit_stack:
                await self._exit_stack.aclose()
                self._exit_stack = None
            raise

    async def chat(self, message: str) -> list[str]:
        """Send a user message to the agent and collect its text replies.

        Args:
            message: User message text.

        Returns:
            All text parts produced by the model for this turn, in order.
        """
        # Lazily connect on first use.
        await self._init_agent()

        content = types.Content(
            role="user",
            parts=[types.Part(text=message)],
        )

        responses: list[str] = []
        events = self._runner.run_async(
            session_id=self._session.id,
            user_id=self._session.user_id,
            new_message=content,
        )
        async for event in events:
            # Keep only model-authored text parts; tool calls etc. are skipped.
            if event.content and event.content.role == "model":
                for part in event.content.parts:
                    if part.text:
                        responses.append(part.text)
        return responses

    async def close(self) -> None:
        """Release the MCP connection and drop all derived state."""
        if self._exit_stack:
            await self._exit_stack.aclose()
            self._exit_stack = None
        # Clear everything derived from the connection so a subsequent
        # _init_agent() rebuilds a consistent agent/session/runner trio.
        self._agent = None
        self._runner = None
        self._session = None

    async def __aenter__(self):
        """Async context manager entry: eagerly initialize the agent."""
        await self._init_agent()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: clean up resources."""
        await self.close()


# --- Example usage ---
async def main():
    # Create the agent and run a sample conversation.
    async with XhsAgent() as agent:
        query = "搜索关于 Python 编程的笔记"
        print(f"\n用户: {query}")

        responses = await agent.chat(query)
        for response in responses:
            print(f"\n助手: {response}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"\n发生错误: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/proerror77/xhs_mcp_agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.