Skip to main content
Glama

SQLite MCP Server

by Rikxed
multi_agent_concurrency_example.py10.2 kB
""" 多Agent并发控制示例 - 展示如何避免脏读和脏写 """ import threading import time import random import subprocess import sys import json from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path # 模拟多个MCP Agent的并发访问 class MockMCPServer: """模拟MCP服务器""" def __init__(self, agent_id: str, db_path: str): self.agent_id = agent_id self.db_path = db_path self.lock_file = f"{db_path}.{agent_id}.lock" def execute_query(self, query: str, params: tuple = ()) -> list: """执行查询(模拟)""" # 在实际实现中,这里会调用数据库管理器 print(f"Agent {self.agent_id}: 执行查询 - {query}") time.sleep(random.uniform(0.1, 0.3)) return [{"id": 1, "name": "测试用户", "version": random.randint(1, 10)}] def execute_update(self, query: str, params: tuple = ()) -> int: """执行更新(模拟)""" print(f"Agent {self.agent_id}: 执行更新 - {query}") time.sleep(random.uniform(0.2, 0.5)) return 1 def demonstrate_dirty_read_problem(): """演示脏读问题""" print("=== 脏读问题演示 ===") # 模拟两个Agent同时访问同一数据 agent1 = MockMCPServer("Agent-1", "data/test.db") agent2 = MockMCPServer("Agent-2", "data/test.db") def agent1_operation(): """Agent1的操作:读取用户数据""" print("Agent1: 开始读取用户数据...") result = agent1.execute_query("SELECT * FROM users WHERE id = 1") print(f"Agent1: 读取结果 - {result}") return result def agent2_operation(): """Agent2的操作:修改用户数据""" print("Agent2: 开始修改用户数据...") time.sleep(0.1) # 模拟Agent2稍后开始 result = agent2.execute_update("UPDATE users SET name = '新名字' WHERE id = 1") print(f"Agent2: 修改完成,影响行数 - {result}") return result # 并发执行 with ThreadPoolExecutor(max_workers=2) as executor: future1 = executor.submit(agent1_operation) future2 = executor.submit(agent2_operation) result1 = future1.result() result2 = future2.result() print("问题:Agent1可能读取到Agent2正在修改的数据(脏读)") def demonstrate_dirty_write_problem(): """演示脏写问题""" print("\n=== 脏写问题演示 ===") agent1 = MockMCPServer("Agent-1", "data/test.db") agent2 = MockMCPServer("Agent-2", "data/test.db") def agent1_write(): """Agent1的写操作""" print("Agent1: 开始写入数据...") time.sleep(0.1) result = agent1.execute_update("UPDATE users SET balance = balance + 100 WHERE id = 1") print("Agent1: 写入完成") return result def agent2_write(): """Agent2的写操作""" print("Agent2: 开始写入数据...") time.sleep(0.1) result = agent2.execute_update("UPDATE users SET balance = balance + 200 WHERE id = 1") print("Agent2: 写入完成") return result # 并发执行 with ThreadPoolExecutor(max_workers=2) as executor: future1 = executor.submit(agent1_write) future2 = executor.submit(agent2_write) future1.result() future2.result() print("问题:两个Agent同时修改同一数据,可能导致数据不一致(脏写)") def demonstrate_solution_with_locks(): """演示使用锁的解决方案""" print("\n=== 使用锁的解决方案 ===") # 模拟文件锁机制 lock_file = "data/test.db.lock" def acquire_lock(agent_id: str, timeout: int = 5) -> bool: """获取锁""" start_time = time.time() while time.time() - start_time < timeout: try: with open(lock_file, 'w') as f: f.write(agent_id) return True except: time.sleep(0.1) return False def release_lock(): """释放锁""" try: Path(lock_file).unlink(missing_ok=True) except: pass def agent1_safe_operation(): """Agent1的安全操作""" print("Agent1: 尝试获取锁...") if acquire_lock("Agent-1"): try: print("Agent1: 获取锁成功,执行操作...") time.sleep(1) # 模拟操作时间 print("Agent1: 操作完成") finally: release_lock() print("Agent1: 释放锁") else: print("Agent1: 获取锁失败") def agent2_safe_operation(): """Agent2的安全操作""" print("Agent2: 尝试获取锁...") if acquire_lock("Agent-2"): try: print("Agent2: 获取锁成功,执行操作...") time.sleep(1) # 模拟操作时间 print("Agent2: 操作完成") finally: release_lock() print("Agent2: 释放锁") else: print("Agent2: 获取锁失败") # 并发执行 with ThreadPoolExecutor(max_workers=2) as executor: future1 = executor.submit(agent1_safe_operation) future2 = executor.submit(agent2_safe_operation) future1.result() future2.result() print("解决方案:使用文件锁确保同一时间只有一个Agent可以修改数据") def demonstrate_optimistic_locking(): """演示乐观锁解决方案""" print("\n=== 乐观锁解决方案 ===") class OptimisticLockManager: def __init__(self): self.data = {"id": 1, "name": "用户", "version": 1} self.lock = threading.Lock() def read_data(self, agent_id: str): """读取数据""" with self.lock: print(f"{agent_id}: 读取数据 - {self.data}") return self.data.copy() def update_data(self, agent_id: str, new_name: str, expected_version: int): """更新数据(乐观锁)""" with self.lock: if self.data["version"] != expected_version: print(f"{agent_id}: 版本冲突!期望版本 {expected_version},实际版本 {self.data['version']}") return False self.data["name"] = new_name self.data["version"] += 1 print(f"{agent_id}: 更新成功 - {self.data}") return True manager = OptimisticLockManager() def agent1_optimistic_operation(): """Agent1的乐观锁操作""" print("Agent1: 开始乐观锁操作...") # 读取数据 data = manager.read_data("Agent1") time.sleep(0.5) # 模拟处理时间 # 尝试更新 success = manager.update_data("Agent1", "Agent1修改的名字", data["version"]) if success: print("Agent1: 更新成功") else: print("Agent1: 更新失败,需要重试") def agent2_optimistic_operation(): """Agent2的乐观锁操作""" print("Agent2: 开始乐观锁操作...") # 读取数据 data = manager.read_data("Agent2") time.sleep(0.3) # 模拟处理时间 # 尝试更新 success = manager.update_data("Agent2", "Agent2修改的名字", data["version"]) if success: print("Agent2: 更新成功") else: print("Agent2: 更新失败,需要重试") # 并发执行 with ThreadPoolExecutor(max_workers=2) as executor: future1 = executor.submit(agent1_optimistic_operation) future2 = executor.submit(agent2_optimistic_operation) future1.result() future2.result() print("解决方案:使用乐观锁,只有在版本匹配时才允许更新") def demonstrate_transaction_isolation(): """演示事务隔离级别""" print("\n=== 事务隔离级别演示 ===") isolation_levels = { "read_uncommitted": "读未提交 - 可能发生脏读", "read_committed": "读已提交 - 避免脏读", "serializable": "串行化 - 最高隔离级别" } for level, description in isolation_levels.items(): print(f"\n{level}: {description}") def simulate_transaction(agent_id: str, isolation: str): print(f"{agent_id}: 开始事务 ({isolation})...") if isolation == "serializable": print(f"{agent_id}: 获取排他锁") time.sleep(0.5) print(f"{agent_id}: 执行操作") time.sleep(0.3) print(f"{agent_id}: 提交事务,释放锁") else: print(f"{agent_id}: 执行操作") time.sleep(0.3) print(f"{agent_id}: 提交事务") with ThreadPoolExecutor(max_workers=2) as executor: future1 = executor.submit(simulate_transaction, "Agent1", level) future2 = executor.submit(simulate_transaction, "Agent2", level) future1.result() future2.result() def main(): """主函数""" print("多Agent并发控制演示") print("=" * 50) # 清理之前的锁文件 Path("data/test.db.lock").unlink(missing_ok=True) try: # 1. 演示问题 demonstrate_dirty_read_problem() demonstrate_dirty_write_problem() # 2. 演示解决方案 demonstrate_solution_with_locks() demonstrate_optimistic_locking() demonstrate_transaction_isolation() print("\n" + "=" * 50) print("演示完成!") except Exception as e: print(f"演示过程中出现错误: {e}") finally: # 清理锁文件 Path("data/test.db.lock").unlink(missing_ok=True) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Rikxed/sqlite-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server