#!/usr/bin/env python3
"""
SellerChamp MCP Server - Enhanced Edition v2.6
Comprehensive tools for managing inventory, pricing, orders, and cross-listing via SellerChamp API.
Supports marketplace-aware operations for Amazon, eBay, and other connected platforms.
API Documentation: http://apidocs.sellerchamp.com/
Changes in v2.6:
- Dynamic multi-marketplace support: find_missing_listings and find_crosslist_candidates
now work with any marketplace pair (amazon, ebay, shopify, walmart, etc.)
- New source_marketplace + missing_from_marketplace params for find_missing_listings
- New marketplace_filters JSON param for find_crosslist_candidates
- Optional country_code filter (replaces hardcoded US-only)
- Full backwards compatibility with legacy amazon/ebay params
Changes in v2.5:
- Replaced simple delay-based rate limiter with sliding window rate limiter
- Tracks actual request timestamps in a deque, enforces max 110 requests per 60 seconds
- Leaves headroom for urllib3 retries (which bypass the pre-request check)
- Logs when rate limit sleep occurs for visibility
Changes in v2.4:
- Optimized bulk product fetching using limit/skip pattern for fewer API calls
- Added retry logic with exponential backoff for resilience
- Reduced rate limiting for bulk operations
- Better error handling and progress tracking
"""
import json
import sys
import logging
import os
import time
from collections import deque
from typing import Any, Dict, List, Optional, Sequence
from datetime import datetime
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Logging setup: INFO and above, timestamped, written to stderr.
# NOTE(review): stderr is presumably chosen so stdout stays free for MCP
# protocol traffic -- confirm against the server's transport code.
logging.basicConfig(stream=sys.stderr, format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class SellerChampAPI:
    """SellerChamp API client with rate limiting, retries, and comprehensive endpoint support.

    Every endpoint wrapper goes through ``_make_request``, which applies a
    sliding-window rate limiter and then sends the call on a shared
    ``requests.Session`` configured with urllib3 retries.
    """

    def __init__(self, api_token: str):
        """Create a client that authenticates with ``api_token`` (sent as the ``Token`` header)."""
        self.api_token = api_token
        self.base_url = "https://app.sellerchamp.com/api"
        self._marketplace_cache = None
        self._marketplace_cache_time = 0
        self._cache_ttl = 300  # 5 minute cache for marketplace accounts
        # Sliding window rate limiter: max 110 requests per 60 seconds
        # (110 instead of 120 to leave headroom for transparent retries).
        self._max_requests_per_window = 110
        self._window_seconds = 60
        self._request_timestamps = deque()
        # Session with retry logic. urllib3 retries happen inside the adapter,
        # so _rate_limit() never sees them; the conservative limit above is
        # what absorbs those extra requests.
        # NOTE(review): POST is retried on 5xx responses, which may repeat a
        # non-idempotent call (e.g. manifest creation) -- confirm this is intended.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "PUT", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def _purge_expired_timestamps(self, now: float) -> None:
        """Drop recorded request timestamps that fell out of the window ending at ``now``."""
        window_start = now - self._window_seconds
        while self._request_timestamps and self._request_timestamps[0] <= window_start:
            self._request_timestamps.popleft()

    def _rate_limit(self, bulk_mode: bool = False):
        """Enforce rate limiting using a sliding window to stay under 120 requests/minute.

        Tracks timestamps of recent requests. If the window is full, sleeps until
        the oldest request in the window expires, then records this request.
        Timestamps come from the monotonic clock so wall-clock adjustments
        cannot stall or defeat the limiter.

        Args:
            bulk_mode: Accepted for backwards compatibility; currently unused.
        """
        now = time.monotonic()
        self._purge_expired_timestamps(now)
        # If we're at the limit, wait until the oldest request falls outside the window
        if len(self._request_timestamps) >= self._max_requests_per_window:
            oldest = self._request_timestamps[0]
            sleep_time = oldest + self._window_seconds - now + 0.1  # +0.1s buffer
            if sleep_time > 0:
                logger.info(f"Rate limit: {len(self._request_timestamps)} requests in window, sleeping {sleep_time:.1f}s")
                time.sleep(sleep_time)
            # Purge again after sleeping
            self._purge_expired_timestamps(time.monotonic())
        # Record this request
        self._request_timestamps.append(time.monotonic())

    def _make_request(self, method: str, endpoint: str, data: Optional[dict] = None,
                      params: Optional[dict] = None, timeout: int = 30,
                      bulk_mode: bool = False) -> dict:
        """Make API request with rate limiting, retries, and error handling.

        Args:
            method: 'GET', 'PUT', 'POST' or 'DELETE'.
            endpoint: Path relative to ``base_url`` (no leading slash).
            data: JSON body for PUT/POST.
            params: Query parameters for GET/DELETE.
            timeout: Per-request timeout in seconds.
            bulk_mode: Forwarded to ``_rate_limit``.

        Returns:
            The decoded JSON body, or ``{"success": True}`` when the body is empty.

        Raises:
            ValueError: If ``method`` is not supported.
            Exception: On an HTTP error status; message holds the status code and
                up to 500 characters of the body, chained to the original HTTPError.
            requests.exceptions.RequestException: On any other transport failure.
        """
        self._rate_limit(bulk_mode=bulk_mode)
        headers = {
            'Token': self.api_token,
            'Content-Type': 'application/json'
        }
        url = f"{self.base_url}/{endpoint}"
        try:
            if method == 'GET':
                response = self.session.get(url, headers=headers, params=params, timeout=timeout)
            elif method == 'PUT':
                response = self.session.put(url, headers=headers, json=data, timeout=timeout)
            elif method == 'POST':
                response = self.session.post(url, headers=headers, json=data, timeout=timeout)
            elif method == 'DELETE':
                # Forward query params so options such as end_marketplace_listings are sent.
                response = self.session.delete(url, headers=headers, params=params, timeout=timeout)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
            response.raise_for_status()
            # Handle empty responses
            if response.text:
                return response.json()
            return {"success": True}
        except requests.exceptions.HTTPError as e:
            error_msg = f"HTTP {response.status_code}: {response.text[:500] if response.text else 'No response body'}"
            logger.error(f"API request failed: {error_msg}")
            raise Exception(error_msg) from e
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {str(e)}")
            raise

    def _get_products_bulk(self, limit: int = 50, skip: int = 0) -> dict:
        """Fetch products using the limit/skip pattern, with a 60s timeout and bulk_mode set."""
        return self._make_request(
            'GET',
            f'products?limit={limit}&skip={skip}',
            timeout=60,
            bulk_mode=True
        )

    # ==================== MARKETPLACE ACCOUNTS ====================

    def get_marketplace_accounts(self, force_refresh: bool = False) -> dict:
        """Get connected marketplace accounts, cached for ``_cache_ttl`` seconds.

        Args:
            force_refresh: Bypass the cache and refetch.
        """
        now = time.time()
        cache_is_fresh = (
            self._marketplace_cache is not None
            and (now - self._marketplace_cache_time) < self._cache_ttl
        )
        if not force_refresh and cache_is_fresh:
            return self._marketplace_cache
        result = self._make_request('GET', 'marketplace_accounts')
        self._marketplace_cache = result
        self._marketplace_cache_time = now
        return result

    def get_marketplace_by_name(self, marketplace_name: str) -> Optional[dict]:
        """Return the first account whose name or marketplace contains ``marketplace_name``.

        Matching is a case-insensitive substring test; returns None when nothing matches.
        """
        accounts = self.get_marketplace_accounts()
        needle = marketplace_name.lower()
        for account in accounts.get('marketplace_accounts', []):
            # `or ''` guards against keys that are present but null.
            if needle in (account.get('name') or '').lower():
                return account
            if needle in (account.get('marketplace') or '').lower():
                return account
        return None

    def _resolve_marketplace_accounts(
        self,
        marketplace_types: List[str],
        country_code: Optional[str] = None
    ) -> Dict[str, set]:
        """Resolve marketplace type names to sets of account IDs.

        Args:
            marketplace_types: e.g. ["amazon", "ebay", "shopify"]
            country_code: Optional country filter (e.g. "US"). None = all active accounts.

        Returns:
            Dict mapping lowercased marketplace type -> set of active account IDs.
        """
        accounts = self.get_marketplace_accounts()
        result = {mp_type.lower(): set() for mp_type in marketplace_types}
        wanted_country = country_code.upper() if country_code else None
        for account in accounts.get('marketplace_accounts', []):
            if not account.get('active'):
                continue
            # `or ''` guards against keys that are present but null.
            if wanted_country and (account.get('country_code') or '').upper() != wanted_country:
                continue
            mp_type = (account.get('marketplace') or '').lower()
            if mp_type in result:
                result[mp_type].add(account.get('id'))
        return result

    # ==================== ORDERS ====================

    def get_orders(
        self,
        page: int = 1,
        page_size: int = 50,
        order_status: str = None,
        marketplace_account_id: str = None,
        purchased_at_start: str = None,
        purchased_at_end: str = None,
        created_at_start: str = None,
        created_at_end: str = None
    ) -> dict:
        """Get orders with filtering options; falsy filters are omitted from the query."""
        optional_filters = {
            'order_status': order_status,
            'marketplace_account_id': marketplace_account_id,
            'purchased_at_start': purchased_at_start,
            'purchased_at_end': purchased_at_end,
            'created_at_start': created_at_start,
            'created_at_end': created_at_end,
        }
        params = {'page': page, 'page_size': page_size}
        params.update({key: value for key, value in optional_filters.items() if value})
        return self._make_request('GET', 'orders', params=params)

    def get_order_by_id(self, order_id: str) -> dict:
        """Get a specific order by ID."""
        return self._make_request('GET', f'orders/{order_id}')

    def update_order(self, order_id: str, update_data: dict) -> dict:
        """Update order details; ``update_data`` is sent under the 'order' key."""
        return self._make_request('PUT', f'orders/{order_id}', data={'order': update_data})

    def acknowledge_order(self, order_id: str) -> dict:
        """Acknowledge/confirm an order."""
        return self._make_request('POST', f'orders/{order_id}/acknowledge')

    # ==================== PRODUCTS ====================

    def get_products(
        self,
        page: int = 1,
        page_size: int = 100,
        sku: str = None,
        asin: str = None,
        upc: str = None,
        marketplace_id: str = None,
        tag: str = None,
        tags_array: List[str] = None,
        created_at_start: str = None,
        created_at_end: str = None,
        updated_at_start: str = None,
        updated_at_end: str = None
    ) -> dict:
        """Get products with comprehensive filtering.

        ``page_size`` is capped at 100; falsy filters are omitted from the query.
        """
        optional_filters = {
            'sku': sku,
            'asin': asin,
            'upc': upc,
            'marketplace_id': marketplace_id,
            'tag': tag,
            'tags_array[]': tags_array,
            'created_at_start': created_at_start,
            'created_at_end': created_at_end,
            'updated_at_start': updated_at_start,
            'updated_at_end': updated_at_end,
        }
        params = {'page': page, 'page_size': min(page_size, 100)}
        params.update({key: value for key, value in optional_filters.items() if value})
        return self._make_request('GET', 'products', params=params)

    def get_products_compact(
        self,
        page: int = 1,
        page_size: int = 100,
        fields: List[str] = None
    ) -> dict:
        """Get one page of products reduced to ``fields`` (or a default essential set).

        Returns:
            Dict with 'products', 'total_count' (number of products on this page),
            'page' and 'page_size' (as requested).
        """
        result = self.get_products(page=page, page_size=page_size)
        # Default compact fields
        if not fields:
            fields = [
                'id', 'sku', 'title', 'asin', 'upc',
                'retail_price', 'min_price', 'max_price', 'cost_price',
                'quantity_available', 'marketplace_status',
                'marketplace_account_id', 'marketplace_account_name',
                'item_condition', 'created_at', 'updated_at'
            ]
        compact_products = [
            {field: product[field] for field in fields if field in product}
            for product in result.get('products', [])
        ]
        return {
            'products': compact_products,
            'total_count': len(compact_products),
            'page': page,
            'page_size': page_size
        }

    def find_products_for_crosslist(
        self,
        min_quantity: int = 1,
        marketplace_filters: Dict[str, str] = None,
        amazon_status: str = None,
        ebay_status: str = None,
        max_results: int = 500,
        country_code: str = None
    ) -> dict:
        """
        Find products matching cross-listing criteria across any connected marketplace.

        Pages through the catalog (up to 200 pages of 100), aggregates listings by
        SKU, then filters by per-marketplace status.

        Args:
            min_quantity: Minimum quantity on ANY tracked marketplace
            marketplace_filters: Dict of {marketplace_type: status_filter}.
                Status can be 'active', 'inactive', or 'missing'.
                Example: {"amazon": "active", "shopify": "missing"}
                Entries override the legacy params for the same marketplace.
            amazon_status: LEGACY - 'active', 'inactive', or None
            ebay_status: LEGACY - 'active', 'inactive', 'missing', or None
            max_results: Max SKUs to return
            country_code: Optional country filter (e.g. 'US'). None = all active accounts.

        Returns:
            Dict with 'products' (one row per SKU with ``<marketplace>_status``,
            ``_price`` and ``_id`` columns), scan counters and the filters applied.
        """
        start_time = time.time()
        # Build effective filters from legacy + new params
        effective_filters = {}
        if amazon_status:
            effective_filters['amazon'] = amazon_status.lower()
        if ebay_status:
            effective_filters['ebay'] = ebay_status.lower()
        if marketplace_filters:
            for mp_type, status in marketplace_filters.items():
                effective_filters[mp_type.lower()] = status.lower()
        # Determine which marketplace types to track
        if effective_filters:
            marketplace_types = list(effective_filters)
        else:
            # No filters: discover all active marketplace types (sorted so the
            # output column order is deterministic).
            wanted_country = country_code.upper() if country_code else None
            accounts = self.get_marketplace_accounts()
            marketplace_types = sorted({
                (a.get('marketplace') or '').lower()
                for a in accounts.get('marketplace_accounts', [])
                if a.get('active') and (
                    not wanted_country
                    or (a.get('country_code') or '').upper() == wanted_country
                )
            })
        # Resolve marketplace types to account IDs
        mp_accounts = self._resolve_marketplace_accounts(marketplace_types, country_code=country_code)
        # Build reverse lookup: account_id -> marketplace_type
        account_to_mp_type = {
            acc_id: mp_type
            for mp_type, account_ids in mp_accounts.items()
            for acc_id in account_ids
        }
        logger.info(f"Cross-list scan: tracking {marketplace_types}, {len(account_to_mp_type)} accounts")
        # Aggregate products by SKU
        sku_data = {}
        page = 1
        page_size = 100
        max_pages = 200  # Up to 20K products
        total_fetched = 0
        empty_pages = 0  # consecutive empty or failed pages; two in a row ends the scan
        while page <= max_pages and empty_pages < 2:
            try:
                result = self.get_products(page=page, page_size=page_size)
                products = result.get('products', [])
                if not products:
                    empty_pages += 1
                    page += 1
                    continue
                empty_pages = 0
                total_fetched += len(products)
                for product in products:
                    sku = product.get('sku')
                    if not sku:
                        continue
                    mp_account_id = product.get('marketplace_account_id')
                    mp_type = account_to_mp_type.get(mp_account_id)
                    if mp_type is None:
                        continue
                    mp_status = (product.get('marketplace_status') or '').lower()
                    qty = product.get('quantity_available', 0) or 0
                    condition = product.get('item_condition', 'unknown')
                    if sku not in sku_data:
                        sku_data[sku] = {
                            'sku': sku,
                            'title': (product.get('title') or '')[:80],
                            'asin': product.get('asin'),
                            'upc': product.get('upc'),
                            'marketplaces': {}
                        }
                    mp_info = {
                        'id': product.get('id'),
                        'status': mp_status,
                        'quantity': qty,
                        'condition': condition,
                        'retail_price': product.get('retail_price'),
                        'min_price': product.get('min_price'),
                        'max_price': product.get('max_price'),
                        'cost_price': product.get('cost_price')
                    }
                    # Keep the listing with the highest quantity per marketplace type.
                    existing = sku_data[sku]['marketplaces'].get(mp_type)
                    if existing is None or qty > (existing.get('quantity') or 0):
                        sku_data[sku]['marketplaces'][mp_type] = mp_info
                if page % 10 == 0:
                    logger.info(f"Cross-list page {page}: {total_fetched} products, {len(sku_data)} SKUs")
                page += 1
            except Exception as e:
                # Best-effort scan: a failed page is logged and counted like an empty one.
                logger.warning(f"Error fetching page {page}: {e}")
                page += 1
                empty_pages += 1
        elapsed = time.time() - start_time
        # Filter based on criteria
        matching_products = []
        for sku, data in sku_data.items():
            marketplaces = data.get('marketplaces', {})
            # Max quantity across all tracked marketplaces
            qty = max(
                (mp_info.get('quantity', 0) or 0 for mp_info in marketplaces.values()),
                default=0
            )
            if qty < min_quantity:
                continue
            # Apply marketplace filters
            skip = False
            for mp_type, required_status in effective_filters.items():
                mp_info = marketplaces.get(mp_type)
                if required_status == 'missing':
                    if mp_info is not None:
                        skip = True
                        break
                elif required_status == 'active':
                    if not mp_info or mp_info.get('status') != 'active':
                        skip = True
                        break
                elif required_status == 'inactive':
                    if not mp_info or mp_info.get('status') == 'active':
                        skip = True
                        break
            if skip:
                continue
            # Build result with dynamic per-marketplace columns
            result_item = {
                'sku': sku,
                'title': data.get('title'),
                'asin': data.get('asin'),
                'quantity': qty,
            }
            for mp_type in marketplace_types:
                mp_info = marketplaces.get(mp_type)
                result_item[f'{mp_type}_status'] = mp_info.get('status') if mp_info else 'missing'
                result_item[f'{mp_type}_price'] = mp_info.get('retail_price') if mp_info else None
                result_item[f'{mp_type}_id'] = mp_info.get('id') if mp_info else None
            # Condition and cost from first available marketplace
            for mp_info in marketplaces.values():
                if mp_info:
                    result_item.setdefault('condition', mp_info.get('condition'))
                    result_item.setdefault('cost_price', mp_info.get('cost_price'))
            matching_products.append(result_item)
            if len(matching_products) >= max_results:
                break
        return {
            'products': matching_products,
            'total_found': len(matching_products),
            'total_skus_scanned': len(sku_data),
            'total_products_fetched': total_fetched,
            'scan_time_seconds': round(elapsed, 1),
            'marketplaces_tracked': marketplace_types,
            'filters_applied': {
                'min_quantity': min_quantity,
                'marketplace_filters': effective_filters,
                'country_code': country_code
            }
        }

    def find_missing_listings(
        self,
        source_marketplace: str = None,
        missing_from_marketplace: str = None,
        missing_from: str = None,
        min_quantity: int = 0,
        country_code: str = None
    ) -> dict:
        """
        Find SKUs that exist on one marketplace but are missing from another.

        Works with any marketplace pair (amazon, ebay, shopify, walmart, etc.).

        Args:
            source_marketplace: Marketplace where products exist (e.g. 'amazon', 'shopify')
            missing_from_marketplace: Marketplace to check for missing listings (e.g. 'ebay')
            missing_from: LEGACY param - alias for missing_from_marketplace ('ebay' or 'amazon')
            min_quantity: Minimum quantity to include (0 = include all)
            country_code: Optional country filter (e.g. 'US'). None = all active accounts.

        Returns:
            Dict with 'products' (compact rows: sku, title, condition, quantity,
            status, price, asin) plus scan counters and the filters applied.

        Raises:
            ValueError: If either marketplace is unspecified, if both name the same
                marketplace, or if the source marketplace has no active accounts.
        """
        # Backwards compatibility: legacy missing_from param
        if missing_from and not missing_from_marketplace:
            missing_from_marketplace = missing_from
            if not source_marketplace:
                source_marketplace = 'amazon' if missing_from.lower() == 'ebay' else 'ebay'
        if not source_marketplace or not missing_from_marketplace:
            raise ValueError(
                "Both source_marketplace and missing_from_marketplace are required. "
                "Example: source_marketplace='amazon', missing_from_marketplace='ebay'"
            )
        source_mp = source_marketplace.lower()
        target_mp = missing_from_marketplace.lower()
        if source_mp == target_mp:
            # With identical sides every listing counts only as "source", so the
            # scan would report the whole catalog as missing.
            raise ValueError(
                f"source_marketplace and missing_from_marketplace must differ (both are '{source_mp}')"
            )
        start_time = time.time()
        # Resolve marketplace accounts
        mp_accounts = self._resolve_marketplace_accounts(
            [source_mp, target_mp], country_code=country_code
        )
        source_account_ids = mp_accounts[source_mp]
        target_account_ids = mp_accounts[target_mp]
        if not source_account_ids:
            raise ValueError(
                f"No active accounts found for marketplace '{source_mp}'"
                + (f" with country_code='{country_code}'" if country_code else "")
            )
        logger.info(f"Missing listings scan: {source_mp}({len(source_account_ids)} accounts) -> {target_mp}({len(target_account_ids)} accounts)")
        # Aggregate all products by SKU
        sku_data = {}
        page = 1
        page_size = 100
        max_pages = 200
        total_fetched = 0
        empty_pages = 0  # consecutive empty or failed pages; two in a row ends the scan
        while page <= max_pages and empty_pages < 2:
            try:
                result = self.get_products(page=page, page_size=page_size)
                products = result.get('products', [])
                if not products:
                    empty_pages += 1
                    page += 1
                    continue
                empty_pages = 0
                total_fetched += len(products)
                for product in products:
                    sku = product.get('sku')
                    if not sku:
                        continue
                    mp_account_id = product.get('marketplace_account_id')
                    is_source = mp_account_id in source_account_ids
                    is_target = mp_account_id in target_account_ids
                    if not is_source and not is_target:
                        continue
                    mp_status = (product.get('marketplace_status') or '').lower()
                    qty = product.get('quantity_available', 0) or 0
                    if sku not in sku_data:
                        sku_data[sku] = {
                            'sku': sku,
                            'title': (product.get('title') or '')[:100],
                            'condition': product.get('item_condition'),
                            'asin': product.get('asin'),
                            'on_source': False,
                            'on_target': False,
                            'source_status': None,
                            'target_status': None,
                            'source_qty': 0,
                            'target_qty': 0,
                            'source_price': None,
                            'target_price': None
                        }
                    entry = sku_data[sku]
                    side = 'source' if is_source else 'target'
                    entry[f'on_{side}'] = True
                    entry[f'{side}_status'] = mp_status
                    entry[f'{side}_qty'] = max(entry[f'{side}_qty'], qty)
                    if product.get('retail_price'):
                        entry[f'{side}_price'] = product.get('retail_price')
                if page % 10 == 0:
                    logger.info(f"Page {page}: {total_fetched} products, {len(sku_data)} unique SKUs")
                page += 1
            except Exception as e:
                # Best-effort scan: a failed page is logged and counted like an empty one.
                logger.warning(f"Error fetching page {page}: {e}")
                page += 1
                empty_pages += 1
        elapsed = time.time() - start_time
        logger.info(f"Catalog scan complete: {total_fetched} products, {len(sku_data)} SKUs in {elapsed:.1f}s")
        # Filter: on source but NOT on target
        missing = [
            {
                'sku': data['sku'],
                'title': data['title'],
                'condition': data['condition'],
                'quantity': data['source_qty'],
                'status': data['source_status'],
                'price': data['source_price'],
                'asin': data['asin']
            }
            for data in sku_data.values()
            if data['on_source'] and not data['on_target'] and data['source_qty'] >= min_quantity
        ]
        return {
            'source_marketplace': source_mp,
            'missing_from_marketplace': target_mp,
            'missing_from': target_mp,
            'country_code_filter': country_code,
            'products': missing,
            'total_found': len(missing),
            'total_skus_scanned': len(sku_data),
            'total_products_fetched': total_fetched,
            'min_quantity_filter': min_quantity,
            'scan_time_seconds': round(elapsed, 1)
        }

    def get_product_by_id(self, product_id: str) -> dict:
        """Get a single product by ID."""
        return self._make_request('GET', f'products/{product_id}')

    def get_product_by_sku(self, sku: str) -> dict:
        """Get a single product by SKU.

        The SKU is percent-encoded so characters such as '/', '?' or '#' stay
        inside the path segment instead of changing the endpoint.
        """
        from urllib.parse import quote

        return self._make_request('GET', f"products/sku/{quote(str(sku), safe='')}")

    def update_product(self, product_id: str, update_data: dict) -> dict:
        """Update a single product; ``update_data`` is sent under the 'product' key."""
        return self._make_request('PUT', f'products/{product_id}', data={'product': update_data})

    def bulk_update_products(self, products: List[dict]) -> dict:
        """Bulk update up to 1000 products.

        Raises:
            ValueError: If more than 1000 products are supplied.
        """
        if len(products) > 1000:
            raise ValueError("Maximum 1000 products per bulk update")
        return self._make_request('PUT', 'products/bulk_update', data={'products': products})

    def delete_product(self, product_id: str, end_marketplace_listings: bool = False) -> dict:
        """Delete a product.

        Args:
            product_id: ID of the product to delete.
            end_marketplace_listings: When true, ``end_marketplace_listings=true``
                is sent as a query parameter.
                NOTE(review): confirm the API reads this flag from the query string.
        """
        params = {}
        if end_marketplace_listings:
            params['end_marketplace_listings'] = 'true'
        return self._make_request('DELETE', f'products/{product_id}', params=params)

    # ==================== VARIANTS ====================

    def get_variants(self, product_id: str) -> dict:
        """Get all variants for a product."""
        return self._make_request('GET', f'products/{product_id}/variants')

    def get_variant(self, variant_id: str) -> dict:
        """Get a specific variant."""
        return self._make_request('GET', f'variants/{variant_id}')

    def update_variant(self, variant_id: str, update_data: dict) -> dict:
        """Update a variant; ``update_data`` is sent under the 'variant' key."""
        return self._make_request('PUT', f'variants/{variant_id}', data={'variant': update_data})

    def bulk_update_variants(self, variants: List[dict]) -> dict:
        """Bulk update variants."""
        return self._make_request('PUT', 'variants/bulk_update', data={'variants': variants})

    # ==================== INVENTORY LOCATIONS ====================

    def get_inventory_locations(self, product_id: str) -> dict:
        """Get inventory locations for a product."""
        return self._make_request('GET', f'products/{product_id}/inventory_locations')

    def update_inventory_location(self, product_id: str, location_id: str, update_data: dict) -> dict:
        """Update inventory at a specific location; data is sent under 'inventory_location'."""
        return self._make_request('PUT', f'products/{product_id}/inventory_locations/{location_id}',
                                  data={'inventory_location': update_data})

    def bulk_update_inventory_locations(self, locations: List[dict]) -> dict:
        """Bulk update up to 1000 inventory locations.

        Raises:
            ValueError: If more than 1000 locations are supplied.
        """
        if len(locations) > 1000:
            raise ValueError("Maximum 1000 locations per bulk update")
        return self._make_request('PUT', 'inventory_locations/bulk_update', data={'inventory_locations': locations})

    # ==================== MANIFESTS (LISTING COLLECTIONS) ====================

    def get_manifests(self, marketplace_account_id: str = None, manifest_folder_id: str = None) -> dict:
        """Get manifests (product listing collections), optionally filtered by account or folder."""
        params = {}
        if marketplace_account_id:
            params['marketplace_account_id'] = marketplace_account_id
        if manifest_folder_id:
            params['manifest_folder_id'] = manifest_folder_id
        return self._make_request('GET', 'manifests', params=params)

    def create_manifest(self, name: str, marketplace_account_id: str, product_listings: List[dict] = None) -> dict:
        """Create a new manifest; ``product_listings`` is included only when non-empty."""
        manifest = {
            'name': name,
            'marketplace_account_id': marketplace_account_id
        }
        if product_listings:
            manifest['product_listings'] = product_listings
        return self._make_request('POST', 'manifests', data={'manifest': manifest})
# ==================== MCP TOOL DEFINITIONS ====================
def get_tools() -> List[dict]:
"""Return comprehensive list of available MCP tools."""
return [
# ===== MARKETPLACE ACCOUNTS =====
{
"name": "sellerchamp_get_marketplace_accounts",
"description": "Get all connected marketplace accounts (Amazon, eBay, Shopify, etc.). Returns account IDs needed for filtering orders/products by marketplace.",
"inputSchema": {
"type": "object",
"properties": {
"force_refresh": {
"type": "boolean",
"description": "Force refresh of cached marketplace data",
"default": False
}
}
}
},
# ===== ORDERS =====
{
"name": "sellerchamp_get_orders",
"description": "Get orders with comprehensive filtering. Returns order details including title, purchase date, ship date, marketplace, pricing, and buyer info.",
"inputSchema": {
"type": "object",
"properties": {
"page": {
"type": "integer",
"description": "Page number (default 1)",
"default": 1
},
"page_size": {
"type": "integer",
"description": "Orders per page (default 50)",
"default": 50
},
"order_status": {
"type": "string",
"description": "Filter by status: 'open', 'shipped', 'cancelled', 'pending', 'unshipped'",
"enum": ["open", "shipped", "cancelled", "pending", "unshipped"]
},
"marketplace_account_id": {
"type": "string",
"description": "Filter by marketplace account ID (get from sellerchamp_get_marketplace_accounts)"
},
"purchased_at_start": {
"type": "string",
"description": "Filter orders purchased after this date (ISO format: YYYY-MM-DD)"
},
"purchased_at_end": {
"type": "string",
"description": "Filter orders purchased before this date (ISO format: YYYY-MM-DD)"
},
"created_at_start": {
"type": "string",
"description": "Filter orders created after this date (ISO format: YYYY-MM-DD)"
},
"created_at_end": {
"type": "string",
"description": "Filter orders created before this date (ISO format: YYYY-MM-DD)"
}
}
}
},
{
"name": "sellerchamp_get_order",
"description": "Get a specific order by ID with full details including items, shipping address, and tracking.",
"inputSchema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The order ID to retrieve"
}
},
"required": ["order_id"]
}
},
{
"name": "sellerchamp_update_order",
"description": "Update order details such as tracking number, carrier, notes.",
"inputSchema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The order ID to update"
},
"tracking_number": {
"type": "string",
"description": "Shipment tracking number"
},
"carrier_code": {
"type": "string",
"description": "Shipping carrier code (e.g., 'usps', 'ups', 'fedex')"
},
"notes": {
"type": "string",
"description": "Internal notes for the order"
}
},
"required": ["order_id"]
}
},
{
"name": "sellerchamp_acknowledge_order",
"description": "Acknowledge/confirm an order has been received and is being processed.",
"inputSchema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The order ID to acknowledge"
}
},
"required": ["order_id"]
}
},
# ===== PRODUCTS =====
{
"name": "sellerchamp_find_missing_listings",
"description": "Find SKUs listed on one marketplace but missing from another. Works with any marketplace pair (amazon, ebay, shopify, walmart, etc.). Scans entire catalog. Returns: sku, title, condition, quantity, status, price.",
"inputSchema": {
"type": "object",
"properties": {
"source_marketplace": {
"type": "string",
"description": "Marketplace where products exist (e.g. 'amazon', 'ebay', 'shopify', 'walmart'). If omitted with missing_from, defaults to the other of amazon/ebay for backwards compatibility."
},
"missing_from_marketplace": {
"type": "string",
"description": "Marketplace to check for missing listings (e.g. 'ebay', 'shopify'). Finds items on source_marketplace that are NOT on this marketplace."
},
"missing_from": {
"type": "string",
"description": "LEGACY: alias for missing_from_marketplace. Prefer source_marketplace + missing_from_marketplace.",
"enum": ["ebay", "amazon"]
},
"min_quantity": {
"type": "integer",
"description": "Only include items with at least this quantity (0 = all)",
"default": 0
},
"country_code": {
"type": "string",
"description": "Filter accounts by country (e.g. 'US', 'CA', 'UK'). Omit for all active accounts."
}
}
}
},
{
"name": "sellerchamp_get_products",
"description": "Get products with filtering. WARNING: Returns large responses. Use sellerchamp_get_products_compact or sellerchamp_find_missing_listings for better performance.",
"inputSchema": {
"type": "object",
"properties": {
"page": {
"type": "integer",
"description": "Page number (default 1)",
"default": 1
},
"page_size": {
"type": "integer",
"description": "Products per page (max 100, recommend 25 for large catalogs)",
"default": 25
},
"sku": {
"type": "string",
"description": "Filter by exact SKU"
},
"asin": {
"type": "string",
"description": "Filter by ASIN"
},
"upc": {
"type": "string",
"description": "Filter by UPC"
},
"marketplace_id": {
"type": "string",
"description": "Filter by marketplace ID"
},
"tag": {
"type": "string",
"description": "Filter by single tag"
}
}
}
},
{
"name": "sellerchamp_get_products_compact",
"description": "Get products with ONLY essential fields (sku, title, price, quantity, status). Much smaller response than sellerchamp_get_products. Use this for listing/browsing products.",
"inputSchema": {
"type": "object",
"properties": {
"page": {
"type": "integer",
"description": "Page number (default 1)",
"default": 1
},
"page_size": {
"type": "integer",
"description": "Products per page (max 100)",
"default": 50
}
}
}
},
{
"name": "sellerchamp_find_crosslist_candidates",
"description": "Find products matching cross-listing criteria across any marketplace. Scans entire catalog, aggregates by SKU, filters by per-marketplace status. Supports amazon, ebay, shopify, walmart, and any connected marketplace.",
"inputSchema": {
"type": "object",
"properties": {
"min_quantity": {
"type": "integer",
"description": "Minimum quantity available on any marketplace (default 1)",
"default": 1
},
"marketplace_filters": {
"type": "string",
"description": "JSON string of marketplace status filters. Keys are marketplace types, values are 'active', 'inactive', or 'missing'. Example: '{\"amazon\": \"active\", \"ebay\": \"missing\"}' finds items active on Amazon but not on eBay. Example: '{\"shopify\": \"active\", \"walmart\": \"missing\"}' for Shopify-to-Walmart analysis."
},
"amazon_status": {
"type": "string",
"description": "LEGACY: Filter by Amazon status. Prefer marketplace_filters.",
"enum": ["active", "inactive"]
},
"ebay_status": {
"type": "string",
"description": "LEGACY: Filter by eBay status. Prefer marketplace_filters.",
"enum": ["active", "inactive", "missing"]
},
"max_results": {
"type": "integer",
"description": "Maximum products to return (default 500)",
"default": 500
},
"country_code": {
"type": "string",
"description": "Filter accounts by country (e.g. 'US', 'CA'). Omit for all active accounts."
}
}
}
},
{
"name": "sellerchamp_get_product",
"description": "Get a single product by SKU or product ID with full details including variants, images, and inventory locations.",
"inputSchema": {
"type": "object",
"properties": {
"sku": {
"type": "string",
"description": "Product SKU"
},
"product_id": {
"type": "string",
"description": "Product ID (alternative to SKU)"
}
}
}
},
{
"name": "sellerchamp_update_product",
"description": "Update a product's pricing, quantity, or other fields. Supports min/max price, cost, retail price, quantity, and marketplace-specific settings.",
"inputSchema": {
"type": "object",
"properties": {
"product_id": {
"type": "string",
"description": "Product ID to update (required)"
},
"retail_price": {
"type": "number",
"description": "Primary selling price"
},
"min_price": {
"type": "number",
"description": "Minimum allowable selling price"
},
"max_price": {
"type": "number",
"description": "Maximum allowable selling price"
},
"cost_price": {
"type": "number",
"description": "Product cost"
},
"msrp_price": {
"type": "number",
"description": "Manufacturer's suggested retail price"
},
"map_price": {
"type": "number",
"description": "Minimum Advertised Price"
},
"quantity_available": {
"type": "integer",
"description": "Available inventory quantity"
},
"title": {
"type": "string",
"description": "Product title"
},
"description": {
"type": "string",
"description": "Product description"
},
"item_condition": {
"type": "string",
"description": "Item condition",
"enum": ["new", "like_new", "very_good", "good", "acceptable", "refurbished", "salvage"]
},
"handling_cost": {
"type": "number",
"description": "Handling cost"
},
"shipping_cost": {
"type": "number",
"description": "Shipping cost"
}
},
"required": ["product_id"]
}
},
{
"name": "sellerchamp_set_product_pricing",
"description": "Convenience tool to set min/max/retail price for a product by SKU. Looks up product ID automatically.",
"inputSchema": {
"type": "object",
"properties": {
"sku": {
"type": "string",
"description": "Product SKU"
},
"retail_price": {
"type": "number",
"description": "Primary selling price"
},
"min_price": {
"type": "number",
"description": "Minimum allowable price"
},
"max_price": {
"type": "number",
"description": "Maximum allowable price"
},
"cost_price": {
"type": "number",
"description": "Product cost"
}
},
"required": ["sku"]
}
},
{
"name": "sellerchamp_set_product_quantity",
"description": "Set the available quantity for a product by SKU.",
"inputSchema": {
"type": "object",
"properties": {
"sku": {
"type": "string",
"description": "Product SKU"
},
"quantity": {
"type": "integer",
"description": "New quantity available"
}
},
"required": ["sku", "quantity"]
}
},
{
"name": "sellerchamp_bulk_update_products",
"description": "Bulk update up to 1000 products at once. Each product needs id, sku, or alt_sku plus fields to update.",
"inputSchema": {
"type": "object",
"properties": {
"products": {
"type": "array",
"description": "Array of product updates. Each should have 'id', 'sku', or 'alt_sku' plus update fields.",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"sku": {"type": "string"},
"alt_sku": {"type": "string"},
"retail_price": {"type": "number"},
"min_price": {"type": "number"},
"max_price": {"type": "number"},
"cost_price": {"type": "number"},
"quantity_available": {"type": "integer"}
}
}
}
},
"required": ["products"]
}
},
# ===== VARIANTS =====
{
"name": "sellerchamp_get_variants",
"description": "Get all variants for a product (sizes, colors, etc.).",
"inputSchema": {
"type": "object",
"properties": {
"product_id": {
"type": "string",
"description": "Product ID to get variants for"
}
},
"required": ["product_id"]
}
},
{
"name": "sellerchamp_update_variant",
"description": "Update a specific variant's pricing or inventory.",
"inputSchema": {
"type": "object",
"properties": {
"variant_id": {
"type": "string",
"description": "Variant ID to update"
},
"retail_price": {"type": "number"},
"min_price": {"type": "number"},
"max_price": {"type": "number"},
"cost_price": {"type": "number"},
"quantity_available": {"type": "integer"},
"sku": {"type": "string"}
},
"required": ["variant_id"]
}
},
# ===== INVENTORY LOCATIONS =====
{
"name": "sellerchamp_get_inventory_locations",
"description": "Get inventory locations (warehouse bins) for a product.",
"inputSchema": {
"type": "object",
"properties": {
"product_id": {
"type": "string",
"description": "Product ID"
}
},
"required": ["product_id"]
}
},
{
"name": "sellerchamp_bulk_update_inventory",
"description": "Bulk update inventory quantities across locations (up to 1000 at once).",
"inputSchema": {
"type": "object",
"properties": {
"locations": {
"type": "array",
"description": "Array of inventory location updates",
"items": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Location ID"},
"quantity": {"type": "integer", "description": "New quantity"}
},
"required": ["id", "quantity"]
}
}
},
"required": ["locations"]
}
},
# ===== MANIFESTS =====
{
"name": "sellerchamp_get_manifests",
"description": "Get manifests (product listing collections) optionally filtered by marketplace.",
"inputSchema": {
"type": "object",
"properties": {
"marketplace_account_id": {
"type": "string",
"description": "Filter by marketplace account ID"
}
}
}
}
]
def handle_tool_call(api: "SellerChampAPI", name: str, arguments: dict) -> Sequence[Any]:
    """Execute a tool call and return results.

    Args:
        api: SellerChamp API client used to perform the operation.
        name: Name of the tool to run (e.g. ``sellerchamp_get_order``).
        arguments: Tool arguments decoded from the request. ``None`` is
            treated as "no arguments".

    Returns:
        A one-element list holding a text content item. On success the text
        is the JSON-encoded result; on any failure it is ``"Error: <reason>"``.
        Exceptions are logged and never propagated to the caller.
    """
    try:
        if arguments is None:
            arguments = {}
        elif not isinstance(arguments, dict):
            raise ValueError("Tool arguments must be a JSON object")
        result = _dispatch_tool(api, name, arguments)
        return [{
            "type": "text",
            # default=str keeps non-JSON values (e.g. datetimes) from failing the dump.
            "text": json.dumps(result, indent=2, default=str)
        }]
    except Exception as e:
        logger.error(f"Tool execution failed: {str(e)}")
        return [{
            "type": "text",
            "text": f"Error: {str(e)}"
        }]


def _required(arguments, key):
    """Return arguments[key], raising a readable ValueError when it is absent, None, or empty."""
    value = arguments.get(key)
    if value is None or value == "":
        raise ValueError(f"Missing required argument: '{key}'")
    return value


def _collect_updates(arguments, fields, keep_none=False):
    """Build an update payload from the listed fields, refusing to produce an empty one."""
    updates = {
        field: arguments[field]
        for field in fields
        if field in arguments and (keep_none or arguments[field] is not None)
    }
    if not updates:
        raise ValueError("No fields to update; provide at least one of: " + ", ".join(fields))
    return updates


def _resolve_product_id(api, sku):
    """Look up a product by SKU and return its ID, raising ValueError when none is found."""
    product = api.get_product_by_sku(sku)
    product_id = None
    if isinstance(product, dict):
        # The lookup result may wrap the record under a 'product' key or be the record itself.
        nested = product.get('product')
        if isinstance(nested, dict):
            product_id = nested.get('id')
        product_id = product_id or product.get('id')
    if not product_id:
        raise ValueError(f"Product not found for SKU: {sku}")
    return product_id


def _parse_marketplace_filters(raw):
    """Decode the marketplace_filters argument (JSON string or dict) into a dict, or None if unset."""
    if not raw:
        return None
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in marketplace_filters: {e}") from e
        if raw is None:
            return None
    if not isinstance(raw, dict):
        raise ValueError(
            "marketplace_filters must be a JSON object mapping marketplace type to status"
        )
    return raw


def _dispatch_tool(api, name, arguments):
    """Route one tool call to the matching API client method and return its raw result."""
    # ===== MARKETPLACE ACCOUNTS =====
    if name == "sellerchamp_get_marketplace_accounts":
        return api.get_marketplace_accounts(
            force_refresh=arguments.get('force_refresh', False)
        )

    # ===== ORDERS =====
    if name == "sellerchamp_get_orders":
        return api.get_orders(
            page=arguments.get('page', 1),
            page_size=arguments.get('page_size', 50),
            order_status=arguments.get('order_status'),
            marketplace_account_id=arguments.get('marketplace_account_id'),
            purchased_at_start=arguments.get('purchased_at_start'),
            purchased_at_end=arguments.get('purchased_at_end'),
            created_at_start=arguments.get('created_at_start'),
            created_at_end=arguments.get('created_at_end')
        )
    if name == "sellerchamp_get_order":
        return api.get_order_by_id(_required(arguments, 'order_id'))
    if name == "sellerchamp_update_order":
        order_id = _required(arguments, 'order_id')
        # Explicit None values are forwarded for orders, matching the prior behaviour.
        update_data = _collect_updates(
            arguments, ['tracking_number', 'carrier_code', 'notes'], keep_none=True
        )
        return api.update_order(order_id, update_data)
    if name == "sellerchamp_acknowledge_order":
        return api.acknowledge_order(_required(arguments, 'order_id'))

    # ===== PRODUCTS =====
    if name == "sellerchamp_find_missing_listings":
        return api.find_missing_listings(
            source_marketplace=arguments.get('source_marketplace'),
            missing_from_marketplace=arguments.get('missing_from_marketplace'),
            missing_from=arguments.get('missing_from'),
            min_quantity=arguments.get('min_quantity', 0),
            country_code=arguments.get('country_code')
        )
    if name == "sellerchamp_get_products":
        return api.get_products(
            page=arguments.get('page', 1),
            page_size=arguments.get('page_size', 25),  # Smaller default
            sku=arguments.get('sku'),
            asin=arguments.get('asin'),
            upc=arguments.get('upc'),
            marketplace_id=arguments.get('marketplace_id'),
            tag=arguments.get('tag'),
            tags_array=arguments.get('tags_array'),
            created_at_start=arguments.get('created_at_start'),
            created_at_end=arguments.get('created_at_end'),
            updated_at_start=arguments.get('updated_at_start'),
            updated_at_end=arguments.get('updated_at_end')
        )
    if name == "sellerchamp_get_products_compact":
        return api.get_products_compact(
            page=arguments.get('page', 1),
            page_size=arguments.get('page_size', 50)
        )
    if name == "sellerchamp_find_crosslist_candidates":
        return api.find_products_for_crosslist(
            min_quantity=arguments.get('min_quantity', 1),
            marketplace_filters=_parse_marketplace_filters(
                arguments.get('marketplace_filters')
            ),
            amazon_status=arguments.get('amazon_status'),
            ebay_status=arguments.get('ebay_status'),
            max_results=arguments.get('max_results', 500),
            country_code=arguments.get('country_code')
        )
    if name == "sellerchamp_get_product":
        # SKU takes precedence when both identifiers are supplied.
        if arguments.get('sku'):
            return api.get_product_by_sku(arguments['sku'])
        if arguments.get('product_id'):
            return api.get_product_by_id(arguments['product_id'])
        raise ValueError("Either 'sku' or 'product_id' must be provided")
    if name == "sellerchamp_update_product":
        product_id = _required(arguments, 'product_id')
        update_data = _collect_updates(arguments, [
            'retail_price', 'min_price', 'max_price', 'cost_price',
            'msrp_price', 'map_price', 'quantity_available', 'title',
            'description', 'item_condition', 'handling_cost', 'shipping_cost'
        ])
        return api.update_product(product_id, update_data)
    if name == "sellerchamp_set_product_pricing":
        sku = _required(arguments, 'sku')
        # Validate the payload before the lookup so a no-op request costs no API calls.
        update_data = _collect_updates(
            arguments, ['retail_price', 'min_price', 'max_price', 'cost_price']
        )
        return api.update_product(_resolve_product_id(api, sku), update_data)
    if name == "sellerchamp_set_product_quantity":
        sku = _required(arguments, 'sku')
        quantity = _required(arguments, 'quantity')  # 0 is a valid quantity
        return api.update_product(
            _resolve_product_id(api, sku), {'quantity_available': quantity}
        )
    if name == "sellerchamp_bulk_update_products":
        return api.bulk_update_products(_required(arguments, 'products'))

    # ===== VARIANTS =====
    if name == "sellerchamp_get_variants":
        return api.get_variants(_required(arguments, 'product_id'))
    if name == "sellerchamp_update_variant":
        variant_id = _required(arguments, 'variant_id')
        update_data = _collect_updates(arguments, [
            'retail_price', 'min_price', 'max_price', 'cost_price',
            'quantity_available', 'sku'
        ])
        return api.update_variant(variant_id, update_data)

    # ===== INVENTORY LOCATIONS =====
    if name == "sellerchamp_get_inventory_locations":
        return api.get_inventory_locations(_required(arguments, 'product_id'))
    if name == "sellerchamp_bulk_update_inventory":
        return api.bulk_update_inventory_locations(_required(arguments, 'locations'))

    # ===== MANIFESTS =====
    if name == "sellerchamp_get_manifests":
        return api.get_manifests(
            marketplace_account_id=arguments.get('marketplace_account_id')
        )

    raise ValueError(f"Unknown tool: {name}")
def main():
    """Main MCP server loop - JSON-RPC 2.0 over stdin/stdout.

    Reads one JSON message per line from stdin and writes one JSON reply per
    line to stdout. Exits with status 1 when SELLERCHAMP_API_TOKEN is unset.
    Malformed lines are logged and skipped so a single bad message cannot
    terminate the server.
    """
    api_token = os.getenv('SELLERCHAMP_API_TOKEN', '')
    if not api_token:
        logger.error("SELLERCHAMP_API_TOKEN environment variable not set")
        sys.exit(1)
    api = SellerChampAPI(api_token)
    logger.info("SellerChamp MCP Server (Enhanced v2.6) starting...")
    for line in sys.stdin:
        if not line.strip():
            # Blank keep-alive/separator lines are not messages.
            continue
        try:
            request = json.loads(line)
        except (ValueError, RecursionError) as e:
            # No id can be recovered from an unparseable line, so only log it.
            logger.error(f"Invalid JSON: {str(e)}")
            continue
        reply = _process_message(api, request)
        if reply is not None:
            print(json.dumps(reply), flush=True)


def _jsonrpc_error(request_id, code, message):
    """Build a JSON-RPC 2.0 error response envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": code,
            "message": message
        }
    }


def _process_message(api, request):
    """Handle one decoded JSON-RPC message; return the reply dict, or None when no reply is due."""
    if not isinstance(request, dict):
        logger.error(
            f"Error processing request: expected a JSON object, got {type(request).__name__}"
        )
        return None

    method = request.get('method')
    # Notifications (no 'id', or the notifications/* namespace) must never be answered.
    if 'id' not in request or (isinstance(method, str) and method.startswith('notifications/')):
        return None

    request_id = request['id']
    if not isinstance(method, str):
        return _jsonrpc_error(request_id, -32600, "Invalid request: missing method")

    params = request.get('params')
    if params is None:
        params = {}
    elif not isinstance(params, dict):
        return _jsonrpc_error(request_id, -32602, "Invalid params: expected an object")

    try:
        if method == 'initialize':
            response = {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "sellerchamp-mcp",
                    "version": "2.6.0"
                }
            }
        elif method == 'ping':
            # Liveness check: answered with an empty result.
            response = {}
        elif method == 'tools/list':
            response = {"tools": get_tools()}
        elif method == 'tools/call':
            response = {
                "content": handle_tool_call(
                    api, params.get('name'), params.get('arguments') or {}
                )
            }
        else:
            # Report unsupported methods as a protocol error, not as a successful result.
            return _jsonrpc_error(request_id, -32601, f"Unknown method: {method}")
    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        return _jsonrpc_error(request_id, -32603, str(e))

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": response
    }


if __name__ == "__main__":
    main()