duancilan5124 2018-03-06 12:33

Gracefully closing a gRPC downstream

Using the following protocol buffer definition:

syntax = "proto3";

package pb;

message SimpleRequest {
    int64 number = 1;
}

message SimpleResponse {
    int64 doubled = 1;
}

// All the calls in this service perform the action of doubling a number.
// The streams will continuously send the next double, e.g. 1, 2, 4, 8, 16.
service Test {
    // This RPC streams from the server only.
    rpc Downstream(SimpleRequest) returns (stream SimpleResponse);
}

I'm able to successfully open a stream, and continuously get the next doubled number from the server.

My Go code for running this looks like:

ctxDownstream, cancel := context.WithCancel(ctx)
downstream, err := testClient.Downstream(ctxDownstream, &pb.SimpleRequest{Number: 1})
for {
    responseDownstream, err := downstream.Recv()
    if err != io.EOF {
        println(fmt.Sprintf("downstream response: %d, error: %v", responseDownstream.Doubled, err))

        if responseDownstream.Doubled >= 32 {
            break
        }
    }
}
cancel() // !!This is not a graceful shutdown
println(fmt.Sprintf("%v", downstream.Trailer()))

The problem I'm having is that cancelling the context leaves downstream.Trailer() empty. Is there a way to gracefully close this connection from the client side and still receive downstream.Trailer()?

Note: if I close the downstream connection from the server side, my trailers are populated. But I have no way of instructing my server side to close this particular stream. So there must be a way to gracefully close a stream client side.

Thanks.

As requested, some server code:

func (b *binding) Downstream(req *pb.SimpleRequest, stream pb.Test_DownstreamServer) error {
    request := req

    r := make(chan *pb.SimpleResponse)
    e := make(chan error)
    ticker := time.NewTicker(200 * time.Millisecond)
    defer func() { ticker.Stop(); close(r); close(e) }()

    go func() {
        defer func() { recover() }()
        for {
            select {
            case <-ticker.C:
                response, err := b.Endpoint(stream.Context(), request)
                if err != nil {
                    e <- err
                }
                r <- response
            }
        }
    }()

    for {
        select {
        case err := <-e:
            return err
        case response := <-r:
            if err := stream.Send(response); err != nil {
                return err
            }
            request.Number = response.Doubled
        case <-stream.Context().Done():
            return nil
        }
    }
}

You will still need to populate the trailer with some information. I use the grpc.StreamServerInterceptor to do this.

2 answers

  • dongyutan1703 2018-03-10 17:48

    According to the grpc-go documentation:

    Trailer returns the trailer metadata from the server, if there is any. It must only be called after stream.CloseAndRecv has returned, or stream.Recv has returned a non-nil error (including io.EOF).

    So if you want to read the trailer in the client, try something like this:

    ctxDownstream, cancel := context.WithCancel(ctx)
    defer cancel()
    for {
      ...
      // on error or EOF
      break;
    }
    println(fmt.Sprintf("%v", downstream.Trailer()))
    

    Break out of the infinite loop when Recv returns an error (including io.EOF), then print the trailer. cancel is deferred, so it runs when the enclosing function returns.
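The loop shape described above can be tried without a real gRPC connection. The following is only a minimal sketch under stated assumptions: `recvStream` is a hypothetical interface that mirrors just the `Recv` and `Trailer` methods of the generated client stream (using a plain `int64` instead of `*pb.SimpleResponse`), and `fakeStream` is an invented stand-in that ends with `io.EOF` and only populates its trailer once the stream has ended. None of these names come from grpc-go.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// recvStream is a hypothetical stand-in for the generated client stream.
// It mirrors only the two methods the pattern relies on.
type recvStream interface {
	Recv() (int64, error)
	Trailer() map[string][]string
}

// fakeStream (an assumption for illustration) emits a doubling sequence a
// fixed number of times and then reports io.EOF. Its trailer is filled in
// only when the stream ends, mimicking the documented rule that Trailer
// must be read after Recv has returned a non-nil error.
type fakeStream struct {
	next    int64
	left    int
	trailer map[string][]string
}

func (s *fakeStream) Recv() (int64, error) {
	if s.left == 0 {
		s.trailer = map[string][]string{"status": {"done"}}
		return 0, io.EOF
	}
	s.left--
	v := s.next
	s.next *= 2
	return v, nil
}

func (s *fakeStream) Trailer() map[string][]string { return s.trailer }

// drain reads until Recv returns a non-nil error and only then asks for the
// trailer. io.EOF is treated as a normal end of stream, not a failure.
func drain(s recvStream) ([]int64, map[string][]string, error) {
	var got []int64
	for {
		v, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return got, s.Trailer(), nil
		}
		if err != nil {
			return got, s.Trailer(), err
		}
		got = append(got, v)
	}
}

func main() {
	values, trailer, err := drain(&fakeStream{next: 1, left: 6})
	fmt.Println(values, trailer, err)
}
```

The key design point is the ordering: the response value is used only when `err` is nil, and `Trailer` is consulted only after the loop has seen an error from `Recv`. With a real stream, the `drain` loop would sit between `defer cancel()` and the trailer read.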
