dslfq06464 2017-11-15 20:30 采纳率: 0%
浏览 29
已采纳

Goroutine和消息重复数据删除

So I have some kind of event queues and several goroutines which are getting the events from their corresponding queues in an infinite loop, process them, and send results into a channel. Different queues may give you the same event, so I need to make sure that each event is sent to channel exactly once, and any occurence of that message in another queue will be ignored. I believe that's more of an architectural issue but I can't figure out how to handle this properly.

Simplified version of my current code is below.

Goroutines that get and handle incoming events look somewhat like this:

func (q *Queue) ProcessEvents(handler Handler) {
   lastEvent = 0
   for {
       events = getEvents(lastEvent)
       for _, e := range events {
           if e.ID > lastEvent  {
                lastEvent = event.ID
           }
           handler.Handle(e)
       }
   }
}

Handler:

type Handler struct {
    c chan Event
}

func (h *Handler) Handle(event *Event) {
    //event processing omitted
    h.c <- event //Now it just sends a processed event into the channel no matter what.
}

And in main() I do

func main() {
    msgc := make(chan Event)
    for _, q := range queues {
        go func(queue Queue) {
            queue.ProcessEvents(&Handler{msgc})
        }
    }
}
  • 写回答

1条回答 默认 最新

  • dongtanhe4607 2017-11-17 14:17
    关注

    So you represent your current architecture as follows:

    Current architecture

    With this type of solution the Generators need to check a shared resource to see if an event was already emitted. This might look something like this:

    var hasEmmited map[string]bool
    var lock sync.Mutex
    
    func HasEmitted(event e) bool {
       lock.Lock()
       defer lock.Unlock()
       e,ok := hasEmmited[e.ID]
       return e && ok
    }
    
    func SetEmmited(event e) {
       lock.Lock()
       defer lock.Lock()
       hasEmmited[e.ID] = true
    }
    

    This requires locking/unlocking, which even in the best case scenario with no contention is till a great over-head considering the small amount of work being done in the critical section.

    With a small change in the architecture, like in the second diagram, it would be possible for one go-routine to to do the filtering without any locking.

    A potential solution

    Some commenters have said that designing solutions using go-routines is the same as designing for single-threaded applications. I do not believe this is the case. I would suggest looking at:

    Golang related messaging: https://blog.golang.org/pipelines

    Some message handling design patterns: http://www.enterpriseintegrationpatterns.com/

    The enterprise integration patterns might look out of place here, but it covers a lot of message passing patters that also applies in go.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料