doq91130 2016-04-05 04:35
浏览 41
已采纳

如何使用频道广播消息

I am new to go and I am trying to create a simple chat server where clients can broadcast messages to all connected clients.

In my server, I have a goroutine (infinite for loop) that accepts connection and all the connections are received by a channel.

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

Then, I start a handler (goroutine) for every connected client. Inside the handler, I try to broadcast to all connections by iterating through the channel.

for c := range ch {
    conn.Write(msg)
}

However, I cannot broadcast because (I think from reading the docs) the channel needs to be closed before iterating. I am not sure when I should close the channel because I want to continuously accept new connections and closing the channel won't let me do that. If anyone can help me, or provide a better way to broadcast messages to all connected clients, it would be appreciated.

  • 写回答

5条回答 默认 最新

  • duanpiyao2734 2016-04-05 05:48
    关注

    What you are doing is a fan out pattern, that is to say, multiple endpoints are listening to a single input source. The result of this pattern is, only one of these listeners will be able to get the message whenever there's a message in the input source. The only exception is a close of channel. This close will be recognized by all of the listeners, and thus a "broadcast".

    But what you want to do is broadcasting a message read from connection, so we could do something like this:

    When the number of listeners is known

    Let each worker listen to dedicated broadcast channel, and dispatch the message from the main channel to each dedicated broadcast channel.

    type worker struct {
        source chan interface{}
        quit chan struct{}
    }
    
    func (w *worker) Start() {
        w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
        go func() {
            for {
                select {
                case msg := <-w.source
                    // do something with msg
                case <-quit: // will explain this in the last section
                    return
                }
            }
        }()
    }
    

    And then we could have a bunch of workers:

    workers := []*worker{&worker{}, &worker{}}
    for _, worker := range workers { worker.Start() }
    

    Then start our listener:

    go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
    }()
    

    And a dispatcher:

    go func() {
        for {
            msg := <- ch
            for _, worker := workers {
                worker.source <- msg
            }
        }
    }()
    

    When the number of listeners is not known

    In this case, the solution given above still works. The only difference is, whenever you need a new worker, you need to create a new worker, start it up, and then push it into workers slice. But this method requires a thread-safe slice, which need a lock around it. One of the implementation may look like as follows:

    type threadSafeSlice struct {
        sync.Mutex
        workers []*worker
    }
    
    func (slice *threadSafeSlice) Push(w *worker) {
        slice.Lock()
        defer slice.Unlock()
    
        workers = append(workers, w)
    }
    
    func (slice *threadSafeSlice) Iter(routine func(*worker)) {
        slice.Lock()
        defer slice.Unlock()
    
        for _, worker := range workers {
            routine(worker)
        }
    }
    

    Whenever you want to start a worker:

    w := &worker{}
    w.Start()
    threadSafeSlice.Push(w)
    

    And your dispatcher will be changed to:

    go func() {
        for {
            msg := <- ch
            threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
        }
    }()
    

    Last words: never leave a dangling goroutine

    One of the good practices is: never leave a dangling goroutine. So when you finished listening, you need to close all of the goroutines you fired. This will be done via quit channel in worker:

    First we need to create a global quit signalling channel:

    globalQuit := make(chan struct{})
    

    And whenever we create a worker, we assign the globalQuit channel to it as its quit signal:

    worker.quit = globalQuit
    

    Then when we want to shutdown all workers, we simply do:

    close(globalQuit)
    

    Since close will be recognized by all listening goroutines (this is the point you understood), all goroutines will be returned. Remember to close your dispatcher routine as well, but I will leave it to you :)

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(4条)

报告相同问题?

悬赏问题

  • ¥15 三因素重复测量数据R语句编写,不存在交互作用
  • ¥15 微信会员卡等级和折扣规则
  • ¥15 微信公众平台自制会员卡可以通过收款码收款码收款进行自动积分吗
  • ¥15 随身WiFi网络灯亮但是没有网络,如何解决?
  • ¥15 gdf格式的脑电数据如何处理matlab
  • ¥20 重新写的代码替换了之后运行hbuliderx就这样了
  • ¥100 监控抖音用户作品更新可以微信公众号提醒
  • ¥15 UE5 如何可以不渲染HDRIBackdrop背景
  • ¥70 2048小游戏毕设项目
  • ¥20 mysql架构,按照姓名分表