doudeng3008 2019-09-22 08:44
浏览 73
已采纳

在启动第三个命令之前,请确保两个命令正在运行

I have three commands to run, but I'd like to make sure the two first are running before running the third one.

Currently, it does run A and B then C.

  1. I run A and B in goroutines
  2. I communicate their name through chan if there's no stderr
  3. the main functions pushes the names received through chan into a slice
  4. once the slice contains all names of module A and B it starts C

Some context

I'm in the process of learning goroutines and chan as a hobbyist. It's not clear to me how to output exec.Command("foo", "bar").Run() in a reliable way while it's running. It's not clear either how to handle errors received by each process through chan.

The reason why I need A and B to run before C is because A and B are graphql microservices, C needs them to run in order to get their schemas through HTTP and start doing some graphql federation (f.k.a. graphql stitching)

Inconsistencies

  • With my current approach, I will know if A and B are running only if they print something I guess.
  • I don't like that each subsequent stdout will hit an if statement, just to know if the process is running.
  • My error handling is not as clean as I'd like it to be.

Question

How could I have a more reliable way to ensure that A and B are running, event if they don't print anything and that they did not throw errors?

package main

import (
    "bufio"
    "fmt"
    "log"
    "os/exec"
    "reflect"
    "sort"
    "strings"
    "sync"
)

var wg sync.WaitGroup
var modulesToRun = []string{"micro-post", "micro-hello"}

func main() {
    // Send multiple values to chan
    // https://stackoverflow.com/a/50857250/9077800
    c := make(chan func() (string, error))

    go runModule([]string{"go", "run", "micro-post"}, c)  // PROCESS A
    go runModule([]string{"go", "run", "micro-hello"}, c) // PROCESS B

    modulesRunning := []string{}
    for {
        msg, err := (<-c)()
        if err != nil {
            log.Fatalln(err)
        }

        if strings.HasPrefix(msg, "micro-") && err == nil {
            modulesRunning = append(modulesRunning, msg)
            if CompareUnorderedSlices(modulesToRun, modulesRunning) {
                go runModule([]string{"go", "run", "micro-federation"}, c) // PROCESS C
            }
        }
    }

}

func runModule(commandArgs []string, o chan func() (string, error)) {
    cmd := exec.Command(commandArgs[0], commandArgs[1], commandArgs[2]+"/main.go")

    // Less verbose solution to stream output with io?
    // var stdBuffer bytes.Buffer
    // mw := io.MultiWriter(os.Stdout, &stdBuffer)
    // cmd.Stdout = mw
    // cmd.Stderr = mw

    c := make(chan struct{})
    wg.Add(1)

    // Stream command output
    // https://stackoverflow.com/a/38870609/9077800
    go func(cmd *exec.Cmd, c chan struct{}) {
        defer wg.Done()
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        stderr, err := cmd.StderrPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        <-c
        outScanner := bufio.NewScanner(stdout)
        for outScanner.Scan() {
            m := outScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return commandArgs[2], nil })
        }

        errScanner := bufio.NewScanner(stderr)
        for errScanner.Scan() {
            m := errScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return "bad", nil })
        }
    }(cmd, c)

    c <- struct{}{}
    cmd.Start()

    wg.Wait()
    close(o)
}

// CompareUnorderedSlices orders slices before comparing them
func CompareUnorderedSlices(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }

    sort.Strings(a)
    sort.Strings(b)

    return reflect.DeepEqual(a, b)
}
  • 写回答

1条回答 默认 最新

  • douxing2156 2019-09-22 09:18
    关注

    About process management

    Starting the process is the action of calling the binary path with its arguments. It will fail if the bin path is not found, or some malformed arguments syntax is provided.

    As a consequence you might start a process with success, but receive an exit error because somehow its execution fails.

    Those details are important to figure out if you need only to startup the process to consider the operation as successful or dig further its state and/or output.

    In your code it appears you wait for the first line of stderr to be printed to consider it as started, without any consideration to the content being printed.

    It resemble more to a kind of sleeping time to ensure the process has initialized.

    Consider that starting the binary happens much faster in comparison to the execution of its bootstrap sequence.

    About the code, your exit rules are unclear. What is keeping main from exiting ?

    In the current code it will exit before C is executed when A and B has started (not anylising other cases)

    Your implementation of job concurrency in main is not standard. It is missing the loop to collect results, quit and close(chan).

    The chan signature is awkward, i would rather use a struct {Module string, Err error}

    The runModule function is buggy. It might close(o) while another routine might attempt to write it. If starts fails, you are not returning any error signal.

    A somewhat solution might look like this, consider it as being opinniated and depending the binary run other strategies can/should be implemented to detect error over the standard FDs.

    package main
    
    import (
        "bufio"
        "fmt"
        "log"
        "os"
        "os/exec"
        "strings"
        "sync"
        "time"
    )
    
    type cmd struct {
        Module string
        Cmd    string
        Args   []string
        Err    error
    }
    
    func main() {
    
        torun := []cmd{
            cmd{
                Module: "A",
                Cmd:    "ping",
                Args:   []string{"8.8.8.8"},
            },
            cmd{
                Module: "B",
                Cmd:    "ping",
                // Args:   []string{"8.8.8.8.9"},
                Args: []string{"8.8.8.8"},
            },
        }
    
        var wg sync.WaitGroup // use a waitgroup to ensure all concurrent jobs are done
        wg.Add(len(torun))
    
        out := make(chan cmd) // a channel to output cmd status
    
        go func() {
            wg.Wait()  //wait for the group to finish
            close(out) //  then close the signal channel
        }()
    
        // start the commands
        for _, c := range torun {
            // go runCmd(c, out, &wg)
            go runCmdAndWaitForSomeOutput(c, out, &wg)
        }
    
        // loop over the chan to collect errors
        // it ends when wg.Wait unfreeze and closes out
        for c := range out {
            if c.Err != nil {
                log.Fatalf("%v %v has failed with %v", c.Cmd, c.Args, c.Err)
            }
        }
    
        // here all commands started you can proceed further to run the last command
        fmt.Println("all done")
        os.Exit(0)
    }
    
    func runCmd(o cmd, out chan cmd, wg *sync.WaitGroup) {
        defer wg.Done()
    
        cmd := exec.Command(o.Cmd, o.Args...)
    
        if err := cmd.Start(); err != nil {
            o.Err = err // save err
            out <- o    // signal completion error
            return      // return to unfreeze the waitgroup wg
        }
        go cmd.Wait() // dont wait for command completion,
        // consider its done once the program started with success.
    
        // out <- o // useless as main look ups only for error
    }
    
    func runCmdAndWaitForSomeOutput(o cmd, out chan cmd, wg *sync.WaitGroup) {
        defer wg.Done()
    
        cmd := exec.Command(o.Cmd, o.Args...)
    
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            o.Err = err // save err
            out <- o    // signal completion
            return      // return to unfreeze the waitgroup wg
        }
        stderr, err := cmd.StderrPipe()
        if err != nil {
            o.Err = err
            out <- o
            return
        }
    
        if err := cmd.Start(); err != nil {
            o.Err = err
            out <- o
            return
        }
    
        go cmd.Wait() // dont wait for command completion
    
        // build a concurrent fd's scanner
    
        outScan := make(chan error) // to signal errors detected on the fd
    
        var wg2 sync.WaitGroup
        wg2.Add(2) // the number of fds being watched
    
        go func() {
            defer wg2.Done()
            sc := bufio.NewScanner(stdout)
            for sc.Scan() {
                line := sc.Text()
                if strings.Contains(line, "icmp_seq") { // the OK marker
                    return // quit asap to unfreeze wg2
                } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                    outScan <- fmt.Errorf("%v", line)
                    return // quit  to unfreeze wg2
                }
            }
        }()
    
        go func() {
            defer wg2.Done()
            sc := bufio.NewScanner(stderr)
            for sc.Scan() {
                line := sc.Text()
                if strings.Contains(line, "icmp_seq") { // the OK marker
                    return // quit asap to unfreeze wg2
                } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                    outScan <- fmt.Errorf("%v", line) // signal error
                    return                            // quit to unfreeze wg2
                }
            }
        }()
    
        go func() {
            wg2.Wait() // consider that if the program does not output anything,
            // or never prints ok/nok, this will block forever
            close(outScan) // close the chan so the next loop is finite
        }()
    
        // - simple timeout less loop
        // for err := range outScan {
        //  if err != nil {
        //      o.Err = err // save the execution error
        //      out <- o // signal the cmd
        //      return // qui to unfreeze the wait group wg
        //  }
        // }
    
        // - more complex version with timeout
        timeout := time.After(time.Second * 3)
        for {
            select {
            case err, ok := <-outScan:
                if !ok { // if !ok, outScan is closed and we should quit the loop
                    return
                }
                if err != nil {
                    o.Err = err // save the execution error
                    out <- o    // signal the cmd
                    return      // quit to unfreeze the wait group wg
                }
            case <-timeout:
                o.Err = fmt.Errorf("timed out...%v", timeout) // save the execution error
                out <- o                                      // signal the cmd
                return                                        // quit to unfreeze the wait group wg
            }
        }
    
        // exit and unfreeze the wait group wg
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 关于#目标检测#的问题:大概就是类似后台自动检测某下架商品的库存,在他监测到该商品上架并且可以购买的瞬间点击立即购买下单
  • ¥15 神经网络怎么把隐含层变量融合到损失函数中?
  • ¥30 自适应 LMS 算法实现 FIR 最佳维纳滤波器matlab方案
  • ¥15 lingo18勾选global solver求解使用的算法
  • ¥15 全部备份安卓app数据包括密码,可以复制到另一手机上运行
  • ¥20 测距传感器数据手册i2c
  • ¥15 RPA正常跑,cmd输入cookies跑不出来
  • ¥15 求帮我调试一下freefem代码
  • ¥15 matlab代码解决,怎么运行
  • ¥15 R语言Rstudio突然无法启动