Goroutines and MySQL max connections

When I run functions in parallel that make use of the MySQL driver (Gorm), I find that when I set the following: db.DB().SetMaxOpenConns(30) and db.DB().SetMaxIdleConns(10)

I get a clean output without any panics/error messages.

My database server's max_connections variable is set to 100. I find that if I SetMaxOpenConns to 50 or above, I start to receive:

[mysql] 2017/01/25 14:26:09 packets.go:33: read tcp 127.0.0.1:60614->127.0.0.1:3306: read: connection reset by peer
[mysql] 2017/01/25 14:26:09 packets.go:130: write tcp 127.0.0.1:60614->127.0.0.1:3306: write: broken pipe

If I just print the error instead of panicking, the errors eventually go away and the script finishes processing.

I'm having a tough time understanding how to tune these values. Why does the script have no problem connecting and reading when the number of open connections is lower, while a higher maximum that is still below the server limit is problematic?

MariaDB version: 10.1.21. Go version: 1.7.4. There are 2.1m records in the CSV file that I'm parsing, and I'm batching 2000 records into each insert string.

Here is a portion of the main code:

func main(){
    //Setup and Connect to database 
    //Set variables
    //Read CSV File
    dbConnect()
    defer db.Close()
    db.DB().SetMaxOpenConns(30)
    db.DB().SetMaxIdleConns(10)
    db.DB().SetConnMaxLifetime(time.Second * 14400)
    filehandle, err := os.Open(physicianCSV)
    checkErr(err)
    defer filehandle.Close()

    reader := csv.NewReader(filehandle)
    _, err = reader.Read()
    checkErr(err)

    for i := 0; i <= readLimit; i++ {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                break
            }
            panic(err)
        }
        physician = convertCSVRecordToPhysician(record)
        //..Do stuff with physician, edit struct properties

        physicians = append(physicians, physician)

        if math.Mod(float64(i), float64(bulkAmount)) == 0 && i != 0 {
            fmt.Println(i, "Records: From ", i-bulkAmount, "to", i)
            wg.Add(1)
            sliceOfPhys := make([]Physician, bulkAmount)
            copy(sliceOfPhys, physicians)
            go bulkSavePhysicians(sliceOfPhys)
            physicians = physicians[:0]

        }
    }

    fmt.Println(readLimit, "records inserted")
    wg.Wait()
}

func bulkSavePhysicians(_physicians []Physician) {
    defer func() {
        if x := recover(); x != nil {
            fmt.Println(x)
        }
    }()
    defer wg.Done()

    sqlStringArray := buildSQLStatements(_physicians)
    batchSQL := fmt.Sprintf("insert into physicians values %s ;", strings.Join(sqlStringArray, ","))
    tx := db.Begin()
    errors := tx.Exec(batchSQL).GetErrors()
    if len(errors) > 0 {
        panic(errors)
    }
    tx.Commit()
}

func buildSQLStatements(_physicians []Physician) []string {

    var valueStr string
    var valueArr []string
    for _, phys := range _physicians {

        valueStr = fmt.Sprintf(`( "%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s" )`, phys.NPI,
            phys.PACID,
            phys.ProfessionalEnrollmentID,
            strings.Replace(phys.LastName, "'", "\\'", -1),
            phys.FirstName,
            phys.MiddleName,
            phys.Suffix,
            phys.Gender,
            phys.Credential,
            strings.Replace(phys.MedicalSchoolName, "'", "\\'", -1),
            phys.GraduationYear,
            phys.PrimarySpecialty,
            phys.SecondarySpecialty1,
            phys.SecondarySpecialty2,
            phys.SecondarySpecialty3,
            phys.SecondarySpecialty4,
            phys.AllSecondarySpecialties,
            strings.Replace(phys.OrganizationLegalName, "'", "\\'", -1),
            phys.GroupPracticePACID,
            phys.NumberOfGroupPracticeMembers,
            strings.Replace(phys.Line1StreetAddress, "'", "\\'", -1),
            phys.Line2StreetAddress,
            phys.MarkerOfAddressLine2Suppression,
            phys.City,
            phys.State,
            phys.ZipCode,
            phys.PhoneNumber,
            phys.HospitalAffiliationCCN1,
            phys.HospitalAffiliationLBN1,
            phys.HospitalAffiliationCCN2,
            phys.HospitalAffiliationLBN2,
            phys.HospitalAffiliationCCN3,
            phys.HospitalAffiliationLBN3,
            phys.HospitalAffiliationCCN4,
            phys.HospitalAffiliationLBN4,
            phys.HospitalAffiliationCCN5,
            phys.HospitalAffiliationLBN5,
            phys.ProfessionalAcceptsMedicareAssignment,
            phys.ReportedQualityMeasures,
            phys.UsedElectronicHealthRecords,
            phys.ParticipatedInTheMedicareMaintenance,
            phys.CommittedToHeartHealth,
            phys.SpecialtyID)

        valueArr = append(valueArr, valueStr)
    }
    return valueArr
}
duanqun7761: An alternative to the bulk SQL string is a prepared statement with multiple value sets; see here.
replied over 3 years ago
dongpu6141: If you are inserting into a single table on one disk, more than about 5 connections can actually slow things down, because the connections all contend for the same resource. Fewer connections mean less contention, so try reducing the number of goroutines to 5. max_allowed_packet limits the size of a single insert statement, and the varying size of the inserted data may explain why the errors are sporadic. An alternative to raising max_allowed_packet is to reduce the insert batch size. Also consider turning off autocommit before the bulk insert.
replied over 3 years ago
doupin8555: I edited the code to rebuild the CSV and then used a LOAD DATA query in the MariaDB console; it took about 35 minutes. Thanks for the LOAD DATA link, since it explains that lost connections can be affected by the max_allowed_packet limit. My server was at the default of 16MB; I raised the limit to 150MB. After that I could increase the open connections to 50 without errors. Memory usage exceeded 1GB, but I'll tackle that next with some profiling.
replied over 3 years ago
douzao2992: How long does it usually take to insert all 2m records?
replied over 3 years ago
douyuefei3546: It looks like the higher connection count is hitting some resource limit. Does the code check every possible error in all cases? For example, tx := db.Begin() can return an error, and leaving it unhandled could lead to the read/write errors. Also, when a panic occurs the code never rolls back the Tx, so the underlying connection stays open. Panic-and-recover like this is uncommon; just roll back and return instead. tx.Exec(batchSQL).GetErrors() is odd: which driver is this? Could you use LOAD DATA instead?
replied over 3 years ago
doujuan2688: "Connection reset by peer" indicates the MySQL server is closing the TCP connection that the Go client is trying to read from. Please add your MySQL and Go versions. Can you post code that reproduces the problem? How many requests are being sent, and how many concurrently? Do the errors happen immediately or only after a while?
replied over 3 years ago