多次使用时,TCP连接返回“管道中断”错误

This question relates to go and its net package.

I wrote a simple tcp server handles some RPC. the client is using a chan net.Conn to manage all tcp connection on the client side. Server is running with a tcp listener.

here's the code: client:

package server

import (
    "errors"
    "log"
    "net"
)

var tcpPool chan net.Conn

func NewClient(connections int, address string) {

    tcpPool = make(chan net.Conn, connections)
    for i := 0; i < connections; i++ {
        conn, err := net.Dial("tcp4", address)
        if err != nil {
            log.Panic(err)
        }
        tcpPool <- conn
    }
}

func SendMessage(msg []byte) ([]byte, error) {
    conn := getConn()

    log.Println("check conn: ", conn)
    log.Println("msg: ", msg)

    defer releaseConn(conn)
    // send message
    n, err := conn.Write(msg)
    if err != nil {
        log.Panic(err)
    } else if n < len(msg) {
        log.Panic(errors.New("Message did not send in full"))
    }

    // receiving a message
    inBytes := make([]byte, 0)

    for {
        // bufsize 1024, read bufsize bytes each time
        b := make([]byte, bufSize)
        res, err := conn.Read(b)
        log.Println("server sends >>>>>>>>>>>>: ", res)
        if err != nil {
            b[0] = ReError
            break
        }
        inBytes = append(inBytes, b[:res]...)
        // message finished.
        if res < bufSize {
            break
        }
    }
    // check replied message
    if len(inBytes) == 0 {
        return []byte{}, errors.New("empty buffer error")
    }
    log.Println("SendMessage gets: ", inBytes)
    return inBytes, nil
}

func releaseConn(conn net.Conn) error {
    log.Println("return conn to pool")
    select {
    case tcpPool <- conn:
        return nil
    }
}

func getConn() (conn net.Conn) {
    log.Println("Take one from pool")
    select {
    case conn := <-tcpPool:
        return conn
    }
}

server

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return errors.Wrapf(err, "Unable to listen on address %s
", addr)
    }
    log.Println("Listen on", listener.Addr().String())
    defer listener.Close()
    for {
        log.Println("Accept a connection request.")
        conn, err := listener.Accept()
        if err != nil {
            log.Println("Failed accepting a connection request:", err)
            continue
        }
        log.Println("Handle incoming messages.")
        go onConn(conn)
    }
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    defer func() {
        if e := recover(); e != nil {
            //later log
            if err, ok := e.(error); ok {
                println("recover", err.Error())
            }
        }
        conn.Close()
    }()
    // load msg
    for {
        buf := make([]byte, bufSize)
        res, err := conn.Read(buf)
        log.Println("server reading: ", res)
        inBytes = append(inBytes, buf[:res]...)
        if err != nil || res < bufSize {
            break
        }
    }

    var req RPCRequest
    err := json.Unmarshal(inBytes, &req)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request: ", req)

    var query UserRequest
    err = json.Unmarshal(req.Query, &query)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request query: ", query)

    // call method to process request
    // good now we can proceed to function call
    // some actual function calls gets a output
    // outBytes, err := json.Marshal(out)
    conn.Write(outBytes)
}

I think this is very standard. but for some reason, I can only send message on the client side one, and then the follow 2nd and 3rd start to show some irregularity.

1st ---> success, gets response 2nd ---> client can send but nothing gets back, logs on server side shows no in coming message 3rd ---> if I send from client side one more time, it shows broken pipe error..

dongmangzong8006
dongmangzong8006 另外:不需要分别检查错误和int返回的int:“如果返回n<len(p),则写入必须返回非nil错误。”golang.org/pkg/io/#Writer
一年多之前 回复
dou426098
dou426098 不是您的问题的答案,而是您的术语上的重要一点:Go没有异常,也不会抛出异常或其他任何异常。您所说的是返回错误,这与您的标题最初描述的是根本不同的行为。编码时,了解这种差异很重要。
一年多之前 回复

1个回答

There are some bad handling way. First, the flag to insure the msg from server finished is depending on io.EOF,not length

    // message finished.
    if res < 512 {
        break
    }

instead of this, reader returns an io.EOF is the only symbol that shows message finished. Second, chan type has its property to block and not need to use select.by the way, you really need to start a goroutine to release. The same requirement for getConn

func releaseConn(conn net.Conn)  {
    go func(){
        tcpPool <- conn
    }()
}

func getConn() net.Conn {
    con := <-tcpPool
    return con
}

Third, listener should not be close, code below is bad

defer listener.Close()

The most important reason is on the client side, res, err := conn.Read(b) this receive the reply from the server. when nothing reply ,it block rather than io.EOF, nor some error else. It means ,you cann't box a lasting communicating part into a function send(). You can do a single thing to use sendmsg() to send, but never use sendmsg() to handle the reply. you can handle reply like this

var receive chan string

func init() {
    receive = make(chan string, 10)
}
func ReceiveMessage(con net.Conn) {
    // receiving a message
    inBytes := make([]byte, 0, 1000)
    var b = make([]byte, 512)
    for {
        // bufsize 1024, read bufsize bytes each time
        res, err := con.Read(b)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println(err.Error())
            break
        }
        inBytes = append(inBytes, b[:res]...)
        msg := string(inBytes)
        fmt.Println("receive msg from server:" + msg)
        receive <- msg
    }
}

I found several problem in your code, but I can't tell which one leads your failure. This is my code according to what you write and did some fixing. client.go:

package main

import (
    "fmt"
    "io"
    "log"
    "net"
)

var tcpPool chan net.Conn
var receive chan string

func init() {
    receive = make(chan string, 10)
}
func NewClient(connections int, address string) {
    tcpPool = make(chan net.Conn, connections)
    for i := 0; i < connections; i++ {
        conn, err := net.Dial("tcp", address)
        if err != nil {
            log.Panic(err)
        }
        tcpPool <- conn
    }
}

func SendMessage(con net.Conn, msg []byte) error {
    // send message
    _, err := con.Write(msg)
    if err != nil {
        log.Panic(err)
    }
    return nil
}

func ReceiveMessage(con net.Conn) {
    // receiving a message
    inBytes := make([]byte, 0, 1000)
    var b = make([]byte, 512)
    for {
        // bufsize 1024, read bufsize bytes each time
        res, err := con.Read(b)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println(err.Error())
            break
        }
        inBytes = append(inBytes, b[:res]...)
        msg := string(inBytes)
        fmt.Println("receive msg from server:" + msg)
        receive <- msg
    }
}

func getConn() net.Conn {
    con := <-tcpPool
    return con
}

func main() {
    NewClient(20, "localhost:8101")
    con := <-tcpPool
    e := SendMessage(con, []byte("hello, i am client"))
    if e != nil {
        fmt.Println(e.Error())
        return
    }
    go ReceiveMessage(con)
    var msg string
    for {
        select {
        case msg = <-receive:
            fmt.Println(msg)
        }
    }
}

server.go

package main

import (
    "fmt"
    "io"
    "net"
)

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return err
    }
    for {
        conn, err := listener.Accept()
        if err != nil {

            fmt.Println(err.Error())
            continue

        }
        onConn(conn)
    }
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    // load msg
    for {
        buf := make([]byte, 512)
        res, err := conn.Read(buf)
        if err != nil {
            if err == io.EOF {
                return
            }
            fmt.Println(err.Error())
            return
        }
        inBytes = append(inBytes, buf[:res]...)

        fmt.Println("receive from client:" + string(inBytes))
        conn.Write([]byte("hello"))
    }
}

func main() {
    if e := StartTCPServer("tcp", ":8101"); e != nil {
        fmt.Println(e.Error())
        return
    }
}

this works and no error. By the way, I can't see where either on the client side or the server side you do con.Close(). It's nessasary to close it.This means a connection once got from the pool, you don't put it back. When you think a connection is over, then close it and build a new connection to fill the pool rather than put it back,beause it's a fatal operation to put a closed con back to the pool.

dpy33121
dpy33121 我想我也可以将此线程移至Stackoverflow Codereview论坛。
一年多之前 回复
douludi8413
douludi8413 感谢您的详细解释。 我将尝试消除所有这些问题,我认为不调用con.Close()是导致此问题的原因,就像您所说的,已将关闭的连接放回池中,并在下一个getConn()中重用
一年多之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问
相关内容推荐