dongtuo4723 2018-01-12 15:51
浏览 192
已采纳

如何在计划的时间间隔内强迫执行时间太长的函数执行中断

I have written this scheduler, but I cannot make it "kill" the input function f when f takes longer than the recurring time interval passed in.

If f were a process instead of a thread, what I am looking for would be some sort of well-defined hard preemption.

The definition of f is something I have no control over. It represents an ETL job that crunches data from multiple databases during a batch execution. f is written in Go and works fine, but I need some way to keep it under control when it takes too long to execute.

I know f is atomic, so it either changes the database at the end of its execution or not. So it can be considered safe to "kill" it when it takes too long.

// schedule starts a background goroutine that launches f in a new goroutine
// on every tick of a ticker with period recurring. It returns a channel;
// closing that channel stops the ticker and ends the background goroutine.
// Invocations of f that are already running are not interrupted.
func schedule(f func(), recurring time.Duration) chan struct{} {
    quit := make(chan struct{})
    ticker := time.NewTicker(recurring)

    go func() {
        // release the ticker whenever the loop below returns
        defer ticker.Stop()
        for {
            select {
            case <-quit:
                fmt.Println("Stopping the scheduler")
                return
            case <-ticker.C:
                fmt.Println("Ticked")
                // f runs in its own goroutine: calling it inline would
                // postpone the following executions whenever f takes
                // longer than "recurring". Ideally an overrunning f
                // would be "killed" instead; compare the timestamps
                // printed when the test is executed.
                go f()
            }
        }
    }()

    return quit
}

To see what's going on I've written this test:

func TestSlowExecutions(t *testing.T) {
    // log some information using a human readable timestamp
    dummyLog := func(format string, a ...interface{}) (n int, err error) {
        prefix := fmt.Sprintf("[%v] ", time.Now())
        message := fmt.Sprintf(format, a...)
        return fmt.Printf("%s%s
", prefix, message)
    }

    // UUID to be able to uniquely identify "fooFunc"
    newUuid := func() string {
        // sudo apt-get install uuid-runtime
        uuid, _ := exec.Command("uuidgen").Output()

        re := regexp.MustCompile(`?
`)
        uuidStr := re.ReplaceAllString(string(uuid), "")
        return uuidStr
    }

    // simulate some sort of very slow execution
    fooFunc := func() {
        uuid := newUuid()
        dummyLog("Ticked")
        dummyLog("Starting task %s", uuid)
        time.Sleep(2 * time.Second)
        dummyLog("Finished task %s", uuid)
    }

    // test the very slow execution of "fooFunc"
    quitChan := schedule(fooFunc, 1*time.Second)

    time.Sleep(4 * time.Second)
    close(quitChan)
    // wait more to see the "closing" message
    time.Sleep(4 * time.Second)
}
  • 写回答

1条回答 默认 最新

  • douren1928 2018-01-15 12:07
    关注

    I negotiated the usage of a context with timeout (https://golang.org/pkg/context/#WithTimeout) with the author of f().

    See below for a working example. Pay attention to the timestamps printed by dummyLog: they make it clear what is happening in each of the goroutines involved in this process.

    The code:

    // dummyLog formats a message with fmt.Sprintf semantics and prints it to
    // standard output on a line of its own, prefixed with the current time in
    // square brackets. It returns the byte count and error from fmt.Printf.
    func dummyLog(format string, a ...interface{}) (n int, err error) {
        prefix := fmt.Sprintf("[%v] ", time.Now())
        message := fmt.Sprintf(format, a...)
        return fmt.Printf("%s%s\n", prefix, message)
    }
    
    // newContext derives a context from context.Background() that times out
    // after timeoutUpperBound, logs the resulting deadline through dummyLog,
    // and returns the context together with its cancel function.
    func newContext(timeoutUpperBound time.Duration) (context.Context, context.CancelFunc) {
        root := context.Background()
        timed, stop := context.WithTimeout(root, timeoutUpperBound)

        // the second result of Deadline reports whether a deadline is set
        when, isSet := timed.Deadline()
        dummyLog("The context deadline is set to %s is it still valid? %v", when, isSet)

        return timed, stop
    }
    
    // schedule could be used to schedule arbitrary functions with a recurring interval
    func schedule(f func(ctx context.Context), recurring time.Duration) chan struct{} {
        ticker := time.NewTicker(recurring)
        quit := make(chan struct{})
        go func(inFunc func(ctx context.Context)) {
            for {
                select {
                case <-ticker.C:
                    dummyLog("Ticked in the scheduler")
                    // simulate the "killing" of "inFunc" when it takes too long
                    go func(recurring time.Duration) {
                        inCtx, cancel := newContext(recurring)
                        defer cancel()
                        inFunc(inCtx)
                    }(recurring)
                case <-quit:
                    dummyLog("Stopping the scheduler")
                    ticker.Stop()
                    return
                }
            }
        }(f)
    
        return quit
    }
    

    The execution of the code in a testing environment (although no assertions are performed):

    // TestSomething schedules a task of random duration (below 4 seconds)
    // every 2 seconds, passing the same 2 seconds to schedule as the
    // recurring interval, lets it run for six intervals and then stops the
    // scheduler. It makes no assertions; the log output is meant to be
    // inspected by hand.
    func TestSomething(t *testing.T) {

        // matches a line terminator in the uuidgen output;
        // compiled once here rather than on every task
        lineEnd := regexp.MustCompile(`\r?\n`)

        // newUuid could be used to generate a UUID to be able to uniquely identify "fooFunc"
        newUuid := func() string {
            // sudo apt-get install uuid-runtime
            //
            // The error is deliberately ignored: if uuidgen is unavailable the
            // id is simply empty, which only makes the log lines less telling.
            uuid, _ := exec.Command("uuidgen").Output()
            return lineEnd.ReplaceAllString(string(uuid), "")
        }

        // randBetween is a dummy random int generator using "math/rand";
        // it returns a value in [min, max) and requires max > min
        randBetween := func(min int, max int) int {
            return min + rand.Intn(max-min)
        }

        // fooFunc simulates some sort of very slow execution
        // like database queries or network I/O
        fooFunc := func(ctx context.Context) {
            uuid := newUuid()
            randWait := time.Duration(randBetween(0, 4000)) * time.Millisecond
            dummyLog("Starting task %s taking %s random time", uuid, randWait)
            // whichever happens first wins: the simulated work completing
            // or the context being cancelled / timing out
            select {
            case <-time.After(randWait):
                dummyLog("Finished task %s", uuid)
            case <-ctx.Done():
                dummyLog("Killed task %s, reason: '%s'", uuid, ctx.Err())
            }
        }

        // test the very slow execution of "fooFunc"
        timeoutUpperBound := 2 * time.Second
        quitChan := schedule(fooFunc, timeoutUpperBound)

        time.Sleep(6 * timeoutUpperBound)
        close(quitChan)
        // wait more to see the "closing" message
        time.Sleep(4 * time.Second)
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 帮我写一个c++工程
  • ¥30 Eclipse官网打不开,官网首页进不去,显示无法访问此页面,求解决方法
  • ¥15 关于smbclient 库的使用
  • ¥15 微信小程序协议怎么写
  • ¥15 c语言怎么用printf(“\b \b”)与getch()实现黑框里写入与删除?
  • ¥20 怎么用dlib库的算法识别小麦病虫害
  • ¥15 华为ensp模拟器中S5700交换机在配置过程中老是反复重启
  • ¥15 java写代码遇到问题,求帮助
  • ¥15 uniapp uview http 如何实现统一的请求异常信息提示?
  • ¥15 有了解d3和topogram.js库的吗?有偿请教