使用 Redis Stream 导致 CPU 占用过高的问题
```java
/**
 * Starts the Redis Stream listener once the bean has been constructed.
 *
 * <p>Builds the consumer-group name from the application name, an id obtained from
 * {@code IdWorker.getIdStr()} and the server port, then starts a {@code MultiStreamContainer}
 * that reads the {@code PARAM_GROUP_NAME} stream on the shared {@code ThreadPoolConfig} executor.
 * For every message the local parameter cache is refreshed and the message is acknowledged.
 */
@PostConstruct
public void startListener() {
    String serviceName = environment.getProperty("spring.application.name");
    // Despite the variable name this is not an IP address; presumably IdWorker yields an id
    // that makes the group name unique per running instance (confirm against IdWorker).
    String ip = IdWorker.getIdStr();
    String port = environment.getProperty("server.port");
    groupId = String.format("%s_%s_%s", serviceName, ip, port);
    log.info("开启监听,消费组:{},groupId:{}", PARAM_GROUP_NAME, groupId);
    String consumer = "consumer-1";
    ThreadPoolExecutor executor = ThreadPoolConfig.getInstance();
    container = MultiStreamContainer.builder()
            .stringRedisTemplate(stringRedisTemplate)
            .custom(consumer)
            .group(groupId)
            .stream(PARAM_GROUP_NAME)
            .executor(executor)
            .listener(message -> {
                RecordId id = message.getId();
                Map<Object, Object> value = message.getValue();
                log.info("监听到消息stream:{} message:{}, id:{}", message.getStream(), value, id);

                Object rawParamCache = value.get("ParamCache");
                if (rawParamCache == null) {
                    // A message without the cache key can never be processed; skip the refresh
                    // instead of failing with a NullPointerException, and still acknowledge it.
                    log.warn("Stream message {} carries no ParamCache field; skipping cache refresh", id);
                } else {
                    // Consume the message: refresh the local cache for the given key.
                    ParamValueNewCaChe valueNewCaChe = SpringContextUtils.getBean(ParamValueNewCaChe.class);
                    // Values may arrive wrapped in double quotes; strip them before use.
                    String paramCache = rawParamCache.toString().replace("\"", "");
                    Object rawOrgIdFlag = value.get("OrgIdFlag");
                    String orgIdFlag = ObjectUtils.isNotEmpty(rawOrgIdFlag)
                            ? rawOrgIdFlag.toString().replace("\"", "")
                            : null;
                    // A blank flag is passed on as null, exactly like a missing one.
                    valueNewCaChe.refresh(paramCache, StringUtils.hasText(orgIdFlag) ? orgIdFlag : null);
                }

                // Manually acknowledge the message. The arguments are (stream key, group, record id):
                // passing the group name as the key and the consumer name as the group would
                // acknowledge nothing and leave every entry in the group's pending list.
                stringRedisTemplate.opsForStream().acknowledge(message.getStream(), groupId, id);
            })
            .build();
    container.start();
}
@Slf4j
@Builder
@Data
public class MultiStreamContainer {
private StringRedisTemplate stringRedisTemplate;
private String group;
private String custom;
private Executor executor;
@Singular
private List<String> streams;
private Listener listener;
private ListenerRunnable runnable;
private static void prepareStreamAndGroup(StreamOperations<String, ?, ?> ops, String stream, String group) {
String status = "OK";
try {
StreamInfo.XInfoGroups groups = ops.groups(stream);
if (groups.stream().noneMatch(xInfoGroup -> group.equals(xInfoGroup.groupName()))) {
status = ops.createGroup(stream, group);
}
} catch (Exception exception) {
RecordId initialRecord = ops.add(ObjectRecord.create(stream, "Initial Record"));
Assert.notNull(initialRecord, "未初始化消费组 '" + stream + "'");
status = ops.createGroup(stream, ReadOffset.from(initialRecord), group);
} finally {
Assert.isTrue("OK".equals(status), "无法创建组名为: '" + group + "'");
}
}
public void stop() {
if (runnable != null) {
runnable.running.set(false);
runnable = null;
}
}
public void start() {
stop();
List<String> streamList = streams != null
? Lists.newArrayList(streams) : Lists.newArrayList();
runnable = ListenerRunnable.builder()
.streamList(streamList)
.group(group)
.custom(custom)
.stringRedisTemplate(stringRedisTemplate)
.listener(listener)
.build();
executor.execute(runnable);
}
public interface Listener {
void onMessage(MapRecord<String, Object, Object> message);
}
@Builder
@Data
public static class ListenerRunnable implements Runnable {
private final AtomicBoolean running = new AtomicBoolean(true);
private final List<String> streamList;
private final String group;
private final String custom;
private final StringRedisTemplate stringRedisTemplate;
private final Listener listener;
@Override
public void run() {
//1、初始化消费组
List<StreamOffset<String>> offsetList = Lists.newArrayList();
for (String readRecord : streamList) {
offsetList.add(StreamOffset.create(readRecord, ReadOffset.lastConsumed()));
}
Consumer consumer = Consumer.from(group, custom);
StreamReadOptions options = StreamReadOptions.empty().count(1);
StreamOffset<String>[] offsets = new StreamOffset[offsetList.size()];
for (int i = 0; i < offsetList.size(); i++) {
offsets[i] = offsetList.get(i);
prepareStreamAndGroup(stringRedisTemplate.opsForStream(), offsets[i].getKey(), group);
}
//2、监听中
while (running.get()) {
if (streamList.isEmpty()) {
log.info("不存在消息组!");
return;
}
List<MapRecord<String, Object, Object>> values = null;
try {
values = stringRedisTemplate.opsForStream().read(consumer, options, offsets);
Thread.sleep(100);
} catch (Exception exception) {
log.info("监听消息异常", exception);
try {
Thread.sleep(200);
} catch (Exception ee) {
ee.printStackTrace();
}
}
if (values != null && !values.isEmpty()) {
for (MapRecord val : values) {
listener.onMessage(val);
}
}
}
}
}
}
/**
 * Holder of the single, lazily created {@link ThreadPoolExecutor} shared by the application.
 *
 * <p>The pool has one core thread, at most one thread, a bounded queue of 32 tasks and discards
 * the oldest queued task when the queue is full.
 */
public class ThreadPoolConfig {
    /** Core pool size. */
    private static final int CORE_POOL_SIZE = 1;
    /** Maximum pool size. */
    private static final int MAX_POOL_SIZE = 1;
    /** Maximum idle time, in seconds, of surplus threads. */
    private static final int KEEP_ALIVE_SECONDS = 20;
    /**
     * Work queue of the pool. Without an explicit capacity a {@link LinkedBlockingQueue} defaults
     * to {@code Integer.MAX_VALUE}, i.e. it is effectively unbounded, so the capacity is set here.
     */
    private static final BlockingQueue<Runnable> POOL_QUEUE = new LinkedBlockingQueue<>(32);

    /** The shared executor; {@code volatile} so the lazily created instance is safely published. */
    private static volatile ThreadPoolExecutor poolExecutor;

    private ThreadPoolConfig() {
    }

    /**
     * Returns the shared executor, creating it on first use.
     *
     * @return the single {@link ThreadPoolExecutor} instance
     */
    public static ThreadPoolExecutor getInstance() {
        ThreadPoolExecutor executor = poolExecutor;
        if (executor == null) {
            synchronized (ThreadPoolConfig.class) {
                // Re-check under the lock: another thread may have created it in the meantime.
                executor = poolExecutor;
                if (executor == null) {
                    executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                            MAX_POOL_SIZE,
                            KEEP_ALIVE_SECONDS,
                            TimeUnit.SECONDS,
                            POOL_QUEUE,
                            new ThreadPoolExecutor.DiscardOldestPolicy());
                    poolExecutor = executor;
                }
            }
        }
        return executor;
    }
}
```