PubSub
Nucleus supports PostgreSQL-compatible LISTEN/NOTIFY for real-time messaging, plus Redis-compatible streams for durable event processing.
LISTEN / NOTIFY
Subscribe to a Channel
LISTEN order_events;
The connection receives all messages published to order_events until UNLISTEN is called.
Publish a Message
NOTIFY order_events, '{"order_id": 42, "status": "shipped"}';
All connections listening on order_events receive the payload.
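When the payload is assembled in application code, it has to be serialized and quoted before it is placed in the statement. A minimal sketch, assuming Nucleus follows PostgreSQL quoting rules (single quotes doubled inside string literals, double quotes doubled inside quoted identifiers); `build_notify` is a hypothetical helper, not part of any Nucleus client:

```python
import json


def build_notify(channel: str, payload: dict) -> str:
    """Return a NOTIFY statement carrying `payload` serialized as JSON."""
    # Quote the channel as an SQL identifier: double any embedded double quotes.
    quoted_channel = '"' + channel.replace('"', '""') + '"'
    # Serialize, then double single quotes so the JSON fits in an SQL string literal.
    body = json.dumps(payload).replace("'", "''")
    return f"NOTIFY {quoted_channel}, '{body}';"


stmt = build_notify("order_events", {"order_id": 42, "status": "shipped"})
print(stmt)
# NOTIFY "order_events", '{"order_id": 42, "status": "shipped"}';
```

Listeners receive the payload as a plain string and can decode it with `json.loads`.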
Unsubscribe
UNLISTEN order_events;
Cluster-Wide Delivery
In clustered deployments, a NOTIFY issued on one node is delivered to LISTEN connections on every node. Messages propagate via gossip-based subscription routing, and no configuration is required.
PubSub Hub
Internally, Nucleus uses a PubSubHub backed by Tokio broadcast channels. Each channel gets an independent sender, so high-throughput topics don't block unrelated channels.
Distributed Routing
When running in PrimaryReplica or MultiRaft mode, the DistributedPubSubRouter automatically:
- Propagates subscription state across nodes via gossip
- Routes NOTIFY messages to all nodes with active listeners
- Delivers locally through in-process broadcast channels
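The routing steps above can be modelled in a few lines. This is a simplified sketch, not the `DistributedPubSubRouter` itself: the `Node` class is hypothetical, gossip is replaced by a direct update of every peer, and network transport is replaced by in-process method calls:

```python
from collections import defaultdict


class Node:
    """Toy cluster node that tracks which peers have listeners on each channel."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.peers: dict[str, "Node"] = {}
        self.local_listeners = defaultdict(list)  # channel -> list of inboxes
        self.remote_interest = defaultdict(set)   # channel -> peer names with listeners

    def listen(self, channel: str) -> list:
        inbox: list = []
        self.local_listeners[channel].append(inbox)
        # Stand-in for gossip: announce the subscription to every peer directly.
        for peer in self.peers.values():
            peer.remote_interest[channel].add(self.name)
        return inbox

    def deliver_local(self, channel: str, payload: str) -> None:
        for inbox in self.local_listeners.get(channel, []):
            inbox.append(payload)

    def notify(self, channel: str, payload: str) -> list:
        """Deliver locally, then forward only to peers known to have listeners."""
        self.deliver_local(channel, payload)
        targets = sorted(self.remote_interest.get(channel, ()))
        for name in targets:
            self.peers[name].deliver_local(channel, payload)
        return targets


def connect(nodes: list) -> None:
    """Make every node aware of every other node."""
    for node in nodes:
        node.peers = {other.name: other for other in nodes if other is not node}


a, b, c = Node("a"), Node("b"), Node("c")
connect([a, b, c])
inbox_b = b.listen("order_events")
forwarded = a.notify("order_events", "hello")
```

Here node `a` forwards the message to `b` only; node `c` has no listener on `order_events`, so it is never contacted.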
Job Queue
Nucleus includes a built-in priority job queue, accessible via the PubSub infrastructure.
Priority Levels
| Level | Value | Use Case |
|-------|-------|----------|
| Low | 0 | Background cleanup |
| Normal | 1 | Standard processing |
| High | 2 | User-facing tasks |
| Critical | 3 | Payment processing, alerts |
Features
- Auto-retry: Failed jobs re-enqueue up to max_retries
- Dead-letter queue: Jobs exceeding retry limit are moved to DLQ
- Priority ordering: Higher priority jobs dequeue first
- At-least-once delivery: Jobs must be explicitly completed
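How these features interact can be shown with an in-memory model. This is a sketch under stated assumptions, not the Nucleus job queue API: `Job`, `JobQueue`, and their methods are hypothetical names, and `max_retries` is assumed to count retries after the initial attempt. The priority values match the table above.

```python
import heapq
import itertools
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional


class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


@dataclass
class Job:
    payload: str
    priority: Priority = Priority.NORMAL
    attempts: int = 0  # number of failed attempts so far


class JobQueue:
    def __init__(self, max_retries: int = 3) -> None:
        self.max_retries = max_retries
        self.dead_letters: list = []
        self.in_flight: list = []  # dequeued but not yet completed or failed
        self._heap: list = []
        self._seq = itertools.count()  # FIFO tie-break within one priority

    def enqueue(self, job: Job) -> None:
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-int(job.priority), next(self._seq), job))

    def dequeue(self) -> Optional[Job]:
        if not self._heap:
            return None
        job = heapq.heappop(self._heap)[2]
        self.in_flight.append(job)  # stays tracked until complete() or fail()
        return job

    def complete(self, job: Job) -> None:
        self.in_flight.remove(job)

    def fail(self, job: Job) -> None:
        self.in_flight.remove(job)
        job.attempts += 1
        if job.attempts > self.max_retries:
            self.dead_letters.append(job)  # retry limit exceeded
        else:
            self.enqueue(job)  # auto-retry


queue = JobQueue(max_retries=1)
queue.enqueue(Job("cleanup", Priority.LOW))
queue.enqueue(Job("charge-card", Priority.CRITICAL))
queue.enqueue(Job("send-email", Priority.NORMAL))

first = queue.dequeue()      # highest priority: charge-card
queue.fail(first)            # first failure: re-enqueued
retry = queue.dequeue()      # same job again, still the highest priority
queue.fail(retry)            # second failure exceeds max_retries=1: dead-lettered
next_job = queue.dequeue()   # send-email
queue.complete(next_job)
```

A job that is dequeued but never completed remains in `in_flight`; a real queue would typically re-deliver such jobs after a timeout, which is what makes delivery at-least-once.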
Client Integration
Rust
use neutron::nucleus::NucleusPool;
let pool = NucleusPool::connect("postgres://localhost:5432/mydb").await?;
// Subscribe
pool.execute("LISTEN order_events").await?;
// Publish
pool.execute("NOTIFY order_events, 'hello'").await?;
Go
client := nucleus.Connect("postgres://localhost:5432/mydb")
// PubSub model
ps := client.PubSub()
ps.Listen("order_events")
ps.Notify("order_events", "hello")
Python
client = await nucleus_connect("postgres://localhost:5432/mydb")
# Subscribe
await client.execute("LISTEN order_events")
# Publish
await client.execute("NOTIFY order_events, 'hello'")
Streams
For durable, ordered event processing, see Streams — Nucleus's Redis-compatible append-only log with consumer groups.