Skip to content

Commit a3f62d6

Browse files
committed
api: add Connection.CloseGraceful()
CloseGraceful closes Connection gracefully. Unlike Connection.Close() it waits for all requests to complete. Part of #257
1 parent 642dfb3 commit a3f62d6

File tree

4 files changed

+115
-11
lines changed

4 files changed

+115
-11
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1010

1111
### Added
1212

13+
- Connection.CloseGraceful() unlike Connection.Close() waits for all
14+
requests to complete (#257)
15+
1316
### Changed
1417

1518
### Fixed

connection.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,13 @@ func (conn *Connection) Close() error {
462462
return conn.closeConnection(err, true)
463463
}
464464

465+
// CloseGraceful closes Connection gracefully. It waits for all requests to
466+
// complete.
467+
// After this method called, there is no way to reopen this Connection.
468+
func (conn *Connection) CloseGraceful() error {
469+
return conn.shutdown(true)
470+
}
471+
465472
// Addr returns a configured address of Tarantool socket.
466473
func (conn *Connection) Addr() string {
467474
return conn.addr
@@ -1532,17 +1539,27 @@ func shutdownEventCallback(event WatchEvent) {
15321539
// step 2.
15331540
val, ok := event.Value.(bool)
15341541
if ok && val {
1535-
go event.Conn.shutdown()
1542+
go event.Conn.shutdown(false)
15361543
}
15371544
}
15381545

1539-
func (conn *Connection) shutdown() {
1546+
func (conn *Connection) shutdown(forever bool) error {
15401547
// Forbid state changes.
15411548
conn.mutex.Lock()
15421549
defer conn.mutex.Unlock()
15431550

15441551
if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
1545-
return
1552+
if forever {
1553+
err := ClientError{ErrConnectionClosed, "connection closed by client"}
1554+
return conn.closeConnection(err, true)
1555+
}
1556+
return nil
1557+
}
1558+
1559+
if forever {
1560+
// We don't want to reconnect any more.
1561+
conn.opts.Reconnect = 0
1562+
conn.opts.MaxReconnects = 0
15461563
}
15471564

15481565
conn.cond.Broadcast()
@@ -1551,7 +1568,7 @@ func (conn *Connection) shutdown() {
15511568
c := conn.c
15521569
for {
15531570
if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
1554-
return
1571+
return nil
15551572
}
15561573
if atomic.LoadInt64(&conn.requestCnt) == 0 {
15571574
break
@@ -1563,14 +1580,19 @@ func (conn *Connection) shutdown() {
15631580
conn.cond.Wait()
15641581
}
15651582

1566-
// Start to reconnect based on common rules, same as in net.box.
1567-
// Reconnect also closes the connection: server waits until all
1568-
// subscribed connections are terminated.
1569-
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1570-
// step 3.
1571-
conn.reconnectImpl(
1572-
ClientError{
1583+
if forever {
1584+
err := ClientError{ErrConnectionClosed, "connection closed by client"}
1585+
return conn.closeConnection(err, true)
1586+
} else {
1587+
// Start to reconnect based on common rules, same as in net.box.
1588+
// Reconnect also closes the connection: server waits until all
1589+
// subscribed connections are terminated.
1590+
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1591+
// step 3.
1592+
conn.reconnectImpl(ClientError{
15731593
ErrConnectionClosed,
15741594
"connection closed after server shutdown",
15751595
}, conn.c)
1596+
return nil
1597+
}
15761598
}

example_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,3 +1115,39 @@ func ExamplePingRequest_Context() {
11151115
// Ping Resp <nil>
11161116
// Ping Error context is done
11171117
}
1118+
1119+
// ExampleConnection_CloseGraceful_force demonstrates how to force close
1120+
// a connection with graceful close in progress after a while.
1121+
func ExampleConnection_CloseGraceful_force() {
1122+
conn := example_connect(opts)
1123+
1124+
eval := `local fiber = require('fiber')
1125+
local time = ...
1126+
fiber.sleep(time)
1127+
`
1128+
req := tarantool.NewEvalRequest(eval).Args([]interface{}{10})
1129+
fut := conn.Do(req)
1130+
1131+
done := make(chan struct{})
1132+
go func() {
1133+
conn.CloseGraceful()
1134+
fmt.Println("Connection.CloseGraceful() done!")
1135+
close(done)
1136+
}()
1137+
1138+
select {
1139+
case <-done:
1140+
case <-time.After(time.Second):
1141+
fmt.Println("Force Connection.Close()!")
1142+
conn.Close()
1143+
}
1144+
<-done
1145+
1146+
fmt.Println("Result:")
1147+
fmt.Println(fut.Get())
1148+
// Output:
1149+
// Force Connection.Close()!
1150+
// Connection.CloseGraceful() done!
1151+
// Result:
1152+
// <nil> connection closed by client (0x4001)
1153+
}

shutdown_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
. "github.com/tarantool/go-tarantool"
1718
"github.com/tarantool/go-tarantool/test_helpers"
@@ -143,6 +144,48 @@ func TestGracefulShutdown(t *testing.T) {
143144
testGracefulShutdown(t, conn, &inst)
144145
}
145146

147+
func TestCloseGraceful(t *testing.T) {
148+
opts := Opts{
149+
User: shtdnClntOpts.User,
150+
Pass: shtdnClntOpts.Pass,
151+
Timeout: shtdnClntOpts.Timeout,
152+
}
153+
154+
inst, err := test_helpers.StartTarantool(shtdnSrvOpts)
155+
require.Nil(t, err)
156+
defer test_helpers.StopTarantoolWithCleanup(inst)
157+
158+
conn := test_helpers.ConnectWithValidation(t, shtdnServer, opts)
159+
defer conn.Close()
160+
161+
// Send request with sleep.
162+
evalSleep := 3 // In seconds.
163+
require.Lessf(t,
164+
time.Duration(evalSleep)*time.Second,
165+
shtdnClntOpts.Timeout,
166+
"test request won't be failed by timeout")
167+
168+
req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
169+
fut := conn.Do(req)
170+
171+
go func() {
172+
// CloseGraceful closes the connection gracefully.
173+
conn.CloseGraceful()
174+
// Connection is closed.
175+
assert.Equal(t, true, conn.ClosedNow())
176+
}()
177+
178+
// Check that a request rejected if graceful shutdown in progress.
179+
time.Sleep((time.Duration(evalSleep) * time.Second) / 2)
180+
_, err = conn.Do(NewPingRequest()).Get()
181+
assert.ErrorContains(t, err, "server shutdown in progress")
182+
183+
// Check that a previous request was successful.
184+
resp, err := fut.Get()
185+
assert.Nilf(t, err, "sleep request no error")
186+
assert.NotNilf(t, resp, "sleep response exists")
187+
}
188+
146189
func TestGracefulShutdownWithReconnect(t *testing.T) {
147190
test_helpers.SkipIfWatchersUnsupported(t)
148191

0 commit comments

Comments
 (0)