MCP Unified Server

#!/usr/bin/env python3 import os import json import logging import time import asyncio from enum import Enum from typing import List, Dict, Optional, Any, Union from urllib.parse import urljoin # Ensure compatibility with mcp server from mcp.server.fastmcp import FastMCP, Context # External MCP reference for tool registration external_mcp = None def set_external_mcp(mcp): """Set the external MCP reference for tool registration""" global external_mcp external_mcp = mcp logging.info("Shopify API tools MCP reference set") class ShopifyTools(str, Enum): """Enum of Shopify tool names""" GET_PRODUCTS = "shopify_get_products" GET_PRODUCT = "shopify_get_product" CREATE_PRODUCT = "shopify_create_product" UPDATE_PRODUCT = "shopify_update_product" DELETE_PRODUCT = "shopify_delete_product" GET_ORDERS = "shopify_get_orders" GET_ORDER = "shopify_get_order" CREATE_ORDER = "shopify_create_order" UPDATE_ORDER = "shopify_update_order" CANCEL_ORDER = "shopify_cancel_order" GET_CUSTOMERS = "shopify_get_customers" GET_CUSTOMER = "shopify_get_customer" CREATE_CUSTOMER = "shopify_create_customer" UPDATE_CUSTOMER = "shopify_update_customer" GET_INVENTORY = "shopify_get_inventory" UPDATE_INVENTORY = "shopify_update_inventory" GET_COLLECTIONS = "shopify_get_collections" CREATE_COLLECTION = "shopify_create_collection" UPDATE_COLLECTION = "shopify_update_collection" class ShopifyService: """Service to handle Shopify API operations""" def __init__(self, shop_domain, api_version, api_key=None, api_password=None, access_token=None): """Initialize Shopify service with credentials""" self.shop_domain = shop_domain self.api_version = api_version self.api_key = api_key self.api_password = api_password self.access_token = access_token # Set base URL for API calls self.base_url = f"https://{self.shop_domain}/admin/api/{self.api_version}/" # For rate limiting self.last_request_time = 0 self.min_request_interval = 0.5 # 500ms minimum between requests # Request headers self.headers = self._get_headers() def _get_headers(self): """Generate appropriate headers based on authentication method""" headers = { "Content-Type": "application/json", "Accept": "application/json" } if self.access_token: headers["X-Shopify-Access-Token"] = self.access_token elif self.api_key and self.api_password: # This will be used in the auth parameter, not in headers pass else: raise ValueError( "Either access_token or (api_key and api_password) must be provided") return headers def _get_auth(self): """Return appropriate auth tuple if using API key""" if self.api_key and self.api_password: return (self.api_key, self.api_password) return None async def _make_request(self, method, endpoint, params=None, data=None, json_data=None): """Make a rate-limited request to Shopify API""" # Basic rate limiting current_time = time.time() time_since_last = current_time - self.last_request_time if time_since_last < self.min_request_interval: await asyncio.sleep(self.min_request_interval - time_since_last) url = urljoin(self.base_url, endpoint) auth = self._get_auth() import httpx async with httpx.AsyncClient() as client: self.last_request_time = time.time() if method.lower() == "get": response = await client.get(url, params=params, headers=self.headers, auth=auth) elif method.lower() == "post": response = await client.post(url, params=params, json=json_data, headers=self.headers, auth=auth) elif method.lower() == "put": response = await client.put(url, params=params, json=json_data, headers=self.headers, auth=auth) elif method.lower() == "delete": response = await client.delete(url, params=params, headers=self.headers, auth=auth) else: raise ValueError(f"Unsupported HTTP method: {method}") # Check for Shopify API response errors if response.status_code >= 400: error_msg = f"Shopify API error: {response.status_code}" try: error_detail = response.json() error_msg += f" - {json.dumps(error_detail)}" except: error_msg += f" - {response.text}" raise Exception(error_msg) # Parse response if it has content if response.status_code != 204 and response.content: # No content return response.json() return None # Product operations async def get_products(self, limit=50, page_info=None, collection_id=None, product_type=None, vendor=None): """Get a list of products""" params = {"limit": limit} if page_info: params["page_info"] = page_info if collection_id: params["collection_id"] = collection_id if product_type: params["product_type"] = product_type if vendor: params["vendor"] = vendor return await self._make_request("get", "products.json", params=params) async def get_product(self, product_id): """Get a specific product by ID""" return await self._make_request("get", f"products/{product_id}.json") async def create_product(self, product_data): """Create a new product""" return await self._make_request("post", "products.json", json_data={"product": product_data}) async def update_product(self, product_id, product_data): """Update an existing product""" return await self._make_request("put", f"products/{product_id}.json", json_data={"product": product_data}) async def delete_product(self, product_id): """Delete a product""" return await self._make_request("delete", f"products/{product_id}.json") # Order operations async def get_orders(self, limit=50, page_info=None, status=None, financial_status=None, fulfillment_status=None): """Get a list of orders""" params = {"limit": limit} if page_info: params["page_info"] = page_info if status: params["status"] = status if financial_status: params["financial_status"] = financial_status if fulfillment_status: params["fulfillment_status"] = fulfillment_status return await self._make_request("get", "orders.json", params=params) async def get_order(self, order_id): """Get a specific order by ID""" return await self._make_request("get", f"orders/{order_id}.json") async def create_order(self, order_data): """Create a new order""" return await self._make_request("post", "orders.json", json_data={"order": order_data}) async def update_order(self, order_id, order_data): """Update an existing order""" return await self._make_request("put", f"orders/{order_id}.json", json_data={"order": order_data}) async def cancel_order(self, order_id, reason=None): """Cancel an order""" data = {} if reason: data["reason"] = reason return await self._make_request("post", f"orders/{order_id}/cancel.json", json_data=data) # Customer operations async def get_customers(self, limit=50, page_info=None, query=None): """Get a list of customers""" params = {"limit": limit} if page_info: params["page_info"] = page_info if query: params["query"] = query return await self._make_request("get", "customers.json", params=params) async def get_customer(self, customer_id): """Get a specific customer by ID""" return await self._make_request("get", f"customers/{customer_id}.json") async def create_customer(self, customer_data): """Create a new customer""" return await self._make_request("post", "customers.json", json_data={"customer": customer_data}) async def update_customer(self, customer_id, customer_data): """Update an existing customer""" return await self._make_request("put", f"customers/{customer_id}.json", json_data={"customer": customer_data}) # Inventory operations async def get_inventory_levels(self, inventory_item_ids=None, location_id=None): """Get inventory levels""" params = {} if inventory_item_ids: params["inventory_item_ids"] = ",".join( str(id) for id in inventory_item_ids) if location_id: params["location_id"] = location_id return await self._make_request("get", "inventory_levels.json", params=params) async def adjust_inventory(self, inventory_item_id, location_id, adjustment): """Adjust inventory level""" data = { "inventory_item_id": inventory_item_id, "location_id": location_id, "available_adjustment": adjustment } return await self._make_request("post", "inventory_levels/adjust.json", json_data=data) # Collections operations async def get_collections(self, limit=50, page_info=None): """Get a list of custom collections""" params = {"limit": limit} if page_info: params["page_info"] = page_info # First get custom collections custom = await self._make_request("get", "custom_collections.json", params=params) # Then get smart collections smart = await self._make_request("get", "smart_collections.json", params=params) # Combine them result = {"custom_collections": custom.get("custom_collections", [])} result["smart_collections"] = smart.get("smart_collections", []) return result async def create_collection(self, collection_data, collection_type="custom"): """Create a new collection""" if collection_type == "custom": return await self._make_request("post", "custom_collections.json", json_data={"custom_collection": collection_data}) else: return await self._make_request("post", "smart_collections.json", json_data={"smart_collection": collection_data}) async def update_collection(self, collection_id, collection_data, collection_type="custom"): """Update an existing collection""" if collection_type == "custom": return await self._make_request("put", f"custom_collections/{collection_id}.json", json_data={"custom_collection": collection_data}) else: return await self._make_request("put", f"smart_collections/{collection_id}.json", json_data={"smart_collection": collection_data}) # Tool function implementations async def shopify_get_products(limit: int = 50, page_info: str = None, collection_id: str = None, product_type: str = None, vendor: str = None, ctx: Context = None) -> str: """Get a list of products from Shopify store Parameters: - limit: Maximum number of products to return (default: 50, max: 250) - page_info: Pagination parameter (from previous response) - collection_id: Filter by collection ID - product_type: Filter by product type - vendor: Filter by vendor name """ shopify = _get_shopify_service() if not shopify: return "Shopify API is not configured. Please set the required environment variables." try: result = await shopify.get_products(limit, page_info, collection_id, product_type, vendor) return json.dumps(result, indent=2) except Exception as e: return f"Error retrieving products: {str(e)}" async def shopify_get_product(product_id: str, ctx: Context = None) -> str: """Get a specific product by ID Parameters: - product_id: The ID of the product to retrieve """ shopify = _get_shopify_service() if not shopify: return "Shopify API is not configured. Please set the required environment variables." try: result = await shopify.get_product(product_id) return json.dumps(result, indent=2) except Exception as e: return f"Error retrieving product: {str(e)}" async def shopify_create_product(title: str, product_type: str = None, vendor: str = None, body_html: str = None, variants: List[Dict] = None, images: List[Dict] = None, tags: str = None, ctx: Context = None) -> str: """Create a new product in the Shopify store Parameters: - title: Product title (required) - product_type: Type of product - vendor: Vendor name - body_html: Product description in HTML format - variants: List of variant objects - images: List of image objects - tags: Comma-separated list of tags """ shopify = _get_shopify_service() if not shopify: return "Shopify API is not configured. Please set the required environment variables." try: product_data = { "title": title } if product_type: product_data["product_type"] = product_type if vendor: product_data["vendor"] = vendor if body_html: product_data["body_html"] = body_html if variants: product_data["variants"] = variants if images: product_data["images"] = images if tags: product_data["tags"] = tags result = await shopify.create_product(product_data) return json.dumps(result, indent=2) except Exception as e: return f"Error creating product: {str(e)}" # Remaining tool function implementations follow the same pattern # I've included just a few examples for brevity - in a real implementation, # you would implement all functions from the ShopifyTools enum # Tool registration and initialization _shopify_service = None def initialize_shopify_service(shop_domain=None, api_version=None, api_key=None, api_password=None, access_token=None): """Initialize the Shopify service with credentials""" global _shopify_service # Use environment variables as fallback if shop_domain is None: shop_domain = os.environ.get("SHOPIFY_SHOP_DOMAIN") if api_version is None: # Default to recent version api_version = os.environ.get("SHOPIFY_API_VERSION", "2023-10") if api_key is None: api_key = os.environ.get("SHOPIFY_API_KEY") if api_password is None: api_password = os.environ.get("SHOPIFY_API_PASSWORD") if access_token is None: access_token = os.environ.get("SHOPIFY_ACCESS_TOKEN") # Validate required credentials if not shop_domain: logging.warning( "Shopify shop domain not configured. Please set SHOPIFY_SHOP_DOMAIN environment variable.") return None if not api_version: logging.warning( "Shopify API version not configured. Using default version.") api_version = "2023-10" # Default to recent version if not ((api_key and api_password) or access_token): logging.warning( "Shopify credentials not configured. Please set either SHOPIFY_ACCESS_TOKEN or both SHOPIFY_API_KEY and SHOPIFY_API_PASSWORD environment variables.") return None _shopify_service = ShopifyService( shop_domain, api_version, api_key, api_password, access_token) return _shopify_service def _get_shopify_service(): """Get or initialize the Shopify service""" global _shopify_service if _shopify_service is None: _shopify_service = initialize_shopify_service() return _shopify_service def get_shopify_tools(): """Get a dictionary of all Shopify tools for registration with MCP""" return { ShopifyTools.GET_PRODUCTS: shopify_get_products, ShopifyTools.GET_PRODUCT: shopify_get_product, ShopifyTools.CREATE_PRODUCT: shopify_create_product, # Add all other tool functions here } # This function will be called by the unified server to initialize the module def initialize(mcp=None): """Initialize the Shopify module with MCP reference and credentials""" if mcp: set_external_mcp(mcp) # Initialize the service service = initialize_shopify_service() if service: logging.info("Shopify API service initialized successfully") else: logging.warning("Failed to initialize Shopify API service") return service is not None