douqiao7188
2017-08-01 16:44

Reading continuously from a named pipe


I would like to know what other options I have for reading continuously from a named pipe in Go. My current code relies on an infinite for loop running inside a goroutine, but that keeps one CPU at 100% usage.

func main() {
    ....

    var wg sync.WaitGroup
    fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
    defer fpipe.Close()

    f, _ := os.Create("dump.txt")
    defer f.Close()
    var buff bytes.Buffer

    wg.Add(1)
    go func() {
        for {
            io.Copy(&buff, fpipe)
            if buff.Len() > 0 {
                buff.WriteTo(f)
            }
        }
    }()

    wg.Wait()
}

2 answers

  • dongtaoxue4674 4 years ago

    A named pipe reader will receive EOF when there are no writers left. The solution outside of this code is to make sure there is always one writer process holding the file descriptor, though it doesn't need to write anything.
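
    One way to keep a writer around without a separate process is for the reader itself to hold a write end. The sketch below is an assumption-laden illustration, not part of the original answer: it opens the FIFO with O_RDWR, which on Linux succeeds without blocking and makes the reader count as a writer (POSIX leaves this behaviour undefined). The helper names openKeepAlive, writeOnce and demo are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"syscall"
)

// openKeepAlive opens the FIFO with O_RDWR so this process also holds a
// write end; reads then wait for data instead of returning EOF when the
// last external writer closes. Assumption: Linux-style FIFO semantics.
func openKeepAlive(path string) (*os.File, error) {
	return os.OpenFile(path, os.O_RDWR, 0)
}

// writeOnce simulates a short-lived external writer: open, write, close.
func writeOnce(path, msg string) error {
	w, err := os.OpenFile(path, os.O_WRONLY, 0)
	if err != nil {
		return err
	}
	if _, err := w.WriteString(msg); err != nil {
		w.Close()
		return err
	}
	return w.Close()
}

// demo creates a temporary FIFO, lets two writers come and go, and
// returns what the reader received from each of them.
func demo() (first, second string, err error) {
	dir, err := os.MkdirTemp("", "fifo")
	if err != nil {
		return "", "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "pipe")
	if err = syscall.Mkfifo(path, 0600); err != nil {
		return "", "", err
	}
	r, err := openKeepAlive(path)
	if err != nil {
		return "", "", err
	}
	defer r.Close()

	buf := make([]byte, 64)
	if err = writeOnce(path, "one"); err != nil {
		return "", "", err
	}
	n, err := r.Read(buf)
	if err != nil {
		return "", "", err
	}
	first = string(buf[:n])

	// The first writer is gone, yet the pipe is still usable.
	if err = writeOnce(path, "two"); err != nil {
		return first, "", err
	}
	n, err = r.Read(buf)
	if err != nil {
		return first, "", err
	}
	return first, string(buf[:n]), nil
}

func main() {
	first, second, err := demo()
	fmt.Println(first, second, err)
}
```

    With this approach a plain blocking read loop never sees EOF, so shutting down has to be triggered some other way, for example by closing the file.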

    Within the Go program, if you want to wait for a new writer, you will have to poll the io.Reader in your for loop. Your current code does this with a busy loop, which consumes 100% of one CPU core. Adding a sleep, plus a way to return on other errors, works around the issue:

    for {
        _, err := io.Copy(&buff, fpipe)
        if buff.Len() > 0 {
            buff.WriteTo(f)
        }
    
        if err != nil {
            // io.Copy reports EOF as a nil error, so something other than EOF happened
            return
        }
    
        time.Sleep(100 * time.Millisecond)
    }
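
    The err != nil check in the loop relies on io.Copy treating EOF as normal termination and returning a nil error in that case. A minimal sketch to confirm this, using an in-memory reader in place of the pipe; failingReader, copyFromString and copyFromFailing are hypothetical names made up for the illustration:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// failingReader is a hypothetical source whose reads always fail.
type failingReader struct{}

func (failingReader) Read([]byte) (int, error) {
	return 0, errors.New("read failed")
}

// copyFromString copies a source that simply ends with EOF.
func copyFromString(s string) (int64, error) {
	var buff bytes.Buffer
	return io.Copy(&buff, strings.NewReader(s))
}

// copyFromFailing copies a source that fails with a non-EOF error.
func copyFromFailing() (int64, error) {
	var buff bytes.Buffer
	return io.Copy(&buff, failingReader{})
}

func main() {
	n, err := copyFromString("data")
	fmt.Println(n, err) // EOF is not reported: err is nil

	n, err = copyFromFailing()
	fmt.Println(n, err) // a real failure is passed through
}
```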
    
  • doubutao6216 4 years ago

    Intro

    As already written, a named pipe reader will receive an EOF if no writers are left.

    However, I find @JimB's solution less than optimal:

    1. A named pipe has a maximum capacity (65,536 bytes by default on Linux, iirc), which may well fill up within the 100 ms sleep period. Once the buffer is full, all writers block for no good reason.
    2. If a reboot happens, you will lose 50 ms worth of data on average. Again, for no good reason.
    3. If you want to use a static buffer for copying, io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) would be the better solution, imho. But this is not even necessary, as io.Copy (or the underlying implementation) allocates a 32 KiB buffer itself when it needs one.
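
    For point 3, a minimal sketch of io.CopyBuffer with a caller-owned buffer, using in-memory stand-ins for the pipe and the file. One caveat worth knowing: io.CopyBuffer skips the supplied buffer whenever the source implements io.WriterTo or the destination implements io.ReaderFrom, so the hypothetical helper copyWithBuffer wraps both sides to hide those methods. The helper names and the tiny buffer size are assumptions made for the illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// copyWithBuffer copies src to dst through the caller-supplied buf.
// The anonymous struct wrappers expose only Read and Write, so
// io.CopyBuffer cannot take its WriterTo/ReaderFrom shortcut and
// really uses buf. buf must have a non-zero length.
func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
	return io.CopyBuffer(struct{ io.Writer }{dst}, struct{ io.Reader }{src}, buf)
}

// demoCopy pushes a short string through a deliberately tiny buffer.
func demoCopy() (string, int64, error) {
	var out bytes.Buffer
	buf := make([]byte, 8) // reused for every chunk of the copy
	n, err := copyWithBuffer(&out, strings.NewReader("data from the pipe"), buf)
	return out.String(), n, err
}

func main() {
	s, n, err := demoCopy()
	fmt.Println(s, n, err)
}
```

    In a long-running reader the same buf can be reused across every copy, which avoids a fresh allocation per iteration.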

    My approach

    A better solution would be to wait for a write to happen and then immediately copy the contents of the named pipe to the destination file. On most systems, there is some sort of notification for file system events. The package github.com/rjeczalik/notify can be used to access the events we are interested in, as write events work cross-platform on most of the important OSes. The other event of interest to us is the removal of the named pipe, since we would then have nothing left to read from.

    Hence, my solution would be:

    package main
    
    import (
        "flag"
        "io"
        "log"
        "os"
    
        "github.com/rjeczalik/notify"
    )
    
    const (
        MAX_CONCURRENT_WRITERS = 5
    )
    
    var (
        pipePath string
        filePath string
    )
    
    func init() {
        flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
        flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
        log.SetOutput(os.Stderr)
    }
    
    func main() {
        flag.Parse()
    
        var p, f *os.File
        var err error
        var e notify.EventInfo
    
        // The usual stuff: checking whether the named pipe exists etc.
        if p, err = os.Open(pipePath); os.IsNotExist(err) {
            log.Fatalf("Named pipe '%s' does not exist", pipePath)
        } else if os.IsPermission(err) {
            log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
        } else if err != nil {
            log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
        }
        // Yep, there and readable. Close the file handle on exit
        defer p.Close()
    
        // Do the same for the output file
        if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
            log.Fatalf("File '%s' does not exist", filePath)
        } else if os.IsPermission(err) {
            log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
        } else if err != nil {
            log.Fatalf("Error while opening file '%s' for writing: %s", filePath, err)
        }
        // Again, close the filehandle on exit
        defer f.Close()
    
        // Here is where it happens. We create a buffered channel for events which might happen
        // on the file. We size the buffer to the number of expected concurrent writers because,
        // if all writers were to write at once or very close to each other, we might otherwise
        // lose events. This is due to the underlying implementations, not because of Go.
        c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)
    
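        // Here we tell notify to watch our named pipe for events, Write and Remove events
        // specifically. We watch for Remove events, too, as removal takes away the pipe we
        // read from, making reads impossible.
        if err = notify.Watch(pipePath, c, notify.Write|notify.Remove); err != nil {
            log.Fatalf("Error while watching named pipe '%s': %s", pipePath, err)
        }
        // Stop delivering events to c when main returns.
        defer notify.Stop(c)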
    
        // We start an infinite loop...
        for {
            // ...waiting for an event to be passed.
            e = <-c
    
            switch e.Event() {
    
            case notify.Write:
                // If it is a write event, we copy the content of the named pipe to
                // our output file and wait for the next event to happen.
                // Note that repeated calls are harmless: even if we have huge writes by multiple
                // writers on the named pipe, the first call to Copy will copy the contents.
                // The time to copy that data may well be longer than it takes to generate the events.
                // Subsequent calls may then copy nothing, but that does not do any harm.
                if _, err = io.Copy(f, p); err != nil {
                    log.Printf("Error while copying from named pipe '%s': %s", pipePath, err)
                }
    
            case notify.Remove:
                // Some user or process has removed the named pipe,
                // so we have nothing left to read from.
                // We should inform the user and quit.
                log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
            }
        }
    }
    