I have a simple application that I am working on to read MongoDB's replication oplog, serialize the results into a Go structure and send it to a channel to be processed. Currently I am reading from that channel and simply printing out the values inside of the structure.
I have tried reading the values from the channel using for/range, reading directly from it, and putting the read inside a select with a timeout. The results are all the same: each time I run the code I get different results from the channel. I can see that each value is written to the channel only once; however, when reading from that channel I sometimes read the same value 1-3 times, sometimes even 4 times, even with only a single write.
This usually happens only on the initial load (pulling in the older records) and doesn't seem to occur when reading live additions to the channel. Is there some problem where reading from the channel too quickly returns an item again before it has been removed from the channel after the first time it's read?
package main
import (
"fmt"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
)
// Operation is the Go form of one document read from the tailed collection.
//
// The bson tags name the document keys each field is decoded from; the json
// tags give the names used if a value is encoded as JSON.
type Operation struct {
	// Id is decoded from the "h" key.
	Id int64 `bson:"h" json:"id"`

	// Operator is decoded from the "op" key.
	Operator string `bson:"op" json:"operator"`

	// Namespace is decoded from the "ns" key.
	// NOTE(review): presumably "<database>.<collection>" — confirm.
	Namespace string `bson:"ns" json:"namespace"`

	// Select is decoded from the "o" key.
	Select bson.M `bson:"o" json:"select"`

	// Update is decoded from the "o2" key.
	Update bson.M `bson:"o2" json:"update"`

	// Timestamp is decoded from the "ts" key.
	Timestamp int64 `bson:"ts" json:"timestamp"`
}
func Tail(collection *mgo.Collection, Out chan<- *Operation) {
iter := collection.Find(nil).Tail(-1)
var oper *Operation
for {
for iter.Next(&oper) {
fmt.Println("
<<", oper.Id)
Out <- oper
}
if err := iter.Close(); err != nil {
fmt.Println(err)
return
}
}
}
// main connects to the MongoDB server at 127.0.0.1, starts Tail on the
// local.oplog.rs collection in a separate goroutine, and prints the fields of
// every Operation received from the channel.
func main() {
	session, dialErr := mgo.Dial("127.0.0.1")
	if dialErr != nil {
		panic(dialErr)
	}
	defer session.Close()

	oplog := session.DB("local").C("oplog.rs")

	// One-slot buffer between the tailing goroutine and the printer below.
	ops := make(chan *Operation, 1)
	go Tail(oplog, ops)

	for op := range ops {
		// Labels paired with values, listed in the order they are printed.
		rows := []struct {
			label string
			value interface{}
		}{
			{"Id: ", op.Id},
			{"Operator: ", op.Operator},
			{"Namespace: ", op.Namespace},
			{"Select: ", op.Select},
			{"Update: ", op.Update},
			{"Timestamp: ", op.Timestamp},
		}

		fmt.Println()
		for _, row := range rows {
			fmt.Println(row.label, row.value)
		}
	}
}