"""
E-commerce MCP Server
A FastMCP server for managing products, customers, and orders.
Provides full CRUD operations and data analysis capabilities for business users.
"""
import os
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
# Ensure the script's directory is in the path for imports
SCRIPT_DIR = Path(__file__).parent.resolve()
sys.path.insert(0, str(SCRIPT_DIR))
from fastmcp import FastMCP
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from models import (
Product,
Customer,
Order,
OrderItem,
init_db,
get_session,
get_engine,
)
# Initialize the MCP server
mcp = FastMCP(
"E-commerce Server",
instructions="""
This MCP server helps business users manage their e-commerce operations.
Available operations:
- Products: Add, view, update, delete products and manage inventory
- Customers: Add, view, update, delete customer records
- Orders: Create orders, add items, update status, view order history
- Analytics: Get sales reports, inventory status, customer insights
Use natural language to interact with the system.
""",
)
# Database path - can be configured via environment variable
# Default to a file in the same directory as the script
DEFAULT_DB_PATH = str(SCRIPT_DIR / "ecommerce.db")
DB_PATH = os.environ.get("ECOMMERCE_DB_PATH", DEFAULT_DB_PATH)
# Initialize database on startup
engine = init_db(DB_PATH)
def get_db_session():
"""Get a database session."""
return get_session(engine)
# =============================================================================
# PRODUCT MANAGEMENT TOOLS
# =============================================================================
@mcp.tool()
def add_product(
name: str,
sku: str,
price: float,
description: Optional[str] = None,
cost: Optional[float] = None,
quantity_in_stock: int = 0,
category: Optional[str] = None,
) -> dict:
"""
Add a new product to the inventory.
Args:
name: Product name
sku: Stock Keeping Unit (unique identifier)
price: Selling price
description: Product description
cost: Product cost (for profit calculation)
quantity_in_stock: Initial stock quantity
category: Product category
Returns:
The created product details
"""
session = get_db_session()
try:
# Check if SKU already exists
existing = session.query(Product).filter(Product.sku == sku).first()
if existing:
return {"error": f"Product with SKU '{sku}' already exists"}
product = Product(
name=name,
sku=sku,
price=price,
description=description,
cost=cost,
quantity_in_stock=quantity_in_stock,
category=category,
)
session.add(product)
session.commit()
session.refresh(product)
return {"success": True, "product": product.to_dict()}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def get_product(product_id: Optional[int] = None, sku: Optional[str] = None) -> dict:
"""
Get a product by ID or SKU.
Args:
product_id: Product ID
sku: Product SKU
Returns:
Product details or error message
"""
if not product_id and not sku:
return {"error": "Please provide either product_id or sku"}
session = get_db_session()
try:
if product_id:
product = session.query(Product).filter(Product.id == product_id).first()
else:
product = session.query(Product).filter(Product.sku == sku).first()
if not product:
return {"error": "Product not found"}
return {"product": product.to_dict()}
finally:
session.close()
@mcp.tool()
def list_products(
category: Optional[str] = None,
active_only: bool = True,
low_stock_threshold: Optional[int] = None,
limit: int = 100,
offset: int = 0,
) -> dict:
"""
List products with optional filtering.
Args:
category: Filter by category
active_only: Only show active products
low_stock_threshold: Only show products with stock below this threshold
limit: Maximum number of products to return
offset: Number of products to skip
Returns:
List of products matching the criteria
"""
session = get_db_session()
try:
query = session.query(Product)
if active_only:
query = query.filter(Product.is_active == True)
if category:
query = query.filter(Product.category == category)
if low_stock_threshold is not None:
query = query.filter(Product.quantity_in_stock < low_stock_threshold)
total = query.count()
products = query.offset(offset).limit(limit).all()
return {
"total": total,
"limit": limit,
"offset": offset,
"products": [p.to_dict() for p in products],
}
finally:
session.close()
@mcp.tool()
def update_product(
product_id: int,
name: Optional[str] = None,
description: Optional[str] = None,
price: Optional[float] = None,
cost: Optional[float] = None,
quantity_in_stock: Optional[int] = None,
category: Optional[str] = None,
is_active: Optional[bool] = None,
) -> dict:
"""
Update an existing product.
Args:
product_id: ID of the product to update
name: New product name
description: New description
price: New price
cost: New cost
quantity_in_stock: New stock quantity
category: New category
is_active: Active status
Returns:
Updated product details
"""
session = get_db_session()
try:
product = session.query(Product).filter(Product.id == product_id).first()
if not product:
return {"error": "Product not found"}
if name is not None:
product.name = name
if description is not None:
product.description = description
if price is not None:
product.price = price
if cost is not None:
product.cost = cost
if quantity_in_stock is not None:
product.quantity_in_stock = quantity_in_stock
if category is not None:
product.category = category
if is_active is not None:
product.is_active = is_active
session.commit()
session.refresh(product)
return {"success": True, "product": product.to_dict()}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def delete_product(product_id: int) -> dict:
"""
Delete a product (soft delete - sets is_active to False).
Args:
product_id: ID of the product to delete
Returns:
Success message or error
"""
session = get_db_session()
try:
product = session.query(Product).filter(Product.id == product_id).first()
if not product:
return {"error": "Product not found"}
product.is_active = False
session.commit()
return {"success": True, "message": f"Product '{product.name}' has been deactivated"}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def update_stock(product_id: int, quantity_change: int, reason: Optional[str] = None) -> dict:
"""
Update product stock quantity (add or subtract).
Args:
product_id: Product ID
quantity_change: Amount to add (positive) or subtract (negative)
reason: Reason for stock change
Returns:
Updated stock information
"""
session = get_db_session()
try:
product = session.query(Product).filter(Product.id == product_id).first()
if not product:
return {"error": "Product not found"}
old_quantity = product.quantity_in_stock
new_quantity = old_quantity + quantity_change
if new_quantity < 0:
return {"error": f"Cannot reduce stock below 0. Current stock: {old_quantity}"}
product.quantity_in_stock = new_quantity
session.commit()
return {
"success": True,
"product_id": product_id,
"product_name": product.name,
"old_quantity": old_quantity,
"change": quantity_change,
"new_quantity": new_quantity,
"reason": reason,
}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
# =============================================================================
# CUSTOMER MANAGEMENT TOOLS
# =============================================================================
@mcp.tool()
def add_customer(
email: str,
first_name: str,
last_name: str,
phone: Optional[str] = None,
address: Optional[str] = None,
city: Optional[str] = None,
state: Optional[str] = None,
postal_code: Optional[str] = None,
country: Optional[str] = None,
) -> dict:
"""
Add a new customer.
Args:
email: Customer email (unique)
first_name: Customer first name
last_name: Customer last name
phone: Phone number
address: Street address
city: City
state: State/Province
postal_code: Postal/ZIP code
country: Country
Returns:
Created customer details
"""
session = get_db_session()
try:
# Check if email already exists
existing = session.query(Customer).filter(Customer.email == email).first()
if existing:
return {"error": f"Customer with email '{email}' already exists"}
customer = Customer(
email=email,
first_name=first_name,
last_name=last_name,
phone=phone,
address=address,
city=city,
state=state,
postal_code=postal_code,
country=country,
)
session.add(customer)
session.commit()
session.refresh(customer)
return {"success": True, "customer": customer.to_dict()}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def get_customer(customer_id: Optional[int] = None, email: Optional[str] = None) -> dict:
"""
Get a customer by ID or email.
Args:
customer_id: Customer ID
email: Customer email
Returns:
Customer details or error
"""
if not customer_id and not email:
return {"error": "Please provide either customer_id or email"}
session = get_db_session()
try:
if customer_id:
customer = session.query(Customer).filter(Customer.id == customer_id).first()
else:
customer = session.query(Customer).filter(Customer.email == email).first()
if not customer:
return {"error": "Customer not found"}
return {"customer": customer.to_dict()}
finally:
session.close()
@mcp.tool()
def list_customers(
active_only: bool = True,
city: Optional[str] = None,
country: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> dict:
"""
List customers with optional filtering.
Args:
active_only: Only show active customers
city: Filter by city
country: Filter by country
limit: Maximum number of customers to return
offset: Number of customers to skip
Returns:
List of customers
"""
session = get_db_session()
try:
query = session.query(Customer)
if active_only:
query = query.filter(Customer.is_active == True)
if city:
query = query.filter(Customer.city == city)
if country:
query = query.filter(Customer.country == country)
total = query.count()
customers = query.offset(offset).limit(limit).all()
return {
"total": total,
"limit": limit,
"offset": offset,
"customers": [c.to_dict() for c in customers],
}
finally:
session.close()
@mcp.tool()
def update_customer(
customer_id: int,
email: Optional[str] = None,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
phone: Optional[str] = None,
address: Optional[str] = None,
city: Optional[str] = None,
state: Optional[str] = None,
postal_code: Optional[str] = None,
country: Optional[str] = None,
is_active: Optional[bool] = None,
) -> dict:
"""
Update an existing customer.
Args:
customer_id: ID of the customer to update
email: New email
first_name: New first name
last_name: New last name
phone: New phone
address: New address
city: New city
state: New state
postal_code: New postal code
country: New country
is_active: Active status
Returns:
Updated customer details
"""
session = get_db_session()
try:
customer = session.query(Customer).filter(Customer.id == customer_id).first()
if not customer:
return {"error": "Customer not found"}
if email is not None:
# Check if new email is taken by another customer
existing = session.query(Customer).filter(
Customer.email == email, Customer.id != customer_id
).first()
if existing:
return {"error": f"Email '{email}' is already in use"}
customer.email = email
if first_name is not None:
customer.first_name = first_name
if last_name is not None:
customer.last_name = last_name
if phone is not None:
customer.phone = phone
if address is not None:
customer.address = address
if city is not None:
customer.city = city
if state is not None:
customer.state = state
if postal_code is not None:
customer.postal_code = postal_code
if country is not None:
customer.country = country
if is_active is not None:
customer.is_active = is_active
session.commit()
session.refresh(customer)
return {"success": True, "customer": customer.to_dict()}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def delete_customer(customer_id: int) -> dict:
"""
Delete a customer (soft delete - sets is_active to False).
Args:
customer_id: ID of the customer to delete
Returns:
Success message or error
"""
session = get_db_session()
try:
customer = session.query(Customer).filter(Customer.id == customer_id).first()
if not customer:
return {"error": "Customer not found"}
customer.is_active = False
session.commit()
return {
"success": True,
"message": f"Customer '{customer.first_name} {customer.last_name}' has been deactivated",
}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
# =============================================================================
# ORDER MANAGEMENT TOOLS
# =============================================================================
def generate_order_number() -> str:
"""Generate a unique order number."""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:6].upper()
return f"ORD-{timestamp}-{unique_id}"
@mcp.tool()
def create_order(
customer_id: int,
shipping_address: Optional[str] = None,
notes: Optional[str] = None,
) -> dict:
"""
Create a new order for a customer.
Args:
customer_id: ID of the customer placing the order
shipping_address: Shipping address (defaults to customer address)
notes: Order notes
Returns:
Created order details
"""
session = get_db_session()
try:
customer = session.query(Customer).filter(Customer.id == customer_id).first()
if not customer:
return {"error": "Customer not found"}
if not shipping_address:
# Use customer's address as default
addr_parts = [customer.address, customer.city, customer.state, customer.postal_code, customer.country]
shipping_address = ", ".join(filter(None, addr_parts)) or None
order = Order(
customer_id=customer_id,
order_number=generate_order_number(),
shipping_address=shipping_address,
notes=notes,
status="pending",
)
session.add(order)
session.commit()
session.refresh(order)
return {"success": True, "order": order.to_dict()}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def add_order_item(
order_id: int,
product_id: int,
quantity: int,
) -> dict:
"""
Add an item to an existing order.
Args:
order_id: ID of the order
product_id: ID of the product to add
quantity: Quantity to order
Returns:
Updated order details
"""
session = get_db_session()
try:
order = session.query(Order).options(joinedload(Order.items)).filter(Order.id == order_id).first()
if not order:
return {"error": "Order not found"}
if order.status not in ["pending", "confirmed"]:
return {"error": f"Cannot modify order with status '{order.status}'"}
product = session.query(Product).filter(Product.id == product_id).first()
if not product:
return {"error": "Product not found"}
if not product.is_active:
return {"error": "Product is not available"}
if product.quantity_in_stock < quantity:
return {"error": f"Insufficient stock. Available: {product.quantity_in_stock}"}
# Check if item already exists in order
existing_item = next((item for item in order.items if item.product_id == product_id), None)
if existing_item:
existing_item.quantity += quantity
else:
order_item = OrderItem(
order_id=order_id,
product_id=product_id,
quantity=quantity,
unit_price=product.price,
)
session.add(order_item)
# Update order total
session.flush()
order.total_amount = sum(item.quantity * item.unit_price for item in order.items)
session.commit()
session.refresh(order)
return {"success": True, "order": order.to_dict()}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def remove_order_item(order_id: int, product_id: int) -> dict:
"""
Remove an item from an order.
Args:
order_id: ID of the order
product_id: ID of the product to remove
Returns:
Updated order details
"""
session = get_db_session()
try:
order = session.query(Order).options(joinedload(Order.items)).filter(Order.id == order_id).first()
if not order:
return {"error": "Order not found"}
if order.status not in ["pending", "confirmed"]:
return {"error": f"Cannot modify order with status '{order.status}'"}
item_to_remove = next((item for item in order.items if item.product_id == product_id), None)
if not item_to_remove:
return {"error": "Item not found in order"}
session.delete(item_to_remove)
# Update order total
session.flush()
order.total_amount = sum(item.quantity * item.unit_price for item in order.items if item.product_id != product_id)
session.commit()
session.refresh(order)
return {"success": True, "order": order.to_dict()}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
@mcp.tool()
def get_order(order_id: Optional[int] = None, order_number: Optional[str] = None) -> dict:
"""
Get an order by ID or order number.
Args:
order_id: Order ID
order_number: Order number
Returns:
Order details with items
"""
if not order_id and not order_number:
return {"error": "Please provide either order_id or order_number"}
session = get_db_session()
try:
query = session.query(Order).options(joinedload(Order.items).joinedload(OrderItem.product))
if order_id:
order = query.filter(Order.id == order_id).first()
else:
order = query.filter(Order.order_number == order_number).first()
if not order:
return {"error": "Order not found"}
return {"order": order.to_dict()}
finally:
session.close()
@mcp.tool()
def list_orders(
customer_id: Optional[int] = None,
status: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> dict:
"""
List orders with optional filtering.
Args:
customer_id: Filter by customer
status: Filter by status (pending, confirmed, shipped, delivered, cancelled)
start_date: Filter orders from this date (YYYY-MM-DD)
end_date: Filter orders until this date (YYYY-MM-DD)
limit: Maximum number of orders to return
offset: Number of orders to skip
Returns:
List of orders
"""
session = get_db_session()
try:
query = session.query(Order).options(joinedload(Order.items))
if customer_id:
query = query.filter(Order.customer_id == customer_id)
if status:
query = query.filter(Order.status == status)
if start_date:
start = datetime.strptime(start_date, "%Y-%m-%d")
query = query.filter(Order.created_at >= start)
if end_date:
end = datetime.strptime(end_date, "%Y-%m-%d")
query = query.filter(Order.created_at <= end)
total = query.count()
orders = query.order_by(Order.created_at.desc()).offset(offset).limit(limit).all()
return {
"total": total,
"limit": limit,
"offset": offset,
"orders": [o.to_dict() for o in orders],
}
finally:
session.close()
@mcp.tool()
def update_order_status(
order_id: int,
status: str,
notes: Optional[str] = None,
) -> dict:
"""
Update the status of an order.
Args:
order_id: ID of the order
status: New status (pending, confirmed, shipped, delivered, cancelled)
notes: Additional notes about the status change
Returns:
Updated order details
"""
valid_statuses = ["pending", "confirmed", "shipped", "delivered", "cancelled"]
if status not in valid_statuses:
return {"error": f"Invalid status. Valid statuses: {', '.join(valid_statuses)}"}
session = get_db_session()
try:
order = session.query(Order).options(joinedload(Order.items)).filter(Order.id == order_id).first()
if not order:
return {"error": "Order not found"}
old_status = order.status
# Handle stock adjustments
if status == "confirmed" and old_status == "pending":
# Reduce stock when order is confirmed
for item in order.items:
product = session.query(Product).filter(Product.id == item.product_id).first()
if product.quantity_in_stock < item.quantity:
return {"error": f"Insufficient stock for product '{product.name}'"}
product.quantity_in_stock -= item.quantity
elif status == "cancelled" and old_status in ["pending", "confirmed"]:
# Restore stock when order is cancelled
if old_status == "confirmed":
for item in order.items:
product = session.query(Product).filter(Product.id == item.product_id).first()
product.quantity_in_stock += item.quantity
order.status = status
if notes:
order.notes = (order.notes or "") + f"\n[{datetime.now().isoformat()}] Status changed to {status}: {notes}"
session.commit()
session.refresh(order)
return {
"success": True,
"old_status": old_status,
"new_status": status,
"order": order.to_dict(),
}
except Exception as e:
session.rollback()
return {"error": str(e)}
finally:
session.close()
# =============================================================================
# DATA ANALYSIS & REPORTING TOOLS
# =============================================================================
@mcp.tool()
def get_sales_summary(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> dict:
"""
Get a summary of sales for a given period.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Sales summary including total revenue, order count, average order value
"""
session = get_db_session()
try:
query = session.query(Order).filter(Order.status.in_(["confirmed", "shipped", "delivered"]))
if start_date:
start = datetime.strptime(start_date, "%Y-%m-%d")
query = query.filter(Order.created_at >= start)
if end_date:
end = datetime.strptime(end_date, "%Y-%m-%d")
query = query.filter(Order.created_at <= end)
orders = query.all()
total_revenue = sum(o.total_amount for o in orders)
order_count = len(orders)
avg_order_value = total_revenue / order_count if order_count > 0 else 0
# Status breakdown
status_counts = {}
for order in orders:
status_counts[order.status] = status_counts.get(order.status, 0) + 1
return {
"period": {
"start_date": start_date or "all time",
"end_date": end_date or "present",
},
"total_revenue": round(total_revenue, 2),
"order_count": order_count,
"average_order_value": round(avg_order_value, 2),
"status_breakdown": status_counts,
}
finally:
session.close()
@mcp.tool()
def get_top_products(
limit: int = 10,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> dict:
"""
Get top selling products by quantity and revenue.
Args:
limit: Number of top products to return
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
List of top products with sales metrics
"""
session = get_db_session()
try:
query = session.query(
Product.id,
Product.name,
Product.sku,
Product.category,
func.sum(OrderItem.quantity).label("total_quantity"),
func.sum(OrderItem.quantity * OrderItem.unit_price).label("total_revenue"),
).join(
OrderItem, Product.id == OrderItem.product_id
).join(
Order, OrderItem.order_id == Order.id
).filter(
Order.status.in_(["confirmed", "shipped", "delivered"])
)
if start_date:
start = datetime.strptime(start_date, "%Y-%m-%d")
query = query.filter(Order.created_at >= start)
if end_date:
end = datetime.strptime(end_date, "%Y-%m-%d")
query = query.filter(Order.created_at <= end)
results = query.group_by(Product.id).order_by(func.sum(OrderItem.quantity * OrderItem.unit_price).desc()).limit(limit).all()
top_products = [
{
"product_id": r.id,
"name": r.name,
"sku": r.sku,
"category": r.category,
"total_quantity_sold": r.total_quantity or 0,
"total_revenue": round(r.total_revenue or 0, 2),
}
for r in results
]
return {
"period": {
"start_date": start_date or "all time",
"end_date": end_date or "present",
},
"top_products": top_products,
}
finally:
session.close()
@mcp.tool()
def get_top_customers(
limit: int = 10,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> dict:
"""
Get top customers by total spend and order count.
Args:
limit: Number of top customers to return
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
List of top customers with purchase metrics
"""
session = get_db_session()
try:
query = session.query(
Customer.id,
Customer.email,
Customer.first_name,
Customer.last_name,
func.count(Order.id).label("order_count"),
func.sum(Order.total_amount).label("total_spent"),
).join(
Order, Customer.id == Order.customer_id
).filter(
Order.status.in_(["confirmed", "shipped", "delivered"])
)
if start_date:
start = datetime.strptime(start_date, "%Y-%m-%d")
query = query.filter(Order.created_at >= start)
if end_date:
end = datetime.strptime(end_date, "%Y-%m-%d")
query = query.filter(Order.created_at <= end)
results = query.group_by(Customer.id).order_by(func.sum(Order.total_amount).desc()).limit(limit).all()
top_customers = [
{
"customer_id": r.id,
"email": r.email,
"name": f"{r.first_name} {r.last_name}",
"order_count": r.order_count or 0,
"total_spent": round(r.total_spent or 0, 2),
}
for r in results
]
return {
"period": {
"start_date": start_date or "all time",
"end_date": end_date or "present",
},
"top_customers": top_customers,
}
finally:
session.close()
@mcp.tool()
def get_inventory_report(
include_inactive: bool = False,
low_stock_threshold: int = 10,
) -> dict:
"""
Get inventory status report.
Args:
include_inactive: Include inactive products
low_stock_threshold: Threshold for low stock warning
Returns:
Inventory report with stock levels and values
"""
session = get_db_session()
try:
query = session.query(Product)
if not include_inactive:
query = query.filter(Product.is_active == True)
products = query.all()
total_products = len(products)
total_stock_value = sum((p.cost or p.price) * p.quantity_in_stock for p in products)
total_retail_value = sum(p.price * p.quantity_in_stock for p in products)
out_of_stock = [p.to_dict() for p in products if p.quantity_in_stock == 0]
low_stock = [p.to_dict() for p in products if 0 < p.quantity_in_stock < low_stock_threshold]
# Category breakdown
categories = {}
for p in products:
cat = p.category or "Uncategorized"
if cat not in categories:
categories[cat] = {"count": 0, "total_stock": 0, "total_value": 0}
categories[cat]["count"] += 1
categories[cat]["total_stock"] += p.quantity_in_stock
categories[cat]["total_value"] += p.price * p.quantity_in_stock
return {
"summary": {
"total_products": total_products,
"total_stock_value": round(total_stock_value, 2),
"total_retail_value": round(total_retail_value, 2),
"out_of_stock_count": len(out_of_stock),
"low_stock_count": len(low_stock),
},
"out_of_stock_products": out_of_stock,
"low_stock_products": low_stock,
"category_breakdown": categories,
}
finally:
session.close()
@mcp.tool()
def get_customer_order_history(customer_id: int) -> dict:
"""
Get complete order history for a specific customer.
Args:
customer_id: Customer ID
Returns:
Customer details with full order history
"""
session = get_db_session()
try:
customer = session.query(Customer).filter(Customer.id == customer_id).first()
if not customer:
return {"error": "Customer not found"}
orders = (
session.query(Order)
.options(joinedload(Order.items).joinedload(OrderItem.product))
.filter(Order.customer_id == customer_id)
.order_by(Order.created_at.desc())
.all()
)
total_spent = sum(o.total_amount for o in orders if o.status in ["confirmed", "shipped", "delivered"])
total_orders = len(orders)
return {
"customer": customer.to_dict(),
"stats": {
"total_orders": total_orders,
"total_spent": round(total_spent, 2),
"average_order_value": round(total_spent / total_orders, 2) if total_orders > 0 else 0,
},
"orders": [o.to_dict() for o in orders],
}
finally:
session.close()
@mcp.tool()
def get_profit_analysis(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> dict:
"""
Get profit analysis for sold products.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Profit analysis including margins and totals
"""
session = get_db_session()
try:
query = session.query(
Product.id,
Product.name,
Product.sku,
Product.cost,
OrderItem.unit_price,
OrderItem.quantity,
).join(
OrderItem, Product.id == OrderItem.product_id
).join(
Order, OrderItem.order_id == Order.id
).filter(
Order.status.in_(["confirmed", "shipped", "delivered"])
)
if start_date:
start = datetime.strptime(start_date, "%Y-%m-%d")
query = query.filter(Order.created_at >= start)
if end_date:
end = datetime.strptime(end_date, "%Y-%m-%d")
query = query.filter(Order.created_at <= end)
results = query.all()
total_revenue = 0
total_cost = 0
product_profits = {}
for r in results:
revenue = r.unit_price * r.quantity
cost = (r.cost or 0) * r.quantity
profit = revenue - cost
total_revenue += revenue
total_cost += cost
if r.id not in product_profits:
product_profits[r.id] = {
"name": r.name,
"sku": r.sku,
"total_revenue": 0,
"total_cost": 0,
"total_profit": 0,
"units_sold": 0,
}
product_profits[r.id]["total_revenue"] += revenue
product_profits[r.id]["total_cost"] += cost
product_profits[r.id]["total_profit"] += profit
product_profits[r.id]["units_sold"] += r.quantity
# Sort by profit
sorted_products = sorted(product_profits.values(), key=lambda x: x["total_profit"], reverse=True)
total_profit = total_revenue - total_cost
margin = (total_profit / total_revenue * 100) if total_revenue > 0 else 0
return {
"period": {
"start_date": start_date or "all time",
"end_date": end_date or "present",
},
"summary": {
"total_revenue": round(total_revenue, 2),
"total_cost": round(total_cost, 2),
"total_profit": round(total_profit, 2),
"profit_margin_percent": round(margin, 2),
},
"product_breakdown": [
{
**p,
"total_revenue": round(p["total_revenue"], 2),
"total_cost": round(p["total_cost"], 2),
"total_profit": round(p["total_profit"], 2),
"margin_percent": round((p["total_profit"] / p["total_revenue"] * 100) if p["total_revenue"] > 0 else 0, 2),
}
for p in sorted_products
],
}
finally:
session.close()
@mcp.tool()
def search_products(
query: str,
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock_only: bool = False,
) -> dict:
"""
Search for products by name or description.
Args:
query: Search query (searches name and description)
category: Filter by category
min_price: Minimum price filter
max_price: Maximum price filter
in_stock_only: Only show products in stock
Returns:
List of matching products
"""
session = get_db_session()
try:
db_query = session.query(Product).filter(Product.is_active == True)
# Search in name and description
search_pattern = f"%{query}%"
db_query = db_query.filter(
(Product.name.ilike(search_pattern)) | (Product.description.ilike(search_pattern))
)
if category:
db_query = db_query.filter(Product.category == category)
if min_price is not None:
db_query = db_query.filter(Product.price >= min_price)
if max_price is not None:
db_query = db_query.filter(Product.price <= max_price)
if in_stock_only:
db_query = db_query.filter(Product.quantity_in_stock > 0)
products = db_query.all()
return {
"query": query,
"result_count": len(products),
"products": [p.to_dict() for p in products],
}
finally:
session.close()
@mcp.tool()
def search_customers(
query: str,
) -> dict:
"""
Search for customers by name or email.
Args:
query: Search query (searches name and email)
Returns:
List of matching customers
"""
session = get_db_session()
try:
search_pattern = f"%{query}%"
customers = (
session.query(Customer)
.filter(Customer.is_active == True)
.filter(
(Customer.email.ilike(search_pattern))
| (Customer.first_name.ilike(search_pattern))
| (Customer.last_name.ilike(search_pattern))
)
.all()
)
return {
"query": query,
"result_count": len(customers),
"customers": [c.to_dict() for c in customers],
}
finally:
session.close()
def main():
"""Run the MCP server."""
mcp.run()
if __name__ == "__main__":
main()