dongqiao9583 2016-02-11 01:33
浏览 43
已采纳

Go:学习通道(channel)和队列,致命错误

I'm trying to learn how to use channels to make a queue in Go for one of my other projects. My other project basically queues up database rows, and then does number crunching on the database using the details in the rows.

I don't want the same row to be processed by more than one worker at the same time, so the code needs to check whether a worker is currently working on that specific row ID and, if so, wait for it to finish. Rows with different IDs can be processed asynchronously, but I also want to limit the number of asynchronous workers that can run at the same time. In my code below, I'm trying to limit it to three workers for now.

Here's what I have:

package main

import (
    "log"
    "strconv"
    "time"
)

// RowInfo describes one queued job.
type RowInfo struct {
    id int // row ID; its decimal string form is the key used in WorkerLocked
}

// WorkerCount holds how many workers are currently running.
// It is incremented and decremented by worker and polled by the dispatcher
// goroutine started in main.
//
// NOTE(review): those accesses happen on different goroutines and are not
// synchronized; guard the counter with a sync.Mutex or use sync/atomic.
var WorkerCount int

// WorkerLocked records, per row ID (as a decimal string), whether a worker is
// currently processing that row. main allocates the map before it starts the
// dispatcher goroutine.
//
// NOTE(review): worker goroutines write to this map while waiter goroutines
// read it, with no locking. Unsynchronized concurrent map access is a data
// race in Go; protect the map with a sync.Mutex.
var WorkerLocked map[string]bool

// worker processes a single row: it increments WorkerCount and marks the row
// ID as locked in WorkerLocked, sleeps for one second, logs the row ID, then
// clears the lock flag and decrements WorkerCount.
//
// NOTE(review): waiter starts each worker on its own goroutine, so several
// workers run at once. The updates to WorkerCount and WorkerLocked below are
// not synchronized and race with each other and with the reads in waiter and
// in main's dispatcher.
func worker(row RowInfo) {
    // The map key is the decimal string form of the row ID.
    rowID := strconv.Itoa(row.id)

    WorkerCount++
    WorkerLocked[rowID] = true

    // Sleep for one second, presumably standing in for the real processing.
    time.Sleep(1 * time.Second)
    log.Printf("ID rcvd: %d", row.id)

    // The entry is set to false rather than deleted, so the map keeps one key
    // for every row ID that has been processed.
    WorkerLocked[rowID] = false
    WorkerCount--
}

// waiter holds row back while WorkerLocked reports that its row ID is being
// processed, polling once per second, and then starts a worker for it on a new
// goroutine. waiter returns as soon as that worker has been started; it does
// not wait for the worker to finish.
//
// NOTE(review): the check here and the lock flag set inside worker are
// separate steps, so two waiters for the same row ID can both observe it as
// unlocked and both start a worker. The map read is also unsynchronized.
func waiter(row RowInfo) {
    rowID := strconv.Itoa(row.id)
    // A key that is absent from the map reads as false, so a row ID that has
    // never been processed passes straight through.
    for WorkerLocked[rowID] == true {
        time.Sleep(1 * time.Second)
    }

    go worker(row)
}

// main wires up the queue: it creates a buffered jobs channel and the
// WorkerLocked map, starts a dispatcher goroutine that hands each received job
// to its own waiter goroutine, enqueues twelve test jobs (IDs 0 through 11),
// and then sleeps forever so the background goroutines can keep running.
func main() {
    // Buffer of 10: the producer loop below sends 12 jobs, so its later sends
    // block until the dispatcher has drained some entries.
    jobsQueue := make(chan RowInfo, 10)
    WorkerLocked = make(map[string]bool)

    // Dispatcher waits for jobs on the channel and dispatches to waiter
    go func() {
        // Wait for a job
        for {
            // Intended to allow at most 3 workers running asynchronously.
            // NOTE(review): the loop only waits while WorkerCount exceeds 3,
            // so a count of exactly 3 still lets another job through. In
            // addition, WorkerCount is incremented inside worker, after the
            // job has already been dispatched, so this check can pass for many
            // jobs before any worker has counted itself. The read of
            // WorkerCount is also unsynchronized.
            for WorkerCount > 3 {
                time.Sleep(1 * time.Second)
            }

            job := <-jobsQueue
            go waiter(job)
        }
    }()

    // Test the queue, send some data
    for i := 0; i < 12; i++ {
        r := RowInfo{
            id: i,
        }
        jobsQueue <- r
    }

    // Keep main alive; the program would exit (ending all goroutines) if main
    // returned.
    for {
        time.Sleep(1 * time.Second)
    }
}

I'm getting the error below, but only intermittently: sometimes when I run the program it appears to work. Is there a race condition?

go run main.go
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x8 pc=0x4565e7]

goroutine 37 [running]:
main.worker(0x5)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:25 +0x94
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 1 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.main()
    /home/piiz/go/src/github.com/zzz/asynch/main.go:73 +0xf8

goroutine 5 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.main.func1(0xc82008c000)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:55 +0x2d
created by main.main
    /home/piiz/go/src/github.com/zzz/asynch/main.go:61 +0xa0

goroutine 35 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x2)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 36 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x4)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 34 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x1)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 12 [runnable]:
runtime.goexit1()
    /usr/local/go/src/runtime/proc1.go:1732
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:1697 +0x6
created by main.main.func1
    /home/piiz/go/src/github.com/zzz/asynch/main.go:59 +0x8c

goroutine 19 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x8)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 20 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x0)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 16 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x9)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 33 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x3)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 18 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x7)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 22 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0xa)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 49 [runnable]:
main.worker(0x6)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:21
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
exit status 2

Anyway, I am still learning, so if you look at my code and go "what the hell", well, I won't be surprised :) Maybe I'm approaching this problem entirely wrong. Thanks.

  • 写回答

2条回答 默认 最新

  • dscpg80066 2016-02-11 02:40
    关注

    If you're going to use the WorkerLocked map, you need to protect access to it using the sync package. You also need to protect WorkerCount in the same way (or using atomic operations). Doing something like that would also make sleeping unnecessary (using condition variables).

    Better yet, have 3 (or however many) workers waiting for rows to work on using channels. You would then distribute the rows to the various workers such that a particular row is always worked on by a particular worker (e.g., using row.id % 3 to determine which worker/channel to send the row to).

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 用土力学知识进行土坡稳定性分析与挡土墙设计
  • ¥70 PlayWright在Java上连接CDP关联本地Chrome启动失败,貌似是Windows端口转发问题
  • ¥15 帮我写一个c++工程
  • ¥30 Eclipse官网打不开,官网首页进不去,显示无法访问此页面,求解决方法
  • ¥15 关于smbclient 库的使用
  • ¥15 微信小程序协议怎么写
  • ¥15 c语言怎么用printf(“\b \b”)与getch()实现黑框里写入与删除?
  • ¥20 怎么用dlib库的算法识别小麦病虫害
  • ¥15 华为ensp模拟器中S5700交换机在配置过程中老是反复重启
  • ¥15 uniapp uview http 如何实现统一的请求异常信息提示?