事件广播(SSE/WebSocket)
大约 6 分钟
事件广播(SSE/WebSocket)
用于把后端业务事件实时推送到前端(仪表盘更新、通知气泡、任务进度、在线用户状态等)。本节给出:
- 通道模型:public/private/presence
- 两种推送协议:SSE 与 WebSocket(何时选用)
- 单机 Hub 与集群下的 Redis Pub/Sub 桥接
- 鉴权与订阅、心跳与断线重连、限速与安全
选型对比
- SSE(Server-Sent Events)
- 优点:浏览器原生 EventSource、单向推送、HTTP/2 友好、实现简单
- 局限:单向(无法客户端上行),某些代理/老浏览器兼容性
- WebSocket
- 优点:双向通信、低延迟、通道管理灵活
- 局限:实现复杂、反向代理需配置 Upgrade、心跳与连接管理成本更高
建议:默认用 SSE;需要客户端上行或复杂互动场景时再用 WebSocket。
基于 Axum 的 SSE 实现
下面是一个稳定的 SSE 端点示例,周期性推送系统信息。你的版本已经能正常工作,这里补充完整导入与说明。
示例(与现有实现一致,含 keep-alive 与节流):
use std::{convert::Infallible, time::Duration};
use axum::response::sse::{Event, Sse};
use tokio_stream::StreamExt; // 提供 throttle
use futures::stream; // repeat_with
// use chrono::Utc; // 如果要设置 id 可用时间戳
// 假设你已有该函数
fn get_oper_sys_info() -> serde_json::Value { /* ... */ }
pub async fn server_event() -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
let stream = stream::repeat_with(move || {
let r = get_oper_sys_info();
let data = serde_json::to_string(&r).unwrap_or_else(|_| "0".to_string());
Event::default()
.event("sysinfo") // 可选:命名事件类型
.data(data) // 事件内容
// .id(Utc::now().timestamp_millis().to_string()) // 可选:便于断线续传
})
.map(Ok)
.throttle(Duration::from_secs(2)); // 限速推送,降低压力
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15)) // 心跳间隔(可适度增大)
.text("keep-alive"),
)
}
路由注册:
fn sys_server_info() -> WebPath {
WebPath::new().route(
"/server_update",
WebPathType::Get,
Some("更新服务器信息"),
get(s_sys_server_info::server_event),
)
}
前端使用(原生 EventSource):
<script>
import { EventSourcePolyfill } from 'event-source-polyfill'
import { useUserStoreWithOut } from '@/store/modules/user'
import { type Ref, onDeactivated, onUnmounted, ref } from 'vue'
const PATH_URL = import.meta.env.VITE_API_BASE_PATH
export enum ServerInfoApi {
server_update = '/sys/serverinfo/server_update'
}
export const useSSE = (_url: string) => {
const data: Ref<string | null> = ref(null)
const userStore = useUserStoreWithOut()
const es = new EventSourcePolyfill(PATH_URL + _url, {
headers: {
'Content-Type': 'application/json',
['Authorization']: userStore.getTokenType + ' ' + userStore.getToken
}
})
es.onmessage = (e: MessageEvent) => {
data.value = e.data
}
onDeactivated(() => {
es.close()
})
onUnmounted(() => {
es.close()
})
return { data }
}
</script>
服务端广播设计
单机场景可用 tokio::sync::broadcast 或 watch 作为 Hub:
use axum::response::sse::{Event, Sse};
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use std::convert::Infallible;
#[derive(Clone)]
pub struct SseHub {
tx: broadcast::Sender<String>, // 统一发 JSON 字符串
}
impl SseHub {
pub fn new() -> Self {
let (tx, _rx) = broadcast::channel(1024);
Self { tx }
}
pub fn publish(&self, msg: impl Into<String>) {
let _ = self.tx.send(msg.into());
}
pub fn subscribe(&self) -> broadcast::Receiver<String> {
self.tx.subscribe()
}
}
pub async fn sse_stream(hub: SseHub) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
let rx = hub.subscribe();
let stream = BroadcastStream::new(rx)
.filter_map(|res| async move { res.ok() })
.map(|msg| Ok(Event::default().event("broadcast").data(msg)));
Sse::new(stream)
}
- 业务侧通过 hub.publish(json) 广播消息;
- SSE 连接复用 hub.subscribe() 的 Receiver,实时推送给所有客户端。
集群与 Redis Pub/Sub 桥接
当你的服务有多实例时,需让不同实例之间共享事件。做法:
- 每个实例本地一个 Hub(broadcast),对外服务 SSE 连接;
- 同时订阅 Redis Pub/Sub(或使用 Redis Stream/Kafka 等);
- 任一实例 publish 的事件先写入 Redis,再由所有实例消费并转发到本地 Hub;
- 避免重复:可在消息中加实例 ID,消费时忽略自身重复消息。
伪代码:
// 推送
async fn publish_global(redis: &RedisClient, msg: &str) {
let _ = redis.publish("sse:topic", msg).await;
}
// 订阅转发
async fn relay_from_redis_to_local(redis: RedisClient, hub: SseHub) {
let mut sub = redis.subscribe("sse:topic").await.unwrap();
while let Some(msg) = sub.next_message().await {
hub.publish(msg.get_payload::<String>().unwrap());
}
}
通道模型(public/private/presence)
- public:任何已通过基础鉴权的用户可订阅;
- private:需要校验用户是否有权限订阅该通道(如用户 ID、角色、资源范围);
- presence:在 private 的基础上,维护在线成员列表(加入/离开事件),可实现在线状态、房间成员等。
参考实现要点:
- 通道 -> 本地 Hub Map:HashMap<ChannelName, Hub>
- JOIN 流程:校验 token → 验权 → 返回对应通道的 SSE 流
- presence:在 Hub 上维护成员表,订阅/断开时广播 presence:join/presence:leave 事件
鉴权与订阅
- 入口鉴权:Cookie/JWT/签名 Query 参数
- 订阅授权:校验用户是否可订阅此频道(private/presence 用)
- 有效期与续期:SSE 长连接可依赖 Cookie 会话;JWT 建议使用短期令牌 + 自动续期机制
- 数据脱敏:仅发送必要字段,避免隐私泄露
SSE 不支持自定义 Header,常见做法:
- 使用 Cookie(适于同域)
- 使用带签名的短期 token 作为 query 参数(注意防止泄露与复用)
- 或使用支持 Header 的 polyfill(如 event-source-polyfill)
心跳、重连与速率控制
- 心跳:SSE 的 keep_alive 会定期发送注释帧,保持连接活跃
- 重连:浏览器内置重连逻辑(默认 3 秒);可通过 retry 字段控制
- 限速:服务端对高频事件使用 throttle/debounce;必要时分桶采样
- 超时/限连:通过反向代理或服务端限制每 IP/用户的并发连接数
Nginx/反向代理建议
SSE 需要禁用缓冲并保持长连接:
location /sse/ {
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header Cache-Control "no-cache";
proxy_buffering off; # 关键:关闭缓冲
chunked_transfer_encoding on;
proxy_read_timeout 1h; # 放宽读超时
proxy_send_timeout 1h;
# 如有跨域,配合 CORS 头部
}
WebSocket 简述与示例(可选)
如需双向通信或客户端上行(例如聊天室、协同编辑),可使用 WebSocket。
后台(Axum):
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
response::IntoResponse,
};
pub async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(handle_socket)
}
async fn handle_socket(mut socket: WebSocket) {
while let Some(Ok(msg)) = socket.recv().await {
match msg {
Message::Text(text) => {
// Echo 或路由到业务
let _ = socket.send(Message::Text(format!("echo: {text}"))).await;
}
Message::Ping(p) => { let _ = socket.send(Message::Pong(p)).await; }
_ => {}
}
}
}
前端:
const ws = new WebSocket("wss://example.com/ws");
ws.onmessage = (e) => console.log("got:", e.data);
ws.onopen = () => ws.send(JSON.stringify({ type: "ping" }));
安全与稳定性
- 验权与授权:所有私有/敏感通道需严格校验
- 限流与配额:按 IP/用户/通道限连与限频
- 防止广播风暴:聚合、采样、背压;对大消息做截断或分页
- 序列化安全:JSON 尽量固定 schema,避免未验证的自由文本
- 观测性:记录连接数、重连次数、推送时延、下游消费错误
调试与测试
- curl 观察 SSE
curl -N -H "Accept: text/event-stream" https://example.com/sse/sys
- 前端断线重连:临时断网/关闭代理,确认能自动恢复
- 压力测试:模拟 N 路连接 + M 条/秒事件,看 CPU/内存/GC/代理队列
小结
- 默认选 SSE:实现简单、浏览器友好、适合大多数“只下行”通知场景
- 需要上行或复杂交互再选 WebSocket
- 单机用本地 Hub(broadcast),集群用 Redis Pub/Sub 桥接
- 做好鉴权与限速、心跳与重连、反向代理配置,确保稳定可观测
如果你需要把现有系统指标(get_oper_sys_info)广播到“多租户/多通道”的结构,我可以基于你的 Cache/Job 系统顺手设计一个可水平扩展的 Hub 模块(带 Redis 桥接与 presence)。