donglang7236 2018-02-06 21:01
43 views
Accepted

Chunked upload to S3 while writing to the reader

I've found a few questions that are similar to mine, but nothing that answers my specific question.

I want to upload CSV data to S3. My basic code is along the lines of the following (I've simplified getting the data for brevity; normally it reads from a database):

reader, writer := io.Pipe()

go func() {
    // Write CSV rows into the pipe; the uploader consumes them from the other end.
    cWriter := csv.NewWriter(writer)

    for _, line := range lines {
        cWriter.Write(line)
    }

    cWriter.Flush()
    writer.Close()
}()

sess := session.New(//...)
uploader := s3manager.NewUploader(sess)
result, err := uploader.Upload(&s3manager.UploadInput{
    Body: reader,
    //...
})

The way I understand it, the code will wait for writing to finish and then will upload the contents to s3, so I end up with the full contents of the file in memory. Is it possible to chunk the upload (possibly using the s3 multipart upload?) so that for larger uploads, I'm only storing part of the data in memory at any one time?
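For context, io.Pipe is a synchronous, unbuffered pipe: each Write blocks until the reading side has consumed the data, so the pipe itself never holds the full payload in memory. A minimal standalone sketch (separate from the S3 code above) illustrating that behavior:

package main

import (
    "fmt"
    "io"
)

func main() {
    r, w := io.Pipe()

    go func() {
        defer w.Close()
        for i := 0; i < 3; i++ {
            // Each Fprintf blocks here until the reader below consumes the row.
            fmt.Fprintf(w, "row-%d\n", i)
        }
    }()

    buf := make([]byte, 16)
    for {
        n, err := r.Read(buf)
        if n > 0 {
            fmt.Printf("read %q\n", buf[:n])
        }
        if err != nil { // io.EOF once the writer closes
            break
        }
    }
}

Any buffering therefore happens in whatever consumes the pipe, which here is the s3manager uploader discussed in the answer below.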


1 answer

  • doukuang6795 2018-02-07 00:02

    The uploader supports multipart upload, if I've read its source code correctly: https://github.com/aws/aws-sdk-go/blob/master/service/s3/s3manager/upload.go

    The minimum size of an uploaded part is 5 MB, which is also the default part size:

    // MaxUploadParts is the maximum allowed number of parts in a multi-part upload
    // on Amazon S3.
    const MaxUploadParts = 10000
    // MinUploadPartSize is the minimum allowed part size when uploading a part to
    // Amazon S3.
    const MinUploadPartSize int64 = 1024 * 1024 * 5
    // DefaultUploadPartSize is the default part size to buffer chunks of a
    // payload into.
    const DefaultUploadPartSize = MinUploadPartSize
    
    u := &Uploader{
        PartSize:          DefaultUploadPartSize,
        MaxUploadParts:    MaxUploadParts,
        .......
    }
    
    func (u Uploader) UploadWithContext(ctx aws.Context, input *UploadInput, opts ...func(*Uploader)) (*UploadOutput, error) {
       i := uploader{in: input, cfg: u, ctx: ctx}
       .......
    
    func (u *uploader) nextReader() (io.ReadSeeker, int, error) {
        .............
        switch r := u.in.Body.(type) {
        .........
        default:
            part := make([]byte, u.cfg.PartSize)
            n, err := readFillBuf(r, part)
            u.readerPos += int64(n)
    
            return bytes.NewReader(part[0:n]), n, err
        }
    }
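    Based on those excerpts, when Body is a non-seekable reader such as an io.PipeReader, the uploader reads the stream in PartSize chunks and uploads each chunk as a part, so memory usage stays around PartSize × Concurrency rather than the whole file. A rough sketch of how the question's code could be wired up with an explicitly configured uploader (the bucket/key names and the lines slice are placeholders, not from the original post):

    package main

    import (
        "encoding/csv"
        "io"
        "log"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3/s3manager"
    )

    // upload streams CSV rows through an io.Pipe into a multipart S3 upload.
    func upload(lines [][]string) error {
        reader, writer := io.Pipe()

        go func() {
            cWriter := csv.NewWriter(writer)
            for _, line := range lines {
                cWriter.Write(line)
            }
            cWriter.Flush()
            // Pass any buffered CSV error to the reading side; nil closes with EOF.
            writer.CloseWithError(cWriter.Error())
        }()

        sess := session.Must(session.NewSession())
        uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
            u.PartSize = 5 * 1024 * 1024 // the minimum allowed part size
            u.Concurrency = 2            // roughly bounds buffered memory to 2 parts
        })

        _, err := uploader.Upload(&s3manager.UploadInput{
            Bucket: aws.String("my-bucket"), // placeholder
            Key:    aws.String("data.csv"),  // placeholder
            Body:   reader,                  // non-seekable, read in PartSize chunks
        })
        return err
    }

    func main() {
        rows := [][]string{{"a", "1"}, {"b", "2"}} // stand-in for database rows
        if err := upload(rows); err != nil {
            log.Fatal(err)
        }
    }

    With the defaults (5 MB parts, concurrency 5), the 10000-part limit caps a single upload at roughly 50 GB; raising PartSize lifts that ceiling at the cost of more memory per buffered part.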
    
    This answer was accepted by the asker.
