dongmangji0950 2018-11-08 02:40
浏览 791
已采纳

当使用k8s.io/client-go库更改kubernetes部署时,获得通知的最佳方法是什么?

Context

I'm writing a script that uses the k8s.io/client-go library (godocs here) to manipulate Deployments. In particular, I want to add a label selector to every Deployment in my cluster. Deployment label selectors are immutable. So my approach is to:

  1. Create a copy of each Deployment with the only difference being the name is suffixed with "-temp". This is to minimize downtime of existing Deployments.
  2. Delete the original Deployments.
  3. Recreate the original Deployments with the only difference being an additional label selector.
  4. Delete the temporary Deployments.

I can't just use the client-go library to go through steps 1-4 sequentially because I only want to go onto the next step when the API server considers the previous step to be done. For example, I don't want to do step 3 until the API server says the original Deployments have been deleted. Otherwise, I'll get the error that the Deployment with the same name already exists.

Question

What's the best way to use the client-go library to detect when a Deployment is done being created and deleted and to attach callback functions? I came across the following packages.

But I'm not sure what the differences are between them and which one to use.

I read examples of watch here and informer here. Here's two related SO questions.

Update

It seems like watch provides a lower-level way to watch for changes to resources and receive events about changes. Seems like using the SharedInformerFactory to create a SharedInformer is the way to go.

So far I have

import (
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    typedv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
    "k8s.io/client-go/tools/cache"
    "path/filepath"
    "strings"

    // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    "k8s.io/client-go/tools/clientcmd"
    "log"
    "os"
)

func main() {

...

    factory := informers.NewSharedInformerFactory(kubeclient, 0)
    informer := factory.Apps().V1().Deployments().Informer()
    stopper := make(chan struct{})
    defer close(stopper)
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Created deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[tempLabelKey]; ok {
                fmt.Printf("Detected temporary deployment created in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
                fmt.Printf("Now deleting previous deployment in namespace %s, name %s.
", d.GetNamespace(), deploymentToDelete)
                deleteDeployment(deploymentToDelete, d.GetNamespace(), kubeclient)
            }
        },
        DeleteFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Deleted deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[stageLabelKey]; !ok {
                fmt.Printf("Detected deployment without stage label was deleted in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                fmt.Printf("Now creating normal deployment with stage label in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deployment := createDeploymentWithNewLabel(stageLabelKey, "production", d)
                createDeploymentsOnApi(deployment, kubeclient)
            }
        },
    })
    informer.Run(stopper)
}
  • 写回答

1条回答 默认 最新

  • 普通网友 2018-11-10 13:48
    关注

    I ended up using a SharedInformer.

    These resources were helpful.

    .

    package main
    
    import (
        "encoding/json"
        "errors"
        "flag"
        "fmt"
        "io/ioutil"
        "k8s.io/api/apps/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "path/filepath"
        "strings"
        // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
        _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
        "k8s.io/client-go/tools/clientcmd"
        "log"
        "os"
    )
    
    const manifestsDir = "manifests"
    
    // Use an empty string to run on all namespaces
    const namespace = ""
    const newLabelKey = "new-label-to-add"
    const tempLabelKey = "temporary"
    const tempSuffix = "-temp"
    const componentLabelKey = "component"
    
    func main() {
        var kubeconfig *string
        if home := homeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
        }
        flag.Parse()
    
        // use the current context in kubeconfig
        // TODO (dxia) How can I specify a masterUrl or even better a kubectl context?
        cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        exitOnErr(err)
    
        kubeclient, err := kubernetes.NewForConfig(cfg)
        exitOnErr(err)
    
        fmt.Printf("Getting deployments with '%s' label.
    ", componentLabelKey)
        deployments, err := kubeclient.AppsV1().Deployments(namespace).List(metav1.ListOptions{
            LabelSelector: componentLabelKey,
        })
        fmt.Printf("Got %d deployments.
    ", len(deployments.Items))
        exitOnErr(err)
    
        deployments = processDeployments(deployments)
        fmt.Println("Saving deployment manifests to disk as backup.")
        err = saveDeployments(deployments)
        exitOnErr(err)
    
        tempDeployments := appendToDeploymentName(deployments, tempSuffix)
        tempDeployments = createDeploymentsWithNewLabel(tempLabelKey, "true", tempDeployments)
    
        factory := informers.NewSharedInformerFactory(kubeclient, 0)
        informer := factory.Apps().V1().Deployments().Informer()
        stopper := make(chan struct{})
        defer close(stopper)
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[tempLabelKey]; ok {
                    labelsStr := joinLabelKeyVals(labels)
                    fmt.Printf("2: Temporary deployment created in namespace %s, name %s, labels '%s'.
    ", d.GetNamespace(), d.GetName(), labelsStr)
                    deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
    
                    deployment := getDeployment(d.GetNamespace(), deploymentToDelete, componentLabelKey, kubeclient)
    
                    if deployment != nil {
                        fmt.Printf("3: Now deleting previous deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                        if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    } else {
                        fmt.Printf("4: Didn't find deployment in namespace %s, name %s, label %s. Skipping.
    ", d.GetNamespace(), deploymentToDelete, componentLabelKey)
                    }
                } else if labelVal, ok := labels[newLabelKey]; ok && labelVal == "production" {
                    fmt.Printf("Normal deployment with '%s' label created in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                    deploymentToDelete := d.GetName() + tempSuffix
                    fmt.Printf("6: Now deleting temporary deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                    if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                        exitOnErr(err)
                    }
                }
            },
            DeleteFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[newLabelKey]; !ok {
                    if _, ok := labels[tempLabelKey]; !ok {
                        fmt.Printf("Deployment without '%s' or '%s' label deleted in namespace %s, name %s.
    ", newLabelKey, tempLabelKey, d.GetNamespace(), d.GetName())
                        fmt.Printf("5: Now creating normal deployment with '%s' label in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                        deploymentToCreate := createDeploymentWithNewLabel(newLabelKey, "production", *d)
                        if err := createDeploymentOnApi(deploymentToCreate, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    }
                }
            },
        })
    
        fmt.Println("1: Creating temporary Deployments.")
        err = createDeploymentsOnApi(tempDeployments, kubeclient)
        exitOnErr(err)
    
        informer.Run(stopper)
    }
    
    func getDeployment(namespace string, name string, labelKey string, client *kubernetes.Clientset) *v1.Deployment {
        d, err := client.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
        if err != nil {
            return nil
        }
    
        if _, ok := d.GetLabels()[labelKey]; !ok {
            return nil
        }
    
        return d
    }
    
    func createDeploymentWithNewLabel(key string, val string, deployment v1.Deployment) v1.Deployment {
        newDeployment := deployment.DeepCopy()
        labels := newDeployment.GetLabels()
        if labels == nil {
            labels = make(map[string]string)
            newDeployment.SetLabels(labels)
        }
        labels[key] = val
    
        podTemplateSpecLabels := newDeployment.Spec.Template.GetLabels()
        if podTemplateSpecLabels == nil {
            podTemplateSpecLabels = make(map[string]string)
            newDeployment.Spec.Template.SetLabels(podTemplateSpecLabels)
        }
        podTemplateSpecLabels[key] = val
    
        labelSelectors := newDeployment.Spec.Selector.MatchLabels
        if labelSelectors == nil {
            labelSelectors = make(map[string]string)
            newDeployment.Spec.Selector.MatchLabels = labelSelectors
        }
        labelSelectors[key] = val
    
        return *newDeployment
    }
    
    func createDeploymentsWithNewLabel(key string, val string, deployments *v1.DeploymentList) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            newDeployment := createDeploymentWithNewLabel(key, val, d)
            newDeployments.Items = append(newDeployments.Items, newDeployment)
        }
    
        return newDeployments
    }
    
    func setAPIVersionAndKindForDeployment(d v1.Deployment, apiVersion string, kind string) {
        // These fields are empty strings.
        // Looks like an open issue: https://github.com/kubernetes/kubernetes/issues/3030.
        d.APIVersion = apiVersion
        d.Kind = kind
    }
    
    func processDeployments(deployments *v1.DeploymentList) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            // Set APIVersion and Kind until https://github.com/kubernetes/kubernetes/issues/3030 is fixed
            setAPIVersionAndKindForDeployment(d, "apps/v1", "Deployment")
            d.Status = v1.DeploymentStatus{}
            d.SetUID(types.UID(""))
            d.SetSelfLink("")
            d.SetGeneration(0)
            d.SetCreationTimestamp(metav1.Now())
            newDeployments.Items = append(newDeployments.Items, d)
        }
        return newDeployments
    }
    
    func saveDeployments(deployments *v1.DeploymentList) error {
        for _, d := range deployments.Items {
            if err := saveManifest(d); err != nil {
                return err
            }
        }
    
        return nil
    }
    
    func saveManifest(resource interface{}) error {
        var path = manifestsDir
        var name string
        var err error
    
        switch v := resource.(type) {
        case v1.Deployment:
            path = fmt.Sprintf("%s%s/%s/%s", path, v.GetClusterName(), v.GetNamespace(), "deployments")
            name = v.GetName()
        default:
            return errors.New(fmt.Sprintf("Got an unknown resource kind: %v", resource))
        }
    
        bytes, err := json.MarshalIndent(resource, "", "  ")
        if err != nil {
            return err
        }
    
        err = os.MkdirAll(path, 0755)
        if err != nil {
            return err
        }
    
        err = ioutil.WriteFile(fmt.Sprintf("%s/%s", path, name), bytes, 0644)
        if err != nil {
            return err
        }
    
        return nil
    }
    
    func deleteDeployment(namespace string, name string, client *kubernetes.Clientset) error {
        if err := client.AppsV1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{}); err != nil {
            return err
        }
    
        return nil
    }
    
    func appendToDeploymentName(deployments *v1.DeploymentList, suffix string) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            d.SetName(fmt.Sprintf("%s%s", d.GetName(), suffix))
            newDeployments.Items = append(newDeployments.Items, d)
        }
        return newDeployments
    }
    
    func createDeploymentOnApi(d v1.Deployment, client *kubernetes.Clientset) error {
        d.SetResourceVersion("")
    
        if _, err := client.AppsV1().Deployments(d.GetNamespace()).Create(&d); err != nil {
            return err
        }
    
        return nil
    }
    
    func createDeploymentsOnApi(deployments *v1.DeploymentList, client *kubernetes.Clientset) error {
        for _, d := range deployments.Items {
            if err := createDeploymentOnApi(d, client); err != nil {
                return err
            }
        }
        return nil
    }
    
    func joinLabelKeyVals(labels map[string]string) string {
        labelKeyVals := make([]string, 0, len(labels))
        for k, v := range labels {
            labelKeyVals = append(labelKeyVals, fmt.Sprintf("%v=%v", k, v))
        }
        return strings.Join(labelKeyVals, ", ")
    }
    
    func homeDir() string {
        if h := os.Getenv("HOME"); h != "" {
            return h
        }
        return os.Getenv("USERPROFILE") // windows
    }
    
    func exitOnErr(err error) {
        if err != nil {
            log.Fatal(err)
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥30 backtrader对于期货交易的现金和资产计算的问题
  • ¥15 求C# .net4.8小报表工具
  • ¥15 安装虚拟机时出现问题
  • ¥15 Selenium+docker Chrome不能运行
  • ¥15 mac电脑,安装charles后无法正常抓包
  • ¥18 visio打开文件一直显示文件未找到
  • ¥15 请教一下,openwrt如何让同一usb储存设备拔插后设备符号不变?
  • ¥50 使用quartz框架进行分布式任务定时调度,启动了两个实例,但是只有一个实例参与调度,另外一个实例没有参与调度,不知道是为什么?请各位帮助看一下原因!!
  • ¥50 怎么获取Ace Editor中的python代码后怎么调用Skulpt执行代码
  • ¥30 fpga基于dds生成幅值相位频率和波形可调的容易信号发生器。