dongtaotao19830418 2017-01-25 19:39
浏览 53

Goroutines和Mysql max连接

When I run functions in parallel that make use of the MySQL driver (through Gorm), I find that when I set the following: db.DB().SetMaxOpenConns(30) db.DB().SetMaxIdleConns(10)

I get a clean output without any panics/error messages.

My database server variable max_connections is set to 100. I find that if I set SetMaxOpenConns to 50 or above, I start to receive:

[mysql] 2017/01/25 14:26:09 packets.go:33: read tcp 127.0.0.1:60614->127.0.0.1:3306: read: connection reset by peer
[mysql] 2017/01/25 14:26:09 packets.go:130: write tcp 127.0.0.1:60614->127.0.0.1:3306: write: broken pipe

If I just print the error and not panic then the errors eventually go away and the script finishes processing.

I'm having a tough time understanding how to tune these values. Why does the script seem to have no problem connecting/reading when the max open connections value is lower, while a higher value that is still below the server limit is problematic?

MariaDB version: 10.1.21. Go version: 1.7.4. There are 2.1m records in the CSV file that I'm parsing. I'm batching 2000 records into each insert statement.

Here is a portion of the main code

// main reads up to readLimit physician records from the CSV file and hands
// them to bulkSavePhysicians in batches of bulkAmount records, one goroutine
// per batch. It waits for every batch goroutine to finish before reporting.
func main() {
	// Setup and Connect to database
	// Set variables
	// Read CSV File
	dbConnect()
	defer db.Close()
	db.DB().SetMaxOpenConns(30)
	db.DB().SetMaxIdleConns(10)
	db.DB().SetConnMaxLifetime(time.Second * 14400)
	filehandle, err := os.Open(physicianCSV)
	checkErr(err)
	defer filehandle.Close()

	reader := csv.NewReader(filehandle)
	// Discard the first record (presumably the CSV header row).
	_, err = reader.Read()
	checkErr(err)

	// flush hands a private copy of the pending records to a saver goroutine
	// and resets the shared buffer. The copy is sized from the actual number
	// of pending records so nothing is truncated, and the buffer can be
	// reused immediately without racing with the goroutine.
	flush := func(upTo int) {
		if len(physicians) == 0 {
			return
		}
		fmt.Println(upTo, "Records: From ", upTo-len(physicians), "to", upTo)
		sliceOfPhys := make([]Physician, len(physicians))
		copy(sliceOfPhys, physicians)
		wg.Add(1)
		go bulkSavePhysicians(sliceOfPhys)
		physicians = physicians[:0]
	}

	// total counts the records read so far; stop at readLimit or EOF,
	// whichever comes first.
	total := 0
	for total < readLimit {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		physician = convertCSVRecordToPhysician(record)
		//..Do stuff with physician, edit struct properties

		physicians = append(physicians, physician)
		total++

		if len(physicians) >= bulkAmount {
			flush(total)
		}
	}
	// Save whatever is left over after the last full batch.
	flush(total)

	// Wait for all batches before reporting. Failures inside a batch are
	// only printed by bulkSavePhysicians, so this is the number of records
	// handed off for insertion.
	wg.Wait()
	fmt.Println(total, "records inserted")
}

// bulkSavePhysicians inserts _physicians with a single multi-row INSERT run
// inside a transaction, and calls wg.Done when it returns. It is meant to be
// started with `go` after a matching wg.Add(1).
//
// Errors are printed rather than propagated. A failed statement rolls the
// transaction back so the transaction is not left open after a failure.
func bulkSavePhysicians(_physicians []Physician) {
	// Safety net: a panic from anywhere below is printed instead of
	// crashing the whole program.
	defer func() {
		if x := recover(); x != nil {
			fmt.Println(x)
		}
	}()
	defer wg.Done()

	// An empty batch would produce "insert into physicians values  ;",
	// which is not valid SQL.
	if len(_physicians) == 0 {
		return
	}

	sqlStringArray := buildSQLStatements(_physicians)
	batchSQL := fmt.Sprintf("insert into physicians values %s ;", strings.Join(sqlStringArray, ","))

	tx := db.Begin()
	if tx.Error != nil {
		fmt.Println(tx.Error)
		return
	}
	if errs := tx.Exec(batchSQL).GetErrors(); len(errs) > 0 {
		// Roll back before reporting so the transaction is closed.
		tx.Rollback()
		fmt.Println(errs)
		return
	}
	if err := tx.Commit().Error; err != nil {
		fmt.Println(err)
	}
}

// sqlValueEscaper backslash-escapes the characters that are special inside a
// quoted MySQL string literal. It relies on backslash escapes being enabled on
// the server (i.e. NO_BACKSLASH_ESCAPES is off), the same assumption the
// previous hand-written `\'` replacement made.
var sqlValueEscaper = strings.NewReplacer(
	`\`, `\\`,
	`"`, `\"`,
	`'`, `\'`,
	"\x00", `\0`,
	"\n", `\n`,
	"\r", `\r`,
	"\x1a", `\Z`,
)

// buildSQLStatements renders each physician as one parenthesised,
// comma-separated tuple of double-quoted values, in column order, suitable
// for joining into the VALUES list of a multi-row INSERT.
//
// Every field is escaped before being quoted, so a value containing a double
// quote or a backslash can no longer terminate the literal early. It returns
// nil for an empty input.
//
// NOTE(review): escaping by hand is a stopgap; placeholder parameters
// supplied through the driver would be the more robust approach.
func buildSQLStatements(_physicians []Physician) []string {
	if len(_physicians) == 0 {
		return nil
	}

	valueArr := make([]string, 0, len(_physicians))
	for i := range _physicians {
		// Index instead of ranging by value: the struct is large.
		phys := &_physicians[i]

		fields := []interface{}{
			phys.NPI,
			phys.PACID,
			phys.ProfessionalEnrollmentID,
			phys.LastName,
			phys.FirstName,
			phys.MiddleName,
			phys.Suffix,
			phys.Gender,
			phys.Credential,
			phys.MedicalSchoolName,
			phys.GraduationYear,
			phys.PrimarySpecialty,
			phys.SecondarySpecialty1,
			phys.SecondarySpecialty2,
			phys.SecondarySpecialty3,
			phys.SecondarySpecialty4,
			phys.AllSecondarySpecialties,
			phys.OrganizationLegalName,
			phys.GroupPracticePACID,
			phys.NumberOfGroupPracticeMembers,
			phys.Line1StreetAddress,
			phys.Line2StreetAddress,
			phys.MarkerOfAddressLine2Suppression,
			phys.City,
			phys.State,
			phys.ZipCode,
			phys.PhoneNumber,
			phys.HospitalAffiliationCCN1,
			phys.HospitalAffiliationLBN1,
			phys.HospitalAffiliationCCN2,
			phys.HospitalAffiliationLBN2,
			phys.HospitalAffiliationCCN3,
			phys.HospitalAffiliationLBN3,
			phys.HospitalAffiliationCCN4,
			phys.HospitalAffiliationLBN4,
			phys.HospitalAffiliationCCN5,
			phys.HospitalAffiliationLBN5,
			phys.ProfessionalAcceptsMedicareAssignment,
			phys.ReportedQualityMeasures,
			phys.UsedElectronicHealthRecords,
			phys.ParticipatedInTheMedicareMaintenance,
			phys.CommittedToHeartHealth,
			phys.SpecialtyID,
		}

		quoted := make([]string, len(fields))
		for j, f := range fields {
			// %s keeps the same text rendering the original format
			// string used for each field.
			quoted[j] = `"` + sqlValueEscaper.Replace(fmt.Sprintf("%s", f)) + `"`
		}

		valueArr = append(valueArr, "( "+strings.Join(quoted, ",")+" )")
	}
	return valueArr
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
    • ¥15 解riccati方程组
    • ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
    • ¥30 用arduino开发esp32控制ps2手柄一直报错
    • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
    • ¥15 求chat4.0解答一道线性规划题,用lingo编程运行,第一问要求写出数学模型和lingo语言编程模型,第二问第三问解答就行,我的ddl要到了谁来求了
    • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
    • ¥50 树莓派安卓APK系统签名
    • ¥65 汇编语言除法溢出问题
    • ¥15 Visual Studio问题