I am writing a Go slave program that listens on an AMQP message queue for tasks. These tasks contain input data and output data, JSON-encoded. Different tasks can have completely different input/output structures.
I am trying to find a way to write my registerTask function so that I can reuse it for multiple tasks and decode the JSON into the input structure each task expects. However, the JSON parser should return an error if the input does not match the specific task's input structure. I tried accepting interface{}, but that allows anything, so "foo" validates against any custom struct. Any ideas?
This whole composition thing is new to me. In PHP, I would solve this with an abstract base class.
My code so far to register new Tasks:
// sendResult JSON-encodes output and publishes it as a persistent message,
// using QUEUE_RESULT as the first Publish argument and the task name as the
// routing key.
//
// The message is labelled "application/json" / "utf-8" because the body is
// produced by json.Marshal; this matches the headers registerTask requires
// on incoming task messages.
//
// It returns the marshalling error or the error reported by Publish.
func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
	log.Printf("Sending response for %s: %v", task.GetName(), output)

	jsonResponse, err := json.Marshal(output)
	if err != nil {
		// Log the actual encoding error rather than the value that failed.
		log.Printf("Could not encode task %s to JSON: %v", task.GetName(), err)
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    amqp.Persistent,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Body:            jsonResponse,
	}
	return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}
func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
log.Println("Registering task: " + task.GetName())
deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
if err != nil {
log.Fatalf("Could not consume: %v", err)
}
go func() {
for taskMsg := range deliverChann {
log.Printf("Task received: %v", taskMsg)
// check if it's in UTF-8
if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
taskMsg.Reject(false)
}
// check if it's JSON
if strings.ToLower(taskMsg.ContentType) != "application/json" {
log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
taskMsg.Reject(false)
}
log.Printf("Decoding %v", taskMsg.Body)
if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
log.Printf("Could not decode task %s body: %v", task.GetName(), err)
taskMsg.Reject(false)
continue
}
log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
output, err := task.Process(input)
if err != nil {
log.Printf("Task '%s' error: %s", task.GetName(), err)
taskMsg.Reject(true)
continue
}
if err := sendResult(chann, task, output); err != nil {
log.Printf("Could not send result for task: " + task.GetName())
taskMsg.Reject(true)
continue
}
if err := taskMsg.Ack(false); err != nil {
log.Println("Could not acknowledge: " + err.Error())
}
}
}()
}
And my task code:
// Base types shared by all tasks. Both are empty structs; task-specific
// input and output structs embed them.
type (
	// TaskInput is the base input type embedded by concrete task inputs.
	TaskInput struct{}

	// TaskOutput is the base output type embedded by concrete task outputs.
	TaskOutput struct{}
)
type TaskData interface {
Input TaskInput `json:"input"`
Output TaskOutput `json:"output"`
}
// TaskImpl is the contract every task implementation fulfils.
type TaskImpl interface {
	// Process executes the task for the given data and returns the
	// resulting data, or an error if the task failed.
	Process(msg *TaskData) (*TaskData, error)

	// GetName returns the name that identifies the task.
	GetName() string
}
// SampleTaskInput is the input structure of the sample task. It embeds the
// base TaskInput and adds a single string argument.
type SampleTaskInput struct {
	TaskInput // embedded base input type

	Arg string // string argument of the sample task
}
// SampleTaskOutput is the output structure of the sample task. It embeds the
// base TaskOutput and adds a single string result.
type SampleTaskOutput struct {
	TaskOutput // embedded base output type

	Result string // string result of the sample task
}
// SampleTask is a stateless example task.
//
// It deliberately does not embed the TaskImpl interface: Go has no abstract
// base classes, and an embedded interface would be a nil field whose
// promoted methods make the struct satisfy TaskImpl even when a method is
// missing, turning a compile-time error into a runtime nil dereference.
// SampleTask satisfies TaskImpl simply by defining GetName and Process.
type SampleTask struct{}
// GetName reports the fixed name of the sample task.
func (SampleTask) GetName() string {
	const name = "sample"
	return name
}
// Process logs the received input, resets msg.Output and returns msg.
//
// The previous body referenced an undefined variable, assigned a
// SampleTaskOutput to the TaskOutput field and returned a SampleTaskOutput
// value where *TaskData is declared, so it could not compile.
//
// NOTE(review): TaskInput and TaskOutput are declared as empty structs, so
// SampleTaskInput.Arg and SampleTaskOutput.Result cannot be carried through
// *TaskData as the types stand; producing "String was <arg>" requires
// TaskData to hold the task-specific types — confirm the intended design.
func (SampleTask) Process(msg *TaskData) (*TaskData, error) {
	log.Printf("Executing sampletask with arg: %v", msg.Input)
	msg.Output = TaskOutput{}
	return msg, nil
}