Parquet MCP Server

MCP server for interacting with parquet files in a repository. Provides comprehensive data management with audit logging, rollback capabilities, and semantic search.

Credits

This is a custom MCP server implementation for parquet file management with audit trail support.

Features

  • Read/Query: Query parquet files with filters, column selection, and limits
  • Add Records: Add new records to parquet files with audit trail
  • Update Records: Update existing records matching filters with audit trail
  • Upsert Records: Insert or update records (supports enhanced filters for duplicate detection)
  • Delete Records: Delete records matching filters with audit trail
  • Audit Log: Complete change history with old/new values for all modifications
  • Rollback: Undo specific operations using audit IDs
  • Schema Discovery: Get schema definitions for data types
  • Statistics: Get basic statistics about parquet files
  • Efficient Backups: Audit log entries (~1 KB) instead of full snapshots (99%+ storage reduction)
  • Optional Full Snapshots: Configurable periodic snapshots for additional safety

Installation

cd mcp-servers/parquet
pip install -r requirements.txt

Configuration

Cursor Configuration

Add to your Cursor MCP settings (typically ~/.cursor/mcp.json or Cursor settings):

Development (Audit Log Only):

{
  "mcpServers": {
    "parquet": {
      "command": "python",
      "args": [
        "$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
      ],
      "env": {}
    }
  }
}

Production (With Periodic Snapshots):

{
  "mcpServers": {
    "parquet": {
      "command": "python",
      "args": [
        "$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
      ],
      "env": {
        "MCP_FULL_SNAPSHOTS": "true",
        "MCP_SNAPSHOT_FREQUENCY": "weekly"
      }
    }
  }
}

Claude Desktop Configuration

Add to claude_desktop_config.json (typically ~/Library/Application Support/Claude/claude_desktop_config.json on macOS):

{
  "mcpServers": {
    "parquet": {
      "command": "python",
      "args": [
        "$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
      ]
    }
  }
}

Available Tools

list_data_types

List all available data types (parquet files) in the data directory.

get_schema

Get the schema definition for a data type.

Parameters:

  • data_type (required): The data type name (e.g., 'flows', 'transactions', 'tasks')

read_parquet

Read and query a parquet file with optional filters. Supports enhanced filtering operators.

Parameters:

  • data_type (required): The data type name
  • filters (optional): Key-value pairs to filter records. Supports enhanced operators (see the evaluation sketch after the examples below):
    • Simple value: exact match
    • List: in list (["value1", "value2"])
    • {"$contains": "text"}: substring match (case-insensitive)
    • {"$starts_with": "text"}: prefix match (case-insensitive)
    • {"$ends_with": "text"}: suffix match (case-insensitive)
    • {"$regex": "pattern"}: regex pattern match
    • {"$fuzzy": {"text": "query", "threshold": 0.7}}: fuzzy string matching (0-1 similarity)
    • {"$gt": 100}, {"$gte": 100}, {"$lt": 100}, {"$lte": 100}: numeric comparisons
    • {"$ne": "value"}: not equal
  • limit (optional): Maximum number of rows to return (default: 1000)
  • columns (optional): List of column names to return (default: all columns)

Examples:

{
  "data_type": "flows",
  "filters": {
    "category": "property_maintenance",
    "year": 2025
  },
  "limit": 100
}
{
  "data_type": "tasks",
  "filters": {
    "title": {"$contains": "therapy"},
    "status": {"$ne": "completed"}
  }
}
{
  "data_type": "tasks",
  "filters": {
    "title": {"$fuzzy": {"text": "therapy session", "threshold": 0.7}}
  }
}
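
For intuition, the sketch below shows one way these operators could be evaluated against a pandas DataFrame. It is illustrative only: the helper name apply_filters, the use of difflib for $fuzzy, and the case-handling details are assumptions, not the server's actual code.

from difflib import SequenceMatcher

import pandas as pd

def apply_filters(df: pd.DataFrame, filters: dict) -> pd.DataFrame:
    """Evaluate the enhanced filter operators against a DataFrame (assumed semantics)."""
    for column, condition in filters.items():
        col = df[column]
        if isinstance(condition, dict):
            op, arg = next(iter(condition.items()))
            if op == "$contains":
                mask = col.astype(str).str.contains(arg, case=False, regex=False, na=False)
            elif op == "$starts_with":
                mask = col.astype(str).str.lower().str.startswith(arg.lower())
            elif op == "$ends_with":
                mask = col.astype(str).str.lower().str.endswith(arg.lower())
            elif op == "$regex":
                mask = col.astype(str).str.contains(arg, regex=True, na=False)
            elif op == "$fuzzy":
                target, threshold = arg["text"].lower(), arg.get("threshold", 0.7)
                # difflib stands in for whatever similarity measure the server really uses.
                mask = col.astype(str).map(
                    lambda s: SequenceMatcher(None, s.lower(), target).ratio() >= threshold
                )
            elif op in ("$gt", "$gte", "$lt", "$lte", "$ne"):
                mask = {"$gt": col.gt, "$gte": col.ge, "$lt": col.lt,
                        "$lte": col.le, "$ne": col.ne}[op](arg)
            else:
                raise ValueError(f"Unsupported operator: {op}")
        elif isinstance(condition, list):
            mask = col.isin(condition)  # list -> "value in list"
        else:
            mask = col == condition     # simple value -> exact match
        df = df[mask]
    return df

With a helper like this, the second example above would keep the tasks whose title contains "therapy" and whose status is not "completed".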

add_record

Add a new record to a parquet file. Creates audit log entry and optional snapshot.

Parameters:

  • data_type (required): The data type name
  • record (required): The record data as a JSON object matching the schema

Example:

{
  "data_type": "flows",
  "record": {
    "flow_name": "Monthly Rent",
    "flow_date": "2025-01-15",
    "amount_usd": 1500.00,
    "category": "housing",
    "flow_type": "recurring_expense"
  }
}

update_records

Update existing records in a parquet file. Creates audit log entry and optional snapshot.

Parameters:

  • data_type (required): The data type name
  • filters (required): Filters to identify records to update
  • updates (required): Fields to update

Example:

{
  "data_type": "tasks",
  "filters": {
    "task_id": "abc123"
  },
  "updates": {
    "status": "completed",
    "completed_date": "2025-01-15"
  }
}

upsert_record

Insert or update a record (upsert). Existing records are matched using enhanced filters (all read_parquet operators are supported, including $contains and $fuzzy). If matches are found, they are updated; otherwise a new record is created. The response reports whether the record was created or updated, which makes this tool useful for preventing duplicates when adding contacts, tasks, or other records. A sketch of the decision flow follows the examples below.

Parameters:

  • data_type (required): The data type name
  • filters (required): Enhanced filters to identify existing records (supports all read_parquet filter operators)
  • record (required): The record data to insert or update

Returns:

  • action: "created" or "updated"
  • audit_id or audit_ids: Audit log entry ID(s)
  • record_id: The ID of the created/updated record

Example (exact match):

{
  "data_type": "contacts",
  "filters": {
    "email": "galina@secod.com"
  },
  "record": {
    "name": "Galina Semakova",
    "email": "galina@secod.com",
    "category": "legal",
    "last_contact_date": "2025-12-24"
  }
}

Example (fuzzy match):

{
  "data_type": "contacts",
  "filters": {
    "name": {"$fuzzy": {"text": "Galina Semakova", "threshold": 0.8}}
  },
  "record": {
    "name": "Galina Semakova",
    "email": "galina@secod.com",
    "category": "legal",
    "last_contact_date": "2025-12-24"
  }
}

Example (contains match):

{
  "data_type": "tasks",
  "filters": {
    "title": {"$contains": "therapy payment"}
  },
  "record": {
    "title": "Pay for therapy session",
    "status": "pending",
    "due_date": "2025-12-25"
  }
}
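
The created-vs-updated decision can be pictured like this. A minimal sketch, restricted to exact-match filters for brevity; the function name and in-place update semantics are assumptions:

import pandas as pd

def upsert(df: pd.DataFrame, filters: dict, record: dict):
    """Decide between update and insert (illustrative; exact-match filters only)."""
    mask = pd.Series(True, index=df.index)
    for column, value in filters.items():
        mask &= df[column] == value
    if mask.any():
        # Update every matching row with the new field values.
        df.loc[mask, list(record)] = list(record.values())
        return df, "updated"
    # No match: append the record as a new row.
    return pd.concat([df, pd.DataFrame([record])], ignore_index=True), "created"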

delete_records

Delete records from a parquet file. Creates audit log entry and optional snapshot.

Parameters:

  • data_type (required): The data type name
  • filters (required): Filters to identify records to delete

Example:

{
  "data_type": "tasks",
  "filters": {
    "status": "canceled"
  }
}

get_statistics

Get basic statistics about a parquet file.

Parameters:

  • data_type (required): The data type name

read_audit_log

Read audit log entries with optional filters. View complete history of all data modifications.

Parameters:

  • data_type (optional): Filter by data type
  • operation (optional): Filter by operation (add, update, delete)
  • record_id (optional): Filter by specific record ID
  • limit (optional): Maximum number of entries to return (default: 100)

Example:

{
  "data_type": "transactions",
  "operation": "update",
  "limit": 50
}

rollback_operation

Rollback a specific operation using its audit ID. Creates inverse operation to undo changes.

Parameters:

  • audit_id (required): The audit ID of the operation to rollback

Rollback Logic (sketched after the example below):

  • add operation → Delete the record
  • update operation → Restore old values
  • delete operation → Restore the record

Example:

{
  "audit_id": "abc123def456"
}
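
Conceptually, rollback dispatches on the logged operation type. This sketch assumes a simplified audit entry shape (operation, record_id, old_values fields) that may differ from the real log schema:

import pandas as pd

def rollback(df: pd.DataFrame, entry: dict) -> pd.DataFrame:
    """Apply the inverse of a logged operation (illustrative; entry fields are assumed)."""
    op, rid = entry["operation"], entry["record_id"]
    if op == "add":
        # Inverse of add: delete the record that was inserted.
        return df[df["record_id"] != rid]
    if op == "update":
        # Inverse of update: restore the logged old values.
        mask = df["record_id"] == rid
        for field, old_value in entry["old_values"].items():
            df.loc[mask, field] = old_value
        return df
    if op == "delete":
        # Inverse of delete: re-insert the record from its logged old values.
        return pd.concat([df, pd.DataFrame([entry["old_values"]])], ignore_index=True)
    raise ValueError(f"Unknown operation: {op}")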

search_parquet

Semantic search using embeddings. Searches text fields for semantically similar records.

Parameters:

  • data_type (required): The data type name
  • query (required): Search query text
  • text_fields (optional): List of text fields to search (default: auto-detect)
  • limit (optional): Maximum number of results (default: 10)
  • min_similarity (optional): Minimum cosine similarity threshold 0-1 (default: 0.7)
  • additional_filters (optional): Additional filters to apply (same format as read_parquet)

Prerequisites:

  • Must run generate_embeddings first to create embeddings for the data type
  • Requires OPENAI_API_KEY environment variable

Example:

{
  "data_type": "tasks",
  "query": "pay for therapy session",
  "limit": 5,
  "min_similarity": 0.7
}
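
For reference, min_similarity is compared against standard cosine similarity between the query embedding and each record's embedding:

import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity: 1.0 = same direction, 0.0 = orthogonal."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))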

generate_embeddings

Generate and store embeddings for text fields in a data type. Creates embeddings parquet file for semantic search.

Parameters:

  • data_type (required): The data type name
  • text_fields (optional): List of text fields to generate embeddings for (default: auto-detect)
  • force_regenerate (optional): Force regeneration of all embeddings (default: false)

Prerequisites:

  • Requires OPENAI_API_KEY environment variable

Example:

{
  "data_type": "tasks",
  "text_fields": ["title", "description", "notes"]
}

Note: Embeddings are cached. Only missing embeddings are generated unless force_regenerate is true.
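
Conceptually the cache check reduces to set membership on record IDs; a hypothetical sketch (the record_id field name is an assumption):

def records_needing_embeddings(records, cached_ids, force_regenerate=False):
    """Pick the records to embed (illustrative; record_id field name is assumed)."""
    if force_regenerate:
        return list(records)  # regenerate everything
    return [r for r in records if r["record_id"] not in cached_ids]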

Backup & Recovery

Audit Log (Default)

All write operations create lightweight audit log entries in data/logs/audit_log.parquet:

  • Storage: ~1 KB per operation (99%+ reduction vs full snapshots)
  • Content: Operation type, record ID, affected fields, old/new values, timestamp (see the sample entry after this list)
  • Recovery: Rollback specific operations using rollback_operation tool
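
Based on the fields listed above, a single entry might look roughly like this; the exact field names are assumptions, not the guaranteed log schema:

# Hypothetical audit entry, matching the fields described above (names are assumptions).
audit_entry = {
    "audit_id": "abc123def456",
    "timestamp": "2025-01-15T10:30:00Z",
    "data_type": "tasks",
    "operation": "update",
    "record_id": "task-42",
    "old_values": {"status": "pending"},
    "new_values": {"status": "completed"},
}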

Optional Full Snapshots

Configure periodic full snapshots for additional safety:

Environment Variables:

  • MCP_FULL_SNAPSHOTS: Set to "true" to enable periodic snapshots (default: false)
  • MCP_SNAPSHOT_FREQUENCY: "daily", "weekly", "monthly", "never" (default: weekly)

Snapshot Location:

data/snapshots/[data_type]-[YYYY-MM-DD-HHMMSS].parquet
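
The pattern maps directly onto a datetime format string; for example:

from datetime import datetime

data_type = "tasks"
# Produces e.g. "tasks-2025-01-15-103000.parquet", matching the pattern above.
snapshot_name = f"{data_type}-{datetime.now():%Y-%m-%d-%H%M%S}.parquet"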

Storage Comparison

Approach               Storage per Operation   100 Operations
Full snapshots (old)   10 MB                   1 GB
Audit log (new)        ~1 KB                   ~100 KB
Savings                99.99%                  99.99%

Recovery Options

  1. Recent Changes: Use rollback_operation with audit ID
  2. Multiple Changes: Rollback operations in reverse chronological order
  3. Full Restore: Restore from periodic snapshot (if enabled)
  4. Point-in-Time: Restore a snapshot, then replay the audit log up to a specific timestamp (sketched below)
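
Option 4 amounts to restoring the newest snapshot before the target time and re-applying logged changes up to it. A rough sketch, assuming entries carry sortable timestamps; apply_operation is a hypothetical helper:

def restore_to(snapshot_df, audit_entries, target_ts):
    """Replay logged operations up to target_ts on top of a snapshot (illustrative)."""
    df = snapshot_df.copy()
    for entry in sorted(audit_entries, key=lambda e: e["timestamp"]):
        if entry["timestamp"] > target_ts:
            break
        df = apply_operation(df, entry)  # hypothetical helper: re-applies one logged change
    return df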

See AUDIT_LOG_GUIDE.md for detailed documentation.

Data Types

The server automatically discovers data types by scanning data/ for directories containing [type].parquet files (a discovery sketch follows this list). Common data types include:

  • flows - Cash flow and expense data
  • transactions - Transaction data
  • tasks - Task management data
  • contacts - Contact/merchant information
  • income - Income data
  • fixed_costs - Fixed cost data
  • And many more...
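
Given the layout described above, discovery can be as simple as a directory scan. A minimal sketch, assuming the data/ root:

from pathlib import Path

def discover_data_types(data_dir: str = "data") -> list[str]:
    # A directory data/<type>/ counts as a data type if it contains <type>.parquet.
    return sorted(
        d.name for d in Path(data_dir).iterdir()
        if d.is_dir() and (d / f"{d.name}.parquet").exists()
    )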

Error Handling

The server returns structured error messages in JSON format when operations fail. Common errors include:

  • File not found errors
  • Schema validation errors
  • Column not found errors
  • Filter matching errors

Security Notes

  • All write operations create audit log entries for traceability
  • Audit logs are stored in data/logs/audit_log.parquet
  • Optional full snapshots can be configured for additional safety
  • Never commit sensitive data files to version control

Troubleshooting

  1. File Not Found Errors

    • Verify the data type exists in data/[type]/[type].parquet
    • Check file permissions
  2. Schema Validation Errors

    • Ensure records match the schema defined in data/schemas/[type]_schema.json
    • Check required fields are present
  3. Filter Matching Errors

    • Verify filter syntax matches supported operators
    • Check column names exist in the schema

Testing

After installation/updates, run the test script:

python3 mcp-servers/parquet/test_audit_log.py

This validates:

  • Audit log creation
  • Schema compliance
  • Operation tracking

See IMPLEMENTATION_SUMMARY.md for manual testing procedures.

Notes

  • The server uses an audit log for efficient change tracking (99%+ storage reduction)
  • All date fields are automatically converted to ISO format strings in responses
  • Null/NaN values are converted to null in JSON responses
  • The server runs in stdio mode for MCP communication
  • Audit log entries are never automatically deleted (manual archival if needed)
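
The date and null conversions amount to a per-cell sanitization step before JSON serialization; roughly (illustrative, not the server's code):

import math
from datetime import date, datetime

def to_json_value(value):
    """Make a cell JSON-safe, per the notes above."""
    if isinstance(value, (date, datetime)):
        return value.isoformat()  # date fields -> ISO format strings
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None               # null/NaN -> JSON null
    return value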

License

MIT
