HTAP Embedded Database | Hybrid Transactional & Analytical Processing Database

Transactions (DeltaStore row-level deltas) + Analytics (Vortex columnar storage) + Engine (DuckDB SQL execution)

Version 0.1.0 | Rust 1.91+ | Apache 2.0


Core Ideas

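RockDuck is an embedded HTAP database written in Rust. It serves both transactional (OLTP) and analytical (OLAP) workloads by combining three core capabilities: transactions (row-level delta storage) + analytics (columnar storage) + engine (DuckDB SQL execution).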

RockDuck's architecture draws heavily on three mature systems:

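Inspiration | Borrowed design
Apache Iceberg | Shadow Column MVCC (multi-version snapshot isolation via created_by_txn / deleted_by_txn)
ClickHouse MergeTree | Segment / Granule hierarchy with background compaction, keeping actively written data and frozen analytical data in separate tiers
Snowflake | Zone Map predicate pushdown (per-granule min/max statistics skip irrelevant blocks) + HTAP dual-store routing (DeltaStore serves OLTP queries, the column store serves OLAP queries)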

Quick Start

Build

cargo build --release

Library API

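use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, StringArray};
use rockduck::RockDuck;

// export_iceberg is async, so the example runs inside an async runtime (tokio is an assumption)
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = RockDuck::open("./data")?;

    // Column name -> Arrow array; ArrayRef (Arc<dyn Array>) is the assumed value type
    let mut cols: HashMap<String, ArrayRef> = HashMap::new();
    cols.insert("id".to_string(), Arc::new(Int64Array::from(vec![1i64])));
    cols.insert("name".to_string(), Arc::new(StringArray::from(vec!["Alice"])));

    // Insert one row under primary key "pk001"; returns its TxnId
    let txn_id = db.insert("users", b"pk001", &cols)?;

    // Time-Travel query: snapshot as of TxnId = 10 (no pk range, no filter)
    let results = db.scan_as_of("users", 10, None, None)?;

    // Iceberg export
    let path = db.export_iceberg("users", "/tmp/iceberg_table", None).await?;
    Ok(())
}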

CLI

# Insert
cargo run -- insert pk001 --columns id=1 --columns age=30 -t users

# Point lookup
cargo run -- get pk001 -t users

# Range scan
cargo run -- scan --start pk001 --end pk100 -t users

# Delete
cargo run -- delete pk001 -t users

# Table statistics
cargo run -- stats -t users

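HTAP End-to-End Workflow

RockDuck's read and write paths rely on HTAP dual-store routing, which gives transactions and analytics a single, unified access path: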

flowchart LR
    subgraph Write["Write path (OLTP)"]
        W1["INSERT / UPDATE / DELETE"]
        W2["DeltaStore records cell-level before/after"]
        W3["Vortex append"]
        W4["WAL Begin → Commit"]
        W5["RocksDB dual-index update"]
    end

    subgraph Router["query/router.rs routing decision"]
        R1{"has_updates?"}
        R2{"query_type"}
    end

    subgraph Read["Read path (OLAP + OLTP combined)"]
        R_VX["VortexOnly\nfull columnar scan"]
        R_DS["DeltaStoreOnly\npoint lookup of latest changes"]
        R_MG["Merge\nDeltaStore overlay + Vortex"]
    end

    W1 --> W2 --> W3 --> W4 --> W5
    R1 -->|No| R_VX
    R1 -->|Yes| R2
    R2 -->|"PointGet"| R_DS
    R2 -->|"RangeScan/Aggregate"| R_MG
    R2 -->|"FullScan w/ filter"| R_MG

DeltaStore: Transactional Writes and Incremental Updates

The update path (write/insert.rs) never overwrites the original Vortex data. It records each change in the DeltaStore instead:

sequenceDiagram
    participant App as Application
    participant DS as DeltaStore
    participant VX as Vortex
    participant Scan as scan.rs

    App->>DS: update(table, pk, columns)
    DS->>VX: read old value (before image)
    DS-->>DS: record_update(txn, col, offset, old, new)
    DS->>DS: persist() → _upd_{col}.vortex

    Note over DS: on the next read, DeltaStore values override the original Vortex values

    Scan->>VX: read Vortex RecordBatch
    Scan->>DS: get_visible_deltas()
    DS-->>Scan: CellDelta overlay
    Scan-->>App: merged, up-to-date data

Architecture Overview

flowchart TB
    subgraph Core["RockDuck Core (db.rs)"]
        txn["txn_counter\n(txn ID generator)"]
        wal_cache["segment_bloom_filters\n(in-memory Bloom filter cache)"]
        mvcc_mgr["visibility_manager\n(MVCC visibility management)"]
        delta_mgr["delta_store\n(DeltaStoreManager)"]
    end

    subgraph Write["Write layer"]
        insert["write/insert.rs"]
        wal["write/wal.rs\n(32KB Block + CRC32 WAL)"]
    end

    subgraph Read["Read layer"]
        scan["read/scan.rs\n(Zone Map pruning + filter pushdown)"]
        point_get["read/point_get.rs"]
        router["query/router.rs\n(HTAP read-path routing)"]
    end

    subgraph Storage["storage/vortex.rs"]
        writer["VortexWriter"]
        reader["VortexReader"]
        mmap_cache["Arc<Mmap> Cache\n(zero-copy Frozen data)"]
    end

    subgraph Metadata["metadata/rocksdb.rs: 12 Column Families"]
        pk_idx["pk_idx\n(hash index → IndexEntry)"]
        pk_skip["pk_skiplist\n(sorted index, range scans)"]
        seg_meta["seg_meta\n(serialized Segment metadata)"]
        stat["stat\n(TableStats table-level statistics)"]
        zone["zone\n(Zone Map per-granule statistics)"]
        mvcc_cf["mvcc\n(active txn tracking: active:{txn_id} → begin_ts)"]
        sys["sys\n(persisted committed_txn)"]
        proj["proj_meta\n(Secondary Projection metadata)"]
        layer["layer\n(Immutable Layer snapshots)"]
        lbf["lbf\n(Learned Bloom Filter, reserved)"]
        bf["bf\n(per-granule Bloom Filter)"]
        iceberg_cf["iceberg_manifest\n(native Iceberg manifest storage)"]
    end

    subgraph Query["Query engine"]
        vtab["query/vtab.rs\n(RockDuckVTab streaming)"]
        duckdb_fn["query/duckdb_ext.rs\n(docdb_scan / docdb_iceberg_info)"]
    end

    subgraph Compaction["compaction/"]
        scheduler["scheduler.rs"]
        nonblocking["nonblocking.rs"]
        pdt_merge["pdt_merge.rs"]
        reencode["reencode.rs"]
    end

    Core --> Write
    Core --> Read
    Core --> Storage
    Core --> Metadata
    Core --> Query
    Core --> Compaction

    Write --> Storage
    Write --> Metadata
    Read --> Metadata
    Read --> Storage
    Read --> router
    scan --> reader
    reader --> mmap_cache
    vtab --> scan
    Compaction --> Storage
    Compaction --> Metadata

Data Directory Layout

graph TD
    root["rockduck_data/"]
    root --> meta["meta/\n(RocksDB metadata)"]
    root --> segments["segments/"]
    root --> wal["wal/"]
    root --> temp["temp/"]

    segments --> active["active/"]
    segments --> immutable["immutable/"]

    active --> seg_active["{seg_id}/"]
    immutable --> seg_imm["{seg_id}/"]

    seg_imm --> col_vortex["{col}.vortex\ncolumn data file"]
    seg_imm --> del_vortex["_del.vortex\ndelete mask"]
    seg_imm --> meta_vortex["_meta.vortex\nSegment metadata"]
    seg_imm --> upd_vortex["_upd_{col}.vortex\nper-column update mask"]
    seg_imm --> zm_vortex["_zm.json\nZone Map"]

    wal --> wal_file["wal_000000.bin\n(32KB Block + CRC32)"]

    meta --> rocksdb_sst["*.sst, MANIFEST\n(RocksDB data files)"]

Storage Hierarchy

RockDuck organizes data in a three-level Segment → Granule → Block hierarchy, inspired by ClickHouse MergeTree.

graph BT
    DB["RockDuck database"]
    DB --> Seg1["Segment #1 (seg_abc...)"]
    DB --> Seg2["Segment #2 (seg_def...)"]
    DB --> SegN["..."]

    Seg1 --> G1_1["Granule 0\n~1MB, several Blocks"]
    Seg1 --> G1_2["Granule 1\n~1MB, several Blocks"]
    Seg1 --> G1_M["..."]

    G1_1 --> B1_1["Block 0\n1024 rows\ncol stats: min/max"]
    G1_1 --> B1_2["Block 1\n1024 rows\ncol stats: min/max"]
    G1_1 --> B1_K["..."]

    G1_2 --> B2_1["Block 0\n1024 rows\ncol stats: min/max"]

Segment Lifecycle

stateDiagram-v2
    [*] --> Active
    Active --> Compactable : del_ratio rises
    Active --> Frozen : freeze_segment()
    Compactable --> Active : deletes reclaimed
    Compactable --> Frozen : freeze_segment()
    Frozen --> [*] : Compaction Merge

    note right of Active
        Writable
        Read via BufReader
        Bloom Filter still being updated
    end note

    note right of Frozen
        Read-only
        Zero-copy mmap reads
        Exportable to Iceberg
    end note

Data Types and Default Encodings

flowchart LR
    subgraph Types["Data types"]
        direction TB
        Ints["Integer types\nInt8 to Int64\nUInt8 to UInt64"]
        Floats["Floating-point types\nFloat32, Float64"]
        Bools["Boolean type\nBool"]
        Others["Other types\nUtf8, Binary, Timestamp..."]
    end

    Types --> Delta["Delta encoding\n(monotonically increasing/decreasing)"]
    Types --> Gorilla["Gorilla encoding\n(float compression)"]
    Types --> RLE["RLE encoding\n(many repeated values)"]
    Types --> Raw["Raw (no encoding)"]

MVCC and Time-Travel

MVCC Design (Shadow Column Approach)

Each row's index entry records two transaction IDs, created_by_txn and deleted_by_txn:

classDiagram
    class IndexEntry {
        +String seg_id
        +u32 granule_id
        +u32 offset
        +TxnId created_by_txn
        +Option~TxnId~ deleted_by_txn
    }

Visibility Check

flowchart TD
    START["is_visible(snapshot, created_txn, deleted_txn)"]
    START --> ISO{"snapshot.isolation"}

    ISO -->|"ReadCommitted"| RC1{"created_txn >\ncommitted_txn?"}
    RC1 -->|Yes| RC_FALSE["return false"]
    RC1 -->|No| RC2{"deleted_txn ≤\ncommitted_txn?"}
    RC2 -->|Yes| RC_FALSE2["return false"]
    RC2 -->|No| RC_TRUE["return true"]

    ISO -->|"RepeatableRead\nor Snapshot"| RR1{"created_txn >\nsnapshot_id?"}
    RR1 -->|Yes| RR_FALSE1["return false"]
    RR1 -->|No| RR2{"deleted_txn ≤\nsnapshot_id?"}
    RR2 -->|Yes| RR_FALSE2["return false"]
    RR2 -->|No| RR3{"created_txn ∈\nactive_txns?"}
    RR3 -->|Yes| RR_FALSE3["return false"]
    RR3 -->|No| RR_TRUE["return true"]
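The visibility flowchart can be read as a single function. The sketch below follows the diagram literally. The `Snapshot` struct, its field names, and the use of `u64` for `TxnId` are assumptions for illustration, not the actual types in mvcc/visibility.rs. Because `deleted_by_txn` is an `Option`, the "deleted_txn ≤ horizon" test applies only when a delete exists.

```rust
/// Transaction IDs are monotonically increasing integers (u64 assumed here).
pub type TxnId = u64;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum IsolationLevel {
    ReadCommitted,
    RepeatableRead,
    Snapshot,
}

/// Hypothetical snapshot shape; field names follow the flowchart labels.
pub struct Snapshot {
    pub isolation: IsolationLevel,
    pub snapshot_id: TxnId,      // horizon for RepeatableRead / Snapshot
    pub committed_txn: TxnId,    // latest committed txn, used by ReadCommitted
    pub active_txns: Vec<TxnId>, // txns still running when the snapshot was taken
}

/// Mirrors the decision flow above; `deleted_txn` is None for rows never deleted.
pub fn is_visible(s: &Snapshot, created_txn: TxnId, deleted_txn: Option<TxnId>) -> bool {
    let read_committed = s.isolation == IsolationLevel::ReadCommitted;
    // ReadCommitted checks against the latest commit; the other levels use the fixed snapshot id.
    let horizon = if read_committed { s.committed_txn } else { s.snapshot_id };
    if created_txn > horizon {
        return false; // row was created after the horizon
    }
    if matches!(deleted_txn, Some(d) if d <= horizon) {
        return false; // row was already deleted at the horizon
    }
    if !read_committed && s.active_txns.contains(&created_txn) {
        return false; // creator had not committed when the snapshot was taken
    }
    true
}
```

The sketch computes a single "horizon" value and reuses it. This shows that the two branches of the flowchart differ only in which transaction ID they compare against, plus the extra active-transaction check under snapshot-style isolation.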

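Time-Travel Queries

// Query the data as it was at TxnId = 10
let snapshot = db.snapshot_at(10, IsolationLevel::Snapshot)?;
// pk_range and filter are optional; pass None to scan the whole table unfiltered
let results = db.scan_as_of("users", 10, pk_range, filter)?;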

MVCC RocksDB Storage: 12 Column Families

graph LR
    subgraph CF_Index["Index layer"]
        direction TB
        pk_idx["pk_idx\nHash(table:pk) → IndexEntry\n{seg_id, granule_id, offset,\ncreated_by_txn, deleted_by_txn}"]
        pk_skip["pk_skiplist\nSorted(table:pk) → IndexEntry\nsupports range scans"]
        lbf["lbf\nLearned Bloom Filter\n(reserved)"]
        bf["bf\nPer-granule Bloom Filter\nfast key-existence check"]
    end

    subgraph CF_Data["Data layer"]
        direction TB
        seg_meta["seg_meta\nserialized SegmentMeta\n{status, row_count, del_ratio, upd_ratio}"]
        proj_meta["proj_meta\nProjection metadata\ncolumn-subset mapping"]
    end

    subgraph CF_Stats["Statistics layer"]
        direction TB
        stat["stat\nTableStats table-level statistics\n{row_count, del_count, last_*_txn}"]
        zone["zone\nZoneMapStats per-granule\n{min/max per column}"]
    end

    subgraph CF_MVCC["MVCC layer"]
        direction TB
        mvcc_cf["mvcc\nKey: active:{txn_id}\nValue: begin_ts\ntracks active txns"]
        sys["sys\nKey: __system__:committed_txn\nValue: max_committed_txn_id"]
    end

    subgraph CF_Layer["Tiered storage"]
        direction TB
        layer["layer\nImmutable Layer snapshots\nsupports historical queries"]
    end

    subgraph CF_Iceberg["Iceberg"]
        direction TB
        iceberg["iceberg_manifest\nKey: iceberg:latest\nValue: IcebergExport (bincode)\nnative Iceberg manifest"]
    end

Dual-write index strategy: every insert writes to both pk_idx (O(1) point lookups) and pk_skiplist (O(log n) range scans). This trades some write throughput for read flexibility.

graph TD
    INSERT["INSERT (table, pk, cols)"] --> IDX["Dual-write indexes"]
    IDX --> H["pk_idx CF\nkey = table:pk → IndexEntry"]
    IDX --> S["pk_skiplist CF\nkey = table:pk → IndexEntry"]

    H --> BF_U["bf CF update\nPer-granule Bloom Filter"]
    S --> ZM_U["zone CF update\nZone Map min/max"]

    BF_U --> DONE["Write complete"]
    ZM_U --> DONE
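The sketch below illustrates the dual-write trade-off. It uses in-memory stand-ins: a `HashMap` plays the role of the pk_idx CF and a `BTreeMap` plays the role of the pk_skiplist CF, both keyed by the `table:pk` format from the diagram. `DualIndex` and its methods are hypothetical names, and the Bloom filter and Zone Map updates are omitted.

```rust
use std::collections::{BTreeMap, HashMap};

#[derive(Clone, Debug, PartialEq)]
pub struct IndexEntry {
    pub seg_id: String,
    pub granule_id: u32,
    pub offset: u32,
    pub created_by_txn: u64,
    pub deleted_by_txn: Option<u64>,
}

/// Hypothetical in-memory stand-ins for the pk_idx (hash) and pk_skiplist (sorted) CFs.
#[derive(Default)]
pub struct DualIndex {
    pk_idx: HashMap<String, IndexEntry>,
    pk_skiplist: BTreeMap<String, IndexEntry>,
}

impl DualIndex {
    /// Every insert writes both indexes; the second write is the price of range scans.
    pub fn insert(&mut self, table: &str, pk: &str, entry: IndexEntry) {
        let key = format!("{table}:{pk}");
        self.pk_idx.insert(key.clone(), entry.clone());
        self.pk_skiplist.insert(key, entry);
    }

    /// Point lookups go to the hash index.
    pub fn get(&self, table: &str, pk: &str) -> Option<&IndexEntry> {
        self.pk_idx.get(&format!("{table}:{pk}"))
    }

    /// Half-open range [start, end) within one table goes to the sorted index.
    pub fn range(&self, table: &str, start: &str, end: &str) -> Vec<&IndexEntry> {
        let (lo, hi) = (format!("{table}:{start}"), format!("{table}:{end}"));
        if lo > hi {
            return Vec::new(); // BTreeMap::range panics when start > end
        }
        self.pk_skiplist.range(lo..hi).map(|(_, e)| e).collect()
    }
}
```

Because keys carry the `table:` prefix, a range scan over one table's keys never leaks rows from another table whose name sorts elsewhere.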

Adaptive Delete Mask

DelMask picks the storage format best suited to the current delete ratio and switches modes automatically as thresholds are crossed:

stateDiagram-v2
    [*] --> Empty : new()
    Empty --> SkipList : first delete
    SkipList --> SkipList : del_ratio < 1%
    SkipList --> Roaring : del_ratio crosses 1%

    Roaring --> Roaring : 1% ≤ del_ratio < 50%
    Roaring --> FullBitmap : del_ratio crosses 50%
    Roaring --> SkipList : del_ratio falls back below 1%

    FullBitmap --> FullBitmap : del_ratio ≥ 50%
    FullBitmap --> Compaction : triggers Compaction

    Compaction --> [*] : deleted rows physically removed

graph LR
    subgraph Threshold1["del_ratio < 1%"]
        DS1["SkipList: Vec<u32>\nstores only deleted row numbers"]
    end

    subgraph Threshold2["1% ≤ del_ratio < 50%"]
        DS2["RoaringBitmap\ncompressed bitmap, fast range queries"]
    end

    subgraph Threshold3["del_ratio ≥ 50%"]
        DS3["FullBitmap: Vec<u8>\n1 bit per row + triggers Compaction"]
    end

    DS1 -.->|"add_delete()\nauto switch"| DS2
    DS2 -.->|"add_delete()\nauto switch"| DS3
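The threshold switching can be sketched as follows. This is a dependency-free illustration, not segment/del_mask.rs itself. The `Roaring` variant holds a `BTreeSet<u32>` as a stand-in for a real RoaringBitmap, and the type and field names are hypothetical. The mask re-encodes itself whenever `add_delete` pushes `del_ratio` across the 1% or 50% boundary.

```rust
use std::collections::BTreeSet;

/// Storage formats named in the diagram; `Roaring` uses a BTreeSet stand-in.
pub enum DelMask {
    Empty,
    SkipList(Vec<u32>),     // sorted deleted row numbers
    Roaring(BTreeSet<u32>), // stand-in for RoaringBitmap
    FullBitmap(Vec<u8>),    // 1 bit per row
}

pub struct AdaptiveDelMask {
    pub mask: DelMask,
    pub total_rows: u32,
    pub deleted: u32,
}

impl AdaptiveDelMask {
    pub fn new(total_rows: u32) -> Self {
        Self { mask: DelMask::Empty, total_rows, deleted: 0 }
    }

    pub fn del_ratio(&self) -> f64 {
        self.deleted as f64 / self.total_rows.max(1) as f64
    }

    pub fn is_deleted(&self, row: u32) -> bool {
        match &self.mask {
            DelMask::Empty => false,
            DelMask::SkipList(v) => v.binary_search(&row).is_ok(),
            DelMask::Roaring(s) => s.contains(&row),
            DelMask::FullBitmap(b) => (b[(row / 8) as usize] >> (row % 8)) & 1 == 1,
        }
    }

    /// FullBitmap (del_ratio >= 50%) is the state that should trigger Compaction.
    pub fn needs_compaction(&self) -> bool {
        matches!(self.mask, DelMask::FullBitmap(_))
    }

    pub fn add_delete(&mut self, row: u32) {
        if row >= self.total_rows || self.is_deleted(row) {
            return; // out of range or already deleted: idempotent
        }
        self.deleted += 1;
        if matches!(self.mask, DelMask::Empty) {
            self.mask = DelMask::SkipList(Vec::new());
        }
        match &mut self.mask {
            DelMask::Empty => unreachable!(),
            DelMask::SkipList(v) => {
                let pos = v.binary_search(&row).unwrap_err();
                v.insert(pos, row);
            }
            DelMask::Roaring(s) => {
                s.insert(row);
            }
            DelMask::FullBitmap(b) => b[(row / 8) as usize] |= 1 << (row % 8),
        }
        self.maybe_switch();
    }

    /// Re-encode when del_ratio crosses the 1% or 50% thresholds.
    fn maybe_switch(&mut self) {
        let ratio = self.del_ratio();
        let target = if ratio < 0.01 { 0 } else if ratio < 0.5 { 1 } else { 2 };
        let current = match self.mask {
            DelMask::Empty | DelMask::SkipList(_) => 0,
            DelMask::Roaring(_) => 1,
            DelMask::FullBitmap(_) => 2,
        };
        if target == current {
            return;
        }
        let rows: Vec<u32> = (0..self.total_rows).filter(|&r| self.is_deleted(r)).collect();
        self.mask = match target {
            0 => DelMask::SkipList(rows),
            1 => DelMask::Roaring(rows.into_iter().collect()),
            _ => {
                let mut bits = vec![0u8; (self.total_rows as usize + 7) / 8];
                for r in rows {
                    bits[(r / 8) as usize] |= 1 << (r % 8);
                }
                DelMask::FullBitmap(bits)
            }
        };
    }
}
```

Re-encoding scans all rows. That is acceptable here because it happens only at the two threshold crossings, not on every delete.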

Adaptive Column Encoding

AdaptiveEncoder analyzes the characteristics of the actual data and recommends an encoding:

flowchart TD
    START["analyze_column_array(array)\nsamples 10K rows"] --> CARD{"cardinality"}

    CARD -->|"< 1000"| LOW_CARD["Dict encoding\nconfidence=0.9"]
    CARD -->|"≥ 1000"| SORTED{"is_sorted?"}

    SORTED -->|"true"| DELTA_SORT["Delta encoding\nconfidence=0.85\nmonotonically increasing/decreasing"]
    SORTED -->|"false"| DTYPE{"dtype"}

    DTYPE -->|"Float32/Float64"| FLOAT{"compression_hint"}
    FLOAT -->|"> 0.5\n(low variance)"| ALP["ALP encoding\nconfidence=0.7"]
    FLOAT -->|"≤ 0.5\n(high variance)"| GORILLA["Gorilla encoding\nconfidence=0.75"]

    DTYPE -->|"Int/UInt"| RANGE{"value range (max - min)\n< cardinality × 2?"}
    RANGE -->|Yes| DELTA_RANGE["Delta encoding\nconfidence=0.8"]
    RANGE -->|No| RAW["Raw (no encoding)\nconfidence=0.5"]
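The decision tree can be sketched as a pure function over a column profile. In the sketch below, `ColumnProfile`, `profile_i64`, and `recommend` are hypothetical names. The profile is computed only for `i64` columns, from at most the first 10K values. The thresholds and confidence values are copied from the diagram.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
pub enum Encoding { Dict, Delta, Alp, Gorilla, Raw }

/// Hypothetical column profile; field names are assumptions.
pub struct ColumnProfile {
    pub cardinality: usize,
    pub is_sorted: bool,
    pub is_float: bool,
    pub compression_hint: f64, // > 0.5 treated as low variance (floats only)
    pub min: i64,
    pub max: i64,
}

/// Profile an integer column from at most its first 10K values.
pub fn profile_i64(values: &[i64]) -> ColumnProfile {
    let sample = &values[..values.len().min(10_000)];
    let cardinality = sample.iter().collect::<HashSet<_>>().len();
    let ascending = sample.windows(2).all(|w| w[0] <= w[1]);
    let descending = sample.windows(2).all(|w| w[0] >= w[1]);
    ColumnProfile {
        cardinality,
        is_sorted: ascending || descending,
        is_float: false,
        compression_hint: 0.0,
        min: sample.iter().copied().min().unwrap_or(0),
        max: sample.iter().copied().max().unwrap_or(0),
    }
}

/// Returns (encoding, confidence) following the decision tree above.
pub fn recommend(p: &ColumnProfile) -> (Encoding, f64) {
    if p.cardinality < 1000 {
        return (Encoding::Dict, 0.9);
    }
    if p.is_sorted {
        return (Encoding::Delta, 0.85);
    }
    if p.is_float {
        return if p.compression_hint > 0.5 { (Encoding::Alp, 0.7) } else { (Encoding::Gorilla, 0.75) };
    }
    // Integer column: a narrow value range relative to the distinct count favours Delta.
    let range = (p.max as i128 - p.min as i128).unsigned_abs();
    if range < p.cardinality as u128 * 2 {
        (Encoding::Delta, 0.8)
    } else {
        (Encoding::Raw, 0.5)
    }
}
```

For example, a shuffled permutation of 0..2000 has 2000 distinct values spread over a range of 1999. It falls into the Int/UInt branch and gets Delta with confidence 0.8.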

Block-Level Statistics

Every 1024 rows form a Block. Each Block records min/max for hot columns, so predicates can be pushed down inside a granule:

graph TB
    G["Granule (~1MB, several Blocks)"]
    G --> B1["Block 0\nrows 0-1023\nstats: col.age [10, 90]"]
    G --> B2["Block 1\nrows 1024-2047\nstats: col.age [20, 80]"]

    B1 --> Q1{"Query: age > 85?"}
    Q1 -->|"max=90 > 85\n→ may match, not pruned"| KEEP1["Keep Block 0"]

    B2 --> Q2{"Query: age > 85?"}
    Q2 -->|"max=80 ≤ 85\n→ whole Block pruned"| SKIP2["Skip Block 1"]
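The pruning rule comes down to one comparison per operator against the Block's min or max. The sketch below uses hypothetical `BlockStats` and `CompareOp` types. segment/meta.rs defines a `CompareOp`, but its real variants are not listed here, so the ones shown are assumptions.

```rust
/// Per-Block min/max for one integer column (hypothetical shape).
pub struct BlockStats {
    pub min: i64,
    pub max: i64,
}

/// Illustrative comparison operators.
pub enum CompareOp { Gt, Ge, Lt, Le, Eq }

/// True when no row in the Block can satisfy `column <op> value`,
/// so the Block can be skipped without decoding it.
pub fn can_prune(stats: &BlockStats, op: &CompareOp, value: i64) -> bool {
    match op {
        CompareOp::Gt => stats.max <= value,
        CompareOp::Ge => stats.max < value,
        CompareOp::Lt => stats.min >= value,
        CompareOp::Le => stats.min > value,
        CompareOp::Eq => value < stats.min || value > stats.max,
    }
}

/// Indices of Blocks that must still be read for `column <op> value`.
pub fn surviving_blocks(blocks: &[BlockStats], op: &CompareOp, value: i64) -> Vec<usize> {
    (0..blocks.len()).filter(|&i| !can_prune(&blocks[i], op, value)).collect()
}
```

For the example above, `age > 85` keeps Block 0 (max 90) and skips Block 1 (max 80).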

HTAP Dual-Store Routing

ReadPath Decision Tree

flowchart TD
    START["choose_read_path(RouterParams)"] --> UPDATES{"has_updates\ndelta_count > 0?"}
    UPDATES -->|No| VX["ReadPath::VortexOnly\nread everything from Vortex"]

    UPDATES -->|Yes| QTYPE{"query_type"}

    QTYPE -->|"PointGet"| DSO["ReadPath::DeltaStoreOnly\nread DeltaStore only\nlatest changes take precedence"]

    QTYPE -->|"FullScan\n+ has_updates"| MERGE["ReadPath::Merge\nDeltaStore overlay + Vortex"]

    QTYPE -->|"Aggregate"| SEL{"filter_selectivity"}
    SEL -->|"> 0.1"| VX_A["VortexOnly\nlarge data scan"]
    SEL -->|"≤ 0.1"| DSO_A["DeltaStoreOnly\nsmall-data aggregation"]

    QTYPE -->|"RangeScan"| SEL_R{"filter_selectivity\n+ delta_count"}

    SEL_R -->|"sel < 0.01\nand delta_count < 100"| DSO_R["DeltaStoreOnly\nhighly selective lookup"]

    SEL_R -->|"sel > 0.5\nor delta_count > 1000"| MERGE_R["ReadPath::Merge\nwide range scan"]

    SEL_R -->|otherwise| MERGE_D["ReadPath::Merge\ndefault path"]
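The decision tree maps directly onto a `match`. In the sketch below, `QueryType` and `ReadPath` reuse the names from the diagram. The `RouterParams` field names and types are assumptions, not the actual definitions in query/router.rs. The two Merge branches of RangeScan are collapsed into one, because both lead to the same path.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum QueryType { PointGet, RangeScan, Aggregate, FullScan }

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ReadPath { VortexOnly, DeltaStoreOnly, Merge }

/// Hypothetical parameter struct; field names are assumptions.
pub struct RouterParams {
    pub query_type: QueryType,
    pub delta_count: u64,        // pending cell deltas for the table
    pub filter_selectivity: f64, // estimated fraction of rows that match
}

pub fn choose_read_path(p: &RouterParams) -> ReadPath {
    // No pending deltas: the columnar data is already current.
    if p.delta_count == 0 {
        return ReadPath::VortexOnly;
    }
    match p.query_type {
        QueryType::PointGet => ReadPath::DeltaStoreOnly,
        QueryType::FullScan => ReadPath::Merge,
        QueryType::Aggregate => {
            if p.filter_selectivity > 0.1 {
                ReadPath::VortexOnly
            } else {
                ReadPath::DeltaStoreOnly
            }
        }
        QueryType::RangeScan => {
            if p.filter_selectivity < 0.01 && p.delta_count < 100 {
                ReadPath::DeltaStoreOnly
            } else {
                // "sel > 0.5 or delta_count > 1000" and the default branch both end in Merge.
                ReadPath::Merge
            }
        }
    }
}
```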

DeltaStore Data Structures

classDiagram
    class DeltaStore {
        +String seg_id
        +BTreeMap~TxnId, HashMap~String, HashMap~u64, CellDelta~~~ deltas
        +Option~BTreeMap~TxnId, ()~~ committed_txns
    }

    class CellDelta {
        +u64 row
        +String col
        +DeltaOpType op
        +Option~Vec~ before
        +Option~Vec~ after
        +TxnId txn_id
    }

    class DeltaOpType {
        <<enumeration>>
        Update
        Delete
        Insert
    }

    DeltaStore "1" --> "*" CellDelta
    CellDelta --> DeltaOpType

DeltaStore Overlay Merge

sequenceDiagram
    participant Vx as Vortex column store
    participant DS as DeltaStore
    participant Scan as read/scan.rs

    Scan->>Vx: read raw Vortex data
    Vx-->>Scan: RecordBatch {id:[1,2,3], age:[20,30,40]}

    Scan->>DS: get_all_visible_deltas()
    DS-->>Scan: { "age" → { 0: CellDelta{before:20, after:25} } }

    Note over Scan: apply_deltas_overlay()
    Scan->>Scan: row 0 (id=1): age 20 → 25

    Scan-->>Result: RecordBatch {id:[1,2,3], age:[25,30,40]}
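A minimal sketch of the overlay step for a single `Int64` column follows. It treats the inner map key as a 0-based row offset, which is an assumption. The simplified `CellDelta` holds `i64` values instead of the byte images stored by the real struct, and the MVCC filtering done by `get_all_visible_deltas()` is assumed to have already happened.

```rust
use std::collections::HashMap;

/// Simplified CellDelta for one Int64 column (the real one stores byte images).
pub struct CellDelta {
    pub before: Option<i64>,
    pub after: Option<i64>,
    pub txn_id: u64,
}

/// Overlays visible deltas (row offset -> delta, already MVCC-filtered) onto a
/// column read from Vortex. Deltas without an after image are left to the
/// delete mask; offsets past the end of the column are ignored.
pub fn apply_deltas_overlay(column: &mut [i64], deltas: &HashMap<u64, CellDelta>) {
    for (&row, delta) in deltas {
        if let (Some(slot), Some(new_value)) = (column.get_mut(row as usize), delta.after) {
            *slot = new_value; // the after image wins over the base value
        }
    }
}
```

Applied to the example above, `age = [20, 30, 40]` with a delta at offset 0 becomes `[25, 30, 40]`.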

Write Path

flowchart TD
    A["insert() / insert_batch()"] --> B["txn_id = next_txn_id()"]
    B --> C["WAL: Begin record"]
    C --> D["columns → RecordBatch"]
    D --> E["allocate_position()\nfind/create active Segment"]

    E --> F["write_segment_batch()\nappend to {col}.vortex"]
    F --> G["_del.vortex: new positions = false"]
    G --> H["Dual-write RocksDB indexes"]

    H --> H1["pk_idx CF\npk:table:pk → IndexEntry"]
    H --> H2["pk_skiplist CF\nordered keys, supports range scans"]

    H2 --> I["Bloom Filter update\nsegment_bloom_filters cache"]
    I --> J["WAL: Commit + flush"]
    J --> K["committed_txn persisted to sys CF"]
    K --> L["return TxnId"]

WAL Block Format

graph RL
    subgraph WAL_File["wal_000000.bin (append-only)"]
        direction RL
        B1["Block 1 (32KB)"]
        B2["Block 2 (32KB)"]
        B3["Block 3 (32KB)"]
    end

    subgraph Block_N["Block Header (16 bytes)"]
        HDR["block_seq (8B) | used_bytes (4B) | header_crc (4B)"]
    end

    subgraph Records["Records (≤ 32752 bytes)"]
        R1["op_type (1B) | txn_id (8B) | payload_len (4B) | payload | crc32 (4B)"]
        R2["op_type (1B) | txn_id (8B) | payload_len (4B) | payload | crc32 (4B)"]
        R3["..."]
    end

    Block_N --> Records
    Records --> R1
    Records --> R2
    Records --> R3
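The byte layout above can be made concrete with a small encoder and decoder. The sketch assumes little-endian integers. It also assumes that each record's CRC-32 covers all record bytes before it and that `header_crc` covers the first 12 header bytes. The diagram does not state any of these, so treat them as assumptions. CRC-32 here is the standard IEEE polynomial, computed bitwise to stay dependency-free.

```rust
use std::convert::TryInto;

pub const BLOCK_SIZE: usize = 32 * 1024;
pub const HEADER_SIZE: usize = 16;
/// Space left for records in each block: 32768 - 16 = 32752 bytes.
pub const MAX_RECORD_BYTES: usize = BLOCK_SIZE - HEADER_SIZE;

/// Bitwise CRC-32 (IEEE, reflected polynomial 0xEDB88320), no lookup table.
pub fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg(); // all ones if the low bit is set
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// op_type(1) | txn_id(8) | payload_len(4) | payload | crc32(4)
pub fn encode_record(op_type: u8, txn_id: u64, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(17 + payload.len());
    buf.push(op_type);
    buf.extend_from_slice(&txn_id.to_le_bytes());
    buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    buf.extend_from_slice(payload);
    let crc = crc32(&buf); // covers every byte before the checksum (assumed)
    buf.extend_from_slice(&crc.to_le_bytes());
    buf
}

/// block_seq(8) | used_bytes(4) | header_crc(4); header_crc covers the first 12 bytes.
pub fn encode_header(block_seq: u64, used_bytes: u32) -> [u8; HEADER_SIZE] {
    let mut h = [0u8; HEADER_SIZE];
    h[..8].copy_from_slice(&block_seq.to_le_bytes());
    h[8..12].copy_from_slice(&used_bytes.to_le_bytes());
    let crc = crc32(&h[..12]);
    h[12..].copy_from_slice(&crc.to_le_bytes());
    h
}

/// Returns (op_type, txn_id, payload) if the record is complete and its CRC matches.
pub fn decode_record(buf: &[u8]) -> Option<(u8, u64, &[u8])> {
    if buf.len() < 17 {
        return None;
    }
    let len = u32::from_le_bytes(buf[9..13].try_into().ok()?) as usize;
    let end = 13 + len;
    if buf.len() < end + 4 {
        return None; // torn write: payload or checksum missing
    }
    let stored = u32::from_le_bytes(buf[end..end + 4].try_into().ok()?);
    if crc32(&buf[..end]) != stored {
        return None;
    }
    let txn_id = u64::from_le_bytes(buf[1..9].try_into().ok()?);
    Some((buf[0], txn_id, &buf[13..end]))
}
```

Each record carries 17 bytes of framing, so a single block holds at most one record with a 32735-byte payload.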

WAL Crash Recovery

flowchart LR
    START["RockDuck startup"] --> RECOVER["recover_from_wal()"]
    RECOVER --> LIST["list_wal_files()\nscan wal_*.bin"]
    LIST --> SCAN["scan_committed_records()"]
    SCAN --> FSM["rebuild transaction state machine"]

    FSM --> T1["Begin / Insert / Delete / Update"]
    T1 --> COMMIT{"OpType == Commit?"}
    T1 --> ROLLBACK{"OpType == Rollback?"}

    COMMIT -->|"collect all records of this txn"| COMMIT_COLLECT
    ROLLBACK -->|"discard all records of this txn"| ROLLBACK_DROP

    COMMIT_COLLECT --> REPLAY["replay into RocksDB\npk_idx + pk_skiplist"]
    ROLLBACK_DROP --> DONE["done"]
    REPLAY --> DONE

    DONE --> MAX["update max_committed_txn"]
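The state machine above groups records by transaction and keeps only those that reach Commit. In the sketch below, `OpType`, `WalRecord`, and `recover` are hypothetical simplifications. Transactions that end in neither Commit nor Rollback (for example, a crash mid-transaction) are also dropped, which is an assumption about how `scan_committed_records()` treats them. Committed transactions are replayed in commit order.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum OpType { Begin, Insert, Delete, Update, Commit, Rollback }

#[derive(Clone, Debug, PartialEq)]
pub struct WalRecord {
    pub op: OpType,
    pub txn_id: u64,
    pub payload: Vec<u8>,
}

/// Returns the data records to replay (committed txns, in commit order)
/// and the new max_committed_txn.
pub fn recover(records: &[WalRecord]) -> (Vec<WalRecord>, Option<u64>) {
    let mut pending: HashMap<u64, Vec<WalRecord>> = HashMap::new();
    let mut committed: Vec<(u64, Vec<WalRecord>)> = Vec::new();
    for rec in records {
        match rec.op {
            OpType::Begin => {
                pending.entry(rec.txn_id).or_default();
            }
            OpType::Insert | OpType::Delete | OpType::Update => {
                pending.entry(rec.txn_id).or_default().push(rec.clone());
            }
            OpType::Commit => {
                if let Some(ops) = pending.remove(&rec.txn_id) {
                    committed.push((rec.txn_id, ops));
                }
            }
            OpType::Rollback => {
                pending.remove(&rec.txn_id);
            }
        }
    }
    // Anything still in `pending` never committed and is dropped.
    let max_committed = committed.iter().map(|(txn, _)| *txn).max();
    let replay = committed.into_iter().flat_map(|(_, ops)| ops).collect();
    (replay, max_committed)
}
```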

Read Path

flowchart TD
    A["get(pk) / scan(pk_range, filter)"] --> BF["Bloom Filter check\nquickly skip absent PKs"]
    A --> ZM["Zone Map pruning\nskip blocks that cannot contain the value"]
    A --> BK["Block Stats pruning\npredicate pushdown within a Granule"]

    BF --> IDX["RocksDB pk_idx lookup\n→ IndexEntry\n{seg_id, granule_id, offset}"]
    ZM --> IDX
    BK --> IDX

    IDX --> STATUS{"Segment status"}

    STATUS -->|"Active / Compactable"| BUF["BufReader read\narrow-ipc FileReader"]
    STATUS -->|"Frozen"| MMAP["zero-copy mmap\nshared Arc<Mmap> cache"]

    BUF --> MVCC["MVCC visibility filter\nis_visible(snapshot, ...)"]
    MMAP --> MVCC

    MVCC --> DM["apply Del Mask\nfilter out deleted rows"]
    DM --> FILT["evaluate filter expression\nArrow compute filter"]
    FILT --> RB["return RecordBatch"]

Vortex File Layout

graph TD
    subgraph Segment["segments/{seg_id}/"]
        direction TB
        META["_meta.vortex\nSegment metadata\nSegmentMeta (bincode)"]
        DEL["_del.vortex\ndelete mask\nBooleanArray"]

        COL1["{col1}.vortex\nArrow IPC column file"]
        COL2["{col2}.vortex\nArrow IPC column file"]
        COLN["..."]

        UPD["_upd_age.vortex\nupdate mask"]
        ZM["_zm.json\nZone Map"]
    end

    subgraph VortexReader["VortexReader"]
        READER["read_column(seg_id, col)\nchooses the read path automatically"]
        READER --> META_CHECK{"meta.status"}
        META_CHECK -->|Frozen| MMAP_READ["read_column_mmap_internal()"]
        META_CHECK -->|Active| BUF_READ["read_arrow_file()"]
        MMAP_READ --> ARC_CACHE["Arc<Mmap> 缓存"]
    end

    VortexReader --> COL1
    VortexReader --> COL2
    VortexReader --> DEL

Filter Expression Parsing and Evaluation

filter_expr.rs implements a hand-written expression parser that does not depend on any external SQL parsing library:

flowchart TD
    RAW["WHERE age > 30 AND name = 'Alice' OR NOT deleted"]
    RAW --> TOKEN["Tokenizer"]
    TOKEN --> PARSE["Recursive Descent Parser"]
    PARSE --> AST["Expr AST"]
    AST --> EVAL["evaluate(batch, &Expr) → BooleanArray"]

    subgraph AST_EXAMPLE["Expr AST (for the WHERE clause above)"]
        OR["Or"]
        AND1["And"]
        GT["Compare(age > 30)"]
        EQ["Compare(name = 'Alice')"]
        NOT_DEL["Not"]
        DEL["Compare(deleted = true)"]
        OR --> AND1
        OR --> NOT_DEL
        AND1 --> GT
        AND1 --> EQ
        NOT_DEL --> DEL
    end

    EVAL --> MASK["BooleanMask → Arrow Compute Filter"]

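Supported Operators

Category | Operators
Comparison | >, >=, <, <=, =, !=
Logical | AND, OR, NOT
Grouping | (, )
Literals | integers, floats, strings, booleans

Evaluation strategy: the AST is evaluated top-down with short-circuiting. When the left side of an AND is already false for every row in the batch, evaluation returns immediately and the columns referenced on the right are never read.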
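The short-circuit rule can be sketched over per-row boolean masks. The sketch covers evaluation only; the tokenizer and parser are omitted, and all names are hypothetical. Columns are `i64` (booleans encoded as 0/1), and a `reads` log records which columns were actually touched, which makes the skipped right-hand side visible. A vectorized evaluator can skip the right side only when the left mask is uniformly false (for AND) or uniformly true (for OR), so that is the rule implemented here.

```rust
use std::collections::HashMap;

/// Column name -> values; booleans are encoded as 0/1 for this sketch.
pub type Batch = HashMap<String, Vec<i64>>;

pub enum Op { Gt, Ge, Lt, Le, Eq, Ne }

/// Minimal AST with the same node kinds as the diagram (Compare/And/Or/Not).
pub enum Expr {
    Compare { col: String, op: Op, value: i64 },
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
    Not(Box<Expr>),
}

/// Evaluates `e` to a per-row mask; `reads` logs every column actually read.
/// Panics if a referenced column is missing (a real evaluator would return an error).
pub fn evaluate(batch: &Batch, e: &Expr, reads: &mut Vec<String>) -> Vec<bool> {
    match e {
        Expr::Compare { col, op, value } => {
            reads.push(col.clone());
            let v = *value;
            batch[col]
                .iter()
                .map(|&x| match op {
                    Op::Gt => x > v,
                    Op::Ge => x >= v,
                    Op::Lt => x < v,
                    Op::Le => x <= v,
                    Op::Eq => x == v,
                    Op::Ne => x != v,
                })
                .collect()
        }
        Expr::And(l, r) => {
            let left = evaluate(batch, l, reads);
            if left.iter().all(|&b| !b) {
                return left; // false for every row: skip the right side entirely
            }
            let right = evaluate(batch, r, reads);
            left.iter().zip(&right).map(|(&a, &b)| a && b).collect()
        }
        Expr::Or(l, r) => {
            let left = evaluate(batch, l, reads);
            if left.iter().all(|&b| b) {
                return left; // true for every row: the right side cannot change the result
            }
            let right = evaluate(batch, r, reads);
            left.iter().zip(&right).map(|(&a, &b)| a || b).collect()
        }
        Expr::Not(inner) => evaluate(batch, inner, reads).into_iter().map(|b| !b).collect(),
    }
}
```

For instance, with `age = [25, 35, 40]`, the predicate `age > 100 AND deleted = 1` returns an all-false mask after reading only `age`.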


Compaction Scheduling

Priority Queue and Scoring Formula

flowchart LR
    subgraph Evaluate["evaluate(): iterate over all Segments"]
        direction TB
        E1["read SegmentMeta"]
        E1 --> SCORE["calculate_priority()"]
        SCORE --> REASON["determine_reason()"]
    end

    SCORE -->|"del_score = del_ratio² × 10\n+ size_score = log₂(MB) × 0.5\n+ age_score = log₂(hours) × 0.3"| FORMULA["priority = del_score\n  + size_score\n  + age_score"]
    FORMULA --> PUSH["BinaryHeap.push()\npriority queue"]

Determining the Compaction Reason

flowchart TD
    M["SegmentMeta"] --> DEL{"del_ratio > 0.5?"}
    DEL -->|Yes| HDR["CompactionReason::HighDeleteRatio"]
    DEL -->|No| SIZE{"size < 1MB?"}
    SIZE -->|Yes| SF["CompactionReason::SmallFile"]
    SIZE -->|No| UPD{"upd_ratio > 0.3\n且 del_ratio < 0.5?"}
    UPD -->|Yes| INC["CompactionReason::IncrementalMaterialize"]
    UPD -->|No| PER["CompactionReason::Periodic"]
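The scoring formula, the reason decision tree, and the BinaryHeap can be combined in one small sketch. `SegmentMeta` here is a hypothetical four-field subset, and `Candidate` and `build_queue` are illustrative names. The log inputs are clamped at 1.0 so tiny or brand-new segments cannot get negative scores. The formula does not say how those cases are handled, so this clamping is an assumption.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(Debug, PartialEq)]
pub enum CompactionReason { HighDeleteRatio, SmallFile, IncrementalMaterialize, Periodic }

/// Hypothetical subset of SegmentMeta used for scoring.
pub struct SegmentMeta {
    pub del_ratio: f64,
    pub upd_ratio: f64,
    pub size_mb: f64,
    pub age_hours: f64,
}

pub fn calculate_priority(m: &SegmentMeta) -> f64 {
    let del_score = m.del_ratio.powi(2) * 10.0;
    // Clamp log inputs at 1.0 so tiny/new segments never score negative (assumption).
    let size_score = m.size_mb.max(1.0).log2() * 0.5;
    let age_score = m.age_hours.max(1.0).log2() * 0.3;
    del_score + size_score + age_score
}

pub fn determine_reason(m: &SegmentMeta) -> CompactionReason {
    if m.del_ratio > 0.5 {
        CompactionReason::HighDeleteRatio
    } else if m.size_mb < 1.0 {
        CompactionReason::SmallFile
    } else if m.upd_ratio > 0.3 && m.del_ratio < 0.5 {
        CompactionReason::IncrementalMaterialize
    } else {
        CompactionReason::Periodic
    }
}

/// Heap entry ordered by priority; f64::total_cmp gives f64 a total order.
pub struct Candidate {
    pub priority: f64,
    pub seg_id: String,
}
impl PartialEq for Candidate {
    fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal }
}
impl Eq for Candidate {}
impl PartialOrd for Candidate {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
impl Ord for Candidate {
    fn cmp(&self, other: &Self) -> Ordering { self.priority.total_cmp(&other.priority) }
}

/// BinaryHeap is a max-heap, so pop() yields the highest-priority segment first.
pub fn build_queue(segments: &[(String, SegmentMeta)]) -> BinaryHeap<Candidate> {
    segments
        .iter()
        .map(|(id, m)| Candidate { priority: calculate_priority(m), seg_id: id.clone() })
        .collect()
}
```

Worked example: a 64 MB, 4-hour-old segment with 60% deletes scores 0.36 × 10 + 6 × 0.5 + 2 × 0.3 = 7.2.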

Feature 5: Query-Feedback-Enhanced Compaction Priority

flowchart TD
    BASE["base_score\n= del_ratio²×10 + log₂(MB)×0.5 + log₂(hours)×0.3"] --> FEEDBACK

    subgraph FEEDBACK["Query Feedback penalties"]
        STALE["stale Zone Map\nstaleness_penalty × 5.0"]
        MISS["ineffective pruning\n(1 - prune_hit_ratio) × 3.0"]
    end

    BASE --> PEN1["+ staleness_penalty"]
    PEN1 --> PEN2["+ miss_penalty"]
    PEN2 --> FINAL["final_priority\n用于 BinaryHeap 排序"]

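PDT Merge Compaction

How Positional Delete Tracking (PDT) Works

Traditional compaction compares key values to decide which rows to keep. PDT (Positional Delete Tracking) only tracks position changes and never compares data:

graph LR
    subgraph Old["Old Segment"]
        O1["position 0: alive"]
        O2["position 1: deleted ✗"]
        O3["position 2: alive"]
        O4["position 3: deleted ✗"]
        O5["position 4: alive"]
    end

    subgraph PDT["PDT Merge"]
        DM["DelMask\nSkipList / RoaringBitmap"]
    end

    subgraph New["New Segment"]
        N1["position 0: value of row0"]
        N2["position 1: value of row2 ← deleted row skipped"]
        N3["position 2: value of row4 ← deleted row skipped"]
    end

    Old -->|read DelMask| PDT
    PDT -->|surviving positions| New

Key benefit: I/O is proportional to the amount of live data, not the total data volume.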

Multiway Merge

flowchart LR
    S1["Segment A\n(del_ratio=60%)"] --> M["PDT multiway_merge()"]
    S2["Segment B\n(del_ratio=55%)"] --> M
    S3["Segment C\n(del_ratio=70%)"] --> M
    M --> OUT["New Segment\n(del_ratio=0%)"]
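A positional merge can be sketched as copying surviving positions from each input in turn, without ever comparing values. This sketch concatenates segments in input order. Whether the real `multiway_merge()` also interleaves inputs by key is not stated here, so treat that ordering as an assumption. Each input's delete flags are a plain `&[bool]` standing in for its DelMask. The returned old-to-new position map is the kind of information the index entries would need in order to be re-pointed.

```rust
/// One input segment: column values plus a per-position delete flag.
pub struct SegmentInput<'a> {
    pub values: &'a [i64],
    pub deleted: &'a [bool],
}

/// Copies only surviving positions into the output, never comparing values.
/// Also returns, per input, an old-position -> new-position map (None = dropped).
pub fn multiway_merge(inputs: &[SegmentInput<'_>]) -> (Vec<i64>, Vec<Vec<Option<u32>>>) {
    let mut out: Vec<i64> = Vec::new();
    let mut remaps = Vec::with_capacity(inputs.len());
    for seg in inputs {
        let mut remap = Vec::with_capacity(seg.values.len());
        for (&value, &is_deleted) in seg.values.iter().zip(seg.deleted) {
            if is_deleted {
                remap.push(None); // deleted positions are dropped, never copied
            } else {
                remap.push(Some(out.len() as u32));
                out.push(value);
            }
        }
        remaps.push(remap);
    }
    (out, remaps)
}
```

With the five-position segment from the PDT diagram (positions 1 and 3 deleted), the output holds rows 0, 2, and 4 at new positions 0, 1, and 2.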

Query Feedback: Adaptive Compaction

How It Works

QueryFeedbackCollector tracks the Zone Map pruning hit rate, which feeds into the Compaction priority:

flowchart TD
    Q["Query SELECT * WHERE age > 30"]

    Q --> ZM["Zone Map estimate\nGranules 0-9 may all match"]
    ZM --> COMPARE{"Granules that actually match"}

    COMPARE -->|"estimated 10, actual 8+"| HIT["prune_hit\nstaleness_penalty -= 0.05"]
    COMPARE -->|"estimated 10, actual 1"| MISS["prune_miss\nstaleness_penalty += 0.1\nmiss_penalty += 3.0"]

    HIT --> PRIORITY["priority score"]
    MISS --> PRIORITY
    PRIORITY --> HEAP["CompactionHeap"]

staleness_penalty and prune_hit_ratio

State | staleness_penalty | prune_hit_ratio
Never queried | 0.0 | 0.5 (neutral)
Repeated misses | → 1.0 (cap) | → 0.0
Repeated hits | → 0.0 (floor) | → 1.0

The penalty is added to the Compaction priority: priority += staleness_penalty × 5.0 + (1 - prune_hit_ratio) × 3.0
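The table and formula can be turned into a small per-segment accumulator. `SegmentFeedback` and its methods are hypothetical names. The step sizes (+0.1 per miss, -0.05 per hit), the cap and floor, and the weights are taken from the text. Computing `prune_hit_ratio` as hits divided by total observations is an assumption that reproduces the 0.5 / 0.0 / 1.0 values in the table.

```rust
/// Hypothetical per-segment feedback state.
#[derive(Default, Debug)]
pub struct SegmentFeedback {
    pub staleness_penalty: f64,
    pub hits: u32,
    pub misses: u32,
}

impl SegmentFeedback {
    /// Zone Map estimate was close to reality: decay the penalty toward its 0.0 floor.
    pub fn record_hit(&mut self) {
        self.hits += 1;
        self.staleness_penalty = (self.staleness_penalty - 0.05).max(0.0);
    }

    /// Zone Map over-estimated badly: raise the penalty toward its 1.0 cap.
    pub fn record_miss(&mut self) {
        self.misses += 1;
        self.staleness_penalty = (self.staleness_penalty + 0.1).min(1.0);
    }

    /// 0.5 (neutral) before any query has been observed.
    pub fn prune_hit_ratio(&self) -> f64 {
        let total = self.hits + self.misses;
        if total == 0 { 0.5 } else { self.hits as f64 / total as f64 }
    }

    /// staleness_penalty × 5.0 + (1 - prune_hit_ratio) × 3.0
    pub fn penalty(&self) -> f64 {
        self.staleness_penalty * 5.0 + (1.0 - self.prune_hit_ratio()) * 3.0
    }
}

pub fn final_priority(base_score: f64, fb: &SegmentFeedback) -> f64 {
    base_score + fb.penalty()
}
```

A never-queried segment therefore carries a neutral penalty of 1.5. A segment whose Zone Map keeps missing approaches the maximum of 8.0.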


Iceberg v2 Export

Two-Tier Design

flowchart LR
    subgraph Hot["Hot path (native, in RocksDB)"]
        direction TB
        H1["IcebergExport\n(bincode-serialized)"]
        H2["CF: iceberg_manifest\nKey: iceberg:latest"]
        H3["freeze_segment()\nupdates the manifest automatically"]
    end

    subgraph Cold["Cold path (on-demand, spec-compliant)"]
        direction TB
        C1["export_to_iceberg()"]
        C2["v{N}.metadata.json\n(TableMetadata)"]
        C3["snap-*.avro\n(Manifest List)"]
        C4["*-m0.avro\n(Manifest)"]
        C5["data/segments/"]
    end

    Hot -->|"freeze_for_iceberg()"| Cold

Iceberg Export Directory Layout

graph TD
    root["target/ (Iceberg Table Root)"]
    root --> vh["version-hint.txt\n\"2\""]
    root --> meta["metadata/"]
    root --> data["data/"]

    meta --> vm["v{snapshot_id}.metadata.json"]
    meta --> ml["snap-{id}-{seq}-{uuid}.avro\n(Manifest List)"]
    meta --> vh2["version-hint.txt\n\"2\""]

    data --> segs["segments/"]
    segs --> s1["{seg_id}/"]
    segs --> s2["{seg_id}/"]

    s1 --> c1["{col1}.vortex\n(Vortex data file)"]
    s1 --> c2["{col2}.vortex"]
    s1 --> dm["_del.vortex"]
    s1 --> m_["_meta.vortex"]

Iceberg Export Flow

flowchart TD
    START["export_to_iceberg()"] --> COLLECT["collect all Frozen segments"]
    COLLECT --> FIELDS["build field_id_map\ncol → Iceberg field_id"]
    FIELDS --> ENTRIES["translate::\nbuild_data_file_entries()"]
    ENTRIES --> MANIFEST["load/create IcebergExport"]
    MANIFEST --> DIRS["create directory layout\n(metadata/, data/)"]
    DIRS --> COPY["copy Vortex files into data/"]
    COPY --> SCHEMA["translate::to_iceberg_schema()"]
    SCHEMA --> AVRO_M["write_manifest_avro_sync()"]
    AVRO_M --> AVRO_ML["write_manifest_list_avro_sync()"]
    AVRO_ML --> JSON["write TableMetadata JSON"]
    JSON --> FSYNC["sync_file() / sync_dir()\n(Windows: FlushFileBuffers)"]
    FSYNC --> SAVE["save_manifest() → RocksDB"]
    SAVE --> DONE["return metadata_path"]

Iceberg Manifest Storage in RocksDB

graph LR
    subgraph iceberg_manifest_CF["iceberg_manifest Column Family"]
        K1["iceberg:latest\n→ IcebergExport (bincode)"]
        K2["iceberg:history\n→ Vec<SnapshotRef> (bincode)"]
    end

    subgraph SnapshotRef["SnapshotRef"]
        SR1["name: \"main\""]
        SR2["snapshot_id: 12345"]
        SR3["type_: \"branch\""]
    end

    K1 --> IcebergExport["IcebergExport\nsnapshot_id, sequence_number\nentries: Vec<DataFileEntry>"]
    IcebergExport --> DFE["DataFileEntry\nfile_path, record_count\nlower_bounds, upper_bounds\nnull_counts, split_offsets"]

DataFileEntry Structure

classDiagram
    class DataFileEntry {
        +String file_path
        +String file_format = "VORTEX"
        +u64 record_count
        +u64 file_size
        +HashMap~i32, Vec~u8~~ lower_bounds
        +HashMap~i32, Vec~u8~~ upper_bounds
        +HashMap~i32, u64~ null_counts
        +Vec~u64~ split_offsets
        +i32 sort_order_id = 1
    }
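The `lower_bounds` / `upper_bounds` maps hold raw bytes keyed by Iceberg field ID. The Iceberg table spec's single-value binary serialization stores a `long` as 8 bytes, little-endian. The sketch below fills the three statistics maps for one Int64 column using that encoding. `ColumnBounds` and `add_long_column` are hypothetical names, and whether `translate::build_data_file_entries()` computes bounds exactly this way is not stated here.

```rust
use std::collections::HashMap;

/// Per-file column statistics in the shape of DataFileEntry's
/// lower_bounds / upper_bounds / null_counts (keyed by Iceberg field_id).
#[derive(Default, Debug)]
pub struct ColumnBounds {
    pub lower_bounds: HashMap<i32, Vec<u8>>,
    pub upper_bounds: HashMap<i32, Vec<u8>>,
    pub null_counts: HashMap<i32, u64>,
}

impl ColumnBounds {
    /// Adds stats for an Iceberg `long` column; bounds use the spec's
    /// single-value serialization (8-byte little-endian).
    pub fn add_long_column(&mut self, field_id: i32, values: &[Option<i64>]) {
        let present = || values.iter().flatten().copied();
        if let Some(min) = present().min() {
            self.lower_bounds.insert(field_id, min.to_le_bytes().to_vec());
        }
        if let Some(max) = present().max() {
            self.upper_bounds.insert(field_id, max.to_le_bytes().to_vec());
        }
        // All-null columns get a null count but no bounds.
        let nulls = values.iter().filter(|v| v.is_none()).count() as u64;
        self.null_counts.insert(field_id, nulls);
    }
}
```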

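DuckDB Integration

Custom Streaming VTab

DuckDB's built-in ArrowVTab concatenates all RecordBatches into one huge batch. To avoid this, RockDuck implements a custom RockDuckVTab that pushes batches to DuckDB one at a time: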

sequenceDiagram
    participant DuckDB as DuckDB query engine
    participant VTab as RockDuckVTab

    Note over DuckDB,VTab: bind() phase (once)
    DuckDB->>VTab: bind(path)
    VTab->>RockDuck: RockDuck::open(path)
    VTab->>VTab: scan("default")
    VTab-->>DuckDB: register schema columns + cardinality

    Note over DuckDB,VTab: init() phase (once)
    DuckDB->>VTab: init()
    VTab-->>DuckDB: max_threads = 1

    Note over DuckDB,VTab: func() phase (called repeatedly on demand)
    DuckDB->>VTab: func(output)
    VTab->>VTab: batch_index.fetch_add(1, Relaxed)
    alt batches remaining
        VTab-->>DuckDB: record_batch_to_duckdb_data_chunk(batch[idx])
    else all batches pushed
        VTab-->>DuckDB: set_len(0)
    end

    DuckDB->>VTab: func(output)
    Note right of VTab: batch_index = 1
    VTab-->>DuckDB: batches[1]
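The batch cursor behind `func()` needs only an `AtomicUsize`. The sketch below is generic over the batch type and leaves out the duckdb-rs `VTab` trait and the DataChunk conversion entirely. `BatchCursor` and `next_batch` are hypothetical names. `fetch_add` hands each caller a distinct index, so `Relaxed` is sufficient: the counter does not order any other memory.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Batches are materialized up front (as bind() does via scan) and handed out
/// one per func() call.
pub struct BatchCursor<T> {
    batches: Vec<T>,
    batch_index: AtomicUsize,
}

impl<T> BatchCursor<T> {
    pub fn new(batches: Vec<T>) -> Self {
        Self { batches, batch_index: AtomicUsize::new(0) }
    }

    /// Next batch, or None once exhausted (where the VTab would call set_len(0)).
    pub fn next_batch(&self) -> Option<&T> {
        let idx = self.batch_index.fetch_add(1, Ordering::Relaxed);
        self.batches.get(idx)
    }
}
```

The cursor takes `&self`, so it can live inside shared init data without a mutex, and it keeps returning `None` after exhaustion.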

DuckDB SQL Table Functions

graph LR
    subgraph TableFunctions["docdb_* table functions"]
        F1["docdb_scan(path)\nscan a RockDuck table"]
        F2["docdb_iceberg_info(path)\nread metadata.json"]
        F3["docdb_iceberg_entries(data_dir)\nlist Vortex data files"]
    end

    F1 --> DuckDB_SQL["DuckDB SQL query"]
    F2 --> DuckDB_SQL
    F3 --> DuckDB_SQL

-- Example usage in DuckDB
SELECT * FROM docdb_scan('/path/to/rockduck/data');

INSTALL vortex;
LOAD vortex;
SELECT * FROM read_vortex('/path/to/exported/segments/*/*.vortex');

End-to-End Test Coverage

tests/integration_tests.rs contains 30+ end-to-end tests covering the full lifecycle:

flowchart TD
    subgraph Lifecycle["Database lifecycle"]
        L1["test_open_database_default"]
        L2["test_open_database_custom_config"]
        L3["test_data_persists_after_reopen"]
    end

    subgraph Write["Write tests"]
        W1["test_insert_single_record_then_get"]
        W2["test_insert_batch_then_scan_all"]
        W3["test_batch_insert_individual_point_gets"]
        W4["test_large_batch_insert_and_scan"]
        W5["test_flush_succeeds"]
        W6["test_next_txn_id_incrementing"]
    end

    subgraph Delete["Delete tests"]
        D1["test_delete_then_point_get_returns_none"]
        D2["test_deleted_records_excluded_from_scan"]
        D3["test_double_delete_is_idempotent"]
        D4["test_delete_then_insert_same_key"]
        D5["test_delete_nonexistent_returns_error"]
    end

    subgraph Scan["Scan tests"]
        S1["test_scan_all_records"]
        S2["test_scan_with_pk_range_half_open"]
        S3["test_scan_nonexistent_table"]
        S4["test_scan_empty_range_returns_nothing"]
        S5["test_scan_with_filter_returns_correct_rows"]
    end

    subgraph Stats["Statistics tests"]
        T1["test_table_stats_row_count_matches_scan"]
        T2["test_table_stats_alive_rows_after_delete"]
        T3["test_table_stats_basic"]
        T4["test_table_stats_del_ratio_zero"]
    end

    subgraph Segment["Segment tests"]
        G1["test_list_segments"]
        G2["test_get_segment_meta"]
        G3["test_list_segments_returns_after_insert"]
        G4["test_mmap_read_returns_same_as_bufreader"]
    end

    subgraph MultiTable["Multi-table isolation"]
        M1["test_multiple_tables_data_isolation"]
    end

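Testing principle: every write test must verify that the written data can be read back correctly, which guarantees end-to-end data consistency.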


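Configuration Reference

Option | Default | Description
data_dir | ./rockduck_data | Root data directory
granule_size | 1 MB | Target size of each Granule
segment_target_size | 1 GB | Target Segment size
num_threads | number of CPU cores | Degree of parallelism
enable_bloom_filter | true | Bloom filters on the write path
bloom_filter_fpp | 0.01 | Bloom filter false-positive rate
enable_zone_map | true | Granule-level min/max statistics
enable_compression | true | Column compression
compression_algorithm | "lz4" | lz4 / zstd / snappy
enable_wal | true | Write-ahead log (crash recovery)
wal_max_file_size | 128 MB | WAL file rotation threshold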

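Core Modules

Module | File | Responsibility
Entry point | db.rs | RockDuck main struct, all public APIs, WAL recovery orchestration
Storage | storage/vortex.rs | VortexWriter/VortexReader, supporting BufReader and zero-copy mmap
Metadata | metadata/rocksdb.rs | RocksDB initialization, management of the 12 Column Families
MVCC | mvcc/visibility.rs | Visibility management, three isolation levels, Time-Travel snapshots
WAL | write/wal.rs | 32KB Block + CRC32, crash recovery, WAL rotation
Write | write/insert.rs | Insert/Delete/Update, dual-index (Hash + Skiplist) writes
Read | read/scan.rs | Range scans, ordered pk_skiplist traversal, DeltaStore overlay merge
Point lookup | read/point_get.rs | Primary-key lookup, LBF (Learned Bloom Filter) prediction, Bloom Filter check
Delete mask | segment/del_mask.rs | Adaptive switching between SkipList / RoaringBitmap / FullBitmap
DeltaStore | segment/delta_store.rs | Cell-level update tracking, before/after images, MVCC visibility filtering
Column encoding | segment/encoding.rs | AdaptiveEncoder, sampling-based analysis of real data, adaptive encoding recommendations
Segment Layout | segment/layout.rs | PAX directory structure, file naming conventions
Segment metadata | segment/meta.rs | SegmentMeta/GranuleMeta/BlockStats, Zone Map, CompareOp
Router | query/router.rs | HTAP three-way path selection (VortexOnly / DeltaStoreOnly / Merge)
VTab | query/vtab.rs | RockDuckVTab streaming, AtomicUsize batch-index tracking
DuckDB functions | query/duckdb_ext.rs | docdb_scan, docdb_iceberg_info, docdb_iceberg_entries
Filter expressions | query/filter_expr.rs | Parser (tokenizer/parser) supporting AND/OR/NOT and comparison operators
Query feedback | query/feedback.rs | QueryFeedbackCollector, Zone Map hit-rate tracking, staleness penalty
Compaction scheduling | compaction/scheduler.rs | BinaryHeap priority queue, scoring formula, CompactionReason determination
PDT Merge | compaction/pdt_merge.rs | Positional-delete merge, multiway merge, MergeStats
Iceberg export | iceberg/export.rs | Iceberg v2 export orchestrator
Iceberg Avro | iceberg/avro_writer.rs | Avro manifest writing, 3 hard-coded Avro schemas
Iceberg catalog | iceberg/catalog.rs | Native Iceberg manifest storage inside RocksDB
Iceberg translation | iceberg/translate.rs | Arrow -> Iceberg schema translation, DataFileEntry construction
Config | config.rs | RockDuckConfig builder supporting bloom/zone_map/compression/WAL settings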

License

Apache License 2.0