Files
Mike Bayer 93e11a9d7d - Fixed bug where :meth:.Table.update and :meth:.Table.delete
would produce an empty WHERE clause when an empty :func:`.and_()`
or :func:`.or_()` or other blank expression were applied.  This is
now consistent with that of :func:`.select`.
fixes #3045
2014-05-08 22:45:01 -04:00

569 lines
19 KiB
Python

from sqlalchemy import *
from sqlalchemy import testing
from sqlalchemy.dialects import mysql
from sqlalchemy.engine import default
from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures
from sqlalchemy.testing.schema import Table, Column
class _UpdateFromTestBase(object):
@classmethod
def define_tables(cls, metadata):
Table('mytable', metadata,
Column('myid', Integer),
Column('name', String(30)),
Column('description', String(50)))
Table('myothertable', metadata,
Column('otherid', Integer),
Column('othername', String(30)))
Table('users', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('name', String(30), nullable=False))
Table('addresses', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
Column('name', String(30), nullable=False),
Column('email_address', String(50), nullable=False))
Table('dingalings', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('address_id', None, ForeignKey('addresses.id')),
Column('data', String(30)))
@classmethod
def fixtures(cls):
return dict(
users=(
('id', 'name'),
(7, 'jack'),
(8, 'ed'),
(9, 'fred'),
(10, 'chuck')
),
addresses = (
('id', 'user_id', 'name', 'email_address'),
(1, 7, 'x', 'jack@bean.com'),
(2, 8, 'x', 'ed@wood.com'),
(3, 8, 'x', 'ed@bettyboop.com'),
(4, 8, 'x', 'ed@lala.com'),
(5, 9, 'x', 'fred@fred.com')
),
dingalings = (
('id', 'address_id', 'data'),
(1, 2, 'ding 1/2'),
(2, 5, 'ding 2/5')
),
)
class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
__dialect__ = 'default'
def test_update_1(self):
table1 = self.tables.mytable
self.assert_compile(
update(table1, table1.c.myid == 7),
'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
params={table1.c.name: 'fred'})
def test_update_2(self):
table1 = self.tables.mytable
self.assert_compile(
table1.update().
where(table1.c.myid == 7).
values({table1.c.myid: 5}),
'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
checkparams={'myid': 5, 'myid_1': 7})
def test_update_3(self):
table1 = self.tables.mytable
self.assert_compile(
update(table1, table1.c.myid == 7),
'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
params={'name': 'fred'})
def test_update_4(self):
table1 = self.tables.mytable
self.assert_compile(
update(table1, values={table1.c.name: table1.c.myid}),
'UPDATE mytable SET name=mytable.myid')
def test_update_5(self):
table1 = self.tables.mytable
self.assert_compile(
update(table1,
whereclause=table1.c.name == bindparam('crit'),
values={table1.c.name: 'hi'}),
'UPDATE mytable SET name=:name WHERE mytable.name = :crit',
params={'crit': 'notthere'},
checkparams={'crit': 'notthere', 'name': 'hi'})
def test_update_6(self):
table1 = self.tables.mytable
self.assert_compile(
update(table1,
table1.c.myid == 12,
values={table1.c.name: table1.c.myid}),
'UPDATE mytable '
'SET name=mytable.myid, description=:description '
'WHERE mytable.myid = :myid_1',
params={'description': 'test'},
checkparams={'description': 'test', 'myid_1': 12})
def test_update_7(self):
table1 = self.tables.mytable
self.assert_compile(
update(table1, table1.c.myid == 12, values={table1.c.myid: 9}),
'UPDATE mytable '
'SET myid=:myid, description=:description '
'WHERE mytable.myid = :myid_1',
params={'myid_1': 12, 'myid': 9, 'description': 'test'})
def test_update_8(self):
table1 = self.tables.mytable
self.assert_compile(
update(table1, table1.c.myid == 12),
'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12})
def test_update_9(self):
table1 = self.tables.mytable
s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
c = s.compile(column_keys=['id', 'name'])
eq_(str(s), str(c))
def test_update_10(self):
table1 = self.tables.mytable
v1 = {table1.c.name: table1.c.myid}
v2 = {table1.c.name: table1.c.name + 'foo'}
self.assert_compile(
update(table1, table1.c.myid == 12, values=v1).values(v2),
'UPDATE mytable '
'SET '
'name=(mytable.name || :name_1), '
'description=:description '
'WHERE mytable.myid = :myid_1',
params={'description': 'test'})
def test_update_11(self):
table1 = self.tables.mytable
values = {
table1.c.name: table1.c.name + 'lala',
table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho'))
}
self.assert_compile(update(table1,
(table1.c.myid == func.hoho(4)) &
(table1.c.name == literal('foo') +
table1.c.name + literal('lala')),
values=values),
'UPDATE mytable '
'SET '
'myid=do_stuff(mytable.myid, :param_1), '
'name=(mytable.name || :name_1) '
'WHERE '
'mytable.myid = hoho(:hoho_1) AND '
'mytable.name = :param_2 || mytable.name || :param_3')
def test_where_empty(self):
table1 = self.tables.mytable
self.assert_compile(
table1.update().where(and_()),
"UPDATE mytable SET myid=:myid, name=:name, description=:description"
)
self.assert_compile(
table1.update().where(or_()),
"UPDATE mytable SET myid=:myid, name=:name, description=:description"
)
def test_prefix_with(self):
table1 = self.tables.mytable
stmt = table1.update().\
prefix_with('A', 'B', dialect='mysql').\
prefix_with('C', 'D')
self.assert_compile(stmt,
'UPDATE C D mytable SET myid=:myid, name=:name, '
'description=:description')
self.assert_compile(stmt,
'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s',
dialect=mysql.dialect())
def test_alias(self):
table1 = self.tables.mytable
talias1 = table1.alias('t1')
self.assert_compile(update(talias1, talias1.c.myid == 7),
'UPDATE mytable AS t1 '
'SET name=:name '
'WHERE t1.myid = :myid_1',
params={table1.c.name: 'fred'})
self.assert_compile(update(talias1, table1.c.myid == 7),
'UPDATE mytable AS t1 '
'SET name=:name '
'FROM mytable '
'WHERE mytable.myid = :myid_1',
params={table1.c.name: 'fred'})
def test_update_to_expression(self):
"""test update from an expression.
this logic is triggered currently by a left side that doesn't
have a key. The current supported use case is updating the index
of a Postgresql ARRAY type.
"""
table1 = self.tables.mytable
expr = func.foo(table1.c.myid)
assert not hasattr(expr, 'key')
self.assert_compile(table1.update().values({expr: 'bar'}),
'UPDATE mytable SET foo(myid)=:param_1')
def test_update_bound_ordering(self):
"""test that bound parameters between the UPDATE and FROM clauses
order correctly in different SQL compilation scenarios.
"""
table1 = self.tables.mytable
table2 = self.tables.myothertable
sel = select([table2]).where(table2.c.otherid == 5).alias()
upd = table1.update().\
where(table1.c.name == sel.c.othername).\
values(name='foo')
dialect = default.DefaultDialect()
dialect.positional = True
self.assert_compile(
upd,
"UPDATE mytable SET name=:name FROM (SELECT "
"myothertable.otherid AS otherid, "
"myothertable.othername AS othername "
"FROM myothertable "
"WHERE myothertable.otherid = :otherid_1) AS anon_1 "
"WHERE mytable.name = anon_1.othername",
checkpositional=('foo', 5),
dialect=dialect
)
self.assert_compile(
upd,
"UPDATE mytable, (SELECT myothertable.otherid AS otherid, "
"myothertable.othername AS othername "
"FROM myothertable "
"WHERE myothertable.otherid = %s) AS anon_1 SET mytable.name=%s "
"WHERE mytable.name = anon_1.othername",
checkpositional=(5, 'foo'),
dialect=mysql.dialect()
)
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
AssertsCompiledSQL):
__dialect__ = 'default'
run_create_tables = run_inserts = run_deletes = None
def test_render_table(self):
users, addresses = self.tables.users, self.tables.addresses
self.assert_compile(
users.update().
values(name='newname').
where(users.c.id == addresses.c.user_id).
where(addresses.c.email_address == 'e1'),
'UPDATE users '
'SET name=:name FROM addresses '
'WHERE '
'users.id = addresses.user_id AND '
'addresses.email_address = :email_address_1',
checkparams={u'email_address_1': 'e1', 'name': 'newname'})
def test_render_multi_table(self):
users = self.tables.users
addresses = self.tables.addresses
dingalings = self.tables.dingalings
checkparams = {
u'email_address_1': 'e1',
u'id_1': 2,
'name': 'newname'
}
self.assert_compile(
users.update().
values(name='newname').
where(users.c.id == addresses.c.user_id).
where(addresses.c.email_address == 'e1').
where(addresses.c.id == dingalings.c.address_id).
where(dingalings.c.id == 2),
'UPDATE users '
'SET name=:name '
'FROM addresses, dingalings '
'WHERE '
'users.id = addresses.user_id AND '
'addresses.email_address = :email_address_1 AND '
'addresses.id = dingalings.address_id AND '
'dingalings.id = :id_1',
checkparams=checkparams)
def test_render_table_mysql(self):
users, addresses = self.tables.users, self.tables.addresses
self.assert_compile(
users.update().
values(name='newname').
where(users.c.id == addresses.c.user_id).
where(addresses.c.email_address == 'e1'),
'UPDATE users, addresses '
'SET users.name=%s '
'WHERE '
'users.id = addresses.user_id AND '
'addresses.email_address = %s',
checkparams={u'email_address_1': 'e1', 'name': 'newname'},
dialect=mysql.dialect())
def test_render_subquery(self):
users, addresses = self.tables.users, self.tables.addresses
checkparams = {
u'email_address_1': 'e1',
u'id_1': 7,
'name': 'newname'
}
cols = [
addresses.c.id,
addresses.c.user_id,
addresses.c.email_address
]
subq = select(cols).where(addresses.c.id == 7).alias()
self.assert_compile(
users.update().
values(name='newname').
where(users.c.id == subq.c.user_id).
where(subq.c.email_address == 'e1'),
'UPDATE users '
'SET name=:name FROM ('
'SELECT '
'addresses.id AS id, '
'addresses.user_id AS user_id, '
'addresses.email_address AS email_address '
'FROM addresses '
'WHERE addresses.id = :id_1'
') AS anon_1 '
'WHERE users.id = anon_1.user_id '
'AND anon_1.email_address = :email_address_1',
checkparams=checkparams)
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
@testing.requires.update_from
def test_exec_two_table(self):
users, addresses = self.tables.users, self.tables.addresses
testing.db.execute(
addresses.update().
values(email_address=users.c.name).
where(users.c.id == addresses.c.user_id).
where(users.c.name == 'ed'))
expected = [
(1, 7, 'x', 'jack@bean.com'),
(2, 8, 'x', 'ed'),
(3, 8, 'x', 'ed'),
(4, 8, 'x', 'ed'),
(5, 9, 'x', 'fred@fred.com')]
self._assert_addresses(addresses, expected)
@testing.requires.update_from
def test_exec_two_table_plus_alias(self):
users, addresses = self.tables.users, self.tables.addresses
a1 = addresses.alias()
testing.db.execute(
addresses.update().
values(email_address=users.c.name).
where(users.c.id == a1.c.user_id).
where(users.c.name == 'ed').
where(a1.c.id == addresses.c.id)
)
expected = [
(1, 7, 'x', 'jack@bean.com'),
(2, 8, 'x', 'ed'),
(3, 8, 'x', 'ed'),
(4, 8, 'x', 'ed'),
(5, 9, 'x', 'fred@fred.com')]
self._assert_addresses(addresses, expected)
@testing.requires.update_from
def test_exec_three_table(self):
users = self.tables.users
addresses = self.tables.addresses
dingalings = self.tables.dingalings
testing.db.execute(
addresses.update().
values(email_address=users.c.name).
where(users.c.id == addresses.c.user_id).
where(users.c.name == 'ed').
where(addresses.c.id == dingalings.c.address_id).
where(dingalings.c.id == 1))
expected = [
(1, 7, 'x', 'jack@bean.com'),
(2, 8, 'x', 'ed'),
(3, 8, 'x', 'ed@bettyboop.com'),
(4, 8, 'x', 'ed@lala.com'),
(5, 9, 'x', 'fred@fred.com')]
self._assert_addresses(addresses, expected)
@testing.only_on('mysql', 'Multi table update')
def test_exec_multitable(self):
users, addresses = self.tables.users, self.tables.addresses
values = {
addresses.c.email_address: users.c.name,
users.c.name: 'ed2'
}
testing.db.execute(
addresses.update().
values(values).
where(users.c.id == addresses.c.user_id).
where(users.c.name == 'ed'))
expected = [
(1, 7, 'x', 'jack@bean.com'),
(2, 8, 'x', 'ed'),
(3, 8, 'x', 'ed'),
(4, 8, 'x', 'ed'),
(5, 9, 'x', 'fred@fred.com')]
self._assert_addresses(addresses, expected)
expected = [
(7, 'jack'),
(8, 'ed2'),
(9, 'fred'),
(10, 'chuck')]
self._assert_users(users, expected)
def _assert_addresses(self, addresses, expected):
stmt = addresses.select().order_by(addresses.c.id)
eq_(testing.db.execute(stmt).fetchall(), expected)
def _assert_users(self, users, expected):
stmt = users.select().order_by(users.c.id)
eq_(testing.db.execute(stmt).fetchall(), expected)
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('name', String(30), nullable=False),
Column('some_update', String(30), onupdate='im the update'))
Table('addresses', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
Column('email_address', String(50), nullable=False))
@classmethod
def fixtures(cls):
return dict(
users=(
('id', 'name', 'some_update'),
(8, 'ed', 'value'),
(9, 'fred', 'value'),
),
addresses=(
('id', 'user_id', 'email_address'),
(2, 8, 'ed@wood.com'),
(3, 8, 'ed@bettyboop.com'),
(4, 9, 'fred@fred.com')
),
)
@testing.only_on('mysql', 'Multi table update')
def test_defaults_second_table(self):
users, addresses = self.tables.users, self.tables.addresses
values = {
addresses.c.email_address: users.c.name,
users.c.name: 'ed2'
}
ret = testing.db.execute(
addresses.update().
values(values).
where(users.c.id == addresses.c.user_id).
where(users.c.name == 'ed'))
eq_(set(ret.prefetch_cols()), set([users.c.some_update]))
expected = [
(2, 8, 'ed'),
(3, 8, 'ed'),
(4, 9, 'fred@fred.com')]
self._assert_addresses(addresses, expected)
expected = [
(8, 'ed2', 'im the update'),
(9, 'fred', 'value')]
self._assert_users(users, expected)
@testing.only_on('mysql', 'Multi table update')
def test_no_defaults_second_table(self):
users, addresses = self.tables.users, self.tables.addresses
ret = testing.db.execute(
addresses.update().
values({'email_address': users.c.name}).
where(users.c.id == addresses.c.user_id).
where(users.c.name == 'ed'))
eq_(ret.prefetch_cols(), [])
expected = [
(2, 8, 'ed'),
(3, 8, 'ed'),
(4, 9, 'fred@fred.com')]
self._assert_addresses(addresses, expected)
# users table not actually updated, so no onupdate
expected = [
(8, 'ed', 'value'),
(9, 'fred', 'value')]
self._assert_users(users, expected)
def _assert_addresses(self, addresses, expected):
stmt = addresses.select().order_by(addresses.c.id)
eq_(testing.db.execute(stmt).fetchall(), expected)
def _assert_users(self, users, expected):
stmt = users.select().order_by(users.c.id)
eq_(testing.db.execute(stmt).fetchall(), expected)