在使用 WebFlux 的 `Flux>` 实现服务端事件推送时,客户端网络波动或主动断连会导致连接异常关闭,但默认情况下 Flux 并不会自动重试或恢复事件流——`onErrorResume` 或 `retry()` 无法直接生效,因为 `ServerSentEvent` 流由 `ResponseBodyEmitter`(底层为 `ChannelSendOperator`)驱动,其错误传播机制与普通 Flux 不同;同时,浏览器对 SSE 连接中断后虽会自动重连(遵循 `retry:` 字段),但服务端若未维护连接上下文或未正确响应重连请求(如忽略 `Last-Event-ID` 头、未做断点续推),将导致消息丢失或重复;此外,`Flux.generate()` 或 `Flux.interval()` 等热源若未结合 `Context` 或外部状态管理,在连接重建时难以保证事件连续性与幂等性。如何在不引入 WebSocket 的前提下,实现带状态感知、ID 有序、断线续传且资源可控的 SSE 长连接?
1条回答 默认 最新
kylin小鸡内裤 2026-04-09 09:20关注```html一、基础认知:SSE 协议本质与 WebFlux 中的“伪 Flux”陷阱
Server-Sent Events(SSE)是基于 HTTP 的单向流协议,客户端通过
EventSource发起长连接,服务端以text/event-stream响应持续写入data:、id:、event:和retry:字段。但关键在于:WebFlux 的@ResponseBodyEmitter并非真正意义上的 Reactor 流管道——其底层由ChannelSendOperator封装,错误发生时直接关闭响应通道,onErrorResume或retry()无法捕获或介入,因为事件已脱离 Reactor 生命周期。二、核心矛盾拆解:三大断连失序根源
- 状态断裂:每次重连触发新 Controller 方法调用,
Flux.interval()等热源无上下文绑定,计数器/游标重置; - ID 失控:未解析
Last-Event-ID请求头,服务端盲目从头推送,导致重复或跳号; - 资源失控:无连接生命周期管理,长连接堆积引发内存泄漏与线程耗尽(尤其在
Flux.generate()中持有外部引用)。
三、架构设计:分层状态感知 SSE 模型
采用「连接注册中心 + 事件序列化存储 + 上下文驱动生成器」三层模型:
层级 职责 关键技术点 Connection Registry 维护活跃连接 ID、最后发送 ID、创建时间、心跳状态 ConcurrentHashMap<String, ConnectionState>+ 定期清理任务Event Journal 持久化事件序列(支持断点续传),按业务维度分片 JDBC(带 id,timestamp,payload,version)、Redis Streams 或轻量级 WALContext-Aware Generator 根据请求头还原会话状态,驱动 Flux.generate()从指定 ID 续推结合 ContextView注入Last-Event-ID与连接元数据四、关键代码实现:可续传、幂等、可控的 Flux SSE 推送
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> streamEvents( @RequestHeader(value = "Last-Event-ID", required = false) String lastId, @RequestHeader(value = "X-Client-ID", required = true) String clientId) { // 1. 注册连接并获取续传起点 long startId = connectionRegistry.register(clientId, lastId); // 2. 构建带状态的 Flux:从 journal 查询后续事件,自动分页+背压适配 return eventJournal.fetchFrom(startId, clientId) .concatMap(event -> Mono.just( ServerSentEvent.<String>builder() .id(String.valueOf(event.id())) .event("message") .data(event.payload()) .build() )) .doOnNext(e -> connectionRegistry.updateLastSent(clientId, Long.parseLong(e.id()))) .doOnComplete(() -> connectionRegistry.deregister(clientId)) .doOnError(err -> connectionRegistry.deregister(clientId)) .timeout(Duration.ofMinutes(30), Flux.just(ServerSentEvent.<String>builder().event("heartbeat").build())) .onErrorResume(err -> Flux.empty()); // 仅终止当前流,不阻塞 registry }五、健壮性增强:浏览器重连协同机制
服务端需严格遵循 SSE 规范以配合浏览器自动重试行为:
- 响应头必须包含
Cache-Control: no-cache与Connection: keep-alive; - 每个事件块显式设置
id:(整数递增或 UUID),禁止空 id; - 首次响应写入
retry: 3000(毫秒),控制重连间隔; - 当检测到
Last-Event-ID存在且有效时,必须跳过已发送事件,否则违反幂等性。
六、资源治理:连接生命周期与背压控制
引入连接熔断与优雅降级策略:
graph LR A[HTTP Request] --> B{连接数 < 5000?} B -->|Yes| C[注册连接 + 启动 Flux] B -->|No| D[返回 429 Too Many Connections] C --> E[每 30s 心跳事件] E --> F{客户端 ACK?} F -->|Yes| E F -->|No| G[主动 close + 清理 registry]七、生产就绪检查清单
- ✅ 所有事件 ID 全局唯一且单调递增(推荐 Snowflake 或数据库自增);
- ✅
Last-Event-ID解析逻辑支持空值、非法格式、越界场景; - ✅ 连接注册表启用弱引用缓存 + LRU 驱逐策略,防 OOM;
- ✅ 日志埋点覆盖连接建立/中断/续传/超时,关联
X-Request-ID; - ✅ 压测验证 10k 并发连接下 CPU < 70%,GC 次数稳定;
- ✅ 提供 /actuator/sse-status 端点实时查看连接数、平均延迟、积压事件数。
八、进阶演进:从 SSE 到混合推送网关
在超大规模场景中,可将本方案抽象为
```SsePublisher接口,与 Kafka、RabbitMQ 订阅集成,实现「事件源 → 持久化日志 → 多终端分发」统一链路。此时Flux不再直接生成事件,而是消费 MQ 分区,天然具备水平扩展与故障隔离能力,同时保留 SSE 的低客户端侵入性优势。本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报- 状态断裂:每次重连触发新 Controller 方法调用,