Parquet MCP Server

MCP server for interacting with parquet files in a repository. Provides comprehensive data management with audit logging, rollback capabilities, and semantic search.

Credits

This is a custom MCP server implementation for parquet file management with audit trail support.

Features

  • Read/Query: Query parquet files with filters, column selection, and limits

  • Add Records: Add new records to parquet files with audit trail

  • Update Records: Update existing records matching filters with audit trail

  • Upsert Records: Insert or update records (supports enhanced filters for duplicate detection)

  • Delete Records: Delete records matching filters with audit trail

  • Schema Management: Create new data types and evolve existing schemas programmatically

  • Audit Log: Complete change history with old/new values for all modifications

  • Rollback: Undo specific operations using audit IDs

  • Schema Discovery: Get schema definitions for data types

  • Statistics: Get basic statistics about parquet files

  • Efficient Backups: Audit log entries (~1 KB) instead of full snapshots (99%+ storage reduction)

  • Automatic Snapshots: Timestamped snapshots created before every write operation for additional safety

Installation

cd truth/mcp-servers/parquet
pip install -r requirements.txt

Configuration

Data Directory

The server uses the following priority to locate the data directory (a code sketch follows the list):

  1. Environment Variable (highest priority):

    export DATA_DIR="/path/to/your/data/directory"
  2. Auto-detection (for backward compatibility):

    • Automatically detects parent repository structure

    • Searches common locations for a data/ directory

  3. Default (if no structure found):

    • Uses ~/.config/parquet-mcp/data/ as fallback

Note: The MCP server is portable and can be used with any data directory structure.
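
For reference, a minimal Python sketch of this resolution order. The auto-detection step shown here (walking up the directory tree looking for data/) is one plausible implementation, not necessarily the server's actual code:

import os
from pathlib import Path

def resolve_data_dir() -> Path:
    # 1. Environment variable takes highest priority.
    env_dir = os.environ.get("DATA_DIR")
    if env_dir:
        return Path(env_dir)
    # 2. Auto-detection: walk up from this file looking for a data/ directory.
    for parent in Path(__file__).resolve().parents:
        candidate = parent / "data"
        if candidate.is_dir():
            return candidate
    # 3. Fallback default.
    return Path.home() / ".config" / "parquet-mcp" / "data"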

Cursor Configuration

Add to your Cursor MCP settings (typically ~/.cursor/mcp.json or Cursor settings):

Development (Audit Log Only):

{ "mcpServers": { "parquet": { "command": "python3", "args": [ "/path/to/parquet_mcp_server.py" ], "env": { "DATA_DIR": "/path/to/your/data/directory" } } } }

Production (With Periodic Snapshots):

{ "mcpServers": { "parquet": { "command": "python3", "args": [ "/path/to/parquet_mcp_server.py" ], "env": { "DATA_DIR": "/path/to/your/data/directory", "MCP_FULL_SNAPSHOTS": "true", "MCP_SNAPSHOT_FREQUENCY": "weekly" } } } }

Note: Replace /path/to/parquet_mcp_server.py with the actual path to the server file.

Claude Desktop Configuration

Add to claude_desktop_config.json (typically ~/Library/Application Support/Claude/claude_desktop_config.json on macOS):

{ "mcpServers": { "parquet": { "command": "python3", "args": [ "/path/to/parquet_mcp_server.py" ], "env": { "DATA_DIR": "/path/to/your/data/directory" } } } }

Note: Replace /path/to/parquet_mcp_server.py with the actual path to the server file.

Available Tools

Tool Selection Guide

For counting records: Use get_statistics - returns row_count efficiently without loading data.

For retrieving data: Use read_parquet - returns actual records with filtering, sorting, and column selection.

For metadata: Use get_schema only when you need schema information (field names, data types, required fields, valid values) to build queries, filters, or add/update records. Do not call it for routine data retrieval; read_parquet results already include all necessary field information.

For modifications: Use add_record, update_records, upsert_record, or delete_records with audit logging.


Proactive Memory Storage

Source of Truth: Instructions for proactive memory storage are embedded in the MCP server:

  • Tool descriptions - add_record, update_records, and upsert_record descriptions include requirements

  • Prompt - data_persistence_guidance prompt (via list_prompts/get_prompt) provides detailed guidance

Agents see these instructions when listing/using tools. This README is for human reference only.


list_data_types

List all available data types (parquet files) in the data directory.

get_schema

Get the schema definition for a data type.

Parameters:

  • data_type (required): The data type name (e.g., 'flows', 'transactions', 'tasks')

create_data_type

Create a new data type with schema definition. Creates the schema JSON file, directory structure, and initializes an empty parquet file. Use when introducing a new data type that doesn't exist yet.

Parameters:

  • data_type (required): The data type name (e.g., 'relationships', 'custom_events')

  • schema (required): Schema definition object with field_name: type_name pairs. Supported types: 'string', 'integer', 'int64', 'float64', 'boolean', 'date', 'datetime', 'timestamp'

  • description (required): Description of the data type and its purpose

  • version (optional): Schema version (e.g., '1.0.0', default: '1.0.0')

Example:

{ "data_type": "relationships", "schema": { "relationship_id": "string", "source_contact_id": "string", "target_contact_id": "string", "relationship_type": "string", "start_date": "date", "end_date": "date", "notes": "string", "created_date": "date", "updated_date": "date" }, "description": "Structured relationships between contacts", "version": "1.0.0" }

update_schema

Update an existing data type schema. Supports adding new fields, updating description, and version bumping. Automatically adds missing columns to existing parquet file. Schema evolution only - does not support removing fields or changing field types (breaking changes require manual migration).

Parameters:

  • data_type (required): The data type name (e.g., 'contacts', 'tasks')

  • add_fields (optional): New fields to add: {field_name: type_name}. Types: 'string', 'integer', 'int64', 'float64', 'boolean', 'date', 'datetime', 'timestamp'

  • update_description (optional): Update the schema description

  • version (optional): New schema version (e.g., '1.1.0'). If not provided, minor version is auto-incremented for field additions

  • update_parquet (optional): Whether to update the parquet file to add missing columns (default: true)

Example:

{ "data_type": "contacts", "add_fields": { "middle_name": "string", "suffix": "string" }, "update_description": "Contact information with enhanced name fields", "version": "1.1.0" }

Limitations:

  • Cannot remove fields (breaking change - requires manual migration)

  • Cannot change field types (breaking change - requires manual migration)

  • Only supports additive schema evolution
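
As a rough illustration of what additive evolution involves, here is a sketch assuming a pandas-based implementation (illustrative, not the server's actual code): new columns are appended as null so existing records remain valid.

import pandas as pd

def add_missing_columns(parquet_path: str, new_fields: dict) -> None:
    df = pd.read_parquet(parquet_path)
    for name in new_fields:
        if name not in df.columns:
            df[name] = None  # new fields start as null for all existing records
    df.to_parquet(parquet_path, index=False)

add_missing_columns("data/contacts/contacts.parquet",
                    {"middle_name": "string", "suffix": "string"})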

read_parquet

Read and query a parquet file with optional filters. Supports enhanced filtering operators.

Use when: You need to retrieve actual data records (rows) from the file.

Do not use for: Counting records - use get_statistics instead, which returns row count efficiently.

Parameters:

  • data_type (required): The data type name

  • filters (optional): Key-value pairs to filter records. Supports enhanced operators:

    • Simple value: exact match

    • List: in list (["value1", "value2"])

    • {"$contains": "text"}: substring match (case-insensitive)

    • {"$starts_with": "text"}: prefix match (case-insensitive)

    • {"$ends_with": "text"}: suffix match (case-insensitive)

    • {"$regex": "pattern"}: regex pattern match

    • {"$fuzzy": {"text": "query", "threshold": 0.7}}: fuzzy string matching (0-1 similarity)

    • {"$gt": 100}, {"$gte": 100}, {"$lt": 100}, {"$lte": 100}: numeric comparisons

    • {"$ne": "value"}: not equal

  • limit (optional): Maximum number of rows to return (default: 1000)

  • columns (optional): List of column names to return (default: all columns)

  • sort_by (optional): List of sort specifications, each with:

    • column (required): Column name to sort by

    • ascending (optional): Sort direction, true for ascending (default), false for descending

    • na_position (optional): Where to place null/NaN values: "last" (default) or "first"

    • custom_order (optional): Custom order for enum values (e.g., ["critical", "high", "medium", "low"])

Examples:

{ "data_type": "flows", "filters": { "category": "property_maintenance", "year": 2025 }, "limit": 100 }
{ "data_type": "tasks", "filters": { "title": {"$contains": "therapy"}, "status": {"$ne": "completed"} } }
{ "data_type": "tasks", "filters": { "title": {"$fuzzy": {"text": "therapy session", "threshold": 0.7}} } }

Example with sorting:

{ "data_type": "tasks", "filters": { "sync_log": "exported" }, "sort_by": [ { "column": "sync_datetime", "ascending": false, "na_position": "last" } ], "limit": 10 }

add_record

Add a new record to a parquet file. Creates audit log entry and optional snapshot.

Parameters:

  • data_type (required): The data type name

  • record (required): The record data as a JSON object matching the schema

Example:

{ "data_type": "flows", "record": { "flow_name": "Monthly Rent", "flow_date": "2025-01-15", "amount_usd": 1500.00, "category": "housing", "flow_type": "recurring_expense" } }

update_records

Update existing records in a parquet file. Creates audit log entry and optional snapshot.

Parameters:

  • data_type (required): The data type name

  • filters (required): Filters to identify records to update

  • updates (required): Fields to update

Example:

{ "data_type": "tasks", "filters": { "task_id": "abc123" }, "updates": { "status": "completed", "completed_date": "2025-01-15" } }

upsert_record

Insert or update a record (upsert). Checks for existing records using enhanced filters (supports all read_parquet filter operators including $contains, $fuzzy, etc.). If found, updates matching records. If not found, creates a new record. Returns whether it created or updated. Useful for preventing duplicates when adding contacts, tasks, or other records.

Parameters:

  • data_type (required): The data type name

  • filters (required): Enhanced filters to identify existing records (supports all read_parquet filter operators)

  • record (required): The record data to insert or update

Returns:

  • action: "created" or "updated"

  • audit_id or audit_ids: Audit log entry ID(s)

  • record_id: The ID of the created/updated record

Example (exact match):

{ "data_type": "contacts", "filters": { "email": "galina@secod.com" }, "record": { "name": "Galina Semakova", "email": "galina@secod.com", "category": "legal", "last_contact_date": "2025-12-24" } }

Example (fuzzy match):

{ "data_type": "contacts", "filters": { "name": {"$fuzzy": {"text": "Galina Semakova", "threshold": 0.8}} }, "record": { "name": "Galina Semakova", "email": "galina@secod.com", "category": "legal", "last_contact_date": "2025-12-24" } }

Example (contains match):

{ "data_type": "tasks", "filters": { "title": {"$contains": "therapy payment"} }, "record": { "title": "Pay for therapy session", "status": "pending", "due_date": "2025-12-25" } }

delete_records

Delete records from a parquet file. Creates audit log entry and optional snapshot.

Parameters:

  • data_type (required): The data type name

  • filters (required): Filters to identify records to delete

Example:

{ "data_type": "tasks", "filters": { "status": "canceled" } }

get_statistics

Get comprehensive statistics about a parquet file including row count, column information, date ranges, and categorical distributions.

Use when: You need to know how many records exist, get file metadata, or analyze data distributions. This is the efficient way to answer "how many" questions.

Returns:

  • row_count: Total number of records (use this for count queries)

  • column_count: Number of columns

  • date_statistics: Min/max dates and ranges for date columns

  • categorical_statistics: Value distributions for categorical columns

  • Column types and memory usage

Parameters:

  • data_type (required): The data type name
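
An illustrative response shape based on the fields listed above; the values and exact key names are examples, not guaranteed output:

{
  "row_count": 1250,
  "column_count": 9,
  "date_statistics": {
    "flow_date": { "min": "2024-01-02", "max": "2025-01-15" }
  },
  "categorical_statistics": {
    "category": { "housing": 420, "property_maintenance": 130 }
  }
}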

read_audit_log

Read audit log entries with optional filters. View complete history of all data modifications.

Parameters:

  • data_type (optional): Filter by data type

  • operation (optional): Filter by operation (add, update, delete)

  • record_id (optional): Filter by specific record ID

  • limit (optional): Maximum number of entries to return (default: 100)

Example:

{ "data_type": "transactions", "operation": "update", "limit": 50 }

rollback_operation

Rollback a specific operation using its audit ID. Creates inverse operation to undo changes.

Parameters:

  • audit_id (required): The audit ID of the operation to rollback

Rollback Logic:

  • add operation → Delete the record

  • update operation → Restore old values

  • delete operation → Restore the record

Example:

{ "audit_id": "abc123def456" }

search_parquet

Semantic search using embeddings. Searches text fields for semantically similar records.

Parameters:

  • data_type (required): The data type name

  • query (required): Search query text

  • text_fields (optional): List of text fields to search (default: auto-detect)

  • limit (optional): Maximum number of results (default: 10)

  • min_similarity (optional): Minimum cosine similarity threshold 0-1 (default: 0.7)

  • additional_filters (optional): Additional filters to apply (same format as read_parquet)

Prerequisites:

  • Must run generate_embeddings first to create embeddings for the data type

  • Requires OPENAI_API_KEY environment variable

Example:

{ "data_type": "tasks", "query": "pay for therapy session", "limit": 5, "min_similarity": 0.7 }

generate_embeddings

Generate and store embeddings for text fields in a data type. Creates embeddings parquet file for semantic search.

Parameters:

  • data_type (required): The data type name

  • text_fields (optional): List of text fields to generate embeddings for (default: auto-detect)

  • force_regenerate (optional): Force regeneration of all embeddings (default: false)

Prerequisites:

  • Requires OPENAI_API_KEY environment variable

Example:

{ "data_type": "tasks", "text_fields": ["title", "description", "notes"] }

Note: Embeddings are cached. Only missing embeddings are generated unless force_regenerate is true.
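
A sketch of that caching rule (function and parameter names are hypothetical):

def embeddings_to_generate(record_ids, cached_ids, force_regenerate=False):
    # With force_regenerate, every record is re-embedded; otherwise only
    # records without a cached embedding are sent to the embedding API.
    if force_regenerate:
        return list(record_ids)
    return [rid for rid in record_ids if rid not in cached_ids]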

Backup & Recovery

Audit Log (Default)

All write operations create lightweight audit log entries in data/logs/audit_log.parquet:

  • Storage: ~1 KB per operation (99%+ reduction vs full snapshots)

  • Content: Operation type, record ID, affected fields, old/new values, timestamp

  • Recovery: Rollback specific operations using rollback_operation tool
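
An illustrative audit log entry matching the content described above; the exact field names are assumptions:

{
  "audit_id": "abc123def456",
  "timestamp": "2025-01-15T10:32:00Z",
  "data_type": "tasks",
  "operation": "update",
  "record_id": "abc123",
  "old_values": { "status": "pending" },
  "new_values": { "status": "completed" }
}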

Automatic Snapshots

All write operations automatically create timestamped snapshots before modification (per policy requirement). This ensures rollback capability for every data change.

Snapshot Location:

data/snapshots/[data_type]-[YYYY-MM-DD-HHMMSS].parquet

Note: The MCP_FULL_SNAPSHOTS and MCP_SNAPSHOT_FREQUENCY environment variables are no longer used - snapshots are always created for all write operations.

Storage Comparison

Approach               Storage per Operation   100 Operations
Full snapshots (old)   10 MB                   1 GB
Audit log (new)        ~1 KB                   ~100 KB
Savings                99.99%                  99.99%

Recovery Options

  1. Recent Changes: Use rollback_operation with audit ID

  2. Multiple Changes: Rollback operations in reverse chronological order

  3. Full Restore: Restore from a timestamped snapshot

  4. Point-in-Time: Restore snapshot + replay audit log to specific timestamp
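
Option 4 can be pictured as the sketch below (an assumed approach; apply_operation is a hypothetical replay helper):

import pandas as pd

def restore_to(snapshot_path: str, audit_log_path: str, cutoff: str) -> pd.DataFrame:
    # Start from the snapshot, then replay audit entries up to the cutoff.
    df = pd.read_parquet(snapshot_path)
    audit = pd.read_parquet(audit_log_path)
    to_replay = audit[audit["timestamp"] <= cutoff].sort_values("timestamp")
    for _, entry in to_replay.iterrows():
        df = apply_operation(df, entry)  # hypothetical replay helper
    return df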

See AUDIT_LOG_GUIDE.md for detailed documentation.

Data Types

The server automatically discovers data types by scanning data/ for directories containing [type].parquet files. Common data types include:

  • flows - Cash flow and expense data

  • transactions - Transaction data

  • tasks - Task management data

  • contacts - Contact/merchant information

  • income - Income data

  • fixed_costs - Fixed cost data

  • And many more...
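
The discovery rule can be sketched as follows (illustrative, not the server's code):

from pathlib import Path

def discover_data_types(data_dir: Path) -> list:
    # A data type is any directory under data/ containing a parquet file
    # of the same name, e.g. data/tasks/tasks.parquet -> "tasks".
    return sorted(
        child.name
        for child in data_dir.iterdir()
        if child.is_dir() and (child / f"{child.name}.parquet").is_file()
    )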

Error Handling

The server returns structured error messages in JSON format when operations fail. Common errors include:

  • File not found errors

  • Schema validation errors

  • Column not found errors

  • Filter matching errors

Security Notes

  • All write operations create audit log entries for traceability

  • Audit logs are stored in data/logs/audit_log.parquet

  • Timestamped snapshots are created before every write operation for additional safety

  • Never commit sensitive data files to version control

Troubleshooting

  1. File Not Found Errors

    • Verify the data type exists in data/[type]/[type].parquet

    • Check file permissions

  2. Schema Validation Errors

    • Ensure records match the schema defined in data/schemas/[type]_schema.json

    • Check required fields are present

  3. Filter Matching Errors

    • Verify filter syntax matches supported operators

    • Check column names exist in the schema

Testing

After installation/updates, run the test script:

python3 mcp-servers/parquet/test_audit_log.py

This validates:

  • Audit log creation

  • Schema compliance

  • Operation tracking

See IMPLEMENTATION_SUMMARY.md for manual testing procedures.

Notes

  • The server uses an audit log for efficient change tracking (99%+ storage reduction)

  • All date fields are automatically converted to ISO format strings in responses

  • Null/NaN values are converted to null in JSON responses

  • The server runs in stdio mode for MCP communication

  • Audit log entries are never automatically deleted (manual archival if needed)

License

MIT
