parquet_mcp_server.py (85.1 kB)
#!/usr/bin/env python3 """ MCP Server for Parquet File Interactions Provides tools for reading, querying, and modifying parquet files in a repository data directory. """ import json import os import re import sys import uuid from datetime import date, datetime from pathlib import Path from typing import Any, Dict, List, Optional import numpy as np import pandas as pd import pyarrow as pa import pyarrow.parquet as pq from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent, Prompt, PromptArgument, PromptMessage # Optional OpenAI for embeddings try: from openai import OpenAI OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False OpenAI = None # Optional fuzzy matching try: from difflib import SequenceMatcher FUZZY_AVAILABLE = True except ImportError: FUZZY_AVAILABLE = False # Project root directory # Go up from mcp-servers/parquet/parquet_mcp_server.py to workspace root PROJECT_ROOT = Path(__file__).parent.parent.parent DATA_DIR = PROJECT_ROOT / "data" SCHEMAS_DIR = PROJECT_ROOT / "data" / "schemas" SNAPSHOTS_DIR = PROJECT_ROOT / "data" / "snapshots" LOGS_DIR = PROJECT_ROOT / "data" / "logs" AUDIT_LOG_PATH = LOGS_DIR / "audit_log.parquet" # Backup configuration FULL_SNAPSHOT_ENABLED = os.environ.get("MCP_FULL_SNAPSHOTS", "false").lower() == "true" SNAPSHOT_FREQUENCY = os.environ.get("MCP_SNAPSHOT_FREQUENCY", "weekly") # daily, weekly, monthly, never # Embeddings directory EMBEDDINGS_DIR = DATA_DIR / "embeddings" EMBEDDINGS_DIR.mkdir(parents=True, exist_ok=True) # Ensure directories exist SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True) LOGS_DIR.mkdir(parents=True, exist_ok=True) # OpenAI client for embeddings (lazy initialization) _openai_client = None def get_openai_client(): """Get or create OpenAI client for embeddings.""" global _openai_client if not OPENAI_AVAILABLE: return None if _openai_client is None: api_key = os.getenv("OPENAI_API_KEY") if api_key: _openai_client = OpenAI(api_key=api_key) return _openai_client # Initialize MCP server app = Server("parquet") def get_parquet_file_path(data_type: str) -> Path: """Get the parquet file path for a data type.""" return DATA_DIR / data_type / f"{data_type}.parquet" def get_embeddings_file_path(data_type: str) -> Path: """Get the embeddings parquet file path for a data type.""" return EMBEDDINGS_DIR / f"{data_type}_embeddings.parquet" def apply_enhanced_filter(df: pd.DataFrame, column: str, filter_spec: Any) -> pd.DataFrame: """ Apply enhanced filtering with support for multiple operators. Filter spec can be: - Simple value: exact match - List: in list - Dict with operator: {"$contains": "text"}, {"$starts_with": "text"}, etc. 
Supported operators: - $contains: substring match (case-insensitive) - $starts_with: prefix match (case-insensitive) - $ends_with: suffix match (case-insensitive) - $regex: regex pattern match - $fuzzy: fuzzy string matching (similarity threshold 0-1) - $gt, $gte, $lt, $lte: numeric comparisons - $in: list membership (same as passing list directly) - $ne: not equal """ if column not in df.columns: return df # Simple value or list if not isinstance(filter_spec, dict): if isinstance(filter_spec, list): return df[df[column].isin(filter_spec)] else: return df[df[column] == filter_spec] # Dict with operators mask = pd.Series([True] * len(df)) for op, value in filter_spec.items(): if op == "$contains": mask = mask & df[column].astype(str).str.contains(str(value), case=False, na=False, regex=False) elif op == "$starts_with": mask = mask & df[column].astype(str).str.startswith(str(value), na=False) elif op == "$ends_with": mask = mask & df[column].astype(str).str.endswith(str(value), na=False) elif op == "$regex": try: mask = mask & df[column].astype(str).str.contains(value, case=False, na=False, regex=True) except re.error: # Invalid regex, return empty result return df.iloc[0:0] elif op == "$fuzzy": if not FUZZY_AVAILABLE: # Fallback to contains if fuzzy not available mask = mask & df[column].astype(str).str.contains(str(value), case=False, na=False, regex=False) else: threshold = value.get("threshold", 0.6) if isinstance(value, dict) else 0.6 search_text = value.get("text", value) if isinstance(value, dict) else str(value) search_text_lower = search_text.lower() def fuzzy_match(val): if pd.isna(val): return False val_str = str(val).lower() similarity = SequenceMatcher(None, search_text_lower, val_str).ratio() return similarity >= threshold mask = mask & df[column].apply(fuzzy_match) elif op == "$gt": mask = mask & (df[column] > value) elif op == "$gte": mask = mask & (df[column] >= value) elif op == "$lt": mask = mask & (df[column] < value) elif op == "$lte": mask = mask & (df[column] <= value) elif op == "$in": mask = mask & df[column].isin(value if isinstance(value, list) else [value]) elif op == "$ne": mask = mask & (df[column] != value) else: # Unknown operator, skip continue return df[mask] def get_schema_path(data_type: str) -> Optional[Path]: """Get the schema file path for a data type.""" schema_file = SCHEMAS_DIR / f"{data_type}_schema.json" if schema_file.exists(): return schema_file return None def load_json_schema(data_type: str) -> Optional[Dict[str, Any]]: """Load JSON schema definition for a data type, if available.""" schema_path = get_schema_path(data_type) if not schema_path: return None with open(schema_path, "r", encoding="utf-8") as f: return json.load(f) def get_date_fields(data_type: str) -> List[str]: """Return list of fields declared as date/datetime/timestamp for this data type.""" schema = load_json_schema(data_type) if not schema: return [] result: List[str] = [] for name, type_name in schema.get("schema", {}).items(): if type_name in {"date", "datetime", "timestamp"}: result.append(name) return result def coerce_record_dates(data_type: str, record: Dict[str, Any]) -> Dict[str, Any]: """Coerce date/datetime fields in a single record according to schema.""" date_fields = get_date_fields(data_type) if not date_fields: return record new_record = dict(record) for field in date_fields: if field not in new_record: continue value = new_record[field] if value is None: continue if isinstance(value, (datetime, date)): continue if isinstance(value, str): try: dt = 
datetime.fromisoformat(value) except ValueError: try: dt = datetime.strptime(value, "%Y-%m-%d") except Exception: # Leave as-is if parsing fails continue # For 'date' fields, downcast to date; for 'timestamp'/'datetime', keep as datetime schema = load_json_schema(data_type) or {} type_name = schema.get("schema", {}).get(field) if type_name == "date": new_record[field] = dt.date() elif type_name in {"datetime", "timestamp"}: new_record[field] = dt else: new_record[field] = dt return new_record def coerce_date_columns(df: pd.DataFrame, data_type: Optional[str] = None) -> pd.DataFrame: """Coerce any *date*/*time* columns according to JSON schema for stable parquet writes.""" if df.empty: return df if data_type: json_schema = load_json_schema(data_type) if json_schema: schema_def = json_schema.get("schema", {}) for col_name, type_name in schema_def.items(): if col_name not in df.columns: continue if type_name == "date": # Convert to datetime.date objects (not pd.Timestamp, not strings) # Use apply for robust conversion that handles all types def to_date_safe(val): if val is None or pd.isna(val): return None if isinstance(val, date) and not isinstance(val, datetime): return val if isinstance(val, datetime): return val.date() if isinstance(val, pd.Timestamp): return val.date() if isinstance(val, str): try: dt = pd.to_datetime(val, errors="coerce") if pd.isna(dt): return None return dt.date() except: return None # Unknown type, try to convert try: dt = pd.to_datetime(val, errors="coerce") if pd.isna(dt): return None return dt.date() except: return None df[col_name] = df[col_name].apply(to_date_safe) elif type_name in {"datetime", "timestamp"}: # Convert to pd.Timestamp with UTC timezone to match schema df[col_name] = pd.to_datetime(df[col_name], errors="coerce", utc=True) return df # Fallback: generic coercion for date/time columns for col in df.columns: name = str(col).lower() if "date" in name or "time" in name: try: df[col] = pd.to_datetime(df[col], errors="coerce") except Exception: continue return df def get_pyarrow_schema_for_write(data_type: str) -> Optional[pa.Schema]: """Get pyarrow schema for writing, if available.""" try: # Try to import from scripts module import sys scripts_path = PROJECT_ROOT / "scripts" if str(scripts_path) not in sys.path: sys.path.insert(0, str(scripts_path)) from parquet_schema_definitions import get_pyarrow_schema return get_pyarrow_schema(data_type) except (ImportError, AttributeError): # Fallback: read existing schema from parquet file file_path = get_parquet_file_path(data_type) if file_path.exists(): try: return pq.read_schema(file_path) except Exception: pass return None def write_parquet_with_schema(df: pd.DataFrame, file_path: Path, data_type: str) -> None: """Write parquet file using explicit schema if available, otherwise fallback to default.""" # For now, write without explicit schema to avoid timestamp conversion issues # PyArrow will preserve existing column types when appending to existing files # TODO: Fix timestamp schema conversion issue try: df.to_parquet(file_path, index=False, engine="pyarrow") except Exception as e: print(f"Error writing parquet file: {e}", file=sys.stderr) raise def create_snapshot(file_path: Path) -> Path: """Create a timestamped snapshot of a parquet file.""" timestamp = datetime.now().strftime('%Y-%m-%d-%H%M%S') filename = file_path.stem snapshot_path = SNAPSHOTS_DIR / f"{filename}-{timestamp}.parquet" if file_path.exists(): df = pd.read_parquet(file_path) # Infer data_type from file path data_type = 
file_path.parent.name df = coerce_date_columns(df, data_type) # Add missing schema columns before creating snapshot json_schema = load_json_schema(data_type) if json_schema: schema_def = json_schema.get("schema", {}) for key in schema_def.keys(): if key not in df.columns: df[key] = None # Use explicit schema for snapshot too schema = get_pyarrow_schema_for_write(data_type) if schema: table = pa.Table.from_pandas(df, schema=schema, preserve_index=False, safe=False) pq.write_table(table, snapshot_path) else: df.to_parquet(snapshot_path, index=False, engine="pyarrow") return snapshot_path return None def should_create_full_snapshot(data_type: str) -> bool: """Determine if a full snapshot should be created based on configuration.""" if not FULL_SNAPSHOT_ENABLED: return False if SNAPSHOT_FREQUENCY == "never": return False # Check if recent snapshot exists file_path = get_parquet_file_path(data_type) if not file_path.exists(): return False filename = file_path.stem # Find most recent snapshot for this data type snapshots = sorted(SNAPSHOTS_DIR.glob(f"{filename}-*.parquet"), reverse=True) if not snapshots: return True # No snapshot exists, create one most_recent = snapshots[0] # Extract timestamp from filename: {filename}-YYYY-MM-DD-HHMMSS.parquet try: timestamp_str = most_recent.stem.split('-', 1)[1] # Get everything after first dash snapshot_time = datetime.strptime(timestamp_str, '%Y-%m-%d-%H%M%S') now = datetime.now() age = now - snapshot_time if SNAPSHOT_FREQUENCY == "daily": return age.days >= 1 elif SNAPSHOT_FREQUENCY == "weekly": return age.days >= 7 elif SNAPSHOT_FREQUENCY == "monthly": return age.days >= 30 except (ValueError, IndexError): return True # If we can't parse, create snapshot return False def create_audit_entry( operation: str, data_type: str, record_id: str, affected_fields: List[str] = None, old_values: Dict[str, Any] = None, new_values: Dict[str, Any] = None, snapshot_reference: str = None, notes: str = None ) -> Dict[str, Any]: """Create an audit log entry for a data modification.""" audit_entry = { "audit_id": str(uuid.uuid4())[:16], "timestamp": datetime.now(), "operation": operation, "data_type": data_type, "record_id": record_id, "affected_fields": json.dumps(affected_fields) if affected_fields else None, "old_values": json.dumps(old_values, default=str) if old_values else None, "new_values": json.dumps(new_values, default=str) if new_values else None, "user": "mcp_server", "snapshot_reference": snapshot_reference, "notes": notes } # Append to audit log try: if AUDIT_LOG_PATH.exists(): df_audit = pd.read_parquet(AUDIT_LOG_PATH) df_audit = pd.concat([df_audit, pd.DataFrame([audit_entry])], ignore_index=True) else: df_audit = pd.DataFrame([audit_entry]) df_audit = coerce_date_columns(df_audit, "audit_log") # Write audit log without explicit schema to avoid conversion issues df_audit.to_parquet(AUDIT_LOG_PATH, index=False, engine="pyarrow") except Exception as e: # Log error but don't fail the operation print(f"Warning: Failed to create audit entry: {e}", file=sys.stderr) return audit_entry def list_available_data_types() -> List[str]: """List all available data types (directories with parquet files).""" data_types = [] for item in DATA_DIR.iterdir(): if item.is_dir() and not item.name.startswith('_'): parquet_file = item / f"{item.name}.parquet" if parquet_file.exists(): data_types.append(item.name) return sorted(data_types) @app.list_tools() async def list_tools() -> List[Tool]: """List available MCP tools.""" return [ Tool( name="list_data_types", description="List 
all available data types (parquet files) in the data directory", inputSchema={ "type": "object", "properties": {}, }, ), Tool( name="get_schema", description="Get the schema definition for a data type", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'flows', 'transactions', 'tasks')", }, }, "required": ["data_type"], }, ), Tool( name="read_parquet", description="Read and query a parquet file with optional filters. Supports enhanced filtering operators: $contains, $starts_with, $ends_with, $regex, $fuzzy, $gt, $gte, $lt, $lte, $in, $ne", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'flows', 'transactions', 'tasks')", }, "filters": { "type": "object", "description": "Optional filters to apply. Can use enhanced operators: {\"title\": {\"$contains\": \"therapy\"}} or {\"title\": {\"$fuzzy\": {\"text\": \"therapy\", \"threshold\": 0.7}}}", "additionalProperties": True, }, "limit": { "type": "integer", "description": "Maximum number of rows to return (default: 1000)", "default": 1000, }, "columns": { "type": "array", "items": {"type": "string"}, "description": "Optional list of column names to return (default: all columns)", }, }, "required": ["data_type"], }, ), Tool( name="search_parquet", description="Semantic search using embeddings. Searches text fields (title, description, notes, etc.) for semantically similar records.", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'tasks', 'contacts', 'transactions')", }, "query": { "type": "string", "description": "Search query text to find semantically similar records", }, "text_fields": { "type": "array", "items": {"type": "string"}, "description": "Optional list of text fields to search (default: auto-detect common text fields like title, description, name, notes)", }, "limit": { "type": "integer", "description": "Maximum number of results to return (default: 10)", "default": 10, }, "min_similarity": { "type": "number", "description": "Minimum cosine similarity threshold (0-1, default: 0.7)", "default": 0.7, }, "additional_filters": { "type": "object", "description": "Optional additional filters to apply (same format as read_parquet filters)", "additionalProperties": True, }, }, "required": ["data_type", "query"], }, ), Tool( name="generate_embeddings", description="Generate and store embeddings for text fields in a data type. Creates or updates embeddings parquet file.", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'tasks', 'contacts')", }, "text_fields": { "type": "array", "items": {"type": "string"}, "description": "List of text fields to generate embeddings for (default: auto-detect common text fields)", }, "force_regenerate": { "type": "boolean", "description": "Force regeneration of all embeddings even if they exist (default: false)", "default": False, }, }, "required": ["data_type"], }, ), Tool( name="add_record", description="Add a new record to a parquet file. Creates audit log entry and optionally creates full snapshot based on configuration. MANDATORY: Proactively use this tool to persist any relevant structured data that emerges during conversations—do not wait for explicit user requests. 
Persist data when user mentions: financial transactions/expenses/income/flows, tasks/projects/outcomes, contacts/companies/relationships, purchases/orders/contracts, events/appointments, health/workout/food/meal info, properties/equipment/locations, investments/holdings/crypto/tax events, beliefs/goals, or any other structured information matching existing data types. Check for existing records first to avoid duplicates.", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'flows', 'transactions', 'tasks')", }, "record": { "type": "object", "description": "The record data as a JSON object matching the schema", "additionalProperties": True, }, }, "required": ["data_type", "record"], }, ), Tool( name="update_records", description="Update existing records in a parquet file. Creates audit log entry and optionally creates full snapshot based on configuration.", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'flows', 'transactions', 'tasks')", }, "filters": { "type": "object", "description": "Filters to identify records to update (key-value pairs)", "additionalProperties": True, }, "updates": { "type": "object", "description": "Fields to update (key-value pairs)", "additionalProperties": True, }, }, "required": ["data_type", "filters", "updates"], }, ), Tool( name="upsert_record", description="Insert or update a record (upsert). Checks for existing records using enhanced filters (supports $contains, $fuzzy, etc.). If found, updates matching records. If not found, creates a new record. Returns whether it created or updated. Useful for preventing duplicates when adding contacts, tasks, or other records where you want to update existing records or create new ones.", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'contacts', 'tasks', 'transactions')", }, "filters": { "type": "object", "description": "Enhanced filters to identify existing records (supports all read_parquet filter operators: $contains, $fuzzy, $starts_with, $ends_with, $regex, $gt, $gte, $lt, $lte, $in, $ne, or simple values)", "additionalProperties": True, }, "record": { "type": "object", "description": "The record data to insert or update. If updating, this data will be merged with existing record(s).", "additionalProperties": True, }, }, "required": ["data_type", "filters", "record"], }, ), Tool( name="delete_records", description="Delete records from a parquet file. 
Creates audit log entry and optionally creates full snapshot based on configuration.", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'flows', 'transactions', 'tasks')", }, "filters": { "type": "object", "description": "Filters to identify records to delete (key-value pairs)", "additionalProperties": True, }, }, "required": ["data_type", "filters"], }, ), Tool( name="get_statistics", description="Get basic statistics about a parquet file", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "The data type name (e.g., 'flows', 'transactions', 'tasks')", }, }, "required": ["data_type"], }, ), Tool( name="read_audit_log", description="Read audit log entries with optional filters", inputSchema={ "type": "object", "properties": { "data_type": { "type": "string", "description": "Filter by data type (optional)", }, "operation": { "type": "string", "description": "Filter by operation: add, update, delete (optional)", }, "record_id": { "type": "string", "description": "Filter by specific record ID (optional)", }, "limit": { "type": "integer", "description": "Maximum number of entries to return (default: 100)", "default": 100, }, }, }, ), Tool( name="rollback_operation", description="Rollback a specific operation using its audit ID. Creates inverse operation to undo changes.", inputSchema={ "type": "object", "properties": { "audit_id": { "type": "string", "description": "The audit ID of the operation to rollback", }, }, "required": ["audit_id"], }, ), ] @app.list_prompts() async def list_prompts() -> List[Prompt]: """List available prompts for agent guidance.""" return [ Prompt( name="data_persistence_guidance", description="Guidance on proactively persisting structured data from conversations", arguments=[ PromptArgument( name="context", description="Brief context about the conversation or task", required=False, ), ], ), ] @app.get_prompt() async def get_prompt(name: str, arguments: Dict[str, Any]) -> List[PromptMessage]: """Get prompt content.""" if name == "data_persistence_guidance": context = arguments.get("context", "") guidance = """# Data Persistence Requirements MANDATORY: Proactively identify and persist any relevant structured data that emerges during conversations. Do not wait for explicit user requests—automatically capture data that would be useful for future reference or querying. ## When to Persist Data Persist data whenever the user mentions: - Financial transactions, expenses, income, or flows - Tasks, projects, or outcomes - Contacts, companies, or relationships - Purchases, orders, contracts, or agreements - Events, appointments, or recurring events - Health, workout, food, or meal information - Properties, equipment, or locations - Investments, holdings, crypto transactions, or tax events - Beliefs, arguments, or goals - Any other structured information matching existing data types ## Process 1. Use `list_data_types` to discover available data types 2. Use `get_schema` to understand the schema for the identified data type 3. Use `read_parquet` to check for existing records (avoid duplicates) 4. Use `upsert_record` to add new entries or update existing ones (recommended for contacts, tasks, and other records where duplicates should be avoided) 5. Alternatively, use `add_record` to add new entries or `update_records` if matching record exists 6. 
Link related data via appropriate ID fields ## Examples - User mentions subscription → Add to `fixed_costs` - User discusses property maintenance → Add to `flows` - User shares contact details → Update `contacts` - User mentions workout → Add to `workouts` - User discusses task → Create/update `tasks` with outcome/project links Always use MCP tools—never access parquet files directly. Context: {context} """.format(context=context or "General conversation") return [ PromptMessage( role="user", content=TextContent(type="text", text=guidance), ), ] raise ValueError(f"Unknown prompt: {name}") @app.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle tool calls.""" if name == "list_data_types": data_types = list_available_data_types() return [TextContent( type="text", text=json.dumps({ "data_types": data_types, "count": len(data_types), }, indent=2) )] elif name == "get_schema": data_type = arguments["data_type"] schema_path = get_schema_path(data_type) if not schema_path: return [TextContent( type="text", text=json.dumps({ "error": f"Schema not found for data type: {data_type}", }, indent=2) )] with open(schema_path, 'r') as f: schema = json.load(f) return [TextContent( type="text", text=json.dumps(schema, indent=2) )] elif name == "read_parquet": data_type = arguments["data_type"] filters = arguments.get("filters", {}) limit = arguments.get("limit", 1000) columns = arguments.get("columns") file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] try: df = pd.read_parquet(file_path) # Apply enhanced filters for key, value in filters.items(): df = apply_enhanced_filter(df, key, value) # Select columns if specified if columns: missing_cols = [c for c in columns if c not in df.columns] if missing_cols: return [TextContent( type="text", text=json.dumps({ "error": f"Columns not found: {missing_cols}", "available_columns": list(df.columns), }, indent=2) )] df = df[columns] # Apply limit df = df.head(limit) # Convert to JSON-serializable format result = df.to_dict(orient='records') # Convert date/datetime objects to strings for record in result: for key, value in record.items(): if pd.isna(value): record[key] = None elif isinstance(value, (pd.Timestamp, datetime)): record[key] = value.isoformat() elif isinstance(value, date): record[key] = value.isoformat() return [TextContent( type="text", text=json.dumps({ "count": len(result), "total_rows": len(df) if limit >= len(df) else f"{len(df)}+", "data": result, }, indent=2, default=str) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "add_record": data_type = arguments["data_type"] record = arguments["record"] file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] try: # Create full snapshot if configured to do so snapshot_path = None if should_create_full_snapshot(data_type): snapshot_path = create_snapshot(file_path) # Load existing data df = pd.read_parquet(file_path) # Generate ID if needed (check for common ID patterns) id_field = f"{data_type.rstrip('s')}_id" if id_field not in record and id_field in df.columns: record[id_field] = str(uuid.uuid4())[:16] # Set import_date if not provided, or normalize if provided as string if "import_date" not in record and 
"import_date" in df.columns: record["import_date"] = date.today() elif "import_date" in record and isinstance(record["import_date"], str): try: record["import_date"] = date.fromisoformat(record["import_date"]) except ValueError: # Leave as-is if it cannot be parsed pass # Set import_source_file if not provided if "import_source_file" not in record and "import_source_file" in df.columns: record["import_source_file"] = "mcp_manual_entry" # Set timestamp fields if not provided or None (for tasks and similar) schema = load_json_schema(data_type) if schema: schema_def = schema.get("schema", {}) # Use UTC timezone-aware datetime for timestamps from datetime import timezone now = datetime.now(timezone.utc) for field, type_name in schema_def.items(): if type_name == "timestamp" and field in df.columns: # Set created_at/updated_at to current time if not provided or None if field not in record or record.get(field) is None: record[field] = now # Normalize date fields according to schema record = coerce_record_dates(data_type, record) # Get record ID for audit log record_id = record.get(id_field, "unknown") # Ensure all columns exist in record (set to None for missing ones) for col in df.columns: if col not in record: record[col] = None # Create new row DataFrame new_row = pd.DataFrame([record]) # Concatenate df_new = pd.concat([df, new_row], ignore_index=True) # Coerce date/timestamp columns according to schema df_new = coerce_date_columns(df_new, data_type) # Ensure timestamp columns match existing schema (handle timezone) # Read existing file schema to match format file_path = get_parquet_file_path(data_type) if file_path.exists(): try: existing_schema = pq.read_schema(file_path) for field in existing_schema: col_name = field.name if col_name in df_new.columns and isinstance(field.type, pa.TimestampType): # Ensure timezone-aware UTC timestamps to match schema # Convert to datetime64[ns, UTC] dtype (not object) df_new[col_name] = pd.to_datetime(df_new[col_name], utc=True, errors="coerce") # Ensure dtype is datetime64, not object if df_new[col_name].dtype == 'object': # Force conversion if still object dtype df_new[col_name] = pd.to_datetime(df_new[col_name], utc=True, errors="coerce") except Exception: pass # Save parquet file (without explicit schema to avoid conversion issues) write_parquet_with_schema(df_new, file_path, data_type) # Create audit log entry (errors here won't fail the operation) try: audit_entry = create_audit_entry( operation="add", data_type=data_type, record_id=record_id, affected_fields=list(record.keys()), new_values=record, snapshot_reference=str(snapshot_path) if snapshot_path else None, notes=f"Added new record via MCP" ) except Exception as audit_error: # Don't fail the operation if audit log fails audit_entry = {"audit_id": "failed", "error": str(audit_error)} return [TextContent( type="text", text=json.dumps({ "success": True, "audit_id": audit_entry["audit_id"], "snapshot_created": str(snapshot_path) if snapshot_path else None, "total_records": len(df_new), "added_record": record, }, indent=2, default=str) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "update_records": data_type = arguments["data_type"] filters = arguments["filters"] updates = arguments["updates"] file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] try: # Create full snapshot if configured to 
do so snapshot_path = None if should_create_full_snapshot(data_type): snapshot_path = create_snapshot(file_path) # Load existing data df = pd.read_parquet(file_path) # Create mask for matching records mask = pd.Series([True] * len(df)) for key, value in filters.items(): if key in df.columns: if isinstance(value, list): mask = mask & df[key].isin(value) else: mask = mask & (df[key] == value) # Count matches match_count = mask.sum() if match_count == 0: return [TextContent( type="text", text=json.dumps({ "success": False, "error": "No records matched the filters", "filters": filters, }, indent=2) )] # Get old values for audit log (for affected records) id_field = f"{data_type.rstrip('s')}_id" affected_records = df[mask] # Create audit entries for each updated record audit_ids = [] for idx, row in affected_records.iterrows(): record_id = row.get(id_field, f"idx_{idx}") old_values = {k: row[k] for k in updates.keys() if k in row.index} audit_entry = create_audit_entry( operation="update", data_type=data_type, record_id=str(record_id), affected_fields=list(updates.keys()), old_values=old_values, new_values=updates, snapshot_reference=str(snapshot_path) if snapshot_path else None, notes=f"Updated record via MCP with filters: {json.dumps(filters)}" ) audit_ids.append(audit_entry["audit_id"]) # Apply updates updates = coerce_record_dates(data_type, updates) # Check schema for missing columns and add them (all schema columns, not just update keys) schema = load_json_schema(data_type) if schema: schema_def = schema.get("schema", {}) # Add all missing schema columns, not just those in updates for key in schema_def.keys(): if key not in df.columns: # Add new column with None values df[key] = None for key, value in updates.items(): if key in df.columns: df.loc[mask, key] = value elif key in schema_def if schema else False: # Column was just added, set values df.loc[mask, key] = value # Update updated_date if column exists if "updated_date" in df.columns: df.loc[mask, "updated_date"] = date.today() # Coerce date columns before saving df = coerce_date_columns(df, data_type) # Save with explicit schema write_parquet_with_schema(df, file_path, data_type) return [TextContent( type="text", text=json.dumps({ "success": True, "audit_ids": audit_ids, "snapshot_created": str(snapshot_path) if snapshot_path else None, "records_updated": int(match_count), "filters": filters, "updates": updates, }, indent=2, default=str) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "upsert_record": data_type = arguments["data_type"] filters = arguments["filters"] record = arguments["record"] file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] try: # Create full snapshot if configured to do so snapshot_path = None if should_create_full_snapshot(data_type): snapshot_path = create_snapshot(file_path) # Load existing data df = pd.read_parquet(file_path) # Apply enhanced filters to find matching records df_filtered = df.copy() for key, value in filters.items(): df_filtered = apply_enhanced_filter(df_filtered, key, value) match_count = len(df_filtered) id_field = f"{data_type.rstrip('s')}_id" if match_count == 0: # No match found - create new record (use add_record logic) # Generate ID if needed if id_field not in record and id_field in df.columns: record[id_field] = str(uuid.uuid4())[:16] # Set import_date if not provided if 
"import_date" not in record and "import_date" in df.columns: record["import_date"] = date.today() elif "import_date" in record and isinstance(record["import_date"], str): try: record["import_date"] = date.fromisoformat(record["import_date"]) except ValueError: pass # Set import_source_file if not provided if "import_source_file" not in record and "import_source_file" in df.columns: record["import_source_file"] = "mcp_upsert_entry" # Set timestamp fields if needed schema = load_json_schema(data_type) if schema: schema_def = schema.get("schema", {}) from datetime import timezone now = datetime.now(timezone.utc) for field, type_name in schema_def.items(): if type_name == "timestamp" and field in df.columns: if field not in record or record[field] is None: if field in ["created_at", "updated_at"]: record[field] = now # Coerce dates record = coerce_record_dates(data_type, record) # Add missing schema columns if schema: schema_def = schema.get("schema", {}) for key in schema_def.keys(): if key not in record and key in df.columns: record[key] = None # Set created_date/updated_date if columns exist if "created_date" in df.columns and "created_date" not in record: record["created_date"] = date.today() if "updated_date" in df.columns and "updated_date" not in record: record["updated_date"] = date.today() # Append new record df_new = pd.concat([df, pd.DataFrame([record])], ignore_index=True) # Coerce date columns df_new = coerce_date_columns(df_new, data_type) # Save write_parquet_with_schema(df_new, file_path, data_type) # Create audit entry record_id = record.get(id_field, "unknown") audit_entry = create_audit_entry( operation="add", data_type=data_type, record_id=str(record_id), affected_fields=list(record.keys()), new_values=record, snapshot_reference=str(snapshot_path) if snapshot_path else None, notes=f"Created record via upsert (no match found) with filters: {json.dumps(filters)}" ) return [TextContent( type="text", text=json.dumps({ "success": True, "action": "created", "audit_id": audit_entry["audit_id"], "record_id": str(record_id), "snapshot_created": str(snapshot_path) if snapshot_path else None, "filters": filters, "record": record, }, indent=2, default=str) )] else: # Match found - update existing record(s) (use update_records logic) # Merge record data with existing data (record takes precedence) updates = dict(record) # Coerce dates updates = coerce_record_dates(data_type, updates) # Get old values for audit log affected_records = df_filtered audit_ids = [] # Update each matched record for idx, row in affected_records.iterrows(): record_id = row.get(id_field, f"idx_{idx}") # Get old values for fields being updated old_values = {k: row[k] for k in updates.keys() if k in row.index} # Create audit entry audit_entry = create_audit_entry( operation="update", data_type=data_type, record_id=str(record_id), affected_fields=list(updates.keys()), old_values=old_values, new_values=updates, snapshot_reference=str(snapshot_path) if snapshot_path else None, notes=f"Updated record via upsert (match found) with filters: {json.dumps(filters)}" ) audit_ids.append(audit_entry["audit_id"]) # Apply updates to matched records # Create mask for matched records using index intersection matched_indices = set(df_filtered.index) mask = df.index.isin(matched_indices) # Check schema for missing columns schema = load_json_schema(data_type) if schema: schema_def = schema.get("schema", {}) for key in schema_def.keys(): if key not in df.columns: df[key] = None # Apply updates for key, value in updates.items(): if key 
in df.columns: df.loc[mask, key] = value # Update updated_date if column exists if "updated_date" in df.columns: df.loc[mask, "updated_date"] = date.today() # Coerce date columns before saving df = coerce_date_columns(df, data_type) # Save write_parquet_with_schema(df, file_path, data_type) # Return first record ID for convenience first_record_id = affected_records.iloc[0].get(id_field, "unknown") return [TextContent( type="text", text=json.dumps({ "success": True, "action": "updated", "audit_ids": audit_ids, "records_updated": int(match_count), "record_id": str(first_record_id), "snapshot_created": str(snapshot_path) if snapshot_path else None, "filters": filters, "updates": updates, }, indent=2, default=str) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "delete_records": data_type = arguments["data_type"] filters = arguments["filters"] file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] try: # Create full snapshot if configured to do so snapshot_path = None if should_create_full_snapshot(data_type): snapshot_path = create_snapshot(file_path) # Load existing data df = pd.read_parquet(file_path) # Create mask for matching records mask = pd.Series([True] * len(df)) for key, value in filters.items(): if key in df.columns: if isinstance(value, list): mask = mask & df[key].isin(value) else: mask = mask & (df[key] == value) # Count matches match_count = mask.sum() if match_count == 0: return [TextContent( type="text", text=json.dumps({ "success": False, "error": "No records matched the filters", "filters": filters, }, indent=2) )] # Get old values for audit log (for deleted records) id_field = f"{data_type.rstrip('s')}_id" deleted_records = df[mask] # Create audit entries for each deleted record audit_ids = [] for idx, row in deleted_records.iterrows(): record_id = row.get(id_field, f"idx_{idx}") old_values = row.to_dict() audit_entry = create_audit_entry( operation="delete", data_type=data_type, record_id=str(record_id), affected_fields=list(old_values.keys()), old_values=old_values, snapshot_reference=str(snapshot_path) if snapshot_path else None, notes=f"Deleted record via MCP with filters: {json.dumps(filters)}" ) audit_ids.append(audit_entry["audit_id"]) # Delete records df_new = df[~mask] # Coerce date columns before saving df_new = coerce_date_columns(df_new, data_type) # Save parquet file try: write_parquet_with_schema(df_new, file_path, data_type) except Exception as write_error: # If write fails, provide detailed error error_msg = f"Failed to write parquet file: {write_error}" print(error_msg, file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) return [TextContent( type="text", text=json.dumps({ "error": error_msg, "traceback": traceback.format_exc() }, indent=2) )] return [TextContent( type="text", text=json.dumps({ "success": True, "audit_ids": audit_ids, "snapshot_created": str(snapshot_path) if snapshot_path else None, "records_deleted": int(match_count), "remaining_records": len(df_new), "filters": filters, }, indent=2, default=str) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "get_statistics": data_type = arguments["data_type"] file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet 
file not found: {file_path}", }, indent=2) )] try: df = pd.read_parquet(file_path) stats = { "data_type": data_type, "file_path": str(file_path), "row_count": len(df), "column_count": len(df.columns), "columns": list(df.columns), "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}, "memory_usage_mb": df.memory_usage(deep=True).sum() / 1024 / 1024, } # Add date range info if date columns exist date_columns = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()] for col in date_columns: try: df[col] = pd.to_datetime(df[col], errors='coerce') if df[col].notna().any(): stats[f"{col}_range"] = { "min": str(df[col].min()), "max": str(df[col].max()), } except: pass return [TextContent( type="text", text=json.dumps(stats, indent=2, default=str) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "read_audit_log": filters = {} if "data_type" in arguments: filters["data_type"] = arguments["data_type"] if "operation" in arguments: filters["operation"] = arguments["operation"] if "record_id" in arguments: filters["record_id"] = arguments["record_id"] limit = arguments.get("limit", 100) if not AUDIT_LOG_PATH.exists(): return [TextContent( type="text", text=json.dumps({ "count": 0, "message": "No audit log exists yet", "data": [], }, indent=2) )] try: df = pd.read_parquet(AUDIT_LOG_PATH) # Apply filters for key, value in filters.items(): if key in df.columns: if isinstance(value, list): df = df[df[key].isin(value)] else: df = df[df[key] == value] # Sort by timestamp descending (most recent first) if "timestamp" in df.columns: df = df.sort_values("timestamp", ascending=False) # Apply limit df = df.head(limit) # Convert to JSON-serializable format result = df.to_dict(orient='records') # Convert date/datetime objects to strings for record in result: for key, value in record.items(): if pd.isna(value): record[key] = None elif isinstance(value, (pd.Timestamp, datetime)): record[key] = value.isoformat() elif isinstance(value, date): record[key] = value.isoformat() return [TextContent( type="text", text=json.dumps({ "count": len(result), "filters": filters, "data": result, }, indent=2, default=str) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "rollback_operation": audit_id = arguments["audit_id"] if not AUDIT_LOG_PATH.exists(): return [TextContent( type="text", text=json.dumps({ "error": "No audit log exists", }, indent=2) )] try: # Find the audit entry df_audit = pd.read_parquet(AUDIT_LOG_PATH) audit_entry = df_audit[df_audit["audit_id"] == audit_id] if len(audit_entry) == 0: return [TextContent( type="text", text=json.dumps({ "error": f"Audit entry not found: {audit_id}", }, indent=2) )] entry = audit_entry.iloc[0].to_dict() operation = entry["operation"] data_type = entry["data_type"] record_id = entry["record_id"] file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] # Perform inverse operation if operation == "add": # Rollback add = delete the record id_field = f"{data_type.rstrip('s')}_id" df = pd.read_parquet(file_path) df_new = df[df[id_field] != record_id] # Save write_parquet_with_schema(df_new, file_path, data_type) # Create audit entry for rollback rollback_audit = create_audit_entry( operation="delete", data_type=data_type, record_id=record_id, notes=f"Rollback of add operation 
{audit_id}" ) return [TextContent( type="text", text=json.dumps({ "success": True, "rollback_audit_id": rollback_audit["audit_id"], "action": "deleted record (rollback of add)", "record_id": record_id, }, indent=2) )] elif operation == "update": # Rollback update = restore old values old_values = json.loads(entry["old_values"]) if entry["old_values"] else {} if not old_values: return [TextContent( type="text", text=json.dumps({ "error": "Cannot rollback: no old values stored", }, indent=2) )] id_field = f"{data_type.rstrip('s')}_id" df = pd.read_parquet(file_path) mask = df[id_field] == record_id # Apply old values old_values = coerce_record_dates(data_type, old_values) for key, value in old_values.items(): if key in df.columns: df.loc[mask, key] = value # Save df = coerce_date_columns(df, data_type) write_parquet_with_schema(df, file_path, data_type) # Create audit entry for rollback new_values = json.loads(entry["new_values"]) if entry["new_values"] else {} rollback_audit = create_audit_entry( operation="update", data_type=data_type, record_id=record_id, affected_fields=list(old_values.keys()), old_values=new_values, new_values=old_values, notes=f"Rollback of update operation {audit_id}" ) return [TextContent( type="text", text=json.dumps({ "success": True, "rollback_audit_id": rollback_audit["audit_id"], "action": "restored old values (rollback of update)", "record_id": record_id, "restored_values": old_values, }, indent=2, default=str) )] elif operation == "delete": # Rollback delete = restore the record old_values = json.loads(entry["old_values"]) if entry["old_values"] else {} if not old_values: return [TextContent( type="text", text=json.dumps({ "error": "Cannot rollback: no old values stored", }, indent=2) )] df = pd.read_parquet(file_path) # Normalize date fields old_values = coerce_record_dates(data_type, old_values) # Add record back new_row = pd.DataFrame([old_values]) for col in df.columns: if col not in new_row.columns: new_row[col] = None df_new = pd.concat([df, new_row], ignore_index=True) df_new = coerce_date_columns(df_new, data_type) # Save write_parquet_with_schema(df_new, file_path, data_type) # Create audit entry for rollback rollback_audit = create_audit_entry( operation="add", data_type=data_type, record_id=record_id, affected_fields=list(old_values.keys()), new_values=old_values, notes=f"Rollback of delete operation {audit_id}" ) return [TextContent( type="text", text=json.dumps({ "success": True, "rollback_audit_id": rollback_audit["audit_id"], "action": "restored record (rollback of delete)", "record_id": record_id, "restored_record": old_values, }, indent=2, default=str) )] else: return [TextContent( type="text", text=json.dumps({ "error": f"Unknown operation type: {operation}", }, indent=2) )] except Exception as e: return [TextContent( type="text", text=json.dumps({ "error": str(e), }, indent=2) )] elif name == "search_parquet": data_type = arguments["data_type"] query = arguments["query"] text_fields = arguments.get("text_fields") limit = arguments.get("limit", 10) min_similarity = arguments.get("min_similarity", 0.7) additional_filters = arguments.get("additional_filters", {}) file_path = get_parquet_file_path(data_type) embeddings_path = get_embeddings_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] if not embeddings_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Embeddings not found for {data_type}. 
Run generate_embeddings first.", }, indent=2) )] try: # Load data and embeddings df = pd.read_parquet(file_path) df_embeddings = pd.read_parquet(embeddings_path) # Get ID field id_field = f"{data_type.rstrip('s')}_id" if id_field not in df.columns: id_field = df.index.name or "index" # Apply additional filters first for key, value in additional_filters.items(): df = apply_enhanced_filter(df, key, value) # Generate query embedding client = get_openai_client() if not client: return [TextContent( type="text", text=json.dumps({ "error": "OpenAI API not available. Set OPENAI_API_KEY environment variable.", }, indent=2) )] query_response = client.embeddings.create( model="text-embedding-3-small", input=query ) query_embedding = np.array(query_response.data[0].embedding) # Calculate similarities similarities = [] for idx, row in df.iterrows(): record_id = row.get(id_field, idx) # Find matching embedding row emb_row = df_embeddings[df_embeddings[id_field] == record_id] if emb_row.empty: continue # Get combined embedding (average of all text field embeddings) emb_values = [] for col in df_embeddings.columns: if col.endswith("_embedding") and col in emb_row.columns: emb_val = emb_row[col].iloc[0] if emb_val is not None and not pd.isna(emb_val): # Handle both list and array formats if isinstance(emb_val, (list, np.ndarray)): emb_values.append(np.array(emb_val)) elif isinstance(emb_val, str): # Try to parse JSON string try: emb_values.append(np.array(json.loads(emb_val))) except: continue if not emb_values: continue # Average embeddings combined_embedding = np.mean(emb_values, axis=0) # Calculate cosine similarity similarity = np.dot(query_embedding, combined_embedding) / ( np.linalg.norm(query_embedding) * np.linalg.norm(combined_embedding) ) if similarity >= min_similarity: similarities.append((idx, similarity, row)) # Sort by similarity descending similarities.sort(key=lambda x: x[1], reverse=True) # Take top results results = [] for idx, similarity, row in similarities[:limit]: record = row.to_dict() record["_similarity"] = float(similarity) results.append(record) # Convert to JSON-serializable format for record in results: for key, value in record.items(): if pd.isna(value): record[key] = None elif isinstance(value, (pd.Timestamp, datetime)): record[key] = value.isoformat() elif isinstance(value, date): record[key] = value.isoformat() elif isinstance(value, np.floating): record[key] = float(value) elif isinstance(value, np.integer): record[key] = int(value) return [TextContent( type="text", text=json.dumps({ "count": len(results), "query": query, "min_similarity": min_similarity, "data": results, }, indent=2, default=str) )] except Exception as e: import traceback return [TextContent( type="text", text=json.dumps({ "error": str(e), "traceback": traceback.format_exc(), }, indent=2) )] elif name == "generate_embeddings": data_type = arguments["data_type"] text_fields = arguments.get("text_fields") force_regenerate = arguments.get("force_regenerate", False) file_path = get_parquet_file_path(data_type) embeddings_path = get_embeddings_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] client = get_openai_client() if not client: return [TextContent( type="text", text=json.dumps({ "error": "OpenAI API not available. 
Set OPENAI_API_KEY environment variable.", }, indent=2) )] try: df = pd.read_parquet(file_path) # Auto-detect text fields if not provided if not text_fields: text_fields = [] for col in df.columns: col_lower = col.lower() if col_lower in ["title", "description", "name", "notes", "summary", "content", "text"]: text_fields.append(col) elif df[col].dtype == "object" and col not in ["id", f"{data_type.rstrip('s')}_id"]: # Check if column contains mostly strings sample = df[col].dropna().head(100) if len(sample) > 0 and all(isinstance(x, str) for x in sample): text_fields.append(col) if not text_fields: return [TextContent( type="text", text=json.dumps({ "error": "No text fields found. Specify text_fields parameter.", }, indent=2) )] # Get ID field id_field = f"{data_type.rstrip('s')}_id" if id_field not in df.columns: id_field = df.index.name or "index" # Load existing embeddings if they exist existing_embeddings = {} if embeddings_path.exists() and not force_regenerate: df_existing = pd.read_parquet(embeddings_path) for _, row in df_existing.iterrows(): record_id = row[id_field] existing_embeddings[record_id] = {} for col in df_existing.columns: if col.endswith("_embedding") and not pd.isna(row[col]): existing_embeddings[record_id][col] = row[col] # Generate embeddings embeddings_data = [] total_records = len(df) for idx, row in df.iterrows(): record_id = row.get(id_field, idx) record_embeddings = {id_field: record_id} # Copy other fields from original record for col in df.columns: if col not in text_fields and col != id_field: record_embeddings[col] = row[col] # Generate embeddings for each text field for field in text_fields: if field not in df.columns: continue text_value = row[field] if pd.isna(text_value) or not isinstance(text_value, str) or not text_value.strip(): record_embeddings[f"{field}_embedding"] = None continue # Check if we already have this embedding emb_key = f"{field}_embedding" if record_id in existing_embeddings and emb_key in existing_embeddings[record_id]: # Preserve existing embedding format existing_emb = existing_embeddings[record_id][emb_key] if isinstance(existing_emb, (list, np.ndarray)): record_embeddings[emb_key] = list(existing_emb) if isinstance(existing_emb, np.ndarray) else existing_emb else: record_embeddings[emb_key] = existing_emb continue # Generate new embedding try: response = client.embeddings.create( model="text-embedding-3-small", input=text_value ) embedding = response.data[0].embedding # Store as list for parquet compatibility record_embeddings[emb_key] = list(embedding) except Exception as e: record_embeddings[emb_key] = None print(f"Warning: Failed to generate embedding for {field} in record {record_id}: {e}", file=sys.stderr) embeddings_data.append(record_embeddings) # Create embeddings dataframe df_embeddings = pd.DataFrame(embeddings_data) # Save embeddings df_embeddings.to_parquet(embeddings_path, index=False) # Count generated vs reused generated_count = 0 reused_count = 0 for record in embeddings_data: record_id_val = record.get(id_field) for key, value in record.items(): if key.endswith("_embedding") and value is not None: if record_id_val and record_id_val in existing_embeddings and key in existing_embeddings[record_id_val]: reused_count += 1 else: generated_count += 1 return [TextContent( type="text", text=json.dumps({ "success": True, "data_type": data_type, "text_fields": text_fields, "total_records": total_records, "embeddings_generated": generated_count, "embeddings_reused": reused_count, "embeddings_file": str(embeddings_path), }, 
indent=2, default=str) )] except Exception as e: import traceback return [TextContent( type="text", text=json.dumps({ "error": str(e), "traceback": traceback.format_exc(), }, indent=2) )] elif name == "validate_schema": data_type = arguments["data_type"] file_path = get_parquet_file_path(data_type) if not file_path.exists(): return [TextContent( type="text", text=json.dumps({ "error": f"Parquet file not found: {file_path}", }, indent=2) )] schema = load_json_schema(data_type) if not schema: return [TextContent( type="text", text=json.dumps({ "error": f"JSON schema not found for data type: {data_type}", }, indent=2) )] df = pd.read_parquet(file_path) declared = schema.get("schema", {}) issues: Dict[str, Any] = { "missing_columns": [], "extra_columns": [], "date_parse_errors": {}, } for col in declared.keys(): if col not in df.columns: issues["missing_columns"].append(col) for col in df.columns: if col not in declared.keys(): issues["extra_columns"].append(col) for col, type_name in declared.items(): if type_name in {"date", "datetime"} and col in df.columns: try: _ = pd.to_datetime(df[col], errors="raise") except Exception as e: issues["date_parse_errors"][col] = str(e) return [TextContent( type="text", text=json.dumps({ "data_type": data_type, "file_path": str(file_path), "issues": issues, }, indent=2, default=str) )] else: return [TextContent( type="text", text=json.dumps({ "error": f"Unknown tool: {name}", }, indent=2) )] async def main(): """Run the MCP server.""" async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) if __name__ == "__main__": import asyncio asyncio.run(main())
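
Usage examples (illustrative)

The server speaks MCP over stdio, so it is normally launched by an MCP client rather than run directly. Below is a minimal connection sketch using the stdio client helpers from the same `mcp` Python SDK the server imports; the launch command, working directory (workspace root), and environment values are assumptions to adapt to your setup.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Launch the server as a child process and drive it over stdio.
server_params = StdioServerParameters(
    command="python",
    args=["mcp-servers/parquet/parquet_mcp_server.py"],  # path per the module docstring; assumes cwd is the workspace root
    env={
        "MCP_FULL_SNAPSHOTS": "true",       # enable pre-write snapshots
        "MCP_SNAPSHOT_FREQUENCY": "daily",  # daily | weekly | monthly | never
        "OPENAI_API_KEY": "sk-...",         # placeholder; only needed for embeddings/search
    },
)

async def main() -> None:
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool("list_data_types", {})
            print(result.content[0].text)  # JSON with "data_types" and "count"

asyncio.run(main())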
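
The `filters` argument accepted by read_parquet, search_parquet (as additional_filters), and upsert_record takes plain values, lists, or operator objects, as implemented in apply_enhanced_filter above. A few representative payloads; the column names here are placeholders, not required schema fields.

filters = {
    "title": {"$contains": "therapy"},       # case-insensitive substring match
    "amount": {"$gte": 100, "$lt": 500},     # numeric range; multiple operators are ANDed
    "status": {"$in": ["open", "blocked"]},  # list membership
    "assignee": "alice",                     # plain value = exact match
    "name": {"$fuzzy": {"text": "Jon Smith", "threshold": 0.7}},  # similarity match
}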
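
The $fuzzy operator scores each cell with difflib.SequenceMatcher and keeps rows at or above the threshold (falling back to $contains when difflib is unavailable). A self-contained sketch of that matching rule, independent of the server:

from difflib import SequenceMatcher

import pandas as pd

df = pd.DataFrame({"name": ["Jon Smith", "Jane Doe", "John Smyth"]})
query, threshold = "john smith", 0.7

# Same rule the server applies: similarity ratio against the lowercased cell value.
mask = df["name"].apply(
    lambda v: SequenceMatcher(None, query, str(v).lower()).ratio() >= threshold
)
print(df[mask])  # "Jon Smith" and "John Smyth" clear the 0.7 threshold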
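
upsert_record combines the enhanced filters with add/update semantics: if nothing matches, it inserts a new record; otherwise it merges the supplied fields into the matches. Every write is also appended to data/logs/audit_log.parquet, and rollback_operation can invert a specific change by audit_id. A sketch reusing an initialized ClientSession from the connection example; the "contacts" data type and its fields are illustrative assumptions, not a guaranteed schema.

from mcp import ClientSession

async def upsert_and_audit(session: ClientSession) -> None:
    # Insert or update a contact, matching loosely on the name.
    result = await session.call_tool("upsert_record", {
        "data_type": "contacts",
        "filters": {"name": {"$fuzzy": {"text": "Jane Doe", "threshold": 0.8}}},
        "record": {"name": "Jane Doe", "company": "Acme", "email": "jane@example.com"},
    })
    print(result.content[0].text)  # reports "action": "created" or "updated" plus audit id(s)

    # Review recent changes; any entry's audit_id can be passed to rollback_operation.
    log = await session.call_tool("read_audit_log", {"data_type": "contacts", "limit": 10})
    print(log.content[0].text)
    # await session.call_tool("rollback_operation", {"audit_id": "<audit_id from the log>"})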
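
search_parquet requires an embeddings file created by generate_embeddings. At query time it embeds the query with text-embedding-3-small, averages a record's per-field embeddings, and keeps records whose cosine similarity meets min_similarity (default 0.7). A standalone sketch of that scoring step with made-up low-dimensional vectors (real embeddings are 1536-dimensional):

import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

query_emb = np.array([0.1, 0.3, 0.9])                                # embedded query (toy values)
field_embs = [np.array([0.2, 0.2, 0.8]), np.array([0.0, 0.4, 0.9])]  # e.g. title, notes embeddings
combined = np.mean(field_embs, axis=0)  # average of the record's per-field embeddings

print(cosine(query_emb, combined))  # record is returned if this >= min_similarity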