func init() {
//建立数据可链接
db, e = sqlx.Connect("mysql", "root:123456@tcp(127.0.0.1:3306)/gowk")
HandleError(e, "sqlx.Connect")
defer db.Close()
//必要时建表
_, e = db.Exec("create table if not exists t_bigData_kf(id int primary key auto_increment,name varchar(30),idcard char(18));")
HandleError(e, "db.Exec create table")
fmt.Println("数据表已创建")
//初始存取数据管道
chkp = make(chan *kfperson, 10000000)
chbd = make(chan *kfperson, 100)
//开启1000条协程写入数据,
for i := 0; i < 100; i++ {
go insertKftable()
}
//准备一个处理脏数据的协程
go writeBadTxt()
//创建一个失败数据Txt
A_bad, _ = os.OpenFile("E:/假数据/清洗数据/A_bad.txt", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0754)
defer A_bad.Close()
//读取大数据文本
file, e := os.Open("E:/假数据/清洗数据/A.txt")
HandleError(e, "os.Open")
defer file.Close()
//以缓存的方式读出,区别于ioutil.ReadFile()一次性读入文件到内存,不适合大文本的读取
reader := bufio.NewReader(file)
//kfps := new(kfperson)
for {
linStr, e := reader.ReadString('\n')
if e == io.EOF {
close(chbd)
close(chkp)
break
}
HandleError(e, "reader.ReadString")
linSplic := strings.Split(linStr, ",")
//kfps.Name, kfps.Idcard = linSplic[0], linSplic[1]
name,idcard:=linSplic[0], linSplic[1]
kfps := kfperson{Name: name, Idcard: idcard}
chkp <- &kfps
}
}
func writeBadTxt() {
writer := bufio.NewWriter(A_bad)
for ps := range chbd {
writer.WriteString(ps.Name + "," + ps.Idcard + "\n")
fmt.Println("***************************************", "Gid:", GetGID())
}
writer.Flush()
}
func insertKftable() {
for ps := range chkp {
_, e := db.Exec("insert into t_bigData_kf(name,idcard) values(?,?);", ps.Name, ps.Idcard)
if e == nil {
fmt.Println("chkp:", len(chkp), "\t", "Gid:", GetGID())
} else {
chbd <- ps
}
}
}
```这些代码我看了一个晚上,我找不出问题在哪里?请求高手指点迷津!