dongtun1683 2018-08-11 10:52 采纳率: 0%
浏览 255
已采纳

使用Go TCP客户端服务器实现高吞吐量

I'm developing a simple TCP client and server and I want to achieve high throughput (300000 requests/second), which is easy to reach with a C or Cpp TCP client and server on server hardware. By that I mean a server with 48 cores and 64G of memory.

On my testbed, both client and server have 10G network interface card and I have receive-side-scaling at server side and transmit-packet-steering enabled at the client.

I configured each client to send 10 thousand requests per second, and I run multiple instances of go run client.go from a bash script to increase the throughput. However, this way Go creates a lot of operating-system threads, and the large number of threads results in a high context-switching cost, so I could not get close to the target throughput. I suspected the number of Go instances I was running from the command line. Below is the client code used in this approach:

// Main opens one TCP connection to cmd_port and sends 8-byte little-endian
// UnixNano timestamps over it. Before each send it sleeps for
// nextTime(rate) scaled to microseconds, where rate is cmd_rate_int.
//
// A background goroutine reads 8-byte values from the same connection,
// decodes each one as a timestamp and prints the elapsed time since that
// timestamp in microseconds.
//
// Main exits the process if the dial fails and returns when a write fails.
func Main(cmd_rate_int int, cmd_port string) {
	rate := float64(cmd_rate_int)

	port = cmd_port

	conn, err := net.Dial("tcp", port)
	if err != nil {
		fmt.Println("ERROR", err)
		os.Exit(1)
	}
	// Closing on return also makes the reader goroutine's ReadFull fail,
	// so the goroutine does not outlive this call.
	defer conn.Close()

	go func(conn net.Conn) {
		buf := make([]byte, 8)

		for {
			// err is declared here on purpose: sharing the outer err with
			// the sending loop would be a data race.
			_, err := io.ReadFull(conn, buf)
			now := time.Now().UnixNano()

			if err != nil {
				return
			}

			last := int64(binary.LittleEndian.Uint64(buf))
			fmt.Println((now - last) / 1000)
		}
	}(conn)

	byte_message := make([]byte, 8)

	for {
		// nextTime's result is scaled by 1e6 and slept as microseconds.
		delay := int(nextTime(rate) * 1000000)
		time.Sleep(time.Microsecond * time.Duration(delay))

		binary.LittleEndian.PutUint64(byte_message, uint64(time.Now().UnixNano()))
		if _, err := conn.Write(byte_message); err != nil {
			// A failed write means the connection is unusable; stop instead
			// of spinning on a dead socket.
			fmt.Println("ERROR", err)
			return
		}
	}
}

So I tried running all the clients as goroutines, calling go client() from main instead of starting multiple instances from the Linux command line. I thought it might be a better idea, and it basically is: the number of operating-system threads no longer climbs toward 700 or so. But the throughput is still low, and it does not seem to use the full capability of the underlying hardware. Here is the code I ran in the second approach:

// main starts client_size concurrent clients and blocks until all of them
// have returned.
//
// Command line: <rate> <client_size> <address>, where rate and client_size
// are integers and address is passed unchanged to client.Main.
func main() {
	args := os.Args[1:]
	if len(args) < 3 {
		os.Stderr.WriteString("usage: <rate> <client_size> <address>\n")
		os.Exit(2)
	}

	rate_int, err := strconv.Atoi(args[0])
	if err != nil {
		os.Stderr.WriteString("invalid rate: " + err.Error() + "\n")
		os.Exit(2)
	}
	client_size, err := strconv.Atoi(args[1])
	if err != nil {
		os.Stderr.WriteString("invalid client_size: " + err.Error() + "\n")
		os.Exit(2)
	}
	port := args[2]

	// Start exactly client_size clients; each signals done when it returns.
	done := make(chan struct{})
	for i := 0; i < client_size; i++ {
		go func() {
			client.Main(rate_int, port)
			done <- struct{}{}
		}()
	}

	// Block on the channel rather than spinning in an empty loop, which
	// would keep one core busy doing nothing.
	for i := 0; i < client_size; i++ {
		<-done
	}
}

I was wondering what the best practice is for reaching high throughput. I have always heard that Go is lightweight and performant, and pretty comparable with C/Cpp pthreads. However, I think that in terms of performance C/Cpp is still far, far better than Go. I might be doing something really wrong here, so I would be happy if anybody could help me achieve high throughput with Go.

  • 写回答

2条回答 默认 最新

  • dongqinta4174 2018-08-12 21:51
    关注

    This is a quick rework of the OP's code. As the original source code is working, it does not provide a solution; however, it illustrates token bucket usage and a few other small Go tips.

    It reuses default values similar to those in the OP's source code.

    It demonstrates that you do not need two files / programs to provide both the client and the server.

    It demonstrates usage of flag package.

    It shows how to parse unix nano timestamp appropriately using time.Unix(x,y)

    It shows how to take advantage of io.Copy to write-what-you-read on the same net.Conn. Rather than manual writing.

    Still, this is improper for production delivery.

    package main
    
    import (
        "encoding/binary"
        "flag"
        "fmt"
        "io"
        "log"
        "math"
        "math/rand"
        "net"
        "os"
        "sync/atomic"
        "time"
    
        "github.com/juju/ratelimit"
    )
    
    // total_rcv counts the 8-byte messages read back by the client receive
    // goroutines, which increment it with atomic.AddInt64.
    var total_rcv int64
    
    // main parses the -rate, -port and -size flags and then runs the mode
    // named by the first positional argument:
    //
    //   server            run the echo server on -port
    //   client            start -size clients paced by nextTime, run for 2s
    //   client_ratelimit  start -size clients sharing one token bucket, run for 3s
    //
    // Both client modes print the number of messages received before exiting.
    // Any other mode is reported as an error.
    func main() {

        var cmd_rate_int float64
        var cmd_port string
        var client_size int

        flag.Float64Var(&cmd_rate_int, "rate", 400000, "change rate of message reading")
        flag.StringVar(&cmd_port, "port", ":9090", "port to listen")
        flag.IntVar(&client_size, "size", 20, "number of clients")

        flag.Parse()

        switch t := flag.Arg(0); t {
        case "server":
            server(cmd_port)

        case "client":
            for i := 0; i < client_size; i++ {
                go client(cmd_rate_int, cmd_port)
            }
            // <-make(chan bool) // infinite wait.
            time.Sleep(time.Second * 2)
            // The client goroutines are still incrementing the counter, so
            // it must be read atomically as well.
            fmt.Println("total exchanged", atomic.LoadInt64(&total_rcv))

        case "client_ratelimit":
            // One bucket is shared by every client goroutine.
            bucket := ratelimit.NewBucketWithQuantum(time.Second, int64(cmd_rate_int), int64(cmd_rate_int))
            for i := 0; i < client_size; i++ {
                go clientRateLimite(bucket, cmd_port)
            }
            // <-make(chan bool) // infinite wait.
            time.Sleep(time.Second * 3)
            fmt.Println("total exchanged", atomic.LoadInt64(&total_rcv))

        default:
            log.Fatalf("unknown mode %q: expected server, client or client_ratelimit", t)
        }
    }
    
    // server listens for TCP connections on cmd_port and echoes back whatever
    // each client sends. Every accepted connection is served by its own
    // goroutine.
    //
    // server panics if it cannot listen or if Accept fails; otherwise it never
    // returns.
    func server(cmd_port string) {
        ln, err := net.Listen("tcp", cmd_port)
        if err != nil {
            panic(err)
        }
        defer ln.Close()

        for {
            conn, err := ln.Accept()
            if err != nil {
                panic(err)
            }
            go func(c net.Conn) {
                // Close the connection when the copy ends; otherwise every
                // client that disconnects leaves an open descriptor behind.
                defer c.Close()
                // The copy error is ignored on purpose: a broken connection
                // only ends this one echo session.
                _, _ = io.Copy(c, c)
            }(conn)
        }
    }
    
    // client opens one TCP connection to cmd_port and sends 8-byte
    // little-endian UnixNano timestamps over it, sleeping for a delay drawn
    // from nextTime(cmd_rate_int) before each send.
    //
    // A background goroutine reads 8-byte messages from the same connection
    // and adds one to total_rcv for each message read.
    //
    // client exits the process if the dial fails and returns when a write
    // fails.
    func client(cmd_rate_int float64, cmd_port string) {

        conn, err := net.Dial("tcp", cmd_port)
        if err != nil {
            log.Println("ERROR", err)
            os.Exit(1)
        }
        defer conn.Close()

        go func(conn net.Conn) {
            buf := make([]byte, 8)
            for {
                if _, err := io.ReadFull(conn, buf); err != nil {
                    return
                }
                // int_message := int64(binary.LittleEndian.Uint64(buf))
                // t2 := time.Unix(0, int_message)
                // fmt.Println("ROUNDTRIP", time.Now().Sub(t2))
                atomic.AddInt64(&total_rcv, 1)
            }
        }(conn)

        byte_message := make([]byte, 8)
        for {
            // nextTime's result is treated as a number of seconds (rate being
            // messages per second). Scale it while it is still a float:
            // converting to time.Duration first would truncate every
            // sub-second delay to zero and disable pacing.
            wait := time.Duration(nextTime(cmd_rate_int) * float64(time.Second))
            if wait > 0 {
                time.Sleep(wait)
            }
            int_message := time.Now().UnixNano()
            binary.LittleEndian.PutUint64(byte_message, uint64(int_message))
            _, err := conn.Write(byte_message)
            if err != nil {
                log.Println("ERROR", err)
                return
            }
        }
    }
    
    // clientRateLimite opens one TCP connection to cmd_port and sends 8-byte
    // little-endian UnixNano timestamps over it, calling bucket.Wait(1) before
    // each send.
    //
    // A background goroutine reads 8-byte messages from the same connection
    // and adds one to total_rcv for each message read.
    //
    // It exits the process if the dial fails and returns when a write fails.
    func clientRateLimite(bucket *ratelimit.Bucket, cmd_port string) {
        conn, dialErr := net.Dial("tcp", cmd_port)
        if dialErr != nil {
            log.Println("ERROR", dialErr)
            os.Exit(1)
        }
        defer conn.Close()

        // Receiver: count every complete 8-byte message until a read fails.
        go func(rd io.Reader) {
            frame := make([]byte, 8)
            for {
                if _, readErr := io.ReadFull(rd, frame); readErr != nil {
                    return
                }
                // sent := time.Unix(0, int64(binary.LittleEndian.Uint64(frame)))
                // fmt.Println("ROUNDTRIP", time.Now().Sub(sent))
                atomic.AddInt64(&total_rcv, 1)
            }
        }(conn)

        // Sender: one timestamped message per call to bucket.Wait(1).
        payload := make([]byte, 8)
        for {
            bucket.Wait(1)
            binary.LittleEndian.PutUint64(payload, uint64(time.Now().UnixNano()))
            if _, writeErr := conn.Write(payload); writeErr != nil {
                log.Println("ERROR", writeErr)
                return
            }
        }
    }
    
    // nextTime draws u from rand.Float64() and returns -ln(1-u)/rate.
    func nextTime(rate float64) float64 {
        u := rand.Float64()
        return -math.Log(1.0-u) / rate
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥20 蓝牙耳机怎么查看日志
  • ¥15 R语言 拟时序分析降维图如何减少分支
  • ¥15 Fluent齿轮搅油
  • ¥15 八爪鱼爬数据为什么自己停了
  • ¥15 交替优化波束形成和ris反射角使保密速率最大化
  • ¥15 树莓派与pix飞控通信
  • ¥15 自动转发微信群信息到另外一个微信群
  • ¥15 outlook无法配置成功
  • ¥30 这是哪个作者做的宝宝起名网站
  • ¥60 版本过低apk如何修改可以兼容新的安卓系统