C# 编写activemq 消费者和生产者,无法正常收发数据。 mq在pc上已经安装完成,并且可以网页登录。
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System.Runtime.Serialization;
namespace actmq
{
public partial class Producer : Form
{
// 生产者
public Producer()
{
InitializeComponent();
InitConsumer();
InitProducer();
}
private IConnectionFactory factoryProducer;
private string strServer = "tcp://127.0.0.1:61616";
public void InitProducer()
{
try
{
//初始化工厂,这里默认的URL是不需要修改的
factoryProducer = new ConnectionFactory(strServer);
}
catch
{
lbMessage.Text = "初始化失败!!";
}
}
private void button1_Click(object sender, EventArgs e)
{
//通过工厂建立连接
using (IConnection connection = factoryProducer.CreateConnection())
{
//通过连接创建Session会话
using (ISession session = connection.CreateSession())
{
//通过会话创建生产者,方法里面new出来的是MQ中的Queue
IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("test"));
//创建一个发送的消息对象
ITextMessage message = prod.CreateTextMessage();
//给这个对象赋实际的消息
message.Text = "hello";
//message.Properties.SetString("filter", "SwipeCard");
//生产者把消息发送出去,几个枚举参数MsgDeliveryMode是否长链
//MsgPriority消息优先级别,发送最小单位,当然还有其他重载
prod.Send(message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
lbMessage.Text = "发送成功!!";
txtMessage.Text = "";
txtMessage.Focus();
}
}
}
IConnectionFactory factoryConsumer;
IConnection connectionConsumer;
public void InitConsumer()
{
//创建连接工厂
factoryConsumer = new ConnectionFactory(strServer);
//通过工厂构建连接
connectionConsumer = factoryConsumer.CreateConnection();
//这个是连接的客户端名称标识
connectionConsumer.ClientId = "zxc132423";
//启动连接,监听的话要主动启动连接
connectionConsumer.Start();
//通过连接创建一个会话
ISession session = connectionConsumer.CreateSession();
//通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("test"), "filter='SwipeCard'");
//注册监听事件
consumer.Listener += new MessageListener(consumer_Listener);
connectionConsumer.Start();
// connection.Stop();
// connection.Close();
}
void consumer_Listener(IMessage message)
{
ITextMessage msg = (ITextMessage)message;
//tbReceiveMessage.Invoke(new DelegateRevMessage(RevMessage), msg);
Console.WriteLine(msg.Text);
}
public delegate void DelegateRevMessage(ITextMessage message);
public void RevMessage(ITextMessage message)
{
// tbReceiveMessage.Text += string.Format(@"接收到:{0}{1}", message.Text, Environment.NewLine);
Console.WriteLine(message.Text);
}
}//end class
}