weixin_38986584
weixin_38986584
采纳率0%
2019-10-23 15:01 浏览 355

如何向多个队列同时发送消息用的是ibm mq

package test;

import java.io.IOException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class MessageByMQ {

/**
 * 队列管理器的名称
 */
private String qManagerName="QM_CTL";

/**
 * 队列管理器
 */
private   MQQueueManager qMgr;
/**
 * 队列名称
 */
private  String queueName= "Q.CTL.REQ";
\\private String q = "Q.SVC.TYPE.REQ";
\\private String s = "Q.SVC.TYPE.REQ2";
/**
 * 队列
 */
private MQQueue qQueue;

/**
 * mq服务器所在的主机名称
 */
private String hostname="127.0.0.1";
/**
 * 服务器连接通道名称
 */
private String channelName="SVRCONN_CTL";

/**
 * 监听器监听的端口
 */
private  int  port=1431;
/**
 * 传输的编码类型
 */
private  int CCSID = 1381; 
@Before
public  void  init(){
    try {
        MQEnvironment.hostname = this.hostname; // 安裝MQ所在的ip address
        MQEnvironment.port = this.port; // TCP/IP port
        MQEnvironment.channel = this.channelName;
        MQEnvironment.CCSID = CCSID;
        qMgr = new MQQueueManager(this.qManagerName);
        int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE
                | MQC.MQOO_OUTPUT;

        qQueue = qMgr.accessQueue(queueName, qOptioin);
      \\  qQueue = qMgr.accessQueue(s, qOptioin);
        \\qQueue = qMgr.accessQueue(q, qOptioin);    


    } catch (MQException e) {
        e.printStackTrace();
    }
}





/**
 * 发送信息
 */

public void SendMsg(byte[] qByte) {
    try {
        MQMessage qMsg = new MQMessage();
        qMsg.write(qByte);
        MQPutMessageOptions pmo = new MQPutMessageOptions();
        qQueue.put(qMsg, pmo);
        System.out.println("The message is sent!");
        System.out.println("\tThe message is " + new String(qByte, "GBK"));
    } catch (MQException e) {
        e.printStackTrace();
        System.out
                .println("A WebSphere MQ error occurred : Completion code "
                        + e.completionCode + " Reason Code is "
                        + e.reasonCode);
    } catch (java.io.IOException e) {
        e.printStackTrace();
        System.out
                .println("An error occurred whilst to the message buffer "
                        + e);
    }

}




/**
 * 从消息队列取数据
 */
public void GetMsg() {
    try {
        MQMessage retrievedMessage = new MQMessage();

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.options += MQC.MQPMO_SYNCPOINT;
        qQueue.get(retrievedMessage, gmo);
        int length = retrievedMessage.getDataLength();

        byte[] msg = new byte[length];

        retrievedMessage.readFully(msg);

        String sMsg = new String(msg,"GBK");
        System.out.println(sMsg);

    } catch (RuntimeException e) {
        e.printStackTrace();
    } catch (MQException e) {
        e.printStackTrace();
        if (e.reasonCode != 2033) // 没有消息
        {
            e.printStackTrace();
            System.out
                    .println("A WebSphere MQ error occurred : Completion code "
                            + e.completionCode
                            + " Reason Code is "
                            + e.reasonCode);
        }
    } catch (java.io.IOException e) {
        System.out
                .println("An error occurred whilst to the message buffer "
                        + e);
    }
}





/**
 * 单元测试方法
 */

@Test
public  void  testMQ(){
    MessageByMQ mqst = new MessageByMQ();
    mqst.init();
    try {
        mqst.SendMsg("<Message><Head><ControlerName>wmb</ControlerName><FunctionName>ALL</FunctionName><ReturnCode></ReturnCode><ReturnInfo></ReturnInfo></Head><Body><TokenPoolID>ALL</TokenPoolID><ServiceID>ALL</ServiceID><SystemID>ALL</SystemID> </Body></Message>".getBytes("GBK"));
        mqst.GetMsg();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


/**
 * 释放资源
 */
@After
public  void  release(){
    try {
        qQueue.close();
        qMgr.disconnect();
    } catch (MQException e) {
        System.out
                .println("A WebSphere MQ error occurred : Completion code "
                        + e.completionCode + " Reason Code is "
                        + e.reasonCode);
    }   
}

}

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

相关推荐