doushansu9012
doushansu9012
2017-08-29 17:02

NATS对请求的异步回复不是异步的

已采纳

I am trying to implement request/response functionality in gnatsd using the Go language, and I realized that gnatsd does not reply to requests in an async manner.

I started my investigation using the NATS GitHub examples https://github.com/nats-io/go-nats/tree/master/examples - examples nats-req.go and nats-rply.go. The examples work well.

Then I modified them to test parallel requests on gnatsd and to print debug info showing the ID of the goroutine in which the async reply is processed. Here is the source code of the modified examples.

nats-rply.go has been modified to simply return the text of the incoming request together with the current goroutine ID. I have also added a 1-second sleep to the async processing function to simulate some processing time.

package main
import (
    "fmt"
    "github.com/nats-io/go-nats"
    "flag"
    "log"
    "runtime"
    "time"
    "bytes"
    "strconv"
)

// NOTE: Use tls scheme for TLS, e.g. nats-rply -s tls://demo.nats.io:4443 foo hello

// usage logs the command-line synopsis for nats-rply and terminates the
// process (log.Fatalf exits after printing).
func usage() {
    log.Fatalf("Usage: nats-rply [-s server][-t] <subject> \n")
}

func printMsg(m *nats.Msg, i int) {
    log.Printf("[#%d] Received on [%s]: '%s'
", i, m.Subject, string(m.Data))
}

func main() {
    log.Printf("Main goroutine ID:%d
", getGID())
    var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
    var showTime = flag.Bool("t", false, "Display timestamps")

    //log.SetFlags(0)
    flag.Usage = usage
    flag.Parse()

    args := flag.Args()
    if len(args) < 1 {
        usage()
    }

    nc, err := nats.Connect(*urls)
    if err != nil {
        log.Fatalf("Can't connect: %v
", err)
    }

    subj, i := args[0], 0

    nc.Subscribe(subj, func(msg *nats.Msg) {
        i++
        printMsg(msg, i)
        //simulation of some processing time
        time.Sleep(1 * time.Second)
        newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
        nc.Publish(msg.Reply, []byte(newreply))
    })
    nc.Flush()

    if err := nc.LastError(); err != nil {
        log.Fatal(err)
    }

    log.Printf("Listening on [%s]
", subj)
    if *showTime {
        log.SetFlags(log.LstdFlags)
    }

    runtime.Goexit()
}

// getGID returns the numeric ID of the calling goroutine. It reads the first
// line of the goroutine's own stack trace, which begins with
// "goroutine <id> ", and parses the digits between that prefix and the next
// space.
func getGID() uint64 {
    var scratch [64]byte
    header := scratch[:runtime.Stack(scratch[:], false)]
    header = bytes.TrimPrefix(header, []byte("goroutine "))
    idEnd := bytes.IndexByte(header, ' ')
    // A parse failure is ignored on purpose; the result is then 0.
    id, _ := strconv.ParseUint(string(header[:idEnd]), 10, 64)
    return id
}

nats-req.go has been modified to send 10 requests from 10 separate goroutines started in parallel; the request timeout has been set to 3.5 seconds. I tried goroutines with a shared NATS connection (function oneReq()) and also goroutines each with its own NATS connection (function oneReqSeparateConnect()) - with the same unsuccessful results.

package main

import (
    "flag"
    "fmt"
    "github.com/nats-io/go-nats"
    "sync"
    "time"
    "log"
)

// NOTE: Use tls scheme for TLS, e.g. nats-req -s tls://demo.nats.io:4443 foo hello
func usage() {
    log.Fatalf("Usage: nats-req [-s server (%s)] <subject> 
", nats.DefaultURL)
}

func main() {
    //var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")

    //log.SetFlags(0)
    flag.Usage = usage
    flag.Parse()

    args := flag.Args()
    if len(args) < 1 {
        usage()
    }

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("Can't connect: %v
", err)
    }
    defer nc.Close()
    subj := args[0]

    var wg sync.WaitGroup
    wg.Add(10)
    for i := 1; i <= 10; i++ {
        //go oneReq(subj, fmt.Sprintf("Request%d", i), nc, &wg)
        go oneReqSeparateConnect(subj, fmt.Sprintf("Request%d", i), &wg)
    }

    wg.Wait()

}

func oneReq(subj string, payload string, nc *nats.Conn, wg *sync.WaitGroup) {
    defer wg.Done()
    msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
    if err != nil {
        if nc.LastError() != nil {
            log.Printf("Error in Request: %v
", nc.LastError())
        }
        log.Printf("Error in Request: %v
", err)
    } else {
        log.Printf("Published [%s] : '%s'
", subj, payload)
        log.Printf("Received [%v] : '%s'
", msg.Subject, string(msg.Data))
    }
}

func oneReqSeparateConnect(subj string, payload string, wg *sync.WaitGroup) {
    defer wg.Done()
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Printf("Can't connect: %v
", err)
        return
    }
    defer nc.Close()
    msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
    if err != nil {
        if nc.LastError() != nil {
            log.Printf("Error in Request: %v
", nc.LastError())
        }
        log.Printf("Error in Request: %v
", err)
    } else {
        log.Printf("Published [%s] : '%s'
", subj, payload)
        log.Printf("Received [%v] : '%s'
", msg.Subject, string(msg.Data))
    }
}

And here is the result - unwanted behaviour: it looks like nats-rply.go creates only one goroutine for processing incoming requests, and the requests are processed serially. nats-req.go sends all 10 requests at the same time with the timeout set to 3.5 seconds. nats-rply.go starts responding to the requests at one-second intervals, one after another, so only 3 requests are satisfied before the 3.5-second timeout is reached - the rest of the requests time out. The response message also contains the goroutine ID, which is the same for all incoming requests! Even when nats-req is started again the goroutine ID is the same; the ID changes only when the nats-rply.go server is restarted.

nats-req.go log

D:\PRAC\TSP\AMON>nats-req foo
2017/08/29 18:46:48 Sending: 'Request9'
2017/08/29 18:46:48 Sending: 'Request7'
2017/08/29 18:46:48 Sending: 'Request10'
2017/08/29 18:46:48 Sending: 'Request4'
2017/08/29 18:46:48 Sending: 'Request8'
2017/08/29 18:46:48 Sending: 'Request6'
2017/08/29 18:46:48 Sending: 'Request1'
2017/08/29 18:46:48 Sending: 'Request5'
2017/08/29 18:46:48 Sending: 'Request2'
2017/08/29 18:46:48 Sending: 'Request3'
2017/08/29 18:46:49 Published [foo] : 'Request9'
2017/08/29 18:46:49 Received [_INBOX.xrsXYOB2QmW1f52pkfLHya.xrsXYOB2QmW1f52pkfLHzJ] : 'REPLY TO request "Request9", GoroutineId:36'
2017/08/29 18:46:50 Published [foo] : 'Request7'
2017/08/29 18:46:50 Received [_INBOX.xrsXYOB2QmW1f52pkfLI02.xrsXYOB2QmW1f52pkfLI0l] : 'REPLY TO request "Request7", GoroutineId:36'
2017/08/29 18:46:51 Published [foo] : 'Request10'
2017/08/29 18:46:51 Received [_INBOX.xrsXYOB2QmW1f52pkfLI1U.xrsXYOB2QmW1f52pkfLI2D] : 'REPLY TO request "Request10", GoroutineId:36'
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout

nats-rply.go log

C:\Users\belunek>nats-rply foo
2017/08/29 18:46:46 Main goroutine ID:1
2017/08/29 18:46:46 Listening on [foo]
2017/08/29 18:46:48 [#1] Received on [foo]: 'Request9'
2017/08/29 18:46:49 [#2] Received on [foo]: 'Request7'
2017/08/29 18:46:50 [#3] Received on [foo]: 'Request10'
2017/08/29 18:46:51 [#4] Received on [foo]: 'Request4'
2017/08/29 18:46:52 [#5] Received on [foo]: 'Request8'
2017/08/29 18:46:53 [#6] Received on [foo]: 'Request6'
2017/08/29 18:46:54 [#7] Received on [foo]: 'Request1'
2017/08/29 18:46:55 [#8] Received on [foo]: 'Request5'
2017/08/29 18:46:56 [#9] Received on [foo]: 'Request2'
2017/08/29 18:46:57 [#10] Received on [foo]: 'Request3'

Please, any ideas on how to correctly implement request/response communication in NATS with async (parallel) response processing? Thanks for any info.

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

2条回答

  • dongwen5351 dongwen5351 4年前

    Gnatsd replies to requests in an async manner, but it doesn't start a goroutine for each request - it is just pure async. And because you simulate the processing load using time.Sleep, which pauses the calling goroutine, it looks like sync processing. If you modify your example to use goroutines, everything works well.

    ...
    nc.Subscribe(subj, func(msg *nats.Msg) {
        // Start a separate goroutine for each message so this callback
        // returns without waiting for the handler to finish.
        go handler(msg, i, nc)
    })
    ...
    
    // handler processes one request: it logs the message, sleeps for one
    // second to simulate work, then publishes a reply containing the request
    // text and the ID of the goroutine running this call.
    //
    // i is received by value, so the number logged here is the caller's value
    // plus one and the caller's own counter is left unchanged.
    func handler(msg *nats.Msg, i int, nc *nats.Conn) {
        seq := i + 1
        printMsg(msg, seq)

        // simulation of some processing time
        time.Sleep(time.Second)

        body := fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID())
        nc.Publish(msg.Reply, []byte(body))
    }
    

    Output:

    ./nats-rply test
    2017/08/30 00:17:05 Main goroutine ID:1
    2017/08/30 00:17:05 Listening on [test]
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request6'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request5'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request1'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request8'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request3'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request7'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request9'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request4'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request2'
    2017/08/30 00:17:11 [#1] Received on [test]: 'Request10'
    
    ./nats-req test
    2017/08/30 00:17:12 Published [test] : 'Request3'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Bq] : 'REPLY TO request "Request3", GoroutineId:37'
    2017/08/30 00:17:12 Published [test] : 'Request7'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5z6] : 'REPLY TO request "Request7", GoroutineId:42'
    2017/08/30 00:17:12 Published [test] : 'Request10'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5wY] : 'REPLY TO request "Request10", GoroutineId:43'
    2017/08/30 00:17:12 Published [test] : 'Request5'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6EO] : 'REPLY TO request "Request5", GoroutineId:34'
    2017/08/30 00:17:12 Published [test] : 'Request8'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm66k] : 'REPLY TO request "Request8", GoroutineId:36'
    2017/08/30 00:17:12 Published [test] : 'Request1'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm64C] : 'REPLY TO request "Request1", GoroutineId:35'
    2017/08/30 00:17:12 Published [test] : 'Request2'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Gw] : 'REPLY TO request "Request2", GoroutineId:41'
    2017/08/30 00:17:12 Published [test] : 'Request4'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm69I] : 'REPLY TO request "Request4", GoroutineId:40'
    2017/08/30 00:17:12 Published [test] : 'Request9'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm61e] : 'REPLY TO request "Request9", GoroutineId:39'
    2017/08/30 00:17:12 Published [test] : 'Request6'
    2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5u0] : 'REPLY TO request "Request6", GoroutineId:38'
    
    点赞 评论 复制链接分享
  • duanpai9945 duanpai9945 4年前

    Keep in mind that by starting a go-routine from the message handler, your processing order goes out of the window. This is the reason NATS is calling the message handler serially, to give user a guaranteed order. If order is not important to you, then indeed, it is easy to start processing of the message in a separate go-routine (or pool of go-routines).

    点赞 评论 复制链接分享