diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst index 2a2925726c..175eae66c4 100644 --- a/doc/build/changelog/changelog_08.rst +++ b/doc/build/changelog/changelog_08.rst @@ -6,6 +6,15 @@ .. changelog:: :version: 0.8.1 + .. change:: + :tags: bug, orm + :tickets: 2710 + + Fixed bug where many-to-many relationship with uselist=False + would fail to delete the association row and raise an error + if the scalar attribute were set to None. This was a + regression introduced by the changes for :ticket:`2229`. + .. change:: :tags: bug, orm :tickets: 2708 diff --git a/lib/sqlalchemy/orm/dependency.py b/lib/sqlalchemy/orm/dependency.py index 8236e69447..7bb1d97369 100644 --- a/lib/sqlalchemy/orm/dependency.py +++ b/lib/sqlalchemy/orm/dependency.py @@ -863,7 +863,7 @@ class DetectKeySwitch(DependencyProcessor): related = state.get_impl(self.key).get(state, dict_, passive=self._passive_update_flag) if related is not attributes.PASSIVE_NO_RESULT and \ - related is not None: + related is not None: related_state = attributes.instance_state(dict_[self.key]) if related_state in switchers: uowcommit.register_object(state, @@ -1127,11 +1127,15 @@ class ManyToManyDP(DependencyProcessor): def _synchronize(self, state, child, associationrow, clearkeys, uowcommit, operation): - if associationrow is None: - return + # this checks for None if uselist=True self._verify_canload(child) + # but if uselist=False we get here. If child is None, + # no association row can be generated, so return. + if child is None: + return False + if child is not None and not uowcommit.session._contains_state(child): if not child.deleted: util.warn( diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py index 005aed1cee..5f1ed76043 100644 --- a/test/dialect/test_postgresql.py +++ b/test/dialect/test_postgresql.py @@ -19,6 +19,7 @@ from sqlalchemy import exc, schema, types from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import HSTORE, hstore, array import decimal +from sqlalchemy import util from sqlalchemy.testing.util import round_decimal from sqlalchemy.sql import table, column, operators import logging @@ -2836,8 +2837,8 @@ class HStoreTest(fixtures.TestBase): dialect = default.DefaultDialect() proc = self.test_table.c.hash.type._cached_bind_processor(dialect) eq_( - proc({"key1": "value1", "key2": "value2"}), - '"key2"=>"value2", "key1"=>"value1"' + proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])), + '"key1"=>"value1", "key2"=>"value2"' ) def test_parse_error(self): @@ -2878,8 +2879,8 @@ class HStoreTest(fixtures.TestBase): dialect._has_native_hstore = False proc = self.test_table.c.hash.type._cached_bind_processor(dialect) eq_( - proc({"key1": "value1", "key2": "value2"}), - '"key2"=>"value2", "key1"=>"value1"' + proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])), + '"key1"=>"value1", "key2"=>"value2"' ) def test_result_deserialize_psycopg2(self): diff --git a/test/ext/test_associationproxy.py b/test/ext/test_associationproxy.py index 5bec0ad1db..c15b98bf2f 100644 --- a/test/ext/test_associationproxy.py +++ b/test/ext/test_associationproxy.py @@ -971,8 +971,10 @@ class ReconstitutionTest(fixtures.TestBase): p.kids.extend(['c1', 'c2']) r1 = pickle.loads(pickle.dumps(p)) assert r1.kids == ['c1', 'c2'] - r2 = pickle.loads(pickle.dumps(p.kids)) - assert r2 == ['c1', 'c2'] + + # can't do this without parent having a cycle + #r2 = pickle.loads(pickle.dumps(p.kids)) + #assert r2 == ['c1', 'c2'] def test_pickle_set(self): mapper(Parent, self.parents, @@ -983,8 +985,10 @@ class ReconstitutionTest(fixtures.TestBase): p.kids.update(['c1', 'c2']) r1 = pickle.loads(pickle.dumps(p)) assert r1.kids == set(['c1', 'c2']) - r2 = pickle.loads(pickle.dumps(p.kids)) - assert r2 == set(['c1', 'c2']) + + # can't do this without parent having a cycle + #r2 = pickle.loads(pickle.dumps(p.kids)) + #assert r2 == set(['c1', 'c2']) def test_pickle_dict(self): mapper(Parent, self.parents, @@ -997,8 +1001,10 @@ class ReconstitutionTest(fixtures.TestBase): assert p.kids == {'c1': 'c1', 'c2': 'c2'} r1 = pickle.loads(pickle.dumps(p)) assert r1.kids == {'c1': 'c1', 'c2': 'c2'} - r2 = pickle.loads(pickle.dumps(p.kids)) - assert r2 == {'c1': 'c1', 'c2': 'c2'} + + # can't do this without parent having a cycle + #r2 = pickle.loads(pickle.dumps(p.kids)) + #assert r2 == {'c1': 'c1', 'c2': 'c2'} class PickleKeyFunc(object): def __init__(self, name): diff --git a/test/orm/inheritance/test_basic.py b/test/orm/inheritance/test_basic.py index f883a07a7b..bbfa543830 100644 --- a/test/orm/inheritance/test_basic.py +++ b/test/orm/inheritance/test_basic.py @@ -1649,10 +1649,10 @@ class OptimizedLoadTest(fixtures.MappedTest): pass mapper(Base, base) - mapper(JoinBase, base.outerjoin(sub), properties={ - 'id': [base.c.id, sub.c.id], - 'counter': [base.c.counter, sub.c.counter] - }) + mapper(JoinBase, base.outerjoin(sub), properties=util.OrderedDict( + [('id', [base.c.id, sub.c.id]), + ('counter', [base.c.counter, sub.c.counter])]) + ) mapper(SubJoinBase, inherits=JoinBase) sess = Session() @@ -1672,9 +1672,9 @@ class OptimizedLoadTest(fixtures.MappedTest): testing.db, go, CompiledSQL( - "SELECT base.counter AS base_counter, " - "sub.counter AS sub_counter, base.id AS base_id, " - "sub.id AS sub_id, base.data AS base_data, " + "SELECT base.id AS base_id, sub.id AS sub_id, " + "base.counter AS base_counter, sub.counter AS sub_counter, " + "base.data AS base_data, " "base.type AS base_type, sub.sub AS sub_sub, " "sub.counter2 AS sub_counter2 FROM base " "LEFT OUTER JOIN sub ON base.id = sub.id " diff --git a/test/orm/test_instrumentation.py b/test/orm/test_instrumentation.py index 3f8fc67b68..3c564d910b 100644 --- a/test/orm/test_instrumentation.py +++ b/test/orm/test_instrumentation.py @@ -453,7 +453,7 @@ class MapperInitTest(fixtures.ORMTest): assert_raises_message( sa.exc.SAWarning, r"__del__\(\) method on class " - " will cause " + " will cause " "unreachable cycles and memory leaks, as SQLAlchemy " "instrumentation often creates reference cycles. " "Please remove this method.", diff --git a/test/orm/test_manytomany.py b/test/orm/test_manytomany.py index ca9c3c0959..3135258930 100644 --- a/test/orm/test_manytomany.py +++ b/test/orm/test_manytomany.py @@ -5,8 +5,8 @@ from sqlalchemy import testing from sqlalchemy import Integer, String, ForeignKey from sqlalchemy.testing.schema import Table from sqlalchemy.testing.schema import Column -from sqlalchemy.orm import mapper, relationship, create_session, \ - exc as orm_exc, sessionmaker +from sqlalchemy.orm import mapper, relationship, Session, \ + exc as orm_exc, sessionmaker, backref from sqlalchemy.testing import fixtures @@ -14,7 +14,7 @@ class M2MTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table('place', metadata, - Column('place_id', Integer, sa.Sequence('pid_seq', optional=True), + Column('place_id', Integer, test_needs_autoincrement=True, primary_key=True), Column('name', String(30), nullable=False), test_needs_acid=True, @@ -22,13 +22,13 @@ class M2MTest(fixtures.MappedTest): Table('transition', metadata, Column('transition_id', Integer, - sa.Sequence('tid_seq', optional=True), primary_key=True), + test_needs_autoincrement=True, primary_key=True), Column('name', String(30), nullable=False), test_needs_acid=True, ) Table('place_thingy', metadata, - Column('thingy_id', Integer, sa.Sequence('thid_seq', optional=True), + Column('thingy_id', Integer, test_needs_autoincrement=True, primary_key=True), Column('place_id', Integer, ForeignKey('place.place_id'), nullable=False), @@ -61,27 +61,18 @@ class M2MTest(fixtures.MappedTest): @classmethod def setup_classes(cls): class Place(cls.Basic): - def __init__(self, name=None): + def __init__(self, name): self.name = name - def __str__(self): - return "(Place '%s')" % self.name - __repr__ = __str__ class PlaceThingy(cls.Basic): - def __init__(self, name=None): + def __init__(self, name): self.name = name class Transition(cls.Basic): - def __init__(self, name=None): + def __init__(self, name): self.name = name - self.inputs = [] - self.outputs = [] - def __repr__(self): - return ' '.join((object.__repr__(self), - repr(self.inputs), - repr(self.outputs))) - def test_error(self): + def test_overlapping_attribute_error(self): place, Transition, place_input, Place, transition = (self.tables.place, self.classes.Transition, self.tables.place_input, @@ -89,16 +80,18 @@ class M2MTest(fixtures.MappedTest): self.tables.transition) mapper(Place, place, properties={ - 'transitions':relationship(Transition, secondary=place_input, backref='places') + 'transitions': relationship(Transition, + secondary=place_input, backref='places') }) mapper(Transition, transition, properties={ - 'places':relationship(Place, secondary=place_input, backref='transitions') + 'places': relationship(Place, + secondary=place_input, backref='transitions') }) - assert_raises_message(sa.exc.ArgumentError, "Error creating backref", - sa.orm.configure_mappers) + assert_raises_message(sa.exc.ArgumentError, + "property of that name exists", + sa.orm.configure_mappers) - def test_circular(self): - """test a many-to-many relationship from a table to itself.""" + def test_self_referential_roundtrip(self): place, Place, place_place = (self.tables.place, self.classes.Place, @@ -108,13 +101,13 @@ class M2MTest(fixtures.MappedTest): 'places': relationship( Place, secondary=place_place, - primaryjoin=place.c.place_id==place_place.c.pl1_id, - secondaryjoin=place.c.place_id==place_place.c.pl2_id, + primaryjoin=place.c.place_id == place_place.c.pl1_id, + secondaryjoin=place.c.place_id == place_place.c.pl2_id, order_by=place_place.c.pl2_id ) }) - sess = create_session() + sess = Session() p1 = Place('place1') p2 = Place('place2') p3 = Place('place3') @@ -131,30 +124,17 @@ class M2MTest(fixtures.MappedTest): p1.places.append(p5) p4.places.append(p3) p3.places.append(p4) - sess.flush() - - sess.expunge_all() - l = sess.query(Place).order_by(place.c.place_id).all() - (p1, p2, p3, p4, p5, p6, p7) = l - assert p1.places == [p2,p3,p5] - assert p5.places == [p6] - assert p7.places == [p1] - assert p6.places == [p1] - assert p4.places == [p3] - assert p3.places == [p4] - assert p2.places == [] - - for p in l: - pp = p.places - print "Place " + str(p) +" places " + repr(pp) - - [sess.delete(p) for p in p1,p2,p3,p4,p5,p6,p7] - sess.flush() - - def test_circular_mutation(self): - """Test that a mutation in a self-ref m2m of both sides succeeds.""" + sess.commit() + eq_(p1.places, [p2, p3, p5]) + eq_(p5.places, [p6]) + eq_(p7.places, [p1]) + eq_(p6.places, [p1]) + eq_(p4.places, [p3]) + eq_(p3.places, [p4]) + eq_(p2.places, []) + def test_self_referential_bidirectional_mutation(self): place, Place, place_place = (self.tables.place, self.classes.Place, self.tables.place_place) @@ -163,31 +143,32 @@ class M2MTest(fixtures.MappedTest): 'child_places': relationship( Place, secondary=place_place, - primaryjoin=place.c.place_id==place_place.c.pl1_id, - secondaryjoin=place.c.place_id==place_place.c.pl2_id, + primaryjoin=place.c.place_id == place_place.c.pl1_id, + secondaryjoin=place.c.place_id == place_place.c.pl2_id, order_by=place_place.c.pl2_id, backref='parent_places' ) }) - sess = create_session() + sess = Session() p1 = Place('place1') p2 = Place('place2') p2.parent_places = [p1] sess.add_all([p1, p2]) p1.parent_places.append(p2) - sess.flush() + sess.commit() - sess.expire_all() assert p1 in p2.parent_places assert p2 in p1.parent_places - def test_double(self): + def test_joinedload_on_double(self): """test that a mapper can have two eager relationships to the same table, via two different association tables. aliases are required.""" - place_input, transition, Transition, PlaceThingy, place, place_thingy, Place, place_output = (self.tables.place_input, + place_input, transition, Transition, PlaceThingy, \ + place, place_thingy, Place, \ + place_output = (self.tables.place_input, self.tables.transition, self.classes.Transition, self.classes.PlaceThingy, @@ -197,13 +178,14 @@ class M2MTest(fixtures.MappedTest): self.tables.place_output) - Place.mapper = mapper(Place, place, properties = { - 'thingies':relationship(mapper(PlaceThingy, place_thingy), lazy='joined') + mapper(PlaceThingy, place_thingy) + mapper(Place, place, properties={ + 'thingies': relationship(PlaceThingy, lazy='joined') }) - Transition.mapper = mapper(Transition, transition, properties = dict( - inputs = relationship(Place.mapper, place_output, lazy='joined'), - outputs = relationship(Place.mapper, place_input, lazy='joined'), + mapper(Transition, transition, properties=dict( + inputs=relationship(Place, place_output, lazy='joined'), + outputs=relationship(Place, place_input, lazy='joined'), ) ) @@ -211,30 +193,36 @@ class M2MTest(fixtures.MappedTest): tran.inputs.append(Place('place1')) tran.outputs.append(Place('place2')) tran.outputs.append(Place('place3')) - sess = create_session() + sess = Session() sess.add(tran) - sess.flush() + sess.commit() - sess.expunge_all() r = sess.query(Transition).all() self.assert_unordered_result(r, Transition, {'name': 'transition1', - 'inputs': (Place, [{'name':'place1'}]), - 'outputs': (Place, [{'name':'place2'}, {'name':'place3'}]) + 'inputs': (Place, [{'name': 'place1'}]), + 'outputs': (Place, [{'name': 'place2'}, {'name': 'place3'}]) }) def test_bidirectional(self): - place_input, transition, Transition, Place, place, place_output = (self.tables.place_input, + place_input, transition, Transition, Place, place, place_output = ( + self.tables.place_input, self.tables.transition, self.classes.Transition, self.classes.Place, self.tables.place, self.tables.place_output) - Place.mapper = mapper(Place, place) - Transition.mapper = mapper(Transition, transition, properties = dict( - inputs = relationship(Place.mapper, place_output, lazy='select', backref='inputs'), - outputs = relationship(Place.mapper, place_input, lazy='select', backref='outputs'), + mapper(Place, place) + mapper(Transition, transition, properties=dict( + inputs=relationship(Place, place_output, + backref=backref('inputs', + order_by=transition.c.transition_id), + order_by=Place.place_id), + outputs=relationship(Place, place_input, + backref=backref('outputs', + order_by=transition.c.transition_id), + order_by=Place.place_id), ) ) @@ -252,23 +240,29 @@ class M2MTest(fixtures.MappedTest): p2.inputs.append(t2) p3.inputs.append(t2) p1.outputs.append(t1) - sess = create_session() - sess.add_all((t1, t2, t3,p1, p2, p3)) - sess.flush() + sess = Session() + sess.add_all((t1, t2, t3, p1, p2, p3)) + sess.commit() - self.assert_result([t1], Transition, {'outputs': (Place, [{'name':'place3'}, {'name':'place1'}])}) - self.assert_result([p2], Place, {'inputs': (Transition, [{'name':'transition1'},{'name':'transition2'}])}) + self.assert_result([t1], + Transition, {'outputs': + (Place, [{'name': 'place3'}, {'name': 'place1'}])}) + self.assert_result([p2], + Place, {'inputs': + (Transition, [{'name': 'transition1'}, + {'name': 'transition2'}])}) @testing.requires.sane_multi_rowcount def test_stale_conditions(self): - Place, Transition, place_input, place, transition = (self.classes.Place, + Place, Transition, place_input, place, transition = ( + self.classes.Place, self.classes.Transition, self.tables.place_input, self.tables.place, self.tables.transition) mapper(Place, place, properties={ - 'transitions':relationship(Transition, secondary=place_input, + 'transitions': relationship(Transition, secondary=place_input, passive_updates=False) }) mapper(Transition, transition) @@ -305,219 +299,98 @@ class M2MTest(fixtures.MappedTest): sess.commit ) -class M2MTest2(fixtures.MappedTest): + +class AssortedPersistenceTests(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): - Table('student', metadata, - Column('name', String(20), primary_key=True)) + Table("left", metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(30)) + ) - Table('course', metadata, - Column('name', String(20), primary_key=True)) + Table("right", metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(30)), + ) - Table('enroll', metadata, - Column('student_id', String(20), ForeignKey('student.name'), - primary_key=True), - Column('course_id', String(20), ForeignKey('course.name'), - primary_key=True)) + Table('secondary', metadata, + Column('left_id', Integer, ForeignKey('left.id'), + primary_key=True), + Column('right_id', Integer, ForeignKey('right.id'), + primary_key=True), + ) @classmethod def setup_classes(cls): - class Student(cls.Basic): - def __init__(self, name=''): - self.name = name - class Course(cls.Basic): - def __init__(self, name=''): - self.name = name - - def test_circular(self): - course, enroll, Student, student, Course = (self.tables.course, - self.tables.enroll, - self.classes.Student, - self.tables.student, - self.classes.Course) - - - mapper(Student, student) - mapper(Course, course, properties={ - 'students': relationship(Student, enroll, backref='courses')}) - - sess = create_session() - s1 = Student('Student1') - c1 = Course('Course1') - c2 = Course('Course2') - c3 = Course('Course3') - s1.courses.append(c1) - s1.courses.append(c2) - c3.students.append(s1) - self.assert_(len(s1.courses) == 3) - self.assert_(len(c1.students) == 1) - sess.add(s1) - sess.flush() - sess.expunge_all() - s = sess.query(Student).filter_by(name='Student1').one() - c = sess.query(Course).filter_by(name='Course3').one() - self.assert_(len(s.courses) == 3) - del s.courses[1] - self.assert_(len(s.courses) == 2) - - def test_dupliates_raise(self): - """test constraint error is raised for dupe entries in a list""" - - course, enroll, Student, student, Course = (self.tables.course, - self.tables.enroll, - self.classes.Student, - self.tables.student, - self.classes.Course) - - - mapper(Student, student) - mapper(Course, course, properties={ - 'students': relationship(Student, enroll, backref='courses')}) - - sess = create_session() - s1 = Student("s1") - c1 = Course('c1') - s1.courses.append(c1) - s1.courses.append(c1) - sess.add(s1) - assert_raises(sa.exc.DBAPIError, sess.flush) - - def test_delete(self): - """A many-to-many table gets cleared out with deletion from the backref side""" - - course, enroll, Student, student, Course = (self.tables.course, - self.tables.enroll, - self.classes.Student, - self.tables.student, - self.classes.Course) - - - mapper(Student, student) - mapper(Course, course, properties = { - 'students': relationship(Student, enroll, lazy='select', - backref='courses')}) - - sess = create_session() - s1 = Student('Student1') - c1 = Course('Course1') - c2 = Course('Course2') - c3 = Course('Course3') - s1.courses.append(c1) - s1.courses.append(c2) - c3.students.append(s1) - sess.add(s1) - sess.flush() - sess.delete(s1) - sess.flush() - assert enroll.count().scalar() == 0 - -class M2MTest3(fixtures.MappedTest): - @classmethod - def define_tables(cls, metadata): - Table('c', metadata, - Column('c1', Integer, primary_key = True), - Column('c2', String(20))) - - Table('a', metadata, - Column('a1', Integer, primary_key=True), - Column('a2', String(20)), - Column('c1', Integer, ForeignKey('c.c1'))) - - Table('c2a1', metadata, - Column('c1', Integer, ForeignKey('c.c1')), - Column('a1', Integer, ForeignKey('a.a1'))) - - Table('c2a2', metadata, - Column('c1', Integer, ForeignKey('c.c1')), - Column('a1', Integer, ForeignKey('a.a1'))) - - Table('b', metadata, - Column('b1', Integer, primary_key=True), - Column('a1', Integer, ForeignKey('a.a1')), - Column('b2', sa.Boolean)) - - def test_basic(self): - a, c, b, c2a1, c2a2 = (self.tables.a, - self.tables.c, - self.tables.b, - self.tables.c2a1, - self.tables.c2a2) - - class C(object):pass - class A(object):pass - class B(object):pass - - mapper(B, b) - - mapper(A, a, properties={ - 'tbs': relationship(B, primaryjoin=sa.and_(b.c.a1 == a.c.a1, - b.c.b2 == True), - lazy='joined')}) - - mapper(C, c, properties={ - 'a1s': relationship(A, secondary=c2a1, lazy='joined'), - 'a2s': relationship(A, secondary=c2a2, lazy='joined')}) - - assert create_session().query(C).with_labels().statement is not None - - # TODO: seems like just a test for an ancient exception throw. - # how about some data/inserts/queries/assertions for this one - -class M2MTest4(fixtures.MappedTest): - @classmethod - def define_tables(cls, metadata): - table1 = Table("table1", metadata, - Column('col1', Integer, primary_key=True, test_needs_autoincrement=True), - Column('col2', String(30)) - ) - - table2 = Table("table2", metadata, - Column('col1', Integer, primary_key=True, test_needs_autoincrement=True), - Column('col2', String(30)), - ) - - table3 = Table('table3', metadata, - Column('t1', Integer, ForeignKey('table1.col1')), - Column('t2', Integer, ForeignKey('table2.col1')), - ) - - def test_delete_parent(self): - table2, table3, table1 = (self.tables.table2, - self.tables.table3, - self.tables.table1) - - class A(fixtures.ComparableEntity): + class A(cls.Comparable): pass - class B(fixtures.ComparableEntity): + class B(cls.Comparable): pass - mapper(A, table1, properties={ - 'bs':relationship(B, secondary=table3, backref='as', order_by=table3.c.t1) + def _standard_bidirectional_fixture(self): + left, secondary, right = self.tables.left, \ + self.tables.secondary, self.tables.right + A, B = self.classes.A, self.classes.B + mapper(A, left, properties={ + 'bs': relationship(B, secondary=secondary, + backref='as', order_by=right.c.id) }) - mapper(B, table2) + mapper(B, right) - sess = create_session() - a1 = A(col2='a1') - a2 = A(col2='a2') - b1 = B(col2='b1') - b2 = B(col2='b2') - a1.bs.append(b1) - a2.bs.append(b2) - for x in [a1,a2]: - sess.add(x) + def _bidirectional_onescalar_fixture(self): + left, secondary, right = self.tables.left, \ + self.tables.secondary, self.tables.right + A, B = self.classes.A, self.classes.B + mapper(A, left, properties={ + 'bs': relationship(B, secondary=secondary, + backref=backref('a', uselist=False), + order_by=right.c.id) + }) + mapper(B, right) + + def test_session_delete(self): + self._standard_bidirectional_fixture() + A, B = self.classes.A, self.classes.B + secondary = self.tables.secondary + + sess = Session() + sess.add_all([ + A(data='a1', bs=[B(data='b1')]), + A(data='a2', bs=[B(data='b2')]) + ]) + sess.commit() + + a1 = sess.query(A).filter_by(data='a1').one() + sess.delete(a1) sess.flush() - sess.expunge_all() + eq_(sess.query(secondary).count(), 1) - alist = sess.query(A).order_by(A.col1).all() - eq_( - [ - A(bs=[B(col2='b1')]), A(bs=[B(col2='b2')]) - ], - alist) - - for a in alist: - sess.delete(a) + a2 = sess.query(A).filter_by(data='a2').one() + sess.delete(a2) sess.flush() - eq_(sess.query(table3).count(), 0) + eq_(sess.query(secondary).count(), 0) + def test_remove_scalar(self): + # test setting a uselist=False to None + self._bidirectional_onescalar_fixture() + A, B = self.classes.A, self.classes.B + secondary = self.tables.secondary + sess = Session() + sess.add_all([ + A(data='a1', bs=[B(data='b1'), B(data='b2')]), + ]) + sess.commit() + + a1 = sess.query(A).filter_by(data='a1').one() + b2 = sess.query(B).filter_by(data='b2').one() + assert b2.a is a1 + + b2.a = None + sess.commit() + + eq_(a1.bs, [B(data='b1')]) + eq_(b2.a, None) + eq_(sess.query(secondary).count(), 1)