Golang-扩展websocket客户端以实现与不同服务器的多个连接

I have a websocket client. In reality, it is far more complex than the basic code shown below. I now need to scale this client code to open connections to multiple servers. Ultimately, the tasks that need to be performed when a message is received from the servers is identical. What would be the best approach to handle this? As I said above the actual code performed when receiving the message is far more complex than shown in the example.

package main

import (
        "flag"
        "log"
        "net/url"
        "os"
        "os/signal"
        "time"

        "github.com/gorilla/websocket"
)

var addr = flag.String("addr", "localhost:1234", "http service address")

func main() {
        flag.Parse()
        log.SetFlags(0)

        interrupt := make(chan os.Signal, 1)
        signal.Notify(interrupt, os.Interrupt)

        // u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}
        u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
        log.Printf("connecting to %s", u.String())

        c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
        if err != nil {
                log.Fatal("dial:", err)
        }
        defer c.Close()

        done := make(chan struct{})

        go func() {
                defer close(done)
                for {
                        _, message, err := c.ReadMessage()
                        if err != nil {
                                log.Println("read:", err)
                                return
                        }
                        log.Printf("recv: %s", message)
                }
        }()

        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for {
                select {
                case <-done:
                        return
                case t := <-ticker.C:
                        err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
                        if err != nil {
                                log.Println("write:", err)
                                return
                        }
                case <-interrupt:
                        log.Println("interrupt")

                        // Cleanly close the connection by sending a close message and then
                        // waiting (with timeout) for the server to close the connection.
                        err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
                        if err != nil {
                                log.Println("write close:", err)
                                return
                        }
                        select {
                        case <-done:
                        case <-time.After(time.Second):
                        }
                        return
                }
        }
}

2个回答

Modify the interrupt handling to close a channel on interrupt. This allows multiple goroutines to wait on the event by waiting for the channel to close.

shutdown := make(chan struct{})
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
go func() {
    <-interrupt
    log.Println("interrupt")
    close(shutdown)
}()

Move the per-connection code to a function. This code is a copy and paste from the question with two changes: the interrupt channel is replaced with the shutdown channel; the function notifies a sync.WaitGroup when the function is done.

func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    log.Printf("connecting to %s", u)
    c, _, err := websocket.DefaultDialer.Dial(u, nil)
    if err != nil {
        log.Fatal("dial:", err)
    }
    defer c.Close()

    done := make(chan struct{})

    go func() {
        defer close(done)
        for {
            _, message, err := c.ReadMessage()
            if err != nil {
                log.Println("read:", err)
                return
            }
            log.Printf("recv: %s", message)
        }
    }()

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-done:
            return
        case t := <-ticker.C:
            err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
            if err != nil {
                log.Println("write:", err)
                return
            }
        case <-shutdown:
            // Cleanly close the connection by sending a close message and then
            // waiting (with timeout) for the server to close the connection.
            err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
            if err != nil {
                log.Println("write close:", err)
                return
            }
            select {
            case <-done:
            case <-time.After(time.Second):
            }
            return
        }
    }
}

Declare a sync.WaitGroup in main(). For each websocket endpoint that you want to connect to, increment the WaitGroup and start a goroutine to connect that endpoint. After starting the goroutines, wait on the WaitGroup for the goroutines to complete.

var wg sync.WaitGroup
for _, u := range endpoints { // endpoints is []string 
                              // where elements are URLs 
                              // of endpoints to connect to.
    wg.Add(1)
    go connect(u, shutdown, &wg)
}
wg.Wait()

The code above with an edit to make it run against Gorilla's echo example server is posted on the playground.

duangao8359
duangao8359 谢谢。 对我有很大帮助。 我将两个答案都标记为correc.t
一年多之前 回复



与每个不同服务器的通信是否完全独立于其他服务器? 如果是的话,我会以类似的方式进行操作:</ p>


  • main </ em>中创建一个上下文与取消功能</ li>
  • 创建 waitgroup 跟踪每个服务器的启动goroutine </ li>
  • ,将其添加到waitgroup,从传递的主要函数中启动一个新的goroutine 上下文和等待组引用</ li>
  • main </ em>进入for / select循环,侦听信号,如果到达则调用cancelfunc并等待等待组。 </ li>
  • main </ em>也可以侦听goroutine中的结果chan,并可能在goroutine不应该直接执行结果的情况下打印结果本身。</ li>
    < 正如我们所说,li>每个 goroutine </ em>都有对wg,上下文以及可能返回结果的chan的引用。 现在,如果goroutine仅必须执行一项和一项操作,或者需要执行一系列操作,则该方法就产生了分歧。 对于第一种方法</ li>
  • 如果只要做一件事情,我们将采用类似描述的方法此处(请注意,与他异步的是,他会启动一个新的goroutine来执行DoSomething()步骤,该步骤将在通道上返回结果)
    这使得它能够 随时接受取消信号。 由您确定要成为非阻塞对象的程度以及要对提示信号做出响应的提示的时间。将上下文关联传递给goroutine的好处是您可以调用启用的上下文 大多数库函数的版本。 例如,如果您希望您的拨号盘具有1分钟的超时时间,则可以通过传递的超时值创建一个新的上下文,然后使用该值创建DialContext。 这样一来,拨号就可以从超时停止,也可以停止调用父级(您在主环境中创建的拨号)上下文的cancelfunc。</ li>
  • 如果需要执行更多操作,我通常更喜欢执行一项操作 使用goroutine,让它调用一个新的goroutine,然后执行下一步(将所有引用传递到管道中)并退出。</ li>
    </ ul>

    这种方法的扩展性很好 取消并可以在任何步骤停止管道,并为需要花费很长时间的步骤轻松提供带有脱碳元素的上下文。</ p>
    </ div>

展开原文

原文

is the communication with every different server completely independendant of the other servers? if yes i would go around in a fashion like:

  • in main create a context with a cancellation function
  • create a waitgroup in main to track fired up goroutines
  • for every server, add to the waitgroup, fire up a new goroutine from the main function passing the context and the waitgroup references
  • main goes in a for/select loop listening to for signals and if one arrives calls the cancelfunc and waits on the waitgroup.
  • main can also listen on a result chan from the goroutines and maybe print the results itself it the goroutines shouldn't do it directly.
  • every goroutine has as we said has references for the wg, the context and possibly a chan to return results. now the approach splits on if the goroutine must do one and one thing only, or if it needs to do a sequence of things. for the first approach
  • if only one thing is to be done we follow an approach like the one descripbed here (observe that to be asyncronous he would in turn fire up a new goroutine to perform the DoSomething() step that would return the result on the channel) That allows it to be able to accept the cancellation signal at any time. it is up to you to determine how non-blocking you want to be and how prompt you want to be to respond to cancellation signals.Also the benefit of having the a context associated being passed to the goroutines is that you can call the Context enabled versions of most library functions. For example if you want your dials to have a timeout of let's say 1 minute, you would create a new context with timeout from the one passed and then DialContext with that. This allows the dial to stop both from a timeout or the parent (the one you created in main) context's cancelfunc being called.
  • if more things need to be done ,i usually prefer to do one thing with the goroutine, have it invoke a new one with the next step to be performed (passing all the references down the pipeline) and exit.

this approach scales well with cancellations and being able to stop the pipeline at any step as well as support contexts with dealines easily for steps that can take too long.

douhang5493
douhang5493 非常感谢你。 正是我想要的。
一年多之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问
相关内容推荐