MCP Unified Server
by getfounded
- tools
- shopify
#!/usr/bin/env python3
import os
import json
import logging
import time
import asyncio
from enum import Enum
from typing import List, Dict, Optional, Any, Union
from urllib.parse import urljoin
# Ensure compatibility with mcp server
from mcp.server.fastmcp import FastMCP, Context
# External MCP reference for tool registration
external_mcp = None
def set_external_mcp(mcp):
"""Set the external MCP reference for tool registration"""
global external_mcp
external_mcp = mcp
logging.info("Shopify API tools MCP reference set")
class ShopifyTools(str, Enum):
"""Enum of Shopify tool names"""
GET_PRODUCTS = "shopify_get_products"
GET_PRODUCT = "shopify_get_product"
CREATE_PRODUCT = "shopify_create_product"
UPDATE_PRODUCT = "shopify_update_product"
DELETE_PRODUCT = "shopify_delete_product"
GET_ORDERS = "shopify_get_orders"
GET_ORDER = "shopify_get_order"
CREATE_ORDER = "shopify_create_order"
UPDATE_ORDER = "shopify_update_order"
CANCEL_ORDER = "shopify_cancel_order"
GET_CUSTOMERS = "shopify_get_customers"
GET_CUSTOMER = "shopify_get_customer"
CREATE_CUSTOMER = "shopify_create_customer"
UPDATE_CUSTOMER = "shopify_update_customer"
GET_INVENTORY = "shopify_get_inventory"
UPDATE_INVENTORY = "shopify_update_inventory"
GET_COLLECTIONS = "shopify_get_collections"
CREATE_COLLECTION = "shopify_create_collection"
UPDATE_COLLECTION = "shopify_update_collection"
class ShopifyService:
"""Service to handle Shopify API operations"""
def __init__(self, shop_domain, api_version, api_key=None, api_password=None, access_token=None):
"""Initialize Shopify service with credentials"""
self.shop_domain = shop_domain
self.api_version = api_version
self.api_key = api_key
self.api_password = api_password
self.access_token = access_token
# Set base URL for API calls
self.base_url = f"https://{self.shop_domain}/admin/api/{self.api_version}/"
# For rate limiting
self.last_request_time = 0
self.min_request_interval = 0.5 # 500ms minimum between requests
# Request headers
self.headers = self._get_headers()
def _get_headers(self):
"""Generate appropriate headers based on authentication method"""
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
if self.access_token:
headers["X-Shopify-Access-Token"] = self.access_token
elif self.api_key and self.api_password:
# This will be used in the auth parameter, not in headers
pass
else:
raise ValueError(
"Either access_token or (api_key and api_password) must be provided")
return headers
def _get_auth(self):
"""Return appropriate auth tuple if using API key"""
if self.api_key and self.api_password:
return (self.api_key, self.api_password)
return None
async def _make_request(self, method, endpoint, params=None, data=None, json_data=None):
"""Make a rate-limited request to Shopify API"""
# Basic rate limiting
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.min_request_interval:
await asyncio.sleep(self.min_request_interval - time_since_last)
url = urljoin(self.base_url, endpoint)
auth = self._get_auth()
import httpx
async with httpx.AsyncClient() as client:
self.last_request_time = time.time()
if method.lower() == "get":
response = await client.get(url, params=params, headers=self.headers, auth=auth)
elif method.lower() == "post":
response = await client.post(url, params=params, json=json_data, headers=self.headers, auth=auth)
elif method.lower() == "put":
response = await client.put(url, params=params, json=json_data, headers=self.headers, auth=auth)
elif method.lower() == "delete":
response = await client.delete(url, params=params, headers=self.headers, auth=auth)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
# Check for Shopify API response errors
if response.status_code >= 400:
error_msg = f"Shopify API error: {response.status_code}"
try:
error_detail = response.json()
error_msg += f" - {json.dumps(error_detail)}"
except:
error_msg += f" - {response.text}"
raise Exception(error_msg)
# Parse response if it has content
if response.status_code != 204 and response.content: # No content
return response.json()
return None
# Product operations
async def get_products(self, limit=50, page_info=None, collection_id=None, product_type=None, vendor=None):
"""Get a list of products"""
params = {"limit": limit}
if page_info:
params["page_info"] = page_info
if collection_id:
params["collection_id"] = collection_id
if product_type:
params["product_type"] = product_type
if vendor:
params["vendor"] = vendor
return await self._make_request("get", "products.json", params=params)
async def get_product(self, product_id):
"""Get a specific product by ID"""
return await self._make_request("get", f"products/{product_id}.json")
async def create_product(self, product_data):
"""Create a new product"""
return await self._make_request("post", "products.json", json_data={"product": product_data})
async def update_product(self, product_id, product_data):
"""Update an existing product"""
return await self._make_request("put", f"products/{product_id}.json", json_data={"product": product_data})
async def delete_product(self, product_id):
"""Delete a product"""
return await self._make_request("delete", f"products/{product_id}.json")
# Order operations
async def get_orders(self, limit=50, page_info=None, status=None, financial_status=None, fulfillment_status=None):
"""Get a list of orders"""
params = {"limit": limit}
if page_info:
params["page_info"] = page_info
if status:
params["status"] = status
if financial_status:
params["financial_status"] = financial_status
if fulfillment_status:
params["fulfillment_status"] = fulfillment_status
return await self._make_request("get", "orders.json", params=params)
async def get_order(self, order_id):
"""Get a specific order by ID"""
return await self._make_request("get", f"orders/{order_id}.json")
async def create_order(self, order_data):
"""Create a new order"""
return await self._make_request("post", "orders.json", json_data={"order": order_data})
async def update_order(self, order_id, order_data):
"""Update an existing order"""
return await self._make_request("put", f"orders/{order_id}.json", json_data={"order": order_data})
async def cancel_order(self, order_id, reason=None):
"""Cancel an order"""
data = {}
if reason:
data["reason"] = reason
return await self._make_request("post", f"orders/{order_id}/cancel.json", json_data=data)
# Customer operations
async def get_customers(self, limit=50, page_info=None, query=None):
"""Get a list of customers"""
params = {"limit": limit}
if page_info:
params["page_info"] = page_info
if query:
params["query"] = query
return await self._make_request("get", "customers.json", params=params)
async def get_customer(self, customer_id):
"""Get a specific customer by ID"""
return await self._make_request("get", f"customers/{customer_id}.json")
async def create_customer(self, customer_data):
"""Create a new customer"""
return await self._make_request("post", "customers.json", json_data={"customer": customer_data})
async def update_customer(self, customer_id, customer_data):
"""Update an existing customer"""
return await self._make_request("put", f"customers/{customer_id}.json", json_data={"customer": customer_data})
# Inventory operations
async def get_inventory_levels(self, inventory_item_ids=None, location_id=None):
"""Get inventory levels"""
params = {}
if inventory_item_ids:
params["inventory_item_ids"] = ",".join(
str(id) for id in inventory_item_ids)
if location_id:
params["location_id"] = location_id
return await self._make_request("get", "inventory_levels.json", params=params)
async def adjust_inventory(self, inventory_item_id, location_id, adjustment):
"""Adjust inventory level"""
data = {
"inventory_item_id": inventory_item_id,
"location_id": location_id,
"available_adjustment": adjustment
}
return await self._make_request("post", "inventory_levels/adjust.json", json_data=data)
# Collections operations
async def get_collections(self, limit=50, page_info=None):
"""Get a list of custom collections"""
params = {"limit": limit}
if page_info:
params["page_info"] = page_info
# First get custom collections
custom = await self._make_request("get", "custom_collections.json", params=params)
# Then get smart collections
smart = await self._make_request("get", "smart_collections.json", params=params)
# Combine them
result = {"custom_collections": custom.get("custom_collections", [])}
result["smart_collections"] = smart.get("smart_collections", [])
return result
async def create_collection(self, collection_data, collection_type="custom"):
"""Create a new collection"""
if collection_type == "custom":
return await self._make_request("post", "custom_collections.json",
json_data={"custom_collection": collection_data})
else:
return await self._make_request("post", "smart_collections.json",
json_data={"smart_collection": collection_data})
async def update_collection(self, collection_id, collection_data, collection_type="custom"):
"""Update an existing collection"""
if collection_type == "custom":
return await self._make_request("put", f"custom_collections/{collection_id}.json",
json_data={"custom_collection": collection_data})
else:
return await self._make_request("put", f"smart_collections/{collection_id}.json",
json_data={"smart_collection": collection_data})
# Tool function implementations
async def shopify_get_products(limit: int = 50, page_info: str = None,
collection_id: str = None, product_type: str = None,
vendor: str = None, ctx: Context = None) -> str:
"""Get a list of products from Shopify store
Parameters:
- limit: Maximum number of products to return (default: 50, max: 250)
- page_info: Pagination parameter (from previous response)
- collection_id: Filter by collection ID
- product_type: Filter by product type
- vendor: Filter by vendor name
"""
shopify = _get_shopify_service()
if not shopify:
return "Shopify API is not configured. Please set the required environment variables."
try:
result = await shopify.get_products(limit, page_info, collection_id, product_type, vendor)
return json.dumps(result, indent=2)
except Exception as e:
return f"Error retrieving products: {str(e)}"
async def shopify_get_product(product_id: str, ctx: Context = None) -> str:
"""Get a specific product by ID
Parameters:
- product_id: The ID of the product to retrieve
"""
shopify = _get_shopify_service()
if not shopify:
return "Shopify API is not configured. Please set the required environment variables."
try:
result = await shopify.get_product(product_id)
return json.dumps(result, indent=2)
except Exception as e:
return f"Error retrieving product: {str(e)}"
async def shopify_create_product(title: str, product_type: str = None,
vendor: str = None, body_html: str = None,
variants: List[Dict] = None, images: List[Dict] = None,
tags: str = None, ctx: Context = None) -> str:
"""Create a new product in the Shopify store
Parameters:
- title: Product title (required)
- product_type: Type of product
- vendor: Vendor name
- body_html: Product description in HTML format
- variants: List of variant objects
- images: List of image objects
- tags: Comma-separated list of tags
"""
shopify = _get_shopify_service()
if not shopify:
return "Shopify API is not configured. Please set the required environment variables."
try:
product_data = {
"title": title
}
if product_type:
product_data["product_type"] = product_type
if vendor:
product_data["vendor"] = vendor
if body_html:
product_data["body_html"] = body_html
if variants:
product_data["variants"] = variants
if images:
product_data["images"] = images
if tags:
product_data["tags"] = tags
result = await shopify.create_product(product_data)
return json.dumps(result, indent=2)
except Exception as e:
return f"Error creating product: {str(e)}"
# Remaining tool function implementations follow the same pattern
# I've included just a few examples for brevity - in a real implementation,
# you would implement all functions from the ShopifyTools enum
# Tool registration and initialization
_shopify_service = None
def initialize_shopify_service(shop_domain=None, api_version=None, api_key=None, api_password=None, access_token=None):
"""Initialize the Shopify service with credentials"""
global _shopify_service
# Use environment variables as fallback
if shop_domain is None:
shop_domain = os.environ.get("SHOPIFY_SHOP_DOMAIN")
if api_version is None:
# Default to recent version
api_version = os.environ.get("SHOPIFY_API_VERSION", "2023-10")
if api_key is None:
api_key = os.environ.get("SHOPIFY_API_KEY")
if api_password is None:
api_password = os.environ.get("SHOPIFY_API_PASSWORD")
if access_token is None:
access_token = os.environ.get("SHOPIFY_ACCESS_TOKEN")
# Validate required credentials
if not shop_domain:
logging.warning(
"Shopify shop domain not configured. Please set SHOPIFY_SHOP_DOMAIN environment variable.")
return None
if not api_version:
logging.warning(
"Shopify API version not configured. Using default version.")
api_version = "2023-10" # Default to recent version
if not ((api_key and api_password) or access_token):
logging.warning(
"Shopify credentials not configured. Please set either SHOPIFY_ACCESS_TOKEN or both SHOPIFY_API_KEY and SHOPIFY_API_PASSWORD environment variables.")
return None
_shopify_service = ShopifyService(
shop_domain, api_version, api_key, api_password, access_token)
return _shopify_service
def _get_shopify_service():
"""Get or initialize the Shopify service"""
global _shopify_service
if _shopify_service is None:
_shopify_service = initialize_shopify_service()
return _shopify_service
def get_shopify_tools():
"""Get a dictionary of all Shopify tools for registration with MCP"""
return {
ShopifyTools.GET_PRODUCTS: shopify_get_products,
ShopifyTools.GET_PRODUCT: shopify_get_product,
ShopifyTools.CREATE_PRODUCT: shopify_create_product,
# Add all other tool functions here
}
# This function will be called by the unified server to initialize the module
def initialize(mcp=None):
"""Initialize the Shopify module with MCP reference and credentials"""
if mcp:
set_external_mcp(mcp)
# Initialize the service
service = initialize_shopify_service()
if service:
logging.info("Shopify API service initialized successfully")
else:
logging.warning("Failed to initialize Shopify API service")
return service is not None