duanla1996 2016-07-10 00:13

mgo: "wrong type for field" error

I am trying to do a bulk upsert with the mgo library. I was reading the documentation regarding bulk upserts since this is the first time I've ever used MongoDB, and it looks like I have to provide pairs of documents to update.

In my function, I run a find-all query and then use its results as the "existing" half of the pair for the bulk.Upsert() operation. I'm not sure if this is the right way to go about it, but I have to do upserts on ~65k documents at a time.

Here are the type structs, and the worker pool function which reads from a channel to perform the aforementioned MongoDB operations.

// types from my project's `lib` package.
type Auctions struct {
    Auc          int `json:"auc" bson:"_id"`
    Item         int `json:"item" bson:"item"`
    Owner        string `json:"owner" bson:"owner"`
    OwnerRealm   string `json:"ownerRealm" bson:"ownerRealm"`
    Bid          int `json:"bid" bson:"bid"`
    Buyout       int `json:"buyout" bson:"buyout"`
    Quantity     int `json:"quantity" bson:"quantity"`
    TimeLeft     string `json:"timeLeft" bson:"timeLeft"`
    Rand         int `json:"rand" bson:"rand"`
    Seed         int `json:"seed" bson:"seed"`
    Context      int `json:"context" bson:"context"`
    BonusLists   []struct {
        BonusListID int `json:"bonusListId" bson:"bonusListId"`
    } `json:"bonusLists,omitempty" bson:"bonusLists,omitempty"`
    Modifiers    []struct {
        Type  int `json:"type" bson:"type"`
        Value int `json:"value" bson:"value"`
    } `json:"modifiers,omitempty" bson:"modifiers,omitempty"`
    PetSpeciesID int `json:"petSpeciesId,omitempty" bson:"petSpeciesId,omitempty"`
    PetBreedID   int `json:"petBreedId,omitempty" bson:"petBreedId,omitempty"`
    PetLevel     int `json:"petLevel,omitempty" bson:"petLevel,omitempty"`
    PetQualityID int `json:"petQualityId,omitempty" bson:"petQualityId,omitempty"`
}

type AuctionResponse struct {
    Realms   []struct {
        Name string `json:"name"`
        Slug string `json:"slug"`
    } `json:"realms"`
    Auctions []Auctions `json:"auctions"`
}

func (b *Blizzard) RealmAuctionGrabber(realms chan string, db *mgo.Database, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range realms {
        NewReq, err := http.NewRequest("GET", fmt.Sprintf("%s/auction/data/%s", b.Url, i), nil)
        if err != nil {
            fmt.Printf("Cannot create new request for realm %s: %s", i, err)
        }

        // Update the request with the default parameters and grab the files links.
        Request := PopulateDefaultParams(NewReq)
        log.Debugf("downloading %s auction locations.", i)
        Response, err := b.Client.Do(Request)
        if err != nil {
            fmt.Printf("Error request realm auction data: %s\n", err)
        }
        defer Response.Body.Close()

        Body, err := ioutil.ReadAll(Response.Body)
        if err != nil {
            fmt.Printf("Error parsing request body: %s\n", err)
        }

        var AuctionResp lib.AuctionAPI
        err = json.Unmarshal(Body, &AuctionResp)
        if err != nil {
            fmt.Printf("Error marshalling auction repsonse body: %s\n", err)
        }

        for _, x := range AuctionResp.Files {
            NewDataReq, err := http.NewRequest("GET", x.URL, nil)
            if err != nil {
                log.Error(err)
            }

            AuctionResponse, err := b.Client.Do(NewDataReq)
            if err != nil {
                fmt.Printf("Error request realm auction data: %s\n", err)
            }
            defer Response.Body.Close()

            AuctionBody, err := ioutil.ReadAll(AuctionResponse.Body)
            if err != nil {
                fmt.Printf("Error parsing request body: %s\n", err)
            }

            var AuctionData lib.AuctionResponse
            err = json.Unmarshal(AuctionBody, &AuctionData)

            // grab all the current records, then perform an Upsert!
            var existing []lib.Auctions
            col := db.C(i)
            err = col.Find(nil).All(&existing)
            if err != nil {
                log.Error(err)
            }

            log.Infof("performing bulk upsert for %s", i)

            auctionData, err := bson.Marshal(AuctionData.Auctions)
            if err != nil {
                log.Error("error marshalling bson: %s", err)
            }

            existingData, _ := bson.Marshal(existing)
            bulk := db.C(i).Bulk()
            bulk.Upsert(existingData, auctionData)
            _, err = bulk.Run()
            if err != nil {
                log.Error("error performing upsert! error: ", err)
            }
        }
    }
}

When I call bulk.Upsert(existingData,auctionData), things are fine. However, when I call bulk.Run(), I'm getting this logged error message:

{"level":"error","msg":"error performing upsert! error: wrong type for 'q' field, expected object, found q: BinData(0, 0500000000)","time":"2016-07-09T16:53:45-07:00"}

I'm assuming this is related to how I'm doing the BSON tagging in the Auctions struct, but I'm not sure because this is the first time I've worked with MongoDB. Right now there are no collections in the database.

Is the error message related to the BSON tagging, and how can I fix it?

1 answer

  • doujichan1399 2016-07-12 09:15

    I don't know if this is still relevant, but here is the code:

    package main
    
    import (
        "gopkg.in/mgo.v2"
        "log"
        "io/ioutil"
        "encoding/json"
        "gopkg.in/mgo.v2/bson"
    )
    
    type Auctions struct {
        Auc   int `json:"auc" bson:"_id"`
        Owner string `json:"owner" bson:"owner"`
    }
    
    type AuctionResponse struct {
        Auctions []Auctions `json:"auctions"`
    }
    
    func main() {
        session, err := mgo.Dial("mongodb://127.0.0.1:27017/aucs")
    
        if err != nil {
            panic(err)
        }
        defer session.Close()
        session.SetMode(mgo.Monotonic, true)
    
        db := session.DB("aucs")
    
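        var AuctionData AuctionResponse
        AuctionBody, err := ioutil.ReadFile("auctions.json")
        if err != nil {
            log.Panic("error reading auctions.json: ", err)
        }
        // Stop early if the file does not contain valid auction JSON.
        if err = json.Unmarshal(AuctionBody, &AuctionData); err != nil {
            log.Panic("error unmarshalling auction data: ", err)
        }
    
        log.Printf("performing bulk upsert for %s", "realm")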
    
        bulk := db.C("realm").Bulk()
        for _, auc := range AuctionData.Auctions {
            bulk.Upsert(bson.M{"_id": auc.Auc}, auc)
        }
        _, err = bulk.Run()
        if err != nil {
            log.Panic("error performing upsert! error: ", err)
        }
    }
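
    A side note on the error in the question: bson.Marshal returns a []byte, so bulk.Upsert(existingData, auctionData) hands the driver raw bytes instead of documents, and the server finds binary data where it expects an object in the selector ('q') field. The payload 0500000000 in the log looks like a complete, empty BSON document that was wrapped as binary data. The sketch below checks this with the standard library only; describeBSON is a helper name made up for illustration and is not part of mgo.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"errors"
	"fmt"
)

// describeBSON is a hypothetical helper (not part of mgo). It checks the
// framing of a raw BSON document: a little-endian int32 holding the total
// length, then the elements, then a trailing 0x00 byte.
func describeBSON(raw []byte) (length int, empty bool, err error) {
	if len(raw) < 5 {
		return 0, false, errors.New("a BSON document is at least 5 bytes long")
	}
	length = int(binary.LittleEndian.Uint32(raw[:4]))
	if length != len(raw) || raw[len(raw)-1] != 0x00 {
		return 0, false, errors.New("length prefix or terminator does not match")
	}
	// Four length bytes plus the terminator and nothing else: no elements.
	return length, length == 5, nil
}

func main() {
	// Payload copied from the error message: BinData(0, 0500000000).
	raw, err := hex.DecodeString("0500000000")
	if err != nil {
		panic(err)
	}
	length, empty, err := describeBSON(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("length=%d empty=%t\n", length, empty)
}
```

    An empty document here is consistent with the question's remark that the database had no collections yet, so the find-all query had nothing to return. Either way, the BSON tags on the struct are not the cause; the fix is to pass maps or structs to Upsert and let the driver do the marshalling.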
    

    In my version I trimmed the structs down to two fields and read the auctions from a local file so that I could check my answer against real data, but it should still be easy to follow. Now let me explain the MongoDB side a bit.

    1. You don't need to fetch the existing documents. Requesting every existing document before updating or inserting would be wasteful, especially on a large database.
    2. mgo's upsert function takes two parameters:
      • a selector document (I think of it as the WHERE condition in SQL terms)
      • the document you actually have, which updates the matched document or is inserted if the selector matches nothing
    3. A bulk upsert accepts any number of selector/document pairs in a single call, such as bulk.Upsert(sel1, doc1, sel2, doc2, sel3, doc3), but in your case there is no need to use this feature.
    4. An upsert affects only a single document, so even if your selector matches multiple documents, only the first one is updated. In other words, upsert is closer to MySQL's INSERT ... ON DUPLICATE KEY UPDATE than to UPDATE ... WHERE. In your case (with unique auction IDs), a single upsert looks like this MySQL query:

      INSERT INTO realm (auc, owner) VALUES ('1', 'Hogger') ON DUPLICATE KEY UPDATE owner = 'Hogger';
      
    5. With Bulk, we can simply queue multiple upserts in a loop and then run them all at once.
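
    If you ever do want the variadic form, the pairs can be collected in a flat slice first and queued in one call. This is a minimal sketch under some assumptions: buildPairs and chunk are hypothetical helpers, the selector is written as map[string]interface{} (the underlying type of bson.M) so that the block compiles with the standard library alone, the owner names other than Hogger are made-up sample data, and the batch size of 2 is only there to keep the demo small. The mgo calls appear only in comments.

```go
package main

import "fmt"

// Auctions mirrors the two-field struct used in this answer.
type Auctions struct {
	Auc   int    `json:"auc" bson:"_id"`
	Owner string `json:"owner" bson:"owner"`
}

// buildPairs (hypothetical helper) flattens auctions into
// [selector1, doc1, selector2, doc2, ...], the argument layout that a
// variadic call such as bulk.Upsert(pairs...) takes.
func buildPairs(aucs []Auctions) []interface{} {
	pairs := make([]interface{}, 0, 2*len(aucs))
	for _, auc := range aucs {
		selector := map[string]interface{}{"_id": auc.Auc}
		pairs = append(pairs, selector, auc)
	}
	return pairs
}

// chunk (hypothetical helper) splits auctions into batches of at most size
// items so that each batch can be queued and run as its own bulk operation.
func chunk(aucs []Auctions, size int) [][]Auctions {
	var batches [][]Auctions
	for size > 0 && len(aucs) > 0 {
		n := size
		if len(aucs) < n {
			n = len(aucs)
		}
		batches = append(batches, aucs[:n])
		aucs = aucs[n:]
	}
	return batches
}

func main() {
	aucs := []Auctions{{1, "Hogger"}, {2, "Thrall"}, {3, "Jaina"}}
	for _, batch := range chunk(aucs, 2) {
		pairs := buildPairs(batch)
		// With mgo, this is where the batch would be sent:
		//   bulk := db.C("realm").Bulk()
		//   bulk.Upsert(pairs...)
		//   _, err := bulk.Run()
		fmt.Printf("batch of %d auctions -> %d arguments\n", len(batch), len(pairs))
	}
}
```

    Whether splitting ~65k upserts into smaller batches is needed at all depends on your driver and server versions, so treat the chunking as optional. The plain loop in my code above does the same job.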

    This answer was selected by the asker as the best answer.
