- Fixed bugs in ORM object comparisons where comparison of

many-to-one ``!= None`` would fail if the source were an aliased
class, or if the query needed to apply special aliasing to the
expression due to aliased joins or polymorphic querying; also fixed
bug in the case where comparing a many-to-one to an object state
would fail if the query needed to apply special aliasing
due to aliased joins or polymorphic querying.
fixes #3310
This commit is contained in:
Mike Bayer
2015-02-20 15:14:08 -05:00
parent 4f63e24970
commit 305ea84004
4 changed files with 171 additions and 13 deletions
+12
View File
@@ -14,6 +14,18 @@
.. changelog::
:version: 0.9.9
.. change::
:tags: bug, orm
:tickets: 3310
Fixed bugs in ORM object comparisons where comparison of
many-to-one ``!= None`` would fail if the source were an aliased
class, or if the query needed to apply special aliasing to the
expression due to aliased joins or polymorphic querying; also fixed
bug in the case where comparing a many-to-one to an object state
would fail if the query needed to apply special aliasing
due to aliased joins or polymorphic querying.
.. change::
:tags: bug, orm
:tickets: 3309
+4 -3
View File
@@ -1291,8 +1291,9 @@ class RelationshipProperty(StrategizedProperty):
"""
if isinstance(other, (util.NoneType, expression.Null)):
if self.property.direction == MANYTOONE:
return sql.or_(*[x != None for x in
self.property._calculated_foreign_keys])
return _orm_annotate(~self.property._optimized_compare(
None, adapt_source=self.adapter))
else:
return self._criterion_exists()
elif self.property.uselist:
@@ -1301,7 +1302,7 @@ class RelationshipProperty(StrategizedProperty):
" to an object or collection; use "
"contains() to test for membership.")
else:
return self.__negated_contains_or_equals(other)
return _orm_annotate(self.__negated_contains_or_equals(other))
@util.memoized_property
def property(self):
@@ -328,6 +328,7 @@ class SelfReferentialJ2JSelfTest(fixtures.MappedTest):
def test_relationship_compare(self):
sess = self._five_obj_fixture()
e1 = sess.query(Engineer).filter_by(name='e1').one()
e2 = sess.query(Engineer).filter_by(name='e2').one()
eq_(sess.query(Engineer)
.join(Engineer.engineers, aliased=True)
@@ -339,6 +340,11 @@ class SelfReferentialJ2JSelfTest(fixtures.MappedTest):
.filter(Engineer.reports_to == e1).all(),
[e1])
eq_(sess.query(Engineer)
.join(Engineer.engineers, aliased=True)
.filter(Engineer.reports_to != None).all(),
[e1, e2])
class M2MFilterTest(fixtures.MappedTest):
run_setup_mappers = 'once'
+149 -10
View File
@@ -780,6 +780,18 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
clause = sess.query(entity).filter(clause)
self.assert_compile(clause, expected)
def _test_filter_aliases(self, clause, expected, from_, onclause):
dialect = default.DefaultDialect()
sess = Session()
lead = sess.query(from_).join(onclause, aliased=True)
full = lead.filter(clause)
context = lead._compile_context()
context.statement.use_labels = True
lead = context.statement.compile(dialect=dialect)
expected = (str(lead) + " WHERE " + expected).replace("\n", "")
self.assert_compile(full, expected)
def test_arithmetic(self):
User = self.classes.User
@@ -798,7 +810,7 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
(literal(5), 'b', ':param_1 %s :param_2'),
(literal(5), User.id, ':param_1 %s users.id'),
(literal(5), literal(6), ':param_1 %s :param_2'),
):
):
self._test(py_op(lhs, rhs), res % sql_op)
def test_comparison(self):
@@ -826,7 +838,7 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
(User.id, ualias.name, 'users.id', 'users_1.name'),
(User.name, ualias.name, 'users.name', 'users_1.name'),
(ualias.name, User.name, 'users_1.name', 'users.name'),
):
):
# the compiled clause should match either (e.g.):
# 'a' < 'b' -or- 'b' > 'a'.
@@ -839,18 +851,70 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
"\n'" + compiled + "'\n does not match\n'" +
fwd_sql + "'\n or\n'" + rev_sql + "'")
def test_negated_null(self):
User, Address = self.classes.User, self.classes.Address
def test_o2m_compare_to_null(self):
User = self.classes.User
self._test(User.id == None, "users.id IS NULL")
self._test(User.id != None, "users.id IS NOT NULL")
self._test(~(User.id == None), "users.id IS NOT NULL")
self._test(~(User.id != None), "users.id IS NULL")
self._test(None == User.id, "users.id IS NULL")
self._test(~(None == User.id), "users.id IS NOT NULL")
def test_m2o_compare_to_null(self):
Address = self.classes.Address
self._test(Address.user == None, "addresses.user_id IS NULL")
self._test(~(Address.user == None), "addresses.user_id IS NOT NULL")
self._test(~(Address.user != None), "addresses.user_id IS NULL")
self._test(None == Address.user, "addresses.user_id IS NULL")
self._test(~(None == Address.user), "addresses.user_id IS NOT NULL")
def test_o2m_compare_to_null_orm_adapt(self):
User, Address = self.classes.User, self.classes.Address
self._test_filter_aliases(
User.id == None,
"users_1.id IS NULL", Address, Address.user),
self._test_filter_aliases(
User.id != None,
"users_1.id IS NOT NULL", Address, Address.user),
self._test_filter_aliases(
~(User.id == None),
"users_1.id IS NOT NULL", Address, Address.user),
self._test_filter_aliases(
~(User.id != None),
"users_1.id IS NULL", Address, Address.user),
def test_m2o_compare_to_null_orm_adapt(self):
User, Address = self.classes.User, self.classes.Address
self._test_filter_aliases(
Address.user == None,
"addresses_1.user_id IS NULL", User, User.addresses),
self._test_filter_aliases(
Address.user != None,
"addresses_1.user_id IS NOT NULL", User, User.addresses),
self._test_filter_aliases(
~(Address.user == None),
"addresses_1.user_id IS NOT NULL", User, User.addresses),
self._test_filter_aliases(
~(Address.user != None),
"addresses_1.user_id IS NULL", User, User.addresses),
def test_o2m_compare_to_null_aliased(self):
User = self.classes.User
u1 = aliased(User)
self._test(u1.id == None, "users_1.id IS NULL")
self._test(u1.id != None, "users_1.id IS NOT NULL")
self._test(~(u1.id == None), "users_1.id IS NOT NULL")
self._test(~(u1.id != None), "users_1.id IS NULL")
def test_m2o_compare_to_null_aliased(self):
Address = self.classes.Address
a1 = aliased(Address)
self._test(a1.user == None, "addresses_1.user_id IS NULL")
self._test(~(a1.user == None), "addresses_1.user_id IS NOT NULL")
self._test(a1.user != None, "addresses_1.user_id IS NOT NULL")
self._test(~(a1.user != None), "addresses_1.user_id IS NULL")
def test_relationship_unimplemented(self):
User = self.classes.User
for op in [
@@ -861,9 +925,8 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
]:
assert_raises(NotImplementedError, op, "x")
def test_relationship(self):
def test_o2m_any(self):
User, Address = self.classes.User, self.classes.Address
self._test(
User.addresses.any(Address.id == 17),
"EXISTS (SELECT 1 FROM addresses "
@@ -871,17 +934,88 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
entity=User
)
def test_o2m_any_aliased(self):
User, Address = self.classes.User, self.classes.Address
u1 = aliased(User)
a1 = aliased(Address)
self._test(
u1.addresses.of_type(a1).any(a1.id == 17),
"EXISTS (SELECT 1 FROM addresses AS addresses_1 "
"WHERE users_1.id = addresses_1.user_id AND "
"addresses_1.id = :id_1)",
entity=u1
)
def test_o2m_any_orm_adapt(self):
User, Address = self.classes.User, self.classes.Address
self._test_filter_aliases(
User.addresses.any(Address.id == 17),
"EXISTS (SELECT 1 FROM addresses "
"WHERE users_1.id = addresses.user_id AND addresses.id = :id_1)",
Address, Address.user
)
def test_m2o_compare_instance(self):
User, Address = self.classes.User, self.classes.Address
u7 = User(id=7)
attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
self._test(Address.user == u7, ":param_1 = addresses.user_id")
self._test(Address.user != u7,
"addresses.user_id != :user_id_1 OR addresses.user_id IS NULL")
def test_m2o_compare_instance_negated(self):
User, Address = self.classes.User, self.classes.Address
u7 = User(id=7)
attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
self._test(Address.user == None, "addresses.user_id IS NULL")
self._test(
Address.user != u7,
"addresses.user_id != :user_id_1 OR addresses.user_id IS NULL")
self._test(Address.user != None, "addresses.user_id IS NOT NULL")
def test_m2o_compare_instance_orm_adapt(self):
User, Address = self.classes.User, self.classes.Address
u7 = User(id=7)
attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
self._test_filter_aliases(
Address.user == u7,
":param_1 = addresses_1.user_id", User, User.addresses
)
def test_m2o_compare_instance_negated_orm_adapt(self):
User, Address = self.classes.User, self.classes.Address
u7 = User(id=7)
attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
self._test_filter_aliases(
Address.user != u7,
"addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
User, User.addresses
)
self._test_filter_aliases(
~(Address.user == u7), ":param_1 != addresses_1.user_id",
User, User.addresses
)
self._test_filter_aliases(
~(Address.user != u7),
"NOT (addresses_1.user_id != :user_id_1 "
"OR addresses_1.user_id IS NULL)", User, User.addresses
)
def test_m2o_compare_instance_aliased(self):
User, Address = self.classes.User, self.classes.Address
u7 = User(id=7)
attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
a1 = aliased(Address)
self._test(
a1.user == u7,
":param_1 = addresses_1.user_id")
self._test(
a1.user != u7,
"addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL")
def test_selfref_relationship(self):
@@ -915,6 +1049,11 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
"nodes_1.parent_id IS NULL"
)
self._test(
nalias.parent != None,
"nodes_1.parent_id IS NOT NULL"
)
self._test(
nalias.children == None,
"NOT (EXISTS ("