from mcp.server.fastmcp import FastMCP
import psycopg2
from psycopg2.extras import RealDictCursor
import os
from urllib.parse import urlparse
mcp = FastMCP(name="inventory_mcp")
# Supabase PostgreSQL config
SUPABASE_URL = os.getenv("SUPABASE_URL", "https://sitomhzoqsoqpkaglxzg.supabase.co")
SUPABASE_ANON_KEY = os.getenv("SUPABASE_ANON_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InNpdG9taHpvcXNvcXBrYWdseHpnIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjUwNjI4MDQsImV4cCI6MjA4MDYzODgwNH0.uk78pyHh_RcjbAJh_VI3Uu1elHAnnDqQjClkAoursxg")
# Extract host from Supabase URL and convert to database host format
host = urlparse(SUPABASE_URL).netloc.replace("https://", "").replace("http://", "")
# Convert Supabase URL to database host format
if ".supabase.co" in host:
host = "db." + host
# Database connection string (PostgreSQL format)
# Note: For Supabase, you need the database password from your Supabase dashboard
# Default: Using connection pooling URL format (port 6543 recommended)
DB_PASSWORD = os.getenv("SUPABASE_DB_PASSWORD", "root")
DB_USER = os.getenv("SUPABASE_DB_USER", "postgres")
DB_NAME = os.getenv("SUPABASE_DB_NAME", "postgres")
DB_PORT = os.getenv("SUPABASE_DB_PORT", "6543") # 6543 = pooler (recommended), 5432 = direct
# Connection string for direct PostgreSQL connection
if DB_PASSWORD:
DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{host}:{DB_PORT}/{DB_NAME}"
else:
# Fallback: use Supabase connection pooling (requires password)
DATABASE_URL = None
def get_db_connection():
"""Get database connection. Uses connection string or constructs from config."""
if DATABASE_URL:
return psycopg2.connect(DATABASE_URL, sslmode="require")
else:
# Try connection with defaults (requires password from Supabase settings)
return psycopg2.connect(
host=host,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
port=DB_PORT,
sslmode="require"
)
@mcp.tool()
def add_inventory(item_id: str, product_name: str, location: str, quantity: int) -> dict:
conn = get_db_connection()
cursor = conn.cursor()
# PostgreSQL uses ON CONFLICT instead of ON DUPLICATE KEY UPDATE
cursor.execute(
"""INSERT INTO inventory (item_id, product_name, location, quantity)
VALUES (%s, %s, %s, %s)
ON CONFLICT (item_id, location)
DO UPDATE SET quantity = inventory.quantity + %s, product_name = EXCLUDED.product_name""",
(item_id, product_name, location, quantity, quantity)
)
conn.commit()
cursor.close()
conn.close()
return {"message": f"Added {quantity} units of {product_name} ({item_id}) at {location}"}
@mcp.tool()
def remove_inventory(item_id: str, location: str, quantity: int) -> dict:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"UPDATE inventory SET quantity = quantity - %s WHERE item_id=%s AND location=%s AND quantity >= %s",
(quantity, item_id, location, quantity)
)
conn.commit()
cursor.close()
conn.close()
return {"message": f"Removed {quantity} units of {item_id} from {location}"}
@mcp.tool()
def check_stock(item_id: str, location: str) -> dict:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT product_name, quantity FROM inventory WHERE item_id=%s AND location=%s",
(item_id, location)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
return {
"item_id": item_id,
"location": location,
"product_name": result[0],
"quantity": result[1]
}
else:
return {
"item_id": item_id,
"location": location,
"product_name": None,
"quantity": 0
}
@mcp.tool()
def list_inventory() -> list:
conn = get_db_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute("SELECT item_id, product_name, location, quantity FROM inventory")
rows = cursor.fetchall()
# Convert RealDictRow to dict for JSON serialization
result = [dict(row) for row in rows]
cursor.close()
conn.close()
return result
if __name__ == "__main__":
mcp.run()