lh33061521 2018-11-07 09:36 采纳率: 0%
浏览 689

关于生产消费线程,消费者重复执行的问题

一个关于线程的问题,生产者放入队列中后,不打断点的话,消费者线程会执行两次,打断点的话是执行一次,贴一下代码,求解答。

缓冲队列:

 package psplat.unicorn.model;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author L
 *  缓冲队列
 */
public class CacheQueue<T> {

    private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>();

    public CacheQueue() {
    }

    protected void add(T t){
        queue.add(t);
    }

    protected T remove(){
        return queue.poll();
    }

    protected T get(){
        return queue.peek();
    }

    protected boolean isEmpty(){
        return queue.isEmpty();
    }

    protected void addAll(List<T> list){
        queue.addAll(list);
    }

    protected int getLength() {
        return queue.size();
    }

}

缓冲队列管理类:

 package psplat.unicorn.model;

import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author L
 *  联通队列管理类
 */
public class UnicornQueueManager {

    private CacheQueue<MessageSendInfo> queueCache;
    private Lock lock;
    private Condition condition;

    private static volatile UnicornQueueManager instance;

    private UnicornQueueManager(){
        queueCache = new CacheQueue<MessageSendInfo>();
        this.lock = new ReentrantLock();
        this.condition = lock.newCondition();
    }

    public static UnicornQueueManager getInstance(){
        if(instance == null){
            synchronized(UnicornQueueManager.class){
                if(instance == null){
                    instance = new UnicornQueueManager();
                }
            }
        }
        return instance;
    }

    public void add(MessageSendInfo messageInfo){
        queueCache.add(messageInfo);
        lock.lock();
        condition.signalAll();
        lock.unlock();
    }

    public MessageSendInfo removeOne(){
        return queueCache.remove();
    }

    public MessageSendInfo get(){
        return queueCache.get();
    }

    public boolean isEmpty(){
        return queueCache.isEmpty();
    }

    public Lock getLock() {
        return lock;
    }

    public Condition getCondition() {
        return condition;
    }

    public void addAll(List<MessageSendInfo> list){
        queueCache.addAll(list);
    }

    public int getLength() {
        return queueCache.getLength();
    }

}

消费者线程:

 package psplat.unicorn.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jeecgframework.web.system.service.SystemService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import psplat.https.HttpClientUtil;
import psplat.unicorn.entity.UnicornLogEntity;
import psplat.unicorn.model.MessageSendInfo;
import psplat.unicorn.model.MessageStatus;
import psplat.unicorn.model.ReportMapManager;
import psplat.unicorn.model.UnicornConstants;
import psplat.unicorn.model.UnicornQueueManager;
import psplat.util.ConvertMapUtil;

@Component
public class UnicordSendConsumer implements InitializingBean {

    private ExecutorService consumerRunnable;

    @Autowired
    private SystemService systemService;

    //用于发送计数,发送三次,失败,则丢弃数据,否则队列后的消息全部阻塞
    private static int count = 0;

    @Override
    public void afterPropertiesSet() throws Exception {
        consumerRunnable = Executors.newSingleThreadExecutor();
        consumerRunnable.execute(new Runnable() {

            @Override
            public void run() {
                while(true) {
                    if(UnicornQueueManager.getInstance().isEmpty()) {
                        UnicornQueueManager.getInstance().getLock().lock();
                        try {
                            UnicornQueueManager.getInstance().getCondition().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        UnicornQueueManager.getInstance().getLock().unlock();
                    } else {
                        MessageSendInfo sendInfo = UnicornQueueManager.getInstance().get();
                        //组装参数并发送
                        String url = UnicornConstants.COMMON_URL.getSuffix() + UnicornConstants.SEND.getSuffix();
                        Map<String, String> argsMap = new HashMap<String, String>();
                        argsMap.put("SpCode", UnicornConstants.SpCode.getSuffix());
                        argsMap.put("LoginName", UnicornConstants.LoginName.getSuffix());
                        argsMap.put("Password", UnicornConstants.Password.getSuffix());
                        argsMap.put("MessageContent", sendInfo.getMessageContent());
                        argsMap.put("UserNumber", sendInfo.getNumbers());
                        argsMap.put("SerialNumber", sendInfo.getSerialNumber());
                        String result = (new HttpClientUtil()).doPost(url, argsMap, "GBK");
                        String resultCode = ConvertMapUtil.convertMapFromArgs(result).get("result");
                        if(!resultCode.equals("0")) {
                            //发送失败,发三次,移除
                            count++;
                            if(count != 3) {
                                //失败了,1秒后重新发送
                                this.threadSleep(1000);
                                continue;
                            }
                        }
                        count = 0;
                        UnicornQueueManager.getInstance().removeOne();
                        System.out.println("执行,此时队列长度" + UnicornQueueManager.getInstance().getLength());
                        List<UnicornLogEntity> logs = sendInfo.inrichUnicornLogs(MessageStatus.COMMIT.getCode(), resultCode);
                        systemService.batchSave(logs);
                        ReportMapManager.getInstance().addAll(ReportConsumer.convertUnicornLogEntityMap(logs));
                    }
                }
            }

            private void threadSleep(int i) {
                try {
                    Thread.sleep(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        });
    }

}

有大神看出来为何这个线程会执行两次么?加了打印之后,队列长度已经为0为什么不进入等待?
如下是打印:
调用结束,返回值:result=0&description=发送短信成功&taskid=208754669702&faillist=&task_id=208754669702
执行,此时队列长度0
调用结束,返回值:result=0&description=发送短信成功&taskid=208753973775&faillist=&task_id=208753973775
执行,此时队列长度0
求指导!

  • 写回答

2条回答

  • dabocaiqq 2018-11-07 11:49
    关注
    评论

报告相同问题?

悬赏问题

  • ¥60 版本过低apk如何修改可以兼容新的安卓系统
  • ¥25 由IPR导致的DRIVER_POWER_STATE_FAILURE蓝屏
  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!
  • ¥15 drone 推送镜像时候 purge: true 推送完毕后没有删除对应的镜像,手动拷贝到服务器执行结果正确在样才能让指令自动执行成功删除对应镜像,如何解决?