---
description: FastMCP Python v2.0 client
globs:
alwaysApply: false
---
> You are an expert in FastMCP Python v2.0 client library, transport protocols, connection management, and distributed MCP architectures. You focus on creating reliable, efficient client applications with proper transport handling, error recovery, and multi-server coordination.
## FastMCP Client Architecture Flow
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Client │ │ Transport │ │ Server │
│ Application │───▶│ Layer │───▶│ Connection │
│ │ │ │ │ │
│ - Tool calls │ │ - STDIO │ │ - MCP Protocol │
│ - Resource req │ │ - HTTP/SSE │ │ - Message │
│ - Multi-server │ │ - In-memory │ │ handling │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Connection │ │ Error │ │ Response │
│ Pooling │ │ Recovery │ │ Processing │
│ & Lifecycle │ │ & Retry │ │ & Validation │
└─────────────────┘ └──────────────────┘ └─────────────────┘
```
## Project Structure
```
fastmcp_client_project/
├── src/
│ ├── my_mcp_client/
│ │ ├── __init__.py # Package initialization
│ │ ├── client.py # Main client implementation
│ │ └── config.py # Client configuration
│ ├── transports/
│ │ ├── __init__.py # Transport exports
│ │ ├── stdio.py # STDIO transport
│ │ ├── http.py # HTTP/SSE transport
│ │ ├── memory.py # In-memory transport
│ │ └── websocket.py # WebSocket transport
│ ├── connections/
│ │ ├── __init__.py # Connection exports
│ │ ├── pool.py # Connection pooling
│ │ ├── manager.py # Connection management
│ │ └── retry.py # Retry mechanisms
│ ├── multi_server/
│ │ ├── __init__.py # Multi-server exports
│ │ ├── coordinator.py # Server coordination
│ │ ├── load_balancer.py # Load balancing
│ │ └── failover.py # Failover handling
│ └── utils/
│ ├── __init__.py # Utility exports
│ ├── serialization.py # Message serialization
│ └── validation.py # Response validation
├── tests/
│ ├── test_transports/
│ │ ├── test_stdio.py
│ │ ├── test_http.py
│ │ └── test_memory.py
│ ├── test_client.py
│ └── test_multi_server.py
└── examples/
├── basic_client.py # Basic client usage
├── multi_server_client.py # Multi-server example
└── testing_client.py # Testing patterns
```
## Core Implementation Patterns
### Basic Client Configuration and Usage
```python
# ✅ DO: Proper client configuration with comprehensive transport setup
from fastmcp import Client
from fastmcp.transports import StdioTransport, HttpTransport, InMemoryTransport
from fastmcp.client.config import ClientConfig
from typing import Dict, Any, List, Optional, Union
import asyncio
import logging
from contextlib import asynccontextmanager
# Client configuration with multiple transport options
class MultiTransportConfig:
"""Configuration for client with multiple transport options."""
def __init__(self):
self.transports = {
"stdio": {
"command": ["python", "-m", "my_mcp_server"],
"args": ["--config", "server_config.json"],
"env": {"MCP_LOG_LEVEL": "INFO"},
"timeout": 30.0,
"max_retries": 3
},
"http": {
"base_url": "http://localhost:8000",
"timeout": 30.0,
"headers": {"Authorization": "Bearer token"},
"max_connections": 10,
"keepalive": True
},
"memory": {
"server_instance": None, # Will be set dynamically
"isolated": True
}
}
self.client_settings = {
"default_timeout": 30.0,
"max_concurrent_requests": 50,
"retry_backoff": "exponential",
"enable_logging": True,
"log_level": "INFO"
}
# Basic client usage with error handling
async def create_basic_client() -> Client:
"""Create a basic MCP client with STDIO transport."""
transport = StdioTransport(
command=["python", "-m", "my_mcp_server"],
args=["--port", "8080"],
timeout=30.0
)
client = Client(transport=transport)
try:
await client.initialize()
logging.info("Client initialized successfully")
return client
except Exception as e:
logging.error(f"Failed to initialize client: {e}")
raise
# Advanced client with HTTP transport
async def create_http_client(base_url: str, auth_token: str) -> Client:
"""Create MCP client with HTTP/SSE transport."""
transport = HttpTransport(
base_url=base_url,
headers={
"Authorization": f"Bearer {auth_token}",
"Content-Type": "application/json",
"User-Agent": "FastMCP-Client/1.0"
},
timeout=30.0,
max_connections=10,
keepalive_timeout=60.0
)
client = Client(
transport=transport,
config=ClientConfig(
max_concurrent_requests=20,
default_timeout=15.0,
retry_attempts=3,
retry_backoff="exponential"
)
)
await client.initialize()
return client
# In-memory client for testing
async def create_memory_client(server_instance) -> Client:
"""Create in-memory client for testing."""
transport = InMemoryTransport(server=server_instance)
client = Client(
transport=transport,
config=ClientConfig(
max_concurrent_requests=100, # Higher for testing
default_timeout=5.0,
enable_debug_logging=True
)
)
await client.initialize()
return client
# Client usage with proper lifecycle management
@asynccontextmanager
async def managed_client(transport_config: Dict[str, Any]):
"""Context manager for proper client lifecycle."""
client = None
try:
# Create transport based on config
transport_type = transport_config.get("type", "stdio")
if transport_type == "stdio":
transport = StdioTransport(**transport_config["params"])
elif transport_type == "http":
transport = HttpTransport(**transport_config["params"])
elif transport_type == "memory":
transport = InMemoryTransport(**transport_config["params"])
else:
raise ValueError(f"Unsupported transport type: {transport_type}")
# Create and initialize client
client = Client(transport=transport)
await client.initialize()
yield client
except Exception as e:
logging.error(f"Client error: {e}")
raise
finally:
if client:
await client.cleanup()
# ❌ DON'T: Use clients without proper configuration or lifecycle management
async def bad_client_usage():
# No error handling, no configuration
client = Client() # No transport specified
await client.call_tool("some_tool", {}) # No initialization
# No cleanup
```
### Advanced Tool and Resource Operations
```python
# ✅ DO: Implement comprehensive tool and resource operations with error handling
from fastmcp.client.exceptions import ToolError, ResourceError, TransportError
from fastmcp.client.retry import RetryPolicy, ExponentialBackoff
import json
from datetime import datetime
class AdvancedMCPClient:
"""Advanced MCP client with comprehensive error handling and features."""
def __init__(self, client: Client):
self.client = client
self.retry_policy = RetryPolicy(
max_attempts=3,
backoff=ExponentialBackoff(initial_delay=1.0, max_delay=30.0),
retriable_exceptions=(TransportError, ConnectionError, TimeoutError)
)
async def call_tool_with_retry(
self,
tool_name: str,
arguments: Dict[str, Any],
timeout: Optional[float] = None,
retry_policy: Optional[RetryPolicy] = None
) -> List[Any]:
"""Call tool with comprehensive retry logic and error handling."""
retry_pol = retry_policy or self.retry_policy
last_exception = None
for attempt in range(retry_pol.max_attempts):
try:
logging.info(f"Calling tool '{tool_name}' (attempt {attempt + 1}/{retry_pol.max_attempts})")
# Validate arguments before sending
validated_args = await self._validate_tool_arguments(tool_name, arguments)
# Call tool with timeout
result = await asyncio.wait_for(
self.client.call_tool(tool_name, validated_args),
timeout=timeout or 30.0
)
logging.info(f"Tool '{tool_name}' completed successfully")
return result
except (TransportError, ConnectionError, TimeoutError) as e:
last_exception = e
if attempt < retry_pol.max_attempts - 1:
delay = retry_pol.backoff.get_delay(attempt)
logging.warning(f"Tool call failed (attempt {attempt + 1}), retrying in {delay}s: {e}")
await asyncio.sleep(delay)
else:
logging.error(f"Tool call failed after {retry_pol.max_attempts} attempts: {e}")
except ToolError as e:
# Tool errors are not retriable
logging.error(f"Tool '{tool_name}' returned error: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error calling tool '{tool_name}': {e}")
raise
# If we get here, all retries failed
raise last_exception
async def batch_tool_calls(
self,
tool_calls: List[Dict[str, Any]],
max_concurrent: int = 5,
fail_fast: bool = False
) -> List[Union[List[Any], Exception]]:
"""Execute multiple tool calls concurrently with proper error handling."""
logging.info(f"Executing {len(tool_calls)} tool calls with max_concurrent={max_concurrent}")
semaphore = asyncio.Semaphore(max_concurrent)
async def execute_single_call(call_info: Dict[str, Any]) -> Union[List[Any], Exception]:
async with semaphore:
try:
tool_name = call_info["tool"]
arguments = call_info.get("arguments", {})
timeout = call_info.get("timeout")
return await self.call_tool_with_retry(tool_name, arguments, timeout)
except Exception as e:
if fail_fast:
raise
return e
# Execute all calls
tasks = [execute_single_call(call) for call in tool_calls]
if fail_fast:
results = await asyncio.gather(*tasks)
else:
results = await asyncio.gather(*tasks, return_exceptions=True)
# Log summary
successful = sum(1 for r in results if not isinstance(r, Exception))
failed = len(results) - successful
logging.info(f"Batch execution complete: {successful} successful, {failed} failed")
return results
async def read_resource_with_caching(
self,
resource_uri: str,
cache_duration: int = 300, # 5 minutes
force_refresh: bool = False
) -> List[Any]:
"""Read resource with client-side caching."""
cache_key = f"resource:{resource_uri}"
# Check cache unless force refresh
if not force_refresh:
cached_result = await self._get_from_cache(cache_key)
if cached_result:
logging.debug(f"Resource '{resource_uri}' found in cache")
return cached_result
logging.info(f"Reading resource: {resource_uri}")
try:
# Read resource with retry
result = await self.retry_policy.execute(
self.client.read_resource,
resource_uri
)
# Cache the result
await self._set_cache(cache_key, result, cache_duration)
logging.info(f"Resource '{resource_uri}' read successfully")
return result
except ResourceError as e:
logging.error(f"Resource error for '{resource_uri}': {e}")
raise
except Exception as e:
logging.error(f"Unexpected error reading resource '{resource_uri}': {e}")
raise
async def stream_resource_updates(
self,
resource_uri_pattern: str,
callback: callable,
poll_interval: float = 5.0
):
"""Stream resource updates by polling."""
logging.info(f"Starting resource streaming for pattern: {resource_uri_pattern}")
last_content_hash = None
while True:
try:
# List resources matching pattern
resources = await self.client.list_resources()
matching_resources = [
r for r in resources
if r.uri.startswith(resource_uri_pattern.replace("*", ""))
]
for resource in matching_resources:
try:
content = await self.client.read_resource(resource.uri)
content_hash = hash(str(content))
# Check if content changed
cache_key = f"hash:{resource.uri}"
old_hash = await self._get_from_cache(cache_key)
if content_hash != old_hash:
logging.debug(f"Resource '{resource.uri}' changed, notifying callback")
await callback(resource.uri, content)
await self._set_cache(cache_key, content_hash, 3600)
except Exception as e:
logging.warning(f"Error processing resource '{resource.uri}': {e}")
await asyncio.sleep(poll_interval)
except asyncio.CancelledError:
logging.info("Resource streaming cancelled")
break
except Exception as e:
logging.error(f"Error in resource streaming: {e}")
await asyncio.sleep(poll_interval)
async def _validate_tool_arguments(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Validate tool arguments against tool schema."""
# Get tool info
tools = await self.client.list_tools()
tool_info = next((t for t in tools if t.name == tool_name), None)
if not tool_info:
raise ToolError(f"Tool '{tool_name}' not found")
# Basic validation - in production would use JSON schema
if tool_info.inputSchema:
required_fields = tool_info.inputSchema.get("required", [])
for field in required_fields:
if field not in arguments:
raise ToolError(f"Required field '{field}' missing for tool '{tool_name}'")
return arguments
async def _get_from_cache(self, key: str) -> Optional[Any]:
"""Get value from cache (implementation depends on cache backend)."""
# This would integrate with Redis, memcached, or in-memory cache
return None
async def _set_cache(self, key: str, value: Any, ttl: int = 300):
"""Set value in cache (implementation depends on cache backend)."""
# This would integrate with Redis, memcached, or in-memory cache
pass
# Usage example
async def advanced_client_example():
"""Example of advanced client usage."""
async with managed_client({
"type": "http",
"params": {
"base_url": "http://localhost:8000",
"timeout": 30.0
}
}) as client:
advanced_client = AdvancedMCPClient(client)
# Single tool call with retry
result = await advanced_client.call_tool_with_retry(
"process_data",
{"input": "test data", "format": "json"}
)
# Batch tool calls
batch_calls = [
{"tool": "validate_data", "arguments": {"data": "item1"}},
{"tool": "validate_data", "arguments": {"data": "item2"}},
{"tool": "process_data", "arguments": {"input": "batch_data"}}
]
batch_results = await advanced_client.batch_tool_calls(
batch_calls,
max_concurrent=3
)
# Resource with caching
config_data = await advanced_client.read_resource_with_caching(
"config://server/settings"
)
# ❌ DON'T: Use simple client calls without error handling
async def bad_client_usage():
client = Client()
result = await client.call_tool("tool", {}) # No error handling, no validation
return result # No logging, no retry logic
```
### Multi-Server Client Management
```python
# ✅ DO: Implement comprehensive multi-server client management
from fastmcp.client.multi import MultiServerClient, ServerConfig, LoadBalancer
from typing import Dict, List, Optional, Callable
import random
import time
class ServerHealth:
"""Track server health and performance metrics."""
def __init__(self, server_id: str):
self.server_id = server_id
self.is_healthy = True
self.last_check = time.time()
self.response_times: List[float] = []
self.error_count = 0
self.success_count = 0
self.consecutive_failures = 0
def record_success(self, response_time: float):
"""Record successful operation."""
self.success_count += 1
self.consecutive_failures = 0
self.response_times.append(response_time)
# Keep only last 100 response times
if len(self.response_times) > 100:
self.response_times.pop(0)
self.is_healthy = True
self.last_check = time.time()
def record_failure(self):
"""Record failed operation."""
self.error_count += 1
self.consecutive_failures += 1
self.last_check = time.time()
# Mark as unhealthy after 3 consecutive failures
if self.consecutive_failures >= 3:
self.is_healthy = False
@property
def average_response_time(self) -> float:
"""Get average response time."""
if not self.response_times:
return 0.0
return sum(self.response_times) / len(self.response_times)
@property
def success_rate(self) -> float:
"""Get success rate percentage."""
total = self.success_count + self.error_count
if total == 0:
return 0.0
return (self.success_count / total) * 100
class SmartLoadBalancer:
"""Smart load balancer with health-aware routing."""
def __init__(self):
self.servers: Dict[str, ServerHealth] = {}
self.routing_strategy = "weighted_round_robin"
self.current_index = 0
def add_server(self, server_id: str):
"""Add server to load balancer."""
self.servers[server_id] = ServerHealth(server_id)
def remove_server(self, server_id: str):
"""Remove server from load balancer."""
self.servers.pop(server_id, None)
def select_server(self, exclude_servers: Optional[List[str]] = None) -> Optional[str]:
"""Select best server based on health and performance."""
exclude_servers = exclude_servers or []
available_servers = [
server_id for server_id, health in self.servers.items()
if health.is_healthy and server_id not in exclude_servers
]
if not available_servers:
# No healthy servers, try any available server
available_servers = [
server_id for server_id in self.servers.keys()
if server_id not in exclude_servers
]
if not available_servers:
return None
if self.routing_strategy == "round_robin":
return self._round_robin_select(available_servers)
elif self.routing_strategy == "weighted_round_robin":
return self._weighted_select(available_servers)
elif self.routing_strategy == "fastest":
return self._fastest_select(available_servers)
else:
return random.choice(available_servers)
def _round_robin_select(self, servers: List[str]) -> str:
"""Simple round-robin selection."""
server = servers[self.current_index % len(servers)]
self.current_index += 1
return server
def _weighted_select(self, servers: List[str]) -> str:
"""Weighted selection based on performance."""
weights = []
for server_id in servers:
health = self.servers[server_id]
# Weight based on success rate and response time
weight = health.success_rate / (health.average_response_time + 1.0)
weights.append(weight)
# Select based on weights
total_weight = sum(weights)
if total_weight == 0:
return random.choice(servers)
r = random.uniform(0, total_weight)
cumulative = 0
for i, weight in enumerate(weights):
cumulative += weight
if r <= cumulative:
return servers[i]
return servers[-1]
def _fastest_select(self, servers: List[str]) -> str:
"""Select server with best average response time."""
fastest_server = min(
servers,
key=lambda s: self.servers[s].average_response_time
)
return fastest_server
class DistributedMCPClient:
"""Distributed MCP client with multiple server support."""
def __init__(self):
self.clients: Dict[str, Client] = {}
self.server_configs: Dict[str, ServerConfig] = {}
self.load_balancer = SmartLoadBalancer()
self.health_check_interval = 30.0
self._health_check_task: Optional[asyncio.Task] = None
async def initialize(self):
"""Initialize distributed client."""
# Start health check task
self._health_check_task = asyncio.create_task(self._health_check_loop())
logging.info("Distributed MCP client initialized")
async def cleanup(self):
"""Clean up distributed client."""
# Cancel health check task
if self._health_check_task:
self._health_check_task.cancel()
try:
await self._health_check_task
except asyncio.CancelledError:
pass
# Clean up all clients
for client in self.clients.values():
await client.cleanup()
logging.info("Distributed MCP client cleaned up")
async def add_server(
self,
server_id: str,
config: ServerConfig,
auto_connect: bool = True
):
"""Add server to the distributed client."""
self.server_configs[server_id] = config
self.load_balancer.add_server(server_id)
if auto_connect:
await self._connect_server(server_id)
logging.info(f"Added server '{server_id}' to distributed client")
async def remove_server(self, server_id: str):
"""Remove server from distributed client."""
# Clean up client connection
if server_id in self.clients:
await self.clients[server_id].cleanup()
del self.clients[server_id]
# Remove from config and load balancer
self.server_configs.pop(server_id, None)
self.load_balancer.remove_server(server_id)
logging.info(f"Removed server '{server_id}' from distributed client")
async def call_tool_distributed(
self,
tool_name: str,
arguments: Dict[str, Any],
preferred_server: Optional[str] = None,
fallback_enabled: bool = True,
timeout: Optional[float] = None
) -> List[Any]:
"""Call tool across distributed servers with fallback."""
# Determine server selection
if preferred_server and preferred_server in self.clients:
selected_servers = [preferred_server]
if fallback_enabled:
# Add other servers as fallback
other_servers = [s for s in self.clients.keys() if s != preferred_server]
selected_servers.extend(other_servers)
else:
# Use load balancer to select servers
primary_server = self.load_balancer.select_server()
if not primary_server:
raise ConnectionError("No healthy servers available")
selected_servers = [primary_server]
if fallback_enabled:
other_servers = [s for s in self.clients.keys() if s != primary_server]
selected_servers.extend(other_servers)
last_exception = None
for server_id in selected_servers:
try:
client = self.clients.get(server_id)
if not client:
continue
logging.debug(f"Trying tool '{tool_name}' on server '{server_id}'")
start_time = time.time()
result = await asyncio.wait_for(
client.call_tool(tool_name, arguments),
timeout=timeout or 30.0
)
response_time = time.time() - start_time
# Record success
self.load_balancer.servers[server_id].record_success(response_time)
logging.info(f"Tool '{tool_name}' completed on server '{server_id}' in {response_time:.2f}s")
return result
except Exception as e:
last_exception = e
# Record failure
if server_id in self.load_balancer.servers:
self.load_balancer.servers[server_id].record_failure()
logging.warning(f"Tool '{tool_name}' failed on server '{server_id}': {e}")
if not fallback_enabled:
raise
# All servers failed
raise last_exception or ConnectionError("All servers failed")
async def aggregate_resources(self) -> Dict[str, List[Any]]:
"""Aggregate resources from all connected servers."""
all_resources = {}
for server_id, client in self.clients.items():
try:
resources = await client.list_resources()
server_resources = []
for resource in resources:
server_resources.append({
"uri": resource.uri,
"name": resource.name,
"description": resource.description,
"server_id": server_id
})
all_resources[server_id] = server_resources
except Exception as e:
logging.warning(f"Failed to get resources from server '{server_id}': {e}")
all_resources[server_id] = []
return all_resources
async def broadcast_operation(
self,
operation: Callable,
*args,
**kwargs
) -> Dict[str, Union[Any, Exception]]:
"""Broadcast operation to all connected servers."""
tasks = {}
for server_id, client in self.clients.items():
tasks[server_id] = asyncio.create_task(
operation(client, *args, **kwargs)
)
results = {}
for server_id, task in tasks.items():
try:
results[server_id] = await task
except Exception as e:
results[server_id] = e
return results
async def _connect_server(self, server_id: str):
"""Connect to a specific server."""
config = self.server_configs[server_id]
try:
# Create transport based on config
if config.transport_type == "stdio":
transport = StdioTransport(**config.transport_params)
elif config.transport_type == "http":
transport = HttpTransport(**config.transport_params)
else:
raise ValueError(f"Unsupported transport: {config.transport_type}")
# Create and initialize client
client = Client(transport=transport)
await client.initialize()
self.clients[server_id] = client
logging.info(f"Connected to server '{server_id}'")
except Exception as e:
logging.error(f"Failed to connect to server '{server_id}': {e}")
raise
async def _health_check_loop(self):
"""Periodic health check for all servers."""
while True:
try:
await asyncio.sleep(self.health_check_interval)
await self._perform_health_checks()
except asyncio.CancelledError:
break
except Exception as e:
logging.error(f"Health check error: {e}")
async def _perform_health_checks(self):
"""Perform health checks on all servers."""
for server_id, client in self.clients.items():
try:
# Simple health check - list tools
start_time = time.time()
await asyncio.wait_for(client.list_tools(), timeout=5.0)
response_time = time.time() - start_time
self.load_balancer.servers[server_id].record_success(response_time)
except Exception as e:
self.load_balancer.servers[server_id].record_failure()
logging.warning(f"Health check failed for server '{server_id}': {e}")
# ❌ DON'T: Use multiple clients without coordination or health monitoring
async def bad_multi_client():
# No coordination, no health monitoring, no failover
client1 = Client()
client2 = Client()
# Random selection without intelligence
import random
client = random.choice([client1, client2])
return await client.call_tool("tool", {})
```
## Testing Patterns and In-Memory Transport
```python
# ✅ DO: Comprehensive testing patterns with in-memory transport
import pytest
from fastmcp import FastMCP, Client
from fastmcp.transports import InMemoryTransport
from unittest.mock import Mock, AsyncMock, patch
@pytest.fixture
async def test_server():
"""Create test server for client testing."""
server = FastMCP(name="test-server")
@server.tool()
async def echo_tool(message: str) -> str:
return f"Echo: {message}"
@server.tool()
async def failing_tool(should_fail: bool = False) -> str:
if should_fail:
raise Exception("Tool intentionally failed")
return "Success"
@server.resource("test://data")
def test_data() -> Dict[str, Any]:
return {"test": "data", "timestamp": "2024-01-01"}
return server
@pytest.fixture
async def memory_client(test_server):
"""Create in-memory client for testing."""
transport = InMemoryTransport(server=test_server)
client = Client(transport=transport)
await client.initialize()
yield client
await client.cleanup()
@pytest.mark.asyncio
async def test_basic_client_operations(memory_client):
"""Test basic client operations."""
# Test tool calling
result = await memory_client.call_tool("echo_tool", {"message": "test"})
assert len(result) == 1
assert "Echo: test" in result[0].text
# Test resource reading
content = await memory_client.read_resource("test://data")
assert len(content) == 1
import json
data = json.loads(content[0].text)
assert data["test"] == "data"
@pytest.mark.asyncio
async def test_client_error_handling(memory_client):
"""Test client error handling."""
# Test tool error
with pytest.raises(Exception):
await memory_client.call_tool("failing_tool", {"should_fail": True})
# Test non-existent tool
with pytest.raises(Exception):
await memory_client.call_tool("non_existent_tool", {})
@pytest.mark.asyncio
async def test_advanced_client_operations():
"""Test advanced client operations with mocked transport."""
# Create mock transport
mock_transport = Mock()
mock_transport.initialize = AsyncMock()
mock_transport.cleanup = AsyncMock()
mock_transport.send_request = AsyncMock()
# Mock responses
mock_transport.send_request.return_value = {
"result": [{"type": "text", "text": "Mocked response"}]
}
client = Client(transport=mock_transport)
await client.initialize()
try:
# Test with mocked response
result = await client.call_tool("test_tool", {"param": "value"})
assert result[0].text == "Mocked response"
# Verify transport was called correctly
mock_transport.send_request.assert_called_once()
call_args = mock_transport.send_request.call_args[0][0]
assert call_args["method"] == "tools/call"
assert call_args["params"]["name"] == "test_tool"
finally:
await client.cleanup()
@pytest.mark.asyncio
async def test_concurrent_client_operations(memory_client):
"""Test concurrent client operations."""
# Test concurrent tool calls
tasks = [
memory_client.call_tool("echo_tool", {"message": f"test_{i}"})
for i in range(10)
]
results = await asyncio.gather(*tasks)
assert len(results) == 10
for i, result in enumerate(results):
assert f"Echo: test_{i}" in result[0].text
@pytest.mark.asyncio
async def test_client_lifecycle():
"""Test client lifecycle management."""
server = FastMCP(name="lifecycle-test")
@server.tool()
async def test_tool() -> str:
return "test"
transport = InMemoryTransport(server=server)
client = Client(transport=transport)
# Test initialization
await client.initialize()
assert client.is_connected()
# Test operation
result = await client.call_tool("test_tool", {})
assert result[0].text == "test"
# Test cleanup
await client.cleanup()
assert not client.is_connected()
@pytest.mark.asyncio
async def test_multi_server_client():
"""Test multi-server client functionality."""
# Create test servers
server1 = FastMCP(name="server1")
server2 = FastMCP(name="server2")
@server1.tool()
async def server1_tool() -> str:
return "response from server1"
@server2.tool()
async def server2_tool() -> str:
return "response from server2"
# Create distributed client
distributed_client = DistributedMCPClient()
await distributed_client.initialize()
try:
# Add servers
await distributed_client.add_server("server1", ServerConfig(
transport_type="memory",
transport_params={"server": server1}
))
await distributed_client.add_server("server2", ServerConfig(
transport_type="memory",
transport_params={"server": server2}
))
# Test distributed tool calls
result1 = await distributed_client.call_tool_distributed(
"server1_tool", {}, preferred_server="server1"
)
assert "server1" in result1[0].text
result2 = await distributed_client.call_tool_distributed(
"server2_tool", {}, preferred_server="server2"
)
assert "server2" in result2[0].text
finally:
await distributed_client.cleanup()
# ❌ DON'T: Skip testing or use inadequate test patterns
@pytest.mark.asyncio
async def bad_test():
# No proper setup, no error handling, no cleanup
client = Client()
result = await client.call_tool("tool", {})
assert result # Too generic assertion
```
## Best Practices Summary
### Client Configuration
- Use appropriate transport for your use case
- Configure proper timeouts and retry policies
- Implement comprehensive error handling
- Use context managers for lifecycle management
### Transport Selection
- STDIO for subprocess-based servers
- HTTP/SSE for remote servers and web deployments
- In-memory for testing and embedded scenarios
- WebSocket for real-time bidirectional communication
### Multi-Server Management
- Implement health monitoring and load balancing
- Use intelligent server selection algorithms
- Provide graceful failover capabilities
- Monitor performance metrics across servers
### Testing
- Use in-memory transport for unit testing
- Test error scenarios and edge cases
- Verify concurrent operation handling
- Test client lifecycle management
## References
- [FastMCP Client Documentation](mdc:https:/gofastmcp.com/clients/client)
- [FastMCP Transports Guide](mdc:https:/gofastmcp.com/clients/transports)
- [MCP Client Specification](mdc:https:/spec.modelcontextprotocol.io/specification/client)
- [Python asyncio Documentation](mdc:https:/docs.python.org/3/library/asyncio.html)