spring与kafka的key值传入问题?

我想问一下,我发送消息的时候,发送消息的方法:;我传入一个key,xml里配置了序列化,报错: org.apache.kafka.common.errors.SerializationException: Can't convert key of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer,该怎么解决,谢谢

0

1个回答

我今天也遇到这个问题,
首先'[B'在class字节码中表示byte数组,所以问题就是无法将byte[]转换为StringSerializer中指定的类型.
然后确认原因,源码看一下StringSerializer,会发现方法描述是public byte[] serialize(String topic, String data),所以必须传递的是String类型的数据,证实原因.
下面说解决方案:
application.yml修改kafka的value配置参数

spring:
    kafka:
        producer:
                  value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

问题解决,如何避免这个,初步猜测,主要是在于使用spring cloud stream 的binder @Output会将参数转换为byte[].下一步可以继续debugging看一下,下面如何设置.

0
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
使用spring集成的kafka收发消息
1. 引入maven依赖 org.springframework.integration spring-integration-kafka ${spring-integration-kafka.version} 2. 生产者的xml配置 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:
开发 - kafka的一次小坑
在Kafka中进行生产消费,默认kafka有几种生产消费的消息 1.kafka如果不指定key进行生产,会按照kafka自带的一直性算法 根据 message的一致性hash算法去进行判断,然后会按照一批一批的信息去传送到每个partition上面,这样会造成热点问题。 单个监听partition的线程有性能问题,可能单个线程需要处理很多的数据 2、kafka去指定k
kafka2.0-序列化与反序列化_07
概要:先讲讲kafka中简单的序列化方式,以及实现,然后再使用protobuf实现一个自定义序列化的小程序 正如之前文章中写过的例子程序,我们配置的序列化方式都是下面这样的。 //生产者的序列化配置 props.put(&amp;quot;key.serializer&amp;quot;, &amp;quot;org.apache.kafka.common.serialization.StringSerializer&amp;quot;); props.p...
Kafka消息序列化和反序列化(上)
Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。 首先我们通过一段示例代码来看下普通情况下Kafka Producer如何编写: public class ProducerJav...
第二集 Spring for Apache Kafka 配置主题和发送消息
使用Spring for Apache Kafka配置主题和发送消息
关于使用kafka的个人总结
本篇文章算是对几个月工作的一个总结吧,接触了几个月的kafka和flume,感觉自己也只能算是了解kafka的阶段,接下来也打算深入研究下kafka的原理,下面写的算是对工作中使用到的kafka方面的东西做个总结吧!       kafka概述 (有空可以浏览一下http://orchome.com)       当前kafka版本为0.10.0.0。       关于Kafka需要了解
关于kafka producer 分区策略的思考
今天跑了一个简单的kafka produce程序,如下所示public class kafkaProducer extends Thread{ private String topic; public kafkaProducer(String topic){ super(); this.topic = topic; }
学习笔记:微服务15 spring cloud kafka 消息总线
在比如限时抢购,车票抢购,选课排队等许多场景,都需要消息总线,在spring cloud生态圈中,多个微服务协同工作,多对多处理交付任务和处理任务排队,需要实现消息总线, 本例采用kafka 作为消息服务器,以下代码从网上复制,经调试通过。 1. 依赖     &amp;lt;dependency&amp;gt;         &amp;lt;groupId&amp;gt;org.springframework.kafk...
解决synchronized(key)传入key值相等,却未成功上锁
此为学习笔记。 实现:当传入方法的key值相同时,需要相隔一秒打印,不相同的,第一时间同时打印。 问题:用synchronized(key)锁住打印的代码,key中有两条数据都是&quot;1&quot;,预期效果应该是可以相隔一秒打印,但结果却是同时的。先上代码。 public class ThreadTest3 extends Thread{ private String key; private...
kafka自定义Integer序列化类
实现了个示意性的整型序列化类作为Kafka的key.serializer.class
高并发架构实战(九) Spring Boot集Kafka
Spring Boot 2.0.4 集成 Kafka 2.0.0。 项目源码地址 一、简介 kafka是一种高吞吐量的分布式发布订阅消息系统。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都...
Kafka传递自定义对象
1、搭好相应的环境(ZK+kafka),保证kafka能正常的发送接收消息 2、新建一个工具类,负责对象字节数组的相互转换,传输数据用package com.kafka.util;import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import
Kafka分区策略及自定义
默认分区策略默认分区策略是:取正(bytearray生成32位hash值)%numpartitions 这个公式的结果是得到0-(numpartitions-1)间正整数的个数大致相等,也就是说kafka的默认分区策略是无论我们给定多少个分区,我们存放的数据基本上会平均的分到各个分区上。private int defaultPartition(String topic, Object key, b
Spring-Kafka源码解析
文章目录Spring-KafkakafkaConsumerkafkaConsumer消费者模型spring-kafka consumer实现Consumer ConfigskafkaProducerkafkaProducer生产者模型Producer Configs使用过程中踩的坑坑1坑2 Spring-Kafka kafkaConsumer kafkaConsumer消费者模型 spring-...
kafka与Spring的集成
kafka与Spring的集成配置生产者:前提kafka安装完成,及创建好主题 pom文件配置: &amp;lt;!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --&amp;gt; &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;org.apache.kafka&amp;lt;/groupId&amp;gt; ...
Spring Boot 01 加载配置文件和获取key/value值
1、Spring Boot 默认配置文件的位置和名字 (1)、默认放置配置文件的地方是:classpath: 和classpath:config (2)、默认名字是:application.properties
消息队列kafka(二)--与spring整合(kafkaTemplate方式)
一、简介 在这里介绍kafka与spring的整合,这里采用kafkaTemplate方式。 二、生产者开发步骤 1、添加maven依赖(略去spring依赖,请自行添加) org.apache.kafka kafka_2.9.2 0.8.2.1 org.apache.kafka kafka-clients 1.0.0 2、添加
kafka客户端Producer和Consumer关于自定义消息序列和反序列
一、背景    最近在学习kafka相关的知识,正好遇到一个疑问,在写demo的过程中发现,投递的数据都是字符串类型,那么就想想在实际应用中应该会有大量的需求投递自定义数据类型,那么如何才能投递自定义数据类型呢?这里面就涉及到了kafka提供的接口序列化和反序列化的功能。二、kafka消息序列化和反序列化先看个demo,写个Producer客户端,根据官方文档,需要先做一些配置,放到Propert...
Kafka与Spring对接踩过的坑
  1.前言     系统环境:Spring+SpringMVC+Mybatis+Maven     这里系统环境不一定要跟我一样,但是建议使用maven,方便管理jar包。     现阶段只是实现了简单的java对接kafka,所以会比较简单,要是有大神路过请指正,或者给些更深入的指导。   2.简单的实现java对接kafka 2.1.Kafka学习网站 Kafka的安装和深入...
Spring Cloud Stream Kafka 特定分区的消息始终由特定的消费者实例消费
实验目的:Kafka特定分区的消息始终由消费者应用的特定实例消费,例如,分区1由实例索引0的实例消费,分区2由实例索引1的实例消费,分区3由实例索引2的实例消费。 项目介绍:项目分为1个生产者实例,3个消费者实例,生产者应用和消费者应用均为Spring Cloud Eureka客户端项目。生产者实例将消息发送到Kafka Topic的3个分区中,消费者的3个实例分别按实例索引消费Kafka To...
Kafka与Spring整合踩坑心得
搭建环境:MAVEN+IDEA+Kafka 最近搭建Spring和Kafka集成的环境,参考:https://www.cnblogs.com/wangb0402/p/6187796.html博文,搭建好之后,producer和consumer一直不好使。而后有查阅了https://blog.csdn.net/u011294519/article/details/80503556这篇博文,修改对应...
Spring Boot整合Kafka的简单用例
一、kafka基本术语 Producer 生产者,是发送消息的对象 Consumer 消费者,是订阅消息和处理消息的对象 Topic 主题,用于消息的分类,也就是一个标签,可以看作是一个频道,可以被多个消费者订阅 Broker 代理,kafka集群中的每一个服务器就是一个代理(Broker),消费者可以订阅一个或者多个主题(Topic),消费已经...
springboot 集成kafka系列 二、springboot集成kafka生产者
    1、新建springboot脚手架工程,pom文件如下,其中引入了kafka需要的依赖,注意这里的kafka版本号需要和之前安装的kafka版本一致,要不然会有问题 &amp;lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&amp;gt; &amp;lt;project xmlns=&quot;http://maven.apache.org/POM/4.0.0&quot; xmlns:xsi=...
Spring boot实现生产者 发送kafka
677807540    java开发交流群       @Autowired     private KafkaTemplate kafkaTemplate;         ListenableFuture send = kafkaTemplate.send(&quot;kuaifa_import&quot;, Type,                     macs);            ...
Spring Kafka 教程 – spring读取和发送kakfa消息
Apache Kafka, 分布式消息系统, 非常流行。Spring是非常流行的Java快速开发框架。将两者无缝平滑结合起来可以快速实现很多功能。本文件简要介绍Spring Kafka,如何使用 KafkaTemplate发送消息到kafka的broker上, 如何使用“listener container“接收Kafka消息。 1,Spring Kafka的组成 这一节我们首先介绍Sprin...
Springboot集成使用阿里云kafka详细步骤
转载请注明出处:Springboot集成使用阿里云kafka详细步骤 明确连接认证类型 首先要明确使用哪种连接认证类型 Ons模式参考 https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/beta Ons模式的conf内容 KafkaClient { com.aliyun.open...
Kafka Producer端封装自定义消息
这篇文章主要讲kafka producer端的编程,通过一个应用案例来描述kafka在实际应用中的作用。如果你还没有搭建起kafka的开发环境,可以先参考:&amp;lt;kafka开发环境搭建&amp;gt; 首先描述一下应用的情况:一个站内的搜索引擎,运营人员想知道某一时段,各类用户对商品的不同需求。通过对这些数据的分析,从而获得更多有价值的市场分析报表。这样的情况,就需要我们对每次的搜索进行记录,当然,...
springboot中使用redis,key序列化问题解决方案
因为boot默认的key序列化方案是JdkSerializationRedisSerializer——使用Java自带的序列化机制将对象序列化为一个字符串,你会发现是一串难懂的字符,不利于维护和修改,所有有必要改变默认的序列化方式,下面是我的代码,有效。在启动类中写入:@Bean    public RedisTemplate&amp;lt;Object, Object&amp;gt; redisTemplate...
被版本更新坑到哭系列:SpringBoot整合Kafka
环境说明 Windows 10 1709 IDEA 2017.3.2 SpringBoot 2.0.M7 Spring-Kafka 2.1.0.RELEASE JDK 1.8.0_144 Maven 3.5.0 前言 编写程序时,一定要注意版本之间的兼容问题.如果不想关注这些,那么就全都用最新的就好了. 由于我用IDEA创建springboot项目的时候,用的springboot版本是稳定版
【问题处理】Spring Boot中kafka消息能力不足现象及问题解决
【本文首发于个人网址:liumoran.cn】 kfaka中主要包含生产者与消费都对象,生产者往队列中推送消息,而消费者则从队列中读取并处理消息。生产者推送的过程较为简单,而对于消费者来说,情况复杂且容易受各种因素影响。 1. 消费过程 消费过程如下所示: Created with Raphaël 2.2.0开始是否有消息?读取消息处理消息提交offset是否停止?结束等待等待超时yesnoyes...
kafka 指定partition生产,消费
kafka指定partition生产消费 在实际的业务中,特别是涉及到指定任务是否结束,任务对应消息是否消费完毕时,单纯指定topic消费,由kafka自动分配partition已经无法满足我们的实际需求了,这时我们就需要指定partition进行生产与消费。闲话少说,下面我们通过代码来详细描述生产者与消费者的配置。 producer代码 注意:producer代码中我们需要两个类,一个时指
spring-kafka源码解析
前言:         关于Kafka,是一个比较流行的MQ工具,也是多数公司比较常用的。有关于Kafka的一些基本内容读者可以参考官方文档,了解一下生产者消费者的使用。kafka的搭建笔者也不再详述,网络上有很多文章介绍。     这篇文章主要是从源码的角度来分析一下Spring对kafka的使用封装     笔者搭建的kafka版本为 kafka_2.11-0.11.0.1   1....
kafka自定义消息序列化和反序列化方式
kafka自定义消息序列化和反序列化方式 版本说明: kafka版本:kafka_2.12-2.0.0.tgz pom依赖: &amp;amp;lt;dependency&amp;amp;gt; &amp;amp;lt;groupId&amp;amp;gt;org.apache.kafka&amp;amp;lt;/groupId&amp;amp;gt; &amp;amp;lt;artifactId&amp;amp;gt;kafka-clients&amp;amp;lt;/
spring-kafka消费数据 重复消费问题(针对提交offset偏移产生重复消费的问题)
spring-kafka 重复消费数据spring集成kafuka框架版本介绍解决方案 spring集成kafuka框架 最近公司需要对接kafka拉取数据,在使用spring-kafka框架时候,总是无法持续消费,总是出现持续消费,相当纠结。因为也是刚接手任务,故整理了一下遇到的难题,特此整理一下,望对各位同学有些帮助。 版本介绍 项目架构主要是spring mvc 架构版本是5.0.2.REL...
Storm-kafka集成——1.1.0版本storm中tuple取KafkaSpout数据详解
问题描述:KafkaSpout拉取kafka topic数据,下一级bolt从kafkaspout获取数据,tuple到底采用什么方法取出spout中的消息呢?KafkaSpout创建:/* *根据数据源topic和zk_id创建并返回kafkaSpout * */ public static KafkaSpout init(String spout_topic,String zk_id){ ...
Spring集成Kafka,配置生产者消费者
一、前提条件搭建了3个结点的Kafka集群,并且创建了一个名为test1的topic二、添加maven依赖<!--kafka-spring 集成--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId>
kafka 整合springboot 2.1.3.RELEASE (不用看了,我还没搞清楚)
kafka相关概念: 1. AMQP协议 2. Kafka支持的客户端语言 3. Kafka架构 【producer】-----发送-----&gt;【broker】&lt;-----接收-----【consumer】 一些基本概念: 消费者(Consumer):从消息队列中请求消息的客户端应用程序; 生产者(Producer):向broker发布消息的客户端应用程序; AMQP服务器端(...
Spring data kafka操作kafka消息的发送和订阅
本项目是在Spring Boot的基础上构建的,笔者使用的是Spring Boot 1.5.8版本. 1.在项目的pom.xml文件中引入如下依赖项:     org.apache.kafka     kafka-clients     0.10.2.0     org.springframework.kafka     spring-kafka     1.2.0.RE
spring-boot 集成kafka单节点消息发送与接收
springboot还处于学习阶段,又同时在学习kafka,两者结合,继续学习。1、官网下载kafka2、解压3、对于单节点来说,按照官网上操作即可实现消息的发送和接收。但是对于客户端,是通过 @KafkaListener 注解监听生产者发送的消息,故需要修改config/server.properties 文件 如上图,打开listeners ,将默认的件监听IP+端口改为具体的服务器地址4、创建
Spring纯Java配置集成kafka
KafkaConfig.java package com.niugang.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.Offse...
相关热词 c#异步发送kafka c#窗体编号 c# 操作二进制文件 c# 反射 机制 c#线程 窗体失去响应 c#角度转弧度 c# 解析gps数据 c# vs设置 语法版本 c# json含回车 c#多线程demo