Delegate to the ForeignKeyField's db_value() method when converting models.

When a model instance is passed as a parameter on the right-hand-side of
an expression, Peewee would previously just grab the model's primary-key
value. This breaks queries where the left-hand-side is a foreign-key
that points to a field other than the primary-key.

To address this I've added a test to try and use the converter in the
scope (if available), and fall back to using the model pk.

Fixes #2304
This commit is contained in:
Charles Leifer
2020-11-25 15:29:18 -06:00
parent 87a6384814
commit ebe3ad5023
3 changed files with 81 additions and 2 deletions
+13 -2
View File
@@ -6607,10 +6607,21 @@ class Model(with_metaclass(ModelBase, Node)):
return not self == other
def __sql__(self, ctx):
# XXX: when comparing a foreign-key field whose related-field is not a
# NOTE: when comparing a foreign-key field whose related-field is not a
# primary-key, then doing an equality test for the foreign-key with a
# model instance will return the wrong value; since we always return
# model instance will return the wrong value; since we would return
# the primary key for a given model instance.
#
# This checks to see if we have a converter in the scope, and if so,
# hands the model instance to the converter rather than blindly
# grabbing the primary-key. In the event the provided converter fails
# to handle the model instance, then we will return the primary-key.
if ctx.state.converter is not None:
try:
return ctx.sql(Value(self, converter=ctx.state.converter))
except (TypeError, ValueError):
pass
return ctx.sql(Value(getattr(self, self._meta.primary_key.name),
converter=self._meta.primary_key.db_value))
+26
View File
@@ -427,3 +427,29 @@ class TestForeignKeyConstraints(ModelTestCase):
def test_disable_constraint(self):
self.set_foreign_key_pragma(False)
Note.create(user=0, content='test')
class FK_A(TestModel):
key = CharField(max_length=16, unique=True)
class FK_B(TestModel):
fk_a = ForeignKeyField(FK_A, field='key')
class TestFKtoNonPKField(ModelTestCase):
requires = [FK_A, FK_B]
def test_fk_to_non_pk_field(self):
a1 = FK_A.create(key='a1')
a2 = FK_A.create(key='a2')
b1 = FK_B.create(fk_a=a1)
b2 = FK_B.create(fk_a=a2)
args = (b1.fk_a, b1.fk_a_id, a1, a1.key)
for arg in args:
query = FK_B.select().where(FK_B.fk_a == arg)
self.assertSQL(query, (
'SELECT "t1"."id", "t1"."fk_a_id" FROM "fk_b" AS "t1" '
'WHERE ("t1"."fk_a_id" = ?)'), ['a1'])
b1_db = query.get()
self.assertEqual(b1_db.id, b1.id)
+42
View File
@@ -1436,3 +1436,45 @@ class TestLikeEscape(ModelTestCase):
)
for expr, expected in cases:
self.assertNames(expr, expected)
class FKF_A(TestModel):
key = CharField(max_length=16, unique=True)
class FKF_B(TestModel):
fk_a_1 = ForeignKeyField(FKF_A, field='key')
fk_a_2 = IntegerField()
class TestQueryWithModelInstanceParam(ModelTestCase):
requires = [FKF_A, FKF_B]
def test_query_with_model_instance_param(self):
a1 = FKF_A.create(key='k1')
a2 = FKF_A.create(key='k2')
b1 = FKF_B.create(fk_a_1=a1, fk_a_2=a1)
b2 = FKF_B.create(fk_a_1=a2, fk_a_2=a2)
# See also keys.TestFKtoNonPKField test, which replicates much of this.
args = (b1.fk_a_1, b1.fk_a_1_id, a1, a1.key)
for arg in args:
query = FKF_B.select().where(FKF_B.fk_a_1 == arg)
self.assertSQL(query, (
'SELECT "t1"."id", "t1"."fk_a_1_id", "t1"."fk_a_2" '
'FROM "fkf_b" AS "t1" '
'WHERE ("t1"."fk_a_1_id" = ?)'), ['k1'])
b1_db = query.get()
self.assertEqual(b1_db.id, b1.id)
# When we are handed a model instance and a conversion (an IntegerField
# in this case), when the attempted conversion fails we fall back to
# using the given model's primary-key.
args = (b1.fk_a_2, a1, a1.id)
for arg in args:
query = FKF_B.select().where(FKF_B.fk_a_2 == arg)
self.assertSQL(query, (
'SELECT "t1"."id", "t1"."fk_a_1_id", "t1"."fk_a_2" '
'FROM "fkf_b" AS "t1" '
'WHERE ("t1"."fk_a_2" = ?)'), [a1.id])
b1_db = query.get()
self.assertEqual(b1_db.id, b1.id)