I'm using the official go-elasticsearch 7.x
api from golang to query an elasticsearch container:
I keep getting this response when I try to index some documents (after connecting successfully)
Error getting response: net/http: HTTP/1.x transport connection broken: malformed HTTP status code "is"
From what I can tell my request to index the document seems normal and it appears as though the error is coming from within the container? I've tried different TLS configurations, including no tls configuration at all and I still get the same response.
No error gets returns when I call elasticClient.Info()
which it's connected to the client successfully
golang:
var err error
cfg := elasticsearch.Config{
Addresses: []string{
"http://elasticsearch:9200",
"http://elasticsearch:9300",
},
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Renegotiation: tls.RenegotiateOnceAsClient,
Certificates: []tls.Certificate{cer},
},
},
}
elasticClient, err = elasticsearch.NewClient(cfg)
if err != nil {
return err
}
return nil
val, err := json.Marshal(&load)
if err != nil {
log.Fatalf("Error marshalling load, %v", err)
}
log.Printf("|| %s ||", string(val))
req := esapi.IndexRequest{
Index: "load",
DocumentID: strconv.Itoa(int(load.Id)),
Body: strings.NewReader(string(val)),
Refresh: "true",
}
info, err := elasticClient.Info()
if err != nil {
return err
}
log.Println(info)
// Perform the request with the client.
res, err := req.Do(context.Background(), elasticClient)
if err != nil {
log.Fatalf("Error getting response: %s", err.Error())
}
if res.IsError() {
log.Printf("[%s] Error indexing document ID=%s", res.Status(), strconv.Itoa(int(load.Id)))
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
// Print the response status and indexed document version.
log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
}
return nil
typical json to index
string(val)
in IndexRequest
{"deliverydate":"2019-08-28T13:00:00-04:00","temperature":"34",
"firstname":"****","loaddate":"2019-08-26T13:00:00-04:00","id":"188244","weight":"41918","totalcust":"2609","ponumber":"******","tsize":"53 ft","phone":"*******","equipment":"***","ext":"****","lastname":"****","pzip":"****",
"pcity":"****","pstate":"****","dzip":"22153","dcity":"*****","dstate":"****",
"locations":{"1":{"lat":44.260059,"long":-72.575387},"2":{"lat":38.748411,"long":-77.234767},"3":{},"4":{"lat":39.192978,"long":-76.7238},"5":{"lat":38.694281,"long":-75.772155},"6":{"lat":39.149275,"long":-76.775249},"7":{"lat":39.606779,"long":-75.833272}},"stopTypes":{"1":"P","2":"D","3":"D","4":"D","5":"D","6":"D","7":"D"}}
docker-compose.yml
version: "3"
services:
redis_server:
image: "redis"
ports:
- "6379:6379"
lambda_server:
build: .
ports:
- "8080:50051"
depends_on:
- redis_server
- elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.3.1
ports:
- "9200:9200"
- "9300:9300"
environment:
- xpack.security.enabled=false
- discovery.type=single-node
- http.cors.enabled=true
- http.cors.allow-origin=*
Edit 1:
Just tried wrapping the request in a loop to see if something changes and it appears to produce something more useful:
lambda_server_1 | 2019/08/24 17:25:59 [400 Bad Request] Error indexing document ID=188243
lambda_server_1 | 2019/08/24 17:25:59 [400 Bad Request] Error indexing document ID=189265
It appears it's a poorly formed request? Despite following the example exactly.
edited code:
func cacheLoadsToSearch() {
data := redisClient.Scan(0, "*", totalNewLoads).Iterator()
accessedKeys := map[string]bool{}
for data.Next() {
id := data.Val()
if accessedKeys[id] {
continue //continue to next iteration of loop, Scan cursor may return duplicate results
} else {
accessedKeys[id] = true
}
loadItem, err := readFromRedis(id)
if err != nil {
log.Fatalf("Error reading from redis cache, %v", err)
}
wg.Add(1)
pushToElasticSearch(loadItem)
}
wg.Wait()
}
func pushToElasticSearch(load *lambdapb.Load) error {
val, err := json.Marshal(&load)
if err != nil {
log.Fatalf("Error marshalling load, %v", err)
}
log.Printf("|| %s ||", string(val))
req := esapi.IndexRequest{
Index: "load",
DocumentID: strconv.Itoa(int(load.Id)),
Body: strings.NewReader(string(val)),
Refresh: "true",
}
info, err := elasticClient.Info()
if err != nil {
return err
}
log.Println(info)
// Perform the request with the client.
go func() {
defer wg.Done()
for {
res, err := req.Do(context.Background(), elasticClient)
if err != nil {
log.Printf("Error getting response fron indexRequest: %s", err.Error())
time.Sleep(5 * time.Second)
continue
}
if res.IsError() {
log.Printf("[%s] Error indexing document ID=%s", res.Status(), strconv.Itoa(int(load.Id)))
time.Sleep(5 * time.Second)
continue
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
// Print the response status and indexed document version.
log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
break
}
}
}()
return nil
}
I also edited the setup function
func setupElasticSearch() error {
var err error
cfg := elasticsearch.Config{
Addresses: []string{
"http://elasticsearch:9200",
"http://elasticsearch:9300",
},
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Renegotiation: tls.RenegotiateOnceAsClient,
Certificates: []tls.Certificate{cer},
},
},
}
elasticClient, err = elasticsearch.NewClient(cfg)
if err != nil {
return err
}
done := make(chan int)
go func(done chan<- int) {
for {
res, err := elasticClient.Indices.Create("load")
if err != nil {
log.Printf("Malformed response to create index: %s", err)
time.Sleep(5 * time.Second)
continue
}
if res.IsError() {
log.Printf("Cannot create index: %s", res)
time.Sleep(5 * time.Second)
continue
}
log.Print("Successfully created index")
break
}
done <- 1
}(done)
<-done
return nil
}