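Symptoms and background
My Java code already produces messages to Kafka successfully, and a desktop tool confirms that the messages were added. The Kafka broker was also set up by me locally. When I consume, however, poll always returns 0 records.
Relevant code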
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * @author Salong
 * @date 2022/6/10 16:06
 * @Email:salong0503@aliyun.com
 */
public class KafkaComsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // This group id is the default one used in consumer.properties under Kafka's conf directory
    private static final String GROUPID = "test-consumer-group";

    public static void main(String[] args) {
        KafkaComsumer comsumer = new KafkaComsumer("test");
        Thread thread = new Thread(comsumer);
        thread.start();
    }

    public KafkaComsumer(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.8.13.159:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        try {
            // Polls once for up to 1 second, then falls through to close()
            ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofSeconds(1));
            if (msgList != null && msgList.count() > 0) {
                for (ConsumerRecord<String, String> record : msgList) {
                    System.out.println("key:" + record.key() + ",value:" + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
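One thing worth ruling out: `run()` above calls `poll` exactly once with a 1-second timeout and then closes the consumer. In the log further down, the whole second is spent on failed coordinator lookups, so the single poll returns empty before the consumer has joined the group. The usual pattern is to keep polling until records arrive or an overall deadline passes. Below is a minimal sketch of that loop using only the JDK. The helper name `pollUntilData` is hypothetical, and the `Supplier` is an assumed stand-in for a call such as `consumer.poll(Duration.ofSeconds(1))`; it does not talk to Kafka.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Sketch: keep polling until records arrive or an overall deadline passes. */
final class PollLoopSketch {

    /**
     * Calls pollOnce repeatedly until it returns a non-empty batch or the
     * deadline expires. pollOnce is a stand-in (assumption) for a blocking
     * call such as consumer.poll(Duration.ofSeconds(1)).
     */
    static <T> List<T> pollUntilData(Supplier<List<T>> pollOnce, Duration deadline) {
        long end = System.nanoTime() + deadline.toNanos();
        do {
            List<T> batch = pollOnce.get();
            if (batch != null && !batch.isEmpty()) {
                return batch;
            }
        } while (System.nanoTime() < end);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Simulated source: empty for the first three polls, then one record.
        int[] calls = {0};
        Supplier<List<String>> fakePoll = () ->
                ++calls[0] <= 3 ? Collections.<String>emptyList() : List.of("hello");
        List<String> records = pollUntilData(fakePoll, Duration.ofSeconds(5));
        System.out.println("polls=" + calls[0] + ", records=" + records);
    }
}
```

With the simulated source, the loop returns on the fourth call. In the real consumer the equivalent change would be to wrap the `poll` call in a loop and only call `close()` once the loop exits. A longer loop does not help if the coordinator never becomes available, so the broker side still needs checking.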
Output and error messages
```java
10:31:06.129 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [10.8.13.159:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-test-consumer-group-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-consumer-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
10:31:06.801 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=4): org.apache.kafka.common.requests.MetadataResponse@4abcc788
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 3 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:06.802 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914666885, latencyMs=83, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=5), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:06.885 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:06.910 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:06.910 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=6) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:06.912 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=6): org.apache.kafka.common.requests.MetadataResponse@42a1e8e3
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 4 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:06.913 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=7) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=7): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914666935, latencyMs=22, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=7), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:06.935 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.020 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.020 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=8) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.021 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=8): org.apache.kafka.common.requests.MetadataResponse@5a3067bb
10:31:07.021 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.022 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 5 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.022 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.022 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=9) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.075 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=9): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.076 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667075, latencyMs=53, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=9), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.076 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.076 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.131 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.132 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=10) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.136 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=10): org.apache.kafka.common.requests.MetadataResponse@2b4d1438
10:31:07.136 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.137 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 6 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.137 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.137 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=11) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.141 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=11): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.141 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667141, latencyMs=4, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=11), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.141 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.142 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.242 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.242 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=12) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=12): org.apache.kafka.common.requests.MetadataResponse@761165e
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 7 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.244 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=13) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=13): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667247, latencyMs=3, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=13), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.247 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.353 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.353 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=14) and timeout 30000 to node 0: {topics=[{name=test,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
10:31:07.354 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received METADATA response from node 0 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-test-consumer-group-1, correlationId=14): org.apache.kafka.common.requests.MetadataResponse@4aca10a1
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updating last seen epoch for partition test-0 from 0 to epoch 0 from new metadata
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Updated cluster metadata updateVersion 8 to MetadataCache{clusterId='TdpmzxQIQMqGNthcf1RFFw', nodes={0=10.8.13.159:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=test-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=10.8.13.159:9092 (id: 0 rack: null)}
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FindCoordinator request to broker 10.8.13.159:9092 (id: 0 rack: null)
10:31:07.355 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=15) and timeout 30000 to node 0: {key=test-consumer-group,key_type=0,_tagged_fields={}}
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FIND_COORDINATOR response from node 0 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=15): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1)
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1654914667358, latencyMs=3, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1, correlationId=15), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=15, errorMessage='The coordinator is not available.', nodeId=-1, host='', port=-1))
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Group coordinator lookup failed: The coordinator is not available.
10:31:07.358 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Coordinator discovery failed, refreshing metadata
10:31:07.373 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Sending synchronous auto-commit of offsets {}
10:31:07.373 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Executing onLeavePrepare with generation Generation{generationId=-1, memberId='', protocol='null'} and memberId
10:31:07.373 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Resetting generation due to consumer pro-actively leaving the group
10:31:07.377 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Kafka consumer has been closed
Process finished with exit code 0
```
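What I have tried
I changed the IP to an arbitrary address that does not exist, but the consumer object could still be created and consuming raised no error. When I checked afterwards, the messages still had not been consumed, and poll still returned 0 records. Editing server.properties made no difference either.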
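The behaviour with the wrong address is expected: the `KafkaConsumer` constructor does not open a connection, and `poll` returns an empty batch when its timeout elapses rather than throwing. So the absence of an exception says nothing about whether the broker is reachable. A plain TCP check can separate network problems from broker-side problems. The sketch below uses only the JDK; the class and method names (`BrokerReachability`, `isReachable`) are hypothetical, and a successful connect only shows that something is listening on the port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Sketch: check whether a TCP port accepts connections. */
final class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the address and port from the post; override via arguments.
        String host = args.length > 0 ? args[0] : "10.8.13.159";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

In the log above the client does receive METADATA and FIND_COORDINATOR responses from 10.8.13.159:9092, so for the real address the network path appears to work and the failure is the coordinator lookup itself.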
listeners=PLAINTEXT://10.8.13.159:9092
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://10.8.13.159:9092
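The listener settings above look consistent with the client's `bootstrap.servers`, and the log shows a single broker (node 0) answering every FindCoordinator request with error 15, "The coordinator is not available". One possible cause, which the post does not confirm, is that the internal `__consumer_offsets` topic cannot be created because `offsets.topic.replication.factor` is larger than the number of brokers; when the key is not set, the broker default is 3. The following sketch checks a server.properties text for that condition. The class and method names are hypothetical and it uses only the JDK.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Sketch: can the offsets topic be created with the given number of brokers? */
final class OffsetsTopicCheck {

    /**
     * Returns true if offsets.topic.replication.factor in the given
     * server.properties text can be satisfied by brokerCount brokers.
     * When the key is absent, the broker default of 3 is assumed.
     */
    static boolean offsetsTopicCreatable(String serverProperties, int brokerCount)
            throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(serverProperties));
        int factor = Integer.parseInt(
                props.getProperty("offsets.topic.replication.factor", "3").trim());
        return factor <= brokerCount;
    }

    public static void main(String[] args) throws IOException {
        // Only the two lines shown in the post; the rest of the file is unknown.
        String config = "listeners=PLAINTEXT://10.8.13.159:9092\n"
                + "advertised.listeners=PLAINTEXT://10.8.13.159:9092\n";
        System.out.println("creatable with 1 broker: " + offsetsTopicCreatable(config, 1));
    }
}
```

If the full server.properties turns out not to set the key, or sets it above 1, adding `offsets.topic.replication.factor=1` and restarting the broker would be worth trying on a single-node setup. The broker's own log should also say why the coordinator is unavailable.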
Desired result
To be able to consume the Kafka data normally.