dongmangji0950
dongmangji0950
2018-11-08 02:40

当使用k8s.io/client-go库更改kubernetes部署时,获得通知的最佳方法是什么?

  • kubernetes
已采纳

Context

I'm writing a script that uses the k8s.io/client-go library (godocs here) to manipulate Deployments. In particular, I want to add a label selector to every Deployment in my cluster. Deployment label selectors are immutable. So my approach is to:

  1. Create a copy of each Deployment with the only difference being the name is suffixed with "-temp". This is to minimize downtime of existing Deployments.
  2. Delete the original Deployments.
  3. Recreate the original Deployments with the only difference being an additional label selector.
  4. Delete the temporary Deployments.

I can't just use the client-go library to go through steps 1-4 sequentially because I only want to go on to the next step when the API server considers the previous step to be done. For example, I don't want to do step 3 until the API server says the original Deployments have been deleted. Otherwise, I'll get an error saying that a Deployment with the same name already exists.

Question

What's the best way to use the client-go library to detect when a Deployment is done being created and deleted and to attach callback functions? I came across the following packages.

But I'm not sure what the differences are between them and which one to use.

I read examples of watch here and informer here. Here are two related SO questions.

Update

It seems like watch provides a lower-level way to watch for changes to resources and receive events about changes. Seems like using the SharedInformerFactory to create a SharedInformer is the way to go.

So far I have

import (
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    typedv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
    "k8s.io/client-go/tools/cache"
    "path/filepath"
    "strings"

    // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    "k8s.io/client-go/tools/clientcmd"
    "log"
    "os"
)

func main() {

...

    factory := informers.NewSharedInformerFactory(kubeclient, 0)
    informer := factory.Apps().V1().Deployments().Informer()
    stopper := make(chan struct{})
    defer close(stopper)
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Created deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[tempLabelKey]; ok {
                fmt.Printf("Detected temporary deployment created in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
                fmt.Printf("Now deleting previous deployment in namespace %s, name %s.
", d.GetNamespace(), deploymentToDelete)
                deleteDeployment(deploymentToDelete, d.GetNamespace(), kubeclient)
            }
        },
        DeleteFunc: func(obj interface{}) {
            d := obj.(v1.Deployment)
            fmt.Printf("Deleted deployment in namespace %s, name %s.
", d.GetNamespace(), d.GetName())

            if _, ok := d.GetLabels()[stageLabelKey]; !ok {
                fmt.Printf("Detected deployment without stage label was deleted in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                fmt.Printf("Now creating normal deployment with stage label in namespace %s, name %s.
", d.GetNamespace(), d.GetName())
                deployment := createDeploymentWithNewLabel(stageLabelKey, "production", d)
                createDeploymentsOnApi(deployment, kubeclient)
            }
        },
    })
    informer.Run(stopper)
}
  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

1条回答

  • dtqu3278 dtqu3278 3年前

    I ended up using a SharedInformer.

    These resources were helpful.

    .

    package main
    
    import (
        "encoding/json"
        "errors"
        "flag"
        "fmt"
        "io/ioutil"
        "log"
        "os"
        "path/filepath"
        "sort"
        "strings"

        "k8s.io/api/apps/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        // We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
        _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )
    
    const (
        // manifestsDir is the base directory under which backup manifests
        // are written.
        manifestsDir = "manifests"

        // namespace restricts which Deployments are listed.
        // Use an empty string to run on all namespaces.
        namespace = ""

        // newLabelKey is the label (and selector) key added to the
        // recreated Deployments.
        newLabelKey = "new-label-to-add"

        // tempLabelKey marks the temporary copies of the Deployments.
        tempLabelKey = "temporary"

        // tempSuffix is appended to a Deployment name to form the name of
        // its temporary copy.
        tempSuffix = "-temp"

        // componentLabelKey selects the Deployments that are processed.
        componentLabelKey = "component"
    )
    
    func main() {
        var kubeconfig *string
        if home := homeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
        }
        flag.Parse()
    
        // use the current context in kubeconfig
        // TODO (dxia) How can I specify a masterUrl or even better a kubectl context?
        cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        exitOnErr(err)
    
        kubeclient, err := kubernetes.NewForConfig(cfg)
        exitOnErr(err)
    
        fmt.Printf("Getting deployments with '%s' label.
    ", componentLabelKey)
        deployments, err := kubeclient.AppsV1().Deployments(namespace).List(metav1.ListOptions{
            LabelSelector: componentLabelKey,
        })
        fmt.Printf("Got %d deployments.
    ", len(deployments.Items))
        exitOnErr(err)
    
        deployments = processDeployments(deployments)
        fmt.Println("Saving deployment manifests to disk as backup.")
        err = saveDeployments(deployments)
        exitOnErr(err)
    
        tempDeployments := appendToDeploymentName(deployments, tempSuffix)
        tempDeployments = createDeploymentsWithNewLabel(tempLabelKey, "true", tempDeployments)
    
        factory := informers.NewSharedInformerFactory(kubeclient, 0)
        informer := factory.Apps().V1().Deployments().Informer()
        stopper := make(chan struct{})
        defer close(stopper)
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[tempLabelKey]; ok {
                    labelsStr := joinLabelKeyVals(labels)
                    fmt.Printf("2: Temporary deployment created in namespace %s, name %s, labels '%s'.
    ", d.GetNamespace(), d.GetName(), labelsStr)
                    deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
    
                    deployment := getDeployment(d.GetNamespace(), deploymentToDelete, componentLabelKey, kubeclient)
    
                    if deployment != nil {
                        fmt.Printf("3: Now deleting previous deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                        if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    } else {
                        fmt.Printf("4: Didn't find deployment in namespace %s, name %s, label %s. Skipping.
    ", d.GetNamespace(), deploymentToDelete, componentLabelKey)
                    }
                } else if labelVal, ok := labels[newLabelKey]; ok && labelVal == "production" {
                    fmt.Printf("Normal deployment with '%s' label created in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                    deploymentToDelete := d.GetName() + tempSuffix
                    fmt.Printf("6: Now deleting temporary deployment in namespace %s, name %s.
    ", d.GetNamespace(), deploymentToDelete)
                    if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
                        exitOnErr(err)
                    }
                }
            },
            DeleteFunc: func(obj interface{}) {
                d := obj.(*v1.Deployment)
                labels := d.GetLabels()
    
                if _, ok := labels[newLabelKey]; !ok {
                    if _, ok := labels[tempLabelKey]; !ok {
                        fmt.Printf("Deployment without '%s' or '%s' label deleted in namespace %s, name %s.
    ", newLabelKey, tempLabelKey, d.GetNamespace(), d.GetName())
                        fmt.Printf("5: Now creating normal deployment with '%s' label in namespace %s, name %s.
    ", newLabelKey, d.GetNamespace(), d.GetName())
                        deploymentToCreate := createDeploymentWithNewLabel(newLabelKey, "production", *d)
                        if err := createDeploymentOnApi(deploymentToCreate, kubeclient); err != nil {
                            exitOnErr(err)
                        }
                    }
                }
            },
        })
    
        fmt.Println("1: Creating temporary Deployments.")
        err = createDeploymentsOnApi(tempDeployments, kubeclient)
        exitOnErr(err)
    
        informer.Run(stopper)
    }
    
    // getDeployment looks up the Deployment called name in namespace and
    // returns it only if it carries the label labelKey. It returns nil when
    // the lookup fails for any reason or when the label is absent.
    func getDeployment(namespace string, name string, labelKey string, client *kubernetes.Clientset) *v1.Deployment {
        found, err := client.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
        if err == nil {
            if _, hasLabel := found.GetLabels()[labelKey]; hasLabel {
                return found
            }
        }

        return nil
    }
    
    // createDeploymentWithNewLabel returns a deep copy of deployment with
    // key=val set in three places: the Deployment's own labels, the pod
    // template's labels, and the selector's MatchLabels. The input is not
    // modified. Missing label maps, and a missing selector, are created.
    func createDeploymentWithNewLabel(key string, val string, deployment v1.Deployment) v1.Deployment {
        newDeployment := deployment.DeepCopy()

        labels := newDeployment.GetLabels()
        if labels == nil {
            labels = make(map[string]string)
            newDeployment.SetLabels(labels)
        }
        labels[key] = val

        podTemplateSpecLabels := newDeployment.Spec.Template.GetLabels()
        if podTemplateSpecLabels == nil {
            podTemplateSpecLabels = make(map[string]string)
            newDeployment.Spec.Template.SetLabels(podTemplateSpecLabels)
        }
        podTemplateSpecLabels[key] = val

        // Selector is a pointer; guard against nil before touching
        // MatchLabels so a Deployment without a selector does not panic.
        if newDeployment.Spec.Selector == nil {
            newDeployment.Spec.Selector = &metav1.LabelSelector{}
        }
        if newDeployment.Spec.Selector.MatchLabels == nil {
            newDeployment.Spec.Selector.MatchLabels = make(map[string]string)
        }
        newDeployment.Spec.Selector.MatchLabels[key] = val

        return *newDeployment
    }
    
    // createDeploymentsWithNewLabel builds a new DeploymentList in which every
    // item of deployments has been passed through createDeploymentWithNewLabel
    // with the given key and val. The input list is left untouched.
    func createDeploymentsWithNewLabel(key string, val string, deployments *v1.DeploymentList) *v1.DeploymentList {
        relabeled := &v1.DeploymentList{}
        for i := range deployments.Items {
            relabeled.Items = append(relabeled.Items, createDeploymentWithNewLabel(key, val, deployments.Items[i]))
        }

        return relabeled
    }
    
    // setAPIVersionAndKindForDeployment returns a copy of d with APIVersion and
    // Kind set to the given values.
    //
    // d is received by value, so assigning the fields without returning the
    // copy has no effect on the caller's Deployment; callers must use the
    // returned value.
    func setAPIVersionAndKindForDeployment(d v1.Deployment, apiVersion string, kind string) v1.Deployment {
        // These fields are empty strings.
        // Looks like an open issue: https://github.com/kubernetes/kubernetes/issues/3030.
        d.APIVersion = apiVersion
        d.Kind = kind
        return d
    }

    // processDeployments returns a new list holding a cleaned-up copy of every
    // Deployment in deployments: APIVersion and Kind are filled in, Status, UID,
    // SelfLink and Generation are reset, and the creation timestamp is set to
    // the current time.
    func processDeployments(deployments *v1.DeploymentList) *v1.DeploymentList {
        newDeployments := &v1.DeploymentList{}
        for _, d := range deployments.Items {
            // Set APIVersion and Kind until https://github.com/kubernetes/kubernetes/issues/3030 is fixed
            d = setAPIVersionAndKindForDeployment(d, "apps/v1", "Deployment")
            d.Status = v1.DeploymentStatus{}
            d.SetUID(types.UID(""))
            d.SetSelfLink("")
            d.SetGeneration(0)
            d.SetCreationTimestamp(metav1.Now())
            newDeployments.Items = append(newDeployments.Items, d)
        }
        return newDeployments
    }
    
    // saveDeployments writes every Deployment in the list to disk through
    // saveManifest, stopping at and returning the first error.
    func saveDeployments(deployments *v1.DeploymentList) error {
        for i := range deployments.Items {
            err := saveManifest(deployments.Items[i])
            if err != nil {
                return err
            }
        }

        return nil
    }
    
    func saveManifest(resource interface{}) error {
        var path = manifestsDir
        var name string
        var err error
    
        switch v := resource.(type) {
        case v1.Deployment:
            path = fmt.Sprintf("%s%s/%s/%s", path, v.GetClusterName(), v.GetNamespace(), "deployments")
            name = v.GetName()
        default:
            return errors.New(fmt.Sprintf("Got an unknown resource kind: %v", resource))
        }
    
        bytes, err := json.MarshalIndent(resource, "", "  ")
        if err != nil {
            return err
        }
    
        err = os.MkdirAll(path, 0755)
        if err != nil {
            return err
        }
    
        err = ioutil.WriteFile(fmt.Sprintf("%s/%s", path, name), bytes, 0644)
        if err != nil {
            return err
        }
    
        return nil
    }
    
    // deleteDeployment asks the API server to delete the Deployment called
    // name in namespace, using default delete options, and returns the
    // resulting error, if any.
    func deleteDeployment(namespace string, name string, client *kubernetes.Clientset) error {
        return client.AppsV1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{})
    }
    
    // appendToDeploymentName returns a new list containing a copy of every
    // Deployment in deployments with suffix appended to its name.
    func appendToDeploymentName(deployments *v1.DeploymentList, suffix string) *v1.DeploymentList {
        renamed := &v1.DeploymentList{}
        for _, item := range deployments.Items {
            // item is a per-iteration copy, so the rename does not touch the
            // input list's element.
            item.SetName(item.GetName() + suffix)
            renamed.Items = append(renamed.Items, item)
        }
        return renamed
    }
    
    // createDeploymentOnApi creates d in its own namespace through the API
    // server. The resource version is cleared on the local copy before the
    // create call. The created object is discarded; only the error is returned.
    func createDeploymentOnApi(d v1.Deployment, client *kubernetes.Clientset) error {
        d.SetResourceVersion("")

        _, err := client.AppsV1().Deployments(d.GetNamespace()).Create(&d)
        return err
    }
    
    // createDeploymentsOnApi creates each Deployment of the list in order via
    // createDeploymentOnApi and returns the first error it encounters.
    func createDeploymentsOnApi(deployments *v1.DeploymentList, client *kubernetes.Clientset) error {
        for i := range deployments.Items {
            err := createDeploymentOnApi(deployments.Items[i], client)
            if err != nil {
                return err
            }
        }
        return nil
    }
    
    // joinLabelKeyVals renders labels as "key=value" pairs joined by ", ".
    // The pairs are ordered by key so the output is the same on every call;
    // ranging over the map directly would produce a random order. A nil or
    // empty map yields an empty string.
    func joinLabelKeyVals(labels map[string]string) string {
        keys := make([]string, 0, len(labels))
        for k := range labels {
            keys = append(keys, k)
        }
        sort.Strings(keys)

        labelKeyVals := make([]string, 0, len(keys))
        for _, k := range keys {
            labelKeyVals = append(labelKeyVals, fmt.Sprintf("%v=%v", k, labels[k]))
        }
        return strings.Join(labelKeyVals, ", ")
    }
    
    // homeDir returns the value of the HOME environment variable, falling back
    // to USERPROFILE when HOME is unset or empty. The result is empty if
    // neither is set.
    func homeDir() string {
        home := os.Getenv("HOME")
        if home == "" {
            home = os.Getenv("USERPROFILE") // windows
        }
        return home
    }
    
    // exitOnErr does nothing for a nil error; otherwise it hands the error to
    // log.Fatal, which logs it and terminates the process.
    func exitOnErr(err error) {
        if err == nil {
            return
        }
        log.Fatal(err)
    }
    
    点赞 评论 复制链接分享