Skip to main content
Glama
ecommerce_server.py39.9 kB
""" E-commerce MCP Server A FastMCP server for managing products, customers, and orders. Provides full CRUD operations and data analysis capabilities for business users. """ import os import sys import uuid from datetime import datetime from pathlib import Path from typing import Optional # Ensure the script's directory is in the path for imports SCRIPT_DIR = Path(__file__).parent.resolve() sys.path.insert(0, str(SCRIPT_DIR)) from fastmcp import FastMCP from sqlalchemy import func from sqlalchemy.orm import joinedload from models import ( Product, Customer, Order, OrderItem, init_db, get_session, get_engine, ) # Initialize the MCP server mcp = FastMCP( "E-commerce Server", instructions=""" This MCP server helps business users manage their e-commerce operations. Available operations: - Products: Add, view, update, delete products and manage inventory - Customers: Add, view, update, delete customer records - Orders: Create orders, add items, update status, view order history - Analytics: Get sales reports, inventory status, customer insights Use natural language to interact with the system. """, ) # Database path - can be configured via environment variable # Default to a file in the same directory as the script DEFAULT_DB_PATH = str(SCRIPT_DIR / "ecommerce.db") DB_PATH = os.environ.get("ECOMMERCE_DB_PATH", DEFAULT_DB_PATH) # Initialize database on startup engine = init_db(DB_PATH) def get_db_session(): """Get a database session.""" return get_session(engine) # ============================================================================= # PRODUCT MANAGEMENT TOOLS # ============================================================================= @mcp.tool() def add_product( name: str, sku: str, price: float, description: Optional[str] = None, cost: Optional[float] = None, quantity_in_stock: int = 0, category: Optional[str] = None, ) -> dict: """ Add a new product to the inventory. Args: name: Product name sku: Stock Keeping Unit (unique identifier) price: Selling price description: Product description cost: Product cost (for profit calculation) quantity_in_stock: Initial stock quantity category: Product category Returns: The created product details """ session = get_db_session() try: # Check if SKU already exists existing = session.query(Product).filter(Product.sku == sku).first() if existing: return {"error": f"Product with SKU '{sku}' already exists"} product = Product( name=name, sku=sku, price=price, description=description, cost=cost, quantity_in_stock=quantity_in_stock, category=category, ) session.add(product) session.commit() session.refresh(product) return {"success": True, "product": product.to_dict()} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def get_product(product_id: Optional[int] = None, sku: Optional[str] = None) -> dict: """ Get a product by ID or SKU. Args: product_id: Product ID sku: Product SKU Returns: Product details or error message """ if not product_id and not sku: return {"error": "Please provide either product_id or sku"} session = get_db_session() try: if product_id: product = session.query(Product).filter(Product.id == product_id).first() else: product = session.query(Product).filter(Product.sku == sku).first() if not product: return {"error": "Product not found"} return {"product": product.to_dict()} finally: session.close() @mcp.tool() def list_products( category: Optional[str] = None, active_only: bool = True, low_stock_threshold: Optional[int] = None, limit: int = 100, offset: int = 0, ) -> dict: """ List products with optional filtering. Args: category: Filter by category active_only: Only show active products low_stock_threshold: Only show products with stock below this threshold limit: Maximum number of products to return offset: Number of products to skip Returns: List of products matching the criteria """ session = get_db_session() try: query = session.query(Product) if active_only: query = query.filter(Product.is_active == True) if category: query = query.filter(Product.category == category) if low_stock_threshold is not None: query = query.filter(Product.quantity_in_stock < low_stock_threshold) total = query.count() products = query.offset(offset).limit(limit).all() return { "total": total, "limit": limit, "offset": offset, "products": [p.to_dict() for p in products], } finally: session.close() @mcp.tool() def update_product( product_id: int, name: Optional[str] = None, description: Optional[str] = None, price: Optional[float] = None, cost: Optional[float] = None, quantity_in_stock: Optional[int] = None, category: Optional[str] = None, is_active: Optional[bool] = None, ) -> dict: """ Update an existing product. Args: product_id: ID of the product to update name: New product name description: New description price: New price cost: New cost quantity_in_stock: New stock quantity category: New category is_active: Active status Returns: Updated product details """ session = get_db_session() try: product = session.query(Product).filter(Product.id == product_id).first() if not product: return {"error": "Product not found"} if name is not None: product.name = name if description is not None: product.description = description if price is not None: product.price = price if cost is not None: product.cost = cost if quantity_in_stock is not None: product.quantity_in_stock = quantity_in_stock if category is not None: product.category = category if is_active is not None: product.is_active = is_active session.commit() session.refresh(product) return {"success": True, "product": product.to_dict()} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def delete_product(product_id: int) -> dict: """ Delete a product (soft delete - sets is_active to False). Args: product_id: ID of the product to delete Returns: Success message or error """ session = get_db_session() try: product = session.query(Product).filter(Product.id == product_id).first() if not product: return {"error": "Product not found"} product.is_active = False session.commit() return {"success": True, "message": f"Product '{product.name}' has been deactivated"} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def update_stock(product_id: int, quantity_change: int, reason: Optional[str] = None) -> dict: """ Update product stock quantity (add or subtract). Args: product_id: Product ID quantity_change: Amount to add (positive) or subtract (negative) reason: Reason for stock change Returns: Updated stock information """ session = get_db_session() try: product = session.query(Product).filter(Product.id == product_id).first() if not product: return {"error": "Product not found"} old_quantity = product.quantity_in_stock new_quantity = old_quantity + quantity_change if new_quantity < 0: return {"error": f"Cannot reduce stock below 0. Current stock: {old_quantity}"} product.quantity_in_stock = new_quantity session.commit() return { "success": True, "product_id": product_id, "product_name": product.name, "old_quantity": old_quantity, "change": quantity_change, "new_quantity": new_quantity, "reason": reason, } except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() # ============================================================================= # CUSTOMER MANAGEMENT TOOLS # ============================================================================= @mcp.tool() def add_customer( email: str, first_name: str, last_name: str, phone: Optional[str] = None, address: Optional[str] = None, city: Optional[str] = None, state: Optional[str] = None, postal_code: Optional[str] = None, country: Optional[str] = None, ) -> dict: """ Add a new customer. Args: email: Customer email (unique) first_name: Customer first name last_name: Customer last name phone: Phone number address: Street address city: City state: State/Province postal_code: Postal/ZIP code country: Country Returns: Created customer details """ session = get_db_session() try: # Check if email already exists existing = session.query(Customer).filter(Customer.email == email).first() if existing: return {"error": f"Customer with email '{email}' already exists"} customer = Customer( email=email, first_name=first_name, last_name=last_name, phone=phone, address=address, city=city, state=state, postal_code=postal_code, country=country, ) session.add(customer) session.commit() session.refresh(customer) return {"success": True, "customer": customer.to_dict()} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def get_customer(customer_id: Optional[int] = None, email: Optional[str] = None) -> dict: """ Get a customer by ID or email. Args: customer_id: Customer ID email: Customer email Returns: Customer details or error """ if not customer_id and not email: return {"error": "Please provide either customer_id or email"} session = get_db_session() try: if customer_id: customer = session.query(Customer).filter(Customer.id == customer_id).first() else: customer = session.query(Customer).filter(Customer.email == email).first() if not customer: return {"error": "Customer not found"} return {"customer": customer.to_dict()} finally: session.close() @mcp.tool() def list_customers( active_only: bool = True, city: Optional[str] = None, country: Optional[str] = None, limit: int = 100, offset: int = 0, ) -> dict: """ List customers with optional filtering. Args: active_only: Only show active customers city: Filter by city country: Filter by country limit: Maximum number of customers to return offset: Number of customers to skip Returns: List of customers """ session = get_db_session() try: query = session.query(Customer) if active_only: query = query.filter(Customer.is_active == True) if city: query = query.filter(Customer.city == city) if country: query = query.filter(Customer.country == country) total = query.count() customers = query.offset(offset).limit(limit).all() return { "total": total, "limit": limit, "offset": offset, "customers": [c.to_dict() for c in customers], } finally: session.close() @mcp.tool() def update_customer( customer_id: int, email: Optional[str] = None, first_name: Optional[str] = None, last_name: Optional[str] = None, phone: Optional[str] = None, address: Optional[str] = None, city: Optional[str] = None, state: Optional[str] = None, postal_code: Optional[str] = None, country: Optional[str] = None, is_active: Optional[bool] = None, ) -> dict: """ Update an existing customer. Args: customer_id: ID of the customer to update email: New email first_name: New first name last_name: New last name phone: New phone address: New address city: New city state: New state postal_code: New postal code country: New country is_active: Active status Returns: Updated customer details """ session = get_db_session() try: customer = session.query(Customer).filter(Customer.id == customer_id).first() if not customer: return {"error": "Customer not found"} if email is not None: # Check if new email is taken by another customer existing = session.query(Customer).filter( Customer.email == email, Customer.id != customer_id ).first() if existing: return {"error": f"Email '{email}' is already in use"} customer.email = email if first_name is not None: customer.first_name = first_name if last_name is not None: customer.last_name = last_name if phone is not None: customer.phone = phone if address is not None: customer.address = address if city is not None: customer.city = city if state is not None: customer.state = state if postal_code is not None: customer.postal_code = postal_code if country is not None: customer.country = country if is_active is not None: customer.is_active = is_active session.commit() session.refresh(customer) return {"success": True, "customer": customer.to_dict()} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def delete_customer(customer_id: int) -> dict: """ Delete a customer (soft delete - sets is_active to False). Args: customer_id: ID of the customer to delete Returns: Success message or error """ session = get_db_session() try: customer = session.query(Customer).filter(Customer.id == customer_id).first() if not customer: return {"error": "Customer not found"} customer.is_active = False session.commit() return { "success": True, "message": f"Customer '{customer.first_name} {customer.last_name}' has been deactivated", } except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() # ============================================================================= # ORDER MANAGEMENT TOOLS # ============================================================================= def generate_order_number() -> str: """Generate a unique order number.""" timestamp = datetime.now().strftime("%Y%m%d%H%M%S") unique_id = uuid.uuid4().hex[:6].upper() return f"ORD-{timestamp}-{unique_id}" @mcp.tool() def create_order( customer_id: int, shipping_address: Optional[str] = None, notes: Optional[str] = None, ) -> dict: """ Create a new order for a customer. Args: customer_id: ID of the customer placing the order shipping_address: Shipping address (defaults to customer address) notes: Order notes Returns: Created order details """ session = get_db_session() try: customer = session.query(Customer).filter(Customer.id == customer_id).first() if not customer: return {"error": "Customer not found"} if not shipping_address: # Use customer's address as default addr_parts = [customer.address, customer.city, customer.state, customer.postal_code, customer.country] shipping_address = ", ".join(filter(None, addr_parts)) or None order = Order( customer_id=customer_id, order_number=generate_order_number(), shipping_address=shipping_address, notes=notes, status="pending", ) session.add(order) session.commit() session.refresh(order) return {"success": True, "order": order.to_dict()} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def add_order_item( order_id: int, product_id: int, quantity: int, ) -> dict: """ Add an item to an existing order. Args: order_id: ID of the order product_id: ID of the product to add quantity: Quantity to order Returns: Updated order details """ session = get_db_session() try: order = session.query(Order).options(joinedload(Order.items)).filter(Order.id == order_id).first() if not order: return {"error": "Order not found"} if order.status not in ["pending", "confirmed"]: return {"error": f"Cannot modify order with status '{order.status}'"} product = session.query(Product).filter(Product.id == product_id).first() if not product: return {"error": "Product not found"} if not product.is_active: return {"error": "Product is not available"} if product.quantity_in_stock < quantity: return {"error": f"Insufficient stock. Available: {product.quantity_in_stock}"} # Check if item already exists in order existing_item = next((item for item in order.items if item.product_id == product_id), None) if existing_item: existing_item.quantity += quantity else: order_item = OrderItem( order_id=order_id, product_id=product_id, quantity=quantity, unit_price=product.price, ) session.add(order_item) # Update order total session.flush() order.total_amount = sum(item.quantity * item.unit_price for item in order.items) session.commit() session.refresh(order) return {"success": True, "order": order.to_dict()} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def remove_order_item(order_id: int, product_id: int) -> dict: """ Remove an item from an order. Args: order_id: ID of the order product_id: ID of the product to remove Returns: Updated order details """ session = get_db_session() try: order = session.query(Order).options(joinedload(Order.items)).filter(Order.id == order_id).first() if not order: return {"error": "Order not found"} if order.status not in ["pending", "confirmed"]: return {"error": f"Cannot modify order with status '{order.status}'"} item_to_remove = next((item for item in order.items if item.product_id == product_id), None) if not item_to_remove: return {"error": "Item not found in order"} session.delete(item_to_remove) # Update order total session.flush() order.total_amount = sum(item.quantity * item.unit_price for item in order.items if item.product_id != product_id) session.commit() session.refresh(order) return {"success": True, "order": order.to_dict()} except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() @mcp.tool() def get_order(order_id: Optional[int] = None, order_number: Optional[str] = None) -> dict: """ Get an order by ID or order number. Args: order_id: Order ID order_number: Order number Returns: Order details with items """ if not order_id and not order_number: return {"error": "Please provide either order_id or order_number"} session = get_db_session() try: query = session.query(Order).options(joinedload(Order.items).joinedload(OrderItem.product)) if order_id: order = query.filter(Order.id == order_id).first() else: order = query.filter(Order.order_number == order_number).first() if not order: return {"error": "Order not found"} return {"order": order.to_dict()} finally: session.close() @mcp.tool() def list_orders( customer_id: Optional[int] = None, status: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 100, offset: int = 0, ) -> dict: """ List orders with optional filtering. Args: customer_id: Filter by customer status: Filter by status (pending, confirmed, shipped, delivered, cancelled) start_date: Filter orders from this date (YYYY-MM-DD) end_date: Filter orders until this date (YYYY-MM-DD) limit: Maximum number of orders to return offset: Number of orders to skip Returns: List of orders """ session = get_db_session() try: query = session.query(Order).options(joinedload(Order.items)) if customer_id: query = query.filter(Order.customer_id == customer_id) if status: query = query.filter(Order.status == status) if start_date: start = datetime.strptime(start_date, "%Y-%m-%d") query = query.filter(Order.created_at >= start) if end_date: end = datetime.strptime(end_date, "%Y-%m-%d") query = query.filter(Order.created_at <= end) total = query.count() orders = query.order_by(Order.created_at.desc()).offset(offset).limit(limit).all() return { "total": total, "limit": limit, "offset": offset, "orders": [o.to_dict() for o in orders], } finally: session.close() @mcp.tool() def update_order_status( order_id: int, status: str, notes: Optional[str] = None, ) -> dict: """ Update the status of an order. Args: order_id: ID of the order status: New status (pending, confirmed, shipped, delivered, cancelled) notes: Additional notes about the status change Returns: Updated order details """ valid_statuses = ["pending", "confirmed", "shipped", "delivered", "cancelled"] if status not in valid_statuses: return {"error": f"Invalid status. Valid statuses: {', '.join(valid_statuses)}"} session = get_db_session() try: order = session.query(Order).options(joinedload(Order.items)).filter(Order.id == order_id).first() if not order: return {"error": "Order not found"} old_status = order.status # Handle stock adjustments if status == "confirmed" and old_status == "pending": # Reduce stock when order is confirmed for item in order.items: product = session.query(Product).filter(Product.id == item.product_id).first() if product.quantity_in_stock < item.quantity: return {"error": f"Insufficient stock for product '{product.name}'"} product.quantity_in_stock -= item.quantity elif status == "cancelled" and old_status in ["pending", "confirmed"]: # Restore stock when order is cancelled if old_status == "confirmed": for item in order.items: product = session.query(Product).filter(Product.id == item.product_id).first() product.quantity_in_stock += item.quantity order.status = status if notes: order.notes = (order.notes or "") + f"\n[{datetime.now().isoformat()}] Status changed to {status}: {notes}" session.commit() session.refresh(order) return { "success": True, "old_status": old_status, "new_status": status, "order": order.to_dict(), } except Exception as e: session.rollback() return {"error": str(e)} finally: session.close() # ============================================================================= # DATA ANALYSIS & REPORTING TOOLS # ============================================================================= @mcp.tool() def get_sales_summary( start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> dict: """ Get a summary of sales for a given period. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: Sales summary including total revenue, order count, average order value """ session = get_db_session() try: query = session.query(Order).filter(Order.status.in_(["confirmed", "shipped", "delivered"])) if start_date: start = datetime.strptime(start_date, "%Y-%m-%d") query = query.filter(Order.created_at >= start) if end_date: end = datetime.strptime(end_date, "%Y-%m-%d") query = query.filter(Order.created_at <= end) orders = query.all() total_revenue = sum(o.total_amount for o in orders) order_count = len(orders) avg_order_value = total_revenue / order_count if order_count > 0 else 0 # Status breakdown status_counts = {} for order in orders: status_counts[order.status] = status_counts.get(order.status, 0) + 1 return { "period": { "start_date": start_date or "all time", "end_date": end_date or "present", }, "total_revenue": round(total_revenue, 2), "order_count": order_count, "average_order_value": round(avg_order_value, 2), "status_breakdown": status_counts, } finally: session.close() @mcp.tool() def get_top_products( limit: int = 10, start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> dict: """ Get top selling products by quantity and revenue. Args: limit: Number of top products to return start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: List of top products with sales metrics """ session = get_db_session() try: query = session.query( Product.id, Product.name, Product.sku, Product.category, func.sum(OrderItem.quantity).label("total_quantity"), func.sum(OrderItem.quantity * OrderItem.unit_price).label("total_revenue"), ).join( OrderItem, Product.id == OrderItem.product_id ).join( Order, OrderItem.order_id == Order.id ).filter( Order.status.in_(["confirmed", "shipped", "delivered"]) ) if start_date: start = datetime.strptime(start_date, "%Y-%m-%d") query = query.filter(Order.created_at >= start) if end_date: end = datetime.strptime(end_date, "%Y-%m-%d") query = query.filter(Order.created_at <= end) results = query.group_by(Product.id).order_by(func.sum(OrderItem.quantity * OrderItem.unit_price).desc()).limit(limit).all() top_products = [ { "product_id": r.id, "name": r.name, "sku": r.sku, "category": r.category, "total_quantity_sold": r.total_quantity or 0, "total_revenue": round(r.total_revenue or 0, 2), } for r in results ] return { "period": { "start_date": start_date or "all time", "end_date": end_date or "present", }, "top_products": top_products, } finally: session.close() @mcp.tool() def get_top_customers( limit: int = 10, start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> dict: """ Get top customers by total spend and order count. Args: limit: Number of top customers to return start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: List of top customers with purchase metrics """ session = get_db_session() try: query = session.query( Customer.id, Customer.email, Customer.first_name, Customer.last_name, func.count(Order.id).label("order_count"), func.sum(Order.total_amount).label("total_spent"), ).join( Order, Customer.id == Order.customer_id ).filter( Order.status.in_(["confirmed", "shipped", "delivered"]) ) if start_date: start = datetime.strptime(start_date, "%Y-%m-%d") query = query.filter(Order.created_at >= start) if end_date: end = datetime.strptime(end_date, "%Y-%m-%d") query = query.filter(Order.created_at <= end) results = query.group_by(Customer.id).order_by(func.sum(Order.total_amount).desc()).limit(limit).all() top_customers = [ { "customer_id": r.id, "email": r.email, "name": f"{r.first_name} {r.last_name}", "order_count": r.order_count or 0, "total_spent": round(r.total_spent or 0, 2), } for r in results ] return { "period": { "start_date": start_date or "all time", "end_date": end_date or "present", }, "top_customers": top_customers, } finally: session.close() @mcp.tool() def get_inventory_report( include_inactive: bool = False, low_stock_threshold: int = 10, ) -> dict: """ Get inventory status report. Args: include_inactive: Include inactive products low_stock_threshold: Threshold for low stock warning Returns: Inventory report with stock levels and values """ session = get_db_session() try: query = session.query(Product) if not include_inactive: query = query.filter(Product.is_active == True) products = query.all() total_products = len(products) total_stock_value = sum((p.cost or p.price) * p.quantity_in_stock for p in products) total_retail_value = sum(p.price * p.quantity_in_stock for p in products) out_of_stock = [p.to_dict() for p in products if p.quantity_in_stock == 0] low_stock = [p.to_dict() for p in products if 0 < p.quantity_in_stock < low_stock_threshold] # Category breakdown categories = {} for p in products: cat = p.category or "Uncategorized" if cat not in categories: categories[cat] = {"count": 0, "total_stock": 0, "total_value": 0} categories[cat]["count"] += 1 categories[cat]["total_stock"] += p.quantity_in_stock categories[cat]["total_value"] += p.price * p.quantity_in_stock return { "summary": { "total_products": total_products, "total_stock_value": round(total_stock_value, 2), "total_retail_value": round(total_retail_value, 2), "out_of_stock_count": len(out_of_stock), "low_stock_count": len(low_stock), }, "out_of_stock_products": out_of_stock, "low_stock_products": low_stock, "category_breakdown": categories, } finally: session.close() @mcp.tool() def get_customer_order_history(customer_id: int) -> dict: """ Get complete order history for a specific customer. Args: customer_id: Customer ID Returns: Customer details with full order history """ session = get_db_session() try: customer = session.query(Customer).filter(Customer.id == customer_id).first() if not customer: return {"error": "Customer not found"} orders = ( session.query(Order) .options(joinedload(Order.items).joinedload(OrderItem.product)) .filter(Order.customer_id == customer_id) .order_by(Order.created_at.desc()) .all() ) total_spent = sum(o.total_amount for o in orders if o.status in ["confirmed", "shipped", "delivered"]) total_orders = len(orders) return { "customer": customer.to_dict(), "stats": { "total_orders": total_orders, "total_spent": round(total_spent, 2), "average_order_value": round(total_spent / total_orders, 2) if total_orders > 0 else 0, }, "orders": [o.to_dict() for o in orders], } finally: session.close() @mcp.tool() def get_profit_analysis( start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> dict: """ Get profit analysis for sold products. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: Profit analysis including margins and totals """ session = get_db_session() try: query = session.query( Product.id, Product.name, Product.sku, Product.cost, OrderItem.unit_price, OrderItem.quantity, ).join( OrderItem, Product.id == OrderItem.product_id ).join( Order, OrderItem.order_id == Order.id ).filter( Order.status.in_(["confirmed", "shipped", "delivered"]) ) if start_date: start = datetime.strptime(start_date, "%Y-%m-%d") query = query.filter(Order.created_at >= start) if end_date: end = datetime.strptime(end_date, "%Y-%m-%d") query = query.filter(Order.created_at <= end) results = query.all() total_revenue = 0 total_cost = 0 product_profits = {} for r in results: revenue = r.unit_price * r.quantity cost = (r.cost or 0) * r.quantity profit = revenue - cost total_revenue += revenue total_cost += cost if r.id not in product_profits: product_profits[r.id] = { "name": r.name, "sku": r.sku, "total_revenue": 0, "total_cost": 0, "total_profit": 0, "units_sold": 0, } product_profits[r.id]["total_revenue"] += revenue product_profits[r.id]["total_cost"] += cost product_profits[r.id]["total_profit"] += profit product_profits[r.id]["units_sold"] += r.quantity # Sort by profit sorted_products = sorted(product_profits.values(), key=lambda x: x["total_profit"], reverse=True) total_profit = total_revenue - total_cost margin = (total_profit / total_revenue * 100) if total_revenue > 0 else 0 return { "period": { "start_date": start_date or "all time", "end_date": end_date or "present", }, "summary": { "total_revenue": round(total_revenue, 2), "total_cost": round(total_cost, 2), "total_profit": round(total_profit, 2), "profit_margin_percent": round(margin, 2), }, "product_breakdown": [ { **p, "total_revenue": round(p["total_revenue"], 2), "total_cost": round(p["total_cost"], 2), "total_profit": round(p["total_profit"], 2), "margin_percent": round((p["total_profit"] / p["total_revenue"] * 100) if p["total_revenue"] > 0 else 0, 2), } for p in sorted_products ], } finally: session.close() @mcp.tool() def search_products( query: str, category: Optional[str] = None, min_price: Optional[float] = None, max_price: Optional[float] = None, in_stock_only: bool = False, ) -> dict: """ Search for products by name or description. Args: query: Search query (searches name and description) category: Filter by category min_price: Minimum price filter max_price: Maximum price filter in_stock_only: Only show products in stock Returns: List of matching products """ session = get_db_session() try: db_query = session.query(Product).filter(Product.is_active == True) # Search in name and description search_pattern = f"%{query}%" db_query = db_query.filter( (Product.name.ilike(search_pattern)) | (Product.description.ilike(search_pattern)) ) if category: db_query = db_query.filter(Product.category == category) if min_price is not None: db_query = db_query.filter(Product.price >= min_price) if max_price is not None: db_query = db_query.filter(Product.price <= max_price) if in_stock_only: db_query = db_query.filter(Product.quantity_in_stock > 0) products = db_query.all() return { "query": query, "result_count": len(products), "products": [p.to_dict() for p in products], } finally: session.close() @mcp.tool() def search_customers( query: str, ) -> dict: """ Search for customers by name or email. Args: query: Search query (searches name and email) Returns: List of matching customers """ session = get_db_session() try: search_pattern = f"%{query}%" customers = ( session.query(Customer) .filter(Customer.is_active == True) .filter( (Customer.email.ilike(search_pattern)) | (Customer.first_name.ilike(search_pattern)) | (Customer.last_name.ilike(search_pattern)) ) .all() ) return { "query": query, "result_count": len(customers), "customers": [c.to_dict() for c in customers], } finally: session.close() def main(): """Run the MCP server.""" mcp.run() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eeshapatel12/Commerce-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server