doumixiang2227 2018-11-23 12:25
浏览 25

去继承进行任务处理

I am writing a Go slave program that listens on an AQMP message queue for tasks. These tasks contain input data and output data, json encoded. Different tasks can have completely different input/output structures.

I am trying to find a way to write my registerTask function such that I can re-use that function for multiple tasks and encode JSON to the wanted Input structure. However, the JSON parser should throw an error if the input does not match the specific task input. I tried with accepting interface{}, but that allows anything and makes "foo" validate against any custom struct. Ideas?

This whole composition thing is new to me. In PHP, I would solve this with an abstract base class.

My code so far to register new Tasks:

func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
    log.Printf("Sending response for %s: %v", task.GetName(), output)

    jsonResponse, err := json.Marshal(output)
    if err != nil {
        log.Printf("Could not encode task %s to JSON: %v", task.GetName(), output)
        return err
    }

    msg := amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        Timestamp:    time.Now(),
        ContentType:  "text/plain",
        Body:         jsonResponse,
    }

    return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}

func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
    log.Println("Registering task: " + task.GetName())

    deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
    if err != nil {
        log.Fatalf("Could not consume: %v", err)
    }

    go func() {
        for taskMsg := range deliverChann {
            log.Printf("Task received: %v", taskMsg)

            // check if it's in UTF-8
            if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
                log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
                taskMsg.Reject(false)
            }

            // check if it's JSON
            if strings.ToLower(taskMsg.ContentType) != "application/json" {
                log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
                taskMsg.Reject(false)
            }

            log.Printf("Decoding %v", taskMsg.Body)
            if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
                log.Printf("Could not decode task %s body: %v", task.GetName(), err)
                taskMsg.Reject(false)
                continue
            }

            log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
            output, err := task.Process(input)
            if err != nil {
                log.Printf("Task '%s' error: %s", task.GetName(), err)
                taskMsg.Reject(true)
                continue
            }

            if err := sendResult(chann, task, output); err != nil {
                log.Printf("Could not send result for task: " + task.GetName())
                taskMsg.Reject(true)
                continue
            }

            if err := taskMsg.Ack(false); err != nil {
                log.Println("Could not acknowledge: " + err.Error())
            }
        }
    }()
}

And my task code:

type TaskInput struct{}
type TaskOutput struct{}

type TaskData interface {
    Input TaskInput `json:"input"`
    Output TaskOutput `json:"output"`
}

type TaskImpl interface {
    GetName() string
    Process(msg *TaskData) (*TaskData, error)
}


type SampleTaskInput struct {
    TaskInput
    Arg string
}
type SampleTaskOutput struct {
    TaskOutput
    Result string
}

type SampleTask struct {
    TaskImpl
}

func (SampleTask) GetName() string {
    return "sample"
}

func (SampleTask) Process(msg *TaskData) (*TaskData, error){

    log.Printf("Executing sampletask with arg: %v", msg.Input)

    msg.Output = SampleTaskOutput{}

    return SampleTaskOutput{
        Result: "String was " + data.Arg,
    }, nil
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥100 set_link_state
    • ¥15 虚幻5 UE美术毛发渲染
    • ¥15 CVRP 图论 物流运输优化
    • ¥15 Tableau online 嵌入ppt失败
    • ¥100 支付宝网页转账系统不识别账号
    • ¥15 基于单片机的靶位控制系统
    • ¥15 真我手机蓝牙传输进度消息被关闭了,怎么打开?(关键词-消息通知)
    • ¥15 装 pytorch 的时候出了好多问题,遇到这种情况怎么处理?
    • ¥20 IOS游览器某宝手机网页版自动立即购买JavaScript脚本
    • ¥15 手机接入宽带网线,如何释放宽带全部速度