dslfq06464 2017-11-15 20:30 Acceptance rate: 0%

Goroutines and message deduplication

I have several event queues and several goroutines, each of which reads events from its own queue in an infinite loop, processes them, and sends the results into a channel. Different queues may deliver the same event, so I need to make sure that each event is sent to the channel exactly once and that any occurrence of that event in another queue is ignored. I believe this is more of an architectural issue, but I can't figure out how to handle it properly.

Simplified version of my current code is below.

Goroutines that get and handle incoming events look somewhat like this:

func (q *Queue) ProcessEvents(handler *Handler) {
    lastEvent := 0 // highest event ID seen so far on this queue
    for {
        events := getEvents(lastEvent)
        for _, e := range events {
            if e.ID > lastEvent {
                lastEvent = e.ID
            }
            handler.Handle(e)
        }
    }
}

Handler:

type Handler struct {
    c chan Event
}

func (h *Handler) Handle(event *Event) {
    // Event processing omitted.
    h.c <- *event // For now it just sends the processed event into the channel no matter what.
}

And in main() I do

func main() {
    msgc := make(chan Event)
    for _, q := range queues {
        // Pass q as an argument so each goroutine works on its own queue.
        go func(queue Queue) {
            queue.ProcessEvents(&Handler{msgc})
        }(q)
    }
}

1 answer

  • dongtanhe4607 2017-11-17 14:17

    So your current architecture can be represented as follows:

    Current architecture

    With this type of solution, the generators need to check a shared resource to see whether an event has already been emitted. This might look something like this:

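    var (
        // Keyed by event ID; IDs are compared as integers in the question's code.
        hasEmitted = make(map[int]bool)
        lock       sync.Mutex
    )

    // HasEmitted reports whether an event with this ID was already emitted.
    func HasEmitted(e Event) bool {
        lock.Lock()
        defer lock.Unlock()
        return hasEmitted[e.ID]
    }

    // SetEmitted records that the event has been emitted.
    func SetEmitted(e Event) {
        lock.Lock()
        defer lock.Unlock()
        hasEmitted[e.ID] = true
    }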
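
    One caveat with a separate check and set: two goroutines can both see "not emitted" before either records the event, and then both send it. A minimal sketch of a combined check-and-set under a single lock acquisition follows. The names `emittedSet` and `MarkEmitted`, and the `int` type of `Event.ID`, are assumptions made for illustration and are not taken from the original code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Event stands in for the question's event type; the int ID is an assumption.
type Event struct {
	ID int
}

// emittedSet (hypothetical name) records which event IDs were already claimed.
type emittedSet struct {
	mu   sync.Mutex
	seen map[int]struct{}
}

func newEmittedSet() *emittedSet {
	return &emittedSet{seen: make(map[int]struct{})}
}

// MarkEmitted reports whether the caller is the first to claim e.ID.
// The lookup and the insert happen while holding the same lock, so two
// goroutines cannot both receive true for the same ID.
func (s *emittedSet) MarkEmitted(e Event) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.seen[e.ID]; ok {
		return false
	}
	s.seen[e.ID] = struct{}{}
	return true
}

func main() {
	set := newEmittedSet()
	var wg sync.WaitGroup
	var winners int32

	// Eight goroutines race to claim the same event ID.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if set.MarkEmitted(Event{ID: 42}) {
				atomic.AddInt32(&winners, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("goroutines that won the claim:", atomic.LoadInt32(&winners))
}
```

    A handler would then send an event only when `MarkEmitted` returns true. This still takes the lock once per event.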
    

    This requires locking and unlocking, which, even in the best case with no contention, is still considerable overhead given the small amount of work done in the critical section.

    With a small change in the architecture, as in the second diagram, a single goroutine can do the filtering without any locking.

    A potential solution
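
    One way the single filtering goroutine could be sketched: every queue goroutine sends into a shared raw channel, and one goroutine owns the "seen" set and forwards only first occurrences. The `dedup` function, the `raw` channel, the in-memory slices standing in for queues, and the `int` type of `Event.ID` are all assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Event stands in for the question's event type; the int ID is an assumption.
type Event struct {
	ID int
}

// dedup forwards each event ID from in to the returned channel at most once.
// Only the goroutine started here touches the seen map, so no mutex is needed.
func dedup(in <-chan Event) <-chan Event {
	out := make(chan Event)
	go func() {
		defer close(out)
		seen := make(map[int]struct{})
		for e := range in {
			if _, dup := seen[e.ID]; dup {
				continue // already forwarded once, drop it
			}
			seen[e.ID] = struct{}{}
			out <- e
		}
	}()
	return out
}

func main() {
	// Two hypothetical queues that overlap on event ID 2.
	queues := [][]Event{
		{{ID: 1}, {ID: 2}},
		{{ID: 2}, {ID: 3}},
	}

	raw := make(chan Event)
	var wg sync.WaitGroup
	for _, q := range queues {
		wg.Add(1)
		go func(events []Event) {
			defer wg.Done()
			for _, e := range events {
				raw <- e
			}
		}(q)
	}
	// Close raw once every producer has finished so dedup can terminate.
	go func() {
		wg.Wait()
		close(raw)
	}()

	count := 0
	for e := range dedup(raw) {
		count++
		fmt.Println("unique event", e.ID)
	}
	fmt.Println("total unique:", count)
}
```

    The order in which unique events arrive depends on goroutine scheduling. In a long-running process the `seen` map grows without bound, so some eviction policy (for example, dropping IDs older than a window) would likely be needed.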

    Some commenters have said that designing solutions using goroutines is the same as designing for single-threaded applications. I do not believe this is the case. I would suggest looking at:

    Go-related messaging: https://blog.golang.org/pipelines

    Some message handling design patterns: http://www.enterpriseintegrationpatterns.com/

    The enterprise integration patterns might look out of place here, but they cover many message-passing patterns that also apply in Go.

    This answer was selected by the asker as the best answer.
