import json
import os

import snowflake.connector
from fastmcp import FastMCP
# Create the FastMCP server instance named "e-commerce-demo"; the
# @mcp.resource / @mcp.tool decorators in this file register handlers on it.
mcp = FastMCP("e-commerce-demo")
# --- Snowflake Configuration ---
def load_snowflake_params(environ=None):
    """Build the keyword arguments passed to ``snowflake.connector.connect``.

    Each setting is taken from a ``SNOWFLAKE_<KEY>`` environment variable
    (``SNOWFLAKE_USER``, ``SNOWFLAKE_PASSWORD``, ``SNOWFLAKE_ACCOUNT``,
    ``SNOWFLAKE_WAREHOUSE``, ``SNOWFLAKE_DATABASE``, ``SNOWFLAKE_SCHEMA``).
    When a variable is not set, the value that was previously hard-coded in
    this file is used, so existing setups keep working unchanged.

    Args:
        environ: Mapping to read the settings from. Defaults to
            ``os.environ``; a plain dict can be supplied instead.

    Returns:
        A dict with the keys ``user``, ``password``, ``account``,
        ``warehouse``, ``database`` and ``schema``.
    """
    env = os.environ if environ is None else environ
    # SECURITY: these fallback values (including the password) are committed
    # to source control. Rotate the credentials and provide the real ones via
    # the environment; the fallbacks only preserve the previous behaviour.
    defaults = {
        "user": "EESHA",
        "password": "Eesha1234567890",
        "account": "DBASMWF-AX44508",
        "warehouse": "SNOWFLAKE_LEARNING_WH",
        "database": "SNOWFLAKE_LEARNING_DB",
        "schema": "EESHA_LOAD_SAMPLE_DATA_FROM_S3",
    }
    return {
        key: env.get(f"SNOWFLAKE_{key.upper()}", fallback)
        for key, fallback in defaults.items()
    }


PARAMS = load_snowflake_params()
def get_connection():
    """Open a new Snowflake connection configured from the module-level ``PARAMS``.

    Returns:
        The connection object produced by ``snowflake.connector.connect``.
        A fresh connection is created on every call; the caller is expected
        to close it.
    """
    connection = snowflake.connector.connect(**PARAMS)
    return connection
# --- Resources ---
@mcp.resource("e-commerce://products")
def list_products() -> str:
    """Return up to 100 products from the catalog as JSON.

    Returns:
        A JSON array (indented by 2 spaces) with one object per row of
        ``PRODUCTS``; keys are the lower-cased column names reported by the
        cursor for ``ID``, ``NAME``, ``PRICE`` and ``STOCK``.
    """
    sql = "SELECT ID, NAME, PRICE, STOCK FROM PRODUCTS LIMIT 100"
    connection = get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(sql)
        field_names = [meta[0].lower() for meta in cursor.description]
        records = []
        for raw_row in cursor.fetchall():
            records.append(dict(zip(field_names, raw_row)))
        return json.dumps(records, indent=2)
    finally:
        # Runs on success and on failure, so the connection is always closed.
        connection.close()
@mcp.resource("e-commerce://customers")
def list_customers() -> str:
    """Return up to 100 registered customers as JSON.

    Returns:
        A JSON array (indented by 2 spaces) with one object per row of
        ``CUSTOMERS``; keys are the lower-cased column names reported by the
        cursor for ``ID``, ``NAME`` and ``EMAIL``.
    """
    sql = "SELECT ID, NAME, EMAIL FROM CUSTOMERS LIMIT 100"
    connection = get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(sql)
        field_names = [meta[0].lower() for meta in cursor.description]
        records = []
        for raw_row in cursor.fetchall():
            records.append(dict(zip(field_names, raw_row)))
        return json.dumps(records, indent=2)
    finally:
        # Runs on success and on failure, so the connection is always closed.
        connection.close()
@mcp.resource("e-commerce://orders")
def list_orders() -> str:
    """Return the 50 most recently created orders as JSON.

    Rows are read from ``ORDERS`` sorted by ``CREATED_AT`` descending.

    Returns:
        A JSON array (indented by 2 spaces) of objects keyed by the
        lower-cased column names for ``ID``, ``CUSTOMER_ID``, ``STATUS`` and
        ``CREATED_AT``. A non-empty ``created_at`` value is rendered with
        ``str()`` so that it can be JSON-encoded.
    """

    def _to_record(field_names, raw_row):
        # Pair column names with values and stringify the timestamp, if any.
        record = dict(zip(field_names, raw_row))
        stamp = record.get("created_at")
        if stamp:
            record["created_at"] = str(stamp)
        return record

    sql = "SELECT ID, CUSTOMER_ID, STATUS, CREATED_AT FROM ORDERS ORDER BY CREATED_AT DESC LIMIT 50"
    connection = get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(sql)
        field_names = [meta[0].lower() for meta in cursor.description]
        records = [_to_record(field_names, raw_row) for raw_row in cursor.fetchall()]
        return json.dumps(records, indent=2)
    finally:
        connection.close()
# --- Tools ---
@mcp.tool()
def get_product(product_id: str) -> str:
    """Get details of a specific product by its ID.

    Args:
        product_id: Value compared against the ``ID`` column of ``PRODUCTS``.

    Returns:
        A JSON object (indented by 2 spaces) keyed by the lower-cased column
        names for ``ID``, ``NAME``, ``PRICE`` and ``STOCK``, or a plain-text
        "not found" message when no row matches.
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(
            "SELECT ID, NAME, PRICE, STOCK FROM PRODUCTS WHERE ID = %s",
            (product_id,),
        )
        row = cursor.fetchone()
        # Guard clause: nothing matched the given ID.
        if not row:
            return f"Product with ID {product_id} not found."
        field_names = [meta[0].lower() for meta in cursor.description]
        record = dict(zip(field_names, row))
        return json.dumps(record, indent=2)
    finally:
        connection.close()
@mcp.tool()
def get_customer(customer_id: str) -> str:
    """Get details of a specific customer by its ID.

    Args:
        customer_id: Value compared against the ``ID`` column of ``CUSTOMERS``.

    Returns:
        A JSON object (indented by 2 spaces) keyed by the lower-cased column
        names for ``ID``, ``NAME`` and ``EMAIL``, or a plain-text "not found"
        message when no row matches.
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(
            "SELECT ID, NAME, EMAIL FROM CUSTOMERS WHERE ID = %s",
            (customer_id,),
        )
        row = cursor.fetchone()
        # Guard clause: nothing matched the given ID.
        if not row:
            return f"Customer with ID {customer_id} not found."
        field_names = [meta[0].lower() for meta in cursor.description]
        record = dict(zip(field_names, row))
        return json.dumps(record, indent=2)
    finally:
        connection.close()
@mcp.tool()
def get_order(order_id: str) -> str:
    """Get details of a specific order by its ID, including its items.

    Args:
        order_id: Value compared against ``ORDERS.ID`` and
            ``ORDER_ITEMS.ORDER_ID``.

    Returns:
        A JSON object (indented by 2 spaces) with the order's ``id``,
        ``customer_id``, ``status`` and ``created_at`` (stringified when
        non-empty) plus an ``items`` list of ``{"product_id", "quantity"}``
        objects, or a plain-text "not found" message when no order matches.
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()

        # Step 1: the order header row.
        cursor.execute(
            "SELECT ID, CUSTOMER_ID, STATUS, CREATED_AT FROM ORDERS WHERE ID = %s",
            (order_id,),
        )
        header = cursor.fetchone()
        if not header:
            return f"Order with ID {order_id} not found."
        field_names = [meta[0].lower() for meta in cursor.description]
        record = dict(zip(field_names, header))
        stamp = record.get("created_at")
        if stamp:
            # The timestamp value is turned into text so json.dumps accepts it.
            record["created_at"] = str(stamp)

        # Step 2: the line items that belong to the order.
        cursor.execute(
            "SELECT PRODUCT_ID, QUANTITY FROM ORDER_ITEMS WHERE ORDER_ID = %s",
            (order_id,),
        )
        line_items = []
        for item_row in cursor.fetchall():
            line_items.append({"product_id": item_row[0], "quantity": item_row[1]})
        record["items"] = line_items

        return json.dumps(record, indent=2)
    finally:
        connection.close()
@mcp.tool()
def add_product(name: str, price: float, stock: int) -> str:
    """Add a new product to the catalog.

    The new ID is ``"p"`` followed by the current number of rows in
    ``PRODUCTS`` plus 10000.

    Args:
        name: Product name stored in ``NAME``.
        price: Value stored in ``PRICE``.
        stock: Value stored in ``STOCK``.

    Returns:
        A confirmation message containing the product name and the new ID.
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()

        # IDs follow the existing "p<number>" pattern. The 10000 offset keeps
        # generated IDs away from the low numbers used by the initial batch.
        # NOTE(review): a row-count-based ID can repeat after rows are deleted
        # or when two inserts race - confirm uniqueness is guaranteed elsewhere.
        cursor.execute("SELECT COUNT(*) FROM PRODUCTS")
        row_count = cursor.fetchone()[0]
        product_id = f"p{row_count + 10000}"

        cursor.execute(
            "INSERT INTO PRODUCTS (ID, NAME, PRICE, STOCK) VALUES (%s, %s, %s, %s)",
            (product_id, name, price, stock),
        )
        # Explicit commit in case the session is not running in autocommit mode.
        connection.commit()
        return f"Created product {name} with ID {product_id}"
    finally:
        connection.close()
@mcp.tool()
def add_customer(name: str, email: str) -> str:
    """Add a new customer to the registry.

    The new ID is ``"c"`` followed by the current number of rows in
    ``CUSTOMERS`` plus 10000.

    Args:
        name: Customer name stored in ``NAME``.
        email: Address stored in ``EMAIL``.

    Returns:
        A confirmation message containing the customer name and the new ID.
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()

        # NOTE(review): a row-count-based ID can repeat after rows are deleted
        # or when two inserts race - confirm uniqueness is guaranteed elsewhere.
        cursor.execute("SELECT COUNT(*) FROM CUSTOMERS")
        row_count = cursor.fetchone()[0]
        customer_id = f"c{row_count + 10000}"

        cursor.execute(
            "INSERT INTO CUSTOMERS (ID, NAME, EMAIL) VALUES (%s, %s, %s)",
            (customer_id, name, email),
        )
        # Explicit commit in case the session is not running in autocommit mode.
        connection.commit()
        return f"Created customer {name} with ID {customer_id}"
    finally:
        connection.close()
@mcp.tool()
def add_order(customer_id: str, product_ids: list[str]) -> str:
    """Place a new order for a customer.

    Each entry of ``product_ids`` becomes one ``ORDER_ITEMS`` row with
    quantity 1. The order row and all of its items are written inside one
    explicit transaction, so a failure part-way through rolls everything back
    instead of leaving a partial order behind.

    Args:
        customer_id: ID of an existing row in ``CUSTOMERS``.
        product_ids: IDs of the products to order. Must not be empty, and
            every ID must exist in ``PRODUCTS``.

    Returns:
        A confirmation message containing the new order ID, or a message
        starting with ``"Error:"`` when the customer or a product is unknown
        or no products were given.

    Raises:
        Exception: Any error raised by the connector while writing is
            re-raised after the transaction has been rolled back.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()

        # Validate customer.
        cur.execute("SELECT ID FROM CUSTOMERS WHERE ID = %s", (customer_id,))
        if not cur.fetchone():
            return f"Error: Customer {customer_id} not found"

        # Validate products: an order needs at least one item ...
        if not product_ids:
            return "Error: An order must contain at least one product"

        # ... and every requested ID must exist. Only the "%s" placeholders
        # are interpolated into the SQL text; the IDs themselves are bound as
        # parameters. Duplicates are collapsed for the lookup only.
        distinct_ids = list(dict.fromkeys(product_ids))
        placeholders = ", ".join(["%s"] * len(distinct_ids))
        cur.execute(
            f"SELECT ID FROM PRODUCTS WHERE ID IN ({placeholders})",
            tuple(distinct_ids),
        )
        known_ids = {str(row[0]) for row in cur.fetchall()}
        missing = [str(pid) for pid in distinct_ids if str(pid) not in known_ids]
        if missing:
            return f"Error: Product(s) not found: {', '.join(missing)}"

        # Open an explicit transaction so the inserts below are applied
        # together even when the session runs in autocommit mode.
        cur.execute("BEGIN")
        try:
            # NOTE(review): a row-count-based ID can repeat after rows are
            # deleted or when two orders race - confirm uniqueness elsewhere.
            cur.execute("SELECT COUNT(*) FROM ORDERS")
            count = cur.fetchone()[0]
            new_id = f"o{count + 10000}"

            # Insert order.
            cur.execute(
                "INSERT INTO ORDERS (ID, CUSTOMER_ID, STATUS, CREATED_AT) VALUES (%s, %s, 'pending', CURRENT_TIMESTAMP())",
                (new_id, customer_id),
            )

            # Insert items; quantity is fixed at 1 per list entry.
            for pid in product_ids:
                cur.execute(
                    "INSERT INTO ORDER_ITEMS (ORDER_ID, PRODUCT_ID, QUANTITY) VALUES (%s, %s, 1)",
                    (new_id, pid),
                )
            conn.commit()
        except Exception:
            # Undo the order row and any items already written, then let the
            # original error propagate to the caller.
            conn.rollback()
            raise

        return f"Created order {new_id} for customer {customer_id}"
    finally:
        conn.close()
@mcp.tool()
def search_products(query: str) -> str:
    """Search for products by name (case-insensitive partial match).

    The query is wrapped in ``%`` and matched with ``ILIKE`` against
    ``PRODUCTS.NAME``; at most 20 rows are returned. ``%`` and ``_``
    characters inside the query are passed through unescaped into the
    pattern.

    Args:
        query: Text to look for inside product names.

    Returns:
        A JSON array (indented by 2 spaces) of matching products keyed by the
        lower-cased column names for ``ID``, ``NAME``, ``PRICE`` and
        ``STOCK``, or a plain-text message when nothing matches.
    """
    pattern = "%{}%".format(query)
    connection = get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(
            "SELECT ID, NAME, PRICE, STOCK FROM PRODUCTS WHERE NAME ILIKE %s LIMIT 20",
            (pattern,),
        )
        field_names = [meta[0].lower() for meta in cursor.description]
        matches = []
        for raw_row in cursor.fetchall():
            matches.append(dict(zip(field_names, raw_row)))
        if matches:
            return json.dumps(matches, indent=2)
        return f"No products found searching for '{query}'"
    finally:
        connection.close()
@mcp.tool()
def search_customers(query: str) -> str:
    """Search for customers by name or email (case-insensitive partial match).

    The query is wrapped in ``%`` and matched with ``ILIKE`` against both
    ``CUSTOMERS.NAME`` and ``CUSTOMERS.EMAIL``; at most 20 rows are returned.
    ``%`` and ``_`` characters inside the query are passed through unescaped
    into the pattern.

    Args:
        query: Text to look for inside customer names or email addresses.

    Returns:
        A JSON array (indented by 2 spaces) of matching customers keyed by
        the lower-cased column names for ``ID``, ``NAME`` and ``EMAIL``, or a
        plain-text message when nothing matches.
    """
    pattern = "%{}%".format(query)
    connection = get_connection()
    try:
        cursor = connection.cursor()
        # The same pattern is bound twice: once for NAME, once for EMAIL.
        cursor.execute(
            "SELECT ID, NAME, EMAIL FROM CUSTOMERS WHERE NAME ILIKE %s OR EMAIL ILIKE %s LIMIT 20",
            (pattern, pattern),
        )
        field_names = [meta[0].lower() for meta in cursor.description]
        matches = []
        for raw_row in cursor.fetchall():
            matches.append(dict(zip(field_names, raw_row)))
        if matches:
            return json.dumps(matches, indent=2)
        return f"No customers found searching for '{query}'"
    finally:
        connection.close()
@mcp.tool()
def list_customer_orders(customer_id: str) -> str:
    """List all orders for a specific customer, newest first.

    Args:
        customer_id: Value compared against ``ORDERS.CUSTOMER_ID``.

    Returns:
        A JSON array (indented by 2 spaces) of orders keyed by the
        lower-cased column names for ``ID``, ``STATUS`` and ``CREATED_AT``
        (stringified when non-empty), or a plain-text message when the
        customer has no orders.
    """

    def _to_record(field_names, raw_row):
        # Pair column names with values and stringify the timestamp, if any.
        record = dict(zip(field_names, raw_row))
        stamp = record.get("created_at")
        if stamp:
            record["created_at"] = str(stamp)
        return record

    connection = get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(
            "SELECT ID, STATUS, CREATED_AT FROM ORDERS WHERE CUSTOMER_ID = %s ORDER BY CREATED_AT DESC",
            (customer_id,),
        )
        field_names = [meta[0].lower() for meta in cursor.description]
        records = [_to_record(field_names, raw_row) for raw_row in cursor.fetchall()]
        if records:
            return json.dumps(records, indent=2)
        return f"No orders found for customer {customer_id}"
    finally:
        connection.close()
if __name__ == "__main__":
    # Executed directly as a script: start the MCP server.
    # It can also be launched through the FastMCP CLI, e.g.
    # `fastmcp run server.py` (assuming this file is saved as server.py).
    mcp.run()