Skip to content

Allocate timers outside of loops to avoid repeat allocations #4367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions acceptance/cluster/localcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,10 @@ func (l *LocalCluster) Start() {
func (l *LocalCluster) Assert(t *testing.T) {
const almostZero = 50 * time.Millisecond
filter := func(ch chan Event, wait time.Duration) *Event {
for {
select {
case act := <-ch:
return &act
case <-time.After(wait):
}
break
select {
case act := <-ch:
return &act
case <-time.After(wait):
}
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,8 @@ func (g *Gossip) bootstrap() {
stopper := g.server.stopper

stopper.RunWorker(func() {
var bootstrapTimer util.Timer
defer bootstrapTimer.Stop()
for {
stopper.RunTask(func() {
g.mu.Lock()
Expand All @@ -769,8 +771,10 @@ func (g *Gossip) bootstrap() {
})

// Pause an interval before next possible bootstrap.
bootstrapTimer.Reset(g.bootstrapInterval)
select {
case <-time.After(g.bootstrapInterval):
case <-bootstrapTimer.C:
bootstrapTimer.Read = true
// continue
case <-stopper.ShouldStop():
return
Expand Down
20 changes: 12 additions & 8 deletions kv/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,20 @@ func send(opts SendOptions, replicas ReplicaSlice,
var errors, retryableErrors int

// Wait for completions.
var sendNextTimer util.Timer
defer sendNextTimer.Stop()
for {
sendNextTimer.Reset(opts.SendNextTimeout)
select {
case <-sendNextTimer.C:
sendNextTimer.Read = true
// On successive RPC timeouts, send to additional replicas if available.
if len(orderedClients) > 0 {
sp.LogEvent("timeout, trying next peer")
sendOneFn(&orderedClients[0], opts.Timeout, context, sp, done)
orderedClients = orderedClients[1:]
}

case call := <-done:
if call.Error == nil {
// Verify response data integrity if this is a proto response.
Expand Down Expand Up @@ -207,14 +219,6 @@ func send(opts SendOptions, replicas ReplicaSlice,
sendOneFn(&orderedClients[0], opts.Timeout, context, sp, done)
orderedClients = orderedClients[1:]
}

case <-time.After(opts.SendNextTimeout):
// On successive RPC timeouts, send to additional replicas if available.
if len(orderedClients) > 0 {
sp.LogEvent("timeout, trying next peer")
sendOneFn(&orderedClients[0], opts.Timeout, context, sp, done)
orderedClients = orderedClients[1:]
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,13 @@ func NewTxnCoordSender(wrapped client.Sender, clock *hlc.Clock, linearizable boo
func (tc *TxnCoordSender) startStats() {
res := time.Millisecond // for duration logging resolution
lastNow := tc.clock.PhysicalNow()
var statusLogTimer util.Timer
defer statusLogTimer.Stop()
for {
statusLogTimer.Reset(statusLogInterval)
select {
case <-time.After(statusLogInterval):
case <-statusLogTimer.C:
statusLogTimer.Read = true
if !log.V(1) {
continue
}
Expand Down
6 changes: 5 additions & 1 deletion rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ func (c *Client) runHeartbeat(retryOpts retry.Options) {
}

var err = errUnstarted // initial condition
var heartbeatTimer util.Timer
defer heartbeatTimer.Stop()
for {
for r := retry.Start(retryOpts); r.Next(); {
if c.maybeClose(retryOpts.Closer) {
Expand Down Expand Up @@ -344,13 +346,15 @@ func (c *Client) runHeartbeat(retryOpts retry.Options) {

// Wait after the heartbeat so that the first iteration gets a wait-free
// heartbeat attempt.
heartbeatTimer.Reset(c.heartbeatInterval)
select {
case <-c.closer:
return
case <-retryOpts.Closer:
c.close()
return
case <-time.After(c.heartbeatInterval):
case <-heartbeatTimer.C:
heartbeatTimer.Read = true
// TODO(tamird): Perhaps retry more aggressively when the client is unhealthy.
}
}
Expand Down
7 changes: 6 additions & 1 deletion rpc/clock_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/stop"
Expand Down Expand Up @@ -152,11 +153,15 @@ func (r *RemoteClockMonitor) MonitorRemoteOffsets(stopper *stop.Stopper) {
if log.V(1) {
log.Infof("monitoring cluster offset")
}
var monitorTimer util.Timer
defer monitorTimer.Stop()
for {
monitorTimer.Reset(monitorInterval)
select {
case <-stopper.ShouldStop():
return
case <-time.After(monitorInterval):
case <-monitorTimer.C:
monitorTimer.Read = true
offsetInterval, err := r.findOffsetInterval()
// By the contract of the hlc, if the value is 0, then safety checking
// of the max offset is disabled. However we may still want to
Expand Down
6 changes: 5 additions & 1 deletion server/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,15 @@ func (t *rpcTransport) processQueue(nodeID roachpb.NodeID, storeID roachpb.Store
done := make(chan *gorpc.Call, cap(ch))
var req *storage.RaftMessageRequest
protoResp := &storage.RaftMessageResponse{}
var raftIdleTimer util.Timer
defer raftIdleTimer.Stop()
for {
raftIdleTimer.Reset(raftIdleTimeout)
select {
case <-t.rpcContext.Stopper.ShouldStop():
return
case <-time.After(raftIdleTimeout):
case <-raftIdleTimer.C:
raftIdleTimer.Read = true
if log.V(1) {
log.Infof("closing Raft transport to %d due to inactivity", nodeID)
}
Expand Down
7 changes: 6 additions & 1 deletion storage/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/stop"
Expand Down Expand Up @@ -186,6 +187,8 @@ func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) {
// heard from in longer than timeUntilStoreDead.
func (sp *StorePool) start(stopper *stop.Stopper) {
stopper.RunWorker(func() {
var timeoutTimer util.Timer
defer timeoutTimer.Stop()
for {
var timeout time.Duration
sp.mu.Lock()
Expand All @@ -210,8 +213,10 @@ func (sp *StorePool) start(stopper *stop.Stopper) {
}
}
sp.mu.Unlock()
timeoutTimer.Reset(timeout)
select {
case <-time.After(timeout):
case <-timeoutTimer.C:
timeoutTimer.Read = true
case <-stopper.ShouldStop():
return
}
Expand Down
79 changes: 79 additions & 0 deletions util/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Nathan VanBenschoten ([email protected])

package util

import "time"

// The Timer type represents a single event. When the Timer expires,
// the current time will be sent on Timer.C.
//
// This timer implementation is an abstraction around the standard
// library's time.Timer that provides a temporary workaround for the
// issue described in https://github.com/golang/go/issues/14038. As
// such, this timer should only be used when Reset is planned to
// be called continually in a loop. For this Reset pattern to work,
// Timer.Read must be set to true whenever a timestamp is read from
// the Timer.C channel. If Timer.Read is not set to true when the
// channel is read from, the next call to Timer.Reset will deadlock.
// This pattern looks something like:
//
// var timer util.Timer
// defer timer.Stop()
// for {
// timer.Reset(wait)
// switch {
// case <-timer.C:
// timer.Read = true
// ...
// }
// }
//
// Note that unlike the standard library's Timer type, this Timer will
// not begin counting down until Reset is called for the first time, as
// there is no constructor function.
type Timer struct {
*time.Timer
Read bool
}

// Reset changes the timer to expire after duration d and returns
// the new value of the timer. This method includes the fix proposed
// in https://github.com/golang/go/issues/11513#issuecomment-157062583,
// but requires users of Timer to set Timer.Read to true whenever
// they successfully read from the Timer's channel. Reset operates on
// and returns a value so that Timer can be stack allocated.
func (t *Timer) Reset(d time.Duration) {
if t.Timer == nil {
t.Timer = time.NewTimer(d)
return
}
if !t.Timer.Reset(d) && !t.Read {
<-t.C
}
t.Read = false
}

// Stop prevents the Timer from firing. It returns true if the call stops
// the timer, false if the timer has already expired, been stopped previously,
// or had never been initialized with a call to Timer.Reset. Stop does not
// close the channel, to prevent a read from succeeding incorrectly.
func (t *Timer) Stop() bool {
if t.Timer == nil {
return false
}
return t.Timer.Stop()
}
127 changes: 127 additions & 0 deletions util/timer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Nathan VanBenschoten ([email protected])

package util

import (
"testing"
"time"
)

const timeStep = 10 * time.Millisecond

func TestTimerTimeout(t *testing.T) {
var timer Timer
defer func() {
if stopped := timer.Stop(); stopped {
t.Errorf("expected Stop to return false, got true")
}
}()
timer.Reset(timeStep)

<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerStop(t *testing.T) {
var timer Timer
timer.Reset(timeStep)
if stopped := timer.Stop(); !stopped {
t.Errorf("expected Stop to return true, got false")
}

select {
case <-timer.C:
t.Errorf("expected timer to stop after call to Stop; got timer that was not stopped")
case <-time.After(5 * timeStep):
}
}

func TestTimerUninitializedStopNoop(t *testing.T) {
var timer Timer
if stopped := timer.Stop(); stopped {
t.Errorf("expected Stop to return false when the timer was never reset, got true")
}
}

func TestTimerResetBeforeTimeout(t *testing.T) {
var timer Timer
defer timer.Stop()
timer.Reset(timeStep)

timer.Reset(timeStep)
<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerResetAfterTimeoutAndNoRead(t *testing.T) {
var timer Timer
defer timer.Stop()
timer.Reset(timeStep)

time.Sleep(2 * timeStep)

timer.Reset(timeStep)
<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerResetAfterTimeoutAndRead(t *testing.T) {
var timer Timer
defer timer.Stop()
timer.Reset(timeStep)

<-timer.C
timer.Read = true

timer.Reset(timeStep)
<-timer.C
timer.Read = true

select {
case <-timer.C:
t.Errorf("expected timer to only timeout once after Reset; got two timeouts")
case <-time.After(5 * timeStep):
}
}

func TestTimerMakesProgressInLoop(t *testing.T) {
var timer Timer
defer timer.Stop()
for i := 0; i < 5; i++ {
timer.Reset(timeStep)
<-timer.C
timer.Read = true
}
}