Skip to content

Commit a9c0487

Browse files
CaselITzzzeek
authored andcommitted
Add support for two-phase commit in oracledb.
Implemented two-phase transactions for the oracledb dialect. Historically, this feature never worked with the cx_Oracle dialect, however recent improvements to the oracledb successor now allow this to be possible. The two phase transaction API is available at the Core level via the :meth:`_engine.Connection.begin_twophase` method. As part of this change, added new facility for testing that allows a test to skip if a certain step takes too long, allowing for a separate cleanup step. this is needed as oracle tpc wont allow commit recovery if transaction is older than about 1 second, could not find any docs on how to increase this timeout. Fixed an execute call in the PostgreSQL dialect's provisioning that drops old tpc transactions which was non-working, which indicates that we've apparently never had any PG tpc transactions needing to be cleaned up in CI for some years now, so that's good Fixes: #11480 Change-Id: If3ad19cc29999e70f07f767b88afd330f6e5a4be
1 parent 1442a71 commit a9c0487

File tree

9 files changed

+157
-67
lines changed

9 files changed

+157
-67
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
.. change::
2+
:tags: usecase, oracle
3+
:tickets: 11480
4+
5+
Implemented two-phase transactions for the oracledb dialect. Historically,
6+
this feature never worked with the cx_Oracle dialect, however recent
7+
improvements to the oracledb successor now allow this to be possible. The
8+
two phase transaction API is available at the Core level via the
9+
:meth:`_engine.Connection.begin_twophase` method.

lib/sqlalchemy/dialects/oracle/base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,6 @@
338338
on parity with other backends.
339339
340340
341-
342341
ON UPDATE CASCADE
343342
-----------------
344343
@@ -479,7 +478,7 @@
479478
.. _oracle_table_options:
480479
481480
Oracle Table Options
482-
-------------------------
481+
--------------------
483482
484483
The CREATE TABLE phrase supports the following options with Oracle
485484
in conjunction with the :class:`_schema.Table` construct:

lib/sqlalchemy/dialects/oracle/cx_oracle.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -377,14 +377,12 @@ def _remove_clob(inputsizes, cursor, statement, parameters, context):
377377
``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`,
378378
which takes place only engine-wide.
379379
380-
Two Phase Transactions Not Supported
381-
-------------------------------------
380+
Two Phase Transactions Not Supported (use oracledb)
381+
---------------------------------------------------
382382
383-
Two phase transactions are **not supported** under cx_Oracle due to poor
384-
driver support. As of cx_Oracle 6.0b1, the interface for
385-
two phase transactions has been changed to be more of a direct pass-through
386-
to the underlying OCI layer with less automation. The additional logic
387-
to support this system is not implemented in SQLAlchemy.
383+
Two phase transactions are **not supported** under cx_Oracle due to poor driver
384+
support. The newer :ref:`oracledb` dialect however **does** support two phase
385+
transactions and should be preferred.
388386
389387
.. _cx_oracle_numeric:
390388
@@ -1423,13 +1421,6 @@ def is_disconnect(self, e, connection, cursor):
14231421
return False
14241422

14251423
def create_xid(self):
1426-
"""create a two-phase transaction ID.
1427-
1428-
this id will be passed to do_begin_twophase(), do_rollback_twophase(),
1429-
do_commit_twophase(). its format is unspecified.
1430-
1431-
"""
1432-
14331424
id_ = random.randint(0, 2**128)
14341425
return (0x1234, "%032x" % id_, "%032x" % 9)
14351426

lib/sqlalchemy/dialects/oracle/oracledb.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
:connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
1414
:url: https://oracle.github.io/python-oracledb/
1515
16+
Description
17+
-----------
18+
1619
python-oracledb is released by Oracle to supersede the cx_Oracle driver.
1720
It is fully compatible with cx_Oracle and features both a "thin" client
1821
mode that requires no dependencies, as well as a "thick" mode that uses
@@ -21,7 +24,7 @@
2124
.. seealso::
2225
2326
:ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver
24-
as well.
27+
as well, with the exception that oracledb supports two phase transactions.
2528
2629
The SQLAlchemy ``oracledb`` dialect provides both a sync and an async
2730
implementation under the same dialect name. The proper version is
@@ -70,6 +73,16 @@
7073
7174
https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client
7275
76+
Two Phase Transactions Supported
77+
--------------------------------
78+
79+
Two phase transactions are fully supported under oracledb. Starting with
80+
oracledb 2.3 two phase transactions are supported also in thin mode. APIs
81+
for two phase transactions are provided at the Core level via
82+
:meth:`_engine.Connection.begin_twophase` and :paramref:`_orm.Session.twophase`
83+
for transparent ORM use.
84+
85+
.. versionchanged:: 2.0.32 added support for two phase transactions
7386
7487
.. versionadded:: 2.0.0 added support for oracledb driver.
7588
@@ -151,6 +164,49 @@ def _load_version(self, dbapi_module):
151164
f"oracledb version {self._min_version} and above are supported"
152165
)
153166

167+
def do_begin_twophase(self, connection, xid):
168+
conn_xis = connection.connection.xid(*xid)
169+
connection.connection.tpc_begin(conn_xis)
170+
connection.connection.info["oracledb_xid"] = conn_xis
171+
172+
def do_prepare_twophase(self, connection, xid):
173+
should_commit = connection.connection.tpc_prepare()
174+
connection.info["oracledb_should_commit"] = should_commit
175+
176+
def do_rollback_twophase(
177+
self, connection, xid, is_prepared=True, recover=False
178+
):
179+
if recover:
180+
conn_xid = connection.connection.xid(*xid)
181+
else:
182+
conn_xid = None
183+
connection.connection.tpc_rollback(conn_xid)
184+
185+
def do_commit_twophase(
186+
self, connection, xid, is_prepared=True, recover=False
187+
):
188+
conn_xid = None
189+
if not is_prepared:
190+
should_commit = connection.connection.tpc_prepare()
191+
elif recover:
192+
conn_xid = connection.connection.xid(*xid)
193+
should_commit = True
194+
else:
195+
should_commit = connection.info["oracledb_should_commit"]
196+
if should_commit:
197+
connection.connection.tpc_commit(conn_xid)
198+
199+
def do_recover_twophase(self, connection):
200+
return [
201+
# oracledb seems to return bytes
202+
(
203+
fi,
204+
gti.decode() if isinstance(gti, bytes) else gti,
205+
bq.decode() if isinstance(bq, bytes) else bq,
206+
)
207+
for fi, gti, bq in connection.connection.tpc_recover()
208+
]
209+
154210

155211
class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor):
156212
_cursor: AsyncCursor
@@ -241,6 +297,24 @@ def stmtcachesize(self, value):
241297
def cursor(self):
242298
return AsyncAdapt_oracledb_cursor(self)
243299

300+
def xid(self, *args: Any, **kwargs: Any) -> Any:
301+
return self._connection.xid(*args, **kwargs)
302+
303+
def tpc_begin(self, *args: Any, **kwargs: Any) -> Any:
304+
return await_(self._connection.tpc_begin(*args, **kwargs))
305+
306+
def tpc_commit(self, *args: Any, **kwargs: Any) -> Any:
307+
return await_(self._connection.tpc_commit(*args, **kwargs))
308+
309+
def tpc_prepare(self, *args: Any, **kwargs: Any) -> Any:
310+
return await_(self._connection.tpc_prepare(*args, **kwargs))
311+
312+
def tpc_recover(self, *args: Any, **kwargs: Any) -> Any:
313+
return await_(self._connection.tpc_recover(*args, **kwargs))
314+
315+
def tpc_rollback(self, *args: Any, **kwargs: Any) -> Any:
316+
return await_(self._connection.tpc_rollback(*args, **kwargs))
317+
244318

245319
class OracledbAdaptDBAPI:
246320
def __init__(self, oracledb) -> None:

lib/sqlalchemy/dialects/postgresql/provision.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def drop_all_schema_objects_pre_tables(cfg, eng):
9797
for xid in conn.exec_driver_sql(
9898
"select gid from pg_prepared_xacts"
9999
).scalars():
100-
conn.execute("ROLLBACK PREPARED '%s'" % xid)
100+
conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
101101

102102

103103
@drop_all_schema_objects_post_tables.for_db("postgresql")

lib/sqlalchemy/testing/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
from .util import resolve_lambda
8484
from .util import rowset
8585
from .util import run_as_contextmanager
86+
from .util import skip_if_timeout
8687
from .util import teardown_events
8788
from .warnings import assert_warnings
8889
from .warnings import warn_test_suite

lib/sqlalchemy/testing/util.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010
from __future__ import annotations
1111

1212
from collections import deque
13+
import contextlib
1314
import decimal
1415
import gc
1516
from itertools import chain
1617
import random
1718
import sys
1819
from sys import getsizeof
20+
import time
1921
import types
22+
from typing import Any
2023

2124
from . import config
2225
from . import mock
@@ -517,3 +520,18 @@ def count_cache_key_tuples(tup):
517520
if elem:
518521
stack = list(elem) + [sentinel] + stack
519522
return num_elements
523+
524+
525+
@contextlib.contextmanager
526+
def skip_if_timeout(seconds: float, cleanup: Any = None):
527+
528+
now = time.time()
529+
yield
530+
sec = time.time() - now
531+
if sec > seconds:
532+
try:
533+
cleanup()
534+
finally:
535+
config.skip_test(
536+
f"test took too long ({sec:.4f} seconds > {seconds})"
537+
)

test/engine/test_transaction.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,8 @@ def test_two_phase_transaction(self, local_connection):
473473

474474
@testing.requires.two_phase_transactions
475475
@testing.requires.two_phase_recovery
476-
def test_two_phase_recover(self):
476+
@testing.variation("commit", [True, False])
477+
def test_two_phase_recover(self, commit):
477478
users = self.tables.users
478479

479480
# 2020, still can't get this to work w/ modern MySQL or MariaDB.
@@ -501,17 +502,29 @@ def test_two_phase_recover(self):
501502
[],
502503
)
503504
# recover_twophase needs to be run in a new transaction
504-
with testing.db.connect() as connection2:
505-
recoverables = connection2.recover_twophase()
506-
assert transaction.xid in recoverables
507-
connection2.commit_prepared(transaction.xid, recover=True)
508-
509-
eq_(
510-
connection2.execute(
511-
select(users.c.user_id).order_by(users.c.user_id)
512-
).fetchall(),
513-
[(1,)],
514-
)
505+
with testing.db.connect() as connection3:
506+
# oracle transactions can't be recovered for commit after...
507+
# about 1 second? OK
508+
with testing.skip_if_timeout(
509+
0.75,
510+
cleanup=(
511+
lambda: connection3.rollback_prepared(
512+
transaction.xid, recover=True
513+
)
514+
),
515+
):
516+
recoverables = connection3.recover_twophase()
517+
assert transaction.xid in recoverables
518+
519+
if commit:
520+
connection3.commit_prepared(transaction.xid, recover=True)
521+
res = [(1,)]
522+
else:
523+
connection3.rollback_prepared(transaction.xid, recover=True)
524+
res = []
525+
526+
stmt = select(users.c.user_id).order_by(users.c.user_id)
527+
eq_(connection3.execute(stmt).fetchall(), res)
515528

516529
@testing.requires.two_phase_transactions
517530
def test_multiple_two_phase(self, local_connection):

test/requirements.py

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -858,32 +858,27 @@ def pg_prepared_transaction(config):
858858
else:
859859
return num > 0
860860

861-
return (
862-
skip_if(
863-
[
864-
no_support(
865-
"mssql", "two-phase xact not supported by drivers"
866-
),
867-
no_support(
868-
"sqlite", "two-phase xact not supported by database"
869-
),
870-
# in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
871-
# we are evaluating which modern MySQL / MariaDB versions
872-
# can handle two-phase testing without too many problems
873-
# no_support(
874-
# "mysql",
875-
# "recent MySQL community editions have too many "
876-
# "issues (late 2016), disabling for now",
877-
# ),
878-
NotPredicate(
879-
LambdaPredicate(
880-
pg_prepared_transaction,
881-
"max_prepared_transactions not available or zero",
882-
)
883-
),
884-
]
885-
)
886-
+ self.skip_on_oracledb_thin
861+
return skip_if(
862+
[
863+
no_support("mssql", "two-phase xact not supported by drivers"),
864+
no_support(
865+
"sqlite", "two-phase xact not supported by database"
866+
),
867+
# in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
868+
# we are evaluating which modern MySQL / MariaDB versions
869+
# can handle two-phase testing without too many problems
870+
# no_support(
871+
# "mysql",
872+
# "recent MySQL community editions have too many "
873+
# "issues (late 2016), disabling for now",
874+
# ),
875+
NotPredicate(
876+
LambdaPredicate(
877+
pg_prepared_transaction,
878+
"max_prepared_transactions not available or zero",
879+
)
880+
),
881+
]
887882
)
888883

889884
@property
@@ -893,7 +888,7 @@ def two_phase_recovery(self):
893888
["mysql", "mariadb"],
894889
"still can't get recover to work w/ MariaDB / MySQL",
895890
)
896-
+ skip_if("oracle", "recovery not functional")
891+
+ skip_if("oracle+cx_oracle", "recovery not functional")
897892
)
898893

899894
@property
@@ -1870,16 +1865,6 @@ def go(config):
18701865

18711866
return only_if(go)
18721867

1873-
@property
1874-
def skip_on_oracledb_thin(self):
1875-
def go(config):
1876-
if against(config, "oracle+oracledb"):
1877-
with config.db.connect() as conn:
1878-
return config.db.dialect.is_thin_mode(conn)
1879-
return False
1880-
1881-
return skip_if(go)
1882-
18831868
@property
18841869
def computed_columns(self):
18851870
return skip_if(["postgresql < 12", "sqlite < 3.31", "mysql < 5.7"])

0 commit comments

Comments
 (0)