douxun4924 2019-03-17 14:00
浏览 82
已采纳

如何将消息发送到池中的所有连接

When a server side event is added to a stream, how do I make sure all clients currently connected to that stream receive the event, i.e. how do I loop through all clients and send them the message before discarding the message , is this even possible with sse and Go?

summarised pseudo code of what I want to achieve below

package main

import (
    "github.com/gin-contrib/sse"
    "github.com/gin-gonic/gin"
    "net/http"
)

func main() {

    router := gin.New()
    router.Use(gin.Logger())

    var events = make(chan sse.Event, 100)

    router.GET("/api/addUser/event", func(c *gin.Context) {

        c.Header("Content-Type", "text/event-stream")
        c.Header("Access-Control-Allow-Origin", "*")
        c.Header("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")

        // if events chan has an event
        // Send event to all connected clients

        if( we have events then send them to all clients){

            event := <-events
            _ = sse.Encode(c.Writer, event)
        }

    })

    router.POST("/api/addUser", func(c *gin.Context) {

        //On user add
        //Add event to events chan
        events <- sse.Event{
            Event: "newChiitiko",
            Id:    "1",
            Data:  "New Chiitiko Event",
        }

        c.JSON(http.StatusOK, "okay")
    })

    _ = router.Run(":5000")
}
  • 写回答

1条回答

  • doulvli9462 2019-03-17 14:47
    关注

    It's hard to do it with single channel. The simplest answer is create channel for each connection.

    Like:

    mu := new(sync.Mutex)
    var eventChans []sse.Event
    
    router.GET("/api/addUser/event", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Access-Control-Allow-Origin", "*")
        c.Header("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")
    
        // Add own channel to the pool.
        events := make(chan sse.Event)
        mu.Lock()
        eventChans = append(eventChans, events)
        mu.Unlock()
    
        // Listen for the events.
        for(event := range events) {
            sse.Encode(c.Writer, event)
        }
    })
    
    router.POST("/api/addUser", func(c *gin.Context) {
        mu.Lock()
        for(_, events := range eventChans) {
            events <- sse.Event{ ... }
        }
        mu.Unlock()
    
        c.JSON(http.StatusOK, "okay")
    })
    

    Or use sync.Cond.

    cond := sync.NewCond(new(sync.Mutex))
    var event *sse.Event
    
    router.GET("/api/addUser/event", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Access-Control-Allow-Origin", "*")
        c.Header("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")
    
        for {
            // Wait for event.
            cond.L.Lock()
            for(event == nil) {
                cond.Wait()
            }
    
            sse.Encode(c.Writer, event)
        }
    })
    
    router.POST("/api/addUser", func(c *gin.Context) {
        cond.L.Lock()
        event = sse.Event{ ... }
        cond.L.Unlock()
    
        cond.Broadcast()
    
        c.JSON(http.StatusOK, "okay")
    })
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 Python爬取指定微博话题下的内容,保存为txt
  • ¥15 vue2登录调用后端接口如何实现
  • ¥65 永磁型步进电机PID算法
  • ¥15 sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?
  • ¥88 找成都本地经验丰富懂小程序开发的技术大咖
  • ¥15 如何处理复杂数据表格的除法运算
  • ¥15 如何用stc8h1k08的片子做485数据透传的功能?(关键词-串口)
  • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
  • ¥15 latex怎么处理论文引理引用参考文献
  • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?