Skip to main content
Glama

Qdrant Sync MCP Server

by No-Smoke
server.py (20 kB)
#!/usr/bin/env python3
"""
Qdrant Sync MCP Server

A dedicated MCP server for administering the Qdrant sync system
between local workstation (localhost:6335) and VPS (74.50.49.35:6333).

Tools:
- qdrant_sync_status: Check connectivity and collection counts on both instances
- qdrant_sync_all: Full sync of all collections (local → VPS)
- qdrant_sync_collection: Sync a single collection
- qdrant_sync_dry_run: Preview what would be synced
- qdrant_sync_logs: View recent sync logs
- qdrant_compare_collections: Compare collection stats between local and VPS
- qdrant_sync_from_vps_all: Full sync of all collections (VPS → local)
- qdrant_sync_from_vps_collection: Sync a single collection (VPS → local)
- qdrant_sync_from_vps_dry_run: Preview what would be synced (VPS → local)

Author: Vanya + Claude
Created: November 23, 2025
"""

import asyncio
import json
import os
import subprocess
import glob
from datetime import datetime
from typing import Optional
from pathlib import Path

import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

# =============================================================================
# Configuration
# =============================================================================

CONFIG_FILE = os.environ.get("QDRANT_SYNC_CONFIG", "/home/vanya/.config/qdrant-sync/config.env")
SYNC_SCRIPT = os.environ.get("QDRANT_SYNC_SCRIPT", "/home/vanya/scripts/sync-qdrant-to-vps.sh")
SYNC_SCRIPT_REVERSE = os.environ.get("QDRANT_SYNC_SCRIPT_REVERSE", "/home/vanya/scripts/sync-qdrant-from-vps.sh")
LOG_DIR = os.environ.get("QDRANT_SYNC_LOG_DIR", "/home/vanya/logs")

# Upper bound for one sync script run (30 minutes, sized for a full sync).
SYNC_TIMEOUT_SECONDS = 1800

# Bounds advertised by the qdrant_sync_logs tool schema.
DEFAULT_LOG_COUNT = 3
MAX_LOG_COUNT = 10


def _parse_env_value(raw: str) -> str:
    """Extract the value part of a ``KEY=value`` line from an env file.

    A value that starts with a quote is taken verbatim up to the matching
    closing quote, so ``#`` inside quotes is preserved. For unquoted values a
    ``#`` only starts a comment when it is the first character or is preceded
    by whitespace, so tokens such as ``abc#def`` survive intact.

    Args:
        raw: Everything to the right of the first ``=`` on the line.

    Returns:
        The value with surrounding quotes and any trailing comment removed.
    """
    value = raw.strip()

    if value[:1] in ('"', "'"):
        closing = value.find(value[0], 1)
        if closing != -1:
            return value[1:closing]
        # Unterminated quote: fall through and treat it like an unquoted value.

    comment_at = next(
        (
            i
            for i, ch in enumerate(value)
            if ch == "#" and (i == 0 or value[i - 1].isspace())
        ),
        -1,
    )
    if comment_at != -1:
        value = value[:comment_at].rstrip()

    # Tolerate stray/unbalanced quotes the same way the parser always has.
    return value.strip('"').strip("'")


# Load config from env file
def load_config():
    """Load configuration from the config.env file named by CONFIG_FILE.

    Blank lines, comment lines and lines without ``=`` are ignored. A leading
    ``export `` on the key is dropped so files written for shell sourcing
    parse as well.

    Returns:
        dict mapping setting names to string values; empty when the file does
        not exist.
    """
    config = {}
    try:
        with open(CONFIG_FILE, 'r', encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                key, value = line.split('=', 1)
                key = key.strip()
                if key.startswith("export "):
                    key = key[len("export "):].strip()
                config[key] = _parse_env_value(value)
    except FileNotFoundError:
        # A missing config file is expected; callers fall back to defaults.
        pass
    return config


CONFIG = load_config()

# Default values if config not loaded
SOURCE_REST_URL = CONFIG.get("SOURCE_REST_URL", "http://localhost:6335")
TARGET_REST_URL = CONFIG.get("TARGET_REST_URL", "http://74.50.49.35:6333")
TARGET_API_KEY = CONFIG.get("TARGET_API_KEY", "")

# =============================================================================
# Helper Functions
# =============================================================================


def _auth_headers(api_key: Optional[str]) -> dict:
    """Return the Qdrant ``api-key`` header dict, or an empty dict without a key."""
    return {"api-key": api_key} if api_key else {}


def _json_response(payload) -> list:
    """Wrap *payload* as the single pretty-printed JSON TextContent tools return."""
    return [TextContent(type="text", text=json.dumps(payload, indent=2))]


async def check_qdrant_connectivity(url: str, api_key: Optional[str] = None) -> dict:
    """Check if a Qdrant instance is reachable and get basic info.

    Args:
        url: Base REST URL of the instance.
        api_key: Optional API key sent in the ``api-key`` header.

    Returns:
        dict with ``status`` of ``"connected"`` (plus ``collection_count`` and
        ``collections``), ``"error"`` (non-200 response) or ``"unreachable"``
        (any exception); the latter two carry an ``error`` message. Never raises.
    """
    headers = _auth_headers(api_key)

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{url}/collections", headers=headers)

            if response.status_code == 200:
                data = response.json()
                collections = data.get("result", {}).get("collections", [])
                return {
                    "status": "connected",
                    "url": url,
                    "collection_count": len(collections),
                    "collections": [c.get("name") for c in collections]
                }
            else:
                return {
                    "status": "error",
                    "url": url,
                    "error": f"HTTP {response.status_code}"
                }
    except Exception as e:
        return {
            "status": "unreachable",
            "url": url,
            "error": str(e)
        }


async def get_collection_info(url: str, name: str, api_key: Optional[str] = None) -> dict:
    """Get detailed info about a specific collection.

    Args:
        url: Base REST URL of the instance.
        name: Collection name; percent-encoded before being placed in the path.
        api_key: Optional API key sent in the ``api-key`` header.

    Returns:
        dict with ``name``, ``status`` and the three count fields on success,
        or ``{"name", "error"}`` on a non-200 response or exception. Count
        fields reported as null by the server are normalised to 0 so callers
        can do arithmetic on them. Never raises.
    """
    from urllib.parse import quote

    headers = _auth_headers(api_key)

    try:
        # Encode the name so characters like "/", "?" or "#" cannot redirect
        # the request to a different endpoint.
        safe_name = quote(str(name), safe="")
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{url}/collections/{safe_name}", headers=headers)

            if response.status_code == 200:
                data = response.json()
                result = data.get("result", {})
                return {
                    "name": name,
                    "status": result.get("status", "unknown"),
                    "points_count": result.get("points_count") or 0,
                    "vectors_count": result.get("vectors_count") or 0,
                    "indexed_vectors_count": result.get("indexed_vectors_count") or 0
                }
            else:
                return {"name": name, "error": f"HTTP {response.status_code}"}
    except Exception as e:
        return {"name": name, "error": str(e)}


async def _fetch_collection_infos(url: str, names: list, api_key: Optional[str] = None) -> list:
    """Fetch info for every collection in *names* concurrently, sorted by name."""
    # get_collection_info never raises, so gather cannot abort part-way.
    infos = await asyncio.gather(
        *(get_collection_info(url, name, api_key) for name in names)
    )
    return sorted(infos, key=lambda x: x.get("name", ""))


async def get_all_collections_with_counts(url: str, api_key: Optional[str] = None) -> list:
    """Get all collections with their point counts.

    Returns:
        List of per-collection info dicts sorted by name, or an empty list
        when the instance is not reachable.
    """
    conn = await check_qdrant_connectivity(url, api_key)
    if conn.get("status") != "connected":
        return []

    return await _fetch_collection_infos(url, conn.get("collections", []), api_key)


def run_sync_script(args: Optional[list] = None, script_path: Optional[str] = None) -> dict:
    """Run the sync script with given arguments.

    This call blocks until the script exits or the timeout elapses; async
    callers should run it in a worker thread.

    Args:
        args: Extra command-line arguments for the script.
        script_path: Script to execute; defaults to SYNC_SCRIPT.

    Returns:
        dict with ``success``, ``returncode``, ``stdout`` and ``stderr`` when
        the script ran, or ``success`` False plus ``error`` on timeout or
        failure to launch. Never raises.
    """
    script = script_path or SYNC_SCRIPT
    cmd = [script]
    if args:
        cmd.extend(args)

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=SYNC_TIMEOUT_SECONDS
        )
        return {
            "success": result.returncode == 0,
            "returncode": result.returncode,
            "stdout": result.stdout,
            "stderr": result.stderr
        }
    except subprocess.TimeoutExpired:
        return {
            "success": False,
            "error": f"Sync operation timed out after {SYNC_TIMEOUT_SECONDS // 60} minutes"
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def get_recent_logs(count: int = 5) -> list:
    """Get the most recent sync log files.

    Files matching ``qdrant-sync-*.log`` in LOG_DIR are ordered by file name,
    descending.

    Args:
        count: Maximum number of log files to return; values below 1 yield
            an empty list.

    Returns:
        List of dicts with ``file``, ``timestamp``, ``size_bytes`` and
        ``content``, or ``file`` and ``error`` for files that could not be read.
    """
    if count <= 0:
        # A negative slice bound would return *more* files, not fewer.
        return []

    pattern = os.path.join(LOG_DIR, "qdrant-sync-*.log")
    log_files = sorted(glob.glob(pattern), reverse=True)[:count]

    logs = []
    for log_file in log_files:
        try:
            # Undecodable bytes are replaced so a log with stray binary output
            # is still readable.
            with open(log_file, 'r', encoding="utf-8", errors="replace") as f:
                content = f.read()

            # Parse log file name for timestamp
            basename = os.path.basename(log_file)
            timestamp = basename.replace("qdrant-sync-", "").replace(".log", "")

            logs.append({
                "file": log_file,
                "timestamp": timestamp,
                "size_bytes": os.path.getsize(log_file),
                "content": content
            })
        except Exception as e:
            logs.append({
                "file": log_file,
                "error": str(e)
            })

    return logs


def _clamp_log_count(requested) -> int:
    """Coerce a requested log count into the range 1..MAX_LOG_COUNT.

    Missing or non-numeric values fall back to DEFAULT_LOG_COUNT.
    """
    try:
        count = int(requested)
    except (TypeError, ValueError, OverflowError):
        count = DEFAULT_LOG_COUNT
    return max(1, min(count, MAX_LOG_COUNT))


async def _compare_single_collection(collection_name: str) -> dict:
    """Compare one collection's point count between local and VPS."""
    local_info, vps_info = await asyncio.gather(
        get_collection_info(SOURCE_REST_URL, collection_name),
        get_collection_info(TARGET_REST_URL, collection_name, TARGET_API_KEY),
    )

    local_points = local_info.get("points_count", 0)
    vps_points = vps_info.get("points_count", 0)
    # A failed lookup leaves the count at 0; two failures must not read as
    # "0 == 0, in sync".
    lookup_failed = "error" in local_info or "error" in vps_info

    return {
        "collection": collection_name,
        "local": local_info,
        "vps": vps_info,
        "in_sync": not lookup_failed and local_points == vps_points,
        "difference": local_points - vps_points
    }


async def _compare_all_collections() -> dict:
    """Compare every collection across both instances and summarise the result.

    Returns an error payload carrying both connectivity reports when either
    instance cannot be reached, instead of treating it as an empty instance.
    """
    local_conn, vps_conn = await asyncio.gather(
        check_qdrant_connectivity(SOURCE_REST_URL),
        check_qdrant_connectivity(TARGET_REST_URL, TARGET_API_KEY),
    )
    if local_conn.get("status") != "connected" or vps_conn.get("status") != "connected":
        return {
            "timestamp": datetime.now().isoformat(),
            "error": "Cannot compare collections: one or both Qdrant instances are not reachable",
            "local": local_conn,
            "vps": vps_conn
        }

    local_collections, vps_collections = await asyncio.gather(
        _fetch_collection_infos(SOURCE_REST_URL, local_conn.get("collections", [])),
        _fetch_collection_infos(TARGET_REST_URL, vps_conn.get("collections", []), TARGET_API_KEY),
    )

    # Create lookup dicts
    local_dict = {c["name"]: c for c in local_collections}
    vps_dict = {c["name"]: c for c in vps_collections}

    all_names = set(local_dict.keys()) | set(vps_dict.keys())

    comparison = []
    total_local_points = 0
    total_vps_points = 0
    out_of_sync = 0

    for name in sorted(all_names):
        local_info = local_dict.get(name, {"points_count": 0, "missing": True})
        vps_info = vps_dict.get(name, {"points_count": 0, "missing": True})

        local_points = local_info.get("points_count", 0)
        vps_points = vps_info.get("points_count", 0)

        total_local_points += local_points
        total_vps_points += vps_points

        lookup_failed = "error" in local_info or "error" in vps_info
        in_sync = (
            local_points == vps_points
            and not local_info.get("missing")
            and not vps_info.get("missing")
            and not lookup_failed
        )
        if not in_sync:
            out_of_sync += 1

        entry = {
            "name": name,
            "local_points": local_points,
            "vps_points": vps_points,
            "difference": local_points - vps_points,
            "in_sync": in_sync,
            "local_only": vps_info.get("missing", False),
            "vps_only": local_info.get("missing", False)
        }
        # Surface per-collection lookup failures so a 0 count is not mistaken
        # for an empty collection.
        if "error" in local_info:
            entry["local_error"] = local_info["error"]
        if "error" in vps_info:
            entry["vps_error"] = vps_info["error"]
        comparison.append(entry)

    return {
        "timestamp": datetime.now().isoformat(),
        "summary": {
            "total_collections": len(all_names),
            "local_collections": len(local_dict),
            "vps_collections": len(vps_dict),
            "out_of_sync": out_of_sync,
            "total_local_points": total_local_points,
            "total_vps_points": total_vps_points,
            "total_difference": total_local_points - total_vps_points
        },
        "collections": comparison
    }


# =============================================================================
# MCP Server Setup
# =============================================================================

server = Server("qdrant-sync")

# Define tools
TOOLS = [
    Tool(
        name="qdrant_sync_status",
        description=(
            "Check connectivity and status of both Qdrant instances (local and VPS). "
            "Returns connection status, collection counts, and list of collections for each instance. "
            "Use this to verify both instances are reachable before syncing."
        ),
        inputSchema={
            "type": "object",
            "properties": {},
            "required": []
        }
    ),
    Tool(
        name="qdrant_sync_all",
        description=(
            "Execute a full sync of ALL collections from local Qdrant to VPS. "
            "This runs the sync-qdrant-to-vps.sh script without arguments. "
            "WARNING: This may take 5-10 minutes depending on the number of collections and data volume. "
            "Returns the script output including success/failure status for each collection."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "confirm": {
                    "type": "boolean",
                    "description": "Set to true to confirm you want to sync all collections"
                }
            },
            "required": ["confirm"]
        }
    ),
    Tool(
        name="qdrant_sync_collection",
        description=(
            "Sync a single specific collection from local Qdrant to VPS. "
            "Use this for targeted syncs after updating specific collections. "
            "Faster than full sync - typically completes in 2-10 seconds per collection."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "collection_name": {
                    "type": "string",
                    "description": "Name of the collection to sync"
                }
            },
            "required": ["collection_name"]
        }
    ),
    Tool(
        name="qdrant_sync_dry_run",
        description=(
            "Preview what would be synced without actually transferring data. "
            "Lists all collections with their point counts. "
            "Use this to verify the state before running a full sync."
        ),
        inputSchema={
            "type": "object",
            "properties": {},
            "required": []
        }
    ),
    Tool(
        name="qdrant_sync_logs",
        description=(
            "View recent sync operation logs. "
            "Returns the content of recent log files from /home/vanya/logs/qdrant-sync-*.log "
            "Useful for debugging failed syncs or reviewing sync history."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "count": {
                    "type": "integer",
                    "description": "Number of recent log files to retrieve (default: 3, max: 10)",
                    "default": 3
                },
                "latest_only": {
                    "type": "boolean",
                    "description": "If true, only return the most recent log file",
                    "default": False
                }
            },
            "required": []
        }
    ),
    Tool(
        name="qdrant_compare_collections",
        description=(
            "Compare collections between local and VPS instances. "
            "Shows which collections exist on each instance, point counts, and identifies discrepancies. "
            "Useful for verifying sync completeness or identifying missing collections."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "collection_name": {
                    "type": "string",
                    "description": "Optional: compare a specific collection only. If omitted, compares all collections."
                }
            },
            "required": []
        }
    ),
    Tool(
        name="qdrant_sync_from_vps_all",
        description=(
            "Execute a full sync of ALL collections from VPS to local Qdrant. "
            "This is the REVERSE direction: VPS → Local. "
            "WARNING: This may take 5-10 minutes depending on the number of collections and data volume. "
            "Returns the script output including success/failure status for each collection."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "confirm": {
                    "type": "boolean",
                    "description": "Set to true to confirm you want to sync all collections from VPS to local"
                }
            },
            "required": ["confirm"]
        }
    ),
    Tool(
        name="qdrant_sync_from_vps_collection",
        description=(
            "Sync a single specific collection from VPS to local Qdrant. "
            "This is the REVERSE direction: VPS → Local. "
            "Use this to pull updates from VPS after remote changes. "
            "Faster than full sync - typically completes in 2-10 seconds per collection."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "collection_name": {
                    "type": "string",
                    "description": "Name of the collection to sync from VPS to local"
                }
            },
            "required": ["collection_name"]
        }
    ),
    Tool(
        name="qdrant_sync_from_vps_dry_run",
        description=(
            "Preview what would be synced from VPS to local without actually transferring data. "
            "Lists all VPS collections with their point counts. "
            "Use this to verify VPS state before pulling to local."
        ),
        inputSchema={
            "type": "object",
            "properties": {},
            "required": []
        }
    )
]


@server.list_tools()
async def list_tools():
    """Return the list of available tools."""
    return TOOLS


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Handle tool calls.

    Args:
        name: Name of the tool to run (one of the entries in TOOLS).
        arguments: Tool arguments as described by the tool's input schema.

    Returns:
        A one-element list holding a TextContent whose text is the JSON
        result. Unknown tool names yield a JSON ``error`` payload.
    """
    arguments = arguments or {}

    if name == "qdrant_sync_status":
        # Check both instances
        local_status, vps_status = await asyncio.gather(
            check_qdrant_connectivity(SOURCE_REST_URL),
            check_qdrant_connectivity(TARGET_REST_URL, TARGET_API_KEY),
        )

        result = {
            "timestamp": datetime.now().isoformat(),
            "local": local_status,
            "vps": vps_status,
            "sync_ready": local_status.get("status") == "connected" and vps_status.get("status") == "connected"
        }

        return _json_response(result)

    elif name == "qdrant_sync_all":
        confirm = arguments.get("confirm", False)
        if not confirm:
            return _json_response({
                "error": "Confirmation required. Set confirm=true to proceed with full sync.",
                "hint": "This will sync all collections from local to VPS and may take 5-10 minutes."
            })

        # The script can run for many minutes; keep it off the event loop so
        # the server stays responsive.
        result = await asyncio.to_thread(run_sync_script)
        return _json_response(result)

    elif name == "qdrant_sync_collection":
        collection_name = arguments.get("collection_name")
        if not collection_name:
            return _json_response({
                "error": "collection_name is required"
            })

        result = await asyncio.to_thread(run_sync_script, ["--collection", collection_name])
        return _json_response(result)

    elif name == "qdrant_sync_dry_run":
        result = await asyncio.to_thread(run_sync_script, ["--dry-run"])
        return _json_response(result)

    elif name == "qdrant_sync_logs":
        count = _clamp_log_count(arguments.get("count", DEFAULT_LOG_COUNT))
        latest_only = arguments.get("latest_only", False)

        if latest_only:
            count = 1

        logs = await asyncio.to_thread(get_recent_logs, count)
        return _json_response(logs)

    elif name == "qdrant_compare_collections":
        collection_name = arguments.get("collection_name")

        if collection_name:
            # Compare specific collection
            result = await _compare_single_collection(collection_name)
        else:
            # Compare all collections
            result = await _compare_all_collections()

        return _json_response(result)

    elif name == "qdrant_sync_from_vps_all":
        confirm = arguments.get("confirm", False)
        if not confirm:
            return _json_response({
                "error": "Confirmation required. Set confirm=true to proceed with full sync from VPS.",
                "hint": "This will sync all collections from VPS to local and may take 5-10 minutes."
            })

        result = await asyncio.to_thread(run_sync_script, [], script_path=SYNC_SCRIPT_REVERSE)
        return _json_response(result)

    elif name == "qdrant_sync_from_vps_collection":
        collection_name = arguments.get("collection_name")
        if not collection_name:
            return _json_response({
                "error": "collection_name is required"
            })

        result = await asyncio.to_thread(
            run_sync_script, ["--collection", collection_name], script_path=SYNC_SCRIPT_REVERSE
        )
        return _json_response(result)

    elif name == "qdrant_sync_from_vps_dry_run":
        result = await asyncio.to_thread(run_sync_script, ["--dry-run"], script_path=SYNC_SCRIPT_REVERSE)
        return _json_response(result)

    else:
        return _json_response({
            "error": f"Unknown tool: {name}"
        })


# =============================================================================
# Main Entry Point
# =============================================================================

async def main():
    """Run the MCP server over stdio until the client disconnects."""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/No-Smoke/qdrant-sync-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.