服务端webflux传输SSE流式数据,客户端接收到数据格式不符合SSE协议规范,导致无法解析data和event,id等字段
这是我服务端用webflux每1s返回一个单词,返回类型是流式。使用ServerSentEvent对象符合sse协议规范字段,其中有data,id,event字段
/**
* @Desc: 短暂的周期性事件流
* @Author:zhh
* @Date:2024/3/26 15:16
*/
@RestController
public class WebFluxController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@CrossOrigin
public Flux<ServerSentEvent<String>> subscribe() {
return Flux.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
.map(sequence -> ServerSentEvent.<String>builder()
.data(sequence.getT1())
.id(UUID.randomUUID().toString())
.event("MyCustomEvent")
.build());
}
}
这是我客户端接收到sse流式数据,将字节转化成字符串,因为sse传输的是纯文本格式。代码如下
public Flux<String> initSse() {
//使用webClient发送消息
return this.webClient.get()
//请求uri
.uri(API_URI)
//设置流式响应
.header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
.acceptCharset(Charset.defaultCharset())
.retrieve()
.bodyToFlux(DataBuffer.class)
.flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return Mono.just(new String(bytes, StandardCharsets.UTF_8));
})
.doOnNext(str-> System.out.println(str));
}
运行结果及详细报错内容
id:37505324-7732-40ed-b6c5-b8e30cabfa75
event:MyCustomEvent
data:
quick
id:84627318-bf50-491b-b5ce-34c180cc8e11
event:MyCustomEvent
data:
brown
id:90769bdd-798a-43af-b6e4-4f628f868e01
event:MyCustomEvent
data:
fox
id:892273ec-ba22-4015-be46-11902e77472b
event:MyCustomEvent
data:
jumps
我的解答思路和尝试过的方法
按照sse协议规范,正确输出应该是
id:37505324-7732-40ed-b6c5-b8e30cabfa75
event:MyCustomEvent
data:quick
id:84627318-bf50-491b-b5ce-34c180cc8e11
event:MyCustomEvent
data:brown
id:90769bdd-798a-43af-b6e4-4f628f868e01
event:MyCustomEvent
data:fox
id:892273ec-ba22-4015-be46-11902e77472b
event:MyCustomEvent
data:jumps
错误的输出导致无法有效解析每个事件的内容, 如果直接接收data数据可以
public Flux<Object> initSse2() {
//使用webClient发送消息
return this.webClient.get()
//请求uri
.uri(API_URI)
//设置流式响应
.header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
.acceptCharset(Charset.defaultCharset())
.retrieve()
//直接将sse事件协议格式下的data提取转换成string
.bodyToFlux(String.class)
.flatMap(string -> {
//如果是json字符串,需要处理json字符串转换成对象 ......
Object result = JSON.parseObject(string, Object.class);
return Flux.just(result);
});
}
问题就在于bodyToFlux只能提取data数据转成字符串,如果需要event,id等字段数据此时就不能生效了
此时想到bodyToFlux(ServerSentEvent.class)直接转成ServerSentEvent对象,结果报错了 根据报错日志,无法解析字段the
id:37505324-7732-40ed-b6c5-b8e30cabfa75
event:MyCustomEvent
data:
the
通过打印知道传过来的数据是上面这样的,显然json解析失败了
public Flux<String> initSse() {
//使用webClient发送消息
return this.webClient.get()
//请求uri
.uri(API_URI)
//设置流式响应
.header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
.acceptCharset(Charset.defaultCharset())
.retrieve()
.bodyToFlux(ServerSentEvent.class)
.doOnNext(string -> System.out.println(string))
.flatMap(this::parseSseEventData);
}
public Flux<String> parseSseEventData(ServerSentEvent serverSentEvent) {
return Flux.just(serverSentEvent.data().toString());
}
报错
om.fasterxml.jackson.core.JsonParseException: Unrecognized token 'The': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (org.springframework.core.io.buffer.DefaultDataBuffer$DefaultDataBufferInputStream); line: 1, column: 5]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.5.jar:2.13.5]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745) ~[jackson-core-2.13.5.jar:2.13.5]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635) ~[jackson-core-2.13.5.jar:2.13.5]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734) ~[jackson-core-2.13.5.jar:2.13.5]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902) ~[jackson-core-2.13.5.jar:2.13.5]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.13.5.jar:2.13.5]
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:356) ~[jackson-databind-2.13.5.jar:2.13.5]
at com.fasterxml.jackson.databind.ObjectReader._bin