Real-Time
Neutron provides three real-time communication patterns: WebSocket for bidirectional messaging, SSE for server-to-client streaming, and PubSub for broadcast.
WebSocket
Bidirectional communication using fastwebsockets:
use neutron::ws::{WebSocketUpgrade, WebSocket, Message};
/// Echo handler: upgrades the request to a WebSocket and sends every text
/// or binary frame back to the client.
///
/// Text frames are echoed with an `Echo: ` prefix, binary frames are echoed
/// unchanged, a close frame ends the session, and all other frames are
/// ignored.
async fn ws_handler(ws: WebSocketUpgrade) -> Response {
    ws.on_upgrade(|mut socket| async move {
        while let Some(msg) = socket.recv().await {
            // Build the reply first so the send (and its error handling)
            // lives in exactly one place.
            let reply = match msg {
                Message::Text(text) => Message::text(format!("Echo: {text}")),
                Message::Binary(data) => Message::binary(data),
                Message::Close(_) => break,
                _ => continue,
            };
            // A failed send means the peer can no longer be written to;
            // stop instead of discarding the error and looping on.
            if socket.send(reply).await.is_err() {
                break;
            }
        }
    })
}
// Serve the echo handler on GET /ws.
let router = Router::new().get("/ws", ws_handler);
Message Types
| Type | Description |
|------|-------------|
| Message::Text(String) | UTF-8 text frame |
| Message::Binary(Vec<u8>) | Binary frame |
| Message::Ping(Vec<u8>) | Ping control frame |
| Message::Pong(Vec<u8>) | Pong response frame |
| Message::Close(Option<CloseFrame>) | Close with code + reason |
Server-Sent Events
One-way server-to-client streaming:
use neutron::sse::{Sse, SseEvent};
/// SSE endpoint: yields an initial `connected` data event, then a `tick`
/// event carrying the current UTC time (RFC 3339) on every interval tick.
async fn events() -> Sse {
    let stream = async_stream::stream! {
        yield SseEvent::new().data("connected");

        let mut ticker = tokio::time::interval(Duration::from_secs(1));
        loop {
            ticker.tick().await;
            let now = chrono::Utc::now().to_rfc3339();
            yield SseEvent::new().event("tick").data(&now);
        }
    };
    Sse::new(stream)
}
// Serve the SSE stream on GET /events.
let router = Router::new().get("/events", events);
Client-side:
// Subscribe to the server's "tick" events and log each payload.
const source = new EventSource("/events");

function onTick(event) {
  console.log("Server time:", event.data);
}

source.addEventListener("tick", onTick);
PubSub
In-memory publish/subscribe for broadcasting messages:
use neutron::pubsub::PubSub;
// Register a PubSub instance as router state so both handlers below can
// extract it with `State<PubSub>`.
let pubsub = PubSub::new();
let router = Router::new()
    .state(pubsub)
    .post("/broadcast", broadcast)
    .get("/subscribe", subscribe);
/// Publishes the JSON request body, serialized to a string, on the
/// `notifications` topic and responds with `200 OK`.
async fn broadcast(
    State(ps): State<PubSub>,
    Json(body): Json<serde_json::Value>,
) -> StatusCode {
    let payload = body.to_string();
    ps.publish("notifications", &payload);
    StatusCode::OK
}
/// Streams every message published on the `notifications` topic to the
/// client as a `notification` SSE event.
///
/// The stream ends the first time the subscription's `recv()` returns an
/// error.
async fn subscribe(State(ps): State<PubSub>) -> Sse {
    let mut receiver = ps.subscribe::<String>("notifications");
    let stream = async_stream::stream! {
        loop {
            match receiver.recv().await {
                Ok(payload) => {
                    yield SseEvent::new().event("notification").data(&payload);
                }
                Err(_) => break,
            }
        }
    };
    Sse::new(stream)
}
Patterns
Chat Room
/// Chat-room handler: bridges one WebSocket connection to the `chat`
/// PubSub topic.
///
/// Text frames received from the client are published on `chat`, and every
/// message published on `chat` is sent to the client as a text frame. The
/// session ends when the client closes the connection, a send fails, or the
/// subscription reports an error.
async fn chat(ws: WebSocketUpgrade, State(ps): State<PubSub>) -> Response {
    ws.on_upgrade(|mut socket| async move {
        let mut rx = ps.subscribe::<String>("chat");

        // Both directions are driven from this one task. Moving `socket`
        // into a spawned task would leave nothing to call `recv()` on here.
        // NOTE(review): assumes `socket.recv()` and `rx.recv()` may be
        // dropped and re-created between iterations without losing data
        // (cancel safety) — confirm against the neutron docs.
        loop {
            tokio::select! {
                // Forward PubSub messages to WebSocket
                outgoing = rx.recv() => match outgoing {
                    Ok(msg) => {
                        if socket.send(Message::text(msg)).await.is_err() {
                            break;
                        }
                    }
                    Err(_) => break,
                },
                // Forward WebSocket messages to PubSub
                incoming = socket.recv() => match incoming {
                    Some(Message::Text(text)) => {
                        ps.publish("chat", &text);
                    }
                    Some(Message::Close(_)) | None => break,
                    // Ignore other frames (ping, pong, binary) rather than
                    // ending the session on them.
                    Some(_) => {}
                },
            }
        }
    })
}
Live Dashboard
Combine SSE with database polling:
/// Live-dashboard SSE endpoint: polls the database and emits the value of
/// `TS_LAST('cpu_usage')` as a `metrics` event, sleeping one second between
/// polls.
///
/// Each event's data is a JSON object of the form `{"cpu": <f64>}`. The
/// stream ends if a query fails.
async fn metrics_stream(State(db): State<Client>) -> Sse {
    Sse::new(async_stream::stream! {
        loop {
            let row = match db.query_one("SELECT TS_LAST('cpu_usage')", &[]).await {
                Ok(row) => row,
                // End the stream instead of panicking inside it on a
                // database error.
                // NOTE(review): browser `EventSource` clients normally
                // reconnect on their own — confirm that is the desired
                // recovery path.
                Err(_) => break,
            };
            let cpu: f64 = row.get(0);
            let payload = serde_json::json!({ "cpu": cpu }).to_string();
            yield SseEvent::new()
                .event("metrics")
                .data(&payload);
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    })
}