I'm a beginner gopher, and I wrote an event listener worker queue for a project I'm working on.
I've deployed it on a staging server. After around 100 events have been triggered the listeners stop getting called when events are published. The server hasn't crashed either.
Here is my implementation:
// Event struct
type Event struct {
Name string
Data interface{}
}
// Stream to publish events to
var stream = make(chan *Event, 100)
// Publish sends new event data to the stream by the event name
func Publish(name string, data interface{}) {
ev := &Event{name, data}
stream <- ev
}
// Handler provides the interface for all event handlers.
// The Work will be called with the event that it should process
type Handler interface {
Work(*Event)
}
type worker struct {
Handler
Listen chan *Event
Quit chan bool
}
// Stop shuts down the worker
func (w *worker) Stop() {
go func() {
w.Quit <- true
}()
}
// Queue of worker Listen channels
type workerQueue chan chan *Event
// registry of workers
var registry = make(map[string][]workerQueue)
// Register creates 20 workers, assigns them to a queue, and
// appends the resulting worker queue to an event on the handler registry
func Register(name string, handlers ...Handler) {
if _, ok := registry[name]; !ok {
registry[name] = make([]workerQueue, 0)
}
// Create workerQueues for each handler
for _, h := range handlers {
queue := make(workerQueue, numListeners)
// Create 20 workers
for i := 0; i < 20; i++ {
newWorker := worker{
Handler: h,
Listen: make(chan *Event),
Quit: make(chan bool),
}
go func() {
for {
select {
case ev := <-newWorker.Listen:
nl.Work(ev)
case <-newWorker.Quit:
return
}
}
}()
queue <- newWorker.Listen
}
registry[name] = append(registry[name], queue)
}
}
// Start begins listening for events on stream
func Start() {
go func() {
for {
select {
// listen for events
case ev := <-stream:
go func() {
// get registered queues for the event
queues, ok := registry[ev.Name]
if !ok {
return
}
// Get worker channel from queue and send the event
for _, queue := range queues {
worker := <-queue
worker <- ev
}
}()
}
}
}()
}
Here is an example usage.
// Usage
Start()
type demoHandler struct {
db *sql.DB
}
type eventData struct {}
func (h *demoHandler) Work(ev *Event) {
// Do something
return
}
// Register handler
Register('some-event', &demoHandler{r})
Publish('some-event', &eventData{})
I'm passing a pointer to a demoHandler as the event handler because they need access to the underlying sql instance. Is it a problem that each worker queue uses the same demoHandler?
I can't for the life of me figure out where I went wrong! Short of an error in the handler code, is there a mistake in my code which causes all of my workers to go down?