Distributed counter update problem on Consul

I'm using Consul as a coordinator to synchronise the completion of DAG branches. Each branch runs in a separate process. To keep completion synchronised, I keep a counter in Consul's KV store and update it using the key's ModifyIndex.

However, there seems to be a hole in my current implementation: after an increment, separate branches end up seeing the same counter value.

The Consul update is implemented as:

func (consulStore *ConsulStateStore) Update(key string, oldValue string, newValue string) error {
    key = consulStore.consulKeyPath + "/" + key
    pair, _, err := consulStore.kv.Get(key, nil)
    if err != nil {
        return fmt.Errorf("failed to get key %s, error %v", key, err)
    }
    if pair == nil {
        return fmt.Errorf("failed to get key %s", key)
    }
    if string(pair.Value) != oldValue {
        return fmt.Errorf("Old value doesn't match for key %s", key)
    }
    modifyIndex := pair.ModifyIndex

    p := &consul.KVPair{Key: key, Value: []byte(newValue), ModifyIndex: modifyIndex}
    _, err = consulStore.kv.Put(p, nil)
    if err != nil {
        return fmt.Errorf("failed to update key %s, error %v", key, err)
    }
    return nil
}
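One thing worth checking here: as far as I understand the Consul Go client, `KV.Put` writes unconditionally and does not look at `ModifyIndex`, while `KV.CAS` applies the write only if the index still matches and reports the outcome as a bool. The sketch below does not talk to Consul at all. It uses a hypothetical in-memory store (`memKV`, `entry`, and their methods are made-up names) to show the difference between a blind put and a check-and-set when two branches read the same snapshot.

```go
package main

import (
	"fmt"
	"sync"
)

// entry mimics the two KVPair fields that matter here (hypothetical type).
type entry struct {
	Value       string
	ModifyIndex uint64
}

// memKV is a hypothetical in-memory stand-in for a KV store with modify indexes.
type memKV struct {
	mu    sync.Mutex
	data  map[string]entry
	index uint64
}

func newMemKV() *memKV { return &memKV{data: map[string]entry{}} }

// Get returns the current entry and whether the key exists.
func (s *memKV) Get(key string) (entry, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.data[key]
	return e, ok
}

// Put writes unconditionally; it never compares indexes.
func (s *memKV) Put(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.index++
	s.data[key] = entry{Value: value, ModifyIndex: s.index}
}

// CAS writes only if the key's current ModifyIndex equals index.
// A missing key has index 0, so CAS with index 0 acts as "create if absent".
func (s *memKV) CAS(key, value string, index uint64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[key].ModifyIndex != index {
		return false
	}
	s.index++
	s.data[key] = entry{Value: value, ModifyIndex: s.index}
	return true
}

func main() {
	kv := newMemKV()
	kv.Put("counter", "0")

	// Two branches read the same snapshot before either one writes.
	a, _ := kv.Get("counter")
	b, _ := kv.Get("counter")

	// Branch A wins; branch B's index is stale, so its CAS is rejected.
	fmt.Println("A CAS accepted:", kv.CAS("counter", "1", a.ModifyIndex))
	fmt.Println("B CAS accepted:", kv.CAS("counter", "1", b.ModifyIndex))

	// A blind Put from B is accepted anyway and overwrites A's update with the same value.
	kv.Put("counter", "1")
	cur, _ := kv.Get("counter")
	fmt.Println("value after B's blind Put:", cur.Value)
}
```

If that reading of the client is right, the `Update` function above would need to call `CAS` instead of `Put` and treat a `false` result as a conflict to retry, since a rejected check-and-set is not reported through `err`.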

My branch execution process calls the function below to increment a counter:

// IncrementCounter increments the counter by the given amount; if the counter doesn't exist, it is initialised to incrementby
func (fhandler *flowHandler) IncrementCounter(counter string, incrementby int) (int, error) {
        var serr error
        count := 0
        for i := 0; i < counterUpdateRetryCount; i++ {
                encoded, err := fhandler.stateStore.Get(counter)
                if err != nil {
                        // if doesn't exist try to create
                        err := fhandler.stateStore.Set(counter, fmt.Sprintf("%d", incrementby))
                        if err != nil {
                                return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
                        }
                        return incrementby, nil
                }

                current, err := strconv.Atoi(encoded)
                if err != nil {
                        return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
                }

                count = current + incrementby
                counterStr := fmt.Sprintf("%d", count)

                err = fhandler.stateStore.Update(counter, encoded, counterStr)
                if err == nil {
                        return count, nil
                }
                serr = err
        }
        return 0, fmt.Errorf("failed to update counter after max retry for %s, error %v", counter, serr)
}

At runtime, the counter is always initialised before it is incremented.
