dongqu2863 2019-01-06 22:18
浏览 2

调度员不解雇

I am trying to implement a dispatcher/worker method based on http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

The concept I am trying to build is that I retrieve a message from a queue (AWS SQS) and then download a file (AWS S3) and perform processing on it.

However the dispatcher/worker is never triggered, as you can see from the console log, "submitted" is the last message posted:

2019/01/06 22:16:35 main start
2019/01/06 22:16:35 Worker queue dispatcher started...
2019/01/06 22:16:36 SQS
2019/01/06 22:16:36 {
  Messages: [{
<REDACTED>
    }]
}
2019/01/06 22:16:36 submitting
2019/01/06 22:16:36 {<REDACTED>}
2019/01/06 22:16:36 submitted

My main() looks like this:

package main
import "flag"
import "log"
import "foobar/lib"
func main() {
    var config = flag.String("config", "/etc/foo/foobar/gsg.conf", "Config file location")
    flag.Parse()
    var conf,err = foobar.LoadConfiguration(*config)
    if err != nil {
        log.Fatal(err)
    }
    foobar.NewHandler(conf)
    var sts,stsErr = foobar.GetSQSMessage(conf)
    if stsErr != nil {
        log.Fatal(stsErr)
    }
    log.Print("SQS")
    log.Print(sts)
    foobar.SubmitPayload(sts)
}

I have a handler.go as follows:

package foobar
import (
    "log"
)
import "github.com/aws/aws-sdk-go/service/sqs"
func NewHandler(appConfig *FoobarConfigStruct) error {
    JobQueue = make(chan Job, appConfig.MaxQueues)
    log.Println("main start")
    var dispatcher = NewDispatcher(appConfig.MaxWorkers)
    dispatcher.Run()
    return nil
}

func SubmitPayload(sqsOutput *sqs.ReceiveMessageOutput) error {
    var x = &sqs.Message{}
    var y = Payload{*x}
    var work = Job{Payload: y}
    JobQueue <- work
    for _,payload := range sqsOutput.Messages {
        log.Println("submitting")
        log.Println(*payload.Body)
        var item = Payload{*payload}
        var work = Job{Payload:item}
        JobQueue <- work
        log.Println("submitted")
    }
    return nil
}

A dispatcher.go as follows:

package foobar
import (
        "log"
)

type Dispatcher struct {
        maxWorkers int
        WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
        for i := 0; i < d.maxWorkers; i++ {
                worker := NewWorker(d.WorkerPool)
                worker.Start()
        }

        go d.dispatch()
}

func (d *Dispatcher) dispatch() {
        log.Printf("Worker queue dispatcher started...")
        for {

                select {
                case job := <-JobQueue:
                        log.Printf("Dispatcher request received")
                        go func(job Job) {
                                jobChannel := <-d.WorkerPool
                                jobChannel <- job
                        }(job)
                }
        }
}

And a worker.go as follows :

package foobar                                                                                                                    

import (                                                                                                                           
    "log"                                                                                                                          
)                                                                                                                                  

import "github.com/aws/aws-sdk-go/service/sqs"                                                                                     

type Payload struct {                                                                                                              
    sqs sqs.Message                                                                                                                
}                                                                                                                                  

type Job struct {                                                                                                                  
    Payload Payload                                                                                                                
}                                                                                                                                  

var JobQueue chan Job                                                                                                              

type Worker struct {                                                                                                               
    WorkerPool chan chan Job                                                                                                       
    JobChannel chan Job                                                                                                            
    quit       chan bool                                                                                                           
}                                                                                                                                  

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}
func (w Worker) Start() {
    go func() {
        for {
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                log.Printf("starting worker...");
                if err := job.Payload.ProcessPayload(); err != nil {
                    log.Printf("Error processing: %s", err.Error())
                }

            case <-w.quit:
                return
            }
        }
    }()
}

func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

func (p *Payload) ProcessPayload() error {
    log.Printf("Processing payload")
    log.Printf(*p.sqs.Body)
    return nil
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 用hfss做微带贴片阵列天线的时候分析设置有问题
    • ¥50 我撰写的python爬虫爬不了 要爬的网址有反爬机制
    • ¥15 Centos / PETSc / PETGEM
    • ¥15 centos7.9 IPv6端口telnet和端口监控问题
    • ¥120 计算机网络的新校区组网设计
    • ¥20 完全没有学习过GAN,看了CSDN的一篇文章,里面有代码但是完全不知道如何操作
    • ¥15 使用ue5插件narrative时如何切换关卡也保存叙事任务记录
    • ¥20 海浪数据 南海地区海况数据,波浪数据
    • ¥20 软件测试决策法疑问求解答
    • ¥15 win11 23H2删除推荐的项目,支持注册表等