doushansu9012 2017-08-29 17:02
浏览 147
已采纳

NATS对请求的异步回复不是异步的

I am trying to implement request/response functinonality in gnatsd using GO language and I realized that gnatsd does not reply to request in async manner.

I started my investigation using NATS github examples https://github.com/nats-io/go-nats/tree/master/examples - examples nats-req.go and nats-rply.go. The examples works well.

Then I modified them simply to test parallel requests on gnatsd and also to provide some debug info in which goroutine ID the async reply is processed. There is source code of modified examples.

nats-rply.go has been modified to simply return back text of incoming request with information on current goroutine ID. I have also add to the async processing function 1 second sleep to simulate some processing time.

package main
import (
    "fmt"
    "github.com/nats-io/go-nats"
    "flag"
    "log"
    "runtime"
    "time"
    "bytes"
    "strconv"
)

// NOTE: Use tls scheme for TLS, e.g. nats-rply -s tls://demo.nats.io:4443 foo hello
func usage() {
    log.Fatalf("Usage: nats-rply [-s server][-t] <subject> 
")
}

func printMsg(m *nats.Msg, i int) {
    log.Printf("[#%d] Received on [%s]: '%s'
", i, m.Subject, string(m.Data))
}

func main() {
    log.Printf("Main goroutine ID:%d
", getGID())
    var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
    var showTime = flag.Bool("t", false, "Display timestamps")

    //log.SetFlags(0)
    flag.Usage = usage
    flag.Parse()

    args := flag.Args()
    if len(args) < 1 {
        usage()
    }

    nc, err := nats.Connect(*urls)
    if err != nil {
        log.Fatalf("Can't connect: %v
", err)
    }

    subj, i := args[0], 0

    nc.Subscribe(subj, func(msg *nats.Msg) {
        i++
        printMsg(msg, i)
        //simulation of some processing time
        time.Sleep(1 * time.Second)
        newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
        nc.Publish(msg.Reply, []byte(newreply))
    })
    nc.Flush()

    if err := nc.LastError(); err != nil {
        log.Fatal(err)
    }

    log.Printf("Listening on [%s]
", subj)
    if *showTime {
        log.SetFlags(log.LstdFlags)
    }

    runtime.Goexit()
}

func getGID() uint64 {
    b := make([]byte, 64)
    b = b[:runtime.Stack(b, false)]
    b = bytes.TrimPrefix(b, []byte("goroutine "))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

nats-req.go has been modified to send 10 requests in separate 10 goroutines started in parallel, the request timeout has been set to 3,5 seconds. I tried goroutines with shared NATS connection (function oneReq()) and also goroutines with its own NATS connections (function onReqSeparateConnect()) - with the same unsuccessful results.

package main

import (
    "flag"
    "fmt"
    "github.com/nats-io/go-nats"
    "sync"
    "time"
    "log"
)

// NOTE: Use tls scheme for TLS, e.g. nats-req -s tls://demo.nats.io:4443 foo hello
func usage() {
    log.Fatalf("Usage: nats-req [-s server (%s)] <subject> 
", nats.DefaultURL)
}

func main() {
    //var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")

    //log.SetFlags(0)
    flag.Usage = usage
    flag.Parse()

    args := flag.Args()
    if len(args) < 1 {
        usage()
    }

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("Can't connect: %v
", err)
    }
    defer nc.Close()
    subj := args[0]

    var wg sync.WaitGroup
    wg.Add(10)
    for i := 1; i <= 10; i++ {
        //go oneReq(subj, fmt.Sprintf("Request%d", i), nc, &wg)
        go oneReqSeparateConnect(subj, fmt.Sprintf("Request%d", i), &wg)
    }

    wg.Wait()

}

func oneReq(subj string, payload string, nc *nats.Conn, wg *sync.WaitGroup) {
    defer wg.Done()
    msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
    if err != nil {
        if nc.LastError() != nil {
            log.Printf("Error in Request: %v
", nc.LastError())
        }
        log.Printf("Error in Request: %v
", err)
    } else {
        log.Printf("Published [%s] : '%s'
", subj, payload)
        log.Printf("Received [%v] : '%s'
", msg.Subject, string(msg.Data))
    }
}

func oneReqSeparateConnect(subj string, payload string, wg *sync.WaitGroup) {
    defer wg.Done()
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Printf("Can't connect: %v
", err)
        return
    }
    defer nc.Close()
    msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
    if err != nil {
        if nc.LastError() != nil {
            log.Printf("Error in Request: %v
", nc.LastError())
        }
        log.Printf("Error in Request: %v
", err)
    } else {
        log.Printf("Published [%s] : '%s'
", subj, payload)
        log.Printf("Received [%v] : '%s'
", msg.Subject, string(msg.Data))
    }
}

And there is result - unwanted behaviour, it looks that nats-rply.go creates only one goroutine for processing incoming reqests and the requests are processed in serial way. The nats-req.go sends all 10 requests in one time with timeout set to 3,5 seconds. The nats-rply.go starts responding to the request with one second intervals in serial way, so only 3 requests are satisfied until 3,5sec timeout is breached - rest of requests timeouts. The response message also contains GoroutineID which is the same for all incoming requests! Even when nats-req is started again the goroutine id is the same, the ID changes only when nats-rply.go server is restarted.

nats-req.go log

D:\PRAC\TSP\AMON>nats-req foo
2017/08/29 18:46:48 Sending: 'Request9'
2017/08/29 18:46:48 Sending: 'Request7'
2017/08/29 18:46:48 Sending: 'Request10'
2017/08/29 18:46:48 Sending: 'Request4'
2017/08/29 18:46:48 Sending: 'Request8'
2017/08/29 18:46:48 Sending: 'Request6'
2017/08/29 18:46:48 Sending: 'Request1'
2017/08/29 18:46:48 Sending: 'Request5'
2017/08/29 18:46:48 Sending: 'Request2'
2017/08/29 18:46:48 Sending: 'Request3'
2017/08/29 18:46:49 Published [foo] : 'Request9'
2017/08/29 18:46:49 Received [_INBOX.xrsXYOB2QmW1f52pkfLHya.xrsXYOB2QmW1f52pkfLHzJ] : 'REPLY TO request "Request9", GoroutineId:36'
2017/08/29 18:46:50 Published [foo] : 'Request7'
2017/08/29 18:46:50 Received [_INBOX.xrsXYOB2QmW1f52pkfLI02.xrsXYOB2QmW1f52pkfLI0l] : 'REPLY TO request "Request7", GoroutineId:36'
2017/08/29 18:46:51 Published [foo] : 'Request10'
2017/08/29 18:46:51 Received [_INBOX.xrsXYOB2QmW1f52pkfLI1U.xrsXYOB2QmW1f52pkfLI2D] : 'REPLY TO request "Request10", GoroutineId:36'
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout

nats-rply.go log

C:\Users\belunek>nats-rply foo
2017/08/29 18:46:46 Main goroutine ID:1
2017/08/29 18:46:46 Listening on [foo]
2017/08/29 18:46:48 [#1] Received on [foo]: 'Request9'
2017/08/29 18:46:49 [#2] Received on [foo]: 'Request7'
2017/08/29 18:46:50 [#3] Received on [foo]: 'Request10'
2017/08/29 18:46:51 [#4] Received on [foo]: 'Request4'
2017/08/29 18:46:52 [#5] Received on [foo]: 'Request8'
2017/08/29 18:46:53 [#6] Received on [foo]: 'Request6'
2017/08/29 18:46:54 [#7] Received on [foo]: 'Request1'
2017/08/29 18:46:55 [#8] Received on [foo]: 'Request5'
2017/08/29 18:46:56 [#9] Received on [foo]: 'Request2'
2017/08/29 18:46:57 [#10] Received on [foo]: 'Request3'

Please any ideas, how to correctly implement request/response communication in NATS with asyns (parallel) response processing? Thanks for any info.

  • 写回答

2条回答 默认 最新

  • dongwen5351 2017-08-29 22:40
    关注

    Gnatsd reply to Request in async manner, but it doesn't start goroutine for each request, just pure async. And because you simulate processing load using time.Sleep, which pauses calling goroutine, it looks like sync processing. If you modify your example to use goroutines, everything works well.

    ...
    nc.Subscribe(subj, func(msg *nats.Msg) {
        go handler(msg, i, nc)
    })
    ...
    
    func handler(msg *nats.Msg, i int, nc *nats.Conn) {
        i++
        printMsg(msg, i)
        //simulation of some processing time
        time.Sleep(1 * time.Second)
        newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
        nc.Publish(msg.Reply, []byte(newreply))
    }
    

    Output:

    ./nats-rply test
    2017/08/30 00:17:05 Main goroutine ID:1
    2017/08/30 00:17:05 Listening on [test]
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request6'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request5'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request1'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request8'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request3'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request7'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request9'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request4'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request2'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request10'
    
    ./nats-req test
    2017/08/30 00:17:12 Published [test] : 'Request3'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Bq] : 'REPLY TO request "Request3", GoroutineId:37'
    2017/08/30 00:17:12 Published [test] : 'Request7'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5z6] : 'REPLY TO request "Request7", GoroutineId:42'
    2017/08/30 00:17:12 Published [test] : 'Request10'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5wY] : 'REPLY TO request "Request10", GoroutineId:43'
    2017/08/30 00:17:12 Published [test] : 'Request5'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6EO] : 'REPLY TO request "Request5", GoroutineId:34'
    2017/08/30 00:17:12 Published [test] : 'Request8'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm66k] : 'REPLY TO request "Request8", GoroutineId:36'
    2017/08/30 00:17:12 Published [test] : 'Request1'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm64C] : 'REPLY TO request "Request1", GoroutineId:35'
    2017/08/30 00:17:12 Published [test] : 'Request2'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Gw] : 'REPLY TO request "Request2", GoroutineId:41'
    2017/08/30 00:17:12 Published [test] : 'Request4'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm69I] : 'REPLY TO request "Request4", GoroutineId:40'
    2017/08/30 00:17:12 Published [test] : 'Request9'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm61e] : 'REPLY TO request "Request9", GoroutineId:39'
    2017/08/30 00:17:12 Published [test] : 'Request6'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5u0] : 'REPLY TO request "Request6", GoroutineId:38'
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 HFSS 中的 H 场图与 MATLAB 中绘制的 B1 场 部分对应不上
  • ¥15 如何在scanpy上做差异基因和通路富集?
  • ¥20 关于#硬件工程#的问题,请各位专家解答!
  • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配
  • ¥15 Power query添加列问题
  • ¥50 Kubernetes&Fission&Eleasticsearch
  • ¥15 報錯:Person is not mapped,如何解決?