diff --git a/appveyor.yml b/appveyor.yml index c129cec0..620d1e85 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -16,12 +16,14 @@ environment: - PYTHON: "C:\\Python36-x64" - PYTHON: "C:\\Python37" - PYTHON: "C:\\Python37-x64" + - PYTHON: "C:\\Python38" + - PYTHON: "C:\\Python38-x64" install: # install runtime dependencies - "%PYTHON%\\python.exe -m pip install -r requirements.txt" # install testing dependencies - - "%PYTHON%\\python.exe -m pip install pyyaml%PYYAML%" + - "%PYTHON%\\python.exe -m pip install pyyaml%PYYAML% dbapi-compliance==1.15.0" build: off diff --git a/tarantool/__init__.py b/tarantool/__init__.py index a9838321..b4ed81d4 100644 --- a/tarantool/__init__.py +++ b/tarantool/__init__.py @@ -75,4 +75,4 @@ def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None, __all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema', 'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning', - 'SchemaError'] + 'SchemaError', 'dbapi'] diff --git a/tarantool/connection.py b/tarantool/connection.py index 6f330896..f9d1de01 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -34,7 +34,8 @@ RequestSubscribe, RequestUpdate, RequestUpsert, - RequestAuthenticate + RequestAuthenticate, + RequestExecute ) from tarantool.space import Space from tarantool.const import ( @@ -49,12 +50,20 @@ ITERATOR_ALL ) from tarantool.error import ( + Error, NetworkError, DatabaseError, InterfaceError, SchemaError, NetworkWarning, + OperationalError, + DataError, + IntegrityError, + InternalError, + ProgrammingError, + NotSupportedError, SchemaReloadException, + Warning, warn ) from tarantool.schema import Schema @@ -76,11 +85,19 @@ class Connection(object): Also this class provides low-level interface to data manipulation (insert/delete/update/select). ''' - Error = tarantool.error + # DBAPI Extension: supply exceptions as attributes on the connection + Error = Error DatabaseError = DatabaseError InterfaceError = InterfaceError SchemaError = SchemaError NetworkError = NetworkError + Warning = Warning + DataError = DataError + OperationalError = OperationalError + IntegrityError = IntegrityError + InternalError = InternalError + ProgrammingError = ProgrammingError + NotSupportedError = NotSupportedError def __init__(self, host, port, user=None, @@ -91,6 +108,7 @@ def __init__(self, host, port, connect_now=True, encoding=ENCODING_DEFAULT, call_16=False, + use_list=True, connection_timeout=CONNECTION_TIMEOUT): ''' Initialize a connection to the server. @@ -125,6 +143,7 @@ def __init__(self, host, port, self.error = True self.encoding = encoding self.call_16 = call_16 + self.use_list = use_list self.connection_timeout = connection_timeout if connect_now: self.connect() @@ -136,6 +155,13 @@ def close(self): self._socket.close() self._socket = None + def is_closed(self): + self._check_not_closed() + + def _check_not_closed(self, error=None): + if self._socket is None: + raise DatabaseError(error or "The connector is closed") + def connect_basic(self): if self.host == None: self.connect_unix() @@ -250,17 +276,18 @@ def _read_response(self): def _send_request_wo_reconnect(self, request): ''' - :rtype: `Response` instance + :rtype: `Response` instance or subclass :raise: NetworkError ''' - assert isinstance(request, Request) + if not issubclass(type(request), Request): + raise NetworkError response = None while True: try: self._socket.sendall(bytes(request)) - response = Response(self, self._read_response()) + response = request.response_class(self, self._read_response()) break except SchemaReloadException as e: self.update_schema(e.schema_version) @@ -785,3 +812,23 @@ def generate_sync(self): Need override for async io connection ''' return 0 + + def execute(self, query, params=None): + ''' + Execute SQL request. + + :param query: SQL syntax query + :type query: str + + :param params: Bind values to use in query + :type params: list, dict + + :return: query result data + :rtype: list + ''' + self._check_not_closed() + if not params: + params = [] + request = RequestExecute(self, query, params) + response = self._send_request(request) + return response diff --git a/tarantool/const.py b/tarantool/const.py index 9d175974..0db35978 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -29,6 +29,13 @@ # IPROTO_DATA = 0x30 IPROTO_ERROR = 0x31 +# +IPROTO_METADATA = 0x32 +IPROTO_SQL_TEXT = 0x40 +IPROTO_SQL_BIND = 0x41 +IPROTO_SQL_INFO = 0x42 +IPROTO_SQL_INFO_ROW_COUNT = 0x00 +IPROTO_SQL_INFO_AUTOINCREMENT_IDS = 0x01 IPROTO_GREETING_SIZE = 128 IPROTO_BODY_MAX_LEN = 2147483648 @@ -44,6 +51,7 @@ REQUEST_TYPE_EVAL = 8 REQUEST_TYPE_UPSERT = 9 REQUEST_TYPE_CALL = 10 +REQUEST_TYPE_EXECUTE = 11 REQUEST_TYPE_PING = 64 REQUEST_TYPE_JOIN = 65 REQUEST_TYPE_SUBSCRIBE = 66 diff --git a/tarantool/dbapi.py b/tarantool/dbapi.py new file mode 100644 index 00000000..2ccc3274 --- /dev/null +++ b/tarantool/dbapi.py @@ -0,0 +1,288 @@ +# -*- coding: utf-8 -*- +from tarantool.connection import Connection as BaseConnection +from tarantool.error import * + + +paramstyle = 'named' +apilevel = "2.0" +threadsafety = 1 + + +class Cursor: + + def __init__(self, conn): + self._c = conn + self._lastrowid = None + self._rowcount = None + self.arraysize = 1 + self.autocommit = False + self.closed = False + self._rows = None + + def callproc(self, procname, *params): # TODO + """ + Call a stored database procedure with the given name. The sequence of + parameters must contain one entry for each argument that the + procedure expects. The result of the call is returned as modified + copy of the input sequence. Input parameters are left untouched, + output and input/output parameters replaced with possibly new values. + + The procedure may also provide a result set as output. This must then + be made available through the standard .fetch*() methods. + """ + + @property + def rows(self): + return self._rows + + def close(self): + """ + Close the cursor now (rather than whenever __del__ is called). + + The cursor will be unusable from this point forward; an Error (or + subclass) exception will be raised if any operation is attempted with + the cursor. + """ + self._c = None + self._rows = None + + def execute(self, query, params=None): + """ + Prepare and execute a database operation (query or command). + + Parameters may be provided as sequence or mapping and will be bound + to variables in the operation. Variables are specified in a + database-specific notation (see the module's paramstyle attribute for + details). + + A reference to the operation will be retained by the cursor. If the + same operation object is passed in again, then the cursor can + optimize its behavior. This is most effective for algorithms where + the same operation is used, but different parameters are bound to it + (many times). + + For maximum efficiency when reusing an operation, it is best to use + the .setinputsizes() method to specify the parameter types and sizes + ahead of time. It is legal for a parameter to not match the + predefined information; the implementation should compensate, + possibly with a loss of efficiency. + + The parameters may also be specified as list of tuples to e.g. insert + multiple rows in a single operation, but this kind of usage is + deprecated: .executemany() should be used instead. + + Return values are not defined. + """ + if self.closed: + raise ProgrammingError() + + response = self._c.execute(query, params) + + self._rows = response.data if len(response.body) > 1 else None + + self._rowcount = response.rowcount + self._lastrowid = response.lastrowid + + def executemany(self, query, param_sets): + rowcount = 0 + for params in param_sets: + self.execute(query, params) + if self.rowcount == -1: + rowcount = -1 + if rowcount != -1: + rowcount += self.rowcount + self._rowcount = rowcount + + @property + def lastrowid(self): + """ + This read-only attribute provides the rowid of the last modified row + (most databases return a rowid only when a single INSERT operation is + performed). If the operation does not set a rowid or if the database + does not support rowids, this attribute should be set to None. + + The semantics of .lastrowid are undefined in case the last executed + statement modified more than one row, e.g. when using INSERT with + .executemany(). + + Warning Message: "DB-API extension cursor.lastrowid used" + """ + return self._lastrowid + + @property + def rowcount(self): + """ + This read-only attribute specifies the number of rows that the last + .execute*() produced (for DQL statements like SELECT) or affected ( + for DML statements like UPDATE or INSERT). + + The attribute is -1 in case no .execute*() has been performed on the + cursor or the rowcount of the last operation is cannot be determined + by the interface. + + Note: + Future versions of the DB API specification could redefine the latter + case to have the object return None instead of -1. + """ + return self._rowcount + + def fetchone(self): + """ + Fetch the next row of a query result set, returning a single + sequence, or None when no more data is available. + + An Error (or subclass) exception is raised if the previous call to + .execute*() did not produce any result set or no call was issued yet. + """ + if self._rows is None: + raise ProgrammingError('Nothing to fetch') + return self.fetchmany(1)[0] if self._rows else None + + def fetchmany(self, size=None): + """ + Fetch the next set of rows of a query result, returning a sequence of + sequences (e.g. a list of tuples). An empty sequence is returned when + no more rows are available. + + The number of rows to fetch per call is specified by the parameter. + If it is not given, the cursor's arraysize determines the number of + rows to be fetched. The method should try to fetch as many rows as + indicated by the size parameter. If this is not possible due to the + specified number of rows not being available, fewer rows may be + returned. + + An Error (or subclass) exception is raised if the previous call to + .execute*() did not produce any result set or no call was issued yet. + + Note there are performance considerations involved with the size + parameter. For optimal performance, it is usually best to use the + .arraysize attribute. If the size parameter is used, then it is best + for it to retain the same value from one .fetchmany() call to the next. + """ + size = size or self.arraysize + + if self._rows is None: + raise ProgrammingError('Nothing to fetch') + + if len(self._rows) < size: + items = self._rows + self._rows = [] + else: + items, self._rows = self._rows[:size], self._rows[size:] + + return items + + def fetchall(self): + """Fetch all (remaining) rows of a query result, returning them as a + sequence of sequences (e.g. a list of tuples). Note that the cursor's + arraysize attribute can affect the performance of this operation. + + An Error (or subclass) exception is raised if the previous call to + .execute*() did not produce any result set or no call was issued yet. + """ + if self._rows is None: + raise ProgrammingError('Nothing to fetch') + + items = self._rows + self._rows = [] + return items + + def setinputsizes(self, sizes): + """This can be used before a call to .execute*() to predefine memory + areas for the operation's parameters. + sizes is specified as a sequence - one item for each input parameter. + The item should be a Type Object that corresponds to the input that + will be used, or it should be an integer specifying the maximum + length of a string parameter. If the item is None, then no predefined + memory area will be reserved for that column (this is useful to avoid + predefined areas for large inputs). + + This method would be used before the .execute*() method is invoked. + + Implementations are free to have this method do nothing and users are + free to not use it.""" + + def setoutputsize(self, size, column=None): + """Set a column buffer size for fetches of large columns (e.g. LONGs, + BLOBs, etc.). The column is specified as an index into the result + sequence. Not specifying the column will set the default size for all + large columns in the cursor. + This method would be used before the .execute*() method is invoked. + Implementations are free to have this method do nothing and users are + free to not use it.""" + + +class Connection(BaseConnection): + + @property + def server_version(self): + return getattr(self, 'version_id', None) + + def close(self): + self._check_not_closed() + super(Connection, self).close() + + def commit(self): + """ + Commit any pending transaction to the database. + + Note that if the database supports an auto-commit feature, this must + be initially off. An interface method may be provided to turn it back + on. + + Database modules that do not support transactions should implement + this method with void functionality. + """ + self._check_not_closed() + + def rollback(self): + """ + In case a database does provide transactions this method causes the + database to roll back to the start of any pending transaction. + Closing a connection without committing the changes first will cause + an implicit rollback to be performed. + """ + self._check_not_closed() + + def cursor(self, params=None): + """ + Return a new Cursor Object using the connection. + + If the database does not provide a direct cursor concept, the module + will have to emulate cursors using other means to the extent needed + by this specification. + """ + return Cursor(self) + + +def connect(dsn=None, host=None, port=None, + user=None, password=None, **kwargs): + """ + Constructor for creating a connection to the database. + + :param str dsn: Data source name (Tarantool URI) + ([[[username[:password]@]host:]port) + :param str host: Server hostname or IP-address + :param int port: Server port + :param str user: Tarantool user + :param str password: User password + :rtype: Connection + """ + + if dsn: + raise NotImplementedError("dsn param is not implemented in" + "this version of dbapi module") + params = {} + if host: + params["host"] = host + if port: + params["port"] = port + if user: + params["user"] = user + if password: + params["password"] = password + + if kwargs.get("use_list") is False: + params["use_list"] = False + + return Connection(**params) diff --git a/tarantool/error.py b/tarantool/error.py index cc66e8c5..2b4caf26 100644 --- a/tarantool/error.py +++ b/tarantool/error.py @@ -32,6 +32,13 @@ class Error(StandardError): class Error(Exception): '''Base class for error exceptions''' +try: + class Warning(StandardError): + pass +except NameError: + class Warning(Exception): + pass + class DatabaseError(Error): '''Error related to the database engine''' @@ -49,6 +56,34 @@ class ConfigurationError(Error): ''' +class InternalError(DatabaseError): + pass + + +class OperationalError(DatabaseError): + pass + + +class ProgrammingError(DatabaseError): + pass + + +class IntegrityError(DatabaseError): + pass + + +class DataError(DatabaseError): + pass + + +class NotSupportedError(DatabaseError): + pass + + +__all__ = ("Warning", "Error", "InterfaceError", "DatabaseError", "DataError", + "OperationalError", "IntegrityError", "InternalError", + "ProgrammingError", "NotSupportedError",) + # Monkey patch os.strerror for win32 if sys.platform == "win32": # Windows Sockets Error Codes (not all, but related on network errors) diff --git a/tarantool/request.py b/tarantool/request.py index 2e45c8d9..8d5b681d 100644 --- a/tarantool/request.py +++ b/tarantool/request.py @@ -7,7 +7,7 @@ import msgpack import hashlib - +from tarantool.error import DatabaseError from tarantool.const import ( IPROTO_CODE, IPROTO_SYNC, @@ -27,6 +27,8 @@ IPROTO_OPS, # IPROTO_INDEX_BASE, IPROTO_SCHEMA_ID, + IPROTO_SQL_TEXT, + IPROTO_SQL_BIND, REQUEST_TYPE_OK, REQUEST_TYPE_PING, REQUEST_TYPE_SELECT, @@ -37,11 +39,13 @@ REQUEST_TYPE_UPSERT, REQUEST_TYPE_CALL16, REQUEST_TYPE_CALL, + REQUEST_TYPE_EXECUTE, REQUEST_TYPE_EVAL, REQUEST_TYPE_AUTHENTICATE, REQUEST_TYPE_JOIN, REQUEST_TYPE_SUBSCRIBE ) +from tarantool.response import Response, ResponseExecute from tarantool.utils import ( strxor, binary_types @@ -64,6 +68,7 @@ def __init__(self, conn): self.conn = conn self._sync = None self._body = '' + self.response_class = Response def __bytes__(self): return self.header(len(self._body)) + self._body @@ -332,3 +337,24 @@ def __init__(self, conn, sync): request_body = msgpack.dumps({IPROTO_CODE: self.request_type, IPROTO_SYNC: sync}) self._body = request_body + + +class RequestExecute(Request): + ''' + Represents EXECUTE request + ''' + request_type = REQUEST_TYPE_EXECUTE + + # pylint: disable=W0231 + def __init__(self, conn, sql, args): + super(RequestExecute, self).__init__(conn) + if isinstance(args, dict): + args = [{":%s" % name: value} for name, value in args.items()] + try: + request_body = msgpack.dumps({IPROTO_SQL_TEXT: sql, + IPROTO_SQL_BIND: args}) + except ValueError as e: + raise DatabaseError("Value error: %s" % e) + + self._body = request_body + self.response_class = ResponseExecute diff --git a/tarantool/response.py b/tarantool/response.py index 9516cd39..47a3d424 100644 --- a/tarantool/response.py +++ b/tarantool/response.py @@ -17,7 +17,10 @@ IPROTO_ERROR, IPROTO_SYNC, IPROTO_SCHEMA_ID, - REQUEST_TYPE_ERROR + REQUEST_TYPE_ERROR, + IPROTO_SQL_INFO, + IPROTO_SQL_INFO_ROW_COUNT, + IPROTO_SQL_INFO_AUTOINCREMENT_IDS ) from tarantool.error import ( DatabaseError, @@ -50,16 +53,19 @@ def __init__(self, conn, response): # created in the __new__(). # super(Response, self).__init__() + kwargs = dict(use_list=conn.use_list) + if msgpack.version >= (1, 0, 0): + # XXX: Explain why it is necessary. + kwargs['strict_map_key'] = False if msgpack.version >= (0, 5, 2) and conn.encoding == 'utf-8': # Get rid of the following warning. # > PendingDeprecationWarning: encoding is deprecated, # > Use raw=False instead. - unpacker = msgpack.Unpacker(use_list=True, raw=False) + kwargs['raw'] = False elif conn.encoding is not None: - unpacker = msgpack.Unpacker(use_list=True, encoding=conn.encoding) - else: - unpacker = msgpack.Unpacker(use_list=True) + kwargs['encoding'] = conn.encoding + unpacker = msgpack.Unpacker(**kwargs) unpacker.feed(response) header = unpacker.unpack() @@ -245,3 +251,30 @@ def __str__(self): return ''.join(output) __repr__ = __str__ + + +class ResponseExecute(Response): + @property + def lastrowid(self): + if self.body is None: + raise InterfaceError("Trying to access data, when there's no data") + info = self.body.get(IPROTO_SQL_INFO) + + if info is None: + return None + + lastrowids = info.get(IPROTO_SQL_INFO_AUTOINCREMENT_IDS) + + return lastrowids[-1] if lastrowids else None + + @property + def rowcount(self): + if self._body is None: + raise InterfaceError("Trying to access data, when there's no data") + + info = self._body.get(IPROTO_SQL_INFO) + + if info is None: + return -1 + + return info.get(IPROTO_SQL_INFO_ROW_COUNT, -1) diff --git a/test.sh b/test.sh index a8c1b577..dde324c4 100755 --- a/test.sh +++ b/test.sh @@ -11,7 +11,7 @@ sudo apt-get -q -y install tarantool # Install testing dependencies. pip install -r requirements.txt -pip install pyyaml +pip install pyyaml dbapi-compliance==1.15.0 # Run tests. python setup.py test diff --git a/unit/suites/__init__.py b/unit/suites/__init__.py index ead75297..b4d1cf2b 100644 --- a/unit/suites/__init__.py +++ b/unit/suites/__init__.py @@ -9,9 +9,10 @@ from .test_protocol import TestSuite_Protocol from .test_reconnect import TestSuite_Reconnect from .test_mesh import TestSuite_Mesh +from .test_dbapi import TestSuite_DBAPI test_cases = (TestSuite_Schema, TestSuite_Request, TestSuite_Protocol, - TestSuite_Reconnect, TestSuite_Mesh) + TestSuite_Reconnect, TestSuite_Mesh, TestSuite_DBAPI) def load_tests(loader, tests, pattern): suite = unittest.TestSuite() diff --git a/unit/suites/test_dbapi.py b/unit/suites/test_dbapi.py new file mode 100644 index 00000000..a0156f69 --- /dev/null +++ b/unit/suites/test_dbapi.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function + +import sys +import unittest + +import dbapi20 + +import tarantool +from tarantool import dbapi +from .lib.tarantool_server import TarantoolServer + + +class TestSuite_DBAPI(dbapi20.DatabaseAPI20Test): + table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables + + ddl1 = 'create table %sbooze (name varchar(20) primary key)' % table_prefix + ddl2 = 'create table %sbarflys (name varchar(20) primary key, ' \ + 'drink varchar(30))' % table_prefix + + @classmethod + def setUpClass(self): + print(' DBAPI '.center(70, '='), file=sys.stderr) + print('-' * 70, file=sys.stderr) + self.srv = TarantoolServer() + self.srv.script = 'unit/suites/box.lua' + self.srv.start() + self.con = tarantool.Connection(self.srv.host, self.srv.args['primary']) + self.driver = dbapi + self.connect_kw_args = dict( + host=self.srv.host, + port=self.srv.args['primary']) + print("DRIVER +++++++++++++++", self.driver) + + def setUp(self): + # prevent a remote tarantool from clean our session + if self.srv.is_started(): + self.srv.touch_lock() + self.con.flush_schema() + + # grant full access to guest + self.srv.admin("box.schema.user.grant('guest', 'create,read,write," + "execute', 'universe')") + + @classmethod + def tearDownClass(self): + self.con.close() + self.srv.stop() + self.srv.clean() + + def test_rowcount(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + dbapi20._failUnless(self,cur.rowcount in (-1,1), # Bug #543885 + 'cursor.rowcount should be -1 or 0 after executing no-result ' + 'statements' + str(cur.rowcount) + ) + cur.execute("%s into %sbooze values ('Victoria Bitter')" % ( + self.insert, self.table_prefix + )) + dbapi20._failUnless(self,cur.rowcount in (-1,1), + 'cursor.rowcount should == number or rows inserted, or ' + 'set to -1 after executing an insert statement' + ) + cur.execute("select name from %sbooze" % self.table_prefix) + dbapi20._failUnless(self,cur.rowcount in (-1,1), + 'cursor.rowcount should == number of rows returned, or ' + 'set to -1 after executing a select statement' + ) + self.executeDDL2(cur) + dbapi20._failUnless(self,cur.rowcount in (-1,1), # Bug #543885 + 'cursor.rowcount should be -1 or 0 after executing no-result ' + 'statements' + ) + finally: + con.close() + + @unittest.skip('Not implemented') + def test_Binary(self): + pass + + @unittest.skip('Not implemented') + def test_STRING(self): + pass + + @unittest.skip('Not implemented') + def test_BINARY(self): + pass + + @unittest.skip('Not implemented') + def test_NUMBER(self): + pass + + @unittest.skip('Not implemented') + def test_DATETIME(self): + pass + + @unittest.skip('Not implemented') + def test_ROWID(self): + pass + + @unittest.skip('Not implemented') + def test_Date(self): + pass + + @unittest.skip('Not implemented') + def test_Time(self): + pass + + @unittest.skip('Not implemented') + def test_Timestamp(self): + pass + + @unittest.skip('Not implemented as optional.') + def test_nextset(self): + pass + + @unittest.skip('To do') + def test_callproc(self): + pass + + def test_setoutputsize(self): # Do nothing + pass + + @unittest.skip('To do') + def test_description(self): + pass