duanquannan0593 2016-04-06 16:53

How do I structure a Go web server using channels and goroutines?

I am implementing a server that streams many arrays of floats. I need help designing the system to achieve the following:

  • The audio processing must run independently and keep working even when no request is coming in. My current approach makes the AudioProcess function wait until there is a request.
  • Because a channel delivers each value to only one receiver, how can two or more requests get the data I've prepared in AudioProcess?
  • To actually stream data, the request handler can't wait for the whole AudioProcess to finish. Is there any way to give the handler data as soon as each iteration of AudioProcess completes?

Any reply is appreciated. These are my current thoughts:

package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "time"
)

func main() {
    c := AudioProcess()
    handleHello := makeHello(c)

    http.HandleFunc("/", handleHello)
    http.ListenAndServe(":8000", nil)
}

func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        for item := range c { // this loop runs until channel c is closed
            io.WriteString(w, item)
        }
    }
}

func AudioProcess() chan string {
    c := make(chan string)
    go func() {
        for i := 0; i <= 10; i++ { // iterate over the audio file
            c <- strconv.Itoa(i) // have my frame of samples, send to channel c (blocks until a handler receives)
            time.Sleep(time.Second)
            fmt.Println("send ", i) // logging
        }
        close(c) // done processing, close channel c
    }()
    return c
}

1 answer

  • drf97973 2016-04-06 23:19

    I'm not entirely sure this addresses your problem, as I don't fully know your use case, but I've come up with a solution below.

    I've used Gin for the HTTP router because I'm more comfortable with it, but I'm pretty sure you can adapt the code to fit yours. I did this in a hurry (sorry), so there may be problems I'm not aware of; do let me know if you find any.

    In short:

    1. I created a Manager that takes care of several Clients. It also contains a sync.Mutex to ensure that only one goroutine modifies the clients at any given time;
    2. There is an InitBackgroundTask() that generates a random float64 and passes it to ALL clients in the Manager (if there are any). If there aren't any clients, it just sleeps and carries on;
    3. The index handler deals with adding and removing clients. Clients are identified by a UUID;
    4. Three things can then happen. The client disconnects, which is signalled on the <-c.Writer.CloseNotify() channel. The client receives the random float64 on the next background task tick. Or the handler gives up if nothing has been received in 20s. In every case the handler returns, which runs the deferred DeleteClient and removes the client.
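
    The fan-out idea in points 1 and 2 can be sketched with the standard library alone. This is a minimal illustration, not the code from this answer: Hub, Subscribe, Unsubscribe and Broadcast are hypothetical names, and it assumes it is acceptable to skip a subscriber that has not yet read its previous value.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub is a hypothetical stand-in for the Manager described above:
// it keeps one channel per subscriber and copies every value to all of them.
type Hub struct {
	mu   sync.Mutex
	subs map[int]chan float64
	next int
}

func NewHub() *Hub {
	return &Hub{subs: make(map[int]chan float64)}
}

// Subscribe registers a new subscriber and returns its id and channel.
func (h *Hub) Subscribe() (int, <-chan float64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	id := h.next
	h.next++
	ch := make(chan float64, 1) // buffer of 1 so Broadcast need not wait for the reader
	h.subs[id] = ch
	return id, ch
}

// Unsubscribe removes a subscriber; call it when the request ends.
func (h *Hub) Unsubscribe(id int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.subs, id)
}

// Broadcast offers v to every subscriber and reports how many accepted it.
// A subscriber whose buffer is full is skipped instead of stalling the producer.
func (h *Hub) Broadcast(v float64) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for _, ch := range h.subs {
		select {
		case ch <- v:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	h := NewHub()
	idA, a := h.Subscribe()
	_, b := h.Subscribe()

	fmt.Println(h.Broadcast(0.5)) // both subscribers get their own copy
	fmt.Println(<-a, <-b)

	h.Unsubscribe(idA)
	fmt.Println(h.Broadcast(0.25)) // only b is left
	fmt.Println(<-b)
}
```

    Because the producer only ever calls Broadcast, it keeps running whether or not anyone is subscribed, which is the independence asked for in the question.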

    I made several assumptions about your needs here (e.g. that the background task will return X every Y minutes). If you are looking for finer-grained streaming, I'd recommend using websockets instead (the pattern below can still be used).
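
    If websockets are more than you need, one alternative that stays inside net/http is to keep the response open and flush after every frame via http.Flusher. The sketch below is only an illustration under assumptions: streamHandler and demo are hypothetical names, frames stands in for a per-request channel such as a client's out channel, and the demo uses an in-memory httptest recorder instead of a real connection.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamHandler writes each frame as its own line and flushes immediately,
// so the client can see frame N before frame N+1 has been produced.
// frames is a hypothetical per-request channel.
func streamHandler(frames <-chan float64) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		for {
			select {
			case <-r.Context().Done(): // client went away
				return
			case f, open := <-frames:
				if !open { // producer finished
					return
				}
				fmt.Fprintf(w, "%g\n", f)
				flusher.Flush()
			}
		}
	}
}

// demo runs the handler against an in-memory recorder and returns the body
// together with whether Flush was called.
func demo() (body string, flushed bool) {
	frames := make(chan float64, 3)
	frames <- 0.5
	frames <- 1.5
	frames <- 2
	close(frames)

	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	streamHandler(frames)(rec, req)
	return rec.Body.String(), rec.Flushed
}

func main() {
	body, flushed := demo()
	fmt.Printf("%q flushed=%v\n", body, flushed)
}
```

    In a real server the handler would register its own channel with the Manager when the request starts and remove it when the loop exits.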

    Let me know if you have any questions.

    Code:

    package main

    import (
        "log"
        "math/rand"
        "net/http"
        "sync"
        "time"

        "github.com/gin-gonic/gin"
        uuid "github.com/satori/go.uuid"
    )

    // Client is one connected request waiting for the next value.
    type Client struct {
        uuid string
        out  chan float64
    }

    // Manager tracks the connected clients; mutex guards the map.
    type Manager struct {
        clients map[string]*Client
        mutex   sync.Mutex
    }

    func NewManager() *Manager {
        m := new(Manager)
        m.clients = make(map[string]*Client)
        return m
    }

    func (m *Manager) AddClient(c *Client) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
        log.Printf("add client: %s\n", c.uuid)
        m.clients[c.uuid] = c
    }

    func (m *Manager) DeleteClient(id string) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
        log.Printf("delete client: %s\n", id)
        delete(m.clients, id)
    }

    // InitBackgroundTask produces a value every 10s and sends it to all clients.
    func (m *Manager) InitBackgroundTask() {
        for {
            f64 := rand.Float64()
            m.mutex.Lock() // the map is shared with the request handlers
            log.Printf("active clients: %d\n", len(m.clients))
            for _, c := range m.clients {
                select {
                case c.out <- f64:
                default: // client not ready; skip it rather than block while holding the lock
                }
            }
            m.mutex.Unlock()
            log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
            time.Sleep(time.Second * 10)
        }
    }

    func main() {
        r := gin.Default()
        m := NewManager()

        go m.InitBackgroundTask()

        r.GET("/", func(c *gin.Context) {
            cl := new(Client)
            // Depending on the version of satori/go.uuid, NewV4 may return
            // (UUID, error) instead of a single value; adjust if needed.
            cl.uuid = uuid.NewV4().String()
            // Buffer of 1 so the background task never waits for this handler.
            cl.out = make(chan float64, 1)

            m.AddClient(cl)
            defer m.DeleteClient(cl.uuid)

            select {
            case <-c.Writer.CloseNotify():
                log.Printf("%s : disconnected\n", cl.uuid)
            case out := <-cl.out:
                log.Printf("%s : received %+v\n", cl.uuid, out)
                c.JSON(http.StatusOK, gin.H{
                    "output": out,
                })
            case <-time.After(time.Second * 20):
                log.Printf("%s : timed out\n", cl.uuid)
            }
        })

        r.Run()
    }
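
    A side note on CloseNotify: in newer Go releases http.CloseNotifier is deprecated in favour of the request's context. The three-way select from the handler above could be written against a context instead, roughly as sketched here. waitForValue and demo are hypothetical names used only for this illustration; in a handler the context would come from r.Context() (or c.Request.Context() with Gin).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForValue mirrors the three-way select in the handler above, but takes a
// context in place of CloseNotify. It reports which of the three cases fired.
func waitForValue(ctx context.Context, out <-chan float64, timeout time.Duration) (float64, string) {
	select {
	case <-ctx.Done():
		return 0, "disconnected"
	case v := <-out:
		return v, "received"
	case <-time.After(timeout):
		return 0, "timed out"
	}
}

// demo exercises each outcome once and returns them in order.
func demo() []string {
	var outcomes []string

	// 1. A value is already waiting.
	out := make(chan float64, 1)
	out <- 0.5
	_, s := waitForValue(context.Background(), out, time.Second)
	outcomes = append(outcomes, s)

	// 2. The request context is cancelled (the client disconnected).
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, s = waitForValue(ctx, make(chan float64), time.Second)
	outcomes = append(outcomes, s)

	// 3. Nothing happens before the timeout.
	_, s = waitForValue(context.Background(), make(chan float64), 10*time.Millisecond)
	outcomes = append(outcomes, s)

	return outcomes
}

func main() {
	fmt.Println(demo())
}
```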
    

    Note: if you're testing this in Chrome, you might have to append a random parameter to the end of the URL so that the request is actually made, e.g. ?rand=001, ?rand=002 and so on.

    This answer was accepted by the asker as the best answer.
