I have written this scheduler, but I am not able to make it "kill" the input function `f` when `f` takes longer than the input `recurring` time interval.
If `f` were a process instead of a thread, then what I am looking for could be described as some sort of hard preemption.
The definition of `f` is something I have no control over. It represents an ETL job that crunches data from multiple databases during a batch execution. `f` is written in Go and works fine, but I need some way to keep it under control when it takes too long to execute.
I know `f` is atomic: it either changes the database at the end of its execution or it does not change it at all. So it can be considered safe to "kill" it when it takes too long.
// schedule invokes f once per recurring interval until the returned channel
// is closed.
//
// Every tick starts f in a fresh goroutine, so a run that outlasts the
// interval does not delay the next one; the two simply execute concurrently.
// Nothing here interrupts a run that has already started: closing the
// returned channel only stops the ticker and ends the scheduling loop.
//
// The caller stops the scheduler by closing the returned channel.
func schedule(f func(), recurring time.Duration) chan struct{} {
	quit := make(chan struct{})
	ticker := time.NewTicker(recurring)

	loop := func(job func()) {
		for {
			select {
			case <-quit:
				fmt.Println("Stopping the scheduler")
				ticker.Stop()
				return
			case <-ticker.C:
				fmt.Println("Ticked")
				// Calling job() synchronously here would make a slow run
				// push back every later tick. Running it in its own
				// goroutine keeps the schedule on time, at the price of
				// letting slow runs pile up instead of being "killed".
				//
				// Compare the timestamps printed by the test to see this.
				go job()
			}
		}
	}
	go loop(f)

	return quit
}
To see what's going on I've written this test:
func TestSlowExecutions(t *testing.T) {
// log some information using a human readable timestamp
dummyLog := func(format string, a ...interface{}) (n int, err error) {
prefix := fmt.Sprintf("[%v] ", time.Now())
message := fmt.Sprintf(format, a...)
return fmt.Printf("%s%s
", prefix, message)
}
// UUID to be able to uniquely identify "fooFunc"
newUuid := func() string {
// sudo apt-get install uuid-runtime
uuid, _ := exec.Command("uuidgen").Output()
re := regexp.MustCompile(`?
`)
uuidStr := re.ReplaceAllString(string(uuid), "")
return uuidStr
}
// simulate some sort of very slow execution
fooFunc := func() {
uuid := newUuid()
dummyLog("Ticked")
dummyLog("Starting task %s", uuid)
time.Sleep(2 * time.Second)
dummyLog("Finished task %s", uuid)
}
// test the very slow execution of "fooFunc"
quitChan := schedule(fooFunc, 1*time.Second)
time.Sleep(4 * time.Second)
close(quitChan)
// wait more to see the "closing" message
time.Sleep(4 * time.Second)
}