mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-28 11:35:19 -04:00
084b559b44
String types. When present, renders as
COLLATE <collation>. This to support the
COLLATE keyword now supported by several
databases including MySQL, SQLite, and Postgresql.
[ticket:2276]
- [change] The Text() type renders the length
given to it, if a length was specified.
1571 lines
60 KiB
Python
1571 lines
60 KiB
Python
# coding: utf-8
|
||
|
||
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
|
||
|
||
|
||
from sqlalchemy import *
|
||
from sqlalchemy import sql, exc, schema, types as sqltypes
|
||
from sqlalchemy.dialects.mysql import base as mysql
|
||
from sqlalchemy.engine.url import make_url
|
||
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults
|
||
from sqlalchemy import testing
|
||
from sqlalchemy.testing import engines
|
||
from sqlalchemy.testing.engines import utf8_engine
|
||
import datetime
|
||
|
||
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
|
||
|
||
__dialect__ = mysql.dialect()
|
||
|
||
def test_reserved_words(self):
|
||
table = Table("mysql_table", MetaData(),
|
||
Column("col1", Integer),
|
||
Column("master_ssl_verify_server_cert", Integer))
|
||
x = select([table.c.col1, table.c.master_ssl_verify_server_cert])
|
||
|
||
self.assert_compile(x,
|
||
"SELECT mysql_table.col1, "
|
||
"mysql_table.`master_ssl_verify_server_cert` FROM mysql_table")
|
||
|
||
def test_create_index_simple(self):
|
||
m = MetaData()
|
||
tbl = Table('testtbl', m, Column('data', String(255)))
|
||
idx = Index('test_idx1', tbl.c.data)
|
||
|
||
self.assert_compile(schema.CreateIndex(idx),
|
||
'CREATE INDEX test_idx1 ON testtbl (data)',
|
||
dialect=mysql.dialect())
|
||
|
||
def test_create_index_with_length(self):
|
||
m = MetaData()
|
||
tbl = Table('testtbl', m, Column('data', String(255)))
|
||
idx1 = Index('test_idx1', tbl.c.data, mysql_length=10)
|
||
idx2 = Index('test_idx2', tbl.c.data, mysql_length=5)
|
||
|
||
self.assert_compile(schema.CreateIndex(idx1),
|
||
'CREATE INDEX test_idx1 ON testtbl (data(10))',
|
||
dialect=mysql.dialect())
|
||
self.assert_compile(schema.CreateIndex(idx2),
|
||
'CREATE INDEX test_idx2 ON testtbl (data(5))',
|
||
dialect=mysql.dialect())
|
||
|
||
def test_create_index_with_using(self):
|
||
m = MetaData()
|
||
tbl = Table('testtbl', m, Column('data', String(255)))
|
||
idx1 = Index('test_idx1', tbl.c.data, mysql_using='btree')
|
||
idx2 = Index('test_idx2', tbl.c.data, mysql_using='hash')
|
||
|
||
self.assert_compile(schema.CreateIndex(idx1),
|
||
'CREATE INDEX test_idx1 ON testtbl (data) USING btree',
|
||
dialect=mysql.dialect())
|
||
self.assert_compile(schema.CreateIndex(idx2),
|
||
'CREATE INDEX test_idx2 ON testtbl (data) USING hash',
|
||
dialect=mysql.dialect())
|
||
|
||
def test_create_pk_plain(self):
|
||
m = MetaData()
|
||
tbl = Table('testtbl', m, Column('data', String(255)),
|
||
PrimaryKeyConstraint('data'))
|
||
|
||
self.assert_compile(schema.CreateTable(tbl),
|
||
"CREATE TABLE testtbl (data VARCHAR(255), PRIMARY KEY (data))",
|
||
dialect=mysql.dialect())
|
||
|
||
def test_create_pk_with_using(self):
|
||
m = MetaData()
|
||
tbl = Table('testtbl', m, Column('data', String(255)),
|
||
PrimaryKeyConstraint('data', mysql_using='btree'))
|
||
|
||
self.assert_compile(schema.CreateTable(tbl),
|
||
"CREATE TABLE testtbl (data VARCHAR(255), "
|
||
"PRIMARY KEY (data) USING btree)",
|
||
dialect=mysql.dialect())
|
||
|
||
class DialectTest(fixtures.TestBase):
|
||
__only_on__ = 'mysql'
|
||
|
||
@testing.only_on(['mysql+mysqldb', 'mysql+oursql'],
|
||
'requires particular SSL arguments')
|
||
def test_ssl_arguments(self):
|
||
dialect = testing.db.dialect
|
||
kwarg = dialect.create_connect_args(
|
||
make_url("mysql://scott:tiger@localhost:3306/test"
|
||
"?ssl_ca=/ca.pem&ssl_cert=/cert.pem&ssl_key=/key.pem")
|
||
)[1]
|
||
# args that differ among mysqldb and oursql
|
||
for k in ('use_unicode', 'found_rows', 'client_flag'):
|
||
kwarg.pop(k, None)
|
||
eq_(
|
||
kwarg,
|
||
{
|
||
'passwd': 'tiger', 'db': 'test',
|
||
'ssl': {'ca': '/ca.pem', 'cert': '/cert.pem',
|
||
'key': '/key.pem'},
|
||
'host': 'localhost', 'user': 'scott',
|
||
'port': 3306
|
||
}
|
||
)
|
||
|
||
|
||
class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
|
||
"Test MySQL column types"
|
||
|
||
__dialect__ = mysql.dialect()
|
||
|
||
def test_numeric(self):
|
||
"Exercise type specification and options for numeric types."
|
||
|
||
columns = [
|
||
# column type, args, kwargs, expected ddl
|
||
# e.g. Column(Integer(10, unsigned=True)) ==
|
||
# 'INTEGER(10) UNSIGNED'
|
||
(mysql.MSNumeric, [], {},
|
||
'NUMERIC'),
|
||
(mysql.MSNumeric, [None], {},
|
||
'NUMERIC'),
|
||
(mysql.MSNumeric, [12], {},
|
||
'NUMERIC(12)'),
|
||
(mysql.MSNumeric, [12, 4], {'unsigned':True},
|
||
'NUMERIC(12, 4) UNSIGNED'),
|
||
(mysql.MSNumeric, [12, 4], {'zerofill':True},
|
||
'NUMERIC(12, 4) ZEROFILL'),
|
||
(mysql.MSNumeric, [12, 4], {'zerofill':True, 'unsigned':True},
|
||
'NUMERIC(12, 4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSDecimal, [], {},
|
||
'DECIMAL'),
|
||
(mysql.MSDecimal, [None], {},
|
||
'DECIMAL'),
|
||
(mysql.MSDecimal, [12], {},
|
||
'DECIMAL(12)'),
|
||
(mysql.MSDecimal, [12, None], {},
|
||
'DECIMAL(12)'),
|
||
(mysql.MSDecimal, [12, 4], {'unsigned':True},
|
||
'DECIMAL(12, 4) UNSIGNED'),
|
||
(mysql.MSDecimal, [12, 4], {'zerofill':True},
|
||
'DECIMAL(12, 4) ZEROFILL'),
|
||
(mysql.MSDecimal, [12, 4], {'zerofill':True, 'unsigned':True},
|
||
'DECIMAL(12, 4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSDouble, [None, None], {},
|
||
'DOUBLE'),
|
||
(mysql.MSDouble, [12, 4], {'unsigned':True},
|
||
'DOUBLE(12, 4) UNSIGNED'),
|
||
(mysql.MSDouble, [12, 4], {'zerofill':True},
|
||
'DOUBLE(12, 4) ZEROFILL'),
|
||
(mysql.MSDouble, [12, 4], {'zerofill':True, 'unsigned':True},
|
||
'DOUBLE(12, 4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSReal, [None, None], {},
|
||
'REAL'),
|
||
(mysql.MSReal, [12, 4], {'unsigned':True},
|
||
'REAL(12, 4) UNSIGNED'),
|
||
(mysql.MSReal, [12, 4], {'zerofill':True},
|
||
'REAL(12, 4) ZEROFILL'),
|
||
(mysql.MSReal, [12, 4], {'zerofill':True, 'unsigned':True},
|
||
'REAL(12, 4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSFloat, [], {},
|
||
'FLOAT'),
|
||
(mysql.MSFloat, [None], {},
|
||
'FLOAT'),
|
||
(mysql.MSFloat, [12], {},
|
||
'FLOAT(12)'),
|
||
(mysql.MSFloat, [12, 4], {},
|
||
'FLOAT(12, 4)'),
|
||
(mysql.MSFloat, [12, 4], {'unsigned':True},
|
||
'FLOAT(12, 4) UNSIGNED'),
|
||
(mysql.MSFloat, [12, 4], {'zerofill':True},
|
||
'FLOAT(12, 4) ZEROFILL'),
|
||
(mysql.MSFloat, [12, 4], {'zerofill':True, 'unsigned':True},
|
||
'FLOAT(12, 4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSInteger, [], {},
|
||
'INTEGER'),
|
||
(mysql.MSInteger, [4], {},
|
||
'INTEGER(4)'),
|
||
(mysql.MSInteger, [4], {'unsigned':True},
|
||
'INTEGER(4) UNSIGNED'),
|
||
(mysql.MSInteger, [4], {'zerofill':True},
|
||
'INTEGER(4) ZEROFILL'),
|
||
(mysql.MSInteger, [4], {'zerofill':True, 'unsigned':True},
|
||
'INTEGER(4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSBigInteger, [], {},
|
||
'BIGINT'),
|
||
(mysql.MSBigInteger, [4], {},
|
||
'BIGINT(4)'),
|
||
(mysql.MSBigInteger, [4], {'unsigned':True},
|
||
'BIGINT(4) UNSIGNED'),
|
||
(mysql.MSBigInteger, [4], {'zerofill':True},
|
||
'BIGINT(4) ZEROFILL'),
|
||
(mysql.MSBigInteger, [4], {'zerofill':True, 'unsigned':True},
|
||
'BIGINT(4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSMediumInteger, [], {},
|
||
'MEDIUMINT'),
|
||
(mysql.MSMediumInteger, [4], {},
|
||
'MEDIUMINT(4)'),
|
||
(mysql.MSMediumInteger, [4], {'unsigned':True},
|
||
'MEDIUMINT(4) UNSIGNED'),
|
||
(mysql.MSMediumInteger, [4], {'zerofill':True},
|
||
'MEDIUMINT(4) ZEROFILL'),
|
||
(mysql.MSMediumInteger, [4], {'zerofill':True, 'unsigned':True},
|
||
'MEDIUMINT(4) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSTinyInteger, [], {},
|
||
'TINYINT'),
|
||
(mysql.MSTinyInteger, [1], {},
|
||
'TINYINT(1)'),
|
||
(mysql.MSTinyInteger, [1], {'unsigned':True},
|
||
'TINYINT(1) UNSIGNED'),
|
||
(mysql.MSTinyInteger, [1], {'zerofill':True},
|
||
'TINYINT(1) ZEROFILL'),
|
||
(mysql.MSTinyInteger, [1], {'zerofill':True, 'unsigned':True},
|
||
'TINYINT(1) UNSIGNED ZEROFILL'),
|
||
|
||
(mysql.MSSmallInteger, [], {},
|
||
'SMALLINT'),
|
||
(mysql.MSSmallInteger, [4], {},
|
||
'SMALLINT(4)'),
|
||
(mysql.MSSmallInteger, [4], {'unsigned':True},
|
||
'SMALLINT(4) UNSIGNED'),
|
||
(mysql.MSSmallInteger, [4], {'zerofill':True},
|
||
'SMALLINT(4) ZEROFILL'),
|
||
(mysql.MSSmallInteger, [4], {'zerofill':True, 'unsigned':True},
|
||
'SMALLINT(4) UNSIGNED ZEROFILL'),
|
||
]
|
||
|
||
for type_, args, kw, res in columns:
|
||
self.assert_compile(
|
||
type_(*args, **kw),
|
||
res
|
||
)
|
||
|
||
@testing.exclude('mysql', '<', (4, 1, 1), 'no charset support')
|
||
def test_charset(self):
|
||
"""Exercise CHARACTER SET and COLLATE-ish options on string types."""
|
||
|
||
columns = [
|
||
(mysql.MSChar, [1], {},
|
||
'CHAR(1)'),
|
||
(mysql.NCHAR, [1], {},
|
||
'NATIONAL CHAR(1)'),
|
||
(mysql.MSChar, [1], {'binary':True},
|
||
'CHAR(1) BINARY'),
|
||
(mysql.MSChar, [1], {'ascii':True},
|
||
'CHAR(1) ASCII'),
|
||
(mysql.MSChar, [1], {'unicode':True},
|
||
'CHAR(1) UNICODE'),
|
||
(mysql.MSChar, [1], {'ascii':True, 'binary':True},
|
||
'CHAR(1) ASCII BINARY'),
|
||
(mysql.MSChar, [1], {'unicode':True, 'binary':True},
|
||
'CHAR(1) UNICODE BINARY'),
|
||
(mysql.MSChar, [1], {'charset':'utf8'},
|
||
'CHAR(1) CHARACTER SET utf8'),
|
||
(mysql.MSChar, [1], {'charset':'utf8', 'binary':True},
|
||
'CHAR(1) CHARACTER SET utf8 BINARY'),
|
||
(mysql.MSChar, [1], {'charset':'utf8', 'unicode':True},
|
||
'CHAR(1) CHARACTER SET utf8'),
|
||
(mysql.MSChar, [1], {'charset':'utf8', 'ascii':True},
|
||
'CHAR(1) CHARACTER SET utf8'),
|
||
(mysql.MSChar, [1], {'collation': 'utf8_bin'},
|
||
'CHAR(1) COLLATE utf8_bin'),
|
||
(mysql.MSChar, [1], {'charset': 'utf8', 'collation': 'utf8_bin'},
|
||
'CHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'),
|
||
(mysql.MSChar, [1], {'charset': 'utf8', 'binary': True},
|
||
'CHAR(1) CHARACTER SET utf8 BINARY'),
|
||
(mysql.MSChar, [1], {'charset': 'utf8', 'collation': 'utf8_bin',
|
||
'binary': True},
|
||
'CHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'),
|
||
(mysql.MSChar, [1], {'national':True},
|
||
'NATIONAL CHAR(1)'),
|
||
(mysql.MSChar, [1], {'national':True, 'charset':'utf8'},
|
||
'NATIONAL CHAR(1)'),
|
||
(mysql.MSChar, [1], {'national':True, 'charset':'utf8',
|
||
'binary':True},
|
||
'NATIONAL CHAR(1) BINARY'),
|
||
(mysql.MSChar, [1], {'national':True, 'binary':True,
|
||
'unicode':True},
|
||
'NATIONAL CHAR(1) BINARY'),
|
||
(mysql.MSChar, [1], {'national':True, 'collation':'utf8_bin'},
|
||
'NATIONAL CHAR(1) COLLATE utf8_bin'),
|
||
|
||
(mysql.MSString, [1], {'charset':'utf8', 'collation':'utf8_bin'},
|
||
'VARCHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'),
|
||
(mysql.MSString, [1], {'national':True, 'collation':'utf8_bin'},
|
||
'NATIONAL VARCHAR(1) COLLATE utf8_bin'),
|
||
|
||
(mysql.MSTinyText, [], {'charset':'utf8', 'collation':'utf8_bin'},
|
||
'TINYTEXT CHARACTER SET utf8 COLLATE utf8_bin'),
|
||
|
||
(mysql.MSMediumText, [], {'charset':'utf8', 'binary':True},
|
||
'MEDIUMTEXT CHARACTER SET utf8 BINARY'),
|
||
|
||
(mysql.MSLongText, [], {'ascii':True},
|
||
'LONGTEXT ASCII'),
|
||
|
||
(mysql.ENUM, ["foo", "bar"], {'unicode':True},
|
||
'''ENUM('foo','bar') UNICODE'''),
|
||
|
||
(String, [20], {"collation":"utf8"}, 'VARCHAR(20) COLLATE utf8')
|
||
|
||
|
||
]
|
||
|
||
for type_, args, kw, res in columns:
|
||
self.assert_compile(
|
||
type_(*args, **kw),
|
||
res
|
||
)
|
||
|
||
@testing.only_if('mysql')
|
||
@testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature')
|
||
@testing.provide_metadata
|
||
def test_charset_collate_table(self):
|
||
t = Table('foo', self.metadata,
|
||
Column('id', Integer),
|
||
mysql_default_charset='utf8',
|
||
mysql_collate='utf8_unicode_ci'
|
||
)
|
||
t.create()
|
||
m2 = MetaData(testing.db)
|
||
t2 = Table('foo', m2, autoload=True)
|
||
eq_(t2.kwargs['mysql_collate'], 'utf8_unicode_ci')
|
||
eq_(t2.kwargs['mysql_default charset'], 'utf8')
|
||
|
||
def test_bit_50(self):
|
||
"""Exercise BIT types on 5.0+ (not valid for all engine types)"""
|
||
|
||
for type_, expected in [
|
||
(mysql.MSBit(), "BIT"),
|
||
(mysql.MSBit(1), "BIT(1)"),
|
||
(mysql.MSBit(63), "BIT(63)"),
|
||
]:
|
||
self.assert_compile(type_, expected)
|
||
|
||
@testing.only_if('mysql')
|
||
@testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature')
|
||
@testing.fails_on('mysql+oursql', 'some round trips fail, oursql bug ?')
|
||
@testing.provide_metadata
|
||
def test_bit_50_roundtrip(self):
|
||
bit_table = Table('mysql_bits', self.metadata,
|
||
Column('b1', mysql.MSBit),
|
||
Column('b2', mysql.MSBit()),
|
||
Column('b3', mysql.MSBit(), nullable=False),
|
||
Column('b4', mysql.MSBit(1)),
|
||
Column('b5', mysql.MSBit(8)),
|
||
Column('b6', mysql.MSBit(32)),
|
||
Column('b7', mysql.MSBit(63)),
|
||
Column('b8', mysql.MSBit(64)))
|
||
self.metadata.create_all()
|
||
|
||
meta2 = MetaData(testing.db)
|
||
reflected = Table('mysql_bits', meta2, autoload=True)
|
||
|
||
for table in bit_table, reflected:
|
||
|
||
def roundtrip(store, expected=None):
|
||
expected = expected or store
|
||
table.insert(store).execute()
|
||
row = table.select().execute().first()
|
||
try:
|
||
self.assert_(list(row) == expected)
|
||
except:
|
||
print "Storing %s" % store
|
||
print "Expected %s" % expected
|
||
print "Found %s" % list(row)
|
||
raise
|
||
table.delete().execute().close()
|
||
|
||
roundtrip([0] * 8)
|
||
roundtrip([None, None, 0, None, None, None, None, None])
|
||
roundtrip([1] * 8)
|
||
roundtrip([sql.text("b'1'")] * 8, [1] * 8)
|
||
|
||
i = 255
|
||
roundtrip([0, 0, 0, 0, i, i, i, i])
|
||
i = 2 ** 32 - 1
|
||
roundtrip([0, 0, 0, 0, 0, i, i, i])
|
||
i = 2 ** 63 - 1
|
||
roundtrip([0, 0, 0, 0, 0, 0, i, i])
|
||
i = 2 ** 64 - 1
|
||
roundtrip([0, 0, 0, 0, 0, 0, 0, i])
|
||
|
||
def test_boolean(self):
|
||
for type_, expected in [
|
||
(BOOLEAN(), "BOOL"),
|
||
(Boolean(), "BOOL"),
|
||
(mysql.TINYINT(1), "TINYINT(1)"),
|
||
(mysql.TINYINT(1, unsigned=True), "TINYINT(1) UNSIGNED")
|
||
]:
|
||
self.assert_compile(type_, expected)
|
||
|
||
@testing.only_if('mysql')
|
||
@testing.provide_metadata
|
||
def test_boolean_roundtrip(self):
|
||
bool_table = Table(
|
||
'mysql_bool',
|
||
self.metadata,
|
||
Column('b1', BOOLEAN),
|
||
Column('b2', Boolean),
|
||
Column('b3', mysql.MSTinyInteger(1)),
|
||
Column('b4', mysql.MSTinyInteger(1, unsigned=True)),
|
||
Column('b5', mysql.MSTinyInteger),
|
||
)
|
||
self.metadata.create_all()
|
||
table = bool_table
|
||
|
||
def roundtrip(store, expected=None):
|
||
expected = expected or store
|
||
table.insert(store).execute()
|
||
row = table.select().execute().first()
|
||
self.assert_(list(row) == expected)
|
||
for i, val in enumerate(expected):
|
||
if isinstance(val, bool):
|
||
self.assert_(val is row[i])
|
||
table.delete().execute()
|
||
|
||
roundtrip([None, None, None, None, None])
|
||
roundtrip([True, True, 1, 1, 1])
|
||
roundtrip([False, False, 0, 0, 0])
|
||
roundtrip([True, True, True, True, True], [True, True, 1,
|
||
1, 1])
|
||
roundtrip([False, False, 0, 0, 0], [False, False, 0, 0, 0])
|
||
|
||
meta2 = MetaData(testing.db)
|
||
table = Table('mysql_bool', meta2, autoload=True)
|
||
eq_(colspec(table.c.b3), 'b3 TINYINT(1)')
|
||
eq_(colspec(table.c.b4), 'b4 TINYINT(1) UNSIGNED')
|
||
meta2 = MetaData(testing.db)
|
||
table = Table(
|
||
'mysql_bool',
|
||
meta2,
|
||
Column('b1', BOOLEAN),
|
||
Column('b2', Boolean),
|
||
Column('b3', BOOLEAN),
|
||
Column('b4', BOOLEAN),
|
||
autoload=True,
|
||
)
|
||
eq_(colspec(table.c.b3), 'b3 BOOL')
|
||
eq_(colspec(table.c.b4), 'b4 BOOL')
|
||
roundtrip([None, None, None, None, None])
|
||
roundtrip([True, True, 1, 1, 1], [True, True, True, True,
|
||
1])
|
||
roundtrip([False, False, 0, 0, 0], [False, False, False,
|
||
False, 0])
|
||
roundtrip([True, True, True, True, True], [True, True,
|
||
True, True, 1])
|
||
roundtrip([False, False, 0, 0, 0], [False, False, False,
|
||
False, 0])
|
||
|
||
def test_timestamp(self):
|
||
"""Exercise funky TIMESTAMP default syntax."""
|
||
|
||
columns = [
|
||
([TIMESTAMP],
|
||
'TIMESTAMP NULL'),
|
||
([mysql.MSTimeStamp],
|
||
'TIMESTAMP NULL'),
|
||
([mysql.MSTimeStamp,
|
||
DefaultClause(sql.text('CURRENT_TIMESTAMP'))],
|
||
"TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
|
||
([mysql.MSTimeStamp,
|
||
DefaultClause(sql.text("'1999-09-09 09:09:09'"))],
|
||
"TIMESTAMP DEFAULT '1999-09-09 09:09:09'"),
|
||
([mysql.MSTimeStamp,
|
||
DefaultClause(sql.text("'1999-09-09 09:09:09' "
|
||
"ON UPDATE CURRENT_TIMESTAMP"))],
|
||
"TIMESTAMP DEFAULT '1999-09-09 09:09:09' "
|
||
"ON UPDATE CURRENT_TIMESTAMP"),
|
||
([mysql.MSTimeStamp,
|
||
DefaultClause(sql.text("CURRENT_TIMESTAMP "
|
||
"ON UPDATE CURRENT_TIMESTAMP"))],
|
||
"TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
|
||
"ON UPDATE CURRENT_TIMESTAMP"),
|
||
]
|
||
for spec, expected in columns:
|
||
c = Column('t', *spec)
|
||
Table('t', MetaData(), c)
|
||
self.assert_compile(
|
||
schema.CreateColumn(c),
|
||
"t %s" % expected
|
||
|
||
)
|
||
|
||
@testing.only_if('mysql')
|
||
@testing.provide_metadata
|
||
def test_timestamp_nullable(self):
|
||
ts_table = Table('mysql_timestamp', self.metadata,
|
||
Column('t1', TIMESTAMP),
|
||
Column('t2', TIMESTAMP, nullable=False),
|
||
)
|
||
self.metadata.create_all()
|
||
|
||
now = testing.db.execute("select now()").scalar()
|
||
|
||
# TIMESTAMP without NULL inserts current time when passed
|
||
# NULL. when not passed, generates 0000-00-00 quite
|
||
# annoyingly.
|
||
ts_table.insert().execute({'t1': now, 't2': None})
|
||
ts_table.insert().execute({'t1': None, 't2': None})
|
||
|
||
# normalize dates that are over the second boundary
|
||
def normalize(dt):
|
||
if dt is None:
|
||
return None
|
||
elif (dt - now).seconds < 5:
|
||
return now
|
||
else:
|
||
return dt
|
||
eq_(
|
||
[tuple([normalize(dt) for dt in row])
|
||
for row in ts_table.select().execute()],
|
||
[(now, now), (None, now)]
|
||
)
|
||
|
||
def test_time(self):
|
||
""""Exercise TIME."""
|
||
|
||
self.assert_compile(
|
||
mysql.TIME(),
|
||
"TIME"
|
||
)
|
||
|
||
self.assert_compile(
|
||
mysql.TIME(fsp=5),
|
||
"TIME(5)"
|
||
)
|
||
|
||
eq_(
|
||
mysql.TIME().result_processor(None, None)(
|
||
datetime.timedelta(seconds=35, minutes=517,
|
||
microseconds=450
|
||
)),
|
||
datetime.time(8, 37, 35, 450)
|
||
)
|
||
|
||
@testing.only_if('mysql')
|
||
@testing.provide_metadata
|
||
def test_year(self):
|
||
"""Exercise YEAR."""
|
||
|
||
year_table = Table('mysql_year', self.metadata,
|
||
Column('y1', mysql.MSYear),
|
||
Column('y2', mysql.MSYear),
|
||
Column('y3', mysql.MSYear),
|
||
Column('y4', mysql.MSYear(2)),
|
||
Column('y5', mysql.MSYear(4)))
|
||
|
||
for col in year_table.c:
|
||
self.assert_(repr(col))
|
||
year_table.create()
|
||
reflected = Table('mysql_year', MetaData(testing.db),
|
||
autoload=True)
|
||
|
||
for table in year_table, reflected:
|
||
table.insert(['1950', '50', None, 50, 1950]).execute()
|
||
row = table.select().execute().first()
|
||
eq_(list(row), [1950, 2050, None, 50, 1950])
|
||
table.delete().execute()
|
||
self.assert_(colspec(table.c.y1).startswith('y1 YEAR'))
|
||
eq_(colspec(table.c.y4), 'y4 YEAR(2)')
|
||
eq_(colspec(table.c.y5), 'y5 YEAR(4)')
|
||
|
||
@testing.only_if('mysql')
|
||
@testing.provide_metadata
|
||
def test_set(self):
|
||
"""Exercise the SET type."""
|
||
|
||
set_table = Table('mysql_set', self.metadata,
|
||
Column('s1',
|
||
mysql.MSSet("'dq'", "'sq'")), Column('s2',
|
||
mysql.MSSet("'a'")), Column('s3',
|
||
mysql.MSSet("'5'", "'7'", "'9'")))
|
||
eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')")
|
||
eq_(colspec(set_table.c.s2), "s2 SET('a')")
|
||
eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')")
|
||
set_table.create()
|
||
reflected = Table('mysql_set', MetaData(testing.db),
|
||
autoload=True)
|
||
for table in set_table, reflected:
|
||
|
||
def roundtrip(store, expected=None):
|
||
expected = expected or store
|
||
table.insert(store).execute()
|
||
row = table.select().execute().first()
|
||
self.assert_(list(row) == expected)
|
||
table.delete().execute()
|
||
|
||
roundtrip([None, None, None], [None] * 3)
|
||
roundtrip(['', '', ''], [set([''])] * 3)
|
||
roundtrip([set(['dq']), set(['a']), set(['5'])])
|
||
roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']),
|
||
set(['5'])])
|
||
roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5'
|
||
])])
|
||
roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7'
|
||
])])
|
||
set_table.insert().execute({'s3': set(['5'])},
|
||
{'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])},
|
||
{'s3': set(['7', '9'])})
|
||
rows = select([set_table.c.s3], set_table.c.s3.in_([set(['5'
|
||
]), set(['5', '7']), set(['7', '5'
|
||
])])).execute().fetchall()
|
||
found = set([frozenset(row[0]) for row in rows])
|
||
eq_(found, set([frozenset(['5']), frozenset(['5', '7'])]))
|
||
|
||
class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
|
||
|
||
__only_on__ = 'mysql'
|
||
__dialect__ = mysql.dialect()
|
||
|
||
|
||
@testing.uses_deprecated('Manually quoting ENUM value literals')
|
||
@testing.provide_metadata
|
||
def test_enum(self):
|
||
"""Exercise the ENUM type."""
|
||
|
||
enum_table = Table('mysql_enum', self.metadata,
|
||
Column('e1', mysql.ENUM("'a'", "'b'")),
|
||
Column('e2', mysql.ENUM("'a'", "'b'"),
|
||
nullable=False),
|
||
Column('e2generic', Enum("a", "b"),
|
||
nullable=False),
|
||
Column('e3', mysql.ENUM("'a'", "'b'", strict=True)),
|
||
Column('e4', mysql.ENUM("'a'", "'b'", strict=True),
|
||
nullable=False),
|
||
Column('e5', mysql.ENUM("a", "b")),
|
||
Column('e5generic', Enum("a", "b")),
|
||
Column('e6', mysql.ENUM("'a'", "b")),
|
||
)
|
||
|
||
eq_(colspec(enum_table.c.e1),
|
||
"e1 ENUM('a','b')")
|
||
eq_(colspec(enum_table.c.e2),
|
||
"e2 ENUM('a','b') NOT NULL")
|
||
eq_(colspec(enum_table.c.e2generic),
|
||
"e2generic ENUM('a','b') NOT NULL")
|
||
eq_(colspec(enum_table.c.e3),
|
||
"e3 ENUM('a','b')")
|
||
eq_(colspec(enum_table.c.e4),
|
||
"e4 ENUM('a','b') NOT NULL")
|
||
eq_(colspec(enum_table.c.e5),
|
||
"e5 ENUM('a','b')")
|
||
eq_(colspec(enum_table.c.e5generic),
|
||
"e5generic ENUM('a','b')")
|
||
eq_(colspec(enum_table.c.e6),
|
||
"e6 ENUM('''a''','b')")
|
||
enum_table.create()
|
||
|
||
assert_raises(exc.DBAPIError, enum_table.insert().execute,
|
||
e1=None, e2=None, e3=None, e4=None)
|
||
|
||
assert_raises(exc.StatementError, enum_table.insert().execute,
|
||
e1='c', e2='c', e2generic='c', e3='c',
|
||
e4='c', e5='c', e5generic='c', e6='c')
|
||
|
||
enum_table.insert().execute()
|
||
enum_table.insert().execute(e1='a', e2='a', e2generic='a', e3='a',
|
||
e4='a', e5='a', e5generic='a', e6="'a'")
|
||
enum_table.insert().execute(e1='b', e2='b', e2generic='b', e3='b',
|
||
e4='b', e5='b', e5generic='b', e6='b')
|
||
|
||
res = enum_table.select().execute().fetchall()
|
||
|
||
expected = [(None, 'a', 'a', None, 'a', None, None, None),
|
||
('a', 'a', 'a', 'a', 'a', 'a', 'a', "'a'"),
|
||
('b', 'b', 'b', 'b', 'b', 'b', 'b', 'b')]
|
||
|
||
eq_(res, expected)
|
||
|
||
def test_unicode_enum(self):
|
||
unicode_engine = utf8_engine()
|
||
metadata = MetaData(unicode_engine)
|
||
t1 = Table('table', metadata,
|
||
Column('id', Integer, primary_key=True),
|
||
Column('value', Enum(u'réveillé', u'drôle', u'S’il')),
|
||
Column('value2', mysql.ENUM(u'réveillé', u'drôle', u'S’il'))
|
||
)
|
||
metadata.create_all()
|
||
try:
|
||
t1.insert().execute(value=u'drôle', value2=u'drôle')
|
||
t1.insert().execute(value=u'réveillé', value2=u'réveillé')
|
||
t1.insert().execute(value=u'S’il', value2=u'S’il')
|
||
eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
|
||
[(1, u'drôle', u'drôle'), (2, u'réveillé', u'réveillé'),
|
||
(3, u'S’il', u'S’il')]
|
||
)
|
||
|
||
# test reflection of the enum labels
|
||
|
||
m2 = MetaData(testing.db)
|
||
t2 = Table('table', m2, autoload=True)
|
||
|
||
# TODO: what's wrong with the last element ? is there
|
||
# latin-1 stuff forcing its way in ?
|
||
|
||
assert t2.c.value.type.enums[0:2] == \
|
||
(u'réveillé', u'drôle') # u'S’il') # eh ?
|
||
|
||
assert t2.c.value2.type.enums[0:2] == \
|
||
(u'réveillé', u'drôle') # u'S’il') # eh ?
|
||
finally:
|
||
metadata.drop_all()
|
||
|
||
def test_enum_compile(self):
|
||
e1 = Enum('x', 'y', 'z', name='somename')
|
||
t1 = Table('sometable', MetaData(), Column('somecolumn', e1))
|
||
self.assert_compile(schema.CreateTable(t1),
|
||
"CREATE TABLE sometable (somecolumn "
|
||
"ENUM('x','y','z'))")
|
||
t1 = Table('sometable', MetaData(), Column('somecolumn',
|
||
Enum('x', 'y', 'z', native_enum=False)))
|
||
self.assert_compile(schema.CreateTable(t1),
|
||
"CREATE TABLE sometable (somecolumn "
|
||
"VARCHAR(1), CHECK (somecolumn IN ('x', "
|
||
"'y', 'z')))")
|
||
|
||
@testing.exclude('mysql', '<', (4,), "3.23 can't handle an ENUM of ''")
|
||
@testing.uses_deprecated('Manually quoting ENUM value literals')
|
||
def test_enum_parse(self):
|
||
"""More exercises for the ENUM type."""
|
||
|
||
# MySQL 3.23 can't handle an ENUM of ''....
|
||
|
||
enum_table = Table('mysql_enum', MetaData(testing.db),
|
||
Column('e1', mysql.ENUM("'a'")),
|
||
Column('e2', mysql.ENUM("''")),
|
||
Column('e3', mysql.ENUM('a')),
|
||
Column('e4', mysql.ENUM('')),
|
||
Column('e5', mysql.ENUM("'a'", "''")),
|
||
Column('e6', mysql.ENUM("''", "'a'")),
|
||
Column('e7', mysql.ENUM("''", "'''a'''", "'b''b'", "''''")))
|
||
|
||
for col in enum_table.c:
|
||
self.assert_(repr(col))
|
||
try:
|
||
enum_table.create()
|
||
reflected = Table('mysql_enum', MetaData(testing.db),
|
||
autoload=True)
|
||
for t in enum_table, reflected:
|
||
eq_(t.c.e1.type.enums, ("a",))
|
||
eq_(t.c.e2.type.enums, ("",))
|
||
eq_(t.c.e3.type.enums, ("a",))
|
||
eq_(t.c.e4.type.enums, ("",))
|
||
eq_(t.c.e5.type.enums, ("a", ""))
|
||
eq_(t.c.e6.type.enums, ("", "a"))
|
||
eq_(t.c.e7.type.enums, ("", "'a'", "b'b", "'"))
|
||
finally:
|
||
enum_table.drop()
|
||
|
||
class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
|
||
|
||
__only_on__ = 'mysql'
|
||
|
||
def test_default_reflection(self):
|
||
"""Test reflection of column defaults."""
|
||
|
||
from sqlalchemy.dialects.mysql import VARCHAR
|
||
def_table = Table(
|
||
'mysql_def',
|
||
MetaData(testing.db),
|
||
Column('c1', VARCHAR(10, collation='utf8_unicode_ci'),
|
||
DefaultClause(''), nullable=False),
|
||
Column('c2', String(10), DefaultClause('0')),
|
||
Column('c3', String(10), DefaultClause('abc')),
|
||
Column('c4', TIMESTAMP, DefaultClause('2009-04-05 12:00:00'
|
||
)),
|
||
Column('c5', TIMESTAMP),
|
||
Column('c6', TIMESTAMP,
|
||
DefaultClause(sql.text("CURRENT_TIMESTAMP "
|
||
"ON UPDATE CURRENT_TIMESTAMP"))),
|
||
)
|
||
def_table.create()
|
||
try:
|
||
reflected = Table('mysql_def', MetaData(testing.db),
|
||
autoload=True)
|
||
finally:
|
||
def_table.drop()
|
||
assert def_table.c.c1.server_default.arg == ''
|
||
assert def_table.c.c2.server_default.arg == '0'
|
||
assert def_table.c.c3.server_default.arg == 'abc'
|
||
assert def_table.c.c4.server_default.arg \
|
||
== '2009-04-05 12:00:00'
|
||
assert str(reflected.c.c1.server_default.arg) == "''"
|
||
assert str(reflected.c.c2.server_default.arg) == "'0'"
|
||
assert str(reflected.c.c3.server_default.arg) == "'abc'"
|
||
assert str(reflected.c.c4.server_default.arg) \
|
||
== "'2009-04-05 12:00:00'"
|
||
assert reflected.c.c5.default is None
|
||
assert reflected.c.c5.server_default is None
|
||
assert reflected.c.c6.default is None
|
||
eq_(
|
||
str(reflected.c.c6.server_default.arg).upper(),
|
||
"CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
|
||
)
|
||
reflected.create()
|
||
try:
|
||
reflected2 = Table('mysql_def', MetaData(testing.db),
|
||
autoload=True)
|
||
finally:
|
||
reflected.drop()
|
||
assert str(reflected2.c.c1.server_default.arg) == "''"
|
||
assert str(reflected2.c.c2.server_default.arg) == "'0'"
|
||
assert str(reflected2.c.c3.server_default.arg) == "'abc'"
|
||
assert str(reflected2.c.c4.server_default.arg) \
|
||
== "'2009-04-05 12:00:00'"
|
||
assert reflected.c.c5.default is None
|
||
assert reflected.c.c5.server_default is None
|
||
assert reflected.c.c6.default is None
|
||
eq_(
|
||
str(reflected.c.c6.server_default.arg).upper(),
|
||
"CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
|
||
)
|
||
|
||
def test_reflection_with_table_options(self):
|
||
comment = r"""Comment types type speedily ' " \ '' Fun!"""
|
||
|
||
def_table = Table('mysql_def', MetaData(testing.db),
|
||
Column('c1', Integer()),
|
||
mysql_engine='MEMORY',
|
||
mysql_comment=comment,
|
||
mysql_default_charset='utf8',
|
||
mysql_auto_increment='5',
|
||
mysql_avg_row_length='3',
|
||
mysql_password='secret',
|
||
mysql_connection='fish',
|
||
)
|
||
|
||
def_table.create()
|
||
try:
|
||
reflected = Table('mysql_def', MetaData(testing.db),
|
||
autoload=True)
|
||
finally:
|
||
def_table.drop()
|
||
|
||
assert def_table.kwargs['mysql_engine'] == 'MEMORY'
|
||
assert def_table.kwargs['mysql_comment'] == comment
|
||
assert def_table.kwargs['mysql_default_charset'] == 'utf8'
|
||
assert def_table.kwargs['mysql_auto_increment'] == '5'
|
||
assert def_table.kwargs['mysql_avg_row_length'] == '3'
|
||
assert def_table.kwargs['mysql_password'] == 'secret'
|
||
assert def_table.kwargs['mysql_connection'] == 'fish'
|
||
|
||
assert reflected.kwargs['mysql_engine'] == 'MEMORY'
|
||
assert reflected.kwargs['mysql_comment'] == comment
|
||
assert reflected.kwargs['mysql_default charset'] == 'utf8'
|
||
assert reflected.kwargs['mysql_avg_row_length'] == '3'
|
||
assert reflected.kwargs['mysql_connection'] == 'fish'
|
||
|
||
# This field doesn't seem to be returned by mysql itself.
|
||
#assert reflected.kwargs['mysql_password'] == 'secret'
|
||
|
||
# This is explicitly ignored when reflecting schema.
|
||
#assert reflected.kwargs['mysql_auto_increment'] == '5'
|
||
|
||
def test_reflection_on_include_columns(self):
|
||
"""Test reflection of include_columns to be sure they respect case."""
|
||
|
||
case_table = Table('mysql_case', MetaData(testing.db),
|
||
Column('c1', String(10)),
|
||
Column('C2', String(10)),
|
||
Column('C3', String(10)))
|
||
|
||
try:
|
||
case_table.create()
|
||
reflected = Table('mysql_case', MetaData(testing.db),
|
||
autoload=True, include_columns=['c1', 'C2'])
|
||
for t in case_table, reflected:
|
||
assert 'c1' in t.c.keys()
|
||
assert 'C2' in t.c.keys()
|
||
reflected2 = Table('mysql_case', MetaData(testing.db),
|
||
autoload=True, include_columns=['c1', 'c2'])
|
||
assert 'c1' in reflected2.c.keys()
|
||
for c in ['c2', 'C2', 'C3']:
|
||
assert c not in reflected2.c.keys()
|
||
finally:
|
||
case_table.drop()
|
||
|
||
@testing.exclude('mysql', '<', (5, 0, 0), 'early types are squirrely')
|
||
@testing.uses_deprecated('Using String type with no length')
|
||
@testing.uses_deprecated('Manually quoting ENUM value literals')
|
||
def test_type_reflection(self):
|
||
# (ask_for, roundtripped_as_if_different)
|
||
specs = [( String(1), mysql.MSString(1), ),
|
||
( String(3), mysql.MSString(3), ),
|
||
( Text(), mysql.MSText(), ),
|
||
( Unicode(1), mysql.MSString(1), ),
|
||
( Unicode(3), mysql.MSString(3), ),
|
||
( UnicodeText(), mysql.MSText(), ),
|
||
( mysql.MSChar(1), ),
|
||
( mysql.MSChar(3), ),
|
||
( NCHAR(2), mysql.MSChar(2), ),
|
||
( mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only
|
||
( mysql.MSNVarChar(22), mysql.MSString(22), ),
|
||
( SmallInteger(), mysql.MSSmallInteger(), ),
|
||
( SmallInteger(), mysql.MSSmallInteger(4), ),
|
||
( mysql.MSSmallInteger(), ),
|
||
( mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ),
|
||
( mysql.MSMediumInteger(), mysql.MSMediumInteger(), ),
|
||
( mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ),
|
||
( LargeBinary(3), mysql.TINYBLOB(), ),
|
||
( LargeBinary(), mysql.BLOB() ),
|
||
( mysql.MSBinary(3), mysql.MSBinary(3), ),
|
||
( mysql.MSVarBinary(3),),
|
||
( mysql.MSTinyBlob(),),
|
||
( mysql.MSBlob(),),
|
||
( mysql.MSBlob(1234), mysql.MSBlob()),
|
||
( mysql.MSMediumBlob(),),
|
||
( mysql.MSLongBlob(),),
|
||
( mysql.ENUM("''","'fleem'"), ),
|
||
]
|
||
|
||
columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
|
||
|
||
db = testing.db
|
||
m = MetaData(db)
|
||
t_table = Table('mysql_types', m, *columns)
|
||
try:
|
||
m.create_all()
|
||
|
||
m2 = MetaData(db)
|
||
rt = Table('mysql_types', m2, autoload=True)
|
||
try:
|
||
db.execute('CREATE OR REPLACE VIEW mysql_types_v '
|
||
'AS SELECT * from mysql_types')
|
||
rv = Table('mysql_types_v', m2, autoload=True)
|
||
|
||
expected = [len(c) > 1 and c[1] or c[0] for c in specs]
|
||
|
||
# Early 5.0 releases seem to report more "general" for columns
|
||
# in a view, e.g. char -> varchar, tinyblob -> mediumblob
|
||
#
|
||
# Not sure exactly which point version has the fix.
|
||
if db.dialect.server_version_info < (5, 0, 11):
|
||
tables = rt,
|
||
else:
|
||
tables = rt, rv
|
||
|
||
for table in tables:
|
||
for i, reflected in enumerate(table.c):
|
||
assert isinstance(reflected.type,
|
||
type(expected[i])), \
|
||
'element %d: %r not instance of %r' % (i,
|
||
reflected.type, type(expected[i]))
|
||
finally:
|
||
db.execute('DROP VIEW mysql_types_v')
|
||
finally:
|
||
m.drop_all()
|
||
|
||
def test_autoincrement(self):
|
||
meta = MetaData(testing.db)
|
||
try:
|
||
Table('ai_1', meta,
|
||
Column('int_y', Integer, primary_key=True),
|
||
Column('int_n', Integer, DefaultClause('0'),
|
||
primary_key=True),
|
||
mysql_engine='MyISAM')
|
||
Table('ai_2', meta,
|
||
Column('int_y', Integer, primary_key=True),
|
||
Column('int_n', Integer, DefaultClause('0'),
|
||
primary_key=True),
|
||
mysql_engine='MyISAM')
|
||
Table('ai_3', meta,
|
||
Column('int_n', Integer, DefaultClause('0'),
|
||
primary_key=True, autoincrement=False),
|
||
Column('int_y', Integer, primary_key=True),
|
||
mysql_engine='MyISAM')
|
||
Table('ai_4', meta,
|
||
Column('int_n', Integer, DefaultClause('0'),
|
||
primary_key=True, autoincrement=False),
|
||
Column('int_n2', Integer, DefaultClause('0'),
|
||
primary_key=True, autoincrement=False),
|
||
mysql_engine='MyISAM')
|
||
Table('ai_5', meta,
|
||
Column('int_y', Integer, primary_key=True),
|
||
Column('int_n', Integer, DefaultClause('0'),
|
||
primary_key=True, autoincrement=False),
|
||
mysql_engine='MyISAM')
|
||
Table('ai_6', meta,
|
||
Column('o1', String(1), DefaultClause('x'),
|
||
primary_key=True),
|
||
Column('int_y', Integer, primary_key=True),
|
||
mysql_engine='MyISAM')
|
||
Table('ai_7', meta,
|
||
Column('o1', String(1), DefaultClause('x'),
|
||
primary_key=True),
|
||
Column('o2', String(1), DefaultClause('x'),
|
||
primary_key=True),
|
||
Column('int_y', Integer, primary_key=True),
|
||
mysql_engine='MyISAM')
|
||
Table('ai_8', meta,
|
||
Column('o1', String(1), DefaultClause('x'),
|
||
primary_key=True),
|
||
Column('o2', String(1), DefaultClause('x'),
|
||
primary_key=True),
|
||
mysql_engine='MyISAM')
|
||
meta.create_all()
|
||
|
||
table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
|
||
'ai_5', 'ai_6', 'ai_7', 'ai_8']
|
||
mr = MetaData(testing.db)
|
||
mr.reflect(only=table_names)
|
||
|
||
for tbl in [mr.tables[name] for name in table_names]:
|
||
for c in tbl.c:
|
||
if c.name.startswith('int_y'):
|
||
assert c.autoincrement
|
||
elif c.name.startswith('int_n'):
|
||
assert not c.autoincrement
|
||
tbl.insert().execute()
|
||
if 'int_y' in tbl.c:
|
||
assert select([tbl.c.int_y]).scalar() == 1
|
||
assert list(tbl.select().execute().first()).count(1) == 1
|
||
else:
|
||
assert 1 not in list(tbl.select().execute().first())
|
||
finally:
|
||
meta.drop_all()
|
||
|
||
@testing.exclude('mysql', '<', (5, 0, 0), 'no information_schema support')
|
||
def test_system_views(self):
|
||
dialect = testing.db.dialect
|
||
connection = testing.db.connect()
|
||
view_names = dialect.get_view_names(connection, "information_schema")
|
||
self.assert_('TABLES' in view_names)
|
||
|
||
|
||
class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
|
||
"""Tests MySQL-dialect specific compilation."""
|
||
|
||
__dialect__ = mysql.dialect()
|
||
|
||
def test_precolumns(self):
|
||
dialect = self.__dialect__
|
||
|
||
def gen(distinct=None, prefixes=None):
|
||
kw = {}
|
||
if distinct is not None:
|
||
kw['distinct'] = distinct
|
||
if prefixes is not None:
|
||
kw['prefixes'] = prefixes
|
||
return str(select(['q'], **kw).compile(dialect=dialect))
|
||
|
||
eq_(gen(None), 'SELECT q')
|
||
eq_(gen(True), 'SELECT DISTINCT q')
|
||
|
||
eq_(gen(prefixes=['ALL']), 'SELECT ALL q')
|
||
eq_(gen(prefixes=['DISTINCTROW']),
|
||
'SELECT DISTINCTROW q')
|
||
|
||
# Interaction with MySQL prefix extensions
|
||
eq_(
|
||
gen(None, ['straight_join']),
|
||
'SELECT straight_join q')
|
||
eq_(
|
||
gen(False, ['HIGH_PRIORITY', 'SQL_SMALL_RESULT', 'ALL']),
|
||
'SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q')
|
||
eq_(
|
||
gen(True, ['high_priority', sql.text('sql_cache')]),
|
||
'SELECT high_priority sql_cache DISTINCT q')
|
||
|
||
@testing.uses_deprecated
|
||
def test_deprecated_distinct(self):
|
||
dialect = self.__dialect__
|
||
|
||
self.assert_compile(
|
||
select(['q'], distinct='ALL'),
|
||
'SELECT ALL q',
|
||
)
|
||
|
||
self.assert_compile(
|
||
select(['q'], distinct='distinctROW'),
|
||
'SELECT DISTINCTROW q',
|
||
)
|
||
|
||
self.assert_compile(
|
||
select(['q'], distinct='ALL',
|
||
prefixes=['HIGH_PRIORITY', 'SQL_SMALL_RESULT']),
|
||
'SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q'
|
||
)
|
||
|
||
def test_backslash_escaping(self):
|
||
self.assert_compile(
|
||
sql.column('foo').like('bar', escape='\\'),
|
||
"foo LIKE %s ESCAPE '\\\\'"
|
||
)
|
||
|
||
dialect = mysql.dialect()
|
||
dialect._backslash_escapes=False
|
||
self.assert_compile(
|
||
sql.column('foo').like('bar', escape='\\'),
|
||
"foo LIKE %s ESCAPE '\\'",
|
||
dialect=dialect
|
||
)
|
||
|
||
def test_limit(self):
|
||
t = sql.table('t', sql.column('col1'), sql.column('col2'))
|
||
|
||
self.assert_compile(
|
||
select([t]).limit(10).offset(20),
|
||
"SELECT t.col1, t.col2 FROM t LIMIT %s, %s",
|
||
{'param_1':20, 'param_2':10}
|
||
)
|
||
self.assert_compile(
|
||
select([t]).limit(10),
|
||
"SELECT t.col1, t.col2 FROM t LIMIT %s",
|
||
{'param_1':10})
|
||
|
||
self.assert_compile(
|
||
select([t]).offset(10),
|
||
"SELECT t.col1, t.col2 FROM t LIMIT %s, 18446744073709551615",
|
||
{'param_1':10}
|
||
)
|
||
|
||
def test_varchar_raise(self):
|
||
for type_ in (
|
||
String,
|
||
VARCHAR,
|
||
String(),
|
||
VARCHAR(),
|
||
NVARCHAR(),
|
||
Unicode,
|
||
Unicode(),
|
||
):
|
||
type_ = sqltypes.to_instance(type_)
|
||
assert_raises_message(
|
||
exc.CompileError,
|
||
"VARCHAR requires a length on dialect mysql",
|
||
type_.compile,
|
||
dialect=mysql.dialect())
|
||
|
||
t1 = Table('sometable', MetaData(),
|
||
Column('somecolumn', type_)
|
||
)
|
||
assert_raises_message(
|
||
exc.CompileError,
|
||
r"\(in table 'sometable', column 'somecolumn'\)\: "
|
||
r"(?:N)?VARCHAR requires a length on dialect mysql",
|
||
schema.CreateTable(t1).compile,
|
||
dialect=mysql.dialect()
|
||
)
|
||
|
||
def test_update_limit(self):
|
||
t = sql.table('t', sql.column('col1'), sql.column('col2'))
|
||
|
||
self.assert_compile(
|
||
t.update(values={'col1':123}),
|
||
"UPDATE t SET col1=%s"
|
||
)
|
||
self.assert_compile(
|
||
t.update(values={'col1':123}, mysql_limit=5),
|
||
"UPDATE t SET col1=%s LIMIT 5"
|
||
)
|
||
self.assert_compile(
|
||
t.update(values={'col1':123}, mysql_limit=None),
|
||
"UPDATE t SET col1=%s"
|
||
)
|
||
self.assert_compile(
|
||
t.update(t.c.col2==456, values={'col1':123}, mysql_limit=1),
|
||
"UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1"
|
||
)
|
||
|
||
def test_utc_timestamp(self):
|
||
self.assert_compile(func.utc_timestamp(), "UTC_TIMESTAMP")
|
||
|
||
def test_sysdate(self):
|
||
self.assert_compile(func.sysdate(), "SYSDATE()")
|
||
|
||
def test_cast(self):
|
||
t = sql.table('t', sql.column('col'))
|
||
m = mysql
|
||
|
||
specs = [
|
||
(Integer, "CAST(t.col AS SIGNED INTEGER)"),
|
||
(INT, "CAST(t.col AS SIGNED INTEGER)"),
|
||
(m.MSInteger, "CAST(t.col AS SIGNED INTEGER)"),
|
||
(m.MSInteger(unsigned=True), "CAST(t.col AS UNSIGNED INTEGER)"),
|
||
(SmallInteger, "CAST(t.col AS SIGNED INTEGER)"),
|
||
(m.MSSmallInteger, "CAST(t.col AS SIGNED INTEGER)"),
|
||
(m.MSTinyInteger, "CAST(t.col AS SIGNED INTEGER)"),
|
||
# 'SIGNED INTEGER' is a bigint, so this is ok.
|
||
(m.MSBigInteger, "CAST(t.col AS SIGNED INTEGER)"),
|
||
(m.MSBigInteger(unsigned=False), "CAST(t.col AS SIGNED INTEGER)"),
|
||
(m.MSBigInteger(unsigned=True),
|
||
"CAST(t.col AS UNSIGNED INTEGER)"),
|
||
(m.MSBit, "t.col"),
|
||
|
||
# this is kind of sucky. thank you default arguments!
|
||
(NUMERIC, "CAST(t.col AS DECIMAL)"),
|
||
(DECIMAL, "CAST(t.col AS DECIMAL)"),
|
||
(Numeric, "CAST(t.col AS DECIMAL)"),
|
||
(m.MSNumeric, "CAST(t.col AS DECIMAL)"),
|
||
(m.MSDecimal, "CAST(t.col AS DECIMAL)"),
|
||
|
||
(FLOAT, "t.col"),
|
||
(Float, "t.col"),
|
||
(m.MSFloat, "t.col"),
|
||
(m.MSDouble, "t.col"),
|
||
(m.MSReal, "t.col"),
|
||
|
||
(TIMESTAMP, "CAST(t.col AS DATETIME)"),
|
||
(DATETIME, "CAST(t.col AS DATETIME)"),
|
||
(DATE, "CAST(t.col AS DATE)"),
|
||
(TIME, "CAST(t.col AS TIME)"),
|
||
(DateTime, "CAST(t.col AS DATETIME)"),
|
||
(Date, "CAST(t.col AS DATE)"),
|
||
(Time, "CAST(t.col AS TIME)"),
|
||
(DateTime, "CAST(t.col AS DATETIME)"),
|
||
(Date, "CAST(t.col AS DATE)"),
|
||
(m.MSTime, "CAST(t.col AS TIME)"),
|
||
(m.MSTimeStamp, "CAST(t.col AS DATETIME)"),
|
||
(m.MSYear, "t.col"),
|
||
(m.MSYear(2), "t.col"),
|
||
(Interval, "t.col"),
|
||
|
||
(String, "CAST(t.col AS CHAR)"),
|
||
(Unicode, "CAST(t.col AS CHAR)"),
|
||
(UnicodeText, "CAST(t.col AS CHAR)"),
|
||
(VARCHAR, "CAST(t.col AS CHAR)"),
|
||
(NCHAR, "CAST(t.col AS CHAR)"),
|
||
(CHAR, "CAST(t.col AS CHAR)"),
|
||
(CLOB, "CAST(t.col AS CHAR)"),
|
||
(TEXT, "CAST(t.col AS CHAR)"),
|
||
(String(32), "CAST(t.col AS CHAR(32))"),
|
||
(Unicode(32), "CAST(t.col AS CHAR(32))"),
|
||
(CHAR(32), "CAST(t.col AS CHAR(32))"),
|
||
(m.MSString, "CAST(t.col AS CHAR)"),
|
||
(m.MSText, "CAST(t.col AS CHAR)"),
|
||
(m.MSTinyText, "CAST(t.col AS CHAR)"),
|
||
(m.MSMediumText, "CAST(t.col AS CHAR)"),
|
||
(m.MSLongText, "CAST(t.col AS CHAR)"),
|
||
(m.MSNChar, "CAST(t.col AS CHAR)"),
|
||
(m.MSNVarChar, "CAST(t.col AS CHAR)"),
|
||
|
||
(LargeBinary, "CAST(t.col AS BINARY)"),
|
||
(BLOB, "CAST(t.col AS BINARY)"),
|
||
(m.MSBlob, "CAST(t.col AS BINARY)"),
|
||
(m.MSBlob(32), "CAST(t.col AS BINARY)"),
|
||
(m.MSTinyBlob, "CAST(t.col AS BINARY)"),
|
||
(m.MSMediumBlob, "CAST(t.col AS BINARY)"),
|
||
(m.MSLongBlob, "CAST(t.col AS BINARY)"),
|
||
(m.MSBinary, "CAST(t.col AS BINARY)"),
|
||
(m.MSBinary(32), "CAST(t.col AS BINARY)"),
|
||
(m.MSVarBinary, "CAST(t.col AS BINARY)"),
|
||
(m.MSVarBinary(32), "CAST(t.col AS BINARY)"),
|
||
|
||
# maybe this could be changed to something more DWIM, needs
|
||
# testing
|
||
(Boolean, "t.col"),
|
||
(BOOLEAN, "t.col"),
|
||
|
||
(m.MSEnum, "t.col"),
|
||
(m.MSEnum("1", "2"), "t.col"),
|
||
(m.MSSet, "t.col"),
|
||
(m.MSSet("1", "2"), "t.col"),
|
||
]
|
||
|
||
for type_, expected in specs:
|
||
self.assert_compile(cast(t.c.col, type_), expected)
|
||
|
||
def test_no_cast_pre_4(self):
|
||
self.assert_compile(
|
||
cast(Column('foo', Integer), String),
|
||
"CAST(foo AS CHAR)",
|
||
)
|
||
dialect = mysql.dialect()
|
||
dialect.server_version_info = (3, 2, 3)
|
||
self.assert_compile(
|
||
cast(Column('foo', Integer), String),
|
||
"foo",
|
||
dialect=dialect
|
||
)
|
||
|
||
def test_cast_grouped_expression_non_castable(self):
|
||
self.assert_compile(
|
||
cast(sql.column('x') + sql.column('y'), Float),
|
||
"(x + y)"
|
||
)
|
||
|
||
def test_cast_grouped_expression_pre_4(self):
|
||
dialect = mysql.dialect()
|
||
dialect.server_version_info = (3, 2, 3)
|
||
self.assert_compile(
|
||
cast(sql.column('x') + sql.column('y'), Integer),
|
||
"(x + y)",
|
||
dialect=dialect
|
||
)
|
||
|
||
def test_extract(self):
|
||
t = sql.table('t', sql.column('col1'))
|
||
|
||
for field in 'year', 'month', 'day':
|
||
self.assert_compile(
|
||
select([extract(field, t.c.col1)]),
|
||
"SELECT EXTRACT(%s FROM t.col1) AS anon_1 FROM t" % field)
|
||
|
||
# millsecondS to millisecond
|
||
self.assert_compile(
|
||
select([extract('milliseconds', t.c.col1)]),
|
||
"SELECT EXTRACT(millisecond FROM t.col1) AS anon_1 FROM t")
|
||
|
||
def test_too_long_index(self):
|
||
exp = 'ix_zyrenian_zyme_zyzzogeton_zyzzogeton_zyrenian_zyme_zyz_5cd2'
|
||
tname = 'zyrenian_zyme_zyzzogeton_zyzzogeton'
|
||
cname = 'zyrenian_zyme_zyzzogeton_zo'
|
||
|
||
t1 = Table(tname, MetaData(),
|
||
Column(cname, Integer, index=True),
|
||
)
|
||
ix1 = list(t1.indexes)[0]
|
||
|
||
self.assert_compile(
|
||
schema.CreateIndex(ix1),
|
||
"CREATE INDEX %s "
|
||
"ON %s (%s)" % (exp, tname, cname),
|
||
dialect=mysql.dialect()
|
||
)
|
||
|
||
def test_innodb_autoincrement(self):
|
||
t1 = Table('sometable', MetaData(), Column('assigned_id',
|
||
Integer(), primary_key=True, autoincrement=False),
|
||
Column('id', Integer(), primary_key=True,
|
||
autoincrement=True), mysql_engine='InnoDB')
|
||
self.assert_compile(schema.CreateTable(t1),
|
||
'CREATE TABLE sometable (assigned_id '
|
||
'INTEGER NOT NULL, id INTEGER NOT NULL '
|
||
'AUTO_INCREMENT, PRIMARY KEY (assigned_id, '
|
||
'id), KEY idx_autoinc_id (id))ENGINE=Inn'
|
||
'oDB')
|
||
|
||
t1 = Table('sometable', MetaData(), Column('assigned_id',
|
||
Integer(), primary_key=True, autoincrement=True),
|
||
Column('id', Integer(), primary_key=True,
|
||
autoincrement=False), mysql_engine='InnoDB')
|
||
self.assert_compile(schema.CreateTable(t1),
|
||
'CREATE TABLE sometable (assigned_id '
|
||
'INTEGER NOT NULL AUTO_INCREMENT, id '
|
||
'INTEGER NOT NULL, PRIMARY KEY '
|
||
'(assigned_id, id))ENGINE=InnoDB')
|
||
|
||
def test_innodb_autoincrement_reserved_word_column_name(self):
|
||
t1 = Table(
|
||
'sometable', MetaData(),
|
||
Column('id', Integer(), primary_key=True, autoincrement=False),
|
||
Column('order', Integer(), primary_key=True, autoincrement=True),
|
||
mysql_engine='InnoDB')
|
||
self.assert_compile(
|
||
schema.CreateTable(t1),
|
||
'CREATE TABLE sometable ('
|
||
'id INTEGER NOT NULL, '
|
||
'`order` INTEGER NOT NULL AUTO_INCREMENT, '
|
||
'PRIMARY KEY (id, `order`), '
|
||
'KEY idx_autoinc_order (`order`)'
|
||
')ENGINE=InnoDB')
|
||
|
||
|
||
class SQLModeDetectionTest(fixtures.TestBase):
|
||
__only_on__ = 'mysql'
|
||
|
||
def _options(self, modes):
|
||
def connect(con, record):
|
||
cursor = con.cursor()
|
||
print "DOING THiS:", "set sql_mode='%s'" % (",".join(modes))
|
||
cursor.execute("set sql_mode='%s'" % (",".join(modes)))
|
||
e = engines.testing_engine(options={
|
||
'pool_events':[
|
||
(connect, 'first_connect'),
|
||
(connect, 'connect')
|
||
]
|
||
})
|
||
return e
|
||
|
||
def test_backslash_escapes(self):
|
||
engine = self._options(['NO_BACKSLASH_ESCAPES'])
|
||
c = engine.connect()
|
||
assert not engine.dialect._backslash_escapes
|
||
c.close()
|
||
engine.dispose()
|
||
|
||
engine = self._options([])
|
||
c = engine.connect()
|
||
assert engine.dialect._backslash_escapes
|
||
c.close()
|
||
engine.dispose()
|
||
|
||
def test_ansi_quotes(self):
|
||
engine = self._options(['ANSI_QUOTES'])
|
||
c = engine.connect()
|
||
assert engine.dialect._server_ansiquotes
|
||
c.close()
|
||
engine.dispose()
|
||
|
||
def test_combination(self):
|
||
engine = self._options(['ANSI_QUOTES,NO_BACKSLASH_ESCAPES'])
|
||
c = engine.connect()
|
||
assert engine.dialect._server_ansiquotes
|
||
assert not engine.dialect._backslash_escapes
|
||
c.close()
|
||
engine.dispose()
|
||
|
||
class RawReflectionTest(fixtures.TestBase):
|
||
def setup(self):
|
||
dialect = mysql.dialect()
|
||
self.parser = mysql.MySQLTableDefinitionParser(dialect, dialect.identifier_preparer)
|
||
|
||
def test_key_reflection(self):
|
||
regex = self.parser._re_key
|
||
|
||
assert regex.match(' PRIMARY KEY (`id`),')
|
||
assert regex.match(' PRIMARY KEY USING BTREE (`id`),')
|
||
assert regex.match(' PRIMARY KEY (`id`) USING BTREE,')
|
||
assert regex.match(' PRIMARY KEY (`id`)')
|
||
assert regex.match(' PRIMARY KEY USING BTREE (`id`)')
|
||
assert regex.match(' PRIMARY KEY (`id`) USING BTREE')
|
||
|
||
|
||
class ExecutionTest(fixtures.TestBase):
|
||
"""Various MySQL execution special cases."""
|
||
|
||
__only_on__ = 'mysql'
|
||
|
||
def test_charset_caching(self):
|
||
engine = engines.testing_engine()
|
||
|
||
cx = engine.connect()
|
||
meta = MetaData()
|
||
charset = engine.dialect._detect_charset(cx)
|
||
|
||
meta.reflect(cx)
|
||
eq_(cx.dialect._connection_charset, charset)
|
||
cx.close()
|
||
|
||
def test_sysdate(self):
|
||
d = testing.db.scalar(func.sysdate())
|
||
assert isinstance(d, datetime.datetime)
|
||
|
||
class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
|
||
__only_on__ = 'mysql'
|
||
|
||
@classmethod
|
||
def setup_class(cls):
|
||
global metadata, cattable, matchtable
|
||
metadata = MetaData(testing.db)
|
||
|
||
cattable = Table('cattable', metadata,
|
||
Column('id', Integer, primary_key=True),
|
||
Column('description', String(50)),
|
||
mysql_engine='MyISAM'
|
||
)
|
||
matchtable = Table('matchtable', metadata,
|
||
Column('id', Integer, primary_key=True),
|
||
Column('title', String(200)),
|
||
Column('category_id', Integer, ForeignKey('cattable.id')),
|
||
mysql_engine='MyISAM'
|
||
)
|
||
metadata.create_all()
|
||
|
||
cattable.insert().execute([
|
||
{'id': 1, 'description': 'Python'},
|
||
{'id': 2, 'description': 'Ruby'},
|
||
])
|
||
matchtable.insert().execute([
|
||
{'id': 1,
|
||
'title': 'Agile Web Development with Rails',
|
||
'category_id': 2},
|
||
{'id': 2,
|
||
'title': 'Dive Into Python',
|
||
'category_id': 1},
|
||
{'id': 3,
|
||
'title': "Programming Matz's Ruby",
|
||
'category_id': 2},
|
||
{'id': 4,
|
||
'title': 'The Definitive Guide to Django',
|
||
'category_id': 1},
|
||
{'id': 5,
|
||
'title': 'Python in a Nutshell',
|
||
'category_id': 1}
|
||
])
|
||
|
||
@classmethod
|
||
def teardown_class(cls):
|
||
metadata.drop_all()
|
||
|
||
@testing.fails_on('mysql+mysqlconnector', 'uses pyformat')
|
||
def test_expression(self):
|
||
format = testing.db.dialect.paramstyle == 'format' and '%s' or '?'
|
||
self.assert_compile(
|
||
matchtable.c.title.match('somstr'),
|
||
"MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)" % format)
|
||
|
||
@testing.fails_on('mysql+mysqldb', 'uses format')
|
||
@testing.fails_on('mysql+pymysql', 'uses format')
|
||
@testing.fails_on('mysql+oursql', 'uses format')
|
||
@testing.fails_on('mysql+pyodbc', 'uses format')
|
||
@testing.fails_on('mysql+zxjdbc', 'uses format')
|
||
def test_expression(self):
|
||
format = '%(title_1)s'
|
||
self.assert_compile(
|
||
matchtable.c.title.match('somstr'),
|
||
"MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)" % format)
|
||
|
||
def test_simple_match(self):
|
||
results = (matchtable.select().
|
||
where(matchtable.c.title.match('python')).
|
||
order_by(matchtable.c.id).
|
||
execute().
|
||
fetchall())
|
||
eq_([2, 5], [r.id for r in results])
|
||
|
||
def test_simple_match_with_apostrophe(self):
|
||
results = (matchtable.select().
|
||
where(matchtable.c.title.match("Matz's")).
|
||
execute().
|
||
fetchall())
|
||
eq_([3], [r.id for r in results])
|
||
|
||
def test_or_match(self):
|
||
results1 = (matchtable.select().
|
||
where(or_(matchtable.c.title.match('nutshell'),
|
||
matchtable.c.title.match('ruby'))).
|
||
order_by(matchtable.c.id).
|
||
execute().
|
||
fetchall())
|
||
eq_([3, 5], [r.id for r in results1])
|
||
results2 = (matchtable.select().
|
||
where(matchtable.c.title.match('nutshell ruby')).
|
||
order_by(matchtable.c.id).
|
||
execute().
|
||
fetchall())
|
||
eq_([3, 5], [r.id for r in results2])
|
||
|
||
|
||
def test_and_match(self):
|
||
results1 = (matchtable.select().
|
||
where(and_(matchtable.c.title.match('python'),
|
||
matchtable.c.title.match('nutshell'))).
|
||
execute().
|
||
fetchall())
|
||
eq_([5], [r.id for r in results1])
|
||
results2 = (matchtable.select().
|
||
where(matchtable.c.title.match('+python +nutshell')).
|
||
execute().
|
||
fetchall())
|
||
eq_([5], [r.id for r in results2])
|
||
|
||
def test_match_across_joins(self):
|
||
results = (matchtable.select().
|
||
where(and_(cattable.c.id==matchtable.c.category_id,
|
||
or_(cattable.c.description.match('Ruby'),
|
||
matchtable.c.title.match('nutshell')))).
|
||
order_by(matchtable.c.id).
|
||
execute().
|
||
fetchall())
|
||
eq_([1, 3, 5], [r.id for r in results])
|
||
|
||
|
||
def colspec(c):
|
||
return testing.db.dialect.ddl_compiler(
|
||
testing.db.dialect, None).get_column_specification(c)
|
||
|