Files
sqlalchemy/test/sql/test_update.py
T
Mike Bayer 20cdc64588 trying different approaches to test layout. in this one, the testing modules
become an externally usable package but still remain within the main sqlalchemy parent package.
in this system, we use kind of an ugly hack to get the noseplugin imported outside of the
"sqlalchemy" package, while still making it available within sqlalchemy for usage by
third party libraries.
2012-09-27 02:37:33 -04:00

347 lines
12 KiB
Python

from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
import datetime
from sqlalchemy import *
from sqlalchemy import exc, sql, util
from sqlalchemy.engine import default, base
from sqlalchemy import testing
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.dialects import mysql
class _UpdateFromTestBase(object):
    """Shared schema and seed rows for the UPDATE..FROM test classes.

    Used as a mixin by the test classes in this module: it only supplies
    ``define_tables`` and ``fixtures`` and derives from ``object``.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Declare the ``users``, ``addresses`` and ``dingalings`` tables.

        ``addresses.user_id`` references ``users.id`` and
        ``dingalings.address_id`` references ``addresses.id``; both foreign
        key columns pass ``None`` as their type argument.

        :param metadata: object the three tables are attached to.
        """
        Table(
            'users', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(30), nullable=False),
        )

        Table(
            'addresses', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('user_id', None, ForeignKey('users.id')),
            Column('name', String(30), nullable=False),
            Column('email_address', String(50), nullable=False),
        )

        Table(
            "dingalings", metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('address_id', None, ForeignKey('addresses.id')),
            Column('data', String(30)),
        )

    @classmethod
    def fixtures(cls):
        """Return the seed rows, keyed by table name.

        Each value is a tuple whose first element is a tuple of column
        names and whose remaining elements are row tuples in that same
        column order.

        NOTE(review): presumably consumed by the ``TablesTest`` insert
        step -- confirm against the testing fixtures.
        """
        return dict(
            users=(
                ('id', 'name'),
                (7, 'jack'),
                (8, 'ed'),
                (9, 'fred'),
                (10, 'chuck')
            ),
            addresses=(
                ('id', 'user_id', 'name', 'email_address'),
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed@wood.com"),
                (3, 8, 'x', "ed@bettyboop.com"),
                (4, 8, 'x', "ed@lala.com"),
                (5, 9, 'x', "fred@fred.com")
            ),
            dingalings=(
                ('id', 'address_id', 'data'),
                (1, 2, 'ding 1/2'),
                (2, 5, 'ding 2/5')
            ),
        )
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
                            AssertsCompiledSQL):
    """Compile-only checks for UPDATE statements whose WHERE criteria
    reference additional tables or a subquery.

    Every test hands the statement, the expected SQL string and the
    expected bind parameters to ``assert_compile``.
    """

    __dialect__ = 'default'

    # NOTE(review): presumably switches off table creation, fixture inserts
    # and deletes because nothing here is executed against a database --
    # confirm against fixtures.TablesTest.
    run_create_tables = run_inserts = run_deletes = None

    def test_render_table(self):
        # One extra table in the criteria; expected SQL lists it in FROM.
        users, addresses = self.tables.users, self.tables.addresses

        self.assert_compile(
            users.update()
            .values(name='newname')
            .where(users.c.id == addresses.c.user_id)
            .where(addresses.c.email_address == 'e1'),
            "UPDATE users SET name=:name FROM addresses "
            "WHERE users.id = addresses.user_id AND "
            "addresses.email_address = :email_address_1",
            checkparams={u'email_address_1': 'e1', 'name': 'newname'}
        )

    def test_render_multi_table(self):
        # Two extra tables in the criteria; expected SQL lists both in FROM.
        users = self.tables.users
        addresses = self.tables.addresses
        dingalings = self.tables.dingalings

        self.assert_compile(
            users.update()
            .values(name='newname')
            .where(users.c.id == addresses.c.user_id)
            .where(addresses.c.email_address == 'e1')
            .where(addresses.c.id == dingalings.c.address_id)
            .where(dingalings.c.id == 2),
            "UPDATE users SET name=:name FROM addresses, "
            "dingalings WHERE users.id = addresses.user_id "
            "AND addresses.email_address = :email_address_1 "
            "AND addresses.id = dingalings.address_id AND "
            "dingalings.id = :id_1",
            checkparams={u'email_address_1': 'e1', u'id_1': 2,
                         'name': 'newname'}
        )

    def test_render_table_mysql(self):
        # Same statement as test_render_table, compiled with the MySQL
        # dialect: both tables follow UPDATE and the SET column is
        # table-qualified in the expected string.
        users, addresses = self.tables.users, self.tables.addresses

        self.assert_compile(
            users.update()
            .values(name='newname')
            .where(users.c.id == addresses.c.user_id)
            .where(addresses.c.email_address == 'e1'),
            "UPDATE users, addresses SET users.name=%s "
            "WHERE users.id = addresses.user_id AND "
            "addresses.email_address = %s",
            checkparams={u'email_address_1': 'e1', 'name': 'newname'},
            dialect=mysql.dialect()
        )

    def test_render_subquery(self):
        # The extra FROM element is an aliased SELECT instead of a table.
        users, addresses = self.tables.users, self.tables.addresses

        subq = (
            select([addresses.c.id,
                    addresses.c.user_id,
                    addresses.c.email_address])
            .where(addresses.c.id == 7)
            .alias()
        )

        self.assert_compile(
            users.update()
            .values(name='newname')
            .where(users.c.id == subq.c.user_id)
            .where(subq.c.email_address == 'e1'),
            "UPDATE users SET name=:name FROM "
            "(SELECT addresses.id AS id, addresses.user_id "
            "AS user_id, addresses.email_address AS "
            "email_address FROM addresses WHERE addresses.id = "
            ":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
            "AND anon_1.email_address = :email_address_1",
            checkparams={u'email_address_1': 'e1',
                         u'id_1': 7, 'name': 'newname'}
        )
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
    """Execute multi-table UPDATE statements through ``testing.db`` and
    compare the resulting table contents with the expected rows.
    """

    @testing.requires.update_from
    def test_exec_two_table(self):
        # Copy users.name into email_address for every address joined to
        # the user named 'ed' (addresses 2, 3 and 4 in the fixtures).
        users, addresses = self.tables.users, self.tables.addresses

        testing.db.execute(
            addresses.update()
            .values(email_address=users.c.name)
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )

        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)
            ).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.requires.update_from
    def test_exec_two_table_plus_alias(self):
        # Same update, but users is joined to an alias of addresses which
        # is then tied back to the UPDATE target by primary key.
        users, addresses = self.tables.users, self.tables.addresses
        a1 = addresses.alias()

        testing.db.execute(
            addresses.update()
            .values(email_address=users.c.name)
            .where(users.c.id == a1.c.user_id)
            .where(users.c.name == 'ed')
            .where(a1.c.id == addresses.c.id)
        )

        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)
            ).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.requires.update_from
    def test_exec_three_table(self):
        # Additionally restricted through dingalings.id == 1, whose
        # fixture row points at address 2, so only that address changes.
        users = self.tables.users
        addresses = self.tables.addresses
        dingalings = self.tables.dingalings

        testing.db.execute(
            addresses.update()
            .values(email_address=users.c.name)
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
            .where(addresses.c.id == dingalings.c.address_id)
            .where(dingalings.c.id == 1)
        )

        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)
            ).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed@bettyboop.com"),
                (4, 8, 'x', "ed@lala.com"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_exec_multitable(self):
        # A single statement assigns columns on both addresses and users;
        # values() is keyed by Column objects from each table.
        users, addresses = self.tables.users, self.tables.addresses

        testing.db.execute(
            addresses.update()
            .values({
                addresses.c.email_address: users.c.name,
                users.c.name: 'ed2'
            })
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )

        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)
            ).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )
        eq_(
            testing.db.execute(
                users.select().order_by(users.c.id)
            ).fetchall(),
            [
                (7, 'jack'),
                (8, 'ed2'),
                (9, 'fred'),
                (10, 'chuck')
            ]
        )
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
                                             fixtures.TablesTest):
    """Check how an ``onupdate`` default on a second table behaves in a
    multi-table UPDATE.

    Overrides the inherited schema and fixtures: ``users`` gains a
    ``some_update`` column declared with ``onupdate="im the update"``,
    ``addresses`` has no ``name`` column, and no ``dingalings`` table is
    defined.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``users`` (with the ``onupdate`` column) and
        ``addresses``.

        :param metadata: object the two tables are attached to.
        """
        Table(
            'users', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(30), nullable=False),
            Column('some_update', String(30), onupdate="im the update")
        )

        Table(
            'addresses', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('user_id', None, ForeignKey('users.id')),
            Column('email_address', String(50), nullable=False),
        )

    @classmethod
    def fixtures(cls):
        """Return the seed rows, keyed by table name.

        Each value is a tuple whose first element is a tuple of column
        names and whose remaining elements are row tuples in that order.
        Both users start with ``some_update`` set to ``'value'``.
        """
        return dict(
            users=(
                ('id', 'name', 'some_update'),
                (8, 'ed', 'value'),
                (9, 'fred', 'value'),
            ),
            addresses=(
                ('id', 'user_id', 'email_address'),
                (2, 8, "ed@wood.com"),
                (3, 8, "ed@bettyboop.com"),
                (4, 9, "fred@fred.com")
            ),
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_defaults_second_table(self):
        # users.name is assigned explicitly, so users.some_update is
        # expected among the prefetch columns and in the updated row.
        users, addresses = self.tables.users, self.tables.addresses

        ret = testing.db.execute(
            addresses.update()
            .values({
                addresses.c.email_address: users.c.name,
                users.c.name: 'ed2'
            })
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )

        eq_(
            set(ret.prefetch_cols()),
            set([users.c.some_update])
        )
        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)
            ).fetchall(),
            [
                (2, 8, "ed"),
                (3, 8, "ed"),
                (4, 9, "fred@fred.com")
            ]
        )
        eq_(
            testing.db.execute(
                users.select().order_by(users.c.id)
            ).fetchall(),
            [
                (8, 'ed2', 'im the update'),
                (9, 'fred', 'value'),
            ]
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_no_defaults_second_table(self):
        # Only an addresses column is assigned here; users appears in the
        # WHERE criteria alone.
        users, addresses = self.tables.users, self.tables.addresses

        ret = testing.db.execute(
            addresses.update()
            .values({
                'email_address': users.c.name,
            })
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )

        eq_(
            ret.prefetch_cols(), []
        )
        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)
            ).fetchall(),
            [
                (2, 8, "ed"),
                (3, 8, "ed"),
                (4, 9, "fred@fred.com")
            ]
        )
        # users table not actually updated,
        # so no onupdate
        eq_(
            testing.db.execute(
                users.select().order_by(users.c.id)
            ).fetchall(),
            [
                (8, 'ed', 'value'),
                (9, 'fred', 'value'),
            ]
        )