diff --git a/agency/agency.go b/agency/agency.go index 22d686c8..c660fd73 100644 --- a/agency/agency.go +++ b/agency/agency.go @@ -30,6 +30,17 @@ import ( driver "github.com/arangodb/go-driver" ) +type LeaderElectionCell interface { + // Update checks the current leader cell and if no leader is present + // it ties to put itself in there. Will return the value currently present, + // wether we are leader and a duration after which Updated should be called again. + // If value is nil, we do not try to become leader. + Update(ctx context.Context, value interface{}) (interface{}, bool, time.Duration, error) + + // Resign tries to resign leadership + Resign(ctx context.Context) error +} + // Agency provides API implemented by the ArangoDB agency. type Agency interface { // Connection returns the connection used by this api. @@ -74,6 +85,8 @@ type Agency interface { // Deprecated: use 'WriteTransaction' instead // Register a URL to receive notification callbacks when the value of the given key changes UnregisterChangeCallback(ctx context.Context, key []string, cbURL string) error + + LeaderElectionCell(key []string, ttl time.Duration) LeaderElectionCell } // Deprecated: use 'agency.KeyConditioner' instead diff --git a/agency/agency_impl.go b/agency/agency_impl.go index b7e7fbad..5f9c2173 100644 --- a/agency/agency_impl.go +++ b/agency/agency_impl.go @@ -355,3 +355,104 @@ func (c *agency) UnregisterChangeCallback(ctx context.Context, key []string, cbU func createFullKey(key []string) string { return "/" + strings.Join(key, "/") } + +func (c *agency) LeaderElectionCell(key []string, ttl time.Duration) LeaderElectionCell { + return &leaderElectionCellData{ + agency: c, + lastTTL: 0, + leading: false, + key: key, + ttl: ttl, + } +} + +type leaderElectionCellData struct { + agency Agency + lastTTL int64 + leading bool + key []string + ttl time.Duration +} + +type leaderStruct struct { + Data interface{} `json:"data,omitempty"` + TTL int64 `json:"ttl,omitempty"` +} + +func (l *leaderElectionCellData) tryBecomeLeader(ctx context.Context, value interface{}, assumeEmpty bool) error { + trx := NewTransaction("", TransactionOptions{}) + + newTTL := time.Now().Add(l.ttl).Unix() + trx.AddKey(NewKeySet(l.key, leaderStruct{Data: value, TTL: newTTL}, 0)) + if assumeEmpty { + trx.AddCondition(l.key, NewConditionOldEmpty(true)) + } else { + key := append(l.key, "ttl") + trx.AddCondition(key, NewConditionIfEqual(l.lastTTL)) + } + + if err := l.agency.WriteTransaction(ctx, trx); err == nil { + l.lastTTL = newTTL + l.leading = true + } else { + return err + } + + return nil +} + +func (l *leaderElectionCellData) Update(ctx context.Context, value interface{}) (interface{}, bool, time.Duration, error) { + for { + assumeEmpty := false + var result leaderStruct + if err := l.agency.ReadKey(ctx, l.key, &result); err != nil { + if IsKeyNotFound(err) { + assumeEmpty = true + goto tryLeaderElection + } + assumeEmpty = false + } + + { + now := time.Now() + if result.TTL < now.Unix() { + l.lastTTL = result.TTL + l.leading = false + goto tryLeaderElection + } + + if result.TTL == l.lastTTL && l.leading { + // try to update the ttl + goto tryLeaderElection + } else { + // some new leader has been established + l.lastTTL = result.TTL + l.leading = false + return result.Data, false, time.Duration(l.lastTTL - now.Unix()), nil + } + } + + tryLeaderElection: + if value == nil { + return nil, false, time.Second, nil + } + if err := l.tryBecomeLeader(ctx, value, assumeEmpty); err == nil { + return value, true, l.ttl / 2, nil + } else if !driver.IsPreconditionFailed(err) { + return nil, false, 0, err + } + } +} + +func (l *leaderElectionCellData) Resign(ctx context.Context) error { + // delete the key with precondition that ttl is as expected + if !l.leading { + return nil + } + l.leading = false + trx := NewTransaction("", TransactionOptions{}) + key := append(l.key, "ttl") + trx.AddCondition(key, NewConditionIfEqual(l.lastTTL)) + trx.AddKey(NewKeyDelete(l.key)) + return l.agency.WriteTransaction(ctx, trx) +}