go语言使用RocketMQ
发送事务消息执行的函数中再发送延时消息报错:
producer group has been created
orderListener := OrderListener{}
//生成producer
p, err := rocketmq.NewTransactionProducer(
&orderListener,
producer.WithNameServer([]string{fmt.Sprintf("%s:%d", global.ServerConfig.RocketMQConfig.Host, global.ServerConfig.RocketMQConfig.Port)}),
)
if err != nil {
zap.S().Errorf("生成producer失败: %s", err.Error())
return nil, err
}
//启动producer
if err = p.Start(); err != nil {
zap.S().Errorf("启动producer失败: %s", err.Error())
return nil, err
}
order := model.OrderInfo{
OrderSn: GenerateOrderSn(req.UserId),
Address: req.Address,
SignerName: req.Name,
SingerMobile: req.Mobile,
Post: req.Post,
User: req.UserId,
}
//应该在消息中具体指明一个订单的具体的商品的扣减情况
jsonString, _ := json.Marshal(&order)
_, err = p.SendMessageInTransaction(context.Background(), primitive.NewMessage("order_reback", jsonString))
if err != nil {
fmt.Printf("发送失败: %s\n", err)
return nil, status.Error(codes.Internal, "发送消息队列失败")
}
if orderListener.Code != codes.OK {
return nil, status.Error(orderListener.Code, orderListener.Detail)
}
if err = p.Shutdown(); err != nil {
panic("关闭producer失败")
}
实现OrderListener的接口:
type OrderListener struct {
Code codes.Code
Detail string
ID int32
OrderAmount float32
}
func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
//执行的逻辑 可以忽略
var orderInfo model.OrderInfo
_ = json.Unmarshal(msg.Body, &orderInfo)
tail = "删除购物车记录失败"
return primitive.RollbackMessageState
//return nil, status.Errorf(codes.Internal, "创建订单失败")
}
//发送延时消息 生成延时消息的producer
p, err := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{fmt.Sprintf("%s:%d", global.ServerConfig.RocketMQConfig.Host, global.ServerConfig.RocketMQConfig.Port)})),
producer.WithRetry(2),
)
if err != nil {
panic("生成producer失败")
}
//不要在一个进程中使用多个producer, 但是不要随便调用shutdown因为会影响其他的producer
//启动producer
if err = p.Start(); err != nil {
fmt.Println(err.Error())
panic("启动producer失败" )
}
msg = primitive.NewMessage("order_timeout", []byte("this is y"))
msg.WithDelayTimeLevel(3)
_, err = p.SendSync(context.Background(), msg)
if err != nil {
zap.S().Errorf("发送延时消息失败: %v\n", err)
tx.Rollback()
o.Code = codes.Internal
o.Detail = "发送延时消息失败"
return primitive.RollbackMessageState
}
err = p.Shutdown()
if err != nil {
panic("Shutdown:" )
}
}
//回调逻辑 可忽略
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
var orderInfo model.OrderInfo
_ = json.Unmarshal(msg.Body, &orderInfo)
if result := global.DB.Where(&model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
return primitive.CommitMessageState
}
return primitive.RollbackMessageState
}