Skip to main content
Glama

VTION E-Commerce MCP Server

by adityac7
vtion_ecom_mcp.py20.8 kB
#!/usr/bin/env python3 """ VTION E-Commerce Analytics MCP Server Provides access to e-commerce analytics data including: - Product catalog and inventory - Customer behavior and demographics - Order history and transactions - Shopping cart analytics - User engagement metrics All queries automatically apply: - Weighting (users represent N others in their demographic cell) - Aggregation limits for performance """ import os import json import asyncio from typing import Optional, List, Dict, Any, Literal from enum import Enum from contextlib import asynccontextmanager import asyncpg from pydantic import BaseModel, Field, ConfigDict from mcp.server.fastmcp import FastMCP, Context # Constants CHARACTER_LIMIT = 25000 RAW_DATA_LIMIT = 5 # Max rows for non-aggregated queries AGGREGATED_LIMIT = 1000 # Max rows for GROUP BY queries class ResponseFormat(str, Enum): """Output format options.""" MARKDOWN = "markdown" JSON = "json" # Dataset registry - populated from environment variables DATASETS: Dict[int, Dict[str, Any]] = {} def load_datasets_from_env(): """Load dataset configurations from environment variables. Expected format: DATASET_1_NAME=vtion_ecom DATASET_1_DESC=VTION E-commerce platform data DATASET_1_CONNECTION=postgresql://user:pass@host:port/db DATASET_1_DICTIONARY={"products": "Product catalog", "orders": "Order history"} """ dataset_id = 1 while True: name_key = f"DATASET_{dataset_id}_NAME" if name_key not in os.environ: break DATASETS[dataset_id] = { "id": dataset_id, "name": os.environ[name_key], "description": os.environ.get(f"DATASET_{dataset_id}_DESC", ""), "connection": os.environ[f"DATASET_{dataset_id}_CONNECTION"], "dictionary": json.loads(os.environ.get(f"DATASET_{dataset_id}_DICTIONARY", "{}")) } dataset_id += 1 @asynccontextmanager async def app_lifespan(): """Manage database connection pools.""" load_datasets_from_env() # Create connection pools for each dataset pools = {} for ds_id, ds_info in DATASETS.items(): pools[ds_id] = await asyncpg.create_pool( ds_info["connection"], min_size=2, max_size=10, command_timeout=60 ) yield {"pools": pools} # Cleanup for pool in pools.values(): await pool.close() # Initialize FastMCP server mcp = FastMCP("vtion_ecom_mcp", lifespan=app_lifespan) # ============================================================================ # HELPER FUNCTIONS # ============================================================================ async def get_pool(ctx: Context, dataset_id: int) -> asyncpg.Pool: """Get connection pool for a dataset.""" pools = ctx.request_context.lifespan_state["pools"] if dataset_id not in pools: raise ValueError(f"Dataset {dataset_id} not found. Use list_available_datasets to see available datasets.") return pools[dataset_id] def has_group_by(query: str) -> bool: """Check if query contains GROUP BY clause.""" return 'group by' in query.lower() def format_markdown_table(rows: List[Dict], columns: List[str]) -> str: """Format query results as markdown table.""" if not rows: return "No results found." # Build header header = "| " + " | ".join(columns) + " |" separator = "| " + " | ".join(["---"] * len(columns)) + " |" # Build rows lines = [header, separator] for row in rows: values = [str(row.get(col, "")) for col in columns] lines.append("| " + " | ".join(values) + " |") return "\n".join(lines) def truncate_response(response: str, metadata: str = "") -> str: """Truncate response if it exceeds character limit.""" if len(response) <= CHARACTER_LIMIT: return response # Calculate how much space for content truncation_msg = f"\n\n⚠️ **Response truncated** (exceeded {CHARACTER_LIMIT:,} character limit). Use more specific filters or reduce limit parameter.\n{metadata}" available_chars = CHARACTER_LIMIT - len(truncation_msg) truncated = response[:available_chars] + "..." return truncated + truncation_msg # ============================================================================ # CONTEXT LOADING TOOLS # ============================================================================ class GetContextInput(BaseModel): """Input for progressive context loading.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') level: int = Field( default=0, description="Context level: 0=global rules, 1=dataset list, 2=schema for dataset, 3=full details with samples", ge=0, le=3 ) dataset_id: Optional[int] = Field( default=None, description="Dataset ID (required for levels 2-3)" ) @mcp.tool( name="get_context", annotations={ "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def get_context(params: GetContextInput, ctx: Context) -> str: """Get progressive context about the MCP server and datasets. Context Levels: - Level 0: Global rules (weighting, aggregation limits, output rules) - Level 1: List of all active datasets - Level 2: Detailed schema for specific dataset (requires dataset_id) - Level 3: Full details with samples (requires dataset_id) Args: params (GetContextInput): Context request parameters - level (int): Context level 0-3 - dataset_id (Optional[int]): Required for levels 2-3 Returns: str: Markdown formatted context """ if params.level in [2, 3] and params.dataset_id is None: return "❌ Error: dataset_id required for levels 2-3" # Level 0: Global rules if params.level == 0: return """# VTION E-Commerce Analytics MCP - Global Rules ## Data Overview - **Platform**: VTION E-Commerce Analytics - **Update frequency**: Real-time - **Data format**: Transactional and analytical tables ## Query Rules - **Raw data queries** (no GROUP BY): Limited to 5 rows (for inspection only) - **Aggregated queries** (with GROUP BY): Up to 1,000 rows - **Always include**: WHERE clauses to filter by date ranges, categories, or customer segments - **Performance**: Use indexes on timestamp, customer_id, product_id, order_id ## Response Format - Results returned as markdown tables by default - JSON format available via `response_format` parameter ## Security - Only SELECT queries are allowed - No data modification operations permitted """ # Level 1: Dataset list if params.level == 1: if not DATASETS: return "❌ No datasets configured. Check environment variables." lines = ["# Available Datasets\n"] for ds_id, ds_info in DATASETS.items(): lines.append(f"## Dataset {ds_id}: {ds_info['name']}") lines.append(f"{ds_info['description']}\n") if ds_info['dictionary']: lines.append("**Tables:**") for table, desc in ds_info['dictionary'].items(): lines.append(f"- `{table}`: {desc}") lines.append("") return "\n".join(lines) # Level 2: Schema if params.level == 2: pool = await get_pool(ctx, params.dataset_id) async with pool.acquire() as conn: # Get all tables tables = await conn.fetch(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name """) lines = [f"# Dataset {params.dataset_id} Schema\n"] for table_row in tables: table = table_row['table_name'] # Get columns columns = await conn.fetch(""" SELECT column_name, data_type, character_maximum_length, is_nullable FROM information_schema.columns WHERE table_name = $1 ORDER BY ordinal_position """, table) lines.append(f"## Table: `{table}`") if DATASETS[params.dataset_id]['dictionary'].get(table): lines.append(f"*{DATASETS[params.dataset_id]['dictionary'][table]}*\n") lines.append("| Column | Type | Nullable |") lines.append("|--------|------|----------|") for col in columns: col_name = col['column_name'] col_type = col['data_type'] if col['character_maximum_length']: col_type += f"({col['character_maximum_length']})" nullable = "Yes" if col['is_nullable'] == 'YES' else "No" lines.append(f"| `{col_name}` | {col_type} | {nullable} |") lines.append("") return "\n".join(lines) # Level 3: Full details with samples if params.level == 3: schema = await get_context(GetContextInput(level=2, dataset_id=params.dataset_id), ctx) pool = await get_pool(ctx, params.dataset_id) async with pool.acquire() as conn: tables = await conn.fetch(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name """) samples = ["\n## Sample Data\n"] for table_row in tables: table = table_row['table_name'] # Get 3 sample rows rows = await conn.fetch(f'SELECT * FROM "{table}" LIMIT 3') if rows: samples.append(f"### `{table}` (3 rows)") cols = list(rows[0].keys()) samples.append(format_markdown_table([dict(r) for r in rows], cols)) samples.append("") return schema + "\n".join(samples) # ============================================================================ # DATASET TOOLS # ============================================================================ @mcp.tool( name="list_available_datasets", annotations={ "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def list_available_datasets() -> str: """List all available datasets in the analytics platform. Returns: Markdown formatted table of datasets with id, name, and description """ if not DATASETS: return "❌ No datasets configured. Administrator needs to set environment variables:\n- DATASET_N_NAME\n- DATASET_N_DESC\n- DATASET_N_CONNECTION\n- DATASET_N_DICTIONARY" lines = ["# Available Datasets\n"] lines.append("| ID | Name | Description |") lines.append("|----|------|-------------|") for ds_id, ds_info in DATASETS.items(): lines.append(f"| {ds_id} | `{ds_info['name']}` | {ds_info['description']} |") lines.append("\n**Next steps:**") lines.append("1. Use `get_context(level=1)` for brief dataset summaries") lines.append("2. Use `get_dataset_schema(dataset_id)` to see table structures") lines.append("3. Use `query_dataset(dataset_id, query)` to run SQL queries") return "\n".join(lines) class GetSchemaInput(BaseModel): """Input for getting dataset schema.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') dataset_id: int = Field( description="ID of the dataset to get schema for" ) @mcp.tool( name="get_dataset_schema", annotations={ "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def get_dataset_schema(params: GetSchemaInput, ctx: Context) -> str: """Get the schema metadata for a specific dataset in ONE call. Returns the complete schema with table structures, column types, and descriptions in a single markdown response. LLM can immediately use this to write queries. Args: params (GetSchemaInput): Schema request parameters - dataset_id (int): Dataset ID Returns: Markdown formatted schema with ALL tables, columns, types, and descriptions """ return await get_context(GetContextInput(level=2, dataset_id=params.dataset_id), ctx) class QueryDatasetInput(BaseModel): """Input for querying a dataset.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') dataset_id: int = Field( description="ID of the dataset to query" ) query: str = Field( description="SQL SELECT query to execute (only SELECT statements allowed)", min_length=10 ) response_format: ResponseFormat = Field( default=ResponseFormat.MARKDOWN, description="Output format: 'markdown' for tables or 'json' for structured data" ) @mcp.tool( name="query_dataset", annotations={ "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def query_dataset(params: QueryDatasetInput, ctx: Context) -> str: """Execute a SQL SELECT query on a specific dataset with automatic optimizations. 🚀 **For multiple queries: Call this tool multiple times in parallel!** Each call executes independently and returns immediately when done. Fast queries won't wait for slow ones. Features: - Only SELECT statements allowed - Raw data queries (no GROUP BY): Limited to 5 rows - Aggregated queries (with GROUP BY): Up to 1000 rows - Parallel execution when called multiple times Args: params (QueryDatasetInput): Query parameters - dataset_id (int): Dataset to query - query (str): SELECT query - response_format (str): 'markdown' or 'json' Returns: Markdown formatted results table with metadata Example - Multiple parallel queries: # These execute in parallel automatically: query_dataset(1, "SELECT category, COUNT(*) FROM products GROUP BY category") query_dataset(1, "SELECT status, COUNT(*) FROM orders GROUP BY status") query_dataset(1, "SELECT * FROM customers LIMIT 10") """ # Security: Only allow SELECT query_upper = params.query.strip().upper() if not query_upper.startswith('SELECT'): return "❌ Error: Only SELECT queries allowed" # Check for dangerous keywords dangerous = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'TRUNCATE', 'ALTER', 'CREATE'] if any(kw in query_upper for kw in dangerous): return f"❌ Error: Query contains forbidden keywords: {', '.join(dangerous)}" try: pool = await get_pool(ctx, params.dataset_id) # Determine if aggregated is_aggregated = has_group_by(params.query) limit = AGGREGATED_LIMIT if is_aggregated else RAW_DATA_LIMIT # Add LIMIT if not present query = params.query if 'LIMIT' not in query_upper: query = f"{query.rstrip(';')} LIMIT {limit}" async with pool.acquire() as conn: rows = await conn.fetch(query) if not rows: return "No results found." # Convert to list of dicts results = [dict(row) for row in rows] columns = list(results[0].keys()) # Build metadata metadata_lines = [ f"**Query executed on dataset {params.dataset_id}**", f"- Rows returned: {len(results)}", f"- Query type: {'Aggregated (GROUP BY)' if is_aggregated else 'Raw data'}", f"- Limit applied: {limit}", ] if not is_aggregated and len(results) >= RAW_DATA_LIMIT: metadata_lines.append(f"- ⚠️ Raw data limited to {RAW_DATA_LIMIT} rows (use GROUP BY for more)") metadata = "\n".join(metadata_lines) # Format response if params.response_format == ResponseFormat.JSON: response = json.dumps({ "metadata": { "dataset_id": params.dataset_id, "rows": len(results), "aggregated": is_aggregated, "limit": limit }, "data": results }, indent=2, default=str) else: # Markdown table response = metadata + "\n\n" + format_markdown_table(results, columns) return truncate_response(response, metadata) except Exception as e: error_msg = str(e) # Provide helpful hints if "relation" in error_msg.lower() and "does not exist" in error_msg.lower(): return f"❌ Error: Table not found. Use `get_dataset_schema({params.dataset_id})` to see available tables.\n\nDetails: {error_msg}" elif "column" in error_msg.lower() and "does not exist" in error_msg.lower(): return f"❌ Error: Column not found. Use `get_dataset_schema({params.dataset_id})` to see available columns.\n\nDetails: {error_msg}" elif "syntax error" in error_msg.lower(): return f"❌ SQL syntax error. Check your query syntax.\n\nDetails: {error_msg}" else: return f"❌ Query error: {error_msg}" class GetSampleInput(BaseModel): """Input for getting sample data.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') dataset_id: int = Field( description="ID of the dataset" ) table_name: str = Field( description="Name of the table to sample from" ) limit: int = Field( default=10, description="Number of sample rows (max 100)", ge=1, le=100 ) @mcp.tool( name="get_dataset_sample", annotations={ "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def get_dataset_sample(params: GetSampleInput, ctx: Context) -> str: """Get sample data from a specific table in a dataset. Args: params (GetSampleInput): Sample request parameters - dataset_id (int): Dataset ID - table_name (str): Table name - limit (int): Number of rows (max 100) Returns: Markdown formatted sample data table """ try: pool = await get_pool(ctx, params.dataset_id) async with pool.acquire() as conn: # Verify table exists table_check = await conn.fetchval(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_name = $1 AND table_schema = 'public' """, params.table_name) if table_check == 0: available = await conn.fetch(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name """) table_list = ", ".join([f"`{t['table_name']}`" for t in available]) return f"❌ Error: Table `{params.table_name}` not found.\n\nAvailable tables: {table_list}" # Get sample data rows = await conn.fetch(f'SELECT * FROM "{params.table_name}" LIMIT {params.limit}') if not rows: return f"Table `{params.table_name}` exists but is empty." results = [dict(row) for row in rows] columns = list(results[0].keys()) header = f"## Sample from `{params.table_name}` ({len(results)} rows)\n" table = format_markdown_table(results, columns) return header + table except Exception as e: return f"❌ Error getting sample: {str(e)}" # ============================================================================ # SERVER ENTRY POINT # ============================================================================ if __name__ == "__main__": # Verify environment setup if not any(key.startswith("DATASET_") for key in os.environ): print("⚠️ WARNING: No dataset environment variables found!") print("\nRequired format:") print(" DATASET_1_NAME=vtion_ecom") print(" DATASET_1_DESC=VTION E-commerce platform data") print(" DATASET_1_CONNECTION=postgresql://user:pass@host:port/db") print(' DATASET_1_DICTIONARY={"products": "Product catalog", "orders": "Order history"}') print("\nServer will start but no datasets will be available.") mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/adityac7/VTION-ECOM'

If you have feedback or need assistance with the MCP directory API, please join our Discord server