Jzin 2023-02-12 13:36 采纳率: 55%
浏览 17
已结题

go语言使用RocketMQ发送事务消息执行的函数中再发送延时消息报错

go语言使用RocketMQ
发送事务消息执行的函数中再发送延时消息报错:
producer group has been created

orderListener := OrderListener{}
    //生成producer
    p, err := rocketmq.NewTransactionProducer(
        &orderListener,
        producer.WithNameServer([]string{fmt.Sprintf("%s:%d", global.ServerConfig.RocketMQConfig.Host, global.ServerConfig.RocketMQConfig.Port)}),
    )
    if err != nil {
        zap.S().Errorf("生成producer失败: %s", err.Error())
        return nil, err
    }
    //启动producer
    if err = p.Start(); err != nil {
        zap.S().Errorf("启动producer失败: %s", err.Error())
        return nil, err
    }
    order := model.OrderInfo{
        OrderSn:      GenerateOrderSn(req.UserId),
        Address:      req.Address,
        SignerName:   req.Name,
        SingerMobile: req.Mobile,
        Post:         req.Post,
        User:         req.UserId,
    }
    //应该在消息中具体指明一个订单的具体的商品的扣减情况
    jsonString, _ := json.Marshal(&order)
    _, err = p.SendMessageInTransaction(context.Background(), primitive.NewMessage("order_reback", jsonString))
    if err != nil {
        fmt.Printf("发送失败: %s\n", err)
        return nil, status.Error(codes.Internal, "发送消息队列失败")
    }
    if orderListener.Code != codes.OK {
        return nil, status.Error(orderListener.Code, orderListener.Detail)
    }
    if err = p.Shutdown(); err != nil {
        panic("关闭producer失败")
    }

实现OrderListener的接口:

type OrderListener struct {
    Code        codes.Code
    Detail      string
    ID          int32
    OrderAmount float32
}

func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
        //执行的逻辑   可以忽略
    var orderInfo model.OrderInfo
    _ = json.Unmarshal(msg.Body, &orderInfo)

       tail = "删除购物车记录失败"
        return primitive.RollbackMessageState
        //return nil, status.Errorf(codes.Internal, "创建订单失败")
    }
        //发送延时消息   生成延时消息的producer
    p, err := rocketmq.NewProducer(
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{fmt.Sprintf("%s:%d", global.ServerConfig.RocketMQConfig.Host, global.ServerConfig.RocketMQConfig.Port)})),
        producer.WithRetry(2),
    )
    if err != nil {
        panic("生成producer失败")
    }

    //不要在一个进程中使用多个producer, 但是不要随便调用shutdown因为会影响其他的producer
        //启动producer
    if err = p.Start(); err != nil {
        fmt.Println(err.Error())
        panic("启动producer失败" )
    }

    msg = primitive.NewMessage("order_timeout", []byte("this is y"))
    msg.WithDelayTimeLevel(3)
    _, err = p.SendSync(context.Background(), msg)
    if err != nil {
        zap.S().Errorf("发送延时消息失败: %v\n", err)
        tx.Rollback()
        o.Code = codes.Internal
        o.Detail = "发送延时消息失败"
        return primitive.RollbackMessageState
    }
    err = p.Shutdown()
    if err != nil {
        panic("Shutdown:" )
    }
}

//回调逻辑  可忽略
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
    var orderInfo model.OrderInfo
    _ = json.Unmarshal(msg.Body, &orderInfo)
    if result := global.DB.Where(&model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
        return primitive.CommitMessageState
    }
    return primitive.RollbackMessageState
}

  • 写回答

1条回答 默认 最新

  • Jzin 2023-02-12 14:43
    关注

    设置生产组名称即可:
    在生产producer时设置:producer.WithGroupName("shiwu"),

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 2月20日
  • 已采纳回答 2月12日
  • 创建了问题 2月12日

悬赏问题

  • ¥15 Acrn IVSHMEM doorbell问题
  • ¥15 yolov5中的val测试集训练时数量变小问题
  • ¥15 MPLS/VPN实验中MPLS的配置问题
  • ¥15 materialstudio氢键计算问题
  • ¥15 已知隐函数其中一个变量的,求另外一个变量
  • ¥15 echarts图表制作
  • ¥15 halcon根据玻璃面板纹路取区域
  • ¥15 HFSS设计小型化180度耦合器
  • ¥15 使用CInternetSession,CHttpFile读取网页文件时有些电脑上会卡住怎么办?
  • ¥15 水下机器人的半物理仿真研究