dovs36921 2019-07-29 12:22

gRPC server stops receiving messages after many messages are sent at once

I am implementing a simple gRPC service in which the summary of a task is sent to the gRPC server. Everything works fine when I send a small number of messages, but when I send around 5000 messages the server stops responding and the client gets a "deadline exceeded" error. I also tried reconnecting, but then got this error message:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: timed out waiting for server handshake

The server shows no error and is alive.

I tried setting GRPC_GO_REQUIRE_HANDSHAKE=off as well, but the error persists. I also implemented sending the summary in batches, but the same scenario repeated.

Is there a limit on the number of messages that can be sent over gRPC?

Here is my service proto:


// The Result service definition.
service Result {
  rpc ConntectMaster(ConnectionRequest) returns (stream ExecutionCommand) {}
  rpc postSummary(Summary) returns(ExecutionCommand) {}
}

message Summary{

  int32 successCount = 1;
  int32 failedCount = 2;
  int32 startTime = 3;
  repeated TaskResult results = 4;
  bool isLast = 5;
  string id = 6;
}

PostSummary implementation on the server:

// PostSummary posts the summary to the master
func (server *Server) PostSummary(ctx context.Context, in *pb.Summary) (*pb.ExecutionCommand, error) {

    for i := 0; i < len(in.Results); i++ {

        res := in.Results[i]
        log.Printf("%s --> %d Res :: %s, len : %d", in.Id, i, res.Id, len(in.Results))

    }
    return &pb.ExecutionCommand{Type: stopExec}, nil
}

Client-side code that posts the summary in batches:

func postSummaryInBatch(executor *Executor, index int) {
    summary := pb.Summary{
        SuccessCount: int32(executor.summary.successCount),
        FailedCount:  int32(executor.summary.failedCount),
        Results:      []*pb.TaskResult{},
        IsLast:       false,
    }

    if index >= len(executor.summary.TaskResults) {
        summary.IsLast = true
        return
    }

    var to int
    batch := 500
    if (index + batch) <= len(executor.summary.TaskResults) {
        to = index + batch
    } else {
        to = len(executor.summary.TaskResults)
    }
    for i := index; i < to; i++ {
        result := executor.summary.TaskResults[i]
        taskResult := pb.TaskResult{
            Id:   result.id,
            Msg:  result.msg,
            Time: result.time,
        }
        // log.Printf("adding res : %s ", taskResult.Id)

        if result.err != nil {
            taskResult.IsError = true
        }
        summary.Results = append(summary.Results, &taskResult)
    }
    summary.Id = fmt.Sprintf("%d-%d", index, to)
    log.Printf("sent from  %d to %d ", index, to)
    postSummary(executor, &summary, 0)
    postSummaryInBatch(executor, to)
}

func postSummary(executor *Executor, summary *pb.Summary, retryCount int) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    cmd, err := client.PostSummary(ctx, summary)
    if err != nil {
        if retryCount < 3 {
            reconnect(executor)
            postSummary(executor, summary, retryCount+1)
        }
        log.Printf("%v", err) // avoid using the error text as a format string
        // log.Fatal("cannot send summary report")
    } else {
        processServerCommand(executor, cmd)
    }
}

1 answer

  • douhuantui6259 2019-07-29 13:36

    gRPC's default maximum receive message size (maxReceiveMessageSize) is 4 MB; the messages your gRPC client sends probably went over that limit.
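One way to stay under that limit is to cap each batch by estimated size rather than by a fixed count of 500. The sketch below is only an illustration under stated assumptions, not the asker's code: `Result`, `approxSize`, and `splitBySize` are hypothetical names, and the size estimate is deliberately rough (with real protobuf messages, `proto.Size` reports the exact encoded size). In grpc-go, the receive limit itself can also be raised with the `grpc.MaxRecvMsgSize` server option.

```go
package main

import "fmt"

// Result is a hypothetical stand-in for pb.TaskResult.
type Result struct {
	ID  string
	Msg string
}

// approxSize is a rough, assumed estimate of the encoded size of r in bytes.
// The extra 16 bytes are slack for field tags and length prefixes.
func approxSize(r Result) int {
	return len(r.ID) + len(r.Msg) + 16
}

// splitBySize groups results into batches whose estimated size stays at or
// below maxBytes. A single result larger than maxBytes gets its own batch.
func splitBySize(results []Result, maxBytes int) [][]Result {
	var batches [][]Result
	var cur []Result
	curSize := 0
	for _, r := range results {
		s := approxSize(r)
		// Close the current batch if adding r would exceed the budget.
		if len(cur) > 0 && curSize+s > maxBytes {
			batches = append(batches, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, r)
		curSize += s
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	results := make([]Result, 5000)
	for i := range results {
		results[i] = Result{ID: fmt.Sprintf("task-%d", i), Msg: "ok"}
	}
	const budget = 64 * 1024 // assumed per-message budget, well under 4 MB
	batches := splitBySize(results, budget)
	for i, b := range batches {
		fmt.Printf("batch %d: %d results\n", i, len(b))
	}
}
```

Each batch would then be sent as one `Summary`, so no single message depends on how long the individual `Msg` strings happen to be.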

    gRPC uses HTTP/2 (h2) as its transport layer, which opens a single TCP connection and multiplexes requests over it. That significantly reduces overhead compared with HTTP/1.1, so I wouldn't worry too much about batching and would just make individual calls to the gRPC server.
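If you do make individual calls, it probably still helps to bound how many are in flight at once so the server is not flooded with 5000 simultaneous requests. The following is a minimal sketch under assumptions: `Result`, `sendFunc`, `sendAll`, and `fakeSend` are hypothetical names, and `fakeSend` only stands in for a real unary call such as `client.PostSummary`. It uses a buffered channel as a counting semaphore and gives every call its own deadline.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Result is a hypothetical stand-in for pb.TaskResult.
type Result struct {
	ID string
}

// sendFunc is a hypothetical stand-in for a unary RPC on the generated client.
type sendFunc func(ctx context.Context, r Result) error

// fakeSend pretends to be the RPC: it fails only if the context has already
// expired or been cancelled.
func fakeSend(ctx context.Context, r Result) error {
	return ctx.Err()
}

// sendAll issues one call per result with at most maxInFlight calls running
// concurrently, and returns the number of calls that failed.
func sendAll(results []Result, maxInFlight int, perCall time.Duration, send sendFunc) int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		failed int
	)
	sem := make(chan struct{}, maxInFlight) // counting semaphore
	for _, r := range results {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxInFlight calls are in flight
		go func(r Result) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each call gets its own deadline.
			ctx, cancel := context.WithTimeout(context.Background(), perCall)
			defer cancel()
			if err := send(ctx, r); err != nil {
				mu.Lock()
				failed++
				mu.Unlock()
			}
		}(r)
	}
	wg.Wait()
	return failed
}

func main() {
	results := make([]Result, 5000)
	for i := range results {
		results[i] = Result{ID: fmt.Sprintf("task-%d", i)}
	}
	failed := sendAll(results, 64, 3*time.Second, fakeSend)
	fmt.Printf("sent %d results, %d failed\n", len(results), failed)
}
```

The limit of 64 concurrent calls is an arbitrary illustrative value; a suitable number depends on the server and would need to be measured.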
