I'm trying to learn how to use channels to make a queue in Go for one of my other projects. My other project basically queues up database rows, and then does number crunching on the database using the details in the rows.
I don't want the same row to be processing in a worker at the same time, so it needs to check whether a worker is currently working on that specific row ID, and if so, wait for it to finish. If it's not the same row ID, it can run asynchronously, but I also want to limit the number of asynchronous workers that can run at the same time. In my code below, I'm trying to limit it to three workers at the moment.
Here's what I have:
package main
import (
"log"
"strconv"
"time"
)
// RowInfo holds the job info
type RowInfo struct {
id int
}
// WorkerCount holds how many workers are currently running
var WorkerCount int
// WorkerLocked specifies whether a row ID is currently processing by a worker
var WorkerLocked map[string]bool
// Process the RowInfo
func worker(row RowInfo) {
rowID := strconv.Itoa(row.id)
WorkerCount++
WorkerLocked[rowID] = true
time.Sleep(1 * time.Second)
log.Printf("ID rcvd: %d", row.id)
WorkerLocked[rowID] = false
WorkerCount--
}
// waiter will check if the row is already processing in a worker
// Block until it finishes completion, then dispatch
func waiter(row RowInfo) {
rowID := strconv.Itoa(row.id)
for WorkerLocked[rowID] == true {
time.Sleep(1 * time.Second)
}
go worker(row)
}
func main() {
jobsQueue := make(chan RowInfo, 10)
WorkerLocked = make(map[string]bool)
// Dispatcher waits for jobs on the channel and dispatches to waiter
go func() {
// Wait for a job
for {
// Only have a max of 3 workers running asynch at a time
for WorkerCount > 3 {
time.Sleep(1 * time.Second)
}
job := <-jobsQueue
go waiter(job)
}
}()
// Test the queue, send some data
for i := 0; i < 12; i++ {
r := RowInfo{
id: i,
}
jobsQueue <- r
}
// Prevent exit!
for {
time.Sleep(1 * time.Second)
}
}
And I'm getting this error, but it's an intermittent issue because sometimes when I run it it appears to work. Is there a race condition?:
go run main.go
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x8 pc=0x4565e7]
goroutine 37 [running]:
main.worker(0x5)
/home/piiz/go/src/github.com/zzz/asynch/main.go:25 +0x94
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 1 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.main()
/home/piiz/go/src/github.com/zzz/asynch/main.go:73 +0xf8
goroutine 5 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.main.func1(0xc82008c000)
/home/piiz/go/src/github.com/zzz/asynch/main.go:55 +0x2d
created by main.main
/home/piiz/go/src/github.com/zzz/asynch/main.go:61 +0xa0
goroutine 35 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x2)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 36 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x4)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 34 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x1)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 12 [runnable]:
runtime.goexit1()
/usr/local/go/src/runtime/proc1.go:1732
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1697 +0x6
created by main.main.func1
/home/piiz/go/src/github.com/zzz/asynch/main.go:59 +0x8c
goroutine 19 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x8)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 20 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x0)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 16 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x9)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 33 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x3)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 18 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x7)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 22 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0xa)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 49 [runnable]:
main.worker(0x6)
/home/piiz/go/src/github.com/zzz/asynch/main.go:21
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
exit status 2
Anyway, I am still learning, so if you look at my code and go "what the hell", well, I won't be surprised :) Maybe I'm approaching this problem entirely wrong. Thanks.