hfismyangel
Cloud_N
2017-10-11 06:48

如何保证多线程操作中对象的内存可见性?

  • exception
  • 对象
  • 多线程
  • 线程池

我用一个线程池,对一个队列里的数据操作。如果队列为空,则不开启线程池。队列不为空,拿到其中的数据,用线程池跑任务。这个判断用的while (queue.peek() != null)

现在有一个问题,就是线从队列里拿出数据queue.poll()并执行完操作之后,上面那个判断语句循环回去拿到的queue仍然是非null的,继续循环,但实际上队列里已经没有数据了。我加了volatile也不行,给queue队列加了双重校验锁也不行。一旦循环的次数多了,就报空指针,这该怎么保证多线程下对Queue对象的内存可见?
代码如下,多谢各位大佬

 @Component
public class TaskThreadPools {
    @Autowired
    private ManagerService managerService;
    @Autowired
    private OperationLogService operationLogService;

    private static final org.apache.commons.logging.Log log = LogFactory.getLog(ThreadPools.class);
    private static volatile Queue<QueueDealStructure> dataQueue;

    public static Queue<QueueDealStructure> getQueueInstance() {
//        if (dataQueue == null) {
//            Object o = "lock";
//            synchronized (o) {
                if (dataQueue == null) {
                    dataQueue = new LinkedList<>();
                }
//            }
//        }
        return dataQueue;
    }

    public void commitLogPool() {
        for (int i = 0; i < 5; i++) {
            QueueDealStructure queueDealStructure = new QueueDealStructure();
            queueDealStructure.setKeyIdentity("8");
            queueDealStructure.setProperty("000");
            getQueueInstance().offer(queueDealStructure);
        }
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List<OperationLog> logs = Collections.synchronizedList(new ArrayList<>());
//        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//        Queue<QueueDealStructure> queueDealStructure = getQueueInstance();
        if (getQueueInstance().peek() != null) {
            log.info("========================get poll not null>>");
            ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
            while (getQueueInstance().peek() != null) {
                threadPoolExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            log.info("===============================service start>>");

                            QueueDealStructure poll = getQueueInstance().poll();

                            String keyIdentity = poll.getKeyIdentity();
                            String property = poll.getProperty();
                            log.info("===================Thread id>>" + Thread.currentThread().getId());
                            Long adminId = Long.valueOf(keyIdentity);

                            Manager admin = managerService.getObjectById(adminId);
                            OperationLog operationLog = new OperationLog();
                            operationLog.setAdmin(admin.getName());

                            System.out.println("==============operation>>" + property);
                            operationLog.setOperation(property);
                            operationLog.setCreateBy(adminId);
                            operationLog.setUpdateBy(adminId);

                            logs.add(operationLog);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            if (logs.size() != 0) {
                try {
                    log.info("==================insert database>>");
                    operationLogService.insertList(logs);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

//    @Scheduled(cron = "0/30 * * * * ? * ")
//    public void timer() {
//        commitLogPool();
//    }

异常如下
图片说明

  • 点赞
  • 回答
  • 收藏
  • 复制链接分享

2条回答

为你推荐