沈山南丶 2024-03-27 10:07 采纳率: 50%
浏览 22

WebFlux+SSE流式传输

服务端webflux传输SSE流式数据,客户端接收到数据格式不符合SSE协议规范,导致无法解析data和event,id等字段

这是我服务端用webflux每1s返回一个单词,返回类型是流式。使用ServerSentEvent对象符合sse协议规范字段,其中有data,id,event字段

/**
 * @Desc: 短暂的周期性事件流
 * @Author:zhh
 * @Date:2024/3/26 15:16
 */
@RestController
public class WebFluxController {
    private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");

    @GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @CrossOrigin
    public Flux<ServerSentEvent<String>> subscribe()  {
        return Flux.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .data(sequence.getT1())
                        .id(UUID.randomUUID().toString())
                        .event("MyCustomEvent")
                        .build());
    }
}

这是我客户端接收到sse流式数据,将字节转化成字符串,因为sse传输的是纯文本格式。代码如下

public Flux<String> initSse() {
        //使用webClient发送消息
        return this.webClient.get()
                //请求uri
                .uri(API_URI)
                //设置流式响应
                .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
                .acceptCharset(Charset.defaultCharset())
                .retrieve()
                .bodyToFlux(DataBuffer.class)
                .flatMap(dataBuffer -> {
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);
                    return Mono.just(new String(bytes, StandardCharsets.UTF_8));
                }) 
                .doOnNext(str-> System.out.println(str));
                
    }

运行结果及详细报错内容
id:37505324-7732-40ed-b6c5-b8e30cabfa75
event:MyCustomEvent
data:
quick



id:84627318-bf50-491b-b5ce-34c180cc8e11
event:MyCustomEvent
data:
brown



id:90769bdd-798a-43af-b6e4-4f628f868e01
event:MyCustomEvent
data:
fox



id:892273ec-ba22-4015-be46-11902e77472b
event:MyCustomEvent
data:
jumps




我的解答思路和尝试过的方法

按照sse协议规范,正确输出应该是

id:37505324-7732-40ed-b6c5-b8e30cabfa75
event:MyCustomEvent
data:quick


id:84627318-bf50-491b-b5ce-34c180cc8e11
event:MyCustomEvent
data:brown


id:90769bdd-798a-43af-b6e4-4f628f868e01
event:MyCustomEvent
data:fox


id:892273ec-ba22-4015-be46-11902e77472b
event:MyCustomEvent
data:jumps

错误的输出导致无法有效解析每个事件的内容, 如果直接接收data数据可以

public Flux<Object> initSse2() {
        //使用webClient发送消息
        return this.webClient.get()
                //请求uri
                .uri(API_URI)
                //设置流式响应
                .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
                .acceptCharset(Charset.defaultCharset())
                .retrieve()
                //直接将sse事件协议格式下的data提取转换成string
                .bodyToFlux(String.class)
                .flatMap(string -> {
                    //如果是json字符串,需要处理json字符串转换成对象 ......
                    Object result = JSON.parseObject(string, Object.class);
                    return Flux.just(result);
                });
    }

问题就在于bodyToFlux只能提取data数据转成字符串,如果需要event,id等字段数据此时就不能生效了

此时想到bodyToFlux(ServerSentEvent.class)直接转成ServerSentEvent对象,结果报错了 根据报错日志,无法解析字段the

id:37505324-7732-40ed-b6c5-b8e30cabfa75
event:MyCustomEvent
data:
the

通过打印知道传过来的数据是上面这样的,显然json解析失败了

public Flux<String> initSse() {
        //使用webClient发送消息
        return this.webClient.get()
                //请求uri
                .uri(API_URI)
                //设置流式响应
                .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
                .acceptCharset(Charset.defaultCharset())
                .retrieve()
                .bodyToFlux(ServerSentEvent.class)
                .doOnNext(string -> System.out.println(string))
                .flatMap(this::parseSseEventData);
    }
    
 public Flux<String> parseSseEventData(ServerSentEvent serverSentEvent) {
        return Flux.just(serverSentEvent.data().toString());
 }
    
报错 
om.fasterxml.jackson.core.JsonParseException: Unrecognized token 'The': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (org.springframework.core.io.buffer.DefaultDataBuffer$DefaultDataBufferInputStream); line: 1, column: 5]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.5.jar:2.13.5]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745) ~[jackson-core-2.13.5.jar:2.13.5]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635) ~[jackson-core-2.13.5.jar:2.13.5]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734) ~[jackson-core-2.13.5.jar:2.13.5]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902) ~[jackson-core-2.13.5.jar:2.13.5]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.13.5.jar:2.13.5]
    at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:356) ~[jackson-databind-2.13.5.jar:2.13.5]
    at com.fasterxml.jackson.databind.ObjectReader._bin


到底是那一步出现了问题,导致接收到的sse流数据不符合sse定义规范,是服务端传输问题?还是客户端dataBuffer解析问题?
  • 写回答

2条回答 默认 最新

  • 码农阿豪 Java领域优质创作者 2024-03-27 10:07
    关注
    让阿豪来帮你解答,本回答参考chatgpt3.5编写提供,如果还有疑问可以评论或留言
    问题描述:服务端使用WebFlux传输SSE流式数据,但客户端接收到数据格式不符合SSE协议规范,导致无法解析data和event,id等字段。 解决方法:根据SSE协议规范,每个事件都需要按照特定格式输出,包括id、event和data字段。因此,客户端需要按照协议规范解析接收到的数据。 以下是客户端接收到sse流式数据的代码: public Flux initSse() { return this.webClient.get() .uri(API_URI) .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .acceptCharset(Charset.defaultCharset()) .retrieve() .bodyToFlux(DataBuffer.class) .flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); return Mono.just(new String(bytes, StandardCharsets.UTF_8)); }) .doOnNext(str -> System.out.println(str)); } 在上面的代码中,客户端接收到的是DataBuffer类型的数据。可以通过将其转换成字节数组再转换成字符串的方式来处理。 但是,这种方法会将一个事件的多条信息输出在同一行,导致无法区分每个事件中的不同字段。因此,我们需要修改代码,按照SSE协议规范解析每个事件。 修改后的代码如下: public Flux> initSse() { return this.webClient.get() .uri(API_URI) .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .acceptCharset(Charset.defaultCharset()) .retrieve() .bodyToFlux(String.class) .map(text -> { String[] lines = text.split("\n"); String eventData = ""; String eventId = ""; String eventType = ""; for (String line : lines) { if (line.startsWith("data:")) { eventData = line.substring(5).trim(); } else if (line.startsWith("id:")) { eventId = line.substring(3).trim(); } else if (line.startsWith("event:")) { eventType = line.substring(6).trim(); } } return ServerSentEvent.builder(eventData) .id(eventId) .event(eventType) .build(); }); } 在上面的代码中,我们先将DataBuffer类型的数据转换成字符串,然后按照“\n”进行分割,逐个处理每个事件中的不同字段。最后将这些字段构造成一个ServerSentEvent对象,返回给客户端。 运行结果应该是: id:37505324-7732-40ed-b6c5-b8e30cabfa75 event:MyCustomEvent data:quick id:84627318-bf50-491b-b5ce-34c180cc8e11 event:MyCustomEvent data:brown id:90769bdd-798a-43af-b6e4-4f628f868e01 event:MyCustomEvent data:fox id:892273ec-ba22-4015-be46-11902e77472b event:MyCustomEvent data:jumps 这样就按照SSE协议规范解析了每个事件中的不同字段,方便客户端后续的处理。
    评论

报告相同问题?

问题事件

  • 创建了问题 3月27日

悬赏问题

  • ¥15 使用ue5插件narrative时如何切换关卡也保存叙事任务记录
  • ¥20 软件测试决策法疑问求解答
  • ¥15 win11 23H2删除推荐的项目,支持注册表等
  • ¥15 matlab 用yalmip搭建模型,cplex求解,线性化处理的方法
  • ¥15 qt6.6.3 基于百度云的语音识别 不会改
  • ¥15 关于#目标检测#的问题:大概就是类似后台自动检测某下架商品的库存,在他监测到该商品上架并且可以购买的瞬间点击立即购买下单
  • ¥15 神经网络怎么把隐含层变量融合到损失函数中?
  • ¥15 lingo18勾选global solver求解使用的算法
  • ¥15 全部备份安卓app数据包括密码,可以复制到另一手机上运行
  • ¥20 测距传感器数据手册i2c