dongtaotao19830418 2017-01-25 19:39
浏览 53

Goroutines和Mysql max连接

When I run functions in parallel that make use of mysql driver(Gorm), I find that when I set the following: db.DB().SetMaxOpenConns(30) db.DB().SetMaxIdleConns(10)

I get a clean output without any panics/error messages.

My database server variable for max_connection is set 100. I find if I SetMaxOpenConns to 50 or above I start to receive

[mysql] 2017/01/25 14:26:09 packets.go:33: read tcp 127.0.0.1:60614->127.0.0.1:3306: read: connection reset by peer
[mysql] 2017/01/25 14:26:09 packets.go:130: write tcp 127.0.0.1:60614->127.0.0.1:3306: write: broken pipe

If I just print the error and not panic then the errors eventually go away and the script finishes processing.

I'm having a tough time understanding how to manipulate these values. The Max Open connWhy when the open connections rate is lower that the script seems to have no problem connecting/reading but a higher max below the server limit is problematic.

MariaDB Version: 10.1.21 Go version: 1.7.4 There are 2.1m records in the csv file that I'm parsing. I'm batching 2000 records into each insert string

Here is a portion of the main code

func main(){
    //Setup and Connect to database 
    //Set variables
    //Read CSV File
    dbConnect()
    defer db.Close()
    db.DB().SetMaxOpenConns(30)
    db.DB().SetMaxIdleConns(10)
    db.DB().SetConnMaxLifetime(time.Second * 14400)
    filehandle, err := os.Open(physicianCSV)
    checkErr(err)
    defer filehandle.Close()

    reader := csv.NewReader(filehandle)
    _, err = reader.Read()
    checkErr(err)

    for i := 0; i <= readLimit; i++ {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                break
            }
            panic(err)
        }
        physician = convertCSVRecordToPhysician(record)
        //..Do stuff with physician, edit struct properties

        physicians = append(physicians, physician)

        if math.Mod(float64(i), float64(bulkAmount)) == 0 && i != 0 {
            fmt.Println(i, "Records: From ", i-bulkAmount, "to", i)
            wg.Add(1)
            sliceOfPhys := make([]Physician, bulkAmount)
            copy(sliceOfPhys, physicians)
            go bulkSavePhysicians(sliceOfPhys)
            physicians = physicians[:0]

        }
    }

    fmt.Println(readLimit, "records inserted")
    wg.Wait()
}

func bulkSavePhysicians(_physicians []Physician) {
    defer func() {
        if x := recover(); x != nil {
            fmt.Println(x)
        }
    }()
    defer wg.Done()

    sqlStringArray := buildSQLStatements(_physicians)
    batchSQL := fmt.Sprintf("insert into physicians values %s ;", strings.Join(sqlStringArray, ","))
    tx := db.Begin()
    errors := tx.Exec(batchSQL).GetErrors()
    if len(errors) > 0 {
        panic(errors)
    }
    tx.Commit()
}

func buildSQLStatements(_physicians []Physician) []string {

    var valueStr string
    var valueArr []string
    for _, phys := range _physicians {

        valueStr = fmt.Sprintf(`( "%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s" )`, phys.NPI,
            phys.PACID,
            phys.ProfessionalEnrollmentID,
            strings.Replace(phys.LastName, "'", "\\'", -1),
            phys.FirstName,
            phys.MiddleName,
            phys.Suffix,
            phys.Gender,
            phys.Credential,
            strings.Replace(phys.MedicalSchoolName, "'", "\\'", -1),
            phys.GraduationYear,
            phys.PrimarySpecialty,
            phys.SecondarySpecialty1,
            phys.SecondarySpecialty2,
            phys.SecondarySpecialty3,
            phys.SecondarySpecialty4,
            phys.AllSecondarySpecialties,
            strings.Replace(phys.OrganizationLegalName, "'", "\\'", -1),
            phys.GroupPracticePACID,
            phys.NumberOfGroupPracticeMembers,
            strings.Replace(phys.Line1StreetAddress, "'", "\\'", -1),
            phys.Line2StreetAddress,
            phys.MarkerOfAddressLine2Suppression,
            phys.City,
            phys.State,
            phys.ZipCode,
            phys.PhoneNumber,
            phys.HospitalAffiliationCCN1,
            phys.HospitalAffiliationLBN1,
            phys.HospitalAffiliationCCN2,
            phys.HospitalAffiliationLBN2,
            phys.HospitalAffiliationCCN3,
            phys.HospitalAffiliationLBN3,
            phys.HospitalAffiliationCCN4,
            phys.HospitalAffiliationLBN4,
            phys.HospitalAffiliationCCN5,
            phys.HospitalAffiliationLBN5,
            phys.ProfessionalAcceptsMedicareAssignment,
            phys.ReportedQualityMeasures,
            phys.UsedElectronicHealthRecords,
            phys.ParticipatedInTheMedicareMaintenance,
            phys.CommittedToHeartHealth,
            phys.SpecialtyID)

        valueArr = append(valueArr, valueStr)
    }
    return valueArr
}
  • 写回答

0条回答

    报告相同问题?

    悬赏问题

    • ¥15 jupyterthemes 设置完毕后没有效果
    • ¥15 matlab图像高斯低通滤波
    • ¥15 针对曲面部件的制孔路径规划,大家有什么思路吗
    • ¥15 钢筋实图交点识别,机器视觉代码
    • ¥15 如何在Linux系统中,但是在window系统上idea里面可以正常运行?(相关搜索:jar包)
    • ¥50 400g qsfp 光模块iphy方案
    • ¥15 两块ADC0804用proteus仿真时,出现异常
    • ¥15 关于风控系统,如何去选择
    • ¥15 这款软件是什么?需要能满足我的需求
    • ¥15 SpringSecurityOauth2登陆前后request不一致