# usage_examples.py (15.7 kB)
#!/usr/bin/env python3
"""
MemOS 使用示例
演示如何使用MemOS进行记忆管理和AI对话
"""
import os
import json
import time
import tempfile
from pathlib import Path
from typing import Dict, Any
from datetime import datetime
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
def load_env_file():
    """Load ``KEY=VALUE`` pairs from ``./.env`` into ``os.environ``.

    The file is looked up relative to the current working directory; when it
    does not exist the function does nothing.

    Parsing rules:
        * Blank lines and lines whose first non-blank character is ``#`` are
          skipped.
        * Lines without ``=`` or with an empty key are skipped instead of
          raising.
        * Only the first ``=`` separates key from value, so values may
          themselves contain ``=``.
        * Surrounding whitespace is removed from both key and value.

    Variables already present in the environment are overwritten.
    """
    env_file = Path(".env")
    if not env_file.exists():
        return

    # utf-8-sig reads plain UTF-8 and also drops a BOM, which would otherwise
    # end up glued to the first key.
    with open(env_file, encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            key, separator, value = line.partition('=')
            key = key.strip()
            if not separator or not key:
                # Malformed entry: ignore it rather than abort the whole load.
                continue
            os.environ[key] = value.strip()
class SimpleMemOS:
    """Simplified MemOS implementation that demonstrates the core features.

    Memories are stored as points in a Qdrant collection (payload keys:
    ``content``, ``tags``, ``metadata``, ``timestamp``) and retrieved by
    vector similarity. An OpenAI-compatible client is used for chat
    completions. Point IDs come from an integer counter persisted in
    ``<data_dir>/counter.txt``.
    """

    # Dimension of the vectors produced by _get_embedding and of the collection.
    VECTOR_SIZE = 384
    # Defaults used when neither the config file nor QDRANT_URL provides a value.
    DEFAULT_QDRANT_HOST = "localhost"
    DEFAULT_QDRANT_PORT = 6335

    def __init__(self, data_dir="./memos_data"):
        """Connect to Qdrant and the LLM endpoint and load the ID counter.

        Args:
            data_dir: Directory holding ``counter.txt`` and, unless the
                ``MEMOS_CONFIG`` environment variable points elsewhere, the
                ``concurrent_config.json`` configuration file.

        Raises:
            RuntimeError: If the collection cannot be initialised after all
                retries.
        """
        self.data_dir = Path(data_dir)
        # parents=True so a nested data_dir also works on a fresh checkout.
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Read the shared configuration file to find the Qdrant server.
        config_path = os.environ.get(
            'MEMOS_CONFIG', str(self.data_dir / "concurrent_config.json")
        )
        qdrant_config = self._load_qdrant_config(config_path)

        # Always connect to a Qdrant server over HTTP.
        qdrant_url = f"http://{qdrant_config['host']}:{qdrant_config['port']}"
        print(f"[USAGE_EXAMPLES] 连接Qdrant: {qdrant_url}")
        self.vector_db = QdrantClient(url=qdrant_url)
        self.collection_name = "memories"
        self._init_collection()

        # LLM client; credentials and endpoint come from the environment.
        self.llm_client = OpenAI(
            api_key=os.getenv("SILICONFLOW_API_KEY"),
            base_url=os.getenv("SILICONFLOW_BASE_URL"),
        )

        # Last point ID handed out so far.
        self.memory_counter = self._load_counter()

    def _load_qdrant_config(self, config_path: str) -> Dict[str, Any]:
        """Resolve the Qdrant host and port.

        Resolution order:
            1. ``config_path``: if the JSON file exists and its
               ``qdrant_mode`` is ``"server"``, use ``localhost`` and its
               ``qdrant_port`` (default 6335).
            2. The ``QDRANT_URL`` environment variable, when it is an
               ``http://`` URL. A trailing path or slash is ignored and a
               missing or invalid port falls back to 6335.
            3. ``localhost:6335``.

        Args:
            config_path: Path of the JSON configuration file.

        Returns:
            A dict with the keys ``"host"`` and ``"port"``.
        """
        from urllib.parse import urlparse

        try:
            if Path(config_path).exists():
                with open(config_path, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                if config.get("qdrant_mode") == "server":
                    return {
                        "host": self.DEFAULT_QDRANT_HOST,
                        "port": config.get("qdrant_port", self.DEFAULT_QDRANT_PORT),
                    }
                print(f"[WARNING] 配置文件中qdrant_mode不是server: {config.get('qdrant_mode')}")
            else:
                print(f"[WARNING] 配置文件不存在: {config_path}")
        except (OSError, ValueError, AttributeError, TypeError) as e:
            # Unreadable file, invalid JSON, or JSON that is not an object:
            # report it and continue with the fallbacks below.
            print(f"[ERROR] 读取配置文件失败: {e}")

        # Fall back to the environment variable, then to the built-in default.
        qdrant_url = os.environ.get('QDRANT_URL', 'http://localhost:6335')
        parsed = urlparse(qdrant_url)
        if parsed.scheme == "http" and parsed.hostname:
            try:
                port = parsed.port
            except ValueError:
                # Non-numeric or out-of-range port in the URL.
                port = None
            return {"host": parsed.hostname, "port": port or self.DEFAULT_QDRANT_PORT}
        return {"host": self.DEFAULT_QDRANT_HOST, "port": self.DEFAULT_QDRANT_PORT}

    def _collection_exists(self):
        """Return True if ``self.collection_name`` is listed by the server."""
        collections = self.vector_db.get_collections()
        return any(c.name == self.collection_name for c in collections.collections)

    def _create_collection(self):
        """Create the collection with cosine distance and VECTOR_SIZE dimensions."""
        self.vector_db.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(size=self.VECTOR_SIZE, distance=Distance.COSINE),
        )

    def _init_collection(self):
        """Create the collection if needed, retrying up to three times.

        After creating (or finding) the collection it is looked up once more
        to confirm it is visible. Failed attempts are retried after a
        one-second pause.

        Returns:
            True once the collection is confirmed to exist.

        Raises:
            RuntimeError: If the last attempt fails; the underlying error is
                chained as ``__cause__``.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                if not self._collection_exists():
                    print(f"🔧 创建向量集合: {self.collection_name} (尝试 {attempt+1}/{max_retries})")
                    self._create_collection()
                    print(f"✅ 向量集合创建成功: {self.collection_name}")
                else:
                    print(f"✅ 向量集合已存在: {self.collection_name}")

                # Verify the collection is really usable before reporting success.
                if self._collection_exists():
                    return True
                raise RuntimeError(f"集合创建后仍不可见: {self.collection_name}")
            except Exception as e:
                print(f"⚠️ 集合初始化失败 (尝试 {attempt+1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    print(f"❌ 集合初始化最终失败: {e}")
                    raise RuntimeError(f"集合初始化失败: {e}") from e
                time.sleep(1)  # wait one second before retrying
        # Not reachable while max_retries > 0; kept so the method always returns a bool.
        return False

    def _ensure_collection_exists(self):
        """Create the collection on demand before an operation.

        Returns:
            True if the collection exists (or was just created), False if the
            check or the creation raised; the error is printed, not raised.
        """
        try:
            if not self._collection_exists():
                print(f"🔧 动态创建向量集合: {self.collection_name}")
                self._create_collection()
                print(f"✅ 动态创建成功: {self.collection_name}")
            return True
        except Exception as e:
            print(f"❌ 集合检查失败: {e}")
            return False

    def _load_counter(self):
        """Return the persisted memory counter, or 0 if no counter file exists.

        Raises:
            ValueError: If ``counter.txt`` exists but does not contain an
                integer. This is deliberately not swallowed: restarting from
                0 would make new memories overwrite existing point IDs.
        """
        counter_file = self.data_dir / "counter.txt"
        if counter_file.exists():
            return int(counter_file.read_text().strip())
        return 0

    def _save_counter(self):
        """Persist the current memory counter to ``counter.txt``."""
        counter_file = self.data_dir / "counter.txt"
        counter_file.write_text(str(self.memory_counter))

    def _get_embedding(self, text):
        """Return a deterministic placeholder vector for ``text``.

        This is NOT a semantic embedding: the 16 bytes of the MD5 digest are
        mapped to floats in [-1, 1) and repeated until VECTOR_SIZE values are
        available. Real usage should call an embedding API instead.

        Args:
            text: The string to convert.

        Returns:
            A list of VECTOR_SIZE floats.
        """
        import hashlib

        digest = hashlib.md5(text.encode()).digest()
        base = [(byte_val - 128) / 128.0 for byte_val in digest]
        # Tile the 16 base values; ceil-divide so the slice is always full.
        repeats = -(-self.VECTOR_SIZE // len(base))
        return (base * repeats)[:self.VECTOR_SIZE]

    def add_memory(self, content, tags=None, metadata=None):
        """Store one memory in the vector database.

        Args:
            content: Text of the memory; also the input to the embedding.
            tags: Optional list of tag strings (stored as ``[]`` when omitted).
            metadata: Optional dict of extra data (stored as ``{}`` when omitted).

        Returns:
            The integer ID assigned to the memory, or None if the collection
            is unavailable or storing failed. On failure the ID counter is
            left unchanged.
        """
        # Check the collection before every operation; it may have been removed.
        if not self._ensure_collection_exists():
            print("❌ 集合不可用,无法添加记忆")
            return None

        # Reserve the next ID but commit it only after the upsert succeeds,
        # so failed inserts do not burn IDs.
        memory_id = self.memory_counter + 1
        vector = self._get_embedding(content)
        payload = {
            "content": content,
            "tags": tags or [],
            "metadata": metadata or {},
            # Creation time as stringified epoch seconds.
            "timestamp": str(time.time()),
        }

        try:
            point = PointStruct(id=memory_id, vector=vector, payload=payload)
            self.vector_db.upsert(
                collection_name=self.collection_name,
                points=[point],
            )
            self.memory_counter = memory_id
            self._save_counter()
            print(f"✅ 添加记忆 #{memory_id}: {content[:50]}...")
            return memory_id
        except Exception as e:
            print(f"❌ 添加记忆失败: {e}")
            return None

    def search_memories(self, query, limit=5):
        """Return the stored memories most similar to ``query``.

        Args:
            query: Text to search for.
            limit: Maximum number of results.

        Returns:
            A list of dicts with the keys ``id``, ``content``, ``tags``,
            ``score`` and ``metadata``; an empty list if the collection is
            unavailable or the search fails.
        """
        # Check the collection before every operation; it may have been removed.
        if not self._ensure_collection_exists():
            print("❌ 集合不可用,无法搜索记忆")
            return []

        try:
            query_vector = self._get_embedding(query)
            legacy_search = getattr(self.vector_db, "search", None)
            if legacy_search is not None:
                hits = legacy_search(
                    collection_name=self.collection_name,
                    query_vector=query_vector,
                    limit=limit,
                )
            else:
                # Clients without search() expose query_points(), whose
                # response wraps the hits in a .points attribute.
                hits = self.vector_db.query_points(
                    collection_name=self.collection_name,
                    query=query_vector,
                    limit=limit,
                ).points

            return [
                {
                    "id": point.id,
                    "content": point.payload["content"],
                    "tags": point.payload.get("tags", []),
                    "score": point.score,
                    "metadata": point.payload.get("metadata", {}),
                }
                for point in hits
            ]
        except Exception as e:
            print(f"❌ 搜索记忆失败: {e}")
            return []

    def chat_with_memory(self, user_input, use_memory=True):
        """Answer ``user_input`` with the LLM, optionally using stored memories.

        When ``use_memory`` is true, up to three memories returned by
        ``search_memories`` are prepended to the prompt.

        Args:
            user_input: The user's message.
            use_memory: Whether to look up and include related memories.

        Returns:
            The model's reply text, or a fixed apology string if the request
            fails.
        """
        if use_memory:
            relevant_memories = self.search_memories(user_input, limit=3)
            if relevant_memories:
                memory_context = "相关记忆:\n" + "".join(
                    f"{i}. {memory['content']} (相关度: {memory['score']:.3f})\n"
                    for i, memory in enumerate(relevant_memories, 1)
                )
                # Memories are folded into the user message; no system role is used.
                user_input = f"{memory_context}\n\n用户问题: {user_input}"

        messages = [{"role": "user", "content": user_input}]
        try:
            response = self.llm_client.chat.completions.create(
                model="deepseek-ai/DeepSeek-V3",
                messages=messages,
                max_tokens=500,
                temperature=0.7,
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"❌ AI对话失败: {e}")
            return "抱歉,AI服务暂时不可用。"

    def get_all_memories(self):
        """Return stored memories, capped at the first 1000 points.

        Only the first page of the scroll is read; the next-page offset is
        not followed.

        Returns:
            A list of dicts with the keys ``id``, ``content``, ``tags`` and
            ``metadata``; an empty list if the request fails.
        """
        try:
            result = self.vector_db.scroll(
                collection_name=self.collection_name,
                limit=1000,  # cap on the number of points returned
            )
            # scroll() returns a (points, next_page_offset) tuple.
            return [
                {
                    "id": point.id,
                    "content": point.payload["content"],
                    "tags": point.payload.get("tags", []),
                    "metadata": point.payload.get("metadata", {}),
                }
                for point in result[0]
            ]
        except Exception as e:
            print(f"❌ 获取记忆失败: {e}")
            return []
def demo_basic_usage():
    """Run the scripted demo: add memories, search, chat, then list everything."""
    print("🚀 MemOS 基础使用演示")
    print("=" * 50)

    # Set up the memory store (connects to Qdrant and the LLM endpoint).
    memos = SimpleMemOS()

    # Seed a handful of sample memories.
    print("\n📝 添加记忆...")
    sample_memories = (
        ("我喜欢喝咖啡,特别是拿铁", ["个人偏好", "饮品"]),
        ("今天学习了Python编程,重点是面向对象", ["学习", "编程"]),
        ("明天要开会讨论项目进度", ["工作", "会议"]),
        ("MemOS是一个很棒的记忆管理系统", ["技术", "工具"]),
    )
    for text, labels in sample_memories:
        memos.add_memory(text, tags=labels)

    # Similarity search over the stored memories.
    print("\n🔍 搜索记忆...")
    query = "编程"
    hits = memos.search_memories(query)
    print(f"搜索 '{query}' 的结果:")
    for hit in hits:
        print(f" - {hit['content']} (相关度: {hit['score']:.3f})")

    # Plain chat, without consulting memories.
    print("\n🤖 普通AI对话...")
    plain_reply = memos.chat_with_memory("你好,介绍一下自己", use_memory=False)
    print(f"AI: {plain_reply}")

    # Chat with related memories injected into the prompt.
    print("\n🧠 基于记忆的AI对话...")
    grounded_reply = memos.chat_with_memory("我喜欢什么饮品?", use_memory=True)
    print(f"AI: {grounded_reply}")

    # Dump everything that is stored.
    print("\n📚 所有记忆:")
    for entry in memos.get_all_memories():
        if entry['tags']:
            tags_str = ", ".join(entry['tags'])
        else:
            tags_str = "无标签"
        print(f" #{entry['id']}: {entry['content']} [{tags_str}]")
def _report_extraction(conversation_history, nothing_found_message):
    """Extract memories from the history and print how many were saved."""
    extracted_count = extract_memories_from_conversation(conversation_history)
    if extracted_count > 0:
        print(f"✅ 成功提取并保存了 {extracted_count} 条重要记忆")
    else:
        print(nothing_found_message)


def demo_interactive_chat():
    """Run an interactive chat loop backed by stored memories.

    Commands:
        ``quit``        extract memories from the conversation, then exit
                        (end-of-input on stdin is treated the same way).
        ``add:<text>``  store ``<text>`` as a memory.
        ``extract``     run memory extraction on the conversation so far.

    Any other non-empty input is sent to the LLM with related memories
    attached; both the input and the reply are appended to the conversation
    history with an ISO-format timestamp.
    """
    print("\n🎯 交互式记忆对话演示")
    print("=" * 50)
    print("输入 'quit' 退出,输入 'add:内容' 添加记忆")
    print("输入 'extract' 手动触发记忆提取")

    memos = SimpleMemOS()
    conversation_history = []

    while True:
        try:
            user_input = input("\n你: ").strip()
        except EOFError:
            # Closed stdin (Ctrl-D, piped input): finish like an explicit quit.
            user_input = 'quit'

        if not user_input:
            # Nothing typed: do not spend an LLM call on an empty prompt.
            continue

        if user_input.lower() == 'quit':
            # Extract memories automatically when the conversation ends.
            print("\n🤖 对话结束,正在智能提取重要记忆...")
            _report_extraction(conversation_history, "💡 本次对话未发现需要保存的重要信息")
            break

        if user_input.startswith('add:'):
            content = user_input[4:].strip()
            if not content:
                print("⚠️ 记忆内容为空,未添加")
                continue
            memory_id = memos.add_memory(content)
            # add_memory returns None (and prints the reason) on failure, so
            # success is confirmed only for a real ID.
            if memory_id is not None:
                print(f"✅ 已添加记忆 #{memory_id}")
            continue

        if user_input.lower() == 'extract':
            print("\n🤖 手动触发记忆提取...")
            _report_extraction(conversation_history, "💡 当前对话未发现需要保存的重要信息")
            continue

        # Record the user's turn.
        conversation_history.append({
            "role": "user",
            "content": user_input,
            "timestamp": datetime.now().isoformat(),
        })

        response = memos.chat_with_memory(user_input, use_memory=True)
        print(f"AI: {response}")

        # Record the assistant's turn.
        conversation_history.append({
            "role": "assistant",
            "content": response,
            "timestamp": datetime.now().isoformat(),
        })
def extract_memories_from_conversation(conversation_history):
    """Extract memories from a conversation and return how many were found.

    Args:
        conversation_history: List of message dicts; an empty or falsy value
            short-circuits to 0.

    Returns:
        The number of items returned by ``MVPMemoryManager.extract_from_messages``,
        or 0 when there is no history or when importing or running the
        manager raises (the error is printed).
    """
    if conversation_history:
        try:
            # Imported lazily so the rest of the module works without mvp_memory.
            from mvp_memory import MVPMemoryManager

            manager = MVPMemoryManager()
            extracted = manager.extract_from_messages(conversation_history)
            return len(extracted)
        except Exception as e:
            print(f"❌ 记忆提取失败: {e}")
    return 0
if __name__ == "__main__":
    # Populate os.environ from the local .env file before any client is built.
    load_env_file()

    # Run the scripted, non-interactive demonstration.
    demo_basic_usage()

    # Optional: uncomment to run the interactive demonstration.
    # demo_interactive_chat()