When I run functions in parallel that make use of mysql driver(Gorm), I find that when I set the following: db.DB().SetMaxOpenConns(30) db.DB().SetMaxIdleConns(10)
I get a clean output without any panics/error messages.
My database server variable for max_connection is set 100. I find if I SetMaxOpenConns to 50 or above I start to receive
[mysql] 2017/01/25 14:26:09 packets.go:33: read tcp 127.0.0.1:60614->127.0.0.1:3306: read: connection reset by peer
[mysql] 2017/01/25 14:26:09 packets.go:130: write tcp 127.0.0.1:60614->127.0.0.1:3306: write: broken pipe
If I just print the error and not panic then the errors eventually go away and the script finishes processing.
I'm having a tough time understanding how to manipulate these values. The Max Open connWhy when the open connections rate is lower that the script seems to have no problem connecting/reading but a higher max below the server limit is problematic.
MariaDB Version: 10.1.21 Go version: 1.7.4 There are 2.1m records in the csv file that I'm parsing. I'm batching 2000 records into each insert string
Here is a portion of the main code
func main(){
//Setup and Connect to database
//Set variables
//Read CSV File
dbConnect()
defer db.Close()
db.DB().SetMaxOpenConns(30)
db.DB().SetMaxIdleConns(10)
db.DB().SetConnMaxLifetime(time.Second * 14400)
filehandle, err := os.Open(physicianCSV)
checkErr(err)
defer filehandle.Close()
reader := csv.NewReader(filehandle)
_, err = reader.Read()
checkErr(err)
for i := 0; i <= readLimit; i++ {
record, err := reader.Read()
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
physician = convertCSVRecordToPhysician(record)
//..Do stuff with physician, edit struct properties
physicians = append(physicians, physician)
if math.Mod(float64(i), float64(bulkAmount)) == 0 && i != 0 {
fmt.Println(i, "Records: From ", i-bulkAmount, "to", i)
wg.Add(1)
sliceOfPhys := make([]Physician, bulkAmount)
copy(sliceOfPhys, physicians)
go bulkSavePhysicians(sliceOfPhys)
physicians = physicians[:0]
}
}
fmt.Println(readLimit, "records inserted")
wg.Wait()
}
func bulkSavePhysicians(_physicians []Physician) {
defer func() {
if x := recover(); x != nil {
fmt.Println(x)
}
}()
defer wg.Done()
sqlStringArray := buildSQLStatements(_physicians)
batchSQL := fmt.Sprintf("insert into physicians values %s ;", strings.Join(sqlStringArray, ","))
tx := db.Begin()
errors := tx.Exec(batchSQL).GetErrors()
if len(errors) > 0 {
panic(errors)
}
tx.Commit()
}
func buildSQLStatements(_physicians []Physician) []string {
var valueStr string
var valueArr []string
for _, phys := range _physicians {
valueStr = fmt.Sprintf(`( "%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s","%s" )`, phys.NPI,
phys.PACID,
phys.ProfessionalEnrollmentID,
strings.Replace(phys.LastName, "'", "\\'", -1),
phys.FirstName,
phys.MiddleName,
phys.Suffix,
phys.Gender,
phys.Credential,
strings.Replace(phys.MedicalSchoolName, "'", "\\'", -1),
phys.GraduationYear,
phys.PrimarySpecialty,
phys.SecondarySpecialty1,
phys.SecondarySpecialty2,
phys.SecondarySpecialty3,
phys.SecondarySpecialty4,
phys.AllSecondarySpecialties,
strings.Replace(phys.OrganizationLegalName, "'", "\\'", -1),
phys.GroupPracticePACID,
phys.NumberOfGroupPracticeMembers,
strings.Replace(phys.Line1StreetAddress, "'", "\\'", -1),
phys.Line2StreetAddress,
phys.MarkerOfAddressLine2Suppression,
phys.City,
phys.State,
phys.ZipCode,
phys.PhoneNumber,
phys.HospitalAffiliationCCN1,
phys.HospitalAffiliationLBN1,
phys.HospitalAffiliationCCN2,
phys.HospitalAffiliationLBN2,
phys.HospitalAffiliationCCN3,
phys.HospitalAffiliationLBN3,
phys.HospitalAffiliationCCN4,
phys.HospitalAffiliationLBN4,
phys.HospitalAffiliationCCN5,
phys.HospitalAffiliationLBN5,
phys.ProfessionalAcceptsMedicareAssignment,
phys.ReportedQualityMeasures,
phys.UsedElectronicHealthRecords,
phys.ParticipatedInTheMedicareMaintenance,
phys.CommittedToHeartHealth,
phys.SpecialtyID)
valueArr = append(valueArr, valueStr)
}
return valueArr
}