Skip to content

Fix leader election edge-cases #373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/arangodb-helper/go-certificates v0.0.0-20180821055445-9fca24fc2680
github.com/arangodb-helper/go-helper v0.2.1
github.com/arangodb-helper/go-helper v0.4.1
github.com/arangodb/go-driver v1.6.0
github.com/arangodb/go-upgrade-rules v0.0.0-20200605091205-439fb1ee86e7
github.com/cenkalti/backoff v2.2.1+incompatible
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ github.com/arangodb-helper/go-helper v0.2.0 h1:rpkX0msiMVorxPJygR2R5s67RjQdFEIGm
github.com/arangodb-helper/go-helper v0.2.0/go.mod h1:RHgEwQTFWdJ9wFDGUCgUZzaz9NLaFUskSsHgOPM5XR4=
github.com/arangodb-helper/go-helper v0.2.1 h1:hwh/Nbkce3FHtegKYKzfGIhlGI5bRaDMpjQCRKMG55A=
github.com/arangodb-helper/go-helper v0.2.1/go.mod h1:RHgEwQTFWdJ9wFDGUCgUZzaz9NLaFUskSsHgOPM5XR4=
github.com/arangodb-helper/go-helper v0.4.1-0.20230713102015-83a33422c87a h1:TsR6WxBzhwTJyYXyWedrzUdmLL/MnIw18wJVPT8V0HI=
github.com/arangodb-helper/go-helper v0.4.1-0.20230713102015-83a33422c87a/go.mod h1:RHgEwQTFWdJ9wFDGUCgUZzaz9NLaFUskSsHgOPM5XR4=
github.com/arangodb-helper/go-helper v0.4.1-0.20230713105633-bc4e0cf3a627 h1:gmZ3WBqiQGPvRtKBUoWvFSwb2eXIg+X1MaNMoSG+CwY=
github.com/arangodb-helper/go-helper v0.4.1-0.20230713105633-bc4e0cf3a627/go.mod h1:RHgEwQTFWdJ9wFDGUCgUZzaz9NLaFUskSsHgOPM5XR4=
github.com/arangodb-helper/go-helper v0.4.1 h1:yO4Bu5AhuvenDe5AmSWRo/ya/GI1lmVA+BYrRT9umsI=
github.com/arangodb-helper/go-helper v0.4.1/go.mod h1:RHgEwQTFWdJ9wFDGUCgUZzaz9NLaFUskSsHgOPM5XR4=
github.com/arangodb/go-driver v1.6.0 h1:NFWj/idqXZxhFVueihMSI2R9NotNIsgvNfM/xmpekb4=
github.com/arangodb/go-driver v1.6.0/go.mod h1:HQmdGkvNMVBTE3SIPSQ8T/ZddC6iwNsfMR+dDJQxIsI=
github.com/arangodb/go-upgrade-rules v0.0.0-20200605091205-439fb1ee86e7 h1:zCY5fsv5apos+oAdd1bLr1UEFOHeIUDZCItbwU/u6XE=
Expand Down
40 changes: 20 additions & 20 deletions service/runtime_cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,17 @@ func (s *runtimeClusterManager) updateClusterConfiguration(ctx context.Context,
return nil
}

func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyClient agency.Agency, myURL string) {
le := election.NewLeaderElectionCell[string](agencyClient, masterURLKey, masterURLTTL)
func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, myURL string) {
le := election.NewLeaderElectionCell[string](masterURLKey, masterURLTTL)

var err error
var delay time.Duration
delay := time.Microsecond
resignErrBackoff := backoff.NewExponentialBackOff()
for {
timer := time.NewTimer(delay)
// Wait a bit
select {
case <-timer.C:
// Delay over, just continue
// Delay over, just continue
case <-ctx.Done():
// We're asked to stop
if !timer.Stop() {
Expand All @@ -126,11 +125,18 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
return
}

agencyClient, err := s.createAgencyAPI()
if err != nil {
delay = time.Second
s.log.Debug().Err(err).Msgf("could not create agency client. Retrying in %s", delay)
continue
}

oldMasterURL := s.GetMasterURL()
if s.avoidBeingMaster {
if oldMasterURL == "" {
s.log.Debug().Msg("Initializing master URL before resigning")
currMasterURL, err := le.Read(ctx)
currMasterURL, err := le.Read(ctx, agencyClient)
if err != nil {
delay = 5 * time.Second
s.log.Err(err).Msgf("Failed to read current value before resigning. Retrying in %s", delay)
Expand All @@ -140,7 +146,7 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
}

s.log.Debug().Str("master_url", myURL).Msgf("Resigning leadership")
err = le.Resign(ctx)
err = le.Resign(ctx, agencyClient)
if err != nil {
delay = resignErrBackoff.NextBackOff()
s.log.Err(err).Msgf("Resigning leadership failed. Retrying in %s", delay)
Expand All @@ -157,7 +163,7 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
s.log.Debug().
Str("master_url", myURL).
Msg("Updating leadership")
masterURL, isMaster, delay, err = le.Update(ctx, myURL)
masterURL, isMaster, delay, err = le.Update(ctx, agencyClient, myURL)
if err != nil {
delay = 5 * time.Second
s.log.Error().Err(err).Msgf("Update leader election failed. Retrying in %s", delay)
Expand All @@ -166,16 +172,15 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
if isMaster && masterURL != myURL {
s.log.Error().Msgf("Unexpected error: this peer is a master but URL differs. Should be %s got %s", myURL, masterURL)
}
if !isMaster && masterURL == myURL {
s.log.Error().Msgf("Unexpected error: this peer is not a master but URL in agency is mine")
}

s.updateMasterURL(masterURL, isMaster)
}
}

func (s *runtimeClusterManager) updateMasterURL(masterURL string, isMaster bool) {
s.log.Debug().
Str("new_master_url", masterURL).
Bool("is_master", isMaster).
Msg("Leadership updated")
newState := stateRunningSlave
if isMaster {
newState = stateRunningMaster
Expand Down Expand Up @@ -215,16 +220,11 @@ func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, run
return
}

agencyClient, err := s.createAgencyAPI()
if err != nil {
log.Error().Msg("Could not create agency API client")
return
}
ownURL := myPeer.CreateStarterURL("/")
go s.runLeaderElection(ctx, agencyClient, ownURL)
go s.runLeaderElection(ctx, ownURL)

for {
var delay time.Duration
delay := time.Microsecond
// Loop until stopping
if ctx.Err() != nil {
// Stop requested
Expand All @@ -243,7 +243,7 @@ func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, run
delay = time.Second * 15
}
} else {
// we are still leading, check again later
// we are still leading or not initialized, check again later
delay = time.Second * 5
}

Expand Down
12 changes: 6 additions & 6 deletions service/upgrade_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,22 +304,22 @@ func (m *upgradeManager) StartDatabaseUpgrade(ctx context.Context, forceMinorUpg
return maskAny(err)
}
m.log.Debug().Msg("Creating lock")
lock, err := election.NewLock(m, api, upgradeManagerLockKey, "", upgradeManagerLockTTL)
lock, err := election.NewLock(m, upgradeManagerLockKey, "", upgradeManagerLockTTL)
if err != nil {
return maskAny(err)
}

// Claim the upgrade lock
m.log.Debug().Msg("Locking lock")
if err := lock.Lock(ctx); err != nil {
if err := lock.Lock(ctx, api); err != nil {
m.log.Debug().Err(err).Msg("Lock failed")
return maskAny(err)
}

// Close agency lock when we're done
defer func() {
m.log.Debug().Msg("Unlocking lock")
lock.Unlock(context.Background())
lock.Unlock(context.Background(), api)
}()

m.log.Debug().Msg("Reading upgrade plan...")
Expand Down Expand Up @@ -547,22 +547,22 @@ func (m *upgradeManager) AbortDatabaseUpgrade(ctx context.Context) error {
return maskAny(err)
}
m.log.Debug().Msg("Creating lock")
lock, err := election.NewLock(m, api, upgradeManagerLockKey, "", upgradeManagerLockTTL)
lock, err := election.NewLock(m, upgradeManagerLockKey, "", upgradeManagerLockTTL)
if err != nil {
return maskAny(err)
}

// Claim the upgrade lock
m.log.Debug().Msg("Locking lock")
if err := lock.Lock(ctx); err != nil {
if err := lock.Lock(ctx, api); err != nil {
m.log.Debug().Err(err).Msg("Lock failed")
return maskAny(err)
}

// Close agency lock when we're done
defer func() {
m.log.Debug().Msg("Unlocking lock")
lock.Unlock(context.Background())
lock.Unlock(context.Background(), api)
}()

// Check plan
Expand Down