LinkedBlockingQueue的take方法消费消息没能同步消费(消费完一个再消费下一个)
以下是我的队列的代码(队列里的消息是通过put方法存入队列的)
@Component
public class IPOEQueue {
private static Logger log = Logger.getLogger( "Logger_IPOEQueue" );
public static LinkedBlockingQueue<IPOEQueueParams> ipoeQueue = new LinkedBlockingQueue<>();
@Resource
private WsjyDao wsjyDao;
@Resource
private WsjyService wsjyService;
@PostConstruct
public void init() {
Thread t =new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Map<String, Object> entityObject = new LinkedHashMap<>();
IPOELogParams insertLogParams = null;
Map<String, Object> params = null;
String entityRecordId = null;
try {
IPOEQueueParams data = ipoeQueue.take();
String url = data.getUrl();
List<NameValuePair> pairList = data.getPairList();
String orderId = data.getOrderId();
IPOELogParams ipoeLogParams = data.getIpoeLogParams();
insertLogParams = data.getInsertLogParams();
params = data.getParams();
entityRecordId = insertLogParams.getEntityRecordId();
log.info("IPOE日志:" + entityRecordId + ",开始执行队列逻辑");
entityObject = insertLogParams.getEntityObject();
String ipoeAccount = (String) params.get("user_name");
String ipoeId = wsjyDao.getCountFromIPOEEntity(ipoeAccount); // IPOE表中的uuid
if (ipoeId != null && !"".equals(ipoeId)) {
// 已存在该账号
ipoeLogParams.setEntityRecordId(ipoeId);
BaseJson baseJson = IPOELogUtils.updatePushResult(ipoeLogParams);
log.info(ipoeId + "记录日志更新到IPOE表" + baseJson.isSuccess());
entityObject.put("iprzIpoeIpoeidFk", ipoeId);
}else {
// 不存在的IPOE账号,insert
String newUUID = CZUtils.getUUID();
ipoeLogParams.setEntityRecordId(newUUID);
BaseJson baseJson = IPOELogUtils.insertLog(ipoeLogParams);
log.info(ipoeId + "记录日志到IPOE表" + baseJson.isSuccess());
entityObject.put("iprzIpoeIpoeidFk", newUUID);
}
String result = HttpClientService.sendPost( url, pairList );
log.info("sendRequestToIPOE返回的结果:" + result);
try {
wsjyService.updateOrderServiceEndTime( result, params, orderId );
}catch (DeadlockLoserDataAccessException e) {
e.printStackTrace();
log.info("wsjyService.updateOrderServiceEndTime( result, params, orderId ); 出现DeadlockLoserDataAccessException,重试一次:");
try {
wsjyService.updateOrderServiceEndTime( result, params, orderId );
}catch (DeadlockLoserDataAccessException ex) {
e.printStackTrace();
log.info("wsjyService.updateOrderServiceEndTime( result, params, orderId ); 出现DeadlockLoserDataAccessException,重试第二次:");
wsjyService.updateOrderServiceEndTime( result, params, orderId );
}
}
// 推送IPOE结果-记录日志
if (StringUtils.isNotBlank(result)) {
Map map = JSON.parseObject(result, Map.class);
Integer code = (Integer) map.get("code");
if (code.equals(0)) {
// 推送成功
entityObject.put("iprzTuisongjieguo", "01"); // 01-推送成功
}else {
entityObject.put("iprzTuisongjieguo", "02"); // 02-推送失败
}
}else {
// 推送失败
entityObject.put("iprzTuisongjieguo", "02"); // 02-推送失败
}
entityObject.put("iprzFanhuinarong", result);
insertLogParams.setEntityObject(entityObject);
}catch (Throwable e) {
e.printStackTrace();
log.error(entityRecordId + "IPOE队列消费出现异常:" + JSON.toJSONString(e));
log.error( "向IPOE发送请求异常,请求参数:" + JSON.toJSONString( params ) );
log.error( "向IPOE发送请求异常,日志数据:" + JSON.toJSONString( insertLogParams ) );
entityObject.put("iprzTuisongjieguo", "02");
entityObject.put("iprzFanhuinarong", "推送过程中出现异常:" + JSON.toJSONString( e ));
if (insertLogParams != null)
insertLogParams.setEntityObject(entityObject);
}finally {
// 更新IPOE日志表
if (insertLogParams != null) {
BaseJson baseJson = IPOELogUtils.updatePushResult(insertLogParams);
log.info(insertLogParams.getEntityRecordId() + "记录日志到日志表" + baseJson.isSuccess());
}else {
log.error("insertLogParams为null");
}
}
}
}
});
t.setName("IPOEQueueConsumer");
t.start();
}
}
我在日志里看到的现象
有哪位道友遇到过这种问题吗 ?