I use the following code, but I don't know why it crashes with the error "WaitGroup is reused before previous Wait" at these lines:
for _, proxy := range proxies {
wgGroup.Wait()
I want to ensure that while proxySource.GetProxies() and proxyProvider.receivingProxyBC.In() <- proxy are being called for localSources, remoteSources are not allowed to call proxyProvider.receivingProxyBC.In() <- proxy.
Detailed code here:
wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
for _, proxySource := range localSources {
go func(proxySource *ProxySource) {
lastTimeGet := time.Now()
firstTimeLoad := true
wgGroup.Done()
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
wgGroup.Add(1)
proxies, err := proxySource.GetProxies()
wgGroup.Done()
LogInfo("Get proxy from source ", proxySource.Id)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
wgGroup.Add(1)
for _, proxy := range proxies {
proxyProvider.receivingProxyBC.In() <- proxy
}
wgGroup.Done()
lastTimeGet = time.Now()
time.Sleep(20 * time.Second)
}
}(proxySource)
}
for _, proxySource := range remoteSources {
go func(proxySource *ProxySource) {
time.Sleep(2 * time.Second)
lastTimeGet := time.Now()
firstTimeLoad := true
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
proxies, err := proxySource.GetProxies()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, proxy := range proxies {
wgGroup.Wait()
proxyProvider.receivingProxyBC.In() <- proxy
}
lastTimeGet = time.Now()
time.Sleep(20 * time.Second)
}
}(proxySource)
}
UPDATE: RWLOCK
Using this code I can lock localSources, but it does not seem optimal. What I need is: while any of the localSources is getting proxies, all remoteSources are locked out; when none of the localSources is getting, all remoteSources are allowed to get. Currently, only one of the remoteSources is allowed to get at a time.
wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
localGroupRwLock := sync.RWMutex{}
for _, proxySource := range localSources {
go func(proxySource *ProxySource) {
lastTimeGet := time.Now()
firstTimeLoad := true
wgGroup.Done()
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
LogInfo("Total proxies ", totalProxy)
if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
localGroupRwLock.RLock()
proxies, err := proxySource.GetProxies()
localGroupRwLock.RUnlock()
LogInfo("Get proxy from source ", proxySource.Id)
if err != nil {
LogError("Error when get proxies from ", proxySource.Id)
time.Sleep(5 * time.Second)
continue
}
LogInfo("Add proxy from source ", proxySource.Id)
localGroupRwLock.RLock()
for _, proxy := range proxies {
proxyProvider.receivingProxyBC.In() <- proxy
}
localGroupRwLock.RUnlock()
LogInfo("Done add proxy from source ", proxySource.Id)
//LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
lastTimeGet = time.Now()
time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
LogInfo("Watch for proxy source", proxySource.Id)
}
}(proxySource)
}
for _, proxySource := range remoteSources {
go func(proxySource *ProxySource) {
time.Sleep(2 * time.Second)
lastTimeGet := time.Now()
firstTimeLoad := true
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
LogInfo("Total proxies ", totalProxy)
if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
LogInfo("Get proxy from source ", proxySource.Id)
localGroupRwLock.Lock()
proxies, err := proxySource.GetProxies()
localGroupRwLock.Unlock()
if err != nil {
LogError("Error when get proxies from ", proxySource.Id)
time.Sleep(5 * time.Second)
continue
}
LogInfo("Add proxy from source ", proxySource.Id)
wgGroup.Wait()
localGroupRwLock.Lock()
for _, proxy := range proxies {
proxyProvider.receivingProxyBC.In() <- proxy
}
localGroupRwLock.Unlock()
LogInfo("Done add proxy from source ", proxySource.Id)
//LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
lastTimeGet = time.Now()
time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
LogInfo("Watch for proxy source", proxySource.Id)
}
}(proxySource)
}