doushan1157 2019-02-19 07:29
浏览 280

无法从监视流解码事件:流错误:流ID 3; 内部错误

I want to listen for add events that are deployments within my cluster,I use the client-go Watch Api,It works well at first, but after a short period of time, it will report an error.

I have tried two API ways to listen

the one way:

func startWatchDeployment(clientSet *kubernetes.Clientset){
    defer func() {
        err := recover()
        if err != nil {
            fmt.Println(err)
        }
    }()

    Log.Info("正在监听deployment...")
    count := 0
    deploymentsClient := clientSet.AppsV1beta1().Deployments(metav1.NamespaceAll)
    list,_ := deploymentsClient.List(metav1.ListOptions{})
    items := list.Items
    w, _ := deploymentsClient.Watch(metav1.ListOptions{})
    for {
        select {
            case e, _ := <-w.ResultChan():
                Log.Infof("162: %s",e)
                if e.Type == watch.Added || e.Type == watch.Deleted{
                    if count != len(items){
                        count += 1
                    }else{
                        // go的reflect获取运行时的struct
                        nname := e.Object.(*v1beta1.Deployment).Namespace
                        if r, _ := regexp.Compile("^(p|u|user)-");nname != "default" && nname != "cattle-system" &&
                            nname != "kube-system" && nname != "dsky-system" &&
                            nname != "kube-public" && nname != "local" && nname != "tools" && !r.MatchString(nname) {
                            data := make(map[string]interface{},1)
                            data["type"] = e.Type
                            data["name"] = e.Object.(*v1beta1.Deployment).Name
                            data["namespace"] = e.Object.(*v1beta1.Deployment).Namespace
                            watchChannel <- data
                        }
                    }
                }
        }
    }
}

the other way:

func startWatchDp(clientSet *kubernetes.Clientset){
    watchlist := cache.NewListWatchFromClient(
        clientSet.AppsV1().RESTClient(),
        "deployments",
        metav1.NamespaceAll,
        fields.Everything())

    _, controller := cache.NewInformer(
        watchlist,
        &v13.Deployment{},
        time.Millisecond*100,
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                watchChannel <- 1
                //fmt.Println(obj)
            },
            DeleteFunc: func(obj interface{}) {
                watchChannel <- 1
            },
        },
    )

    stop := make(chan struct{})
    go controller.Run(stop)

    for {
        time.Sleep(10 * time.Second)
    }
}

Here is my result:

E0219 15:20:02.274210   19272 streamwatcher.go:109] Unable to decode an event from the watch stream: stream error: stream ID 3; INTERNAL_ERROR
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"

How can I fix it?Please help!Thanks

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥50 易语言把MYSQL数据库中的数据添加至组合框
    • ¥20 求数据集和代码#有偿答复
    • ¥15 关于下拉菜单选项关联的问题
    • ¥20 java-OJ-健康体检
    • ¥15 rs485的上拉下拉,不会对a-b<-200mv有影响吗,就是接受时,对判断逻辑0有影响吗
    • ¥15 使用phpstudy在云服务器上搭建个人网站
    • ¥15 应该如何判断含间隙的曲柄摇杆机构,轴与轴承是否发生了碰撞?
    • ¥15 vue3+express部署到nginx
    • ¥20 搭建pt1000三线制高精度测温电路
    • ¥15 使用Jdk8自带的算法,和Jdk11自带的加密结果会一样吗,不一样的话有什么解决方案,Jdk不能升级的情况