doubian0284 2018-01-01 22:58

Throttling the number of concurrently executing processes via a buffered channel (Golang)

Intent:

I am looking for a means to run os-level shell commands in parallel, but want to be careful to not clobber CPU and am wondering if a buffered channel would fit this use case.

Implemented:

Create a series of Jobs with a simulated runtime duration. Send these jobs to a queue which will dispatch them to run over a buffered channel as throttled by EXEC_THROTTLE.

Observations:

This 'works' (to the extent that it compiles and runs), but I am wondering if the buffer is working as specified (see: 'Intent') to throttle the number of processes running in parallel.

Disclaimer:

Now, I am aware that newbies tend to over-use channels, but I feel this request for insight is honest, as I've at least exercised the restraint to use a sync.WaitGroup. Forgive the somewhat toy example, but all insight would be appreciated.

Playground

package main

import (
    // "os/exec"
    "log"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

const (
    EXEC_THROTTLE = 2
)

type JobsManifest []Job

type Job struct {
    cmd     string
    result  string
    runtime int // Simulate long-running task
}

func (j JobsManifest) queueJobs(logChan chan<- string, runChan chan Job, wg *sync.WaitGroup) {
    go dispatch(logChan, runChan)
    for _, job := range j {
        wg.Add(1)
        runChan <- job
    }
}

func dispatch(logChan chan<- string, runChan chan Job) {
    for j := range runChan {
        go run(j, logChan)
    }
}

func run(j Job, logChan chan<- string) {
    time.Sleep(time.Second * time.Duration(j.runtime))
    j.result = strconv.Itoa(rand.Intn(10)) // j.result = os.Exec("/bin/bash", "-c", j.cmd).Output()
    logChan <- j.result
    log.Printf("   ran: %s\n", j.cmd)
}

func logger(logChan <-chan string, wg *sync.WaitGroup) {
    for {
        res := <-logChan
        log.Printf("logged: %s\n", res)
        wg.Done()
    }
}

func main() {

    jobs := []Job{
        Job{
            cmd:     "ps -p $(pgrep vim) | tail -n 1 | awk '{print $3}'",
            runtime: 1,
        },
        Job{
            cmd:     "wc -l /var/log/foo.log | awk '{print $1}'",
            runtime: 2,
        },
        Job{
            cmd:     "ls -l ~/go/src/github.com/ | wc -l | awk '{print $1}'",
            runtime: 3,
        },
        Job{
            cmd:     "find /var/log/ -regextype posix-extended -regex '.*[0-9]{10}'",
            runtime: 4,
        },
    }

    var wg sync.WaitGroup
    logChan := make(chan string)
    runChan := make(chan Job, EXEC_THROTTLE)
    go logger(logChan, &wg)

    start := time.Now()
    JobsManifest(jobs).queueJobs(logChan, runChan, &wg)
    wg.Wait()
    log.Printf("finish: %s\n", time.Since(start))
}

  • dongwu9647 2018-01-02 00:42

    If I understand you correctly, you want a mechanism that ensures at most EXEC_THROTTLE jobs are running at any time. If that is your intention, the code does not work.

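    The reason is that dispatch receives a job from the channel and starts it in a new goroutine without waiting for it to finish. The receive frees a buffer slot immediately, so the next job can be queued and started while the earlier ones are still running. You can verify this by adding a counter of running jobs (you'll need an atomic add or a mutex).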

    You can get the intended behavior by starting a fixed group of goroutines that read from an unbuffered channel and block while executing each job:

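    // Run executes one job and blocks until it has finished.
    func Run(j Job) Result {
        // Run your job here
        return Result{}
    }

    // Dispatch is a single worker: it runs jobs one at a time
    // until ch is closed.
    func Dispatch(ch <-chan Job, wg *sync.WaitGroup) {
        defer wg.Done()
        for j := range ch {
            Run(j)
        }
    }

    func main() {
        var wg sync.WaitGroup
        ch := make(chan Job)
        // Start exactly EXEC_THROTTLE workers.
        for i := 0; i < EXEC_THROTTLE; i++ {
            wg.Add(1)
            go Dispatch(ch, &wg)
        }
        // Send the queued jobs on ch here, then close it
        // so the workers exit, and wait for them.
        close(ch)
        wg.Wait()
    }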
    

    It works because each goroutine runs only one job at a time, and a send on the unbuffered channel succeeds only when some goroutine is waiting to receive. A waiting goroutine is an idle one, so at that moment at most EXEC_THROTTLE-1 jobs are running and it is safe to start one more.
