kafka同一消费者组内的不同消费者可以订阅不同主题吗

假如有一个消费者组group,消费者组内有两个消费者c1、c2,
c1订阅topic1,c2订阅topic2,那么结果是怎样的?
1、是两个消费者分别消费自己的主题;
2、还是组内这两个消费者都订阅了topic1和topic2;
3、还是报错?

3个回答

2.组内的两个消费者都订阅topic1和topic2。但是只有订阅了某个 topic 的 consumer 才会消费对应的 message

kafka 不同 topic 的 consumer 如果用的 group id 名字一样的情况下,其中任意一个 topic 的 consumer 重新上下线都会造成剩余所有的 consumer 产生 reblance 行为。

https://www.cnblogs.com/jingmingjiayuan/p/11401985.html

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
kafka多个消费者消费同一数据

kafka不同groupid下的消费者,消费同一topic下的某一条数据,为什么offset值不变? 只被消费了一次吗?

springboot kafka同一个消费组内配置多个消费者,监听多个topic?

我使用springboot kafka 使用同一个消费组内配置多个消费者(因此会有多个@KafkaListener监听器),监听多个topic下的指定分区,如图所示,是这样配置吧?但使用@TopicPartition 时报错!提示:TopicPartition cannot be resolved to a type。网上别人都是这么使用的,各种查都找不到有人出现过这种错误!求指导? ![图片说明](https://img-ask.csdn.net/upload/201911/28/1574912436_908691.png)

kafka消费者无法消费信息

在生产环境部署kafka集群和消费者服务器后,通过logstash向kafka集群发送实时日志,消费者也能正常消费信息。但是两分钟之后消费者就停止消费信息了,想问下各位老师如何排查问题点在哪里。 1:查看了kafka服务器的日志,logstash还在向kafka推实时日志,kafka集群日志留存时间是两个小时。 2:kafka消费者一共有两台,两台都在同时运行。 3:kafka集群有三台服务器,查问题的时候发现,kafka消费者只连接到了一台broker上,不知道这是不是原因所在。

使用go获取kafka中所有组的所有主题的消费者组偏移量

<div class="post-text" itemprop="text"> <p>NOTE: <strong>NOT A DUPLICATE OF</strong> <a href="https://stackoverflow.com/questions/40642689/how-to-get-consumer-group-offsets-for-partition-in-golang-kafka-10">How to get consumer group offsets for partition in Golang Kafka 10</a> does not answer my question, it's not even a working solution</p> <p>I'm trying to write a function in go that queries kafka for all consumer group offsets for all topics.</p> <p>To do that, I was hoping to read all the messages in <code>__consumer_offsets</code> topic and parse them.</p> <p>However, in all the kakfa go libraries I looked through, I could not find a way to just <em>read</em> all the messages from <code>__consumer_offsets</code> without consuming them.</p> <p>(<code>kafka-go</code> either gives me a way to read from a single partition, or <em>consume</em> messages from the entire topic)</p> <p>So my question is, simply put: Is there a way, using any kafka library out there, to get consumer group offsets for all the groups for all the topics?</p> <p>If not, is there a way to get the offset for a given topic and group id?</p> </div>

springboot kafka 多个接收系统订阅多个发送系统,多个消费组多个消费者该如何写代码?

1、kafka结合springboot开发。 2、生产消息的系统有多个,可能会动态的改变消费系统的个数。举例:A、B、C这3个系统生产消息后分别想让不同的系统使用广播模式消费消息。A想要系统1、2、3消费同一个消息,B想要系统2、3、4、5消费同一个消息,C想要系统1、2、5、6消费同一个消息。这样消费组的个数相当于在动态改变着,可能后面新增或减少,实际环境中生产消息的系统有几十个,消费消息的系统也有几十个,这种网上完全找不到代码案例,都是简单的案例或写死的。求指导代码该怎么写!太复杂了完全没思路!

kafka消费者速度与什么有关

@KafkaListener(topics = {"CRBKC0002.000"}) public void sendSmsInfoByBizType(String record) { } 假设单机版的kafka,就一个节点。 1、 @KafkaListener注解接受消费者,是不是等这个方法执行完。 这个消费者进程才算消费结束。是不是一个镜像这个方法同时只能执行一次?就是不能连续起多个线程执行这个方法。 2、如果接受到参数就算消费这进程结束,也就是获取这个record消费者进程就结束了,那假设生产者一秒生产100w数据进入kafka。那这边获取参数就算消费者进程消费结束,那是不是相当于瞬间连续起100w这个方法线程执行。可是tomcat就200线程。

如何在Golang中创建kafka消费者组?

<div class="post-text" itemprop="text"> <p>An available library is <a href="https://github.com/Shopify/sarama/" rel="nofollow noreferrer">sarama</a> (or its expansion <a href="https://github.com/bsm/sarama-cluster" rel="nofollow noreferrer">sarama-cluster</a>) however no consumer group example are provided, not in <a href="https://godoc.org/gopkg.in/Shopify/sarama.v1#example-Consumer" rel="nofollow noreferrer">sarama</a> nor in <a href="https://github.com/bsm/sarama-cluster/issues/105" rel="nofollow noreferrer">sarama-cluster</a>.</p> <p>I do not understand the API. May I have an example of creating a consumer group for a topic?</p> </div>

kafka 消费者消费不到数据

[root@hzctc-kafka-5d61 ~]# kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group sbs-haodian-message1 --topic Message --zookeeper 10.1.5.61:2181 [2018-04-18 16:43:43,467] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/sbs-haodian-message1/offsets/Message/8. 用kafka的时候 用命令查看消费组消费情况 报这个错误 其他的消费组是正常的 哪位大神知道这是什么原因导致的 我在消费操作的时候加了缓存锁 每次poll操作之后的间隔时间不确定 可能是10S或者20S或者30S 不过我的sessiontimeiut设置了90s。这个会有什么影响吗

kafka 消费者 获取消息

activeqmq 都是broker push 到 消费者,消费者 建立 messageListener 监听器 就可以 获取消息,但kafka 是 需要去broker pull消息, 怎么才能知道 broker中 已经 有了对应 topic 呢 ?定时 获取?

Kafka消费者组丢失未提交的消息

<div class="post-text" itemprop="text"> <p>I am using consumer group with just one consumer, just one broker ( docker wurstmeister image ). It's decided in a code to commit offset or not - if code returns error then message is not commited. I need to ensure that system does not lose any message - even if that means retrying same msg forever ( for now ;) ). For testing this I have created simple handler which does not commit offset in case of 'error' string send as message to kafka. All other strings are commited. </p> <pre><code>kafka-console-producer --broker-list localhost:9092 --topic test &gt;this will be commited </code></pre> <p>Now running </p> <pre><code>kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe </code></pre> <p>returns</p> <pre><code>TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 13 13 0 </code></pre> <p>so thats ok, there is no lag. Now we pass 'error' string to fake that something bad happened and message is not commited:</p> <pre><code>TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 13 14 1 </code></pre> <p>Current offset stays at right position + there is 1 lagged message. Now if we pass correct message again offset will move on to 15:</p> <p><code>TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG test 0 15 15</code> </p> <p>and message number 14 will not be picked up ever again. Is it default behaviour? Do I need to trace last offset and load message by it+1 manually? I have set commit interval to 0 to hopefully not use any auto.commit mechanism.</p> <p>fetch/commit code:</p> <pre><code>go func() { for { ctx := context.Background() m, err := mr.brokerReader.FetchMessage(ctx) if err != nil { break } if err := msgFunc(m); err != nil { log.Errorf("# messaging # cannot commit a message: %v", err) continue } // commit message if no error if err := mr.brokerReader.CommitMessages(ctx, m); err != nil { // should we do something else to just logging not committed message? log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err) } } }() </code></pre> <p>reader configuration:</p> <pre><code>kafkaReader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, GroupID: groupID, Topic: topic, CommitInterval: 0, MinBytes: 10e3, MaxBytes: 10e6, }) </code></pre> <p>library used: <a href="https://github.com/segmentio/kafka-go" rel="nofollow noreferrer">https://github.com/segmentio/kafka-go</a></p> </div>

spring boot 1.5集成 kafka 消费者怎么自己确认消费

spring boot 1.5集成 kafka 消费者怎么自己确认消费 怎么使用@KafkaListener注解实现Acknowledgment,即消费者怎么自己提交游标

kafka2.1.0怎么拿到消费者群组的信息。

卡夫卡旧版把消费者情况放在zookeeper的ZkClient里面;新版我在网上查是放在adminclient里面,但是卡夫卡adminclient没有listAllConsumerGroupsf方法。 求问zenm拿到卡夫卡消费者群组信息对其进行监控

kafka 虚拟机内 shell 可以正常生产消费 远程连接创建消费者消费就没反应

有遇到过同样问题的吗?网上找了半天也没找到解决办法 能 ping 通 且防火墙都已关闭 代码如下 ``` package com.yn.kafkatoredis.demo; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class MyKafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.100.100:9092"); props.put("group.id", "wtf");//消费者的组 id props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //订阅主题列表 topic consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n"); } } } ```

kafka消费者处理慢的情况下如何提高消息处理速度?不允许增加分区

如题.最近的一个面试题,说是考虑kafka理论特性.具体要求我可能有理解错误.如果各位有研究一眼看出是什么问题,谢谢给个提示. 我搜索了下,提高消费性能的有: 增加分区个数(增加消费者并行数)[不允许]; 消费者使用多线程;如果消息处理是CPU密集的加多线程也没用啊; 或许我理解有问题? 换个问题? 生产者1秒生成1W消息.然而此时全部消费者1s只能消费5000,消息处理是纯CPU计算,问:在不添加分区的情况下如何消息处理速度?

有关kafka消费者的问题,描述如下

现在有个问题请教下:有谁了解kafka,现在的问题就是我要读某个group中的固定个数分区的东西,然后consume这些数据的时候我要启用多线程读取,怎么保证我的数据不会重复(个人认为是不是偏移量就可以看出是否重复),每个线程读到一堆数据后,然后解析成单个对象,再启动不同的线程入库,这种话多线程套多线程效率是不是会很低,多节点部署这个应用会平分这个执行效率,请问怎么解决这类问题

如何确保我的消费者仅按顺序处理kafka主题中的消息?

<div class="post-text" itemprop="text"> <p>I've never used kafka before. I have two test Go programs accessing a local kafka instance: a reader and a writer. I'm trying to tweak my producer, consumer, and kafka server settings to get a particular behavior.</p> <p>My writer:</p> <pre><code>package main import ( "fmt" "math/rand" "strconv" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { rand.Seed(time.Now().UnixNano()) topics := []string{ "policymanager-100", "policymanager-200", "policymanager-300", } progress := make(map[string]int) for _, t := range topics { progress[t] = 0 } producer, err := kafka.NewProducer(&amp;kafka.ConfigMap{ "bootstrap.servers": "localhost", "group.id": "0", }) if err != nil { panic(err) } defer producer.Close() fmt.Println("producing messages...") for i := 0; i &lt; 30; i++ { index := rand.Intn(len(topics)) topic := topics[index] num := progress[topic] num++ fmt.Printf("%s =&gt; %d ", topic, num) msg := &amp;kafka.Message{ Value: []byte(strconv.Itoa(num)), TopicPartition: kafka.TopicPartition{ Topic: &amp;topic, }, } err = producer.Produce(msg, nil) if err != nil { panic(err) } progress[topic] = num time.Sleep(time.Millisecond * 100) } fmt.Println("DONE") } </code></pre> <p>There are three topics that exist on my local kafka: policymanager-100, policymanager-200, policymanager-300. They each only have 1 partition to ensure all messages are sorted by the time kafka receives them. My writer will randomly pick one of those topics and issue a message consisting of a number that increments solely for that topic. When it's done running, I expect the queues to look something like this (topic names shortened for legibility):</p> <pre><code>100: 1 2 3 4 5 6 7 8 9 10 11 200: 1 2 3 4 5 6 7 300: 1 2 3 4 5 6 7 8 9 10 11 12 </code></pre> <p>So far so good. I'm trying to configure things so that any number of consumers can be spun up and consume these messages in order. By "in-order" I mean that no consumer should get message 2 for topic 100 until message 1 is COMPLETED (not just started). If message 1 for topic 100 is being worked on, consumers are free to consume from other topics that currently don't have a message being processed. If a message of a topic has been sent to a consumer, that entire topic should become "locked" until either a timeout assumes that the consumer failed or the consumer commits the message, then the topic is "unlocked" to have it's next message made available to be consumed.</p> <p>My reader:</p> <pre><code>package main import ( "fmt" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { count := 2 for i := 0; i &lt; count; i++ { go consumer(i + 1) } fmt.Println("cosuming...") // hold this thread open indefinitely select {} } func consumer(id int) { c, err := kafka.NewConsumer(&amp;kafka.ConfigMap{ "bootstrap.servers": "localhost", "group.id": "0", // strconv.Itoa(id), "enable.auto.commit": "false", }) if err != nil { panic(err) } c.SubscribeTopics([]string{`^policymanager-.+$`}, nil) for { msg, err := c.ReadMessage(-1) if err != nil { panic(err) } fmt.Printf("%d) Message on %s: %s ", id, msg.TopicPartition, string(msg.Value)) time.Sleep(time.Second) _, err = c.CommitMessage(msg) if err != nil { fmt.Printf("ERROR commiting: %+v ", err) } } } </code></pre> <p>From my current understanding, the way I'm likely to achieve this is by setting up my consumer properly. I've tried many different variations of this program. I've tried having all my goroutines share the same consumer. I've tried using a different <code>group.id</code> for each goroutine. None of these was the right configuration to get the behavior I'm after.</p> <p>What the posted code does is empty out one topic at a time. Despite having multiple goroutines, the process will read all of 100 then move to 200 then 300 and only one goroutine will actually do all the reading. When I let each goroutine have a different <code>group.id</code> then messages get read by multiple goroutines which I would like to prevent.</p> <p>My example consumer is simply breaking things up with goroutines but when I begin working this project into my use case at work, I'll need this to work across multiple kubernetes instances that won't be talking to each other so using anything that interacts between goroutines won't work as soon as there are 2 instances on 2 kubes. That's why I'm hoping to make kafka do the gatekeeping I want.</p> </div>

kafka消费者迭代器卡死

配置kafka+zk在虚拟机上,自己在本机上的生产者可以正常发送消息,但是消费者在 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 之后 iterator 的任何操作都直接卡死了,请问这个是怎么回事?在这个操作之前debug是可以看到变量内部的数值的,这个操作之后就不能看了,value全部清空了。 贴源码 ``` package com.weixinjia.recreation.queue.client; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerGroupExample extends Thread{ private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // // executor = Executors.newFixedThreadPool(a_numThreads); System.out.println(streams.size()); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); System.out.println(iterator); while(iterator.hasNext()){ MessageAndMetadata<byte[], byte[]> next = iterator.next(); byte[] message = next.message(); String string = new String(message); System.out.println(string); } } System.out.println("消息输出完成"); } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "1500"); props.put("zookeeper.sync.time.ms", "4000"); props.put("auto.commit.interval.ms", "1000"); props.put("fetch.message.max.bytes", "10240000"); props.put("auto.commit.enable", "true"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = "masterServce:2181"; String groupId = "group-1"; String topic = "test2"; int threads = 3; ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } } ``` 消息在kafka那边直接用kafka-console-consumer.sh是可以查询到的

启动kafka 消费者有时获取不到消息

kafka消费者启动的时候有时候不能获取到消息,但是重启后就可以了,有时候还要重启好多次。。。不知道是为什么,希望大神能指导一下。 [ INFO ] [2016-09-29 14:34:53] org.hibernate.validator.internal.util.Version [30] - HV000001: Hibernate Validator 5.2.4.Final [ INFO ] [2016-09-29 14:34:53] com.coocaa.salad.stat.ApplicationMain [48] - Starting ApplicationMain on zhuxiang with PID 1740 (D:\IdeaProjects\green-salad\adx-stat\target\classes started by zhuxiang in D:\IdeaProjects\green-salad) [ INFO ] [2016-09-29 14:34:53] com.coocaa.salad.stat.ApplicationMain [663] - The following profiles are active: dev [ INFO ] [2016-09-29 14:34:54] org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext [581] - Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5754de72: startup date [Thu Sep 29 14:34:54 CST 2016]; root of context hierarchy 2016-09-29 14:34:55 JRebel: Monitoring Spring bean definitions in 'D:\IdeaProjects\green-salad\adx-stat\target\classes\spring-integration-consumer.xml'. [ INFO ] [2016-09-29 14:34:55] org.springframework.beans.factory.xml.XmlBeanDefinitionReader [317] - Loading XML bean definitions from URL [file:/D:/IdeaProjects/green-salad/adx-stat/target/classes/spring-integration-consumer.xml] [ INFO ] [2016-09-29 14:34:56] org.springframework.beans.factory.config.PropertiesFactoryBean [172] - Loading properties file from URL [jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties] 2016-09-29 14:34:56 JRebel: Monitoring properties in 'jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties'. [ INFO ] [2016-09-29 14:34:56] org.springframework.integration.config.IntegrationRegistrar [330] - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. [ INFO ] [2016-09-29 14:34:56] org.springframework.beans.factory.support.DefaultListableBeanFactory [843] - Overriding bean definition for bean 'kafkaConsumerService' with a different definition: replacing [Generic bean: class [com.coocaa.salad.stat.service.KafkaConsumerService]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in file [D:\IdeaProjects\green-salad\adx-stat\target\classes\com\coocaa\salad\stat\service\KafkaConsumerService.class]] with [Generic bean: class [com.coocaa.salad.stat.service.KafkaConsumerService]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in URL [file:/D:/IdeaProjects/green-salad/adx-stat/target/classes/spring-integration-consumer.xml]] [ INFO ] [2016-09-29 14:34:57] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor [130] - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. [ INFO ] [2016-09-29 14:34:57] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor [158] - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. [ INFO ] [2016-09-29 14:34:57] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [class org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$3dea2e76] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) [ INFO ] [2016-09-29 14:34:58] org.springframework.beans.factory.config.PropertiesFactoryBean [172] - Loading properties file from URL [jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties] 2016-09-29 14:34:58 JRebel: Monitoring properties in 'jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties'. [ INFO ] [2016-09-29 14:34:58] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) [ INFO ] [2016-09-29 14:34:58] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) [ INFO ] [2016-09-29 14:34:59] org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer [88] - Tomcat initialized with port(s): 8081 (http) spring-integration-consumer.xml内容如下: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka-1.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!-- topic test conf --> <int:channel id="inputFromKafka"> <int:dispatcher task-executor="kafkaMessageExecutor"/> </int:channel> <!-- zookeeper配置 可以配置多个 --> <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="172.20.135.95:2181,172.20.135.95:2182" zk-connection-timeout="10000" zk-session-timeout="10000" zk-sync-time="2000"/> <!-- channel配置 auto-startup="true" 否则接收不发数据 --> <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="inputFromKafka"> <int:poller fixed-delay="1" time-unit="MILLISECONDS"/> </int-kafka:inbound-channel-adapter> <task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500"/> <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder"/> <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="properties"> <props> <prop key="auto.offset.reset">smallest</prop> <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M --> <prop key="fetch.message.max.bytes">5242880</prop> <prop key="auto.commit.interval.ms">1000</prop> <prop key="auto.commit.enables">true</prop> </props> </property> </bean> <!-- 消息接收的BEEN --> <bean id="kafkaConsumerService" class="com.coocaa.salad.stat.service.KafkaConsumerService"/> <!-- 指定接收的方法 --> <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumerService" method="processMessage"/> <int-kafka:consumer-context id="consumerContext" consumer-timeout="1000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"> <int-kafka:consumer-configurations> <int-kafka:consumer-configuration group-id="group-4" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder" max-messages="5000"> <!-- 两个TOPIC配置 --> <int-kafka:topic id="clientsRequests2" streams="4"/> <!--<int-kafka:topic id="sunneytopic" streams="4" />--> </int-kafka:consumer-configuration> </int-kafka:consumer-configurations> </int-kafka:consumer-context> </beans> kafka版本0.10的

kafka的消息只能被consumer group中的一个消费者消费,那这个group的意义何在?

比如说,我现在有A,B,C三个机器,都作为console-consumer,若三个消费者放在一个群里,按我的理解,这个群里的每个人都应该接受到订阅的所有信息的啊,如果只有一个能消费,那何必要放在一个群里呢?

软件测试入门、SQL、性能测试、测试管理工具

软件测试2小时入门,让您快速了解软件测试基本知识,有系统的了解; SQL一小时,让您快速理解和掌握SQL基本语法 jmeter性能测试 ,让您快速了解主流来源性能测试工具jmeter 测试管理工具-禅道,让您快速学会禅道的使用,学会测试项目、用例、缺陷的管理、

计算机组成原理实验教程

西北工业大学计算机组成原理实验课唐都仪器实验帮助,同实验指导书。分为运算器,存储器,控制器,模型计算机,输入输出系统5个章节

Java 最常见的 200+ 面试题:面试必备

这份面试清单是从我 2015 年做了 TeamLeader 之后开始收集的,一方面是给公司招聘用,另一方面是想用它来挖掘在 Java 技术栈中,还有那些知识点是我不知道的,我想找到这些技术盲点,然后修复它,以此来提高自己的技术水平。虽然我是从 2009 年就开始参加编程工作了,但我依旧觉得自己现在要学的东西很多,并且学习这些知识,让我很有成就感和满足感,那所以何乐而不为呢? 说回面试的事,这份面试...

winfrom中嵌套html,跟html的交互

winfrom中嵌套html,跟html的交互,源码就在里面一看就懂,很简单

玩转Python-Python3基础入门

总课时80+,提供源码和相关资料 本课程从Python零基础到纯Python项目实战。内容详细,案例丰富,覆盖了Python知识的方方面面,学完后不仅对Python知识有个系统化的了解,让你从Python小白变编程大牛! 课程包含: 1.python安装 2.变量、数据类型和运算符 3.选择结构 4.循环结构 5.函数和模块 6.文件读写 7.了解面向对象 8.异常处理

程序员的兼职技能课

获取讲师答疑方式: 在付费视频第一节(触摸命令_ALL)片头有二维码及加群流程介绍 限时福利 原价99元,今日仅需39元!购课添加小助手(微信号:itxy41)按提示还可领取价值800元的编程大礼包! 讲师介绍: 苏奕嘉&nbsp;前阿里UC项目工程师 脚本开发平台官方认证满级(六级)开发者。 我将如何教会你通过【定制脚本】赚到你人生的第一桶金? 零基础程序定制脚本开发课程,是完全针对零脚本开发经验的小白而设计,课程内容共分为3大阶段: ①前期将带你掌握Q开发语言和界面交互开发能力; ②中期通过实战来制作有具体需求的定制脚本; ③后期将解锁脚本的更高阶玩法,打通任督二脉; ④应用定制脚本合法赚取额外收入的完整经验分享,带你通过程序定制脚本开发这项副业,赚取到你的第一桶金!

HoloLens2开发入门教程

本课程为HoloLens2开发入门教程,讲解部署开发环境,安装VS2019,Unity版本,Windows SDK,创建Unity项目,讲解如何使用MRTK,编辑器模拟手势交互,打包VS工程并编译部署应用到HoloLens上等。

基于VHDL的16位ALU简易设计

基于VHDL的16位ALU简易设计,可完成基本的加减、带进位加减、或、与等运算。

MFC一站式终极全套课程包

该套餐共包含从C小白到C++到MFC的全部课程,整套学下来绝对成为一名C++大牛!!!

利用Verilog实现数字秒表(基本逻辑设计分频器练习)

设置复位开关。当按下复位开关时,秒表清零并做好计时准备。在任何情况下只要按下复位开关,秒表都要无条件地进行复位操作,即使是在计时过程中也要无条件地进行清零操作。 设置启/停开关。当按下启/停开关后,将

董付国老师Python全栈学习优惠套餐

购买套餐的朋友可以关注微信公众号“Python小屋”,上传付款截图,然后领取董老师任意图书1本。

Python可以这样学(第一季:Python内功修炼)

董付国系列教材《Python程序设计基础》、《Python程序设计(第2版)》、《Python可以这样学》配套视频,讲解Python 3.5.x和3.6.x语法、内置对象用法、选择与循环以及函数设计与使用、lambda表达式用法、字符串与正则表达式应用、面向对象编程、文本文件与二进制文件操作、目录操作与系统运维、异常处理结构。

计算机操作系统 第三版.pdf

计算机操作系统 第三版 本书全面介绍了计算机系统中的一个重要软件——操作系统(OS),本书是第三版,对2001年出版的修订版的各章内容均作了较多的修改,基本上能反映当前操作系统发展的现状,但章节名称基

技术大佬:我去,你写的 switch 语句也太老土了吧

昨天早上通过远程的方式 review 了两名新来同事的代码,大部分代码都写得很漂亮,严谨的同时注释也很到位,这令我非常满意。但当我看到他们当中有一个人写的 switch 语句时,还是忍不住破口大骂:“我擦,小王,你丫写的 switch 语句也太老土了吧!” 来看看小王写的代码吧,看完不要骂我装逼啊。 private static String createPlayer(PlayerTypes p...

Vue.js 2.0之全家桶系列视频课程

基于新的Vue.js 2.3版本, 目前新全的Vue.js教学视频,让你少走弯路,直达技术前沿! 1. 包含Vue.js全家桶(vue.js、vue-router、axios、vuex、vue-cli、webpack、ElementUI等) 2. 采用笔记+代码案例的形式讲解,通俗易懂

微信公众平台开发入门

本套课程的设计完全是为初学者量身打造,课程内容由浅入深,课程讲解通俗易懂,代码实现简洁清晰。通过本课程的学习,学员能够入门微信公众平台开发,能够胜任企业级的订阅号、服务号、企业号的应用开发工作。 通过本课程的学习,学员能够对微信公众平台有一个清晰的、系统性的认识。例如,公众号是什么,它有什么特点,它能做什么,怎么开发公众号。 其次,通过本课程的学习,学员能够掌握微信公众平台开发的方法、技术和应用实现。例如,开发者文档怎么看,开发环境怎么搭建,基本的消息交互如何实现,常用的方法技巧有哪些,真实应用怎么开发。

150讲轻松搞定Python网络爬虫

【为什么学爬虫?】 &nbsp; &nbsp; &nbsp; &nbsp;1、爬虫入手容易,但是深入较难,如何写出高效率的爬虫,如何写出灵活性高可扩展的爬虫都是一项技术活。另外在爬虫过程中,经常容易遇到被反爬虫,比如字体反爬、IP识别、验证码等,如何层层攻克难点拿到想要的数据,这门课程,你都能学到! &nbsp; &nbsp; &nbsp; &nbsp;2、如果是作为一个其他行业的开发者,比如app开发,web开发,学习爬虫能让你加强对技术的认知,能够开发出更加安全的软件和网站 【课程设计】 一个完整的爬虫程序,无论大小,总体来说可以分成三个步骤,分别是: 网络请求:模拟浏览器的行为从网上抓取数据。 数据解析:将请求下来的数据进行过滤,提取我们想要的数据。 数据存储:将提取到的数据存储到硬盘或者内存中。比如用mysql数据库或者redis等。 那么本课程也是按照这几个步骤循序渐进的进行讲解,带领学生完整的掌握每个步骤的技术。另外,因为爬虫的多样性,在爬取的过程中可能会发生被反爬、效率低下等。因此我们又增加了两个章节用来提高爬虫程序的灵活性,分别是: 爬虫进阶:包括IP代理,多线程爬虫,图形验证码识别、JS加密解密、动态网页爬虫、字体反爬识别等。 Scrapy和分布式爬虫:Scrapy框架、Scrapy-redis组件、分布式爬虫等。 通过爬虫进阶的知识点我们能应付大量的反爬网站,而Scrapy框架作为一个专业的爬虫框架,使用他可以快速提高我们编写爬虫程序的效率和速度。另外如果一台机器不能满足你的需求,我们可以用分布式爬虫让多台机器帮助你快速爬取数据。 &nbsp; 从基础爬虫到商业化应用爬虫,本套课程满足您的所有需求! 【课程服务】 专属付费社群+每周三讨论会+1v1答疑

SEIR课程设计源码与相关城市数据.rar

SEIR结合学报与之前博客结合所做的一些改进,选择其中三个城市进行拟合仿真SEIR结合学报与之前博客结合所做的一些改进,选择其中三个城市进行拟合仿真SEIR结合学报与之前博客结合所做的一些改进,选择其

Python数据挖掘简易入门

&nbsp; &nbsp; &nbsp; &nbsp; 本课程为Python数据挖掘方向的入门课程,课程主要以真实数据为基础,详细介绍数据挖掘入门的流程和使用Python实现pandas与numpy在数据挖掘方向的运用,并深入学习如何运用scikit-learn调用常用的数据挖掘算法解决数据挖掘问题,为进一步深入学习数据挖掘打下扎实的基础。

2019 AI开发者大会

2019 AI开发者大会(AI ProCon 2019)是由中国IT社区CSDN主办的AI技术与产业年度盛会。多年经验淬炼,如今蓄势待发:2019年9月6-7日,大会将有近百位中美顶尖AI专家、知名企业代表以及千余名AI开发者齐聚北京,进行技术解读和产业论证。我们不空谈口号,只谈技术,诚挚邀请AI业内人士一起共铸人工智能新篇章!

Java面试题大全(2020版)

发现网上很多Java面试题都没有答案,所以花了很长时间搜集整理出来了这套Java面试题大全,希望对大家有帮助哈~ 本套Java面试题大全,全的不能再全,哈哈~ 一、Java 基础 1. JDK 和 JRE 有什么区别? JDK:Java Development Kit 的简称,java 开发工具包,提供了 java 的开发环境和运行环境。 JRE:Java Runtime Environ...

定量遥感中文版 梁顺林著 范闻捷译

这是梁顺林的定量遥感的中文版,由范闻捷等翻译的,是电子版PDF,解决了大家看英文费时费事的问题,希望大家下载看看,一定会有帮助的

GIS程序设计教程 基于ArcGIS Engine的C#开发实例

张丰,杜震洪,刘仁义编著.GIS程序设计教程 基于ArcGIS Engine的C#开发实例.浙江大学出版社,2012.05

人工智能-计算机视觉实战之路(必备算法+深度学习+项目实战)

系列课程主要分为3大阶段:(1)首先掌握计算机视觉必备算法原理,结合Opencv进行学习与练手,通过实际视项目进行案例应用展示。(2)进军当下最火的深度学习进行视觉任务实战,掌握深度学习中必备算法原理与网络模型架构。(3)结合经典深度学习框架与实战项目进行实战,基于真实数据集展开业务分析与建模实战。整体风格通俗易懂,项目驱动学习与就业面试。 建议同学们按照下列顺序来进行学习:1.Python入门视频课程 2.Opencv计算机视觉实战(Python版) 3.深度学习框架-PyTorch实战/人工智能框架实战精讲:Keras项目 4.Python-深度学习-物体检测实战 5.后续实战课程按照自己喜好选择就可以

三个项目玩转深度学习(附1G源码)

从事大数据与人工智能开发与实践约十年,钱老师亲自见证了大数据行业的发展与人工智能的从冷到热。事实证明,计算机技术的发展,算力突破,海量数据,机器人技术等,开启了第四次工业革命的序章。深度学习图像分类一直是人工智能的经典任务,是智慧零售、安防、无人驾驶等机器视觉应用领域的核心技术之一,掌握图像分类技术是机器视觉学习的重中之重。针对现有线上学习的特点与实际需求,我们开发了人工智能案例实战系列课程。打造:以项目案例实践为驱动的课程学习方式,覆盖了智能零售,智慧交通等常见领域,通过基础学习、项目案例实践、社群答疑,三维立体的方式,打造最好的学习效果。

微信小程序开发实战之番茄时钟开发

微信小程序番茄时钟视频教程,本课程将带着各位学员开发一个小程序初级实战类项目,针对只看过官方文档而又无从下手的开发者来说,可以作为一个较好的练手项目,对于有小程序开发经验的开发者而言,可以更好加深对小程序各类组件和API 的理解,为更深层次高难度的项目做铺垫。

面试了一个 31 岁程序员,让我有所触动,30岁以上的程序员该何去何从?

最近面试了一个31岁8年经验的程序猿,让我有点感慨,大龄程序猿该何去何从。

去除异常值matlab程序

数据预处理中去除异常值的程序,matlab写成

用verilog HDL语言编写的秒表

在秒表设计中,分模块书写。用在七段数码管上显示。输入频率是1KHZ.可以显示百分秒,秒,分。如要显示小时,只需修改leds里的代码和主模块代码。改程序以通过硬件电路验证。完全正确。

[透视java——反编译、修补和逆向工程技术]源代码

源代码。

相关热词 c# 不能序列化继承类 c# char* 调用 c# 开发dll模板 c#添加控件到工具箱 c#控制台组合数 编程计算猴子吃桃问题c# c# wpf 背景透明 随机抽取号码软件c# c# 开发环境 c# 属性和字段
立即提问