Skip to main content
Glama

E-commerce Local MCP Server

data_mapper.py19.6 kB
""" Data transformation mapper between MySQL and MongoDB. Handles data type conversion and field mapping. """ import uuid import logging from typing import Dict, List, Any, Optional from datetime import datetime from decimal import Decimal from .mysql_connector import TableInfo from ..models.mongodb_models import Product, Customer, Order, OrderItem, Inventory logger = logging.getLogger(__name__) class DataMapper: """ Transforms MySQL data to MongoDB format with intelligent mapping. Handles data type conversion and field normalization. """ def __init__(self): self.field_mappings = { # Common field name mappings from MySQL to MongoDB 'id': '_id', 'user_id': 'customer_id', 'userid': 'customer_id', 'username': 'name', 'first_name': 'name', # Will be combined with last_name 'email_address': 'email', 'phone_number': 'phone', 'created': 'created_at', 'modified': 'updated_at', 'date_created': 'created_at', 'date_modified': 'updated_at', 'price': 'price', 'cost': 'price', 'amount': 'total_amount', 'qty': 'quantity', 'stock': 'quantity' } self.type_conversions = { 'DECIMAL': self._convert_decimal, 'DATETIME': self._convert_datetime, 'TIMESTAMP': self._convert_datetime, 'DATE': self._convert_datetime, 'TIME': self._convert_time, 'TINYINT(1)': self._convert_boolean, 'JSON': self._convert_json, 'TEXT': self._convert_text, 'LONGTEXT': self._convert_text } async def transform_table_data( self, table_name: str, mysql_data: List[Dict[str, Any]], table_info: TableInfo ) -> List[Dict[str, Any]]: """Transform MySQL table data to MongoDB format.""" logger.info(f"Transforming {len(mysql_data)} records from table: {table_name}") # Try to map to specific MongoDB models if available if self._is_product_table(table_name, table_info): return await self._transform_to_products(mysql_data, table_info) elif self._is_customer_table(table_name, table_info): return await self._transform_to_customers(mysql_data, table_info) elif self._is_order_table(table_name, table_info): return await self._transform_to_orders(mysql_data, table_info) elif self._is_inventory_table(table_name, table_info): return await self._transform_to_inventory(mysql_data, table_info) else: # Generic transformation return await self._transform_generic(mysql_data, table_info) def _is_product_table(self, table_name: str, table_info: TableInfo) -> bool: """Check if table represents products.""" table_lower = table_name.lower() product_indicators = ['product', 'item', 'goods', 'catalog'] if any(indicator in table_lower for indicator in product_indicators): return True # Check for product-like columns columns = [col.lower() for col in table_info.columns.keys()] product_columns = ['name', 'price', 'sku', 'category'] return len(set(product_columns) & set(columns)) >= 2 def _is_customer_table(self, table_name: str, table_info: TableInfo) -> bool: """Check if table represents customers/users.""" table_lower = table_name.lower() customer_indicators = ['customer', 'user', 'client', 'member', 'account'] if any(indicator in table_lower for indicator in customer_indicators): return True columns = [col.lower() for col in table_info.columns.keys()] return 'email' in columns and ('name' in columns or 'first_name' in columns) def _is_order_table(self, table_name: str, table_info: TableInfo) -> bool: """Check if table represents orders.""" table_lower = table_name.lower() order_indicators = ['order', 'purchase', 'transaction', 'sale'] if any(indicator in table_lower for indicator in order_indicators): return True columns = [col.lower() for col in table_info.columns.keys()] order_columns = ['customer_id', 'total', 'amount', 'status'] return len(set(order_columns) & set(columns)) >= 2 def _is_inventory_table(self, table_name: str, table_info: TableInfo) -> bool: """Check if table represents inventory.""" table_lower = table_name.lower() inventory_indicators = ['inventory', 'stock', 'warehouse'] if any(indicator in table_lower for indicator in inventory_indicators): return True columns = [col.lower() for col in table_info.columns.keys()] inventory_columns = ['product_id', 'quantity', 'stock'] return len(set(inventory_columns) & set(columns)) >= 2 async def _transform_to_products( self, mysql_data: List[Dict[str, Any]], table_info: TableInfo ) -> List[Dict[str, Any]]: """Transform data to Product model format.""" products = [] for record in mysql_data: try: # Map common fields product_data = { '_id': self._ensure_string_id(record.get('id') or record.get('product_id')), 'name': record.get('name') or record.get('product_name') or record.get('title', 'Unknown Product'), 'category': record.get('category') or record.get('category_name') or 'Uncategorized', 'price': self._convert_to_float(record.get('price') or record.get('cost', 0)), 'description': record.get('description') or record.get('details'), 'sku': record.get('sku') or record.get('product_code') or f"SKU-{uuid.uuid4().hex[:8].upper()}", 'brand': record.get('brand') or record.get('manufacturer'), 'weight': self._convert_to_float(record.get('weight')), 'dimensions': record.get('dimensions') or record.get('size'), 'created_at': self._convert_datetime(record.get('created_at') or record.get('date_created')), 'updated_at': self._convert_datetime(record.get('updated_at') or record.get('date_modified')) } # Remove None values product_data = {k: v for k, v in product_data.items() if v is not None} # Ensure required fields if not product_data.get('created_at'): product_data['created_at'] = datetime.utcnow() if not product_data.get('updated_at'): product_data['updated_at'] = datetime.utcnow() products.append(product_data) except Exception as e: logger.error(f"Error transforming product record: {e}") logger.debug(f"Record: {record}") return products async def _transform_to_customers( self, mysql_data: List[Dict[str, Any]], table_info: TableInfo ) -> List[Dict[str, Any]]: """Transform data to Customer model format.""" customers = [] for record in mysql_data: try: # Handle name field (might be split into first_name, last_name) name = record.get('name') or record.get('full_name') if not name: first_name = record.get('first_name', '') last_name = record.get('last_name', '') name = f"{first_name} {last_name}".strip() or 'Unknown' customer_data = { '_id': self._ensure_string_id(record.get('id') or record.get('customer_id') or record.get('user_id')), 'customer_id': record.get('customer_id') or f"CUST-{uuid.uuid4().hex[:8].upper()}", 'name': name, 'email': record.get('email') or record.get('email_address'), 'phone': record.get('phone') or record.get('phone_number'), 'address': record.get('address') or record.get('street_address'), 'city': record.get('city'), 'country': record.get('country') or record.get('country_code'), 'total_orders': int(record.get('total_orders', 0)), 'total_spent': self._convert_to_float(record.get('total_spent', 0.0)), 'loyalty_tier': record.get('loyalty_tier') or record.get('tier', 'Bronze'), 'created_at': self._convert_datetime(record.get('created_at') or record.get('date_created')), 'last_purchase_date': self._convert_datetime(record.get('last_purchase_date')) } # Remove None values customer_data = {k: v for k, v in customer_data.items() if v is not None} # Ensure required fields if not customer_data.get('created_at'): customer_data['created_at'] = datetime.utcnow() customers.append(customer_data) except Exception as e: logger.error(f"Error transforming customer record: {e}") logger.debug(f"Record: {record}") return customers async def _transform_to_orders( self, mysql_data: List[Dict[str, Any]], table_info: TableInfo ) -> List[Dict[str, Any]]: """Transform data to Order model format.""" orders = [] for record in mysql_data: try: order_data = { '_id': self._ensure_string_id(record.get('id') or record.get('order_id')), 'order_id': record.get('order_id') or f"ORD-{uuid.uuid4().hex[:8].upper()}", 'customer_id': self._ensure_string_id(record.get('customer_id') or record.get('user_id')), 'customer_name': record.get('customer_name') or record.get('user_name') or 'Unknown', 'customer_email': record.get('customer_email') or record.get('email', 'unknown@example.com'), 'order_date': self._convert_datetime(record.get('order_date') or record.get('created_at')), 'status': record.get('status') or record.get('order_status', 'pending'), 'total_amount': self._convert_to_float(record.get('total_amount') or record.get('total', 0)), 'shipping_address': record.get('shipping_address') or record.get('address'), 'payment_method': record.get('payment_method') or record.get('payment_type'), 'discount_applied': self._convert_to_float(record.get('discount_applied', 0.0)), 'shipping_cost': self._convert_to_float(record.get('shipping_cost', 0.0)), 'notes': record.get('notes') or record.get('comments'), 'fulfilled_date': self._convert_datetime(record.get('fulfilled_date')), 'items': [] # Will be populated separately if available } # Remove None values order_data = {k: v for k, v in order_data.items() if v is not None} # Ensure required fields if not order_data.get('order_date'): order_data['order_date'] = datetime.utcnow() orders.append(order_data) except Exception as e: logger.error(f"Error transforming order record: {e}") logger.debug(f"Record: {record}") return orders async def _transform_to_inventory( self, mysql_data: List[Dict[str, Any]], table_info: TableInfo ) -> List[Dict[str, Any]]: """Transform data to Inventory model format.""" inventory = [] for record in mysql_data: try: inventory_data = { '_id': self._ensure_string_id(record.get('id')), 'product_id': self._ensure_string_id(record.get('product_id')), 'product_name': record.get('product_name') or record.get('name', 'Unknown Product'), 'product_sku': record.get('product_sku') or record.get('sku', 'UNKNOWN'), 'quantity': int(record.get('quantity') or record.get('stock', 0)), 'reserved_quantity': int(record.get('reserved_quantity', 0)), 'available_quantity': int(record.get('available_quantity') or record.get('available', 0)), 'reorder_level': int(record.get('reorder_level', 10)), 'max_stock_level': int(record.get('max_stock_level', 1000)), 'low_stock_threshold': int(record.get('low_stock_threshold', 5)), 'last_updated': self._convert_datetime(record.get('last_updated') or record.get('updated_at')), 'supplier': record.get('supplier') or record.get('vendor'), 'cost_per_unit': self._convert_to_float(record.get('cost_per_unit') or record.get('unit_cost')) } # Calculate available quantity if not provided if 'available_quantity' not in record: total_qty = inventory_data.get('quantity', 0) reserved_qty = inventory_data.get('reserved_quantity', 0) inventory_data['available_quantity'] = max(0, total_qty - reserved_qty) # Remove None values inventory_data = {k: v for k, v in inventory_data.items() if v is not None} # Ensure required fields if not inventory_data.get('last_updated'): inventory_data['last_updated'] = datetime.utcnow() inventory.append(inventory_data) except Exception as e: logger.error(f"Error transforming inventory record: {e}") logger.debug(f"Record: {record}") return inventory async def _transform_generic( self, mysql_data: List[Dict[str, Any]], table_info: TableInfo ) -> List[Dict[str, Any]]: """Generic transformation for unknown table types.""" transformed = [] for record in mysql_data: try: transformed_record = {} for column_name, value in record.items(): # Map field names mongodb_field = self.field_mappings.get(column_name.lower(), column_name) # Convert data types column_type = table_info.columns.get(column_name, 'VARCHAR') converted_value = self._convert_value(value, column_type) transformed_record[mongodb_field] = converted_value # Ensure _id field if '_id' not in transformed_record and 'id' in record: transformed_record['_id'] = self._ensure_string_id(record['id']) elif '_id' not in transformed_record: transformed_record['_id'] = str(uuid.uuid4()) # Add timestamps if not present if 'created_at' not in transformed_record: transformed_record['created_at'] = datetime.utcnow() if 'updated_at' not in transformed_record: transformed_record['updated_at'] = datetime.utcnow() transformed.append(transformed_record) except Exception as e: logger.error(f"Error in generic transformation: {e}") logger.debug(f"Record: {record}") return transformed def _convert_value(self, value: Any, mysql_type: str) -> Any: """Convert a value based on MySQL column type.""" if value is None: return None # Get base type (remove size specifications) base_type = mysql_type.split('(')[0].upper() converter = self.type_conversions.get(mysql_type) or self.type_conversions.get(base_type) if converter: return converter(value) return value def _convert_decimal(self, value) -> float: """Convert Decimal to float.""" if isinstance(value, Decimal): return float(value) return self._convert_to_float(value) def _convert_datetime(self, value) -> Optional[datetime]: """Convert various datetime formats to datetime object.""" if value is None: return None if isinstance(value, datetime): return value if isinstance(value, str): # Try common datetime formats formats = [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%Y/%m/%d %H:%M:%S', '%Y/%m/%d', '%d/%m/%Y %H:%M:%S', '%d/%m/%Y' ] for fmt in formats: try: return datetime.strptime(value, fmt) except ValueError: continue return None def _convert_time(self, value) -> Optional[str]: """Convert time to string format.""" if value is None: return None return str(value) def _convert_boolean(self, value) -> bool: """Convert various boolean representations.""" if isinstance(value, bool): return value if isinstance(value, int): return bool(value) if isinstance(value, str): return value.lower() in ('1', 'true', 'yes', 'on') return bool(value) def _convert_json(self, value) -> Any: """Convert JSON string to object.""" if isinstance(value, str): try: import json return json.loads(value) except json.JSONDecodeError: return value return value def _convert_text(self, value) -> Optional[str]: """Convert text fields to string.""" if value is None: return None return str(value) def _convert_to_float(self, value) -> Optional[float]: """Safely convert value to float.""" if value is None: return None try: if isinstance(value, Decimal): return float(value) return float(value) except (ValueError, TypeError): return None def _ensure_string_id(self, value) -> str: """Ensure ID is a string.""" if value is None: return str(uuid.uuid4()) return str(value)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AnisurRahman06046/mcptestwithmodel'

If you have feedback or need assistance with the MCP directory API, please join our Discord server