场景如下:我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送一个变动消息。A收到消息更新本地缓存。那么问题来了,同一个服务的多个实例如何多次消费同一个topic消息
1条回答 默认 最新
- 真-酸辣土豆丝 2021-07-23 17:25关注
已经解决!通过redis setif 加代码动态配置groupID、不同实例获取不动groupID。启动的时候会配置kafka消费工厂ConsumerFactory 这个时候生成groupID
setIfAbsent()redis。如果失败就继续生成知道成功package com.jieshun.open.config; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @Description * @Date 2021-7-23 16:04 * @Created by yyk */ @Slf4j @Component public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String BROKERS; @Value("${spring.kafka.consumer.enable-auto-commit}") private Boolean ENABLE_AUTO_COMMIT; @Value("${spring.kafka.consumer.auto-commit-interval-ms}") private String AUTO_COMMIT_INTERVAL_MS; @Value("${spring.kafka.consumer.session-timeout-ms}") private Integer SESSION_TIMEOUT_MS; @Value("${spring.kafka.consumer.auto-offset-reset}") private String AUTO_OFFSET_RESET; @Value("${spring.kafka.consumer.group-id}") private String GROUP_ID; @Value("${spring.kafka.consumer.max-poll-records}") private String MAX_POLL_RECORDS; /**缓存名称前缀*/ private final String CACHE_GROUP_NAME_PREFIX = "jop:gateway:group:"; private String CURRENT_INSTANCE_GROUP_ID; @Autowired private StringRedisTemplate redisTemplate; /** 线程池,为了实现分组名称续租服务*/ private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); /**构建kafka监听工厂*/ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(consumerFactory()); return factory; } /**通过redis限制获取的分组名称*/ public String getSerializeGroupId(Integer currValue){ String key = CACHE_GROUP_NAME_PREFIX.concat(currValue.toString()); boolean b = redisTemplate.opsForValue().setIfAbsent(key, currValue.toString()); if(b){ return GROUP_ID.concat(currValue.toString()); }else{ currValue++; return getSerializeGroupId(currValue); } } /**初始化消费工厂配置 其中会动态指定消费分组*/ private ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); /**多实例部署每个实例设置不同groupId 实现发布订阅*/ CURRENT_INSTANCE_GROUP_ID = getSerializeGroupId(0); log.info("当前实例 group_id:{}",CURRENT_INSTANCE_GROUP_ID); properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET); return new DefaultKafkaConsumerFactory<String, String>(properties); } }
解决 无用评论 打赏 举报
悬赏问题
- ¥15 对于相关问题的求解与代码
- ¥15 ubuntu子系统密码忘记
- ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
- ¥15 保护模式-系统加载-段寄存器
- ¥15 电脑桌面设定一个区域禁止鼠标操作
- ¥15 求NPF226060磁芯的详细资料
- ¥15 使用R语言marginaleffects包进行边际效应图绘制
- ¥20 usb设备兼容性问题
- ¥15 错误(10048): “调用exui内部功能”库命令的参数“参数4”不能接受空数据。怎么解决啊
- ¥15 安装svn网络有问题怎么办