Files
Federico Caselli 0be89aaa38 Switch to cython pure python mode
Replaces the pyx files with py files that can be both compiled
by cython or imported as is by python.
This avoids the need of duplicating the code to have a python
only fallback.
The cython files are also reorganized to be in the module they use
instead of all being in the cyextension package, that has been
removed.

The performance is pretty much equal between main and this change.
A detailed comparison is at this link
https://docs.google.com/spreadsheets/d/1jkmGpnCyEcPyy6aRK9alElGjxlNHu44Wxjr4VrD99so/edit?usp=sharing

Change-Id: Iaed232ea5dfb41534cc9f58f6ea2f912a93263af
2024-03-25 23:44:27 +01:00

1373 lines
38 KiB
Python

import operator
import sys
from sqlalchemy import exc
from sqlalchemy import testing
from sqlalchemy.engine import result
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing.assertions import expect_deprecated
from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.util import picklers
from sqlalchemy.util import compat
from sqlalchemy.util.langhelpers import load_uncompiled_module
class ResultTupleTest(fixtures.TestBase):
def _fixture(self, values, labels):
return result.result_tuple(labels)(values)
def test_empty(self):
keyed_tuple = self._fixture([], [])
eq_(str(keyed_tuple), "()")
eq_(len(keyed_tuple), 0)
eq_(list(keyed_tuple._mapping.keys()), [])
eq_(keyed_tuple._fields, ())
eq_(keyed_tuple._asdict(), {})
def test_values_none_labels(self):
keyed_tuple = self._fixture([1, 2], [None, None])
eq_(str(keyed_tuple), "(1, 2)")
eq_(len(keyed_tuple), 2)
eq_(list(keyed_tuple._mapping.keys()), [])
eq_(keyed_tuple._fields, ())
eq_(keyed_tuple._asdict(), {})
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
def test_creation(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(str(keyed_tuple), "(1, 2)")
eq_(list(keyed_tuple._mapping.keys()), ["a", "b"])
eq_(keyed_tuple._fields, ("a", "b"))
eq_(keyed_tuple._asdict(), {"a": 1, "b": 2})
def test_index_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
def should_raise():
keyed_tuple[2]
assert_raises(IndexError, should_raise)
def test_negative_index_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple[-1], 2)
eq_(keyed_tuple[-2:-1], (1,))
def test_slice_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple[0:2], (1, 2))
def test_slices_arent_in_mappings(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
if compat.py312:
with expect_raises(KeyError):
keyed_tuple._mapping[0:2]
else:
with expect_raises(TypeError):
keyed_tuple._mapping[0:2]
def test_integers_arent_in_mappings(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
assert_raises(KeyError, lambda: keyed_tuple._mapping[1])
def test_getter(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "c"])
getter = keyed_tuple._parent._getter("b")
eq_(getter(keyed_tuple), 2)
getter = keyed_tuple._parent._getter(2)
eq_(getter(keyed_tuple), 3)
def test_tuple_getter(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "c"])
getter = keyed_tuple._parent._row_as_tuple_getter(["b", "c"])
eq_(getter(keyed_tuple), (2, 3))
# row as tuple getter doesn't accept ints. for ints, just
# use plain python
getter = operator.itemgetter(2, 0, 1)
# getter = keyed_tuple._parent._row_as_tuple_getter([2, 0, 1])
eq_(getter(keyed_tuple), (3, 1, 2))
def test_attribute_access(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(keyed_tuple.a, 1)
eq_(keyed_tuple.b, 2)
def should_raise():
keyed_tuple.c
assert_raises(AttributeError, should_raise)
def test_contains(self):
keyed_tuple = self._fixture(["x", "y"], ["a", "b"])
is_true("x" in keyed_tuple)
is_false("z" in keyed_tuple)
is_true("z" not in keyed_tuple)
is_false("x" not in keyed_tuple)
# we don't do keys
is_false("a" in keyed_tuple)
is_false("z" in keyed_tuple)
is_true("a" not in keyed_tuple)
is_true("z" not in keyed_tuple)
def test_contains_mapping(self):
keyed_tuple = self._fixture(["x", "y"], ["a", "b"])._mapping
is_false("x" in keyed_tuple)
is_false("z" in keyed_tuple)
is_true("z" not in keyed_tuple)
is_true("x" not in keyed_tuple)
# we do keys
is_true("a" in keyed_tuple)
is_true("b" in keyed_tuple)
def test_none_label(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
eq_(str(keyed_tuple), "(1, 2, 3)")
eq_(list(keyed_tuple._mapping.keys()), ["a", "b"])
eq_(keyed_tuple._fields, ("a", "b"))
eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
# attribute access: can't get at value 2
eq_(keyed_tuple.a, 1)
eq_(keyed_tuple.b, 3)
# index access: can get at value 2
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
eq_(keyed_tuple[2], 3)
def test_duplicate_labels(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", "b", "b"])
eq_(str(keyed_tuple), "(1, 2, 3)")
eq_(list(keyed_tuple._mapping.keys()), ["a", "b", "b"])
eq_(keyed_tuple._fields, ("a", "b", "b"))
eq_(keyed_tuple._asdict(), {"a": 1, "b": 3})
# attribute access: can't get at value 2
eq_(keyed_tuple.a, 1)
eq_(keyed_tuple.b, 3)
# index access: can get at value 2
eq_(keyed_tuple[0], 1)
eq_(keyed_tuple[1], 2)
eq_(keyed_tuple[2], 3)
def test_immutable(self):
keyed_tuple = self._fixture([1, 2], ["a", "b"])
eq_(str(keyed_tuple), "(1, 2)")
eq_(keyed_tuple.a, 1)
# eh
# assert_raises(AttributeError, setattr, keyed_tuple, "a", 5)
def should_raise():
keyed_tuple[0] = 100
assert_raises(TypeError, should_raise)
def test_serialize(self):
keyed_tuple = self._fixture([1, 2, 3], ["a", None, "b"])
for loads, dumps in picklers():
kt = loads(dumps(keyed_tuple))
eq_(str(kt), "(1, 2, 3)")
eq_(list(kt._mapping.keys()), ["a", "b"])
eq_(kt._fields, ("a", "b"))
eq_(kt._asdict(), {"a": 1, "b": 3})
@testing.fixture
def _load_module(self):
from sqlalchemy.engine import _row_cy as _cy_row
_py_row = load_uncompiled_module(_cy_row)
# allow pickle to serialize the two rowproxy_reconstructor functions
# create a new virtual module
new_name = _py_row.__name__ + "py_only"
sys.modules[new_name] = _py_row
_py_row.__name__ = new_name
for item in vars(_py_row).values():
# only the rowproxy_reconstructor module is required to change,
# but set every one for consistency
if getattr(item, "__module__", None) == _cy_row.__name__:
item.__module__ = new_name
yield _cy_row, _py_row
sys.modules.pop(new_name)
@testing.requires.cextensions
@testing.variation("direction", ["py_to_cy", "cy_to_py"])
def test_serialize_cy_py_cy(
self, direction: testing.Variation, _load_module
):
_cy_row, _py_row = _load_module
global Row
p = result.SimpleResultMetaData(["a", "w", "b"])
if direction.py_to_cy:
dump_cls = _py_row.BaseRow
load_cls = _cy_row.BaseRow
elif direction.cy_to_py:
dump_cls = _cy_row.BaseRow
load_cls = _py_row.BaseRow
else:
direction.fail()
for loads, dumps in picklers():
class Row(dump_cls):
pass
row = Row(p, p._processors, p._key_to_index, (1, 2, 3))
state = dumps(row)
class Row(load_cls):
pass
row2 = loads(state)
is_true(isinstance(row2, load_cls))
is_false(isinstance(row2, dump_cls))
state2 = dumps(row2)
class Row(dump_cls):
pass
row3 = loads(state2)
is_true(isinstance(row3, dump_cls))
def test_processors(self):
parent = result.SimpleResultMetaData(["a", "b", "c", "d"])
data = (1, 99, "42", "foo")
row_none = result.Row(parent, None, parent._key_to_index, data)
eq_(row_none._to_tuple_instance(), data)
row_all_p = result.Row(
parent, [str, float, int, str.upper], parent._key_to_index, data
)
eq_(row_all_p._to_tuple_instance(), ("1", 99.0, 42, "FOO"))
row_some_p = result.Row(
parent, [None, str, None, str.upper], parent._key_to_index, data
)
eq_(row_some_p._to_tuple_instance(), (1, "99", "42", "FOO"))
with expect_raises(AssertionError):
result.Row(parent, [None, str], parent._key_to_index, data)
def test_tuplegetter(self):
data = list(range(10, 20))
eq_(result.tuplegetter(1)(data), [11])
eq_(result.tuplegetter(1, 9, 3)(data), (11, 19, 13))
eq_(result.tuplegetter(2, 3, 4)(data), [12, 13, 14])
class ResultTest(fixtures.TestBase):
def _fixture(
self,
extras=None,
alt_row=None,
num_rows=None,
default_filters=None,
data=None,
):
if data is None:
data = [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)]
if num_rows is not None:
data = data[:num_rows]
res = result.IteratorResult(
result.SimpleResultMetaData(["a", "b", "c"], extra=extras),
iter(data),
)
if default_filters:
res._metadata._unique_filters = default_filters
if alt_row:
res._process_row = alt_row
return res
def test_close_attributes(self):
"""test #8710"""
r1 = self._fixture()
is_false(r1.closed)
is_false(r1._soft_closed)
r1._soft_close()
is_false(r1.closed)
is_true(r1._soft_closed)
r1.close()
is_true(r1.closed)
is_true(r1._soft_closed)
def test_class_presented(self):
"""To support different kinds of objects returned vs. rows,
there are two wrapper classes for Result.
"""
r1 = self._fixture()
r2 = r1.columns(0, 1, 2)
assert isinstance(r2, result.Result)
m1 = r1.mappings()
assert isinstance(m1, result.MappingResult)
s1 = r1.scalars(1)
assert isinstance(s1, result.ScalarResult)
def test_mapping_plus_base(self):
r1 = self._fixture()
m1 = r1.mappings()
eq_(m1.fetchone(), {"a": 1, "b": 1, "c": 1})
eq_(r1.fetchone(), (2, 1, 2))
@expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
def test_tuples_plus_base(self):
r1 = self._fixture()
t1 = r1.tuples()
eq_(t1.fetchone(), (1, 1, 1))
eq_(r1.fetchone(), (2, 1, 2))
def test_scalar_plus_base(self):
r1 = self._fixture()
m1 = r1.scalars()
# base is not affected
eq_(r1.fetchone(), (1, 1, 1))
# scalars
eq_(m1.first(), 2)
def test_index_extra(self):
ex1a, ex1b, ex2, ex3a, ex3b = (
object(),
object(),
object(),
object(),
object(),
)
result = self._fixture(
extras=[
(ex1a, ex1b),
(ex2,),
(
ex3a,
ex3b,
),
]
)
eq_(
result.columns(ex2, ex3b).columns(ex3a).all(),
[(1,), (2,), (2,), (2,)],
)
result = self._fixture(
extras=[
(ex1a, ex1b),
(ex2,),
(
ex3a,
ex3b,
),
]
)
eq_([row._mapping[ex1b] for row in result], [1, 2, 1, 4])
result = self._fixture(
extras=[
(ex1a, ex1b),
(ex2,),
(
ex3a,
ex3b,
),
]
)
eq_(
[
dict(r)
for r in result.columns(ex2, ex3b).columns(ex3a).mappings()
],
[{"c": 1}, {"c": 2}, {"c": 2}, {"c": 2}],
)
def test_unique_default_filters(self):
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
eq_(result.unique().all(), [(1, 1, 1), (1, 3, 2), (4, 1, 2)])
def test_unique_default_filters_rearrange_scalar(self):
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
eq_(result.unique().scalars(1).all(), [1, 3])
def test_unique_default_filters_rearrange_order(self):
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
eq_(
result.unique().columns("b", "a", "c").all(),
[(1, 1, 1), (3, 1, 2), (1, 4, 2)],
)
def test_unique_default_filters_rearrange_twice(self):
# test that the default uniqueness filter is reconfigured
# each time columns() is called
result = self._fixture(
default_filters=[lambda x: x < 4, lambda x: x, lambda x: True]
)
result = result.unique()
# 1, 1, 1 -> True, 1, True
eq_(result.fetchone(), (1, 1, 1))
# now rearrange for b, a, c
# 1, 2, 2 -> 1, True, True
# 3, 1, 2 -> 3, True, True
result = result.columns("b", "a", "c")
eq_(result.fetchone(), (3, 1, 2))
# now rearrange for c, a
# 2, 4 -> True, False
result = result.columns("c", "a")
eq_(result.fetchall(), [(2, 4)])
def test_unique_scalars_all(self):
result = self._fixture()
eq_(result.unique().scalars(1).all(), [1, 3])
def test_unique_mappings_all(self):
result = self._fixture()
def uniq(row):
return row[0]
eq_(
result.unique(uniq).mappings().all(),
[
{"a": 1, "b": 1, "c": 1},
{"a": 2, "b": 1, "c": 2},
{"a": 4, "b": 1, "c": 2},
],
)
def test_unique_filtered_all(self):
result = self._fixture()
def uniq(row):
return row[0]
eq_(result.unique(uniq).all(), [(1, 1, 1), (2, 1, 2), (4, 1, 2)])
def test_unique_scalars_many(self):
result = self._fixture()
result = result.unique().scalars(1)
eq_(result.fetchmany(2), [1, 3])
eq_(result.fetchmany(2), [])
def test_unique_filtered_many(self):
result = self._fixture()
def uniq(row):
return row[0]
result = result.unique(uniq)
eq_(result.fetchmany(2), [(1, 1, 1), (2, 1, 2)])
eq_(result.fetchmany(2), [(4, 1, 2)])
eq_(result.fetchmany(2), [])
def test_unique_scalars_many_none(self):
result = self._fixture()
result = result.unique().scalars(1)
# this assumes the default fetchmany behavior of all() for
# the ListFetchStrategy
eq_(result.fetchmany(None), [1, 3])
eq_(result.fetchmany(None), [])
def test_unique_scalars_iterate(self):
result = self._fixture()
result = result.unique().scalars(1)
eq_(list(result), [1, 3])
def test_unique_filtered_iterate(self):
result = self._fixture()
def uniq(row):
return row[0]
result = result.unique(uniq)
eq_(list(result), [(1, 1, 1), (2, 1, 2), (4, 1, 2)])
def test_all(self):
result = self._fixture()
eq_(result.all(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
eq_(result.all(), [])
def test_many_then_all(self):
result = self._fixture()
eq_(result.fetchmany(3), [(1, 1, 1), (2, 1, 2), (1, 3, 2)])
eq_(result.all(), [(4, 1, 2)])
eq_(result.all(), [])
def test_scalars(self):
result = self._fixture()
eq_(list(result.scalars()), [1, 2, 1, 4])
result = self._fixture()
eq_(list(result.scalars(2)), [1, 2, 2, 2])
def test_scalars_mappings(self):
result = self._fixture()
eq_(
list(result.columns(0).mappings()),
[{"a": 1}, {"a": 2}, {"a": 1}, {"a": 4}],
)
def test_scalars_no_fetchone(self):
result = self._fixture()
s = result.scalars()
assert not hasattr(s, "fetchone")
# original result is unchanged
eq_(result.mappings().fetchone(), {"a": 1, "b": 1, "c": 1})
# scalars
eq_(s.all(), [2, 1, 4])
def test_first(self):
result = self._fixture()
row = result.first()
eq_(row, (1, 1, 1))
# note this is a behavior change in 1.4.27 due to
# adding a real result.close() to Result, previously this would
# return an empty list. this is already the
# behavior with CursorResult, but was mis-implemented for
# other non-cursor result sets.
assert_raises(exc.ResourceClosedError, result.all)
def test_one_unique(self):
# assert that one() counts rows after uniqueness has been applied.
# this would raise if we didn't have unique
result = self._fixture(data=[(1, 1, 1), (1, 1, 1)])
row = result.unique().one()
eq_(row, (1, 1, 1))
def test_one_unique_tricky_one(self):
# one() needs to keep consuming rows in order to find a non-unique
# one. unique() really slows things down
result = self._fixture(
data=[(1, 1, 1), (1, 1, 1), (1, 1, 1), (2, 1, 1)]
)
assert_raises(exc.MultipleResultsFound, result.unique().one)
def test_one_unique_mapping(self):
# assert that one() counts rows after uniqueness has been applied.
# this would raise if we didn't have unique
result = self._fixture(data=[(1, 1, 1), (1, 1, 1)])
row = result.mappings().unique().one()
eq_(row, {"a": 1, "b": 1, "c": 1})
def test_one_mapping(self):
result = self._fixture(num_rows=1)
row = result.mappings().one()
eq_(row, {"a": 1, "b": 1, "c": 1})
def test_one(self):
result = self._fixture(num_rows=1)
row = result.one()
eq_(row, (1, 1, 1))
def test_scalar_one(self):
result = self._fixture(num_rows=1)
row = result.scalar_one()
eq_(row, 1)
def test_scalars_plus_one(self):
result = self._fixture(num_rows=1)
row = result.scalars().one()
eq_(row, 1)
def test_scalars_plus_one_none(self):
result = self._fixture(num_rows=0)
result = result.scalars()
assert_raises_message(
exc.NoResultFound,
"No row was found when one was required",
result.one,
)
def test_one_none(self):
result = self._fixture(num_rows=0)
assert_raises_message(
exc.NoResultFound,
"No row was found when one was required",
result.one,
)
def test_one_or_none(self):
result = self._fixture(num_rows=1)
eq_(result.one_or_none(), (1, 1, 1))
def test_scalar_one_or_none(self):
result = self._fixture(num_rows=1)
eq_(result.scalar_one_or_none(), 1)
def test_scalar_one_or_none_none(self):
result = self._fixture(num_rows=0)
eq_(result.scalar_one_or_none(), None)
def test_one_or_none_none(self):
result = self._fixture(num_rows=0)
eq_(result.one_or_none(), None)
def test_one_raise_mutiple(self):
result = self._fixture(num_rows=2)
assert_raises_message(
exc.MultipleResultsFound,
"Multiple rows were found when exactly one was required",
result.one,
)
def test_one_or_none_raise_multiple(self):
result = self._fixture(num_rows=2)
assert_raises_message(
exc.MultipleResultsFound,
"Multiple rows were found when one or none was required",
result.one_or_none,
)
def test_scalar(self):
result = self._fixture()
eq_(result.scalar(), 1)
# note this is a behavior change in 1.4.27 due to
# adding a real result.close() to Result, previously this would
# return an empty list. this is already the
# behavior with CursorResult, but was mis-implemented for
# other non-cursor result sets.
assert_raises(exc.ResourceClosedError, result.all)
def test_partition(self):
result = self._fixture()
r = []
for partition in result.partitions(2):
r.append(list(partition))
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
eq_(result.all(), [])
def test_partition_unique_yield_per(self):
result = self._fixture()
r = []
for partition in result.unique().yield_per(2).partitions():
r.append(list(partition))
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
eq_(result.all(), [])
def test_partition_yield_per(self):
result = self._fixture()
r = []
for partition in result.yield_per(2).partitions():
r.append(list(partition))
eq_(r, [[(1, 1, 1), (2, 1, 2)], [(1, 3, 2), (4, 1, 2)]])
eq_(result.all(), [])
def test_columns(self):
result = self._fixture()
result = result.columns("b", "c")
eq_(result.keys(), ["b", "c"])
eq_(result.all(), [(1, 1), (1, 2), (3, 2), (1, 2)])
def test_columns_ints(self):
result = self._fixture()
eq_(result.columns(1, -2).all(), [(1, 1), (1, 1), (3, 3), (1, 1)])
def test_columns_again(self):
result = self._fixture()
eq_(
result.columns("b", "c", "a").columns(1, 2).all(),
[(1, 1), (2, 2), (2, 1), (2, 4)],
)
def test_mappings(self):
result = self._fixture()
eq_(
[dict(r) for r in result.mappings()],
[
{"a": 1, "b": 1, "c": 1},
{"a": 2, "b": 1, "c": 2},
{"a": 1, "b": 3, "c": 2},
{"a": 4, "b": 1, "c": 2},
],
)
def test_columns_with_mappings(self):
result = self._fixture()
eq_(
[dict(r) for r in result.columns("b", "c").mappings().all()],
[
{"b": 1, "c": 1},
{"b": 1, "c": 2},
{"b": 3, "c": 2},
{"b": 1, "c": 2},
],
)
def test_mappings_with_columns(self):
result = self._fixture()
m1 = result.mappings().columns("b", "c")
eq_(m1.fetchmany(2), [{"b": 1, "c": 1}, {"b": 1, "c": 2}])
# no slice here
eq_(result.fetchone(), (1, 3, 2))
# still slices
eq_(m1.fetchone(), {"b": 1, "c": 2})
def test_scalar_none_iterate(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1)
eq_(list(result), [None, 4, None, None, 4])
def test_scalar_none_many(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1)
eq_(result.fetchmany(3), [None, 4, None])
eq_(result.fetchmany(5), [None, 4])
def test_scalar_none_all(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1)
eq_(result.all(), [None, 4, None, None, 4])
def test_scalar_none_all_unique(self):
result = self._fixture(
data=[
(1, None, 2),
(3, 4, 5),
(3, None, 5),
(3, None, 5),
(3, 4, 5),
]
)
result = result.scalars(1).unique()
eq_(result.all(), [None, 4])
def test_scalar_only_on_filter_w_unique(self):
# test a mixture of the "real" result and the
# scalar filter, where scalar has unique and real result does not.
# this is new as of [ticket:5503] where we have created
# ScalarResult / MappingResult "filters" that don't modify
# the Result
result = self._fixture(
data=[
(1, 1, 2),
(3, 4, 5),
(1, 1, 2),
(3, None, 5),
(3, 4, 5),
(3, None, 5),
]
)
# result is non-unique. u_s is unique on column 0
u_s = result.scalars(0).unique()
eq_(next(u_s), 1) # unique value 1 from first row
eq_(next(result), (3, 4, 5)) # second row
eq_(next(u_s), 3) # skip third row, return 3 for fourth row
eq_(next(result), (3, 4, 5)) # non-unique fifth row
eq_(u_s.all(), []) # unique set is done because only 3 is left
def test_scalar_none_one_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.scalars(1).unique()
# one is returning None, see?
eq_(result.one(), None)
def test_scalar_none_one_or_none_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.scalars(1).unique()
# the orm.Query can actually do this right now, so we sort of
# have to allow for this unforuntately, unless we want to raise?
eq_(result.one_or_none(), None)
def test_scalar_one_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.unique()
eq_(result.scalar_one(), 1)
def test_scalars_one_w_unique(self):
result = self._fixture(data=[(1, None, 2)])
result = result.unique()
eq_(result.scalars().one(), 1)
def test_scalar_none_first(self):
result = self._fixture(data=[(1, None, 2)])
result = result.scalars(1).unique()
eq_(result.first(), None)
def test_freeze(self):
result = self._fixture()
frozen = result.freeze()
r1 = frozen()
eq_(r1.fetchall(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
eq_(r1.fetchall(), [])
r2 = frozen()
eq_(r1.fetchall(), [])
eq_(r2.fetchall(), [(1, 1, 1), (2, 1, 2), (1, 3, 2), (4, 1, 2)])
eq_(r2.fetchall(), [])
def test_columns_unique_freeze(self):
result = self._fixture()
result = result.columns("b", "c").unique()
frozen = result.freeze()
r1 = frozen()
eq_(r1.fetchall(), [(1, 1), (1, 2), (3, 2)])
def test_columns_freeze(self):
result = self._fixture()
result = result.columns("b", "c")
frozen = result.freeze()
r1 = frozen()
eq_(r1.fetchall(), [(1, 1), (1, 2), (3, 2), (1, 2)])
r2 = frozen().unique()
eq_(r2.fetchall(), [(1, 1), (1, 2), (3, 2)])
def test_scalars_freeze(self):
result = self._fixture()
frozen = result.freeze()
r1 = frozen()
eq_(r1.scalars(1).fetchall(), [1, 1, 3, 1])
r2 = frozen().scalars(1).unique()
eq_(r2.fetchall(), [1, 3])
class MergeResultTest(fixtures.TestBase):
@testing.fixture
def merge_fixture(self):
r1 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(7, "u1"), (8, "u2")]),
)
r2 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(9, "u3")]),
)
r3 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(10, "u4"), (11, "u5")]),
)
r4 = result.IteratorResult(
result.SimpleResultMetaData(["user_id", "user_name"]),
iter([(12, "u6")]),
)
return r1, r2, r3, r4
@testing.fixture
def dupe_fixture(self):
r1 = result.IteratorResult(
result.SimpleResultMetaData(["x", "y", "z"]),
iter([(1, 2, 1), (2, 2, 1)]),
)
r2 = result.IteratorResult(
result.SimpleResultMetaData(["x", "y", "z"]),
iter([(3, 1, 2), (3, 3, 3)]),
)
return r1, r2
def test_merge_results(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(result.keys(), ["user_id", "user_name"])
row = result.fetchone()
eq_(row, (7, "u1"))
result.close()
def test_fetchall(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(
result.fetchall(),
[
(7, "u1"),
(8, "u2"),
(9, "u3"),
(10, "u4"),
(11, "u5"),
(12, "u6"),
],
)
def test_first(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(
result.first(),
(7, "u1"),
)
def test_columns(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
result = r1.merge(r2, r3, r4)
eq_(
result.columns("user_name").fetchmany(4),
[("u1",), ("u2",), ("u3",), ("u4",)],
)
result.close()
def test_merge_scalars(self, merge_fixture):
r1, r2, r3, r4 = merge_fixture
for r in (r1, r2, r3, r4):
r.scalars(0)
result = r1.merge(r2, r3, r4)
eq_(result.scalars(0).all(), [7, 8, 9, 10, 11, 12])
def test_merge_unique(self, dupe_fixture):
r1, r2 = dupe_fixture
r1.scalars("y")
r2.scalars("y")
result = r1.merge(r2)
# uniqued 2, 2, 1, 3
eq_(result.scalars("y").unique().all(), [2, 1, 3])
def test_merge_preserve_unique(self, dupe_fixture):
r1, r2 = dupe_fixture
r1.unique().scalars("y")
r2.unique().scalars("y")
result = r1.merge(r2)
# unique takes place
eq_(result.scalars("y").all(), [2, 1, 3])
class OnlyScalarsTest(fixtures.TestBase):
"""the chunkediterator supports "non tuple mode", where we bypass
the expense of generating rows when we have only scalar values.
"""
@testing.fixture
def no_tuple_fixture(self):
data = [(1, 1, 1), (2, 1, 2), (1, 1, 1), (1, 3, 2), (4, 1, 2)]
def chunks(num):
while data:
rows = data[0:num]
data[:] = []
yield [row[0] for row in rows]
return chunks
@testing.fixture
def no_tuple_one_fixture(self):
data = [(1, 1, 1)]
def chunks(num):
while data:
rows = data[0:num]
data[:] = []
yield [row[0] for row in rows]
return chunks
@testing.fixture
def normal_fixture(self):
data = [(1, 1, 1), (2, 1, 2), (1, 1, 1), (1, 3, 2), (4, 1, 2)]
def chunks(num):
while data:
rows = data[0:num]
data[:] = []
yield [row[0] for row in rows]
return chunks
def test_scalar_mode_columns0_mapping(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.columns(0).mappings()
eq_(
list(r),
[{"a": 1}, {"a": 2}, {"a": 1}, {"a": 1}, {"a": 4}],
)
def test_scalar_mode_columns0_plain(self, no_tuple_fixture):
"""test #7953"""
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.columns(0)
eq_(
list(r),
[(1,), (2,), (1,), (1,), (4,)],
)
def test_scalar_mode_scalars0(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.scalars(0)
eq_(
list(r),
[1, 2, 1, 1, 4],
)
def test_scalar_mode_but_accessed_nonscalar_result(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
s1 = r.scalars()
eq_(r.fetchone(), (1,))
eq_(s1.all(), [2, 1, 1, 4])
def test_scalar_mode_scalars_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.scalars()
eq_(r.all(), [1, 2, 1, 1, 4])
def test_scalar_mode_mfiltered_unique_rows_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _unique_filters=[int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_fixture,
source_supports_scalars=True,
)
r = r.unique()
eq_(r.all(), [(1,), (2,), (4,)])
@testing.combinations(
lambda r: r.scalar(),
lambda r: r.scalar_one(),
lambda r: r.scalar_one_or_none(),
argnames="get",
)
def test_unique_scalar_accessors(self, no_tuple_one_fixture, get):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _unique_filters=[int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_one_fixture,
source_supports_scalars=True,
)
r = r.unique()
eq_(get(r), 1)
def test_scalar_mode_mfiltered_unique_mappings_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _unique_filters=[int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_fixture,
source_supports_scalars=True,
)
r = r.unique()
eq_(r.mappings().all(), [{"a": 1}, {"a": 2}, {"a": 4}])
def test_scalar_mode_mfiltered_unique_scalars_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(
["a", "b", "c"], _unique_filters=[int]
)
r = result.ChunkedIteratorResult(
metadata,
no_tuple_fixture,
source_supports_scalars=True,
)
r = r.scalars().unique()
eq_(r.all(), [1, 2, 4])
def test_scalar_mode_unique_scalars_all(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.unique().scalars()
eq_(r.all(), [1, 2, 4])
def test_scalar_mode_scalars_fetchmany(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
r = r.scalars()
eq_(list(r.partitions(2)), [[1, 2], [1, 1], [4]])
def test_scalar_mode_unique_scalars_fetchmany(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
r = r.scalars().unique()
eq_(list(r.partitions(2)), [[1, 2], [4]])
def test_scalar_mode_unique_tuples_all(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
r = r.unique()
eq_(r.all(), [(1,), (2,), (4,)])
def test_scalar_mode_tuples_all(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
eq_(r.all(), [(1,), (2,), (1,), (1,), (4,)])
def test_scalar_mode_scalars_iterate(self, no_tuple_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_fixture, source_supports_scalars=True
)
r = r.scalars()
eq_(list(r), [1, 2, 1, 1, 4])
def test_scalar_mode_tuples_iterate(self, normal_fixture):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, normal_fixture, source_supports_scalars=True
)
eq_(list(r), [(1,), (2,), (1,), (1,), (4,)])
@testing.combinations(
lambda r: r.one(),
lambda r: r.first(),
lambda r: r.one_or_none(),
argnames="get",
)
def test_scalar_mode_first(self, no_tuple_one_fixture, get):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_one_fixture, source_supports_scalars=True
)
eq_(get(r), (1,))
@testing.combinations(
lambda r: r.scalar(),
lambda r: r.scalar_one(),
lambda r: r.scalar_one_or_none(),
argnames="get",
)
def test_scalar_mode_scalar_one(self, no_tuple_one_fixture, get):
metadata = result.SimpleResultMetaData(["a", "b", "c"])
r = result.ChunkedIteratorResult(
metadata, no_tuple_one_fixture, source_supports_scalars=True
)
eq_(get(r), 1)