Real-Time

Neutron provides three real-time communication patterns: WebSocket for bidirectional messaging, Server-Sent Events (SSE) for server-to-client streaming, and PubSub for broadcast.

WebSocket

Bidirectional communication using fastwebsockets:

use neutron::ws::{WebSocketUpgrade, WebSocket, Message};

/// Echo handler: upgrades the request to a WebSocket, then answers each text
/// message with an `Echo: `-prefixed copy and each binary message with the
/// same bytes. The loop ends when `recv` yields `None` or a close message.
async fn ws_handler(ws: WebSocketUpgrade) -> Response {
    ws.on_upgrade(|mut socket| async move {
        loop {
            // Work out the reply (if any) for the next incoming message.
            let reply = match socket.recv().await {
                // Nothing more to read, or the peer asked to close: stop.
                None | Some(Message::Close(_)) => break,
                Some(Message::Text(text)) => Message::text(format!("Echo: {text}")),
                Some(Message::Binary(data)) => Message::binary(data),
                // Every other message kind is ignored without a reply.
                Some(_) => continue,
            };
            // Best effort: a failed send is deliberately ignored.
            socket.send(reply).await.ok();
        }
    })
}

// Register `ws_handler` for the `/ws` route.
let router = Router::new().get("/ws", ws_handler);

Message Types

| Type | Description |
|------|-------------|
| `Message::Text(String)` | UTF-8 text frame |
| `Message::Binary(Vec<u8>)` | Binary frame |
| `Message::Ping(Vec<u8>)` | Ping control frame |
| `Message::Pong(Vec<u8>)` | Pong response frame |
| `Message::Close(Option<CloseFrame>)` | Close with code + reason |

Server-Sent Events

One-way server-to-client streaming:

use neutron::sse::{Sse, SseEvent};

/// Streams one `connected` data event, then an endless series of `tick`
/// events whose data is the current UTC time formatted as RFC 3339. Ticks are
/// paced by a `tokio` interval with a one-second period.
async fn events() -> Sse {
    let ticks = async_stream::stream! {
        yield SseEvent::new().data("connected");

        let mut ticker = tokio::time::interval(Duration::from_secs(1));
        loop {
            ticker.tick().await;
            let now = chrono::Utc::now().to_rfc3339();
            yield SseEvent::new().event("tick").data(&now);
        }
    };
    Sse::new(ticks)
}

// Register the `events` SSE handler for the `/events` route.
let router = Router::new().get("/events", events);

Client-side:

// Open the event stream and log the payload of every "tick" event.
const es = new EventSource("/events");
es.addEventListener("tick", ({ data }) => {
    console.log("Server time:", data);
});

PubSub

In-memory publish/subscribe for broadcasting messages:

use neutron::pubsub::PubSub;

// Register a `PubSub` as router state and mount the two handlers that
// extract it via `State<PubSub>`.
let pubsub = PubSub::new();
let router = Router::new()
    .state(pubsub)
    .post("/broadcast", broadcast)
    .get("/subscribe", subscribe);

/// Serializes the JSON request body to a string, publishes it under
/// `"notifications"`, and always answers with `StatusCode::OK`.
async fn broadcast(
    State(ps): State<PubSub>,
    Json(body): Json<serde_json::Value>,
) -> StatusCode {
    let payload = body.to_string();
    ps.publish("notifications", &payload);
    StatusCode::OK
}

/// Subscribes to `"notifications"` and relays each received string to the
/// client as a `notification` SSE event. The stream ends the first time
/// `recv` returns an error.
async fn subscribe(State(ps): State<PubSub>) -> Sse {
    let mut receiver = ps.subscribe::<String>("notifications");

    let notifications = async_stream::stream! {
        while let Ok(payload) = receiver.recv().await {
            let event = SseEvent::new().event("notification").data(&payload);
            yield event;
        }
    };
    Sse::new(notifications)
}

Patterns

Chat Room

/// Chat room handler: bridges one WebSocket connection and the `"chat"`
/// PubSub channel in both directions.
///
/// Text messages from the client are published to `"chat"`; every message
/// received from `"chat"` is sent to the client as a text message. The
/// session ends when the client closes the connection, a send fails, or the
/// subscription's `recv` returns an error.
async fn chat(ws: WebSocketUpgrade, State(ps): State<PubSub>) -> Response {
    ws.on_upgrade(|mut socket| async move {
        let mut rx = ps.subscribe::<String>("chat");

        // A single task owns the socket and multiplexes both directions.
        // Moving `socket` into a spawned task and then reading from it here
        // is a use-after-move, so both halves are driven from one loop.
        // NOTE(review): this assumes `socket.recv()` is cancel-safe, i.e. that
        // dropping an unfinished `recv` future loses no data — confirm against
        // the WebSocket implementation.
        loop {
            tokio::select! {
                // Forward PubSub messages to WebSocket
                outbound = rx.recv() => {
                    let Ok(msg) = outbound else { break };
                    if socket.send(Message::text(msg)).await.is_err() {
                        break;
                    }
                }
                // Forward WebSocket messages to PubSub
                inbound = socket.recv() => match inbound {
                    Some(Message::Text(text)) => {
                        ps.publish("chat", &text);
                    }
                    Some(Message::Close(_)) | None => break,
                    // Other message kinds (ping, pong, binary) must not end
                    // the chat session; they are simply skipped.
                    Some(_) => {}
                },
            }
        }
    })
}

Live Dashboard

Combine SSE with database polling:

/// Live dashboard feed: runs `SELECT TS_LAST('cpu_usage')` in a loop and
/// emits each result as a `metrics` SSE event whose data is the JSON object
/// `{"cpu": <value>}`, sleeping one second between queries.
///
/// If a query fails, the stream ends instead of panicking inside the
/// response body; the client can reconnect to resume.
async fn metrics_stream(State(db): State<Client>) -> Sse {
    Sse::new(async_stream::stream! {
        loop {
            // A transient database error must not panic the streaming task,
            // so end the stream cleanly on failure.
            let row = match db.query_one("SELECT TS_LAST('cpu_usage')", &[]).await {
                Ok(row) => row,
                Err(_) => break,
            };
            let cpu: f64 = row.get(0);

            yield SseEvent::new()
                .event("metrics")
                .data(&serde_json::json!({ "cpu": cpu }).to_string());

            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    })
}