duanfu1945 2017-11-21 19:32

Splitting work into parts while limiting the number of workers

I'm trying to improve the performance of an app. One part of its code uploads a file to a server in chunks.

The original version does this in a sequential loop, which is slow. On top of that, it must talk to another server before uploading each chunk.

Each chunk upload could simply be run in its own goroutine. That works, but it isn't a good solution: if the source file is extremely large, it ends up using a large amount of memory.

So I tried to limit the number of active goroutines by using a buffered channel. Here is some code that shows my attempt. I've stripped it down to show the concept, and you can run it to test it yourself.

package main

import (
    "fmt"
    "io"
    "os"
    "time"
)

const defaultChunkSize = 1 * 1024 * 1024

// Let's have 4 workers
var c = make(chan int, 4)

func UploadFile(f *os.File) error {
    fi, err := f.Stat()
    if err != nil {
        return fmt.Errorf("err: %s", err)
    }
    size := fi.Size()

    total := (int)(size/defaultChunkSize + 1)
    // Upload parts
    buf := make([]byte, defaultChunkSize)
    for partno := 1; partno <= total; partno++ {
        readChunk := func(offset int, buf []byte) (int, error) {
            fmt.Println("readChunk", partno, offset)
            n, err := f.ReadAt(buf, int64(offset))
            if err != nil {
                return n, err
            }

            return n, nil
        }

        // This will block if there are not enough worker slots available
        c <- partno

        // The actual worker.
        go func() {
            offset := (partno - 1) * defaultChunkSize
            n, err := readChunk(offset, buf)
            if err != nil && err != io.EOF {
                return
            }

            err = uploadPart(partno, buf[:n])
            if err != nil {
                fmt.Println("Uploadpart failed:", err)
            }
            <-c
        }()
    }

    return nil
}

func uploadPart(partno int, buf []byte) error {
    fmt.Printf("Uploading partno: %d, buflen=%d\n", partno, len(buf))
    // Actually upload the part.  Let's test it by instead writing each
    // buffer to another file.  We can then use diff to compare the
    // source and dest files.

    // Open file.  Seek to (partno - 1) * defaultChunkSize, write buffer
    f, err := os.OpenFile("/home/matthewh/Downloads/out.tar.gz", os.O_CREATE|os.O_WRONLY, 0755)
    if err != nil {
        return fmt.Errorf("open: %s", err)
    }
    defer f.Close()

    n, err := f.WriteAt(buf, int64((partno-1)*defaultChunkSize))
    if err != nil {
        return fmt.Errorf("write: %s", err)
    }
    fmt.Printf("%d bytes written\n", n)
    return nil
}

func main() {
    filename := "/home/matthewh/Downloads/largefile.tar.gz"
    fmt.Printf("Opening file: %s\n", filename)

    f, err := os.Open(filename)
    if err != nil {
        panic(err)
    }

    UploadFile(f)
}

It almost works, but there are several problems. First, the final part number, 22, appears three times. Its correct length is 612545 bytes, because the file length isn't a multiple of 1 MB.

// Sample output
...
readChunk 21 20971520
readChunk 22 22020096
Uploading partno: 22, buflen=1048576
Uploading partno: 22, buflen=612545
Uploading partno: 22, buflen=1048576
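Separately from the concurrency issues, the part count in UploadFile is computed as size/defaultChunkSize + 1, which yields an extra, empty part whenever the file size is an exact multiple of the chunk size. Below is a minimal sketch of the arithmetic using ceiling division instead. The helper names numParts and partRange are hypothetical, and the file size is reconstructed from the output above (21 full 1 MiB parts plus 612545 bytes), not measured.

```go
package main

import "fmt"

const defaultChunkSize = 1 * 1024 * 1024

// numParts uses ceiling division, so a file whose size is an exact
// multiple of the chunk size does not get an extra, empty part.
func numParts(size int64) int {
	return int((size + defaultChunkSize - 1) / defaultChunkSize)
}

// partRange returns the byte offset and length of 1-based part partno;
// only the last part can be shorter than defaultChunkSize.
func partRange(size int64, partno int) (offset, length int64) {
	offset = int64(partno-1) * defaultChunkSize
	length = size - offset
	if length > defaultChunkSize {
		length = defaultChunkSize
	}
	return offset, length
}

func main() {
	// 21 full 1 MiB parts plus 612545 bytes, matching the output above.
	size := int64(21*defaultChunkSize + 612545)
	n := numParts(size)
	offset, length := partRange(size, n)
	fmt.Println(n, offset, length) // 22 22020096 612545
}
```

With these helpers, each goroutine can size its read exactly instead of relying on ReadAt returning io.EOF for the last part.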

Second, an upload could fail, and I'm not familiar enough with Go to know how best to handle a failure inside a goroutine.
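A minimal sketch of one common way to report failures from goroutines while keeping the buffered-channel limit from the original code: each goroutine sends its result on an error channel with one slot per part, a sync.WaitGroup lets the caller wait for all of them, and the first non-nil error is returned. Here uploadPart is a hypothetical stand-in that takes only a part number and fails for part 3; it is not the uploadPart from the question.

```go
package main

import (
	"fmt"
	"sync"
)

// uploadPart is a hypothetical stand-in for the real upload; it fails for part 3.
func uploadPart(partno int) error {
	if partno == 3 {
		return fmt.Errorf("part %d: simulated failure", partno)
	}
	return nil
}

// uploadAll uploads parts 1..total with at most limit goroutines running,
// waits for all of them, and returns the first non-nil error it finds.
func uploadAll(total, limit int) error {
	sem := make(chan struct{}, limit) // same idea as the original buffered channel
	errc := make(chan error, total)   // one slot per part, so sends never block
	var wg sync.WaitGroup

	for partno := 1; partno <= total; partno++ {
		partno := partno  // give each goroutine its own copy
		sem <- struct{}{} // blocks while limit uploads are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot even on failure
			errc <- uploadPart(partno)
		}()
	}

	wg.Wait()   // every goroutine has finished and sent its result
	close(errc) // so the range below terminates
	for err := range errc {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(uploadAll(5, 2)) // part 3: simulated failure
	fmt.Println(uploadAll(2, 2)) // <nil>
}
```

If several parts fail, which error is returned first is not deterministic. The golang.org/x/sync/errgroup package wraps this wait-and-collect-first-error pattern, including a concurrency limit via Group.SetLimit.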

Finally, when uploadPart succeeds I want it to return some data. Specifically, it will be a string (an HTTP ETag header value). These ETag values need to be collected by the main function.

What is a better way to structure this code? I haven't yet found a good Go design pattern that fits my needs here.


  • dou91855 2017-11-21 20:16

    Setting aside for the moment the question of how better to structure this code, I see a bug in your code that may be causing the problem you're seeing. The function you run in the goroutine uses the variable partno, which changes with each iteration of the loop, so the goroutine doesn't necessarily see the value partno had when the goroutine was started. A common fix is to create a local copy of the variable inside the loop:

    for partno := 1; partno <= total; partno++ {
        partno := partno
        // ...
    }
    
    评论
