douweng3383 2018-08-21 20:00
浏览 18
已采纳

与多阅读器并发的POST不返回响应

I have a proof of concept http server using echo which takes a POST request with a JSON body. I am trying to stream the request body over to multiple POST requests using pipes and the multiwriter but it is not working correctly.

In the example below I can see the data is sent to the 2 POST endpoints and I can see a log from those requests but I never get a response back it seems the code hangs waiting for the http.Post(...) functions to complete.

If I call these 2 endpoints directly they work fine and give a valid json response, so i believe the problem is with this piece of code which is my handler for the route.

func ImportAggregate(c echo.Context) error {
    oneR, oneW := io.Pipe()
    twoR, twoW := io.Pipe()

    done := make(chan bool, 2)

    go func() {
        fmt.Println("Product Starting")
        response, err := http.Post("http://localhost:1323/products/import", "application/json", oneR)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(response.Body)
        }
        done <- true
    }()

    go func() {
        fmt.Println("Import Starting")
        response, err := http.Post("http://localhost:1323/discounts/import", "application/json", twoR)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(response.Body)
        }
        done <- true
    }()

    mw := io.MultiWriter(oneW, twoW)
    io.Copy(mw, c.Request().Body)

    <-done
    <-done

    return c.String(200, "Imported")
}

The output in console is:

Product Starting
Import Starting
  • 写回答

1条回答 默认 最新

  • dte29947 2018-08-22 06:39
    关注

    The issue in OP code is that the http.Post calls never detects the EOF of the provided io.Reader.

    That happens because the provided half write pipe is never closed, thus, the half read pipe never emits the regular EOF error.

    As a note about OP comment that closing the half read pipe would generate irregular errors, one has to understand that reading from a closed pipe is not a correct behavior.

    Thus in this situation, care should be taken to close the half write side right after the content has been copied.

    The resulting source code should be changed to

    func ImportAggregate(c echo.Context) error {
        oneR, oneW := io.Pipe()
        twoR, twoW := io.Pipe()
    
        done := make(chan bool, 2)
    
        go func() {
            fmt.Println("Product Starting")
            response, err := http.Post("http://localhost:1323/products/import", "application/json", oneR)
            if err != nil {
                fmt.Println(err)
            } else {
                fmt.Println(response.Body)
            }
            done <- true
        }()
    
        go func() {
            fmt.Println("Import Starting")
            response, err := http.Post("http://localhost:1323/discounts/import", "application/json", twoR)
            if err != nil {
                fmt.Println(err)
            } else {
                fmt.Println(response.Body)
            }
            done <- true
        }()
    
        mw := io.MultiWriter(oneW, twoW)
        io.Copy(mw, c.Request().Body)
        oneW.Close()
        twoW.Close()
    
        <-done
        <-done
    
        return c.String(200, "Imported")
    }
    

    Side notes beyond OP question:

    • an error check must implemented around the io.Copy in order to detect a transmission error.

    • it is not needed to close the half read side of the pipe, http.Post will do it after it received the EOF signal.

    • the goroutines responsible to consume the pipes must be declared and started before the input request is copied. The Pipes being synchronous, the code would block during the io.Copy waiting to be consumed on its other end.

    • the done chan does not require to be unbuffered (of length 2)

    • a way to forward error from outgoing requests to the outgoing response would be to use a channel of type (chan error), loop over it two times, and check for the first error encountered.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥30 关于#算法#的问题:运用EViews第九版本进行一系列计量经济学的时间数列数据回归分析预测问题 求各位帮我解答一下
  • ¥15 setInterval 页面闪烁,怎么解决
  • ¥15 如何让企业微信机器人实现消息汇总整合
  • ¥50 关于#ui#的问题:做yolov8的ui界面出现的问题
  • ¥15 如何用Python爬取各高校教师公开的教育和工作经历
  • ¥15 TLE9879QXA40 电机驱动
  • ¥20 对于工程问题的非线性数学模型进行线性化
  • ¥15 Mirare PLUS 进行密钥认证?(详解)
  • ¥15 物体双站RCS和其组成阵列后的双站RCS关系验证
  • ¥20 想用ollama做一个自己的AI数据库