Solr MCP

by allenday
Verified
#!/usr/bin/env python3 """ Direct MCP server test script. Tests the raw JSON-RPC interface that Claude uses to communicate with MCP servers. """ import sys import os import json import subprocess import time from threading import Thread import tempfile # Add the project root to your path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # First clean up any existing MCP servers os.system("pkill -f 'python -m solr_mcp.server'") time.sleep(1) # Let them shut down def write_to_stdin(process, data): """Write data to the stdin of a process and flush.""" process.stdin.write(data) process.stdin.flush() def read_from_stdout(process): """Read a JSON-RPC message from stdout of a process.""" line = process.stdout.readline().strip() if not line: return None try: return json.loads(line) except json.JSONDecodeError: print(f"Error decoding JSON: {line}") return None # Start a new MCP server process cmd = ["python", "-m", "solr_mcp.server"] server_process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, # Line buffered ) print("MCP server started.") time.sleep(2) # Give it time to initialize # Test search methods def test_search(query): print(f"\n\nTesting search for: '{query}'") # Try a standard search request = { "jsonrpc": "2.0", "id": "1", "method": "execute_tool", "params": { "name": "solr_search", "arguments": { "query": query } } } print("\nSending search request:", json.dumps(request, indent=2)) write_to_stdin(server_process, json.dumps(request) + "\n") response = read_from_stdout(server_process) print("\nGot response:", json.dumps(response, indent=2) if response else "No response") # Try a hybrid search request = { "jsonrpc": "2.0", "id": "2", "method": "execute_tool", "params": { "name": "solr_hybrid_search", "arguments": { "query": query, "blend_factor": 0.5 } } } print("\nSending hybrid search request:", json.dumps(request, indent=2)) write_to_stdin(server_process, json.dumps(request) + "\n") response = read_from_stdout(server_process) print("\nGot hybrid response:", json.dumps(response, indent=2) if response else "No response") # Test with a query we know exists test_search("double spend") # Test with another query test_search("blockchain") # Clean up print("\nCleaning up...") server_process.terminate() server_process.wait() print("Done!")