Enables semantic search and embedding generation for parquet data using OpenAI's embedding models to find semantically similar records across text fields.
Parquet MCP Server
MCP server for interacting with parquet files in a repository.
Features
Read/Query: Query parquet files with filters, column selection, and limits
Add Records: Add new records to parquet files with audit trail
Update Records: Update existing records matching filters with audit trail
Upsert Records: Insert or update records (supports enhanced filters for duplicate detection)
Delete Records: Delete records matching filters with audit trail
Audit Log: Complete change history with old/new values for all modifications
Rollback: Undo specific operations using audit IDs
Schema Discovery: Get schema definitions for data types
Statistics: Get basic statistics about parquet files
Efficient Backups: Audit log entries (~1 KB) instead of full snapshots (99%+ storage reduction)
Optional Full Snapshots: Configurable periodic snapshots for additional safety
Installation
Configuration
Cursor Configuration
Add to your Cursor MCP settings (typically ~/.cursor/mcp.json or Cursor settings):
Development (Audit Log Only):
Production (With Periodic Snapshots):
Claude Desktop Configuration
Add to claude_desktop_config.json (typically ~/Library/Application Support/Claude/claude_desktop_config.json on macOS):
Available Tools
list_data_types
List all available data types (parquet files) in the data directory.
get_schema
Get the schema definition for a data type.
Parameters:
data_type (required): The data type name (e.g., 'flows', 'transactions', 'tasks')
read_parquet
Read and query a parquet file with optional filters. Supports enhanced filtering operators.
Parameters:
data_type (required): The data type name
filters (optional): Key-value pairs to filter records. Supports enhanced operators:
Simple value: exact match
List: in list (["value1", "value2"])
{"$contains": "text"}: substring match (case-insensitive)
{"$starts_with": "text"}: prefix match (case-insensitive)
{"$ends_with": "text"}: suffix match (case-insensitive)
{"$regex": "pattern"}: regex pattern match
{"$fuzzy": {"text": "query", "threshold": 0.7}}: fuzzy string matching (0-1 similarity)
{"$gt": 100}, {"$gte": 100}, {"$lt": 100}, {"$lte": 100}: numeric comparisons
{"$ne": "value"}: not equal
limit (optional): Maximum number of rows to return (default: 1000)
columns (optional): List of column names to return (default: all columns)
Examples:
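To make the operator semantics listed above concrete, here is a minimal pure-Python sketch that applies the same filter shapes to a list of dicts. It is not the server's implementation: the function names `matches` and `apply_filters`, the sample records, and the use of `difflib.SequenceMatcher` as the fuzzy similarity measure are all assumptions made for illustration.

```python
import operator
import re
from difflib import SequenceMatcher

# Comparison operators from the parameter list, mapped to stdlib functions.
_COMPARISONS = {"$gt": operator.gt, "$gte": operator.ge,
                "$lt": operator.lt, "$lte": operator.le}


def matches(value, condition):
    """Return True if one field value satisfies one filter condition."""
    if isinstance(condition, list):          # list: value must be in the list
        return value in condition
    if not isinstance(condition, dict):      # simple value: exact match
        return value == condition
    text = "" if value is None else str(value)
    for op, arg in condition.items():
        if op == "$contains":
            ok = str(arg).lower() in text.lower()
        elif op == "$starts_with":
            ok = text.lower().startswith(str(arg).lower())
        elif op == "$ends_with":
            ok = text.lower().endswith(str(arg).lower())
        elif op == "$regex":
            ok = re.search(arg, text) is not None
        elif op == "$fuzzy":
            # Assumption: similarity is a 0-1 ratio; the real metric may differ.
            ratio = SequenceMatcher(None, arg["text"].lower(), text.lower()).ratio()
            ok = ratio >= arg.get("threshold", 0.7)
        elif op in _COMPARISONS:
            ok = value is not None and _COMPARISONS[op](value, arg)
        elif op == "$ne":
            ok = value != arg
        else:
            raise ValueError(f"unsupported operator: {op}")
        if not ok:
            return False
    return True


def apply_filters(records, filters, limit=1000):
    """Keep records where every filtered field matches, up to `limit` rows."""
    hits = [r for r in records
            if all(matches(r.get(field), cond) for field, cond in filters.items())]
    return hits[:limit]


# Hypothetical sample data.
records = [
    {"name": "Acme Coffee", "amount": 120},
    {"name": "Corner Bakery", "amount": 45},
    {"name": "ACME Hardware", "amount": 300},
]
big_acme = apply_filters(records, {"name": {"$contains": "acme"}, "amount": {"$gt": 100}})
typo_hit = apply_filters(records, {"name": {"$fuzzy": {"text": "acme cofee", "threshold": 0.7}}})
print([r["name"] for r in big_acme], [r["name"] for r in typo_hit])
```

Conditions on different fields are combined with AND in this sketch, which matches the usual reading of a key-value filter object but is not confirmed by the text above.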
add_record
Add a new record to a parquet file. Creates an audit log entry and, optionally, a snapshot.
Parameters:
data_type (required): The data type name
record (required): The record data as a JSON object matching the schema
Example:
update_records
Update existing records in a parquet file. Creates an audit log entry and, optionally, a snapshot.
Parameters:
data_type (required): The data type name
filters (required): Filters to identify records to update
updates (required): Fields to update
Example:
upsert_record
Insert or update a record (upsert). Checks for existing records using enhanced filters (supports all read_parquet filter operators including $contains, $fuzzy, etc.). If found, updates matching records. If not found, creates a new record. Returns whether it created or updated. Useful for preventing duplicates when adding contacts, tasks, or other records.
Parameters:
data_type (required): The data type name
filters (required): Enhanced filters to identify existing records (supports all read_parquet filter operators)
record (required): The record data to insert or update
Returns:
action: "created" or "updated"
audit_id or audit_ids: Audit log entry ID(s)
record_id: The ID of the created/updated record
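The update-or-create decision described above can be sketched in a few lines of Python. This is an illustration only: it works on an in-memory list of dicts, supports exact-match filters rather than the full operator set, and the `id` field and the UUID-based audit IDs are hypothetical stand-ins for whatever the server actually uses.

```python
import uuid


def upsert_record(records, filters, record):
    """Update every record matching `filters`, or append `record` if none match."""
    matched = [r for r in records
               if all(r.get(field) == value for field, value in filters.items())]
    if matched:
        for existing in matched:
            existing.update(record)          # overwrite only the supplied fields
        return {
            "action": "updated",
            "audit_ids": [uuid.uuid4().hex for _ in matched],  # one entry per record
            "record_id": matched[0].get("id"),
        }
    created = {"id": uuid.uuid4().hex, **record}   # hypothetical ID assignment
    records.append(created)
    return {"action": "created", "audit_id": uuid.uuid4().hex, "record_id": created["id"]}


contacts = []
first = upsert_record(contacts, {"email": "ann@example.com"},
                      {"email": "ann@example.com", "name": "Ann"})
second = upsert_record(contacts, {"email": "ann@example.com"},
                       {"email": "ann@example.com", "name": "Ann Lee"})
print(first["action"], second["action"], len(contacts))
```

Calling it twice with the same filter leaves a single contact in the list, which is the duplicate-prevention behavior the tool description refers to.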
Example (exact match):
Example (fuzzy match):
Example (contains match):
delete_records
Delete records from a parquet file. Creates an audit log entry and, optionally, a snapshot.
Parameters:
data_type (required): The data type name
filters (required): Filters to identify records to delete
Example:
get_statistics
Get basic statistics about a parquet file.
Parameters:
data_type (required): The data type name
read_audit_log
Read audit log entries with optional filters. View complete history of all data modifications.
Parameters:
data_type (optional): Filter by data type
operation (optional): Filter by operation (add, update, delete)
record_id (optional): Filter by specific record ID
limit (optional): Maximum number of entries to return (default: 100)
Example:
rollback_operation
Roll back a specific operation using its audit ID. Creates an inverse operation to undo the changes.
Parameters:
audit_id (required): The audit ID of the operation to roll back
Rollback Logic:
add operation → Delete the record
update operation → Restore old values
delete operation → Restore the record
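The three inverse rules above map directly onto a small dispatch function. The sketch below assumes an audit entry is a dict with `operation`, `record_id`, and `old_values` keys and that records live in a dict keyed by ID; those names are hypothetical and chosen only to mirror the audit log content described in this README.

```python
def rollback(records, entry):
    """Apply the inverse of one audit entry to `records` (a dict keyed by record ID)."""
    operation, record_id = entry["operation"], entry["record_id"]
    if operation == "add":
        records.pop(record_id, None)                    # undo an add: delete the record
    elif operation == "update":
        records[record_id].update(entry["old_values"])  # undo an update: restore old values
    elif operation == "delete":
        records[record_id] = dict(entry["old_values"])  # undo a delete: restore the record
    else:
        raise ValueError(f"unknown operation: {operation}")
    return records


# Hypothetical state after a task was marked done.
tasks = {"t1": {"title": "Pay rent", "status": "done"}}
rollback(tasks, {"operation": "update", "record_id": "t1",
                 "old_values": {"status": "open"}})
print(tasks["t1"]["status"])
```

Because each rule needs only the old values, an entry of this shape is enough to undo a change without a full copy of the file.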
Example:
search_parquet
Semantic search using embeddings. Searches text fields for semantically similar records.
Parameters:
data_type (required): The data type name
query (required): Search query text
text_fields (optional): List of text fields to search (default: auto-detect)
limit (optional): Maximum number of results (default: 10)
min_similarity (optional): Minimum cosine similarity threshold, 0-1 (default: 0.7)
additional_filters (optional): Additional filters to apply (same format as read_parquet)
Prerequisites:
Must run generate_embeddings first to create embeddings for the data type
Requires the OPENAI_API_KEY environment variable
Example:
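As an illustration of how `limit` and `min_similarity` interact, the following sketch ranks stored vectors by cosine similarity to a query vector. It uses tiny hand-written vectors instead of real OpenAI embeddings, and the `search` function and its data layout are assumptions, not the server's code.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(query_vec, stored, limit=10, min_similarity=0.7):
    """Return (record_id, score) pairs above the threshold, best match first."""
    scored = [(cosine_similarity(query_vec, vec), record_id)
              for record_id, vec in stored.items()]
    kept = sorted((pair for pair in scored if pair[0] >= min_similarity), reverse=True)
    return [(record_id, round(score, 3)) for score, record_id in kept[:limit]]


# Toy 2-dimensional "embeddings"; real ones have many more dimensions.
stored = {"a": [1.0, 0.0], "b": [0.8, 0.6], "c": [0.0, 1.0]}
results = search([1.0, 0.0], stored)
print(results)
```

Record `c` is orthogonal to the query and falls below the 0.7 default, so only `a` and `b` are returned.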
generate_embeddings
Generate and store embeddings for text fields in a data type. Creates embeddings parquet file for semantic search.
Parameters:
data_type (required): The data type name
text_fields (optional): List of text fields to generate embeddings for (default: auto-detect)
force_regenerate (optional): Force regeneration of all embeddings (default: false)
Prerequisites:
Requires the OPENAI_API_KEY environment variable
Example:
Note: Embeddings are cached. Only missing embeddings are generated unless force_regenerate is true.
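The caching rule in the note can be sketched as follows. The cache here is a plain dict keyed by text, and `fake_embed` is a stand-in for a real embedding API call; both are assumptions for illustration, since the README does not say how the server keys or stores its cache beyond using a parquet file.

```python
def generate_embeddings(texts, cache, embed, force_regenerate=False):
    """Fill `cache` (text -> vector) and return how many vectors were computed."""
    unique = list(dict.fromkeys(texts))      # drop duplicates, keep order
    todo = [t for t in unique if force_regenerate or t not in cache]
    for text in todo:
        cache[text] = embed(text)
    return len(todo)


def fake_embed(text):
    # Placeholder for a real embedding call; returns a deterministic toy vector.
    return [float(len(text)), float(text.count(" "))]


cache = {}
first_run = generate_embeddings(["pay rent", "file taxes"], cache, fake_embed)
second_run = generate_embeddings(["pay rent", "file taxes", "buy milk"], cache, fake_embed)
forced_run = generate_embeddings(["pay rent", "file taxes", "buy milk"], cache, fake_embed,
                                 force_regenerate=True)
print(first_run, second_run, forced_run)
```

The second call computes only the one new text, while the forced call recomputes all three.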
Backup & Recovery
Audit Log (Default)
All write operations create lightweight audit log entries in data/logs/audit_log.parquet:
Storage: ~1 KB per operation (99%+ reduction vs full snapshots)
Content: Operation type, record ID, affected fields, old/new values, timestamp
Recovery: Roll back specific operations using the rollback_operation tool
Optional Full Snapshots
Configure periodic full snapshots for additional safety:
Environment Variables:
MCP_FULL_SNAPSHOTS: Set to "true" to enable periodic snapshots (default: false)
MCP_SNAPSHOT_FREQUENCY: "daily", "weekly", "monthly", "never" (default: weekly)
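For readers wiring these variables into their own tooling, here is a hedged sketch of how the two settings and their documented defaults might be read. The function name `load_snapshot_config` is hypothetical, and treating the values case-insensitively is an assumption; the server may be stricter.

```python
import os

VALID_FREQUENCIES = {"daily", "weekly", "monthly", "never"}


def load_snapshot_config(env=os.environ):
    """Read the snapshot settings, falling back to the documented defaults."""
    enabled = env.get("MCP_FULL_SNAPSHOTS", "false").strip().lower() == "true"
    frequency = env.get("MCP_SNAPSHOT_FREQUENCY", "weekly").strip().lower()
    if frequency not in VALID_FREQUENCIES:
        raise ValueError(f"invalid MCP_SNAPSHOT_FREQUENCY: {frequency!r}")
    return {"enabled": enabled, "frequency": frequency}


defaults = load_snapshot_config({})
production = load_snapshot_config({"MCP_FULL_SNAPSHOTS": "true",
                                   "MCP_SNAPSHOT_FREQUENCY": "daily"})
print(defaults, production)
```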
Snapshot Location:
Storage Comparison
| Approach | Storage per Operation | 100 Operations |
|---|---|---|
| Full snapshots (old) | 10 MB | 1 GB |
| Audit log (new) | ~1 KB | ~100 KB |
| Savings | 99.99% | 99.99% |
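The figures in the comparison follow from simple arithmetic, reproduced here using decimal units (1 MB = 1,000,000 bytes) and the table's 10 MB snapshot and roughly 1 KB audit entry as inputs.

```python
SNAPSHOT_BYTES = 10 * 1000**2   # 10 MB per full snapshot (figure from the table)
AUDIT_BYTES = 1 * 1000          # ~1 KB per audit log entry (figure from the table)
OPERATIONS = 100

snapshot_total = SNAPSHOT_BYTES * OPERATIONS   # total bytes for 100 snapshots
audit_total = AUDIT_BYTES * OPERATIONS         # total bytes for 100 audit entries
savings = 1 - audit_total / snapshot_total     # fraction of storage avoided

print(f"{snapshot_total / 1000**3:.0f} GB vs {audit_total / 1000:.0f} KB, "
      f"savings {savings:.2%}")
```

The ratio is the same per operation and in aggregate, which is why both columns of the table show the same savings percentage.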
Recovery Options
Recent Changes: Use rollback_operation with the audit ID
Multiple Changes: Roll back operations in reverse chronological order
Full Restore: Restore from periodic snapshot (if enabled)
Point-in-Time: Restore snapshot + replay audit log to specific timestamp
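The point-in-time option can be pictured as "start from the snapshot, then re-apply every logged change up to the target time". The sketch below assumes audit entries are dicts with `timestamp` (ISO format), `operation`, `record_id`, and `new_values` keys, and that the log passed in contains only entries made after the snapshot; these names and the in-memory layout are hypothetical.

```python
from datetime import datetime


def restore_to(snapshot, audit_log, until):
    """Rebuild records as of `until` from a snapshot plus later audit entries."""
    cutoff = datetime.fromisoformat(until)
    records = {rid: dict(rec) for rid, rec in snapshot.items()}  # copy, keep snapshot intact
    ordered = sorted(audit_log, key=lambda e: datetime.fromisoformat(e["timestamp"]))
    for entry in ordered:
        if datetime.fromisoformat(entry["timestamp"]) > cutoff:
            break                                   # everything later is ignored
        rid = entry["record_id"]
        if entry["operation"] == "add":
            records[rid] = dict(entry["new_values"])
        elif entry["operation"] == "update":
            records[rid].update(entry["new_values"])
        elif entry["operation"] == "delete":
            records.pop(rid, None)
    return records


snapshot = {"t1": {"title": "Pay rent", "status": "open"}}
audit_log = [
    {"timestamp": "2024-03-02T10:00:00", "operation": "update",
     "record_id": "t1", "new_values": {"status": "done"}},
    {"timestamp": "2024-03-03T09:00:00", "operation": "add",
     "record_id": "t2", "new_values": {"title": "File taxes", "status": "open"}},
    {"timestamp": "2024-03-04T12:00:00", "operation": "delete",
     "record_id": "t1", "new_values": None},
]
state = restore_to(snapshot, audit_log, "2024-03-03T23:59:59")
print(state)
```

The delete on 4 March falls after the cutoff, so `t1` is still present in the restored state.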
See AUDIT_LOG_GUIDE.md for detailed documentation.
Data Types
The server automatically discovers data types by scanning data/ for directories containing [type].parquet files. Common data types include:
flows - Cash flow and expense data
transactions - Transaction data
tasks - Task management data
contacts - Contact/merchant information
income - Income data
fixed_costs - Fixed cost data
And many more...
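A discovery scan of this kind might look like the sketch below. It assumes the layout is `data/<type>/<type>.parquet`, which is one reading of the description above and is not confirmed by this README; `discover_data_types` is a hypothetical name, and the demo builds a throwaway directory tree with empty placeholder files.

```python
import tempfile
from pathlib import Path


def discover_data_types(data_dir):
    """List directory names under `data_dir` that contain a same-named .parquet file."""
    root = Path(data_dir)
    return sorted(
        d.name for d in root.iterdir()
        if d.is_dir() and (d / f"{d.name}.parquet").is_file()
    )


with tempfile.TemporaryDirectory() as tmp:
    for name in ("flows", "tasks"):
        (Path(tmp) / name).mkdir()
        (Path(tmp) / name / f"{name}.parquet").touch()   # empty placeholder file
    (Path(tmp) / "logs").mkdir()
    (Path(tmp) / "logs" / "audit_log.parquet").touch()   # name differs from directory
    (Path(tmp) / "empty").mkdir()                        # no parquet file at all
    found = discover_data_types(tmp)

print(found)
```

Under this assumed layout, the `logs` directory is skipped because its file is not named after the directory.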
Error Handling
The server returns structured error messages in JSON format when operations fail. Common errors include:
File not found errors
Schema validation errors
Column not found errors
Filter matching errors
Testing
After installation/updates, run the test script:
This validates:
Audit log creation
Schema compliance
Operation tracking
See IMPLEMENTATION_SUMMARY.md for manual testing procedures.
Documentation
README.md - This file, overview and quick reference
AUDIT_LOG_GUIDE.md - Complete audit log documentation
IMPLEMENTATION_SUMMARY.md - Implementation details and testing
SETUP.md - Setup and configuration instructions
Notes
The server uses an audit log for efficient change tracking (99%+ storage reduction)
All date fields are automatically converted to ISO format strings in responses
Null/NaN values are converted to null in JSON responses
The server runs in stdio mode for MCP communication
Audit log entries are never automatically deleted (manual archival if needed)
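The date and null handling mentioned in the notes can be illustrated with a small conversion helper. This is a sketch of the general technique using only the standard library; the function name `to_json_safe` and the sample row are hypothetical, and the server's own conversion code may differ.

```python
import json
import math
from datetime import date, datetime


def to_json_safe(value):
    """Convert dates to ISO strings and None/NaN to None, recursing into containers."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    if isinstance(value, dict):
        return {key: to_json_safe(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_json_safe(item) for item in value]
    return value


row = {
    "date": date(2024, 1, 15),
    "updated_at": datetime(2024, 1, 15, 9, 30),
    "amount": float("nan"),
    "note": None,
}
payload = json.dumps(to_json_safe(row))
print(payload)
```

Without a step like this, `json.dumps` would reject the date objects and emit a bare `NaN`, which is not valid JSON.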