人生何處不春風 2023-04-13 10:48 采纳率: 28.6%
浏览 15

LinkedBlockingQueue没能同步消费

LinkedBlockingQueue的take方法消费消息没能同步消费(消费完一个再消费下一个)

以下是我的队列的代码(队列里的消息是通过put方法存入队列的)

@Component
public class IPOEQueue {

    private static Logger log = Logger.getLogger( "Logger_IPOEQueue" );

    public static LinkedBlockingQueue<IPOEQueueParams> ipoeQueue = new LinkedBlockingQueue<>();

    @Resource
    private WsjyDao wsjyDao;

    @Resource
    private WsjyService wsjyService;

    @PostConstruct
    public void init() {
        Thread t =new Thread(new Runnable() {
            @Override
            public void run() {

                while (true) {
                    Map<String, Object> entityObject = new LinkedHashMap<>();
                    IPOELogParams insertLogParams = null;
                    Map<String, Object> params = null;
                    String entityRecordId = null;
                    try {
                        IPOEQueueParams data = ipoeQueue.take();

                        String url = data.getUrl();
                        List<NameValuePair> pairList = data.getPairList();
                        String orderId = data.getOrderId();

                        IPOELogParams ipoeLogParams = data.getIpoeLogParams();
                        insertLogParams = data.getInsertLogParams();
                        params = data.getParams();

                        entityRecordId = insertLogParams.getEntityRecordId();
                        log.info("IPOE日志:" + entityRecordId + ",开始执行队列逻辑");
                        entityObject = insertLogParams.getEntityObject();
                        String ipoeAccount = (String) params.get("user_name");
                        String ipoeId = wsjyDao.getCountFromIPOEEntity(ipoeAccount);    // IPOE表中的uuid
                        if (ipoeId != null && !"".equals(ipoeId)) {
                            // 已存在该账号
                            ipoeLogParams.setEntityRecordId(ipoeId);
                            BaseJson baseJson = IPOELogUtils.updatePushResult(ipoeLogParams);
                            log.info(ipoeId + "记录日志更新到IPOE表" + baseJson.isSuccess());

                            entityObject.put("iprzIpoeIpoeidFk", ipoeId);
                        }else {
                            // 不存在的IPOE账号,insert
                            String newUUID = CZUtils.getUUID();
                            ipoeLogParams.setEntityRecordId(newUUID);
                            BaseJson baseJson = IPOELogUtils.insertLog(ipoeLogParams);
                            log.info(ipoeId + "记录日志到IPOE表" + baseJson.isSuccess());

                            entityObject.put("iprzIpoeIpoeidFk", newUUID);
                        }

                        String result = HttpClientService.sendPost( url, pairList );
                        log.info("sendRequestToIPOE返回的结果:" + result);

                        try {
                            wsjyService.updateOrderServiceEndTime( result, params, orderId );
                        }catch (DeadlockLoserDataAccessException e) {
                            e.printStackTrace();
                            log.info("wsjyService.updateOrderServiceEndTime( result, params, orderId ); 出现DeadlockLoserDataAccessException,重试一次:");
                            try {
                                wsjyService.updateOrderServiceEndTime( result, params, orderId );
                            }catch (DeadlockLoserDataAccessException ex) {
                                e.printStackTrace();
                                log.info("wsjyService.updateOrderServiceEndTime( result, params, orderId ); 出现DeadlockLoserDataAccessException,重试第二次:");
                                wsjyService.updateOrderServiceEndTime( result, params, orderId );
                            }
                        }

                        // 推送IPOE结果-记录日志
                        if (StringUtils.isNotBlank(result)) {
                            Map map = JSON.parseObject(result, Map.class);
                            Integer code = (Integer) map.get("code");
                            if (code.equals(0)) {
                                // 推送成功
                                entityObject.put("iprzTuisongjieguo", "01");   // 01-推送成功
                            }else {
                                entityObject.put("iprzTuisongjieguo", "02");   // 02-推送失败
                            }

                        }else {
                            // 推送失败
                            entityObject.put("iprzTuisongjieguo", "02");   // 02-推送失败
                        }

                        entityObject.put("iprzFanhuinarong", result);
                        insertLogParams.setEntityObject(entityObject);

                    }catch (Throwable e) {
                        e.printStackTrace();
                        log.error(entityRecordId + "IPOE队列消费出现异常:" + JSON.toJSONString(e));

                        log.error( "向IPOE发送请求异常,请求参数:" + JSON.toJSONString( params ) );
                        log.error( "向IPOE发送请求异常,日志数据:" + JSON.toJSONString( insertLogParams ) );

                        entityObject.put("iprzTuisongjieguo", "02");
                        entityObject.put("iprzFanhuinarong", "推送过程中出现异常:" + JSON.toJSONString( e ));
                        if (insertLogParams != null)
                            insertLogParams.setEntityObject(entityObject);
                    }finally {
                        // 更新IPOE日志表
                        if (insertLogParams != null) {
                            BaseJson baseJson = IPOELogUtils.updatePushResult(insertLogParams);
                            log.info(insertLogParams.getEntityRecordId() + "记录日志到日志表" + baseJson.isSuccess());
                        }else {
                            log.error("insertLogParams为null");
                        }
                    }
                }
            }
        });
        t.setName("IPOEQueueConsumer");
        t.start();
    }

}


我在日志里看到的现象

img

有哪位道友遇到过这种问题吗 ?

  • 写回答

1条回答 默认 最新

  • CQ.abc 2023-04-13 11:05
    关注

    可以在IPOEQueue.take方法中加个同步块试试

    public static LinkedBlockingQueue<IPOEQueueParams> ipoeQueue = new LinkedBlockingQueue<>();
    private static Object lock = new Object();
    
    public static IPOEQueueParams take() throws InterruptedException {
        synchronized (lock) {
            while (ipoeQueue.isEmpty()) {
                lock.wait();
            }
            IPOEQueueParams data = ipoeQueue.poll();
            if (data != null) {
                // 处理IPOEQueueParams
            }
            return data;
        }
    }
    
    public static void put(IPOEQueueParams data) {
        ipoeQueue.offer(data);
        synchronized (lock) {
            lock.notifyAll();
        }
    }
    
    评论

报告相同问题?

问题事件

  • 创建了问题 4月13日

悬赏问题

  • ¥30 STM32 INMP441无法读取数据
  • ¥100 求汇川机器人IRCB300控制器和示教器同版本升级固件文件升级包
  • ¥15 用visualstudio2022创建vue项目后无法启动
  • ¥15 x趋于0时tanx-sinx极限可以拆开算吗
  • ¥500 把面具戴到人脸上,请大家贡献智慧
  • ¥15 任意一个散点图自己下载其js脚本文件并做成独立的案例页面,不要作在线的,要离线状态。
  • ¥15 各位 帮我看看如何写代码,打出来的图形要和如下图呈现的一样,急
  • ¥30 c#打开word开启修订并实时显示批注
  • ¥15 如何解决ldsc的这条报错/index error
  • ¥15 VS2022+WDK驱动开发环境