Skip to main content
Glama

DP-MCP Server

by devraj21
mcp_client.py24.9 kB
#!/usr/bin/env python3 """ DP-MCP Client A comprehensive MCP client for testing and interacting with the DP-MCP server. This client demonstrates how to properly communicate with MCP servers using the JSON-RPC 2.0 protocol over HTTP with Server-Sent Events. Usage: python mcp_client.py [options] Examples: python mcp_client.py --interactive python mcp_client.py --list-tools python mcp_client.py --call describe_db_table --args '{"table_name": "users"}' """ import argparse import asyncio import json import sys import uuid from typing import Any, Dict, List, Optional import aiohttp import sseclient from datetime import datetime class Colors: """ANSI color codes for pretty output.""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def print_header(text: str): """Print a colored header.""" print(f"\n{Colors.HEADER}{Colors.BOLD}🔌 {text}{Colors.END}") print(f"{Colors.HEADER}{'=' * (len(text) + 4)}{Colors.END}") def print_success(text: str): """Print success message.""" print(f"{Colors.GREEN}✅ {text}{Colors.END}") def print_error(text: str): """Print error message.""" print(f"{Colors.RED}❌ {text}{Colors.END}") def print_info(text: str): """Print info message.""" print(f"{Colors.CYAN}ℹ️ {text}{Colors.END}") def print_warning(text: str): """Print warning message.""" print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}") class MCPClient: """MCP Client for communicating with DP-MCP server.""" def __init__(self, server_url: str = "http://127.0.0.1:8888/mcp"): self.server_url = server_url self.session_id = None self.request_id = 0 def _generate_request_id(self) -> str: """Generate a unique request ID.""" self.request_id += 1 return str(self.request_id) def _create_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Create a JSON-RPC 2.0 request.""" request = { "jsonrpc": "2.0", "id": self._generate_request_id(), "method": method } if params: request["params"] = params return request async def _send_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """Send a request to the MCP server.""" headers = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", "Cache-Control": "no-cache" } # Add session ID if we have one if self.session_id: headers["mcp-session-id"] = self.session_id async with aiohttp.ClientSession() as session: try: async with session.post( self.server_url, json=request, headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as response: # Capture session ID from response headers if "mcp-session-id" in response.headers: self.session_id = response.headers["mcp-session-id"] if response.status != 200: raise Exception(f"HTTP {response.status}: {await response.text()}") # Handle Server-Sent Events response content = await response.text() # Try to parse as direct JSON first (for simple responses) try: return json.loads(content) except json.JSONDecodeError: pass # Parse SSE format lines = content.strip().split('\n') for line in lines: line = line.strip() if line.startswith('data: '): data = line[6:] # Remove 'data: ' prefix if data.strip() and data != '[DONE]': try: return json.loads(data) except json.JSONDecodeError: continue elif line and not line.startswith('event:') and not line.startswith('id:'): # Try parsing line as direct JSON try: return json.loads(line) except json.JSONDecodeError: continue raise Exception("No valid JSON-RPC response found in SSE stream") except aiohttp.ClientError as e: raise Exception(f"Connection error: {e}") except asyncio.TimeoutError: raise Exception("Request timeout") async def _establish_session(self): """Establish a session with the server by making an initial request.""" headers = { "Accept": "application/json, text/event-stream", "Cache-Control": "no-cache" } async with aiohttp.ClientSession() as session: try: # Make GET request to establish session async with session.get( self.server_url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: # Capture session ID from response headers if "mcp-session-id" in response.headers: self.session_id = response.headers["mcp-session-id"] print_info(f"Session established: {self.session_id[:8]}...") else: raise Exception("No session ID received from server") except Exception as e: raise Exception(f"Failed to establish session: {e}") async def initialize(self) -> Dict[str, Any]: """Initialize connection with the MCP server.""" print_info("Initializing MCP connection...") # First, establish a session if not self.session_id: await self._establish_session() request = self._create_request("initialize", { "protocolVersion": "2024-11-05", "capabilities": { "roots": {"listChanged": True}, "sampling": {} }, "clientInfo": { "name": "dp-mcp-client", "version": "1.0.0" } }) try: response = await self._send_request(request) if "error" in response: raise Exception(f"Initialization failed: {response['error']['message']}") print_success("MCP connection initialized") return response.get("result", {}) except Exception as e: print_error(f"Failed to initialize: {e}") raise async def list_tools(self) -> List[Dict[str, Any]]: """List all available tools.""" print_info("Requesting available tools...") # Establish session if we don't have one if not self.session_id: await self._establish_session() request = self._create_request("tools/list", {}) try: response = await self._send_request(request) if "error" in response: raise Exception(f"List tools failed: {response['error']['message']}") tools = response.get("result", {}).get("tools", []) print_success(f"Retrieved {len(tools)} tools") return tools except Exception as e: print_error(f"Failed to list tools: {e}") raise async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Call a specific tool.""" print_info(f"Calling tool: {tool_name}") # Establish session if we don't have one if not self.session_id: await self._establish_session() # Try different request formats for FastMCP compatibility requests_to_try = [ # Standard MCP format self._create_request("tools/call", { "name": tool_name, "arguments": arguments }), # Alternative format 1 self._create_request("call_tool", { "name": tool_name, "arguments": arguments }), # Alternative format 2 - direct tool name as method self._create_request(tool_name, arguments), ] last_error = None for request in requests_to_try: try: response = await self._send_request(request) if "error" not in response: result = response.get("result", {}) print_success(f"Tool '{tool_name}' executed successfully") return result else: last_error = response['error'] except Exception as e: last_error = {"message": str(e)} continue # If all attempts failed, raise the last error if last_error: error_msg = last_error.get('message', 'Unknown error') error_data = last_error.get('data', {}) raise Exception(f"Tool call failed: {error_msg}\nDetails: {error_data}") else: raise Exception(f"All attempts to call tool '{tool_name}' failed") async def ping(self) -> bool: """Ping the server to check connectivity.""" print_info("Pinging server...") # Establish session if we don't have one if not self.session_id: try: await self._establish_session() except Exception as e: print_error(f"Failed to establish session: {e}") return False request = self._create_request("ping") try: response = await self._send_request(request) if "error" in response: return False print_success("Server responded to ping") return True except Exception as e: print_error(f"Server did not respond to ping: {e}") return False class InteractiveMCPClient: """Interactive MCP client with command-line interface.""" def __init__(self, client: MCPClient): self.client = client self.tools = [] async def start(self): """Start the interactive session.""" print_header("DP-MCP Interactive Client") print("Connecting to DP-MCP server...") try: # Establish session if not self.client.session_id: await self.client._establish_session() # Try to load available tools try: self.tools = await self.client.list_tools() except Exception as e: print_warning(f"Could not load tools automatically: {e}") print_info("You can still call tools directly if you know their names") print_info(f"Connected to server: {self.client.server_url}") print_info(f"Available tools: {len(self.tools)}") # Start interactive loop await self.interactive_loop() except Exception as e: print_error(f"Failed to connect: {e}") sys.exit(1) async def interactive_loop(self): """Main interactive loop.""" print_info("Type 'help' for available commands, 'quit' to exit") while True: try: command = input(f"\n{Colors.CYAN}mcp> {Colors.END}").strip() if not command: continue if command.lower() in ['quit', 'exit', 'q']: print_info("Goodbye!") break await self.handle_command(command) except KeyboardInterrupt: print_info("\nGoodbye!") break except EOFError: break async def handle_command(self, command: str): """Handle interactive commands.""" parts = command.split() cmd = parts[0].lower() if cmd == 'help': self.show_help() elif cmd == 'tools': self.show_tools() elif cmd == 'ping': await self.client.ping() elif cmd == 'call': await self.handle_tool_call(parts[1:]) elif cmd in ['describe', 'desc']: self.describe_tool(parts[1] if len(parts) > 1 else None) else: print_error(f"Unknown command: {cmd}. Type 'help' for available commands.") def show_help(self): """Show help information.""" print_header("Available Commands") print(f"{Colors.BOLD}help{Colors.END} - Show this help message") print(f"{Colors.BOLD}tools{Colors.END} - List all available tools") print(f"{Colors.BOLD}describe <tool>{Colors.END} - Describe a specific tool") print(f"{Colors.BOLD}ping{Colors.END} - Ping the server") print(f"{Colors.BOLD}call <tool> [args]{Colors.END} - Call a tool with arguments") print(f"{Colors.BOLD}quit{Colors.END} - Exit the client") print_header("Example Tool Calls") print(f"{Colors.YELLOW}call list_db_tables{Colors.END}") print(f"{Colors.YELLOW}call describe_db_table table_name=users{Colors.END}") print(f"{Colors.YELLOW}call execute_sql_query query=\"SELECT * FROM users LIMIT 5\"{Colors.END}") print(f"{Colors.YELLOW}call list_minio_buckets{Colors.END}") print(f"{Colors.YELLOW}call upload_to_minio bucket_name=default-bucket object_name=test.txt data=\"Hello World\"{Colors.END}") def show_tools(self): """Show all available tools.""" print_header("Available Tools") if not self.tools: print_warning("No tools available") return # Group tools by category postgres_tools = [] minio_tools = [] combined_tools = [] for tool in self.tools: name = tool.get('name', '') if 'db' in name or 'sql' in name or 'table' in name: postgres_tools.append(tool) elif 'minio' in name or 'bucket' in name: minio_tools.append(tool) else: combined_tools.append(tool) if postgres_tools: print(f"\n{Colors.BLUE}📊 PostgreSQL Tools:{Colors.END}") for tool in postgres_tools: print(f" • {Colors.BOLD}{tool['name']}{Colors.END} - {tool.get('description', 'No description')}") if minio_tools: print(f"\n{Colors.BLUE}🪣 MinIO Tools:{Colors.END}") for tool in minio_tools: print(f" • {Colors.BOLD}{tool['name']}{Colors.END} - {tool.get('description', 'No description')}") if combined_tools: print(f"\n{Colors.BLUE}🔄 Combined Tools:{Colors.END}") for tool in combined_tools: print(f" • {Colors.BOLD}{tool['name']}{Colors.END} - {tool.get('description', 'No description')}") def describe_tool(self, tool_name: Optional[str]): """Describe a specific tool.""" if not tool_name: print_error("Please specify a tool name. Use 'tools' to see available tools.") return tool = next((t for t in self.tools if t['name'] == tool_name), None) if not tool: print_error(f"Tool '{tool_name}' not found. Use 'tools' to see available tools.") return print_header(f"Tool: {tool_name}") print(f"{Colors.BOLD}Description:{Colors.END} {tool.get('description', 'No description')}") schema = tool.get('inputSchema', {}) properties = schema.get('properties', {}) required = schema.get('required', []) if properties: print(f"\n{Colors.BOLD}Parameters:{Colors.END}") for param, details in properties.items(): req_marker = f" {Colors.RED}(required){Colors.END}" if param in required else "" param_type = details.get('type', 'unknown') description = details.get('description', 'No description') default = details.get('default') print(f" • {Colors.BOLD}{param}{Colors.END} ({param_type}){req_marker}: {description}") if default is not None: print(f" Default: {default}") async def handle_tool_call(self, args: List[str]): """Handle tool call command.""" if not args: print_error("Please specify a tool name. Use 'tools' to see available tools.") return tool_name = args[0] # Parse arguments arguments = {} for arg in args[1:]: if '=' in arg: key, value = arg.split('=', 1) # Try to parse as JSON, fallback to string try: # Handle quoted strings if value.startswith('"') and value.endswith('"'): arguments[key] = value[1:-1] # Remove quotes elif value.lower() in ['true', 'false']: arguments[key] = value.lower() == 'true' elif value.isdigit(): arguments[key] = int(value) else: arguments[key] = value except: arguments[key] = value try: result = await self.client.call_tool(tool_name, arguments) # Display result print_header("Tool Result") content = result.get('content', []) for item in content: if item.get('type') == 'text': text = item.get('text', '') print(text) else: print(json.dumps(item, indent=2)) except Exception as e: print_error(f"Tool call failed: {e}") async def test_all_tools(client: MCPClient): """Test all available tools with sample data.""" print_header("Testing All Tools") try: # Initialize and get tools await client.initialize() tools = await client.list_tools() print_info(f"Testing {len(tools)} tools...") # Test cases for each tool test_cases = { 'list_db_tables': {}, 'describe_db_table': {'table_name': 'users'}, 'execute_sql_query': {'query': 'SELECT COUNT(*) as total FROM users', 'limit': 10}, 'export_table_csv': {'table_name': 'users', 'limit': 5}, 'list_minio_buckets': {}, 'list_bucket_objects': {'bucket_name': 'default-bucket', 'max_keys': 10}, 'create_minio_bucket': {'bucket_name': f'test-bucket-{int(datetime.now().timestamp())}'}, 'upload_to_minio': { 'bucket_name': 'default-bucket', 'object_name': f'test-{int(datetime.now().timestamp())}.txt', 'data': 'Hello from MCP client!', 'content_type': 'text/plain' } } results = [] for tool in tools: tool_name = tool['name'] if tool_name in test_cases: print_info(f"Testing {tool_name}...") try: result = await client.call_tool(tool_name, test_cases[tool_name]) results.append((tool_name, 'SUCCESS', None)) print_success(f"✓ {tool_name}") except Exception as e: results.append((tool_name, 'FAILED', str(e))) print_error(f"✗ {tool_name}: {e}") else: results.append((tool_name, 'SKIPPED', 'No test case defined')) print_warning(f"- {tool_name}: No test case") # Summary print_header("Test Results Summary") success_count = sum(1 for _, status, _ in results if status == 'SUCCESS') total_count = len(results) print(f"Total tools: {total_count}") print(f"Successful: {Colors.GREEN}{success_count}{Colors.END}") print(f"Failed: {Colors.RED}{total_count - success_count}{Colors.END}") # Detailed results for tool_name, status, error in results: if status == 'SUCCESS': print(f" ✅ {tool_name}") elif status == 'FAILED': print(f" ❌ {tool_name}: {error}") else: print(f" ⏭️ {tool_name}: {error}") except Exception as e: print_error(f"Test suite failed: {e}") def main(): """Main entry point.""" parser = argparse.ArgumentParser( description="DP-MCP Client - Test and interact with DP-MCP server", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Interactive mode python mcp_client.py --interactive # List available tools python mcp_client.py --list-tools # Call a specific tool python mcp_client.py --call list_db_tables python mcp_client.py --call describe_db_table --args '{"table_name": "users"}' # Test all tools python mcp_client.py --test-all # Custom server URL python mcp_client.py --server http://localhost:9000/mcp --interactive """ ) parser.add_argument('--server', default='http://127.0.0.1:8888/mcp', help='MCP server URL (default: http://127.0.0.1:8888/mcp)') parser.add_argument('--interactive', action='store_true', help='Start interactive mode') parser.add_argument('--list-tools', action='store_true', help='List all available tools') parser.add_argument('--call', help='Call a specific tool') parser.add_argument('--args', help='Tool arguments as JSON string') parser.add_argument('--test-all', action='store_true', help='Test all available tools') parser.add_argument('--ping', action='store_true', help='Ping the server') args = parser.parse_args() # Create client client = MCPClient(args.server) async def run(): if args.interactive: interactive_client = InteractiveMCPClient(client) await interactive_client.start() elif args.list_tools: await client.initialize() tools = await client.list_tools() print_header("Available Tools") for tool in tools: print(f"• {Colors.BOLD}{tool['name']}{Colors.END}: {tool.get('description', 'No description')}") elif args.call: tool_args = {} if args.args: try: tool_args = json.loads(args.args) except json.JSONDecodeError as e: print_error(f"Invalid JSON in --args: {e}") return await client.initialize() result = await client.call_tool(args.call, tool_args) # Display result content = result.get('content', []) for item in content: if item.get('type') == 'text': print(item.get('text', '')) else: print(json.dumps(item, indent=2)) elif args.test_all: await test_all_tools(client) elif args.ping: await client.ping() else: parser.print_help() try: asyncio.run(run()) except KeyboardInterrupt: print_info("\nOperation cancelled") except Exception as e: print_error(f"Client error: {e}") sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/devraj21/dp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server