Streams

Nucleus includes Redis-compatible streams for event sourcing, message queues, and real-time data pipelines. Streams are append-only logs, and consumer groups provide reliable, cooperative processing.

Adding Entries

-- Add an entry with field-value pairs
SELECT STREAM_XADD('events', 'user', 'alice', 'action', 'login', 'ip', '192.168.1.1');
-- → "1709856000000-0"

SELECT STREAM_XADD('events', 'user', 'bob', 'action', 'purchase', 'amount', '49.99');
-- → "1709856000001-0"

Each entry gets an auto-generated ID in the format <timestamp_ms>-<sequence>.
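When two entries arrive within the same millisecond, the sequence component disambiguates them. A sketch of the behavior ('clicks' is a hypothetical stream; the timestamps shown are illustrative):

```sql
-- Two rapid appends; entries in the same millisecond share a timestamp
-- and get incrementing sequence numbers
SELECT STREAM_XADD('clicks', 'page', '/home');
-- → e.g. "1709856000123-0"
SELECT STREAM_XADD('clicks', 'page', '/about');
-- → e.g. "1709856000123-1" (same millisecond) or "1709856000124-0"
```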

Reading Entries

-- Get entries in a time range (start_ms, end_ms, count)
SELECT STREAM_XRANGE('events', 0, 9999999999999, 100);

-- Read up to 10 entries after a specific timestamp (start_ms, count)
SELECT STREAM_XREAD('events', 1709856000000, 10);

-- Stream length
SELECT STREAM_XLEN('events');
-- → 2
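A simple polling consumer can be built from STREAM_XREAD by tracking the highest timestamp it has processed so far; this is a sketch, assuming 1709856000001 is the timestamp of the last entry the client handled:

```sql
-- Poll for up to 10 entries newer than the last processed timestamp
SELECT STREAM_XREAD('events', 1709856000001, 10);

-- STREAM_XLEN can be checked first to see whether a poll is worthwhile
SELECT STREAM_XLEN('events');
```

For coordinated consumption across multiple clients, consumer groups (below) are a better fit than ad-hoc polling.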

Consumer Groups

Consumer groups let multiple consumers process a stream cooperatively, with at-least-once delivery: an entry stays pending until a consumer acknowledges it, and unacknowledged entries can be redelivered.

-- Create a consumer group starting from the beginning
SELECT STREAM_XGROUP_CREATE('events', 'processors', 0);

-- Read as a consumer (entries are assigned to this consumer)
SELECT STREAM_XREADGROUP('events', 'processors', 'worker-1', 10);

-- Acknowledge that entry 1709856000000-0 was processed (timestamp, sequence)
SELECT STREAM_XACK('events', 'processors', 1709856000000, 0);

How Consumer Groups Work

  1. Create a group on a stream with a starting position
  2. Read entries as a named consumer — entries are tracked per consumer
  3. Acknowledge entries after processing — removes from pending list
  4. Unacknowledged entries can be reclaimed if a consumer fails
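The lifecycle above, sketched end to end (the 'orders' stream and 'billing' group are illustrative names):

```sql
-- 1. Create a group on the 'orders' stream, starting from the beginning
SELECT STREAM_XGROUP_CREATE('orders', 'billing', 0);

-- 2. worker-1 claims up to 5 entries; they move to its pending list
SELECT STREAM_XREADGROUP('orders', 'billing', 'worker-1', 5);

-- 3. After processing, acknowledge each entry (timestamp, sequence)
SELECT STREAM_XACK('orders', 'billing', 1709856000000, 0);

-- 4. Entries worker-1 never acknowledges remain pending and can be
--    reclaimed by another consumer if worker-1 fails
```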

Entry ID Format

IDs follow the Redis convention:

<millisecond_timestamp>-<sequence_number>

Examples:

  • 1709856000000-0 — First entry at that millisecond
  • 1709856000000-1 — Second entry at the same millisecond
  • * — Auto-generate with current timestamp
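Because the timestamp component is in milliseconds, STREAM_XRANGE can target a single millisecond by passing it as both bounds (values here are illustrative):

```sql
-- All entries (up to 100) appended during millisecond 1709856000000
SELECT STREAM_XRANGE('events', 1709856000000, 1709856000000, 100);
```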

Use Cases

  • Event sourcing — Immutable event log for state reconstruction
  • Message queues — Producer/consumer with acknowledgment
  • Activity feeds — User actions, notifications, audit trails
  • Change data capture — Stream database changes to consumers
  • Real-time pipelines — Multi-stage data processing
  • Task distribution — Fan out work to multiple workers via consumer groups
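As a sketch of the task-distribution pattern, two workers in one consumer group split entries between them ('jobs' and 'resizers' are hypothetical names):

```sql
-- Producer enqueues tasks as stream entries
SELECT STREAM_XADD('jobs', 'task', 'resize', 'image', 'a.png');
SELECT STREAM_XADD('jobs', 'task', 'resize', 'image', 'b.png');

-- Both workers read from the same group; each entry is assigned
-- to only one consumer
SELECT STREAM_XGROUP_CREATE('jobs', 'resizers', 0);
SELECT STREAM_XREADGROUP('jobs', 'resizers', 'worker-1', 1);
SELECT STREAM_XREADGROUP('jobs', 'resizers', 'worker-2', 1);
```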