Files
2012-08-10 11:22:37 -04:00

346 lines
12 KiB
Python

from test.lib.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
import datetime
from sqlalchemy import *
from sqlalchemy import exc, sql, util
from sqlalchemy.engine import default, base
from test.lib import *
from test.lib.schema import Table, Column
from sqlalchemy.dialects import mysql
class _UpdateFromTestBase(object):
    """Shared schema and seed rows for the UPDATE..FROM tests.

    Provides two classmethod hooks, ``define_tables`` and ``fixtures``,
    describing a ``users`` -> ``addresses`` -> ``dingalings`` chain of
    tables linked by foreign keys.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Declare the three test tables on *metadata*."""
        Table('users', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('name', String(30), nullable=False),
        )

        # addresses.user_id takes its type from the referenced column
        # (type given as None alongside a ForeignKey).
        Table('addresses', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('user_id', None, ForeignKey('users.id')),
              Column('name', String(30), nullable=False),
              Column('email_address', String(50), nullable=False),
        )

        Table("dingalings", metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('address_id', None, ForeignKey('addresses.id')),
              Column('data', String(30)),
        )

    @classmethod
    def fixtures(cls):
        """Return the seed rows, keyed by table name.

        Each value is a tuple whose first element is the tuple of column
        names and whose remaining elements are the rows themselves.
        """
        return dict(
            users=(
                ('id', 'name'),
                (7, 'jack'),
                (8, 'ed'),
                (9, 'fred'),
                (10, 'chuck')
            ),

            addresses=(
                ('id', 'user_id', 'name', 'email_address'),
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed@wood.com"),
                (3, 8, 'x', "ed@bettyboop.com"),
                (4, 8, 'x', "ed@lala.com"),
                (5, 9, 'x', "fred@fred.com")
            ),

            dingalings=(
                ('id', 'address_id', 'data'),
                (1, 2, 'ding 1/2'),
                (2, 5, 'ding 2/5')
            ),
        )
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
    """Compile-only checks of the SQL rendered for UPDATE statements whose
    WHERE criteria refer to tables other than the one being updated."""

    __dialect__ = 'default'

    # NOTE(review): all three hooks are set to None; presumably this tells
    # TablesTest to skip table creation / inserts / deletes, since these
    # tests only compile statements -- confirm against test.lib.fixtures.
    run_create_tables = run_inserts = run_deletes = None

    def test_render_table(self):
        # One extra table in the criteria: expected as a FROM clause.
        users, addresses = self.tables.users, self.tables.addresses
        stmt = (
            users.update()
            .values(name='newname')
            .where(users.c.id == addresses.c.user_id)
            .where(addresses.c.email_address == 'e1')
        )
        self.assert_compile(
            stmt,
            "UPDATE users SET name=:name FROM addresses "
            "WHERE users.id = addresses.user_id AND "
            "addresses.email_address = :email_address_1",
            checkparams={u'email_address_1': 'e1', 'name': 'newname'}
        )

    def test_render_multi_table(self):
        # Two extra tables in the criteria: both expected in FROM.
        users, addresses, dingalings = \
            self.tables.users, \
            self.tables.addresses, \
            self.tables.dingalings
        stmt = (
            users.update()
            .values(name='newname')
            .where(users.c.id == addresses.c.user_id)
            .where(addresses.c.email_address == 'e1')
            .where(addresses.c.id == dingalings.c.address_id)
            .where(dingalings.c.id == 2)
        )
        self.assert_compile(
            stmt,
            "UPDATE users SET name=:name FROM addresses, "
            "dingalings WHERE users.id = addresses.user_id "
            "AND addresses.email_address = :email_address_1 "
            "AND addresses.id = dingalings.address_id AND "
            "dingalings.id = :id_1",
            checkparams={u'email_address_1': 'e1', u'id_1': 2,
                         'name': 'newname'}
        )

    def test_render_table_mysql(self):
        # Same statement as test_render_table, compiled with the MySQL
        # dialect: expected as "UPDATE t1, t2 SET t1.col=..." with no FROM.
        users, addresses = self.tables.users, self.tables.addresses
        stmt = (
            users.update()
            .values(name='newname')
            .where(users.c.id == addresses.c.user_id)
            .where(addresses.c.email_address == 'e1')
        )
        self.assert_compile(
            stmt,
            "UPDATE users, addresses SET users.name=%s "
            "WHERE users.id = addresses.user_id AND "
            "addresses.email_address = %s",
            checkparams={u'email_address_1': 'e1', 'name': 'newname'},
            dialect=mysql.dialect()
        )

    def test_render_subquery(self):
        # The extra FROM element is an aliased SELECT rather than a table.
        users, addresses = self.tables.users, self.tables.addresses
        subq = (
            select([addresses.c.id,
                    addresses.c.user_id,
                    addresses.c.email_address])
            .where(addresses.c.id == 7)
            .alias()
        )
        stmt = (
            users.update()
            .values(name='newname')
            .where(users.c.id == subq.c.user_id)
            .where(subq.c.email_address == 'e1')
        )
        self.assert_compile(
            stmt,
            "UPDATE users SET name=:name FROM "
            "(SELECT addresses.id AS id, addresses.user_id "
            "AS user_id, addresses.email_address AS "
            "email_address FROM addresses WHERE addresses.id = "
            ":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
            "AND anon_1.email_address = :email_address_1",
            checkparams={u'email_address_1': 'e1',
                         u'id_1': 7, 'name': 'newname'}
        )
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
    """Execute UPDATE statements that refer to additional tables against
    ``testing.db`` and compare the resulting table contents."""

    @testing.requires.update_from
    def test_exec_two_table(self):
        # Copy users.name into email_address for every address of 'ed'.
        users, addresses = self.tables.users, self.tables.addresses
        stmt = (
            addresses.update()
            .values(email_address=users.c.name)
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )
        testing.db.execute(stmt)

        rows = testing.db.execute(
            addresses.select().order_by(addresses.c.id)
        ).fetchall()
        eq_(
            rows,
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.requires.update_from
    def test_exec_two_table_plus_alias(self):
        # Same outcome as test_exec_two_table, but the join to users goes
        # through an alias of addresses tied back to the target by id.
        users, addresses = self.tables.users, self.tables.addresses
        a1 = addresses.alias()
        stmt = (
            addresses.update()
            .values(email_address=users.c.name)
            .where(users.c.id == a1.c.user_id)
            .where(users.c.name == 'ed')
            .where(a1.c.id == addresses.c.id)
        )
        testing.db.execute(stmt)

        rows = testing.db.execute(
            addresses.select().order_by(addresses.c.id)
        ).fetchall()
        eq_(
            rows,
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.requires.update_from
    def test_exec_three_table(self):
        # The extra join to dingalings (id 1 -> address 2) narrows the
        # update to a single address row.
        users, addresses, dingalings = \
            self.tables.users, \
            self.tables.addresses, \
            self.tables.dingalings
        stmt = (
            addresses.update()
            .values(email_address=users.c.name)
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
            .where(addresses.c.id == dingalings.c.address_id)
            .where(dingalings.c.id == 1)
        )
        testing.db.execute(stmt)

        rows = testing.db.execute(
            addresses.select().order_by(addresses.c.id)
        ).fetchall()
        eq_(
            rows,
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed@bettyboop.com"),
                (4, 8, 'x', "ed@lala.com"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_exec_multitable(self):
        # A single UPDATE that assigns columns on both tables.
        users, addresses = self.tables.users, self.tables.addresses
        # NOTE(review): the expected "ed" in addresses below relies on
        # email_address being read from users.name before users.name is
        # set to 'ed2'; MySQL documents no assignment-order guarantee for
        # multiple-table UPDATE -- confirm this expectation is stable.
        stmt = (
            addresses.update()
            .values({
                addresses.c.email_address: users.c.name,
                users.c.name: 'ed2'
            })
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )
        testing.db.execute(stmt)

        address_rows = testing.db.execute(
            addresses.select().order_by(addresses.c.id)
        ).fetchall()
        eq_(
            address_rows,
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

        user_rows = testing.db.execute(
            users.select().order_by(users.c.id)
        ).fetchall()
        eq_(
            user_rows,
            [
                (7, 'jack'),
                (8, 'ed2'),
                (9, 'fred'),
                (10, 'chuck')
            ]
        )
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
    """Check when an ``onupdate`` default on ``users.some_update`` fires
    for an UPDATE issued against ``addresses`` that also involves
    ``users``.

    Overrides the base schema and seed rows with a two-table variant in
    which ``users`` carries the ``some_update`` column.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``users`` (with an onupdate column) and ``addresses``."""
        Table('users', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('name', String(30), nullable=False),
              Column('some_update', String(30), onupdate="im the update")
        )

        Table('addresses', metadata,
              Column('id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('user_id', None, ForeignKey('users.id')),
              Column('email_address', String(50), nullable=False),
        )

    @classmethod
    def fixtures(cls):
        """Return the seed rows, keyed by table name.

        The first tuple of each value holds the column names; the rest
        are rows.
        """
        return dict(
            users=(
                ('id', 'name', 'some_update'),
                (8, 'ed', 'value'),
                (9, 'fred', 'value'),
            ),

            addresses=(
                ('id', 'user_id', 'email_address'),
                (2, 8, "ed@wood.com"),
                (3, 8, "ed@bettyboop.com"),
                (4, 9, "fred@fred.com")
            ),
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_defaults_second_table(self):
        # users.name is assigned explicitly, so the onupdate default on
        # users.some_update is expected to be applied as well.
        users, addresses = self.tables.users, self.tables.addresses
        # NOTE(review): the expected "ed" in addresses relies on
        # email_address being read from users.name before users.name is
        # set to 'ed2'; MySQL documents no assignment-order guarantee for
        # multiple-table UPDATE -- confirm this expectation is stable.
        stmt = (
            addresses.update()
            .values({
                addresses.c.email_address: users.c.name,
                users.c.name: 'ed2'
            })
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )
        ret = testing.db.execute(stmt)

        eq_(
            set(ret.prefetch_cols()),
            set([users.c.some_update])
        )

        address_rows = testing.db.execute(
            addresses.select().order_by(addresses.c.id)
        ).fetchall()
        eq_(
            address_rows,
            [
                (2, 8, "ed"),
                (3, 8, "ed"),
                (4, 9, "fred@fred.com")
            ]
        )

        user_rows = testing.db.execute(
            users.select().order_by(users.c.id)
        ).fetchall()
        eq_(
            user_rows,
            [
                (8, 'ed2', 'im the update'),
                (9, 'fred', 'value'),
            ]
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_no_defaults_second_table(self):
        # users appears only in the criteria and as a value source; no
        # users column is assigned.
        users, addresses = self.tables.users, self.tables.addresses
        stmt = (
            addresses.update()
            .values({
                'email_address': users.c.name,
            })
            .where(users.c.id == addresses.c.user_id)
            .where(users.c.name == 'ed')
        )
        ret = testing.db.execute(stmt)

        eq_(
            ret.prefetch_cols(), []
        )

        address_rows = testing.db.execute(
            addresses.select().order_by(addresses.c.id)
        ).fetchall()
        eq_(
            address_rows,
            [
                (2, 8, "ed"),
                (3, 8, "ed"),
                (4, 9, "fred@fred.com")
            ]
        )

        # users table not actually updated,
        # so no onupdate
        user_rows = testing.db.execute(
            users.select().order_by(users.c.id)
        ).fetchall()
        eq_(
            user_rows,
            [
                (8, 'ed', 'value'),
                (9, 'fred', 'value'),
            ]
        )