我用一个线程池,对一个队列里的数据操作。如果队列为空,则不开启线程池。队列不为空,拿到其中的数据,用线程池跑任务。这个判断用的while (queue.peek() != null)
现在有一个问题,就是线从队列里拿出数据queue.poll()并执行完操作之后,上面那个判断语句循环回去拿到的queue仍然是非null的,继续循环,但实际上队列里已经没有数据了。我加了volatile也不行,给queue队列加了双重校验锁也不行。一旦循环的次数多了,就报空指针,这该怎么保证多线程下对Queue对象的内存可见?
代码如下,多谢各位大佬
@Component
public class TaskThreadPools {
@Autowired
private ManagerService managerService;
@Autowired
private OperationLogService operationLogService;
private static final org.apache.commons.logging.Log log = LogFactory.getLog(ThreadPools.class);
private static volatile Queue<QueueDealStructure> dataQueue;
public static Queue<QueueDealStructure> getQueueInstance() {
// if (dataQueue == null) {
// Object o = "lock";
// synchronized (o) {
if (dataQueue == null) {
dataQueue = new LinkedList<>();
}
// }
// }
return dataQueue;
}
public void commitLogPool() {
for (int i = 0; i < 5; i++) {
QueueDealStructure queueDealStructure = new QueueDealStructure();
queueDealStructure.setKeyIdentity("8");
queueDealStructure.setProperty("000");
getQueueInstance().offer(queueDealStructure);
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<OperationLog> logs = Collections.synchronizedList(new ArrayList<>());
// ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// Queue<QueueDealStructure> queueDealStructure = getQueueInstance();
if (getQueueInstance().peek() != null) {
log.info("========================get poll not null>>");
ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
while (getQueueInstance().peek() != null) {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
log.info("===============================service start>>");
QueueDealStructure poll = getQueueInstance().poll();
String keyIdentity = poll.getKeyIdentity();
String property = poll.getProperty();
log.info("===================Thread id>>" + Thread.currentThread().getId());
Long adminId = Long.valueOf(keyIdentity);
Manager admin = managerService.getObjectById(adminId);
OperationLog operationLog = new OperationLog();
operationLog.setAdmin(admin.getName());
System.out.println("==============operation>>" + property);
operationLog.setOperation(property);
operationLog.setCreateBy(adminId);
operationLog.setUpdateBy(adminId);
logs.add(operationLog);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
if (logs.size() != 0) {
try {
log.info("==================insert database>>");
operationLogService.insertList(logs);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// @Scheduled(cron = "0/30 * * * * ? * ")
// public void timer() {
// commitLogPool();
// }