dru5089 2018-01-20 01:40 采纳率: 0%
浏览 105
已采纳

WaitGroup 在先前的 Wait 返回之前被重用（原因不明）

I use the following code, but I don't know why it crashes with the error "WaitGroup is reused before previous Wait has returned" at this line:

for _, proxy := range proxies {
                    wgGroup.Wait()

I want to ensure that while a local source is calling proxySource.GetProxies() or sending proxyProvider.receivingProxyBC.In() <- proxy, the remoteSources goroutines are not allowed to send proxyProvider.receivingProxyBC.In() <- proxy.

Here is the full code:

    // Question code (crashes). wgGroup is used as if it were a lock: local
    // goroutines Add(1)/Done() around their fetch and send phases, and remote
    // goroutines call Wait() before each send. A WaitGroup provides no mutual
    // exclusion, and this usage also breaks its rule about reuse after Wait.
    wgGroup := sync.WaitGroup{}
    // One count per local goroutine. Each goroutine calls Done() as soon as it
    // starts, so the counter drops to zero once every local goroutine is running.
    wgGroup.Add(len(localSources))
    for _, proxySource := range localSources {
        go func(proxySource *ProxySource) {
            lastTimeGet := time.Now()
            firstTimeLoad := true
            wgGroup.Done()
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                // Skip this round (sleep WatchWait, retry) while receiving+working
                // proxies exceed 200, the last fetch is younger than
                // DURATION_FORCE_UPDATE, and this is not the first iteration.
                if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                // BUG: this positive Add can run when the counter is zero. In that
                // state, remote goroutines have just been released from (or are
                // still inside) wgGroup.Wait(). The WaitGroup contract requires
                // such Add calls to happen only after all previous Wait calls have
                // returned. Violating it makes Wait panic with
                // "sync: WaitGroup is reused before previous Wait has returned".
                wgGroup.Add(1)
                proxies, err := proxySource.GetProxies()
                wgGroup.Done()
                LogInfo("Get proxy from source ", proxySource.Id)
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                // Same problem as the Add(1) above: the counter may be zero here.
                wgGroup.Add(1)
                for _, proxy := range proxies {
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                wgGroup.Done()
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }
    for _, proxySource := range remoteSources {
        go func(proxySource *ProxySource) {
            // Presumably gives the local sources a head start; TODO confirm intent.
            time.Sleep(2 * time.Second)
            lastTimeGet := time.Now()
            firstTimeLoad := true
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                // Same throttle as the local loop, with a threshold of 100.
                if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                proxies, err := proxySource.GetProxies()
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                for _, proxy := range proxies {
                    // Wait only observes the counter. A local goroutine can Add(1)
                    // right after Wait returns, so this send is NOT excluded from
                    // the locals' critical sections (check-then-act race).
                    wgGroup.Wait()
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }

UPDATE (using sync.RWMutex)

With the code below, the locking around localSources works, but it does not seem optimal. What I need: while any local source is fetching, all remote sources are blocked; when no local source is fetching, all remote sources are allowed to fetch at the same time. Currently, only one remote source is allowed to fetch at a time.

// Question update: RWMutex variant.
// - Local goroutines take the read lock (RLock), so locals can overlap each other.
// - Remote goroutines take the write lock (Lock), which excludes the locals AND
//   every other remote. That is why only one remote source fetches at a time.
// NOTE(review): "locals block remotes, remotes never block each other" is not
// expressible just by choosing RLock vs Lock on one RWMutex. A counter of
// active local fetches guarded by a sync.Cond (remotes wait while the count is
// > 0) would express it. Suggestion only; confirm against the real requirements.
wgGroup := sync.WaitGroup{}
// Drops to zero once every local goroutine has started (each calls Done()
// immediately). Nothing calls Add again, so the Wait below cannot hit the
// reuse panic from the original version.
wgGroup.Add(len(localSources))
localGroupRwLock := sync.RWMutex{}
for _, proxySource := range localSources {
  go func(proxySource *ProxySource) {
    lastTimeGet := time.Now()
    firstTimeLoad := true
    wgGroup.Done()
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      // Skip this round while receiving+working proxies exceed 100, the last
      // fetch is younger than DURATION_FORCE_UPDATE, and this is not the first pass.
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      // Shared (read) lock: other locals may hold it too; remotes' Lock() waits.
      localGroupRwLock.RLock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.RUnlock()
      LogInfo("Get proxy from source ", proxySource.Id)
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      // Held for the whole batch so no remote send interleaves with it.
      localGroupRwLock.RLock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.RUnlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}
for _, proxySource := range remoteSources {
  go func(proxySource *ProxySource) {
    time.Sleep(2 * time.Second)
    lastTimeGet := time.Now()
    firstTimeLoad := true
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      LogInfo("Get proxy from source ", proxySource.Id)
      // Exclusive (write) lock held across the whole remote fetch. It blocks
      // every local goroutine AND every other remote for the duration of
      // GetProxies, which is the serialization described in the question.
      localGroupRwLock.Lock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.Unlock()
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      // Only waits until all local goroutines have started; returns immediately after that.
      wgGroup.Wait()
      // Exclusive again for the send batch: at most one remote sends at a time.
      localGroupRwLock.Lock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.Unlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}
  • 写回答

1条回答 默认 最新

  • dongyan8896 2018-01-20 03:05
    关注

    From document:

    A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

    and for Wait() :

    Wait blocks until the WaitGroup counter is zero.

    You can also see examples there. The point is that a WaitGroup is used for blocking until the counter gets to zero. So in the original code, assuming no runtime error, every goroutine in the second for loop will block until the goroutines in the first are done. And in the first part, the Add(1) and Done() calls would not block at all, so the data race would remain.

    The error is documented in the Add() method: Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.

    Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned. See the WaitGroup example.

    However, you are not waiting for independent sets of events either.

    The tool that fits your code is sync.Mutex. Again, from the documentation:

    A Mutex is a mutual exclusion lock. The zero value for a Mutex is an unlocked mutex.

    A Mutex must not be copied after first use.

    type Mutex struct {
        // contains filtered or unexported fields
    }

    func (*Mutex) Lock

    func (m *Mutex) Lock()

    Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available.

    func (*Mutex) Unlock

    func (m *Mutex) Unlock()

    Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.

    So, as you describe it, you want to "pause the calling of proxyProvider.receivingProxyBC.In() <- proxy when proxySource.GetProxies() or for _, proxy := range proxies is called". "Pause" is better described as "block", and that is a textbook mutex problem: guard all three "calls" (strictly speaking, a for loop is not a call) with the same lock and it is done.

    It might be a little tricky to guard a for loop with a mutex; it should look like this:

    lock.Lock()
    for ... {
        lock.Unlock()
        ...
        lock.Lock()
    }
    lock.Unlock() // release the lock re-acquired at the end of the last iteration
    

    So I changed your code; hopefully it works as expected:

    // lock is one mutex shared by every worker. At most one goroutine at a time
    // may be inside a local source's GetProxies call, a local send batch, or a
    // single remote send.
    lock := sync.Mutex{}
    // Guard the loop over localSources: the main goroutine holds lock except
    // around each `go` statement.
    lock.Lock()
    for _, proxySource := range localSources {
        lock.Unlock()
        go func(proxySource *ProxySource) {
            // lastTimeGet and firstTimeLoad are local to this goroutine. Taking
            // lock here protects no shared state; it only delays startup until
            // the lock is free.
            lock.Lock()
            lastTimeGet := time.Now()
            firstTimeLoad := true
            lock.Unlock()
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                // Skip this round while receiving+working proxies exceed 200, the
                // last fetch is younger than DURATION_FORCE_UPDATE, and this is
                // not the first iteration.
                if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                lock.Lock()
                proxies, err := proxySource.GetProxies()
                lock.Unlock()
                LogInfo("Get proxy from source ", proxySource.Id)
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                // Hold lock for the whole batch so no remote send interleaves with it.
                lock.Lock()
                for _, proxy := range proxies {
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                lock.Unlock()
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
        lock.Lock()
    }
    // Release the lock re-acquired by the last iteration (or taken before the
    // loop when localSources is empty). Without this Unlock, lock stays held
    // forever after the loop and every worker blocks on lock.Lock(): a deadlock.
    lock.Unlock()
    for _, proxySource := range remoteSources {
        go func(proxySource *ProxySource) {
            time.Sleep(2 * time.Second)
            lastTimeGet := time.Now()
            firstTimeLoad := true
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                proxies, err := proxySource.GetProxies()
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                for _, proxy := range proxies {
                    // Each send takes lock, so it cannot overlap a local source's
                    // GetProxies call or send batch. NOTE: if the channel send
                    // blocks, every other worker waiting on lock blocks with it.
                    lock.Lock()
                    proxyProvider.receivingProxyBC.In() <- proxy
                    lock.Unlock()
                }
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }
    

    Note 1: You may be tempted to use defer. Don't: a deferred call runs when the surrounding function returns, not at the end of a block.

    Note 2: Using a mutex in Go often raises a design question. One should always check whether it is better to use channels and refactor the code, though in many cases a mutex is not a bad idea. But I can read nothing about the design here, so I will leave it at that.

    Note 3: The code actually has a problem: it also pauses proxySource.GetProxies() and the for loop while proxyProvider.receivingProxyBC.In() <- proxy is being called. Whether this is desired depends on your requirements. If it is not, look at sync.RWMutex and change the code accordingly. I will leave that to you.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配
  • ¥15 Power query添加列问题
  • ¥50 Kubernetes&Fission&Eleasticsearch
  • ¥15 報錯:Person is not mapped,如何解決?
  • ¥15 c++头文件不能识别CDialog
  • ¥15 Excel发现不可读取的内容
  • ¥15 关于#stm32#的问题:CANOpen的PDO同步传输问题
  • ¥20 yolov5自定义Prune报错,如何解决?