dsxgby126001 2017-02-13 14:43
浏览 214
已采纳

在Golang中过滤字节流的正确方法?

I want to filter the STDOUT from a command such that I only keep the first and last line of any contiguous block of terminated lines (to largely ignore progress indicators).

Here's my attempt (orig code does more, this is a simplified version, but basically the filtering has to happen as the input comes in, not at the end):

package main

import (
    "bytes"
    "fmt"
    "os/exec"
)

var cr = []byte("")
var lf = []byte("
")

func main() {
    input1 := []byte("a
b

progress 98%")
    input2 := []byte("progress 99%")
    input3 := []byte("progress 100%")
    input4 := []byte("

c
")

    var stream []byte
    stream = append(stream, input1...)
    stream = append(stream, input2...)
    stream = append(stream, input3...)
    stream = append(stream, input4...)

    fmt.Printf("stream:
%s
", stream)

    streamer := &myFilter{}
    streamer.Write(input1)
    streamer.Write(input2)
    streamer.Write(input3)
    streamer.Write(input4)
    final := streamer.Bytes()

    fmt.Printf("streamer:
%s

", final)

    cmd := exec.Command("bash", "-c", "perl -e '$|++; print qq[a
b

progress: 98%]; for (99..100) { print qq[progess: $_%]; sleep(1); } print qq[

c
]'")
    cmd.Stdout = &myFilter{}
    cmd.Start()
    cmd.Wait()
    fromCmd := cmd.Stdout.(*myFilter).Bytes()

    fmt.Printf("fromCmd:
%s
", fromCmd)
}

type myFilter struct {
    partialLine []byte
    storage     []byte
}

func (w *myFilter) Write(p []byte) (n int, err error) {
    // in order to filter out all but the first and last line of a set of 
    // terminated lines (a progress bar), we need to collect whole 
 terminated
    // lines
    lines := bytes.SplitAfter(p, lf)

    if len(w.partialLine) > 0 || (len(lines) == 1 && !bytes.HasSuffix(p, lf)) {
        w.partialLine = append(w.partialLine, lines[0]...)

        partialComplete := false
        if len(lines) > 1 {
            lines = lines[1:]
            partialComplete = true

        } else {
            lines = nil
            if bytes.HasSuffix(p, lf) {
                partialComplete = true
            }
        }

        if partialComplete {
            w.filterCR(w.partialLine)
            w.partialLine = nil
        }
    }

    lastLineIndex := len(lines) - 1
    if lastLineIndex > -1 && !bytes.HasSuffix(p, lf) {
        w.partialLine, lines = lines[lastLineIndex], lines[:lastLineIndex]
    }

    for _, line := range lines {
        w.filterCR(line)
    }

    return len(p), nil
}

func (w *myFilter) filterCR(p []byte) {
    if bytes.Contains(p, cr) {
        lines := bytes.Split(p, cr)
        w.store(lines[0])
        w.store(lf)

        if len(lines) > 2 {
            w.store(lines[len(lines)-2])
            w.store(lf)
        }
    } else {
        w.store(p)
    }
}

func (w *myFilter) store(p []byte) {
    w.storage = append(w.storage, p...)
}

func (w *myFilter) Bytes() []byte {
    if len(w.partialLine) > 0 {
        w.filterCR(w.partialLine)
    }
    return w.storage
}

My output is:

stream:
a
b

progress 100%

c

streamer:
a
b

progress 98%
progress 100%

c


fromCmd:
a
b

ss: 100%
progess: 100%

c

What I want is the output you see from "fromCmd" to match the output I got from "streamer".

What am I doing wrong, why does my actual output seem "corrupt", why does the real command run behave differently to my "streamer" test, and what's a better way to filter STDOUT?

  • 写回答

1条回答 默认 最新

  • drza10046 2017-02-13 16:13
    关注

    Your partial line algorithm isn't correct for all inputs.

    You can replace myFilter with a bufio.Scanner, which will handle the partial line buffering correctly for you, and a []byte or bytes.Buffer to accumulate the output.

    var out bytes.Buffer
    scanner := bufio.NewScanner(stdout)
    for scanner.Scan() {
        p := scanner.Bytes()
        lines := bytes.Split(p, cr)
        out.Write(lines[0])
        out.Write(lf)
        if len(lines) > 1 {
            out.Write(lines[len(lines)-1])
            out.Write(lf)
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥60 版本过低apk如何修改可以兼容新的安卓系统
  • ¥25 由IPR导致的DRIVER_POWER_STATE_FAILURE蓝屏
  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!
  • ¥15 drone 推送镜像时候 purge: true 推送完毕后没有删除对应的镜像,手动拷贝到服务器执行结果正确在样才能让指令自动执行成功删除对应镜像,如何解决?