PubSub

Nucleus supports PostgreSQL-compatible LISTEN/NOTIFY for real-time messaging, plus Redis-compatible streams for durable event processing.

LISTEN / NOTIFY

Subscribe to a Channel

LISTEN order_events;

The connection receives every message published to order_events until it issues UNLISTEN order_events or disconnects.

Publish a Message

NOTIFY order_events, '{"order_id": 42, "status": "shipped"}';

All connections listening on order_events receive the payload.
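Because the payload is written as a SQL string literal, any single quote inside it has to be doubled before the statement is sent. The sketch below shows one way to build the statement from a Python dictionary. The helper name `build_notify` is hypothetical and not part of any Nucleus client, and the quoting rule assumes Nucleus follows PostgreSQL's standard string-literal syntax.

```python
import json
import re

# Hypothetical helper for illustration; not part of any Nucleus client library.
# Channel names are restricted to plain identifiers so they can be inlined safely.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_notify(channel: str, payload: dict) -> str:
    """Return a NOTIFY statement carrying `payload` as a JSON string literal."""
    if not _IDENT.match(channel):
        raise ValueError(f"unsafe channel name: {channel!r}")
    body = json.dumps(payload)
    # Double single quotes so the JSON survives as a SQL string literal
    # (assumes PostgreSQL-style standard-conforming strings).
    escaped = body.replace("'", "''")
    return f"NOTIFY {channel}, '{escaped}';"


stmt = build_notify("order_events", {"order_id": 42, "status": "shipped"})
```

The resulting `stmt` can be passed to whatever `execute` call the client exposes; listeners then parse the payload back with a JSON decoder.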

Unsubscribe

UNLISTEN order_events;

Cluster-Wide Delivery

In clustered deployments, a NOTIFY issued on one node is delivered to listeners on every node. Messages propagate via gossip-based subscription routing, with no configuration required.

PubSub Hub

Internally, Nucleus uses a PubSubHub backed by Tokio broadcast channels. Each channel gets an independent sender, so high-throughput topics don't block unrelated channels.
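The per-channel isolation described above can be pictured with a small in-memory model. This is only an illustrative sketch in Python, not the Nucleus implementation (which is described as using Tokio broadcast channels); the class and method names here are assumptions chosen to mirror the SQL commands.

```python
from collections import defaultdict, deque


class PubSubHub:
    """Toy in-memory hub: every channel keeps its own list of subscriber inboxes."""

    def __init__(self) -> None:
        # channel name -> list of per-subscriber inboxes
        self._channels = defaultdict(list)

    def listen(self, channel: str) -> deque:
        """Register a new subscriber and return its private inbox."""
        inbox = deque()
        self._channels[channel].append(inbox)
        return inbox

    def unlisten(self, channel: str, inbox: deque) -> None:
        # Compare by identity: two empty deques are equal, so list.remove()
        # could drop the wrong subscriber.
        self._channels[channel] = [
            q for q in self._channels[channel] if q is not inbox
        ]

    def notify(self, channel: str, payload: str) -> int:
        """Copy the payload to every inbox on `channel`; return the receiver count."""
        inboxes = self._channels.get(channel, [])
        for inbox in inboxes:
            inbox.append(payload)
        return len(inboxes)


hub = PubSubHub()
orders_a = hub.listen("order_events")
orders_b = hub.listen("order_events")
audit = hub.listen("audit_events")

delivered = hub.notify("order_events", "hello")
```

Publishing to order_events touches only that channel's inboxes, so the audit_events subscriber is never involved, which is the property the independent senders provide.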

Distributed Routing

When running in PrimaryReplica or MultiRaft mode, the DistributedPubSubRouter automatically:

  1. Propagates subscription state across nodes via gossip
  2. Routes NOTIFY messages to all nodes with active listeners
  3. Delivers locally through in-process broadcast channels
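The three steps above can be sketched as a toy model. Everything in it is an assumption made for illustration: the `Cluster` class is hypothetical, and its shared `subscriptions` table stands in for gossip, which in a real cluster converges over time rather than instantly.

```python
from collections import defaultdict


class Cluster:
    """Toy model of cluster-wide NOTIFY routing (not the Nucleus router)."""

    def __init__(self, node_names) -> None:
        # node name -> channel -> list of (origin node, payload)
        self.inboxes = {name: defaultdict(list) for name in node_names}
        # channel -> names of nodes with at least one active listener;
        # stands in for gossip-propagated subscription state.
        self.subscriptions = defaultdict(set)

    def listen(self, node: str, channel: str) -> None:
        # Step 1: the subscription becomes visible to every node.
        self.subscriptions[channel].add(node)

    def notify(self, origin: str, channel: str, payload: str) -> list:
        # Step 2: route only to nodes that have active listeners.
        targets = sorted(self.subscriptions.get(channel, set()))
        for node in targets:
            # Step 3: each target node delivers to its local listeners.
            self.inboxes[node][channel].append((origin, payload))
        return targets


cluster = Cluster(["n1", "n2", "n3"])
cluster.listen("n2", "order_events")
cluster.listen("n3", "order_events")

reached = cluster.notify("n1", "order_events", "hello")
```

Here the publishing node n1 has no listener of its own, so the message is forwarded to n2 and n3 only.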

Job Queue

Nucleus includes a built-in priority job queue, accessible through the PubSub infrastructure.

Priority Levels

| Level | Value | Use Case |
|-------|-------|----------|
| Low | 0 | Background cleanup |
| Normal | 1 | Standard processing |
| High | 2 | User-facing tasks |
| Critical | 3 | Payment processing, alerts |

Features

  • Auto-retry: Failed jobs are re-enqueued up to max_retries times
  • Dead-letter queue: Jobs that exceed the retry limit are moved to the dead-letter queue (DLQ)
  • Priority ordering: Higher-priority jobs dequeue first
  • At-least-once delivery: A job stays outstanding until it is explicitly completed
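The interaction between priorities, retries, and the dead-letter queue can be sketched with a small in-memory model. This is not the Nucleus API: the `JobQueue` class, its method names, and the exact retry accounting (a job is dead-lettered once it has failed more than max_retries retries) are assumptions made for illustration.

```python
import heapq
import itertools
from dataclasses import dataclass
from enum import IntEnum


class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


@dataclass
class Job:
    payload: str
    priority: Priority = Priority.NORMAL
    attempts: int = 0


class JobQueue:
    """Toy priority queue with retries and a dead-letter list."""

    def __init__(self, max_retries: int = 3) -> None:
        self.max_retries = max_retries
        self._heap = []
        self._seq = itertools.count()  # FIFO tie-breaker within one priority
        self.in_flight = {}            # job id -> job awaiting complete() or fail()
        self.dead_letter = []

    def enqueue(self, job: Job) -> None:
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-job.priority, next(self._seq), job))

    def dequeue(self):
        """Return (job_id, job) for the highest-priority job, or None if empty."""
        if not self._heap:
            return None
        _, job_id, job = heapq.heappop(self._heap)
        job.attempts += 1
        self.in_flight[job_id] = job
        return job_id, job

    def complete(self, job_id: int) -> None:
        # Explicit acknowledgement: only now is the job finished.
        del self.in_flight[job_id]

    def fail(self, job_id: int) -> None:
        job = self.in_flight.pop(job_id)
        if job.attempts > self.max_retries:
            self.dead_letter.append(job)  # retry budget exhausted
        else:
            self.enqueue(job)             # auto-retry


queue = JobQueue(max_retries=1)
queue.enqueue(Job("cleanup", Priority.LOW))
queue.enqueue(Job("charge", Priority.CRITICAL))
queue.enqueue(Job("email", Priority.NORMAL))

first_id, first = queue.dequeue()
```

With max_retries set to 1, a job that fails twice ends up in `dead_letter`, while a job that is dequeued but never completed stays in `in_flight`, which is what makes delivery at-least-once rather than at-most-once.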

Client Integration

Rust

use neutron::nucleus::NucleusPool;

let pool = NucleusPool::connect("postgres://localhost:5432/mydb").await?;

// Subscribe
pool.execute("LISTEN order_events").await?;

// Publish
pool.execute("NOTIFY order_events, 'hello'").await?;

Go

client := nucleus.Connect("postgres://localhost:5432/mydb")

// PubSub model
ps := client.PubSub()
ps.Listen("order_events")
ps.Notify("order_events", "hello")

Python

client = await nucleus_connect("postgres://localhost:5432/mydb")

await client.execute("LISTEN order_events")
await client.execute("NOTIFY order_events, 'hello'")

Streams

For durable, ordered event processing, see Streams — Nucleus's Redis-compatible append-only log with consumer groups.