I was playing with go recently and stuck with a runtime error, I can't explain. These are my working functions.
type User struct {
Browsers []string `json:"browsers"`
Name string `json:"name"`
Email string `json:"email"`
}
func asyncUserProcJson(wg *sync.WaitGroup, users *[]User, ch chan []byte) {
for buf := range ch {
var mu sync.Mutex
var user User
mu.Lock()
err := json.Unmarshal(buf, &user)
mu.Unlock()
if err != nil {
fmt.Println("json:", err)
wg.Done()
continue
}
*users = append(*users, user)
wg.Done()
}
}
func userProcJson(buf []byte) (User, error) {
var user User
err := json.Unmarshal(buf, &user)
if err != nil {
return User{}, err
}
return user, nil
}
If I do a common - non-concurrent aproach, its works as expected. But if, try to use channel to pass bytes to goroutine... it fails.
type AsyncUserProc func(*sync.WaitGroup, *[]User, chan []byte)
type UserProc func(buf []byte) (User, error)
type SearchParams struct {
out io.Writer
asyncUserProc AsyncUserProc
userProc UserProc
}
func (sp SearchParams) AsyncSearch() []User {
file, err := os.Open(filePath)
if err != nil {
log.Fatalln(err)
}
var Users = make([]User, 0, 1024)
var ch = make(chan []byte)
var wg sync.WaitGroup
go sp.asyncUserProcess(&wg, &Users, ch)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
wg.Add(1)
ch <- scanner.Bytes()
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading standard input:", err)
}
close(ch)
wg.Wait()
return Users
}
func (sp SearchParams) Search() []User {
file, err := os.Open(filePath)
if err != nil {
log.Fatalln(err)
}
// json processor
var Users = make([]User, 0, 1024)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
u, err := sp.userProcess(scanner.Bytes())
if err != nil {
log.Panicln(err)
continue
}
Users = append(Users, u)
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading standard input:", err)
}
return Users
}
Workflow is the next one:
- filePath contains a JSON chunks (each on new line)
- Open for reading.
-
Create a line scanner
-
(AsyncSearch)
- Pass line to channel.
- return value of the line from range (blocking operation)
- pass to json.Unmarshal
- troubles
-
(Search)
- Pass line directly to userProc func
- Enjoy result
-
I am getting a lot (different) errors.
- a lot of json unmarshaling error.
- index out of range
- JSON decoder out of sync - data changing underfoot?
as description of last error:
// phasePanicMsg is used as a panic message when we end up with something that
// shouldn't happen. It can indicate a bug in the JSON decoder, or that
// something is editing the data slice while the decoder executes.
So here is a question: How the bytes slice is modified? I thought it was blocking operation. What am I missing in language mechanics?
Example of the errors (different each run)
json: invalid character 'i' looking for beginning of value
json: invalid character ':' after top-level value
json: invalid character 'r' looking for beginning of value
panic: runtime error: index out of range
----
json: invalid character '.' after top-level value
json: invalid character 'K' looking for beginning of value
panic: JSON decoder out of sync - data changing underfoot?