douzhantanju1849 2017-06-03 04:15

net/http: concurrency and message passing between goroutines

I am working on a REST API server, and one of its features is notifying an arbitrary number of clients via WebSocket when a new resource is created or an existing one is modified.

I have a custom action router that binds a URL to a function, and I use Gorilla's WebSocket library. For communication between goroutines I decided to rely on channels, as that appears to be the idiomatic way in Go. A channel also behaves like a pipe, which is a concept I am familiar with.

The prototype of the Create function looks like this:

func Create (res http.ResponseWriter, req *http.Request, userdata interface {}) (int, string, interface {})

An instance of the PipeSet structure is passed as userdata. It wraps a map shared between all goroutines, where each key is a pointer to a Pipe and the value is that same pointer. The rationale is to speed up lookup when deleting.

type Pipe chan string

type PipeSet struct {
    sync.Mutex
    Pipes map[*Pipe]*Pipe
}

func NewPipe() Pipe {
    return make(Pipe)
}

func NewPipeSet() PipeSet {
    var newSet PipeSet
    newSet.Pipes = make(map[*Pipe]*Pipe)
    return newSet
}

func (o *PipeSet) AddPipe(pipe *Pipe) {
    o.Lock()
    o.Pipes[pipe] = pipe
    o.Unlock()
}

func (o *PipeSet) ForeachPipe(f func(pipe Pipe)) {
    o.Lock()
    for k := range o.Pipes {
        f(*o.Pipes[k])
    }
    o.Unlock()
}

func (o *PipeSet) DeletePipe(pipe *Pipe) {
    o.Lock()
    delete(o.Pipes, pipe)
    o.Unlock()
}

When a client connects via WebSocket, a new channel (a Pipe) is created and added to the shared PipeSet. Then, when a new resource is created, a goroutine goes through the entire PipeSet and sends a message to each Pipe. The message is then forwarded to the connected client on the other side.
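
The fan-out described above can be sketched roughly as follows. This is a simplified variation under stated assumptions, not the server's actual code: the set is keyed by the channel value itself (channels are comparable in Go), the method names `Add`, `Delete` and `Broadcast` are hypothetical, and the send is non-blocking so that one slow receiver cannot stall the loop while the mutex is held. Dropping messages for busy receivers is an assumption that may not suit every application.

```go
package main

import (
	"fmt"
	"sync"
)

// Pipe carries notification messages to one connected client.
type Pipe chan string

// PipeSet is a mutex-guarded set of pipes, keyed by the channel value.
type PipeSet struct {
	mu    sync.Mutex
	pipes map[Pipe]struct{}
}

func NewPipeSet() *PipeSet {
	return &PipeSet{pipes: make(map[Pipe]struct{})}
}

func (s *PipeSet) Add(p Pipe) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pipes[p] = struct{}{}
}

func (s *PipeSet) Delete(p Pipe) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.pipes, p)
}

// Broadcast offers msg to every pipe without blocking and
// returns how many pipes accepted it.
func (s *PipeSet) Broadcast(msg string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	delivered := 0
	for p := range s.pipes {
		select {
		case p <- msg:
			delivered++
		default:
			// Receiver is not ready; drop the message for this pipe.
		}
	}
	return delivered
}

func main() {
	set := NewPipeSet()
	a, b := make(Pipe, 1), make(Pipe, 1) // buffered so the demo needs no receiver goroutines
	set.Add(a)
	set.Add(b)

	fmt.Println(set.Broadcast("created")) // 2
	fmt.Println(<-a, <-b)                 // created created

	set.Delete(b)
	fmt.Println(set.Broadcast("modified")) // 1
}
```

With unbuffered pipes, as in the original NewPipe, a non-blocking send only succeeds while the receiving goroutine is already waiting in its select, so a small buffer per pipe is probably a sensible choice if this approach is used.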

A problem area

I am unable to detect whether a client's WebSocket connection is still alive. I need to know that to decide whether to remove its Pipe from the PipeSet. I am relying on CloseNotifier for this, but it never fires.

The code looks like this (excerpt):

var upgrader = websocket.Upgrader {
    CheckOrigin: func (r *http.Request) bool { return true },
}

conn, err := upgrader.Upgrade (res, req, nil)

if err != nil {
    marker.MarkError (err)
    return http.StatusBadRequest, "", nil
}

defer conn.Close ()

exitStatus = http.StatusOK
pipe := genstore.NewPipe ()
quit := res.(http.CloseNotifier).CloseNotify ()

genStore.WSChannels.AddPipe (&pipe)

for {
    log.Printf ("waiting for a message")

    select {
        case wsMsg = <-pipe:
            log.Printf ("got a message: %s (num pipes %d)", wsMsg, len (genStore.WSChannels.Pipes))

            if err = conn.WriteMessage (websocket.TextMessage, []byte (wsMsg)); err != nil {
                marker.MarkError (err)
                goto egress
            }

        case <-quit:
            log.Printf ("quit...")
            goto egress
    }
}

egress:
genStore.WSChannels.DeletePipe (&pipe)

1 answer

  • douling8772 2017-06-03 16:09

    When you upgrade an HTTP connection to a WebSocket connection using Gorilla, it hijacks the connection and the net/http server stops serving it. This means that you can't rely on net/http events, such as CloseNotifier, from that moment on.

    Check this: https://github.com/gorilla/websocket/issues/123

    So what you can do is start a new goroutine for every new WebSocket connection. That goroutine reads from the connection and signals a quit channel when a read fails.
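
A minimal sketch of that reader goroutine, under some assumptions: `watchClose` and `fakeConn` are hypothetical names introduced here for illustration, and the connection is abstracted behind a small interface so the example runs with the standard library only. Gorilla's `*websocket.Conn` has a `ReadMessage` method with this signature, so it should satisfy the interface. The quit channel is closed rather than written to, which lets any number of selects observe it.

```go
package main

import (
	"errors"
	"fmt"
)

// messageReader is the subset of a WebSocket connection this sketch needs.
type messageReader interface {
	ReadMessage() (messageType int, p []byte, err error)
}

// watchClose (hypothetical helper) reads from conn until a read fails,
// then closes the returned channel. Incoming messages are discarded.
func watchClose(conn messageReader) <-chan struct{} {
	quit := make(chan struct{})
	go func() {
		defer close(quit)
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				return // peer went away or the connection broke
			}
		}
	}()
	return quit
}

// fakeConn stands in for a real connection in this demo:
// it yields a fixed number of messages and then fails.
type fakeConn struct{ remaining int }

func (f *fakeConn) ReadMessage() (int, []byte, error) {
	if f.remaining == 0 {
		return 0, nil, errors.New("connection closed")
	}
	f.remaining--
	return 1, []byte("ping"), nil
}

func main() {
	quit := watchClose(&fakeConn{remaining: 3})
	<-quit
	fmt.Println("client gone, remove the pipe")
}
```

In the handler from the question, the idea would be to replace the CloseNotify line with something like `quit := watchClose(conn)` and leave the `case <-quit:` branch of the select unchanged. Since the handler closes the connection on exit, the pending read should then fail and let the goroutine finish.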

    This answer was selected as the best answer by the asker.
