doudun5009 2018-07-02 01:50
浏览 52
已采纳

带有Goroutines的Golang AWS S3manager多功能阅读器

I'm creating an endpoint that allows a user to upload several files at the same time and store them in S3. Currently I'm able to achieve this using MultipartReader and s3manager, but only sequentially (one file at a time), not concurrently.

I'm trying to use goroutines to speed this up and upload multiple files to S3 concurrently, but a data race error is causing trouble. I suspect *s3manager might not be goroutine-safe, even though the docs say it is. (The code works synchronously if the go statement is replaced with the function's code.)

Could implementing mutex locks possibly fix my error?

// uploadHandler serves the upload form on GET and, on POST, sends every file
// part of the multipart request body to S3 through S3Upload. Any other method
// gets 405 Method Not Allowed.
//
// Parts are uploaded one at a time. Every *multipart.Part reads from the same
// underlying request body stream, so a part has to be fully consumed before
// the reader is advanced with NextPart; letting several uploads read their
// parts concurrently is what produced the data race.
func uploadHandler(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    // GET to display the upload form.
    case "GET":
        if err := templates.Execute(w, nil); err != nil {
            log.Print(err)
        }
    // POST uploads each file and sends them to S3
    case "POST":
        c := make(chan string)
        // grab the request.MultipartReader
        reader, err := r.MultipartReader()
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        // copy each part to destination.
        for {
            part, err := reader.NextPart()
            if err == io.EOF {
                break
            }
            // Any other error leaves part nil; bail out instead of
            // dereferencing it below.
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            // if part.FileName() is empty, skip this iteration.
            if part.FileName() == "" {
                continue
            }
            // S3Upload reports its result on the unbuffered channel c, so it
            // still runs in its own goroutine, but the result is received
            // before the next NextPart call so the part is no longer being
            // read when the reader moves on.
            go S3Upload(c, part)
            fmt.Println(<-c)
        }
        // displaying a success message.
        err = templates.Execute(w, "Upload successful.")
        if err != nil {
            log.Print(err)
        }
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
}  

// S3Upload uploads one multipart file part to S3 and reports the outcome on c.
//
// The bucket name is read from the BUCKET environment variable and the region
// from REGION; the part's file name is used as the object key and the part
// itself is streamed as the object body. Exactly one message is sent on c per
// call, so the caller must receive once for every S3Upload it starts.
// Failure messages carry the underlying error text so the cause is not lost.
func S3Upload(c chan string, part *multipart.Part) {
    bucket := os.Getenv("BUCKET")
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String(os.Getenv("REGION"))},
    )
    if err != nil {
        c <- "error occured creating session: " + err.Error()
        return
    }
    uploader := s3manager.NewUploader(sess)
    _, err = uploader.Upload(&s3manager.UploadInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(part.FileName()),
        Body:   part,
    })
    if err != nil {
        c <- "Error occurred attempting to upload to S3: " + err.Error()
        return
    }
    // successful upload
    c <- "successful upload"
}
  • 写回答

1条回答 默认 最新

  • dongzhao3040 2018-07-02 03:36
    关注

    ^ see all the comments above,

    Here is a modified code example; channels are not useful here.

    package main
    
    import (
        "bytes"
        "io"
        "log"
        "net/http"
        "os"
        "strings"
        "sync"
    
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3/s3manager"
    )
    
    // Package-level state for the S3 upload path.
    var (
        // setupUploaderOnce makes the body of setupUploader run at most once.
        setupUploaderOnce sync.Once
        // uploader is the S3 upload manager that S3Upload calls Upload on.
        uploader          *s3manager.Uploader
        // bucket is assigned from the BUCKET environment variable in setupUploader.
        bucket            string
        // region is assigned from the REGION environment variable in setupUploader.
        region            string
    )
    // setupUploader initializes the package-level bucket, region and uploader.
    // The work is wrapped in setupUploaderOnce, so the session and uploader are
    // created only once no matter how many times this is called. bucket and
    // region come from the BUCKET and REGION environment variables. If the AWS
    // session cannot be created the process exits via log.Fatal.
    func setupUploader() {
        setupUploaderOnce.Do(func() {
            bucket = os.Getenv("BUCKET")
            region = os.Getenv("REGION")
            sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
            if err != nil {
                log.Fatal(err)
            }
            // Plain assignment on purpose: ":=" would declare a new local
            // that shadows the package-level uploader and leave it nil.
            uploader = s3manager.NewUploader(sess)
        })
    }
    
    // init calls setupUploader during package initialization. Normally this
    // kind of singleton setup is packaged separately and called before starting
    // the server; it is loaded here only to keep the example in a single file.
    func init() {
        setupUploader()
    }
    
    // uploadHandler serves the upload form on GET and, on POST, reads the
    // uploaded form file fully into memory and hands it to S3Upload in a
    // background goroutine. Any other method gets 405 Method Not Allowed.
    //
    // The success page is rendered without waiting for the upload goroutine, so
    // it can be shown even if the upload to S3 later fails.
    func uploadHandler(w http.ResponseWriter, r *http.Request) {
        switch r.Method {
        // GET to display the upload form.
        case "GET":
            err := templates.Execute(w, nil)
            if err != nil {
                log.Print(err)
            }
        // POST uploads the file and sends it to S3
        case "POST":
            var buf bytes.Buffer
            // "file" is defined by the form field, change it to whatever your form sets it to
            file, header, err := r.FormFile("file")
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            // close the file
            defer file.Close()
            // The object key is everything before the first "." in the
            // uploaded file name.
            // NOTE(review): this drops the extension and truncates names with
            // several dots ("a.b.txt" becomes "a") - confirm this is intended.
            fileName := strings.Split(header.Filename, ".")
            // load the entire file data to the buffer
            _, err = io.Copy(&buf, file)
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }

            // The data is fully buffered, so the upload can proceed in the
            // background independently of the request body.
            go S3Upload(buf, fileName[0])
            // displaying a success message.
            err = templates.Execute(w, "Upload successful.")
            if err != nil {
                log.Print(err)
            }
        default:
            w.WriteHeader(http.StatusMethodNotAllowed)
        }
    }
    
    // S3Upload uploads the buffered contents of body to S3 under the key
    // fileName, using the package-level uploader and bucket. A failed upload is
    // logged; nothing is returned to the caller.
    //
    // When this runs in a goroutine, the caller cannot learn about a failure,
    // so there is potential for false-positive "successful" uploads. Channels
    // do not really help with that either: to surface the error, return it
    // from a plain synchronous call instead of spinning up a goroutine, which
    // amounts to the same thing as waiting on a channel.
    func S3Upload(body bytes.Buffer, fileName string) {
        _, err := uploader.Upload(&s3manager.UploadInput{
            Bucket: aws.String(bucket),
            Key:    aws.String(fileName),
            Body:   bytes.NewReader(body.Bytes()),
        })
        if err != nil {
            log.Printf("uploading %q to S3: %v", fileName, err)
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 安卓adb backup备份应用数据失败
  • ¥15 eclipse运行项目时遇到的问题
  • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
  • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
  • ¥50 成都蓉城足球俱乐部小程序抢票
  • ¥15 yolov7训练自己的数据集
  • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
  • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
  • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)