duaner5714 2015-09-07 11:43

How to perform concurrent downloads in Go

We have a process whereby users request files that we need to get from our source. This source isn't the most reliable so we implemented a queue using Amazon SQS. We put the download URL into the queue and then we poll it with a small app that we wrote in Go. This app simply retrieves the messages, downloads the file and then pushes it to S3 where we store it. Once all of this is complete it calls back a service which will email the user to let them know that the file is ready.

Originally I wrote this to create n channels, attached one goroutine to each, and had each goroutine run an infinite loop. This way I could ensure that I was only ever processing a fixed number of downloads at a time.

I realised that this isn't the way that channels are supposed to be used and, if I'm understanding correctly now, there should actually be one channel with n goroutines receiving on that channel. Each goroutine is in an infinite loop, waiting on a message; when it receives one it processes the data, does everything that it's supposed to, and then waits on the next message. This allows me to ensure that I'm only ever processing n files at a time. I think this is the right way to do it. I believe this is fan-out, right?

What I don't need to do is merge these processes back together. Once the download is done, the worker calls back a remote service, which handles the remainder of the process. There is nothing else that the app needs to do.

OK, so some code:

func main() {
    queue, err := ConnectToQueue() // This works fine...
    if err != nil {
        log.Fatalf("Could not connect to queue: %s\n", err)
    }

    msgChannel := make(chan sqs.Message, 10)

    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
        go processMessage(msgChannel, queue)
    }

    for {
        response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)

        for _, m := range response.Messages {
            msgChannel <- m
        }
    }
}

func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
    for {
        m := <-ch
        // Do something with message m

        // Delete message from queue when we're done
        queue.DeleteMessage(&m)
    }
}

Am I anywhere close here? I have n running goroutines (where MAX_CONCURRENT_ROUTINES = n) and in the loop we keep passing messages in to the single channel. Is this the right way to do it? Do I need to close anything or can I just leave this running indefinitely?

One thing that I'm noticing is that SQS keeps returning messages, but once 10 messages have been passed to processMessage() (10 being the size of the channel buffer), no further messages are actually processed.

Thanks all


1 answer

  • douyan9417 2015-09-07 12:12

    That looks fine. A few notes:

    1. You can limit the work parallelism by means other than limiting the number of worker routines you spawn. For example you can create a goroutine for every message received, and then have the spawned goroutine wait for a semaphore that limits the parallelism. Of course there are tradeoffs, but you aren't limited to just the way you've described.

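      sem := make(chan struct{}, n)
      work := func(m sqs.Message) {
          sem <- struct{}{}        // Acquire a slot; blocks while n jobs are already running
          defer func() { <-sem }() // Free the slot when this job is done
          // do the work
      }
      for {
          response, err := queue.ReceiveMessage(MAX_SQS_MESSAGES)
          if err != nil {
              log.Printf("Could not receive messages: %s\n", err)
              continue
          }
          for _, m := range response.Messages {
              go work(m) // One goroutine per message; sem caps how many run at once
          }
      }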
      
    2. The limit of only 10 messages being processed is being caused elsewhere in your stack. Possibly you're seeing a race where the first 10 fill the channel, and then the work isn't completing, or perhaps you're accidentally returning from the worker routines. If your workers are persistent per the model you've described, you'll want to be certain that they don't return.

    3. It's not clear whether you want the process to exit after it has processed some number of messages. If you do, you'll need to wait for all the workers to finish their current tasks, and probably signal them to return afterwards. Take a look at sync.WaitGroup for synchronizing their completion. To signal that there's no more work, either use a second channel, or close msgChannel and handle that in your workers. (Take a look at the two-value form of the channel receive expression, m, ok := <-ch.)
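As a rough illustration of note 3, here is a minimal sketch of a fixed pool of workers that drains a closed channel and is waited on with sync.WaitGroup. It makes no SQS calls: the `message` type is a hypothetical stand-in for sqs.Message, and `worker` and `runPool` are names invented for this example, not part of your code or of any library. The real download, S3 upload and DeleteMessage call would go where the comment indicates.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// message is a hypothetical stand-in for sqs.Message.
type message struct{ Body string }

// worker receives from ch until it has been closed and drained.
func worker(ch <-chan message, wg *sync.WaitGroup, processed *int64) {
	defer wg.Done()
	for {
		m, ok := <-ch // ok is false once ch is closed and empty
		if !ok {
			return
		}
		_ = m // download, push to S3, delete from the queue, call back
		atomic.AddInt64(processed, 1)
	}
}

// runPool fans msgs out to n workers and reports how many were processed.
func runPool(n int, msgs []message) int64 {
	ch := make(chan message, 10)
	var wg sync.WaitGroup
	var processed int64

	wg.Add(n)
	for i := 0; i < n; i++ {
		go worker(ch, &wg, &processed)
	}
	for _, m := range msgs {
		ch <- m // blocks only while the buffer is full and all workers are busy
	}
	close(ch) // signal that there is no more work
	wg.Wait() // wait for in-flight messages to finish
	return atomic.LoadInt64(&processed)
}

func main() {
	msgs := make([]message, 25)
	for i := range msgs {
		msgs[i] = message{Body: fmt.Sprintf("file-%d", i)}
	}
	fmt.Println(runPool(4, msgs)) // prints 25
}
```

Because the workers only return when the channel is closed, sending more messages than the buffer holds (25 against a buffer of 10 here) does not stall. If a worker returned early on an error instead, the sender would eventually block once the buffer filled, which is one way to end up with the symptom described in note 2.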

    The asker selected this answer as the best answer.
