dongmangji0950 2018-11-08 02:40
浏览 791
已采纳

当使用k8s.io/client-go库更改kubernetes部署时,获得通知的最佳方法是什么?

Context

I'm writing a script that uses the k8s.io/client-go library (godocs here) to manipulate Deployments. In particular, I want to add a label selector to every Deployment in my cluster. Deployment label selectors are immutable. So my approach is to:

  1. Create a copy of each Deployment with the only difference being the name is suffixed with "-temp". This is to minimize downtime of existing Deployments.
  2. Delete the original Deployments.
  3. Recreate the original Deployments with the only difference being an additional label selector.
  4. Delete the temporary Deployments.

I can't just use the client-go library to go through steps 1-4 sequentially because I only want to go onto the next step when the API server considers the previous step to be done. For example, I don't want to do step 3 until the API server says the original Deployments have been deleted. Otherwise, I'll get the error that the Deployment with the same name already exists.

Question

What's the best way to use the client-go library to detect when a Deployment is done being created and deleted and to attach callback functions? I came across the following packages.

But I'm not sure what the differences are between them and which one to use.

I read examples of watch here and informer here. Here are two related SO questions.

Update

It seems like watch provides a lower-level way to watch for changes to resources and receive events about changes. Seems like using the SharedInformerFactory to create a SharedInformer is the way to go.

So far I have

import (
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    typedv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
    "k8s.io/client-go/tools/cache"
    "path/filepath"
    "strings"

    // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    "k8s.io/client-go/tools/clientcmd"
    "log"
    "os"
)

func main() {

...

    factory := informers.NewSharedInformerFactory(kubeclient, 0)
    informer := factory.Apps().V1().Deployments().Informer()
    stopper := make(chan struct{})
    defer close(stopper)
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Created deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[tempLabelKey]; ok {
                fmt.Printf("Detected temporary deployment created in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
                fmt.Printf("Now deleting previous deployment in namespace %s, name %s.
", d.GetNamespace(), deploymentToDelete)
                deleteDeployment(deploymentToDelete, d.GetNamespace(), kubeclient)
            }
        },
        DeleteFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Deleted deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[stageLabelKey]; !ok {
                fmt.Printf("Detected deployment without stage label was deleted in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                fmt.Printf("Now creating normal deployment with stage label in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deployment := createDeploymentWithNewLabel(stageLabelKey, "production", d)
                createDeploymentsOnApi(deployment, kubeclient)
            }
        },
    })
    informer.Run(stopper)
}
  • 写回答

1条回答 默认 最新

  • 普通网友 2018-11-10 13:48
    关注

    I ended up using a SharedInformer.

    These resources were helpful.

    .

    package main
    
    import (
        "encoding/json"
        "errors"
        "flag"
        "fmt"
        "io/ioutil"
        "log"
        "os"
        "path/filepath"
        "sort"
        "strings"

        "k8s.io/api/apps/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
        _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )
    
    const (
        // manifestsDir is the directory under which backup manifests are written.
        manifestsDir = "manifests"

        // Use an empty string to run on all namespaces
        namespace = ""

        // Label keys and the name suffix used while migrating Deployments.
        newLabelKey       = "new-label-to-add"
        tempLabelKey      = "temporary"
        tempSuffix        = "-temp"
        componentLabelKey = "component"
    )
    
    func main() {
        var kubeconfig *string
        if home := homeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
        }
        flag.Parse()
    
        // use the current context in kubeconfig
        // TODO (dxia) How can I specify a masterUrl or even better a kubectl context?
        cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        exitOnErr(err)
    
        kubeclient, err := kubernetes.NewForConfig(cfg)
        exitOnErr(err)
    
        fmt.Printf("Getting deployments with '%s' label.
    ", componentLabelKey)
        deployments, err := kubeclient.AppsV1().Deployments(namespace).List(metav1.ListOptions{
            LabelSelector: componentLabelKey,
        })
        fmt.Printf("Got %d deployments.
    ", len(deployments.Items))
        exitOnErr(err)
    
        deployments = processDeployments(deployments)
        fmt.Println("Saving deployment manifests to disk as backup.")
        err = saveDeployments(deployments)
        exitOnErr(err)
    
        tempDeployments := appendToDeploymentName(deployments, tempSuffix)
        tempDeployments = createDeploymentsWithNewLabel(tempLabelKey, "true", tempDeployments)
    
        factory := informers.NewSharedInformerFactory(kubeclient, 0)
        informer := factory.Apps().V1().Deployments().Informer()
        stopper := make(chan struct{})
        defer close(stopper)
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[tempLabelKey]; ok {
                    labelsStr := joinLabelKeyVals(labels)
                    fmt.Printf("2: Temporary deployment created in namespace %s, name %s, labels '%s'.
    ", d.GetNamespace(), d.GetName(), labelsStr)
                    deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
    
                    deployment := getDeployment(d.GetNamespace(), deploymentToDelete, componentLabelKey, kubeclient)
    
                    if deployment != nil {
                        fmt.Printf("3: Now deleting previous deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                        if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    } else {
                        fmt.Printf("4: Didn't find deployment in namespace %s, name %s, label %s. Skipping.
    ", d.GetNamespace(), deploymentToDelete, componentLabelKey)
                    }
                } else if labelVal, ok := labels[newLabelKey]; ok && labelVal == "production" {
                    fmt.Printf("Normal deployment with '%s' label created in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                    deploymentToDelete := d.GetName() + tempSuffix
                    fmt.Printf("6: Now deleting temporary deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                    if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                        exitOnErr(err)
                    }
                }
            },
            DeleteFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[newLabelKey]; !ok {
                    if _, ok := labels[tempLabelKey]; !ok {
                        fmt.Printf("Deployment without '%s' or '%s' label deleted in namespace %s, name %s.
    ", newLabelKey, tempLabelKey, d.GetNamespace(), d.GetName())
                        fmt.Printf("5: Now creating normal deployment with '%s' label in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                        deploymentToCreate := createDeploymentWithNewLabel(newLabelKey, "production", *d)
                        if err := createDeploymentOnApi(deploymentToCreate, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    }
                }
            },
        })
    
        fmt.Println("1: Creating temporary Deployments.")
        err = createDeploymentsOnApi(tempDeployments, kubeclient)
        exitOnErr(err)
    
        informer.Run(stopper)
    }
    
    // getDeployment looks up the named Deployment in namespace and returns it
    // only when it carries labelKey. It returns nil when the lookup fails for
    // any reason or the label is absent.
    func getDeployment(namespace string, name string, labelKey string, client *kubernetes.Clientset) *v1.Deployment {
        found, err := client.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
        if err == nil {
            if _, hasLabel := found.GetLabels()[labelKey]; hasLabel {
                return found
            }
        }

        return nil
    }
    
    // createDeploymentWithNewLabel returns a deep copy of deployment with key=val
    // added in three places: the Deployment's own labels, the pod template's
    // labels, and the selector's MatchLabels. The input is not modified.
    func createDeploymentWithNewLabel(key string, val string, deployment v1.Deployment) v1.Deployment {
        newDeployment := deployment.DeepCopy()

        labels := newDeployment.GetLabels()
        if labels == nil {
            labels = make(map[string]string)
            newDeployment.SetLabels(labels)
        }
        labels[key] = val

        podTemplateSpecLabels := newDeployment.Spec.Template.GetLabels()
        if podTemplateSpecLabels == nil {
            podTemplateSpecLabels = make(map[string]string)
            newDeployment.Spec.Template.SetLabels(podTemplateSpecLabels)
        }
        podTemplateSpecLabels[key] = val

        // Selector is a pointer; guard against nil before touching MatchLabels
        // so a Deployment built without a selector does not cause a panic.
        if newDeployment.Spec.Selector == nil {
            newDeployment.Spec.Selector = &metav1.LabelSelector{}
        }
        if newDeployment.Spec.Selector.MatchLabels == nil {
            newDeployment.Spec.Selector.MatchLabels = make(map[string]string)
        }
        newDeployment.Spec.Selector.MatchLabels[key] = val

        return *newDeployment
    }
    
    // createDeploymentsWithNewLabel builds a new list in which every item of
    // deployments has been passed through createDeploymentWithNewLabel with the
    // given key and val.
    func createDeploymentsWithNewLabel(key string, val string, deployments *v1.DeploymentList) *v1.DeploymentList {
        relabeled := &v1.DeploymentList{}
        for i := range deployments.Items {
            relabeled.Items = append(relabeled.Items, createDeploymentWithNewLabel(key, val, deployments.Items[i]))
        }

        return relabeled
    }
    
    // setAPIVersionAndKindForDeployment returns a copy of d with APIVersion and
    // Kind set. d is received by value, so the caller must use the returned
    // Deployment; assigning to the parameter alone has no effect on the caller's
    // copy.
    func setAPIVersionAndKindForDeployment(d v1.Deployment, apiVersion string, kind string) v1.Deployment {
        // These fields are empty strings.
        // Looks like an open issue: https://github.com/kubernetes/kubernetes/issues/3030.
        d.APIVersion = apiVersion
        d.Kind = kind
        return d
    }

    // processDeployments returns a new list whose items are copies of the input
    // items with APIVersion/Kind filled in, Status, UID, SelfLink and Generation
    // cleared, and the creation timestamp set to now.
    func processDeployments(deployments *v1.DeploymentList) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            // Set APIVersion and Kind until https://github.com/kubernetes/kubernetes/issues/3030 is fixed
            d = setAPIVersionAndKindForDeployment(d, "apps/v1", "Deployment")
            d.Status = v1.DeploymentStatus{}
            d.SetUID(types.UID(""))
            d.SetSelfLink("")
            d.SetGeneration(0)
            d.SetCreationTimestamp(metav1.Now())
            newDeployments.Items = append(newDeployments.Items, d)
        }
        return newDeployments
    }
    
    // saveDeployments writes each Deployment in the list to disk via
    // saveManifest, stopping at and returning the first error.
    func saveDeployments(deployments *v1.DeploymentList) error {
        for i := range deployments.Items {
            err := saveManifest(deployments.Items[i])
            if err != nil {
                return err
            }
        }

        return nil
    }
    
    func saveManifest(resource interface{}) error {
        var path = manifestsDir
        var name string
        var err error
    
        switch v := resource.(type) {
        case v1.Deployment:
            path = fmt.Sprintf("%s%s/%s/%s", path, v.GetClusterName(), v.GetNamespace(), "deployments")
            name = v.GetName()
        default:
            return errors.New(fmt.Sprintf("Got an unknown resource kind: %v", resource))
        }
    
        bytes, err := json.MarshalIndent(resource, "", "  ")
        if err != nil {
            return err
        }
    
        err = os.MkdirAll(path, 0755)
        if err != nil {
            return err
        }
    
        err = ioutil.WriteFile(fmt.Sprintf("%s/%s", path, name), bytes, 0644)
        if err != nil {
            return err
        }
    
        return nil
    }
    
    // deleteDeployment asks the API server to delete the named Deployment in
    // namespace using default delete options, and returns the API error, if any.
    func deleteDeployment(namespace string, name string, client *kubernetes.Clientset) error {
        return client.AppsV1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{})
    }
    
    // appendToDeploymentName returns a new list containing a copy of every
    // input item with suffix appended to its name.
    func appendToDeploymentName(deployments *v1.DeploymentList, suffix string) *v1.DeploymentList {
        renamed := &v1.DeploymentList{}
        for i := range deployments.Items {
            // Rename a copy of the element, not the list entry itself.
            item := deployments.Items[i]
            item.SetName(item.GetName() + suffix)
            renamed.Items = append(renamed.Items, item)
        }
        return renamed
    }
    
    // createDeploymentOnApi clears the resource version on its copy of d and
    // submits it to the API server in d's namespace. The created object is
    // discarded; only the error is returned.
    func createDeploymentOnApi(d v1.Deployment, client *kubernetes.Clientset) error {
        d.SetResourceVersion("")

        _, err := client.AppsV1().Deployments(d.GetNamespace()).Create(&d)
        return err
    }
    
    // createDeploymentsOnApi creates every Deployment in the list, in order,
    // returning the first error encountered (later items are not attempted).
    func createDeploymentsOnApi(deployments *v1.DeploymentList, client *kubernetes.Clientset) error {
        for i := range deployments.Items {
            err := createDeploymentOnApi(deployments.Items[i], client)
            if err != nil {
                return err
            }
        }
        return nil
    }
    
    // joinLabelKeyVals renders labels as "k1=v1, k2=v2, ..." with the pairs
    // ordered by key. Map iteration order is random, so the keys are sorted
    // first to make the output stable between calls. A nil or empty map yields
    // the empty string.
    func joinLabelKeyVals(labels map[string]string) string {
        keys := make([]string, 0, len(labels))
        for k := range labels {
            keys = append(keys, k)
        }
        sort.Strings(keys)

        pairs := make([]string, 0, len(keys))
        for _, k := range keys {
            pairs = append(pairs, k+"="+labels[k])
        }
        return strings.Join(pairs, ", ")
    }
    
    // homeDir returns the value of $HOME, falling back to %USERPROFILE% when
    // HOME is unset or empty. The result is empty if neither is set.
    func homeDir() string {
        home := os.Getenv("HOME")
        if home == "" {
            home = os.Getenv("USERPROFILE") // windows
        }
        return home
    }
    
    // exitOnErr is a no-op for a nil error; otherwise it logs the error with
    // log.Fatal, which terminates the process.
    func exitOnErr(err error) {
        if err == nil {
            return
        }
        log.Fatal(err)
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 素材场景中光线烘焙后灯光失效
  • ¥15 请教一下各位,为什么我这个没有实现模拟点击
  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器