duanqiang7631 2018-06-29 21:23

When a user hits a route, I make a NATS request, and wait for the reply:

ctx := r.Context()
reply, err := natsConnection.RequestWithContext(ctx, "Subject", payload)

The subscriber then performs a resource-intensive task:

natsConnection.Subscribe("Subject", func(m *nats.Msg) {
    // do the expensive task here, and stop early if the request was cancelled
    natsConnection.Publish(m.Reply, []byte("Reply"))
})

How can I tell the subscriber to stop working if the request was cancelled?

My idea was to create a context for each message received by the worker, then give the sender a new inbox on which it can either wait for the reply or send a cancel notice:

contextMap := map[string]func(){}
natsConnection.Subscribe("Subject", func(m *nats.Msg) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    //create new inbox
    inbox := nats.NewInbox()

    // subscribe to the new inbox; cancel and clean up the current task if anything is received
    newSub, err := natsConnection.Subscribe(inbox, func(m *nats.Msg) {
        contextMap[inbox]()
    })
    if err != nil {
        cancel() // release the context before bailing out
        return
    }

    contextMap[inbox] = func() {
        fmt.Println("received")
        cancel()
        delete(contextMap, inbox)
        newSub.Unsubscribe()
    }

    //tell the requester to wait for reply in this inbox
    natsConnection.Publish(m.Reply, []byte(inbox))

    //do the work and send the reply when finished
    tick(ctx)
    natsConnection.Publish(inbox, []byte("done"))
})

On the requester's end, I send the first request and then wait on the specified inbox for either the reply or the cancellation of the context, in which case I publish a message to the inbox to let the receiver know:

ctx := r.Context()
inbox, err := natsConnection.RequestWithContext(ctx, "Subject", []byte("Payload"))
if err != nil {
    // the request failed or ctx was cancelled before the worker answered
    return
}

reply := make(chan *nats.Msg, 1)
natsConnection.ChanSubscribe(string(inbox.Data), reply)

select {
case <-ctx.Done():
    fmt.Println("Canceled")
    natsConnection.Publish(string(inbox.Data), []byte(""))
    return
case r := <-reply:
    fmt.Println(string(r.Data))
}

tick is just a function that ticks every 500 ms until the context is cancelled:

func tick(ctx context.Context) {
    ticker := time.NewTicker(time.Millisecond * 500)
    for {
        select {
        case <-ctx.Done():
            ticker.Stop()
            fmt.Println("context was canceled", ctx.Err())
            return
        case <-ticker.C:
            fmt.Println("Tick")
        }
    }
}

Right now this works, but I was wondering whether there is a simpler way to do it or, if not, how I can make this code better.

2 answers

  • dou1908 2018-07-01 20:21

    If the response is expensive and takes multiple seconds to produce, then yes, you can send a separate message that kills the request. Most architectures, however, let the responder continue and simply throw away the result.

    Selected by the asker as the best answer.