问题遇到的现象和发生背景
打算是使用flink 消费数据,并将json 数组,通过streamload的方式,sink到doris的表中
问题相关代码,请勿粘贴截图
public HttpResult doPut(String url, String jsonParams, Map<String, String> heads) throws IOException {
RequestBody requestBody = FormBody.create(MediaType.parse("application/json; charset=utf-8")
, jsonParams);
// requestBody = gzip(requestBody);
Request.Builder builder = new Request.Builder();
builder.headers(this.getHeaders());
if (heads != null) {
for (String key : heads.keySet()) {
builder.addHeader(key, heads.get(key));
}
}
Request.Builder requestBuilder = builder.url(url);
Request postRequest = requestBuilder.put(requestBody).build();
return get(postRequest); // 这一步报错
}
这里数据库和url是对应的,没有错误
数据格式也没有问题
headers头文件也进行了设置,没有问题
运行结果及报错内容
java.net.ProtocolException: Unexpected status line: I
at okhttp3.internal.http.StatusLine.parse(StatusLine.java:69)
at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:55)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at com.che300.common.util.encrypt.RedirectInterceptor.intercept(RedirectInterceptor.java:15)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall.execute(RealCall.java:69)
at com.che300.common.util.encrypt.DorisOkHttpUtil.get(DorisOkHttpUtil.java:122)
at com.che300.common.util.encrypt.DorisOkHttpUtil.doPut(DorisOkHttpUtil.java:73)
at com.che300.common.util.encrypt.DorisApiUtil.httpInsert(DorisApiUtil.java:57)
at com.che300.common.util.encrypt.DorisApiUtil.streamLoad(DorisApiUtil.java:43)
at com.che300.common.util.encrypt.DorisApiUtil.streamLoad(DorisApiUtil.java:25)
at com.che300.syncconsumer.service.sink.DorisSinkTest.invoke(DorisSinkTest.java:53)
at com.che300.syncconsumer.service.sink.DorisSinkTest.invoke(DorisSinkTest.java:22)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
我的解答思路和尝试过的方法
尝试了好多方法,,不知道错误到底在哪儿
我想要达到的结果
想要成功将jsonArray,导入到doris的对应的表