I'm trying to improve the performance of an app. One part of its code uploads a file to a server in chunks.
The original version simply does this in a sequential loop. However, it's slow and during the sequence it also needs to talk to another server before uploading each chunk.
The upload of chunks could simply be placed in a goroutine. It works, but is not a good solution because if the source file is extremely large it ends up using a large amount of memory.
So, I try to limit the number of active goroutines by using a buffered channel. Here is some code that shows my attempt. I've stripped it down to show the concept and you can run it to test for yourself.
package main
import (
"fmt"
"io"
"os"
"time"
)
const defaultChunkSize = 1 * 1024 * 1024
// Lets have 4 workers
var c = make(chan int, 4)
func UploadFile(f *os.File) error {
fi, err := f.Stat()
if err != nil {
return fmt.Errorf("err: %s", err)
}
size := fi.Size()
total := (int)(size/defaultChunkSize + 1)
// Upload parts
buf := make([]byte, defaultChunkSize)
for partno := 1; partno <= total; partno++ {
readChunk := func(offset int, buf []byte) (int, error) {
fmt.Println("readChunk", partno, offset)
n, err := f.ReadAt(buf, int64(offset))
if err != nil {
return n, err
}
return n, nil
}
// This will block if there are not enough worker slots available
c <- partno
// The actual worker.
go func() {
offset := (partno - 1) * defaultChunkSize
n, err := readChunk(offset, buf)
if err != nil && err != io.EOF {
return
}
err = uploadPart(partno, buf[:n])
if err != nil {
fmt.Println("Uploadpart failed:", err)
}
<-c
}()
}
return nil
}
func uploadPart(partno int, buf []byte) error {
fmt.Printf("Uploading partno: %d, buflen=%d
", partno, len(buf))
// Actually upload the part. Lets test it by instead writing each
// buffer to another file. We can then use diff to compare the
// source and dest files.
// Open file. Seek to (partno - 1) * defaultChunkSize, write buffer
f, err := os.OpenFile("/home/matthewh/Downloads/out.tar.gz", os.O_CREATE|os.O_WRONLY, 0755)
if err != nil {
fmt.Printf("err: %s
", err)
}
n, err := f.WriteAt(buf, int64((partno-1)*defaultChunkSize))
if err != nil {
fmt.Printf("err=%s
", err)
}
fmt.Printf("%d bytes written
", n)
defer f.Close()
return nil
}
func main() {
filename := "/home/matthewh/Downloads/largefile.tar.gz"
fmt.Printf("Opening file: %s
", filename)
f, err := os.Open(filename)
if err != nil {
panic(err)
}
UploadFile(f)
}
It almost works. But there are several problems. 1) The final partno 22 is occuring 3 times. The correct length is actually 612545 as the file length isn't a multiple of 1MB.
// Sample output
...
readChunk 21 20971520
readChunk 22 22020096
Uploading partno: 22, buflen=1048576
Uploading partno: 22, buflen=612545
Uploading partno: 22, buflen=1048576
Another problem, the upload could fail and I am not familiar enough with go and how best to solve failure of the goroutine.
Finally, I want to ordinarily return some data from the uploadPart when it succeeds. Specifically, it'll be a string (an HTTP ETag header value). These etag values need to be collected by the main function.
What is a better way to structure this code in this instance? I've not yet found a good golang design pattern that correctly fulfills my needs here.