Files
sqlalchemy/test/base/test_events.py
T
Mike Bayer 576224edd0 - add workaround for sqlite memusage tests, so no longer need to count to 220/skip tests
- Fixed potential memory leak which could occur if an
arbitrary number of :class:`.sessionmaker` objects
were created.   The anonymous subclass created by
the sessionmaker, when dereferenced, would not be garbage
collected due to remaining class-level references from the
event package.  This issue also applies to any custom system
that made use of ad-hoc subclasses in conjunction with
an event dispatcher.  Also in 0.7.10. [ticket:2650]
2013-01-12 17:21:35 -05:00

682 lines
18 KiB
Python

"""Test event registration and listening."""
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
is_, is_not_
from sqlalchemy import event, exc
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.util import gc_collect
class EventsTest(fixtures.TestBase):
    """Exercise listener registration on a target class and its instances."""

    def setUp(self):
        # Precondition: these event names must not already be present in
        # the module-level registrar mapping.
        for event_name in ('event_one', 'event_two'):
            assert event_name not in event._registrars

        class TargetEvents(event.Events):
            def event_one(self, x, y):
                pass

            def event_two(self, x):
                pass

            def event_three(self, x):
                pass

        class Target(object):
            dispatch = event.dispatcher(TargetEvents)

        self.Target = Target

    def tearDown(self):
        # Fetch the raw descriptor from the class dict rather than via
        # attribute access, then unregister its events class.
        descriptor = self.Target.__dict__['dispatch']
        event._remove_dispatcher(descriptor.events)

    def test_register_class(self):
        def receiver(x, y):
            pass

        event.listen(self.Target, "event_one", receiver)

        # A class-level listener shows up on a fresh instance, but only
        # for the event it was registered against.
        eq_(len(self.Target().dispatch.event_one), 1)
        eq_(len(self.Target().dispatch.event_two), 0)

    def test_register_instance(self):
        def receiver(x, y):
            pass

        listened = self.Target()
        event.listen(listened, "event_one", receiver)

        # Only the instance that was listened on carries the listener.
        eq_(len(self.Target().dispatch.event_one), 0)
        eq_(len(listened.dispatch.event_one), 1)
        eq_(len(self.Target().dispatch.event_two), 0)
        eq_(len(listened.dispatch.event_two), 0)

    def test_bool_clslevel(self):
        def receiver(x, y):
            pass

        event.listen(self.Target, "event_one", receiver)

        # The instance-level collection is truthy when only a class-level
        # listener exists.
        assert self.Target().dispatch.event_one

    def test_register_class_instance(self):
        def class_receiver(x, y):
            pass

        def instance_receiver(x, y):
            pass

        event.listen(self.Target, "event_one", class_receiver)
        listened = self.Target()
        event.listen(listened, "event_one", instance_receiver)

        eq_(len(self.Target().dispatch.event_one), 1)
        eq_(len(listened.dispatch.event_one), 2)
        eq_(len(self.Target().dispatch.event_two), 0)
        eq_(len(listened.dispatch.event_two), 0)

        def late_class_receiver(x, y):
            pass

        # A class-level listener added afterwards is counted both on new
        # instances and on the instance created earlier.
        event.listen(self.Target, "event_one", late_class_receiver)
        eq_(len(self.Target().dispatch.event_one), 2)
        eq_(len(listened.dispatch.event_one), 3)

    def test_append_vs_insert(self):
        def first(x, y):
            pass

        def second(x, y):
            pass

        def prepended(x, y):
            pass

        event.listen(self.Target, "event_one", first)
        event.listen(self.Target, "event_one", second)
        event.listen(self.Target, "event_one", prepended, insert=True)

        # insert=True places the listener ahead of the appended ones.
        registered = list(self.Target().dispatch.event_one)
        eq_(registered, [prepended, first, second])

    def test_decorator(self):
        @event.listens_for(self.Target, "event_one")
        def single(x, y):
            pass

        # Stacked decorators register the same function for two events.
        @event.listens_for(self.Target, "event_two")
        @event.listens_for(self.Target, "event_three")
        def shared(x, y):
            pass

        eq_(list(self.Target().dispatch.event_one), [single])
        eq_(list(self.Target().dispatch.event_two), [shared])
        eq_(list(self.Target().dispatch.event_three), [shared])

    def test_no_instance_level_collections(self):
        @event.listens_for(self.Target, "event_one")
        def class_receiver(x, y):
            pass

        def shared_empty():
            # The per-class placeholder collection stored on the
            # class-level descriptor.
            return self.Target.dispatch.event_one._empty_listeners[
                self.Target]

        first = self.Target()
        second = self.Target()
        first.dispatch.event_one(5, 6)
        second.dispatch.event_one(5, 6)

        # After dispatching with class-level listeners only, the instance
        # holds the shared placeholder rather than its own collection.
        is_(first.dispatch.__dict__['event_one'], shared_empty())

        @event.listens_for(first, "event_one")
        def instance_receiver(x, y):
            pass

        # Listening on one instance replaces the placeholder for that
        # instance only.
        is_not_(first.dispatch.__dict__['event_one'], shared_empty())
        is_(second.dispatch.__dict__['event_one'], shared_empty())

    def test_immutable_methods(self):
        collection_owner = self.Target()
        mutator_names = ('exec_once', 'insert', 'append', 'remove', 'clear')

        # Collect every bound method up front, then verify that each one
        # refuses to run on the unmodified collection.
        mutators = [
            getattr(collection_owner.dispatch.event_one, name)
            for name in mutator_names
        ]
        for mutator in mutators:
            assert_raises_message(
                NotImplementedError,
                r"need to call for_modify\(\)",
                mutator
            )
class ClsLevelListenTest(fixtures.TestBase):
    """Test class-level listeners in combination with subclasses."""

    def setUp(self):
        class TargetEventsOne(event.Events):
            def event_one(self, x, y):
                pass

        class TargetOne(object):
            dispatch = event.dispatcher(TargetEventsOne)

        self.TargetOne = TargetOne

    # NOTE: the class previously defined tearDown twice with the same body;
    # the earlier definition was dead code and has been dropped.
    def tearDown(self):
        event._remove_dispatcher(
            self.TargetOne.__dict__['dispatch'].events)

    def test_lis_subcalss_lis(self):
        # Listen on the base, create a subclass, listen on the base again:
        # the subclass instance reports both listeners.
        @event.listens_for(self.TargetOne, "event_one")
        def handler1(x, y):
            # Never invoked by this test; only the listener count is
            # checked, so no output is needed here.
            pass

        class SubTarget(self.TargetOne):
            pass

        @event.listens_for(self.TargetOne, "event_one")
        def handler2(x, y):
            pass

        eq_(len(SubTarget().dispatch.event_one), 2)

    def test_lis_multisub_lis(self):
        # Same as above, with a two-level subclass chain.
        @event.listens_for(self.TargetOne, "event_one")
        def handler1(x, y):
            # Never invoked by this test; only the listener count is
            # checked, so no output is needed here.
            pass

        class SubTarget(self.TargetOne):
            pass

        class SubSubTarget(SubTarget):
            pass

        @event.listens_for(self.TargetOne, "event_one")
        def handler2(x, y):
            pass

        eq_(len(SubTarget().dispatch.event_one), 2)
        eq_(len(SubSubTarget().dispatch.event_one), 2)

    def test_two_sub_lis(self):
        class SubTarget1(self.TargetOne):
            pass

        class SubTarget2(self.TargetOne):
            pass

        @event.listens_for(self.TargetOne, "event_one")
        def handler1(x, y):
            pass

        @event.listens_for(SubTarget1, "event_one")
        def handler2(x, y):
            pass

        # A listener on the base reaches both subclasses; a listener on
        # one subclass must not reach its sibling.
        s1 = SubTarget1()
        assert handler1 in s1.dispatch.event_one
        assert handler2 in s1.dispatch.event_one

        s2 = SubTarget2()
        assert handler1 in s2.dispatch.event_one
        assert handler2 not in s2.dispatch.event_one
class AcceptTargetsTest(fixtures.TestBase):
    """Test default target acceptance."""

    def setUp(self):
        # Two unrelated event classes that both define "event_one".
        class TargetEventsOne(event.Events):
            def event_one(self, x, y):
                pass

        class TargetEventsTwo(event.Events):
            def event_one(self, x, y):
                pass

        class TargetOne(object):
            dispatch = event.dispatcher(TargetEventsOne)

        class TargetTwo(object):
            dispatch = event.dispatcher(TargetEventsTwo)

        self.TargetOne = TargetOne
        self.TargetTwo = TargetTwo

    def tearDown(self):
        for target_cls in (self.TargetOne, self.TargetTwo):
            event._remove_dispatcher(target_cls.__dict__['dispatch'].events)

    def test_target_accept(self):
        """Test that events of the same name are routed to the correct
        collection based on the type of target given.

        """
        def cls_one(x, y):
            pass

        def cls_two(x, y):
            pass

        def inst_one(x, y):
            pass

        def inst_two(x, y):
            pass

        # Class-level registration: each listener lands only on its own
        # target type.
        event.listen(self.TargetOne, "event_one", cls_one)
        event.listen(self.TargetTwo, "event_one", cls_two)

        eq_(list(self.TargetOne().dispatch.event_one), [cls_one])
        eq_(list(self.TargetTwo().dispatch.event_one), [cls_two])

        # Instance-level registration follows the class-level listener.
        one = self.TargetOne()
        two = self.TargetTwo()
        event.listen(one, "event_one", inst_one)
        event.listen(two, "event_one", inst_two)

        eq_(list(one.dispatch.event_one), [cls_one, inst_one])
        eq_(list(two.dispatch.event_one), [cls_two, inst_two])
class CustomTargetsTest(fixtures.TestBase):
    """Test custom target acceptance."""

    def setUp(self):
        class TargetEvents(event.Events):
            @classmethod
            def _accept_with(cls, target):
                # Only the literal string 'one' is accepted; it is mapped
                # onto the Target class defined below.
                return Target if target == 'one' else None

            def event_one(self, x, y):
                pass

        class Target(object):
            dispatch = event.dispatcher(TargetEvents)

        self.Target = Target

    def tearDown(self):
        descriptor = self.Target.__dict__['dispatch']
        event._remove_dispatcher(descriptor.events)

    def test_indirect(self):
        def receiver(x, y):
            pass

        # Listening on the accepted alias registers against Target.
        event.listen("one", "event_one", receiver)
        registered = list(self.Target().dispatch.event_one)
        eq_(registered, [receiver])

        # A target that no events class accepts (here: the function itself)
        # is rejected.
        assert_raises(
            exc.InvalidRequestError,
            event.listen,
            receiver, "event_one", self.Target
        )
class SubclassGrowthTest(fixtures.TestBase):
    """test that ad-hoc subclasses are garbage collected."""

    def setUp(self):
        class TargetEvents(event.Events):
            def some_event(self, x, y):
                pass

        class Target(object):
            dispatch = event.dispatcher(TargetEvents)

        self.Target = Target

    def tearDown(self):
        # Unregister the per-test events class, matching the cleanup done
        # by the other fixtures in this module, so it does not accumulate
        # in the event package's module-level state across tests.
        event._remove_dispatcher(self.Target.__dict__['dispatch'].events)

    def test_subclass(self):
        class SubTarget(self.Target):
            pass

        # Touch the dispatcher on an instance of the subclass so that any
        # class-level bookkeeping for SubTarget gets created.
        st = SubTarget()
        st.dispatch.some_event(1, 2)

        # Drop every local reference, force a collection, and verify the
        # base class no longer knows about the subclass.
        del st
        del SubTarget
        gc_collect()
        eq_(self.Target.__subclasses__(), [])
class ListenOverrideTest(fixtures.TestBase):
    """Test custom listen functions which change the listener function
    signature."""

    def setUp(self):
        class TargetEvents(event.Events):
            @classmethod
            def _listen(cls, target, identifier, fn, add=False):
                # By default the listener is registered untouched; with
                # add=True it is wrapped so it receives x + y as a single
                # argument.
                registered = fn
                if add:
                    def registered(x, y):
                        fn(x + y)

                event.Events._listen(target, identifier, registered)

            def event_one(self, x, y):
                pass

        class Target(object):
            dispatch = event.dispatcher(TargetEvents)

        self.Target = Target

    def tearDown(self):
        descriptor = self.Target.__dict__['dispatch']
        event._remove_dispatcher(descriptor.events)

    def test_listen_override(self):
        collected = []

        def takes_sum(total):
            collected.append(total)

        def takes_pair(x, y):
            collected.append((x, y))

        event.listen(self.Target, "event_one", takes_sum, add=True)
        event.listen(self.Target, "event_one", takes_pair)

        emitter = self.Target()
        for x, y in ((5, 7), (10, 5)):
            emitter.dispatch.event_one(x, y)

        # Each dispatch yields the adapted sum followed by the raw pair.
        eq_(collected, [12, (5, 7), 15, (10, 5)])
class PropagateTest(fixtures.TestBase):
    """Test that only listeners flagged ``propagate=True`` are copied by
    ``dispatch._update()``."""

    def setUp(self):
        class TargetEvents(event.Events):
            def event_one(self, arg):
                pass

            def event_two(self, arg):
                pass

        class Target(object):
            dispatch = event.dispatcher(TargetEvents)

        self.Target = Target

    def tearDown(self):
        # Unregister the per-test events class, as the other fixtures in
        # this module do.  Without this, "event_one"/"event_two" stay in
        # the event package's module-level state, and EventsTest.setUp
        # asserts that exactly those names are absent from
        # event._registrars.
        event._remove_dispatcher(self.Target.__dict__['dispatch'].events)

    def test_propagate(self):
        result = []

        def listen_one(target, arg):
            result.append((target, arg))

        def listen_two(target, arg):
            result.append((target, arg))

        t1 = self.Target()

        # Only the event_one listener is marked for propagation.
        event.listen(t1, "event_one", listen_one, propagate=True)
        event.listen(t1, "event_two", listen_two)

        t2 = self.Target()
        t2.dispatch._update(t1.dispatch)

        t2.dispatch.event_one(t2, 1)
        t2.dispatch.event_two(t2, 2)

        # t2 received the propagating listener but not the plain one.
        eq_(result, [(t2, 1)])
class JoinTest(fixtures.TestBase):
    """Test a child target whose dispatcher is joined to its parent's."""

    def setUp(self):
        class TargetEvents(event.Events):
            def event_one(self, target, arg):
                pass

        class BaseTarget(object):
            dispatch = event.dispatcher(TargetEvents)

        class TargetFactory(BaseTarget):
            def create(self):
                return TargetElement(self)

        class TargetElement(BaseTarget):
            def __init__(self, parent):
                # Replace this instance's dispatcher with one joined to
                # the parent's dispatcher.
                self.dispatch = self.dispatch._join(parent.dispatch)

            def run_event(self, arg):
                # Iterate the collection first, then invoke it.
                list(self.dispatch.event_one)
                self.dispatch.event_one(self, arg)

        self.BaseTarget = BaseTarget
        self.TargetFactory = TargetFactory
        self.TargetElement = TargetElement

    def tearDown(self):
        for target_cls in (self.TargetElement, self.TargetFactory,
                           self.BaseTarget):
            # Only classes that define 'dispatch' themselves own a
            # dispatcher to remove.
            if 'dispatch' not in target_cls.__dict__:
                continue
            event._remove_dispatcher(target_cls.__dict__['dispatch'].events)

    def _listener(self):
        """Return a ``(listener, calls)`` pair; the listener appends every
        ``(target, arg)`` it receives to ``calls``."""
        calls = []

        def record(target, arg):
            calls.append((target, arg))

        return record, calls

    def test_neither(self):
        # No listeners anywhere: dispatching must simply not fail.
        element = self.TargetFactory().create()
        for arg in (1, 2, 3):
            element.run_event(arg)

    def test_parent_class_only(self):
        listener, seen = self._listener()

        event.listen(self.TargetFactory, "event_one", listener)

        element = self.TargetFactory().create()
        for arg in (1, 2, 3):
            element.run_event(arg)

        eq_(seen, [(element, n) for n in (1, 2, 3)])

    def test_parent_class_child_class(self):
        parent_listener, parent_seen = self._listener()
        child_listener, child_seen = self._listener()

        event.listen(self.TargetFactory, "event_one", parent_listener)
        event.listen(self.TargetElement, "event_one", child_listener)

        element = self.TargetFactory().create()
        for arg in (1, 2, 3):
            element.run_event(arg)

        eq_(parent_seen, [(element, n) for n in (1, 2, 3)])
        eq_(child_seen, [(element, n) for n in (1, 2, 3)])

    def test_parent_class_child_instance_apply_after(self):
        parent_listener, parent_seen = self._listener()
        child_listener, child_seen = self._listener()

        event.listen(self.TargetFactory, "event_one", parent_listener)
        element = self.TargetFactory().create()

        # The first event fires before the instance listener is attached.
        element.run_event(1)

        event.listen(element, "event_one", child_listener)
        for arg in (2, 3):
            element.run_event(arg)

        eq_(parent_seen, [(element, n) for n in (1, 2, 3)])
        eq_(child_seen, [(element, n) for n in (2, 3)])

    def test_parent_class_child_instance_apply_before(self):
        parent_listener, parent_seen = self._listener()
        child_listener, child_seen = self._listener()

        event.listen(self.TargetFactory, "event_one", parent_listener)
        element = self.TargetFactory().create()

        event.listen(element, "event_one", child_listener)

        for arg in (1, 2, 3):
            element.run_event(arg)

        eq_(parent_seen, [(element, n) for n in (1, 2, 3)])
        eq_(child_seen, [(element, n) for n in (1, 2, 3)])

    def test_parent_instance_child_class_apply_before(self):
        parent_listener, parent_seen = self._listener()
        child_listener, child_seen = self._listener()

        event.listen(self.TargetElement, "event_one", child_listener)

        factory = self.TargetFactory()
        event.listen(factory, "event_one", parent_listener)

        element = factory.create()
        for arg in (1, 2, 3):
            element.run_event(arg)

        eq_(parent_seen, [(element, n) for n in (1, 2, 3)])
        eq_(child_seen, [(element, n) for n in (1, 2, 3)])

    def test_parent_instance_child_class_apply_after(self):
        parent_listener, parent_seen = self._listener()
        child_listener, child_seen = self._listener()

        event.listen(self.TargetElement, "event_one", child_listener)

        factory = self.TargetFactory()
        element = factory.create()

        element.run_event(1)

        event.listen(factory, "event_one", parent_listener)

        for arg in (2, 3):
            element.run_event(arg)

        # The parent listener sees nothing: _JoinedListener fixes the
        # "parent" at construction time, so a listener attached to the
        # factory afterwards is not picked up.  Making this "live" is
        # possible but would cost performance; in that case the expected
        # value would be [(element, 2), (element, 3)].
        eq_(parent_seen, [])
        eq_(child_seen, [(element, n) for n in (1, 2, 3)])

    def test_parent_instance_child_instance_apply_before(self):
        parent_listener, parent_seen = self._listener()
        child_listener, child_seen = self._listener()

        factory = self.TargetFactory()
        event.listen(factory, "event_one", parent_listener)

        element = factory.create()
        event.listen(element, "event_one", child_listener)

        for arg in (1, 2, 3):
            element.run_event(arg)

        eq_(parent_seen, [(element, n) for n in (1, 2, 3)])
        eq_(child_seen, [(element, n) for n in (1, 2, 3)])

    def test_parent_events_child_no_events(self):
        listener, seen = self._listener()
        factory = self.TargetFactory()

        event.listen(self.TargetElement, "event_one", listener)
        element = factory.create()

        for arg in (1, 2, 3):
            element.run_event(arg)

        eq_(seen, [(element, n) for n in (1, 2, 3)])