# mcp_server/mcp_data_server.py
from flask import Flask, request, jsonify
import pandas as pd
import os
import logging
import operator  # Comparison-operator dispatch for query filtering (see get_store_products)
from glob import glob
# --- Configuration ---
# IMPORTANT: set this path to wherever your data CSVs live, relative to THIS script.
# If your data dir is 'your_project_directory/data/' and this server lives in
# 'your_project_directory/mcp_server/', then DATA_DIR should resolve to '../data'.
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')  # Adjust this path as needed!
CSV_FILE_PATTERN = "*.csv" # Pattern to find your store data files (e.g., "*.csv", "*_data.csv")
def store_name_from_filename(filename):
    """Extract the store name from a data filename, e.g. 'electronics_data.csv' -> 'electronics'."""
    return filename.replace('.csv', '').replace('_data', '')
# --- Flask App Setup ---
app = Flask(__name__)
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Optional: Configure logging to a file
# file_handler = logging.FileHandler('mcp_server.log')
# file_handler.setLevel(logging.INFO)
# formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# file_handler.setFormatter(formatter)
# app.logger.addHandler(file_handler) # Use Flask's built-in logger
logging.info("MCP Data Server initializing...")
# --- Data Storage ---
# Dictionary to hold pandas DataFrames, keyed by store name
store_data = {}
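# e.g. after load_data(): {"electronics": <DataFrame>, "books": <DataFrame>} (store names are illustrative)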
# --- Data Loading Function ---
def load_data():
"""Loads scored data from CSVs into pandas DataFrames."""
store_data.clear() # Clear existing data if reloading
logging.info(f"Loading data from directory: {DATA_DIR}")
if not os.path.exists(DATA_DIR):
logging.error(f"Data directory not found: {DATA_DIR}")
return
csv_files = glob(os.path.join(DATA_DIR, CSV_FILE_PATTERN))
if not csv_files:
logging.warning(f"No files found matching pattern '{CSV_FILE_PATTERN}' in {DATA_DIR}")
return
for filepath in csv_files:
filename = os.path.basename(filepath)
        store_name = store_name_from_filename(filename)
try:
df = pd.read_csv(filepath)
# Optional: Ensure basic columns are present
if 'score' not in df.columns:
logging.warning(f"'score' column not found in {filename}. Skipping.")
continue
if df.empty:
logging.warning(f"CSV file {filename} is empty. Skipping.")
continue
store_data[store_name] = df
logging.info(f"Loaded data for store '{store_name}' ({len(df)} products) from {filename}")
except Exception as e:
logging.error(f"Error loading {filepath}: {e}")
logging.info(f"Finished data loading. Loaded data for {len(store_data)} stores.")
# Load data when the server starts
load_data()
# --- Basic Server Status Endpoint ---
@app.route('/')
def status():
"""Simple endpoint to check if the server is running."""
return jsonify({"status": "MCP Data Server is running", "loaded_stores_count": len(store_data)})
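# A quick way to verify the server is up (assumes the host/port configured in
# app.run() at the bottom of this file):
#   curl http://localhost:5001/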
# --- Implement MCP Methods ---
@app.route('/get_store_products', methods=['POST'])
def get_store_products():
"""
MCP Method to get product data from a specific store based on parameters.
Parameters:
- store_name (str): The name of the store. (Required)
- query (dict): Filtering criteria (e.g., {"price": {"operator": "<", "value": 100}}). (Optional)
- sort_by (str): Field to sort by (e.g., 'score'). (Optional, default 'score')
- sort_order (str): 'asc' or 'desc'. (Optional, default 'desc')
- limit (int): Maximum number of results. (Optional, default 10)
- fields (list): List of fields to return. (Optional, default all)
- intent (str): Client's purpose for calling this method. (Optional, for logging/auditing)
"""
    params = request.get_json(silent=True) or {}  # Tolerate missing or malformed JSON bodies
# Log the request including intent and parameters
intent = params.get('intent', 'unspecified_intent')
logging.info(f"Received request: get_store_products (Intent: {intent}) with params: {params}")
store_name = params.get('store_name')
query = params.get('query', {})
sort_by = params.get('sort_by', 'score')
sort_order = params.get('sort_order', 'desc')
limit = params.get('limit', 10)
fields = params.get('fields') # List of desired data fields
# --- Basic Parameter Validation ---
if not store_name:
logging.warning(f"get_store_products: Missing 'store_name' parameter (Intent: {intent}).")
return jsonify({"error": "Missing 'store_name' parameter"}), 400 # Bad Request
if store_name not in store_data:
logging.warning(f"get_store_products: Data for store '{store_name}' not found (Intent: {intent}).")
# You could try load_data() here again if files might be added while server is running
return jsonify({"error": f"Data for store '{store_name}' not found"}), 404 # Not Found
df = store_data[store_name].copy() # Work on a copy
# --- Apply Filtering based on 'query' dictionary ---
# Expected query format: {"field1": {"operator": "op", "value": val}, "field2": val2 (for ==)}
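    # e.g. {"price": {"operator": "<", "value": 100}, "category": "toys"} keeps rows
    # where price < 100 AND category == "toys" (field names here are illustrative)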
try:
for field, criteria in query.items():
if field not in df.columns:
logging.warning(f"get_store_products: Filter field '{field}' not found for store '{store_name}' (Intent: {intent}). Skipping filter.")
continue # Skip filter if field doesn't exist
if isinstance(criteria, dict) and 'operator' in criteria and 'value' in criteria:
op = criteria['operator']
val = criteria['value']
# Apply pandas filter based on operator
if op == '==':
df = df[df[field] == val]
elif op == '!=':
df = df[df[field] != val]
                elif op in ('<', '>', '<=', '>='):
                    # All four comparisons share the same numeric-type guard
                    compare = {'<': operator.lt, '>': operator.gt,
                               '<=': operator.le, '>=': operator.ge}[op]
                    try:
                        df = df[compare(df[field], val)]
                    except TypeError:
                        # Log and skip this filter rather than failing the whole request
                        logging.warning(f"get_store_products: Cannot apply '{op}' filter on non-numeric field '{field}' (Intent: {intent}).")
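                # Two further operators, sketched under the assumption that 'in'
                # takes a list of values and 'contains' a literal substring:
                elif op == 'in' and isinstance(val, list):
                    df = df[df[field].isin(val)]
                elif op == 'contains':
                    df = df[df[field].astype(str).str.contains(str(val), regex=False, na=False)]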
                # Further operators can be added to this chain in the same way
else:
logging.warning(f"get_store_products: Unsupported operator '{op}' for field '{field}' (Intent: {intent}).")
# Decide whether to skip or return an error
# return jsonify({"error": f"Unsupported operator '{op}' for field '{field}'"}), 400
pass # For now, just log and skip this filter
else: # Simple equality filter assumed if not a dict with operator/value
df = df[df[field] == criteria]
except Exception as e:
logging.error(f"get_store_products: Error applying filters for store '{store_name}' (Intent: {intent}): {e}", exc_info=True)
return jsonify({"error": f"Error applying query filters: {e}"}), 400 # Bad Request due to invalid query
# --- Apply Sorting ---
if sort_by in df.columns:
ascending = sort_order.lower() == 'asc'
df = df.sort_values(by=sort_by, ascending=ascending)
else:
logging.warning(f"get_store_products: Sort field '{sort_by}' not found for store '{store_name}' (Intent: {intent}). Skipping sort.")
# You might default to sorting by score if sort_by is invalid or doesn't exist
# if 'score' in df.columns:
# df = df.sort_values(by='score', ascending=sort_order.lower() == 'asc')
# --- Apply Limit ---
# Ensure limit is a positive integer
try:
limit = int(limit)
if limit > 0:
df = df.head(limit)
else:
logging.warning(f"get_store_products: Invalid limit value {limit} (Intent: {intent}). Skipping limit.")
except (ValueError, TypeError):
logging.warning(f"get_store_products: Invalid limit type (Intent: {intent}). Skipping limit.")
pass # Skip limit if invalid type
# --- Apply Field Selection ---
if fields is not None: # Use is not None to distinguish from empty list
valid_fields = [field for field in fields if field in df.columns]
result_df = df[valid_fields]
if len(valid_fields) < len(fields):
missing = set(fields) - set(valid_fields)
logging.warning(f"get_store_products: Some requested fields not found for store '{store_name}' (Intent: {intent}): {missing}")
        # Edge case: if 'fields' is [] or none of the requested fields exist,
        # result_df has zero columns and to_dict(orient='records') returns a
        # list of empty dicts. Return an error here instead if you prefer.
else:
# If fields is None, return all columns
result_df = df
# --- Convert to JSON-friendly format ---
result = result_df.to_dict(orient='records')
logging.info(f"get_store_products: Successfully retrieved {len(result)} results for '{store_name}' (Intent: {intent}).")
return jsonify(result)
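# Example client call (store and field names are illustrative; requires the
# 'requests' package and a running server):
#   import requests
#   resp = requests.post(
#       "http://localhost:5001/get_store_products",
#       json={"store_name": "electronics",
#             "query": {"price": {"operator": "<", "value": 100}},
#             "sort_by": "score", "sort_order": "desc", "limit": 5,
#             "intent": "find_cheap_high_scoring_products"})
#   print(resp.json())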
@app.route('/get_all_top_products', methods=['POST'])
def get_all_top_products():
"""
MCP Method to aggregate and get Top-K products across all loaded stores,
ranked by the pre-calculated 'score'.
Parameters:
- k (int): Number of top products to return overall. (Optional, default 10)
- sort_by (str): Field to sort by (Optional, default 'score') - often ignored here as intent is 'top'
- sort_order (str): 'asc' or 'desc'. (Optional, default 'desc' for top)
- fields (list): List of fields to return. (Optional, default all)
- intent (str): Client's purpose for calling this method. (Optional)
"""
    params = request.get_json(silent=True) or {}  # Tolerate missing or malformed JSON bodies
intent = params.get('intent', 'unspecified_intent')
logging.info(f"Received request: get_all_top_products (Intent: {intent}) with params: {params}")
k = params.get('k', 10)
sort_by = params.get('sort_by', 'score') # Still respect sort_by if not 'score'
sort_order = params.get('sort_order', 'desc')
fields = params.get('fields')
if not store_data:
logging.warning(f"get_all_top_products: No store data loaded (Intent: {intent}).")
return jsonify({"error": "No store data available"}), 404
# Concatenate all dataframes that have a 'score' column
dfs_with_score = [df for df in store_data.values() if 'score' in df.columns and not df.empty]
if not dfs_with_score:
logging.warning(f"get_all_top_products: No store data with 'score' column found (Intent: {intent}).")
return jsonify({"error": "No scored data available"}), 404
all_products_df = pd.concat(dfs_with_score, ignore_index=True)
# Ensure k is a positive integer
try:
k = int(k)
if k <= 0:
logging.warning(f"get_all_top_products: Invalid k value {k} (Intent: {intent}). Returning empty list.")
return jsonify([]) # Return empty if k is non-positive
except (ValueError, TypeError):
logging.warning(f"get_all_top_products: Invalid k type (Intent: {intent}). Defaulting to k=10.")
k = 10 # Default if invalid
# Get the top k overall based on the score or requested sort_by
sort_col = sort_by if sort_by in all_products_df.columns else 'score'
if sort_col != sort_by:
logging.warning(f"get_all_top_products: Sort field '{sort_by}' not found in aggregated data, sorting by '{sort_col}' instead (Intent: {intent}).")
ascending = sort_order.lower() == 'asc'
# Use drop_duplicates on product_id to ensure unique products in the overall top list
# Assuming 'product_id' is the unique identifier across stores
if 'product_id' in all_products_df.columns:
top_overall_df = all_products_df.sort_values(by=sort_col, ascending=ascending).drop_duplicates(subset=['product_id']).head(k)
else:
logging.warning(f"get_all_top_products: 'product_id' column not found for deduplication (Intent: {intent}). Results may contain duplicates.")
top_overall_df = all_products_df.sort_values(by=sort_col, ascending=ascending).head(k)
# Apply field selection
if fields is not None:
valid_fields = [field for field in fields if field in top_overall_df.columns]
result_df = top_overall_df[valid_fields]
if len(valid_fields) < len(fields):
missing = set(fields) - set(valid_fields)
logging.warning(f"get_all_top_products: Some requested fields not found in aggregated data (Intent: {intent}): {missing}")
else:
result_df = top_overall_df # Include all fields including score
result = result_df.to_dict(orient='records')
logging.info(f"get_all_top_products: Successfully retrieved {len(result)} overall products (Intent: {intent}).")
return jsonify(result)
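# Example call (values are illustrative): the top 5 products across every
# loaded store, trimmed to two fields:
#   requests.post("http://localhost:5001/get_all_top_products",
#                 json={"k": 5, "fields": ["product_id", "score"],
#                       "intent": "overall_top_picks"}).json()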
@app.route('/get_product_details', methods=['POST'])
def get_product_details():
"""
MCP Method to get detailed information for specific products by ID.
Searches across all loaded store data.
Parameters:
- product_ids (list): List of product IDs to retrieve. (Required)
- fields (list): List of fields to return. (Optional, default all)
- intent (str): Client's purpose. (Optional)
"""
    params = request.get_json(silent=True) or {}  # Tolerate missing or malformed JSON bodies
intent = params.get('intent', 'unspecified_intent')
logging.info(f"Received request: get_product_details (Intent: {intent}) with params: {params}")
product_ids = params.get('product_ids')
fields = params.get('fields')
if not product_ids or not isinstance(product_ids, list):
logging.warning(f"get_product_details: Missing or invalid 'product_ids' parameter (Intent: {intent}).")
return jsonify({"error": "Missing or invalid 'product_ids' parameter (must be a list)"}), 400
if not store_data:
logging.warning(f"get_product_details: No store data loaded (Intent: {intent}).")
return jsonify({"error": "No store data available"}), 404
# Aggregate all data into one DataFrame for searching
# Consider performance if you have A LOT of data - might need a better lookup structure
dfs_with_id = [df for df in store_data.values() if 'product_id' in df.columns and not df.empty]
if not dfs_with_id:
logging.warning(f"get_product_details: No store data with 'product_id' column found (Intent: {intent}).")
return jsonify({"error": "No product data with IDs available"}), 404
all_products_df = pd.concat(dfs_with_id, ignore_index=True)
# Filter for the requested product IDs
# Use .isin() for efficiency with lists of IDs
requested_products_df = all_products_df[all_products_df['product_id'].isin(product_ids)].drop_duplicates(subset=['product_id'])
# Apply field selection
if fields is not None:
valid_fields = [field for field in fields if field in requested_products_df.columns]
result_df = requested_products_df[valid_fields]
if len(valid_fields) < len(fields):
missing = set(fields) - set(valid_fields)
logging.warning(f"get_product_details: Some requested fields not found for products (Intent: {intent}): {missing}")
else:
result_df = requested_products_df # Include all fields
    result = result_df.to_dict(orient='records')
    logging.info(f"get_product_details: Retrieved {len(result)} out of {len(product_ids)} requested products (Intent: {intent}).")
    # Warn when some requested IDs were not found in any store's data
    missing_ids = set(product_ids) - set(requested_products_df['product_id'])
    if missing_ids:
        logging.warning(f"get_product_details: Product IDs not found (Intent: {intent}): {missing_ids}")
    return jsonify(result)
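# Example call (product IDs are illustrative):
#   requests.post("http://localhost:5001/get_product_details",
#                 json={"product_ids": ["P123", "P456"],
#                       "intent": "inspect_candidate_products"}).json()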
@app.route('/get_available_filters', methods=['POST'])
def get_available_filters():
"""
MCP Method to get the list of available columns (fields) for filtering/sorting
for a specific store or aggregated data.
Parameters:
- store_name (str): The name of the store (Optional - if omitted, returns aggregated columns).
- intent (str): Client's purpose. (Optional)
"""
    params = request.get_json(silent=True) or {}  # Tolerate missing or malformed JSON bodies
intent = params.get('intent', 'unspecified_intent')
logging.info(f"Received request: get_available_filters (Intent: {intent}) with params: {params}")
store_name = params.get('store_name')
df_to_check = None
if store_name:
if store_name in store_data:
df_to_check = store_data[store_name]
logging.info(f"get_available_filters: Checking columns for store '{store_name}' (Intent: {intent}).")
else:
logging.warning(f"get_available_filters: Store '{store_name}' not found (Intent: {intent}).")
return jsonify({"error": f"Data for store '{store_name}' not found"}), 404
else:
# If no store_name, return columns available in the aggregated data
if not store_data:
logging.warning(f"get_available_filters: No store data loaded to check aggregated columns (Intent: {intent}).")
return jsonify({"error": "No store data available"}), 404
dfs = [df for df in store_data.values() if not df.empty]
if not dfs:
logging.warning(f"get_available_filters: No non-empty store data loaded to check aggregated columns (Intent: {intent}).")
return jsonify({"error": "No non-empty store data available"}), 404
        # Union the column names directly; concatenating the full frames just to
        # read .columns would be needlessly expensive
        all_cols = set().union(*(df.columns for df in dfs))
        logging.info(f"get_available_filters: Checking columns for aggregated data (Intent: {intent}).")
        return jsonify({"available_fields": sorted(all_cols)})  # Unique, sorted column names
if df_to_check is not None:
available_fields = df_to_check.columns.tolist()
logging.info(f"get_available_filters: Found {len(available_fields)} fields for store '{store_name}' (Intent: {intent}).")
return jsonify({"available_fields": sorted(available_fields)})
else:
logging.error(f"get_available_filters: Unexpected state - df_to_check is None (Intent: {intent}).")
return jsonify({"error": "Internal server error retrieving fields"}), 500
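# Example calls, per-store fields vs. fields across all stores (store name illustrative):
#   requests.post("http://localhost:5001/get_available_filters",
#                 json={"store_name": "electronics", "intent": "discover_filters"}).json()
#   requests.post("http://localhost:5001/get_available_filters", json={}).json()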
# --- Server Runner ---
if __name__ == '__main__':
# Set host='0.0.0.0' to make the server accessible from outside localhost
# This is necessary if your client code runs in a different container or machine
# Set port=5001 (or any other free port)
# Set debug=True for development (auto-reloads on code changes, provides debug info)
# Set debug=False and use a production-ready WSGI server (like Gunicorn or uWSGI) for deployment
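    # For example (module path assumed to match this file's location on disk):
    #   gunicorn --bind 0.0.0.0:5001 mcp_server.mcp_data_server:app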
logging.info("Starting Flask server...")
app.run(debug=True, host='0.0.0.0', port=5001)
logging.info("Flask server stopped.")