dougou8639 2019-06-01 01:56
浏览 65

种族条件读取子进程的标准输出和标准错误

In Go, I'm trying to:

  1. start a subprocess
  2. read from stdout and stderr separately
  3. implement an overall timeout

After much googling, we've come up with some code that seems to do the job, most of the time. But there seems to be a race condition whereby some output is not read.

The problem seems to only occur on Linux, not Windows.

Following the simplest possible solution found with google, we tried creating a context with a timeout:

context.WithTimeout(context.Background(), 10*time.Second)

While this worked most of the time, we were able to find cases where it would just hang forever. There was some aspect of the child process that caused this to deadlock. (Something to do with grandchildren that were not sufficiently dissasociated from the child process, and thus caused the child to never completely exit.)

Also, it seemed that in some cases the error that is returned when the timeout occurrs would indicate a timeout, but would only be delivered after the process had actually exited (thus making the whole concept of the timeout useless).

func GetOutputsWithTimeout(command string, args []string, timeout int) (io.ReadCloser, io.ReadCloser, int, error) {
    start := time.Now()
    procLogger.Tracef("Initializing %s %+v", command, args)
    cmd := exec.Command(command, args...)

    // get pipes to standard output/error
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return emptyReader(), emptyReader(), -1, fmt.Errorf("cmd.StdoutPipe() error: %+v", err.Error())
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
        return emptyReader(), emptyReader(), -1, fmt.Errorf("cmd.StderrPipe() error: %+v", err.Error())
    }

    // setup buffers to capture standard output and standard error
    var buf bytes.Buffer
    var ebuf bytes.Buffer

    // create a channel to capture any errors from wait
    done := make(chan error)
    // create a semaphore to indicate when both pipes are closed
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        if _, err := buf.ReadFrom(stdout); err != nil {
            procLogger.Debugf("%s: Error Slurping stdout: %+v", command, err)
        }
        wg.Done()
    }()
    go func() {
        if _, err := ebuf.ReadFrom(stderr); err != nil {
            procLogger.Debugf("%s: Error  Slurping stderr: %+v", command, err)
        }
        wg.Done()
    }()

    // start process
    procLogger.Debugf("Starting %s", command)
    if err := cmd.Start(); err != nil {
        procLogger.Errorf("%s: failed to start: %+v", command, err)
        return emptyReader(), emptyReader(), -1, fmt.Errorf("cmd.Start() error: %+v", err.Error())
    }

    go func() {
        procLogger.Debugf("Waiting for %s (%d) to finish", command, cmd.Process.Pid)
        err := cmd.Wait()                                             // this can  be 'forced' by the killing of the process
        procLogger.Tracef("%s finished: errStatus=%+v", command, err) // err could be nil here
        //notify select of completion, and the status
        done <- err
    }()

    // Wait for timeout or completion.
    select {
    // Timed out
    case <-time.After(time.Duration(timeout) * time.Second):
        elapsed := time.Since(start)
        procLogger.Errorf("%s: timeout after %.1f
", command, elapsed.Seconds())
        if err := TerminateTree(cmd); err != nil {
            return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), -1,
                fmt.Errorf("failed to kill %s, pid=%d: %+v",
                    command, cmd.Process.Pid, err)
        }
        wg.Wait() // this *should* take care of waiting for stdout and stderr to be collected after we killed the process
        return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), -1,
            fmt.Errorf("%s: timeout %d s reached, pid=%d process killed",
                command, timeout, cmd.Process.Pid)
    //Exited normally or with a non-zero exit code
    case err := <-done:
        wg.Wait() // this *should* take care of waiting for stdout and stderr to be collected after the process terminated naturally.
        elapsed := time.Since(start)
        procLogger.Tracef("%s: Done after %.1f
", command, elapsed.Seconds())
        rc := -1
        // Note that we have to use go1.10 compatible mechanism.
        if err != nil {
            procLogger.Tracef("%s exited with error: %+v", command, err)
            exitErr, ok := err.(*exec.ExitError)
            if ok {
                ws := exitErr.Sys().(syscall.WaitStatus)
                rc = ws.ExitStatus()
            }
            procLogger.Debugf("%s exited with status %d", command, rc)
            return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), rc,
                fmt.Errorf("%s: process done with error: %+v",
                    command, err)
        } else {
            ws := cmd.ProcessState.Sys().(syscall.WaitStatus)
            rc = ws.ExitStatus()
        }
        procLogger.Debugf("%s exited with status %d", command, rc)
        return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), rc, nil
    }
    //NOTREACHED: should not reach this line!
}

Calling GetOutputsWithTimeout("uname",[]string{"-mpi"},10) will return the expected single line of output most of the time. But sometimes it will return no output, as if the goroutine that reads stdout didn't start soon enough to "catch" all the output (or exited early?) The "most of the time" strongly suggests a race condition.

We will also sometimes see errors from the goroutines about "file already closed" (this seems to happen with the timeout condition, but will happen at other "normal" times as well).

I would have thought that starting the goroutines before the cmd.Start() would have ensured that no output would be missed, and that using the WaitGroup would guarantee they would both complete before reading the buffers.

So how are we missing output? Is there still a race condition between the two "reader" goroutines and the cmd.Start()? Should we ensure those two are running using yet another WaitGroup?

Or is there a problem with the implementation of ReadFrom()?

Note that we are currently using go1.10 due to backward-compatibility problems with older OSs but the same effect occurs with go1.12.4.

Or are we overthinking this, and a simple implementation with context.WithTimeout() would do the job?

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥50 MATLAB实现圆柱体容器内球形颗粒堆积
    • ¥15 python如何将动态的多个子列表,拼接后进行集合的交集
    • ¥20 vitis-ai量化基于pytorch框架下的yolov5模型
    • ¥15 如何实现H5在QQ平台上的二次分享卡片效果?
    • ¥15 python爬取bilibili校园招聘网站
    • ¥30 求解达问题(有红包)
    • ¥15 请解包一个pak文件
    • ¥15 不同系统编译兼容问题
    • ¥100 三相直流充电模块对数字电源芯片在物理上它必须具备哪些功能和性能?
    • ¥30 数字电源对DSP芯片的具体要求