Files
sqlalchemy/test/base/test_result.py
T
me-saurabhkohli 4fb459aaf0 Add ambiguous column support to SimpleResultMetaData
Fixed issue where :meth:`.Result.freeze` would lose track of ambiguous
column names present in the original :class:`.CursorResult`, causing
key-based access on the thawed result to silently return a value instead of
raising :class:`.InvalidRequestError`.  The
:class:`.SimpleResultMetaData` now accepts and propagates ambiguous key
information so that frozen, thawed, and pickled results raise consistently
for duplicate column names.  Pull request courtesy Saurabh Kohli.

Fixes: #9427
Closes: #13335
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/13335
Pull-request-sha: c03904ece2

Change-Id: Ia184f77b442b069e6f9a4f94a967ead41a1704b6
2026-05-29 18:20:14 -04:00

1401 lines
39 KiB
Python

import operator
import sys
from sqlalchemy import exc
from sqlalchemy import testing
from sqlalchemy.engine import result
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing.assertions import expect_deprecated
from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.util import picklers
from sqlalchemy.util import compat
from sqlalchemy.util.langhelpers import load_uncompiled_module
class ResultTupleTest(fixtures.TestBase):
def _fixture(self, values, labels):
return result.result_tuple(labels)(values)
def test_empty(self):
keyed_tuple = self._fixture([], [])
eq_(str(keyed_tuple), "()")
eq_(len(keyed_tuple), 0)
eq_(list(keyed_tuple._mapping.keys()), [])
eq_(keyed_tuple._fields, ())
eq_(keyed_tuple._asdict(), {})
def test_values_none_labels(self):
keyed_tuple = self._fixture([1, 2], [None, None])
eq_(str(keyed_tuple), "(1, 2)")
eq_(len(keyed_tuple), 2)
eq_(list(keyed_tuple._mapping.keys()), [])
eq_(keyed_tuple._fields, ())
eq_(keyed_tuple._asdict(), {})
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
def test_creation(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(str(keyed_tuple), "(1, 2)")
eq_(list(keyed_tuple._mapping.keys()), ["a", "b"])
eq_(keyed_tuple._fields, ("a", "b"))
eq_(keyed_tuple._asdict(), {"a": 1, "b": 2})
def test_index_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
def should_raise():
keyed_tuple[2]
assert_raises(IndexError, should_raise)
def test_negative_index_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple[-1], 2)
eq_(keyed_tuple[-2:-1], (1,))
def test_slice_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple[0:2], (1, 2))
def test_slices_arent_in_mappings(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
if compat.py312:
with expect_raises(KeyError):
keyed_tuple._mapping[0:2]
else:
with expect_raises(TypeError):
keyed_tuple._mapping[0:2]
def test_integers_arent_in_mappings(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
assert_raises(KeyError, lambda: keyed_tuple._mapping[1])
def test_getter(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "c"])
getter = keyed_tuple._parent._getter("b")
eq_(getter(keyed_tuple), 2)
getter = keyed_tuple._parent._getter(2)
eq_(getter(keyed_tuple), 3)
def test_tuple_getter(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "c"])
getter = keyed_tuple._parent._row_as_tuple_getter(["b", "c"])
eq_(getter(keyed_tuple), (2, 3))
# row as tuple getter doesn't accept ints. for ints, just
# use plain python
getter = operator.itemgetter(2, 0, 1)
# getter = keyed_tuple._parent._row_as_tuple_getter([2, 0, 1])
eq_(getter(keyed_tuple), (3, 1, 2))
def test_attribute_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple.a, 1)
eq_(keyed_tuple.b, 2)
def should_raise():
keyed_tuple.c
assert_raises(AttributeError, should_raise)
def test_contains(self):
keyed_tuple = self._fixture(["x", "y"], ["a", "b"])
is_true("x" in keyed_tuple)
is_false("z" in keyed_tuple)
is_true("z" not in keyed_tuple)
is_false("x" not in keyed_tuple)
# we don't do keys
is_false("a" in keyed_tuple)
is_false("z" in keyed_tuple)
is_true("a" not in keyed_tuple)
is_true("z" not in keyed_tuple)
def test_contains_mapping(self):
keyed_tuple = self._fixture(["x", "y"], ["a", "b"])._mapping
is_false("x" in keyed_tuple)
is_false("z" in keyed_tuple)
is_true("z" not in keyed_tuple)
is_true("x" not in keyed_tuple)
# we do keys
is_true("a" in keyed_tuple)
is_true("b" in keyed_tuple)
def test_none_label(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
eq_(str(keyed_tuple), "(1, 2, 3)")
eq_(list(keyed_tuple._mapping.keys()), ["a", "b"])
eq_(keyed_tuple._fields, ("a", "b"))
eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
# attribute access: can't get at value 2
eq_(keyed_tuple.a, 1)
eq_(keyed_tuple.b, 3)
# index access: can get at value 2
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
eq_(keyed_tuple[2], 3)
def test_duplicate_labels(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "b"])
eq_(str(keyed_tuple), "(1, 2, 3)")
eq_(list(keyed_tuple._mapping.keys()), ["a", "b", "b"])
eq_(keyed_tuple._fields, ("a", "b", "b"))
eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
# attribute access: can't get at value 2
eq_(keyed_tuple.a, 1)
eq_(keyed_tuple.b, 3)
# index access: can get at value 2
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
eq_(keyed_tuple[2], 3)
def test_immutable(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(str(keyed_tuple), "(1, 2)")
eq_(keyed_tuple.a, 1)
# eh
# assert_raises(AttributeError, setattr, keyed_tuple, "a", 5)
def should_raise():
keyed_tuple[0] = 100
assert_raises(TypeError, should_raise)
def test_serialize(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
for loads, dumps in picklers():
kt = loads(dumps(keyed_tuple))
eq_(str(kt), "(1, 2, 3)")
eq_(list(kt._mapping.keys()), ["a", "b"])
eq_(kt._fields, ("a", "b"))
eq_(kt._asdict(), {"a": 1, "b": 3})
@testing.fixture
def _load_module(self):
from sqlalchemy.engine import _row_cy as _cy_row
_py_row = load_uncompiled_module(_cy_row)
# allow pickle to serialize the two rowproxy_reconstructor functions
# create a new virtual module
new_name = _py_row.__name__ + "py_only"
sys.modules[new_name] = _py_row
_py_row.__name__ = new_name
for item in vars(_py_row).values():
# only the rowproxy_reconstructor module is required to change,
# but set every one for consistency
if getattr(item, "__module__", None) == _cy_row.__name__:
item.__module__ = new_name
yield _cy_row, _py_row
sys.modules.pop(new_name)
@testing.requires.cextensions
@testing.variation("direction", ["py_to_cy", "cy_to_py"])
def test_serialize_cy_py_cy(
self, direction: testing.Variation, _load_module
):
_cy_row, _py_row = _load_module
global Row
p = result.SimpleResultMetaData(["a", "w", "b"])
if direction.py_to_cy:
dump_cls = _py_row.BaseRow
load_cls = _cy_row.BaseRow
elif direction.cy_to_py:
dump_cls = _cy_row.BaseRow
load_cls = _py_row.BaseRow
else:
direction.fail()
for loads, dumps in picklers():
class Row(dump_cls):
pass
row = Row(p, p._processors, p._key_to_index, (1, 2, 3))
state = dumps(row)
class Row(load_cls):
pass
row2 = loads(state)
is_true(isinstance(row2, load_cls))
is_false(isinstance(row2, dump_cls))
state2 = dumps(row2)
class Row(dump_cls):
pass
row3 = loads(state2)
is_true(isinstance(row3, dump_cls))
def test_processors(self):
parent = result.SimpleResultMetaData(["a", "b", "c", "d"])
data = (1, 99, "42", "foo")
row_none = result.Row(parent, None, parent._key_to_index, data)
eq_(row_none._to_tuple_instance(), data)
row_all_p = result.Row(
parent, [str, float, int, str.upper], parent._key_to_index, data
)
eq_(row_all_p._to_tuple_instance(), ("1", 99.0, 42, "FOO"))
row_some_p = result.Row(
parent, [None, str, None, str.upper], parent._key_to_index, data
)
eq_(row_some_p._to_tuple_instance(), (1, "99", "42", "FOO"))
with expect_raises(AssertionError):
result.Row(parent, [None, str], parent._key_to_index, data)
def test_tuplegetter(self):
data = list(range(10, 20))
eq_(result.tuplegetter(1)(data), [11])
eq_(result.tuplegetter(1, 9, 3)(data), (11, 19, 13))
eq_(result.tuplegetter(2, 3, 4)(data), [12, 13, 14])
class ResultTest(fixtures.TestBase):
def _fixture(
self,
extras=None,
alt_row=None,
num_rows=None,
default_filters=None,
data=None,
):
if data is None:
data = [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)]
if num_rows is not None:
data = data[:num_rows]
res = result.IteratorResult(
result.SimpleResultMetaData(["a", "b", "c"], extra=extras),
iter(data),
)
if default_filters:
res._metadata._create_unique_filters = (
lambda result: default_filters
)
if alt_row:
res._process_row = alt_row
return res
def test_close_attributes(self):
"""test #8710"""
r1 = self._fixture()
is_false(r1.closed)
is_false(r1._soft_closed)
r1._soft_close()
is_false(r1.closed)
is_true(r1._soft_closed)
r1.close()
is_true(r1.closed)
is_true(r1._soft_closed)
def test_class_presented(self):
"""To support different kinds of objects returned vs. rows,
there are two wrapper classes for Result.
"""
r1 = self._fixture()
r2 = r1.columns(0, 1, 2)
assert isinstance(r2, result.Result)
m1 = r1.mappings()
assert isinstance(m1, result.MappingResult)
s1 = r1.scalars(1)
assert isinstance(s1, result.ScalarResult)
def test_mapping_plus_base(self):
r1 = self._fixture()
m1 = r1.mappings()
eq_(m1.fetchone(), {"a": 1, "b": 1, "c": 1})
eq_(r1.fetchone(), (2, 1, 2))
@expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
def test_tuples_plus_base(self):
r1 = self._fixture()
t1 = r1.tuples()
eq_(t1.fetchone(), (1, 1, 1))
eq_(r1.fetchone(), (2, 1, 2))
def test_scalar_plus_base(self):
r1 = self._fixture()
m1 = r1.scalars()
# base is not affected
eq_(r1.fetchone(), (1, 1, 1))
# scalars
eq_(m1.first(), 2)
def test_index_extra(self):
ex1a, ex1b, ex2, ex3a, ex3b = (
object(),
object(),
object(),
object(),
object(),
)
result = self._fixture(
extras=[
(ex1a, ex1b),
(ex2,),
(
ex3a,
ex3b,
),
]
)
eq_(
result.columns(ex2, ex3b).columns(ex3a).all(),
[(1,), (2,), (2,), (2,)],
)
result = self._fixture(
extras=[
(ex1a, ex1b),
(ex2,),
(
ex3a,
ex3b,
),
]
)
eq_([row._mapping[ex1b] for row in result], [1, 2, 1, 4])
result = self._fixture(
extras=[
(ex1a, ex1b),
(ex2,),
(
ex3a,
ex3b,
),
]
)
eq_(
[
dict(r)
for r in result.columns(ex2, ex3b).columns(ex3a).mappings()
],
[{"c": 1}, {"c": 2}, {"c": 2}, {"c": 2}],
)
def test_unique_default_filters(self):
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
eq_(result.unique().all(), [(1, 1, 1), (1, 3, 2), (4, 1, 2)])
def test_unique_default_filters_rearrange_scalar(self):
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
eq_(result.unique().scalars(1).all(), [1, 3])
def test_unique_default_filters_rearrange_order(self):
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
eq_(
result.unique().columns("b", "a", "c").all(),
[(1, 1, 1), (3, 1, 2), (1, 4, 2)],
)
def test_unique_default_filters_rearrange_twice(self):
# test that the default uniqueness filter is reconfigured
# each time columns() is called
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
result = result.unique()
# 1, 1, 1 -> True, 1, True
eq_(result.fetchone(), (1, 1, 1))
# now rearrange for b, a, c
# 1, 2, 2 -> 1, True, True
# 3, 1, 2 -> 3, True, True
result = result.columns("b", "a", "c")
eq_(result.fetchone(), (3, 1, 2))
# now rearrange for c, a
# 2, 4 -> True, False
result = result.columns("c", "a")
eq_(result.fetchall(), [(2, 4)])
def test_unique_scalars_all(self):
result = self._fixture()
eq_(result.unique().scalars(1).all(), [1, 3])
def test_unique_mappings_all(self):
result = self._fixture()
def uniq(row):
return row[0]
eq_(
result.unique(uniq).mappings().all(),
[
{"a": 1, "b": 1, "c": 1},
{"a": 2, "b": 1, "c": 2},
{"a": 4, "b": 1, "c": 2},
],
)
def test_unique_filtered_all(self):
result = self._fixture()
def uniq(row):
return row[0]
eq_(result.unique(uniq).all(), [(1, 1, 1), (2, 1, 2), (4, 1, 2)])
def test_unique_scalars_many(self):
result = self._fixture()
result = result.unique().scalars(1)
eq_(result.fetchmany(2), [1, 3])
eq_(result.fetchmany(2), [])
def test_unique_filtered_many(self):
result = self._fixture()
def uniq(row):
return row[0]
result = result.unique(uniq)
eq_(result.fetchmany(2), [(1, 1, 1), (2, 1, 2)])
eq_(result.fetchmany(2), [(4, 1, 2)])
eq_(result.fetchmany(2), [])
def test_unique_scalars_many_none(self):
result = self._fixture()
result = result.unique().scalars(1)
# this assumes the default fetchmany behavior of all() for
# the ListFetchStrategy
eq_(result.fetchmany(None), [1, 3])
eq_(result.fetchmany(None), [])
def test_unique_scalars_iterate(self):
result = self._fixture()
result = result.unique().scalars(1)
eq_(list(result), [1, 3])
def test_unique_filtered_iterate(self):
result = self._fixture()
def uniq(row):
return row[0]
result = result.unique(uniq)
eq_(list(result), [(1, 1, 1), (2, 1, 2), (4, 1, 2)])
def test_all(self):
result = self._fixture()
eq_(result.all(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
eq_(result.all(), [])
def test_many_then_all(self):
result = self._fixture()
eq_(result.fetchmany(3), [(1, 1, 1), (2, 1, 2), (1, 3, 2)])
eq_(result.all(), [(4, 1, 2)])
eq_(result.all(), [])
def test_scalars(self):
result = self._fixture()
eq_(list(result.scalars()), [1, 2, 1, 4])
result = self._fixture()
eq_(list(result.scalars(2)), [1, 2, 2, 2])
def test_scalars_mappings(self):
result = self._fixture()
eq_(
list(result.columns(0).mappings()),
[{"a": 1}, {"a": 2}, {"a": 1}, {"a": 4}],
)
def test_scalars_no_fetchone(self):
result = self._fixture()
s = result.scalars()
assert not hasattr(s, "fetchone")
# original result is unchanged
eq_(result.mappings().fetchone(), {"a": 1, "b": 1, "c": 1})
# scalars
eq_(s.all(), [2, 1, 4])
def test_first(self):
result = self._fixture()
row = result.first()
eq_(row, (1, 1, 1))
# note this is a behavior change in 1.4.27 due to
# adding a real result.close() to Result, previously this would
# return an empty list. this is already the
# behavior with CursorResult, but was mis-implemented for
# other non-cursor result sets.
assert_raises(exc.ResourceClosedError, result.all)
def test_one_unique(self):
# assert that one() counts rows after uniqueness has been applied.
# this would raise if we didn't have unique
result = self._fixture(data=[(1, 1, 1), (1, 1, 1)])
row = result.unique().one()
eq_(row, (1, 1, 1))
def test_one_unique_tricky_one(self):
# one() needs to keep consuming rows in order to find a non-unique
# one. unique() really slows things down
result = self._fixture(
data=[(1, 1, 1), (1, 1, 1), (1, 1, 1), (2, 1, 1)]
)
assert_raises(exc.MultipleResultsFound, result.unique().one)
def test_one_unique_mapping(self):
# assert that one() counts rows after uniqueness has been applied.
# this would raise if we didn't have unique
result = self._fixture(data=[(1, 1, 1), (1, 1, 1)])
row = result.mappings().unique().one()
eq_(row, {"a": 1, "b": 1, "c": 1})
def test_one_mapping(self):
result = self._fixture(num_rows=1)
row = result.mappings().one()
eq_(row, {"a": 1, "b": 1, "c": 1})
def test_one(self):
result = self._fixture(num_rows=1)
row = result.one()
eq_(row, (1, 1, 1))
def test_scalar_one(self):
result = self._fixture(num_rows=1)
row = result.scalar_one()
eq_(row, 1)
def test_scalars_plus_one(self):
result = self._fixture(num_rows=1)
row = result.scalars().one()
eq_(row, 1)
def test_scalars_plus_one_none(self):
result = self._fixture(num_rows=0)
result = result.scalars()
assert_raises_message(
exc.NoResultFound,
"No row was found when one was required",
result.one,
)
def test_one_none(self):
result = self._fixture(num_rows=0)
assert_raises_message(
exc.NoResultFound,
"No row was found when one was required",
result.one,
)
def test_one_or_none(self):
result = self._fixture(num_rows=1)
eq_(result.one_or_none(), (1, 1, 1))
def test_scalar_one_or_none(self):
result = self._fixture(num_rows=1)
eq_(result.scalar_one_or_none(), 1)
def test_scalar_one_or_none_none(self):
result = self._fixture(num_rows=0)
eq_(result.scalar_one_or_none(), None)
def test_one_or_none_none(self):
result = self._fixture(num_rows=0)
eq_(result.one_or_none(), None)
def test_one_raise_mutiple(self):
result = self._fixture(num_rows=2)
assert_raises_message(
exc.MultipleResultsFound,
"Multiple rows were found when exactly one was required",
result.one,
)
def test_one_or_none_raise_multiple(self):
result = self._fixture(num_rows=2)
assert_raises_message(
exc.MultipleResultsFound,
"Multiple rows were found when one or none was required",
result.one_or_none,
)
def test_scalar(self):
result = self._fixture()
eq_(result.scalar(), 1)
# note this is a behavior change in 1.4.27 due to
# adding a real result.close() to Result, previously this would
# return an empty list. this is already the
# behavior with CursorResult, but was mis-implemented for
# other non-cursor result sets.
assert_raises(exc.ResourceClosedError, result.all)
def test_partition(self):
result = self._fixture()
r = []
for partition in result.partitions(2):
r.append(list(partition))
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
eq_(result.all(), [])
def test_partition_unique_yield_per(self):
result = self._fixture()
r = []
for partition in result.unique().yield_per(2).partitions():
r.append(list(partition))
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
eq_(result.all(), [])
def test_partition_yield_per(self):
result = self._fixture()
r = []
for partition in result.yield_per(2).partitions():
r.append(list(partition))
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
eq_(result.all(), [])
def test_columns(self):
result = self._fixture()
result = result.columns("b", "c")
eq_(result.keys(), ["b", "c"])
eq_(result.all(), [(1, 1), (1, 2), (3, 2), (1, 2)])
def test_columns_ints(self):
result = self._fixture()
eq_(result.columns(1, -2).all(), [(1, 1), (1, 1), (3, 3), (1, 1)])
def test_columns_again(self):
result = self._fixture()
eq_(
result.columns("b", "c", "a").columns(1, 2).all(),
[(1, 1), (2, 2), (2, 1), (2, 4)],
)
def test_mappings(self):
result = self._fixture()
eq_(
[dict(r) for r in result.mappings()],
[
{"a": 1, "b": 1, "c": 1},
{"a": 2, "b": 1, "c": 2},
{"a": 1, "b": 3, "c": 2},
{"a": 4, "b": 1, "c": 2},
],
)
def test_columns_with_mappings(self):
result = self._fixture()
eq_(
[dict(r) for r in result.columns("b", "c").mappings().all()],
[
{"b": 1, "c": 1},
{"b": 1, "c": 2},
{"b": 3, "c": 2},
{"b": 1, "c": 2},
],
)
def test_mappings_with_columns(self):
result = self._fixture()
m1 = result.mappings().columns("b", "c")
eq_(m1.fetchmany(2), [{"b": 1, "c": 1}, {"b": 1, "c": 2}])
# no slice here
eq_(result.fetchone(), (1, 3, 2))
# still slices
eq_(m1.fetchone(), {"b": 1, "c": 2})
def test_scalar_none_iterate(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1)
eq_(list(result), [None, 4, None, None, 4])
def test_scalar_none_many(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1)
eq_(result.fetchmany(3), [None, 4, None])
eq_(result.fetchmany(5), [None, 4])
def test_scalar_none_all(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1)
eq_(result.all(), [None, 4, None, None, 4])
def test_scalar_none_all_unique(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1).unique()
eq_(result.all(), [None, 4])
def test_scalar_only_on_filter_w_unique(self):
# test a mixture of the "real" result and the
# scalar filter, where scalar has unique and real result does not.
# this is new as of [ticket:5503] where we have created
# ScalarResult / MappingResult "filters" that don't modify
# the Result
result = self._fixture(
data=[
(1, 1, 2),
(3, 4, 5),
(1, 1, 2),
(3, None, 5),
(3, 4, 5),
(3, None, 5),
]
)
# result is non-unique. u_s is unique on column 0
u_s = result.scalars(0).unique()
eq_(next(u_s), 1) # unique value 1 from first row
eq_(next(result), (3, 4, 5)) # second row
eq_(next(u_s), 3) # skip third row, return 3 for fourth row
eq_(next(result), (3, 4, 5)) # non-unique fifth row
eq_(u_s.all(), []) # unique set is done because only 3 is left
def test_scalar_none_one_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.scalars(1).unique()
# one is returning None, see?
eq_(result.one(), None)
def test_scalar_none_one_or_none_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.scalars(1).unique()
# the orm.Query can actually do this right now, so we sort of
# have to allow for this unforuntately, unless we want to raise?
eq_(result.one_or_none(), None)
def test_scalar_one_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.unique()
eq_(result.scalar_one(), 1)
def test_scalars_one_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.unique()
eq_(result.scalars().one(), 1)
def test_scalar_none_first(self):
result = self._fixture(data=[(1, None, 2)])
result = result.scalars(1).unique()
eq_(result.first(), None)
def test_freeze(self):
result = self._fixture()
frozen = result.freeze()
r1 = frozen()
eq_(r1.fetchall(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
eq_(r1.fetchall(), [])
r2 = frozen()
eq_(r1.fetchall(), [])
eq_(r2.fetchall(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
eq_(r2.fetchall(), [])
def test_columns_unique_freeze(self):
result = self._fixture()
result = result.columns("b", "c").unique()
frozen = result.freeze()
r1 = frozen()
eq_(r1.fetchall(), [(1, 1), (1, 2), (3, 2)])
def test_columns_unique_freeze_w_unique_filters(self):
result = self._fixture(default_filters=[id, None, None])
result = result.columns("b", "c")
frozen = result.freeze()
r1 = frozen().unique()
eq_(r1.fetchall(), [(1, 1), (1, 2), (3, 2)])
def test_columns_freeze(self):
result = self._fixture()
result = result.columns("b", "c")
frozen = result.freeze()
r1 = frozen()
eq_(r1.fetchall(), [(1, 1), (1, 2), (3, 2), (1, 2)])
r2 = frozen().unique()
eq_(r2.fetchall(), [(1, 1), (1, 2), (3, 2)])
def test_scalars_freeze(self):
result = self._fixture()
frozen = result.freeze()
r1 = frozen()
eq_(r1.scalars(1).fetchall(), [1, 1, 3, 1])
r2 = frozen().scalars(1).unique()
eq_(r2.fetchall(), [1, 3])
def test_ambiguous_key_raises(self):
meta = result.SimpleResultMetaData(
["x", "x"], _ambiguous_keys=frozenset(["x"])
)
res_obj = result.IteratorResult(meta, iter([(4, 2)]))
row = res_obj.fetchone()
eq_(row[0], 4)
eq_(row[1], 2)
assert_raises_message(
exc.InvalidRequestError,
"Ambiguous column name 'x'",
lambda: row._mapping["x"],
)
class MergeResultTest(fixtures.TestBase):
@testing.fixture
def merge_fixture(self):
r1 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(7, "u1"), (8, "u2")]),
)
r2 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(9, "u3")]),
)
r3 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(10, "u4"), (11, "u5")]),
)
r4 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(12, "u6")]),
)
return r1, r2, r3, r4
@testing.fixture
def dupe_fixture(self):
r1 = result.IteratorResult(
result.SimpleResultMetaData(["x", "y", "z"]),
iter([(1, 2, 1), (2, 2, 1)]),
)
r2 = result.IteratorResult(
result.SimpleResultMetaData(["x", "y", "z"]),
iter([(3, 1, 2), (3, 3, 3)]),
)
return r1, r2
def test_merge_results(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(result.keys(), ["user_id", "user_name"])
row = result.fetchone()
eq_(row, (7, "u1"))
result.close()
def test_fetchall(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(
result.fetchall(),
[
(7, "u1"),
(8, "u2"),
(9, "u3"),
(10, "u4"),
(11, "u5"),
(12, "u6"),
],
)
def test_first(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(
result.first(),
(7, "u1"),
)
def test_columns(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(
result.columns("user_name").fetchmany(4),
[("u1",), ("u2",), ("u3",), ("u4",)],
)
result.close()
def test_merge_scalars(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
for r in (r1, r2, r3, r4):
r.scalars(0)
result = r1.merge(r2, r3, r4)
eq_(result.scalars(0).all(), [7, 8, 9, 10, 11, 12])
def test_merge_unique(self, dupe_fixture):
r1, r2 = dupe_fixture
r1.scalars("y")
r2.scalars("y")
result = r1.merge(r2)
# uniqued 2, 2, 1, 3
eq_(result.scalars("y").unique().all(), [2, 1, 3])
def test_merge_preserve_unique(self, dupe_fixture):
r1, r2 = dupe_fixture
r1.unique().scalars("y")
r2.unique().scalars("y")
result = r1.merge(r2)
# unique takes place
eq_(result.scalars("y").all(), [2, 1, 3])
class OnlyScalarsTest(fixtures.TestBase):
"""the chunkediterator supports "non tuple mode", where we bypass
the expense of generating rows when we have only scalar values.
"""
@testing.fixture
def no_tuple_fixture(self):
data = [(1, 1, 1), (2, 1, 2), (1, 1, 1), (1, 3, 2), (4, 1, 2)]
def chunks(num):
while data:
rows = data[0:num]
data[:] = []
yield [row[0] for row in rows]
return chunks
@testing.fixture
def no_tuple_one_fixture(self):
data = [(1, 1, 1)]
def chunks(num):
while data:
rows = data[0:num]
data[:] = []
yield [row[0] for row in rows]
return chunks
@testing.fixture
def normal_fixture(self):
data = [(1, 1, 1), (2, 1, 2), (1, 1, 1), (1, 3, 2), (4, 1, 2)]
def chunks(num):
while data:
rows = data[0:num]
data[:] = []
yield [row[0] for row in rows]
return chunks
def test_scalar_mode_columns0_mapping(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.columns(0).mappings()
eq_(
list(r),
[{"a": 1}, {"a": 2}, {"a": 1}, {"a": 1}, {"a": 4}],
)
def test_scalar_mode_columns0_plain(self, no_tuple_fixture):
"""test #7953"""
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.columns(0)
eq_(
list(r),
[(1,), (2,), (1,), (1,), (4,)],
)
def test_scalar_mode_scalars0(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.scalars(0)
eq_(
list(r),
[1, 2, 1, 1, 4],
)
def test_scalar_mode_but_accessed_nonscalar_result(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
s1 = r.scalars()
eq_(r.fetchone(), (1,))
eq_(s1.all(), [2, 1, 1, 4])
def test_scalar_mode_scalars_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.scalars()
eq_(r.all(), [1, 2, 1, 1, 4])
def test_scalar_mode_mfiltered_unique_rows_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _create_unique_filters=lambda result: [int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_fixture,
source_supports_scalars=True,
)
r = r.unique()
eq_(r.all(), [(1,), (2,), (4,)])
@testing.combinations(
lambda r: r.scalar(),
lambda r: r.scalar_one(),
lambda r: r.scalar_one_or_none(),
argnames="get",
)
def test_unique_scalar_accessors(self, no_tuple_one_fixture, get):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _create_unique_filters=lambda result: [int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_one_fixture,
source_supports_scalars=True,
)
r = r.unique()
eq_(get(r), 1)
def test_scalar_mode_mfiltered_unique_mappings_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _create_unique_filters=lambda result: [int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_fixture,
source_supports_scalars=True,
)
r = r.unique()
eq_(r.mappings().all(), [{"a": 1}, {"a": 2}, {"a": 4}])
def test_scalar_mode_mfiltered_unique_scalars_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _create_unique_filters=lambda result: [int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_fixture,
source_supports_scalars=True,
)
r = r.scalars().unique()
eq_(r.all(), [1, 2, 4])
def test_scalar_mode_unique_scalars_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.unique().scalars()
eq_(r.all(), [1, 2, 4])
def test_scalar_mode_scalars_fetchmany(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
r = r.scalars()
eq_(list(r.partitions(2)), [[1, 2], [1, 1], [4]])
def test_scalar_mode_unique_scalars_fetchmany(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
r = r.scalars().unique()
eq_(list(r.partitions(2)), [[1, 2], [4]])
def test_scalar_mode_unique_tuples_all(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
r = r.unique()
eq_(r.all(), [(1,), (2,), (4,)])
def test_scalar_mode_tuples_all(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
eq_(r.all(), [(1,), (2,), (1,), (1,), (4,)])
def test_scalar_mode_scalars_iterate(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.scalars()
eq_(list(r), [1, 2, 1, 1, 4])
def test_scalar_mode_tuples_iterate(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
eq_(list(r), [(1,), (2,), (1,), (1,), (4,)])
@testing.combinations(
lambda r: r.one(),
lambda r: r.first(),
lambda r: r.one_or_none(),
argnames="get",
)
def test_scalar_mode_first(self, no_tuple_one_fixture, get):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_one_fixture, source_supports_scalars=True
)
eq_(get(r), (1,))
@testing.combinations(
lambda r: r.scalar(),
lambda r: r.scalar_one(),
lambda r: r.scalar_one_or_none(),
argnames="get",
)
def test_scalar_mode_scalar_one(self, no_tuple_one_fixture, get):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_one_fixture, source_supports_scalars=True
)
eq_(get(r), 1)