Files
Kadir Can Ozden 0f74af5c33 Fix WeakSequence.__getitem__ catching KeyError instead of IndexError
### Description

`WeakSequence.__getitem__` catches `KeyError` but the internal `_storage` is a `list`, which raises `IndexError` for out-of-range access. This means the `except KeyError` handler never executes, and the custom error message is never shown.

### Current behavior

```python
def __getitem__(self, index):
    try:
        obj = self._storage[index]  # _storage is a list
    except KeyError:  # lists don't raise KeyError
        raise IndexError("Index %s out of range" % index)
    else:
        return obj()
```

On an out-of-range index, the raw `IndexError` from list access propagates directly (e.g., `list index out of range`) instead of the intended custom message.

### Fix

Changed `except KeyError` to `except IndexError` so the handler actually catches the exception raised by list indexing.

Closes: #13136
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/13136
Pull-request-sha: ceb8f9fe46

Change-Id: Ia36c2b9b0f3aec97624bcd2a9e49f43b9ed786d9
2026-02-23 22:12:46 +01:00

3790 lines
102 KiB
Python

import copy
from decimal import Decimal
import inspect
from pathlib import Path
import pickle
import sys
from sqlalchemy import exc
from sqlalchemy import sql
from sqlalchemy import testing
from sqlalchemy import util
from sqlalchemy.sql import column
from sqlalchemy.sql.base import DedupeColumnCollection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import combinations
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_instance_of
from sqlalchemy.testing import is_none
from sqlalchemy.testing import is_not
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
from sqlalchemy.testing import not_in
from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import picklers
from sqlalchemy.util import classproperty
from sqlalchemy.util import compat
from sqlalchemy.util import FastIntFlag
from sqlalchemy.util import get_callable_argspec
from sqlalchemy.util import is_non_string_iterable
from sqlalchemy.util import langhelpers
from sqlalchemy.util import preloaded
from sqlalchemy.util import WeakSequence
from sqlalchemy.util._collections import merge_lists_w_ordering
from sqlalchemy.util._has_cython import _all_cython_modules
class WeakSequenceTest(fixtures.TestBase):
@testing.requires.predictable_gc
def test_cleanout_elements(self):
class Foo:
pass
f1, f2, f3 = Foo(), Foo(), Foo()
w = WeakSequence([f1, f2, f3])
eq_(len(w), 3)
eq_(len(w._storage), 3)
del f2
gc_collect()
eq_(len(w), 2)
eq_(len(w._storage), 2)
@testing.requires.predictable_gc
def test_cleanout_appended(self):
class Foo:
pass
f1, f2, f3 = Foo(), Foo(), Foo()
w = WeakSequence()
w.append(f1)
w.append(f2)
w.append(f3)
eq_(len(w), 3)
eq_(len(w._storage), 3)
del f2
gc_collect()
eq_(len(w), 2)
eq_(len(w._storage), 2)
def test_index_error(self):
class Foo:
pass
f1, f2, f3 = Foo(), Foo(), Foo()
w = WeakSequence()
w.append(f1)
w.append(f2)
w.append(f3)
with expect_raises(IndexError):
w[4]
class MergeListsWOrderingTest(fixtures.TestBase):
@testing.combinations(
(
["__tablename__", "id", "x", "created_at"],
["id", "name", "data", "y", "created_at"],
["__tablename__", "id", "name", "data", "y", "x", "created_at"],
),
(["a", "b", "c", "d", "e", "f"], [], ["a", "b", "c", "d", "e", "f"]),
([], ["a", "b", "c", "d", "e", "f"], ["a", "b", "c", "d", "e", "f"]),
([], [], []),
(["a", "b", "c"], ["a", "b", "c"], ["a", "b", "c"]),
(
["a", "b", "c"],
["a", "b", "c", "d", "e"],
["a", "b", "c", "d", "e"],
),
(["a", "b", "c", "d"], ["c", "d", "e"], ["a", "b", "c", "d", "e"]),
(
["a", "c", "e", "g"],
["b", "d", "f", "g"],
["a", "c", "e", "b", "d", "f", "g"], # no overlaps until "g"
),
(
["a", "b", "e", "f", "g"],
["b", "c", "d", "e"],
["a", "b", "c", "d", "e", "f", "g"],
),
(
["a", "b", "c", "e", "f"],
["c", "d", "f", "g"],
["a", "b", "c", "d", "e", "f", "g"],
),
(
["c", "d", "f", "g"],
["a", "b", "c", "e", "f"],
["a", "b", "c", "e", "d", "f", "g"],
),
argnames="a,b,expected",
)
def test_merge_lists(self, a, b, expected):
eq_(merge_lists_w_ordering(a, b), expected)
class OrderedDictTest(fixtures.TestBase):
def test_odict(self):
o = util.OrderedDict()
o["a"] = 1
o["b"] = 2
o["snack"] = "attack"
o["c"] = 3
eq_(list(o.keys()), ["a", "b", "snack", "c"])
eq_(list(o.values()), [1, 2, "attack", 3])
o.pop("snack")
eq_(list(o.keys()), ["a", "b", "c"])
eq_(list(o.values()), [1, 2, 3])
try:
o.pop("eep")
assert False
except KeyError:
pass
eq_(o.pop("eep", "woot"), "woot")
try:
o.pop("whiff", "bang", "pow")
assert False
except TypeError:
pass
eq_(list(o.keys()), ["a", "b", "c"])
eq_(list(o.values()), [1, 2, 3])
o2 = util.OrderedDict(d=4)
o2["e"] = 5
eq_(list(o2.keys()), ["d", "e"])
eq_(list(o2.values()), [4, 5])
o.update(o2)
eq_(list(o.keys()), ["a", "b", "c", "d", "e"])
eq_(list(o.values()), [1, 2, 3, 4, 5])
o.setdefault("c", "zzz")
o.setdefault("f", 6)
eq_(list(o.keys()), ["a", "b", "c", "d", "e", "f"])
eq_(list(o.values()), [1, 2, 3, 4, 5, 6])
def test_odict_constructor(self):
o = util.OrderedDict(
[("name", "jbe"), ("fullname", "jonathan"), ("password", "")]
)
eq_(list(o.keys()), ["name", "fullname", "password"])
def test_odict_copy(self):
o = util.OrderedDict()
o["zzz"] = 1
o["aaa"] = 2
eq_(list(o.keys()), ["zzz", "aaa"])
o2 = o.copy()
eq_(list(o2.keys()), list(o.keys()))
o3 = copy.copy(o)
eq_(list(o3.keys()), list(o.keys()))
def test_no_sort_legacy_dictionary(self):
d1 = {"c": 1, "b": 2, "a": 3}
util.sort_dictionary(d1)
eq_(list(d1), ["a", "b", "c"])
def test_sort_dictionary(self):
o = util.OrderedDict()
o["za"] = 1
o["az"] = 2
o["cc"] = 3
eq_(
list(o),
["za", "az", "cc"],
)
util.sort_dictionary(o)
eq_(list(o), ["az", "cc", "za"])
util.sort_dictionary(o, lambda key: key[1])
eq_(list(o), ["za", "cc", "az"])
class OrderedSetTest(fixtures.TestBase):
def test_mutators_against_iter(self):
# testing a set modified against an iterator
o = util.OrderedSet([3, 2, 4, 5])
eq_(o.difference(iter([3, 4])), util.OrderedSet([2, 5]))
eq_(o.intersection(iter([3, 4, 6])), util.OrderedSet([3, 4]))
eq_(o.union(iter([3, 4, 6])), util.OrderedSet([3, 2, 4, 5, 6]))
eq_(
o.symmetric_difference(iter([3, 4, 6])), util.OrderedSet([2, 5, 6])
)
def test_mutators_against_iter_update(self):
# testing a set modified against an iterator
o = util.OrderedSet([3, 2, 4, 5])
o.difference_update(iter([3, 4]))
eq_(list(o), [2, 5])
o = util.OrderedSet([3, 2, 4, 5])
o.intersection_update(iter([3, 4]))
eq_(list(o), [3, 4])
o = util.OrderedSet([3, 2, 4, 5])
o.update(iter([3, 4, 6]))
eq_(list(o), [3, 2, 4, 5, 6])
o = util.OrderedSet([3, 2, 4, 5])
o.symmetric_difference_update(iter([3, 4, 6]))
eq_(list(o), [2, 5, 6])
def test_len(self):
eq_(len(util.OrderedSet([1, 2, 3])), 3)
def test_eq_no_insert_order(self):
eq_(util.OrderedSet([3, 2, 4, 5]), util.OrderedSet([2, 3, 4, 5]))
ne_(util.OrderedSet([3, 2, 4, 5]), util.OrderedSet([3, 2, 4, 5, 6]))
def test_eq_non_ordered_set(self):
eq_(util.OrderedSet([3, 2, 4, 5]), {2, 3, 4, 5})
ne_(util.OrderedSet([3, 2, 4, 5]), {3, 2, 4, 5, 6})
def test_repr(self):
o = util.OrderedSet([])
eq_(str(o), "OrderedSet([])")
o = util.OrderedSet([3, 2, 4, 5])
eq_(str(o), "OrderedSet([3, 2, 4, 5])")
def test_modify(self):
o = util.OrderedSet([3, 9, 11])
is_none(o.add(42))
in_(42, o)
in_(3, o)
is_none(o.remove(9))
not_in(9, o)
in_(3, o)
is_none(o.discard(11))
in_(3, o)
o.add(99)
is_none(o.insert(1, 13))
eq_(list(o), [3, 13, 42, 99])
eq_(o[2], 42)
val = o.pop()
eq_(val, 99)
not_in(99, o)
eq_(list(o), [3, 13, 42])
is_none(o.clear())
not_in(3, o)
is_false(bool(o))
def test_empty_pop(self):
with expect_raises_message(KeyError, "pop from an empty set"):
util.OrderedSet().pop()
@testing.combinations(
lambda o: o + util.OrderedSet([11, 22]),
lambda o: o | util.OrderedSet([11, 22]),
lambda o: o.union(util.OrderedSet([11, 22])),
lambda o: o.union([11, 2], [22, 8]),
)
def test_op(self, fn):
o = util.OrderedSet(range(10))
x = fn(o)
is_instance_of(x, util.OrderedSet)
in_(9, x)
in_(11, x)
not_in(11, o)
def test_update(self):
o = util.OrderedSet(range(10))
is_none(o.update([22, 2], [33, 11]))
in_(11, o)
in_(22, o)
def test_set_ops(self):
o1, o2 = util.OrderedSet([1, 3, 5, 7]), {2, 3, 4, 5}
eq_(o1 & o2, {3, 5})
eq_(o1.intersection(o2), {3, 5})
o3 = o1.copy()
o3 &= o2
eq_(o3, {3, 5})
o3 = o1.copy()
is_none(o3.intersection_update(o2))
eq_(o3, {3, 5})
eq_(o1 | o2, {1, 2, 3, 4, 5, 7})
eq_(o1.union(o2), {1, 2, 3, 4, 5, 7})
o3 = o1.copy()
o3 |= o2
eq_(o3, {1, 2, 3, 4, 5, 7})
o3 = o1.copy()
is_none(o3.update(o2))
eq_(o3, {1, 2, 3, 4, 5, 7})
eq_(o1 - o2, {1, 7})
eq_(o1.difference(o2), {1, 7})
o3 = o1.copy()
o3 -= o2
eq_(o3, {1, 7})
o3 = o1.copy()
is_none(o3.difference_update(o2))
eq_(o3, {1, 7})
eq_(o1 ^ o2, {1, 2, 4, 7})
eq_(o1.symmetric_difference(o2), {1, 2, 4, 7})
o3 = o1.copy()
o3 ^= o2
eq_(o3, {1, 2, 4, 7})
o3 = o1.copy()
is_none(o3.symmetric_difference_update(o2))
eq_(o3, {1, 2, 4, 7})
def test_copy(self):
o = util.OrderedSet([3, 2, 4, 5])
cp = o.copy()
is_instance_of(cp, util.OrderedSet)
eq_(o, cp)
o.add(42)
is_false(42 in cp)
def test_pickle(self):
o = util.OrderedSet([2, 4, 9, 42])
for loads, dumps in picklers():
l = loads(dumps(o))
is_instance_of(l, util.OrderedSet)
eq_(list(l), [2, 4, 9, 42])
class ImmutableDictTest(fixtures.TestBase):
methods = combinations(
util.immutabledict.union,
util.immutabledict.merge_with,
argnames="method",
)
@methods
def test_no_change(self, method):
d = util.immutabledict({1: 2, 3: 4})
d2 = method(d)
is_(d2, d)
d2 = method(d, {})
is_(d2, d)
d2 = method(d, None)
is_(d2, d)
d2 = method(d, {}, {}, {}, None)
is_(d2, d)
@methods
def test_no_change_self_empty(self, method):
d = util.immutabledict({1: 2, 3: 4})
e = util.immutabledict()
d2 = method(e, d)
eq_(e, {})
is_(d2, d)
d2 = method(e, {}, d)
is_(d2, d)
d2 = method(e, None, d, {}, {})
is_(d2, d)
d2 = method(e, {1: 2, 3: 4})
eq_(d2, d)
assert isinstance(d2, util.immutabledict)
d2 = method(e, {1: 2, 3: 4}, {3: 5, 4: 7})
eq_(d2, {1: 2, 3: 5, 4: 7})
assert isinstance(d2, util.immutabledict)
@methods
def test_start_empty_but_then_populate(self, method):
d = util.immutabledict()
d2 = method(d, {1: 2})
eq_(d2, {1: 2})
is_not(d2, d)
d3 = method(d, util.immutabledict(), {1: 2})
eq_(d3, {1: 2})
d4 = method(
d, util.immutabledict(), util.immutabledict({1: 2}), {3: 4}
)
eq_(d4, {1: 2, 3: 4})
@methods
def test_no_change_everyone_empty(self, method):
d = util.immutabledict()
e = util.immutabledict()
d2 = method(e, d)
eq_(e, {})
is_(d2, e)
f = {}
d3 = method(e, d, f)
eq_(e, {})
is_(d3, e)
g = util.immutabledict()
d4 = method(e, d, f, g)
eq_(e, {})
is_(d4, e)
@methods
def test_no_change_against_self(self, method):
d = util.immutabledict()
e = d
d2 = method(e, d)
eq_(e, {})
is_(d2, e)
f = d
d3 = method(e, d, f)
eq_(e, {})
is_(d3, e)
@methods
def test_multiple_dicts(self, method):
d = util.immutabledict({1: 2, 3: 4})
d2 = method(d, {17: 42})
eq_(d, {1: 2, 3: 4})
eq_(d2, {1: 2, 3: 4, 17: 42})
d3 = method(d, {3: 5, 7: 12}, {9: 18, 15: 25}, None)
eq_(d3, {1: 2, 3: 5, 7: 12, 9: 18, 15: 25})
assert isinstance(d3, util.immutabledict)
@methods
def test_multiple_immutabledict(self, method):
d = util.immutabledict({1: 2, 3: 4})
d2 = method(d, util.immutabledict({3: 5, 7: 12}))
eq_(d2, {1: 2, 3: 5, 7: 12})
assert isinstance(d2, util.immutabledict)
d2 = method(
d,
util.immutabledict({3: 5, 7: 12}),
util.immutabledict({7: 6, 11: 12}),
)
eq_(d2, {1: 2, 3: 5, 7: 6, 11: 12})
assert isinstance(d2, util.immutabledict)
e = util.immutabledict()
d2 = method(
e,
util.immutabledict({3: 5, 7: 12}),
util.immutabledict({7: 6, 11: 12}),
)
eq_(d2, {3: 5, 7: 6, 11: 12})
assert isinstance(d2, util.immutabledict)
@methods
def test_with_tuples(self, method):
# this is not really supported, but it's useful to test the non-dict
# case
d = util.immutabledict({1: 2, 3: 4})
d2 = method(d, [(3, 5), (7, 12)], [(9, 18), (15, 25)])
eq_(d, {1: 2, 3: 4})
eq_(d2, {1: 2, 3: 5, 7: 12, 9: 18, 15: 25})
def _dont_test_union_kw(self):
d = util.immutabledict({"a": "b", "c": "d"})
d2 = d.union(e="f", g="h")
assert isinstance(d2, util.immutabledict)
eq_(d, {"a": "b", "c": "d"})
eq_(d2, {"a": "b", "c": "d", "e": "f", "g": "h"})
def test_keys(self):
d = util.immutabledict({1: 2, 3: 4})
eq_(set(d.keys()), {1, 3})
def test_values(self):
d = util.immutabledict({1: 2, 3: 4})
eq_(set(d.values()), {2, 4})
def test_items(self):
d = util.immutabledict({1: 2, 3: 4})
eq_(set(d.items()), {(1, 2), (3, 4)})
def test_contains(self):
d = util.immutabledict({1: 2, 3: 4})
assert 1 in d
assert "foo" not in d
def test_rich_compare(self):
d = util.immutabledict({1: 2, 3: 4})
d2 = util.immutabledict({1: 2, 3: 4})
d3 = util.immutabledict({5: 12})
d4 = {5: 12}
eq_(d, d2)
ne_(d, d3)
ne_(d, d4)
eq_(d3, d4)
def test_copy(self):
d = util.immutabledict({1: 2, 3: 4})
is_(d.copy(), d)
def test_serialize(self):
d = util.immutabledict({1: 2, 3: 4})
for loads, dumps in picklers():
d2 = loads(dumps(d))
eq_(d2, {1: 2, 3: 4})
assert isinstance(d2, util.immutabledict)
def test_repr(self):
# this is used by the stub generator in alembic
i = util.immutabledict()
eq_(str(i), "immutabledict({})")
i2 = util.immutabledict({"a": 42, 42: "a"})
eq_(str(i2), "immutabledict({'a': 42, 42: 'a'})")
def test_pep584(self):
i = util.immutabledict({"a": 2})
with expect_raises_message(TypeError, "object is immutable"):
i |= {"b": 42}
eq_(i, {"a": 2})
i2 = i | {"x": 3}
eq_(i, {"a": 2})
eq_(i2, {"a": 2, "x": 3})
is_true(isinstance(i2, util.immutabledict))
i2 = {"x": 3} | i2
eq_(i, {"a": 2})
eq_(i2, {"a": 2, "x": 3})
is_true(isinstance(i2, util.immutabledict))
class ImmutableTest(fixtures.TestBase):
@combinations(util.immutabledict({1: 2, 3: 4}), util.FacadeDict({2: 3}))
def test_immutable(self, d):
calls = (
lambda: d.__delitem__(1),
lambda: d.__setitem__(2, 3),
lambda: d.__setattr__(2, 3),
d.clear,
lambda: d.setdefault(1, 3),
lambda: d.update({2: 4}),
)
if hasattr(d, "pop"):
calls += (lambda: d.pop(2), d.popitem)
for m in calls:
with expect_raises_message(TypeError, "object is immutable"):
m()
def test_readonly_properties(self):
d = util.ReadOnlyProperties({3: 4})
calls = (
lambda: d.__delitem__(1),
lambda: d.__setitem__(2, 3),
lambda: d.__setattr__(2, 3),
)
for m in calls:
with expect_raises_message(TypeError, "object is immutable"):
m()
class MemoizedAttrTest(fixtures.TestBase):
def test_memoized_property(self):
val = [20]
class Foo:
@util.memoized_property
def bar(self):
v = val[0]
val[0] += 1
return v
ne_(Foo.bar, None)
f1 = Foo()
assert "bar" not in f1.__dict__
eq_(f1.bar, 20)
eq_(f1.bar, 20)
eq_(val[0], 21)
eq_(f1.__dict__["bar"], 20)
def test_memoized_instancemethod(self):
val = [20]
class Foo:
@util.memoized_instancemethod
def bar(self):
v = val[0]
val[0] += 1
return v
assert inspect.ismethod(Foo().bar)
ne_(Foo.bar, None)
f1 = Foo()
assert "bar" not in f1.__dict__
eq_(f1.bar(), 20)
eq_(f1.bar(), 20)
eq_(val[0], 21)
def test_memoized_slots(self):
canary = mock.Mock()
class Foob(util.MemoizedSlots):
__slots__ = ("foo_bar", "gogo")
def _memoized_method_gogo(self):
canary.method()
return "gogo"
def _memoized_attr_foo_bar(self):
canary.attr()
return "foobar"
f1 = Foob()
assert_raises(AttributeError, setattr, f1, "bar", "bat")
eq_(f1.foo_bar, "foobar")
eq_(f1.foo_bar, "foobar")
eq_(f1.gogo(), "gogo")
eq_(f1.gogo(), "gogo")
eq_(canary.mock_calls, [mock.call.attr(), mock.call.method()])
class WrapCallableTest(fixtures.TestBase):
def test_wrapping_update_wrapper_fn(self):
def my_fancy_default():
"""run the fancy default"""
return 10
c = util.wrap_callable(lambda: my_fancy_default, my_fancy_default)
eq_(c.__name__, "my_fancy_default")
eq_(c.__doc__, "run the fancy default")
def test_wrapping_update_wrapper_fn_nodocstring(self):
def my_fancy_default():
return 10
c = util.wrap_callable(lambda: my_fancy_default, my_fancy_default)
eq_(c.__name__, "my_fancy_default")
eq_(c.__doc__, None)
def test_wrapping_update_wrapper_cls(self):
class MyFancyDefault:
"""a fancy default"""
def __call__(self):
"""run the fancy default"""
return 10
def_ = MyFancyDefault()
c = util.wrap_callable(lambda: def_(), def_)
eq_(c.__name__, "MyFancyDefault")
eq_(c.__doc__, "run the fancy default")
def test_wrapping_update_wrapper_cls_noclsdocstring(self):
class MyFancyDefault:
def __call__(self):
"""run the fancy default"""
return 10
def_ = MyFancyDefault()
c = util.wrap_callable(lambda: def_(), def_)
eq_(c.__name__, "MyFancyDefault")
eq_(c.__doc__, "run the fancy default")
def test_wrapping_update_wrapper_cls_nomethdocstring(self):
class MyFancyDefault:
"""a fancy default"""
def __call__(self):
return 10
def_ = MyFancyDefault()
c = util.wrap_callable(lambda: def_(), def_)
eq_(c.__name__, "MyFancyDefault")
eq_(c.__doc__, "a fancy default")
def test_wrapping_update_wrapper_cls_noclsdocstring_nomethdocstring(self):
class MyFancyDefault:
def __call__(self):
return 10
def_ = MyFancyDefault()
c = util.wrap_callable(lambda: def_(), def_)
eq_(c.__name__, "MyFancyDefault")
eq_(c.__doc__, None)
def test_wrapping_update_wrapper_functools_parial(self):
def my_default(x):
return x
import functools
my_functools_default = functools.partial(my_default, 5)
c = util.wrap_callable(
lambda: my_functools_default(), my_functools_default
)
eq_(c.__name__, "partial")
if not compat.pypy: # pypy fails this check
eq_(c.__doc__, my_functools_default.__call__.__doc__)
eq_(c(), 5)
class ToListTest(fixtures.TestBase):
def test_from_string(self):
eq_(util.to_list("xyz"), ["xyz"])
def test_from_set(self):
spec = util.to_list({1, 2, 3})
assert isinstance(spec, list)
eq_(sorted(spec), [1, 2, 3])
def test_from_dict(self):
spec = util.to_list({1: "a", 2: "b", 3: "c"})
assert isinstance(spec, list)
eq_(sorted(spec), [1, 2, 3])
def test_from_tuple(self):
eq_(util.to_list((1, 2, 3)), [1, 2, 3])
def test_from_bytes(self):
eq_(util.to_list(compat.b("abc")), [compat.b("abc")])
eq_(
util.to_list([compat.b("abc"), compat.b("def")]),
[compat.b("abc"), compat.b("def")],
)
class ColumnCollectionCommon(testing.AssertsCompiledSQL):
def _assert_collection_integrity(self, coll):
eq_(coll._colset, {c for k, c, _ in coll._collection})
d = {}
for k, col, _ in coll._collection:
d.setdefault(k, (k, col))
d.update(
{idx: (k, col) for idx, (k, col, _) in enumerate(coll._collection)}
)
eq_(coll._index, d)
if not coll._proxy_index:
coll._init_proxy_index()
all_metrics = {
metrics for mm in coll._proxy_index.values() for metrics in mm
}
eq_(
all_metrics,
{m for (_, _, m) in coll._collection},
)
for mm in all_metrics:
for eps_col in mm.get_expanded_proxy_set():
assert mm in coll._proxy_index[eps_col]
for mm_ in coll._proxy_index[eps_col]:
assert eps_col in mm_.get_expanded_proxy_set()
def test_keys(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
c2.key = "foo"
cc = self._column_collection(
columns=[("c1", c1), ("foo", c2), ("c3", c3)]
)
keys = cc.keys()
eq_(keys, ["c1", "foo", "c3"])
ne_(id(keys), id(cc.keys()))
ci = cc.as_readonly()
eq_(ci.keys(), ["c1", "foo", "c3"])
def test_values(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
c2.key = "foo"
cc = self._column_collection(
columns=[("c1", c1), ("foo", c2), ("c3", c3)]
)
val = cc.values()
eq_(val, [c1, c2, c3])
ne_(id(val), id(cc.values()))
ci = cc.as_readonly()
eq_(ci.values(), [c1, c2, c3])
def test_items(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
c2.key = "foo"
cc = self._column_collection(
columns=[("c1", c1), ("foo", c2), ("c3", c3)]
)
items = cc.items()
eq_(items, [("c1", c1), ("foo", c2), ("c3", c3)])
ne_(id(items), id(cc.items()))
ci = cc.as_readonly()
eq_(ci.items(), [("c1", c1), ("foo", c2), ("c3", c3)])
def test_getitem_tuple_str(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
c2.key = "foo"
cc = self._column_collection(
columns=[("c1", c1), ("foo", c2), ("c3", c3)]
)
sub_cc = cc["c3", "foo"]
is_(sub_cc.c3, c3)
eq_(list(sub_cc), [c3, c2])
def test_getitem_tuple_int(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
c2.key = "foo"
cc = self._column_collection(
columns=[("c1", c1), ("foo", c2), ("c3", c3)]
)
sub_cc = cc[2, 1]
is_(sub_cc.c3, c3)
eq_(list(sub_cc), [c3, c2])
def test_key_index_error(self):
cc = self._column_collection(
columns=[
("col1", sql.column("col1")),
("col2", sql.column("col2")),
]
)
assert_raises(KeyError, lambda: cc["foo"])
assert_raises(KeyError, lambda: cc[object()])
assert_raises(IndexError, lambda: cc[5])
def test_contains_column(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
cc = self._column_collection(columns=[("c1", c1), ("c2", c2)])
is_true(cc.contains_column(c1))
is_false(cc.contains_column(c3))
def test_contains_column_not_column(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
cc = self._column_collection(columns=[("c1", c1), ("c2", c2)])
is_false(cc.contains_column(c3 == 2))
with testing.expect_raises_message(
exc.ArgumentError,
"contains_column cannot be used with string arguments",
):
cc.contains_column("c1")
with testing.expect_raises_message(
exc.ArgumentError,
"contains_column cannot be used with string arguments",
):
cc.contains_column("foo")
def test_in(self):
col1 = sql.column("col1")
cc = self._column_collection(
columns=[
("col1", col1),
("col2", sql.column("col2")),
("col3", sql.column("col3")),
]
)
assert "col1" in cc
assert "col2" in cc
assert_raises_message(
exc.ArgumentError,
"__contains__ requires a string argument",
lambda: col1 in cc,
)
def test_compare(self):
c1 = sql.column("col1")
c2 = c1.label("col2")
c3 = sql.column("col3")
is_true(
self._column_collection(
[("col1", c1), ("col2", c2), ("col3", c3)]
).compare(
self._column_collection(
[("col1", c1), ("col2", c2), ("col3", c3)]
)
)
)
is_false(
self._column_collection(
[("col1", c1), ("col2", c2), ("col3", c3)]
).compare(self._column_collection([("col1", c1), ("col2", c2)]))
)
def test_str(self):
c1 = sql.column("col1")
c2 = c1.label("col2")
c3 = sql.column("col3")
cc = self._column_collection(
[("col1", c1), ("col2", c2), ("col3", c3)]
)
eq_(str(cc), "%s(%s, %s, %s)" % (type(cc).__name__, c1, c2, c3))
eq_(repr(cc), object.__repr__(cc))
class ColumnCollectionTest(ColumnCollectionCommon, fixtures.TestBase):
def _column_collection(self, columns=None):
return sql.WriteableColumnCollection(columns=columns)
def test_separate_key_all_cols(self):
c1, c2 = sql.column("col1"), sql.column("col2")
cc = self._column_collection([("kcol1", c1), ("kcol2", c2)])
eq_(cc._all_columns, [c1, c2])
def test_separate_key_get(self):
c1, c2 = sql.column("col1"), sql.column("col2")
cc = self._column_collection([("kcol1", c1), ("kcol2", c2)])
is_(cc.kcol1, c1)
is_(cc.kcol2, c2)
def test_separate_key_in(self):
cc = self._column_collection(
columns=[
("kcol1", sql.column("col1")),
("kcol2", sql.column("col2")),
("kcol3", sql.column("col3")),
]
)
assert "col1" not in cc
assert "kcol2" in cc
def test_dupes_add(self):
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c2"),
)
cc = sql.WriteableColumnCollection()
cc.add(c1)
cc.add(c2a, "c2")
cc.add(c3)
cc.add(c2b)
eq_(cc._all_columns, [c1, c2a, c3, c2b])
eq_(list(cc), [c1, c2a, c3, c2b])
eq_(cc.keys(), ["c1", "c2", "c3", "c2"])
assert cc.contains_column(c2a)
assert cc.contains_column(c2b)
# this is deterministic
is_(cc["c2"], c2a)
self._assert_collection_integrity(cc)
ci = cc.as_readonly()
eq_(ci._all_columns, [c1, c2a, c3, c2b])
eq_(list(ci), [c1, c2a, c3, c2b])
eq_(ci.keys(), ["c1", "c2", "c3", "c2"])
def test_dupes_construct(self):
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c2"),
)
cc = sql.WriteableColumnCollection(
columns=[("c1", c1), ("c2", c2a), ("c3", c3), ("c2", c2b)]
)
eq_(cc._all_columns, [c1, c2a, c3, c2b])
eq_(list(cc), [c1, c2a, c3, c2b])
eq_(cc.keys(), ["c1", "c2", "c3", "c2"])
assert cc.contains_column(c2a)
assert cc.contains_column(c2b)
# this is deterministic
is_(cc["c2"], c2a)
self._assert_collection_integrity(cc)
ci = cc.as_readonly()
eq_(ci._all_columns, [c1, c2a, c3, c2b])
eq_(list(ci), [c1, c2a, c3, c2b])
eq_(ci.keys(), ["c1", "c2", "c3", "c2"])
def test_identical_dupe_construct(self):
c1, c2, c3 = (column("c1"), column("c2"), column("c3"))
cc = sql.WriteableColumnCollection(
columns=[("c1", c1), ("c2", c2), ("c3", c3), ("c2", c2)]
)
eq_(cc._all_columns, [c1, c2, c3, c2])
# for iter, c2a is replaced by c2b, ordering
# is maintained in that way. ideally, iter would be
# the same as the "_all_columns" collection.
eq_(list(cc), [c1, c2, c3, c2])
assert cc.contains_column(c2)
self._assert_collection_integrity(cc)
ci = cc.as_readonly()
eq_(ci._all_columns, [c1, c2, c3, c2])
eq_(list(ci), [c1, c2, c3, c2])
class DedupeColumnCollectionTest(ColumnCollectionCommon, fixtures.TestBase):
def _column_collection(self, columns=None):
return DedupeColumnCollection(columns=columns)
def test_separate_key_cols(self):
c1, c2 = sql.column("col1"), sql.column("col2")
assert_raises_message(
exc.ArgumentError,
"DedupeColumnCollection requires columns be under "
"the same key as their .key",
self._column_collection,
[("kcol1", c1), ("kcol2", c2)],
)
cc = self._column_collection()
assert_raises_message(
exc.ArgumentError,
"DedupeColumnCollection requires columns be under "
"the same key as their .key",
cc.add,
c1,
"kcol1",
)
def test_pickle_w_mutation(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
c2.key = "foo"
cc = self._column_collection(columns=[("c1", c1), ("foo", c2)])
ci = cc.as_readonly()
d = {"cc": cc, "ci": ci}
for loads, dumps in picklers():
dp = loads(dumps(d))
cp = dp["cc"]
cpi = dp["ci"]
self._assert_collection_integrity(cp)
self._assert_collection_integrity(cpi)
assert cp._colset is cpi._colset
assert cp._index is cpi._index
assert cp._collection is cpi._collection
cp.add(c3)
eq_(cp.keys(), ["c1", "foo", "c3"])
eq_(cpi.keys(), ["c1", "foo", "c3"])
assert cp.contains_column(c3)
assert cpi.contains_column(c3)
def test_keys_after_replace(self):
c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
c2.key = "foo"
cc = self._column_collection(
columns=[("c1", c1), ("foo", c2), ("c3", c3)]
)
eq_(cc.keys(), ["c1", "foo", "c3"])
c4 = sql.column("c3")
cc.replace(c4)
eq_(cc.keys(), ["c1", "foo", "c3"])
self._assert_collection_integrity(cc)
def test_dupes_add_dedupe(self):
cc = DedupeColumnCollection()
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c2"),
)
cc.add(c1)
cc.add(c2a)
cc.add(c3)
cc.add(c2b)
eq_(cc._all_columns, [c1, c2b, c3])
eq_(list(cc), [c1, c2b, c3])
assert not cc.contains_column(c2a)
assert cc.contains_column(c2b)
self._assert_collection_integrity(cc)
def test_dupes_construct_dedupe(self):
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c2"),
)
cc = DedupeColumnCollection(
columns=[("c1", c1), ("c2", c2a), ("c3", c3), ("c2", c2b)]
)
eq_(cc._all_columns, [c1, c2b, c3])
eq_(list(cc), [c1, c2b, c3])
assert not cc.contains_column(c2a)
assert cc.contains_column(c2b)
self._assert_collection_integrity(cc)
def test_identical_dupe_add_dedupes(self):
cc = DedupeColumnCollection()
c1, c2, c3 = (column("c1"), column("c2"), column("c3"))
cc.add(c1)
cc.add(c2)
cc.add(c3)
cc.add(c2)
eq_(cc._all_columns, [c1, c2, c3])
# for iter, c2a is replaced by c2b, ordering
# is maintained in that way. ideally, iter would be
# the same as the "_all_columns" collection.
eq_(list(cc), [c1, c2, c3])
assert cc.contains_column(c2)
self._assert_collection_integrity(cc)
ci = cc.as_readonly()
eq_(ci._all_columns, [c1, c2, c3])
eq_(list(ci), [c1, c2, c3])
def test_identical_dupe_construct_dedupes(self):
c1, c2, c3 = (column("c1"), column("c2"), column("c3"))
cc = DedupeColumnCollection(
columns=[("c1", c1), ("c2", c2), ("c3", c3), ("c2", c2)]
)
eq_(cc._all_columns, [c1, c2, c3])
# for iter, c2a is replaced by c2b, ordering
# is maintained in that way. ideally, iter would be
# the same as the "_all_columns" collection.
eq_(list(cc), [c1, c2, c3])
assert cc.contains_column(c2)
self._assert_collection_integrity(cc)
ci = cc.as_readonly()
eq_(ci._all_columns, [c1, c2, c3])
eq_(list(ci), [c1, c2, c3])
def test_replace(self):
cc = DedupeColumnCollection()
ci = cc.as_readonly()
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c2"),
)
cc.add(c1)
cc.add(c2a)
cc.add(c3)
cc.replace(c2b)
eq_(cc._all_columns, [c1, c2b, c3])
eq_(list(cc), [c1, c2b, c3])
is_(cc[1], c2b)
assert not cc.contains_column(c2a)
assert cc.contains_column(c2b)
self._assert_collection_integrity(cc)
eq_(ci._all_columns, [c1, c2b, c3])
eq_(list(ci), [c1, c2b, c3])
is_(ci[1], c2b)
def test_replace_key_matches_name_of_another(self):
cc = DedupeColumnCollection()
ci = cc.as_readonly()
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c4"),
)
c2b.key = "c2"
cc.add(c1)
cc.add(c2a)
cc.add(c3)
cc.replace(c2b)
eq_(cc._all_columns, [c1, c2b, c3])
eq_(list(cc), [c1, c2b, c3])
is_(cc[1], c2b)
self._assert_collection_integrity(cc)
assert not cc.contains_column(c2a)
assert cc.contains_column(c2b)
eq_(ci._all_columns, [c1, c2b, c3])
eq_(list(ci), [c1, c2b, c3])
is_(ci[1], c2b)
def test_replace_key_matches(self):
cc = DedupeColumnCollection()
ci = cc.as_readonly()
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("X"),
)
c2b.key = "c2"
cc.add(c1)
cc.add(c2a)
cc.add(c3)
cc.replace(c2b)
assert not cc.contains_column(c2a)
assert cc.contains_column(c2b)
is_(cc[1], c2b)
assert_raises(IndexError, lambda: cc[3])
self._assert_collection_integrity(cc)
eq_(cc._all_columns, [c1, c2b, c3])
eq_(list(cc), [c1, c2b, c3])
eq_(ci._all_columns, [c1, c2b, c3])
eq_(list(ci), [c1, c2b, c3])
is_(ci[1], c2b)
assert_raises(IndexError, lambda: ci[3])
def test_replace_name_matches(self):
cc = DedupeColumnCollection()
ci = cc.as_readonly()
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c2"),
)
c2b.key = "X"
cc.add(c1)
cc.add(c2a)
cc.add(c3)
cc.replace(c2b)
assert not cc.contains_column(c2a)
assert cc.contains_column(c2b)
eq_(cc._all_columns, [c1, c2b, c3])
eq_(list(cc), [c1, c2b, c3])
eq_(len(cc), 3)
is_(cc[1], c2b)
self._assert_collection_integrity(cc)
eq_(ci._all_columns, [c1, c2b, c3])
eq_(list(ci), [c1, c2b, c3])
eq_(len(ci), 3)
is_(ci[1], c2b)
def test_replace_no_match(self):
cc = DedupeColumnCollection()
ci = cc.as_readonly()
c1, c2, c3, c4 = column("c1"), column("c2"), column("c3"), column("c4")
c4.key = "X"
cc.add(c1)
cc.add(c2)
cc.add(c3)
cc.replace(c4)
assert cc.contains_column(c2)
assert cc.contains_column(c4)
eq_(cc._all_columns, [c1, c2, c3, c4])
eq_(list(cc), [c1, c2, c3, c4])
is_(cc[3], c4)
self._assert_collection_integrity(cc)
eq_(ci._all_columns, [c1, c2, c3, c4])
eq_(list(ci), [c1, c2, c3, c4])
is_(ci[3], c4)
def test_replace_switch_key_name(self):
c1 = column("id")
c2 = column("street")
c3 = column("user_id")
cc = DedupeColumnCollection(
columns=[("id", c1), ("street", c2), ("user_id", c3)]
)
# for replace col with different key than name, it necessarily
# removes two columns
c4 = column("id")
c4.key = "street"
cc.replace(c4)
eq_(list(cc), [c4, c3])
self._assert_collection_integrity(cc)
def test_remove(self):
c1, c2, c3 = column("c1"), column("c2"), column("c3")
cc = DedupeColumnCollection(
columns=[("c1", c1), ("c2", c2), ("c3", c3)]
)
ci = cc.as_readonly()
eq_(cc._all_columns, [c1, c2, c3])
eq_(list(cc), [c1, c2, c3])
assert cc.contains_column(c2)
assert "c2" in cc
eq_(ci._all_columns, [c1, c2, c3])
eq_(list(ci), [c1, c2, c3])
assert ci.contains_column(c2)
assert "c2" in ci
cc.remove(c2)
eq_(cc._all_columns, [c1, c3])
eq_(list(cc), [c1, c3])
is_(cc[0], c1)
is_(cc[1], c3)
assert not cc.contains_column(c2)
assert "c2" not in cc
self._assert_collection_integrity(cc)
eq_(ci._all_columns, [c1, c3])
eq_(list(ci), [c1, c3])
is_(ci[0], c1)
is_(ci[1], c3)
assert not ci.contains_column(c2)
assert "c2" not in ci
assert_raises(IndexError, lambda: ci[2])
def test_remove_doesnt_change_iteration(self):
c1, c2, c3, c4, c5 = (
column("c1"),
column("c2"),
column("c3"),
column("c4"),
column("c5"),
)
cc = DedupeColumnCollection(
columns=[
("c1", c1),
("c2", c2),
("c3", c3),
("c4", c4),
("c5", c5),
]
)
for col in cc:
if col.name not in ["c1", "c2"]:
cc.remove(col)
eq_(cc.keys(), ["c1", "c2"])
eq_([c.name for c in cc], ["c1", "c2"])
self._assert_collection_integrity(cc)
def test_dupes_extend(self):
cc = DedupeColumnCollection()
ci = cc.as_readonly()
c1, c2a, c3, c2b = (
column("c1"),
column("c2"),
column("c3"),
column("c2"),
)
cc.add(c1)
cc.add(c2a)
cc.extend([c3, c2b]) # this should remove c2a
eq_(cc._all_columns, [c1, c2b, c3])
eq_(list(cc), [c1, c2b, c3])
is_(cc[1], c2b)
is_(cc[2], c3)
assert_raises(IndexError, lambda: cc[3])
self._assert_collection_integrity(cc)
assert not cc.contains_column(c2a)
assert cc.contains_column(c2b)
eq_(ci._all_columns, [c1, c2b, c3])
eq_(list(ci), [c1, c2b, c3])
is_(ci[1], c2b)
is_(ci[2], c3)
assert_raises(IndexError, lambda: ci[3])
assert not ci.contains_column(c2a)
assert ci.contains_column(c2b)
def test_extend_existing_maintains_ordering(self):
cc = DedupeColumnCollection()
c1, c2, c3, c4, c5 = (
column("c1"),
column("c2"),
column("c3"),
column("c4"),
column("c5"),
)
cc.extend([c1, c2])
eq_(cc._all_columns, [c1, c2])
self._assert_collection_integrity(cc)
cc.extend([c3])
eq_(cc._all_columns, [c1, c2, c3])
self._assert_collection_integrity(cc)
cc.extend([c4, c2, c5])
eq_(cc._all_columns, [c1, c2, c3, c4, c5])
self._assert_collection_integrity(cc)
class LRUTest(fixtures.TestBase):
def test_lru(self):
class item:
def __init__(self, id_):
self.id = id_
def __str__(self):
return "item id %d" % self.id
lru = util.LRUCache(10, threshold=0.2)
for id_ in range(1, 20):
lru[id_] = item(id_)
# first couple of items should be gone
assert 1 not in lru
assert 2 not in lru
# next batch over the threshold of 10 should be present
for id_ in range(11, 20):
assert id_ in lru
lru[12]
lru[15]
lru[23] = item(23)
lru[24] = item(24)
lru[25] = item(25)
lru[26] = item(26)
lru[27] = item(27)
assert 11 not in lru
assert 13 not in lru
for id_ in (25, 24, 23, 14, 12, 19, 18, 17, 16, 15):
assert id_ in lru
lru[25]
i2 = item(25)
lru[25] = i2
assert 25 in lru
assert lru[25] is i2
class ImmutableSubclass(str):
pass
class FlattenIteratorTest(fixtures.TestBase):
def test_flatten(self):
assert list(util.flatten_iterator([[1, 2, 3], [4, 5, 6], 7, 8])) == [
1,
2,
3,
4,
5,
6,
7,
8,
]
def test_str_with_iter(self):
"""ensure that a str object with an __iter__ method (like in
PyPy) is not interpreted as an iterable.
"""
class IterString(str):
def __iter__(self):
return iter(self + "")
iter_list = [IterString("asdf"), [IterString("x"), IterString("y")]]
assert list(util.flatten_iterator(iter_list)) == ["asdf", "x", "y"]
class HashOverride:
def __init__(self, value=None):
self.value = value
def __hash__(self):
return hash(self.value)
class NoHash:
def __init__(self, value=None):
self.value = value
__hash__ = None
class EqOverride:
def __init__(self, value=None):
self.value = value
__hash__ = object.__hash__
def __eq__(self, other):
if isinstance(other, EqOverride):
return self.value == other.value
else:
return False
def __ne__(self, other):
if isinstance(other, EqOverride):
return self.value != other.value
else:
return True
class HashEqOverride:
def __init__(self, value=None):
self.value = value
def __hash__(self):
return hash(self.value)
def __eq__(self, other):
if isinstance(other, EqOverride):
return self.value == other.value
else:
return False
def __ne__(self, other):
if isinstance(other, EqOverride):
return self.value != other.value
else:
return True
class MiscTest(fixtures.TestBase):
@testing.combinations(
(["one", "two", "three"], True),
(("one", "two", "three"), True),
((), True),
("four", False),
(252, False),
(Decimal("252"), False),
(b"four", False),
(iter("four"), True),
(b"", False),
("", False),
(None, False),
({"dict": "value"}, True),
({}, True),
({"set", "two"}, True),
(set(), True),
(util.immutabledict(), True),
(util.immutabledict({"key": "value"}), True),
)
def test_non_string_iterable_check(self, fixture, expected):
is_(is_non_string_iterable(fixture), expected)
class IdentitySetTest(fixtures.TestBase):
obj_type = object
def assert_eq(self, identityset, expected_iterable):
expected = sorted([id(o) for o in expected_iterable])
found = sorted([id(o) for o in identityset])
eq_(found, expected)
def test_init(self):
ids = util.IdentitySet([1, 2, 3, 2, 1])
self.assert_eq(ids, [1, 2, 3])
ids = util.IdentitySet(ids)
self.assert_eq(ids, [1, 2, 3])
ids = util.IdentitySet()
self.assert_eq(ids, [])
ids = util.IdentitySet([])
self.assert_eq(ids, [])
ids = util.IdentitySet(ids)
self.assert_eq(ids, [])
def test_add(self):
for type_ in (object, ImmutableSubclass):
data = [type_(), type_()]
ids = util.IdentitySet()
for i in list(range(2)) + list(range(2)):
ids.add(data[i])
self.assert_eq(ids, data)
for type_ in (NoHash, EqOverride, HashOverride, HashEqOverride):
data = [type_(1), type_(1), type_(2)]
ids = util.IdentitySet()
for i in list(range(3)) + list(range(3)):
ids.add(data[i])
self.assert_eq(ids, data)
def test_dunder_sub2(self):
IdentitySet = util.IdentitySet
o1, o2, o3 = self.obj_type(), self.obj_type(), self.obj_type()
ids1 = IdentitySet([o1])
ids2 = IdentitySet([o1, o2, o3])
eq_(ids2 - ids1, IdentitySet([o2, o3]))
ids2 -= ids1
eq_(ids2, IdentitySet([o2, o3]))
def test_dunder_eq(self):
_, _, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(twin1 == twin2, True)
eq_(unique1 == unique2, False)
# not an IdentitySet
not_an_identity_set = object()
eq_(unique1 == not_an_identity_set, False)
def test_dunder_ne(self):
_, _, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(twin1 != twin2, False)
eq_(unique1 != unique2, True)
# not an IdentitySet
not_an_identity_set = object()
eq_(unique1 != not_an_identity_set, True)
def test_dunder_le(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_ <= super_, True)
eq_(super_ <= sub_, False)
# the same sets
eq_(twin1 <= twin2, True)
eq_(twin2 <= twin1, True)
# totally different sets
eq_(unique1 <= unique2, False)
eq_(unique2 <= unique1, False)
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
return unique1 <= not_an_identity_set
self._assert_unorderable_types(should_raise)
def test_dunder_lt(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_ < super_, True)
eq_(super_ < sub_, False)
# the same sets
eq_(twin1 < twin2, False)
eq_(twin2 < twin1, False)
# totally different sets
eq_(unique1 < unique2, False)
eq_(unique2 < unique1, False)
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
return unique1 < not_an_identity_set
self._assert_unorderable_types(should_raise)
def test_dunder_ge(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_ >= super_, False)
eq_(super_ >= sub_, True)
# the same sets
eq_(twin1 >= twin2, True)
eq_(twin2 >= twin1, True)
# totally different sets
eq_(unique1 >= unique2, False)
eq_(unique2 >= unique1, False)
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
return unique1 >= not_an_identity_set
self._assert_unorderable_types(should_raise)
def test_dunder_gt(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_ > super_, False)
eq_(super_ > sub_, True)
# the same sets
eq_(twin1 > twin2, False)
eq_(twin2 > twin1, False)
# totally different sets
eq_(unique1 > unique2, False)
eq_(unique2 > unique1, False)
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
return unique1 > not_an_identity_set
self._assert_unorderable_types(should_raise)
def test_issubset(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_.issubset(super_), True)
eq_(super_.issubset(sub_), False)
# the same sets
eq_(twin1.issubset(twin2), True)
eq_(twin2.issubset(twin1), True)
# totally different sets
eq_(unique1.issubset(unique2), False)
eq_(unique2.issubset(unique1), False)
# not an IdentitySet
not_an_identity_set = object()
assert_raises(TypeError, unique1.issubset, not_an_identity_set)
def test_issuperset(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_.issuperset(super_), False)
eq_(super_.issuperset(sub_), True)
# the same sets
eq_(twin1.issuperset(twin2), True)
eq_(twin2.issuperset(twin1), True)
# totally different sets
eq_(unique1.issuperset(unique2), False)
eq_(unique2.issuperset(unique1), False)
# not an IdentitySet
not_an_identity_set = object()
assert_raises(TypeError, unique1.issuperset, not_an_identity_set)
def test_union(self):
super_, sub_, twin1, twin2, _, _ = self._create_sets()
# basic set math
eq_(sub_.union(super_), super_)
eq_(super_.union(sub_), super_)
# the same sets
eq_(twin1.union(twin2), twin1)
eq_(twin2.union(twin1), twin1)
# empty sets
empty = util.IdentitySet([])
eq_(empty.union(empty), empty)
# totally different sets
unique1 = util.IdentitySet([1])
unique2 = util.IdentitySet([2])
eq_(unique1.union(unique2), util.IdentitySet([1, 2]))
# not an IdentitySet
not_an_identity_set = object()
assert_raises(TypeError, unique1.union, not_an_identity_set)
def test_dunder_or(self):
super_, sub_, twin1, twin2, _, _ = self._create_sets()
# basic set math
eq_(sub_ | super_, super_)
eq_(super_ | sub_, super_)
# the same sets
eq_(twin1 | twin2, twin1)
eq_(twin2 | twin1, twin1)
# empty sets
empty = util.IdentitySet([])
eq_(empty | empty, empty)
# totally different sets
unique1 = util.IdentitySet([1])
unique2 = util.IdentitySet([2])
eq_(unique1 | unique2, util.IdentitySet([1, 2]))
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
return unique1 | not_an_identity_set
assert_raises(TypeError, should_raise)
def test_update(self):
pass # TODO
def test_dunder_ior(self):
super_, sub_, _, _, _, _ = self._create_sets()
# basic set math
sub_ |= super_
eq_(sub_, super_)
super_ |= sub_
eq_(super_, super_)
# totally different sets
unique1 = util.IdentitySet([1])
unique2 = util.IdentitySet([2])
unique1 |= unique2
eq_(unique1, util.IdentitySet([1, 2]))
eq_(unique2, util.IdentitySet([2]))
# not an IdentitySet
def should_raise():
unique = util.IdentitySet([1])
not_an_identity_set = object()
unique |= not_an_identity_set
assert_raises(TypeError, should_raise)
def test_difference(self):
_, _, twin1, twin2, _, _ = self._create_sets()
# basic set math
set1 = util.IdentitySet([1, 2, 3])
set2 = util.IdentitySet([2, 3, 4])
eq_(set1.difference(set2), util.IdentitySet([1]))
eq_(set2.difference(set1), util.IdentitySet([4]))
# empty sets
empty = util.IdentitySet([])
eq_(empty.difference(empty), empty)
# the same sets
eq_(twin1.difference(twin2), empty)
eq_(twin2.difference(twin1), empty)
# totally different sets
unique1 = util.IdentitySet([1])
unique2 = util.IdentitySet([2])
eq_(unique1.difference(unique2), util.IdentitySet([1]))
eq_(unique2.difference(unique1), util.IdentitySet([2]))
# not an IdentitySet
not_an_identity_set = object()
assert_raises(TypeError, unique1.difference, not_an_identity_set)
def test_dunder_sub(self):
_, _, twin1, twin2, _, _ = self._create_sets()
# basic set math
set1 = util.IdentitySet([1, 2, 3])
set2 = util.IdentitySet([2, 3, 4])
eq_(set1 - set2, util.IdentitySet([1]))
eq_(set2 - set1, util.IdentitySet([4]))
# empty sets
empty = util.IdentitySet([])
eq_(empty - empty, empty)
# the same sets
eq_(twin1 - twin2, empty)
eq_(twin2 - twin1, empty)
# totally different sets
unique1 = util.IdentitySet([1])
unique2 = util.IdentitySet([2])
eq_(unique1 - unique2, util.IdentitySet([1]))
eq_(unique2 - unique1, util.IdentitySet([2]))
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
unique1 - not_an_identity_set
assert_raises(TypeError, should_raise)
def test_difference_update(self):
pass # TODO
def test_dunder_isub(self):
pass # TODO
def test_intersection(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_.intersection(super_), sub_)
eq_(super_.intersection(sub_), sub_)
# the same sets
eq_(twin1.intersection(twin2), twin1)
eq_(twin2.intersection(twin1), twin1)
# empty sets
empty = util.IdentitySet([])
eq_(empty.intersection(empty), empty)
# totally different sets
eq_(unique1.intersection(unique2), empty)
# not an IdentitySet
not_an_identity_set = object()
assert_raises(TypeError, unique1.intersection, not_an_identity_set)
def test_dunder_and(self):
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
# basic set math
eq_(sub_ & super_, sub_)
eq_(super_ & sub_, sub_)
# the same sets
eq_(twin1 & twin2, twin1)
eq_(twin2 & twin1, twin1)
# empty sets
empty = util.IdentitySet([])
eq_(empty & empty, empty)
# totally different sets
eq_(unique1 & unique2, empty)
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
return unique1 & not_an_identity_set
assert_raises(TypeError, should_raise)
def test_intersection_update(self):
pass # TODO
def test_dunder_iand(self):
pass # TODO
def test_symmetric_difference(self):
_, _, twin1, twin2, _, _ = self._create_sets()
# basic set math
set1 = util.IdentitySet([1, 2, 3])
set2 = util.IdentitySet([2, 3, 4])
eq_(set1.symmetric_difference(set2), util.IdentitySet([1, 4]))
eq_(set2.symmetric_difference(set1), util.IdentitySet([1, 4]))
# empty sets
empty = util.IdentitySet([])
eq_(empty.symmetric_difference(empty), empty)
# the same sets
eq_(twin1.symmetric_difference(twin2), empty)
eq_(twin2.symmetric_difference(twin1), empty)
# totally different sets
unique1 = util.IdentitySet([1])
unique2 = util.IdentitySet([2])
eq_(unique1.symmetric_difference(unique2), util.IdentitySet([1, 2]))
eq_(unique2.symmetric_difference(unique1), util.IdentitySet([1, 2]))
# not an IdentitySet
not_an_identity_set = object()
assert_raises(
TypeError, unique1.symmetric_difference, not_an_identity_set
)
def test_dunder_xor(self):
_, _, twin1, twin2, _, _ = self._create_sets()
# basic set math
set1 = util.IdentitySet([1, 2, 3])
set2 = util.IdentitySet([2, 3, 4])
eq_(set1 ^ set2, util.IdentitySet([1, 4]))
eq_(set2 ^ set1, util.IdentitySet([1, 4]))
# empty sets
empty = util.IdentitySet([])
eq_(empty ^ empty, empty)
# the same sets
eq_(twin1 ^ twin2, empty)
eq_(twin2 ^ twin1, empty)
# totally different sets
unique1 = util.IdentitySet([1])
unique2 = util.IdentitySet([2])
eq_(unique1 ^ unique2, util.IdentitySet([1, 2]))
eq_(unique2 ^ unique1, util.IdentitySet([1, 2]))
# not an IdentitySet
def should_raise():
not_an_identity_set = object()
return unique1 ^ not_an_identity_set
assert_raises(TypeError, should_raise)
def test_symmetric_difference_update(self):
pass # TODO
def _create_sets(self):
o1, o2, o3, o4, o5 = (
self.obj_type(),
self.obj_type(),
self.obj_type(),
self.obj_type(),
self.obj_type(),
)
super_ = util.IdentitySet([o1, o2, o3])
sub_ = util.IdentitySet([o2])
twin1 = util.IdentitySet([o3])
twin2 = util.IdentitySet([o3])
unique1 = util.IdentitySet([o4])
unique2 = util.IdentitySet([o5])
return super_, sub_, twin1, twin2, unique1, unique2
def _assert_unorderable_types(self, callable_):
assert_raises_message(
TypeError, "not supported between instances of", callable_
)
def test_basic_sanity(self):
IdentitySet = util.IdentitySet
o1, o2, o3 = self.obj_type(), self.obj_type(), self.obj_type()
ids = IdentitySet([o1])
ids.discard(o1)
ids.discard(o1)
ids.add(o1)
ids.remove(o1)
assert_raises(KeyError, ids.remove, o1)
eq_(ids.copy(), ids)
# explicit __eq__ and __ne__ tests
assert ids != None # noqa
assert not (ids == None) # noqa
ne_(ids, IdentitySet([o1, o2, o3]))
ids.clear()
assert o1 not in ids
ids.add(o2)
assert o2 in ids
eq_(ids.pop(), o2)
ids.add(o1)
eq_(len(ids), 1)
isuper = IdentitySet([o1, o2])
assert ids < isuper
assert ids.issubset(isuper)
assert isuper.issuperset(ids)
assert isuper > ids
eq_(ids.union(isuper), isuper)
eq_(ids | isuper, isuper)
eq_(isuper - ids, IdentitySet([o2]))
eq_(isuper.difference(ids), IdentitySet([o2]))
eq_(ids.intersection(isuper), IdentitySet([o1]))
eq_(ids & isuper, IdentitySet([o1]))
eq_(ids.symmetric_difference(isuper), IdentitySet([o2]))
eq_(ids ^ isuper, IdentitySet([o2]))
ids.update(isuper)
ids |= isuper
ids.difference_update(isuper)
ids -= isuper
ids.intersection_update(isuper)
ids &= isuper
ids.symmetric_difference_update(isuper)
ids ^= isuper
ids.update("foobar")
try:
ids |= "foobar"
assert False
except TypeError:
assert True
try:
s = {o1, o2}
s |= ids
assert False
except TypeError:
assert True
assert_raises(TypeError, util.cmp, ids)
assert_raises(TypeError, hash, ids)
def test_repr(self):
i = util.IdentitySet([])
eq_(str(i), "IdentitySet([])")
i = util.IdentitySet([1, 2, 3])
eq_(str(i), "IdentitySet([1, 2, 3])")
class NoHashIdentitySetTest(IdentitySetTest):
obj_type = NoHash
class OrderedIdentitySetTest(fixtures.TestBase):
def assert_eq(self, identityset, expected_iterable):
expected = [id(o) for o in expected_iterable]
found = [id(o) for o in identityset]
eq_(found, expected)
def test_add(self):
elem = object
s = util.OrderedIdentitySet()
s.add(elem())
s.add(elem())
def test_intersection(self):
elem = object
eq_ = self.assert_eq
a, b, c, d, e, f, g = (
elem(),
elem(),
elem(),
elem(),
elem(),
elem(),
elem(),
)
s1 = util.OrderedIdentitySet([a, b, c])
s2 = util.OrderedIdentitySet([d, e, f])
s3 = util.OrderedIdentitySet([a, d, f, g])
eq_(s1.intersection(s2), [])
eq_(s1.intersection(s3), [a])
eq_(s1.union(s2).intersection(s3), [a, d, f])
class DictlikeIteritemsTest(fixtures.TestBase):
baseline = {("a", 1), ("b", 2), ("c", 3)}
def _ok(self, instance):
iterator = util.dictlike_iteritems(instance)
eq_(set(iterator), self.baseline)
def _notok(self, instance):
assert_raises(TypeError, util.dictlike_iteritems, instance)
def test_dict(self):
d = dict(a=1, b=2, c=3)
self._ok(d)
def test_subdict(self):
class subdict(dict):
pass
d = subdict(a=1, b=2, c=3)
self._ok(d)
def test_object(self):
self._notok(object())
def test_duck_2(self):
class duck2:
def items(duck):
return list(self.baseline)
self._ok(duck2())
def test_duck_4(self):
class duck4:
def iterkeys(duck):
return iter(["a", "b", "c"])
self._notok(duck4())
def test_duck_5(self):
class duck5:
def keys(duck):
return ["a", "b", "c"]
def get(duck, key):
return dict(a=1, b=2, c=3).get(key)
self._ok(duck5())
def test_duck_6(self):
class duck6:
def keys(duck):
return ["a", "b", "c"]
self._notok(duck6())
class DuckTypeCollectionTest(fixtures.TestBase):
def test_sets(self):
class SetLike:
def add(self):
pass
class ForcedSet(list):
__emulates__ = set
for type_ in (set, SetLike, ForcedSet):
eq_(util.duck_type_collection(type_), set)
instance = type_()
eq_(util.duck_type_collection(instance), set)
for type_ in (frozenset,):
is_(util.duck_type_collection(type_), None)
instance = type_()
is_(util.duck_type_collection(instance), None)
class ArgInspectionTest(fixtures.TestBase):
def test_get_cls_kwargs(self):
class A:
def __init__(self, a):
pass
class A1(A):
def __init__(self, a1):
pass
class A11(A1):
def __init__(self, a11, **kw):
pass
class B:
def __init__(self, b, **kw):
pass
class B1(B):
def __init__(self, b1, **kw):
pass
class B2(B):
def __init__(self, b2):
pass
class AB(A, B):
def __init__(self, ab):
pass
class BA(B, A):
def __init__(self, ba, **kwargs):
pass
class BA1(BA):
pass
class CAB(A, B):
pass
class CBA(B, A):
pass
class CB1A1(B1, A1):
pass
class CAB1(A, B1):
pass
class CB1A(B1, A):
pass
class CB2A(B2, A):
pass
class D:
pass
class BA2(B, A):
pass
class A11B1(A11, B1):
pass
def test(cls, *expected):
eq_(set(util.get_cls_kwargs(cls)), set(expected))
test(A, "a")
test(A1, "a1")
test(A11, "a11", "a1")
test(B, "b")
test(B1, "b1", "b")
test(AB, "ab")
test(BA, "ba", "b", "a")
test(BA1, "ba", "b", "a")
test(CAB, "a")
test(CBA, "b", "a")
test(CAB1, "a")
test(CB1A, "b1", "b", "a")
test(CB2A, "b2")
test(CB1A1, "a1", "b1", "b")
test(D)
test(BA2, "a", "b")
test(A11B1, "a1", "a11", "b", "b1")
def test_get_func_kwargs(self):
def f1():
pass
def f2(foo):
pass
def f3(*foo):
pass
def f4(**foo):
pass
def test(fn, *expected):
eq_(set(util.get_func_kwargs(fn)), set(expected))
test(f1)
test(f2, "foo")
test(f3)
test(f4)
def test_callable_argspec_fn(self):
def foo(x, y, **kw):
pass
eq_(
get_callable_argspec(foo),
compat.FullArgSpec(["x", "y"], None, "kw", None, [], None, {}),
)
def test_callable_argspec_fn_no_self(self):
def foo(x, y, **kw):
pass
eq_(
get_callable_argspec(foo, no_self=True),
compat.FullArgSpec(["x", "y"], None, "kw", None, [], None, {}),
)
def test_callable_argspec_fn_no_self_but_self(self):
def foo(self, x, y, **kw):
pass
eq_(
get_callable_argspec(foo, no_self=True),
compat.FullArgSpec(
["self", "x", "y"], None, "kw", None, [], None, {}
),
)
@testing.requires.cpython
def test_callable_argspec_py_builtin(self):
import datetime
assert_raises(TypeError, get_callable_argspec, datetime.datetime.now)
@testing.requires.cpython
def test_callable_argspec_obj_init(self):
assert_raises(TypeError, get_callable_argspec, object)
def test_callable_argspec_method(self):
class Foo:
def foo(self, x, y, **kw):
pass
eq_(
get_callable_argspec(Foo.foo),
compat.FullArgSpec(
["self", "x", "y"], None, "kw", None, [], None, {}
),
)
def test_callable_argspec_instance_method_no_self(self):
class Foo:
def foo(self, x, y, **kw):
pass
eq_(
get_callable_argspec(Foo().foo, no_self=True),
compat.FullArgSpec(["x", "y"], None, "kw", None, [], None, {}),
)
def test_callable_argspec_unbound_method_no_self(self):
class Foo:
def foo(self, x, y, **kw):
pass
eq_(
get_callable_argspec(Foo.foo, no_self=True),
compat.FullArgSpec(
["self", "x", "y"], None, "kw", None, [], None, {}
),
)
def test_callable_argspec_init(self):
class Foo:
def __init__(self, x, y):
pass
eq_(
get_callable_argspec(Foo),
compat.FullArgSpec(
["self", "x", "y"], None, None, None, [], None, {}
),
)
def test_callable_argspec_init_no_self(self):
class Foo:
def __init__(self, x, y):
pass
eq_(
get_callable_argspec(Foo, no_self=True),
compat.FullArgSpec(["x", "y"], None, None, None, [], None, {}),
)
def test_callable_argspec_call(self):
class Foo:
def __call__(self, x, y):
pass
eq_(
get_callable_argspec(Foo()),
compat.FullArgSpec(
["self", "x", "y"], None, None, None, [], None, {}
),
)
def test_callable_argspec_call_no_self(self):
class Foo:
def __call__(self, x, y):
pass
eq_(
get_callable_argspec(Foo(), no_self=True),
compat.FullArgSpec(["x", "y"], None, None, None, [], None, {}),
)
@testing.requires.cpython
def test_callable_argspec_partial(self):
from functools import partial
def foo(x, y, z, **kw):
pass
bar = partial(foo, 5)
assert_raises(TypeError, get_callable_argspec, bar)
def test_getargspec_6_tuple(self):
def foo(x, y, z, **kw):
pass
spec = compat.inspect_getfullargspec(foo)
eq_(
spec,
compat.FullArgSpec(
args=["x", "y", "z"],
varargs=None,
varkw="kw",
defaults=None,
kwonlyargs=[],
kwonlydefaults=None,
annotations={},
),
)
class SymbolTest(fixtures.TestBase):
def test_basic(self):
sym1 = util.symbol("foo")
assert sym1.name == "foo"
sym2 = util.symbol("foo")
assert sym1 is sym2
assert sym1 == sym2
sym3 = util.symbol("bar")
assert sym1 is not sym3
assert sym1 != sym3
def test_fast_int_flag(self):
class Enum(FastIntFlag):
fi_sym1 = 1
fi_sym2 = 2
fi_sym3 = 3
assert Enum.fi_sym1 is not Enum.fi_sym3
assert Enum.fi_sym1 != Enum.fi_sym3
assert Enum.fi_sym1.name == "fi_sym1"
# modified for #8783
eq_(
list(Enum.__members__.values()),
[Enum.fi_sym1, Enum.fi_sym2, Enum.fi_sym3],
)
def test_fast_int_flag_still_global(self):
"""FastIntFlag still causes elements to be global symbols.
This is to support pickling. There are likely other ways to
achieve this, however this is what we have for now.
"""
class Enum1(FastIntFlag):
fi_sym1 = 1
fi_sym2 = 2
class Enum2(FastIntFlag):
fi_sym1 = 1
fi_sym2 = 2
# they are global
assert Enum1.fi_sym1 is Enum2.fi_sym1
def test_fast_int_flag_dont_allow_conflicts(self):
"""FastIntFlag still causes elements to be global symbols.
While we do this and haven't yet changed it, make sure conflicting
int values for the same name don't come in.
"""
class Enum1(FastIntFlag):
fi_sym1 = 1
fi_sym2 = 2
with expect_raises_message(
TypeError,
"Can't replace canonical symbol for 'fi_sym1' "
"with new int value 2",
):
class Enum2(FastIntFlag):
fi_sym1 = 2
fi_sym2 = 3
@testing.combinations("native", "ours", argnames="native")
def test_compare_to_native_py_intflag(self, native):
"""monitor IntFlag behavior in upstream Python for #8783"""
if native == "native":
from enum import IntFlag
else:
from sqlalchemy.util import FastIntFlag as IntFlag
class Enum(IntFlag):
fi_sym1 = 1
fi_sym2 = 2
fi_sym4 = 4
fi_sym1plus2 = 3
# not an alias because there's no 16
fi_sym17 = 17
sym1, sym2, sym4, sym1plus2, sym17 = Enum.__members__.values()
eq_(
[sym1, sym2, sym4, sym1plus2, sym17],
[
Enum.fi_sym1,
Enum.fi_sym2,
Enum.fi_sym4,
Enum.fi_sym1plus2,
Enum.fi_sym17,
],
)
def test_pickle(self):
sym1 = util.symbol("foo")
sym2 = util.symbol("foo")
assert sym1 is sym2
# default
s = pickle.dumps(sym1)
pickle.loads(s)
for _, dumper in picklers():
serial = dumper(sym1)
rt = pickle.loads(serial)
assert rt is sym1
assert rt is sym2
def test_bitflags(self):
sym1 = util.symbol("sym1", canonical=1)
sym2 = util.symbol("sym2", canonical=2)
assert sym1 & sym1
assert not sym1 & sym2
assert not sym1 & sym1 & sym2
def test_composites(self):
sym1 = util.symbol("sym1", canonical=1)
sym2 = util.symbol("sym2", canonical=2)
sym3 = util.symbol("sym3", canonical=4)
sym4 = util.symbol("sym4", canonical=8)
assert sym1 & (sym2 | sym1 | sym4)
assert not sym1 & (sym2 | sym3)
assert not (sym1 | sym2) & (sym3 | sym4)
assert (sym1 | sym2) & (sym2 | sym4)
def test_fast_int_flag_no_more_iter(self):
"""test #8783"""
class MyEnum(FastIntFlag):
sym1 = 1
sym2 = 2
sym3 = 4
sym4 = 8
with expect_raises_message(
NotImplementedError, "iter not implemented to ensure compatibility"
):
list(MyEnum)
def test_parser(self):
class MyEnum(FastIntFlag):
sym1 = 1
sym2 = 2
sym3 = 4
sym4 = 8
sym1, sym2, sym3, sym4 = tuple(MyEnum.__members__.values())
lookup_one = {sym1: [], sym2: [True], sym3: [False], sym4: [None]}
lookup_two = {sym1: [], sym2: [True], sym3: [False]}
lookup_three = {sym1: [], sym2: ["symbol2"], sym3: []}
is_(
langhelpers.parse_user_argument_for_enum(
"sym2", lookup_one, "some_name", resolve_symbol_names=True
),
sym2,
)
assert_raises_message(
exc.ArgumentError,
"Invalid value for 'some_name': 'sym2'",
langhelpers.parse_user_argument_for_enum,
"sym2",
lookup_one,
"some_name",
)
is_(
langhelpers.parse_user_argument_for_enum(
True, lookup_one, "some_name", resolve_symbol_names=False
),
sym2,
)
is_(
langhelpers.parse_user_argument_for_enum(
sym2, lookup_one, "some_name"
),
sym2,
)
is_(
langhelpers.parse_user_argument_for_enum(
None, lookup_one, "some_name"
),
sym4,
)
is_(
langhelpers.parse_user_argument_for_enum(
None, lookup_two, "some_name"
),
None,
)
is_(
langhelpers.parse_user_argument_for_enum(
"symbol2", lookup_three, "some_name"
),
sym2,
)
assert_raises_message(
exc.ArgumentError,
"Invalid value for 'some_name': 'foo'",
langhelpers.parse_user_argument_for_enum,
"foo",
lookup_three,
"some_name",
)
class _Py3KFixtures:
def _kw_only_fixture(self, a, *, b, c):
pass
def _kw_plus_posn_fixture(self, a, *args, b, c):
pass
def _kw_opt_fixture(self, a, *, b, c="c"):
pass
def _ret_annotation_fixture(self, a, b) -> int:
return 1
py3k_fixtures = _Py3KFixtures()
class TestFormatArgspec(_Py3KFixtures, fixtures.TestBase):
@testing.combinations(
(
lambda: None,
{
"grouped_args": "()",
"self_arg": None,
"apply_kw": "()",
"apply_pos": "()",
"apply_pos_proxied": "()",
"apply_kw_proxied": "()",
},
True,
),
(
lambda: None,
{
"grouped_args": "()",
"self_arg": None,
"apply_kw": "",
"apply_pos": "",
"apply_pos_proxied": "",
"apply_kw_proxied": "",
},
False,
),
(
lambda self: None,
{
"grouped_args": "(self)",
"self_arg": "self",
"apply_kw": "(self)",
"apply_pos": "(self)",
"apply_pos_proxied": "()",
"apply_kw_proxied": "()",
},
True,
),
(
lambda self: None,
{
"grouped_args": "(self)",
"self_arg": "self",
"apply_kw": "self",
"apply_pos": "self",
"apply_pos_proxied": "",
"apply_kw_proxied": "",
},
False,
),
(
lambda *a: None,
{
"grouped_args": "(*a)",
"self_arg": "a[0]",
"apply_kw": "(*a)",
"apply_pos": "(*a)",
"apply_pos_proxied": "(*a)",
"apply_kw_proxied": "(*a)",
},
True,
),
(
lambda **kw: None,
{
"grouped_args": "(**kw)",
"self_arg": None,
"apply_kw": "(**kw)",
"apply_pos": "(**kw)",
"apply_pos_proxied": "(**kw)",
"apply_kw_proxied": "(**kw)",
},
True,
),
(
lambda *a, **kw: None,
{
"grouped_args": "(*a, **kw)",
"self_arg": "a[0]",
"apply_kw": "(*a, **kw)",
"apply_pos": "(*a, **kw)",
"apply_pos_proxied": "(*a, **kw)",
"apply_kw_proxied": "(*a, **kw)",
},
True,
),
(
lambda a, *b: None,
{
"grouped_args": "(a, *b)",
"self_arg": "a",
"apply_kw": "(a, *b)",
"apply_pos": "(a, *b)",
"apply_pos_proxied": "(*b)",
"apply_kw_proxied": "(*b)",
},
True,
),
(
lambda a, **b: None,
{
"grouped_args": "(a, **b)",
"self_arg": "a",
"apply_kw": "(a, **b)",
"apply_pos": "(a, **b)",
"apply_pos_proxied": "(**b)",
"apply_kw_proxied": "(**b)",
},
True,
),
(
lambda a, *b, **c: None,
{
"grouped_args": "(a, *b, **c)",
"self_arg": "a",
"apply_kw": "(a, *b, **c)",
"apply_pos": "(a, *b, **c)",
"apply_pos_proxied": "(*b, **c)",
"apply_kw_proxied": "(*b, **c)",
},
True,
),
(
lambda a, b=1, **c: None,
{
"grouped_args": "(a, b=1, **c)",
"self_arg": "a",
"apply_kw": "(a, b=b, **c)",
"apply_pos": "(a, b, **c)",
"apply_pos_proxied": "(b, **c)",
"apply_kw_proxied": "(b=b, **c)",
},
True,
),
(
lambda a=1, b=2: None,
{
"grouped_args": "(a=1, b=2)",
"self_arg": "a",
"apply_kw": "(a=a, b=b)",
"apply_pos": "(a, b)",
"apply_pos_proxied": "(b)",
"apply_kw_proxied": "(b=b)",
},
True,
),
(
lambda a=1, b=2: None,
{
"grouped_args": "(a=1, b=2)",
"self_arg": "a",
"apply_kw": "a=a, b=b",
"apply_pos": "a, b",
"apply_pos_proxied": "b",
"apply_kw_proxied": "b=b",
},
False,
),
(
py3k_fixtures._ret_annotation_fixture,
{
"grouped_args": "(self, a, b) -> 'int'",
"self_arg": "self",
"apply_pos": "self, a, b",
"apply_kw": "self, a, b",
"apply_pos_proxied": "a, b",
"apply_kw_proxied": "a, b",
},
False,
),
(
py3k_fixtures._kw_only_fixture,
{
"grouped_args": "(self, a, *, b, c)",
"self_arg": "self",
"apply_pos": "self, a, *, b, c",
"apply_kw": "self, a, b=b, c=c",
"apply_pos_proxied": "a, *, b, c",
"apply_kw_proxied": "a, b=b, c=c",
},
False,
),
(
py3k_fixtures._kw_plus_posn_fixture,
{
"grouped_args": "(self, a, *args, b, c)",
"self_arg": "self",
"apply_pos": "self, a, *args, b, c",
"apply_kw": "self, a, b=b, c=c, *args",
"apply_pos_proxied": "a, *args, b, c",
"apply_kw_proxied": "a, b=b, c=c, *args",
},
False,
),
(
py3k_fixtures._kw_opt_fixture,
{
"grouped_args": "(self, a, *, b, c='c')",
"self_arg": "self",
"apply_pos": "self, a, *, b, c",
"apply_kw": "self, a, b=b, c=c",
"apply_pos_proxied": "a, *, b, c",
"apply_kw_proxied": "a, b=b, c=c",
},
False,
),
argnames="fn,wanted,grouped",
)
def test_specs(self, fn, wanted, grouped):
# test direct function
if grouped is None:
parsed = util.format_argspec_plus(fn)
else:
parsed = util.format_argspec_plus(fn, grouped=grouped)
eq_(parsed, wanted)
# test sending fullargspec
spec = compat.inspect_getfullargspec(fn)
if grouped is None:
parsed = util.format_argspec_plus(spec)
else:
parsed = util.format_argspec_plus(spec, grouped=grouped)
eq_(parsed, wanted)
@testing.requires.cpython
def test_init_grouped(self):
object_spec = {
"grouped_args": "(self)",
"self_arg": "self",
"apply_pos": "(self)",
"apply_kw": "(self)",
"apply_pos_proxied": "()",
"apply_kw_proxied": "()",
}
wrapper_spec = {
"grouped_args": "(self, *args, **kwargs)",
"self_arg": "self",
"apply_pos": "(self, *args, **kwargs)",
"apply_kw": "(self, *args, **kwargs)",
"apply_pos_proxied": "(*args, **kwargs)",
"apply_kw_proxied": "(*args, **kwargs)",
}
custom_spec = {
"grouped_args": "(slef, a=123)",
"self_arg": "slef", # yes, slef
"apply_pos": "(slef, a)",
"apply_pos_proxied": "(a)",
"apply_kw_proxied": "(a=a)",
"apply_kw": "(slef, a=a)",
}
self._test_init(None, object_spec, wrapper_spec, custom_spec)
self._test_init(True, object_spec, wrapper_spec, custom_spec)
@testing.requires.cpython
def test_init_bare(self):
object_spec = {
"grouped_args": "(self)",
"self_arg": "self",
"apply_pos": "self",
"apply_kw": "self",
"apply_pos_proxied": "",
"apply_kw_proxied": "",
}
wrapper_spec = {
"grouped_args": "(self, *args, **kwargs)",
"self_arg": "self",
"apply_pos": "self, *args, **kwargs",
"apply_kw": "self, *args, **kwargs",
"apply_pos_proxied": "*args, **kwargs",
"apply_kw_proxied": "*args, **kwargs",
}
custom_spec = {
"grouped_args": "(slef, a=123)",
"self_arg": "slef", # yes, slef
"apply_pos": "slef, a",
"apply_kw": "slef, a=a",
"apply_pos_proxied": "a",
"apply_kw_proxied": "a=a",
}
self._test_init(False, object_spec, wrapper_spec, custom_spec)
def _test_init(self, grouped, object_spec, wrapper_spec, custom_spec):
def test(fn, wanted):
if grouped is None:
parsed = util.format_argspec_init(fn)
else:
parsed = util.format_argspec_init(fn, grouped=grouped)
eq_(parsed, wanted)
class Obj:
pass
test(Obj.__init__, object_spec)
class Obj:
def __init__(self):
pass
test(Obj.__init__, object_spec)
class Obj:
def __init__(slef, a=123):
pass
test(Obj.__init__, custom_spec)
class Obj(list):
pass
test(Obj.__init__, wrapper_spec)
class Obj(list):
def __init__(self, *args, **kwargs):
pass
test(Obj.__init__, wrapper_spec)
class Obj(list):
def __init__(self):
pass
test(Obj.__init__, object_spec)
class Obj(list):
def __init__(slef, a=123):
pass
test(Obj.__init__, custom_spec)
class GenericReprTest(fixtures.TestBase):
def test_all_positional(self):
class Foo:
def __init__(self, a, b, c):
self.a = a
self.b = b
self.c = c
eq_(util.generic_repr(Foo(1, 2, 3)), "Foo(1, 2, 3)")
def test_positional_plus_kw(self):
class Foo:
def __init__(self, a, b, c=5, d=4):
self.a = a
self.b = b
self.c = c
self.d = d
eq_(util.generic_repr(Foo(1, 2, 3, 6)), "Foo(1, 2, c=3, d=6)")
def test_kw_defaults(self):
class Foo:
def __init__(self, a=1, b=2, c=3, d=4):
self.a = a
self.b = b
self.c = c
self.d = d
eq_(util.generic_repr(Foo(1, 5, 3, 7)), "Foo(b=5, d=7)")
def test_multi_kw(self):
class Foo:
def __init__(self, a, b, c=3, d=4):
self.a = a
self.b = b
self.c = c
self.d = d
class Bar(Foo):
def __init__(self, e, f, g=5, **kw):
self.e = e
self.f = f
self.g = g
super().__init__(**kw)
eq_(
util.generic_repr(
Bar("e", "f", g=7, a=6, b=5, d=9), to_inspect=[Bar, Foo]
),
"Bar('e', 'f', g=7, a=6, b=5, d=9)",
)
eq_(
util.generic_repr(Bar("e", "f", a=6, b=5), to_inspect=[Bar, Foo]),
"Bar('e', 'f', a=6, b=5)",
)
def test_multi_kw_repeated(self):
class Foo:
def __init__(self, a=1, b=2):
self.a = a
self.b = b
class Bar(Foo):
def __init__(self, b=3, c=4, **kw):
self.c = c
super().__init__(b=b, **kw)
eq_(
util.generic_repr(Bar(a="a", b="b", c="c"), to_inspect=[Bar, Foo]),
"Bar(b='b', c='c', a='a')",
)
def test_discard_vargs(self):
class Foo:
def __init__(self, a, b, *args):
self.a = a
self.b = b
self.c, self.d = args[0:2]
eq_(util.generic_repr(Foo(1, 2, 3, 4)), "Foo(1, 2)")
def test_discard_vargs_kwargs(self):
class Foo:
def __init__(self, a, b, *args, **kw):
self.a = a
self.b = b
self.c, self.d = args[0:2]
eq_(util.generic_repr(Foo(1, 2, 3, 4, x=7, y=4)), "Foo(1, 2)")
def test_significant_vargs(self):
class Foo:
def __init__(self, a, b, *args):
self.a = a
self.b = b
self.args = args
eq_(util.generic_repr(Foo(1, 2, 3, 4)), "Foo(1, 2, 3, 4)")
def test_no_args(self):
class Foo:
def __init__(self):
pass
eq_(util.generic_repr(Foo()), "Foo()")
def test_no_init(self):
class Foo:
pass
eq_(util.generic_repr(Foo()), "Foo()")
class AsInterfaceTest(fixtures.TestBase):
class Something:
def _ignoreme(self):
pass
def foo(self):
pass
def bar(self):
pass
class Partial:
def bar(self):
pass
class Object:
pass
def test_no_cls_no_methods(self):
obj = object()
assert_raises(TypeError, util.as_interface, obj)
def test_instance(self):
obj = object()
assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
assert_raises(TypeError, util.as_interface, obj, methods=("foo"))
assert_raises(
TypeError,
util.as_interface,
obj,
cls=self.Something,
required=("foo"),
)
obj = self.Something()
eq_(obj, util.as_interface(obj, cls=self.Something))
eq_(obj, util.as_interface(obj, methods=("foo",)))
eq_(
obj,
util.as_interface(
obj, cls=self.Something, required=("outofband",)
),
)
partial = self.Partial()
slotted = self.Object()
slotted.bar = lambda self: 123
for obj in partial, slotted:
eq_(obj, util.as_interface(obj, cls=self.Something))
assert_raises(TypeError, util.as_interface, obj, methods=("foo"))
eq_(obj, util.as_interface(obj, methods=("bar",)))
eq_(
obj,
util.as_interface(obj, cls=self.Something, required=("bar",)),
)
assert_raises(
TypeError,
util.as_interface,
obj,
cls=self.Something,
required=("foo",),
)
assert_raises(
TypeError,
util.as_interface,
obj,
cls=self.Something,
required=self.Something,
)
def test_dict(self):
obj = {}
assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
assert_raises(TypeError, util.as_interface, obj, methods="foo")
assert_raises(
TypeError,
util.as_interface,
obj,
cls=self.Something,
required="foo",
)
def assertAdapted(obj, *methods):
assert isinstance(obj, type)
found = {m for m in dir(obj) if not m.startswith("_")}
for method in methods:
assert method in found
found.remove(method)
assert not found
def fn(self):
return 123
obj = {"foo": fn, "bar": fn}
res = util.as_interface(obj, cls=self.Something)
assertAdapted(res, "foo", "bar")
res = util.as_interface(
obj, cls=self.Something, required=self.Something
)
assertAdapted(res, "foo", "bar")
res = util.as_interface(obj, cls=self.Something, required=("foo",))
assertAdapted(res, "foo", "bar")
res = util.as_interface(obj, methods=("foo", "bar"))
assertAdapted(res, "foo", "bar")
res = util.as_interface(obj, methods=("foo", "bar", "baz"))
assertAdapted(res, "foo", "bar")
res = util.as_interface(obj, methods=("foo", "bar"), required=("foo",))
assertAdapted(res, "foo", "bar")
assert_raises(TypeError, util.as_interface, obj, methods=("foo",))
assert_raises(
TypeError,
util.as_interface,
obj,
methods=("foo", "bar", "baz"),
required=("baz",),
)
obj = {"foo": 123}
assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
class TestClassHierarchy(fixtures.TestBase):
def test_object(self):
eq_(set(util.class_hierarchy(object)), {object})
def test_single(self):
class A:
pass
class B:
pass
eq_(set(util.class_hierarchy(A)), {A, object})
eq_(set(util.class_hierarchy(B)), {B, object})
class C(A, B):
pass
eq_(set(util.class_hierarchy(A)), {A, B, C, object})
eq_(set(util.class_hierarchy(B)), {A, B, C, object})
class TestClassProperty(fixtures.TestBase):
def test_simple(self):
class A:
something = {"foo": 1}
class B(A):
@classproperty
def something(cls):
d = dict(super().something)
d.update({"bazz": 2})
return d
eq_(B.something, {"foo": 1, "bazz": 2})
class TestProperties(fixtures.TestBase):
def test_pickle(self):
data = {"hello": "bla"}
props = util.Properties(data)
for loader, dumper in picklers():
s = dumper(props)
p = loader(s)
eq_(props._data, p._data)
eq_(props.keys(), p.keys())
def test_keys_in_dir(self):
data = {"hello": "bla"}
props = util.Properties(data)
in_("hello", dir(props))
def test_pickle_immuatbleprops(self):
data = {"hello": "bla"}
props = util.Properties(data).as_readonly()
for loader, dumper in picklers():
s = dumper(props)
p = loader(s)
eq_(props._data, p._data)
eq_(props.keys(), p.keys())
def test_pickle_orderedprops(self):
data = {"hello": "bla"}
props = util.OrderedProperties()
props.update(data)
for loader, dumper in picklers():
s = dumper(props)
p = loader(s)
eq_(props._data, p._data)
eq_(props.keys(), p.keys())
class QuotedTokenParserTest(fixtures.TestBase):
def _test(self, string, expected):
eq_(langhelpers.quoted_token_parser(string), expected)
def test_single(self):
self._test("name", ["name"])
def test_dotted(self):
self._test("schema.name", ["schema", "name"])
def test_dotted_quoted_left(self):
self._test('"Schema".name', ["Schema", "name"])
def test_dotted_quoted_left_w_quote_left_edge(self):
self._test('"""Schema".name', ['"Schema', "name"])
def test_dotted_quoted_left_w_quote_right_edge(self):
self._test('"Schema""".name', ['Schema"', "name"])
def test_dotted_quoted_left_w_quote_middle(self):
self._test('"Sch""ema".name', ['Sch"ema', "name"])
def test_dotted_quoted_right(self):
self._test('schema."SomeName"', ["schema", "SomeName"])
def test_dotted_quoted_right_w_quote_left_edge(self):
self._test('schema."""name"', ["schema", '"name'])
def test_dotted_quoted_right_w_quote_right_edge(self):
self._test('schema."name"""', ["schema", 'name"'])
def test_dotted_quoted_right_w_quote_middle(self):
self._test('schema."na""me"', ["schema", 'na"me'])
def test_quoted_single_w_quote_left_edge(self):
self._test('"""name"', ['"name'])
def test_quoted_single_w_quote_right_edge(self):
self._test('"name"""', ['name"'])
def test_quoted_single_w_quote_middle(self):
self._test('"na""me"', ['na"me'])
def test_dotted_quoted_left_w_dot_left_edge(self):
self._test('".Schema".name', [".Schema", "name"])
def test_dotted_quoted_left_w_dot_right_edge(self):
self._test('"Schema.".name', ["Schema.", "name"])
def test_dotted_quoted_left_w_dot_middle(self):
self._test('"Sch.ema".name', ["Sch.ema", "name"])
def test_dotted_quoted_right_w_dot_left_edge(self):
self._test('schema.".name"', ["schema", ".name"])
def test_dotted_quoted_right_w_dot_right_edge(self):
self._test('schema."name."', ["schema", "name."])
def test_dotted_quoted_right_w_dot_middle(self):
self._test('schema."na.me"', ["schema", "na.me"])
def test_quoted_single_w_dot_left_edge(self):
self._test('".name"', [".name"])
def test_quoted_single_w_dot_right_edge(self):
self._test('"name."', ["name."])
def test_quoted_single_w_dot_middle(self):
self._test('"na.me"', ["na.me"])
class BackslashReplaceTest(fixtures.TestBase):
def test_ascii_to_utf8(self):
eq_(
compat.decode_backslashreplace(util.b("hello world"), "utf-8"),
"hello world",
)
def test_utf8_to_utf8(self):
eq_(
compat.decode_backslashreplace(
"some message méil".encode(), "utf-8"
),
"some message méil",
)
def test_latin1_to_utf8(self):
eq_(
compat.decode_backslashreplace(
"some message méil".encode("latin-1"), "utf-8"
),
"some message m\\xe9il",
)
eq_(
compat.decode_backslashreplace(
"some message méil".encode("latin-1"), "latin-1"
),
"some message méil",
)
def test_cp1251_to_utf8(self):
message = "some message П".encode("cp1251")
eq_(message, b"some message \xcf")
eq_(
compat.decode_backslashreplace(message, "utf-8"),
"some message \\xcf",
)
eq_(
compat.decode_backslashreplace(message, "cp1251"),
"some message П",
)
class TestModuleRegistry(fixtures.TestBase):
def test_modules_are_loaded(self):
to_restore = []
for m in ("xml.dom", "wsgiref.simple_server"):
to_restore.append((m, sys.modules.pop(m, None)))
try:
mr = preloaded._ModuleRegistry()
ret = mr.preload_module(
"xml.dom", "wsgiref.simple_server", "sqlalchemy.sql.util"
)
o = object()
is_(ret(o), o)
is_false(hasattr(mr, "xml_dom"))
mr.import_prefix("xml")
is_true("xml.dom" in sys.modules)
is_(sys.modules["xml.dom"], mr.xml_dom)
is_true("wsgiref.simple_server" not in sys.modules)
mr.import_prefix("wsgiref")
is_true("wsgiref.simple_server" in sys.modules)
is_(sys.modules["wsgiref.simple_server"], mr.wsgiref_simple_server)
mr.import_prefix("sqlalchemy")
is_(sys.modules["sqlalchemy.sql.util"], mr.sql_util)
finally:
for name, mod in to_restore:
if mod is not None:
sys.modules[name] = mod
class MethodOveriddenTest(fixtures.TestBase):
def test_subclass_overrides_cls_given(self):
class Foo:
def bar(self):
pass
class Bar(Foo):
def bar(self):
pass
is_true(util.method_is_overridden(Bar, Foo.bar))
def test_subclass_overrides(self):
class Foo:
def bar(self):
pass
class Bar(Foo):
def bar(self):
pass
is_true(util.method_is_overridden(Bar(), Foo.bar))
def test_subclass_overrides_skiplevel(self):
class Foo:
def bar(self):
pass
class Bar(Foo):
pass
class Bat(Bar):
def bar(self):
pass
is_true(util.method_is_overridden(Bat(), Foo.bar))
def test_subclass_overrides_twolevels(self):
class Foo:
def bar(self):
pass
class Bar(Foo):
def bar(self):
pass
class Bat(Bar):
pass
is_true(util.method_is_overridden(Bat(), Foo.bar))
def test_subclass_doesnt_override_cls_given(self):
class Foo:
def bar(self):
pass
class Bar(Foo):
pass
is_false(util.method_is_overridden(Bar, Foo.bar))
def test_subclass_doesnt_override(self):
class Foo:
def bar(self):
pass
class Bar(Foo):
pass
is_false(util.method_is_overridden(Bar(), Foo.bar))
def test_subclass_overrides_multi_mro(self):
class Base:
pass
class Foo:
pass
class Bat(Base):
def bar(self):
pass
class HoHo(Foo, Bat):
def bar(self):
pass
is_true(util.method_is_overridden(HoHo(), Bat.bar))
class CyExtensionTest(fixtures.TestBase):
__requires__ = ("cextensions",)
def test_all_cyext_imported(self):
ext = _all_cython_modules()
lib_folder = (Path(__file__).parent / ".." / ".." / "lib").resolve()
sa_folder = lib_folder / "sqlalchemy"
cython_files = [f.resolve() for f in sa_folder.glob("**/*_cy.py")]
eq_(len(ext), len(cython_files))
names = {
".".join(f.relative_to(lib_folder).parts).replace(".py", "")
for f in cython_files
}
eq_({m.__name__ for m in ext}, set(names))
@testing.combinations(*_all_cython_modules())
def test_load_uncompiled_module(self, module):
is_true(module._is_compiled())
py_module = langhelpers.load_uncompiled_module(module)
is_false(py_module._is_compiled())
eq_(py_module.__name__, module.__name__)
eq_(py_module.__package__, module.__package__)
def test_setup_defines_all_files(self):
try:
import setuptools # noqa: F401
except ImportError:
testing.skip_test("setuptools is required")
with (
mock.patch("setuptools.setup", mock.MagicMock()),
mock.patch.dict(
"os.environ",
{"DISABLE_SQLALCHEMY_CEXT": "", "REQUIRE_SQLALCHEMY_CEXT": ""},
),
):
import setup
setup_modules = {f"sqlalchemy.{m}" for m in setup.CYTHON_MODULES}
expected = {e.__name__ for e in _all_cython_modules()}
print(expected)
print(setup_modules)
eq_(setup_modules, expected)
class TestTest(fixtures.TestBase):
"""Test of test things"""
@testing.variation("foo", ["foo", "bar", "baz"])
def test_variations(self, foo):
match foo:
case "foo":
is_true(foo.foo)
is_false(foo.bar)
case "bar":
is_true(foo.bar)
is_false(foo.foo)
case "baz":
is_true(foo.baz)
is_false(foo.foo)
case _:
foo.fail()