diff --git a/README.md b/README.md index 33c38e8..74399bc 100644 --- a/README.md +++ b/README.md @@ -190,15 +190,23 @@ Create a connection pool with count of size established connections. - `pool ~= nil` on success - `error(reason)` on error -### `conn = pool:get()` +### `conn = pool:get(opts)` Get a connection from pool. Reset connection before returning it. If connection -is broken then it will be reestablished. If there is no free connections then -calling fiber will sleep until another fiber returns some connection to pool. +is broken then it will be reestablished. +If there is no free connections and timeout is not specified then calling fiber +will sleep until another fiber returns some connection to pool. +If timeout is specified, and there is no free connections for the duration of the timeout, +then the return value is nil. + +*Options*: + + - `timeout` - maximum number of seconds to wait for a connection *Returns*: - - `conn ~= nil` + - `conn ~= nil` on success + - `conn == nil` on there is no free connections when timeout option is specified ### `pool:put(conn)` diff --git a/mysql/init.lua b/mysql/init.lua index dc2c0db..7e56ed3 100644 --- a/mysql/init.lua +++ b/mysql/init.lua @@ -7,6 +7,15 @@ local ffi = require('ffi') local pool_mt local conn_mt +-- The marker for empty slots in a connection pool. +-- +-- When a user puts a connection that is in an unusable state to a +-- pool, we put this marker to a pool's internal connection queue. +-- +-- Note: It should not be equal to `nil`, because fiber channel's +-- `get` method returns `nil` when a timeout is reached. +local POOL_EMPTY_SLOT = true + --create a new connection local function conn_create(mysql_conn) local queue = fiber.channel(1) @@ -20,10 +29,14 @@ local function conn_create(mysql_conn) end -- get connection from pool -local function conn_get(pool) - local mysql_conn = pool.queue:get() +local function conn_get(pool, timeout) + local mysql_conn = pool.queue:get(timeout) + + -- A timeout was reached. + if mysql_conn == nil then return nil end + local status - if mysql_conn == nil then + if mysql_conn == POOL_EMPTY_SLOT then status, mysql_conn = driver.connect(pool.host, pool.port or 0, pool.user, pool.pass, pool.db, pool.use_numeric_result) @@ -31,12 +44,13 @@ local function conn_get(pool) return error(mysql_conn) end end + local conn = conn_create(mysql_conn) -- we can use ffi gc to return mysql connection to pool conn.__gc_hook = ffi.gc(ffi.new('void *'), function(self) mysql_conn:close() - pool.queue:put(nil) + pool.queue:put(POOL_EMPTY_SLOT) end) return conn end @@ -46,7 +60,7 @@ local function conn_put(conn) ffi.gc(conn.__gc_hook, nil) if not conn.queue:get() then conn.usable = false - return nil + return POOL_EMPTY_SLOT end conn.usable = false return mysqlconn @@ -171,7 +185,7 @@ local function pool_close(self) self.usable = false for i = 1, self.size do local mysql_conn = self.queue:get() - if mysql_conn ~= nil then + if mysql_conn ~= POOL_EMPTY_SLOT then mysql_conn:close() end end @@ -179,11 +193,17 @@ local function pool_close(self) end -- Returns connection -local function pool_get(self) +local function pool_get(self, opts) + opts = opts or {} + if not self.usable then return error('Pool is not usable') end - local conn = conn_get(self) + local conn = conn_get(self, opts.timeout) + + -- A timeout was reached. + if conn == nil then return nil end + conn:reset(self.user, self.pass, self.db) return conn end @@ -193,7 +213,7 @@ local function pool_put(self, conn) if conn.usable then self.queue:put(conn_put(conn)) else - self.queue:put(nil) + self.queue:put(POOL_EMPTY_SLOT) end end diff --git a/test/mysql.test.lua b/test/mysql.test.lua index 337f39b..2835421 100755 --- a/test/mysql.test.lua +++ b/test/mysql.test.lua @@ -171,7 +171,7 @@ local function test_mysql_int64(t, p) end local function test_connection_pool(test, pool) - test:plan(5) + test:plan(6) -- {{{ Case group: all connections are consumed initially. @@ -211,6 +211,28 @@ local function test_connection_pool(test, pool) assert(pool.queue:is_empty(), 'test case postcondition fails') end) + -- Case: get a connection with a timeout. + test:test('pool:get({timeout = <...>})', function(test) + test:plan(3) + assert(pool.queue:is_empty(), 'test case precondition fails') + + -- Verify that we blocks until reach a timeout, then + -- unblocks and get `nil` as a result. + local latch = fiber.channel(1) + local conn + fiber.create(function() + conn = pool:get({timeout = 2}) + latch:put(true) + end) + local res = latch:get(1) + test:is(res, nil, 'pool:get() blocks until a timeout') + local res = latch:get() + test:ok(res ~= nil, 'pool:get() unblocks after a timeout') + test:is(conn, nil, 'pool:get() returns nil if a timeout was reached') + + assert(pool.queue:is_empty(), 'test case postcondition fails') + end) + -- Give all connections back to the poll. for _, conn in ipairs(connections) do pool:put(conn)