@@ -106,18 +106,17 @@ func (s *runtimeClusterManager) updateClusterConfiguration(ctx context.Context,
106
106
return nil
107
107
}
108
108
109
- func (s * runtimeClusterManager ) runLeaderElection (ctx context.Context , agencyClient agency. Agency , myURL string ) {
110
- le := election .NewLeaderElectionCell [string ](agencyClient , masterURLKey , masterURLTTL )
109
+ func (s * runtimeClusterManager ) runLeaderElection (ctx context.Context , myURL string ) {
110
+ le := election .NewLeaderElectionCell [string ](masterURLKey , masterURLTTL )
111
111
112
- var err error
113
- var delay time.Duration
112
+ delay := time .Microsecond
114
113
resignErrBackoff := backoff .NewExponentialBackOff ()
115
114
for {
116
115
timer := time .NewTimer (delay )
117
116
// Wait a bit
118
117
select {
119
118
case <- timer .C :
120
- // Delay over, just continue
119
+ // Delay over, just continue
121
120
case <- ctx .Done ():
122
121
// We're asked to stop
123
122
if ! timer .Stop () {
@@ -126,11 +125,18 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
126
125
return
127
126
}
128
127
128
+ agencyClient , err := s .createAgencyAPI ()
129
+ if err != nil {
130
+ delay = time .Second
131
+ s .log .Debug ().Err (err ).Msgf ("could not create agency client. Retrying in %s" , delay )
132
+ continue
133
+ }
134
+
129
135
oldMasterURL := s .GetMasterURL ()
130
136
if s .avoidBeingMaster {
131
137
if oldMasterURL == "" {
132
138
s .log .Debug ().Msg ("Initializing master URL before resigning" )
133
- currMasterURL , err := le .Read (ctx )
139
+ currMasterURL , err := le .Read (ctx , agencyClient )
134
140
if err != nil {
135
141
delay = 5 * time .Second
136
142
s .log .Err (err ).Msgf ("Failed to read current value before resigning. Retrying in %s" , delay )
@@ -140,7 +146,7 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
140
146
}
141
147
142
148
s .log .Debug ().Str ("master_url" , myURL ).Msgf ("Resigning leadership" )
143
- err = le .Resign (ctx )
149
+ err = le .Resign (ctx , agencyClient )
144
150
if err != nil {
145
151
delay = resignErrBackoff .NextBackOff ()
146
152
s .log .Err (err ).Msgf ("Resigning leadership failed. Retrying in %s" , delay )
@@ -157,7 +163,7 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
157
163
s .log .Debug ().
158
164
Str ("master_url" , myURL ).
159
165
Msg ("Updating leadership" )
160
- masterURL , isMaster , delay , err = le .Update (ctx , myURL )
166
+ masterURL , isMaster , delay , err = le .Update (ctx , agencyClient , myURL )
161
167
if err != nil {
162
168
delay = 5 * time .Second
163
169
s .log .Error ().Err (err ).Msgf ("Update leader election failed. Retrying in %s" , delay )
@@ -166,16 +172,15 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyCli
166
172
if isMaster && masterURL != myURL {
167
173
s .log .Error ().Msgf ("Unexpected error: this peer is a master but URL differs. Should be %s got %s" , myURL , masterURL )
168
174
}
175
+ if ! isMaster && masterURL == myURL {
176
+ s .log .Error ().Msgf ("Unexpected error: this peer is not a master but URL in agency is mine" )
177
+ }
169
178
170
179
s .updateMasterURL (masterURL , isMaster )
171
180
}
172
181
}
173
182
174
183
func (s * runtimeClusterManager ) updateMasterURL (masterURL string , isMaster bool ) {
175
- s .log .Debug ().
176
- Str ("new_master_url" , masterURL ).
177
- Bool ("is_master" , isMaster ).
178
- Msg ("Leadership updated" )
179
184
newState := stateRunningSlave
180
185
if isMaster {
181
186
newState = stateRunningMaster
@@ -215,16 +220,11 @@ func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, run
215
220
return
216
221
}
217
222
218
- agencyClient , err := s .createAgencyAPI ()
219
- if err != nil {
220
- log .Error ().Msg ("Could not create agency API client" )
221
- return
222
- }
223
223
ownURL := myPeer .CreateStarterURL ("/" )
224
- go s .runLeaderElection (ctx , agencyClient , ownURL )
224
+ go s .runLeaderElection (ctx , ownURL )
225
225
226
226
for {
227
- var delay time.Duration
227
+ delay := time .Microsecond
228
228
// Loop until stopping
229
229
if ctx .Err () != nil {
230
230
// Stop requested
@@ -243,7 +243,7 @@ func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, run
243
243
delay = time .Second * 15
244
244
}
245
245
} else {
246
- // we are still leading, check again later
246
+ // we are still leading or not initialized, check again later
247
247
delay = time .Second * 5
248
248
}
249
249
0 commit comments