duanla1996
2016-07-10 00:13

mgo: wrong type for field

Accepted

I am trying to do a bulk upsert with the mgo library. I was reading the documentation regarding bulk upserts since this is the first time I've ever used MongoDB, and it looks like I have to provide pairs of documents to update.

In my function, I run a find-all query and then use its results as the "existing" half of the pair passed to bulk.Upsert(). I'm not sure this is the right way to go about it, but I have to upsert ~65k documents at a time.

Here are the type structs, and the worker pool function which reads from a channel to perform the aforementioned MongoDB operations.

// types from my project's `lib` package.
type Auctions struct {
    Auc          int `json:"auc" bson:"_id"`
    Item         int `json:"item" bson:"item"`
    Owner        string `json:"owner" bson:"owner"`
    OwnerRealm   string `json:"ownerRealm" bson:"ownerRealm"`
    Bid          int `json:"bid" bson:"bid"`
    Buyout       int `json:"buyout" bson:"buyout"`
    Quantity     int `json:"quantity" bson:"quantity"`
    TimeLeft     string `json:"timeLeft" bson:"timeLeft"`
    Rand         int `json:"rand" bson:"rand"`
    Seed         int `json:"seed" bson:"seed"`
    Context      int `json:"context" bson:"context"`
    BonusLists   []struct {
        BonusListID int `json:"bonusListId" bson:"bonusListId"`
    } `json:"bonusLists,omitempty" bson:"bonusLists,omitempty"`
    Modifiers    []struct {
        Type  int `json:"type" bson:"type"`
        Value int `json:"value" bson:"value"`
    } `json:"modifiers,omitempty" bson:"modifiers,omitempty"`
    PetSpeciesID int `json:"petSpeciesId,omitempty" bson:"petSpeciesId,omitempty"`
    PetBreedID   int `json:"petBreedId,omitempty" bson:"petBreedId,omitempty"`
    PetLevel     int `json:"petLevel,omitempty" bson:"petLevel,omitempty"`
    PetQualityID int `json:"petQualityId,omitempty" bson:"petQualityId,omitempty"`
}

type AuctionResponse struct {
    Realms   []struct {
        Name string `json:"name"`
        Slug string `json:"slug"`
    } `json:"realms"`
    Auctions []Auctions `json:"auctions"`
}

func (b *Blizzard) RealmAuctionGrabber(realms chan string, db *mgo.Database, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range realms {
        NewReq, err := http.NewRequest("GET", fmt.Sprintf("%s/auction/data/%s", b.Url, i), nil)
        if err != nil {
            fmt.Printf("Cannot create new request for realm %s: %s", i, err)
        }

        // Update the request with the default parameters and grab the files links.
        Request := PopulateDefaultParams(NewReq)
        log.Debugf("downloading %s auction locations.", i)
        Response, err := b.Client.Do(Request)
        if err != nil {
            fmt.Printf("Error request realm auction data: %s\n", err)
        }
        defer Response.Body.Close()

        Body, err := ioutil.ReadAll(Response.Body)
        if err != nil {
            fmt.Printf("Error parsing request body: %s\n", err)
        }

        var AuctionResp lib.AuctionAPI
        err = json.Unmarshal(Body, &AuctionResp)
        if err != nil {
            fmt.Printf("Error marshalling auction response body: %s\n", err)
        }

        for _, x := range AuctionResp.Files {
            NewDataReq, err := http.NewRequest("GET", x.URL, nil)
            if err != nil {
                log.Error(err)
            }

            AuctionResponse, err := b.Client.Do(NewDataReq)
            if err != nil {
                fmt.Printf("Error request realm auction data: %s\n", err)
            }
            defer Response.Body.Close()

            AuctionBody, err := ioutil.ReadAll(AuctionResponse.Body)
            if err != nil {
                fmt.Printf("Error parsing request body: %s\n", err)
            }

            var AuctionData lib.AuctionResponse
            err = json.Unmarshal(AuctionBody, &AuctionData)

            // grab all the current records, then perform an Upsert!
            var existing []lib.Auctions
            col := db.C(i)
            err = col.Find(nil).All(&existing)
            if err != nil {
                log.Error(err)
            }

            log.Infof("performing bulk upsert for %s", i)

            auctionData, err := bson.Marshal(AuctionData.Auctions)
            if err != nil {
                log.Error("error marshalling bson: %s", err)
            }

            existingData, _ := bson.Marshal(existing)
            bulk := db.C(i).Bulk()
            bulk.Upsert(existingData, auctionData)
            _, err = bulk.Run()
            if err != nil {
                log.Error("error performing upsert! error: ", err)
            }
        }
    }
}

When I call bulk.Upsert(existingData,auctionData), things are fine. However, when I call bulk.Run(), I'm getting this logged error message:

{"level":"error","msg":"error performing upsert! error: wrong type for 'q' field, expected object, found q: BinData(0, 0500000000)","time":"2016-07-09T16:53:45-07:00"}

I'm assuming this is related to how I'm doing the BSON tagging in the Auctions struct, but I'm not sure because this is the first time I've worked with MongoDB. Right now there are no collections in the database.

Is the error message related to the BSON tagging, and how can I fix it?


1 answer

  • doujichan1399, 5 years ago

    I don't know if this is still relevant, but here is the code:

    package main
    
    import (
        "gopkg.in/mgo.v2"
        "log"
        "io/ioutil"
        "encoding/json"
        "gopkg.in/mgo.v2/bson"
    )
    
    type Auctions struct {
        Auc   int `json:"auc" bson:"_id"`
        Owner string `json:"owner" bson:"owner"`
    }
    
    type AuctionResponse struct {
        Auctions []Auctions `json:"auctions"`
    }
    
    func main() {
        session, err := mgo.Dial("mongodb://127.0.0.1:27017/aucs")
    
        if err != nil {
            panic(err)
        }
        defer session.Close()
        session.SetMode(mgo.Monotonic, true)
    
        db := session.DB("aucs")
    
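        var AuctionData AuctionResponse
        // Read the sample auction dump from disk and decode it.
        AuctionBody, err := ioutil.ReadFile("auctions.json")
        if err != nil {
            log.Panic("error reading auctions.json: ", err)
        }
        if err = json.Unmarshal(AuctionBody, &AuctionData); err != nil {
            log.Panic("error unmarshalling auctions: ", err)
        }
    
        log.Printf("performing bulk upsert for %s", "realm")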
    
        bulk := db.C("realm").Bulk()
        for _, auc := range AuctionData.Auctions {
            bulk.Upsert(bson.M{"_id": auc.Auc}, auc)
        }
        _, err = bulk.Run()
        if err != nil {
            log.Panic("error performing upsert! error: ", err)
        }
    }
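
As for the error in the question: `bson.Marshal` returns a `[]byte`, so the original code handed `bulk.Upsert` raw bytes instead of documents, which is consistent with the server reporting `BinData` where it expected an object. The sketch below is only an analogy built on the standard library's `encoding/json`, not on mgo or BSON: encoding already-encoded bytes a second time yields a string rather than an object. The helper name `encodeTwice` is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeTwice is a hypothetical helper. It encodes v, then encodes the
// resulting bytes again, mimicking a pre-marshalled value being passed
// to an API that expects a plain Go value.
func encodeTwice(v interface{}) (once, twice string, err error) {
	first, err := json.Marshal(v)
	if err != nil {
		return "", "", err
	}
	// first is a []byte here, so the encoder treats it as binary data.
	second, err := json.Marshal(first)
	if err != nil {
		return "", "", err
	}
	return string(first), string(second), nil
}

func main() {
	once, twice, err := encodeTwice(map[string]int{"_id": 1})
	if err != nil {
		panic(err)
	}
	fmt.Println("encoded once: ", once)  // a JSON object
	fmt.Println("encoded twice:", twice) // a quoted base64 string, no longer an object
}
```

The BSON tags on the struct are therefore unlikely to be the cause; passing Go values (a `bson.M` selector and the struct itself) avoids the double encoding.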
    

    I changed a few things (the structs are trimmed and the data is read from a local file) so that I could check this against real data, but I hope it is still easy to follow. Now let me explain the MongoDB side.

    1. You don't need to fetch the existing documents. Requesting every existing document before updating or inserting would be wasteful, especially on a large database.
    2. mgo's upsert function takes two parameters:
      • a selector document (I prefer to think of it as the WHERE condition in SQL terms)
      • the document you actually have; it updates the document matched by the selector, or is inserted if the selector matches nothing
    3. A bulk upsert accepts any number of selector/document pairs in one call, like bulk.Upsert(sel1, doc1, sel2, doc2, sel3, doc3), but in your case there is no point in using this feature.
    4. An upsert affects only a single document, so even if your selector matches multiple documents, only the first will be updated. In other words, upsert is closer to MySQL's INSERT ... ON DUPLICATE KEY UPDATE than to UPDATE ... WHERE. In your case (with unique auction IDs), a single upsert looks like this MySQL query:

      INSERT INTO realm (auc, owner) VALUES ('1', 'Hogger') ON DUPLICATE KEY UPDATE owner = 'Hogger';
      
    5. With Bulk, we can queue multiple upserts in a loop and then run them all at once.
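
The selector/document pairing in points 2 to 5 can be sketched without a database. This is a toy in-memory model, not mgo: the `store`, `pair`, and `runBulk` names are invented for illustration, and the selector is reduced to an `_id` lookup. It only shows that each queued pair updates the matching record if one exists and inserts it otherwise, so nothing has to be fetched beforehand.

```go
package main

import "fmt"

// Auction mirrors the trimmed struct used in the answer.
type Auction struct {
	Auc   int
	Owner string
}

// pair is one queued operation: a selector (here just an _id)
// plus the document to write.
type pair struct {
	selectorID int
	doc        Auction
}

// store is a stand-in for a collection keyed by _id.
type store map[int]Auction

// runBulk applies every queued pair in order and reports how many
// records were updated and how many were inserted.
func runBulk(s store, queue []pair) (updated, inserted int) {
	for _, p := range queue {
		if _, found := s[p.selectorID]; found {
			updated++
		} else {
			inserted++
		}
		s[p.selectorID] = p.doc
	}
	return updated, inserted
}

func main() {
	// One auction already exists; the incoming data changes it and adds another.
	s := store{1: {Auc: 1, Owner: "Hogger"}}
	incoming := []Auction{{Auc: 1, Owner: "NewOwner"}, {Auc: 2, Owner: "Hogger"}}

	// Queue one selector/document pair per auction, then run them together.
	var queue []pair
	for _, a := range incoming {
		queue = append(queue, pair{selectorID: a.Auc, doc: a})
	}
	updated, inserted := runBulk(s, queue)
	fmt.Println("updated:", updated, "inserted:", inserted, "total:", len(s))
}
```

The loop that builds `queue` plays the role of the `bulk.Upsert` calls in the answer, and `runBulk` plays the role of `bulk.Run`.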
