Real-time
Neutron Python provides two real-time transports: a WebSocket hub for bidirectional messaging and Server-Sent Events (SSE) for one-way server-push streaming.
WebSocket Hub
Room-based broadcasting system for real-time applications.
Setup
from neutron.realtime import WebSocketHub

hub = WebSocketHub()

@app.websocket("/ws")
async def websocket_endpoint(websocket):
    # Hand the connection to the hub, which processes the client protocol below
    await hub.handle(websocket)
Client Protocol
Clients send JSON messages to join rooms, leave rooms, and broadcast:
// Join a room
{"action": "join", "room": "chat:general"}
// Leave a room
{"action": "leave", "room": "chat:general"}
// Send a message to a room
{"action": "message", "room": "chat:general", "data": {"text": "Hello!"}}
Server Responses
// Confirmation
{"action": "joined", "room": "chat:general"}
{"action": "left", "room": "chat:general"}
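To make the message flow concrete, here is a minimal in-memory sketch of how a server could process the three client actions. It is not Neutron's implementation: `handle_message`, the `rooms` dictionary, and the string connection ids are hypothetical, and the choice not to echo a message back to its sender is an assumption.

```python
import json
from collections import defaultdict

# Hypothetical model of the room protocol; not Neutron's internals.
rooms = defaultdict(set)  # room name -> set of connection ids


def handle_message(conn_id, raw):
    """Apply one client message and return (reply_to_sender, deliveries)."""
    msg = json.loads(raw)
    action, room = msg.get("action"), msg.get("room")
    if action == "join":
        rooms[room].add(conn_id)
        return {"action": "joined", "room": room}, []
    if action == "leave":
        rooms[room].discard(conn_id)
        if not rooms[room]:
            del rooms[room]  # drop rooms that have become empty
        return {"action": "left", "room": room}, []
    if action == "message":
        # Assumption: the sender does not receive its own message
        recipients = sorted(rooms.get(room, set()) - {conn_id})
        return None, [(peer, msg.get("data")) for peer in recipients]
    raise ValueError(f"unknown action: {action!r}")
```

Each call returns the confirmation to send back to the caller (if any) and a list of `(connection id, payload)` pairs to deliver to other room members.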
Server-Side Broadcasting
Push messages from any handler:
@app.post("/notifications/send")
async def send_notification(request):
    data = await request.json()
    # Broadcast to all connections in a room
    count = await hub.broadcast(
        room="notifications",
        data={"type": "alert", "message": data["message"]},
        exclude=None,  # Optional: exclude a specific connection
    )
    return {"sent_to": count}
Hub Stats
hub.connection_count # Total active WebSocket connections
hub.room_count("chat:general") # Connections in a specific room
hub.rooms # List of all active room names
Server-Sent Events (SSE)
One-way server-push for event streaming.
Basic Usage
import asyncio

from neutron.realtime import sse_response

@app.get("/events")
async def events(request):
    async def generate():
        # Emit one event per second, 100 events in total
        for i in range(100):
            yield {"data": f"Event {i}", "event": "update", "id": str(i)}
            await asyncio.sleep(1)
    return sse_response(generate())
SSE Stream Class
For more control, use SSEStream directly:
from neutron.realtime import SSEStream, sse_response
stream = SSEStream()
# Push events programmatically
await stream.send(data="Hello", event="greeting", id="1")
await stream.send(data={"count": 42}, event="metric")
await stream.close()
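One way to picture a push-style stream is as a queue that producers write to and the response reads from. The sketch below illustrates that idea with `asyncio.Queue`. The `QueueStream` class is a hypothetical stand-in whose `send` and `close` merely mirror the calls above; it makes no claim about how `SSEStream` works internally.

```python
import asyncio

_CLOSED = object()  # sentinel marking the end of the stream


class QueueStream:
    """Hypothetical queue-backed stream; not Neutron's SSEStream."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def send(self, data, event=None, id=None):
        # Only include the optional fields that were actually supplied
        item = {"data": data}
        if event is not None:
            item["event"] = event
        if id is not None:
            item["id"] = id
        await self._queue.put(item)

    async def close(self):
        await self._queue.put(_CLOSED)

    async def __aiter__(self):
        # Yield queued events in order until close() has been called
        while True:
            item = await self._queue.get()
            if item is _CLOSED:
                return
            yield item


async def demo():
    stream = QueueStream()
    await stream.send(data="Hello", event="greeting", id="1")
    await stream.send(data={"count": 42}, event="metric")
    await stream.close()
    return [item async for item in stream]


events = asyncio.run(demo())
```

Because the stream is an async iterable, a consumer can drain it with `async for` while other tasks keep calling `send`.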
Wire Format
Events are formatted as standard SSE:
event: update
id: 1
data: Event 1

event: metric
data: {"count": 42}
Multiline data is split across multiple data: lines.
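The formatting rules above can be sketched as a small encoder. `format_sse` is a hypothetical helper written for illustration, not part of Neutron's API, and it assumes non-string payloads are serialized as JSON.

```python
import json


def format_sse(data, event=None, id=None):
    """Serialize one event into SSE wire format (a sketch, not Neutron's encoder)."""
    if not isinstance(data, str):
        data = json.dumps(data)  # assumption: non-string payloads are sent as JSON
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    # Each line of the payload gets its own "data:" field
    lines.extend(f"data: {part}" for part in data.split("\n"))
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


print(format_sse("line one\nline two", event="update", id="7"), end="")
```

On the receiving side, a client joins consecutive `data:` lines with newlines, so the original multiline payload is restored.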
Response Headers
SSE responses automatically set:
| Header | Value |
|--------|-------|
| Content-Type | text/event-stream |
| Cache-Control | no-cache |
| Connection | keep-alive |
Custom Headers
return sse_response(generate(), headers={"X-Stream-ID": "abc123"})
Nucleus Integration
Live Query Results via SSE
Stream database changes to clients using PubSub + SSE:
@app.get("/orders/live")
async def live_orders(request):
    async def stream():
        # Forward each PubSub event to the client as an SSE event
        async for event in db.pubsub.listen("order_events"):
            yield {"event": "order_update", "data": event}
    return sse_response(stream())
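If the events from the source are dictionaries rather than strings, it can help to serialize them and attach an id before streaming. The sketch below shows one way to wrap any async event source. `fake_listen` is a hypothetical stand-in for `db.pubsub.listen("order_events")`, `as_sse_events` is an illustrative helper, and the payload shape is invented for the example.

```python
import asyncio
import json


async def fake_listen():
    """Hypothetical stand-in for db.pubsub.listen("order_events")."""
    for order_id in (101, 102):
        yield {"order_id": order_id, "status": "paid"}
        await asyncio.sleep(0)  # give other tasks a chance to run


async def as_sse_events(source, event_name):
    """Wrap each payload in an SSE event dict with a sequential id."""
    seq = 0
    async for payload in source:
        seq += 1
        yield {"event": event_name, "id": str(seq), "data": json.dumps(payload)}


async def collect():
    return [e async for e in as_sse_events(fake_listen(), "order_update")]


events = asyncio.run(collect())
```

In a handler, the wrapped generator would take the place of `stream()` in the example above.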
Chat with WebSocket Hub
hub = WebSocketHub()

@app.websocket("/chat")
async def chat(websocket):
    await hub.handle(websocket)

# Server-side message injection
@app.post("/chat/announce")
async def announce(request):
    data = await request.json()
    await hub.broadcast("chat:general", {
        "type": "system",
        "text": data["message"],
    })
    return {"status": "sent"}