Skip to content

timeout option for the pool:get() #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,23 @@ Create a connection pool with count of size established connections.
- `pool ~= nil` on success
- `error(reason)` on error

### `conn = pool:get()`
### `conn = pool:get(opts)`

Get a connection from pool. Reset connection before returning it. If connection
is broken then it will be reestablished. If there is no free connections then
calling fiber will sleep until another fiber returns some connection to pool.
is broken then it will be reestablished.
If there is no free connections and timeout is not specified then calling fiber
will sleep until another fiber returns some connection to pool.
If timeout is specified, and there is no free connections for the duration of the timeout,
then the return value is nil.

*Options*:

- `timeout` - maximum number of seconds to wait for a connection

*Returns*:

- `conn ~= nil`
- `conn ~= nil` on success
- `conn == nil` on there is no free connections when timeout option is specified

### `pool:put(conn)`

Expand Down
38 changes: 29 additions & 9 deletions mysql/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ local ffi = require('ffi')
local pool_mt
local conn_mt

-- The marker for empty slots in a connection pool.
--
-- When a user puts a connection that is in an unusable state to a
-- pool, we put this marker to a pool's internal connection queue.
--
-- Note: It should not be equal to `nil`, because fiber channel's
-- `get` method returns `nil` when a timeout is reached.
local POOL_EMPTY_SLOT = true

--create a new connection
local function conn_create(mysql_conn)
local queue = fiber.channel(1)
Expand All @@ -20,23 +29,28 @@ local function conn_create(mysql_conn)
end

-- get connection from pool
local function conn_get(pool)
local mysql_conn = pool.queue:get()
local function conn_get(pool, timeout)
local mysql_conn = pool.queue:get(timeout)

-- A timeout was reached.
if mysql_conn == nil then return nil end

local status
if mysql_conn == nil then
if mysql_conn == POOL_EMPTY_SLOT then
status, mysql_conn = driver.connect(pool.host, pool.port or 0,
pool.user, pool.pass,
pool.db, pool.use_numeric_result)
if status < 0 then
return error(mysql_conn)
end
end

local conn = conn_create(mysql_conn)
-- we can use ffi gc to return mysql connection to pool
conn.__gc_hook = ffi.gc(ffi.new('void *'),
function(self)
mysql_conn:close()
pool.queue:put(nil)
pool.queue:put(POOL_EMPTY_SLOT)
end)
return conn
end
Expand All @@ -46,7 +60,7 @@ local function conn_put(conn)
ffi.gc(conn.__gc_hook, nil)
if not conn.queue:get() then
conn.usable = false
return nil
return POOL_EMPTY_SLOT
end
conn.usable = false
return mysqlconn
Expand Down Expand Up @@ -171,19 +185,25 @@ local function pool_close(self)
self.usable = false
for i = 1, self.size do
local mysql_conn = self.queue:get()
if mysql_conn ~= nil then
if mysql_conn ~= POOL_EMPTY_SLOT then
mysql_conn:close()
end
end
return 1
end

-- Returns connection
local function pool_get(self)
local function pool_get(self, opts)
opts = opts or {}

if not self.usable then
return error('Pool is not usable')
end
local conn = conn_get(self)
local conn = conn_get(self, opts.timeout)

-- A timeout was reached.
if conn == nil then return nil end

conn:reset(self.user, self.pass, self.db)
return conn
end
Expand All @@ -193,7 +213,7 @@ local function pool_put(self, conn)
if conn.usable then
self.queue:put(conn_put(conn))
else
self.queue:put(nil)
self.queue:put(POOL_EMPTY_SLOT)
end
end

Expand Down
24 changes: 23 additions & 1 deletion test/mysql.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ local function test_mysql_int64(t, p)
end

local function test_connection_pool(test, pool)
test:plan(5)
test:plan(6)

-- {{{ Case group: all connections are consumed initially.

Expand Down Expand Up @@ -211,6 +211,28 @@ local function test_connection_pool(test, pool)
assert(pool.queue:is_empty(), 'test case postcondition fails')
end)

-- Case: get a connection with a timeout.
test:test('pool:get({timeout = <...>})', function(test)
test:plan(3)
assert(pool.queue:is_empty(), 'test case precondition fails')

-- Verify that we blocks until reach a timeout, then
-- unblocks and get `nil` as a result.
local latch = fiber.channel(1)
local conn
fiber.create(function()
conn = pool:get({timeout = 2})
latch:put(true)
end)
local res = latch:get(1)
test:is(res, nil, 'pool:get() blocks until a timeout')
local res = latch:get()
test:ok(res ~= nil, 'pool:get() unblocks after a timeout')
test:is(conn, nil, 'pool:get() returns nil if a timeout was reached')

assert(pool.queue:is_empty(), 'test case postcondition fails')
end)

-- Give all connections back to the poll.
for _, conn in ipairs(connections) do
pool:put(conn)
Expand Down