我的kafka配置了同一个topic和同一个group,然后开启10个进程去消费,但是我看执行情况,当启动10个进程时,实际最多同时执行的就3、4个,后面随着整体累计消息的变少,同时执行的进程就下降到1 、2个了,这时什么原因?我想开启10个进程就10进程同时进行消费。
var cuResult = customer.Consume(cts.Token);
Console.WriteLine("========================此进程抢到消息===================");
if (cuResult != null)
{
Console.WriteLine("消费结果:" + JsonConvert.SerializeObject(cuResult));
LogHelper.SetInfoLog("消费结果:" + JsonConvert.SerializeObject(cuResult));
CreateCadFileInput createCadFileInput = JsonConvert.DeserializeObject<CreateCadFileInput>(cuResult.Value);
if (createCadFileInput == null)
{
LogHelper.SetErrorLog($"执行消息返回空结果:{cuResult.Message}");
// 没有返回结构,需要将这个消息消费掉,防止死循环.
customer.Commit(cuResult);
continue;
}
// 注入对象为空 异常 日志
if (_commonService == null)
{
LogHelper.SetErrorLog("ICommonService 注入失败");
continue;
}
// 注入对象为空 异常 日志
if (_cadAndOssService == null)
{
LogHelper.SetErrorLog("ICadAndOssService 注入失败");
continue;
}
// 注入对象为空 异常 日志
if (createCadFileInput.RequestId == null)
{
LogHelper.SetErrorLog("createCadFileInput.RequestId 值为Null");
continue;
}
// 更新Redis
//string redisKey = _commonService.GetLocalServerRedisKey(createCadFileInput.RequestId);
string redisKey = Constants.RedisRequestKey; // redis key值-不区分 服务器
var editRedisRequestInput = new EditRedisRequestInput();
editRedisRequestInput.IsIntoKafka = true;
editRedisRequestInput.RequestId = createCadFileInput.RequestId;
editRedisRequestInput.UserId = createCadFileInput.UserId;
editRedisRequestInput.UserName = createCadFileInput.UserName;
editRedisRequestInput.RedisKey = redisKey;
editRedisRequestInput.RequestTime = createCadFileInput.RequestTime;
createCadFileInput.IsIntoKafka = true; // 打上进入kafka的标记
var createResult = await _cadAndOssService.CreateCadAndUpLoadOSSAsync(createCadFileInput);
// 执行成功
if (createResult.Code == ResultCodeEnum.Success)
{
// 然后再更新正在处理的Redis 请求数据
_commonService.DeleteRedisAddRequest(editRedisRequestInput);
// 然后提交Kafka消息的状态
customer.Commit(cuResult);
Console.WriteLine($"Kafka消费任务创建CAD文件成功;请求id:{editRedisRequestInput.RequestId}");
LogHelper.SetInfoLog($"Kafka消费任务创建CAD文件成功;请求id:{editRedisRequestInput.RequestId}");
}
else
{
Console.WriteLine($"Kafka消费任务创建CAD文件失败:{createResult.Message}");
// 执行失败,也提交执行的消息,并记录
LogHelper.SetErrorLog($"Kafka消费任务创建CAD文件失败:{createResult.Message}");
// 然后提交Kafka消息的状态
customer.Commit(cuResult);
}