douqiang7462 2019-01-31 05:55
浏览 71
已采纳

多次使用时,TCP连接返回“管道中断”错误

This question relates to Go and its net package.

I wrote a simple TCP server that handles some RPC calls. The client uses a chan net.Conn as a pool to manage all of its TCP connections, and the server runs a TCP listener.

Here is the code. Client:

package server

import (
    "errors"
    "log"
    "net"
)

// tcpPool is the client-side connection pool. NewClient creates and fills it;
// getConn receives connections from it and releaseConn sends them back.
var tcpPool chan net.Conn

// NewClient replaces the package-level pool with a channel that has room for
// `connections` entries and fills it by dialing address over "tcp4" once per
// slot. The first dial failure is reported through log.Panic.
func NewClient(connections int, address string) {
    tcpPool = make(chan net.Conn, connections)

    for remaining := connections; remaining > 0; remaining-- {
        c, dialErr := net.Dial("tcp4", address)
        if dialErr != nil {
            log.Panic(dialErr)
        }
        tcpPool <- c
    }
}

// SendMessage borrows a pooled connection, writes msg to it and returns the
// server's reply.
//
// The reply is read in bufSize-byte chunks and is treated as complete at the
// first read that returns fewer than bufSize bytes.
// NOTE(review): TCP is a byte stream, so a short read does not reliably mark
// the end of a message; a length prefix or delimiter agreed with the server
// would be more robust.
//
// Errors are returned to the caller instead of panicking or being dropped:
//   - a failed or short write returns the error and no data;
//   - a failed read with no data at all returns the read error;
//   - a failed read after some data arrived returns that data with a nil
//     error, as before;
//   - a reply of zero bytes without a read error returns "empty buffer error".
//
// A connection that produced a write or read error is closed and, when
// possible, replaced by a fresh connection to the same remote address before
// it goes back to the pool, so a dead socket is not handed out again.
func SendMessage(msg []byte) ([]byte, error) {
    conn := getConn()

    // broken records that conn can no longer be trusted; the deferred
    // function then swaps it for a new connection before releasing it.
    broken := false
    defer func() {
        if broken {
            conn = replaceConn(conn)
        }
        releaseConn(conn)
    }()

    log.Println("check conn: ", conn)
    log.Println("msg: ", msg)

    // send message
    n, err := conn.Write(msg)
    if err != nil {
        broken = true
        return nil, err
    }
    if n < len(msg) {
        broken = true
        return nil, errors.New("Message did not send in full")
    }

    // receiving a message
    var inBytes []byte
    b := make([]byte, bufSize) // reused for every read; append copies the data out
    for {
        res, readErr := conn.Read(b)
        log.Println("server sends >>>>>>>>>>>>: ", res)
        // A Read may return data together with an error, so keep the bytes
        // before looking at the error.
        inBytes = append(inBytes, b[:res]...)
        if readErr != nil {
            broken = true
            if len(inBytes) == 0 {
                return []byte{}, readErr
            }
            break
        }
        // message finished.
        if res < bufSize {
            break
        }
    }

    // check replied message
    if len(inBytes) == 0 {
        return []byte{}, errors.New("empty buffer error")
    }
    log.Println("SendMessage gets: ", inBytes)
    return inBytes, nil
}

// replaceConn closes a connection that is no longer usable and dials a new one
// to the same remote address. If the address is unknown or dialing fails, the
// closed connection is returned so the pool keeps its size; the next use of it
// fails and triggers another replacement attempt.
func replaceConn(old net.Conn) net.Conn {
    addr := old.RemoteAddr()
    // Best effort: the connection is already considered dead, so a failing
    // Close carries no extra information.
    _ = old.Close()
    if addr == nil {
        return old
    }
    fresh, err := net.Dial(addr.Network(), addr.String())
    if err != nil {
        log.Println("re-dial failed: ", err)
        return old
    }
    return fresh
}

// releaseConn hands conn back to the pool, blocking until the pool channel
// accepts it. It always returns nil; the error result is kept so existing
// callers keep compiling.
func releaseConn(conn net.Conn) error {
    log.Println("return conn to pool")
    // A plain send blocks exactly like the former single-case select did.
    tcpPool <- conn
    return nil
}

// getConn takes one connection out of the pool, blocking until the pool
// channel delivers one.
func getConn() net.Conn {
    log.Println("Take one from pool")
    // A plain receive replaces the single-case select, which also shadowed the
    // named result with a second variable called conn.
    return <-tcpPool
}

server

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return errors.Wrapf(err, "Unable to listen on address %s
", addr)
    }
    log.Println("Listen on", listener.Addr().String())
    defer listener.Close()
    for {
        log.Println("Accept a connection request.")
        conn, err := listener.Accept()
        if err != nil {
            log.Println("Failed accepting a connection request:", err)
            continue
        }
        log.Println("Handle incoming messages.")
        go onConn(conn)
    }
}

// onConn receives a TCP connection, reads one request from it, decodes the
// request and writes a reply.
//
// The function handles a single request and then returns; the deferred
// function closes conn on every return path. A client that keeps the
// connection open for reuse will therefore find it closed by the server after
// the first exchange.
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    defer func() {
        // Recover from the log.Panic calls below so one bad request does not
        // take the whole server down.
        if e := recover(); e != nil {
            // TODO: replace with proper logging later.
            // NOTE(review): log.Panic panics with a string value, so this
            // type assertion does not match for those panics and nothing is
            // printed here — confirm whether that is intended.
            if err, ok := e.(error); ok {
                println("recover", err.Error())
            }
        }
        conn.Close()
    }()
    // Load the request: read bufSize-byte chunks until a read fails or returns
    // fewer than bufSize bytes. Bytes returned alongside an error are kept.
    for {
        buf := make([]byte, bufSize)
        res, err := conn.Read(buf)
        log.Println("server reading: ", res)
        inBytes = append(inBytes, buf[:res]...)
        if err != nil || res < bufSize {
            break
        }
    }

    // Decode the outer request envelope.
    var req RPCRequest
    err := json.Unmarshal(inBytes, &req)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request: ", req)

    // Decode the query carried in req.Query.
    var query UserRequest
    err = json.Unmarshal(req.Query, &query)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request query: ", query)

    // The actual request processing is elided in this listing: the real code
    // calls the requested method, obtains an output value and marshals it.
    // outBytes, err := json.Marshal(out)
    // NOTE(review): outBytes is only defined by the elided code above, and the
    // result of Write is not checked.
    conn.Write(outBytes)
}

I think this is very standard, but for some reason I can only send a message from the client side once; the 2nd and 3rd attempts start to show irregular behavior.

1st ---> success, gets a response. 2nd ---> the client can send, but nothing comes back, and the logs on the server side show no incoming message. 3rd ---> if I send from the client side one more time, it shows a broken pipe error.

  • 写回答

1条回答 默认 最新

  • douran6443 2019-01-31 08:23
    关注

    There are some problems in the way this is handled. First, whether the message from the server has finished should be decided by io.EOF, not by the length of the read:

        // message finished.
        if res < 512 {
            break
        }
    

    Instead of this, the reader returning io.EOF is the only signal that the message has finished. Second, a channel already blocks by its nature, so there is no need to use select. By the way, you really need to start a goroutine to release the connection. The same goes for getConn:

    // releaseConn sends conn back to tcpPool from a new goroutine, so the
    // caller returns immediately even if the send has to wait.
    func releaseConn(conn net.Conn) {
        go func(c net.Conn) {
            tcpPool <- c
        }(conn)
    }
    
    // getConn blocks until tcpPool delivers a connection and returns it.
    func getConn() net.Conn {
        return <-tcpPool
    }
    
    

    Third, the listener should not be closed; the code below is bad:

    defer listener.Close()
    

    The most important reason is on the client side: res, err := conn.Read(b) receives the reply from the server. When there is nothing to read, it blocks rather than returning io.EOF or some other error. This means you can't box a long-lived exchange into a single send() function. You can use SendMessage() just to send, but never use SendMessage() to handle the reply. You can handle the reply like this:

    // receive is the channel on which received messages are delivered; it
    // buffers up to 10 messages.
    var receive = make(chan string, 10)
    // ReceiveMessage reads from con in chunks of up to 512 bytes until a read
    // fails. After every successful read it prints everything received so far
    // and sends that same accumulated text on the receive channel. io.EOF ends
    // the loop silently; any other read error is printed before returning.
    func ReceiveMessage(con net.Conn) {
        received := make([]byte, 0, 1000)
        chunk := make([]byte, 512) // reused for every read
        for {
            n, readErr := con.Read(chunk)
            if readErr != nil {
                if readErr != io.EOF {
                    fmt.Println(readErr.Error())
                }
                return
            }
            received = append(received, chunk[:n]...)
            text := string(received)
            fmt.Println("receive msg from server:" + text)
            receive <- text
        }
    }
    

    I found several problems in your code, but I can't tell which one causes your failure. Below is my code, based on what you wrote with some fixes applied. client.go:

    package main
    
    import (
        "fmt"
        "io"
        "log"
        "net"
    )
    
    // Package-level state shared by the client functions.
    var (
        // tcpPool holds the pooled connections; NewClient creates it.
        tcpPool chan net.Conn
        // receive delivers messages read from the server; it buffers up to
        // 10 messages.
        receive = make(chan string, 10)
    )
    // NewClient replaces tcpPool with a channel that has room for `connections`
    // entries and fills it by dialing address over "tcp" once per slot. The
    // first dial failure is reported through log.Panic.
    func NewClient(connections int, address string) {
        tcpPool = make(chan net.Conn, connections)
        for remaining := connections; remaining > 0; remaining-- {
            c, dialErr := net.Dial("tcp", address)
            if dialErr != nil {
                log.Panic(dialErr)
            }
            tcpPool <- c
        }
    }
    
    // SendMessage writes msg to con. It returns nil when the write succeeds
    // and the wrapped write error otherwise, so the caller decides how to
    // react instead of the process panicking.
    func SendMessage(con net.Conn, msg []byte) error {
        // send message
        if _, err := con.Write(msg); err != nil {
            return fmt.Errorf("sending message: %w", err)
        }
        return nil
    }
    
    // ReceiveMessage reads from con in chunks of up to 512 bytes until a read
    // fails. After every successful read it prints everything received so far
    // and sends that same accumulated text on the receive channel. io.EOF ends
    // the loop silently; any other read error is printed before returning.
    func ReceiveMessage(con net.Conn) {
        received := make([]byte, 0, 1000)
        chunk := make([]byte, 512) // reused for every read
        for {
            n, readErr := con.Read(chunk)
            if readErr != nil {
                if readErr != io.EOF {
                    fmt.Println(readErr.Error())
                }
                return
            }
            received = append(received, chunk[:n]...)
            text := string(received)
            fmt.Println("receive msg from server:" + text)
            receive <- text
        }
    }
    
    // getConn blocks until tcpPool delivers a connection and returns it.
    func getConn() net.Conn {
        return <-tcpPool
    }
    
    // main fills a pool of 20 connections to localhost:8101, sends one
    // greeting on a connection taken from the pool, starts ReceiveMessage for
    // that connection in a goroutine and then prints every message delivered
    // on the receive channel, forever.
    func main() {
        NewClient(20, "localhost:8101")
        con := <-tcpPool
        if sendErr := SendMessage(con, []byte("hello, i am client")); sendErr != nil {
            fmt.Println(sendErr.Error())
            return
        }
        go ReceiveMessage(con)
        for {
            fmt.Println(<-receive)
        }
    }
    
    

    server.go

    package main
    
    import (
        "fmt"
        "io"
        "net"
    )
    
    // StartTCPServer listens on addr using the given network and serves every
    // accepted connection with onConn. It returns only when the listener
    // cannot be created; a failed Accept is printed and the loop continues.
    func StartTCPServer(network, addr string) error {
        listener, err := net.Listen(network, addr)
        if err != nil {
            return err
        }
        for {
            conn, err := listener.Accept()
            if err != nil {
                fmt.Println(err.Error())
                continue
            }
            // Serve each connection in its own goroutine; calling onConn
            // inline would keep the accept loop busy with the first client
            // and leave every other connection unanswered.
            go onConn(conn)
        }
    }
    
    // onConn serves one client connection until a read or write fails or the
    // peer closes it. After every successful read it prints all bytes received
    // so far and answers with "hello". The connection is closed on return so
    // the socket is not leaked. io.EOF (peer closed) ends the loop silently;
    // any other read error, and any write error, is printed first.
    func onConn(conn net.Conn) {
        defer conn.Close()

        inBytes := make([]byte, 0)
        buf := make([]byte, 512) // reused for every read; append copies the data out
        // load msg
        for {
            res, err := conn.Read(buf)
            if err != nil {
                if err != io.EOF {
                    fmt.Println(err.Error())
                }
                return
            }
            inBytes = append(inBytes, buf[:res]...)

            fmt.Println("receive from client:" + string(inBytes))
            if _, err := conn.Write([]byte("hello")); err != nil {
                fmt.Println(err.Error())
                return
            }
        }
    }
    
    // main starts the TCP server on port 8101 and prints the error if the
    // server could not be started.
    func main() {
        e := StartTCPServer("tcp", ":8101")
        if e == nil {
            return
        }
        fmt.Println(e.Error())
    }
    
    

    This works without errors. By the way, I can't see where, on either the client side or the server side, you call con.Close(). It's necessary to close the connection. This means that once a connection has been taken from the pool, you don't put it back. When you think a connection is finished, close it and open a new connection to refill the pool rather than putting the old one back, because putting a closed connection back into the pool is a fatal mistake.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 Oracle触发器记录修改前后的字段值
  • ¥100 为什么这个恒流源电路不能恒流?
  • ¥15 有偿求跨组件数据流路径图
  • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
  • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
  • ¥15 CSAPPattacklab
  • ¥15 一直显示正在等待HID—ISP
  • ¥15 Python turtle 画图
  • ¥15 stm32开发clion时遇到的编译问题
  • ¥15 lna设计 源简并电感型共源放大器