douwen2158 2019-08-24 16:07
浏览 305

Elasticsearch容器返回格式错误的http状态响应

I'm using the official go-elasticsearch 7.x api from golang to query an elasticsearch container:

I keep getting this response when I try to index some documents (after connecting successfully)

Error getting response: net/http: HTTP/1.x transport connection broken: malformed HTTP status code "is"

From what I can tell my request to index the document seems normal and it appears as though the error is coming from within the container? I've tried different TLS configurations, including no tls configuration at all and I still get the same response.

No error gets returns when I call elasticClient.Info() which it's connected to the client successfully

golang:

var err error
    cfg := elasticsearch.Config{
        Addresses: []string{
            "http://elasticsearch:9200",
            "http://elasticsearch:9300",
        },
        Transport: &http.Transport{
            TLSClientConfig: &tls.Config{
                Renegotiation: tls.RenegotiateOnceAsClient,
                Certificates:  []tls.Certificate{cer},
            },
        },
    }
    elasticClient, err = elasticsearch.NewClient(cfg)
    if err != nil {
        return err
    }
    return nil

val, err := json.Marshal(&load)
    if err != nil {
        log.Fatalf("Error marshalling load, %v", err)
    }
    log.Printf("|| %s ||", string(val))
    req := esapi.IndexRequest{
        Index:      "load",
        DocumentID: strconv.Itoa(int(load.Id)),
        Body:       strings.NewReader(string(val)),
        Refresh:    "true",
    }
    info, err := elasticClient.Info()
    if err != nil {
        return err
    }
    log.Println(info)
    // Perform the request with the client.
    res, err := req.Do(context.Background(), elasticClient)
    if err != nil {
        log.Fatalf("Error getting response: %s", err.Error())
    }
    if res.IsError() {
        log.Printf("[%s] Error indexing document ID=%s", res.Status(), strconv.Itoa(int(load.Id)))
    } else {

        // Deserialize the response into a map.
        var r map[string]interface{}
        if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
            log.Printf("Error parsing the response body: %s", err)
        } else {
            // Print the response status and indexed document version.
            log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
        }
    }
    return nil

typical json to index

string(val) in IndexRequest

{"deliverydate":"2019-08-28T13:00:00-04:00","temperature":"34",
"firstname":"****","loaddate":"2019-08-26T13:00:00-04:00","id":"188244","weight":"41918","totalcust":"2609","ponumber":"******","tsize":"53 ft","phone":"*******","equipment":"***","ext":"****","lastname":"****","pzip":"****",
"pcity":"****","pstate":"****","dzip":"22153","dcity":"*****","dstate":"****",
"locations":{"1":{"lat":44.260059,"long":-72.575387},"2":{"lat":38.748411,"long":-77.234767},"3":{},"4":{"lat":39.192978,"long":-76.7238},"5":{"lat":38.694281,"long":-75.772155},"6":{"lat":39.149275,"long":-76.775249},"7":{"lat":39.606779,"long":-75.833272}},"stopTypes":{"1":"P","2":"D","3":"D","4":"D","5":"D","6":"D","7":"D"}}

docker-compose.yml

version: "3"
services:
  redis_server:
    image: "redis"
    ports:
      - "6379:6379"
  lambda_server:
    build: .
    ports:
      - "8080:50051"
    depends_on:
      - redis_server
      - elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.1
    ports:
      - "9200:9200"
      - "9300:9300"      
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
      - http.cors.enabled=true
      - http.cors.allow-origin=*

Edit 1:

Just tried wrapping the request in a loop to see if something changes and it appears to produce something more useful:

lambda_server_1 | 2019/08/24 17:25:59 [400 Bad Request] Error indexing document ID=188243 lambda_server_1 | 2019/08/24 17:25:59 [400 Bad Request] Error indexing document ID=189265

It appears it's a poorly formed request? Despite following the example exactly.

edited code:

func cacheLoadsToSearch() {
    data := redisClient.Scan(0, "*", totalNewLoads).Iterator()
    accessedKeys := map[string]bool{}
    for data.Next() {
        id := data.Val()
        if accessedKeys[id] {
            continue //continue to next iteration of loop, Scan cursor may return duplicate results
        } else {
            accessedKeys[id] = true
        }
        loadItem, err := readFromRedis(id)
        if err != nil {
            log.Fatalf("Error reading from redis cache, %v", err)
        }
        wg.Add(1)
        pushToElasticSearch(loadItem)
    }
    wg.Wait()
}

func pushToElasticSearch(load *lambdapb.Load) error {
    val, err := json.Marshal(&load)
    if err != nil {
        log.Fatalf("Error marshalling load, %v", err)
    }
    log.Printf("|| %s ||", string(val))
    req := esapi.IndexRequest{
        Index:      "load",
        DocumentID: strconv.Itoa(int(load.Id)),
        Body:       strings.NewReader(string(val)),
        Refresh:    "true",
    }
    info, err := elasticClient.Info()
    if err != nil {
        return err
    }
    log.Println(info)
    // Perform the request with the client.
    go func() {
        defer wg.Done()
        for {
            res, err := req.Do(context.Background(), elasticClient)
            if err != nil {
                log.Printf("Error getting response fron indexRequest: %s", err.Error())
                time.Sleep(5 * time.Second)
                continue
            }
            if res.IsError() {
                log.Printf("[%s] Error indexing document ID=%s", res.Status(), strconv.Itoa(int(load.Id)))
                time.Sleep(5 * time.Second)
                continue
            } else {

                // Deserialize the response into a map.
                var r map[string]interface{}
                if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
                    log.Printf("Error parsing the response body: %s", err)
                } else {
                    // Print the response status and indexed document version.
                    log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
                }
                break
            }
        }
    }()
    return nil
}

I also edited the setup function

func setupElasticSearch() error {
    var err error
    cfg := elasticsearch.Config{
        Addresses: []string{
            "http://elasticsearch:9200",
            "http://elasticsearch:9300",
        },
        Transport: &http.Transport{
            TLSClientConfig: &tls.Config{
                Renegotiation: tls.RenegotiateOnceAsClient,
                Certificates:  []tls.Certificate{cer},
            },
        },
    }
    elasticClient, err = elasticsearch.NewClient(cfg)
    if err != nil {
        return err
    }
    done := make(chan int)
    go func(done chan<- int) {
        for {
            res, err := elasticClient.Indices.Create("load")
            if err != nil {
                log.Printf("Malformed response to create index: %s", err)
                time.Sleep(5 * time.Second)
                continue
            }
            if res.IsError() {
                log.Printf("Cannot create index: %s", res)
                time.Sleep(5 * time.Second)
                continue
            }
            log.Print("Successfully created index")
            break
        }
        done <- 1
    }(done)
    <-done
    return nil
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 Python时间序列如何拟合疏系数模型
    • ¥15 求学软件的前人们指明方向🥺
    • ¥50 如何增强飞上天的树莓派的热点信号强度,以使得笔记本可以在地面实现远程桌面连接
    • ¥15 MCNP里如何定义多个源?
    • ¥20 双层网络上信息-疾病传播
    • ¥50 paddlepaddle pinn
    • ¥20 idea运行测试代码报错问题
    • ¥15 网络监控:网络故障告警通知
    • ¥15 django项目运行报编码错误
    • ¥15 STM32驱动继电器