Parquet MCP Server
Enables querying, modifying, and managing Parquet files with CRUD operations, semantic search, audit logging, and rollback capabilities for structured data storage.
MCP server for interacting with parquet files in a repository. Provides comprehensive data management with audit logging, rollback capabilities, and semantic search.
Credits
This is a custom MCP server implementation for parquet file management with audit trail support.
Features
- Read/Query: Query parquet files with filters, column selection, and limits
- Add Records: Add new records to parquet files with audit trail
- Update Records: Update existing records matching filters with audit trail
- Upsert Records: Insert or update records (supports enhanced filters for duplicate detection)
- Delete Records: Delete records matching filters with audit trail
- Audit Log: Complete change history with old/new values for all modifications
- Rollback: Undo specific operations using audit IDs
- Schema Discovery: Get schema definitions for data types
- Statistics: Get basic statistics about parquet files
- Efficient Backups: Audit log entries (~1 KB) instead of full snapshots (99%+ storage reduction)
- Optional Full Snapshots: Configurable periodic snapshots for additional safety
Installation
cd mcp-servers/parquet
pip install -r requirements.txt
Configuration
Cursor Configuration
Add to your Cursor MCP settings (typically ~/.cursor/mcp.json or Cursor settings):
Development (Audit Log Only):
{
"mcpServers": {
"parquet": {
"command": "python",
"args": [
"$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
],
"env": {}
}
}
}
Production (With Periodic Snapshots):
{
"mcpServers": {
"parquet": {
"command": "python",
"args": [
"$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
],
"env": {
"MCP_FULL_SNAPSHOTS": "true",
"MCP_SNAPSHOT_FREQUENCY": "weekly"
}
}
}
}
Claude Desktop Configuration
Add to claude_desktop_config.json (typically ~/Library/Application Support/Claude/claude_desktop_config.json on macOS):
{
"mcpServers": {
"parquet": {
"command": "python",
"args": [
"$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
]
}
}
}
Available Tools
list_data_types
List all available data types (parquet files) in the data directory.
get_schema
Get the schema definition for a data type.
Parameters:
- `data_type` (required): The data type name (e.g., 'flows', 'transactions', 'tasks')
read_parquet
Read and query a parquet file with optional filters. Supports enhanced filtering operators.
Parameters:
- `data_type` (required): The data type name
- `filters` (optional): Key-value pairs to filter records. Supports enhanced operators:
  - Simple value: exact match
  - List (`["value1", "value2"]`): value is in the list
  - `{"$contains": "text"}`: substring match (case-insensitive)
  - `{"$starts_with": "text"}`: prefix match (case-insensitive)
  - `{"$ends_with": "text"}`: suffix match (case-insensitive)
  - `{"$regex": "pattern"}`: regex pattern match
  - `{"$fuzzy": {"text": "query", "threshold": 0.7}}`: fuzzy string matching (0-1 similarity)
  - `{"$gt": 100}`, `{"$gte": 100}`, `{"$lt": 100}`, `{"$lte": 100}`: numeric comparisons
  - `{"$ne": "value"}`: not equal
- `limit` (optional): Maximum number of rows to return (default: 1000)
- `columns` (optional): List of column names to return (default: all columns)
Examples:
{
"data_type": "flows",
"filters": {
"category": "property_maintenance",
"year": 2025
},
"limit": 100
}
{
"data_type": "tasks",
"filters": {
"title": {"$contains": "therapy"},
"status": {"$ne": "completed"}
}
}
{
"data_type": "tasks",
"filters": {
"title": {"$fuzzy": {"text": "therapy session", "threshold": 0.7}}
}
}
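The enhanced operators above can be sketched in plain Python. This is an illustrative re-implementation over in-memory dicts, not the server's actual code; the function names `matches` and `apply_filters` are hypothetical, and `$fuzzy` is approximated here with `difflib.SequenceMatcher`:

```python
import re
from difflib import SequenceMatcher

def matches(value, condition):
    """Evaluate one filter condition against a field value (illustrative only)."""
    if isinstance(condition, list):              # list: value must be in the list
        return value in condition
    if isinstance(condition, dict):              # operator match, e.g. {"$gt": 100}
        op, arg = next(iter(condition.items()))
        text = str(value).lower()
        if op == "$contains":
            return str(arg).lower() in text
        if op == "$starts_with":
            return text.startswith(str(arg).lower())
        if op == "$ends_with":
            return text.endswith(str(arg).lower())
        if op == "$regex":
            return re.search(arg, str(value)) is not None
        if op == "$fuzzy":                       # similarity ratio in 0-1
            ratio = SequenceMatcher(None, text, arg["text"].lower()).ratio()
            return ratio >= arg.get("threshold", 0.7)
        if op == "$gt":  return value > arg
        if op == "$gte": return value >= arg
        if op == "$lt":  return value < arg
        if op == "$lte": return value <= arg
        if op == "$ne":  return value != arg
        raise ValueError(f"unknown operator {op!r}")
    return value == condition                    # simple value: exact match

def apply_filters(rows, filters):
    """Keep rows where every filter condition matches (AND semantics)."""
    return [r for r in rows if all(matches(r.get(k), c) for k, c in filters.items())]

rows = [
    {"title": "Therapy session", "status": "pending"},
    {"title": "Buy groceries", "status": "completed"},
]
print(apply_filters(rows, {"title": {"$contains": "therapy"},
                           "status": {"$ne": "completed"}}))
```

Filters combine with AND semantics, matching the second `read_parquet` example above, which returns only non-completed tasks whose title contains "therapy".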
add_record
Add a new record to a parquet file. Creates audit log entry and optional snapshot.
Parameters:
- `data_type` (required): The data type name
- `record` (required): The record data as a JSON object matching the schema
Example:
{
"data_type": "flows",
"record": {
"flow_name": "Monthly Rent",
"flow_date": "2025-01-15",
"amount_usd": 1500.00,
"category": "housing",
"flow_type": "recurring_expense"
}
}
update_records
Update existing records in a parquet file. Creates audit log entry and optional snapshot.
Parameters:
- `data_type` (required): The data type name
- `filters` (required): Filters to identify records to update
- `updates` (required): Fields to update
Example:
{
"data_type": "tasks",
"filters": {
"task_id": "abc123"
},
"updates": {
"status": "completed",
"completed_date": "2025-01-15"
}
}
upsert_record
Insert or update a record (upsert). Checks for existing records using enhanced filters (supports all read_parquet filter operators including $contains, $fuzzy, etc.). If found, updates matching records. If not found, creates a new record. Returns whether it created or updated. Useful for preventing duplicates when adding contacts, tasks, or other records.
Parameters:
- `data_type` (required): The data type name
- `filters` (required): Enhanced filters to identify existing records (supports all `read_parquet` filter operators)
- `record` (required): The record data to insert or update
Returns:
- `action`: "created" or "updated"
- `audit_id` or `audit_ids`: Audit log entry ID(s)
- `record_id`: The ID of the created/updated record
Example (exact match):
{
"data_type": "contacts",
"filters": {
"email": "galina@secod.com"
},
"record": {
"name": "Galina Semakova",
"email": "galina@secod.com",
"category": "legal",
"last_contact_date": "2025-12-24"
}
}
Example (fuzzy match):
{
"data_type": "contacts",
"filters": {
"name": {"$fuzzy": {"text": "Galina Semakova", "threshold": 0.8}}
},
"record": {
"name": "Galina Semakova",
"email": "galina@secod.com",
"category": "legal",
"last_contact_date": "2025-12-24"
}
}
Example (contains match):
{
"data_type": "tasks",
"filters": {
"title": {"$contains": "therapy payment"}
},
"record": {
"title": "Pay for therapy session",
"status": "pending",
"due_date": "2025-12-25"
}
}
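The upsert flow behind these examples can be sketched as follows. This is a minimal illustration over an in-memory list of dicts, using only exact-match filters (the real tool also accepts the enhanced operators); the function name and returned fields mirror the documented behavior but are assumptions, not the server's code:

```python
import uuid

def upsert_record(rows, filters, record):
    """Illustrative upsert: update rows matching the filters, else insert."""
    matched = [r for r in rows if all(r.get(k) == v for k, v in filters.items())]
    if matched:
        for r in matched:          # found: update every matching record
            r.update(record)
        return {"action": "updated", "count": len(matched)}
    new = {"record_id": uuid.uuid4().hex, **record}
    rows.append(new)               # not found: create a new record
    return {"action": "created", "record_id": new["record_id"]}

contacts = [{"email": "galina@secod.com", "name": "Galina"}]
r1 = upsert_record(contacts, {"email": "new@example.com"},
                   {"email": "new@example.com", "name": "New Contact"})
r2 = upsert_record(contacts, {"email": "galina@secod.com"},
                   {"name": "Galina Semakova"})
print(r1["action"], r2["action"])  # created updated
```

Because the second call matches on an existing email, it updates in place instead of appending a duplicate, which is the duplicate-prevention behavior described above.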
delete_records
Delete records from a parquet file. Creates audit log entry and optional snapshot.
Parameters:
- `data_type` (required): The data type name
- `filters` (required): Filters to identify records to delete
Example:
{
"data_type": "tasks",
"filters": {
"status": "canceled"
}
}
get_statistics
Get basic statistics about a parquet file.
Parameters:
- `data_type` (required): The data type name
read_audit_log
Read audit log entries with optional filters. View complete history of all data modifications.
Parameters:
- `data_type` (optional): Filter by data type
- `operation` (optional): Filter by operation (add, update, delete)
- `record_id` (optional): Filter by specific record ID
- `limit` (optional): Maximum number of entries to return (default: 100)
Example:
{
"data_type": "transactions",
"operation": "update",
"limit": 50
}
rollback_operation
Rollback a specific operation using its audit ID. Creates inverse operation to undo changes.
Parameters:
- `audit_id` (required): The audit ID of the operation to roll back
Rollback Logic:
- `add` operation → delete the record
- `update` operation → restore the old values
- `delete` operation → restore the record
Example:
{
"audit_id": "abc123def456"
}
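The rollback logic above amounts to computing an inverse operation from the audit entry. A minimal sketch, assuming audit entries carry `operation`, `record_id`, and `old_values` fields (the exact field names in the server's audit schema may differ):

```python
def inverse_operation(audit_entry):
    """Compute the operation that undoes an audited change (illustrative)."""
    op = audit_entry["operation"]
    if op == "add":       # undo an insert by deleting the record
        return {"operation": "delete", "record_id": audit_entry["record_id"]}
    if op == "update":    # undo an update by restoring the old values
        return {"operation": "update", "record_id": audit_entry["record_id"],
                "updates": audit_entry["old_values"]}
    if op == "delete":    # undo a delete by re-inserting the old record
        return {"operation": "add", "record": audit_entry["old_values"]}
    raise ValueError(f"cannot roll back operation {op!r}")

entry = {"operation": "update", "record_id": "abc123",
         "old_values": {"status": "pending"},
         "new_values": {"status": "completed"}}
print(inverse_operation(entry))
```

Because each audit entry records the old values, no full snapshot is needed to undo a single operation.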
search_parquet
Semantic search using embeddings. Searches text fields for semantically similar records.
Parameters:
- `data_type` (required): The data type name
- `query` (required): Search query text
- `text_fields` (optional): List of text fields to search (default: auto-detect)
- `limit` (optional): Maximum number of results (default: 10)
- `min_similarity` (optional): Minimum cosine similarity threshold, 0-1 (default: 0.7)
- `additional_filters` (optional): Additional filters to apply (same format as `read_parquet`)
Prerequisites:
- Must run `generate_embeddings` first to create embeddings for the data type
- Requires the `OPENAI_API_KEY` environment variable
Example:
{
"data_type": "tasks",
"query": "pay for therapy session",
"limit": 5,
"min_similarity": 0.7
}
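The ranking step behind `min_similarity` and `limit` can be sketched with plain cosine similarity. This is an illustration over tiny 2-dimensional vectors (real OpenAI embeddings have many more dimensions); the `rank` helper is hypothetical:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two embedding vectors, in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def rank(query_vec, records, min_similarity=0.7, limit=10):
    """Score records against the query, drop weak matches, return top hits."""
    scored = [(cosine_similarity(query_vec, r["embedding"]), r) for r in records]
    hits = [(s, r) for s, r in scored if s >= min_similarity]
    return sorted(hits, key=lambda t: t[0], reverse=True)[:limit]

records = [{"title": "therapy", "embedding": [1.0, 0.0]},
           {"title": "rent", "embedding": [0.0, 1.0]}]
print(rank([0.9, 0.1], records, min_similarity=0.7, limit=5))
```

Only records whose similarity clears the threshold are returned, which is why a query like "pay for therapy session" surfaces semantically related tasks even when no keyword matches exactly.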
generate_embeddings
Generate and store embeddings for text fields in a data type. Creates embeddings parquet file for semantic search.
Parameters:
- `data_type` (required): The data type name
- `text_fields` (optional): List of text fields to generate embeddings for (default: auto-detect)
- `force_regenerate` (optional): Force regeneration of all embeddings (default: false)
Prerequisites:
- Requires the `OPENAI_API_KEY` environment variable
Example:
{
"data_type": "tasks",
"text_fields": ["title", "description", "notes"]
}
Note: Embeddings are cached; only missing embeddings are generated unless `force_regenerate` is true.
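The caching behavior can be sketched as a simple cache-miss check before calling the embedding API. Field and function names here are assumptions for illustration, not the server's implementation:

```python
def embeddings_to_generate(records, cache, text_fields, force_regenerate=False):
    """Select which records still need embeddings (illustrative caching logic).

    `cache` maps record_id -> stored embedding; only cache misses would be
    sent to the embedding API unless force_regenerate is set."""
    todo = []
    for r in records:
        if force_regenerate or r["record_id"] not in cache:
            # Concatenate the configured text fields into one string to embed.
            text = " ".join(str(r.get(f, "")) for f in text_fields)
            todo.append((r["record_id"], text))
    return todo

cache = {"t1": [0.1, 0.2]}                       # t1 already has an embedding
records = [{"record_id": "t1", "title": "Pay rent"},
           {"record_id": "t2", "title": "Therapy session"}]
print(embeddings_to_generate(records, cache, ["title"]))  # only t2 is missing
```

Skipping cached records keeps repeated `generate_embeddings` runs cheap and avoids redundant API calls.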
Backup & Recovery
Audit Log (Default)
All write operations create lightweight audit log entries in data/logs/audit_log.parquet:
- Storage: ~1 KB per operation (99%+ reduction vs full snapshots)
- Content: Operation type, record ID, affected fields, old/new values, timestamp
- Recovery: Roll back specific operations using the `rollback_operation` tool
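An audit entry with the content listed above can be sketched like this. The field names are assumptions based on this README, not the server's exact schema; storing only the changed fields is what keeps entries around 1 KB:

```python
import json
import uuid
from datetime import datetime, timezone

def make_audit_entry(data_type, operation, record_id, old, new):
    """Build a lightweight audit entry capturing only the changed fields."""
    changed = {k for k in set(old) | set(new) if old.get(k) != new.get(k)}
    return {
        "audit_id": uuid.uuid4().hex,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data_type": data_type,
        "operation": operation,
        "record_id": record_id,
        "old_values": {k: old.get(k) for k in changed},
        "new_values": {k: new.get(k) for k in changed},
    }

entry = make_audit_entry("tasks", "update", "abc123",
                         {"status": "pending"}, {"status": "completed"})
print(len(json.dumps(entry)), "bytes")  # well under 1 KB per operation
```

Because both old and new values are captured, the entry is sufficient on its own to undo the operation later.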
Optional Full Snapshots
Configure periodic full snapshots for additional safety:
Environment Variables:
- `MCP_FULL_SNAPSHOTS`: Set to "true" to enable periodic snapshots (default: false)
- `MCP_SNAPSHOT_FREQUENCY`: "daily", "weekly", "monthly", or "never" (default: weekly)
Snapshot Location:
data/snapshots/[data_type]-[YYYY-MM-DD-HHMMSS].parquet
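The snapshot naming scheme and frequency check can be sketched as follows. This is an assumption-laden illustration (the server's actual scheduling code may differ; "monthly" is approximated as 30 days here):

```python
from datetime import datetime, timedelta

# Assumed mapping from the MCP_SNAPSHOT_FREQUENCY setting to an interval.
FREQUENCIES = {"daily": timedelta(days=1), "weekly": timedelta(weeks=1),
               "monthly": timedelta(days=30), "never": None}

def snapshot_path(data_type, now):
    """Build a snapshot path matching the documented naming scheme."""
    return f"data/snapshots/{data_type}-{now.strftime('%Y-%m-%d-%H%M%S')}.parquet"

def snapshot_due(last_snapshot, now, frequency="weekly"):
    """True when no snapshot exists yet or the configured interval has elapsed."""
    interval = FREQUENCIES[frequency]
    if interval is None:
        return False
    return last_snapshot is None or now - last_snapshot >= interval

now = datetime(2025, 1, 15, 9, 30, 0)
print(snapshot_path("flows", now))
print(snapshot_due(now - timedelta(days=8), now, "weekly"))  # True: a week has passed
```

With "never" configured, writes still produce audit log entries; only the periodic full copies are skipped.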
Storage Comparison
| Approach | Storage per Operation | 100 Operations |
|---|---|---|
| Full snapshots (old) | 10 MB | 1 GB |
| Audit log (new) | ~1 KB | ~100 KB |
| Savings | 99.99% | 99.99% |
Recovery Options
- Recent Changes: Use `rollback_operation` with the audit ID
- Multiple Changes: Roll back operations in reverse chronological order
- Full Restore: Restore from periodic snapshot (if enabled)
- Point-in-Time: Restore snapshot + replay audit log to specific timestamp
See AUDIT_LOG_GUIDE.md for detailed documentation.
Data Types
The server automatically discovers data types by scanning data/ for directories containing [type].parquet files. Common data types include:
- `flows` - Cash flow and expense data
- `transactions` - Transaction data
- `tasks` - Task management data
- `contacts` - Contact/merchant information
- `income` - Income data
- `fixed_costs` - Fixed cost data
- And many more...
Error Handling
The server returns structured error messages in JSON format when operations fail. Common errors include:
- File not found errors
- Schema validation errors
- Column not found errors
- Filter matching errors
Security Notes
- All write operations create audit log entries for traceability
- Audit logs are stored in `data/logs/audit_log.parquet`
- Optional full snapshots can be configured for additional safety
- Never commit sensitive data files to version control
Troubleshooting
- File Not Found Errors
  - Verify the data type exists at `data/[type]/[type].parquet`
  - Check file permissions
- Schema Validation Errors
  - Ensure records match the schema defined in `data/schemas/[type]_schema.json`
  - Check that required fields are present
- Filter Matching Errors
  - Verify filter syntax matches supported operators
  - Check that column names exist in the schema
Testing
After installation/updates, run the test script:
python3 mcp-servers/parquet/test_audit_log.py
This validates:
- Audit log creation
- Schema compliance
- Operation tracking
See IMPLEMENTATION_SUMMARY.md for manual testing procedures.
Documentation
- README.md - This file, overview and quick reference
- AUDIT_LOG_GUIDE.md - Complete audit log documentation
- IMPLEMENTATION_SUMMARY.md - Implementation details and testing
- SETUP.md - Setup and configuration instructions
Notes
- The server uses audit log for efficient change tracking (99%+ storage reduction)
- All date fields are automatically converted to ISO format strings in responses
- Null/NaN values are converted to `null` in JSON responses
- The server runs in stdio mode for MCP communication
- Audit log entries are never automatically deleted (manual archival if needed)
License
MIT