dongyata3336 2016-11-02 20:24
浏览 142
已采纳

多个Docker容器日志

I'm trying to get the logs from multiple docker containers at once (order doesn't matter). This works as expected if types.ContainerLogsOption.Follow is set to false.

If types.ContainerLogsOption.Follow is set to true sometimes the log output get stuck after a few logs and no follow up logs are printed to stdout.

If the output doesn't get stuck it works as expected.

Additionally if I restart one or all of the containers the command doesn't exit like docker logs -f containerName does.

func (w *Whatever) Logs(options LogOptions) {
    readers := []io.Reader{}

    for _, container := range options.Containers {
        responseBody, err := w.Docker.Client.ContainerLogs(context.Background(), container, types.ContainerLogsOptions{
            ShowStdout: true,
            ShowStderr: true,
            Follow:     options.Follow,
        })
        defer responseBody.Close()

        if err != nil {
            log.Fatal(err)
        }
        readers = append(readers, responseBody)
    }

    // concatenate all readers to one
    multiReader := io.MultiReader(readers...)

    _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, multiReader)
    if err != nil && err != io.EOF {
        log.Fatal(err)
    }
}

Basically there is no great difference in my implementation from that of docker logs https://github.com/docker/docker/blob/master/cli/command/container/logs.go, hence I'm wondering what causes this issues.

  • 写回答

1条回答 默认 最新

  • dongrang9300 2016-11-03 02:36
    关注

    As JimB commented, that method won't work due to the operation of io.MultiReader. What you need to do is read from each from each response individually and combine the output. Since you're dealing with logs, it would make sense to break up the reads on newlines. bufio.Scanner does this for a single io.Reader. So one option would be to create a new type that scans multiple readers concurrently.

    You could use it like this:

    scanner := NewConcurrentScanner(readers...)
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
    if err := scanner.Err(); err != nil {
        log.Fatalln(err)
    }
    

    Example implementation of a concurrent scanner:

    // ConcurrentScanner works like io.Scanner, but with multiple io.Readers
    type ConcurrentScanner struct {
        scans  chan []byte   // Scanned data from readers
        errors chan error    // Errors from readers
        done   chan struct{} // Signal that all readers have completed
        cancel func()        // Cancel all readers (stop on first error)
    
        data []byte // Last scanned value
        err  error
    }
    
    // NewConcurrentScanner starts scanning each reader in a separate goroutine
    // and returns a *ConcurrentScanner.
    func NewConcurrentScanner(readers ...io.Reader) *ConcurrentScanner {
        ctx, cancel := context.WithCancel(context.Background())
        s := &ConcurrentScanner{
            scans:  make(chan []byte),
            errors: make(chan error),
            done:   make(chan struct{}),
            cancel: cancel,
        }
    
        var wg sync.WaitGroup
        wg.Add(len(readers))
    
        for _, reader := range readers {
            // Start a scanner for each reader in it's own goroutine.
            go func(reader io.Reader) {
                defer wg.Done()
                scanner := bufio.NewScanner(reader)
    
                for scanner.Scan() {
                    select {
                    case s.scans <- scanner.Bytes():
                        // While there is data, send it to s.scans,
                        // this will block until Scan() is called.
                    case <-ctx.Done():
                        // This fires when context is cancelled,
                        // indicating that we should exit now.
                        return
                    }
                }
                if err := scanner.Err(); err != nil {
                    select {
                    case s.errors <- err:
                        // Reprort we got an error
                    case <-ctx.Done():
                        // Exit now if context was cancelled, otherwise sending
                        // the error and this goroutine will never exit.
                        return
                    }
                }
            }(reader)
        }
    
        go func() {
            // Signal that all scanners have completed
            wg.Wait()
            close(s.done)
        }()
    
        return s
    }
    
    func (s *ConcurrentScanner) Scan() bool {
        select {
        case s.data = <-s.scans:
            // Got data from a scanner
            return true
        case <-s.done:
            // All scanners are done, nothing to do.
        case s.err = <-s.errors:
            // One of the scanners error'd, were done.
        }
        s.cancel() // Cancel context regardless of how we exited.
        return false
    }
    
    func (s *ConcurrentScanner) Bytes() []byte {
        return s.data
    }
    
    func (s *ConcurrentScanner) Text() string {
        return string(s.data)
    }
    
    func (s *ConcurrentScanner) Err() error {
        return s.err
    }
    

    Here's an example of it working in the Go Playground: https://play.golang.org/p/EUB0K2V7iT

    You can see that the concurrent scanner output is interleaved. Rather than reading all of one reader, then moving on to the next, as is seen with io.MultiReader.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥50 potsgresql15备份问题
  • ¥15 Mac系统vs code使用phpstudy如何配置debug来调试php
  • ¥15 目前主流的音乐软件,像网易云音乐,QQ音乐他们的前端和后台部分是用的什么技术实现的?求解!
  • ¥60 pb数据库修改与连接
  • ¥15 spss统计中二分类变量和有序变量的相关性分析可以用kendall相关分析吗?
  • ¥15 拟通过pc下指令到安卓系统,如果追求响应速度,尽可能无延迟,是不是用安卓模拟器会优于实体的安卓手机?如果是,可以快多少毫秒?
  • ¥20 神经网络Sequential name=sequential, built=False
  • ¥16 Qphython 用xlrd读取excel报错
  • ¥15 单片机学习顺序问题!!
  • ¥15 ikuai客户端多拨vpn,重启总是有个别重拨不上