一个关于线程的问题,生产者放入队列中后,不打断点的话,消费者线程会执行两次,打断点的话是执行一次,贴一下代码,求解答。
缓冲队列:
package psplat.unicorn.model;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author L
* 缓冲队列
*/
public class CacheQueue<T> {
private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>();
public CacheQueue() {
}
protected void add(T t){
queue.add(t);
}
protected T remove(){
return queue.poll();
}
protected T get(){
return queue.peek();
}
protected boolean isEmpty(){
return queue.isEmpty();
}
protected void addAll(List<T> list){
queue.addAll(list);
}
protected int getLength() {
return queue.size();
}
}
缓冲队列管理类:
package psplat.unicorn.model;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author L
* 联通队列管理类
*/
public class UnicornQueueManager {
private CacheQueue<MessageSendInfo> queueCache;
private Lock lock;
private Condition condition;
private static volatile UnicornQueueManager instance;
private UnicornQueueManager(){
queueCache = new CacheQueue<MessageSendInfo>();
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
}
public static UnicornQueueManager getInstance(){
if(instance == null){
synchronized(UnicornQueueManager.class){
if(instance == null){
instance = new UnicornQueueManager();
}
}
}
return instance;
}
public void add(MessageSendInfo messageInfo){
queueCache.add(messageInfo);
lock.lock();
condition.signalAll();
lock.unlock();
}
public MessageSendInfo removeOne(){
return queueCache.remove();
}
public MessageSendInfo get(){
return queueCache.get();
}
public boolean isEmpty(){
return queueCache.isEmpty();
}
public Lock getLock() {
return lock;
}
public Condition getCondition() {
return condition;
}
public void addAll(List<MessageSendInfo> list){
queueCache.addAll(list);
}
public int getLength() {
return queueCache.getLength();
}
}
消费者线程:
package psplat.unicorn.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jeecgframework.web.system.service.SystemService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import psplat.https.HttpClientUtil;
import psplat.unicorn.entity.UnicornLogEntity;
import psplat.unicorn.model.MessageSendInfo;
import psplat.unicorn.model.MessageStatus;
import psplat.unicorn.model.ReportMapManager;
import psplat.unicorn.model.UnicornConstants;
import psplat.unicorn.model.UnicornQueueManager;
import psplat.util.ConvertMapUtil;
@Component
public class UnicordSendConsumer implements InitializingBean {
private ExecutorService consumerRunnable;
@Autowired
private SystemService systemService;
//用于发送计数,发送三次,失败,则丢弃数据,否则队列后的消息全部阻塞
private static int count = 0;
@Override
public void afterPropertiesSet() throws Exception {
consumerRunnable = Executors.newSingleThreadExecutor();
consumerRunnable.execute(new Runnable() {
@Override
public void run() {
while(true) {
if(UnicornQueueManager.getInstance().isEmpty()) {
UnicornQueueManager.getInstance().getLock().lock();
try {
UnicornQueueManager.getInstance().getCondition().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
UnicornQueueManager.getInstance().getLock().unlock();
} else {
MessageSendInfo sendInfo = UnicornQueueManager.getInstance().get();
//组装参数并发送
String url = UnicornConstants.COMMON_URL.getSuffix() + UnicornConstants.SEND.getSuffix();
Map<String, String> argsMap = new HashMap<String, String>();
argsMap.put("SpCode", UnicornConstants.SpCode.getSuffix());
argsMap.put("LoginName", UnicornConstants.LoginName.getSuffix());
argsMap.put("Password", UnicornConstants.Password.getSuffix());
argsMap.put("MessageContent", sendInfo.getMessageContent());
argsMap.put("UserNumber", sendInfo.getNumbers());
argsMap.put("SerialNumber", sendInfo.getSerialNumber());
String result = (new HttpClientUtil()).doPost(url, argsMap, "GBK");
String resultCode = ConvertMapUtil.convertMapFromArgs(result).get("result");
if(!resultCode.equals("0")) {
//发送失败,发三次,移除
count++;
if(count != 3) {
//失败了,1秒后重新发送
this.threadSleep(1000);
continue;
}
}
count = 0;
UnicornQueueManager.getInstance().removeOne();
System.out.println("执行,此时队列长度" + UnicornQueueManager.getInstance().getLength());
List<UnicornLogEntity> logs = sendInfo.inrichUnicornLogs(MessageStatus.COMMIT.getCode(), resultCode);
systemService.batchSave(logs);
ReportMapManager.getInstance().addAll(ReportConsumer.convertUnicornLogEntityMap(logs));
}
}
}
private void threadSleep(int i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
有大神看出来为何这个线程会执行两次么?加了打印之后,队列长度已经为0为什么不进入等待?
如下是打印:
调用结束,返回值:result=0&description=发送短信成功&taskid=208754669702&faillist=&task_id=208754669702
执行,此时队列长度0
调用结束,返回值:result=0&description=发送短信成功&taskid=208753973775&faillist=&task_id=208753973775
执行,此时队列长度0
求指导!