Change Data Capture
Nucleus provides built-in Change Data Capture (CDC) — no external connectors required. Changes are captured at the executor level and delivered through broadcast channels.
Change Events
Every INSERT, UPDATE, and DELETE emits a ChangeEvent:
ChangeEvent {
    table: "orders"
    change_type: Insert | Update | Delete
    new_row: [42, "shipped", 99.99]   // present on Insert/Update
    old_row: [42, "pending", 99.99]   // present on Update/Delete
    timestamp: 1709827200000
}
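To make the field rules above concrete, here is a minimal Python sketch of the event shape. It is an illustration, not the Nucleus client type: the class names, the lowercase enum values, and the strict check that `new_row`/`old_row` are absent when not listed as present are all assumptions, as is reading `timestamp` as Unix epoch milliseconds.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class ChangeType(Enum):
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical model of a change event; not the real client type."""

    table: str
    change_type: ChangeType
    new_row: Optional[list[Any]]  # present on Insert/Update
    old_row: Optional[list[Any]]  # present on Update/Delete
    timestamp: int  # assumed: Unix epoch milliseconds

    def __post_init__(self) -> None:
        # Assumption: a row image is None whenever the doc does not list it
        # as present for this change type.
        want_new = self.change_type in (ChangeType.INSERT, ChangeType.UPDATE)
        want_old = self.change_type in (ChangeType.UPDATE, ChangeType.DELETE)
        has_new = self.new_row is not None
        has_old = self.old_row is not None
        if has_new != want_new or has_old != want_old:
            raise ValueError(
                f"row images do not match change type {self.change_type.name}"
            )


update = ChangeEvent(
    table="orders",
    change_type=ChangeType.UPDATE,
    new_row=[42, "shipped", 99.99],
    old_row=[42, "pending", 99.99],
    timestamp=1709827200000,
)
```

An Update carries both row images, so a consumer can compare `old_row` and `new_row` column by column to see what changed.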
How It Works
- DML operations (INSERT, UPDATE, DELETE) execute normally
- After committing, the executor calls notify_change with the affected rows
- A per-table ChangeNotifier broadcasts events to all subscribers
- Subscribers receive events through async channels
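The flow above can be sketched in a few lines of Python with `asyncio` queues standing in for the broadcast channels. Only the names `notify_change` and `ChangeNotifier` come from the text; the `Executor` class, its methods, and the dict-shaped events are hypothetical, and the commit step is elided.

```python
import asyncio
from collections import defaultdict


class ChangeNotifier:
    """Fans out events for one table to every subscriber queue."""

    def __init__(self) -> None:
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def broadcast(self, event: dict) -> None:
        # Every subscriber gets its own copy of the reference to the event.
        for queue in self._subscribers:
            queue.put_nowait(event)


class Executor:
    """Hypothetical stand-in for the SQL executor."""

    def __init__(self) -> None:
        # One notifier per table, created on first use.
        self._notifiers: dict[str, ChangeNotifier] = defaultdict(ChangeNotifier)

    def subscribe_changes(self, table: str) -> asyncio.Queue:
        return self._notifiers[table].subscribe()

    def insert(self, table: str, row: list) -> None:
        # Steps 1-2: the DML executes and commits (elided in this sketch),
        # and only then is the change announced.
        self.notify_change(
            table,
            {"table": table, "change_type": "insert", "new_row": row, "old_row": None},
        )

    def notify_change(self, table: str, event: dict) -> None:
        # Step 3: the per-table notifier broadcasts to all subscribers.
        self._notifiers[table].broadcast(event)


async def demo() -> list:
    executor = Executor()
    first = executor.subscribe_changes("orders")
    second = executor.subscribe_changes("orders")
    executor.insert("orders", [43, "pending", 29.99])
    # Step 4: each subscriber receives the event on its own channel.
    return [await first.get(), await second.get()]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because notification happens after the commit, subscribers in this model never see a change that was later rolled back.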
Row-Level Subscriptions
Subscribe to changes on a specific table:
Rust
let mut rx = client.subscribe_changes("orders").await?;
while let Some(event) = rx.recv().await {
    match event.change_type {
        ChangeType::Insert => println!("New order: {:?}", event.new_row),
        ChangeType::Update => println!("Updated: {:?} -> {:?}", event.old_row, event.new_row),
        ChangeType::Delete => println!("Deleted: {:?}", event.old_row),
    }
}
Go
cdc := client.CDC()
ch := cdc.Subscribe("orders")
for event := range ch {
    fmt.Printf("Change: %s on %s\n", event.Type, event.Table)
}
Python
async for event in client.cdc.subscribe("orders"):
    if event.change_type == "insert":
        print(f"New row: {event.new_row}")
Query-Level Subscriptions
Subscribe to the result set of a SQL query. Nucleus tracks which tables the query depends on and pushes diffs when results change.
-- Subscribe to a query (returns subscription_id)
SUBSCRIBE SELECT * FROM orders WHERE status = 'pending';
When any row in orders changes, Nucleus re-evaluates the query and returns a QueryDiff:
QueryDiff {
    subscription_id: 1
    added_rows: [[43, "pending", 29.99]]     // rows that now match
    removed_rows: [[42, "pending", 99.99]]   // rows that no longer match
}
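One way to picture how such a diff could be derived is to compare the query's result set before and after a change. The sketch below does that with plain set membership. It is an assumption about the general technique, not a description of how Nucleus computes diffs internally, and `diff_results` is a hypothetical helper. Rows are tuples so they can be hashed, and duplicate rows are not tracked separately.

```python
from dataclasses import dataclass


@dataclass
class QueryDiff:
    subscription_id: int
    added_rows: list
    removed_rows: list


def diff_results(subscription_id: int, previous: list, current: list) -> QueryDiff:
    """Compare two result sets of the same query, preserving row order."""
    previous_set = set(previous)
    current_set = set(current)
    return QueryDiff(
        subscription_id=subscription_id,
        # Rows that match now but did not before.
        added_rows=[row for row in current if row not in previous_set],
        # Rows that matched before but no longer do.
        removed_rows=[row for row in previous if row not in current_set],
    )


# Order 42 leaves the 'pending' result set, order 43 enters it.
before = [(42, "pending", 99.99)]
after = [(43, "pending", 29.99)]
diff = diff_results(1, before, after)
```

Here `diff.added_rows` holds order 43 and `diff.removed_rows` holds order 42, mirroring the QueryDiff shown above.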
Polling for Diffs
FETCH SUBSCRIPTION 1;
Returns up to 1000 buffered diffs per call.
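Since each call returns at most 1000 diffs, a consumer that has fallen behind needs to fetch repeatedly until nothing is left. The Python sketch below models that loop against an in-memory buffer. `DiffBuffer` and `drain` are hypothetical names, and what Nucleus does when a buffer grows very large is not covered here.

```python
from collections import deque

MAX_DIFFS_PER_FETCH = 1000  # per-call limit stated above


class DiffBuffer:
    """Hypothetical in-memory stand-in for one subscription's buffered diffs."""

    def __init__(self) -> None:
        self._pending: deque = deque()

    def push(self, diff: dict) -> None:
        self._pending.append(diff)

    def fetch(self, limit: int = MAX_DIFFS_PER_FETCH) -> list:
        # Hand out the oldest diffs first, never more than `limit` per call.
        batch = []
        while self._pending and len(batch) < limit:
            batch.append(self._pending.popleft())
        return batch


def drain(buffer: DiffBuffer) -> list:
    """Fetch repeatedly until a call comes back empty."""
    batches = []
    while True:
        batch = buffer.fetch()
        if not batch:
            return batches
        batches.append(batch)


buffer = DiffBuffer()
for i in range(2500):
    buffer.push({"subscription_id": 1, "seq": i})

batch_sizes = [len(batch) for batch in drain(buffer)]
```

With 2500 buffered diffs, the loop needs three fetches (1000, 1000, then 500) before the buffer is empty.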
Scheduled Tasks
Nucleus includes a cron-like task scheduler for periodic CDC processing:
-- Run every 60 seconds
SCHEDULE 'archive_old_orders'
EVERY 60 SECONDS
AS 'INSERT INTO orders_archive SELECT * FROM orders WHERE created_at < now() - interval 30 day';
Schedule Types
| Type | Typical use |
|------|-------------|
| EVERY N SECONDS | Poll for changes frequently |
| EVERY N MINUTES | Periodic aggregation |
| EVERY N HOURS | Hourly or daily summaries |
| ONCE AFTER N | One-time delayed task |
Tasks track last_run, run_count, and last_error for observability.
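As a rough illustration of how those three fields might evolve, the following Python sketch models a single recurring task. Only the field names `last_run`, `run_count`, and `last_error` come from the text. The `ScheduledTask` class is hypothetical, and two behaviours are assumptions: failed runs still increment `run_count`, and a successful run clears `last_error`.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ScheduledTask:
    """Hypothetical model of an EVERY N SECONDS task."""

    name: str
    interval_seconds: float
    action: Callable[[], None]
    last_run: Optional[float] = None
    run_count: int = 0
    last_error: Optional[str] = None

    def is_due(self, now: float) -> bool:
        # A task that has never run is due immediately.
        return self.last_run is None or now - self.last_run >= self.interval_seconds

    def run(self, now: float) -> None:
        self.last_run = now
        self.run_count += 1  # assumption: failed runs are counted too
        try:
            self.action()
            self.last_error = None  # assumption: success clears the error
        except Exception as exc:
            self.last_error = str(exc)


def failing_action() -> None:
    raise RuntimeError("orders_archive is missing")


task = ScheduledTask("archive_old_orders", 60, failing_action)
if task.is_due(now=0.0):
    task.run(now=0.0)
```

After this run the task reports `run_count == 1` and a populated `last_error`, and it is not due again until 60 seconds have passed.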
Use Cases
- Event sourcing — Capture all state changes as an immutable log
- Cache invalidation — Bust caches when underlying data changes
- Search indexing — Keep FTS/vector indexes in sync with source tables
- Audit trails — Record who changed what and when
- Real-time dashboards — Push diffs to WebSocket clients via PubSub
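As one example, cache invalidation can be reduced to evicting the cached entry for whichever row a change event touches. The Python sketch below assumes events arrive as dicts shaped like the ChangeEvent above and that the first column of each row is its primary key. Both are illustrative assumptions, and `invalidate` is a hypothetical helper.

```python
def invalidate(cache: dict, event: dict) -> None:
    """Evict the cache entry for the row affected by a change event."""
    # Prefer the old image (Update/Delete); fall back to the new one (Insert).
    row = event["old_row"] if event["old_row"] is not None else event["new_row"]
    primary_key = row[0]  # assumption: column 0 is the primary key
    cache.pop((event["table"], primary_key), None)


cache = {
    ("orders", 42): {"status": "pending", "total": 99.99},
    ("orders", 43): {"status": "pending", "total": 29.99},
}

invalidate(
    cache,
    {
        "table": "orders",
        "change_type": "update",
        "old_row": [42, "pending", 99.99],
        "new_row": [42, "shipped", 99.99],
    },
)
```

Only the entry for order 42 is evicted. The next read repopulates it from the table, so the cache never has to interpret the change itself.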