dtczp02204 2017-07-24 17:57

Go: funneling many slow API queries into a single SQL transaction

I wonder what the idiomatic way to do the following would be. I have N slow API queries and one database connection. I want a buffered channel where the responses arrive, and one database transaction that I use to write the data. I could only come up with a semaphore approach, as in the following made-up example:

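    func myFunc() {
        // Semaphore: at most 10 concurrent API calls
        sem := make(chan bool, 10)
        // A concurrency-safe map used as a buffer
        var myMap MyConcurrentMap

        for i := 0; i < N; i++ {
            sem <- true
            go func(i int) {
                defer func() { <-sem }()
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
                myMap.Put(resp)
            }(i)
        }

        // Fill the semaphore to wait until every goroutine has released its slot
        for j := 0; j < cap(sem); j++ {
            sem <- true
        }
        tx, _ := db.Begin()
        // Pseudocode: iterate over the buffered responses and insert each one
        for data := range myMap {
            tx.Exec("Insert data into database", data)
        }
        tx.Commit()
    }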

I am nearly sure there is a simpler, cleaner, and more proper solution, but it is hard for me to grasp.

EDIT: Well, I came up with the following solution. This way I do not need the buffer map: once data arrives on the resp channel, it is printed or can be inserted into a database. It works, but I am still not sure everything is OK; at least there are no races.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

//Global WaitGroup
var wg sync.WaitGroup

func init() {
    //just for fun sake, make rand seeded
    rand.Seed(time.Now().UnixNano())
}

//Emulate a slow API call
func verySlowAPI(id int) int {
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)
    return n
}

func main() {
    //Amount of tasks
    N := 100

    //Concurrency level
    concur := 10

    //Channel for tasks
    tasks := make(chan int, N)

    //Channel for responses
    resp := make(chan int, 10)

    //Start 10 concurrent goroutines
    wg.Add(concur) 
    for i := 1; i <= concur; i++ {
        go worker(tasks, resp)
    }

    //Add tasks
    for i := 0; i < N; i++ {
        tasks <- i
    }

    //Collect data from goroutines
    for i := 0; i < N; i++ {
        fmt.Printf("%d\n", <-resp)
    }

    //close the tasks channel
    close(tasks)

    //wait till finish
    wg.Wait()

}

func worker(tasks <-chan int, resp chan<- int) {
    defer wg.Done()
    //range exits once the tasks channel is closed and drained
    for task := range tasks {
        resp <- verySlowAPI(task)
    }
}

2 answers

  • doubei8541 2017-07-24 20:44

    Maybe this will work for you. Now you can get rid of your concurrent map. Here is a code snippet:

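    func myFunc() {
        // Semaphore: at most 10 concurrent API calls
        sem := make(chan bool, 10)
        respCh := make(chan YOUR_RESP_TYPE, 10)
        var responses []YOUR_RESP_TYPE

        // Start the collector before launching workers. Otherwise, for large N,
        // workers block on a full respCh while holding semaphore slots and the
        // loop below deadlocks on sem <- true.
        respCollected := make(chan struct{})
        go func() {
            for i := 0; i < N; i++ {
                responses = append(responses, <-respCh)
            }
            close(respCollected)
        }()

        for i := 0; i < N; i++ {
            sem <- true
            go func(i int) {
                defer func() {
                    <-sem
                }()
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
                respCh <- resp
            }(i)
        }

        <-respCollected
        tx, err := db.Begin()
        if err != nil {
            return // handle the error as appropriate
        }
        for _, data := range responses {
            // Placeholder statement; data is passed as a query argument
            tx.Exec("Insert data into database", data)
        }
        tx.Commit()
    }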
    

    We need one more goroutine that collects all the responses from the response channel into a slice or map.
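A variation on the same idea, offered as a sketch rather than a definitive implementation: a fixed pool of workers sends results on a channel, and only the calling goroutine touches the transaction, writing each row as it arrives. The `execer` interface, the `recorder` fake, the `fetchAll` helper, and the `results` table with its `?` placeholder are all assumptions made for illustration; `execer` is declared with the same `Exec` signature as `*sql.Tx`, so a real transaction could be passed in its place.

```go
package main

import (
	"database/sql"
	"fmt"
	"sync"
)

// execer is the subset of *sql.Tx used here, so the sketch runs without a database.
type execer interface {
	Exec(query string, args ...any) (sql.Result, error)
}

// recorder is an in-memory stand-in for a transaction (hypothetical).
type recorder struct{ rows []any }

func (r *recorder) Exec(query string, args ...any) (sql.Result, error) {
	r.rows = append(r.rows, args...)
	return nil, nil
}

// fetchAll runs fetch for ids 0..n-1 on `workers` goroutines and writes every
// result through tx from the calling goroutine only.
func fetchAll(tx execer, n, workers int, fetch func(int) int) error {
	tasks := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range tasks {
				results <- fetch(id)
			}
		}()
	}

	// Feed the tasks, then close results once all workers have finished.
	go func() {
		for i := 0; i < n; i++ {
			tasks <- i
		}
		close(tasks)
		wg.Wait()
		close(results)
	}()

	var firstErr error
	for r := range results {
		if firstErr != nil {
			continue // keep draining so the workers can exit
		}
		// Hypothetical table and placeholder syntax.
		if _, err := tx.Exec("INSERT INTO results(value) VALUES (?)", r); err != nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	rec := &recorder{}
	if err := fetchAll(rec, 20, 4, func(id int) int { return id * 2 }); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("rows written:", len(rec.rows))
}
```

With a real database, the caller would obtain `tx` from `db.Begin()`, pass it to `fetchAll`, and call `tx.Commit()` on success or `tx.Rollback()` on error. Closing `results` after `wg.Wait()` lets the collector use `range` instead of counting N responses.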

    This answer was selected by the asker as the best answer.