Slow gRPC bandwidth to a remote server

I have a gRPC service that transfers files from a local machine to a remote server, and I'm noticing some significant bandwidth issues. On average it transfers at about 1 Mbit/s over one connection shared by several streams (usually about 8).

The server uses TLS for encryption, but that doesn't seem to be the bottleneck: disabling TLS has a negligible effect on performance. I've also used iperf3 to test the bandwidth directly between the client and the server, and it measured about 10 Mbit/s.

Connecting to host <host>, port <port>
[  7] local 10.0.0.112 port 59651 connected to <ip> port <port>
[ ID] Interval           Transfer     Bitrate
[  7]   0.00-1.00   sec  1.28 MBytes  10.7 Mbits/sec
[  7]   1.00-2.00   sec   894 KBytes  7.35 Mbits/sec
[  7]   2.00-3.00   sec   999 KBytes  8.17 Mbits/sec
[  7]   3.00-4.00   sec  1.19 MBytes  10.0 Mbits/sec
[  7]   4.00-5.00   sec   753 KBytes  6.17 Mbits/sec
[  7]   5.00-6.00   sec  1.16 MBytes  9.67 Mbits/sec
[  7]   6.00-7.00   sec  1.00 MBytes  8.44 Mbits/sec
[  7]   7.00-8.00   sec  1.26 MBytes  10.5 Mbits/sec
[  7]   8.00-9.00   sec  1.22 MBytes  10.2 Mbits/sec
[  7]   9.00-10.00  sec  1.15 MBytes  9.66 Mbits/sec
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate
[  7]   0.00-10.00  sec  10.8 MBytes  9.09 Mbits/sec                  sender
[  7]   0.00-10.00  sec  10.7 MBytes  8.95 Mbits/sec                  receiver

The upload bandwidth from the client is about 10 Mbit/s, and the download bandwidth of the server is about 50 Mbit/s (via speedtest-cli).

traceroute doesn't show anything interesting either...

traceroute to mikemeredith.ddns.net (108.52.111.249), 64 hops max, 72 byte packets
 1  10.0.0.1 (10.0.0.1)  2.195 ms  5.388 ms  1.385 ms
 2  <ip>  (<ip>)  8.256 ms  145.115 ms  19.025 ms
 3  <ip2> (<ip2>)  9.951 ms  9.471 ms  141.929 ms
 4  <ip3> (<ip3>)  18.389 ms  9.684 ms  12.248 ms
 5  <ip4> (<ip4>)  143.880 ms  25.077 ms  10.606 ms
 6  ae-13-ar01.capitolhghts.md.bad.comcast.net (68.87.168.61)  142.567 ms  137.153 ms  20.790 ms
 7  be-33657-cr02.ashburn.va.ibone.comcast.net (68.86.90.57)  14.326 ms  144.076 ms  22.957 ms
 8  be-1102-cs01.ashburn.va.ibone.comcast.net (96.110.32.169)  13.881 ms  144.756 ms  23.981 ms
 9  be-2107-pe07.ashburn.va.ibone.comcast.net (96.110.32.186)  20.203 ms  94.433 ms  23.034 ms
10  n-a.gw12.iad8.alter.net (152.179.50.205)  20.254 ms  278.023 ms  31.660 ms
11  * * *
12  <ip12> (<ip13>)  66.277 ms  39.229 ms  34.543 ms
13  <ip13> (<ip14>)  48.849 ms  49.300 ms  49.546 ms
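One thing worth checking before the code: iperf3 shows roughly 9-10 Mbit/s and the traceroute shows round trips of roughly 50-140 ms, so HTTP/2 flow control can matter here. A single HTTP/2 stream can never move faster than its flow-control window divided by the RTT, and the bandwidth-delay product of this path sits right at HTTP/2's default 64 KiB per-stream window. A back-of-envelope check, using numbers taken from the iperf3 and traceroute output above:

```go
package main

import "fmt"

// bdpBytes returns the bandwidth-delay product in bytes for a link of
// the given bandwidth (bits/second) and round-trip time (seconds).
func bdpBytes(bitsPerSec, rttSec float64) float64 {
	return bitsPerSec * rttSec / 8
}

func main() {
	// ~10 Mbit/s path (iperf3) and ~50 ms RTT (traceroute).
	bdp := bdpBytes(10e6, 0.050)
	fmt.Printf("BDP: %.0f bytes (~%.0f KiB)\n", bdp, bdp/1024)

	// A stream cannot exceed window/RTT. With HTTP/2's default
	// 64 KiB per-stream window and a 50 ms RTT:
	window := 64.0 * 1024
	fmt.Printf("cap at 64 KiB window: %.1f Mbit/s\n", window*8/0.050/1e6)
}
```

Recent grpc-go versions grow the windows dynamically, but if you are on an older version (or the windows stay near the default, which the 140 ms RTT spikes make worse), raising them up front with the `grpc.WithInitialWindowSize` and `grpc.WithInitialConnWindowSize` dial options (and the corresponding `grpc.InitialWindowSize`/`grpc.InitialConnWindowSize` server options) is the usual fix for exactly this symptom.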


Here's the actual code

Client Connection:

creds, err := credentials.NewClientTLSFromFile(cerLoc, "")
if err != nil {
    fmt.Printf("failed to get tls from file: %v\n", err)
    panic(err)
}
conn, err = grpc.Dial(host+port, grpc.WithTransportCredentials(creds))

Client Stream:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := proto.NewBackupClient(conn)
stream, err := client.BackupFiles(ctx, grpc.UseCompressor(gzip.Name))
// Send on stream; max message size is 2 MB

Server Listening:

// Start serving on port
l, err := net.Listen("tcp", port)
if err != nil {
    fmt.Printf("error listening on port %v: %v\n", port, err)
    panic(err)
}

var s *grpc.Server
creds, err := credentials.NewServerTLSFromFile(
    certLoc,
    keyLoc,
)
if err != nil {
    fmt.Printf("error getting tls certs: %v\n", err)
    panic(err)
}
s = grpc.NewServer(grpc.Creds(creds))
proto.RegisterBackupServer(s, &server{})
err = s.Serve(l)


// Actual stream handling

// Get a pooled SharedBuffer for assembling the file
b := getBuffer()
defer putBuffer(b)
c := make(chan int, 50)
u, _ := user.Current()

uid := uuid.New()
fout, err := os.Create(filePath + uid.String())
if err != nil {
    fmt.Println("error creating staging file: ", err)
    panic(err)
}
var wg sync.WaitGroup

wg.Add(1) // Add before spawning, so wg.Wait below cannot return early
go assemble(b, fout, c, &wg)
for {
    in, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        fmt.Println("encountered an error receiving on stream: ", err)
        return err
    }

    // This buffer is shared between the stream and the goroutine
    bytesWritten := b.LockWrite(in.Payload)
    c <- bytesWritten
}

close(c)
wg.Wait()

_ = fout.Close()

// wp is a pre-declared worker pool that basically moves files around
wp.Submit(func() {
    finalizeFile(fout.Name(), name, perms, "", checkSum, userID)
})

return stream.SendAndClose(&proto.Resp{
    Status: true,
})

func assemble(b *buffer.SharedBuffer, fout *os.File, in chan int, wg *sync.WaitGroup) {
    defer wg.Done() // the caller does wg.Add(1) before spawning this goroutine

    buf := make([]byte, buffer.BUFFSIZE*2)

    for i := range in {
        if fout != nil {
            b.Lock()
            _, err := b.Read(buf[:i])
            b.Unlock()
            if err == io.EOF {
                continue
            }
            if err != nil {
                panic(err)
            }
            n, err := fout.Write(buf[:i])
            if err != nil {
                panic(err)
            }
            if n != i {
                // err is nil on a short write here; report it explicitly
                panic(fmt.Sprintf("failed to write all bytes to file: %v != %v", n, i))
            }
        }
    }
}

Seems like I might be missing something with the inner workings of gRPC?

dongying7667
dongying7667 I'll update the code shortly with more details
replied more than a year ago
dongmi4927
dongmi4927 Its being ~~proprietary~~ aside, there's nothing special about the connection, so I can share it. I'll update the post with code that closely resembles the actual code.
replied more than a year ago
douqiangbei50208
douqiangbei50208 The bottleneck is probably in the code you've omitted, i.e. where you actually read the gRPC messages and write their payloads to a file. Also, if you suspect a performance problem in a Go program, you can always trace its execution and see whether you find the bottleneck: blog.gopheracademy.com/advent-2017/go-execution-tracer
replied more than a year ago
dongpiao0731
dongpiao0731 Why did you omit the essential parts of the codebase?
replied more than a year ago

1 Answer


If I get it right, you are reading the input stream within one goroutine and dispatching the bytes to a second goroutine. Why don't you let the second goroutine handle the stream entirely? That way the first goroutine is free to process the following streams, if any.

Typically the pattern is to have one goroutine that listens for incoming requests and spawns new goroutines to handle them. It is crucial that the main goroutine does not call any blocking API besides the one used to listen for requests.

For example:

for {
    newStream := ListenForStreams() //Block until next stream
    go consumeStream(newStream)
}

duanchi8112
duanchi8112 That makes sense. The proto looks fine to me. I can only suggest using the tracer to find the critical path and possibly improve it.
replied more than a year ago
douguanya4248
douguanya4248 Well, the first goroutine isn't under my control: for every stream the server gets, gRPC automatically starts a goroutine to handle it (or rather, I think it grabs one from a pool). The reason I chose this route is to get as much as possible into the buffer without waiting on file writes. The buffer is a bytes.Buffer, so it's resizable, and the channel capacity is 50.
replied more than a year ago
douji9816
douji9816 IMHO you don't need two goroutines per stream. You could do the chunking of the input stream directly in the same goroutine that also writes the chunks to the output file. More goroutines than needed can cause high contention and thus poor concurrency. Besides, what are the sizes of the buffer and the channel? Make sure they aren't too small. I suggest tracing your executable to see what's going on.
replied more than a year ago
duanqian6982
duanqian6982 Yes and no. The gRPC server automatically multiplexes streams on the same connection, so each stream gets its own goroutine. Then, to avoid reading the whole file into memory at once, the second goroutine reads from the buffer into a file.
replied more than a year ago