Merge "Add support for two-phase commit in oracledb." into rel_2_0

This commit is contained in:
Michael Bayer
2024-08-01 17:47:06 +00:00
committed by Gerrit Code Review
9 changed files with 156 additions and 66 deletions
+9
View File
@@ -0,0 +1,9 @@
.. change::
:tags: usecase, oracle
:tickets: 11480
Implemented two-phase transactions for the oracledb dialect. Historically,
this feature never worked with the cx_Oracle dialect, however recent
improvements to the oracledb successor now allow this to be possible. The
two phase transaction API is available at the Core level via the
:meth:`_engine.Connection.begin_twophase` method.
+1 -2
View File
@@ -326,7 +326,6 @@ returned as well.
on parity with other backends.
ON UPDATE CASCADE
-----------------
@@ -467,7 +466,7 @@ is reflected and the type is reported as ``DATE``, the time-supporting
.. _oracle_table_options:
Oracle Table Options
-------------------------
--------------------
The CREATE TABLE phrase supports the following options with Oracle
in conjunction with the :class:`_schema.Table` construct:
+5 -14
View File
@@ -377,14 +377,12 @@ buffered objects with a ``read()`` method, the parameter
``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`,
which takes place only engine-wide.
Two Phase Transactions Not Supported
-------------------------------------
Two Phase Transactions Not Supported (use oracledb)
---------------------------------------------------
Two phase transactions are **not supported** under cx_Oracle due to poor
driver support. As of cx_Oracle 6.0b1, the interface for
two phase transactions has been changed to be more of a direct pass-through
to the underlying OCI layer with less automation. The additional logic
to support this system is not implemented in SQLAlchemy.
Two phase transactions are **not supported** under cx_Oracle due to poor driver
support. The newer :ref:`oracledb` dialect however **does** support two phase
transactions and should be preferred.
.. _cx_oracle_numeric:
@@ -1423,13 +1421,6 @@ class OracleDialect_cx_oracle(OracleDialect):
return False
def create_xid(self):
"""create a two-phase transaction ID.
this id will be passed to do_begin_twophase(), do_rollback_twophase(),
do_commit_twophase(). its format is unspecified.
"""
id_ = random.randint(0, 2**128)
return (0x1234, "%032x" % id_, "%032x" % 9)
+75 -1
View File
@@ -13,6 +13,9 @@ r"""
:connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
:url: https://oracle.github.io/python-oracledb/
Description
-----------
python-oracledb is released by Oracle to supersede the cx_Oracle driver.
It is fully compatible with cx_Oracle and features both a "thin" client
mode that requires no dependencies, as well as a "thick" mode that uses
@@ -21,7 +24,7 @@ the Oracle Client Interface in the same way as cx_Oracle.
.. seealso::
:ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver
as well.
as well, with the exception that oracledb supports two phase transactions.
The SQLAlchemy ``oracledb`` dialect provides both a sync and an async
implementation under the same dialect name. The proper version is
@@ -70,6 +73,16 @@ like the ``lib_dir`` path, a dict may be passed to this parameter, as in::
https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client
Two Phase Transactions Supported
--------------------------------
Two phase transactions are fully supported under oracledb. Starting with
oracledb 2.3 two phase transactions are supported also in thin mode. APIs
for two phase transactions are provided at the Core level via
:meth:`_engine.Connection.begin_twophase` and :paramref:`_orm.Session.twophase`
for transparent ORM use.
.. versionchanged:: 2.0.32 added support for two phase transactions
.. versionadded:: 2.0.0 added support for oracledb driver.
@@ -155,6 +168,49 @@ class OracleDialect_oracledb(_OracleDialect_cx_oracle):
f"oracledb version {self._min_version} and above are supported"
)
def do_begin_twophase(self, connection, xid):
conn_xis = connection.connection.xid(*xid)
connection.connection.tpc_begin(conn_xis)
connection.connection.info["oracledb_xid"] = conn_xis
def do_prepare_twophase(self, connection, xid):
should_commit = connection.connection.tpc_prepare()
connection.info["oracledb_should_commit"] = should_commit
def do_rollback_twophase(
self, connection, xid, is_prepared=True, recover=False
):
if recover:
conn_xid = connection.connection.xid(*xid)
else:
conn_xid = None
connection.connection.tpc_rollback(conn_xid)
def do_commit_twophase(
self, connection, xid, is_prepared=True, recover=False
):
conn_xid = None
if not is_prepared:
should_commit = connection.connection.tpc_prepare()
elif recover:
conn_xid = connection.connection.xid(*xid)
should_commit = True
else:
should_commit = connection.info["oracledb_should_commit"]
if should_commit:
connection.connection.tpc_commit(conn_xid)
def do_recover_twophase(self, connection):
return [
# oracledb seems to return bytes
(
fi,
gti.decode() if isinstance(gti, bytes) else gti,
bq.decode() if isinstance(bq, bytes) else bq,
)
for fi, gti, bq in connection.connection.tpc_recover()
]
class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor):
_cursor: AsyncCursor
@@ -251,6 +307,24 @@ class AsyncAdapt_oracledb_connection(AsyncAdapt_dbapi_connection):
def cursor(self):
return AsyncAdapt_oracledb_cursor(self)
def xid(self, *args: Any, **kwargs: Any) -> Any:
return self._connection.xid(*args, **kwargs)
def tpc_begin(self, *args: Any, **kwargs: Any) -> Any:
return self.await_(self._connection.tpc_begin(*args, **kwargs))
def tpc_commit(self, *args: Any, **kwargs: Any) -> Any:
return self.await_(self._connection.tpc_commit(*args, **kwargs))
def tpc_prepare(self, *args: Any, **kwargs: Any) -> Any:
return self.await_(self._connection.tpc_prepare(*args, **kwargs))
def tpc_recover(self, *args: Any, **kwargs: Any) -> Any:
return self.await_(self._connection.tpc_recover(*args, **kwargs))
def tpc_rollback(self, *args: Any, **kwargs: Any) -> Any:
return self.await_(self._connection.tpc_rollback(*args, **kwargs))
class AsyncAdaptFallback_oracledb_connection(
AsyncAdaptFallback_dbapi_connection, AsyncAdapt_oracledb_connection
@@ -97,7 +97,7 @@ def drop_all_schema_objects_pre_tables(cfg, eng):
for xid in conn.exec_driver_sql(
"select gid from pg_prepared_xacts"
).scalars():
conn.execute("ROLLBACK PREPARED '%s'" % xid)
conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
@drop_all_schema_objects_post_tables.for_db("postgresql")
+1
View File
@@ -83,6 +83,7 @@ from .util import provide_metadata
from .util import resolve_lambda
from .util import rowset
from .util import run_as_contextmanager
from .util import skip_if_timeout
from .util import teardown_events
from .warnings import assert_warnings
from .warnings import warn_test_suite
+18
View File
@@ -10,13 +10,16 @@
from __future__ import annotations
from collections import deque
import contextlib
import decimal
import gc
from itertools import chain
import random
import sys
from sys import getsizeof
import time
import types
from typing import Any
from . import config
from . import mock
@@ -517,3 +520,18 @@ def count_cache_key_tuples(tup):
if elem:
stack = list(elem) + [sentinel] + stack
return num_elements
@contextlib.contextmanager
def skip_if_timeout(seconds: float, cleanup: Any = None):
now = time.time()
yield
sec = time.time() - now
if sec > seconds:
try:
cleanup()
finally:
config.skip_test(
f"test took too long ({sec:.4f} seconds > {seconds})"
)
+24 -11
View File
@@ -473,7 +473,8 @@ class TransactionTest(fixtures.TablesTest):
@testing.requires.two_phase_transactions
@testing.requires.two_phase_recovery
def test_two_phase_recover(self):
@testing.variation("commit", [True, False])
def test_two_phase_recover(self, commit):
users = self.tables.users
# 2020, still can't get this to work w/ modern MySQL or MariaDB.
@@ -501,17 +502,29 @@ class TransactionTest(fixtures.TablesTest):
[],
)
# recover_twophase needs to be run in a new transaction
with testing.db.connect() as connection2:
recoverables = connection2.recover_twophase()
assert transaction.xid in recoverables
connection2.commit_prepared(transaction.xid, recover=True)
with testing.db.connect() as connection3:
# oracle transactions can't be recovered for commit after...
# about 1 second? OK
with testing.skip_if_timeout(
0.75,
cleanup=(
lambda: connection3.rollback_prepared(
transaction.xid, recover=True
)
),
):
recoverables = connection3.recover_twophase()
assert transaction.xid in recoverables
eq_(
connection2.execute(
select(users.c.user_id).order_by(users.c.user_id)
).fetchall(),
[(1,)],
)
if commit:
connection3.commit_prepared(transaction.xid, recover=True)
res = [(1,)]
else:
connection3.rollback_prepared(transaction.xid, recover=True)
res = []
stmt = select(users.c.user_id).order_by(users.c.user_id)
eq_(connection3.execute(stmt).fetchall(), res)
@testing.requires.two_phase_transactions
def test_multiple_two_phase(self, local_connection):
+22 -37
View File
@@ -858,32 +858,27 @@ class DefaultRequirements(SuiteRequirements):
else:
return num > 0
return (
skip_if(
[
no_support(
"mssql", "two-phase xact not supported by drivers"
),
no_support(
"sqlite", "two-phase xact not supported by database"
),
# in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
# we are evaluating which modern MySQL / MariaDB versions
# can handle two-phase testing without too many problems
# no_support(
# "mysql",
# "recent MySQL community editions have too many "
# "issues (late 2016), disabling for now",
# ),
NotPredicate(
LambdaPredicate(
pg_prepared_transaction,
"max_prepared_transactions not available or zero",
)
),
]
)
+ self.skip_on_oracledb_thin
return skip_if(
[
no_support("mssql", "two-phase xact not supported by drivers"),
no_support(
"sqlite", "two-phase xact not supported by database"
),
# in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
# we are evaluating which modern MySQL / MariaDB versions
# can handle two-phase testing without too many problems
# no_support(
# "mysql",
# "recent MySQL community editions have too many "
# "issues (late 2016), disabling for now",
# ),
NotPredicate(
LambdaPredicate(
pg_prepared_transaction,
"max_prepared_transactions not available or zero",
)
),
]
)
@property
@@ -893,7 +888,7 @@ class DefaultRequirements(SuiteRequirements):
["mysql", "mariadb"],
"still can't get recover to work w/ MariaDB / MySQL",
)
+ skip_if("oracle", "recovery not functional")
+ skip_if("oracle+cx_oracle", "recovery not functional")
)
@property
@@ -1877,16 +1872,6 @@ class DefaultRequirements(SuiteRequirements):
and config.db.dialect.cx_oracle_ver < (6,)
)
@property
def skip_on_oracledb_thin(self):
def go(config):
if against(config, "oracle+oracledb"):
with config.db.connect() as conn:
return config.db.dialect.is_thin_mode(conn)
return False
return skip_if(go)
@property
def computed_columns(self):
return skip_if(["postgresql < 12", "sqlite < 3.31", "mysql < 5.7"])