Jzin 2023-02-12 13:36 采纳率: 63.2%
浏览 17
已结题

go语言使用RocketMQ发送事务消息执行的函数中再发送延时消息报错

go语言使用RocketMQ
发送事务消息执行的函数中再发送延时消息报错:
producer group has been created

orderListener := OrderListener{}
    //生成producer
    p, err := rocketmq.NewTransactionProducer(
        &orderListener,
        producer.WithNameServer([]string{fmt.Sprintf("%s:%d", global.ServerConfig.RocketMQConfig.Host, global.ServerConfig.RocketMQConfig.Port)}),
    )
    if err != nil {
        zap.S().Errorf("生成producer失败: %s", err.Error())
        return nil, err
    }
    //启动producer
    if err = p.Start(); err != nil {
        zap.S().Errorf("启动producer失败: %s", err.Error())
        return nil, err
    }
    order := model.OrderInfo{
        OrderSn:      GenerateOrderSn(req.UserId),
        Address:      req.Address,
        SignerName:   req.Name,
        SingerMobile: req.Mobile,
        Post:         req.Post,
        User:         req.UserId,
    }
    //应该在消息中具体指明一个订单的具体的商品的扣减情况
    jsonString, _ := json.Marshal(&order)
    _, err = p.SendMessageInTransaction(context.Background(), primitive.NewMessage("order_reback", jsonString))
    if err != nil {
        fmt.Printf("发送失败: %s\n", err)
        return nil, status.Error(codes.Internal, "发送消息队列失败")
    }
    if orderListener.Code != codes.OK {
        return nil, status.Error(orderListener.Code, orderListener.Detail)
    }
    if err = p.Shutdown(); err != nil {
        panic("关闭producer失败")
    }

实现OrderListener的接口:

type OrderListener struct {
    Code        codes.Code
    Detail      string
    ID          int32
    OrderAmount float32
}

func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
        //执行的逻辑   可以忽略
    var orderInfo model.OrderInfo
    _ = json.Unmarshal(msg.Body, &orderInfo)

       tail = "删除购物车记录失败"
        return primitive.RollbackMessageState
        //return nil, status.Errorf(codes.Internal, "创建订单失败")
    }
        //发送延时消息   生成延时消息的producer
    p, err := rocketmq.NewProducer(
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{fmt.Sprintf("%s:%d", global.ServerConfig.RocketMQConfig.Host, global.ServerConfig.RocketMQConfig.Port)})),
        producer.WithRetry(2),
    )
    if err != nil {
        panic("生成producer失败")
    }

    //不要在一个进程中使用多个producer, 但是不要随便调用shutdown因为会影响其他的producer
        //启动producer
    if err = p.Start(); err != nil {
        fmt.Println(err.Error())
        panic("启动producer失败" )
    }

    msg = primitive.NewMessage("order_timeout", []byte("this is y"))
    msg.WithDelayTimeLevel(3)
    _, err = p.SendSync(context.Background(), msg)
    if err != nil {
        zap.S().Errorf("发送延时消息失败: %v\n", err)
        tx.Rollback()
        o.Code = codes.Internal
        o.Detail = "发送延时消息失败"
        return primitive.RollbackMessageState
    }
    err = p.Shutdown()
    if err != nil {
        panic("Shutdown:" )
    }
}

//回调逻辑  可忽略
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
    var orderInfo model.OrderInfo
    _ = json.Unmarshal(msg.Body, &orderInfo)
    if result := global.DB.Where(&model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
        return primitive.CommitMessageState
    }
    return primitive.RollbackMessageState
}

  • 写回答

1条回答 默认 最新

  • Jzin 2023-02-12 14:43
    关注

    设置生产组名称即可:
    在生产producer时设置:producer.WithGroupName("shiwu"),

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 2月20日
  • 已采纳回答 2月12日
  • 创建了问题 2月12日

悬赏问题

  • ¥15 微信公众平台自制会员卡可以通过收款码收款码收款进行自动积分吗
  • ¥15 随身WiFi网络灯亮但是没有网络,如何解决?
  • ¥15 gdf格式的脑电数据如何处理matlab
  • ¥20 重新写的代码替换了之后运行hbuliderx就这样了
  • ¥100 监控抖音用户作品更新可以微信公众号提醒
  • ¥15 UE5 如何可以不渲染HDRIBackdrop背景
  • ¥70 2048小游戏毕设项目
  • ¥20 mysql架构,按照姓名分表
  • ¥15 MATLAB实现区间[a,b]上的Gauss-Legendre积分
  • ¥15 delphi webbrowser组件网页下拉菜单自动选择问题