duanchun2349 2017-09-12 19:32

Checking the status of an AWS Data Pipeline with the Go SDK

Situation: I have 2 data pipelines that run on-demand. Pipeline B cannot run until Pipeline A has completed. I'm trying to automate running both pipelines in a single script/program but I'm unsure how to do all of this in Go.

I have some Go code that activates a data pipeline:

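import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/datapipeline"
)

// awsActivatePipeline activates an on-demand pipeline and returns the API response.
func awsActivatePipeline(pipelineID, region string) (*datapipeline.ActivatePipelineOutput, error) {
    svc := datapipeline.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(region)})))
    input := &datapipeline.ActivatePipelineInput{
        PipelineId: aws.String(pipelineID),
    }
    result, err := svc.ActivatePipeline(input)
    if err != nil {
        // Return the error so the caller can stop before starting the next pipeline.
        return nil, fmt.Errorf("error activating pipeline: %v", err)
    }
    fmt.Println(result)
    return result, nil
}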

After activating it, I want to monitor that pipeline and detect when it has finished so that I can start the second pipeline. This is what the `list-runs` CLI command shows, but I'm not sure which Go SDK function corresponds to it.

$ aws datapipeline list-runs --region us-west-2 --pipeline-id df-EXAMPLE
       Name                                                Scheduled Start      Status                 
       ID                                                  Started              Ended              
---------------------------------------------------------------------------------------------------
   1.  EC2ResourceObj                                      2017-09-12T17:49:55  FINISHED               
       @EC2ResourceObj_2017-09-12T17:49:55                 2017-09-12T17:49:58  2017-09-12T17:56:52

   2.  Installation                                        2017-09-12T17:49:55  FINISHED               
       @Installation_@ShellCommandActivityObj_2017-09-12T  2017-09-12T17:49:57  2017-09-12T17:54:09

   3.  S3OutputLocation                                    2017-09-12T17:49:55  FINISHED               
       @S3OutputLocation_2017-09-12T17:49:55               2017-09-12T17:49:58  2017-09-12T17:54:50

   4.  ShellCommandActivityObj                             2017-09-12T17:49:55  FINISHED               
       @ShellCommandActivityObj_2017-09-12T17:49:55        2017-09-12T17:49:57  2017-09-12T17:54:49

So once all actions are marked 'FINISHED', I want to activate my second pipeline. What's the best way to accomplish this?
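The stopping condition described above ("all actions FINISHED") can be written as a small pure function, independent of how the statuses are fetched. A minimal sketch follows; `summarize` and `runState` are hypothetical names, and the failure statuses (FAILED, CASCADE_FAILED, CANCELED, TIMEDOUT) are an assumption based on the Data Pipeline status values, so check them against the documentation. Handling failure states separately lets a poller give up early instead of waiting for its timeout when pipeline A fails.

```go
package main

// statusFinished is the status shown by list-runs for a completed object.
const statusFinished = "FINISHED"

// failureStatuses is an assumed set of terminal failure states.
var failureStatuses = map[string]bool{
	"FAILED":         true,
	"CASCADE_FAILED": true,
	"CANCELED":       true,
	"TIMEDOUT":       true,
}

// runState summarizes the statuses of all objects in a run.
type runState int

const (
	stillRunning runState = iota // at least one object is neither finished nor failed
	allFinished                  // every object is FINISHED
	anyFailed                    // at least one object is in a failure state
)

// summarize decides whether the second pipeline may start. An empty list is
// treated as still running, because no status has been reported yet.
func summarize(statuses []string) runState {
	if len(statuses) == 0 {
		return stillRunning
	}
	finished := 0
	for _, s := range statuses {
		if failureStatuses[s] {
			return anyFailed
		}
		if s == statusFinished {
			finished++
		}
	}
	if finished == len(statuses) {
		return allFinished
	}
	return stillRunning
}
```

With the list-runs output shown above, all four objects report FINISHED, so `summarize` would return `allFinished`.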


1 answer

  • dongqiao1158 2018-12-11 17:40

    FYI, in case anyone else comes across this, this is how I resolved it:

    A Go function that calls the AWS Data Pipeline DescribeObjects API for the given objects (actions) of a pipeline and returns true only if all of them are FINISHED:

        import (
            "fmt"

            "github.com/aws/aws-sdk-go/aws"
            "github.com/aws/aws-sdk-go/aws/session"
            "github.com/aws/aws-sdk-go/service/datapipeline"
        )

        // awsDescribeObjects calls DescribeObjects for the given object IDs and
        // returns true only if every returned object reports the status FINISHED.
        func awsDescribeObjects(pipelineID, region string, objects []string) bool {
            svc := datapipeline.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(region)})))
            input := &datapipeline.DescribeObjectsInput{
                PipelineId: aws.String(pipelineID),
                ObjectIds:  aws.StringSlice(objects),
            }
            result, err := svc.DescribeObjects(input)
            if err != nil {
                fmt.Println("error describing pipeline objects: ", err)
                return false
            }
            // The SDK response is already typed, so the fields can be read directly.
            allFinished := len(result.PipelineObjects) > 0
            for _, obj := range result.PipelineObjects {
                status := "" // stays empty if the object has no "@status" field
                for _, field := range obj.Fields {
                    // Runtime status is reported in the "@status" field.
                    if aws.StringValue(field.Key) == "@status" {
                        status = aws.StringValue(field.StringValue)
                    }
                }
                fmt.Printf("%v STATUS: %v\n", aws.StringValue(obj.Name), status)
                // A single object that is not FINISHED makes the whole check fail.
                if status != "FINISHED" {
                    allFinished = false
                }
            }
            return allFinished
        }
    

    My main Go function:

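        import (
            "flag"
            "fmt"
            "os"
            "strings"
            "time"
        )

        var (
            region   = flag.String("region", "", "AWS region of the pipeline")
            pipeline = flag.String("pipeline-id", "", "ID of the data pipeline")
            action   = flag.String("action", "", "activate or describe")
            object   = flag.String("object", "", "comma-separated object IDs to wait for")
        )

        func main() {
            flag.Parse()
            switch *action {
            case "activate":
                if _, err := awsActivatePipeline(*pipeline, *region); err != nil {
                    fmt.Println(err)
                    os.Exit(1)
                }
            case "describe":
                obj := strings.Split(*object, ",")
                // Check up to 21 times, sleeping 5 minutes between checks.
                for i := 0; i <= 20; i++ {
                    f := awsDescribeObjects(*pipeline, *region, obj)
                    fmt.Printf("%v - Status Check %v - Finished?: %v\n", time.Now(), i, f)
                    if f {
                        fmt.Println("FINISHED describing pipeline complete")
                        break
                    }
                    if i == 20 {
                        fmt.Println("TIME OUT - describe pipeline timed out, max time reached")
                        os.Exit(1)
                    }
                    time.Sleep(5 * time.Minute)
                }
            }
        }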
    

    Shell script that runs the compiled Go executable for both pipelines:

    #!/bin/bash
    # Stop as soon as any step fails (e.g. the describe step times out),
    # so that pipeline 2 never starts after an unfinished pipeline 1.
    set -e
    #PIPELINE 1
    echo "Starting Pipeline 1..."
    ./runpipeline.linux -region "$REGION1" -pipeline-id "$PIPELINEID1" -action activate
    sleep 1m
    ./runpipeline.linux -region "$REGION1" -pipeline-id "$PIPELINEID1" -action describe -object ShellCommandActivityObj
    echo "Pipeline 1 complete"
    #PIPELINE 2
    echo "Starting Pipeline 2..."
    ./runpipeline.linux -region "$REGION2" -pipeline-id "$PIPELINEID2" -action activate
    sleep 1m
    ./runpipeline.linux -region "$REGION2" -pipeline-id "$PIPELINEID2" -action describe -object ShellCommandActivityObj,CliActivity
    echo "Pipeline 2 complete"
    echo "FINISHED"
    
    This answer was accepted by the asker as the best answer.
