dongmu5106 2019-01-31 21:19

Golang: scaling a websocket client to hold multiple connections to different servers

I have a websocket client. The real code is far more complex than the basic example shown below. I now need to scale this client to open connections to multiple servers. The tasks that need to be performed when a message is received are identical for every server. What would be the best approach to handle this?

package main

import (
        "flag"
        "log"
        "net/url"
        "os"
        "os/signal"
        "time"

        "github.com/gorilla/websocket"
)

var addr = flag.String("addr", "localhost:1234", "http service address")

func main() {
        flag.Parse()
        log.SetFlags(0)

        interrupt := make(chan os.Signal, 1)
        signal.Notify(interrupt, os.Interrupt)

        // u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}
        u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
        log.Printf("connecting to %s", u.String())

        c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
        if err != nil {
                log.Fatal("dial:", err)
        }
        defer c.Close()

        done := make(chan struct{})

        go func() {
                defer close(done)
                for {
                        _, message, err := c.ReadMessage()
                        if err != nil {
                                log.Println("read:", err)
                                return
                        }
                        log.Printf("recv: %s", message)
                }
        }()

        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for {
                select {
                case <-done:
                        return
                case t := <-ticker.C:
                        err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
                        if err != nil {
                                log.Println("write:", err)
                                return
                        }
                case <-interrupt:
                        log.Println("interrupt")

                        // Cleanly close the connection by sending a close message and then
                        // waiting (with timeout) for the server to close the connection.
                        err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
                        if err != nil {
                                log.Println("write close:", err)
                                return
                        }
                        select {
                        case <-done:
                        case <-time.After(time.Second):
                        }
                        return
                }
        }
}

2 answers

  • duanli12176 2019-02-01 00:51

    Modify the interrupt handling to close a channel on interrupt. This allows multiple goroutines to wait on the event by waiting for the channel to close.

    shutdown := make(chan struct{})
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    go func() {
        <-interrupt
        log.Println("interrupt")
        close(shutdown)
    }()
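
The reason closing works as a broadcast is that a receive on a closed channel returns immediately for every receiver, while a value sent on the channel would wake only one of them. The following is a minimal sketch of that behaviour using only the standard library; the function name notifyAll and the counter are illustrative and not part of the original answer.

```go
package main

import (
	"fmt"
	"sync"
)

// notifyAll starts n goroutines that all block on the same shutdown channel,
// closes the channel once, and returns how many goroutines observed the close.
// (Hypothetical helper, used here only to demonstrate the broadcast.)
func notifyAll(n int) int {
	shutdown := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	seen := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-shutdown // unblocks in every goroutine once the channel is closed
			mu.Lock()
			seen++
			mu.Unlock()
		}()
	}

	close(shutdown) // a single close is observed by all n receivers
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(notifyAll(3)) // prints 3
}
```

A channel must be closed only once, which is why the signal-handling goroutine above is the only place that calls close(shutdown).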
    

    Move the per-connection code to a function. This code is copied from the question with three changes: the interrupt channel is replaced with the shutdown channel; the function notifies a sync.WaitGroup when it is done (add "sync" to the imports); and a failed dial is logged and the function returns instead of calling log.Fatal, so that one unreachable server does not terminate the process and the other connections with it.

    func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
    
        log.Printf("connecting to %s", u)
        c, _, err := websocket.DefaultDialer.Dial(u, nil)
        if err != nil {
            log.Println("dial:", err)
            return
        }
        defer c.Close()
    
        done := make(chan struct{})
    
        go func() {
            defer close(done)
            for {
                _, message, err := c.ReadMessage()
                if err != nil {
                    log.Println("read:", err)
                    return
                }
                log.Printf("recv: %s", message)
            }
        }()
    
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
    
        for {
            select {
            case <-done:
                return
            case t := <-ticker.C:
                err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
                if err != nil {
                    log.Println("write:", err)
                    return
                }
            case <-shutdown:
                // Cleanly close the connection by sending a close message and then
                // waiting (with timeout) for the server to close the connection.
                err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
                if err != nil {
                    log.Println("write close:", err)
                    return
                }
                select {
                case <-done:
                case <-time.After(time.Second):
                }
                return
            }
        }
    }
    

    Declare a sync.WaitGroup in main(). For each websocket endpoint that you want to connect to, increment the WaitGroup and start a goroutine to connect that endpoint. After starting the goroutines, wait on the WaitGroup for the goroutines to complete.

    var wg sync.WaitGroup
    for _, u := range endpoints { // endpoints is []string 
                                  // where elements are URLs 
                                  // of endpoints to connect to.
        wg.Add(1)
        go connect(u, shutdown, &wg)
    }
    wg.Wait()
    

    The code above with an edit to make it run against Gorilla's echo example server is posted on the playground.
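
Since the question says the work done for each received message is identical across servers, one possible extension is to have every read loop forward its messages to a single shared channel and run the processing in one place. The sketch below is an assumption layered on the answer, not part of it: Message, messageReader, readLoop and handleAll are hypothetical names, and fakeConn stands in for a real connection so the example runs without a server. The interface matches the ReadMessage method of *websocket.Conn, so a real connection could be passed in its place.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message tags a payload with the endpoint it came from (hypothetical type).
type Message struct {
	Source string
	Data   []byte
}

// messageReader is the subset of *websocket.Conn that the read loop needs.
type messageReader interface {
	ReadMessage() (messageType int, p []byte, err error)
}

// fakeConn is a stand-in for a real connection: it returns its pending
// messages in order and then reports an error, like a closed connection.
type fakeConn struct{ pending [][]byte }

func (f *fakeConn) ReadMessage() (int, []byte, error) {
	if len(f.pending) == 0 {
		return 0, nil, errors.New("connection closed")
	}
	p := f.pending[0]
	f.pending = f.pending[1:]
	return 1, p, nil // 1 is the value of websocket.TextMessage
}

// readLoop forwards every message from one connection to the shared channel
// and returns on the first read error.
func readLoop(source string, r messageReader, out chan<- Message, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		_, p, err := r.ReadMessage()
		if err != nil {
			return
		}
		out <- Message{Source: source, Data: p}
	}
}

// handleAll starts one read loop per connection and calls handle for each
// message. handle runs only on the calling goroutine, so it needs no locking.
func handleAll(conns map[string]messageReader, handle func(Message)) {
	out := make(chan Message)
	var wg sync.WaitGroup
	for source, r := range conns {
		wg.Add(1)
		go readLoop(source, r, out, &wg)
	}
	// Close the shared channel once every read loop has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	for m := range out {
		handle(m)
	}
}

func main() {
	conns := map[string]messageReader{
		"ws://a.example/": &fakeConn{pending: [][]byte{[]byte("one"), []byte("two")}},
		"ws://b.example/": &fakeConn{pending: [][]byte{[]byte("three")}},
	}
	handleAll(conns, func(m Message) {
		fmt.Printf("recv from %s: %s\n", m.Source, m.Data)
	})
}
```

Messages from one connection arrive in order, but the interleaving between connections is not deterministic. If the handler is slow, giving the shared channel a buffer or running several handler goroutines are possible variations.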

    Accepted by the asker as the best answer.