doutaoer3148 2017-08-02 13:05
浏览 127

Golang Azure Blob存储,0b Blob并覆盖下载的Blob数据

Currently using: https://github.com/Azure/azure-sdk-for-go

overview: I'm currently downloading a blob from the azure blob store, parsing the blob and uploading a transcribed blob back to the store in another folder called filtered.

Problem: The blob uploaded is not in the folder filtered but in the root directory and the blob is 0B with no data. The blob upload also seems to destroy the blob i just downloaded resulting in the blob being 0B with no data. Downloading the blob works fine and i'm able to get the []byte array of the data.

Code:

import (
"bufio"
"fmt"
"os"
"strings"
"strconv"
"math/big"
"bytes"
"io/ioutil"
"github.com/Azure/azure-sdk-for-go/storage"
"compress/gzip"
"encoding/base64"
"crypto/md5"
)

func main() {
    var filter bool = true                                                  //check smart filter
    test := 0
    configfile, err := os.Open("config.txt")                                //open configfile
    check(err)                                                              //check file opened
    ConfigScanner := bufio.NewScanner(configfile)                           //open buffer
    ConfigScanner.Scan()                                                    //get serial number
    serialnum := ConfigScanner.Text()
    configfile.Close()                                                      //close the config file
    CanLUT := ParseDBC("file.dbc")                                          //parse the associated DBC file
    check(err)                                                              //check file opened
    m := make(map[int64]string)                                             //map of last seen message
    //Azure API
    client, err := storage.NewBasicClient(accountName, accountKey)          //get client from azure
    check(err)
    bsc := client.GetBlobService()                                          //access blob service
    cnt := bsc.GetContainerReference("containerblob")                           //get container of the blob
    LBP := storage.ListBlobsParameters{}                                    
    LBP.Prefix = "dev4/dev4"                                                //only get blobs with dev4/dev4 prefix
    list, err := cnt.ListBlobs(LBP)                                         //get list of all matching blobs
    check(err)
    for _, b := range list.Blobs {                                          //read all blobs from azure with prefix dev4/dev4
        oa := make([]byte,0)
        fmt.Println("getting blob: ",b.Name)
        readCloser, err := b.Get(nil)                                       //get blob data
        check(err)
        bytesRead, err := ioutil.ReadAll(readCloser)                        //read blob data to byte[]
        check(err)
        if len(bytesRead) < 1 {
            continue
        }
        br := bytes.NewReader(bytesRead)
        zr, err := gzip.NewReader(br)                                       //use gzip reader for zipped data
        check(err)
        uz, err := ioutil.ReadAll(zr)                                       //uz byte[] of unzipped file
        check(err)
        readCloser.Close()                                                  //close the reader
        zr.Close()                                                          //close gzip reader
        r := bytes.NewReader(uz)
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {                                                //loop on each line in the input file
            temp := ParseToFrame(scanner.Text())                            //parse the line into a usable struct
            _, exists := m[temp.nodeid]                                     //check if the frame has alread been seen and is stored in the hashmap
            if exists {                                                     //if exists in the map
                if ChkDuplicate(m, temp) {                                  //is the msg a duplicate? if true the message isnt so add it
                    m[temp.nodeid] = temp.data                              //update the data to the hashmap
                    DecodeFrame(temp, &oa, CanLUT, filter, serialnum)       //decode the frame and output it to another file
                }
            } else {                                                        //DNE in map so add it
                m[temp.nodeid] = temp.data
                DecodeFrame(temp, &oa, CanLUT,filter, serialnum)            //decode the frame and output it to another file
            }
        }//end blob file
        filestr := strings.Split(b.Name, "_")[1]
        filestr = "filtered/filtered_" + filestr
        var buffout bytes.Buffer
        gz := gzip.NewWriter(&buffout)
        _, err = gz.Write(oa)
        check(err)
        gz.Flush()
        gz.Close()
        compressedData := buffout.Bytes()
        //push block blob to azure
        fmt.Println("uploading: ",filestr)
        clientnew, err := storage.NewBasicClient(accountName, accountKey)           //get client from azure
        check(err)
        senderbsc := clientnew.GetBlobService()                                         //access blob service
        sendercnt := senderbsc.GetContainerReference("storeblob")                           //get container of store blob
        bblob := sendercnt.GetBlobReference("filtered_" + strings.Split(b.Name, "/")[1])
        err = bblob.CreateBlockBlob(nil)
        check(err)
        blockID := base64.StdEncoding.EncodeToString([]byte("00000"))
        err = bblob.PutBlock(blockID, compressedData, nil)
        check(err)
        list, err := b.GetBlockList(storage.BlockListTypeUncommitted, nil)
        check(err)
        uncommittedBlocksList := make([]storage.Block, len(list.UncommittedBlocks))
        for i := range list.UncommittedBlocks {
            uncommittedBlocksList[i].ID = list.UncommittedBlocks[i].Name
            uncommittedBlocksList[i].Status = storage.BlockStatusUncommitted
        }
        err = b.PutBlockList(uncommittedBlocksList, nil)
        //check if upload was good.
        CheckHash(&compressedData,filestr,sendercnt)
        check(err)
        if(test == 0){
            break       //test only read one file
        }
        test++
    }//end for blobs     
}//end main

展开全部

  • 写回答

2条回答 默认 最新

  • doom910730 2017-08-03 23:13
    关注

    As @DavidMakogon said, you can use the API CreateBlockBlobFromReader of Azure Storage SDK for Go to upload from any reader implements the interface io.Reader to Azure Blob Storage.

    Here are my sample code as below.

    accountName := "<your-storage-account-name>"
    accountKey := "<your-storage-account-key>"
    client, _ := storage.NewBasicClient(accountName, accountKey)
    blobClinet := client.GetBlobService()
    containerName := "mycontainer"
    container := blobClinet.GetContainerReference(containerName)
    
    // Two sample ways for uploading
    // 1. Upload a text blob from string reader
    blobName := "upload.txt"
    blob := container.GetBlobReference(blobName)
    strReader := strings.NewReader("upload text to blob from string reader")
    blob.CreateBlockBlobFromReader(strReader, nil)
    
    // 2. Upload a file from file reader
    fileName := "hello.png"
    file, _ := os.Open(fileName)
    blobName := "hello.png"
    blob := container.GetBlobReference(blobName)
    blob.CreateBlockBlobFromReader(file, nil)
    

    Hope it helps.

    评论
  • douqiao1983 2017-08-04 07:48
    关注
        compressedData := buffout.Bytes()
        //push block blob to azure
        fmt.Println("uploading: ",filestr)
        blockID := base64.StdEncoding.EncodeToString([]byte("00001"))
        newblob := cnt.GetBlobReference(filestr)
        err = newblob.CreateBlockBlobFromReader(bytes.NewReader(compressedData),nil)
        check(err)
        err = newblob.PutBlock(blockID, compressedData, nil)
        check(err)
        list, err := newblob.GetBlockList(storage.BlockListTypeUncommitted, nil)
        check(err)
        uncommittedBlocksList := make([]storage.Block, len(list.UncommittedBlocks))
        for i := range list.UncommittedBlocks {
            uncommittedBlocksList[i].ID = list.UncommittedBlocks[i].Name
            uncommittedBlocksList[i].Status = storage.BlockStatusUncommitted
        }
        err = newblob.PutBlockList(uncommittedBlocksList, nil)
        check(err)
    

    This fixed my problem looking at the original i had a typo calling.

    list, err := b.GetBlockList(storage.BlockListTypeUncommitted, nil)
    

    This caused azure to get both a new blob called filestr and to overwrite the original blob.

    评论
编辑
预览

报告相同问题?

手机看
程序员都在用的中文IT技术交流社区

程序员都在用的中文IT技术交流社区

专业的中文 IT 技术社区,与千万技术人共成长

专业的中文 IT 技术社区,与千万技术人共成长

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

客服 返回
顶部