mcp_client.py•24.9 kB
#!/usr/bin/env python3
"""
DP-MCP Client
A comprehensive MCP client for testing and interacting with the DP-MCP server.
This client demonstrates how to properly communicate with MCP servers using
the JSON-RPC 2.0 protocol over HTTP with Server-Sent Events.
Usage:
python mcp_client.py [options]
Examples:
python mcp_client.py --interactive
python mcp_client.py --list-tools
python mcp_client.py --call describe_db_table --args '{"table_name": "users"}'
"""
import argparse
import asyncio
import json
import sys
import uuid
from typing import Any, Dict, List, Optional
import aiohttp
import sseclient
from datetime import datetime
class Colors:
"""ANSI color codes for pretty output."""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def print_header(text: str):
"""Print a colored header."""
print(f"\n{Colors.HEADER}{Colors.BOLD}🔌 {text}{Colors.END}")
print(f"{Colors.HEADER}{'=' * (len(text) + 4)}{Colors.END}")
def print_success(text: str):
"""Print success message."""
print(f"{Colors.GREEN}✅ {text}{Colors.END}")
def print_error(text: str):
"""Print error message."""
print(f"{Colors.RED}❌ {text}{Colors.END}")
def print_info(text: str):
"""Print info message."""
print(f"{Colors.CYAN}ℹ️ {text}{Colors.END}")
def print_warning(text: str):
"""Print warning message."""
print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}")
class MCPClient:
"""MCP Client for communicating with DP-MCP server."""
def __init__(self, server_url: str = "http://127.0.0.1:8888/mcp"):
self.server_url = server_url
self.session_id = None
self.request_id = 0
def _generate_request_id(self) -> str:
"""Generate a unique request ID."""
self.request_id += 1
return str(self.request_id)
def _create_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Create a JSON-RPC 2.0 request."""
request = {
"jsonrpc": "2.0",
"id": self._generate_request_id(),
"method": method
}
if params:
request["params"] = params
return request
async def _send_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Send a request to the MCP server."""
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"Cache-Control": "no-cache"
}
# Add session ID if we have one
if self.session_id:
headers["mcp-session-id"] = self.session_id
async with aiohttp.ClientSession() as session:
try:
async with session.post(
self.server_url,
json=request,
headers=headers,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
# Capture session ID from response headers
if "mcp-session-id" in response.headers:
self.session_id = response.headers["mcp-session-id"]
if response.status != 200:
raise Exception(f"HTTP {response.status}: {await response.text()}")
# Handle Server-Sent Events response
content = await response.text()
# Try to parse as direct JSON first (for simple responses)
try:
return json.loads(content)
except json.JSONDecodeError:
pass
# Parse SSE format
lines = content.strip().split('\n')
for line in lines:
line = line.strip()
if line.startswith('data: '):
data = line[6:] # Remove 'data: ' prefix
if data.strip() and data != '[DONE]':
try:
return json.loads(data)
except json.JSONDecodeError:
continue
elif line and not line.startswith('event:') and not line.startswith('id:'):
# Try parsing line as direct JSON
try:
return json.loads(line)
except json.JSONDecodeError:
continue
raise Exception("No valid JSON-RPC response found in SSE stream")
except aiohttp.ClientError as e:
raise Exception(f"Connection error: {e}")
except asyncio.TimeoutError:
raise Exception("Request timeout")
async def _establish_session(self):
"""Establish a session with the server by making an initial request."""
headers = {
"Accept": "application/json, text/event-stream",
"Cache-Control": "no-cache"
}
async with aiohttp.ClientSession() as session:
try:
# Make GET request to establish session
async with session.get(
self.server_url,
headers=headers,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
# Capture session ID from response headers
if "mcp-session-id" in response.headers:
self.session_id = response.headers["mcp-session-id"]
print_info(f"Session established: {self.session_id[:8]}...")
else:
raise Exception("No session ID received from server")
except Exception as e:
raise Exception(f"Failed to establish session: {e}")
async def initialize(self) -> Dict[str, Any]:
"""Initialize connection with the MCP server."""
print_info("Initializing MCP connection...")
# First, establish a session
if not self.session_id:
await self._establish_session()
request = self._create_request("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {
"roots": {"listChanged": True},
"sampling": {}
},
"clientInfo": {
"name": "dp-mcp-client",
"version": "1.0.0"
}
})
try:
response = await self._send_request(request)
if "error" in response:
raise Exception(f"Initialization failed: {response['error']['message']}")
print_success("MCP connection initialized")
return response.get("result", {})
except Exception as e:
print_error(f"Failed to initialize: {e}")
raise
async def list_tools(self) -> List[Dict[str, Any]]:
"""List all available tools."""
print_info("Requesting available tools...")
# Establish session if we don't have one
if not self.session_id:
await self._establish_session()
request = self._create_request("tools/list", {})
try:
response = await self._send_request(request)
if "error" in response:
raise Exception(f"List tools failed: {response['error']['message']}")
tools = response.get("result", {}).get("tools", [])
print_success(f"Retrieved {len(tools)} tools")
return tools
except Exception as e:
print_error(f"Failed to list tools: {e}")
raise
async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Call a specific tool."""
print_info(f"Calling tool: {tool_name}")
# Establish session if we don't have one
if not self.session_id:
await self._establish_session()
# Try different request formats for FastMCP compatibility
requests_to_try = [
# Standard MCP format
self._create_request("tools/call", {
"name": tool_name,
"arguments": arguments
}),
# Alternative format 1
self._create_request("call_tool", {
"name": tool_name,
"arguments": arguments
}),
# Alternative format 2 - direct tool name as method
self._create_request(tool_name, arguments),
]
last_error = None
for request in requests_to_try:
try:
response = await self._send_request(request)
if "error" not in response:
result = response.get("result", {})
print_success(f"Tool '{tool_name}' executed successfully")
return result
else:
last_error = response['error']
except Exception as e:
last_error = {"message": str(e)}
continue
# If all attempts failed, raise the last error
if last_error:
error_msg = last_error.get('message', 'Unknown error')
error_data = last_error.get('data', {})
raise Exception(f"Tool call failed: {error_msg}\nDetails: {error_data}")
else:
raise Exception(f"All attempts to call tool '{tool_name}' failed")
async def ping(self) -> bool:
"""Ping the server to check connectivity."""
print_info("Pinging server...")
# Establish session if we don't have one
if not self.session_id:
try:
await self._establish_session()
except Exception as e:
print_error(f"Failed to establish session: {e}")
return False
request = self._create_request("ping")
try:
response = await self._send_request(request)
if "error" in response:
return False
print_success("Server responded to ping")
return True
except Exception as e:
print_error(f"Server did not respond to ping: {e}")
return False
class InteractiveMCPClient:
"""Interactive MCP client with command-line interface."""
def __init__(self, client: MCPClient):
self.client = client
self.tools = []
async def start(self):
"""Start the interactive session."""
print_header("DP-MCP Interactive Client")
print("Connecting to DP-MCP server...")
try:
# Establish session
if not self.client.session_id:
await self.client._establish_session()
# Try to load available tools
try:
self.tools = await self.client.list_tools()
except Exception as e:
print_warning(f"Could not load tools automatically: {e}")
print_info("You can still call tools directly if you know their names")
print_info(f"Connected to server: {self.client.server_url}")
print_info(f"Available tools: {len(self.tools)}")
# Start interactive loop
await self.interactive_loop()
except Exception as e:
print_error(f"Failed to connect: {e}")
sys.exit(1)
async def interactive_loop(self):
"""Main interactive loop."""
print_info("Type 'help' for available commands, 'quit' to exit")
while True:
try:
command = input(f"\n{Colors.CYAN}mcp> {Colors.END}").strip()
if not command:
continue
if command.lower() in ['quit', 'exit', 'q']:
print_info("Goodbye!")
break
await self.handle_command(command)
except KeyboardInterrupt:
print_info("\nGoodbye!")
break
except EOFError:
break
async def handle_command(self, command: str):
"""Handle interactive commands."""
parts = command.split()
cmd = parts[0].lower()
if cmd == 'help':
self.show_help()
elif cmd == 'tools':
self.show_tools()
elif cmd == 'ping':
await self.client.ping()
elif cmd == 'call':
await self.handle_tool_call(parts[1:])
elif cmd in ['describe', 'desc']:
self.describe_tool(parts[1] if len(parts) > 1 else None)
else:
print_error(f"Unknown command: {cmd}. Type 'help' for available commands.")
def show_help(self):
"""Show help information."""
print_header("Available Commands")
print(f"{Colors.BOLD}help{Colors.END} - Show this help message")
print(f"{Colors.BOLD}tools{Colors.END} - List all available tools")
print(f"{Colors.BOLD}describe <tool>{Colors.END} - Describe a specific tool")
print(f"{Colors.BOLD}ping{Colors.END} - Ping the server")
print(f"{Colors.BOLD}call <tool> [args]{Colors.END} - Call a tool with arguments")
print(f"{Colors.BOLD}quit{Colors.END} - Exit the client")
print_header("Example Tool Calls")
print(f"{Colors.YELLOW}call list_db_tables{Colors.END}")
print(f"{Colors.YELLOW}call describe_db_table table_name=users{Colors.END}")
print(f"{Colors.YELLOW}call execute_sql_query query=\"SELECT * FROM users LIMIT 5\"{Colors.END}")
print(f"{Colors.YELLOW}call list_minio_buckets{Colors.END}")
print(f"{Colors.YELLOW}call upload_to_minio bucket_name=default-bucket object_name=test.txt data=\"Hello World\"{Colors.END}")
def show_tools(self):
"""Show all available tools."""
print_header("Available Tools")
if not self.tools:
print_warning("No tools available")
return
# Group tools by category
postgres_tools = []
minio_tools = []
combined_tools = []
for tool in self.tools:
name = tool.get('name', '')
if 'db' in name or 'sql' in name or 'table' in name:
postgres_tools.append(tool)
elif 'minio' in name or 'bucket' in name:
minio_tools.append(tool)
else:
combined_tools.append(tool)
if postgres_tools:
print(f"\n{Colors.BLUE}📊 PostgreSQL Tools:{Colors.END}")
for tool in postgres_tools:
print(f" • {Colors.BOLD}{tool['name']}{Colors.END} - {tool.get('description', 'No description')}")
if minio_tools:
print(f"\n{Colors.BLUE}🪣 MinIO Tools:{Colors.END}")
for tool in minio_tools:
print(f" • {Colors.BOLD}{tool['name']}{Colors.END} - {tool.get('description', 'No description')}")
if combined_tools:
print(f"\n{Colors.BLUE}🔄 Combined Tools:{Colors.END}")
for tool in combined_tools:
print(f" • {Colors.BOLD}{tool['name']}{Colors.END} - {tool.get('description', 'No description')}")
def describe_tool(self, tool_name: Optional[str]):
"""Describe a specific tool."""
if not tool_name:
print_error("Please specify a tool name. Use 'tools' to see available tools.")
return
tool = next((t for t in self.tools if t['name'] == tool_name), None)
if not tool:
print_error(f"Tool '{tool_name}' not found. Use 'tools' to see available tools.")
return
print_header(f"Tool: {tool_name}")
print(f"{Colors.BOLD}Description:{Colors.END} {tool.get('description', 'No description')}")
schema = tool.get('inputSchema', {})
properties = schema.get('properties', {})
required = schema.get('required', [])
if properties:
print(f"\n{Colors.BOLD}Parameters:{Colors.END}")
for param, details in properties.items():
req_marker = f" {Colors.RED}(required){Colors.END}" if param in required else ""
param_type = details.get('type', 'unknown')
description = details.get('description', 'No description')
default = details.get('default')
print(f" • {Colors.BOLD}{param}{Colors.END} ({param_type}){req_marker}: {description}")
if default is not None:
print(f" Default: {default}")
async def handle_tool_call(self, args: List[str]):
"""Handle tool call command."""
if not args:
print_error("Please specify a tool name. Use 'tools' to see available tools.")
return
tool_name = args[0]
# Parse arguments
arguments = {}
for arg in args[1:]:
if '=' in arg:
key, value = arg.split('=', 1)
# Try to parse as JSON, fallback to string
try:
# Handle quoted strings
if value.startswith('"') and value.endswith('"'):
arguments[key] = value[1:-1] # Remove quotes
elif value.lower() in ['true', 'false']:
arguments[key] = value.lower() == 'true'
elif value.isdigit():
arguments[key] = int(value)
else:
arguments[key] = value
except:
arguments[key] = value
try:
result = await self.client.call_tool(tool_name, arguments)
# Display result
print_header("Tool Result")
content = result.get('content', [])
for item in content:
if item.get('type') == 'text':
text = item.get('text', '')
print(text)
else:
print(json.dumps(item, indent=2))
except Exception as e:
print_error(f"Tool call failed: {e}")
async def test_all_tools(client: MCPClient):
"""Test all available tools with sample data."""
print_header("Testing All Tools")
try:
# Initialize and get tools
await client.initialize()
tools = await client.list_tools()
print_info(f"Testing {len(tools)} tools...")
# Test cases for each tool
test_cases = {
'list_db_tables': {},
'describe_db_table': {'table_name': 'users'},
'execute_sql_query': {'query': 'SELECT COUNT(*) as total FROM users', 'limit': 10},
'export_table_csv': {'table_name': 'users', 'limit': 5},
'list_minio_buckets': {},
'list_bucket_objects': {'bucket_name': 'default-bucket', 'max_keys': 10},
'create_minio_bucket': {'bucket_name': f'test-bucket-{int(datetime.now().timestamp())}'},
'upload_to_minio': {
'bucket_name': 'default-bucket',
'object_name': f'test-{int(datetime.now().timestamp())}.txt',
'data': 'Hello from MCP client!',
'content_type': 'text/plain'
}
}
results = []
for tool in tools:
tool_name = tool['name']
if tool_name in test_cases:
print_info(f"Testing {tool_name}...")
try:
result = await client.call_tool(tool_name, test_cases[tool_name])
results.append((tool_name, 'SUCCESS', None))
print_success(f"✓ {tool_name}")
except Exception as e:
results.append((tool_name, 'FAILED', str(e)))
print_error(f"✗ {tool_name}: {e}")
else:
results.append((tool_name, 'SKIPPED', 'No test case defined'))
print_warning(f"- {tool_name}: No test case")
# Summary
print_header("Test Results Summary")
success_count = sum(1 for _, status, _ in results if status == 'SUCCESS')
total_count = len(results)
print(f"Total tools: {total_count}")
print(f"Successful: {Colors.GREEN}{success_count}{Colors.END}")
print(f"Failed: {Colors.RED}{total_count - success_count}{Colors.END}")
# Detailed results
for tool_name, status, error in results:
if status == 'SUCCESS':
print(f" ✅ {tool_name}")
elif status == 'FAILED':
print(f" ❌ {tool_name}: {error}")
else:
print(f" ⏭️ {tool_name}: {error}")
except Exception as e:
print_error(f"Test suite failed: {e}")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="DP-MCP Client - Test and interact with DP-MCP server",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Interactive mode
python mcp_client.py --interactive
# List available tools
python mcp_client.py --list-tools
# Call a specific tool
python mcp_client.py --call list_db_tables
python mcp_client.py --call describe_db_table --args '{"table_name": "users"}'
# Test all tools
python mcp_client.py --test-all
# Custom server URL
python mcp_client.py --server http://localhost:9000/mcp --interactive
"""
)
parser.add_argument('--server', default='http://127.0.0.1:8888/mcp',
help='MCP server URL (default: http://127.0.0.1:8888/mcp)')
parser.add_argument('--interactive', action='store_true',
help='Start interactive mode')
parser.add_argument('--list-tools', action='store_true',
help='List all available tools')
parser.add_argument('--call', help='Call a specific tool')
parser.add_argument('--args', help='Tool arguments as JSON string')
parser.add_argument('--test-all', action='store_true',
help='Test all available tools')
parser.add_argument('--ping', action='store_true',
help='Ping the server')
args = parser.parse_args()
# Create client
client = MCPClient(args.server)
async def run():
if args.interactive:
interactive_client = InteractiveMCPClient(client)
await interactive_client.start()
elif args.list_tools:
await client.initialize()
tools = await client.list_tools()
print_header("Available Tools")
for tool in tools:
print(f"• {Colors.BOLD}{tool['name']}{Colors.END}: {tool.get('description', 'No description')}")
elif args.call:
tool_args = {}
if args.args:
try:
tool_args = json.loads(args.args)
except json.JSONDecodeError as e:
print_error(f"Invalid JSON in --args: {e}")
return
await client.initialize()
result = await client.call_tool(args.call, tool_args)
# Display result
content = result.get('content', [])
for item in content:
if item.get('type') == 'text':
print(item.get('text', ''))
else:
print(json.dumps(item, indent=2))
elif args.test_all:
await test_all_tools(client)
elif args.ping:
await client.ping()
else:
parser.print_help()
try:
asyncio.run(run())
except KeyboardInterrupt:
print_info("\nOperation cancelled")
except Exception as e:
print_error(f"Client error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()