却诚Salong 2022-06-11 10:41

Kafka: messages can be produced but not consumed

Symptoms and background

Using Java, I have already managed to produce messages to Kafka (the broker runs on my local machine), and a desktop Kafka client confirms the messages were written. When I try to consume, however, poll() always returns 0 records.
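For context, the producer code is not shown in the question; a minimal producer along the following lines could have been used. The broker address and the topic name "test" are taken from the consumer code below; everything else (class name, key and value) is an assumption.

```java
// Hypothetical minimal producer; only the broker address and topic name come from the question.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.8.13.159:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; get() blocks until the broker acknowledges the write
            producer.send(new ProducerRecord<>("test", "key1", "hello kafka")).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```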

Relevant code

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author Salong
 * @date 2022/6/10 16:06
 * @Email:salong0503@aliyun.com
 */
public class KafkaComsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    //This group id is the default group.id from consumer.properties under Kafka's config directory
    private static final String GROUPID = "test-consumer-group";

    public static void main(String[] args) {
        KafkaComsumer comsumer = new KafkaComsumer("test");
        Thread thread = new Thread(comsumer);
        thread.start();
    }

    public KafkaComsumer(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.8.13.159:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));

    }

    @Override
    public void run() {
        try {
            ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofSeconds(1));
            if (msgList != null && msgList.count() > 0) {
                for (ConsumerRecord<String, String> record : msgList) {
                    System.out.println("key:" + record.key() + ",value:" + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```
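One thing worth noting about the consumer above: run() calls poll() exactly once with a 1-second timeout and then closes the consumer. For a fresh group the first poll often returns nothing, because group-coordinator discovery and partition assignment are still in progress, so consumers are normally driven by a poll loop. A minimal sketch of that pattern, reusing the consumer field from the class above, is shown below; on its own it will not help while the broker keeps reporting "coordinator not available" (see the log), but a single short-lived poll can come back empty even against a healthy cluster.

```java
// Hypothetical replacement for run(): keep polling instead of polling once.
// Repeated polls give the client time to find the coordinator, join the group
// and receive its partition assignment before records can be returned.
@Override
public void run() {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("key:" + record.key() + ",value:" + record.value());
            }
        }
    } finally {
        consumer.close();
    }
}
```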


Runtime output and log

```java
10:31:06.129 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 1000
    auto.offset.reset = earliest
    bootstrap.servers = [10.8.13.159:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-test-consumer-group-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test-consumer-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer


10:31:06.801 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=4): org.apache.kafka.common.requests.MetadataResponse@4abcc788
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 3 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914666885, latencyMs=83, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:06.910 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:06.910 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=6) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:06.912 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=6): org.apache.kafka.common.requests.MetadataResponse@42a1e8e3
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 4 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=7) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=7): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914666935, latencyMs=22, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=7), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.020 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.020 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=8) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.021 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=8): org.apache.kafka.common.requests.MetadataResponse@5a3067bb
10:31:07.021 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.022 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 5 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.022 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.022 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=9) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.075 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=9): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.076 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667075, latencyMs=53, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=9), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.076 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.076 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.131 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.132 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=10) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.136 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=10): org.apache.kafka.common.requests.MetadataResponse@2b4d1438
10:31:07.136 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.137 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 6 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.137 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.137 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=11) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.141 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=11): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.141 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667141, latencyMs=4, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=11), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.141 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.142 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.242 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.242 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=12) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=12): org.apache.kafka.common.requests.MetadataResponse@761165e
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 7 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=13) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=13): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667247, latencyMs=3, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=13), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.353 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.353 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=14) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.354 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=14): org.apache.kafka.common.requests.MetadataResponse@4aca10a1
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 8 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=15) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=15): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667358, latencyMs=3, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=15), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.373 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending synchronous auto-commit of offsets {}
10:31:07.373 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Executing onLeavePrepare with generation Generation{generationId=-1, memberId='', protocol='null'} and memberId 
10:31:07.373 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Resetting generation due to consumer pro-actively leaving the group
10:31:07.377 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Kafka consumer has been closed

Process finished with exit code 0
```
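The repeated FindCoordinator responses with errorCode=15 ("The coordinator is not available.") mean the broker never returns a group coordinator for test-consumer-group, which is why poll() has nothing to fetch and the auto-commit at shutdown commits an empty offset map. On a single-broker setup this frequently involves the internal __consumer_offsets topic (for example, it could not be created or its leader is offline), though that is an assumption rather than something visible in this log. As one client-side way to look at it, here is a hedged AdminClient sketch (same bootstrap address assumed) that lists topics including internal ones and tries to describe the group:

```java
// Hypothetical diagnostic using the Kafka AdminClient; the bootstrap address is the
// same one the consumer uses, everything else is illustrative.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;

import java.util.Collections;
import java.util.Properties;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.8.13.159:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // __consumer_offsets should appear here; if it is missing or unhealthy,
            // coordinator lookups keep failing with "The coordinator is not available."
            System.out.println("topics: "
                    + admin.listTopics(new ListTopicsOptions().listInternal(true)).names().get());
            // This request is served by the group coordinator, so it will fail for as
            // long as the coordinator cannot be located.
            System.out.println(admin.describeConsumerGroups(
                    Collections.singleton("test-consumer-group")).all().get());
        }
    }
}
```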


My approach and what I've tried

To test connectivity, I deliberately changed the IP to a non-existent address, but the consumer object could still be created and polling did not raise any error; the messages still show as unconsumed and poll() still returns 0 records (see the probe sketch after the config snippet below). Changing server.properties (the listener settings below) had no effect either:

```
listeners=PLAINTEXT://10.8.13.159:9092
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://10.8.13.159:9092
```
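On the observation that a deliberately wrong IP still "works": constructing a KafkaConsumer only validates the configuration, and poll() swallows connection failures and simply returns an empty batch, so a bad bootstrap address never fails loudly. A hedged way to force an explicit error is to ask for topic metadata with a timeout; partitionsFor(topic, timeout) is part of the standard consumer API, while the 5-second timeout below is an assumption.

```java
// Hypothetical connectivity probe, meant to live inside the consumer class above
// ("consumer" and "topic" are its fields). Unlike poll(), partitionsFor() with a
// timeout throws a TimeoutException when the broker behind bootstrap.servers is
// unreachable or metadata cannot be fetched in time.
private void probeBroker() {
    try {
        System.out.println("partitions: " + consumer.partitionsFor(topic, Duration.ofSeconds(5)));
    } catch (org.apache.kafka.common.errors.TimeoutException e) {
        System.err.println("Broker unreachable or metadata unavailable: " + e.getMessage());
    }
}
```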
The result I want to achieve

To be able to consume data from Kafka normally.
