dongwu4834 2019-07-22 07:50
浏览 41

领事上的分布式计数器更新问题

I'm using consul as a coordinator, and I coordinate completion of DAG branches using consul. Here each branches use a separate process. And to make the completion synchronised I use consul's counter and update with modifyIndex.

Although it seems there is some hole in the current implementation of code and the value is same for separate branches after increment.

The consul update is implemented as

func (consulStore *ConsulStateStore) Update(key string, oldValue string, newValue string) error {
    key = consulStore.consulKeyPath + "/" + key
    pair, _, err := consulStore.kv.Get(key, nil)
    if err != nil {
        return fmt.Errorf("failed to get key %s, error %v", key, err)
    }
    if pair == nil {
        return fmt.Errorf("failed to get key %s", key)
    }
    if string(pair.Value) != oldValue {
        return fmt.Errorf("Old value doesn't match for key %s", key)
    }
    modifyIndex := pair.ModifyIndex

    p := &consul.KVPair{Key: key, Value: []byte(newValue), ModifyIndex: modifyIndex}
    _, err = consulStore.kv.Put(p, nil)
    if err != nil {
        return fmt.Errorf("failed to update key %s, error %v", key, err)
    }
    return nil
}

My branch execution process calls the below function to increment a counter

// IncrementCounter increment counter by given term, if doesn't exist init with incrementby
func (fhandler *flowHandler) IncrementCounter(counter string, incrementby int) (int, error) {
        var serr error
        count := 0
        for i := 0; i < counterUpdateRetryCount; i++ {
                encoded, err := fhandler.stateStore.Get(counter)
                if err != nil {
                        // if doesn't exist try to create
                        err := fhandler.stateStore.Set(counter, fmt.Sprintf("%d", incrementby))
                        if err != nil {
                                return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
                        }
                        return incrementby, nil
                }

                current, err := strconv.Atoi(encoded)
                if err != nil {
                        return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
                }

                count = current + incrementby
                counterStr := fmt.Sprintf("%d", count)

                err = fhandler.stateStore.Update(counter, encoded, counterStr)
                if err == nil {
                        return count, nil
                }
                serr = err
        }
        return 0, fmt.Errorf("failed to update counter after max retry for %s, error %v", counter, serr)
}

In the runtime the counter is always initialised before increment

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 用visual studi code完成html页面
    • ¥15 聚类分析或者python进行数据分析
    • ¥15 逻辑谓词和消解原理的运用
    • ¥15 三菱伺服电机按启动按钮有使能但不动作
    • ¥15 js,页面2返回页面1时定位进入的设备
    • ¥50 导入文件到网吧的电脑并且在重启之后不会被恢复
    • ¥15 (希望可以解决问题)ma和mb文件无法正常打开,打开后是空白,但是有正常内存占用,但可以在打开Maya应用程序后打开场景ma和mb格式。
    • ¥20 ML307A在使用AT命令连接EMQX平台的MQTT时被拒绝
    • ¥20 腾讯企业邮箱邮件可以恢复么
    • ¥15 有人知道怎么将自己的迁移策略布到edgecloudsim上使用吗?