doudun5009 2018-07-02 01:50
浏览 52
已采纳

使用 Goroutines 的 Golang AWS s3manager 与 MultipartReader

I'm creating an endpoint that allows a user to upload several files at the same time and store them in S3. Currently I'm able to achieve this using MultipartReader and s3manager, but only sequentially (one file at a time).

I'm trying to implement Go routines to speed this functionality up and have multiple files uploaded to S3 concurrently, but a data race error is causing trouble. I think *s3manager might not be goroutine safe as the docs say it is. (Code works synchronously if go-statement is replaced with function code).

Could implementing mutex locks possibly fix my error?

// uploadHandler serves the upload form on GET and, on POST, streams every
// file part of the multipart request body to S3 via S3Upload. Any other
// method is answered with 405 Method Not Allowed.
//
// Parts are uploaded one at a time. A *multipart.Part reads from the shared
// request body and is no longer valid once reader.NextPart is called again,
// so handing parts to goroutines while the loop keeps advancing the reader
// races on that underlying stream.
func uploadHandler(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    // GET to display the upload form.
    case "GET":
        if err := templates.Execute(w, nil); err != nil {
            log.Print(err)
        }
    // POST uploads each file and sends them to S3.
    case "POST":
        // Capacity 1 lets the synchronous S3Upload call below deposit its
        // status message without needing a concurrent receiver.
        c := make(chan string, 1)
        // grab the request.MultipartReader
        reader, err := r.MultipartReader()
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        // copy each part to destination.
        for {
            part, err := reader.NextPart()
            if err == io.EOF {
                break
            }
            if err != nil {
                // On any non-EOF error part is nil; stop here instead of
                // dereferencing it. The body could not be read as multipart.
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }
            // if part.FileName() is empty, skip this iteration.
            if part.FileName() == "" {
                continue
            }
            // Finish this upload before advancing the reader, because the
            // next NextPart call invalidates the current part.
            S3Upload(c, part)
            fmt.Println(<-c)
        }
        // displaying a success message.
        // NOTE: upload outcomes are only printed above; this page is rendered
        // regardless of whether the individual uploads succeeded.
        err = templates.Execute(w, "Upload successful.")
        if err != nil {
            log.Print(err)
        }
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
}

// S3Upload uploads a single multipart part to the S3 bucket named by the
// BUCKET environment variable, in the region named by REGION, using the
// part's file name as the object key. Exactly one status message is sent on
// c: a session-creation failure, an upload failure, or a success notice.
func S3Upload(c chan string, part *multipart.Part) {
    awsSession, err := session.NewSession(&aws.Config{
        Region: aws.String(os.Getenv("REGION")),
    })
    if err != nil {
        c <- "error occured creating session"
        return
    }

    input := &s3manager.UploadInput{
        Bucket: aws.String(os.Getenv("BUCKET")),
        Key:    aws.String(part.FileName()),
        Body:   part,
    }
    if _, err := s3manager.NewUploader(awsSession).Upload(input); err != nil {
        c <- "Error occurred attempting to upload to S3"
        return
    }

    // Reaching this point means the upload completed without error.
    c <- "successful upload"
}
  • 写回答

1条回答 默认 最新

  • dongzhao3040 2018-07-02 03:36
    关注

    ^ see all the comments above,

    Here is a modified code example; channels are not useful here.

    package main
    
    import (
        "bytes"
        "io"
        "log"
        "net/http"
        "os"
        "strings"
        "sync"
    
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3/s3manager"
    )
    
    // Package-level state shared by the upload code in this file.
    var (
        // setupUploaderOnce guards the one-time initialization performed in setupUploader.
        setupUploaderOnce sync.Once
        // uploader is the S3 upload manager that S3Upload calls Upload on.
        uploader          *s3manager.Uploader
        // bucket is read from the BUCKET environment variable in setupUploader.
        bucket            string
        // region is read from the REGION environment variable in setupUploader.
        region            string
    )
    // setupUploader initializes the package-level bucket, region, and uploader.
    // The work is wrapped in setupUploaderOnce, so the session and uploader are
    // created only once no matter how many times this function is called.
    // If the AWS session cannot be created the process exits via log.Fatal.
    func setupUploader() {
        setupUploaderOnce.Do(func() {
            bucket = os.Getenv("BUCKET")
            region = os.Getenv("REGION")
            sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
            if err != nil {
                log.Fatal(err)
            }
            // Plain assignment, not :=, so the package-level uploader is set
            // rather than a new local variable that shadows it.
            uploader = s3manager.NewUploader(sess)
        })
    }
    
    // init calls setupUploader during package initialization.
    // Singleton setup like this is normally packaged separately and called
    // before the server starts; it lives here only to keep the example in a
    // single file.
    func init() {
        setupUploader()
    }
    
    // uploadHandler serves the upload form on GET and, on POST, reads the
    // uploaded "file" form field fully into memory and hands it to S3Upload in a
    // background goroutine. Any other method is answered with 405 Method Not
    // Allowed.
    //
    // The success page is rendered as soon as the upload has been started, not
    // when it has finished, so a failed upload is only visible in whatever
    // S3Upload does with its error.
    func uploadHandler(w http.ResponseWriter, r *http.Request) {
        switch r.Method {
        // GET to display the upload form.
        case "GET":
            err := templates.Execute(w, nil)
            if err != nil {
                log.Print(err)
            }
        // POST uploads the file and sends it to S3.
        case "POST":
            var buf bytes.Buffer
            // "file" is defined by the form field; change it to whatever your form sets it to.
            file, header, err := r.FormFile("file")
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            // close the file
            defer file.Close()
            // fileName[0] is the part of the uploaded name before its first ".";
            // it is used as the object key below.
            fileName := strings.Split(header.Filename, ".")
            // Load the entire file into the buffer so the upload no longer
            // depends on the request body once this handler returns.
            _, err = io.Copy(&buf, file)
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }

            // Start the upload in the background with the buffered contents.
            go S3Upload(buf, fileName[0])
            // displaying a success message.
            err = templates.Execute(w, "Upload successful.")
            if err != nil {
                log.Print(err)
            }
        default:
            w.WriteHeader(http.StatusMethodNotAllowed)
        }
    }
    
    // S3Upload uploads the contents of body to the package-level bucket under the
    // key fileName, using the shared package-level uploader.
    //
    // The function returns nothing, so a failure is reported by logging it.
    // When the caller runs this in a goroutine it cannot learn the outcome, so
    // there is potential for false-positive "success" responses. To surface the
    // error to the client, return it from here and call the function
    // synchronously; that costs the same as waiting on a channel.
    func S3Upload(body bytes.Buffer, fileName string) {
        _, err := uploader.Upload(&s3manager.UploadInput{
            Bucket: aws.String(bucket),
            Key:    aws.String(fileName),
            Body:   bytes.NewReader(body.Bytes()),
        })
        if err != nil {
            log.Printf("uploading %q to bucket %q: %v", fileName, bucket, err)
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 乌班图ip地址配置及远程SSH
  • ¥15 怎么让点阵屏显示静态爱心,用keiluVision5写出让点阵屏显示静态爱心的代码,越快越好
  • ¥15 PSPICE制作一个加法器
  • ¥15 javaweb项目无法正常跳转
  • ¥15 VMBox虚拟机无法访问
  • ¥15 skd显示找不到头文件
  • ¥15 机器视觉中图片中长度与真实长度的关系
  • ¥15 fastreport table 怎么只让每页的最下面和最顶部有横线
  • ¥15 java 的protected权限 ,问题在注释里
  • ¥15 这个是哪里有问题啊?