dongzhan1570 2017-04-01 09:22
浏览 49
已采纳

剥离文件并通过TCP同时将大块写入服务器显示断开的管道错误

My client divides a file into multiple amount of chunks (128mb each), then it will upload the chunks to multiple servers concurrently using goroutines.

However, when I use more than 1 goroutine, I got an error from the my client program.

write tcp [::1]:49324->[::1]:2001: write: broken pipe

And in my server, the error is

EOF

Note that the broken pipe error and EOF error occurs in different chunks. For example, broken pipe error might happen when writing chunk 1 while EOF error might happen when server is receiving chunk 2.

Below is the client code:

//set maximum no. of goroutine running in the back
maxGoroutines := 3
guard := make(chan struct{}, maxGoroutines)

var sentByte int64

for i:= 0; i < chunkCount; i += 1{
    guard <- struct{}{} 

    go func(i int){
        index := i%len(serverList)
        vsConnection, _ := net.Dial("tcp", serverList[index])

        sentByte=0
        file, _ := os.Open(fileName)
        file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
        for { 
            n, _ := file.Read(sendBuffer)

            n2, err2 := vsConnection.Write(sendBuffer[:n])
            if err2 != nil {
                fmt.Println("err2",err2,chunkName)              
            }
            if(n2!=65536){ //65536 is size of sendBuffer
                fmt.Println("n2",n2)
            }
            sentByte = sentByte+int64(n)
            if(sentByte == CHUNKSIZE){
                break;
            }
        }
        vsConnection.Close()
        file.Close()
        <-guard
    }(i)
}

Below is the server code:

func main() {
    mapping := cmap.New()
    server, error := net.Listen("tcp", ":2001")
    if error != nil {
        fmt.Println("There was an error starting the server" +    error.Error())
        return
    }

    for {
        connection, error := server.Accept()
        if error != nil {
            fmt.Println("There was am error with the connection" + error.Error())
            return
        }
        //one goroutine per connection
        go ConnectionHandler(connection,mapping)
    }
}

func ConnectionHandler(connection net.Conn, mapping cmap.ConcurrentMap) {
    fmt.Println("Connected")
    //make a buffer to hold data        
    var bufferFile bytes.Buffer
    writer := bufio.NewWriter(&bufferFile)

    var receivedBytes int64
    receivedBytes=0
    for {

        if(CHUNKSIZE<=receivedBytes){
            break
        }
        n,err := io.CopyN(writer, connection, BUFFERSIZE)
        receivedBytes += n
        if err != nil {
            fmt.Println("err", err.Error(), fileName)
            break
        }
    }
    mapping.Set(fileName,bufferFile.Bytes())
    connection.Close()

}

Big thanks in advance.

  • 写回答

1条回答 默认 最新

  • duandai7601 2017-04-01 11:06
    关注

    In your client sentByte should be a local variable to the sender goroutine. Since you have declared it as a global, there is a race condition in your code. Try the below fix:

    go func(i int){
        index := i%len(serverList)
        vsConnection, _ := net.Dial("tcp", serverList[index])
    
        sentByte := 0 // make sentByte a local variable, so each goroutine 
                      // has its own copy 
        file, _ := os.Open(fileName)
        file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
        for { 
            n, _ := file.Read(sendBuffer)
            // ...
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
  • ¥15 CSAPPattacklab
  • ¥15 一直显示正在等待HID—ISP
  • ¥15 Python turtle 画图
  • ¥15 关于大棚监测的pcb板设计
  • ¥15 stm32开发clion时遇到的编译问题
  • ¥15 lna设计 源简并电感型共源放大器
  • ¥15 如何用Labview在myRIO上做LCD显示?(语言-开发语言)
  • ¥15 Vue3地图和异步函数使用
  • ¥15 C++ yoloV5改写遇到的问题