- merge default

This commit is contained in:
Mike Bayer
2013-04-21 11:48:01 -04:00
7 changed files with 191 additions and 298 deletions
+9
View File
@@ -6,6 +6,15 @@
.. changelog::
:version: 0.8.1
.. change::
:tags: bug, orm
:tickets: 2710
Fixed bug where many-to-many relationship with uselist=False
would fail to delete the association row and raise an error
if the scalar attribute were set to None. This was a
regression introduced by the changes for :ticket:`2229`.
.. change::
:tags: bug, orm
:tickets: 2708
+7 -3
View File
@@ -863,7 +863,7 @@ class DetectKeySwitch(DependencyProcessor):
related = state.get_impl(self.key).get(state, dict_,
passive=self._passive_update_flag)
if related is not attributes.PASSIVE_NO_RESULT and \
related is not None:
related is not None:
related_state = attributes.instance_state(dict_[self.key])
if related_state in switchers:
uowcommit.register_object(state,
@@ -1127,11 +1127,15 @@ class ManyToManyDP(DependencyProcessor):
def _synchronize(self, state, child, associationrow,
clearkeys, uowcommit, operation):
if associationrow is None:
return
# this checks for None if uselist=True
self._verify_canload(child)
# but if uselist=False we get here. If child is None,
# no association row can be generated, so return.
if child is None:
return False
if child is not None and not uowcommit.session._contains_state(child):
if not child.deleted:
util.warn(
+5 -4
View File
@@ -19,6 +19,7 @@ from sqlalchemy import exc, schema, types
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import HSTORE, hstore, array
import decimal
from sqlalchemy import util
from sqlalchemy.testing.util import round_decimal
from sqlalchemy.sql import table, column, operators
import logging
@@ -2836,8 +2837,8 @@ class HStoreTest(fixtures.TestBase):
dialect = default.DefaultDialect()
proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
eq_(
proc({"key1": "value1", "key2": "value2"}),
'"key2"=>"value2", "key1"=>"value1"'
proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
'"key1"=>"value1", "key2"=>"value2"'
)
def test_parse_error(self):
@@ -2878,8 +2879,8 @@ class HStoreTest(fixtures.TestBase):
dialect._has_native_hstore = False
proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
eq_(
proc({"key1": "value1", "key2": "value2"}),
'"key2"=>"value2", "key1"=>"value1"'
proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
'"key1"=>"value1", "key2"=>"value2"'
)
def test_result_deserialize_psycopg2(self):
+12 -6
View File
@@ -971,8 +971,10 @@ class ReconstitutionTest(fixtures.TestBase):
p.kids.extend(['c1', 'c2'])
r1 = pickle.loads(pickle.dumps(p))
assert r1.kids == ['c1', 'c2']
r2 = pickle.loads(pickle.dumps(p.kids))
assert r2 == ['c1', 'c2']
# can't do this without parent having a cycle
#r2 = pickle.loads(pickle.dumps(p.kids))
#assert r2 == ['c1', 'c2']
def test_pickle_set(self):
mapper(Parent, self.parents,
@@ -983,8 +985,10 @@ class ReconstitutionTest(fixtures.TestBase):
p.kids.update(['c1', 'c2'])
r1 = pickle.loads(pickle.dumps(p))
assert r1.kids == set(['c1', 'c2'])
r2 = pickle.loads(pickle.dumps(p.kids))
assert r2 == set(['c1', 'c2'])
# can't do this without parent having a cycle
#r2 = pickle.loads(pickle.dumps(p.kids))
#assert r2 == set(['c1', 'c2'])
def test_pickle_dict(self):
mapper(Parent, self.parents,
@@ -997,8 +1001,10 @@ class ReconstitutionTest(fixtures.TestBase):
assert p.kids == {'c1': 'c1', 'c2': 'c2'}
r1 = pickle.loads(pickle.dumps(p))
assert r1.kids == {'c1': 'c1', 'c2': 'c2'}
r2 = pickle.loads(pickle.dumps(p.kids))
assert r2 == {'c1': 'c1', 'c2': 'c2'}
# can't do this without parent having a cycle
#r2 = pickle.loads(pickle.dumps(p.kids))
#assert r2 == {'c1': 'c1', 'c2': 'c2'}
class PickleKeyFunc(object):
def __init__(self, name):
+7 -7
View File
@@ -1649,10 +1649,10 @@ class OptimizedLoadTest(fixtures.MappedTest):
pass
mapper(Base, base)
mapper(JoinBase, base.outerjoin(sub), properties={
'id': [base.c.id, sub.c.id],
'counter': [base.c.counter, sub.c.counter]
})
mapper(JoinBase, base.outerjoin(sub), properties=util.OrderedDict(
[('id', [base.c.id, sub.c.id]),
('counter', [base.c.counter, sub.c.counter])])
)
mapper(SubJoinBase, inherits=JoinBase)
sess = Session()
@@ -1672,9 +1672,9 @@ class OptimizedLoadTest(fixtures.MappedTest):
testing.db,
go,
CompiledSQL(
"SELECT base.counter AS base_counter, "
"sub.counter AS sub_counter, base.id AS base_id, "
"sub.id AS sub_id, base.data AS base_data, "
"SELECT base.id AS base_id, sub.id AS sub_id, "
"base.counter AS base_counter, sub.counter AS sub_counter, "
"base.data AS base_data, "
"base.type AS base_type, sub.sub AS sub_sub, "
"sub.counter2 AS sub_counter2 FROM base "
"LEFT OUTER JOIN sub ON base.id = sub.id "
+1 -1
View File
@@ -453,7 +453,7 @@ class MapperInitTest(fixtures.ORMTest):
assert_raises_message(
sa.exc.SAWarning,
r"__del__\(\) method on class "
"<class 'test.orm.test_instrumentation.A'> will cause "
"<class '.*\.A'> will cause "
"unreachable cycles and memory leaks, as SQLAlchemy "
"instrumentation often creates reference cycles. "
"Please remove this method.",
+150 -277
View File
@@ -5,8 +5,8 @@ from sqlalchemy import testing
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.schema import Column
from sqlalchemy.orm import mapper, relationship, create_session, \
exc as orm_exc, sessionmaker
from sqlalchemy.orm import mapper, relationship, Session, \
exc as orm_exc, sessionmaker, backref
from sqlalchemy.testing import fixtures
@@ -14,7 +14,7 @@ class M2MTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('place', metadata,
Column('place_id', Integer, sa.Sequence('pid_seq', optional=True),
Column('place_id', Integer, test_needs_autoincrement=True,
primary_key=True),
Column('name', String(30), nullable=False),
test_needs_acid=True,
@@ -22,13 +22,13 @@ class M2MTest(fixtures.MappedTest):
Table('transition', metadata,
Column('transition_id', Integer,
sa.Sequence('tid_seq', optional=True), primary_key=True),
test_needs_autoincrement=True, primary_key=True),
Column('name', String(30), nullable=False),
test_needs_acid=True,
)
Table('place_thingy', metadata,
Column('thingy_id', Integer, sa.Sequence('thid_seq', optional=True),
Column('thingy_id', Integer, test_needs_autoincrement=True,
primary_key=True),
Column('place_id', Integer, ForeignKey('place.place_id'),
nullable=False),
@@ -61,27 +61,18 @@ class M2MTest(fixtures.MappedTest):
@classmethod
def setup_classes(cls):
class Place(cls.Basic):
def __init__(self, name=None):
def __init__(self, name):
self.name = name
def __str__(self):
return "(Place '%s')" % self.name
__repr__ = __str__
class PlaceThingy(cls.Basic):
def __init__(self, name=None):
def __init__(self, name):
self.name = name
class Transition(cls.Basic):
def __init__(self, name=None):
def __init__(self, name):
self.name = name
self.inputs = []
self.outputs = []
def __repr__(self):
return ' '.join((object.__repr__(self),
repr(self.inputs),
repr(self.outputs)))
def test_error(self):
def test_overlapping_attribute_error(self):
place, Transition, place_input, Place, transition = (self.tables.place,
self.classes.Transition,
self.tables.place_input,
@@ -89,16 +80,18 @@ class M2MTest(fixtures.MappedTest):
self.tables.transition)
mapper(Place, place, properties={
'transitions':relationship(Transition, secondary=place_input, backref='places')
'transitions': relationship(Transition,
secondary=place_input, backref='places')
})
mapper(Transition, transition, properties={
'places':relationship(Place, secondary=place_input, backref='transitions')
'places': relationship(Place,
secondary=place_input, backref='transitions')
})
assert_raises_message(sa.exc.ArgumentError, "Error creating backref",
sa.orm.configure_mappers)
assert_raises_message(sa.exc.ArgumentError,
"property of that name exists",
sa.orm.configure_mappers)
def test_circular(self):
"""test a many-to-many relationship from a table to itself."""
def test_self_referential_roundtrip(self):
place, Place, place_place = (self.tables.place,
self.classes.Place,
@@ -108,13 +101,13 @@ class M2MTest(fixtures.MappedTest):
'places': relationship(
Place,
secondary=place_place,
primaryjoin=place.c.place_id==place_place.c.pl1_id,
secondaryjoin=place.c.place_id==place_place.c.pl2_id,
primaryjoin=place.c.place_id == place_place.c.pl1_id,
secondaryjoin=place.c.place_id == place_place.c.pl2_id,
order_by=place_place.c.pl2_id
)
})
sess = create_session()
sess = Session()
p1 = Place('place1')
p2 = Place('place2')
p3 = Place('place3')
@@ -131,30 +124,17 @@ class M2MTest(fixtures.MappedTest):
p1.places.append(p5)
p4.places.append(p3)
p3.places.append(p4)
sess.flush()
sess.expunge_all()
l = sess.query(Place).order_by(place.c.place_id).all()
(p1, p2, p3, p4, p5, p6, p7) = l
assert p1.places == [p2,p3,p5]
assert p5.places == [p6]
assert p7.places == [p1]
assert p6.places == [p1]
assert p4.places == [p3]
assert p3.places == [p4]
assert p2.places == []
for p in l:
pp = p.places
print "Place " + str(p) +" places " + repr(pp)
[sess.delete(p) for p in p1,p2,p3,p4,p5,p6,p7]
sess.flush()
def test_circular_mutation(self):
"""Test that a mutation in a self-ref m2m of both sides succeeds."""
sess.commit()
eq_(p1.places, [p2, p3, p5])
eq_(p5.places, [p6])
eq_(p7.places, [p1])
eq_(p6.places, [p1])
eq_(p4.places, [p3])
eq_(p3.places, [p4])
eq_(p2.places, [])
def test_self_referential_bidirectional_mutation(self):
place, Place, place_place = (self.tables.place,
self.classes.Place,
self.tables.place_place)
@@ -163,31 +143,32 @@ class M2MTest(fixtures.MappedTest):
'child_places': relationship(
Place,
secondary=place_place,
primaryjoin=place.c.place_id==place_place.c.pl1_id,
secondaryjoin=place.c.place_id==place_place.c.pl2_id,
primaryjoin=place.c.place_id == place_place.c.pl1_id,
secondaryjoin=place.c.place_id == place_place.c.pl2_id,
order_by=place_place.c.pl2_id,
backref='parent_places'
)
})
sess = create_session()
sess = Session()
p1 = Place('place1')
p2 = Place('place2')
p2.parent_places = [p1]
sess.add_all([p1, p2])
p1.parent_places.append(p2)
sess.flush()
sess.commit()
sess.expire_all()
assert p1 in p2.parent_places
assert p2 in p1.parent_places
def test_double(self):
def test_joinedload_on_double(self):
"""test that a mapper can have two eager relationships to the same table, via
two different association tables. aliases are required."""
place_input, transition, Transition, PlaceThingy, place, place_thingy, Place, place_output = (self.tables.place_input,
place_input, transition, Transition, PlaceThingy, \
place, place_thingy, Place, \
place_output = (self.tables.place_input,
self.tables.transition,
self.classes.Transition,
self.classes.PlaceThingy,
@@ -197,13 +178,14 @@ class M2MTest(fixtures.MappedTest):
self.tables.place_output)
Place.mapper = mapper(Place, place, properties = {
'thingies':relationship(mapper(PlaceThingy, place_thingy), lazy='joined')
mapper(PlaceThingy, place_thingy)
mapper(Place, place, properties={
'thingies': relationship(PlaceThingy, lazy='joined')
})
Transition.mapper = mapper(Transition, transition, properties = dict(
inputs = relationship(Place.mapper, place_output, lazy='joined'),
outputs = relationship(Place.mapper, place_input, lazy='joined'),
mapper(Transition, transition, properties=dict(
inputs=relationship(Place, place_output, lazy='joined'),
outputs=relationship(Place, place_input, lazy='joined'),
)
)
@@ -211,30 +193,36 @@ class M2MTest(fixtures.MappedTest):
tran.inputs.append(Place('place1'))
tran.outputs.append(Place('place2'))
tran.outputs.append(Place('place3'))
sess = create_session()
sess = Session()
sess.add(tran)
sess.flush()
sess.commit()
sess.expunge_all()
r = sess.query(Transition).all()
self.assert_unordered_result(r, Transition,
{'name': 'transition1',
'inputs': (Place, [{'name':'place1'}]),
'outputs': (Place, [{'name':'place2'}, {'name':'place3'}])
'inputs': (Place, [{'name': 'place1'}]),
'outputs': (Place, [{'name': 'place2'}, {'name': 'place3'}])
})
def test_bidirectional(self):
place_input, transition, Transition, Place, place, place_output = (self.tables.place_input,
place_input, transition, Transition, Place, place, place_output = (
self.tables.place_input,
self.tables.transition,
self.classes.Transition,
self.classes.Place,
self.tables.place,
self.tables.place_output)
Place.mapper = mapper(Place, place)
Transition.mapper = mapper(Transition, transition, properties = dict(
inputs = relationship(Place.mapper, place_output, lazy='select', backref='inputs'),
outputs = relationship(Place.mapper, place_input, lazy='select', backref='outputs'),
mapper(Place, place)
mapper(Transition, transition, properties=dict(
inputs=relationship(Place, place_output,
backref=backref('inputs',
order_by=transition.c.transition_id),
order_by=Place.place_id),
outputs=relationship(Place, place_input,
backref=backref('outputs',
order_by=transition.c.transition_id),
order_by=Place.place_id),
)
)
@@ -252,23 +240,29 @@ class M2MTest(fixtures.MappedTest):
p2.inputs.append(t2)
p3.inputs.append(t2)
p1.outputs.append(t1)
sess = create_session()
sess.add_all((t1, t2, t3,p1, p2, p3))
sess.flush()
sess = Session()
sess.add_all((t1, t2, t3, p1, p2, p3))
sess.commit()
self.assert_result([t1], Transition, {'outputs': (Place, [{'name':'place3'}, {'name':'place1'}])})
self.assert_result([p2], Place, {'inputs': (Transition, [{'name':'transition1'},{'name':'transition2'}])})
self.assert_result([t1],
Transition, {'outputs':
(Place, [{'name': 'place3'}, {'name': 'place1'}])})
self.assert_result([p2],
Place, {'inputs':
(Transition, [{'name': 'transition1'},
{'name': 'transition2'}])})
@testing.requires.sane_multi_rowcount
def test_stale_conditions(self):
Place, Transition, place_input, place, transition = (self.classes.Place,
Place, Transition, place_input, place, transition = (
self.classes.Place,
self.classes.Transition,
self.tables.place_input,
self.tables.place,
self.tables.transition)
mapper(Place, place, properties={
'transitions':relationship(Transition, secondary=place_input,
'transitions': relationship(Transition, secondary=place_input,
passive_updates=False)
})
mapper(Transition, transition)
@@ -305,219 +299,98 @@ class M2MTest(fixtures.MappedTest):
sess.commit
)
class M2MTest2(fixtures.MappedTest):
class AssortedPersistenceTests(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('student', metadata,
Column('name', String(20), primary_key=True))
Table("left", metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(30))
)
Table('course', metadata,
Column('name', String(20), primary_key=True))
Table("right", metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(30)),
)
Table('enroll', metadata,
Column('student_id', String(20), ForeignKey('student.name'),
primary_key=True),
Column('course_id', String(20), ForeignKey('course.name'),
primary_key=True))
Table('secondary', metadata,
Column('left_id', Integer, ForeignKey('left.id'),
primary_key=True),
Column('right_id', Integer, ForeignKey('right.id'),
primary_key=True),
)
@classmethod
def setup_classes(cls):
class Student(cls.Basic):
def __init__(self, name=''):
self.name = name
class Course(cls.Basic):
def __init__(self, name=''):
self.name = name
def test_circular(self):
course, enroll, Student, student, Course = (self.tables.course,
self.tables.enroll,
self.classes.Student,
self.tables.student,
self.classes.Course)
mapper(Student, student)
mapper(Course, course, properties={
'students': relationship(Student, enroll, backref='courses')})
sess = create_session()
s1 = Student('Student1')
c1 = Course('Course1')
c2 = Course('Course2')
c3 = Course('Course3')
s1.courses.append(c1)
s1.courses.append(c2)
c3.students.append(s1)
self.assert_(len(s1.courses) == 3)
self.assert_(len(c1.students) == 1)
sess.add(s1)
sess.flush()
sess.expunge_all()
s = sess.query(Student).filter_by(name='Student1').one()
c = sess.query(Course).filter_by(name='Course3').one()
self.assert_(len(s.courses) == 3)
del s.courses[1]
self.assert_(len(s.courses) == 2)
def test_dupliates_raise(self):
"""test constraint error is raised for dupe entries in a list"""
course, enroll, Student, student, Course = (self.tables.course,
self.tables.enroll,
self.classes.Student,
self.tables.student,
self.classes.Course)
mapper(Student, student)
mapper(Course, course, properties={
'students': relationship(Student, enroll, backref='courses')})
sess = create_session()
s1 = Student("s1")
c1 = Course('c1')
s1.courses.append(c1)
s1.courses.append(c1)
sess.add(s1)
assert_raises(sa.exc.DBAPIError, sess.flush)
def test_delete(self):
"""A many-to-many table gets cleared out with deletion from the backref side"""
course, enroll, Student, student, Course = (self.tables.course,
self.tables.enroll,
self.classes.Student,
self.tables.student,
self.classes.Course)
mapper(Student, student)
mapper(Course, course, properties = {
'students': relationship(Student, enroll, lazy='select',
backref='courses')})
sess = create_session()
s1 = Student('Student1')
c1 = Course('Course1')
c2 = Course('Course2')
c3 = Course('Course3')
s1.courses.append(c1)
s1.courses.append(c2)
c3.students.append(s1)
sess.add(s1)
sess.flush()
sess.delete(s1)
sess.flush()
assert enroll.count().scalar() == 0
class M2MTest3(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('c', metadata,
Column('c1', Integer, primary_key = True),
Column('c2', String(20)))
Table('a', metadata,
Column('a1', Integer, primary_key=True),
Column('a2', String(20)),
Column('c1', Integer, ForeignKey('c.c1')))
Table('c2a1', metadata,
Column('c1', Integer, ForeignKey('c.c1')),
Column('a1', Integer, ForeignKey('a.a1')))
Table('c2a2', metadata,
Column('c1', Integer, ForeignKey('c.c1')),
Column('a1', Integer, ForeignKey('a.a1')))
Table('b', metadata,
Column('b1', Integer, primary_key=True),
Column('a1', Integer, ForeignKey('a.a1')),
Column('b2', sa.Boolean))
def test_basic(self):
a, c, b, c2a1, c2a2 = (self.tables.a,
self.tables.c,
self.tables.b,
self.tables.c2a1,
self.tables.c2a2)
class C(object):pass
class A(object):pass
class B(object):pass
mapper(B, b)
mapper(A, a, properties={
'tbs': relationship(B, primaryjoin=sa.and_(b.c.a1 == a.c.a1,
b.c.b2 == True),
lazy='joined')})
mapper(C, c, properties={
'a1s': relationship(A, secondary=c2a1, lazy='joined'),
'a2s': relationship(A, secondary=c2a2, lazy='joined')})
assert create_session().query(C).with_labels().statement is not None
# TODO: seems like just a test for an ancient exception throw.
# how about some data/inserts/queries/assertions for this one
class M2MTest4(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
table1 = Table("table1", metadata,
Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
Column('col2', String(30))
)
table2 = Table("table2", metadata,
Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
Column('col2', String(30)),
)
table3 = Table('table3', metadata,
Column('t1', Integer, ForeignKey('table1.col1')),
Column('t2', Integer, ForeignKey('table2.col1')),
)
def test_delete_parent(self):
table2, table3, table1 = (self.tables.table2,
self.tables.table3,
self.tables.table1)
class A(fixtures.ComparableEntity):
class A(cls.Comparable):
pass
class B(fixtures.ComparableEntity):
class B(cls.Comparable):
pass
mapper(A, table1, properties={
'bs':relationship(B, secondary=table3, backref='as', order_by=table3.c.t1)
def _standard_bidirectional_fixture(self):
left, secondary, right = self.tables.left, \
self.tables.secondary, self.tables.right
A, B = self.classes.A, self.classes.B
mapper(A, left, properties={
'bs': relationship(B, secondary=secondary,
backref='as', order_by=right.c.id)
})
mapper(B, table2)
mapper(B, right)
sess = create_session()
a1 = A(col2='a1')
a2 = A(col2='a2')
b1 = B(col2='b1')
b2 = B(col2='b2')
a1.bs.append(b1)
a2.bs.append(b2)
for x in [a1,a2]:
sess.add(x)
def _bidirectional_onescalar_fixture(self):
left, secondary, right = self.tables.left, \
self.tables.secondary, self.tables.right
A, B = self.classes.A, self.classes.B
mapper(A, left, properties={
'bs': relationship(B, secondary=secondary,
backref=backref('a', uselist=False),
order_by=right.c.id)
})
mapper(B, right)
def test_session_delete(self):
self._standard_bidirectional_fixture()
A, B = self.classes.A, self.classes.B
secondary = self.tables.secondary
sess = Session()
sess.add_all([
A(data='a1', bs=[B(data='b1')]),
A(data='a2', bs=[B(data='b2')])
])
sess.commit()
a1 = sess.query(A).filter_by(data='a1').one()
sess.delete(a1)
sess.flush()
sess.expunge_all()
eq_(sess.query(secondary).count(), 1)
alist = sess.query(A).order_by(A.col1).all()
eq_(
[
A(bs=[B(col2='b1')]), A(bs=[B(col2='b2')])
],
alist)
for a in alist:
sess.delete(a)
a2 = sess.query(A).filter_by(data='a2').one()
sess.delete(a2)
sess.flush()
eq_(sess.query(table3).count(), 0)
eq_(sess.query(secondary).count(), 0)
def test_remove_scalar(self):
# test setting a uselist=False to None
self._bidirectional_onescalar_fixture()
A, B = self.classes.A, self.classes.B
secondary = self.tables.secondary
sess = Session()
sess.add_all([
A(data='a1', bs=[B(data='b1'), B(data='b2')]),
])
sess.commit()
a1 = sess.query(A).filter_by(data='a1').one()
b2 = sess.query(B).filter_by(data='b2').one()
assert b2.a is a1
b2.a = None
sess.commit()
eq_(a1.bs, [B(data='b1')])
eq_(b2.a, None)
eq_(sess.query(secondary).count(), 1)