kafka在一台机器上模拟集群,为什么关闭一个broker,整体集群就不能消费了,求大神解答
kafka在一台机器上模拟集群,为什么关闭一个broker,整体集群就不能消费了。消费就一直卡主等待的状况。

环境:
1.一台机器运行了一个zookeeper,启动文件是在kafka/bin/zookeeper-server-start.sh
2.分别把kafka文件件复制了四份,改变里面server.properties中borker.id分别为1,2,3,4
3.启动机器,正常可以消费topic是cxl的
4.关闭borker.id是1的kafka服务,就没有办法消费cxl
topic的信息
图片说明

现在就不太明白正常集群挂掉一个,整体集群应该是不受影响的,从topic信息上看我的分片也在borker 2上,而我关掉的是broker1应该没有什么影响,但是现在为什么不能消费了?

2个回答

虽然topic的副本数是2,但是 partition的数量为1,所以导致这个topic只会在一个broker(leader partition)中存储数据。
出现上诉情况的原因,可以是在leader partition中存储数据的时候,其他follower partition还未来得及从leader partition中拉取数据,
leader partition所在的broker就挂掉了。从而导致follower partition没有同步到所有数据,此时因为leader partition所在的broker挂掉了,
在ISR中的follower partition会被选举为leader partition,此时的leader partition的数据是缺失的。

你怎么能部署4个broker,kafka集群不是奇数的吗?

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
Python+OpenCV计算机视觉

Python+OpenCV计算机视觉

kafka broker重启的副本问题

我的kafka共有4个节点, 每个topic都是3个副本,然后莫名挂了一个,然后我重启的这个broker,但是发现在重启之后 这个broker上的kafka log里面就更新了一个index文件 ,而且log文件大小为0,然后就没去追leader的数据了。 ![图片说明](https://img-ask.csdn.net/upload/201706/09/1496972859_99333.png) 8号重启的 然后这样后就再也没反应了 我想知道 重启broker后 他到底会不会自己去追上leader的数据 如果会的话 那我这个是什么回事

kafka集群内存使用不均问题

kafka集群三台服务器(内存都是33G),按照集群模式配置启动,有两台内存只用到3G,另一台却用到了32G,并没有查看到消息积压,这是啥因呢?

kafka集群是否启动成功?

部署完kafka集群后,启动时kafka前台启动和后台启动有什么不同?

kafka集群 报错 在线等

WARN [Controller-1-to-broker-2-send-thread], Controller 1's connection to broker Node(2, mine-28, 9092) was unsuccessful (kafka.controller.RequestSendThread) java.io.IOException: Connection to Node(2, mine-28, 9092) failed at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$1.apply(NetworkClientBlockingOps.scala:62) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$1.apply(NetworkClientBlockingOps.scala:58) at kafka.utils.NetworkClientBlockingOps$$anonfun$kafka$utils$NetworkClientBlockingOps$$pollUntil$extension$2.apply(NetworkClientBlockingOps.scala:106) at kafka.utils.NetworkClientBlockingOps$$anonfun$kafka$utils$NetworkClientBlockingOps$$pollUntil$extension$2.apply(NetworkClientBlockingOps.scala:105) at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:105) at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:58) at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:225) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:172) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

kafka 集群高可用测试

先搭建了3个kafka集群 3台都启动 并且正常发送消息后 测试 某一台挂了情况 ``` #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 broker.id=0 #当前kafka对外提供服务的端口默认是9092 listeners=PLAINTEXT://192.168.1.252:9092 #这个是borker进行网络处理的线程数 num.network.threads=3 #这个是borker进行I/O处理的线程数 num.io.threads=8 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.send.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.receive.buffer.bytes=102400 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 socket.request.max.bytes=104857600 ############################# Log Basics ############################# #消息存放的目录,这个目录可以配置为","逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录, #如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 log.dirs=/usr/local/kafka_2.11-1.1.0/data/kafka/logs #默认的分区数,一个topic默认1个分区数 num.partitions=3 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3. offsets.topic.replication.factor=1 # transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 #默认消息的最大持久化时间,168小时,7天 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.segment.bytes=1073741824 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# #设置zookeeper的连接端口 zookeeper.connect=192.168.1.252:2181,192.168.1.253:2181,192.168.1.254:2181 # 超时时间 zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 ``` 创建了 主题 ![图片说明](https://img-ask.csdn.net/upload/201805/15/1526363032_696232.png) 然后 发送消息 ![图片说明](https://img-ask.csdn.net/upload/201805/15/1526363075_21045.png) msg1 msg2 然后 关闭其中一台kafka ![图片说明](https://img-ask.csdn.net/upload/201805/15/1526363143_679462.png) 控制台打印 ``` 2018/05/15-13:48:03 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator- Marking the coordinator 192.168.1.254:9092 (id: 2147483645 rack: null) dead for group defaultGroup 2018/05/15-13:48:03 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator- Discovered coordinator 192.168.1.254:9092 (id: 2147483645 rack: null) for group defaultGroup. 2018/05/15-13:48:04 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator- Marking the coordinator 192.168.1.254:9092 (id: 2147483645 rack: null) dead for group defaultGroup ``` 然后继续发消息 msg3 控制台无响应 继续发送msg4 无响应 重启那台kafka 控制台打印 ``` "msg4" "msg3" 2018/05/15-13:52:11 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN org.apache.kafka.clients.consumer.internals.Fetcher- Received unknown topic or partition error in fetch for partition trading-1. The topic/partition may not exist or the user may not have Describe access to it 2018/05/15-13:52:11 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator- Marking the coordinator 192.168.1.254:9092 (id: 2147483645 rack: null) dead for group defaultGroup 2018/05/15-13:52:11 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator- Auto offset commit failed for group defaultGroup: Offset commit failed with a retriable exception. You should retry committing offsets. 2018/05/15-13:52:11 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator- Discovered coordinator 192.168.1.254:9092 (id: 2147483645 rack: null) for group defaultGroup. ``` kafka 不是主从复制么 不是谁挂了 用另一个么

kafka集群中的相关问题

请问一下,一个已经运行中且启用了SASL_PLAINTEXT的kafka集群,现在想在broker的jaas文件中增加user要怎么办?难道只能加了以后一个个重启broker吗?

kafka 集群映射端口到外网,外网无法生产消息

有3台kafka ip分别是内网ip 192.168.2.21 9092 192.168.2.22 9092 192.168.2.23 9092 另有一台同时接通了内网和互联网的机器 (内网ip 192.168.2.13,外网ip 172.50.63.1), 现在用这台机器做了端口映射,用的rinetd 0.0.0.0 9092 192.168.2.21 9092 0.0.0.0 9093 192.168.2.22 9092 0.0.0.0 9094 192.168.2.23 9092 然后在自己开发电脑上用java代码生产消息 topic=ffff ``` java props.put("bootstrap.servers", "172.50.63.1:9092"); ``` 会产生如下报错 ``` java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for ffff-0 at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) at Producer.run(Producer.java:140) Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for ffff-0 ``` 这是我的kafka配置 ``` listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://192.168.2.13:9092 ``` 现在的疑问是 这个配置应该写什么?才能让外网能够正常生产 advertised.listeners应该写互联网ip 172.50.63.1 还是做映射的那台内网ip 192.168.2.13

kafka消费者无法消费信息

在生产环境部署kafka集群和消费者服务器后,通过logstash向kafka集群发送实时日志,消费者也能正常消费信息。但是两分钟之后消费者就停止消费信息了,想问下各位老师如何排查问题点在哪里。 1:查看了kafka服务器的日志,logstash还在向kafka推实时日志,kafka集群日志留存时间是两个小时。 2:kafka消费者一共有两台,两台都在同时运行。 3:kafka集群有三台服务器,查问题的时候发现,kafka消费者只连接到了一台broker上,不知道这是不是原因所在。

Kafka 运行一段时间就停了

报错: ERROR Error while deleting segments for MenuChangedEvent.domain.FZ-water-0 in dir /tmp/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.NoSuchFileException: /tmp/kafka-logs/MenuChangedEvent.domain.FZ-water-0/00000000000000000000.log 。。。。 [2019-05-07 11:03:48,234] ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed (kafka.log.LogManager) 注意: /etc/cron.daily下没有temwatch 怎么解决?一段时间就shutdown,重启了就可以运行一段时间

求大神解答 kafka 的副本存放机制?

今天了解 kafka副本存放机制时去网上搜了一下,发现有两个版本的答案 : <br/> 第一个版本是: 随机存储在集群上(类似于hdfs存储机制)。 <br/> 第二个版本是: 第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的,其他分区的第一个副本放置位置相对于第0个分区依次往后移。 <br/> <br/> 例如:有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,第五个分区将会放在第三个 Broker 上。 <br/> <br/> 自己想了一下,感觉kafka的副本存放机制应该不会这么严格,因为如果按照第二个版本来说的话假如我有3个副本,第一个副本存储在第1台服务器,那么第2个副本和第3个副本依次存储在 第2台服务器和第3台服务器,那么就存在一个问题,如果第2台服务器没有存储空间了咋办?求解!

在新建和删除kafka的topic时出现问题

kafka本身存在一个12个分区的topic,我重新增加一个12个分区的topic后,在Partition Information中出现Partition 0的leader为-2,点击进入后显示 broker id -2 :Yikes! Broker not found -2 for cluster kafka_manager. 然而我的broker ID 并没有使-2的,并且在我删除该topic后(删除失败),partition information中每一个的leader都变成了负数(-1),报错跟上述相同。 求解为什么会出现这个问题,并且该如何彻底删除kafka topic残留的全部数据(kafka本身存在删除缺陷,如何较好的解决这个缺陷)。 菜鸟入坑,求大神帮忙解决啊!

kafka里面关于生产者SDK如何推送消息给kafka集群

kafka里面关于生产者SDK如何推送消息给kafka集群,求各位给解答一下,谢谢

kafka宕机面试问题求解?

kafka集群中有3台机器,如果包括副本在内的机器全挂了怎么办?

无法读取集群中kafka中的数据

使用flume将文件数据解析发送到kafka上,然后使用storm(storm运行自己写的java程序,程序中使用kafka 的consumer)读取kafka中的数据,使用zookeeper管理集群,有3个节点,报错如下: ![图片说明](https://img-ask.csdn.net/upload/201608/22/1471858935_68063.jpg) 从报错上看是其中的一个主机的kafka与zookeeper的通信有问题?不过这只是我的猜测,大牛们遇到过类似的问题吗?或者说 有什么解决问题的思路吗? 补充,就这一个主机有问题,却导致了storm无法正常运行,无法读取任何数据。

kafka 集成 kerberos ,启动kafka报错

kafka 使用kerberos协议的时候,启动kakfa的时候报zookeeper校验不通过。 错误信息如下:![图片说明](https://img-ask.csdn.net/upload/201902/02/1549098295_69981.png) kerberos的用户密钥:![图片说明](https://img-ask.csdn.net/upload/201902/02/1549098396_900993.png) kerberos的etc/krb5.conf配置信息:[logging] default = FILE:/var/log/krb5libs.log kdc = FILE:/var/log/krb5kdc.log admin_server = FILE:/var/log/kadmind.log [libdefaults] default_realm = EXAMPLE.COM default_tkt_enctypes = arcfour-hmac-md5 dns_lookup_realm = false dns_lookup_kdc = false ticket_lifetime = 24h renew_lifetime = 7d forwardable = true [realms] EXAMPLE.COM = { kdc = 192.168.1.41 admin_server = 192.168.1.41 } [domain_realm] kafka = EXAMPLE.COM zookeeper = EXAMPLE.COM weiwei = EXAMPLE.COM 192.168.1.41 = EXAMPLE.COM 127.0.0.1 = EXAMPLE.COM kerberos 的var/kerberos/krb5kdc/kdc.conf的配置信息: [kdcdefaults] kdc_ports = 88 kdc_tcp_ports = 88 [realms] EXAMPLE.COM = { #master_key_type = aes256-cts acl_file = /var/kerberos/krb5kdc/kadm5.acl dict_file = /usr/share/dict/words admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab max_renewable_life = 7d supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal } kafka的kafka_server_jaas.conf的配置信息: KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="kafka/weiwei@EXAMPLE.COM"; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="zookeeper/192.168.1.41@EXAMPLE.COM"; }; zookeeper_jaas.conf的配置信息: Server{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="zookeeper/192.168.1.41@EXAMPLE.COM"; }; zookeeper.properties的新增配置信息: authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000 server.properties 新增的配置信息: advertised.host.name=192.168.1.41 advertised.listeners=SASL_PLAINTEXT://192.168.1.41:9092 listeners=SASL_PLAINTEXT://192.168.1.41:9092 #listeners=PLAINTEXT://127.0.0.1:9093 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI sasl.kerberos.service.name=kafka zookeeper-server-start.sh 新增的配置信息 export KAFKA_OPTS='-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/shubei/Downloads/kafka_2.12-1.0.0/config/zookeeper_jaas.conf' kafka-server-start.sh 新增的配置信息: export KAFKA_OPTS='-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/shubei/Downloads/kafka_2.12-1.0.0/config/kafka_server_jaas.conf' 配置信息基本是这样,快过年了,小弟在线求救,再预祝大侠们新年快乐。 ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ```

kafka消费不到数据问题

kafka集群搭建正常,通过console都能正常生产和消费消息,但是通过JAVA程序就是读取不到消息,更换group都尝试过了 package test; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private String topic; public KafkaConsumer(String topic){ super(); this.topic=topic; } @Override public void run(){ //通过properties设置了Consumer的参数,并且创建了连接器,连接到Kafaka ConsumerConnector consumer = createConsumer(); //Map作用指定获取的topic以及partition Map<String,Integer> topicCountMap = new HashMap<String,Integer>(); topicCountMap.put(topic, 3); //consumer连接器获取消息 Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); //获取对应的topic中的某一个partition中的数据 KafkaStream<byte[],byte[]> kafkaStream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while(iterator.hasNext()){ byte[] message = iterator.next().message(); System.out.println("message is:"+new String(message)); } } private ConsumerConnector createConsumer(){ Properties properties = new Properties(); properties.put("zookeeper.connect", "XXX:2181"); properties.put("auto.offset.reset", "smallest");//读取旧数据 properties.put("group.id", "333fcdcd"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } public static void main(String[] args) { new KafkaConsumer("testtest").start(); } }

如何保护zookeeper中kafka的元数据节点

我现在分别有kafka和zookeeper的集群, zookeeper我单独使用的时候自己建立的节点可以设置权限,但是kafka在zookeeper中创建的元数据节点怎么办?这些节点都是anyone权限,肯定不可能手动去一个一个加,因为zookeeper子节点不继承父节点的权限,所以除非产生的每个节点都有权限否者没有意义。 我查资料按照步骤给zookeeper启用了jaas文件,然后zookeeper集群也可以正常启动与使用,但是kafka启动的时候报错: java.lang.SecurityException: zookeeper.set.acl is true, but the verification of the JAAS login file failed. 主要就是下面这句话,到底是什么情况下才会报这个错? zookeeper.set.acl is true, but the verification of the JAAS login file failed. 我现在zookeeper的jaas配置方式是: 1.给zoo.cfg文件配置了 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000 2.zookeeper的jaas文件如下: Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpwd"; }; 这个文件conf文件夹中 3. 并且org.apache.kafka.common.security.plain.PlainLoginModule 需要用到的jar也都添加到了 classpath中, 4. zkEnv.sh 也设置了SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf" kafka的broker的jaas文件内容如下: KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpwd" user_admin="adminpwd"; }; 配置文件server.properties中设置了zookeeper.set.acl=true 请问我是不是哪里弄错了,为什么就是不行呢 zookeeper.set.acl 这个怎么用,根据我查资料理解,这样设置了以后kafka就可以创建的节点是带ACL的

连接到Kafka后Golang消费者延迟接收Kafka消息

<div class="post-text" itemprop="text"> <p><em>I'm new to Golang and Kafa so this might seem like a silly question.</em></p> <p>After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?</p> <p>It prints a message right before <code>consumer.Messages()</code> and print another message for each message received. The ~20 sec delay is between the first <code>fmt.Println</code> and second <code>fmt.Println</code>.</p> <pre><code>package main import ( "fmt" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" ) func main() { // Create the consumer and listen for new messages consumer := createConsumer() // Create a signal channel to know when we are done done := make(chan bool) // Start processing messages go func() { fmt.Println("Start consuming Kafka messages") for msg := range consumer.Messages() { s := string(msg.Value[:]) fmt.Println("Msg: ", s) } }() &lt;-done } func createConsumer() *cluster.Consumer { // Define our configuration to the cluster config := cluster.NewConfig() config.Consumer.Return.Errors = false config.Group.Return.Notifications = false config.Consumer.Offsets.Initial = sarama.OffsetOldest // Create the consumer brokers := []string{"127.0.0.1:9092"} topics := []string{"orders"} consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config) if err != nil { log.Fatal("Unable to connect consumer to Kafka") } go handleErrors(consumer) go handleNotifications(consumer) return consumer } </code></pre> <p><strong>docker-compose.yml</strong></p> <pre><code>version: '2' services: zookeeper: image: "confluentinc/cp-zookeeper:5.0.1" hostname: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker-1: image: "confluentinc/cp-enterprise-kafka:5.0.1" hostname: broker-1 depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_BROKER_RACK: rack-a KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092' KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter KAFKA_DELETE_TOPIC_ENABLE: "true" KAFKA_JMX_PORT: 9999 KAFKA_JMX_HOSTNAME: 'broker-1' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092 CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' KAFKA_CREATE_TOPICS: "orders:1:1" </code></pre> </div>

kafka的消息只能被consumer group中的一个消费者消费,那这个group的意义何在?

比如说,我现在有A,B,C三个机器,都作为console-consumer,若三个消费者放在一个群里,按我的理解,这个群里的每个人都应该接受到订阅的所有信息的啊,如果只有一个能消费,那何必要放在一个群里呢?

2019 Python开发者日-培训

2019 Python开发者日-培训

150讲轻松搞定Python网络爬虫

150讲轻松搞定Python网络爬虫

设计模式(JAVA语言实现)--20种设计模式附带源码

设计模式(JAVA语言实现)--20种设计模式附带源码

YOLOv3目标检测实战:训练自己的数据集

YOLOv3目标检测实战:训练自己的数据集

java后台+微信小程序 实现完整的点餐系统

java后台+微信小程序 实现完整的点餐系统

三个项目玩转深度学习(附1G源码)

三个项目玩转深度学习(附1G源码)

初级玩转Linux+Ubuntu(嵌入式开发基础课程)

初级玩转Linux+Ubuntu(嵌入式开发基础课程)

2019 AI开发者大会

2019 AI开发者大会

玩转Linux:常用命令实例指南

玩转Linux:常用命令实例指南

一学即懂的计算机视觉(第一季)

一学即懂的计算机视觉(第一季)

4小时玩转微信小程序——基础入门与微信支付实战

4小时玩转微信小程序——基础入门与微信支付实战

Git 实用技巧

Git 实用技巧

Python数据清洗实战入门

Python数据清洗实战入门

使用TensorFlow+keras快速构建图像分类模型

使用TensorFlow+keras快速构建图像分类模型

实用主义学Python(小白也容易上手的Python实用案例)

实用主义学Python(小白也容易上手的Python实用案例)

程序员的算法通关课:知己知彼(第一季)

程序员的算法通关课:知己知彼(第一季)

MySQL数据库从入门到实战应用

MySQL数据库从入门到实战应用

机器学习初学者必会的案例精讲

机器学习初学者必会的案例精讲

手把手实现Java图书管理系统(附源码)

手把手实现Java图书管理系统(附源码)

极简JAVA学习营第四期(报名以后加助教微信:eduxy-1)

极简JAVA学习营第四期(报名以后加助教微信:eduxy-1)

.net core快速开发框架

.net core快速开发框架

玩转Python-Python3基础入门

玩转Python-Python3基础入门

Python数据挖掘简易入门

Python数据挖掘简易入门

微信公众平台开发入门

微信公众平台开发入门

程序员的兼职技能课

程序员的兼职技能课

Windows版YOLOv4目标检测实战:训练自己的数据集

Windows版YOLOv4目标检测实战:训练自己的数据集

HoloLens2开发入门教程

HoloLens2开发入门教程

微信小程序开发实战

微信小程序开发实战

Java8零基础入门视频教程

Java8零基础入门视频教程

相关热词 c# cad插入影像 c#设计思想 c#正则表达式 转换 c#form复制 c#写web c# 柱形图 c# wcf 服务库 c#应用程序管理器 c#数组如何赋值给数组 c#序列化应用目的博客园
立即提问