dongzhan1570 2017-04-01 09:22
Views: 49
Accepted

Splitting a file and writing the chunks to servers concurrently over TCP gives a broken pipe error

My client splits a file into multiple chunks (128 MB each) and then uploads the chunks to multiple servers concurrently using goroutines.

However, when I use more than one goroutine, I get this error from my client program:

write tcp [::1]:49324->[::1]:2001: write: broken pipe

And on my server, the error is:

EOF

Note that the broken pipe and EOF errors occur on different chunks. For example, the broken pipe error might happen while the client is writing chunk 1, while the EOF error might happen while the server is receiving chunk 2.

Below is the client code:

//set maximum no. of goroutine running in the back
maxGoroutines := 3
guard := make(chan struct{}, maxGoroutines)

var sentByte int64

for i:= 0; i < chunkCount; i += 1{
    guard <- struct{}{} 

    go func(i int){
        index := i%len(serverList)
        vsConnection, _ := net.Dial("tcp", serverList[index])

        sentByte=0
        file, _ := os.Open(fileName)
        file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
        for { 
            n, _ := file.Read(sendBuffer)

            n2, err2 := vsConnection.Write(sendBuffer[:n])
            if err2 != nil {
                fmt.Println("err2",err2,chunkName)              
            }
            if(n2!=65536){ //65536 is size of sendBuffer
                fmt.Println("n2",n2)
            }
            sentByte = sentByte+int64(n)
            if(sentByte == CHUNKSIZE){
                break;
            }
        }
        vsConnection.Close()
        file.Close()
        <-guard
    }(i)
}

Below is the server code:

func main() {
    mapping := cmap.New()
    server, error := net.Listen("tcp", ":2001")
    if error != nil {
        fmt.Println("There was an error starting the server" +    error.Error())
        return
    }

    for {
        connection, error := server.Accept()
        if error != nil {
            fmt.Println("There was an error with the connection" + error.Error())
            return
        }
        //one goroutine per connection
        go ConnectionHandler(connection,mapping)
    }
}

func ConnectionHandler(connection net.Conn, mapping cmap.ConcurrentMap) {
    fmt.Println("Connected")
    //make a buffer to hold data        
    var bufferFile bytes.Buffer
    writer := bufio.NewWriter(&bufferFile)

    var receivedBytes int64
    receivedBytes=0
    for {

        if(CHUNKSIZE<=receivedBytes){
            break
        }
        n,err := io.CopyN(writer, connection, BUFFERSIZE)
        receivedBytes += n
        if err != nil {
            fmt.Println("err", err.Error(), fileName)
            break
        }
    }
    mapping.Set(fileName,bufferFile.Bytes())
    connection.Close()

}

Big thanks in advance.

1 answer

  • duandai7601 2017-04-01 11:06

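    In your client, sentByte should be local to each sender goroutine. Because it is declared outside the loop, every goroutine reads and writes the same variable, which is a data race: one goroutine can reset or advance another's count, so a sender may close its connection early (the server sees EOF) or keep writing after the server has already closed the connection (the client sees a broken pipe). sendBuffer is not declared inside the goroutine either, so it appears to be shared in the same way. Try the fix below: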

    go func(i int){
        index := i%len(serverList)
        vsConnection, _ := net.Dial("tcp", serverList[index])
    
        var sentByte int64 // local variable, so each goroutine has its own copy;
                           // int64 to match the int64(n) added to it below
        file, _ := os.Open(fileName)
        file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
        for { 
            n, _ := file.Read(sendBuffer)
            // ...
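
For reference, here is a minimal sketch of the same idea with every piece of per-chunk state kept local. The names sendChunk and uploadAll are hypothetical, and a bytes.Buffer stands in for each TCP connection so the example runs on its own; in the real client you would presumably pass the *os.File as src and the net.Conn as dst. It uses io.NewSectionReader to bound each chunk instead of counting bytes by hand, which is a substitute for the loop in the question rather than the original approach.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// sendChunk copies chunk number index (chunkSize bytes, or fewer for the
// final chunk) from src to dst and returns the number of bytes copied.
// Nothing here is shared between calls: the section reader tracks its own
// offset, and io.Copy manages its own buffering.
func sendChunk(dst io.Writer, src io.ReaderAt, index int, chunkSize int64) (int64, error) {
	section := io.NewSectionReader(src, int64(index)*chunkSize, chunkSize)
	return io.Copy(dst, section)
}

// uploadAll splits data into chunks and "uploads" them concurrently, with at
// most maxGoroutines senders running at once. Each chunk goes to its own
// in-memory buffer, which stands in for a connection to a server.
func uploadAll(data []byte, chunkSize int64, maxGoroutines int) ([][]byte, error) {
	src := bytes.NewReader(data) // ReadAt is safe for parallel calls
	chunkCount := int((int64(len(data)) + chunkSize - 1) / chunkSize)
	received := make([]bytes.Buffer, chunkCount)
	errs := make([]error, chunkCount)

	guard := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup
	for i := 0; i < chunkCount; i++ {
		guard <- struct{}{} // blocks while maxGoroutines senders are active
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-guard }()
			// Each goroutine writes only to its own slice elements.
			_, errs[i] = sendChunk(&received[i], src, i, chunkSize)
		}(i)
	}
	wg.Wait() // do not return before every sender has finished

	out := make([][]byte, chunkCount)
	for i := range received {
		if errs[i] != nil {
			return nil, fmt.Errorf("chunk %d: %w", i, errs[i])
		}
		out[i] = received[i].Bytes()
	}
	return out, nil
}

func main() {
	chunks, err := uploadAll([]byte("abcdefghij"), 4, 3)
	if err != nil {
		fmt.Println("upload failed:", err)
		return
	}
	for i, c := range chunks {
		fmt.Printf("chunk %d: %q\n", i, c)
	}
}
```

The sync.WaitGroup is an addition to the guard channel from the question: the channel only limits how many senders run at once, while the WaitGroup makes the caller wait for the last ones to finish. Building the real client with go run -race should also flag shared variables such as sentByte.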
    
    This answer was selected by the asker as the best answer.