mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-06 17:01:07 -04:00
0be89aaa38
Replaces the pyx files with py files that can be both compiled by cython or imported as is by python. This avoids the need of duplicating the code to have a python only fallback. The cython files are also reorganized to be in the module they use instead of all being in the cyextension package, that has been removed. The performance is pretty much equal between main and this change. A detailed comparison is at this link https://docs.google.com/spreadsheets/d/1jkmGpnCyEcPyy6aRK9alElGjxlNHu44Wxjr4VrD99so/edit?usp=sharing Change-Id: Iaed232ea5dfb41534cc9f58f6ea2f912a93263af
1373 lines
38 KiB
Python
1373 lines
38 KiB
Python
import operator
|
|
import sys
|
|
|
|
from sqlalchemy import exc
|
|
from sqlalchemy import testing
|
|
from sqlalchemy.engine import result
|
|
from sqlalchemy.testing import assert_raises
|
|
from sqlalchemy.testing import assert_raises_message
|
|
from sqlalchemy.testing import eq_
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing import is_false
|
|
from sqlalchemy.testing import is_true
|
|
from sqlalchemy.testing.assertions import expect_deprecated
|
|
from sqlalchemy.testing.assertions import expect_raises
|
|
from sqlalchemy.testing.util import picklers
|
|
from sqlalchemy.util import compat
|
|
from sqlalchemy.util.langhelpers import load_uncompiled_module
|
|
|
|
|
|
class ResultTupleTest(fixtures.TestBase):
|
|
def _fixture(self, values, labels):
|
|
return result.result_tuple(labels)(values)
|
|
|
|
def test_empty(self):
|
|
keyed_tuple = self._fixture([], [])
|
|
eq_(str(keyed_tuple), "()")
|
|
eq_(len(keyed_tuple), 0)
|
|
|
|
eq_(list(keyed_tuple._mapping.keys()), [])
|
|
eq_(keyed_tuple._fields, ())
|
|
eq_(keyed_tuple._asdict(), {})
|
|
|
|
def test_values_none_labels(self):
|
|
keyed_tuple = self._fixture([1, 2], [None, None])
|
|
eq_(str(keyed_tuple), "(1, 2)")
|
|
eq_(len(keyed_tuple), 2)
|
|
|
|
eq_(list(keyed_tuple._mapping.keys()), [])
|
|
eq_(keyed_tuple._fields, ())
|
|
eq_(keyed_tuple._asdict(), {})
|
|
|
|
eq_(keyed_tuple[0], 1)
|
|
eq_(keyed_tuple[1], 2)
|
|
|
|
def test_creation(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
eq_(str(keyed_tuple), "(1, 2)")
|
|
eq_(list(keyed_tuple._mapping.keys()), ["a", "b"])
|
|
eq_(keyed_tuple._fields, ("a", "b"))
|
|
eq_(keyed_tuple._asdict(), {"a": 1, "b": 2})
|
|
|
|
def test_index_access(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
eq_(keyed_tuple[0], 1)
|
|
eq_(keyed_tuple[1], 2)
|
|
|
|
def should_raise():
|
|
keyed_tuple[2]
|
|
|
|
assert_raises(IndexError, should_raise)
|
|
|
|
def test_negative_index_access(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
eq_(keyed_tuple[-1], 2)
|
|
eq_(keyed_tuple[-2:-1], (1,))
|
|
|
|
def test_slice_access(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
eq_(keyed_tuple[0:2], (1, 2))
|
|
|
|
def test_slices_arent_in_mappings(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
|
|
if compat.py312:
|
|
with expect_raises(KeyError):
|
|
keyed_tuple._mapping[0:2]
|
|
else:
|
|
with expect_raises(TypeError):
|
|
keyed_tuple._mapping[0:2]
|
|
|
|
def test_integers_arent_in_mappings(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
|
|
assert_raises(KeyError, lambda: keyed_tuple._mapping[1])
|
|
|
|
def test_getter(self):
|
|
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "c"])
|
|
|
|
getter = keyed_tuple._parent._getter("b")
|
|
eq_(getter(keyed_tuple), 2)
|
|
|
|
getter = keyed_tuple._parent._getter(2)
|
|
eq_(getter(keyed_tuple), 3)
|
|
|
|
def test_tuple_getter(self):
|
|
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "c"])
|
|
|
|
getter = keyed_tuple._parent._row_as_tuple_getter(["b", "c"])
|
|
eq_(getter(keyed_tuple), (2, 3))
|
|
|
|
# row as tuple getter doesn't accept ints. for ints, just
|
|
# use plain python
|
|
|
|
getter = operator.itemgetter(2, 0, 1)
|
|
|
|
# getter = keyed_tuple._parent._row_as_tuple_getter([2, 0, 1])
|
|
eq_(getter(keyed_tuple), (3, 1, 2))
|
|
|
|
def test_attribute_access(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
eq_(keyed_tuple.a, 1)
|
|
eq_(keyed_tuple.b, 2)
|
|
|
|
def should_raise():
|
|
keyed_tuple.c
|
|
|
|
assert_raises(AttributeError, should_raise)
|
|
|
|
def test_contains(self):
|
|
keyed_tuple = self._fixture(["x", "y"], ["a", "b"])
|
|
|
|
is_true("x" in keyed_tuple)
|
|
is_false("z" in keyed_tuple)
|
|
|
|
is_true("z" not in keyed_tuple)
|
|
is_false("x" not in keyed_tuple)
|
|
|
|
# we don't do keys
|
|
is_false("a" in keyed_tuple)
|
|
is_false("z" in keyed_tuple)
|
|
is_true("a" not in keyed_tuple)
|
|
is_true("z" not in keyed_tuple)
|
|
|
|
def test_contains_mapping(self):
|
|
keyed_tuple = self._fixture(["x", "y"], ["a", "b"])._mapping
|
|
|
|
is_false("x" in keyed_tuple)
|
|
is_false("z" in keyed_tuple)
|
|
|
|
is_true("z" not in keyed_tuple)
|
|
is_true("x" not in keyed_tuple)
|
|
|
|
# we do keys
|
|
is_true("a" in keyed_tuple)
|
|
is_true("b" in keyed_tuple)
|
|
|
|
def test_none_label(self):
|
|
keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
|
|
eq_(str(keyed_tuple), "(1, 2, 3)")
|
|
|
|
eq_(list(keyed_tuple._mapping.keys()), ["a", "b"])
|
|
eq_(keyed_tuple._fields, ("a", "b"))
|
|
eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
|
|
|
|
# attribute access: can't get at value 2
|
|
eq_(keyed_tuple.a, 1)
|
|
eq_(keyed_tuple.b, 3)
|
|
|
|
# index access: can get at value 2
|
|
eq_(keyed_tuple[0], 1)
|
|
eq_(keyed_tuple[1], 2)
|
|
eq_(keyed_tuple[2], 3)
|
|
|
|
def test_duplicate_labels(self):
|
|
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "b"])
|
|
eq_(str(keyed_tuple), "(1, 2, 3)")
|
|
|
|
eq_(list(keyed_tuple._mapping.keys()), ["a", "b", "b"])
|
|
eq_(keyed_tuple._fields, ("a", "b", "b"))
|
|
eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
|
|
|
|
# attribute access: can't get at value 2
|
|
eq_(keyed_tuple.a, 1)
|
|
eq_(keyed_tuple.b, 3)
|
|
|
|
# index access: can get at value 2
|
|
eq_(keyed_tuple[0], 1)
|
|
eq_(keyed_tuple[1], 2)
|
|
eq_(keyed_tuple[2], 3)
|
|
|
|
def test_immutable(self):
|
|
keyed_tuple = self._fixture([1, 2], ["a", "b"])
|
|
eq_(str(keyed_tuple), "(1, 2)")
|
|
|
|
eq_(keyed_tuple.a, 1)
|
|
|
|
# eh
|
|
# assert_raises(AttributeError, setattr, keyed_tuple, "a", 5)
|
|
|
|
def should_raise():
|
|
keyed_tuple[0] = 100
|
|
|
|
assert_raises(TypeError, should_raise)
|
|
|
|
def test_serialize(self):
|
|
keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
|
|
|
|
for loads, dumps in picklers():
|
|
kt = loads(dumps(keyed_tuple))
|
|
|
|
eq_(str(kt), "(1, 2, 3)")
|
|
|
|
eq_(list(kt._mapping.keys()), ["a", "b"])
|
|
eq_(kt._fields, ("a", "b"))
|
|
eq_(kt._asdict(), {"a": 1, "b": 3})
|
|
|
|
@testing.fixture
|
|
def _load_module(self):
|
|
from sqlalchemy.engine import _row_cy as _cy_row
|
|
|
|
_py_row = load_uncompiled_module(_cy_row)
|
|
|
|
# allow pickle to serialize the two rowproxy_reconstructor functions
|
|
# create a new virtual module
|
|
new_name = _py_row.__name__ + "py_only"
|
|
sys.modules[new_name] = _py_row
|
|
_py_row.__name__ = new_name
|
|
for item in vars(_py_row).values():
|
|
# only the rowproxy_reconstructor module is required to change,
|
|
# but set every one for consistency
|
|
if getattr(item, "__module__", None) == _cy_row.__name__:
|
|
item.__module__ = new_name
|
|
yield _cy_row, _py_row
|
|
sys.modules.pop(new_name)
|
|
|
|
@testing.requires.cextensions
|
|
@testing.variation("direction", ["py_to_cy", "cy_to_py"])
|
|
def test_serialize_cy_py_cy(
|
|
self, direction: testing.Variation, _load_module
|
|
):
|
|
_cy_row, _py_row = _load_module
|
|
|
|
global Row
|
|
|
|
p = result.SimpleResultMetaData(["a", "w", "b"])
|
|
|
|
if direction.py_to_cy:
|
|
dump_cls = _py_row.BaseRow
|
|
load_cls = _cy_row.BaseRow
|
|
elif direction.cy_to_py:
|
|
dump_cls = _cy_row.BaseRow
|
|
load_cls = _py_row.BaseRow
|
|
else:
|
|
direction.fail()
|
|
|
|
for loads, dumps in picklers():
|
|
|
|
class Row(dump_cls):
|
|
pass
|
|
|
|
row = Row(p, p._processors, p._key_to_index, (1, 2, 3))
|
|
|
|
state = dumps(row)
|
|
|
|
class Row(load_cls):
|
|
pass
|
|
|
|
row2 = loads(state)
|
|
is_true(isinstance(row2, load_cls))
|
|
is_false(isinstance(row2, dump_cls))
|
|
state2 = dumps(row2)
|
|
|
|
class Row(dump_cls):
|
|
pass
|
|
|
|
row3 = loads(state2)
|
|
is_true(isinstance(row3, dump_cls))
|
|
|
|
def test_processors(self):
|
|
parent = result.SimpleResultMetaData(["a", "b", "c", "d"])
|
|
data = (1, 99, "42", "foo")
|
|
row_none = result.Row(parent, None, parent._key_to_index, data)
|
|
eq_(row_none._to_tuple_instance(), data)
|
|
row_all_p = result.Row(
|
|
parent, [str, float, int, str.upper], parent._key_to_index, data
|
|
)
|
|
eq_(row_all_p._to_tuple_instance(), ("1", 99.0, 42, "FOO"))
|
|
row_some_p = result.Row(
|
|
parent, [None, str, None, str.upper], parent._key_to_index, data
|
|
)
|
|
eq_(row_some_p._to_tuple_instance(), (1, "99", "42", "FOO"))
|
|
with expect_raises(AssertionError):
|
|
result.Row(parent, [None, str], parent._key_to_index, data)
|
|
|
|
def test_tuplegetter(self):
|
|
data = list(range(10, 20))
|
|
eq_(result.tuplegetter(1)(data), [11])
|
|
eq_(result.tuplegetter(1, 9, 3)(data), (11, 19, 13))
|
|
eq_(result.tuplegetter(2, 3, 4)(data), [12, 13, 14])
|
|
|
|
|
|
class ResultTest(fixtures.TestBase):
|
|
def _fixture(
|
|
self,
|
|
extras=None,
|
|
alt_row=None,
|
|
num_rows=None,
|
|
default_filters=None,
|
|
data=None,
|
|
):
|
|
if data is None:
|
|
data = [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)]
|
|
if num_rows is not None:
|
|
data = data[:num_rows]
|
|
|
|
res = result.IteratorResult(
|
|
result.SimpleResultMetaData(["a", "b", "c"], extra=extras),
|
|
iter(data),
|
|
)
|
|
if default_filters:
|
|
res._metadata._unique_filters = default_filters
|
|
|
|
if alt_row:
|
|
res._process_row = alt_row
|
|
|
|
return res
|
|
|
|
def test_close_attributes(self):
|
|
"""test #8710"""
|
|
r1 = self._fixture()
|
|
|
|
is_false(r1.closed)
|
|
is_false(r1._soft_closed)
|
|
|
|
r1._soft_close()
|
|
is_false(r1.closed)
|
|
is_true(r1._soft_closed)
|
|
|
|
r1.close()
|
|
is_true(r1.closed)
|
|
is_true(r1._soft_closed)
|
|
|
|
def test_class_presented(self):
|
|
"""To support different kinds of objects returned vs. rows,
|
|
there are two wrapper classes for Result.
|
|
"""
|
|
|
|
r1 = self._fixture()
|
|
|
|
r2 = r1.columns(0, 1, 2)
|
|
assert isinstance(r2, result.Result)
|
|
|
|
m1 = r1.mappings()
|
|
assert isinstance(m1, result.MappingResult)
|
|
|
|
s1 = r1.scalars(1)
|
|
assert isinstance(s1, result.ScalarResult)
|
|
|
|
def test_mapping_plus_base(self):
|
|
r1 = self._fixture()
|
|
|
|
m1 = r1.mappings()
|
|
eq_(m1.fetchone(), {"a": 1, "b": 1, "c": 1})
|
|
eq_(r1.fetchone(), (2, 1, 2))
|
|
|
|
@expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
|
|
def test_tuples_plus_base(self):
|
|
r1 = self._fixture()
|
|
|
|
t1 = r1.tuples()
|
|
eq_(t1.fetchone(), (1, 1, 1))
|
|
eq_(r1.fetchone(), (2, 1, 2))
|
|
|
|
def test_scalar_plus_base(self):
|
|
r1 = self._fixture()
|
|
|
|
m1 = r1.scalars()
|
|
|
|
# base is not affected
|
|
eq_(r1.fetchone(), (1, 1, 1))
|
|
|
|
# scalars
|
|
eq_(m1.first(), 2)
|
|
|
|
def test_index_extra(self):
|
|
ex1a, ex1b, ex2, ex3a, ex3b = (
|
|
object(),
|
|
object(),
|
|
object(),
|
|
object(),
|
|
object(),
|
|
)
|
|
|
|
result = self._fixture(
|
|
extras=[
|
|
(ex1a, ex1b),
|
|
(ex2,),
|
|
(
|
|
ex3a,
|
|
ex3b,
|
|
),
|
|
]
|
|
)
|
|
eq_(
|
|
result.columns(ex2, ex3b).columns(ex3a).all(),
|
|
[(1,), (2,), (2,), (2,)],
|
|
)
|
|
|
|
result = self._fixture(
|
|
extras=[
|
|
(ex1a, ex1b),
|
|
(ex2,),
|
|
(
|
|
ex3a,
|
|
ex3b,
|
|
),
|
|
]
|
|
)
|
|
eq_([row._mapping[ex1b] for row in result], [1, 2, 1, 4])
|
|
|
|
result = self._fixture(
|
|
extras=[
|
|
(ex1a, ex1b),
|
|
(ex2,),
|
|
(
|
|
ex3a,
|
|
ex3b,
|
|
),
|
|
]
|
|
)
|
|
eq_(
|
|
[
|
|
dict(r)
|
|
for r in result.columns(ex2, ex3b).columns(ex3a).mappings()
|
|
],
|
|
[{"c": 1}, {"c": 2}, {"c": 2}, {"c": 2}],
|
|
)
|
|
|
|
def test_unique_default_filters(self):
|
|
result = self._fixture(
|
|
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
|
|
)
|
|
|
|
eq_(result.unique().all(), [(1, 1, 1), (1, 3, 2), (4, 1, 2)])
|
|
|
|
def test_unique_default_filters_rearrange_scalar(self):
|
|
result = self._fixture(
|
|
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
|
|
)
|
|
|
|
eq_(result.unique().scalars(1).all(), [1, 3])
|
|
|
|
def test_unique_default_filters_rearrange_order(self):
|
|
result = self._fixture(
|
|
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
|
|
)
|
|
|
|
eq_(
|
|
result.unique().columns("b", "a", "c").all(),
|
|
[(1, 1, 1), (3, 1, 2), (1, 4, 2)],
|
|
)
|
|
|
|
def test_unique_default_filters_rearrange_twice(self):
|
|
# test that the default uniqueness filter is reconfigured
|
|
# each time columns() is called
|
|
result = self._fixture(
|
|
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
|
|
)
|
|
|
|
result = result.unique()
|
|
|
|
# 1, 1, 1 -> True, 1, True
|
|
eq_(result.fetchone(), (1, 1, 1))
|
|
|
|
# now rearrange for b, a, c
|
|
# 1, 2, 2 -> 1, True, True
|
|
# 3, 1, 2 -> 3, True, True
|
|
result = result.columns("b", "a", "c")
|
|
eq_(result.fetchone(), (3, 1, 2))
|
|
|
|
# now rearrange for c, a
|
|
# 2, 4 -> True, False
|
|
result = result.columns("c", "a")
|
|
eq_(result.fetchall(), [(2, 4)])
|
|
|
|
def test_unique_scalars_all(self):
|
|
result = self._fixture()
|
|
|
|
eq_(result.unique().scalars(1).all(), [1, 3])
|
|
|
|
def test_unique_mappings_all(self):
|
|
result = self._fixture()
|
|
|
|
def uniq(row):
|
|
return row[0]
|
|
|
|
eq_(
|
|
result.unique(uniq).mappings().all(),
|
|
[
|
|
{"a": 1, "b": 1, "c": 1},
|
|
{"a": 2, "b": 1, "c": 2},
|
|
{"a": 4, "b": 1, "c": 2},
|
|
],
|
|
)
|
|
|
|
def test_unique_filtered_all(self):
|
|
result = self._fixture()
|
|
|
|
def uniq(row):
|
|
return row[0]
|
|
|
|
eq_(result.unique(uniq).all(), [(1, 1, 1), (2, 1, 2), (4, 1, 2)])
|
|
|
|
def test_unique_scalars_many(self):
|
|
result = self._fixture()
|
|
|
|
result = result.unique().scalars(1)
|
|
|
|
eq_(result.fetchmany(2), [1, 3])
|
|
eq_(result.fetchmany(2), [])
|
|
|
|
def test_unique_filtered_many(self):
|
|
result = self._fixture()
|
|
|
|
def uniq(row):
|
|
return row[0]
|
|
|
|
result = result.unique(uniq)
|
|
|
|
eq_(result.fetchmany(2), [(1, 1, 1), (2, 1, 2)])
|
|
eq_(result.fetchmany(2), [(4, 1, 2)])
|
|
eq_(result.fetchmany(2), [])
|
|
|
|
def test_unique_scalars_many_none(self):
|
|
result = self._fixture()
|
|
|
|
result = result.unique().scalars(1)
|
|
|
|
# this assumes the default fetchmany behavior of all() for
|
|
# the ListFetchStrategy
|
|
eq_(result.fetchmany(None), [1, 3])
|
|
eq_(result.fetchmany(None), [])
|
|
|
|
def test_unique_scalars_iterate(self):
|
|
result = self._fixture()
|
|
|
|
result = result.unique().scalars(1)
|
|
|
|
eq_(list(result), [1, 3])
|
|
|
|
def test_unique_filtered_iterate(self):
|
|
result = self._fixture()
|
|
|
|
def uniq(row):
|
|
return row[0]
|
|
|
|
result = result.unique(uniq)
|
|
|
|
eq_(list(result), [(1, 1, 1), (2, 1, 2), (4, 1, 2)])
|
|
|
|
def test_all(self):
|
|
result = self._fixture()
|
|
|
|
eq_(result.all(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
|
|
|
|
eq_(result.all(), [])
|
|
|
|
def test_many_then_all(self):
|
|
result = self._fixture()
|
|
|
|
eq_(result.fetchmany(3), [(1, 1, 1), (2, 1, 2), (1, 3, 2)])
|
|
eq_(result.all(), [(4, 1, 2)])
|
|
|
|
eq_(result.all(), [])
|
|
|
|
def test_scalars(self):
|
|
result = self._fixture()
|
|
|
|
eq_(list(result.scalars()), [1, 2, 1, 4])
|
|
|
|
result = self._fixture()
|
|
|
|
eq_(list(result.scalars(2)), [1, 2, 2, 2])
|
|
|
|
def test_scalars_mappings(self):
|
|
result = self._fixture()
|
|
|
|
eq_(
|
|
list(result.columns(0).mappings()),
|
|
[{"a": 1}, {"a": 2}, {"a": 1}, {"a": 4}],
|
|
)
|
|
|
|
def test_scalars_no_fetchone(self):
|
|
result = self._fixture()
|
|
|
|
s = result.scalars()
|
|
|
|
assert not hasattr(s, "fetchone")
|
|
|
|
# original result is unchanged
|
|
eq_(result.mappings().fetchone(), {"a": 1, "b": 1, "c": 1})
|
|
|
|
# scalars
|
|
eq_(s.all(), [2, 1, 4])
|
|
|
|
def test_first(self):
|
|
result = self._fixture()
|
|
|
|
row = result.first()
|
|
eq_(row, (1, 1, 1))
|
|
|
|
# note this is a behavior change in 1.4.27 due to
|
|
# adding a real result.close() to Result, previously this would
|
|
# return an empty list. this is already the
|
|
# behavior with CursorResult, but was mis-implemented for
|
|
# other non-cursor result sets.
|
|
assert_raises(exc.ResourceClosedError, result.all)
|
|
|
|
def test_one_unique(self):
|
|
# assert that one() counts rows after uniqueness has been applied.
|
|
# this would raise if we didn't have unique
|
|
result = self._fixture(data=[(1, 1, 1), (1, 1, 1)])
|
|
|
|
row = result.unique().one()
|
|
eq_(row, (1, 1, 1))
|
|
|
|
def test_one_unique_tricky_one(self):
|
|
# one() needs to keep consuming rows in order to find a non-unique
|
|
# one. unique() really slows things down
|
|
result = self._fixture(
|
|
data=[(1, 1, 1), (1, 1, 1), (1, 1, 1), (2, 1, 1)]
|
|
)
|
|
|
|
assert_raises(exc.MultipleResultsFound, result.unique().one)
|
|
|
|
def test_one_unique_mapping(self):
|
|
# assert that one() counts rows after uniqueness has been applied.
|
|
# this would raise if we didn't have unique
|
|
result = self._fixture(data=[(1, 1, 1), (1, 1, 1)])
|
|
|
|
row = result.mappings().unique().one()
|
|
eq_(row, {"a": 1, "b": 1, "c": 1})
|
|
|
|
def test_one_mapping(self):
|
|
result = self._fixture(num_rows=1)
|
|
|
|
row = result.mappings().one()
|
|
eq_(row, {"a": 1, "b": 1, "c": 1})
|
|
|
|
def test_one(self):
|
|
result = self._fixture(num_rows=1)
|
|
|
|
row = result.one()
|
|
eq_(row, (1, 1, 1))
|
|
|
|
def test_scalar_one(self):
|
|
result = self._fixture(num_rows=1)
|
|
|
|
row = result.scalar_one()
|
|
eq_(row, 1)
|
|
|
|
def test_scalars_plus_one(self):
|
|
result = self._fixture(num_rows=1)
|
|
|
|
row = result.scalars().one()
|
|
eq_(row, 1)
|
|
|
|
def test_scalars_plus_one_none(self):
|
|
result = self._fixture(num_rows=0)
|
|
|
|
result = result.scalars()
|
|
assert_raises_message(
|
|
exc.NoResultFound,
|
|
"No row was found when one was required",
|
|
result.one,
|
|
)
|
|
|
|
def test_one_none(self):
|
|
result = self._fixture(num_rows=0)
|
|
|
|
assert_raises_message(
|
|
exc.NoResultFound,
|
|
"No row was found when one was required",
|
|
result.one,
|
|
)
|
|
|
|
def test_one_or_none(self):
|
|
result = self._fixture(num_rows=1)
|
|
|
|
eq_(result.one_or_none(), (1, 1, 1))
|
|
|
|
def test_scalar_one_or_none(self):
|
|
result = self._fixture(num_rows=1)
|
|
|
|
eq_(result.scalar_one_or_none(), 1)
|
|
|
|
def test_scalar_one_or_none_none(self):
|
|
result = self._fixture(num_rows=0)
|
|
|
|
eq_(result.scalar_one_or_none(), None)
|
|
|
|
def test_one_or_none_none(self):
|
|
result = self._fixture(num_rows=0)
|
|
|
|
eq_(result.one_or_none(), None)
|
|
|
|
def test_one_raise_mutiple(self):
|
|
result = self._fixture(num_rows=2)
|
|
|
|
assert_raises_message(
|
|
exc.MultipleResultsFound,
|
|
"Multiple rows were found when exactly one was required",
|
|
result.one,
|
|
)
|
|
|
|
def test_one_or_none_raise_multiple(self):
|
|
result = self._fixture(num_rows=2)
|
|
|
|
assert_raises_message(
|
|
exc.MultipleResultsFound,
|
|
"Multiple rows were found when one or none was required",
|
|
result.one_or_none,
|
|
)
|
|
|
|
def test_scalar(self):
|
|
result = self._fixture()
|
|
|
|
eq_(result.scalar(), 1)
|
|
|
|
# note this is a behavior change in 1.4.27 due to
|
|
# adding a real result.close() to Result, previously this would
|
|
# return an empty list. this is already the
|
|
# behavior with CursorResult, but was mis-implemented for
|
|
# other non-cursor result sets.
|
|
assert_raises(exc.ResourceClosedError, result.all)
|
|
|
|
def test_partition(self):
|
|
result = self._fixture()
|
|
|
|
r = []
|
|
for partition in result.partitions(2):
|
|
r.append(list(partition))
|
|
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
|
|
|
|
eq_(result.all(), [])
|
|
|
|
def test_partition_unique_yield_per(self):
|
|
result = self._fixture()
|
|
|
|
r = []
|
|
for partition in result.unique().yield_per(2).partitions():
|
|
r.append(list(partition))
|
|
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
|
|
|
|
eq_(result.all(), [])
|
|
|
|
def test_partition_yield_per(self):
|
|
result = self._fixture()
|
|
|
|
r = []
|
|
for partition in result.yield_per(2).partitions():
|
|
r.append(list(partition))
|
|
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
|
|
|
|
eq_(result.all(), [])
|
|
|
|
def test_columns(self):
|
|
result = self._fixture()
|
|
|
|
result = result.columns("b", "c")
|
|
eq_(result.keys(), ["b", "c"])
|
|
eq_(result.all(), [(1, 1), (1, 2), (3, 2), (1, 2)])
|
|
|
|
def test_columns_ints(self):
|
|
result = self._fixture()
|
|
|
|
eq_(result.columns(1, -2).all(), [(1, 1), (1, 1), (3, 3), (1, 1)])
|
|
|
|
def test_columns_again(self):
|
|
result = self._fixture()
|
|
|
|
eq_(
|
|
result.columns("b", "c", "a").columns(1, 2).all(),
|
|
[(1, 1), (2, 2), (2, 1), (2, 4)],
|
|
)
|
|
|
|
def test_mappings(self):
|
|
result = self._fixture()
|
|
|
|
eq_(
|
|
[dict(r) for r in result.mappings()],
|
|
[
|
|
{"a": 1, "b": 1, "c": 1},
|
|
{"a": 2, "b": 1, "c": 2},
|
|
{"a": 1, "b": 3, "c": 2},
|
|
{"a": 4, "b": 1, "c": 2},
|
|
],
|
|
)
|
|
|
|
def test_columns_with_mappings(self):
|
|
result = self._fixture()
|
|
eq_(
|
|
[dict(r) for r in result.columns("b", "c").mappings().all()],
|
|
[
|
|
{"b": 1, "c": 1},
|
|
{"b": 1, "c": 2},
|
|
{"b": 3, "c": 2},
|
|
{"b": 1, "c": 2},
|
|
],
|
|
)
|
|
|
|
def test_mappings_with_columns(self):
|
|
result = self._fixture()
|
|
|
|
m1 = result.mappings().columns("b", "c")
|
|
|
|
eq_(m1.fetchmany(2), [{"b": 1, "c": 1}, {"b": 1, "c": 2}])
|
|
|
|
# no slice here
|
|
eq_(result.fetchone(), (1, 3, 2))
|
|
|
|
# still slices
|
|
eq_(m1.fetchone(), {"b": 1, "c": 2})
|
|
|
|
def test_scalar_none_iterate(self):
|
|
result = self._fixture(
|
|
data=[
|
|
(1, None, 2),
|
|
(3, 4, 5),
|
|
(3, None, 5),
|
|
(3, None, 5),
|
|
(3, 4, 5),
|
|
]
|
|
)
|
|
|
|
result = result.scalars(1)
|
|
eq_(list(result), [None, 4, None, None, 4])
|
|
|
|
def test_scalar_none_many(self):
|
|
result = self._fixture(
|
|
data=[
|
|
(1, None, 2),
|
|
(3, 4, 5),
|
|
(3, None, 5),
|
|
(3, None, 5),
|
|
(3, 4, 5),
|
|
]
|
|
)
|
|
|
|
result = result.scalars(1)
|
|
|
|
eq_(result.fetchmany(3), [None, 4, None])
|
|
eq_(result.fetchmany(5), [None, 4])
|
|
|
|
def test_scalar_none_all(self):
|
|
result = self._fixture(
|
|
data=[
|
|
(1, None, 2),
|
|
(3, 4, 5),
|
|
(3, None, 5),
|
|
(3, None, 5),
|
|
(3, 4, 5),
|
|
]
|
|
)
|
|
|
|
result = result.scalars(1)
|
|
eq_(result.all(), [None, 4, None, None, 4])
|
|
|
|
def test_scalar_none_all_unique(self):
|
|
result = self._fixture(
|
|
data=[
|
|
(1, None, 2),
|
|
(3, 4, 5),
|
|
(3, None, 5),
|
|
(3, None, 5),
|
|
(3, 4, 5),
|
|
]
|
|
)
|
|
|
|
result = result.scalars(1).unique()
|
|
eq_(result.all(), [None, 4])
|
|
|
|
def test_scalar_only_on_filter_w_unique(self):
|
|
# test a mixture of the "real" result and the
|
|
# scalar filter, where scalar has unique and real result does not.
|
|
|
|
# this is new as of [ticket:5503] where we have created
|
|
# ScalarResult / MappingResult "filters" that don't modify
|
|
# the Result
|
|
result = self._fixture(
|
|
data=[
|
|
(1, 1, 2),
|
|
(3, 4, 5),
|
|
(1, 1, 2),
|
|
(3, None, 5),
|
|
(3, 4, 5),
|
|
(3, None, 5),
|
|
]
|
|
)
|
|
|
|
# result is non-unique. u_s is unique on column 0
|
|
u_s = result.scalars(0).unique()
|
|
|
|
eq_(next(u_s), 1) # unique value 1 from first row
|
|
eq_(next(result), (3, 4, 5)) # second row
|
|
eq_(next(u_s), 3) # skip third row, return 3 for fourth row
|
|
eq_(next(result), (3, 4, 5)) # non-unique fifth row
|
|
eq_(u_s.all(), []) # unique set is done because only 3 is left
|
|
|
|
def test_scalar_none_one_w_unique(self):
|
|
result = self._fixture(data=[(1, None, 2)])
|
|
|
|
result = result.scalars(1).unique()
|
|
|
|
# one is returning None, see?
|
|
eq_(result.one(), None)
|
|
|
|
def test_scalar_none_one_or_none_w_unique(self):
|
|
result = self._fixture(data=[(1, None, 2)])
|
|
|
|
result = result.scalars(1).unique()
|
|
|
|
# the orm.Query can actually do this right now, so we sort of
|
|
# have to allow for this unforuntately, unless we want to raise?
|
|
eq_(result.one_or_none(), None)
|
|
|
|
def test_scalar_one_w_unique(self):
|
|
result = self._fixture(data=[(1, None, 2)])
|
|
|
|
result = result.unique()
|
|
|
|
eq_(result.scalar_one(), 1)
|
|
|
|
def test_scalars_one_w_unique(self):
|
|
result = self._fixture(data=[(1, None, 2)])
|
|
|
|
result = result.unique()
|
|
|
|
eq_(result.scalars().one(), 1)
|
|
|
|
def test_scalar_none_first(self):
|
|
result = self._fixture(data=[(1, None, 2)])
|
|
|
|
result = result.scalars(1).unique()
|
|
eq_(result.first(), None)
|
|
|
|
def test_freeze(self):
|
|
result = self._fixture()
|
|
|
|
frozen = result.freeze()
|
|
|
|
r1 = frozen()
|
|
eq_(r1.fetchall(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
|
|
eq_(r1.fetchall(), [])
|
|
|
|
r2 = frozen()
|
|
eq_(r1.fetchall(), [])
|
|
eq_(r2.fetchall(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
|
|
eq_(r2.fetchall(), [])
|
|
|
|
def test_columns_unique_freeze(self):
|
|
result = self._fixture()
|
|
|
|
result = result.columns("b", "c").unique()
|
|
|
|
frozen = result.freeze()
|
|
|
|
r1 = frozen()
|
|
eq_(r1.fetchall(), [(1, 1), (1, 2), (3, 2)])
|
|
|
|
def test_columns_freeze(self):
|
|
result = self._fixture()
|
|
|
|
result = result.columns("b", "c")
|
|
|
|
frozen = result.freeze()
|
|
|
|
r1 = frozen()
|
|
eq_(r1.fetchall(), [(1, 1), (1, 2), (3, 2), (1, 2)])
|
|
|
|
r2 = frozen().unique()
|
|
eq_(r2.fetchall(), [(1, 1), (1, 2), (3, 2)])
|
|
|
|
def test_scalars_freeze(self):
|
|
result = self._fixture()
|
|
|
|
frozen = result.freeze()
|
|
|
|
r1 = frozen()
|
|
eq_(r1.scalars(1).fetchall(), [1, 1, 3, 1])
|
|
|
|
r2 = frozen().scalars(1).unique()
|
|
eq_(r2.fetchall(), [1, 3])
|
|
|
|
|
|
class MergeResultTest(fixtures.TestBase):
|
|
@testing.fixture
|
|
def merge_fixture(self):
|
|
r1 = result.IteratorResult(
|
|
result.SimpleResultMetaData(["user_id", "user_name"]),
|
|
iter([(7, "u1"), (8, "u2")]),
|
|
)
|
|
r2 = result.IteratorResult(
|
|
result.SimpleResultMetaData(["user_id", "user_name"]),
|
|
iter([(9, "u3")]),
|
|
)
|
|
r3 = result.IteratorResult(
|
|
result.SimpleResultMetaData(["user_id", "user_name"]),
|
|
iter([(10, "u4"), (11, "u5")]),
|
|
)
|
|
r4 = result.IteratorResult(
|
|
result.SimpleResultMetaData(["user_id", "user_name"]),
|
|
iter([(12, "u6")]),
|
|
)
|
|
|
|
return r1, r2, r3, r4
|
|
|
|
@testing.fixture
|
|
def dupe_fixture(self):
|
|
r1 = result.IteratorResult(
|
|
result.SimpleResultMetaData(["x", "y", "z"]),
|
|
iter([(1, 2, 1), (2, 2, 1)]),
|
|
)
|
|
r2 = result.IteratorResult(
|
|
result.SimpleResultMetaData(["x", "y", "z"]),
|
|
iter([(3, 1, 2), (3, 3, 3)]),
|
|
)
|
|
|
|
return r1, r2
|
|
|
|
def test_merge_results(self, merge_fixture):
|
|
r1, r2, r3, r4 = merge_fixture
|
|
|
|
result = r1.merge(r2, r3, r4)
|
|
|
|
eq_(result.keys(), ["user_id", "user_name"])
|
|
row = result.fetchone()
|
|
eq_(row, (7, "u1"))
|
|
result.close()
|
|
|
|
def test_fetchall(self, merge_fixture):
|
|
r1, r2, r3, r4 = merge_fixture
|
|
|
|
result = r1.merge(r2, r3, r4)
|
|
eq_(
|
|
result.fetchall(),
|
|
[
|
|
(7, "u1"),
|
|
(8, "u2"),
|
|
(9, "u3"),
|
|
(10, "u4"),
|
|
(11, "u5"),
|
|
(12, "u6"),
|
|
],
|
|
)
|
|
|
|
def test_first(self, merge_fixture):
|
|
r1, r2, r3, r4 = merge_fixture
|
|
|
|
result = r1.merge(r2, r3, r4)
|
|
eq_(
|
|
result.first(),
|
|
(7, "u1"),
|
|
)
|
|
|
|
def test_columns(self, merge_fixture):
|
|
r1, r2, r3, r4 = merge_fixture
|
|
|
|
result = r1.merge(r2, r3, r4)
|
|
eq_(
|
|
result.columns("user_name").fetchmany(4),
|
|
[("u1",), ("u2",), ("u3",), ("u4",)],
|
|
)
|
|
result.close()
|
|
|
|
def test_merge_scalars(self, merge_fixture):
|
|
r1, r2, r3, r4 = merge_fixture
|
|
|
|
for r in (r1, r2, r3, r4):
|
|
r.scalars(0)
|
|
|
|
result = r1.merge(r2, r3, r4)
|
|
|
|
eq_(result.scalars(0).all(), [7, 8, 9, 10, 11, 12])
|
|
|
|
def test_merge_unique(self, dupe_fixture):
|
|
r1, r2 = dupe_fixture
|
|
|
|
r1.scalars("y")
|
|
r2.scalars("y")
|
|
result = r1.merge(r2)
|
|
|
|
# uniqued 2, 2, 1, 3
|
|
eq_(result.scalars("y").unique().all(), [2, 1, 3])
|
|
|
|
def test_merge_preserve_unique(self, dupe_fixture):
|
|
r1, r2 = dupe_fixture
|
|
|
|
r1.unique().scalars("y")
|
|
r2.unique().scalars("y")
|
|
result = r1.merge(r2)
|
|
|
|
# unique takes place
|
|
eq_(result.scalars("y").all(), [2, 1, 3])
|
|
|
|
|
|
class OnlyScalarsTest(fixtures.TestBase):
|
|
"""the chunkediterator supports "non tuple mode", where we bypass
|
|
the expense of generating rows when we have only scalar values.
|
|
|
|
"""
|
|
|
|
@testing.fixture
|
|
def no_tuple_fixture(self):
|
|
data = [(1, 1, 1), (2, 1, 2), (1, 1, 1), (1, 3, 2), (4, 1, 2)]
|
|
|
|
def chunks(num):
|
|
while data:
|
|
rows = data[0:num]
|
|
data[:] = []
|
|
|
|
yield [row[0] for row in rows]
|
|
|
|
return chunks
|
|
|
|
@testing.fixture
|
|
def no_tuple_one_fixture(self):
|
|
data = [(1, 1, 1)]
|
|
|
|
def chunks(num):
|
|
while data:
|
|
rows = data[0:num]
|
|
data[:] = []
|
|
|
|
yield [row[0] for row in rows]
|
|
|
|
return chunks
|
|
|
|
@testing.fixture
|
|
def normal_fixture(self):
|
|
data = [(1, 1, 1), (2, 1, 2), (1, 1, 1), (1, 3, 2), (4, 1, 2)]
|
|
|
|
def chunks(num):
|
|
while data:
|
|
rows = data[0:num]
|
|
data[:] = []
|
|
|
|
yield [row[0] for row in rows]
|
|
|
|
return chunks
|
|
|
|
def test_scalar_mode_columns0_mapping(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.columns(0).mappings()
|
|
eq_(
|
|
list(r),
|
|
[{"a": 1}, {"a": 2}, {"a": 1}, {"a": 1}, {"a": 4}],
|
|
)
|
|
|
|
def test_scalar_mode_columns0_plain(self, no_tuple_fixture):
|
|
"""test #7953"""
|
|
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.columns(0)
|
|
eq_(
|
|
list(r),
|
|
[(1,), (2,), (1,), (1,), (4,)],
|
|
)
|
|
|
|
def test_scalar_mode_scalars0(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.scalars(0)
|
|
eq_(
|
|
list(r),
|
|
[1, 2, 1, 1, 4],
|
|
)
|
|
|
|
def test_scalar_mode_but_accessed_nonscalar_result(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
s1 = r.scalars()
|
|
|
|
eq_(r.fetchone(), (1,))
|
|
|
|
eq_(s1.all(), [2, 1, 1, 4])
|
|
|
|
def test_scalar_mode_scalars_all(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.scalars()
|
|
|
|
eq_(r.all(), [1, 2, 1, 1, 4])
|
|
|
|
def test_scalar_mode_mfiltered_unique_rows_all(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(
|
|
["a", "b", "c"], _unique_filters=[int]
|
|
)
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata,
|
|
no_tuple_fixture,
|
|
source_supports_scalars=True,
|
|
)
|
|
|
|
r = r.unique()
|
|
|
|
eq_(r.all(), [(1,), (2,), (4,)])
|
|
|
|
@testing.combinations(
|
|
lambda r: r.scalar(),
|
|
lambda r: r.scalar_one(),
|
|
lambda r: r.scalar_one_or_none(),
|
|
argnames="get",
|
|
)
|
|
def test_unique_scalar_accessors(self, no_tuple_one_fixture, get):
|
|
metadata = result.SimpleResultMetaData(
|
|
["a", "b", "c"], _unique_filters=[int]
|
|
)
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata,
|
|
no_tuple_one_fixture,
|
|
source_supports_scalars=True,
|
|
)
|
|
|
|
r = r.unique()
|
|
|
|
eq_(get(r), 1)
|
|
|
|
def test_scalar_mode_mfiltered_unique_mappings_all(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(
|
|
["a", "b", "c"], _unique_filters=[int]
|
|
)
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata,
|
|
no_tuple_fixture,
|
|
source_supports_scalars=True,
|
|
)
|
|
|
|
r = r.unique()
|
|
|
|
eq_(r.mappings().all(), [{"a": 1}, {"a": 2}, {"a": 4}])
|
|
|
|
def test_scalar_mode_mfiltered_unique_scalars_all(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(
|
|
["a", "b", "c"], _unique_filters=[int]
|
|
)
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata,
|
|
no_tuple_fixture,
|
|
source_supports_scalars=True,
|
|
)
|
|
|
|
r = r.scalars().unique()
|
|
|
|
eq_(r.all(), [1, 2, 4])
|
|
|
|
def test_scalar_mode_unique_scalars_all(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.unique().scalars()
|
|
|
|
eq_(r.all(), [1, 2, 4])
|
|
|
|
def test_scalar_mode_scalars_fetchmany(self, normal_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, normal_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.scalars()
|
|
eq_(list(r.partitions(2)), [[1, 2], [1, 1], [4]])
|
|
|
|
def test_scalar_mode_unique_scalars_fetchmany(self, normal_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, normal_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.scalars().unique()
|
|
eq_(list(r.partitions(2)), [[1, 2], [4]])
|
|
|
|
def test_scalar_mode_unique_tuples_all(self, normal_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, normal_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.unique()
|
|
|
|
eq_(r.all(), [(1,), (2,), (4,)])
|
|
|
|
def test_scalar_mode_tuples_all(self, normal_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, normal_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
eq_(r.all(), [(1,), (2,), (1,), (1,), (4,)])
|
|
|
|
def test_scalar_mode_scalars_iterate(self, no_tuple_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
r = r.scalars()
|
|
|
|
eq_(list(r), [1, 2, 1, 1, 4])
|
|
|
|
def test_scalar_mode_tuples_iterate(self, normal_fixture):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, normal_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
eq_(list(r), [(1,), (2,), (1,), (1,), (4,)])
|
|
|
|
@testing.combinations(
|
|
lambda r: r.one(),
|
|
lambda r: r.first(),
|
|
lambda r: r.one_or_none(),
|
|
argnames="get",
|
|
)
|
|
def test_scalar_mode_first(self, no_tuple_one_fixture, get):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_one_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
eq_(get(r), (1,))
|
|
|
|
@testing.combinations(
|
|
lambda r: r.scalar(),
|
|
lambda r: r.scalar_one(),
|
|
lambda r: r.scalar_one_or_none(),
|
|
argnames="get",
|
|
)
|
|
def test_scalar_mode_scalar_one(self, no_tuple_one_fixture, get):
|
|
metadata = result.SimpleResultMetaData(["a", "b", "c"])
|
|
|
|
r = result.ChunkedIteratorResult(
|
|
metadata, no_tuple_one_fixture, source_supports_scalars=True
|
|
)
|
|
|
|
eq_(get(r), 1)
|