Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
data_encryption.py (10.8 kB)
#!/usr/bin/env python3
"""
MemOS data encryption tool.

Provides simple password-based file encryption for personal-use scenarios.
"""

import os
import json
import base64
import hashlib
import hmac
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


class MemOSDataEncryption:
    """Manage password-derived Fernet encryption for files of a data directory.

    Three bookkeeping files live inside ``data_dir``:

    * ``.encryption_salt`` -- the PBKDF2 salt used to derive the key,
    * ``.encryption_key``  -- the derived Fernet key itself,
    * ``.encrypted``       -- a JSON marker recording that setup was done.
    """

    # PBKDF2 parameters. They are part of the on-disk contract: changing them
    # makes keys for already-encrypted files impossible to re-derive.
    KDF_ITERATIONS = 100000
    KDF_KEY_LENGTH = 32
    SALT_SIZE = 16

    def __init__(self, data_dir: str = "./memos_data"):
        """Record the locations of the bookkeeping files; touches no disk."""
        self.data_dir = Path(data_dir)
        self.key_file = self.data_dir / ".encryption_key"
        self.salt_file = self.data_dir / ".encryption_salt"
        self.encrypted_marker = self.data_dir / ".encrypted"

    def _generate_key_from_password(
        self, password: str, salt: Optional[bytes] = None
    ) -> tuple[bytes, bytes]:
        """Derive a Fernet key from ``password`` with PBKDF2-HMAC-SHA256.

        Args:
            password: The user's password.
            salt: Salt to reuse; a fresh random salt is generated when ``None``.

        Returns:
            ``(key, salt)`` where ``key`` is the urlsafe-base64 encoded
            32-byte derived key and ``salt`` is the salt actually used.
        """
        if salt is None:
            salt = os.urandom(self.SALT_SIZE)

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=self.KDF_KEY_LENGTH,
            salt=salt,
            iterations=self.KDF_ITERATIONS,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt

    @staticmethod
    def _write_private(path: Path, data: bytes) -> None:
        """Write ``data`` to ``path`` with owner-only (0o600) permissions.

        The file is created with the restrictive mode up front so the secret
        is never readable by others between creation and a later chmod.
        """
        # O_BINARY only exists on Windows, where it prevents newline
        # translation from corrupting the raw bytes.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
        # The creation mode is ignored for a pre-existing file, so enforce it.
        os.chmod(path, 0o600)

    def _save_salt(self, salt: bytes):
        """Persist the salt to ``self.salt_file``."""
        self._write_private(self.salt_file, salt)

    def _load_salt(self) -> bytes:
        """Read the salt back.

        Raises:
            FileNotFoundError: If the salt file does not exist.
        """
        if not self.salt_file.exists():
            raise FileNotFoundError("加密盐值文件不存在")
        with open(self.salt_file, 'rb') as f:
            return f.read()

    def _save_key(self, key: bytes):
        """Persist the key (testing only; production should not store the key)."""
        self._write_private(self.key_file, key)

    def _load_key(self) -> bytes:
        """Read the stored key back.

        Raises:
            FileNotFoundError: If the key file does not exist.
        """
        if not self.key_file.exists():
            raise FileNotFoundError("加密密钥文件不存在")
        with open(self.key_file, 'rb') as f:
            return f.read()

    def _resolve_key(self, password: Optional[str]) -> bytes:
        """Return the Fernet key to use for an encrypt/decrypt operation.

        A non-empty ``password`` is combined with the stored salt; otherwise
        (``None`` or empty string) the key stored on disk is used.
        """
        if password:
            key, _ = self._generate_key_from_password(password, self._load_salt())
            return key
        return self._load_key()

    def setup_encryption(self, password: str) -> bool:
        """Initialise encryption: derive a key and write salt, key and marker.

        Returns:
            ``True`` on success, ``False`` if anything failed (the error is
            printed, not raised).
        """
        # NOTE(review): running setup again overwrites the existing salt and
        # key, so files encrypted under the previous setup can no longer be
        # decrypted. Consider refusing when already set up.
        try:
            print("🔐 设置数据加密...")

            # The data directory may not exist yet on first use.
            self.data_dir.mkdir(parents=True, exist_ok=True)

            key, salt = self._generate_key_from_password(password)

            self._save_salt(salt)
            self._save_key(key)

            with open(self.encrypted_marker, 'w', encoding='utf-8') as f:
                json.dump({
                    "encrypted": True,
                    # Creation time as a UTC ISO-8601 timestamp.
                    "created_at": datetime.now(timezone.utc).isoformat(),
                    "algorithm": "Fernet (AES 128)"
                }, f)

            print("✅ 数据加密设置完成")
            print("⚠️ 请妥善保管密码,丢失密码将无法恢复数据")
            return True

        except Exception as e:
            print(f"❌ 设置加密失败: {e}")
            return False

    def is_encrypted(self) -> bool:
        """Return whether the ``.encrypted`` marker file exists."""
        return self.encrypted_marker.exists()

    def encrypt_file(self, file_path: str, password: Optional[str] = None) -> bool:
        """Encrypt one file, writing the result next to it with ``.enc`` appended.

        Args:
            file_path: Path of the plaintext file; it is left in place.
            password: Password to derive the key from; when omitted or empty
                the stored key is used.

        Returns:
            ``True`` on success, ``False`` on any failure (printed, not raised).
        """
        try:
            file_path = Path(file_path)
            if not file_path.exists():
                print(f"❌ 文件不存在: {file_path}")
                return False

            fernet = Fernet(self._resolve_key(password))

            with open(file_path, 'rb') as f:
                data = f.read()

            encrypted_data = fernet.encrypt(data)

            # Append rather than replace the suffix: "a.json" -> "a.json.enc".
            encrypted_file = file_path.with_suffix(file_path.suffix + '.enc')
            with open(encrypted_file, 'wb') as f:
                f.write(encrypted_data)

            print(f"✅ 文件加密完成: {encrypted_file}")
            return True

        except Exception as e:
            print(f"❌ 文件加密失败: {e}")
            return False

    def decrypt_file(self, encrypted_file_path: str, password: Optional[str] = None) -> bool:
        """Decrypt one file.

        The output name is the input without its ``.enc`` suffix, or the input
        with its suffix replaced by ``.dec`` when it does not end in ``.enc``.

        Args:
            encrypted_file_path: Path of the encrypted file; left in place.
            password: Password to derive the key from; when omitted or empty
                the stored key is used.

        Returns:
            ``True`` on success, ``False`` on any failure (printed, not raised).
        """
        try:
            encrypted_file_path = Path(encrypted_file_path)
            if not encrypted_file_path.exists():
                print(f"❌ 加密文件不存在: {encrypted_file_path}")
                return False

            fernet = Fernet(self._resolve_key(password))

            with open(encrypted_file_path, 'rb') as f:
                encrypted_data = f.read()

            decrypted_data = fernet.decrypt(encrypted_data)

            if encrypted_file_path.suffix == '.enc':
                output_file = encrypted_file_path.with_suffix('')
            else:
                output_file = encrypted_file_path.with_suffix('.dec')

            with open(output_file, 'wb') as f:
                f.write(decrypted_data)

            print(f"✅ 文件解密完成: {output_file}")
            return True

        except Exception as e:
            print(f"❌ 文件解密失败: {e}")
            return False

    def encrypt_backup(self, backup_file: str, password: str) -> bool:
        """Encrypt a backup file; announces it, then delegates to ``encrypt_file``."""
        print(f"🔐 加密备份文件: {backup_file}")
        return self.encrypt_file(backup_file, password)

    def decrypt_backup(self, encrypted_backup: str, password: str) -> bool:
        """Decrypt a backup file; announces it, then delegates to ``decrypt_file``."""
        print(f"🔓 解密备份文件: {encrypted_backup}")
        return self.decrypt_file(encrypted_backup, password)

    def get_encryption_status(self) -> Dict[str, Any]:
        """Report which bookkeeping files exist, merged with the marker contents.

        Reading the marker is best-effort: an unreadable or malformed marker
        is ignored and only the existence flags are returned.
        """
        status = {
            "encrypted": self.is_encrypted(),
            "key_file_exists": self.key_file.exists(),
            "salt_file_exists": self.salt_file.exists(),
        }

        if status["encrypted"]:
            try:
                with open(self.encrypted_marker, 'r', encoding='utf-8') as f:
                    marker_data = json.load(f)
            except (OSError, ValueError):
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                marker_data = None
            if isinstance(marker_data, dict):
                status.update(marker_data)

        return status

    def remove_encryption(self, password: str) -> bool:
        """Delete the key, salt and marker files after checking the password.

        Files that were already encrypted are not touched and must be
        decrypted beforehand.

        Returns:
            ``True`` on success, ``False`` on failure or wrong password
            (printed, not raised).
        """
        try:
            salt = self._load_salt()
            key, _ = self._generate_key_from_password(password, salt)

            # Constructing Fernet only checks that the key is well formed; it
            # does not prove the password is right. Compare against the
            # stored key for a real check.
            Fernet(key)
            if self.key_file.exists():
                if not hmac.compare_digest(key, self._load_key()):
                    raise ValueError("密码不正确")
            # Without a stored key there is nothing to verify against, so the
            # removal proceeds as it always did.

            for path in (self.key_file, self.salt_file, self.encrypted_marker):
                path.unlink(missing_ok=True)

            print("✅ 加密设置已移除")
            print("⚠️ 现有加密文件仍需手动解密")
            return True

        except Exception as e:
            print(f"❌ 移除加密设置失败: {e}")
            return False


def _resolve_password(cli_password: Optional[str], prompt: str) -> str:
    """Return the ``--password`` value, or prompt for one without echo."""
    if cli_password:
        return cli_password
    import getpass
    return getpass.getpass(prompt)


def main():
    """Command-line interface.

    Returns:
        ``0`` when the requested command succeeded, ``1`` otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(description="MemOS数据加密工具")
    parser.add_argument("command", choices=[
        "setup", "status", "encrypt", "decrypt",
        "encrypt-backup", "decrypt-backup", "remove"
    ], help="操作命令")
    # NOTE: a password given on the command line is visible in the process
    # list and shell history; the interactive prompt is the safer route.
    parser.add_argument("--file", help="要加密/解密的文件路径")
    parser.add_argument("--password", help="加密密码")
    parser.add_argument("--data-dir", default="./memos_data", help="数据目录")

    args = parser.parse_args()

    encryptor = MemOSDataEncryption(args.data_dir)

    if args.command == "setup":
        password = _resolve_password(args.password, "请输入加密密码: ")
        return 0 if encryptor.setup_encryption(password) else 1

    if args.command == "status":
        status = encryptor.get_encryption_status()
        print("🔒 加密状态:")
        for key, value in status.items():
            print(f" {key}: {value}")
        return 0

    if args.command == "remove":
        password = _resolve_password(args.password, "请输入当前密码以确认移除: ")
        return 0 if encryptor.remove_encryption(password) else 1

    # The remaining commands all need --file and a password:
    # command -> (handler, message when --file is missing, password prompt)
    file_commands = {
        "encrypt": (encryptor.encrypt_file, "❌ 请指定要加密的文件", "请输入加密密码: "),
        "decrypt": (encryptor.decrypt_file, "❌ 请指定要解密的文件", "请输入解密密码: "),
        "encrypt-backup": (encryptor.encrypt_backup, "❌ 请指定要加密的备份文件", "请输入加密密码: "),
        "decrypt-backup": (encryptor.decrypt_backup, "❌ 请指定要解密的备份文件", "请输入解密密码: "),
    }
    handler, missing_file_message, prompt = file_commands[args.command]

    if not args.file:
        print(missing_file_message)
        return 1

    password = _resolve_password(args.password, prompt)
    return 0 if handler(args.file, password) else 1


if __name__ == "__main__":
    raise SystemExit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.