dovs36921 2019-07-29 12:22
浏览 418

grpc服务器在同时发送许多消息后停止接收消息

I am implementing a simple grpc service where the summary of a task is to be sent to the grpc server. Everything works fine if I send less number of messages but when I begin to send like 5000 messages the server stops and gets deadline exceeded message in client side. I also tried to reconnect again but found the error message as.

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: timed out waiting for server handshake

The server shows no error and is alive.

I tried setting GRPC_GO_REQUIRE_HANDSHAKE=off as well but the error still prevails. I also implemented sending summary in batch but same scenerio repeated.

Is there any limitations to number of messages to be sent in grpc?

Here is my service proto


// The Result service definition.
service Result {
  rpc ConntectMaster(ConnectionRequest) returns (stream ExecutionCommand) {}
  rpc postSummary(Summary) returns(ExecutionCommand) {}
}

message Summary{

  int32 successCount = 1;
  int32 failedCount = 2;
  int32 startTime = 3;
  repeated TaskResult results = 4;
  bool isLast = 5;
  string id = 6;
}

postSummary implementation in sever

// PostSummary posts the summary to the master
func (server *Server) PostSummary(ctx context.Context, in *pb.Summary) (*pb.ExecutionCommand, error) {

    for i := 0; i < len(in.Results); i++ {

        res := in.Results[i]
        log.Printf("%s --> %d Res :: %s, len : %d", in.Id, i, res.Id, len(in.Results))

    }
    return &pb.ExecutionCommand{Type: stopExec}, nil
}
func postSummaryInBatch(executor *Executor, index int) {
    summary := pb.Summary{
        SuccessCount: int32(executor.summary.successCount),
        FailedCount:  int32(executor.summary.failedCount),
        Results:      []*pb.TaskResult{},
        IsLast:       false,
    }

    if index >= len(executor.summary.TaskResults) {
        summary.IsLast = true
        return
    }

    var to int
    batch := 500
    if (index + batch) <= len(executor.summary.TaskResults) {
        to = index + batch
    } else {
        to = len(executor.summary.TaskResults)
    }
    for i := index; i < to; i++ {
        result := executor.summary.TaskResults[i]
        taskResult := pb.TaskResult{
            Id:   result.id,
            Msg:  result.msg,
            Time: result.time,
        }
        // log.Printf("adding res : %s ", taskResult.Id)

        if result.err != nil {
            taskResult.IsError = true
        }
        summary.Results = append(summary.Results, &taskResult)
    }
    summary.Id = fmt.Sprintf("%d-%d", index, to)
    log.Printf("sent from  %d to %d ", index, to)
    postSummary(executor, &summary, 0)
    postSummaryInBatch(executor, to)
}

func postSummary(executor *Executor, summary *pb.Summary, retryCount int) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    cmd, err := client.PostSummary(ctx, summary)
    if err != nil {
        if retryCount < 3 {
            reconnect(executor)
            postSummary(executor, summary, retryCount+1)
        }
        log.Printf(err.Error())
        // log.Fatal("cannot send summary report")
    } else {
        processServerCommand(executor, cmd)
    }
}
  • 写回答

1条回答 默认 最新

  • douhuantui6259 2019-07-29 13:36
    关注

    grpc default maxReceiveMessageSize is 4MB, your grpc client probably went over that limit.

    grpc uses h2 in transport layer which opens only one tcp conn and multiplex "requests" over that, reduce significant overhead compare to h1, I wouldn't worry too much for batching and will just make individual calls to grpc server.

    评论

报告相同问题?

悬赏问题

  • ¥15 基于卷积神经网络的声纹识别
  • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
  • ¥100 为什么这个恒流源电路不能恒流?
  • ¥15 有偿求跨组件数据流路径图
  • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
  • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
  • ¥15 CSAPPattacklab
  • ¥15 一直显示正在等待HID—ISP
  • ¥15 Python turtle 画图
  • ¥15 stm32开发clion时遇到的编译问题