duanquannan0593 2016-04-06 16:53

How do I structure a Go web server using channels and goroutines?

I am implementing a server that streams many arrays of floats. I need help designing the system to achieve the following:

  • The audio processing must be independent and keep working even when no request is coming in. My current approach makes the AudioProcess function wait until there is a request.
  • Because a channel delivers each value to only one receiver, how can two or more requests get the data I've prepared in AudioProcess?
  • To actually stream data, the request handler can't wait for the whole AudioProcess run to finish. Is there any way to give the handler data as soon as each iteration in AudioProcess completes?

Any reply is appreciated. These are my current thoughts:

package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "time"
)

func main() {
    c := AudioProcess()
    handleHello := makeHello(c)

    http.HandleFunc("/", handleHello)
    http.ListenAndServe(":8000", nil)
}

func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        for item := range c { // this loop runs until channel c is closed
            io.WriteString(w, item)
        }
    }
}

func AudioProcess() chan string {
    c := make(chan string)
    go func() {
        for i := 0; i <= 10; i++ { // Iterate the audio file
            c <- strconv.Itoa(i) // have my frame of samples, send to channel c
            time.Sleep(time.Second)
            fmt.Println("send ", i) // logging
        }
        close(c) // done processing, close channel c
    }()
    return c
}

1 answer

  • drf97973 2016-04-06 23:19

    I'm not entirely sure if this addresses your problem as I'm not fully aware of your use case, but nevertheless, I've come up with a solution below.

    I've used Gin for the HTTP router because it was more comfortable to me, but I'm pretty sure you can adapt the code to fit yours. I did this in a hurry (sorry), so there may be problems I'm not aware of, but do let me know if there are any.

    In short:

    1. I created a Manager that takes care of several Client values. It also contains a sync.Mutex to ensure only one goroutine modifies the clients at any given time;
    2. There is an InitBackgroundTask() that generates a random float64 number and passes it to ALL clients in a Manager (if there are any). If there aren't any clients, we just sleep and carry on;
    3. The index handler deals with adding and removing clients. Clients are identified by a UUID;
    4. Three things can happen now. The client disconnects, which is signalled on the <-c.Writer.CloseNotify() channel; the handler then returns, and the deferred call removes the client. The handler receives the random float64 number on the next background task tick. Or the handler gives up because it has not received anything in 20s.
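
The fan-out idea in the steps above can also be sketched with only the standard library. This is a minimal sketch, not the code from this answer: the `Broadcaster` type and its `Subscribe`, `Unsubscribe` and `Publish` methods are hypothetical names, and the buffer size of 8 is an arbitrary assumption. Each request would call `Subscribe` on arrival and `Unsubscribe` (typically via `defer`) when it ends, while the background task calls `Publish` whether or not anyone is listening.

```go
package main

import (
	"fmt"
	"sync"
)

// Broadcaster fans each published value out to every subscriber.
// Hypothetical type for illustration; it plays the role of the Manager.
type Broadcaster struct {
	mu   sync.Mutex
	subs map[chan float64]struct{}
}

func NewBroadcaster() *Broadcaster {
	return &Broadcaster{subs: make(map[chan float64]struct{})}
}

// Subscribe registers a new buffered channel and returns it.
func (b *Broadcaster) Subscribe() chan float64 {
	ch := make(chan float64, 8) // buffer size is an arbitrary assumption
	b.mu.Lock()
	b.subs[ch] = struct{}{}
	b.mu.Unlock()
	return ch
}

// Unsubscribe removes the channel; calling it twice is harmless.
func (b *Broadcaster) Unsubscribe(ch chan float64) {
	b.mu.Lock()
	delete(b.subs, ch)
	b.mu.Unlock()
}

// Publish offers v to every subscriber without blocking and
// returns how many subscribers accepted it.
func (b *Broadcaster) Publish(v float64) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for ch := range b.subs {
		select {
		case ch <- v:
			delivered++
		default: // this subscriber's buffer is full; drop the value for it
		}
	}
	return delivered
}

func main() {
	b := NewBroadcaster()
	first, second := b.Subscribe(), b.Subscribe()

	// Both subscribers receive the same value.
	fmt.Println(b.Publish(0.5), <-first, <-second)

	// After one unsubscribes, only the other is served.
	b.Unsubscribe(second)
	fmt.Println(b.Publish(0.25), <-first)
}
```

Because `Publish` never blocks, a slow or departed client cannot stall the producer; the trade-off is that a client whose buffer is full misses values.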

    I made several assumptions about your needs here (e.g. that the background task will return X every Y minutes). If you are looking for finer-grained streaming, I'd recommend using websockets instead (the pattern below can still be used).

    Let me know if you have any questions.

    Code:

    package main
    
    import (
        "github.com/gin-gonic/gin"
        "github.com/satori/go.uuid"
        "log"
        "math/rand"
        "net/http"
        "sync"
        "time"
    )
    
    type Client struct {
        uuid string
        out  chan float64
    }
    
    type Manager struct {
        clients map[string]*Client
        mutex   sync.Mutex
    }
    
    func NewManager() *Manager {
        m := new(Manager)
        m.clients = make(map[string]*Client)
        return m
    }
    
    func (m *Manager) AddClient(c *Client) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
        log.Printf("add client: %s\n", c.uuid)
        m.clients[c.uuid] = c
    }
    
    func (m *Manager) DeleteClient(id string) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
        // log.Printf("delete client: %s\n", id)
        delete(m.clients, id)
    }
    
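    func (m *Manager) InitBackgroundTask() {
        for {
            f64 := rand.Float64()
            // Hold the mutex while reading the map: handlers add and
            // remove clients concurrently from other goroutines.
            m.mutex.Lock()
            log.Printf("active clients: %d\n", len(m.clients))
            for _, c := range m.clients {
                select {
                case c.out <- f64:
                default: // client is not receiving (e.g. it just timed out); skip it
                }
            }
            m.mutex.Unlock()
            log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
            time.Sleep(time.Second * 10)
        }
    }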
    
    func main() {
        r := gin.Default()
        m := NewManager()
    
        go m.InitBackgroundTask()
    
        r.GET("/", func(c *gin.Context) {
            cl := new(Client)
            cl.uuid = uuid.NewV4().String()
            cl.out = make(chan float64)
    
            defer m.DeleteClient(cl.uuid)
            m.AddClient(cl)
    
            select {
            case <-c.Writer.CloseNotify():
                log.Printf("%s : disconnected\n", cl.uuid)
            case out := <-cl.out:
                log.Printf("%s : received %+v\n", cl.uuid, out)
                c.JSON(http.StatusOK, gin.H{
                    "output": out,
                })
            case <-time.After(time.Second * 20):
                log.Println("timed out")
            }
        })
    
        r.Run()
    }
    

    Note: if you're testing this on Chrome, you might have to append a random parameter at the end of the URL so that the request will actually be made, e.g. ?rand=001, ?rand=002 and so on.

    This answer was selected by the asker as the best answer.
