mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-08 09:51:28 -04:00
346 lines
12 KiB
Python
346 lines
12 KiB
Python
from test.lib.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
|
|
import datetime
|
|
from sqlalchemy import *
|
|
from sqlalchemy import exc, sql, util
|
|
from sqlalchemy.engine import default, base
|
|
from test.lib import *
|
|
from test.lib.schema import Table, Column
|
|
from sqlalchemy.dialects import mysql
|
|
|
|
class _UpdateFromTestBase(object):
|
|
@classmethod
|
|
def define_tables(cls, metadata):
|
|
Table('users', metadata,
|
|
Column('id', Integer, primary_key=True,
|
|
test_needs_autoincrement=True),
|
|
Column('name', String(30), nullable=False),
|
|
)
|
|
|
|
Table('addresses', metadata,
|
|
Column('id', Integer, primary_key=True,
|
|
test_needs_autoincrement=True),
|
|
Column('user_id', None, ForeignKey('users.id')),
|
|
Column('name', String(30), nullable=False),
|
|
Column('email_address', String(50), nullable=False),
|
|
)
|
|
|
|
Table("dingalings", metadata,
|
|
Column('id', Integer, primary_key=True,
|
|
test_needs_autoincrement=True),
|
|
Column('address_id', None, ForeignKey('addresses.id')),
|
|
Column('data', String(30)),
|
|
)
|
|
|
|
@classmethod
|
|
def fixtures(cls):
|
|
return dict(
|
|
users = (
|
|
('id', 'name'),
|
|
(7, 'jack'),
|
|
(8, 'ed'),
|
|
(9, 'fred'),
|
|
(10, 'chuck')
|
|
),
|
|
|
|
addresses = (
|
|
('id', 'user_id', 'name', 'email_address'),
|
|
(1, 7, 'x', "jack@bean.com"),
|
|
(2, 8, 'x', "ed@wood.com"),
|
|
(3, 8, 'x', "ed@bettyboop.com"),
|
|
(4, 8, 'x', "ed@lala.com"),
|
|
(5, 9, 'x', "fred@fred.com")
|
|
),
|
|
dingalings = (
|
|
('id', 'address_id', 'data'),
|
|
(1, 2, 'ding 1/2'),
|
|
(2, 5, 'ding 2/5')
|
|
),
|
|
)
|
|
|
|
|
|
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
|
|
__dialect__ = 'default'
|
|
|
|
run_create_tables = run_inserts = run_deletes = None
|
|
|
|
def test_render_table(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
self.assert_compile(
|
|
users.update().\
|
|
values(name='newname').\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(addresses.c.email_address=='e1'),
|
|
"UPDATE users SET name=:name FROM addresses "
|
|
"WHERE users.id = addresses.user_id AND "
|
|
"addresses.email_address = :email_address_1",
|
|
checkparams={u'email_address_1': 'e1', 'name': 'newname'}
|
|
)
|
|
|
|
def test_render_multi_table(self):
|
|
users, addresses, dingalings = \
|
|
self.tables.users, \
|
|
self.tables.addresses, \
|
|
self.tables.dingalings
|
|
self.assert_compile(
|
|
users.update().\
|
|
values(name='newname').\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(addresses.c.email_address=='e1').\
|
|
where(addresses.c.id==dingalings.c.address_id).\
|
|
where(dingalings.c.id==2),
|
|
"UPDATE users SET name=:name FROM addresses, "
|
|
"dingalings WHERE users.id = addresses.user_id "
|
|
"AND addresses.email_address = :email_address_1 "
|
|
"AND addresses.id = dingalings.address_id AND "
|
|
"dingalings.id = :id_1",
|
|
checkparams={u'email_address_1': 'e1', u'id_1': 2,
|
|
'name': 'newname'}
|
|
)
|
|
|
|
def test_render_table_mysql(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
self.assert_compile(
|
|
users.update().\
|
|
values(name='newname').\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(addresses.c.email_address=='e1'),
|
|
"UPDATE users, addresses SET users.name=%s "
|
|
"WHERE users.id = addresses.user_id AND "
|
|
"addresses.email_address = %s",
|
|
checkparams={u'email_address_1': 'e1', 'name': 'newname'},
|
|
dialect=mysql.dialect()
|
|
)
|
|
|
|
def test_render_subquery(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
subq = select([addresses.c.id,
|
|
addresses.c.user_id,
|
|
addresses.c.email_address]).\
|
|
where(addresses.c.id==7).alias()
|
|
self.assert_compile(
|
|
users.update().\
|
|
values(name='newname').\
|
|
where(users.c.id==subq.c.user_id).\
|
|
where(subq.c.email_address=='e1'),
|
|
"UPDATE users SET name=:name FROM "
|
|
"(SELECT addresses.id AS id, addresses.user_id "
|
|
"AS user_id, addresses.email_address AS "
|
|
"email_address FROM addresses WHERE addresses.id = "
|
|
":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
|
|
"AND anon_1.email_address = :email_address_1",
|
|
checkparams={u'email_address_1': 'e1',
|
|
u'id_1': 7, 'name': 'newname'}
|
|
)
|
|
|
|
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
|
|
|
|
@testing.requires.update_from
|
|
def test_exec_two_table(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
testing.db.execute(
|
|
addresses.update().\
|
|
values(email_address=users.c.name).\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(users.c.name=='ed')
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
addresses.select().\
|
|
order_by(addresses.c.id)).fetchall(),
|
|
[
|
|
(1, 7, 'x', "jack@bean.com"),
|
|
(2, 8, 'x', "ed"),
|
|
(3, 8, 'x', "ed"),
|
|
(4, 8, 'x', "ed"),
|
|
(5, 9, 'x', "fred@fred.com")
|
|
]
|
|
)
|
|
|
|
@testing.requires.update_from
|
|
def test_exec_two_table_plus_alias(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
a1 = addresses.alias()
|
|
|
|
testing.db.execute(
|
|
addresses.update().\
|
|
values(email_address=users.c.name).\
|
|
where(users.c.id==a1.c.user_id).\
|
|
where(users.c.name=='ed').\
|
|
where(a1.c.id==addresses.c.id)
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
addresses.select().\
|
|
order_by(addresses.c.id)).fetchall(),
|
|
[
|
|
(1, 7, 'x', "jack@bean.com"),
|
|
(2, 8, 'x', "ed"),
|
|
(3, 8, 'x', "ed"),
|
|
(4, 8, 'x', "ed"),
|
|
(5, 9, 'x', "fred@fred.com")
|
|
]
|
|
)
|
|
|
|
@testing.requires.update_from
|
|
def test_exec_three_table(self):
|
|
users, addresses, dingalings = \
|
|
self.tables.users, \
|
|
self.tables.addresses, \
|
|
self.tables.dingalings
|
|
testing.db.execute(
|
|
addresses.update().\
|
|
values(email_address=users.c.name).\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(users.c.name=='ed').
|
|
where(addresses.c.id==dingalings.c.address_id).\
|
|
where(dingalings.c.id==1),
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
addresses.select().order_by(addresses.c.id)
|
|
).fetchall(),
|
|
[
|
|
(1, 7, 'x', "jack@bean.com"),
|
|
(2, 8, 'x', "ed"),
|
|
(3, 8, 'x', "ed@bettyboop.com"),
|
|
(4, 8, 'x', "ed@lala.com"),
|
|
(5, 9, 'x', "fred@fred.com")
|
|
]
|
|
)
|
|
|
|
@testing.only_on('mysql', 'Multi table update')
|
|
def test_exec_multitable(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
testing.db.execute(
|
|
addresses.update().\
|
|
values({
|
|
addresses.c.email_address:users.c.name,
|
|
users.c.name:'ed2'
|
|
}).\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(users.c.name=='ed')
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
addresses.select().order_by(addresses.c.id)).fetchall(),
|
|
[
|
|
(1, 7, 'x', "jack@bean.com"),
|
|
(2, 8, 'x', "ed"),
|
|
(3, 8, 'x', "ed"),
|
|
(4, 8, 'x', "ed"),
|
|
(5, 9, 'x', "fred@fred.com")
|
|
]
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
users.select().order_by(users.c.id)).fetchall(),
|
|
[
|
|
(7, 'jack'),
|
|
(8, 'ed2'),
|
|
(9, 'fred'),
|
|
(10, 'chuck')
|
|
]
|
|
)
|
|
|
|
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
|
|
@classmethod
|
|
def define_tables(cls, metadata):
|
|
Table('users', metadata,
|
|
Column('id', Integer, primary_key=True,
|
|
test_needs_autoincrement=True),
|
|
Column('name', String(30), nullable=False),
|
|
Column('some_update', String(30), onupdate="im the update")
|
|
)
|
|
|
|
Table('addresses', metadata,
|
|
Column('id', Integer, primary_key=True,
|
|
test_needs_autoincrement=True),
|
|
Column('user_id', None, ForeignKey('users.id')),
|
|
Column('email_address', String(50), nullable=False),
|
|
)
|
|
|
|
@classmethod
|
|
def fixtures(cls):
|
|
return dict(
|
|
users = (
|
|
('id', 'name', 'some_update'),
|
|
(8, 'ed', 'value'),
|
|
(9, 'fred', 'value'),
|
|
),
|
|
|
|
addresses = (
|
|
('id', 'user_id', 'email_address'),
|
|
(2, 8, "ed@wood.com"),
|
|
(3, 8, "ed@bettyboop.com"),
|
|
(4, 9, "fred@fred.com")
|
|
),
|
|
)
|
|
|
|
@testing.only_on('mysql', 'Multi table update')
|
|
def test_defaults_second_table(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
ret = testing.db.execute(
|
|
addresses.update().\
|
|
values({
|
|
addresses.c.email_address:users.c.name,
|
|
users.c.name:'ed2'
|
|
}).\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(users.c.name=='ed')
|
|
)
|
|
eq_(
|
|
set(ret.prefetch_cols()),
|
|
set([users.c.some_update])
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
addresses.select().order_by(addresses.c.id)).fetchall(),
|
|
[
|
|
(2, 8, "ed"),
|
|
(3, 8, "ed"),
|
|
(4, 9, "fred@fred.com")
|
|
]
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
users.select().order_by(users.c.id)).fetchall(),
|
|
[
|
|
(8, 'ed2', 'im the update'),
|
|
(9, 'fred', 'value'),
|
|
]
|
|
)
|
|
|
|
@testing.only_on('mysql', 'Multi table update')
|
|
def test_no_defaults_second_table(self):
|
|
users, addresses = self.tables.users, self.tables.addresses
|
|
ret = testing.db.execute(
|
|
addresses.update().\
|
|
values({
|
|
'email_address':users.c.name,
|
|
}).\
|
|
where(users.c.id==addresses.c.user_id).\
|
|
where(users.c.name=='ed')
|
|
)
|
|
eq_(
|
|
ret.prefetch_cols(),[]
|
|
)
|
|
eq_(
|
|
testing.db.execute(
|
|
addresses.select().order_by(addresses.c.id)).fetchall(),
|
|
[
|
|
(2, 8, "ed"),
|
|
(3, 8, "ed"),
|
|
(4, 9, "fred@fred.com")
|
|
]
|
|
)
|
|
# users table not actually updated,
|
|
# so no onupdate
|
|
eq_(
|
|
testing.db.execute(
|
|
users.select().order_by(users.c.id)).fetchall(),
|
|
[
|
|
(8, 'ed', 'value'),
|
|
(9, 'fred', 'value'),
|
|
]
|
|
)
|