doucanshou6998 2017-04-16 11:29
Views: 81
Accepted

How to stop a goroutine blocked by external I/O started for a process?

I have a problem: I can't make a goroutine exit safely.

I create an external process with exec.Command and store the cmd, the stdin pipe and the stdout pipe of the process:

exec.Command(args[0], args[1:]...) // args[0] is the base command, args[1:] are its arguments

Whenever I need to start that process, I call:

cmd.Start()

Then, once the process has started and I have attached to it, I run two goroutines:

shutdown := make(chan struct{})
// Run the routine which will read from process and send the data to CmdIn channel
go pr.cmdInRoutine()
// Run the routine which will read from CmdOut and write to process
go pr.cmdOutRoutine()

cmdInRoutine:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    for {
        println("CMDINROUTINE")
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdInRoutine down !!!")
            return
        default:
            println("Inside the for loop of the CmdInRoutine")
            if pr.stdOutPipe == nil {
                println("!!! Standard output pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            buf := make([]byte, 2048)

            size, err := pr.stdOutPipe.Read(buf)
            if err != nil {
                println("!!! Sending exit request from cmdInRoutine !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            println("--- Received data for sending to CmdIn:", string(buf[:size]))
            pr.CmdIn <- buf[:size]
        }

    }
}

cmdOutRoutine:

func (pr *ExternalProcess) cmdOutRoutine() {
    app.At(te, "cmdOutRoutine")

    for {
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdOutRoutine down !!!")
            return
        case data := <-pr.CmdOut:
            println("Received data for sending to Process: ", data)
            if pr.stdInPipe == nil {
                println("!!! Standard input pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                return
            }

            println("--- Received input to write to external process:", string(data))
            _, err := pr.stdInPipe.Write(append(data, '\n'))
            if err != nil {
                println("!!! Couldn't Write To the std in pipe of the process !!!")
                pr.ProcessExit <- true
                return
            }
        }
    }
}

Interesting cases here:

1) When the process sends EOF, cmdInRoutine closes the shutdown channel (ignore pr.ProcessExit <- true; that only notifies the parent handler over a channel to stop and exit the process). This lets cmdOutRoutine exit as well: its select statement has no default case, so it blocks until either shutdown is closed or data arrives on CmdOut, which it then writes to the running process through the stored stdInPipe.

2) When I just want to stop the goroutines but leave the process running, i.e. pause reading and writing, I close the shutdown channel in the hope that both goroutines will end.
- cmdOutRoutine prints !!! Shutting cmdOutRoutine down !!! because its select has no default case, so closing the shutdown channel makes it return almost immediately.
- cmdInRoutine doesn't print anything, and I suspect it never returns, because it is blocked in the default case reading from stdOutPipe.
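
The difference between the two goroutines can be reproduced in isolation. The following is only a sketch: it uses an in-memory io.Pipe as a stand-in for the process's stdout pipe, and the names readerLoop, demo and entered are hypothetical, not part of the code above. It shows that a loop blocked in Read inside the default case does not notice a closed shutdown channel until Read returns.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// readerLoop mimics the shape of cmdInRoutine: it checks shutdown, then
// falls into a blocking Read in the default case.
func readerLoop(r io.Reader, shutdown <-chan struct{}, entered chan<- struct{}, done chan<- struct{}) {
	defer close(done)
	buf := make([]byte, 16)
	for {
		select {
		case <-shutdown:
			return
		default:
			// Non-blocking notification that we are about to call Read.
			select {
			case entered <- struct{}{}:
			default:
			}
			if _, err := r.Read(buf); err != nil {
				return
			}
		}
	}
}

// demo reports whether the loop exited while Read was still pending, and
// whether it exited once Read had been given some data.
func demo() (exitedWhileBlocked, exitedAfterData bool) {
	pr, pw := io.Pipe()
	shutdown := make(chan struct{})
	entered := make(chan struct{}, 1)
	done := make(chan struct{})
	go readerLoop(pr, shutdown, entered, done)

	<-entered       // the goroutine is past the select, heading into Read
	close(shutdown) // nobody is watching the channel at this point

	select {
	case <-done:
		exitedWhileBlocked = true
	case <-time.After(100 * time.Millisecond):
	}

	pw.Write([]byte("x")) // let Read return so the loop re-checks shutdown
	select {
	case <-done:
		exitedAfterData = true
	case <-time.After(2 * time.Second):
	}
	return
}

func main() {
	blocked, after := demo()
	fmt.Println("exited while Read was pending:", blocked)
	fmt.Println("exited after Read returned:", after)
}
```

With a real process, the second step only happens if the process produces output or closes its stdout, which is why the goroutine can appear to hang indefinitely.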

I was thinking of starting another goroutine inside cmdInRoutine, before the for loop, that turns the process's stdout data into a channel; then I could eliminate the default case in cmdInRoutine. But this creates another problem: that new goroutine also has to be stopped, and it will still be blocked reading from the stdout of the running process.

Any ideas on how I can resolve this (by modifying the logic) so that I can shut down and restart the goroutines (the process I/O) at any time without stopping the running process itself? Or is there a way to avoid blocking Read and Write calls altogether that I'm not aware of yet?

Thanks a lot.

1 answer

  • doo6568 2017-04-17 20:42

    It's probably blocked at pr.stdOutPipe.Read(buf). You could try closing pr.stdOutPipe; that should interrupt the Read.

    You can also close pr.stdInPipe, to make sure the Write doesn't block.
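
A rough way to check the closing behaviour on a given setup, assuming os.Pipe is an acceptable stand-in for the pipe returned by cmd.StdoutPipe (the helper name readAfterClose is hypothetical). Whether Close actually interrupts a pending Read depends on the Go version and platform, so treat this as an experiment rather than a guarantee:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// readAfterClose starts a Read on the read end of an OS pipe, closes that
// end from the calling goroutine, and reports the error Read came back with
// and whether Read returned at all within the timeout.
func readAfterClose() (readErr error, returned bool, err error) {
	r, w, err := os.Pipe()
	if err != nil {
		return nil, false, err
	}
	defer w.Close() // the write end stays open, so Read cannot simply see EOF

	result := make(chan error, 1)
	go func() {
		buf := make([]byte, 16)
		_, rerr := r.Read(buf) // blocks: nothing is ever written
		result <- rerr
	}()

	time.Sleep(50 * time.Millisecond) // give the goroutine time to block in Read
	r.Close()

	select {
	case readErr = <-result:
		return readErr, true, nil
	case <-time.After(2 * time.Second):
		return nil, false, nil
	}
}

func main() {
	readErr, returned, err := readAfterClose()
	if err != nil {
		fmt.Println("could not create pipe:", err)
		return
	}
	fmt.Println("Read returned:", returned, "error:", readErr)
}
```

If Read does come back with an error here, the error path in cmdInRoutine is the one that will run after closing pr.stdOutPipe.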

    Edit: This won't allow you to reattach, but there's no other way to interrupt that read. It's probably better to just keep these two goroutines running for the entire process, and pause somewhere else in the stack (e.g. if you don't want to receive the command's output in the paused state, don't write buf to pr.CmdIn - but be careful to avoid race conditions).

    Close might be buggy in current versions of Go: see issue 6817.

    End of edit
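
One possible shape for "pause somewhere else in the stack", as a minimal sketch: the reading goroutine keeps running for the whole life of the process, and a small mutex-protected gate decides whether each chunk is forwarded. The names gate, setPaused, deliver and pump are hypothetical, and dropping data while paused is an assumption; buffering it would be an equally valid choice.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// gate decides whether a chunk read from the process is forwarded.
type gate struct {
	mu     sync.Mutex
	paused bool
	out    chan []byte
}

// setPaused flips the paused flag under the mutex, so it is safe to call
// from any goroutine.
func (g *gate) setPaused(p bool) {
	g.mu.Lock()
	g.paused = p
	g.mu.Unlock()
}

// deliver forwards chunk unless the gate is paused or done is closed.
// It reports whether the chunk was forwarded.
func (g *gate) deliver(chunk []byte, done <-chan struct{}) bool {
	g.mu.Lock()
	paused := g.paused
	g.mu.Unlock()
	if paused {
		return false // dropped here; a real program might buffer instead
	}
	select {
	case g.out <- chunk:
		return true
	case <-done:
		return false
	}
}

// pump reads until r reports an error (including EOF) and hands every
// chunk to the gate. It never needs to be interrupted in order to pause.
func pump(r io.Reader, g *gate, done <-chan struct{}) {
	for {
		buf := make([]byte, 2048) // fresh buffer per chunk, as in the answer
		n, err := r.Read(buf)
		if n > 0 {
			g.deliver(buf[:n], done)
		}
		if err != nil {
			return
		}
	}
}

func main() {
	g := &gate{out: make(chan []byte, 4)}
	done := make(chan struct{})

	g.setPaused(true)
	pump(strings.NewReader("ignored while paused"), g, done)
	fmt.Println("chunks queued while paused:", len(g.out))

	g.setPaused(false)
	pump(strings.NewReader("hello"), g, done)
	chunk := <-g.out
	fmt.Println("chunk forwarded after resume:", string(chunk))
}
```

Note that a chunk which passed the paused check just before setPaused(true) can still be delivered afterwards; if that matters, the receiver has to tolerate one late chunk or the check and the send need stronger coordination.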

    Also, be careful with pr.CmdIn. If closing stdOutPipe doesn't cause Read to return an error, cmdInRoutine will try to send on the channel, and if nothing receives from it, cmdInRoutine will block forever. I would move pr.stdOutPipe.Read(buf) out of the select, then add pr.CmdIn <- buf[:size] as another case of the select:

    func (pr *ExternalProcess) cmdInRoutine() {
        app.At(te, "cmdInRoutine")
    
        // this check should probably only happen once.
        // if stdOutPipe can change during the loop,
        // then that's a race condition.
        if pr.stdOutPipe == nil {
            println("!!! Standard output pipe is nil. Sending Exit Request !!!")
            pr.ProcessExit <- true
            close(pr.shutdown)
            return
        }
    
        for {
            println("CMDINROUTINE")
            // we need to allocate a new buffer in each iteration,
            // because when we pass it through the channel,
            // we can no longer safely overwrite the data in it,
            // since the other goroutine might still be using it.
            buf := make([]byte, 2048)
            size, err := pr.stdOutPipe.Read(buf)
            if err != nil {
                println("!!! Sending exit request from cmdInRoutine !!!")
                // Be careful with this: if you also closed pr.shutdown
                // when you closed stdOutPipe, this is going to panic
                // (closing a closed channel).
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }
    
            // now that we have some data, try to send it,
            // unless we're done.
            select {
            case <-pr.shutdown:
                println("!!! Shutting cmdInRoutine down !!!")
                return
            case pr.CmdIn <- buf[:size]:
                println("--- Received data for sending to CmdIn:", string(buf[:size]))
            }
        }
    }
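
The double-close panic mentioned in the comment above can be avoided by routing every close through sync.Once. This is a sketch under the assumption that pr.shutdown could be replaced by a small wrapper; the type shutdownSignal and its methods are hypothetical names, not part of the original code.

```go
package main

import (
	"fmt"
	"sync"
)

// shutdownSignal wraps a channel that may be closed from several places.
type shutdownSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newShutdownSignal() *shutdownSignal {
	return &shutdownSignal{ch: make(chan struct{})}
}

// Close closes the channel at most once, so calling it from several
// goroutines (or twice from the same one) does not panic.
func (s *shutdownSignal) Close() {
	s.once.Do(func() { close(s.ch) })
}

// Done returns the channel to use in a select statement.
func (s *shutdownSignal) Done() <-chan struct{} {
	return s.ch
}

func main() {
	s := newShutdownSignal()
	s.Close()
	s.Close() // second call is a no-op instead of a panic

	select {
	case <-s.Done():
		fmt.Println("shutdown observed")
	default:
		fmt.Println("still running")
	}
}
```

A closed channel cannot be reopened, so starting the goroutines again would need a fresh signal value for each run.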
    
    This answer was accepted by the asker as the best answer.
