duanhuiqing9528 2017-06-26 16:39
浏览 408
已采纳

从S3同时下载多个文件并合并它们

Im trying to download multiple files from S3 concurrently ,and consolidate their contents into a bytes buffer.The files are csv formatted. My code seems to work most of time(8 from 10 tries).But there are instances that after i inspected the consolidated buffer, I've got less that what i should be getting(usually no more than 100 rows missing). Total number of records expected is 4802. If run my code sequentially this problem does not appear.But i need to use goroutines for the speed.This is a major requirement on what im trying to do.I have run the go data race inspector with no data races appear , and the error statements that i print never print out.

This is the code i use:

    var pingsBuffer = aws.NewWriteAtBuffer([]byte{}) 
        //range over the contents of the index file
    for _, file := range indexList {
        wg.Add(1)
        go download(key + string(file), pingsBuffer, &wg)
    }
    wg.Wait()

and the download functions (that also consolidates the downloaded files)

func download(key string, buffer *aws.WriteAtBuffer, wg *sync.WaitGroup)  {

defer wg.Done()

awsBuffer := aws.NewWriteAtBuffer([]byte{})

input := &s3.GetObjectInput {
    Bucket: aws.String(defaultLocationRootBucket),
    Key:    aws.String(key),
}

n1, downloadError := downloader.Download(awsBuffer, input)
if downloadError != nil {
    loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to download from S3, file(%v) with error : %v.", key, downloadError))
    return
}


lenghts3:= int64(len(buffer.Bytes()))

n2, bufferError := buffer.WriteAt(awsBuffer.Bytes(), lenghts3 )
if bufferError != nil {
    loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to write to buffer, the file(%v) downloaded from S3  with error : %v.", key, bufferError))
}
  • 写回答

1条回答 默认 最新

  • dpdfh60088 2017-06-26 17:18
    关注

    This code:

    lenghts3:= int64(len(buffer.Bytes()))
    

    Is a concurrency problem: two routines may get the length at the same time, getting the same start position, and both proceed to write to the buffer with the same start position, stepping on each other's toes.

    Since you're already retrieving whole objects in memory and not streaming to the combined buffer, you may as well just send the full contents of each file on a channel, and have a receiver on that channel append each result to a shared byte buffer as they come in, synchronously.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥20 给自己本科IT专业毕业的妹m找个实习工作
  • ¥15 用友U8:向一个无法连接的网络尝试了一个套接字操作,如何解决?
  • ¥30 我的代码按理说完成了模型的搭建、训练、验证测试等工作(标签-网络|关键词-变化检测)
  • ¥50 mac mini外接显示器 画质字体模糊
  • ¥15 TLS1.2协议通信解密
  • ¥40 图书信息管理系统程序编写
  • ¥20 Qcustomplot缩小曲线形状问题
  • ¥15 企业资源规划ERP沙盘模拟
  • ¥15 树莓派控制机械臂传输命令报错,显示摄像头不存在
  • ¥15 前端echarts坐标轴问题