Intent:
I am looking for a way to run OS-level shell commands in parallel, but I want to be careful not to clobber the CPU, and I am wondering whether a buffered channel would fit this use case.
Implemented:
Create a series of `Job`s with a simulated runtime duration. Send these jobs to a queue, which will `dispatch` them to `run` over a buffered channel as throttled by `EXEC_THROTTLE`.
Observations:
This 'works' (to the extent that it compiles and runs), but I am wondering if the buffer is working as specified (see: 'Intent') to throttle the number of processes running in parallel.
Disclaimer:
Now, I am aware that newbies tend to overuse channels, but I feel this request for insight is honest, as I've at least exercised the restraint to use a `sync.WaitGroup`. Forgive the somewhat toy example, but all insight would be appreciated.
package main
import (
// "os/exec"
"log"
"math/rand"
"strconv"
"sync"
"time"
)
// EXEC_THROTTLE is the capacity main gives to the buffered job channel.
const EXEC_THROTTLE = 2
type (
	// JobsManifest is an ordered list of jobs to be queued for execution.
	JobsManifest []Job

	// Job is one unit of work together with its (simulated) outcome.
	Job struct {
		cmd     string // shell command line the job stands for
		result  string // output recorded for the job once it has run
		runtime int    // Simulate long-running task: how many seconds run sleeps
	}
)
// queueJobs starts a dispatcher goroutine reading from runChan and then feeds
// it every job in the manifest, in order. wg is incremented once per job
// immediately before that job is sent. The call blocks whenever runChan is
// full and returns as soon as the last job has been handed over, not when the
// jobs have finished.
func (j JobsManifest) queueJobs(logChan chan<- string, runChan chan Job, wg *sync.WaitGroup) {
	go dispatch(logChan, runChan)

	for i := 0; i < len(j); i++ {
		wg.Add(1)
		runChan <- j[i]
	}
}
// dispatch receives jobs from runChan and starts run for each one in its own
// goroutine, allowing at most cap(runChan) jobs (minimum 1) to execute at the
// same time. It returns once runChan is closed and drained; jobs that were
// already started keep running.
//
// A buffered channel on its own only bounds how many jobs may sit in the
// queue: spawning an unconditional goroutine per received job drains the
// buffer immediately and runs every job at once. The semaphore below is what
// actually enforces the limit.
func dispatch(logChan chan<- string, runChan chan Job) {
	limit := cap(runChan)
	if limit < 1 {
		limit = 1 // an unbuffered runChan still gets one execution slot
	}
	slots := make(chan struct{}, limit)

	for j := range runChan {
		slots <- struct{}{} // blocks while limit jobs are in flight
		go func(j Job) {
			defer func() { <-slots }()
			run(j, logChan)
		}(j)
	}
}
func run(j Job, logChan chan<- string) {
time.Sleep(time.Second * time.Duration(j.runtime))
j.result = strconv.Itoa(rand.Intn(10)) // j.result = os.Exec("/bin/bash", "-c", j.cmd).Output()
logChan <- j.result
log.Printf(" ran: %s
", j.cmd)
}
// logger prints every result received on logChan and calls wg.Done once per
// result, so each result must be matched by an earlier wg.Add(1). It returns
// when logChan is closed and drained.
func logger(logChan <-chan string, wg *sync.WaitGroup) {
	// Ranging stops cleanly on close; a bare receive in an endless loop would
	// keep yielding "" from a closed channel and push wg below zero.
	for res := range logChan {
		log.Printf("logged: %s\n", res)
		wg.Done()
	}
}
func main() {
jobs := []Job{
Job{
cmd: "ps -p $(pgrep vim) | tail -n 1 | awk '{print $3}'",
runtime: 1,
},
Job{
cmd: "wc -l /var/log/foo.log | awk '{print $1}'",
runtime: 2,
},
Job{
cmd: "ls -l ~/go/src/github.com/ | wc -l | awk '{print $1}'",
runtime: 3,
},
Job{
cmd: "find /var/log/ -regextype posix-extended -regex '.*[0-9]{10}'",
runtime: 4,
},
}
var wg sync.WaitGroup
logChan := make(chan string)
runChan := make(chan Job, EXEC_THROTTLE)
go logger(logChan, &wg)
start := time.Now()
JobsManifest(jobs).queueJobs(logChan, runChan, &wg)
wg.Wait()
log.Printf("finish: %s
", time.Since(start))
}