douzhenyu6533
2019-08-17 14:50

Runtime error panic while parsing JSON concurrently (decoder-related)

Accepted

I was playing with Go recently and got stuck on a runtime error that I can't explain. These are my working functions.

    type User struct {
        Browsers []string `json:"browsers"`
        Name     string   `json:"name"`
        Email    string   `json:"email"`
    }

    func asyncUserProcJson(wg *sync.WaitGroup, users *[]User, ch chan []byte) {
        for buf := range ch {
            var mu sync.Mutex

            var user User
            mu.Lock()
            err := json.Unmarshal(buf, &user)
            mu.Unlock()
            if err != nil {
                fmt.Println("json:", err)
                wg.Done()
                continue
            }

            *users = append(*users, user)
            wg.Done()

        }
    }

    func userProcJson(buf []byte) (User, error) {
        var user User
        err := json.Unmarshal(buf, &user)
        if err != nil {
            return User{}, err
        }
        return user, nil
    }

With the ordinary, non-concurrent approach, it works as expected. But if I use a channel to pass the bytes to a goroutine, it fails.

    type AsyncUserProc func(*sync.WaitGroup, *[]User, chan []byte)
    type UserProc func(buf []byte) (User, error)

    type SearchParams struct {
        out           io.Writer
        asyncUserProc AsyncUserProc
        userProc      UserProc
    }

    func (sp SearchParams) AsyncSearch() []User {
        file, err := os.Open(filePath)
        if err != nil {
            log.Fatalln(err)
        }

        var Users = make([]User, 0, 1024)
        var ch = make(chan []byte)
        var wg sync.WaitGroup

        go sp.asyncUserProc(&wg, &Users, ch)

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            wg.Add(1)
            ch <- scanner.Bytes()
        }

        if err := scanner.Err(); err != nil {
            fmt.Fprintln(os.Stderr, "reading standard input:", err)
        }
        close(ch)
        wg.Wait()

        return Users
    }

    func (sp SearchParams) Search() []User {
        file, err := os.Open(filePath)
        if err != nil {
            log.Fatalln(err)
        }

        // json processor
        var Users = make([]User, 0, 1024)

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {

            u, err := sp.userProc(scanner.Bytes())
            if err != nil {
                log.Panicln(err)
                continue
            }
            Users = append(Users, u)
        }

        if err := scanner.Err(); err != nil {
            fmt.Fprintln(os.Stderr, "reading standard input:", err)
        }

        return Users
    }

The workflow is as follows:

  1. filePath contains JSON chunks (one per line).
  2. Open the file for reading.
  3. Create a line scanner.

    1. (AsyncSearch)

      • Pass each line to the channel.
      • Receive the line from the channel with range (a blocking operation).
      • Pass it to json.Unmarshal.
      • Trouble.
    2. (Search)

      • Pass each line directly to the userProc func.
      • Enjoy the result.

I am getting many different errors:

  • lots of JSON unmarshaling errors
  • index out of range
  • JSON decoder out of sync - data changing underfoot?

The last error is described in the encoding/json source as:

// phasePanicMsg is used as a panic message when we end up with something that
// shouldn't happen. It can indicate a bug in the JSON decoder, or that
// something is editing the data slice while the decoder executes.

So here is my question: how is the byte slice being modified? I thought the channel send was a blocking operation. What am I missing about the language mechanics?

Examples of the errors (different on each run):

json: invalid character 'i' looking for beginning of value
json: invalid character ':' after top-level value
json: invalid character 'r' looking for beginning of value
panic: runtime error: index out of range
----
json: invalid character '.' after top-level value
json: invalid character 'K' looking for beginning of value
panic: JSON decoder out of sync - data changing underfoot?

1 answer

  • doushenmao9036 2 years ago

    Package bufio

    import "bufio"
    

    func (*Scanner) Bytes

    func (s *Scanner) Bytes() []byte
    

    Bytes returns the most recent token generated by a call to Scan. The underlying array may point to data that will be overwritten by a subsequent call to Scan. It does no allocation.


    The underlying array may point to data that will be overwritten by a subsequent call to Scan.
