agent.py•5.12 kB
"""Xiaohongshu ADK Agent implementation."""
import os
import asyncio
from typing import Optional
# ADK imports
from google.generativeai import types
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
# Local imports
from mcp_client.config import MCPConfig, DEFAULT_CONFIG
class XhsAgent:
"""小红书 ADK Agent 实现"""
def __init__(self, config: Optional[MCPConfig] = None):
"""初始化 XHS Agent
Args:
config: MCP 客户端配置,默认使用 DEFAULT_CONFIG
"""
self.config = config or DEFAULT_CONFIG
self._agent: Optional[LlmAgent] = None
self._exit_stack = None
self._session_service = InMemorySessionService()
# 会话信息
self._session = self._session_service.create_session(
state={},
app_name="xhs_agent",
user_id="xhs_user"
)
# 创建 Runner
self._runner = None # 将在 _init_agent 中设置
async def _init_agent(self) -> None:
"""初始化 ADK Agent 并连接到 MCP 服务器"""
if self._agent is not None:
return
print("正在连接到小红书 MCP 服务器...")
try:
# 获取 MCP 工具集
tools, exit_stack = await MCPToolset.from_server(
connection_params=StdioServerParameters(
command=self.config.server_command,
args=[self.config.server_script],
env=self.config.env_vars
)
)
print(f"已获取 {len(tools)} 个 MCP 工具")
# 创建 LLM Agent
self._agent = LlmAgent(
model="gemini-2.0-flash", # 使用您有权限的模型
name="xhs_assistant",
instruction="""
你是一个小红书助手,可以帮助用户:
1. 获取笔记内容
2. 搜索笔记和用户
3. 获取用户信息和笔记列表
4. 浏览推荐流
使用可用的工具来完成这些任务。
保持友好和专业的态度。
""",
tools=tools
)
# 保存清理上下文
self._exit_stack = exit_stack
# 创建 Runner
self._runner = Runner(
app_name="xhs_agent",
agent=self._agent,
session_service=self._session_service
)
print("Agent 初始化完成")
except Exception as e:
print(f"初始化失败: {e}")
# 确保清理资源
if self._exit_stack:
await self._exit_stack.aclose()
raise
async def chat(self, message: str) -> list[str]:
"""与 Agent 对话
Args:
message: 用户消息
Returns:
list[str]: Agent 的响应消息列表
"""
# 确保 Agent 已初始化
await self._init_agent()
# 创建消息内容
content = types.Content(
role="user",
parts=[types.Part(text=message)]
)
responses = []
# 运行 Agent 并收集响应
events = self._runner.run_async(
session_id=self._session.id,
user_id=self._session.user_id,
new_message=content
)
async for event in events:
# 处理文本响应
if event.content and event.content.role == "model":
for part in event.content.parts:
if part.text:
responses.append(part.text)
return responses
async def close(self) -> None:
"""关闭 Agent 和清理资源"""
if self._exit_stack:
await self._exit_stack.aclose()
self._exit_stack = None
self._agent = None
async def __aenter__(self):
"""异步上下文管理器入口"""
await self._init_agent()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
await self.close()
# --- 示例用法 ---
async def main():
# 创建 Agent
async with XhsAgent() as agent:
# 测试对话
query = "搜索关于 Python 编程的笔记"
print(f"\n用户: {query}")
responses = await agent.chat(query)
for response in responses:
print(f"\n助手: {response}")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"\n发生错误: {e}")