dongmangji0950 2018-11-08 02:40
浏览 791
已采纳

当使用k8s.io/client-go库更改kubernetes部署时,获得通知的最佳方法是什么?

Context

I'm writing a script that uses the k8s.io/client-go library (godocs here) to manipulate Deployments. In particular, I want to add a label selector to every Deployment in my cluster. Deployment label selectors are immutable. So my approach is to:

  1. Create a copy of each Deployment with the only difference being the name is suffixed with "-temp". This is to minimize downtime of existing Deployments.
  2. Delete the original Deployments.
  3. Recreate the original Deployments with the only difference being an additional label selector.
  4. Delete the temporary Deployments.

I can't just use the client-go library to go through steps 1-4 sequentially because I only want to go onto the next step when the API server considers the previous step to be done. For example, I don't want to do step 3 until the API server says the original Deployments have been deleted. Otherwise, I'll get the error that the Deployment with the same name already exists.

Question

What's the best way to use the client-go library to detect when a Deployment is done being created and deleted and to attach callback functions? I came across the following packages.

But I'm not sure what the differences are between them and which one to use.

I read examples of watch here and informer here. Here's two related SO questions.

Update

It seems like watch provides a lower-level way to watch for changes to resources and receive events about changes. Seems like using the SharedInformerFactory to create a SharedInformer is the way to go.

So far I have

import (
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    typedv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
    "k8s.io/client-go/tools/cache"
    "path/filepath"
    "strings"

    // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    "k8s.io/client-go/tools/clientcmd"
    "log"
    "os"
)

func main() {

...

    factory := informers.NewSharedInformerFactory(kubeclient, 0)
    informer := factory.Apps().V1().Deployments().Informer()
    stopper := make(chan struct{})
    defer close(stopper)
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Created deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[tempLabelKey]; ok {
                fmt.Printf("Detected temporary deployment created in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
                fmt.Printf("Now deleting previous deployment in namespace %s, name %s.
", d.GetNamespace(), deploymentToDelete)
                deleteDeployment(deploymentToDelete, d.GetNamespace(), kubeclient)
            }
        },
        DeleteFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Deleted deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[stageLabelKey]; !ok {
                fmt.Printf("Detected deployment without stage label was deleted in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                fmt.Printf("Now creating normal deployment with stage label in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deployment := createDeploymentWithNewLabel(stageLabelKey, "production", d)
                createDeploymentsOnApi(deployment, kubeclient)
            }
        },
    })
    informer.Run(stopper)
}
  • 写回答

1条回答 默认 最新

  • 普通网友 2018-11-10 13:48
    关注

    I ended up using a SharedInformer.

    These resources were helpful.

    .

    package main
    
    import (
        "encoding/json"
        "errors"
        "flag"
        "fmt"
        "io/ioutil"
        "k8s.io/api/apps/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "path/filepath"
        "strings"
        // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
        _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
        "k8s.io/client-go/tools/clientcmd"
        "log"
        "os"
    )
    
    const manifestsDir = "manifests"
    
    // Use an empty string to run on all namespaces
    const namespace = ""
    const newLabelKey = "new-label-to-add"
    const tempLabelKey = "temporary"
    const tempSuffix = "-temp"
    const componentLabelKey = "component"
    
    func main() {
        var kubeconfig *string
        if home := homeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
        }
        flag.Parse()
    
        // use the current context in kubeconfig
        // TODO (dxia) How can I specify a masterUrl or even better a kubectl context?
        cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        exitOnErr(err)
    
        kubeclient, err := kubernetes.NewForConfig(cfg)
        exitOnErr(err)
    
        fmt.Printf("Getting deployments with '%s' label.
    ", componentLabelKey)
        deployments, err := kubeclient.AppsV1().Deployments(namespace).List(metav1.ListOptions{
            LabelSelector: componentLabelKey,
        })
        fmt.Printf("Got %d deployments.
    ", len(deployments.Items))
        exitOnErr(err)
    
        deployments = processDeployments(deployments)
        fmt.Println("Saving deployment manifests to disk as backup.")
        err = saveDeployments(deployments)
        exitOnErr(err)
    
        tempDeployments := appendToDeploymentName(deployments, tempSuffix)
        tempDeployments = createDeploymentsWithNewLabel(tempLabelKey, "true", tempDeployments)
    
        factory := informers.NewSharedInformerFactory(kubeclient, 0)
        informer := factory.Apps().V1().Deployments().Informer()
        stopper := make(chan struct{})
        defer close(stopper)
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[tempLabelKey]; ok {
                    labelsStr := joinLabelKeyVals(labels)
                    fmt.Printf("2: Temporary deployment created in namespace %s, name %s, labels '%s'.
    ", d.GetNamespace(), d.GetName(), labelsStr)
                    deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
    
                    deployment := getDeployment(d.GetNamespace(), deploymentToDelete, componentLabelKey, kubeclient)
    
                    if deployment != nil {
                        fmt.Printf("3: Now deleting previous deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                        if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    } else {
                        fmt.Printf("4: Didn't find deployment in namespace %s, name %s, label %s. Skipping.
    ", d.GetNamespace(), deploymentToDelete, componentLabelKey)
                    }
                } else if labelVal, ok := labels[newLabelKey]; ok && labelVal == "production" {
                    fmt.Printf("Normal deployment with '%s' label created in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                    deploymentToDelete := d.GetName() + tempSuffix
                    fmt.Printf("6: Now deleting temporary deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                    if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                        exitOnErr(err)
                    }
                }
            },
            DeleteFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[newLabelKey]; !ok {
                    if _, ok := labels[tempLabelKey]; !ok {
                        fmt.Printf("Deployment without '%s' or '%s' label deleted in namespace %s, name %s.
    ", newLabelKey, tempLabelKey, d.GetNamespace(), d.GetName())
                        fmt.Printf("5: Now creating normal deployment with '%s' label in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                        deploymentToCreate := createDeploymentWithNewLabel(newLabelKey, "production", *d)
                        if err := createDeploymentOnApi(deploymentToCreate, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    }
                }
            },
        })
    
        fmt.Println("1: Creating temporary Deployments.")
        err = createDeploymentsOnApi(tempDeployments, kubeclient)
        exitOnErr(err)
    
        informer.Run(stopper)
    }
    
    func getDeployment(namespace string, name string, labelKey string, client *kubernetes.Clientset) *v1.Deployment {
        d, err := client.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
        if err != nil {
            return nil
        }
    
        if _, ok := d.GetLabels()[labelKey]; !ok {
            return nil
        }
    
        return d
    }
    
    func createDeploymentWithNewLabel(key string, val string, deployment v1.Deployment) v1.Deployment {
        newDeployment := deployment.DeepCopy()
        labels := newDeployment.GetLabels()
        if labels == nil {
            labels = make(map[string]string)
            newDeployment.SetLabels(labels)
        }
        labels[key] = val
    
        podTemplateSpecLabels := newDeployment.Spec.Template.GetLabels()
        if podTemplateSpecLabels == nil {
            podTemplateSpecLabels = make(map[string]string)
            newDeployment.Spec.Template.SetLabels(podTemplateSpecLabels)
        }
        podTemplateSpecLabels[key] = val
    
        labelSelectors := newDeployment.Spec.Selector.MatchLabels
        if labelSelectors == nil {
            labelSelectors = make(map[string]string)
            newDeployment.Spec.Selector.MatchLabels = labelSelectors
        }
        labelSelectors[key] = val
    
        return *newDeployment
    }
    
    func createDeploymentsWithNewLabel(key string, val string, deployments *v1.DeploymentList) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            newDeployment := createDeploymentWithNewLabel(key, val, d)
            newDeployments.Items = append(newDeployments.Items, newDeployment)
        }
    
        return newDeployments
    }
    
    func setAPIVersionAndKindForDeployment(d v1.Deployment, apiVersion string, kind string) {
        // These fields are empty strings.
        // Looks like an open issue: https://github.com/kubernetes/kubernetes/issues/3030.
        d.APIVersion = apiVersion
        d.Kind = kind
    }
    
    func processDeployments(deployments *v1.DeploymentList) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            // Set APIVersion and Kind until https://github.com/kubernetes/kubernetes/issues/3030 is fixed
            setAPIVersionAndKindForDeployment(d, "apps/v1", "Deployment")
            d.Status = v1.DeploymentStatus{}
            d.SetUID(types.UID(""))
            d.SetSelfLink("")
            d.SetGeneration(0)
            d.SetCreationTimestamp(metav1.Now())
            newDeployments.Items = append(newDeployments.Items, d)
        }
        return newDeployments
    }
    
    func saveDeployments(deployments *v1.DeploymentList) error {
        for _, d := range deployments.Items {
            if err := saveManifest(d); err != nil {
                return err
            }
        }
    
        return nil
    }
    
    func saveManifest(resource interface{}) error {
        var path = manifestsDir
        var name string
        var err error
    
        switch v := resource.(type) {
        case v1.Deployment:
            path = fmt.Sprintf("%s%s/%s/%s", path, v.GetClusterName(), v.GetNamespace(), "deployments")
            name = v.GetName()
        default:
            return errors.New(fmt.Sprintf("Got an unknown resource kind: %v", resource))
        }
    
        bytes, err := json.MarshalIndent(resource, "", "  ")
        if err != nil {
            return err
        }
    
        err = os.MkdirAll(path, 0755)
        if err != nil {
            return err
        }
    
        err = ioutil.WriteFile(fmt.Sprintf("%s/%s", path, name), bytes, 0644)
        if err != nil {
            return err
        }
    
        return nil
    }
    
    func deleteDeployment(namespace string, name string, client *kubernetes.Clientset) error {
        if err := client.AppsV1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{}); err != nil {
            return err
        }
    
        return nil
    }
    
    func appendToDeploymentName(deployments *v1.DeploymentList, suffix string) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            d.SetName(fmt.Sprintf("%s%s", d.GetName(), suffix))
            newDeployments.Items = append(newDeployments.Items, d)
        }
        return newDeployments
    }
    
    func createDeploymentOnApi(d v1.Deployment, client *kubernetes.Clientset) error {
        d.SetResourceVersion("")
    
        if _, err := client.AppsV1().Deployments(d.GetNamespace()).Create(&d); err != nil {
            return err
        }
    
        return nil
    }
    
    func createDeploymentsOnApi(deployments *v1.DeploymentList, client *kubernetes.Clientset) error {
        for _, d := range deployments.Items {
            if err := createDeploymentOnApi(d, client); err != nil {
                return err
            }
        }
        return nil
    }
    
    func joinLabelKeyVals(labels map[string]string) string {
        labelKeyVals := make([]string, 0, len(labels))
        for k, v := range labels {
            labelKeyVals = append(labelKeyVals, fmt.Sprintf("%v=%v", k, v))
        }
        return strings.Join(labelKeyVals, ", ")
    }
    
    func homeDir() string {
        if h := os.Getenv("HOME"); h != "" {
            return h
        }
        return os.Getenv("USERPROFILE") // windows
    }
    
    func exitOnErr(err error) {
        if err != nil {
            log.Fatal(err)
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 装 pytorch 的时候出了好多问题,遇到这种情况怎么处理?
  • ¥20 IOS游览器某宝手机网页版自动立即购买JavaScript脚本
  • ¥15 手机接入宽带网线,如何释放宽带全部速度
  • ¥30 关于#r语言#的问题:如何对R语言中mfgarch包中构建的garch-midas模型进行样本内长期波动率预测和样本外长期波动率预测
  • ¥15 ETLCloud 处理json多层级问题
  • ¥15 matlab中使用gurobi时报错
  • ¥15 这个主板怎么能扩出一两个sata口
  • ¥15 不是,这到底错哪儿了😭
  • ¥15 2020长安杯与连接网探
  • ¥15 关于#matlab#的问题:在模糊控制器中选出线路信息,在simulink中根据线路信息生成速度时间目标曲线(初速度为20m/s,15秒后减为0的速度时间图像)我想问线路信息是什么