douyan9398 2018-11-22 05:26
浏览 516

Olivere软件包中的BulkIndexer用于Golang替换Elastigo

I notice that I can use BulkIndexer if I want to send data into elasticsearch in bulk. As stated in the Elastigo documentation

A bulk indexer creates goroutines, and channels for connecting and sending data to elasticsearch in bulk, using buffers.

Code in elastigo to insert in bulk

var c_es = elastigo.NewConn()
var indexer = c_es.NewBulkIndexer(50)

func insertInBulkElastic(){
    //Create a custom error function when inserting data into elasticsearch 
   //in bulk
    indexer.Sender = func(buf *bytes.Buffer) error {
    // @buf is the buffer of docs about to be written
    respJson, err := c_es.DoCommand("POST", "/_bulk", nil, buf)
    if err != nil {
        // handle it better than this

        fmt.Println("Error", string(respJson)) // 

        fmt.Println("Error", err)
    }

    if err == nil {
        fmt.Println("The data was inserted successfullly to elastic search")
    }
    return err
  }



}

Does anyone know how to send bulk request using olivere for golang?

Thanks

  • 写回答

1条回答 默认 最新

  • dongyun3805 2018-11-22 10:53
    关注

    Here is a working example using olivere in Go. You can read more about the BulkProcessor here

    Hope this help :)

    package main
    
    import (
        "context"
        "log"
        "time"
    
        elastic "gopkg.in/olivere/elastic.v5"
    )
    
    func main() {
        options := []elastic.ClientOptionFunc{
            elastic.SetHealthcheck(true),
            elastic.SetHealthcheckTimeout(20 * time.Second),
            elastic.SetSniff(false),
            elastic.SetHealthcheckInterval(30 * time.Second),
            elastic.SetURL("http://127.0.0.1:9200"),
            elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewConstantBackoff(5 * time.Second))),
        }
        client, err := elastic.NewClient(options...)
        if err != nil {
            panic(err)
        }
        // ensure index exist
        exists, err := client.IndexExists("my_index").Do(context.Background())
        if err != nil {
            panic(err)
        }
        if !exists {
            if _, err := client.CreateIndex("my_index").Do(context.Background()); err != nil {
                panic(err)
            }
        }
        client.PutMapping().Index("my_index").BodyJson(map[string]interface{}{
            "properties": map[string]string{
                "name": "keyword",
            },
        }).Do(context.Background())
    
        // create new bulk processor from client
        bulkProcessor, err := elastic.NewBulkProcessorService(client).
            Workers(5).
            BulkActions(1000).
            FlushInterval(1 * time.Second).
            After(after).
            Do(context.Background())
    
        // now the bulk processor can be reused for entire the app
        myDoc := struct {
            Name string
        }{
            Name: "jack",
        }
        req := elastic.NewBulkIndexRequest()
        req.Index("my_index").Type("type").Id("my_doc_id").Doc(myDoc)
    
        // Use Add method to add request into the processor
        bulkProcessor.Add(req)
    
        // wait for sometime...
        time.Sleep(5 * time.Second)
    }
    
    func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
        if err != nil {
            log.Printf("bulk commit failed, err: %v
    ", err)
        }
        // do what ever you want in case bulk commit success
        log.Printf("commit successfully, len(requests)=%d
    ", len(requests))
    }
    
    评论

报告相同问题?

悬赏问题

  • ¥15 如何在scanpy上做差异基因和通路富集?
  • ¥20 关于#硬件工程#的问题,请各位专家解答!
  • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配
  • ¥15 Power query添加列问题
  • ¥50 Kubernetes&Fission&Eleasticsearch
  • ¥15 報錯:Person is not mapped,如何解決?
  • ¥15 c++头文件不能识别CDialog