dpa89292 2019-01-24 00:16
190 views

Streaming stdout and stderr over SSH, manipulating the stream, then printing to local stdout and stderr

I'm performing a bunch of operations over SSH on a remote machine. I stream its stdout and stderr and have a writer consume them; the writer writes to the local stdout and stderr as well as to byte buffers.

Just before the writer consumes the output, I want to perform a series of string manipulations on it and then write the result to my screen and buffer. Up to this point, it all works fine.

My issue is that the output is no longer streamed: it hangs and then prints everything in one chunk. I want it to be real time, so I put channels in my goroutines, but with no improvement. Below are my functions; let me know if you can spot the reason, or a better way of achieving this.

// sending

func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, readers chan io.Reader) {
        if filters != nil {
                // filters exist
                // read first 8 bytes
                res := readPipe(8, pipe)

                // get each line from the resulting streamed output
                for _, str := range strings.Split(res, "\n") {
                        if str != "" {
                                out := lineFilterAndReplace(str, filters, replaceFilters)

                                // instantiate an io.Reader obj from the given string
                                outReader := strings.NewReader(out)

                                readers <- outReader
                        }
                }
        } else {
                // filters don't exist
                if len(replaceFilters) > 0 {
                        res := readPipe(8, pipe)

                        for _, str := range strings.Split(res, "\n") {
                                if str != "" {
                                        out := lineReplace(str, replaceFilters)

                                        // instantiate an io.Reader obj from the given string
                                        outReader := strings.NewReader(out)

                                        readers <- outReader
                                }
                        }
                } else {
                        readers <- pipe
                }
        }
}

// receiving

    outReaders := make(chan io.Reader)

    go handleStdStream(outFilters, replaceFilters, stdoutIn, outReaders)

    go func() {
            for {
                    pipe := <-outReaders

                    _, errStdout = io.Copy(outWriter, pipe)
            }

            // _, errStdout = io.Copy(outWriter, stdoutIn)
    }()

1 answer

  • duanpu1064 2019-01-24 01:33

    I don't think you need channels or goroutines to accomplish this. The Writer and Reader interfaces are already streaming; you sip bytes from a Reader continuously until you hit EOF or an error and you hand off bytes to a Writer continuously until you're done or you get an error. On its own, processing a stream does not require any concurrency, so doing this sequentially in a single goroutine is quite appropriate.

    You shouldn't ignore error returns. If a function or method returns an error value, you need to check it. In the case of I/O, you usually need to stop reading from a Reader when it returns an error and you usually need to stop writing to a Writer when it returns an error. In the case of a Reader you also have to check for the special "error" value io.EOF.

    I think using Scanner from the bufio package is better than trying to do your own buffering/splitting. By default, Scanner splits input on newlines (Unix-style LF or DOS-style CRLF). It also gets rid of the need to check for io.EOF, provided you only interact with the Reader through the Scanner.

    Consider the following version of handleStdStream:

    func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, w io.Writer) error {
        // Scanner yields one line at a time with the line terminator stripped.
        scanner := bufio.NewScanner(pipe)
        for scanner.Scan() {
            str := scanner.Text()
            if str == "" {
                continue
            }
            var out string
            if len(filters) != 0 {
                out = lineFilterAndReplace(str, filters, replaceFilters)
            } else {
                out = lineReplace(str, replaceFilters)
            }
            // Re-append the newline that Scanner removed, and write each line
            // as soon as it is ready so the output stays real-time.
            if _, err := io.WriteString(w, out+"\n"); err != nil {
                return err
            }
        }
        // Err returns nil if the scan stopped at io.EOF.
        return scanner.Err()
    }
    

    You would use it like this:

    err := handleStdStream(filters, replaceFilters, pipe, outWriter)
    if err != nil {
        // do something, like printing the error to a log or stderr
    }
    