dongxixia6399 2017-08-18 12:28
浏览 22

goroutine多次消耗同一行

currently i have a scenario where i have huge file (for example im going to say 500k lines of text) and the idea is to use worker (threads) to process them by 100 each thread. after running my code, i still wonder why the goroutines consume the same line more than once? im guessing it's racing to get the job done.

here's my code

package main

import (
     "log"
     "bufio"
     "fmt"
     "encoding/csv"
     "encoding/json"
     "io"
     "os"
     "sync"
)

type IMDBDataModel struct {
     Color                  string `json:"color"`
     DirectorName           string `json:"director_name"`
     NumCriticForReviews    string `json:"num_critic_for_reviews"`
     Duration               string `json:"duration"`
     DirectorFacebookLikes  string `json:"director_facebook_likes"`
     Actor3FacebookLikes    string `json:"actor_3_facebook_likes"`
     Actor2Name             string `json:"actor_2_name"`
     Actor1FacebookLikes    string `json:"actor_1_facebook_likes"`
     Gross                  string `json:"gross"`
     Genre                  string `json:"genres"`
     Actor1Name             string `json:"actor_1_name"`
     MovieTitle             string `json:"movie_title"`
     NumVotedUser           string `json:"num_voted_users"`
     CastTotalFacebookLikes string `json:"cast_total_facebook_likes"`
     Actor3Name             string `json:"actor_3_name"`
     FaceNumberInPoster     string `json:"facenumber_in_poster"`
     PlotKeywords           string `json:"plot_keywords"`
     MovieIMDBLink          string `json:"movie_imdb_link"`
     NumUserForReviews      string `json:"num_user_for_reviews"`
     Language               string `json:"language"`
     Country                string `json:"country"`
     ContentRating          string `json:"content_rating"`
     Budget                 string `json:"budget"`
     TitleYear              string `json:"title_year"`
     Actor2FacebookLikes    string `json:"actor_2_facebook_likes"`
     IMDBScore              string `json:"imdb_score"`
     AspectRatio            string `json:"aspect_ratio"`
     MovieFacebookLikes     string `json:"movie_facebook_likes"`
}

var iterated int64
var out []*IMDBDataModel

func populateString(input []IMDBDataModel, out []*IMDBDataModel, wg *sync.WaitGroup) {
     for _ , data := range input {          
          out = append(out, &data)
     }     
     wg.Done()
}

func consumeData(input <-chan *IMDBDataModel, wg *sync.WaitGroup){
     defer wg.Done()
     for data := range input {          
          iterated++          
          fmt.Printf("%d : %s
", iterated, data.MovieTitle)
          out = append(out, data)
     }
     fmt.Println("output size : ", len(out))

}

func processCSV(path string) (imdbList []IMDBDataModel){
     csvFile, _ := os.Open(path)
     reader := csv.NewReader(bufio.NewReader(csvFile))

     for {          
          line, error := reader.Read()
          if error == io.EOF {
               break
          } else if error != nil {
               log.Fatal(error)
          }
          imdbList = append(imdbList, 
               IMDBDataModel{
                    Color: line[0],
                    DirectorName: line[1],
                    NumCriticForReviews : line[2],
                    Duration: line[3],
                    DirectorFacebookLikes: line[4],
                    Actor3FacebookLikes: line[5],
                    Actor2Name: line[6],
                    Actor1FacebookLikes: line[7],
                    Gross: line[8],
                    Genre: line[9],
                    Actor1Name: line[10],
                    MovieTitle: line[11],
                    NumVotedUser: line[12],
                    CastTotalFacebookLikes: line[13],
                    Actor3Name: line[14],
                    FaceNumberInPoster: line[15],
                    PlotKeywords: line[16],
                    MovieIMDBLink: line[17],
                    NumUserForReviews: line[18],
                    Language: line[19],
                    Country: line[20],
                    ContentRating: line[21],
                    Budget: line[22],
                    TitleYear: line[23],
                    Actor2FacebookLikes: line[24],
                    IMDBScore: line[25],
                    AspectRatio: line[26],
                    MovieFacebookLikes: line[27],
               },
          )          
     }
     imdbJson, err := json.Marshal(imdbList)
     if err != nil {
          log.Println(imdbJson)
     }

     return 
}

func main() {     
     imdbList := processCSV("movie_metadata.csv")     
     imdbChannel  := make(chan *IMDBDataModel, 100) // buffer

     var wg sync.WaitGroup
     for i := 0; i < 5;i++ {
          wg.Add(1)
          go consumeData(imdbChannel,&wg)     
     }

     for _ ,task := range imdbList {          
          imdbChannel <- &task               
     }

     close(imdbChannel)     
     wg.Wait()

     // for _, item := range out {
     //      fmt.Println(item.MovieTitle)
     // }

     fmt.Println("Total Channel :", len(imdbChannel)) 
     fmt.Println("Total IMDB :", len(imdbList))
     fmt.Println("Total Data: ", len(out))
     fmt.Println("Iterated : ", iterated)
     fmt.Println("Goroutines finished..")


}

EDITED: after few suggestions on adding mutex and another channel, this is the modified consume function

func consumeData(input <-chan *IMDBDataModel, output chan *IMDBDataModel, wg *sync.WaitGroup) {
    defer wg.Done()
    for data := range input {
        iterated++
        // outLock.Lock()
        // out = append(out, data)
        // outLock.Unlock()
        output <- data
    }
}

however still consuming the same line (race occured) more than once.

....
My Date with Drew 
My Date with Drew 
My Date with Drew 
My Date with Drew 
My Date with Drew 
Total Channel : 0
Total IMDB : 5044
Total Data:  4944
Iterated :  5000
Goroutines finished..
  • 写回答

1条回答 默认 最新

  • dongliang2058 2017-08-18 12:52
    关注

    You issues is with:

    var out []*IMDBDataModel
    
    func consumeData(input <-chan *IMDBDataModel, wg *sync.WaitGroup){
         defer wg.Done()
         for data := range input {          
              iterated++          
              fmt.Printf("%d : %s
    ", iterated, data.MovieTitle)
              out = append(out, data)
         }
         fmt.Println("output size : ", len(out))
    
    }
    

    You are appending to "out" from multiple threads:

    try adding a lock around the places you write to "out" like this:

    var out []*IMDBDataModel
    var outLock sync.Mutex
    
    func consumeData(input <-chan *IMDBDataModel, wg *sync.WaitGroup){
         defer wg.Done()
         for data := range input {          
              iterated++          
              fmt.Printf("%d : %s
    ", iterated, data.MovieTitle)
              outLock.Lock()
              out = append(out, &data)
              outLock.Unlock()
         }
         outLock.Lock()
         fmt.Println("output size : ", len(out))
         outLock.Unlock()
    
    }
    
    评论

报告相同问题?

悬赏问题

  • ¥15 c语言怎么用printf(“\b \b”)与getch()实现黑框里写入与删除?
  • ¥20 怎么用dlib库的算法识别小麦病虫害
  • ¥15 华为ensp模拟器中S5700交换机在配置过程中老是反复重启
  • ¥15 java写代码遇到问题,求帮助
  • ¥15 uniapp uview http 如何实现统一的请求异常信息提示?
  • ¥15 有了解d3和topogram.js库的吗?有偿请教
  • ¥100 任意维数的K均值聚类
  • ¥15 stamps做sbas-insar,时序沉降图怎么画
  • ¥15 买了个传感器,根据商家发的代码和步骤使用但是代码报错了不会改,有没有人可以看看
  • ¥15 关于#Java#的问题,如何解决?