Members only 2024-03-18 01:21 采纳率: 0%
浏览 37

Flink1.17的webUI显示kafkaSource的Records Sent会翻倍

!Flink1.17的webUI显示kafkaSource的Records Sent会翻倍

img


我确定我的Kafka里只有40条数据,但是webUi中显示Records Sent 80,求解为什么?
这是我flink消费kafka的代码

package com.xiaziyang.source;

import com.alibaba.fastjson.JSONObject;
import com.xiaziyang.deserializer.MyKafkaDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
import java.lang.reflect.Type;
public class MyKafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("hadoop103", 8081,"C:\\JavaStudy\\flink-1.17\\target\\original-flink-1.17-1.0-SNAPSHOT.jar");
        KafkaSource<ConsumerRecord<String, String>> kafkaSource = KafkaSource.<ConsumerRecord<String, String>>builder().setBootstrapServers("hadoop102:9092")
                .setTopics("flink_1")
                .setGroupId("xiaziyang1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchema()))
                .build();
        SingleOutputStreamOperator<ConsumerRecord<String, String>> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"kafkaSource")
                .name("kafkaSource").setParallelism(1);
        SingleOutputStreamOperator<String> map = source.map(new MapFunction<ConsumerRecord<String, String>, String>() {
            @Override
            public String map(ConsumerRecord<String, String> value) throws Exception {
                return value.value().toString();
            }
        }).name("value").setParallelism(2);
        DataStreamSink<String> sink = map.print().name("print").setParallelism(1);
        env.execute();
    }
}
  • 写回答

4条回答 默认 最新

  • 檀越@新空间 2024-03-18 01:21
    关注

    Members only 上午好☀️☀️☀️️
    本答案参考ChatGPT-3.5

    根据你提供的代码,我发现你的问题可能出现在以下几个方面:

    1. Kafka中的数据重复发送:你提到只有40条数据,但是Web UI显示的Records Sent为80条。这可能是因为数据在Kafka中被重复发送了。你可以通过查看Kafka的消息偏移量来确定是否有重复的消息被发送。如果有重复数据,你可以考虑针对消费者的消费逻辑进行优化,例如使用幂等消费或者记录已消费的消息的偏移量等方式来避免重复消费。

    2. 任务并行度设置不合理:在代码中,你设置了两个算子的并行度分别为1和2。这可能导致数据在流水线中的处理过程中出现数据倾斜的情况,从而导致一部分数据被处理得更快,另一部分数据被处理得更慢。你可以尝试调整算子的并行度以达到更均衡的数据处理效果。

    3. Watermark的设置:你使用了WatermarkStrategy.noWatermarks()来设置没有水印策略。这可能导致Flink无法及时推进事件时间,并在数据处理中引入一定的延迟。你可以根据实际情况选择合适的水印生成策略,以确保事件时间的推进和数据的准确处理。

    解决方案:

    1. 在消费Kafka数据时,可以通过记录已消费的消息的偏移量来避免重复消费。
    2. 调整算子的并行度,使数据得到更均衡的处理。
    3. 选择合适的水印生成策略,以确保事件时间的推进和数据的准确处理。

    修改后的代码示例:

    public class MyKafkaSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("hadoop103", 8081,"C:\\JavaStudy\\flink-1.17\\target\\original-flink-1.17-1.0-SNAPSHOT.jar");
            env.setParallelism(2); // 设置全局并行度为2
    
            KafkaSource<ConsumerRecord<String, String>> kafkaSource = KafkaSource.<ConsumerRecord<String, String>>builder().setBootstrapServers("hadoop102:9092")
                    .setTopics("flink_1")
                    .setGroupId("xiaziyang1")
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchema()))
                    .build();
    
            SingleOutputStreamOperator<ConsumerRecord<String, String>> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"kafkaSource")
                    .name("kafkaSource");
    
            SingleOutputStreamOperator<String> map = source.map(new MapFunction<ConsumerRecord<String, String>, String>() {
                @Override
                public String map(ConsumerRecord<String, String> value) throws Exception {
                    return value.value().toString();
                }
            }).name("value");
    
            DataStreamSink<String> sink = map.print().name("print");
    
            env.execute();
        }
    }
    

    在这个修改后的代码中,我将算子的并行度设置为2,并移除了对算子并行度的手动设置。另外,我也移除了setParallelism的设置,因为并行度已经在上面进行了全局设置。

    评论

报告相同问题?

问题事件

  • 创建了问题 3月18日