# tool_server.py (Or whatever you named your main file with mcp.run)
from typing import Any, List, Dict, Optional # Import necessary types for type hinting
from mcp.server.fastmcp import FastMCP
import requests # Import requests specifically for catching its exceptions
import json # Import json for potential error detail parsing
import logging # Import logging
# Root logging setup for this process: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# FastMCP server instance; the @mcp.tool() functions below register on it.
mcp = FastMCP("ProductDataToolServer")

# McpClient comes from the project-local mcp_client module. A failed import
# is logged instead of raised, and McpClient is bound to None so the code
# below can detect that the client is unavailable.
try:
    from mcp_client import McpClient
except ImportError:
    logger.error("Failed to import McpClient. Make sure mcp_client.py is in your Python path.")
    McpClient = None

# Base URL of the separate data server that McpClient is pointed at.
# Change this constant if that server runs on another host or port.
DATA_SERVER_URL = "http://127.0.0.1:5001"

# Build the shared client only when the import succeeded. Otherwise
# data_client stays None, which the tools and the __main__ guard check for.
if not McpClient:
    data_client = None
    logger.error("McpClient is not available. Tools using data_client will not function.")
else:
    data_client = McpClient(server_url=DATA_SERVER_URL)
    logger.info(f"Product Data McpClient initialized pointing to {DATA_SERVER_URL}")
# --- Define the MCP Tool Functions using the McpClient ---
# Tool 1: Query products from a specific store
@mcp.tool()
async def query_store_products(
    store_name: str,
    filter_criteria: Optional[Dict[str, Any]] = None,
    sort_by: Optional[str] = None,
    sort_order: str = 'desc',
    limit: int = 10,
    fields: Optional[List[str]] = None,
) -> str | List[Dict[str, Any]]:
    """Retrieves product data from a specific store based on filtering and sorting criteria.
    Uses a separate Product Data Server to fetch information.

    Args:
        store_name: The name of the store to search within (e.g., 'aeropress_product_cleaned'). This is required.
        filter_criteria: Optional. A dictionary specifying filter criteria. Keys are field names (e.g., 'price', 'rating'). Values are either exact match values (e.g., {'rating': 4.5}) or dictionaries for operators (e.g., {'price': {'operator': '<', 'value': 50.0}}). Supported operators by the data server include '==', '!=', '<', '>', '<=', '>='. Consult data server documentation (or the available_fields tool) for available fields and operators.
        sort_by: Optional. The field name to sort the results by (e.g., 'price', 'rating', 'score'). If not specified, the data server might default to 'score' or its own internal default.
        sort_order: Optional. The order of sorting ('asc' for ascending, 'desc' for descending). Defaults to 'desc'.
        limit: Optional. The maximum number of products to return. Defaults to 10.
        fields: Optional. A list of specific fields to include in the results (e.g., ['product_name', 'price', 'product_url']). If not specified, the data server returns all available fields for the matching products.

    Returns:
        A list of product dictionaries matching the criteria on success.
        Returns a string error message if the data server cannot be reached,
        the store is not found, or if there's an error during the query.
    """
    logger.info(f"Tool call received: query_store_products for store='{store_name}' with filters={filter_criteria}, sort_by={sort_by}, order={sort_order}, limit={limit}, fields={fields}")
    if not data_client:
        return "Error: Product data client is not initialized. The data server might not be configured correctly."
    if not store_name:
        return "Error: 'store_name' argument is required for query_store_products."

    # The client method receives the filter under the keyword 'query'; a
    # missing filter is sent as an empty dict. None values for sort_by and
    # fields are passed through unchanged.
    client_params = {
        "store_name": store_name,
        "query": filter_criteria if filter_criteria is not None else {},
        "sort_by": sort_by,
        "sort_order": sort_order,
        "limit": limit,
        "fields": fields,
        "intent": "tool_query_store_products",
    }
    try:
        # NOTE(review): the client is called without await, so it looks like
        # a blocking call inside an async tool - confirm McpClient is
        # synchronous and whether it should be moved off the event loop.
        results = data_client.get_store_products(**client_params)
        logger.info(f"Tool query_store_products successful: retrieved {len(results)} results for store '{store_name}'.")
        return results
    except ConnectionError as e:
        logger.error(f"Tool query_store_products failed: Connection Error - {e}", exc_info=True)
        return f"Error connecting to the data server at {DATA_SERVER_URL}: {e}"
    except requests.exceptions.RequestException as e:
        logger.error(f"Tool query_store_products failed: HTTP Request Error - {e}", exc_info=True)
        # A requests.Response is falsy for 4xx/5xx statuses, so presence must
        # be tested with "is None"; a truthiness test hides the status code
        # of exactly the responses this handler exists for.
        response = getattr(e, 'response', None)
        if response is None:
            return f"Error received from data server (unknown status): {e}"
        error_message = f"Error received from data server ({response.status_code}): {e}"
        try:
            server_error_details = response.json()
        except ValueError:
            # ValueError covers the stdlib, simplejson and requests JSON
            # decode errors; fall back to the raw body text.
            return f"{error_message} Server Response: {response.text}"
        # Only a JSON object can carry an 'error' key; lists/scalars are
        # reported through the raw-text fallback instead of raising.
        server_msg = server_error_details.get('error') if isinstance(server_error_details, dict) else None
        if server_msg:
            return f"Data server responded with error: {server_msg}"
        return f"{error_message} Server Response: {response.text}"
    except Exception as e:
        # Last-resort boundary: the tool reports failures as strings rather
        # than raising to the MCP caller.
        logger.error(f"Tool query_store_products failed: Unexpected Error - {e}", exc_info=True)
        return f"An unexpected error occurred while querying products: {e}"
# Tool 2: Get overall top products across all stores
@mcp.tool()
async def get_overall_top_products(
    k: int = 10,
    sort_by: Optional[str] = None,
    sort_order: str = 'desc',
    fields: Optional[List[str]] = None,
) -> str | List[Dict[str, Any]]:
    """Retrieves the top K products across all stores.

    Args:
        k: The number of top products to return. Defaults to 10. Must be a positive integer.
        sort_by: Optional. The field to sort by (e.g., 'score', 'price'). Defaults to 'score' on the data server.
        sort_order: Optional. The order of sorting ('asc' or 'desc'). Defaults to 'desc'.
        fields: Optional. A list of specific fields to include. Defaults to all available fields.

    Returns:
        A list of product dictionaries for the top products, or a string error message.
    """
    logger.info(f"Tool call received: get_overall_top_products with k={k}, sort_by={sort_by}, order={sort_order}, fields={fields}")
    if not data_client:
        return "Error: Product data client is not initialized. The data server might not be configured correctly."
    if not isinstance(k, int) or k <= 0:
        return "Error: 'k' must be a positive integer."

    # None values for sort_by and fields are passed through unchanged.
    client_params = {
        "k": k,
        "sort_by": sort_by,
        "sort_order": sort_order,
        "fields": fields,
        "intent": "tool_get_overall_top",
    }
    try:
        # NOTE(review): the client is called without await, so it looks like
        # a blocking call inside an async tool - confirm McpClient is
        # synchronous and whether it should be moved off the event loop.
        results = data_client.get_all_top_products(**client_params)
        logger.info(f"Tool get_overall_top_products successful: retrieved {len(results)} results.")
        return results
    except ConnectionError as e:
        logger.error(f"Tool get_overall_top_products failed: Connection Error - {e}", exc_info=True)
        return f"Error connecting to the data server at {DATA_SERVER_URL} for top products: {e}"
    except requests.exceptions.RequestException as e:
        logger.error(f"Tool get_overall_top_products failed: HTTP Request Error - {e}", exc_info=True)
        # A requests.Response is falsy for 4xx/5xx statuses, so presence must
        # be tested with "is None"; a truthiness test hides the status code
        # of exactly the responses this handler exists for.
        response = getattr(e, 'response', None)
        if response is None:
            return f"Error received from data server (unknown status): {e}"
        error_message = f"Error received from data server ({response.status_code}): {e}"
        try:
            server_error_details = response.json()
        except ValueError:
            # ValueError covers the stdlib, simplejson and requests JSON
            # decode errors; fall back to the raw body text.
            return f"{error_message} Server Response: {response.text}"
        # Only a JSON object can carry an 'error' key; lists/scalars are
        # reported through the raw-text fallback instead of raising.
        server_msg = server_error_details.get('error') if isinstance(server_error_details, dict) else None
        if server_msg:
            return f"Data server responded with error: {server_msg}"
        return f"{error_message} Server Response: {response.text}"
    except Exception as e:
        # Last-resort boundary: the tool reports failures as strings rather
        # than raising to the MCP caller.
        logger.error(f"Tool get_overall_top_products failed: Unexpected Error - {e}", exc_info=True)
        return f"An unexpected error occurred while getting top products: {e}"
# Tool 3: Get details for specific products by URL
@mcp.tool()
async def get_product_details_by_url(
    product_urls: List[str],
    fields: Optional[List[str]] = None,
) -> str | List[Dict[str, Any]]:
    """Retrieves detailed information for specific products using their URLs.
    Queries the separate Product Data Server. Note: The data server expects product_ids,
    but the client library method takes product_urls. Ensure the data server
    can correctly map these URLs to its internal product_ids if necessary.

    Args:
        product_urls: A list of product URLs for which to retrieve details. Required.
        fields: Optional. A list of specific fields to include. Defaults to all available fields.

    Returns:
        A list of product dictionaries for the found products, or a string error message.
    """
    logger.info(f"Tool call received: get_product_details_by_url for URLs: {product_urls} with fields: {fields}")
    if not data_client:
        return "Error: Product data client is not initialized. The data server might not be configured correctly."
    if not product_urls or not isinstance(product_urls, list):
        return "Error: 'product_urls' must be a non-empty list of strings."

    client_params = {
        "product_urls": product_urls,
        "fields": fields,
        "intent": "tool_get_product_details",
    }
    try:
        # NOTE(review): per the original author's note, the server endpoint
        # reportedly expects product_ids while this client call sends
        # product_urls - verify against the client/server implementations.
        # NOTE(review): the client is called without await, so it looks like
        # a blocking call inside an async tool - confirm McpClient is
        # synchronous and whether it should be moved off the event loop.
        results = data_client.get_product_details(**client_params)
        logger.info(f"Tool get_product_details_by_url successful: retrieved {len(results)} details out of {len(product_urls)} requested.")
        return results
    except ConnectionError as e:
        logger.error(f"Tool get_product_details_by_url failed: Connection Error - {e}", exc_info=True)
        return f"Error connecting to the data server at {DATA_SERVER_URL} for product details: {e}"
    except requests.exceptions.RequestException as e:
        logger.error(f"Tool get_product_details_by_url failed: HTTP Request Error - {e}", exc_info=True)
        # A requests.Response is falsy for 4xx/5xx statuses, so presence must
        # be tested with "is None"; a truthiness test hides the status code
        # of exactly the responses this handler exists for.
        response = getattr(e, 'response', None)
        if response is None:
            return f"Error received from data server (unknown status): {e}"
        error_message = f"Error received from data server ({response.status_code}): {e}"
        try:
            server_error_details = response.json()
        except ValueError:
            # ValueError covers the stdlib, simplejson and requests JSON
            # decode errors; fall back to the raw body text.
            return f"{error_message} Server Response: {response.text}"
        # Only a JSON object can carry an 'error' key; lists/scalars are
        # reported through the raw-text fallback instead of raising.
        server_msg = server_error_details.get('error') if isinstance(server_error_details, dict) else None
        if server_msg:
            return f"Data server responded with error: {server_msg}"
        return f"{error_message} Server Response: {response.text}"
    except Exception as e:
        # Last-resort boundary: the tool reports failures as strings rather
        # than raising to the MCP caller.
        logger.error(f"Tool get_product_details_by_url failed: Unexpected Error - {e}", exc_info=True)
        return f"An unexpected error occurred while getting product details: {e}"
# Tool 4: Get available fields/columns for filtering/sorting
@mcp.tool()
async def get_available_fields(
    store_name: Optional[str] = None,
) -> str | Dict[str, Any]:
    """Retrieves the list of available data fields (columns) for filtering and sorting.
    Queries the separate Product Data Server.

    Args:
        store_name: Optional. The name of the store to check fields for. If omitted, returns fields available across all stores.

    Returns:
        A dictionary containing a list of available fields (e.g., {'available_fields': [...]}), or a string error message.
    """
    logger.info(f"Tool call received: get_available_fields for store: {store_name}")
    if not data_client:
        return "Error: Product data client is not initialized. The data server might not be configured correctly."

    # store_name is forwarded as-is, including None when it was omitted.
    client_params = {
        "store_name": store_name,
        "intent": "tool_get_available_fields",
    }
    try:
        # The client-side method for this tool is named get_available_filters.
        # NOTE(review): the client is called without await, so it looks like
        # a blocking call inside an async tool - confirm McpClient is
        # synchronous and whether it should be moved off the event loop.
        results = data_client.get_available_filters(**client_params)
        logger.info("Tool get_available_fields successful: retrieved fields.")
        return results
    except ConnectionError as e:
        logger.error(f"Tool get_available_fields failed: Connection Error - {e}", exc_info=True)
        return f"Error connecting to the data server at {DATA_SERVER_URL} for available fields: {e}"
    except requests.exceptions.RequestException as e:
        logger.error(f"Tool get_available_fields failed: HTTP Request Error - {e}", exc_info=True)
        # A requests.Response is falsy for 4xx/5xx statuses, so presence must
        # be tested with "is None"; a truthiness test hides the status code
        # of exactly the responses this handler exists for.
        response = getattr(e, 'response', None)
        if response is None:
            return f"Error received from data server (unknown status): {e}"
        error_message = f"Error received from data server ({response.status_code}): {e}"
        try:
            server_error_details = response.json()
        except ValueError:
            # ValueError covers the stdlib, simplejson and requests JSON
            # decode errors; fall back to the raw body text.
            return f"{error_message} Server Response: {response.text}"
        # Only a JSON object can carry an 'error' key; lists/scalars are
        # reported through the raw-text fallback instead of raising.
        server_msg = server_error_details.get('error') if isinstance(server_error_details, dict) else None
        if server_msg:
            return f"Data server responded with error: {server_msg}"
        return f"{error_message} Server Response: {response.text}"
    except Exception as e:
        # Last-resort boundary: the tool reports failures as strings rather
        # than raising to the MCP caller.
        logger.error(f"Tool get_available_fields failed: Unexpected Error - {e}", exc_info=True)
        return f"An unexpected error occurred while getting available fields: {e}"
# --- Main block to run the FastMCP Tool Server ---
if __name__ == "__main__":
    # The tool server is only started when the data client exists; without
    # it every tool would just return an initialization error.
    if data_client:
        logger.info("Starting FastMCP Tool Server...")
        # Run over stdio, the transport for being launched as a subprocess
        # (e.g. by an LLM agent). The separate data server at
        # DATA_SERVER_URL must already be running on its own.
        mcp.run(transport='stdio')
        logger.info("FastMCP Tool Server stopped.")
    else:
        logger.error("Skipping FastMCP Tool Server run because McpClient failed to initialize.")