Real-time

Neutron Python provides two real-time transports: a WebSocket hub for bidirectional messaging and Server-Sent Events (SSE) for one-way server-push streaming.

WebSocket Hub

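The hub is a room-based broadcasting system for real-time applications: connections join named rooms, and messages are addressed to a room rather than to individual connections.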

Setup

from neutron.realtime import WebSocketHub

hub = WebSocketHub()

@app.websocket("/ws")
async def websocket_endpoint(websocket):
    await hub.handle(websocket)

Client Protocol

Clients send JSON messages to join rooms, leave rooms, and broadcast:

// Join a room
{"action": "join", "room": "chat:general"}

// Leave a room
{"action": "leave", "room": "chat:general"}

// Send a message to a room
{"action": "message", "room": "chat:general", "data": {"text": "Hello!"}}
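The three message shapes above can be produced with small helpers. This is only a sketch: the function names are hypothetical and not part of Neutron, and actually sending the resulting strings is left to whichever WebSocket client you use.

```python
import json


def join_message(room):
    """Serialize a request to join `room`."""
    return json.dumps({"action": "join", "room": room})


def leave_message(room):
    """Serialize a request to leave `room`."""
    return json.dumps({"action": "leave", "room": room})


def room_message(room, data):
    """Serialize a payload to be broadcast to `room`."""
    return json.dumps({"action": "message", "room": room, "data": data})
```

Each helper returns a JSON string, so it can be passed directly to a client's text-send method.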

Server Responses

// Confirmation
{"action": "joined", "room": "chat:general"}
{"action": "left", "room": "chat:general"}

Server-Side Broadcasting

Push messages from any handler:

@app.post("/notifications/send")
async def send_notification(request):
    data = await request.json()

    # Broadcast to all connections in a room
    count = await hub.broadcast(
        room="notifications",
        data={"type": "alert", "message": data["message"]},
        exclude=None,  # Optional: exclude a specific connection
    )

    return {"sent_to": count}
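To make the roles of `exclude` and the returned count concrete, here is a toy in-memory model of room broadcasting. It is not Neutron's implementation: `ToyHub` and `FakeConnection` are hypothetical names, and the sketch assumes the count is the number of connections the message was delivered to.

```python
import asyncio
from collections import defaultdict


class FakeConnection:
    """Stand-in for a WebSocket connection; records what it is sent."""

    def __init__(self, name):
        self.name = name
        self.received = []

    async def send(self, payload):
        self.received.append(payload)


class ToyHub:
    """Hypothetical model of room membership and broadcasting."""

    def __init__(self):
        self._rooms = defaultdict(set)

    def join(self, room, conn):
        self._rooms[room].add(conn)

    async def broadcast(self, room, data, exclude=None):
        # Deliver to every member of the room except the excluded one.
        targets = [c for c in self._rooms.get(room, ()) if c is not exclude]
        for conn in targets:
            await conn.send(data)
        # Assumption: the count reflects deliveries, not room size.
        return len(targets)
```

Under this model, broadcasting to an empty or unknown room returns 0, and passing the sender as `exclude` gives the usual "everyone but me" behavior.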

Hub Stats

hub.connection_count   # Total active WebSocket connections
hub.room_count("chat:general")  # Connections in a specific room
hub.rooms              # List of all active room names

Server-Sent Events (SSE)

One-way server-push for event streaming.

Basic Usage

import asyncio

from neutron.realtime import sse_response

@app.get("/events")
async def events(request):
    async def generate():
        for i in range(100):
            yield {"data": f"Event {i}", "event": "update", "id": str(i)}
            await asyncio.sleep(1)

    return sse_response(generate())

SSE Stream Class

For more control, use SSEStream directly:

from neutron.realtime import SSEStream, sse_response

stream = SSEStream()

# Push events programmatically (these calls must run inside an async function)
await stream.send(data="Hello", event="greeting", id="1")
await stream.send(data={"count": 42}, event="metric")
await stream.close()

Wire Format

Events are formatted as standard SSE:

event: update
id: 1
data: Event 1

event: metric
data: {"count": 42}

Multiline data is split across multiple data: lines.
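The formatting rules above can be sketched in a few lines. This is an illustration of the standard SSE wire format, not Neutron's internal code; `format_sse` is a hypothetical helper, and it assumes non-string data is serialized as JSON, as the `metric` example suggests.

```python
import json


def format_sse(data, event=None, event_id=None):
    """Render one event in SSE wire format, terminated by a blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # Assumption: dicts and other non-string values are sent as JSON.
    text = data if isinstance(data, str) else json.dumps(data)
    # Each line of the payload gets its own "data:" field.
    for part in text.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"
```

For example, `format_sse("line one\nline two")` produces two `data:` lines followed by the blank line that ends the event; a compliant client joins them back together with a newline.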

Response Headers

SSE responses automatically set:

| Header | Value |
|--------|-------|
| Content-Type | text/event-stream |
| Cache-Control | no-cache |
| Connection | keep-alive |

Custom Headers

return sse_response(generate(), headers={"X-Stream-ID": "abc123"})

Nucleus Integration

Live Query Results via SSE

Stream database changes to clients using PubSub + SSE:

@app.get("/orders/live")
async def live_orders(request):
    async def stream():
        async for event in db.pubsub.listen("order_events"):
            yield {"event": "order_update", "data": event}

    return sse_response(stream())
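On the consuming side, a client has to turn the SSE text back into events. The sketch below is a simplified parser for already-received text; `parse_sse` is a hypothetical helper, it handles only the `event`, `id`, and `data` fields, and it does not carry the last event ID across events as a full client would.

```python
def parse_sse(text):
    """Parse SSE wire text into a list of event dicts."""
    events = []
    fields = {}
    data_lines = []
    for line in text.splitlines():
        if line == "":
            # A blank line ends the current event; skip it if it has no data.
            if data_lines:
                fields["data"] = "\n".join(data_lines)
                events.append(fields)
            fields, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip the single optional leading space
        if name == "data":
            data_lines.append(value)
        elif name in ("event", "id"):
            fields[name] = value
    return events
```

Browsers do this automatically through `EventSource`; a parser like this is only needed for non-browser clients or tests.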

Chat with WebSocket Hub

hub = WebSocketHub()

@app.websocket("/chat")
async def chat(websocket):
    await hub.handle(websocket)

# Server-side message injection
@app.post("/chat/announce")
async def announce(request):
    data = await request.json()
    await hub.broadcast("chat:general", {
        "type": "system",
        "text": data["message"],
    })
    return {"status": "sent"}