Question

I have a simple application that I am working on to read MongoDB's replication oplog, serialize the results into a Go structure and send it to a channel to be processed. Currently I am reading from that channel and simply printing out the values inside of the structure.

I have tried reading the values from the channel using for/range, reading from it directly, and putting the read inside a select with a timeout. The results are all the same: each time I run the code I get different output from the channel. The log shows each value being written to the channel only once, yet on the receiving side I sometimes read the same value 1-3 times, sometimes even 4 times, even though there was only a single write.

This usually happens only on the initial load (pulling in the older records) and doesn't seem to occur when reading live additions to the channel. Is there some problem where reading from the channel too quickly returns an item again before it has been removed after the first read?

package main

import (
    "fmt"
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

type Operation struct {
    Id        int64  `bson:"h" json:"id"`
    Operator  string `bson:"op" json:"operator"`
    Namespace string `bson:"ns" json:"namespace"`
    Select    bson.M `bson:"o" json:"select"`
    Update    bson.M `bson:"o2" json:"update"`
    Timestamp int64  `bson:"ts" json:"timestamp"`
}

func Tail(collection *mgo.Collection, Out chan<- *Operation) {
    iter := collection.Find(nil).Tail(-1)
    var oper *Operation

    for {
        for iter.Next(&oper) {
            fmt.Println("\n<<", oper.Id)
            Out <- oper
        }

        if err := iter.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}

func main() {
    session, err := mgo.Dial("127.0.0.1")

    if err != nil {
        panic(err)
    }
    defer session.Close()

    c := session.DB("local").C("oplog.rs")

    cOper := make(chan *Operation, 1)

    go Tail(c, cOper)

    for operation := range cOper {
        fmt.Println()
        fmt.Println("Id: ", operation.Id)
        fmt.Println("Operator: ", operation.Operator)
        fmt.Println("Namespace: ", operation.Namespace)
        fmt.Println("Select: ", operation.Select)
        fmt.Println("Update: ", operation.Update)
        fmt.Println("Timestamp: ", operation.Timestamp)
    }
}

Solution

I think you're reusing the same *Operation for every send, which is what causes the issue. For example:

http://play.golang.org/p/_MeSBLWPwN

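package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan *int, 1)

    go func() {
        val := new(int) // a single int, shared by every send
        for i := 0; i < 10; i++ {
            *val = i // may overwrite a value the receiver has not printed yet
            c <- val
        }
        close(c)
    }()

    for val := range c {
        time.Sleep(time.Millisecond * 1)
        fmt.Println(*val)
    }
}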

One run of this code printed the following (because of the data race, the exact output can vary between runs):

2
3
4
5
6
7
8
9
9
9

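More importantly, it's not thread safe: the sender writes through the pointer while the receiver may still be reading through it. Try declaring the variable inside the loop so each iteration gets a new value: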

for {
    for { 
        var oper *Operation
        if !iter.Next(&oper) {
            break
        }
        fmt.Println("\n<<", oper.Id)
        Out <- oper
    }
    ...
}

Or use a plain Operation instead of a *Operation. Without the pointer, the value is copied on every send, so the receiver gets its own copy.
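As a rough illustration of the by-value variant, here is the int example from above rewritten to send values instead of pointers. The helper name sendValues is made up for this sketch and is not part of the original code.

```go
package main

import "fmt"

// sendValues (hypothetical helper) sends the integers 0..n-1 by value.
// Each send copies the int into the channel, so later changes to i
// cannot alter what the receiver sees.
func sendValues(n int) <-chan int {
	c := make(chan int, 1)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- i // the channel holds its own copy of i
		}
	}()
	return c
}

func main() {
	// Prints 0 through 9, each exactly once and in order.
	for v := range sendValues(10) {
		fmt.Println(v)
	}
}
```

The same idea would apply to a chan Operation, at the cost of copying the struct on each send.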

Other tips

I think what you're doing is deserializing into the same instance of the struct each time, so the receiver reads the very object that the sender is overwriting. Try moving the declaration into the loop so that a new instance is created on each iteration.
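A minimal sketch of that suggestion, without a MongoDB connection. The fakeIter type is a hypothetical stand-in for the mgo iterator, and it assumes the decoder fills in an already-allocated struct when handed a non-nil pointer and allocates only when the pointer is nil. The Operation struct is trimmed to a single field for brevity.

```go
package main

import "fmt"

// Operation is a trimmed-down stand-in for the struct in the question.
type Operation struct {
	Id int64
}

// fakeIter is a hypothetical replacement for the mgo iterator.
// Assumption: like a decoder given a non-nil pointer, Next reuses the
// existing struct and allocates a new one only when *out is nil.
type fakeIter struct {
	ids []int64
	pos int
}

func (it *fakeIter) Next(out **Operation) bool {
	if it.pos >= len(it.ids) {
		return false
	}
	if *out == nil {
		*out = new(Operation)
	}
	(*out).Id = it.ids[it.pos]
	it.pos++
	return true
}

// tail declares oper inside the loop, so every iteration starts with a
// nil pointer and Next allocates a fresh Operation for it.
func tail(it *fakeIter, out chan<- *Operation) {
	defer close(out)
	for {
		var oper *Operation
		if !it.Next(&oper) {
			return
		}
		out <- oper
	}
}

func main() {
	out := make(chan *Operation, 1)
	go tail(&fakeIter{ids: []int64{10, 20, 30}}, out)
	for op := range out {
		fmt.Println(op.Id) // prints 10, 20, 30
	}
}
```

Because the sender never touches an Operation after sending it, the receiver can read each one without synchronization beyond the channel itself.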

You can also run this code with go run -race or go build -race; the race detector reports this kind of problem.

Licensed under: CC-BY-SA with attribution