duanchun1909 2019-02-04 13:22
浏览 68

在go worker / event系统中,worker是否应该访问同一结构(通过指针)进行工作?

I'm a beginner gopher, and I wrote an event listener worker queue for a project I'm working on.

I've deployed it on a staging server. After around 100 events have been triggered the listeners stop getting called when events are published. The server hasn't crashed either.

Here is my implementation:

// Event struct 
type Event struct {
  Name string
  Data interface{}
}

// Stream to publish events to
var stream = make(chan *Event, 100)

// Publish sends new event data to the stream by the event name
func Publish(name string, data interface{}) {
  ev := &Event{name, data}
  stream <- ev
}

// Handler provides the interface for all event handlers.
// The Work will be called with the event that it should process
type Handler interface {
  Work(*Event)
}

type worker struct {
  Handler
  Listen chan *Event
  Quit   chan bool
}

// Stop shuts down the worker
func (w *worker) Stop() {
  go func() {
    w.Quit <- true
  }()
}

// Queue of worker Listen channels
type workerQueue chan chan *Event

// registry of workers
var registry = make(map[string][]workerQueue)

// Register creates 20 workers, assigns them to a queue, and 
// appends the resulting worker queue to an event on the handler registry
func Register(name string, handlers ...Handler) {
  if _, ok := registry[name]; !ok {
    registry[name] = make([]workerQueue, 0)
  }

  // Create workerQueues for each handler
  for _, h := range handlers {
    queue := make(workerQueue, numListeners)

    // Create 20 workers
    for i := 0; i < 20; i++ {
      newWorker := worker{
        Handler: h,
        Listen:  make(chan *Event),
        Quit:    make(chan bool),
      }

      go func() {
        for {
          select {
          case ev := <-newWorker.Listen:
            nl.Work(ev)
          case <-newWorker.Quit:
            return
          }
        }
      }()

      queue <- newWorker.Listen
    }

    registry[name] = append(registry[name], queue)
  }
}

// Start begins listening for events on stream
func Start() {
  go func() {
    for {
      select {
      // listen for events
      case ev := <-stream:
        go func() {
          // get registered queues for the event
          queues, ok := registry[ev.Name]

          if !ok {
            return
          }

          // Get worker channel from queue and send the event
          for _, queue := range queues {
            worker := <-queue
            worker <- ev
          }
        }()
      }
    }
  }()
}

Here is an example usage.

// Usage

Start()

type demoHandler struct {
  db *sql.DB
}

type eventData struct {}

func (h *demoHandler) Work(ev *Event) {
  // Do something
  return
}

// Register handler
Register('some-event', &demoHandler{r})

Publish('some-event', &eventData{})

I'm passing a pointer to a demoHandler as the event handler because they need access to the underlying sql instance. Is it a problem that each worker queue uses the same demoHandler?

I can't for the life of me figure out where I went wrong! Short of an error in the handler code, is there a mistake in my code which causes all of my workers to go down?

  • 写回答

1条回答 默认 最新

  • dpauxqt1281 2019-02-04 16:29
    关注

    "In a go worker/event system, should workers access the same struct (via pointer) to do work?" No, it's not a problem. It would be a problem if the code inside your handler access a critical section, but I think that's not causing your program to block.

    Your server doesn't crash or block either because no panic is being triggered, and your program is listening and executing on separate goroutines, which are lightweight threads of execution.

    It probably has to be with the channels you are using to send and receive events.

    Sends and receives to a channel are blocking by default. This means that when you send or receive from a channel it will block its goroutine until the other side is ready.

    In the case of buffered channels, sends block when the buffer is full, and receives block when the buffer is empty, as in your stream channel:

    var stream = make(chan *Event, 100)
    

    You said: "After around 100 events have been triggered the listeners stop getting called when events are published".

    So if you call the Publish function and do stream <- ev when the "stream" channel buffer is full, it will block until the channel has place to receive another element.

    I'd suggest reading a bit about non-blocking channel operations.

    Maybe the block is occurring in some part of your real usage code.

    评论

报告相同问题?

悬赏问题

  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!
  • ¥15 drone 推送镜像时候 purge: true 推送完毕后没有删除对应的镜像,手动拷贝到服务器执行结果正确在样才能让指令自动执行成功删除对应镜像,如何解决?
  • ¥15 求daily translation(DT)偏差订正方法的代码
  • ¥15 js调用html页面需要隐藏某个按钮