I'm currently working on a search engine project. To speed up crawling, I use one goroutine per link visit, but I have run into two problems that puzzle me.
The first one is demonstrated by this code sample:
package main
import "fmt"
import "sync"
import "time"
// test is a minimal model of the crawler: it tracks how many visit
// goroutines are active and the ceiling they are checked against.
type test struct {
	// running is the number of visits in progress; inc and dec modify it
	// while holding mu. max is the limit start compares running against.
	running, max int
	// mu is held while running is modified.
	mu sync.Mutex
}
// main builds a test value limited to 1000 concurrent visits and hands
// control to its start loop.
func main() {
	crawler := &test{max: 1000}
	crawler.start()
}
func (t *test) start() {
for {
if t.running >= t.max {
time.Sleep(200 * time.Millisecond)
continue
}
go t.visit()
}
}
func (t *test) visit() {
t.inc()
defer t.dec()
fmt.Println("visit called")
fmt.Printf("running: %d, max: %d
", t.running, t.max)
fmt.Println()
time.Sleep(time.Second)
}
// inc adds one to the running counter while holding the mutex.
func (t *test) inc() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.running++
}
// dec subtracts one from the running counter while holding the mutex.
func (t *test) dec() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.running--
}
Output (cropped):
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000
Although I explicitly check the maximum number of allowed goroutines in the loop, why does the number of running goroutines exceed the maximum?
The second one is part of the real project code:
UPDATE: This is actually fixed. The problem was in the LinkProvider.Get()
implementation, which took too long to return. parser.visit()
returns in the meantime, while the loop in Parser.Start()
is still waiting for a new link, so the output only appears to be sequential!
package worker
import (
"errors"
"fmt"
"sync"
"time"
"bitbucket.org/codictive/ise/components/crawler/models"
"bitbucket.org/codictive/ise/components/log/logger"
"bitbucket.org/codictive/ise/core/component"
"bitbucket.org/codictive/ise/core/database"
)
// Worker is the control surface of a service that processes crawlable links.
type Worker interface {
	// Start runs the worker.
	Start() error
	// Stop stops the worker.
	Stop() error
	// Restart re-starts the worker.
	Restart() error
	// Status reports the worker's current runtime status.
	Status() Status
}
// Status is a snapshot of a worker's runtime state.
type Status struct {
	// Running tells whether the worker is currently running.
	Running bool
	// RunningParsersCount is the number of handlers running at the moment
	// the snapshot was taken.
	RunningParsersCount int
}
// New returns a Worker backed by a defaultWorker whose flow and stop
// channels are both unbuffered. The remaining fields keep their zero values
// here.
func New() Worker {
	w := new(defaultWorker)
	w.flow = make(chan bool)
	w.stop = make(chan bool)
	return w
}
// defaultWorker is the Worker implementation returned by New.
type defaultWorker struct {
	// linkProvider supplies the links to crawl; it is assigned in init.
	linkProvider LinkProvider
	// handlersLimit is the value Start compares runningHandlersCount
	// against before fetching another link; it is assigned in init.
	handlersLimit int
	// runningHandlersCount is incremented when a visit begins and
	// decremented when it ends, both while holding mu.
	runningHandlersCount int
	// running is set to true when Start begins and to false when it returns.
	running bool
	// mu is held while runningHandlersCount is modified.
	mu sync.Mutex
	// flow receives a value from Start once a stop request has been
	// received; Stop and Restart wait on it.
	flow chan bool
	// stop carries stop requests from Stop and Restart to the Start loop.
	stop chan bool
}
// init reads the crawler settings from the component configuration and
// applies them to the worker: the two crawl intervals, interpreted as hours,
// are handed to NewLinkProvider, and the concurrency setting becomes
// handlersLimit.
//
// NOTE(review): the second value returned by component.IntConfig is
// discarded for all three keys; presumably it signals a missing or invalid
// setting - confirm, since a zero handlersLimit makes Start idle forever.
func (w *defaultWorker) init() {
	crawlHours, _ := component.IntConfig("crawler.crawlInterval")
	adCrawlHours, _ := component.IntConfig("crawler.ad_crawlInterval")
	limit, _ := component.IntConfig("crawler.concurrent_workers_limit")

	crawlInterval := time.Duration(crawlHours) * time.Hour
	adCrawlInterval := time.Duration(adCrawlHours) * time.Hour

	w.linkProvider = NewLinkProvider(crawlInterval, adCrawlInterval)
	w.handlersLimit = limit
}
// Start runs worker.
func (w *defaultWorker) Start() error {
logger.Info("Starting crawler worker...")
w.running = true
w.init()
defer func() {
w.running = false
logger.Info("Worker stopped.")
}()
for {
select {
case <-w.stop:
w.flow <- true
return nil
default:
fmt.Printf("running: %d limit: %d
", w.runningHandlersCount, w.handlersLimit)
if w.runningHandlersCount >= w.handlersLimit {
time.Sleep(200 * time.Millisecond)
continue
}
link := w.linkProvider.Get()
if link.ID == 0 {
logger.Debug("no link to crawl")
time.Sleep(time.Minute)
continue
}
go func(l *models.CrawlLink) {
go w.visit(l)
}(link)
}
}
}
// Stop asks the Start loop to terminate and waits for its acknowledgement.
//
// It returns nil once Start has confirmed on w.flow, or an error if the
// whole exchange does not complete within two minutes. The timeout also
// covers delivery of the stop request: w.stop is unbuffered, so the send
// blocks until the Start loop polls it, which may be a long time (or never,
// if Start is not running).
func (w *defaultWorker) Stop() error {
	logger.Info("Stopping crawler worker...")

	// A single deadline bounds both the request and the acknowledgement.
	deadline := time.After(2 * time.Minute)

	select {
	case w.stop <- true:
	case <-deadline:
		return errors.New("worker did not stopped properly")
	}

	select {
	case <-w.flow:
		return nil
	case <-deadline:
		return errors.New("worker did not stopped properly")
	}
}
// Restart asks the running Start loop to terminate and, once it has
// acknowledged on w.flow, calls Start again in the calling goroutine and
// returns its result.
//
// It returns an error if the stop request cannot be delivered and
// acknowledged within two minutes. The timeout also covers the send itself:
// w.stop is unbuffered, so the send blocks until the Start loop polls it.
func (w *defaultWorker) Restart() error {
	logger.Info("Re-starting crawler worker...")

	// A single deadline bounds both the request and the acknowledgement.
	deadline := time.After(2 * time.Minute)

	select {
	case w.stop <- true:
	case <-deadline:
		return errors.New("can not restart worker")
	}

	select {
	case <-w.flow:
		return w.Start()
	case <-deadline:
		return errors.New("can not restart worker")
	}
}
// Status reports the current worker status: whether the worker is running
// and how many handlers are running at the moment of the call.
//
// The snapshot is taken while holding w.mu, the mutex under which
// runningHandlersCount is modified, so the counter is not read in the middle
// of a concurrent update.
func (w *defaultWorker) Status() Status {
	w.mu.Lock()
	defer w.mu.Unlock()

	return Status{
		Running:             w.running,
		RunningParsersCount: w.runningHandlersCount,
	}
}
func (w *defaultWorker) visit(cl *models.CrawlLink) {
w.incrementRunningWorkers()
defer w.decrementRunningWorkers()
if cl == nil {
logger.Warning("[crawler.worker.visit] Can not visit a nil link.")
return
}
if err := cl.LoadFull(); err != nil {
logger.Error("[crawler.worker.visit] Can not load link relations. (%v)", err)
return
}
parser := NewParser(cl)
if parser == nil {
logger.Error("[crawler.worker.visit] Parser instantiation failed.")
return
}
before := time.Now()
if err := parser.Parse(); err != nil {
cl.Error = err.Error()
logger.Error("[crawler.worker.visit] Parser finished with error: %v.", err)
db := database.Open()
if err := db.Save(&cl).Error; err != nil {
logger.Error("[crawler.worker.visit] can not update crawl link. (%v)", err)
}
}
logger.Debug("[crawler.worker.visit] Parsing %q took %s.", cl.URL, time.Since(before))
fmt.Printf("[crawler.worker.visit] Parsing %q took %s.
", cl.URL, time.Since(before))
}
func (w *defaultWorker) incrementRunningWorkers() {
w.mu.Lock()
w.runningHandlersCount++
w.mu.Unlock()
fmt.Printf("increment called. current: %d
", w.runningHandlersCount)
}
func (w *defaultWorker) decrementRunningWorkers() {
w.mu.Lock()
w.runningHandlersCount--
w.mu.Unlock()
fmt.Printf("decrement called. current: %d
", w.runningHandlersCount)
}
Output:
2017/12/03 11:24:36 profile: cpu profiling enabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof
running: 0 limit: 1000
Running server on :8080
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%81%D8%B1%D8%A7%D8%B4%D8%A8%D9%86%D8%AF/%D8%A7%D9%85%D9%84%D8%A7%DA%A9/%D9%81%D8%B1%D9%88%D8%B4-%D8%A7%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88-%D8%AA%D8%AC%D8%A7%D8%B1%DB%8C" took 370.140513ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-s3-neo-24252682.html" took 193.193357ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%85%DB%8C%D8%B2%D9%88%D8%B5%D9%86%D8%AF%D9%84%DB%8C-%D8%AA%D8%A7%D9%84%D8%A7%D8%B1-22399505.html" took 201.636741ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/50000%D9%85%D8%AA%D8%B1-%D8%B2%D9%85%DB%8C%D9%86-%D9%85%D8%B1%D8%BA%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88%D8%A7%D9%82%D8%B9-%D8%AF%D8%B1-%D8%AE%D8%B1%D9%85%D8%AF%D8%B1%D9%87-23075331.html" took 210.360596ms.
decrement called. current: 0
^C2017/12/03 11:24:43 profile: caught interrupt, stopping profiles
2017/12/03 11:24:43 profile: cpu profiling disabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof
As you can see, the visit
method runs completely sequentially, whether I call it with just go visit(link)
or with the form used in the code above.
Why does this happen? What is stopping the loop from iterating?