Skip to content

Leader Election #518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions agency/agency.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ import (
driver "github.com/arangodb/go-driver"
)

type LeaderElectionCell interface {
// Update checks the current leader cell and if no leader is present
// it ties to put itself in there. Will return the value currently present,
// wether we are leader and a duration after which Updated should be called again.
// If value is nil, we do not try to become leader.
Update(ctx context.Context, value interface{}) (interface{}, bool, time.Duration, error)

// Resign tries to resign leadership
Resign(ctx context.Context) error
}

// Agency provides API implemented by the ArangoDB agency.
type Agency interface {
// Connection returns the connection used by this api.
Expand Down Expand Up @@ -74,6 +85,8 @@ type Agency interface {
// Deprecated: use 'WriteTransaction' instead
// Register a URL to receive notification callbacks when the value of the given key changes
UnregisterChangeCallback(ctx context.Context, key []string, cbURL string) error

LeaderElectionCell(key []string, ttl time.Duration) LeaderElectionCell
}

// Deprecated: use 'agency.KeyConditioner' instead
Expand Down
101 changes: 101 additions & 0 deletions agency/agency_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,104 @@ func (c *agency) UnregisterChangeCallback(ctx context.Context, key []string, cbU
func createFullKey(key []string) string {
return "/" + strings.Join(key, "/")
}

func (c *agency) LeaderElectionCell(key []string, ttl time.Duration) LeaderElectionCell {
return &leaderElectionCellData{
agency: c,
lastTTL: 0,
leading: false,
key: key,
ttl: ttl,
}
}

type leaderElectionCellData struct {
agency Agency
lastTTL int64
leading bool
key []string
ttl time.Duration
}

type leaderStruct struct {
Data interface{} `json:"data,omitempty"`
TTL int64 `json:"ttl,omitempty"`
}

func (l *leaderElectionCellData) tryBecomeLeader(ctx context.Context, value interface{}, assumeEmpty bool) error {
trx := NewTransaction("", TransactionOptions{})

newTTL := time.Now().Add(l.ttl).Unix()
trx.AddKey(NewKeySet(l.key, leaderStruct{Data: value, TTL: newTTL}, 0))
if assumeEmpty {
trx.AddCondition(l.key, NewConditionOldEmpty(true))
} else {
key := append(l.key, "ttl")
trx.AddCondition(key, NewConditionIfEqual(l.lastTTL))
}

if err := l.agency.WriteTransaction(ctx, trx); err == nil {
l.lastTTL = newTTL
l.leading = true
} else {
return err
}

return nil
}

func (l *leaderElectionCellData) Update(ctx context.Context, value interface{}) (interface{}, bool, time.Duration, error) {
for {
assumeEmpty := false
var result leaderStruct
if err := l.agency.ReadKey(ctx, l.key, &result); err != nil {
if IsKeyNotFound(err) {
assumeEmpty = true
goto tryLeaderElection
}
assumeEmpty = false
}

{
now := time.Now()
if result.TTL < now.Unix() {
l.lastTTL = result.TTL
l.leading = false
goto tryLeaderElection
}

if result.TTL == l.lastTTL && l.leading {
// try to update the ttl
goto tryLeaderElection
} else {
// some new leader has been established
l.lastTTL = result.TTL
l.leading = false
return result.Data, false, time.Duration(l.lastTTL - now.Unix()), nil
}
}

tryLeaderElection:
if value == nil {
return nil, false, time.Second, nil
}
if err := l.tryBecomeLeader(ctx, value, assumeEmpty); err == nil {
return value, true, l.ttl / 2, nil
} else if !driver.IsPreconditionFailed(err) {
return nil, false, 0, err
}
}
}

func (l *leaderElectionCellData) Resign(ctx context.Context) error {
// delete the key with precondition that ttl is as expected
if !l.leading {
return nil
}
l.leading = false
trx := NewTransaction("", TransactionOptions{})
key := append(l.key, "ttl")
trx.AddCondition(key, NewConditionIfEqual(l.lastTTL))
trx.AddKey(NewKeyDelete(l.key))
return l.agency.WriteTransaction(ctx, trx)
}