Event Broadcasting (SSE/WebSocket)
10/30/24
Server-Sent Events (SSE) and WebSocket both provide real-time push from server to client, for scenarios such as notifications, live data updates, and monitoring dashboards.
Tech Comparison
| Feature | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
| Protocol | Plain HTTP (`text/event-stream` response) | WebSocket (`ws://` / `wss://`, via HTTP Upgrade handshake) |
| Auto-reconnect | Built-in (EventSource) | Manual |
| Payload | UTF-8 text only | Text and binary |
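To make the "plain HTTP, text only" column concrete, the sketch below serializes one event in the `text/event-stream` wire format: an optional `event:` field, one `data:` field per payload line, and a blank line to end the event. The function name `format_sse_event` is hypothetical and exists only for illustration; a framework such as axum normally does this for you.

```rust
/// Serialize one event in the `text/event-stream` wire format.
/// `format_sse_event` is an illustrative helper, not part of any library.
pub fn format_sse_event(event: &str, data: &str) -> String {
    let mut frame = String::new();

    // The event name is optional; clients treat unnamed events as "message".
    if !event.is_empty() {
        frame.push_str("event: ");
        frame.push_str(event);
        frame.push('\n');
    }

    // A payload containing newlines must be split into one `data:` field per line.
    for line in data.split('\n') {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }

    // A blank line terminates the event.
    frame.push('\n');
    frame
}
```

Because every field is a line of text, binary payloads would have to be encoded (for example as Base64) before being sent over SSE, whereas WebSocket can carry them directly.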
SSE Implementation with Axum
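use axum::response::sse::{Event, Sse};
use futures::stream::{Stream, StreamExt};
use std::convert::Infallible;
use tokio::sync::broadcast;
// BroadcastStream requires the `sync` feature of the tokio-stream crate.
use tokio_stream::wrappers::BroadcastStream;

/// Fan-out hub: every subscriber receives a copy of each published message.
#[derive(Clone)]
pub struct SseHub {
    tx: broadcast::Sender<String>,
}

impl SseHub {
    pub fn new() -> Self {
        // Buffer up to 1024 messages; receivers that fall further behind lose the oldest ones.
        let (tx, _rx) = broadcast::channel(1024);
        Self { tx }
    }

    pub fn publish(&self, msg: impl Into<String>) {
        // `send` only fails when there are no subscribers, which is safe to ignore here.
        let _ = self.tx.send(msg.into());
    }

    pub fn subscribe(&self) -> broadcast::Receiver<String> {
        self.tx.subscribe()
    }
}

pub async fn sse_stream(hub: SseHub) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = hub.subscribe();
    let stream = BroadcastStream::new(rx)
        // Skip `Lagged` errors from slow receivers instead of ending the stream.
        .filter_map(|res| async move { res.ok() })
        .map(|msg| Ok(Event::default().event("broadcast").data(msg)));
    Sse::new(stream)
}
Cluster & Redis Pub/Sub Bridge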
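In a multi-instance deployment, the in-process broadcast channel reaches only the clients connected to the same instance. Use Redis Pub/Sub to relay messages across instances: each instance publishes to a shared Redis channel, subscribes to it, and forwards whatever it receives to its local subscribers.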
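The relay topology can be sketched without a Redis server. In the minimal simulation below, a hypothetical `Bus` type stands in for a Redis Pub/Sub channel and a hypothetical `Instance` type stands in for one application instance with its local clients (the role `SseHub` plays in the SSE section). All names are assumptions made for illustration, and `std::sync::mpsc` replaces both Redis and the Tokio broadcast channel; a real bridge would use a Redis client library instead.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Stand-in for one Redis Pub/Sub channel (assumption: in production this
/// is Redis PUBLISH/SUBSCRIBE, not an in-process struct).
pub struct Bus {
    instances: Vec<Sender<String>>,
}

impl Bus {
    pub fn new() -> Self {
        Self { instances: Vec::new() }
    }

    /// Roughly what SUBSCRIBE does: register an application instance.
    pub fn attach(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.instances.push(tx);
        rx
    }

    /// Roughly what PUBLISH does: every attached instance gets a copy.
    pub fn publish(&self, msg: &str) {
        for tx in &self.instances {
            let _ = tx.send(msg.to_string());
        }
    }
}

/// One application instance and its locally connected clients.
pub struct Instance {
    from_bus: Receiver<String>,
    clients: Vec<Sender<String>>,
}

impl Instance {
    pub fn new(from_bus: Receiver<String>) -> Self {
        Self { from_bus, clients: Vec::new() }
    }

    /// Register a local client (an SSE or WebSocket connection in practice).
    pub fn connect_client(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.push(tx);
        rx
    }

    /// Relay everything currently waiting on the bus to the local clients.
    /// Returns the number of bus messages relayed.
    pub fn pump(&mut self) -> usize {
        let mut relayed = 0;
        while let Ok(msg) = self.from_bus.try_recv() {
            for client in &self.clients {
                let _ = client.send(msg.clone());
            }
            relayed += 1;
        }
        relayed
    }
}
```

With two instances attached to the same `Bus`, a single `publish` followed by a `pump` on each instance delivers the message to clients on both. In a real deployment the pump would likely run as a background task per instance and call `SseHub::publish` for each message received from Redis.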
WebSocket Example
Backend:
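use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::response::IntoResponse;

pub async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    // Complete the HTTP upgrade, then hand the socket to `handle_socket`.
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(mut socket: WebSocket) {
    // The loop ends when the client disconnects or a receive error occurs.
    while let Some(Ok(msg)) = socket.recv().await {
        match msg {
            Message::Text(text) => {
                // `.into()` compiles whether `Message::Text` wraps a `String` or axum's newer `Utf8Bytes`.
                let _ = socket.send(Message::Text(format!("echo: {text}").into())).await;
            }
            // axum normally replies to pings on its own; the explicit pong is only illustrative.
            Message::Ping(p) => { let _ = socket.send(Message::Pong(p)).await; }
            _ => {}
        }
    }
}
Frontend:
const ws = new WebSocket("wss://example.com/ws");
ws.onmessage = (e) => console.log("got:", e.data);
ws.onopen = () => ws.send(JSON.stringify({ type: "ping" }));
Heartbeat, Reconnect & Rate Limiting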
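- SSE: the browser's EventSource reconnects automatically; the server can adjust the delay with the `retry:` field, and the client sends `Last-Event-ID` on reconnect so the stream can resume.
- WebSocket: implement ping/pong heartbeats to detect dead connections, and reconnect manually, ideally with a growing delay between attempts.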
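For the manual WebSocket reconnection mentioned above, a common approach is exponential backoff with a cap. The sketch below is one way to compute the delay; the function name `backoff_delay` and its parameters are hypothetical, and it deliberately omits jitter, which a production client would probably add to avoid synchronized reconnect storms.

```rust
use std::time::Duration;

/// Delay before reconnect attempt number `attempt` (0-based):
/// `base * 2^attempt`, never exceeding `max`.
/// `backoff_delay` is an illustrative helper, not a library function.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate instead of overflowing when `attempt` is large.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);

    // If the multiplication itself overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a base of 1 second and a cap of 30 seconds, attempts 0 through 4 wait 1, 2, 4, 8, and 16 seconds, and every later attempt waits 30 seconds. The attempt counter should be reset once a connection stays up.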
Nginx/Reverse Proxy Suggestions
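location /sse {
    proxy_pass http://backend;
    # Use HTTP/1.1 to the upstream so keep-alive and chunked streaming work.
    proxy_http_version 1.1;
    # Deliver events immediately instead of buffering the response.
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header Connection '';
    chunked_transfer_encoding on;
    # The default 60s read timeout closes idle streams; 1h is an example value.
    proxy_read_timeout 1h;
}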