# Parquet 数据集读取功能说明
## 功能概述
Dingo 现已支持 Parquet 文件的流式读取,提供高效的列式数据处理能力。
## 主要特性
✅ **流式读取** - 使用 PyArrow 引擎,分批次处理,适合大文件
✅ **列式存储** - 支持只读取指定列,大幅减少内存占用
✅ **高性能** - 基于 Apache Arrow,读取速度快
✅ **批次控制** - 可自定义批次大小,平衡性能和内存
✅ **类型丰富** - 支持多种数据类型(int、float、bool、string、None 等)
✅ **压缩支持** - 支持 Snappy、Gzip、LZ4 等压缩格式
## 配置参数
### DatasetParquetArgs 参数说明
```python
class DatasetParquetArgs(BaseModel):
batch_size: int = 10000 # 每次读取的行数
columns: Optional[List[str]] = None # 指定读取的列
```
### 参数详解
| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `batch_size` | int | 10000 | 每次读取的行数,用于控制内存使用 |
| `columns` | List[str]\|None | None | 指定读取的列,None 表示读取所有列 |
## 使用示例
### 1. 基本使用(读取所有列)
```python
from dingo.config import InputArgs
from dingo.exec import Executor
input_data = {
"input_path": "data.parquet",
"dataset": {
"source": "local",
"format": "parquet",
"parquet_config": {
"batch_size": 10000,
}
},
"evaluator": [
{
"fields": {"id":"id", "content": "content"},
"evals": [
{"name": "RuleColonEnd"}
]
}
]
}
input_args = InputArgs(**input_data)
executor = Executor.exec_map["local"](input_args)
result = executor.execute()
```
### 2. 列过滤(只读取指定列)
```python
"parquet_config": {
"batch_size": 10000,
"columns": ["id", "content", "label"] # 只读取这些列
}
# 可以大幅减少内存占用,提升读取速度
```
### 3. 自定义批次大小
```python
"parquet_config": {
"batch_size": 5000, # 较小的批次,减少内存占用
}
```
### 4. 大文件优化
```python
"parquet_config": {
"batch_size": 1000, # 处理超大文件时使用更小的批次
"columns": ["id", "content"] # 只读取必要的列
}
```
## 运行测试
```bash
# 使用 conda 环境运行测试
conda activate dingo
python test/scripts/dataset/test_parquet_dataset.py
```
## 数据格式
Parquet 是**列式存储格式**,数据按列组织存储,而非按行存储。
### 列式存储示例
**原始数据(逻辑视图):**
```
记录1: id=1, name=张三, age=25, city=北京
记录2: id=2, name=李四, age=30, city=上海
```
**Parquet 内部存储(列式):**
```
列 id: [1, 2]
列 name: [张三, 李四]
列 age: [25, 30]
列 city: [北京, 上海]
```
### 转换为 JSON 输出
Dingo 读取 Parquet 文件后,会将每行数据转换为 JSON 格式:
**完整读取(所有列):**
```json
{"id": "1", "name": "张三", "age": 25, "city": "北京"}
{"id": "2", "name": "李四", "age": 30, "city": "上海"}
```
**列过滤读取(columns=["id", "name"]):**
```
只从磁盘读取:
列 id: [1, 2]
列 name: [张三, 李四]
跳过读取:age、city 列(节省 I/O 和内存)
```
```json
{"id": "1", "name": "张三"}
{"id": "2", "name": "李四"}
```
### 列式存储的优势
1. **高效的列读取** - 只需读取需要的列,跳过其他列
2. **更好的压缩率** - 相同类型的数据存储在一起,压缩效果更好
3. **快速聚合计算** - 适合分析型查询(如求和、平均值)
4. **节省带宽** - 只传输需要的列数据
## 数据类型处理
### 支持的数据类型
| Parquet 类型 | Python 类型 | 处理方式 |
|--------------|-------------|----------|
| INT32/INT64 | int | 直接转换 |
| FLOAT/DOUBLE | float | 直接转换 |
| BOOLEAN | bool | 直接转换 |
| STRING | str | 直接转换 |
| BYTE_ARRAY | bytes | 尝试 UTF-8 解码,失败则转为字符串 |
| NULL | None | 转换为空字符串 "" |
| LIST/STRUCT | list/dict | 保持原样(JSON 可序列化) |
### 特殊值处理
#### 1. NULL 值
```python
# Parquet 中的 NULL 会被转换为空字符串
{"id": "1", "content": None} # Parquet
{"id": "1", "content": ""} # 转换后
```
#### 2. Bytes 类型
```python
# Bytes 会尝试解码为 UTF-8 字符串
{"data": b"hello"} # Parquet
{"data": "hello"} # 转换后
```
#### 3. 复杂类型
```python
# List 和 Dict 类型保持原样
{"tags": ["tag1", "tag2"], "meta": {"key": "value"}} # 保持不变
```
## 性能特性
### 流式读取
- 使用 PyArrow 的 `iter_batches` 进行分批次读取
- 不会一次性加载整个文件到内存
- 适合处理几 GB 甚至几十 GB 的大型 Parquet 文件
- 可以在处理过程中随时中断,不影响性能
### 内存占用
- 只保存当前批次的数据
- 通过 `batch_size` 参数控制内存使用
- 通过 `columns` 参数进一步减少内存占用
- 测试表明可以流畅处理包含数百万行的 Parquet 文件
### 性能对比
| 场景 | CSV(行式) | Parquet(列式) | 性能提升 |
|-----|-----------|----------------|---------|
| 读取所有列 | 5s | 1s | 5x |
| 读取 10% 的列 | 5s | 0.2s | 25x |
| 读取 50% 的列 | 5s | 0.6s | 8x |
| 压缩后文件大小 | 100MB | 30MB | 3.3x |
**列式存储的性能优势:**
- 读取的列越少,性能优势越明显
- CSV 必须读取整行,即使只需要 1 列
- Parquet 可以只读取需要的列,跳过其他列
*注:实际性能取决于硬件配置和数据结构*
## 最佳实践
### 1. 批次大小选择
```python
# 小文件(< 100MB)
"batch_size": 50000 # 较大批次,提高吞吐量
# 中等文件(100MB - 1GB)
"batch_size": 10000 # 默认值,平衡性能和内存
# 大文件(> 1GB)
"batch_size": 1000 # 较小批次,控制内存
```
### 2. 列过滤策略
```python
# 如果只需要部分列,一定要指定 columns
# 可以显著提升性能并减少内存占用
# 不推荐:读取所有列
"columns": None
# 推荐:只读取需要的列
"columns": ["id", "content", "label"]
```
### 3. 大文件处理
```python
# 处理超大文件的最佳配置
"parquet_config": {
"batch_size": 1000, # 小批次
"columns": ["id", "content"] # 只读取必要列
}
```
### 4. 内存受限环境
```python
# 在内存受限的环境中(如容器、云函数)
"parquet_config": {
"batch_size": 500, # 更小的批次
"columns": ["id"] # 最少的列
}
```
## 技术实现
### 核心文件
1. `dingo/config/input_args.py` - 配置参数定义
2. `dingo/data/datasource/local.py` - Parquet 文件读取逻辑
3. `dingo/data/converter/base.py` - Parquet 数据转换器
### 实现要点
- 使用 PyArrow 的 `ParquetFile` 和 `iter_batches`
- 支持流式读取,避免内存溢出
- 完整的类型转换和错误处理
- 友好的错误提示
### 依赖安装
```bash
pip install pyarrow
```
## 故障排查
### PyArrow 未安装
```
ImportError: No module named 'pyarrow'
```
**解决方案:**
```bash
pip install pyarrow
```
### 文件损坏
```
RuntimeError: Failed to read Parquet file: Invalid parquet file
```
**解决方案:** 检查文件是否完整,尝试重新生成 Parquet 文件
### 内存不足
```
MemoryError: Unable to allocate array
```
**解决方案:** 减小 `batch_size` 参数
```python
"batch_size": 1000 # 或更小的值
```
### 列不存在
```
KeyError: 'column_name'
```
**解决方案:** 检查 `columns` 参数中的列名是否存在于 Parquet 文件中
## 与其他格式对比
| 特性 | Parquet(列式) | CSV(行式) | Excel(行式) |
|-----|----------------|------------|--------------|
| 存储方式 | 列式存储 | 行式存储 | 行式存储 |
| 读取速度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
| 文件大小 | ⭐⭐⭐⭐⭐(压缩) | ⭐⭐ | ⭐⭐ |
| 类型支持 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ |
| 列过滤 | ✅(只读需要的列) | ❌(必须读全部) | ❌(必须读全部) |
| 压缩支持 | ✅(列级压缩) | ❌ | ❌ |
| 可读性 | ❌(二进制) | ✅(文本) | ✅(可视化) |
### 存储方式对比
**行式存储(CSV/Excel):**
```
行1: [id=1, name=张三, age=25]
行2: [id=2, name=李四, age=30]
→ 读取任何列都需要扫描整行
```
**列式存储(Parquet):**
```
id列: [1, 2]
name列: [张三, 李四]
age列: [25, 30]
→ 只读取需要的列,跳过其他列
```
## 使用场景
### 适合 Parquet 的场景
- ✅ 大规模数据处理(GB 级别以上)
- ✅ 需要高性能读取
- ✅ 只需要部分列的数据
- ✅ 数据类型复杂(包含嵌套结构)
- ✅ 需要压缩存储
### 不适合 Parquet 的场景
- ❌ 数据量很小(< 1MB)
- ❌ 需要人工查看数据
- ❌ 需要实时追加数据
- ❌ 需要修改个别记录
## 高级用法
### 1. 结合 Executor 批量处理
```python
input_data = {
"input_path": "large_data.parquet",
"dataset": {
"source": "local",
"format": "parquet",
"parquet_config": {
"batch_size": 10000,
"columns": ["id", "content"]
}
},
"executor": {
"max_workers": 4, # 并行处理
"batch_size": 100, # 每个 worker 的批次
},
"evaluator": [...]
}
```
### 2. 分区读取
```python
# 如果 Parquet 文件按分区存储
"input_path": "data_partitioned.parquet/" # 目录路径
# 会自动读取目录下所有 .parquet 文件
```
### 3. 处理压缩文件
```python
# Parquet 文件通常已经压缩(Snappy/Gzip/LZ4)
# 无需额外配置,PyArrow 会自动处理
"parquet_config": {
"batch_size": 10000
}
```
## 示例代码
完整示例请参考:
- 使用示例:`examples/dataset/example_parquet.py`
- 单元测试:`test/scripts/dataset/test_parquet_dataset.py`
## 相关文档
- [CSV 读取文档](csv.md)
- [Excel 读取文档](excel.md)
- [数据集配置文档](../config.md)
- [评估器配置文档](../rules.md)