doudeng3008 2019-09-22 08:44

Make sure two commands are running before starting the third one

I have three commands to run, but I'd like to make sure the first two are running before I start the third one.

Currently, it does run A and B then C.

  1. I run A and B in goroutines.
  2. Each goroutine sends its module name over a channel if there is no stderr output.
  3. The main function pushes the names received from the channel into a slice.
  4. Once the slice contains the names of both modules A and B, it starts C.

Some context

I'm learning goroutines and channels as a hobbyist. It's not clear to me how to stream the output of exec.Command("foo", "bar").Run() reliably while the command is running. It's not clear either how to handle the errors each process reports through the channel.

I need A and B to be running before C because A and B are GraphQL microservices. C needs them up in order to fetch their schemas over HTTP and start doing GraphQL federation (formerly known as GraphQL stitching).

Inconsistencies

  • With my current approach, I only learn that A and B are running if they print something, I guess.
  • I don't like that each subsequent stdout will hit an if statement, just to know if the process is running.
  • My error handling is not as clean as I'd like it to be.

Question

How can I reliably make sure that A and B are running, even if they don't print anything, and that they did not report errors?

package main

import (
    "bufio"
    "fmt"
    "log"
    "os/exec"
    "reflect"
    "sort"
    "strings"
    "sync"
)

var wg sync.WaitGroup
var modulesToRun = []string{"micro-post", "micro-hello"}

func main() {
    // Send multiple values to chan
    // https://stackoverflow.com/a/50857250/9077800
    c := make(chan func() (string, error))

    go runModule([]string{"go", "run", "micro-post"}, c)  // PROCESS A
    go runModule([]string{"go", "run", "micro-hello"}, c) // PROCESS B

    modulesRunning := []string{}
    for {
        msg, err := (<-c)()
        if err != nil {
            log.Fatalln(err)
        }

        if strings.HasPrefix(msg, "micro-") && err == nil {
            modulesRunning = append(modulesRunning, msg)
            if CompareUnorderedSlices(modulesToRun, modulesRunning) {
                go runModule([]string{"go", "run", "micro-federation"}, c) // PROCESS C
            }
        }
    }

}

func runModule(commandArgs []string, o chan func() (string, error)) {
    cmd := exec.Command(commandArgs[0], commandArgs[1], commandArgs[2]+"/main.go")

    // Less verbose solution to stream output with io?
    // var stdBuffer bytes.Buffer
    // mw := io.MultiWriter(os.Stdout, &stdBuffer)
    // cmd.Stdout = mw
    // cmd.Stderr = mw

    c := make(chan struct{})
    wg.Add(1)

    // Stream command output
    // https://stackoverflow.com/a/38870609/9077800
    go func(cmd *exec.Cmd, c chan struct{}) {
        defer wg.Done()
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        stderr, err := cmd.StderrPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        <-c
        outScanner := bufio.NewScanner(stdout)
        for outScanner.Scan() {
            m := outScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return commandArgs[2], nil })
        }

        errScanner := bufio.NewScanner(stderr)
        for errScanner.Scan() {
            m := errScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return "bad", nil })
        }
    }(cmd, c)

    c <- struct{}{}
    cmd.Start()

    wg.Wait()
    close(o)
}

// CompareUnorderedSlices orders slices before comparing them
func CompareUnorderedSlices(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }

    sort.Strings(a)
    sort.Strings(b)

    return reflect.DeepEqual(a, b)
}

1 answer

  • douxing2156 2019-09-22 09:18

    About process management

    Starting a process means invoking the binary at its path with its arguments. The start fails if the binary cannot be found or executed; malformed arguments are usually reported later, by the program itself, as an exit error.

    As a consequence, a process can start successfully and still return an exit error later because its execution failed.
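
To make the two failure stages concrete, here is a minimal sketch that separates a failed start from a failed execution. The helper name runAndClassify and the stage labels are made up for this illustration, and the second call in main assumes a go binary is on the PATH.

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
)

// runAndClassify (hypothetical helper) starts a command, waits for it,
// and reports the stage at which it failed: "start", "exit", "wait", or "ok".
func runAndClassify(name string, args ...string) (string, error) {
	cmd := exec.Command(name, args...)

	// Start fails when the binary cannot be found or executed.
	if err := cmd.Start(); err != nil {
		return "start", err
	}

	// Wait reports how the execution itself ended.
	err := cmd.Wait()
	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) {
		// The process started fine but finished with a non-zero status.
		return "exit", fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), err)
	}
	if err != nil {
		return "wait", err
	}
	return "ok", nil
}

func main() {
	// A binary name that is assumed not to exist on this machine.
	fmt.Println(runAndClassify("no-such-binary-for-this-demo"))
	// Assumes the go tool is installed and on the PATH.
	fmt.Println(runAndClassify("go", "version"))
}
```

For long-running services you would not call Wait inline like this; the point is only that the two errors come from different calls and mean different things.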

    Those details matter when you decide whether a successful start is enough to consider the operation successful, or whether you need to dig further into the process's state and/or output.

    In your code, it appears you wait for the first line of stdout to be printed before considering the process started, without looking at what is actually printed.

    That behaves more like a sleep that gives the process time to initialize than like a real readiness check.

    Keep in mind that starting the binary is much faster than running its bootstrap sequence.

    About the code: your exit rules are unclear. What keeps main from exiting?

    With the current code, it will exit before C is executed once A and B have started (I have not analyzed the other cases).

    Your implementation of job concurrency in main is not standard. It is missing the loop that collects the results, quits, and calls close(chan).

    The chan signature is awkward. I would rather use a struct { Module string; Err error }.

    The runModule function is buggy. It might call close(o) while another goroutine is still trying to write to it. If Start fails, you do not send any error signal.

    A possible solution might look like this. Consider it opinionated: depending on the binary being run, other strategies can or should be implemented to detect errors over the standard file descriptors.

    package main
    
    import (
        "bufio"
        "fmt"
        "log"
        "os"
        "os/exec"
        "strings"
        "sync"
        "time"
    )
    
    type cmd struct {
        Module string
        Cmd    string
        Args   []string
        Err    error
    }
    
    func main() {
    
        torun := []cmd{
            cmd{
                Module: "A",
                Cmd:    "ping",
                Args:   []string{"8.8.8.8"},
            },
            cmd{
                Module: "B",
                Cmd:    "ping",
                // Args:   []string{"8.8.8.8.9"},
                Args: []string{"8.8.8.8"},
            },
        }
    
        var wg sync.WaitGroup // use a waitgroup to ensure all concurrent jobs are done
        wg.Add(len(torun))
    
        out := make(chan cmd) // a channel to output cmd status
    
        go func() {
            wg.Wait()  //wait for the group to finish
            close(out) //  then close the signal channel
        }()
    
        // start the commands
        for _, c := range torun {
            // go runCmd(c, out, &wg)
            go runCmdAndWaitForSomeOutput(c, out, &wg)
        }
    
        // loop over the chan to collect errors
        // it ends when wg.Wait returns and out is closed
        for c := range out {
            if c.Err != nil {
                log.Fatalf("%v %v has failed with %v", c.Cmd, c.Args, c.Err)
            }
        }
    
        // at this point all commands have started; you can go on and run the last command
        fmt.Println("all done")
        os.Exit(0)
    }
    
    func runCmd(o cmd, out chan cmd, wg *sync.WaitGroup) {
        defer wg.Done()
    
        cmd := exec.Command(o.Cmd, o.Args...)
    
        if err := cmd.Start(); err != nil {
            o.Err = err // save err
            out <- o    // signal completion error
            return      // return to unfreeze the waitgroup wg
        }
        go cmd.Wait() // don't block on command completion;
        // consider the job done once the program has started successfully.
    
        // out <- o // useless, as main only looks for errors
    }
    
    func runCmdAndWaitForSomeOutput(o cmd, out chan cmd, wg *sync.WaitGroup) {
        defer wg.Done()
    
        cmd := exec.Command(o.Cmd, o.Args...)
    
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            o.Err = err // save err
            out <- o    // signal completion
            return      // return to unfreeze the waitgroup wg
        }
        stderr, err := cmd.StderrPipe()
        if err != nil {
            o.Err = err
            out <- o
            return
        }
    
        if err := cmd.Start(); err != nil {
            o.Err = err
            out <- o
            return
        }
    
        go cmd.Wait() // don't block on command completion
    
        // build a concurrent fd's scanner
    
        // to signal errors detected on the fds; buffered (one slot per scanner)
        // so a late error send cannot block forever after a timeout
        outScan := make(chan error, 2)
    
        var wg2 sync.WaitGroup
        wg2.Add(2) // the number of fds being watched
    
        go func() {
            defer wg2.Done()
            sc := bufio.NewScanner(stdout)
            for sc.Scan() {
                line := sc.Text()
                if strings.Contains(line, "icmp_seq") { // the OK marker
                    return // quit asap to unfreeze wg2
                } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                    outScan <- fmt.Errorf("%v", line)
                    return // quit  to unfreeze wg2
                }
            }
        }()
    
        go func() {
            defer wg2.Done()
            sc := bufio.NewScanner(stderr)
            for sc.Scan() {
                line := sc.Text()
                if strings.Contains(line, "icmp_seq") { // the OK marker
                    return // quit asap to unfreeze wg2
                } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                    outScan <- fmt.Errorf("%v", line) // signal error
                    return                            // quit to unfreeze wg2
                }
            }
        }()
    
        go func() {
            wg2.Wait() // consider that if the program does not output anything,
            // or never prints ok/nok, this will block forever
            close(outScan) // close the chan so the next loop is finite
        }()
    
        // - simple loop without a timeout
        // for err := range outScan {
        //  if err != nil {
        //      o.Err = err // save the execution error
        //      out <- o // signal the cmd
        //      return // quit to unfreeze the wait group wg
        //  }
        // }
    
        // - more complex version with timeout
        timeout := time.After(time.Second * 3)
        for {
            select {
            case err, ok := <-outScan:
                if !ok { // if !ok, outScan is closed and we should quit the loop
                    return
                }
                if err != nil {
                    o.Err = err // save the execution error
                    out <- o    // signal the cmd
                    return      // quit to unfreeze the wait group wg
                }
            case <-timeout:
                // save the execution error (formatting the timeout channel itself would only print an address)
                o.Err = fmt.Errorf("%v timed out after 3s without an OK marker", o.Module)
                out <- o // signal the cmd
                return   // quit to unfreeze the wait group wg
            }
        }
    
        // exit and unfreeze the wait group wg
    }
    