dongtuo4723 2017-07-31 12:28
浏览 85

发送分块文件以保存在mongodb中

I have 2 different servers for example (Server 1 , Server 2), In the first server I have golang app which splits files and sending to second server which should save in mongodb via mgo.v2

Server 1:

func mainHandle(rw http.ResponseWriter, rq *http.Request) {

    fileToBeChunked := "/Users/IT/Desktop/4k.jpg"

    file, err := os.Open(fileToBeChunked)

    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    defer file.Close()

    fileInfo, _ := file.Stat()

    var fileSize int64 = fileInfo.Size()

    const fileChunk = 0.25 * (1 << 20) // 1 MB, change this to your requirement

    // calculate total number of parts the file will be chunked into

    totalPartsNum := uint64(math.Ceil(float64(fileSize) / float64(fileChunk)))

    fmt.Printf("Splitting to %d pieces.
", totalPartsNum)

    var sent int
    prev_part := 0

    for i := uint64(0); i < totalPartsNum; i++ {

        partSize := int(math.Min(fileChunk, float64(fileSize-int64(i*fileChunk))))

        sent = sent + partSize
        partBuffer := make([]byte, partSize)
        //fileBuffer := make([]byte, fileSize)

        //fmt.Printf("File size is %d 
",fileBuffer)
        fmt.Printf("Part size is %d 
",partSize)

        file.Read(partBuffer)


        client := &http.Client{}
        req, _ := http.NewRequest("PUT", "http://localhost:8011/upload/23", bytes.NewReader(partBuffer))
        req.Header.Set("Content-Range", "bytes "+strconv.Itoa(prev_part) + "-"+strconv.Itoa(sent-1)+"/"+strconv.FormatInt(fileSize,10))
        req.Header.Set("Content-Length", strconv.Itoa(partSize))
        res, _ := client.Do(req)
        prev_part = sent
        fmt.Println(res)

    }


}

Server 2

func UploadFileChunk(rw http.ResponseWriter,rq *http.Request) {

    fmt.Println(rq.Header["Content-Range"])
    cnt_range:=rq.Header["Content-Range"]
    file:=createFileByName("asd")
    if(checkFileChunkIndex(cnt_range[0])) {
        buf, err := ioutil.ReadAll(rq.Body)
        if err!=nil {log.Fatal("request",err)}

        file.Write(buf)
    } else {
        file.Close()
    }
}



func checkFileChunkIndex(cnt_r string ) bool {

    re := regexp.MustCompile(`([a-z]+) ([[:alnum:]]+)-([[:alnum:]]+)/([[:alnum:]]+)`)

    component:= re.FindStringSubmatch(cnt_r)

    filesize,_:= strconv.Atoi(component[4])
    last_chunk,_:= strconv.Atoi(component[3])
    return filesize - last_chunk - 1 > 0
}


func createFileByName(fname string ) *mgo.GridFile {
    if(!file_created) {
        _File,err:= db.GridFS("fs").Create("112233")
        File = *_File
        if err != nil {
            panic(err)
        }
        file_created = true
        return &File
    }
    return &File
}

For testing I have send a file with size 4mb, but after 3rd request , as a response I have got the following error

fatal error: sync: unlock of unlocked mutex

Here is the full error response

fatal error: sync: unlock of unlocked mutex

goroutine 7 [running]:
runtime.throw(0x144de3d, 0x1e)
    /usr/local/go/src/runtime/panic.go:596 +0x95 fp=0xc42004b8c0 sp=0xc42004b8a0
sync.throw(0x144de3d, 0x1e)
    /usr/local/go/src/runtime/panic.go:585 +0x35 fp=0xc42004b8e0 sp=0xc42004b8c0
sync.(*Mutex).Unlock(0xc4200ca8c0)
    /usr/local/go/src/sync/mutex.go:113 +0xa4 fp=0xc42004b908 sp=0xc42004b8e0
sync.(*Cond).Wait(0x162c4e8)
    /usr/local/go/src/sync/cond.go:56 +0x6c fp=0xc42004b938 sp=0xc42004b908
gopkg.in/mgo%2ev2.(*GridFile).insertChunk(0x162c4e0, 0xc42018a000, 0x3fc00, 0x40000)
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:623 +0x1b3 fp=0xc42004ba48 sp=0xc42004b938
gopkg.in/mgo%2ev2.(*GridFile).Write(0x162c4e0, 0xc420678000, 0x40000, 0x7fe00, 0x40000, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:595 +0x2d6 fp=0xc42004baf0 sp=0xc42004ba48
go_storage/files.UploadFileChunk(0x15faf80, 0xc4202e4380, 0xc42000b000)
    /Users/IT/go/src/go_storage/files/files.go:249 +0x23b fp=0xc42004bba8 sp=0xc42004baf0
net/http.HandlerFunc.ServeHTTP(0x1459710, 0x15faf80, 0xc4202e4380, 0xc42000b000)
    /usr/local/go/src/net/http/server.go:1942 +0x44 fp=0xc42004bbd0 sp=0xc42004bba8
github.com/gorilla/mux.(*Router).ServeHTTP(0xc42001a320, 0x15faf80, 0xc4202e4380, 0xc42000b000)
    /Users/IT/go/src/github.com/gorilla/mux/mux.go:114 +0x10c fp=0xc42004bcd0 sp=0xc42004bbd0
net/http.(*ServeMux).ServeHTTP(0x162bee0, 0x15faf80, 0xc4202e4380, 0xc42000ae00)
    /usr/local/go/src/net/http/server.go:2238 +0x130 fp=0xc42004bd10 sp=0xc42004bcd0
net/http.serverHandler.ServeHTTP(0xc420096dc0, 0x15faf80, 0xc4202e4380, 0xc42000ae00)
    /usr/local/go/src/net/http/server.go:2568 +0x92 fp=0xc42004bd58 sp=0xc42004bd10
net/http.(*conn).serve(0xc4200cd2c0, 0x15fb7c0, 0xc420016ac0)
    /usr/local/go/src/net/http/server.go:1825 +0x612 fp=0xc42004bfc8 sp=0xc42004bd58
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc42004bfd0 sp=0xc42004bfc8
created by net/http.(*Server).Serve
    /usr/local/go/src/net/http/server.go:2668 +0x2ce

goroutine 1 [IO wait]:
net.runtime_pollWait(0x23a0e40, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:164 +0x59
net.(*pollDesc).wait(0xc4200556b8, 0x72, 0x0, 0xc4200bcee0)
    /usr/local/go/src/net/fd_poll_runtime.go:75 +0x38
net.(*pollDesc).waitRead(0xc4200556b8, 0xffffffffffffffff, 0x0)
    /usr/local/go/src/net/fd_poll_runtime.go:80 +0x34
net.(*netFD).accept(0xc420055650, 0x0, 0x15f6ec0, 0xc4200bcee0)
    /usr/local/go/src/net/fd_unix.go:430 +0x1e5
net.(*TCPListener).accept(0xc42000e0a0, 0xc4200cd340, 0x13c96c0, 0xffffffffffffffff)
    /usr/local/go/src/net/tcpsock_posix.go:136 +0x2e
net.(*TCPListener).AcceptTCP(0xc42000e0a0, 0xc420049df0, 0xc420049df8, 0xc420049de8)
    /usr/local/go/src/net/tcpsock.go:215 +0x49
net/http.tcpKeepAliveListener.Accept(0xc42000e0a0, 0x1459ca8, 0xc4200cd2c0, 0x15fb880, 0xc420019c80)
    /usr/local/go/src/net/http/server.go:3044 +0x2f
net/http.(*Server).Serve(0xc420096dc0, 0x15fb280, 0xc42000e0a0, 0x0, 0x0)
    /usr/local/go/src/net/http/server.go:2643 +0x228
net/http.(*Server).ListenAndServe(0xc420096dc0, 0xc420096dc0, 0x15f60c0)
    /usr/local/go/src/net/http/server.go:2585 +0xb0
net/http.ListenAndServe(0x143fbf7, 0x5, 0x0, 0x0, 0xc420110360, 0xc420019230)
    /usr/local/go/src/net/http/server.go:2787 +0x7f
main.main()
    /Users/IT/go/src/go_storage/main.go:24 +0x358

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2197 +0x1

goroutine 5 [select]:
gopkg.in/mgo%2ev2.(*mongoCluster).syncServersLoop(0xc42000a800)
    /Users/IT/go/src/gopkg.in/mgo.v2/cluster.go:394 +0x3a3
created by gopkg.in/mgo%2ev2.newCluster
    /Users/IT/go/src/gopkg.in/mgo.v2/cluster.go:78 +0x188

goroutine 22 [semacquire]:
sync.runtime_SemacquireMutex(0xc4200f20e4)
    /usr/local/go/src/runtime/sema.go:62 +0x34
sync.(*Mutex).Lock(0xc4200f20e0)
    /usr/local/go/src/sync/mutex.go:87 +0x9d
gopkg.in/mgo%2ev2.(*mongoSocket).Acquire(0xc4200f20e0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:242 +0x31
gopkg.in/mgo%2ev2.(*Session).acquireSocket(0xc4200e04e0, 0x0, 0x0, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4419 +0x475
gopkg.in/mgo%2ev2.(*Collection).writeOp(0xc420019f20, 0x139efa0, 0xc4200e8000, 0x100e501, 0x0, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4603 +0xd6
gopkg.in/mgo%2ev2.(*Collection).Insert(0xc420019f20, 0xc420010070, 0x1, 0x1, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:2437 +0xac
gopkg.in/mgo%2ev2.(*GridFile).insertChunk.func1(0x162c4e0, 0xc42033c000, 0x3fc3e, 0x40000)
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:642 +0xe8
created by gopkg.in/mgo%2ev2.(*GridFile).insertChunk
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:650 +0x460

goroutine 19 [sleep]:
time.Sleep(0x37e11d600)
    /usr/local/go/src/runtime/time.go:59 +0xf9
gopkg.in/mgo%2ev2.(*mongoServer).pinger(0xc4200f2000, 0x1)
    /Users/IT/go/src/gopkg.in/mgo.v2/server.go:301 +0x293
created by gopkg.in/mgo%2ev2.newServer
    /Users/IT/go/src/gopkg.in/mgo.v2/server.go:89 +0x166

goroutine 21 [semacquire]:
sync.runtime_SemacquireMutex(0xc4200f20e4)
    /usr/local/go/src/runtime/sema.go:62 +0x34
sync.(*Mutex).Lock(0xc4200f20e0)
    /usr/local/go/src/sync/mutex.go:87 +0x9d
gopkg.in/mgo%2ev2.(*mongoSocket).readLoop(0xc4200f20e0)
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:582 +0x40a
created by gopkg.in/mgo%2ev2.newSocket
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:194 +0x259

goroutine 9 [semacquire]:
sync.runtime_SemacquireMutex(0xc4202d2044)
    /usr/local/go/src/runtime/sema.go:62 +0x34
sync.(*Mutex).Lock(0xc4202d2040)
    /usr/local/go/src/sync/mutex.go:87 +0x9d
gopkg.in/mgo%2ev2.(*mongoSocket).SimpleQuery(0xc4200f20e0, 0xc4202e4000, 0x6, 0x143fbe3, 0x5, 0xc4202d2031, 0xb)
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:367 +0x28b
gopkg.in/mgo%2ev2.(*Database).run(0xc4200bc8e0, 0xc4200f20e0, 0x13d0f00, 0xc4202d4040, 0x13cbb00, 0xc4202e2000, 0xc4200f2000, 0xc4200f20e0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:3261 +0x1b3
gopkg.in/mgo%2ev2.(*Collection).writeOpCommand(0xc420019f20, 0xc4200f20e0, 0xc4200e2000, 0x139efa0, 0xc4202d6000, 0x1010001, 0x23685a8, 0x0, 0x100c420028638)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4786 +0x1db
gopkg.in/mgo%2ev2.(*Collection).writeOp(0xc420019f20, 0x139efa0, 0xc4202d6000, 0x100e501, 0x0, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4645 +0x6dc
gopkg.in/mgo%2ev2.(*Collection).Insert(0xc420019f20, 0xc4202d2000, 0x1, 0x1, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:2437 +0xac
gopkg.in/mgo%2ev2.(*GridFile).insertChunk.func1(0x162c4e0, 0xc420290000, 0x3fc3e, 0x40000)
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:642 +0xe8
created by gopkg.in/mgo%2ev2.(*GridFile).insertChunk
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:650 +0x460

goroutine 12 [semacquire]:
sync.runtime_SemacquireMutex(0xc4200e6074)
    /usr/local/go/src/runtime/sema.go:62 +0x34
sync.(*Mutex).Lock(0xc4200e6070)
    /usr/local/go/src/sync/mutex.go:87 +0x9d
gopkg.in/mgo%2ev2.(*mongoSocket).SimpleQuery(0xc4200f20e0, 0xc4202e40e0, 0x6, 0x143fbe3, 0x5, 0xc4200e6060, 0xb)
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:367 +0x28b
gopkg.in/mgo%2ev2.(*Database).run(0xc4200bc8e0, 0xc4200f20e0, 0x13d0f00, 0xc4200ee080, 0x13cbb00, 0xc4202e2060, 0x2, 0xc42005e400)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:3261 +0x1b3
gopkg.in/mgo%2ev2.(*Collection).writeOpCommand(0xc420019f20, 0xc4200f20e0, 0xc4200e2000, 0x139efa0, 0xc420018d20, 0x1010001, 0x2304558, 0x46, 0x8700c420023638)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4786 +0x1db
gopkg.in/mgo%2ev2.(*Collection).writeOp(0xc420019f20, 0x139efa0, 0xc420018d20, 0x100e501, 0x0, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4645 +0x6dc
gopkg.in/mgo%2ev2.(*Collection).Insert(0xc420019f20, 0xc420010060, 0x1, 0x1, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:2437 +0xac
gopkg.in/mgo%2ev2.(*GridFile).insertChunk.func1(0x162c4e0, 0xc42013a000, 0x3fc3e, 0x40000)
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:642 +0xe8
created by gopkg.in/mgo%2ev2.(*GridFile).insertChunk
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:650 +0x460

goroutine 38 [IO wait]:
net.runtime_pollWait(0x23a0f00, 0x77, 0x4)
    /usr/local/go/src/runtime/netpoll.go:164 +0x59
net.(*pollDesc).wait(0xc4200fa068, 0x77, 0x15f84c0, 0x15f4540)
    /usr/local/go/src/net/fd_poll_runtime.go:75 +0x38
net.(*pollDesc).waitWrite(0xc4200fa068, 0xc420392ef2, 0x2edd1)
    /usr/local/go/src/net/fd_poll_runtime.go:84 +0x34
net.(*netFD).Write(0xc4200fa000, 0xc420382000, 0x3fcc3, 0x40000, 0x10ef2, 0x15f84c0, 0x15f4540)
    /usr/local/go/src/net/fd_unix.go:340 +0x34d
net.(*conn).Write(0xc4200ea008, 0xc420382000, 0x3fcc3, 0x40000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:193 +0x70
gopkg.in/mgo%2ev2.(*mongoSocket).Query(0xc4200f20e0, 0xc420181878, 0x1, 0x1, 0x0, 0x2)
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:525 +0x2ad1
gopkg.in/mgo%2ev2.(*mongoSocket).SimpleQuery(0xc4200f20e0, 0xc4200f21c0, 0x6, 0x143fbe3, 0x5, 0xc420010631, 0xb)
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:363 +0x22a
gopkg.in/mgo%2ev2.(*Database).run(0xc4200bc8e0, 0xc4200f20e0, 0x13d0f00, 0xc4202d4140, 0x13cbb00, 0xc4200f6000, 0x1444391, 0xc42002a800)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:3261 +0x1b3
gopkg.in/mgo%2ev2.(*Collection).writeOpCommand(0xc420019f20, 0xc4200f20e0, 0xc4200e2000, 0x139efa0, 0xc4200e8000, 0x1010001, 0x2306a28, 0x0, 0x800c4204a3e38)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4786 +0x1db
gopkg.in/mgo%2ev2.(*Collection).writeOp(0xc420019f20, 0x139efa0, 0xc4200e8000, 0xc4200e6001, 0x0, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4645 +0x6dc
gopkg.in/mgo%2ev2.(*Collection).Insert(0xc420019f20, 0xc4200e6000, 0x1, 0x1, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:2437 +0xac
gopkg.in/mgo%2ev2.(*GridFile).insertChunk.func1(0x162c4e0, 0xc4204d8000, 0x3fc3e, 0x40000)
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:642 +0xe8
created by gopkg.in/mgo%2ev2.(*GridFile).insertChunk
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:650 +0x460

goroutine 24 [semacquire]:
sync.runtime_SemacquireMutex(0xc4200f20e4)
    /usr/local/go/src/runtime/sema.go:62 +0x34
sync.(*Mutex).Lock(0xc4200f20e0)
    /usr/local/go/src/sync/mutex.go:87 +0x9d
gopkg.in/mgo%2ev2.(*mongoSocket).Acquire(0xc4200f20e0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/socket.go:242 +0x31
gopkg.in/mgo%2ev2.(*Session).acquireSocket(0xc4200e04e0, 0x0, 0x0, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4419 +0x475
gopkg.in/mgo%2ev2.(*Collection).writeOp(0xc420019f20, 0x139efa0, 0xc4202d6000, 0x100e501, 0x0, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:4603 +0xd6
gopkg.in/mgo%2ev2.(*Collection).Insert(0xc420019f20, 0xc4202d2010, 0x1, 0x1, 0x0, 0x0)
    /Users/IT/go/src/gopkg.in/mgo.v2/session.go:2437 +0xac
gopkg.in/mgo%2ev2.(*GridFile).insertChunk.func1(0x162c4e0, 0xc4205f8000, 0x3fc3e, 0x40000)
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:642 +0xe8
created by gopkg.in/mgo%2ev2.(*GridFile).insertChunk
    /Users/IT/go/src/gopkg.in/mgo.v2/gridfs.go:650 +0x460

goroutine 14 [IO wait]:
net.runtime_pollWait(0x23a0d80, 0x72, 0x7)
    /usr/local/go/src/runtime/netpoll.go:164 +0x59
net.(*pollDesc).wait(0xc420055728, 0x72, 0x15f84c0, 0x15f4540)
    /usr/local/go/src/net/fd_poll_runtime.go:75 +0x38
net.(*pollDesc).waitRead(0xc420055728, 0xc420016b11, 0x1)
    /usr/local/go/src/net/fd_poll_runtime.go:80 +0x34
net.(*netFD).Read(0xc4200556c0, 0xc420016b11, 0x1, 0x1, 0x0, 0x15f84c0, 0x15f4540)
    /usr/local/go/src/net/fd_unix.go:250 +0x1b7
net.(*conn).Read(0xc42000e0a8, 0xc420016b11, 0x1, 0x1, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:181 +0x70
net/http.(*connReader).backgroundRead(0xc420016b00)
    /usr/local/go/src/net/http/server.go:656 +0x58
created by net/http.(*connReader).startBackgroundRead
    /usr/local/go/src/net/http/server.go:652 +0xdf

Process finished with exit code 2

I have found that in the write function there is a code look like this

file.assertMode(gfsWriting)
    file.m.Lock()
    debugf("GridFile %p: writing %d bytes", file, len(data))
    defer file.m.Unlock()

    if file.err != nil {
        return 0, file.err
    }
  • 写回答

1条回答 默认 最新

  • dongmen1925 2017-07-31 14:29
    关注

    You are making a copy of the GridFile returned by GridFS.Create for no reason. Don't copy the struct, just return the pointer you are given.

    GridFile contains private fields which cannot be copied, like a sync.Mutex and a sync.Cond. Running go vet on your code should catch these errors for you.

    评论

报告相同问题?

悬赏问题

  • ¥15 想问一下stata17中这段代码哪里有问题呀
  • ¥15 flink cdc无法实时同步mysql数据
  • ¥100 有人会搭建GPT-J-6B框架吗?有偿
  • ¥15 求差集那个函数有问题,有无佬可以解决
  • ¥15 【提问】基于Invest的水源涵养
  • ¥20 微信网友居然可以通过vx号找到我绑的手机号
  • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
  • ¥15 解riccati方程组
  • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
  • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决