package test;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
public class MessageByMQ {
/**
* 队列管理器的名称
*/
private String qManagerName="QM_CTL";
/**
* 队列管理器
*/
private MQQueueManager qMgr;
/**
* 队列名称
*/
private String queueName= "Q.CTL.REQ";
\\private String q = "Q.SVC.TYPE.REQ";
\\private String s = "Q.SVC.TYPE.REQ2";
/**
* 队列
*/
private MQQueue qQueue;
/**
* mq服务器所在的主机名称
*/
private String hostname="127.0.0.1";
/**
* 服务器连接通道名称
*/
private String channelName="SVRCONN_CTL";
/**
* 监听器监听的端口
*/
private int port=1431;
/**
* 传输的编码类型
*/
private int CCSID = 1381;
@Before
public void init(){
try {
MQEnvironment.hostname = this.hostname; // 安裝MQ所在的ip address
MQEnvironment.port = this.port; // TCP/IP port
MQEnvironment.channel = this.channelName;
MQEnvironment.CCSID = CCSID;
qMgr = new MQQueueManager(this.qManagerName);
int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE
| MQC.MQOO_OUTPUT;
qQueue = qMgr.accessQueue(queueName, qOptioin);
\\ qQueue = qMgr.accessQueue(s, qOptioin);
\\qQueue = qMgr.accessQueue(q, qOptioin);
} catch (MQException e) {
e.printStackTrace();
}
}
/**
* 发送信息
*/
public void SendMsg(byte[] qByte) {
try {
MQMessage qMsg = new MQMessage();
qMsg.write(qByte);
MQPutMessageOptions pmo = new MQPutMessageOptions();
qQueue.put(qMsg, pmo);
System.out.println("The message is sent!");
System.out.println("\tThe message is " + new String(qByte, "GBK"));
} catch (MQException e) {
e.printStackTrace();
System.out
.println("A WebSphere MQ error occurred : Completion code "
+ e.completionCode + " Reason Code is "
+ e.reasonCode);
} catch (java.io.IOException e) {
e.printStackTrace();
System.out
.println("An error occurred whilst to the message buffer "
+ e);
}
}
/**
* 从消息队列取数据
*/
public void GetMsg() {
try {
MQMessage retrievedMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options += MQC.MQPMO_SYNCPOINT;
qQueue.get(retrievedMessage, gmo);
int length = retrievedMessage.getDataLength();
byte[] msg = new byte[length];
retrievedMessage.readFully(msg);
String sMsg = new String(msg,"GBK");
System.out.println(sMsg);
} catch (RuntimeException e) {
e.printStackTrace();
} catch (MQException e) {
e.printStackTrace();
if (e.reasonCode != 2033) // 没有消息
{
e.printStackTrace();
System.out
.println("A WebSphere MQ error occurred : Completion code "
+ e.completionCode
+ " Reason Code is "
+ e.reasonCode);
}
} catch (java.io.IOException e) {
System.out
.println("An error occurred whilst to the message buffer "
+ e);
}
}
/**
* 单元测试方法
*/
@Test
public void testMQ(){
MessageByMQ mqst = new MessageByMQ();
mqst.init();
try {
mqst.SendMsg("<Message><Head><ControlerName>wmb</ControlerName><FunctionName>ALL</FunctionName><ReturnCode></ReturnCode><ReturnInfo></ReturnInfo></Head><Body><TokenPoolID>ALL</TokenPoolID><ServiceID>ALL</ServiceID><SystemID>ALL</SystemID> </Body></Message>".getBytes("GBK"));
mqst.GetMsg();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 释放资源
*/
@After
public void release(){
try {
qQueue.close();
qMgr.disconnect();
} catch (MQException e) {
System.out
.println("A WebSphere MQ error occurred : Completion code "
+ e.completionCode + " Reason Code is "
+ e.reasonCode);
}
}
}