spring与kafka的key值传入问题?

我想问一下,我发送消息的时候,发送消息的方法:;我传入一个key,xml里配置了序列化,报错: org.apache.kafka.common.errors.SerializationException: Can't convert key of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer,该怎么解决,谢谢

1个回答

我今天也遇到这个问题,
首先'[B'在class字节码中表示byte数组,所以问题就是无法将byte[]转换为StringSerializer中指定的类型.
然后确认原因,源码看一下StringSerializer,会发现方法描述是public byte[] serialize(String topic, String data),所以必须传递的是String类型的数据,证实原因.
下面说解决方案:
application.yml修改kafka的value配置参数

spring:
    kafka:
        producer:
                  value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

问题解决,如何避免这个,初步猜测,主要是在于使用spring cloud stream 的binder @Output会将参数转换为byte[].下一步可以继续debugging看一下,下面如何设置.

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
Spring集成kafka,消费者运行时内存占用会一直增长?

本人用Spring集成kafka消费者,发布运行时内存占用会一直升高,最后程序挂掉。请各位大神看看,提供解决方法 以下是我的配置文件 ![图片说明](https://img-ask.csdn.net/upload/201810/31/1540978342_260014.png) 程序运行两天后占用内存达到了1.4G,我用jmap导出程序占用文件,使用eclipsemat分析 ![图片说明](https://img-ask.csdn.net/upload/201810/31/1540978543_231966.png) ![图片说明](https://img-ask.csdn.net/upload/201810/31/1540978554_565464.png) 发现是这个org.springframework.kafka.listener.KafkaMessageListenerContainer这个类里面 ![图片说明](https://img-ask.csdn.net/upload/201810/31/1540978671_113331.png) 这个里面的LinkedBlockingQueue这个队列像是没释放一样。不知道是不是还需要配置什么东西,,一直找不到什么方法来解决。

spring-kafka做了分区,部分分区数据可以正常消费,部分分区始终无法消费?

spring-kafka(版本:1.0.6.RELEASE,kafka-client:0.9.0.1),创建了8个分区,有一个分区的数据始终无法消费,其他分区数据正常消费。有大神知道是为什么?

springboot kafka 怎么配置 max.request.size

项目使用 springboot + kafka,有一个消息的内容比较大,出现了下面的异常 The message is 1330537 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 但是找了半天,不知道这个到底怎么配置,追踪了下源码也没看明白。哪位大神指导下? 其他的配置在 application.properties 中配置如下, spring.kafka.bootstrap-servers=**** spring.kafka.consumer.group-id=vprGroup spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer 想按照上面的格式这么配也不行 spring.kafka.producer.max-request-size=2097152

spring boot 1.5集成 kafka 消费者怎么自己确认消费

spring boot 1.5集成 kafka 消费者怎么自己确认消费 怎么使用@KafkaListener注解实现Acknowledgment,即消费者怎么自己提交游标

求救!kafka maven依赖冲突问题

小弟目前在写毕业设计项目,其中涉及到kafka javaapi的一段代码: ``` import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public void sendMessage(AppLogEntity e) { //创建配置对象 Properties props = new Properties(); props.put("metadata.broker.list", "192.168.72.182:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); //创建生产者 Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props)); sendSingleLog(producer,Constants.TOPIC_APP_STARTUP,e.getAppStartupLogs()); sendSingleLog(producer,Constants.TOPIC_APP_ERRROR,e.getAppErrorLogs()); sendSingleLog(producer,Constants.TOPIC_APP_EVENT,e.getAppEventLogs()); sendSingleLog(producer,Constants.TOPIC_APP_PAGE,e.getAppPageLogs()); sendSingleLog(producer,Constants.TOPIC_APP_USAGE,e.getAppUsageLogs()); //发送消息 producer.close(); } ``` 框架是SSM,启动后报错 org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'collectLogController': Failed to introspect bean class [com.automan.applogs.collect.web.controller.CollectLogController] for lookup method metadata: could not find class that it depends on; nested exception is java.lang.NoClassDefFoundError: kafka/javaapi/producer/Producer 再往下看,即是 Caused by: java.lang.ClassNotFoundException: kafka.javaapi.producer.Producer 我第一反应是去看pom文件,发现依赖都没有问题,但点开详细dependency后,发现kafka的依赖中scala版本冲突了, ![图片说明](https://img-ask.csdn.net/upload/201911/26/1574744977_566670.png) 查询了各种办法都解决不掉,也不知道是不是这个问题导致上述报错,特来求救,感激不尽~~

kafka消费者速度与什么有关

@KafkaListener(topics = {"CRBKC0002.000"}) public void sendSmsInfoByBizType(String record) { } 假设单机版的kafka,就一个节点。 1、 @KafkaListener注解接受消费者,是不是等这个方法执行完。 这个消费者进程才算消费结束。是不是一个镜像这个方法同时只能执行一次?就是不能连续起多个线程执行这个方法。 2、如果接受到参数就算消费这进程结束,也就是获取这个record消费者进程就结束了,那假设生产者一秒生产100w数据进入kafka。那这边获取参数就算消费者进程消费结束,那是不是相当于瞬间连续起100w这个方法线程执行。可是tomcat就200线程。

kafka发送消息报错Exception thrown when sending a message with key='null'

![图片说明](https://img-ask.csdn.net/upload/201907/10/1562773073_882822.png) 使用jps 命令 [1] 9795 [root@localhost kafka]# jps 10099 Jps 8294 Kafka 9574 NamesrvStartup [1]+ Done bin/kafka-server-start.sh -daemon config/server.properties 发送接收消息的配置如下 1.appacliation.properties spring.kafka.bootstrap-servers=192.168.209.128:9092 spring.kafka.consumer.group-id=0 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.batch-size=65536 spring.kafka.producer.buffer-memory=524288 2.java 发送接收消息 @GetMapping("/sendOrder") public String sendOrder(@RequestParam("orderId")String orderId) { kafkaTemplate.send("order",new Order(orderId)); return "success"; } @KafkaListener(topics = {"order"}) public void receiveOrder(ConsumerRecord<?,?>consumer) { logger.info("{}-{}:{}",consumer.topic(),consumer.key(),consumer.value()); } 刚开始我执行了一个发送string 消息的,结果就是死循环一样不停的打印Exception 之前的消息, 哪位大佬给解决下,多谢。

spring boot 整合kafka报错怎么解决?

整合了一下spring boot跟kafka老是一直报错 ``` 2018-05-20 18:56:26.920 ERROR 4428 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='myTest--------1' to topic myTest: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. ``` 请问一下各位大佬怎么解决的? 相应代码如下,也是参考了网上的: ``` <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> ``` kafka版本及配置 kafka_2.11-1.1.0 配置: ``` port = 9092 host.name = 阿里内网ip advertised.host.name = xxx.xxx.xx 阿里外网ip ``` 其他配置默认了 然后spring boot配置 ``` kafka: bootstrap-servers: xxx.xxx.xxx.xxx:9092 listener.concurrency: 3 producer.batch-size: 1000 consumer.group-id: test-consumer-group ``` ``` ``` @Configuration @EnableKafka public class KafkaConfiguration { } ``` @Component public class MsgProducer { private Logger log = LoggerFactory.getLogger(MsgProducer.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topicName, String jsonData) { log.info("向kafka推送数据:[{}]", jsonData); try { kafkaTemplate.send(topicName, jsonData); } catch (Exception e) { System.out.println("发送数据出错!!!"+topicName+","+jsonData); System.out.println("发送数据出错=====>"+e); log.error("发送数据出错!!!{}{}", topicName, jsonData); log.error("发送数据出错=====>", e); } //消息发送的监听器,用于回调返回信息 kafkaTemplate.setProducerListener(new ProducerListener<String, String>() { @Override public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) { System.out.println("topic-----"+topic); } @Override public void onError(String topic, Integer partition, String key, String value, Exception exception) { } @Override public boolean isInterestedInSuccess() { log.info("数据发送完毕"); System.out.println("数据发送完毕"); return false; } }); } } @Component public class MsgConsumer { @KafkaListener(topics = {"myTest"}) public void processMessage(String content) { System.out.println("消息被消费"+content); } } @RunWith(SpringRunner.class) @SpringBootTest public class TestKafka { @Autowired private MsgProducer msgProducer; @Test public void test() throws Exception { msgProducer.sendMessage("myTest", "myTest--------1"); } } ``` 就是这样;运行test老是一直这个错误

springboot整合kafka,kafka已正常加载,但是consumer的listner不执行。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.unitdream.kafka</groupId> <artifactId>consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>consumer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.54</version> </dependency> <!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> ``` package com.unitdream.kafka.consumer.kafkaconfig; import com.unitdream.kafka.consumer.kafkaconfig.listener.KafkaConsumerListener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { public Map<String, Object> consumerProperties() { Map<String, Object> props = new HashMap<String, Object>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "n1:9092,n2:9092,n3:9092,n4:9092,es1:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "alarm"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<String, String>(consumerProperties()); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public KafkaConsumerListener kafkaConsumerListener(){ return new KafkaConsumerListener(); } } ``` package com.unitdream.kafka.consumer.kafkaconfig.listener; import com.unitdream.kafka.consumer.services.MqKafkaAnalysisService; import com.unitdream.kafka.consumer.services.MqKafkaMataService; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; public class KafkaConsumerListener { private static final Logger loger = LoggerFactory.getLogger(KafkaConsumerListener.class); @Autowired private MqKafkaMataService mqKafkaMataService; @Autowired private MqKafkaAnalysisService mqKafkaAnalysisService; @KafkaListener(topics = {"alarm"}) public void listener(ConsumerRecord<String, String> record) { loger.info("kafka消费[{}-{}]-{}的数据--------------------start",record.topic(),record.partition(),record.offset()); if (mqKafkaMataService.insertOne(record) > 0) { mqKafkaAnalysisService.insertOne(record); } loger.info("kafka消费[{}-{}]-{}的数据--------------------end",record.topic(),record.partition(),record.offset()); } }

怎么解决的 kafka消费内存溢出问题 ?

![图片说明](https://img-ask.csdn.net/upload/201906/06/1559809105_761647.png)![图片说明](https://img-ask.csdn.net/upload/201906/06/1559809115_821758.png) 并发消费导致内存溢出问题

springcloudbus自定义event消息kafka中收不到消息,springcloud Greenwich版不行,daston版本可以

自定义event,在发送后在Kafka中并没有出现新增的消息 发布事件的代码 protected void fireEvent(String eventAction, Product productDto) { ProductEvent productEvent = new ProductEvent(productDto, context.getId(), "service-hello:**", eventAction, productDto.getItemCode()); // 发布事件 context.publishEvent(productEvent); } yml配置 server: port: 2100 spring: application: name: SERVICE-HELLO cloud: stream: bindings: output: destination: twostepsfromjava-cloud-producttopic content-type: application/json kafka: binder: brokers: localhost:9092 代码没有报错,但是执行完kafka对应的topic没有消息 同样的代码旧版本可以Daston可以,有没有知道解决办法,或者给个可以的greenwitch版本的demo

spring cloud stream 集成Kafka 接收到的消息内容前带contentType、application/json字符串

1、发送消息的代码: ``` this.source.output().send(MessageBuilder.withPayload("{'name':'zyy'}").setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()); ``` 2、接受消息的代码: ``` @StreamListener(target = Sink.INPUT) public void onProductMsg(@Payload Object object) { System.out.println(object); } ``` 3、结果: ![图片说明](https://img-ask.csdn.net/upload/201904/25/1556154220_224090.jpg) 4、请教大家怎么去掉contentType、application/json字符串

关于kafka写producer的问题

kafka在写producer时,采用轮询partition的方式指定KeyedMessage的key值,这样对性能是否有很大影响?

kafka 集成 kerberos ,启动kafka报错

kafka 使用kerberos协议的时候,启动kakfa的时候报zookeeper校验不通过。 错误信息如下:![图片说明](https://img-ask.csdn.net/upload/201902/02/1549098295_69981.png) kerberos的用户密钥:![图片说明](https://img-ask.csdn.net/upload/201902/02/1549098396_900993.png) kerberos的etc/krb5.conf配置信息:[logging] default = FILE:/var/log/krb5libs.log kdc = FILE:/var/log/krb5kdc.log admin_server = FILE:/var/log/kadmind.log [libdefaults] default_realm = EXAMPLE.COM default_tkt_enctypes = arcfour-hmac-md5 dns_lookup_realm = false dns_lookup_kdc = false ticket_lifetime = 24h renew_lifetime = 7d forwardable = true [realms] EXAMPLE.COM = { kdc = 192.168.1.41 admin_server = 192.168.1.41 } [domain_realm] kafka = EXAMPLE.COM zookeeper = EXAMPLE.COM weiwei = EXAMPLE.COM 192.168.1.41 = EXAMPLE.COM 127.0.0.1 = EXAMPLE.COM kerberos 的var/kerberos/krb5kdc/kdc.conf的配置信息: [kdcdefaults] kdc_ports = 88 kdc_tcp_ports = 88 [realms] EXAMPLE.COM = { #master_key_type = aes256-cts acl_file = /var/kerberos/krb5kdc/kadm5.acl dict_file = /usr/share/dict/words admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab max_renewable_life = 7d supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal } kafka的kafka_server_jaas.conf的配置信息: KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="kafka/weiwei@EXAMPLE.COM"; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="zookeeper/192.168.1.41@EXAMPLE.COM"; }; zookeeper_jaas.conf的配置信息: Server{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="zookeeper/192.168.1.41@EXAMPLE.COM"; }; zookeeper.properties的新增配置信息: authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000 server.properties 新增的配置信息: advertised.host.name=192.168.1.41 advertised.listeners=SASL_PLAINTEXT://192.168.1.41:9092 listeners=SASL_PLAINTEXT://192.168.1.41:9092 #listeners=PLAINTEXT://127.0.0.1:9093 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI sasl.kerberos.service.name=kafka zookeeper-server-start.sh 新增的配置信息 export KAFKA_OPTS='-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/shubei/Downloads/kafka_2.12-1.0.0/config/zookeeper_jaas.conf' kafka-server-start.sh 新增的配置信息: export KAFKA_OPTS='-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/shubei/Downloads/kafka_2.12-1.0.0/config/kafka_server_jaas.conf' 配置信息基本是这样,快过年了,小弟在线求救,再预祝大侠们新年快乐。 ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ```

springboot结合kafka时0毫秒关闭消息生产者

自己在做springboot结合kafka的时候,运行消息生产者,结果console显示了生产者的相关配置以及提示生产者在0毫秒内关闭了消息生产者,然后生产者发送消息失败。想请问下各位大佬这到底是是配置文件的问题还是消息生产者发送消息的时候出现了问题啊。 消息发送者代码: @Component @EnableKafka public class MessageSender { @Autowired private KafkaTemplate<String,String> kafkaTemplate; //private static final MessageSender sender=new MessageSender(); /* * * kafka客户端发送消息 * @param topic 主题 * @param message 消息内容 * @return*/ public boolean sendMessage(String topic,String message) { try { System.out.println("topic"+topic+"message"+message); kafkaTemplate.send(topic, message); } catch (Exception e) { return false; } return true; } } 控制台具体信息如下: 2019-07-09 22:43:34.008 INFO 7916 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 65536 bootstrap.servers = [192.168.2.2:9092] buffer.memory = 524288 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.StringDeserializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 0 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringDeserializer 2019-07-09 22:43:34.019 INFO 7916 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms. 2019-07-09 22:43:34.040 DEBUG 7916 --- [nio-8080-exec-1] o.s.b.w.s.f.OrderedRequestContextFilter : Cleared thread-bound request context: org.apache.catalina.connector.RequestFacade@2e251d9f 2019-07-09 22:43:34.077 DEBUG 7916 --- [nio-8080-exec-2] o.s.b.w.s.f.OrderedRequestContextFilter : Bound request context to thread: org.apache.catalina.connector.RequestFacade@2e251d9f 2019-07-09 22:43:34.112 DEBUG 7916 --- [nio-8080-exec-2] o.s.b.w.s.f.OrderedRequestContextFilter : Cleared thread-bound request context: org.apache.catalina.connector.RequestFacade@2e251d9f ``` ```

kafka和spring集成问题:Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.config.ContainerProperties

springboot 和kafka集成提示Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.config.ContainerProperties错误 详细如下: Caused by: java.lang.IllegalStateException: Failed to introspect Class [org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer] from ClassLoader [sun.misc.Launcher$AppClassLoader@18b4aac2] at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:507) ~[spring-core-5.0.13.RELEASE.jar:5.0.13.RELEASE] at org.springframework.util.ReflectionUtils.doWithLocalMethods(ReflectionUtils.java:367) ~[spring-core-5.0.13.RELEASE.jar:5.0.13.RELEASE] at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.buildLifecycleMetadata(InitDestroyAnnotationBeanPostProcessor.java:208) ~[spring-beans-5.0.13.RELEASE.jar:5.0.13.RELEASE] at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.findLifecycleMetadata(InitDestroyAnnotationBeanPostProcessor.java:189) ~[spring-beans-5.0.13.RELEASE.jar:5.0.13.RELEASE] at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessMergedBeanDefinition(InitDestroyAnnotationBeanPostProcessor.java:128) ~[spring-beans-5.0.13.RELEASE.jar:5.0.13.RELEASE] at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.postProcessMergedBeanDefinition(CommonAnnotationBeanPostProcessor.java:297) ~[spring-context-5.0.13.RELEASE.jar:5.0.13.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyMergedBeanDefinitionPostProcessors(AbstractAutowireCapableBeanFactory.java:1013) ~[spring-beans-5.0.13.RELEASE.jar:5.0.13.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:547) ~[spring-beans-5.0.13.RELEASE.jar:5.0.13.RELEASE] ... 15 common frames omitted Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/config/ContainerProperties at java.lang.Class.getDeclaredMethods0(Native Method) ~[na:1.8.0_131] at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) ~[na:1.8.0_131] at java.lang.Class.getDeclaredMethods(Class.java:1975) ~[na:1.8.0_131] at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:489) ~[spring-core-5.0.13.RELEASE.jar:5.0.13.RELEASE] ... 22 common frames omitted Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.config.ContainerProperties at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131] at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131] at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131] ... 26 common frames omitted ``` ```

关于kafka topic分区的问题

各位为好: 就是kafka将topic分为多个区,然后将区分布在多个sever上;如果将10个topic,每个topic分1个区。那么这个topic的10个区会分布在不同的server上吗?还是会集中在1,2台server上?如果分为多个区的话,我知道是会均匀分布的; 另外每个区是在那个server上怎么知道?owner 只是告诉了是消费者在消费当前分区,不知道当前分区在那个server上。 ![图片说明](https://img-ask.csdn.net/upload/201504/12/1428828615_949831.png)

kafka ArrayIndexOutOfBoundsException: 18

在kafka上出了下面这个问题,上网查了下都说是新版的kafka clien向旧版的kafka发送请求,旧版的kafka(<0.10)不支持ApiVersion(key:18) Request,造成的,但是我所有的produce,consumer,kafka服务器上装的kafka clien都是0.9.0.1,应该不会出现这个问题才对,为什么?求各位大神指点 ``` [2018-10-25 10:03:17,919] INFO [Kafka Server 0], started (kafka.server.KafkaServer) [2018-10-25 10:03:18,080] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [topic-test,0] (kafka.server.ReplicaFetcherManager) [2018-10-25 10:03:18,099] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [topic-test,0] (kafka.server.ReplicaFetcherManager) [2018-10-25 10:03:48,864] ERROR Processor got uncaught exception. (kafka.network.Processor) java.lang.ArrayIndexOutOfBoundsException: 18 at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68) at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39) at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79) at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426) at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421) at scala.collection.Iterator$class.foreach(Iterator.scala:742) at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.network.Processor.run(SocketServer.scala:421) at java.lang.Thread.run(Thread.java:748) ```

springboot 集成 kafka 广播式消费

项目是springboot 集成 kafka ,生产环境有四台实例部署.我想实现Kafka广播式消费, 需要每个实例拥有不同的groupId,那么我在springboot中需要如何配置呢? 每个实例跑的代码和配置中心的配置都是相同的.

在中国程序员是青春饭吗?

今年,我也32了 ,为了不给大家误导,咨询了猎头、圈内好友,以及年过35岁的几位老程序员……舍了老脸去揭人家伤疤……希望能给大家以帮助,记得帮我点赞哦。 目录: 你以为的人生 一次又一次的伤害 猎头界的真相 如何应对互联网行业的「中年危机」 一、你以为的人生 刚入行时,拿着傲人的工资,想着好好干,以为我们的人生是这样的: 等真到了那一天,你会发现,你的人生很可能是这样的: ...

程序员请照顾好自己,周末病魔差点一套带走我。

程序员在一个周末的时间,得了重病,差点当场去世,还好及时挽救回来了。

Java基础知识面试题(2020最新版)

文章目录Java概述何为编程什么是Javajdk1.5之后的三大版本JVM、JRE和JDK的关系什么是跨平台性?原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的主类有何不同?Java应用程序与小程序之间有那些差别?Java和C++的区别Oracle JDK 和 OpenJDK 的对比基础语法数据类型Java有哪些数据类型switc...

和黑客斗争的 6 天!

互联网公司工作,很难避免不和黑客们打交道,我呆过的两家互联网公司,几乎每月每天每分钟都有黑客在公司网站上扫描。有的是寻找 Sql 注入的缺口,有的是寻找线上服务器可能存在的漏洞,大部分都...

Intellij IDEA 实用插件安利

1. 前言从2020 年 JVM 生态报告解读 可以看出Intellij IDEA 目前已经稳坐 Java IDE 头把交椅。而且统计得出付费用户已经超过了八成(国外统计)。IDEA 的...

搜狗输入法也在挑战国人的智商!

故事总是一个接着一个到来...上周写完《鲁大师已经彻底沦为一款垃圾流氓软件!》这篇文章之后,鲁大师的市场工作人员就找到了我,希望把这篇文章删除掉。经过一番沟通我先把这篇文章从公号中删除了...

总结了 150 余个神奇网站,你不来瞅瞅吗?

原博客再更新,可能就没了,之后将持续更新本篇博客。

副业收入是我做程序媛的3倍,工作外的B面人生是怎样的?

提到“程序员”,多数人脑海里首先想到的大约是:为人木讷、薪水超高、工作枯燥…… 然而,当离开工作岗位,撕去层层标签,脱下“程序员”这身外套,有的人生动又有趣,马上展现出了完全不同的A/B面人生! 不论是简单的爱好,还是正经的副业,他们都干得同样出色。偶尔,还能和程序员的特质结合,产生奇妙的“化学反应”。 @Charlotte:平日素颜示人,周末美妆博主 大家都以为程序媛也个个不修边幅,但我们也许...

MySQL数据库面试题(2020最新版)

文章目录数据库基础知识为什么要使用数据库什么是SQL?什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有有几种录入格式?分别有什么区别?数据类型mysql有哪些数据类型引擎MySQL存储引擎MyISAM与InnoDB区别MyISAM索引与InnoDB索引的区别?InnoDB引擎的4大特性存储引擎选择索引什么是索引?索引有哪些优缺点?索引使用场景(重点)...

如果你是老板,你会不会踢了这样的员工?

有个好朋友ZS,是技术总监,昨天问我:“有一个老下属,跟了我很多年,做事勤勤恳恳,主动性也很好。但随着公司的发展,他的进步速度,跟不上团队的步伐了,有点...

我入职阿里后,才知道原来简历这么写

私下里,有不少读者问我:“二哥,如何才能写出一份专业的技术简历呢?我总感觉自己写的简历太烂了,所以投了无数份,都石沉大海了。”说实话,我自己好多年没有写过简历了,但我认识的一个同行,他在阿里,给我说了一些他当年写简历的方法论,我感觉太牛逼了,实在是忍不住,就分享了出来,希望能够帮助到你。 01、简历的本质 作为简历的撰写者,你必须要搞清楚一点,简历的本质是什么,它就是为了来销售你的价值主张的。往深...

魂迁光刻,梦绕芯片,中芯国际终获ASML大型光刻机

据羊城晚报报道,近日中芯国际从荷兰进口的一台大型光刻机,顺利通过深圳出口加工区场站两道闸口进入厂区,中芯国际发表公告称该光刻机并非此前盛传的EUV光刻机,主要用于企业复工复产后的生产线扩容。 我们知道EUV主要用于7nm及以下制程的芯片制造,光刻机作为集成电路制造中最关键的设备,对芯片制作工艺有着决定性的影响,被誉为“超精密制造技术皇冠上的明珠”,根据之前中芯国际的公报,目...

优雅的替换if-else语句

场景 日常开发,if-else语句写的不少吧??当逻辑分支非常多的时候,if-else套了一层又一层,虽然业务功能倒是实现了,但是看起来是真的很不优雅,尤其是对于我这种有强迫症的程序"猿",看到这么多if-else,脑袋瓜子就嗡嗡的,总想着解锁新姿势:干掉过多的if-else!!!本文将介绍三板斧手段: 优先判断条件,条件不满足的,逻辑及时中断返回; 采用策略模式+工厂模式; 结合注解,锦...

离职半年了,老东家又发 offer,回不回?

有小伙伴问松哥这个问题,他在上海某公司,在离职了几个月后,前公司的领导联系到他,希望他能够返聘回去,他很纠结要不要回去? 俗话说好马不吃回头草,但是这个小伙伴既然感到纠结了,我觉得至少说明了两个问题:1.曾经的公司还不错;2.现在的日子也不是很如意。否则应该就不会纠结了。 老实说,松哥之前也有过类似的经历,今天就来和小伙伴们聊聊回头草到底吃不吃。 首先一个基本观点,就是离职了也没必要和老东家弄的苦...

2020阿里全球数学大赛:3万名高手、4道题、2天2夜未交卷

阿里巴巴全球数学竞赛( Alibaba Global Mathematics Competition)由马云发起,由中国科学技术协会、阿里巴巴基金会、阿里巴巴达摩院共同举办。大赛不设报名门槛,全世界爱好数学的人都可参与,不论是否出身数学专业、是否投身数学研究。 2020年阿里巴巴达摩院邀请北京大学、剑桥大学、浙江大学等高校的顶尖数学教师组建了出题组。中科院院士、美国艺术与科学院院士、北京国际数学...

为什么你不想学习?只想玩?人是如何一步一步废掉的

不知道是不是只有我这样子,还是你们也有过类似的经历。 上学的时候总有很多光辉历史,学年名列前茅,或者单科目大佬,但是虽然慢慢地长大了,你开始懈怠了,开始废掉了。。。 什么?你说不知道具体的情况是怎么样的? 我来告诉你: 你常常潜意识里或者心理觉得,自己真正的生活或者奋斗还没有开始。总是幻想着自己还拥有大把时间,还有无限的可能,自己还能逆风翻盘,只不是自己还没开始罢了,自己以后肯定会变得特别厉害...

百度工程师,获利10万,判刑3年!

所有一夜暴富的方法都写在刑法中,但总有人心存侥幸。这些年互联网犯罪高发,一些工程师高技术犯罪更是引发关注。这两天,一个百度运维工程师的案例传遍朋友圈。1...

程序员为什么千万不要瞎努力?

本文作者用对比非常鲜明的两个开发团队的故事,讲解了敏捷开发之道 —— 如果你的团队缺乏统一标准的环境,那么即使勤劳努力,不仅会极其耗时而且成果甚微,使用...

为什么程序员做外包会被瞧不起?

二哥,有个事想询问下您的意见,您觉得应届生值得去外包吗?公司虽然挺大的,中xx,但待遇感觉挺低,马上要报到,挺纠结的。

当HR压你价,说你只值7K,你该怎么回答?

当HR压你价,说你只值7K时,你可以流畅地回答,记住,是流畅,不能犹豫。 礼貌地说:“7K是吗?了解了。嗯~其实我对贵司的面试官印象很好。只不过,现在我的手头上已经有一份11K的offer。来面试,主要也是自己对贵司挺有兴趣的,所以过来看看……”(未完) 这段话主要是陪HR互诈的同时,从公司兴趣,公司职员印象上,都给予对方正面的肯定,既能提升HR的好感度,又能让谈判气氛融洽,为后面的发挥留足空间。...

面试:第十六章:Java中级开发

HashMap底层实现原理,红黑树,B+树,B树的结构原理 Spring的AOP和IOC是什么?它们常见的使用场景有哪些?Spring事务,事务的属性,传播行为,数据库隔离级别 Spring和SpringMVC,MyBatis以及SpringBoot的注解分别有哪些?SpringMVC的工作原理,SpringBoot框架的优点,MyBatis框架的优点 SpringCould组件有哪些,他们...

面试阿里p7,被按在地上摩擦,鬼知道我经历了什么?

面试阿里p7被问到的问题(当时我只知道第一个):@Conditional是做什么的?@Conditional多个条件是什么逻辑关系?条件判断在什么时候执...

无代码时代来临,程序员如何保住饭碗?

编程语言层出不穷,从最初的机器语言到如今2500种以上的高级语言,程序员们大呼“学到头秃”。程序员一边面临编程语言不断推陈出新,一边面临由于许多代码已存在,程序员编写新应用程序时存在重复“搬砖”的现象。 无代码/低代码编程应运而生。无代码/低代码是一种创建应用的方法,它可以让开发者使用最少的编码知识来快速开发应用程序。开发者通过图形界面中,可视化建模来组装和配置应用程序。这样一来,开发者直...

面试了一个 31 岁程序员,让我有所触动,30岁以上的程序员该何去何从?

最近面试了一个31岁8年经验的程序猿,让我有点感慨,大龄程序猿该何去何从。

大三实习生,字节跳动面经分享,已拿Offer

说实话,自己的算法,我一个不会,太难了吧

程序员垃圾简历长什么样?

已经连续五年参加大厂校招、社招的技术面试工作,简历看的不下于万份 这篇文章会用实例告诉你,什么是差的程序员简历! 疫情快要结束了,各个公司也都开始春招了,作为即将红遍大江南北的新晋UP主,那当然要为小伙伴们做点事(手动狗头)。 就在公众号里公开征简历,义务帮大家看,并一一点评。《启舰:春招在即,义务帮大家看看简历吧》 一石激起千层浪,三天收到两百多封简历。 花光了两个星期的所有空闲时...

《Oracle Java SE编程自学与面试指南》最佳学习路线图2020年最新版(进大厂必备)

正确选择比瞎努力更重要!

字节跳动面试官竟然问了我JDBC?

轻松等回家通知

面试官:你连SSO都不懂,就别来面试了

大厂竟然要考我SSO,卧槽。

实时更新:计算机编程语言排行榜—TIOBE世界编程语言排行榜(2020年6月份最新版)

内容导航: 1、TIOBE排行榜 2、总榜(2020年6月份) 3、本月前三名 3.1、C 3.2、Java 3.3、Python 4、学习路线图 5、参考地址 1、TIOBE排行榜 TIOBE排行榜是根据全世界互联网上有经验的程序员、课程和第三方厂商的数量,并使用搜索引擎(如Google、Bing、Yahoo!)以及Wikipedia、Amazon、YouTube统计出排名数据。

立即提问
相关内容推荐