werewolf2017
lancankun
采纳率0%
2018-11-15 06:45

多个线程取同一个IBM MQ的队列消息时为什么一直只有同一个线程能取到消息?

我先是使用一个线程,每个500毫秒put一条消息到队列里,下面这个是put消息的线程的run方法的代码:

 public void run() {
        try {
            countDown.await();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        int count = 0;
        MQQueueManager mqQueueManager = null;
        MQQueue queue = null;
        int openOptions = MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
        long cur = 0;
        try {
            mqQueueManager = new MQQueueManager(Config.QUEUE_MANAGER, getMQEnviroment(Config.HOST, Config.PORT));
            queue = mqQueueManager.accessQueue(Config.RECIEVE_QUEUE, openOptions, null, null, null);
        } catch (MQException e1) {
            e1.printStackTrace();
        }
        while(running) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            try {
                cur = System.currentTimeMillis();

                MQPutMessageOptions pmo = new MQPutMessageOptions();
                try {
                    Thread.sleep(Config.MILLS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                MQMessage outMsg = new MQMessage(); // Create the message
                outMsg.messageId = "123".getBytes();
                outMsg.correlationId = "456".getBytes();
                outMsg.messageFlags = MQConstants.MQMT_REQUEST;
                outMsg.characterSet = Config.CCSID;
                outMsg.expiry = -1;
                byte[] msgContent = ("hello, world. " + ++count).getBytes();
                outMsg.write(msgContent);
                queue.put(outMsg, pmo);
                mqQueueManager.commit();
                System.out.println(sequence + "-" + count/* + ", sum:" + sum.incrementAndGet()*/ + "-" + (System.currentTimeMillis() - cur) + "ms");

    //              if(sum.get() > Config.SUM) break;
            } catch (MQException e) {
                count--;
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                count--;
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        try {
            queue.close();
            mqQueueManager.close();
            mqQueueManager.disconnect();
        } catch (MQException e) {
            e.printStackTrace();
        }
    }

    public Hashtable<String, Object> getMQEnviroment(String host, int port) {
        Hashtable<String, Object> prop = new Hashtable<String, Object>();
        prop.put(MQC.HOST_NAME_PROPERTY, host);
        prop.put(MQC.PORT_PROPERTY, port);
        prop.put(MQC.CHANNEL_PROPERTY, Config.CHANNEL);
        prop.put(MQC.CCSID_PROPERTY, Config.CCSID);
        prop.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
        return prop;
    }

然后使用三个线程不断读取这个消息队列的消息,下面是读取消息的线程的run方法代码:

 public void run() {
        int count = 0;

        MQQueueManager queueManager = null;
        MQQueue queue = null;

        try {
            int receiveOptions = MQConstants.MQOO_INPUT_AS_Q_DEF
                    | MQConstants.MQOO_INQUIRE;

            long cur = 0;
            while(running) {
                try {
                    queueManager = new MQQueueManager(Config.QUEUE_MANAGER, getMQEnvironment(Config.HOST, Config.PORT));
                    queue = queueManager.accessQueue(Config.SEND_QUEUE, receiveOptions, null, null, null);
                    cur = System.currentTimeMillis();
                    MQGetMessageOptions gmo = new MQGetMessageOptions();
                    gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQOO_FAIL_IF_QUIESCING /*| MQConstants.MQGMO_SYNCPOINT*/;
                    gmo.waitInterval = MQConstants.MQWI_UNLIMITED;

                    MQMessage msg = new MQMessage();
                    queue.get(msg, gmo);
                    queueManager.commit();
                    byte[] body = new byte[msg.getMessageLength()];
                    msg.readFully(body);
                    queue.close();
                    queueManager.close();
                    queueManager.disconnect();
                    System.out.println(name + ": " + new String(body) + " : " + ++count + " : " +(System.currentTimeMillis() - cur) + "ms");
                } catch (MQException e) {
                    if (e.getReason() == 2033) {
                        System.out.println("hello");
                        continue;
                    }
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            queue.close();
            queueManager.close();
            queueManager.disconnect();
        } catch (MQException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public Hashtable<String, Object> getMQEnvironment(String host, int port){
        Hashtable<String, Object> prop = new Hashtable<String, Object>();
        prop.put(MQC.HOST_NAME_PROPERTY, host);
        prop.put(MQC.PORT_PROPERTY, port);
        prop.put(MQC.CHANNEL_PROPERTY, Config.CHANNEL);
        prop.put(MQC.CCSID_PROPERTY, Config.CCSID);
        prop.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
        return prop;
    }

我先启动三个取消息的线程,然后再启动一个put消息的线程。但是,每次取到消息的都是同一个线程,其余两个线程一条消息都没取到。不是偶然性的事件,每次测试都是这样。请问一下熟悉IBM MQ的朋友,应该怎么解决这种问题?

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

1条回答

  • weixin_43674890 追光者乚 11月前

    发一百条试试,或者加个sleep

    点赞 评论 复制链接分享

相关推荐