# mcp_client.py
# This file contains the McpClient class used by the MCP Host (LLM orchestration or Cursor tools)
# to communicate with the MCP Data Server. It includes methods for all defined and planned endpoints.
import requests
import json
import logging
import sys # Import sys for better error reporting
# --- Configuration ---
# Base URL of the MCP Data Server. It must match the host and port the server
# is actually listening on; the value below targets a local server on port 5001.
MCP_SERVER_URL = "http://127.0.0.1:5001"
# If the server is bound to 0.0.0.0 and reached from another machine/container,
# point the client at that machine's address instead:
# MCP_SERVER_URL = "http://<Server_IP_Address>:5001"

# --- Logging Setup for the Client ---
# Root logging goes to stdout at INFO level with a timestamped format.
logging.basicConfig(
    stream=sys.stdout,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# The HTTP libraries are chatty at INFO/DEBUG; only surface their warnings and
# errors unless they are explicitly re-enabled for debugging.
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
class McpClient:
    """
    Client class to interact with the MCP Data Server via HTTP requests.
    Translates method calls into structured POST requests with JSON payloads.

    Attributes:
        server_url (str): Base URL of the MCP Data Server, stored exactly as given.
        timeout: Timeout, in seconds, applied to every request sent to the server.
    """

    # Default per-request timeout, in seconds.
    DEFAULT_TIMEOUT = 30

    def __init__(self, server_url: str = MCP_SERVER_URL, timeout: float = DEFAULT_TIMEOUT):
        """
        Initializes the McpClient.

        Args:
            server_url (str): The base URL of the MCP Data Server.
            timeout (float, optional): Seconds to wait for each request before
                giving up. Passed straight to ``requests.post``. Defaults to 30.
        """
        self.server_url = server_url
        self.timeout = timeout
        logger.info(f"McpClient initialized for server {self.server_url}")

    def _call_server(self, method_name: str, parameters: dict):
        """
        Internal helper to send a POST request to a specific server endpoint.

        Args:
            method_name (str): The name of the MCP method (endpoint) to call (e.g., 'get_store_products').
            parameters (dict): A dictionary of parameters to send in the JSON payload.
                This dictionary should include the 'intent' parameter.

        Returns:
            dict or list: The JSON response from the server.

        Raises:
            requests.exceptions.Timeout: If the request does not complete within ``self.timeout``.
            ConnectionError: (the built-in one) If unable to connect to the server.
            requests.exceptions.RequestException: If the HTTP request fails or returns an
                error status. When the server did answer, the exception message carries
                the status code and the server's error text, and the exception's
                ``response`` attribute holds the server response.
        """
        # Tolerate a base URL configured with a trailing slash ("http://host:5001/")
        # so the endpoint never ends up with a double slash.
        endpoint = f"{self.server_url.rstrip('/')}/{method_name}"
        logger.info(f"Attempting to call MCP Server endpoint: {endpoint}")
        # Lazy %-formatting: the payload is only rendered when DEBUG is enabled.
        logger.debug("Parameters being sent: %s", parameters)
        try:
            # Send the POST request with JSON payload
            response = requests.post(endpoint, json=parameters, timeout=self.timeout)
            # Raise an exception for bad status codes (4xx or 5xx)
            response.raise_for_status()
            # Parse the JSON response from the server
            result = response.json()
            logger.info(f"Successfully received response from {method_name}")
            return result
        except requests.exceptions.Timeout:
            # Must come before the ConnectionError handler: a connect timeout is
            # both a Timeout and a ConnectionError in requests' hierarchy.
            logger.error(f"MCP Client: Request to {endpoint} timed out after {self.timeout} seconds.")
            raise  # Re-raise the exception for the caller to handle
        except requests.exceptions.ConnectionError as e:
            logger.error(f"MCP Client: Failed to connect to server {self.server_url} for {method_name}: {e}")
            # Provide a more user-friendly error message
            raise ConnectionError(
                f"Could not connect to the MCP server at {self.server_url}. Is it running and accessible?"
            ) from e
        except requests.exceptions.RequestException as e:
            logger.error(f"MCP Client: HTTP request failed for {endpoint}: {e}")
            error_response = getattr(e, 'response', None)
            if error_response is None:
                raise  # No server response to enrich the error with; re-raise as-is

            logger.error(f"Server responded with status {error_response.status_code}: {error_response.text}")
            # Default to the raw body; upgrade to the structured 'error' field if present.
            error_detail = error_response.text
            try:
                server_error_details = error_response.json()
            except ValueError:
                # Server didn't return JSON error details. ValueError is the common
                # base of every JSON decode error requests may raise here.
                pass
            else:
                logger.error(f"Server error details: {server_error_details}")
                # Only a JSON object can carry an 'error' key; a list/string body
                # keeps the raw-text fallback instead of crashing on .get().
                if isinstance(server_error_details, dict):
                    error_detail = server_error_details.get('error', error_response.text)
            # Keep the response attached so callers can still inspect status/body.
            raise requests.exceptions.RequestException(
                f"Server responded with error {error_response.status_code}: {error_detail}",
                response=error_response,
            ) from e

    # --- Client Methods Mirroring Server Endpoints ---
    # These methods package the arguments into the 'parameters' dict for _call_server
    # The 'intent' parameter is included in the dict for logging/auditing on the server side.
    def get_store_products(self, store_name: str, query: dict = None, sort_by: str = 'score', sort_order: str = 'desc', limit: int = 10, fields: list = None, intent: str = "query_store_data"):
        """
        Calls the /get_store_products MCP method on the server.
        Retrieves product data from a specific store based on query, sorting, and limit.

        Args:
            store_name (str): The name of the store (Required).
            query (dict, optional): Filtering criteria. Sent as an empty dict when None.
            sort_by (str, optional): Field to sort by. Defaults to 'score'.
            sort_order (str, optional): 'asc' or 'desc'. Defaults to 'desc'.
            limit (int, optional): Maximum number of results. Defaults to 10.
            fields (list, optional): List of fields to return. Defaults to all.
            intent (str, optional): Client's purpose. Defaults to "query_store_data".

        Returns:
            list: A list of product dictionaries.

        Raises:
            ValueError: If store_name is empty or None.
        """
        # Validate first: store_name is required by the server, so fail fast
        # before building a payload that will never be sent.
        if not store_name:
            logger.error(f"get_store_products called without store_name (Intent: {intent})")
            raise ValueError("store_name is required for get_store_products")
        parameters = {
            "store_name": store_name,
            "query": query if query is not None else {},
            "sort_by": sort_by,
            "sort_order": sort_order,
            "limit": limit,
            "fields": fields,
            "intent": intent,  # Pass the intent received by the client caller
        }
        return self._call_server("get_store_products", parameters)

    def get_all_top_products(self, k: int = 10, sort_by: str = 'score', sort_order: str = 'desc', fields: list = None, intent: str = "get_overall_top"):
        """
        Calls the /get_all_top_products MCP method on the server.
        Retrieves the top products across all loaded stores, ranked by score or other criteria.

        Args:
            k (int, optional): Number of top products to return overall. Defaults to 10.
            sort_by (str, optional): Field to sort by. Defaults to 'score'.
            sort_order (str, optional): 'asc' or 'desc'. Defaults to 'desc'.
            fields (list, optional): List of fields to return. Defaults to all.
            intent (str, optional): Client's purpose. Defaults to "get_overall_top".

        Returns:
            list: A list of product dictionaries.
        """
        parameters = {
            "k": k,
            "sort_by": sort_by,
            "sort_order": sort_order,
            "fields": fields,
            "intent": intent,
        }
        return self._call_server("get_all_top_products", parameters)

    def get_product_details(self, product_urls: list, fields: list = None, intent: str = "get_product_details"):
        """
        Calls the /get_product_details MCP method on the server.
        Retrieves detailed information for specific products using their URLs.

        Args:
            product_urls (list): List of product URLs to retrieve (Required).
            fields (list, optional): List of fields to return. Defaults to all.
            intent (str, optional): Client's purpose. Defaults to "get_product_details".

        Returns:
            list: A list of product dictionaries for the found products. An empty
            list is returned, without contacting the server, when product_urls is
            empty or is not a list.
        """
        # Basic validation before calling the server
        if not product_urls or not isinstance(product_urls, list):
            logger.warning(f"get_product_details called with empty or non-list product_urls (Intent: {intent}). Returning empty list.")
            return []  # Nothing to look up; skip the round trip
        parameters = {
            "product_urls": product_urls,
            "fields": fields,
            "intent": intent,  # Pass the intent
        }
        return self._call_server("get_product_details", parameters)

    def get_available_filters(self, store_name: str = None, intent: str = "get_filter_options"):
        """
        Calls the /get_available_filters MCP method on the server.
        Retrieves the names of available data fields (columns) for filtering/sorting.

        Args:
            store_name (str, optional): The name of the store. If omitted, returns aggregated fields. Defaults to None.
            intent (str, optional): Client's purpose. Defaults to "get_filter_options".

        Returns:
            dict: A dictionary, typically with a key like 'available_fields' containing a list of strings.
        """
        parameters = {
            "store_name": store_name,  # Can be None for aggregated fields
            "intent": intent,
        }
        return self._call_server("get_available_filters", parameters)

    # --- Potential Future Methods (if you extend the server) ---
    def list_available_stores(self, intent: str = "list_stores"):
        """
        (Potential Future Method) Calls a server endpoint to list all store names
        for which data has been loaded.

        Args:
            intent (str, optional): Client's purpose. Defaults to "list_stores".

        Returns:
            list: A list of store name strings.

        Note: This endpoint (/list_stores) needs to be implemented on the server.
        """
        # This assumes a future server endpoint '/list_stores' that accepts a POST
        # carrying only the 'intent' parameter.
        parameters = {"intent": intent}
        logger.warning("Calling potential future method list_available_stores. Ensure your server implements the '/list_stores' endpoint.")
        # If the server endpoint requires GET or different parameters, adjust this
        return self._call_server("list_stores", parameters)

    def get_product_aggregations(self, store_name: str = None, query: dict = None, aggregations: dict = None, intent: str = "get_aggregations"):
        """
        (Potential Future Method) Calls a server endpoint that computes aggregations
        over product data.

        Args:
            store_name (str, optional): The name of the store. Defaults to None.
            query (dict, optional): Filtering criteria. Sent as an empty dict when None.
            aggregations (dict, optional): The requested aggregations structure,
                forwarded to the server unchanged. Defaults to None.
            intent (str, optional): Client's purpose. Defaults to "get_aggregations".

        Returns:
            dict or list: Whatever JSON the server returns for the request.

        Note: This endpoint (/get_product_aggregations) needs to be implemented on the server.
        """
        parameters = {
            "store_name": store_name,
            "query": query if query is not None else {},
            "aggregations": aggregations,  # Pass the requested aggregations structure
            "intent": intent,
        }
        logger.warning("Calling potential future method get_product_aggregations. Ensure your server implements the '/get_product_aggregations' endpoint.")
        return self._call_server("get_product_aggregations", parameters)
# --- Example Usage (for testing the client library itself) ---
def _run_smoke_section(title, failure_prefix, section_body):
    """
    Runs one section of the manual smoke test.

    Prints the section title, runs section_body, logs (rather than raises) any
    exception so that the remaining sections still run, and finally prints the
    separator line.

    Args:
        title (str): Heading printed before the section runs.
        failure_prefix (str): Text placed before the exception in the error log.
        section_body (callable): Zero-argument callable performing the check.
    """
    print(title)
    try:
        section_body()
    except Exception as e:
        # Deliberately broad: this is the top-level boundary of a manual test
        # script, and one failing endpoint must not stop the others.
        logger.error(f"{failure_prefix}: {e}")
    print("\n" + "=" * 50 + "\n")


def _run_client_smoke_tests():
    """
    Exercises every McpClient method against a running MCP Data Server.

    Make sure your MCP Data Server (mcp_data_server.py) is running in a separate
    terminal first. Results are printed; failures are logged per section.
    """
    client = McpClient()
    print(f"Testing client connection to: {client.server_url}\n")

    # Shared by several sections, so it is defined once up front rather than
    # inside the first section's try block.
    # Use your actual store name loaded by the server (e.g., 'aeropress_product_cleaned')
    store_name_to_test = "aeropress_product_cleaned"  # <-- REPLACE with your actual store name

    def check_store_products():
        logger.info(f"Attempting to get products from store: {store_name_to_test}")
        products = client.get_store_products(
            store_name=store_name_to_test,
            limit=3,
            fields=["product_name", "price", "score", "rating"],  # Use your actual column names
            intent="client_test_get_store_products",
        )
        print(f"Successfully retrieved {len(products)} products from {store_name_to_test}:")
        print(json.dumps(products, indent=2))

    def check_all_top_products():
        logger.info("Attempting to get overall top 5 products.")
        overall_top = client.get_all_top_products(
            k=5,
            fields=["product_name", "store_name", "score", "product_url"],
            intent="client_test_get_overall_top",
        )
        print(f"Successfully retrieved {len(overall_top)} overall top products:")
        print(json.dumps(overall_top, indent=2))

    def check_available_filters():
        logger.info(f"Attempting to get available fields for store: {store_name_to_test}")
        available_fields = client.get_available_filters(
            store_name=store_name_to_test,
            intent="client_test_get_fields",
        )
        print("Successfully retrieved available fields:")
        print(json.dumps(available_fields, indent=2))

    def check_product_details():
        # Replace with actual product URLs from your data!
        # Get a couple of URLs from the previous output if needed.
        urls_to_test = [
            "REPLACE_WITH_AN_ACTUAL_PRODUCT_URL_FROM_YOUR_DATA_1",
            "REPLACE_WITH_AN_ACTUAL_PRODUCT_URL_FROM_YOUR_DATA_2",
        ]
        # Skip the call while the placeholder URLs are still in place (or the list is empty)
        if urls_to_test and not urls_to_test[0].startswith("REPLACE_WITH_"):
            logger.info(f"Attempting to get details for URLs: {urls_to_test}")
            product_details = client.get_product_details(
                product_urls=urls_to_test,
                fields=["product_name", "description", "price", "product_url"],
                intent="client_test_get_details",
            )
            print(f"Successfully retrieved details for {len(product_details)} products:")
            print(json.dumps(product_details, indent=2))
        else:
            logger.warning("Skipping get_product_details test: Please replace placeholder URLs with actual URLs from your data.")

    def check_list_available_stores():
        # This will likely fail unless you add a '/list_stores' endpoint to your server
        logger.info("Attempting to list available stores.")
        stores_list = client.list_available_stores(intent="client_test_list_stores")
        print("Successfully retrieved list of stores:")
        print(json.dumps(stores_list, indent=2))

    def check_product_aggregations():
        # This will likely fail unless you add a '/get_product_aggregations' endpoint
        logger.info(f"Attempting to get aggregations for store: {store_name_to_test}")
        aggregations_results = client.get_product_aggregations(
            store_name=store_name_to_test,
            query={"price": {"operator": ">", "value": 10}},  # Example query
            aggregations={"avg_price": "price", "total_reviews": "review_count"},  # Example aggregations
            intent="client_test_get_aggregations",
        )
        print("Successfully retrieved aggregation results:")
        print(json.dumps(aggregations_results, indent=2))

    # (title, error-log prefix, check) in execution order. The last two target
    # potential future endpoints and are expected to fail until the server
    # implements them.
    sections = [
        ("--- Testing get_store_products ---",
         "Error testing get_store_products",
         check_store_products),
        ("--- Testing get_all_top_products ---",
         "Error testing get_all_top_products",
         check_all_top_products),
        ("--- Testing get_available_filters ---",
         "Error testing get_available_filters",
         check_available_filters),
        ("--- Testing get_product_details ---",
         "Error testing get_product_details",
         check_product_details),
        ("--- Testing Potential Future Method: list_available_stores ---",
         "Error testing list_available_stores (Expected if server endpoint is not implemented)",
         check_list_available_stores),
        ("--- Testing Potential Future Method: get_product_aggregations ---",
         "Error testing get_product_aggregations (Expected if server endpoint is not implemented)",
         check_product_aggregations),
    ]
    for title, failure_prefix, section_body in sections:
        _run_smoke_section(title, failure_prefix, section_body)


if __name__ == "__main__":
    # This block runs only when you execute this script directly.
    # It's useful for verifying the client can talk to the server.
    _run_client_smoke_tests()