I'm using consul as a coordinator, and I coordinate completion of DAG branches using consul. Here each branches use a separate process. And to make the completion synchronised I use consul's counter and update with modifyIndex.
Although it seems there is some hole in the current implementation of code and the value is same for separate branches after increment.
The consul update is implemented as
func (consulStore *ConsulStateStore) Update(key string, oldValue string, newValue string) error {
key = consulStore.consulKeyPath + "/" + key
pair, _, err := consulStore.kv.Get(key, nil)
if err != nil {
return fmt.Errorf("failed to get key %s, error %v", key, err)
}
if pair == nil {
return fmt.Errorf("failed to get key %s", key)
}
if string(pair.Value) != oldValue {
return fmt.Errorf("Old value doesn't match for key %s", key)
}
modifyIndex := pair.ModifyIndex
p := &consul.KVPair{Key: key, Value: []byte(newValue), ModifyIndex: modifyIndex}
_, err = consulStore.kv.Put(p, nil)
if err != nil {
return fmt.Errorf("failed to update key %s, error %v", key, err)
}
return nil
}
My branch execution process calls the below function to increment a counter
// IncrementCounter increment counter by given term, if doesn't exist init with incrementby
func (fhandler *flowHandler) IncrementCounter(counter string, incrementby int) (int, error) {
var serr error
count := 0
for i := 0; i < counterUpdateRetryCount; i++ {
encoded, err := fhandler.stateStore.Get(counter)
if err != nil {
// if doesn't exist try to create
err := fhandler.stateStore.Set(counter, fmt.Sprintf("%d", incrementby))
if err != nil {
return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
}
return incrementby, nil
}
current, err := strconv.Atoi(encoded)
if err != nil {
return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
}
count = current + incrementby
counterStr := fmt.Sprintf("%d", count)
err = fhandler.stateStore.Update(counter, encoded, counterStr)
if err == nil {
return count, nil
}
serr = err
}
return 0, fmt.Errorf("failed to update counter after max retry for %s, error %v", counter, serr)
}
In the runtime the counter is always initialised before increment