Change Data Capture

Nucleus provides built-in Change Data Capture (CDC) — no external connectors required. Changes are captured at the executor level and delivered through broadcast channels.

Change Events

Every INSERT, UPDATE, and DELETE emits a ChangeEvent:

ChangeEvent {
    table:       "orders"
    change_type: Insert | Update | Delete
    new_row:     [42, "shipped", 99.99]    // present on Insert/Update
    old_row:     [42, "pending", 99.99]    // present on Update/Delete
    timestamp:   1709827200000
}
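As a rough illustration of the event shape above, the fields can be modeled client-side as follows. This is a sketch, not the Nucleus type definition: the Python class names, the `is_well_formed` helper, and the reading of `timestamp` as Unix epoch milliseconds are all assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class ChangeType(Enum):
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"


@dataclass(frozen=True)
class ChangeEvent:
    """Illustrative mirror of the ChangeEvent fields; not the real type."""
    table: str
    change_type: ChangeType
    new_row: Optional[list[Any]]  # present on Insert/Update
    old_row: Optional[list[Any]]  # present on Update/Delete
    timestamp: int                # assumed: Unix epoch milliseconds

    def is_well_formed(self) -> bool:
        """Hypothetical check that row presence matches the change type."""
        has_new = self.new_row is not None
        has_old = self.old_row is not None
        if self.change_type is ChangeType.INSERT:
            return has_new and not has_old
        if self.change_type is ChangeType.UPDATE:
            return has_new and has_old
        return has_old and not has_new  # DELETE


update = ChangeEvent(
    table="orders",
    change_type=ChangeType.UPDATE,
    new_row=[42, "shipped", 99.99],
    old_row=[42, "pending", 99.99],
    timestamp=1709827200000,
)
```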

How It Works

  1. DML operations (INSERT, UPDATE, DELETE) execute normally
  2. After committing, the executor calls notify_change with the affected rows
  3. A per-table ChangeNotifier broadcasts events to all subscribers
  4. Subscribers receive events through async channels
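
The four steps above can be sketched with an in-memory model. Only the names `notify_change` and `ChangeNotifier` come from this page. The `Executor` class, the use of `asyncio.Queue` as the channel, and the dict-shaped events are assumptions made for illustration, and the real executor may work differently.

```python
import asyncio
from collections import defaultdict


class ChangeNotifier:
    """Hypothetical per-table broadcaster: every subscriber gets every event."""

    def __init__(self) -> None:
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def broadcast(self, event: dict) -> None:
        # Step 3: fan the same event out to each subscriber's channel.
        for queue in self._subscribers:
            queue.put_nowait(event)


class Executor:
    """Hypothetical executor that notifies only after the write is applied."""

    def __init__(self) -> None:
        self._tables: dict[str, list] = defaultdict(list)
        self._notifiers: dict[str, ChangeNotifier] = defaultdict(ChangeNotifier)

    def subscribe_changes(self, table: str) -> asyncio.Queue:
        return self._notifiers[table].subscribe()

    def insert(self, table: str, row: list) -> None:
        self._tables[table].append(row)  # step 1: the DML runs (commit is simulated)
        self.notify_change(table, "insert", new_row=row, old_row=None)  # step 2

    def notify_change(self, table, change_type, new_row, old_row) -> None:
        event = {"table": table, "change_type": change_type,
                 "new_row": new_row, "old_row": old_row}
        self._notifiers[table].broadcast(event)


async def demo():
    executor = Executor()
    first = executor.subscribe_changes("orders")
    second = executor.subscribe_changes("orders")
    executor.insert("orders", [42, "pending", 99.99])
    # Step 4: each subscriber reads from its own async channel.
    return await first.get(), await second.get()


events = asyncio.run(demo())
```

Giving each subscriber its own queue means a slow consumer does not block the others, at the cost of unbounded buffering in this simplified version.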

Row-Level Subscriptions

Subscribe to changes on a specific table:

Rust

let mut rx = client.subscribe_changes("orders").await?;

while let Some(event) = rx.recv().await {
    match event.change_type {
        ChangeType::Insert => println!("New order: {:?}", event.new_row),
        ChangeType::Update => println!("Updated: {:?} -> {:?}", event.old_row, event.new_row),
        ChangeType::Delete => println!("Deleted: {:?}", event.old_row),
    }
}

Go

cdc := client.CDC()
ch := cdc.Subscribe("orders")

for event := range ch {
    fmt.Printf("Change: %s on %s\n", event.Type, event.Table)
}

Python

async def watch_orders(client):
    # `async for` must run inside a coroutine
    async for event in client.cdc.subscribe("orders"):
        if event.change_type == "insert":
            print(f"New row: {event.new_row}")

Query-Level Subscriptions

Subscribe to the result set of a SQL query. Nucleus tracks which tables the query depends on and pushes diffs when results change.

-- Subscribe to a query (returns subscription_id)
SUBSCRIBE SELECT * FROM orders WHERE status = 'pending';

When any row in orders changes, Nucleus re-evaluates the query and returns a QueryDiff:

QueryDiff {
    subscription_id: 1
    added_rows:   [[43, "pending", 29.99]]    // rows that now match
    removed_rows: [[42, "pending", 99.99]]    // rows that no longer match
}
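
One way to picture how such a diff could be derived is to compare the query's previous and current result sets. The sketch below assumes a full re-evaluation followed by a multiset comparison. The `diff_results` function is hypothetical, and Nucleus may compute diffs incrementally instead.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class QueryDiff:
    subscription_id: int
    added_rows: list
    removed_rows: list


def diff_results(subscription_id: int, previous: list, current: list) -> QueryDiff:
    """Compare two evaluations of the same query as multisets of rows."""
    prev = Counter(tuple(row) for row in previous)
    curr = Counter(tuple(row) for row in current)
    # Counter subtraction keeps only positive counts, so duplicates are handled.
    added = [list(row) for row in (curr - prev).elements()]
    removed = [list(row) for row in (prev - curr).elements()]
    return QueryDiff(subscription_id, added, removed)


# Order 42 left the 'pending' set and order 43 entered it.
before = [[42, "pending", 99.99]]
after = [[43, "pending", 29.99]]
diff = diff_results(1, before, after)
```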

Polling for Diffs

FETCH SUBSCRIPTION 1;

Each FETCH SUBSCRIPTION call returns up to 1000 buffered diffs.
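
The per-call limit implies that a client with a large backlog needs to fetch repeatedly. The following sketch models that loop with a hypothetical `SubscriptionBuffer`. It assumes that fetched diffs are removed from the buffer and that an empty result means the backlog is drained, neither of which is stated on this page.

```python
from collections import deque

MAX_DIFFS_PER_FETCH = 1000  # per-call limit stated above


class SubscriptionBuffer:
    """Hypothetical server-side buffer of diffs for one subscription."""

    def __init__(self) -> None:
        self._pending = deque()

    def push(self, diff: dict) -> None:
        self._pending.append(diff)

    def fetch(self, limit: int = MAX_DIFFS_PER_FETCH) -> list:
        # Return the oldest diffs first, never more than `limit` per call.
        batch = []
        while self._pending and len(batch) < limit:
            batch.append(self._pending.popleft())
        return batch


buffer = SubscriptionBuffer()
for seq in range(2500):
    buffer.push({"subscription_id": 1, "seq": seq})

# Keep fetching until a call comes back empty.
batch_sizes = []
while True:
    batch = buffer.fetch()
    if not batch:
        break
    batch_sizes.append(len(batch))
```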

Scheduled Tasks

Nucleus includes a cron-like task scheduler for periodic CDC processing:

-- Run every 60 seconds
SCHEDULE 'archive_old_orders'
    EVERY 60 SECONDS
    AS 'INSERT INTO orders_archive SELECT * FROM orders WHERE created_at < now() - interval 30 day';

Schedule Types

| Type | Example use |
|------|-------------|
| EVERY N SECONDS | Poll for changes frequently |
| EVERY N MINUTES | Periodic aggregation |
| EVERY N HOURS | Daily summaries |
| ONCE AFTER N | One-time delayed task |

Tasks track last_run, run_count, and last_error for observability.
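
To show how these three fields might be maintained, here is a minimal sketch of the bookkeeping around a single task run. The `ScheduledTask` class is hypothetical. It assumes that `run_count` counts failed attempts too and that `last_error` is cleared after a successful run, and the scheduler itself may behave differently.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ScheduledTask:
    """Hypothetical model of one EVERY N SECONDS task."""
    name: str
    interval_seconds: float
    action: Callable[[], None]
    last_run: Optional[float] = None
    run_count: int = 0
    last_error: Optional[str] = None

    def is_due(self, now: float) -> bool:
        return self.last_run is None or now - self.last_run >= self.interval_seconds

    def run(self, now: float) -> None:
        self.last_run = now
        self.run_count += 1  # assumption: failed attempts are counted too
        try:
            self.action()
            self.last_error = None  # assumption: cleared on success
        except Exception as exc:
            self.last_error = str(exc)


def failing_action() -> None:
    raise RuntimeError("orders_archive is missing")


task = ScheduledTask("archive_old_orders", 60, failing_action)
task.run(now=0.0)
```

After the failed run, `task.last_error` holds the error message while `task.run_count` is 1, so a monitoring query can tell a failing task from one that has never run.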

Use Cases

  • Event sourcing — Capture all state changes as an immutable log
  • Cache invalidation — Bust caches when underlying data changes
  • Search indexing — Keep FTS/vector indexes in sync with source tables
  • Audit trails — Record who changed what and when
  • Real-time dashboards — Push diffs to WebSocket clients via PubSub
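
As an example of the cache invalidation case, a subscriber can evict a cached row whenever a change event touches it. This is a sketch under stated assumptions: the `RowCache` class is hypothetical, events are represented as plain dicts, and the first column of each row is assumed to be the primary key.

```python
class RowCache:
    """Hypothetical row cache keyed by (table, primary key)."""

    def __init__(self) -> None:
        self._rows = {}

    def put(self, table: str, row: list) -> None:
        self._rows[(table, row[0])] = row  # assumption: column 0 is the key

    def get(self, table: str, key):
        return self._rows.get((table, key))

    def on_change(self, event: dict) -> None:
        """Evict the cached copy of whichever row the event touched."""
        row = event["old_row"] if event["old_row"] is not None else event["new_row"]
        self._rows.pop((event["table"], row[0]), None)


cache = RowCache()
cache.put("orders", [42, "pending", 99.99])
cache.on_change({
    "table": "orders",
    "change_type": "update",
    "new_row": [42, "shipped", 99.99],
    "old_row": [42, "pending", 99.99],
})
```

Evicting rather than updating in place keeps the handler simple: the next read repopulates the cache from the source table.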