dqyp50298 2014-01-31 20:33
Views: 20
Accepted

Why does my Go channel return the same element multiple times?

I have a simple application that I am working on to read MongoDB's replication oplog, serialize the results into a Go structure and send it to a channel to be processed. Currently I am reading from that channel and simply printing out the values inside of the structure.

I have tried reading the values from the channel using for/range, by reading directly from it, and by putting the read inside a select with a timeout. The results are all the same: each time I run the code I get different output from the channel. I can see that each value is written to the channel only once; however, when reading from that channel I sometimes read out the same value 1-3 times, sometimes even 4 times, even with only a single write.

This usually happens only on the initial load (pulling in the older records) and doesn't seem to occur when reading live additions to the channel. Is there some problem where a read from the channel can happen too fast, before the item has been removed by the first read?

package main

import (
    "fmt"
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

type Operation struct {
    Id        int64  `bson:"h" json:"id"`
    Operator  string `bson:"op" json:"operator"`
    Namespace string `bson:"ns" json:"namespace"`
    Select    bson.M `bson:"o" json:"select"`
    Update    bson.M `bson:"o2" json:"update"`
    Timestamp int64  `bson:"ts" json:"timestamp"`
}

func Tail(collection *mgo.Collection, Out chan<- *Operation) {
    iter := collection.Find(nil).Tail(-1)
    var oper *Operation

    for {
        for iter.Next(&oper) {
            fmt.Println("\n<<", oper.Id)
            Out <- oper
        }

        if err := iter.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}

func main() {
    session, err := mgo.Dial("127.0.0.1")

    if err != nil {
        panic(err)
    }
    defer session.Close()

    c := session.DB("local").C("oplog.rs")

    cOper := make(chan *Operation, 1)

    go Tail(c, cOper)

    for operation := range cOper {
        fmt.Println()
        fmt.Println("Id: ", operation.Id)
        fmt.Println("Operator: ", operation.Operator)
        fmt.Println("Namespace: ", operation.Namespace)
        fmt.Println("Select: ", operation.Select)
        fmt.Println("Update: ", operation.Update)
        fmt.Println("Timestamp: ", operation.Timestamp)
    }
}
2 answers

  • douchuo0730 2014-01-31 21:27

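    I think you're reusing your *Operation, which is causing the issue: the same pointer is sent every time, so the value it points to can be overwritten before the receiver reads it. For example: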

    http://play.golang.org/p/_MeSBLWPwN

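    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        c := make(chan *int, 1)

        go func() {
            val := new(int) // allocated once, so every send shares this pointer
            for i := 0; i < 10; i++ {
                *val = i // may overwrite a value the receiver has not printed yet
                c <- val
            }
            close(c)
        }()

        for val := range c {
            time.Sleep(time.Millisecond * 1) // gives the sender time to overwrite *val
            fmt.Println(*val)
        }
    }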
    

    This code results in output like the following (the exact values depend on timing):

    2
    3
    4
    5
    6
    7
    8
    9
    9
    9
    

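    More importantly, it's not thread safe: the sender writes through the pointer while the receiver reads through it. Maybe try declaring the variable inside the loop: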

    for {
        for { 
            var oper *Operation
            if !iter.Next(&oper) {
                break
            }
            fmt.Println("\n<<", oper.Id)
            Out <- oper
        }
        ...
    }
    

    Or use a plain Operation instead of a *Operation, because without the pointer the value is copied when it is sent.

    This answer was selected by the asker as the best answer.
