Flink 1.17 web UI shows doubled Records Sent for the Kafka source

I'm certain my Kafka topic contains only 40 records, but the web UI shows Records Sent as 80. Why is that?
Here is my Flink code that consumes from Kafka:
package com.xiaziyang.source;

import com.xiaziyang.deserializer.MyKafkaDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyKafkaSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "hadoop103", 8081,
                "C:\\JavaStudy\\flink-1.17\\target\\original-flink-1.17-1.0-SNAPSHOT.jar");

        KafkaSource<ConsumerRecord<String, String>> kafkaSource =
                KafkaSource.<ConsumerRecord<String, String>>builder()
                        .setBootstrapServers("hadoop102:9092")
                        .setTopics("flink_1")
                        .setGroupId("xiaziyang1")
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchema()))
                        .build();

        // Source runs with parallelism 1.
        SingleOutputStreamOperator<ConsumerRecord<String, String>> source = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")
                .name("kafkaSource")
                .setParallelism(1);

        // Map extracts the record value; it runs with parallelism 2, so it
        // cannot chain with the parallelism-1 source.
        SingleOutputStreamOperator<String> map = source
                .map(new MapFunction<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String map(ConsumerRecord<String, String> value) throws Exception {
                        return value.value();
                    }
                })
                .name("value")
                .setParallelism(2);

        DataStreamSink<String> sink = map.print().name("print").setParallelism(1);
        env.execute();
    }
}
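In case it helps, this is roughly how I verified that the topic really holds 40 records; a minimal sketch using the plain Kafka consumer API (the broker address and topic name match the job above; the class name TopicRecordCount is just for illustration):

package com.xiaziyang.source;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicRecordCount {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Collect all partitions of the topic.
            List<TopicPartition> partitions = consumer.partitionsFor("flink_1").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

            // Sum (endOffset - beginningOffset) over all partitions; this equals
            // the record count, assuming no compaction or transaction-marker gaps.
            long total = partitions.stream()
                    .mapToLong(tp -> end.get(tp) - begin.get(tp))
                    .sum();
            System.out.println("records in flink_1: " + total);
        }
    }
}

This prints 40 for me, which is why the Records Sent of 80 in the web UI looks doubled.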