Files
sqlalchemy/test/dialect/mysql/test_reflection.py
T
2015-02-07 10:22:08 -05:00

418 lines
17 KiB
Python

# coding: utf-8
from sqlalchemy.testing import eq_
from sqlalchemy import *
from sqlalchemy import sql
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.testing import fixtures, AssertsExecutionResults
from sqlalchemy import testing
# Reflection tests for the MySQL dialect; the test methods of this class
# operate on the database exposed as ``testing.db``.
class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
# NOTE(review): presumably restricts this suite to MySQL backends - confirm
# against the sqlalchemy.testing plugin.
__only_on__ = 'mysql'
# NOTE(review): presumably marks the suite as requiring a live backend -
# confirm against the sqlalchemy.testing plugin.
__backend__ = True
def test_default_reflection(self):
    """Test reflection of column defaults.

    Creates ``mysql_def`` with a mix of server defaults and reflects it,
    then creates the reflected table and reflects that a second time.
    The same set of default checks is applied to both reflections so the
    full round trip is verified.
    """
    from sqlalchemy.dialects.mysql import VARCHAR

    def assert_reflected_defaults(table):
        # Checks shared by the first and the second (round-tripped)
        # reflection.  The string defaults are compared in their quoted
        # form, as produced by str() on the reflected default.
        assert str(table.c.c1.server_default.arg) == "''"
        assert str(table.c.c2.server_default.arg) == "'0'"
        assert str(table.c.c3.server_default.arg) == "'abc'"
        assert str(table.c.c4.server_default.arg) \
            == "'2009-04-05 12:00:00'"
        assert table.c.c5.default is None
        assert table.c.c5.server_default is None
        assert table.c.c6.default is None
        eq_(
            str(table.c.c6.server_default.arg).upper(),
            "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        )

    def_table = Table(
        'mysql_def',
        MetaData(testing.db),
        Column('c1', VARCHAR(10, collation='utf8_unicode_ci'),
               DefaultClause(''), nullable=False),
        Column('c2', String(10), DefaultClause('0')),
        Column('c3', String(10), DefaultClause('abc')),
        Column('c4', TIMESTAMP, DefaultClause('2009-04-05 12:00:00')),
        Column('c5', TIMESTAMP),
        Column('c6', TIMESTAMP,
               DefaultClause(sql.text("CURRENT_TIMESTAMP "
                                      "ON UPDATE CURRENT_TIMESTAMP"))),
    )
    def_table.create()
    try:
        reflected = Table('mysql_def', MetaData(testing.db),
                          autoload=True)
    finally:
        # Drop even when reflection fails so later tests can reuse the name.
        def_table.drop()

    # The locally defined table keeps the raw default values.
    assert def_table.c.c1.server_default.arg == ''
    assert def_table.c.c2.server_default.arg == '0'
    assert def_table.c.c3.server_default.arg == 'abc'
    assert def_table.c.c4.server_default.arg \
        == '2009-04-05 12:00:00'

    assert_reflected_defaults(reflected)

    # Round trip: create the table from its reflected definition and
    # reflect it once more.
    reflected.create()
    try:
        reflected2 = Table('mysql_def', MetaData(testing.db),
                           autoload=True)
    finally:
        reflected.drop()

    # Previously the c5/c6 checks re-inspected ``reflected`` here, so the
    # round-tripped table was never verified for those columns.
    assert_reflected_defaults(reflected2)
def test_reflection_with_table_options(self):
    """Check that ``mysql_*`` table options are present after reflection."""
    comment = r"""Comment types type speedily ' " \ '' Fun!"""

    def_table = Table(
        'mysql_def', MetaData(testing.db),
        Column('c1', Integer()),
        mysql_engine='MEMORY',
        mysql_comment=comment,
        mysql_default_charset='utf8',
        mysql_auto_increment='5',
        mysql_avg_row_length='3',
        mysql_password='secret',
        mysql_connection='fish',
    )

    def_table.create()
    try:
        reflected = Table('mysql_def', MetaData(testing.db),
                          autoload=True)
    finally:
        def_table.drop()

    # Options as given on the locally defined table.
    declared = (
        ('mysql_engine', 'MEMORY'),
        ('mysql_comment', comment),
        ('mysql_default_charset', 'utf8'),
        ('mysql_auto_increment', '5'),
        ('mysql_avg_row_length', '3'),
        ('mysql_password', 'secret'),
        ('mysql_connection', 'fish'),
    )
    for option, value in declared:
        assert def_table.kwargs[option] == value

    # Options as they come back from reflection.  Note that the charset
    # key is asserted with a space ('mysql_default charset'), unlike the
    # underscore form used when declaring the table.
    round_tripped = (
        ('mysql_engine', 'MEMORY'),
        ('mysql_comment', comment),
        ('mysql_default charset', 'utf8'),
        ('mysql_avg_row_length', '3'),
        ('mysql_connection', 'fish'),
    )
    for option, value in round_tripped:
        assert reflected.kwargs[option] == value

    # This field doesn't seem to be returned by mysql itself.
    #assert reflected.kwargs['mysql_password'] == 'secret'

    # This is explicitly ignored when reflecting schema.
    #assert reflected.kwargs['mysql_auto_increment'] == '5'
def test_reflection_on_include_columns(self):
    """Test reflection of include_columns to be sure they respect case."""
    case_table = Table(
        'mysql_case', MetaData(testing.db),
        Column('c1', String(10)),
        Column('C2', String(10)),
        Column('C3', String(10)),
    )

    try:
        case_table.create()

        # Names given with their exact case are picked up.
        reflected = Table('mysql_case', MetaData(testing.db),
                          autoload=True, include_columns=['c1', 'C2'])
        for tbl in (case_table, reflected):
            assert 'c1' in tbl.c.keys()
            assert 'C2' in tbl.c.keys()

        # 'c2' differs from the real column 'C2' only by case, so it must
        # not cause any column to be included.
        reflected2 = Table('mysql_case', MetaData(testing.db),
                           autoload=True, include_columns=['c1', 'c2'])
        assert 'c1' in reflected2.c.keys()
        for absent in ['c2', 'C2', 'C3']:
            assert absent not in reflected2.c.keys()
    finally:
        case_table.drop()
@testing.exclude('mysql', '<', (5, 0, 0), 'early types are squirrely')
@testing.uses_deprecated('Using String type with no length')
@testing.uses_deprecated('Manually quoting ENUM value literals')
def test_type_reflection(self):
    """Check the types reported when reflecting a table and a view of it.

    Each entry of ``specs`` is a tuple whose first element is the type the
    column is created with; an optional second element is the type the
    column is expected to reflect as when that differs.
    """
    # (ask_for, roundtripped_as_if_different)
    specs = [(String(1), mysql.MSString(1), ),
             (String(3), mysql.MSString(3), ),
             (Text(), mysql.MSText(), ),
             (Unicode(1), mysql.MSString(1), ),
             (Unicode(3), mysql.MSString(3), ),
             (UnicodeText(), mysql.MSText(), ),
             (mysql.MSChar(1), ),
             (mysql.MSChar(3), ),
             (NCHAR(2), mysql.MSChar(2), ),
             (mysql.MSNChar(2), mysql.MSChar(2), ),  # N is CREATE only
             (mysql.MSNVarChar(22), mysql.MSString(22), ),
             (SmallInteger(), mysql.MSSmallInteger(), ),
             (SmallInteger(), mysql.MSSmallInteger(4), ),
             (mysql.MSSmallInteger(), ),
             (mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ),
             (mysql.MSMediumInteger(), mysql.MSMediumInteger(), ),
             (mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ),
             (LargeBinary(3), mysql.TINYBLOB(), ),
             (LargeBinary(), mysql.BLOB() ),
             (mysql.MSBinary(3), mysql.MSBinary(3), ),
             (mysql.MSVarBinary(3),),
             (mysql.MSTinyBlob(),),
             (mysql.MSBlob(),),
             (mysql.MSBlob(1234), mysql.MSBlob()),
             (mysql.MSMediumBlob(),),
             (mysql.MSLongBlob(),),
             (mysql.ENUM("''","'fleem'"), ),
             ]

    # Columns are named c1..cN in the order of ``specs``.
    columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]

    db = testing.db
    m = MetaData(db)
    t_table = Table('mysql_types', m, *columns)
    try:
        m.create_all()

        m2 = MetaData(db)
        rt = Table('mysql_types', m2, autoload=True)
        try:
            db.execute('CREATE OR REPLACE VIEW mysql_types_v '
                       'AS SELECT * from mysql_types')
            rv = Table('mysql_types_v', m2, autoload=True)

            # A real conditional expression: the former ``and/or`` form
            # would have fallen back to c[0] for a falsy c[1].
            expected = [c[1] if len(c) > 1 else c[0] for c in specs]

            # Early 5.0 releases seem to report more "general" for columns
            # in a view, e.g. char -> varchar, tinyblob -> mediumblob
            #
            # Not sure exactly which point version has the fix.
            if db.dialect.server_version_info < (5, 0, 11):
                tables = rt,
            else:
                tables = rt, rv

            for table in tables:
                for i, reflected in enumerate(table.c):
                    assert isinstance(reflected.type,
                                      type(expected[i])), \
                        'element %d: %r not instance of %r' % (
                            i, reflected.type, type(expected[i]))
        finally:
            db.execute('DROP VIEW mysql_types_v')
    finally:
        m.drop_all()
def test_autoincrement(self):
    """Check the reflected ``autoincrement`` flag on composite primary keys.

    Eight MyISAM tables are created and reflected.  Columns whose name
    starts with ``int_y`` must reflect with autoincrement set, columns
    whose name starts with ``int_n`` must not.  An empty INSERT is then
    run against each table and the resulting row is inspected.
    """
    meta = MetaData(testing.db)
    try:
        Table('ai_1', meta,
              Column('int_y', Integer, primary_key=True),
              Column('int_n', Integer, DefaultClause('0'),
                     primary_key=True),
              mysql_engine='MyISAM')
        Table('ai_2', meta,
              Column('int_y', Integer, primary_key=True),
              Column('int_n', Integer, DefaultClause('0'),
                     primary_key=True),
              mysql_engine='MyISAM')
        Table('ai_3', meta,
              Column('int_n', Integer, DefaultClause('0'),
                     primary_key=True, autoincrement=False),
              Column('int_y', Integer, primary_key=True),
              mysql_engine='MyISAM')
        Table('ai_4', meta,
              Column('int_n', Integer, DefaultClause('0'),
                     primary_key=True, autoincrement=False),
              Column('int_n2', Integer, DefaultClause('0'),
                     primary_key=True, autoincrement=False),
              mysql_engine='MyISAM')
        Table('ai_5', meta,
              Column('int_y', Integer, primary_key=True),
              Column('int_n', Integer, DefaultClause('0'),
                     primary_key=True, autoincrement=False),
              mysql_engine='MyISAM')
        Table('ai_6', meta,
              Column('o1', String(1), DefaultClause('x'),
                     primary_key=True),
              Column('int_y', Integer, primary_key=True),
              mysql_engine='MyISAM')
        Table('ai_7', meta,
              Column('o1', String(1), DefaultClause('x'),
                     primary_key=True),
              Column('o2', String(1), DefaultClause('x'),
                     primary_key=True),
              Column('int_y', Integer, primary_key=True),
              mysql_engine='MyISAM')
        Table('ai_8', meta,
              Column('o1', String(1), DefaultClause('x'),
                     primary_key=True),
              Column('o2', String(1), DefaultClause('x'),
                     primary_key=True),
              mysql_engine='MyISAM')
        meta.create_all()

        table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
                       'ai_5', 'ai_6', 'ai_7', 'ai_8']
        mr = MetaData(testing.db)
        mr.reflect(only=table_names)

        # Resolve every table up front so a missing one fails before any
        # INSERT is issued.
        reflected_tables = [mr.tables[name] for name in table_names]
        for tbl in reflected_tables:
            for col in tbl.c:
                if col.name.startswith('int_y'):
                    assert col.autoincrement
                elif col.name.startswith('int_n'):
                    assert not col.autoincrement

            # Insert a row made up entirely of defaults.
            tbl.insert().execute()
            if 'int_y' in tbl.c:
                # int_y must be 1, and it must be the only 1 in the row.
                assert select([tbl.c.int_y]).scalar() == 1
                assert list(tbl.select().execute().first()).count(1) == 1
            else:
                assert 1 not in list(tbl.select().execute().first())
    finally:
        meta.drop_all()
@testing.exclude('mysql', '<', (5, 0, 0), 'no information_schema support')
def test_system_views(self):
    """Check that ``TABLES`` is among the views of ``information_schema``.

    The view names are obtained from the dialect's ``get_view_names()``
    using a connection opened from ``testing.db``.
    """
    dialect = testing.db.dialect
    connection = testing.db.connect()
    try:
        view_names = dialect.get_view_names(connection, "information_schema")
    finally:
        # The connection was previously left open; release it even when
        # the lookup raises.
        connection.close()
    self.assert_('TABLES' in view_names)
@testing.provide_metadata
def test_nullable_reflection(self):
    """Test reflection of NULL/NOT NULL, in particular with TIMESTAMP
    defaults where MySQL is inconsistent in how it reports CREATE TABLE.
    """
    meta = self.metadata

    # this is ideally one table, but older MySQL versions choke
    # on the multiple TIMESTAMP columns
    column_groups = [
        [
            "x INTEGER NULL",
            "y INTEGER NOT NULL",
            "z INTEGER",
            "q TIMESTAMP NULL"
        ],
        ["p TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP"],
        ["r TIMESTAMP NOT NULL"],
        ["s TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP"],
        ["t TIMESTAMP"],
        ["u TIMESTAMP DEFAULT CURRENT_TIMESTAMP"]
    ]

    reflected = []
    for idx, cols in enumerate(column_groups):
        table_name = "nn_t%d" % idx
        Table(table_name, meta)  # to allow DROP

        # The table itself is created with raw DDL, one table per group.
        ddl = """
CREATE TABLE nn_t%d (
%s
)
""" % (idx, ", \n".join(cols))
        testing.db.execute(ddl)

        # Keep only the three attributes under test for each column.
        for d in inspect(testing.db).get_columns(table_name):
            reflected.append({
                "name": d['name'],
                "nullable": d['nullable'],
                "default": d['default'],
            })

    expected = [
        {'name': 'x', 'nullable': True, 'default': None},
        {'name': 'y', 'nullable': False, 'default': None},
        {'name': 'z', 'nullable': True, 'default': None},
        {'name': 'q', 'nullable': True, 'default': None},
        {'name': 'p', 'nullable': True,
         'default': 'CURRENT_TIMESTAMP'},
        {'name': 'r', 'nullable': False,
         'default': "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"},
        {'name': 's', 'nullable': False,
         'default': 'CURRENT_TIMESTAMP'},
        {'name': 't', 'nullable': False,
         'default': "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"},
        {'name': 'u', 'nullable': False,
         'default': 'CURRENT_TIMESTAMP'},
    ]
    eq_(reflected, expected)
@testing.provide_metadata
def test_reflection_with_unique_constraint(self):
    """Check how a named unique constraint is reported after reflection.

    The inspector lists ``uc_a`` both as a unique index and as a unique
    constraint, while a reflected ``Table`` carries it only as an index.
    """
    insp = inspect(testing.db)

    meta = self.metadata
    uc_table = Table(
        'mysql_uc', meta,
        Column('a', String(10)),
        UniqueConstraint('a', name='uc_a'),
    )
    uc_table.create()

    # MySQL converts unique constraints into unique indexes.
    # separately we get both
    inspected_indexes = dict(
        (rec['name'], rec) for rec in insp.get_indexes('mysql_uc'))
    inspected_constraints = set(
        rec['name'] for rec in insp.get_unique_constraints('mysql_uc'))

    self.assert_('uc_a' in inspected_indexes)
    self.assert_(inspected_indexes['uc_a']['unique'])
    self.assert_('uc_a' in inspected_constraints)

    # reflection here favors the unique index, as that's the
    # more "official" MySQL construct
    reflected = Table('mysql_uc', MetaData(testing.db), autoload=True)

    table_indexes = dict((ix.name, ix) for ix in reflected.indexes)
    table_constraints = set(uc.name for uc in reflected.constraints)

    self.assert_('uc_a' in table_indexes)
    self.assert_(table_indexes['uc_a'].unique)
    self.assert_('uc_a' not in table_constraints)
class RawReflectionTest(fixtures.TestBase):
    """Exercise the regular expressions of ``MySQLTableDefinitionParser``
    directly against individual lines of table-definition text.
    """

    def setup(self):
        """Build a parser from a freshly constructed MySQL dialect."""
        dialect = mysql.dialect()
        self.parser = mysql.MySQLTableDefinitionParser(
            dialect, dialect.identifier_preparer)

    def test_key_reflection(self):
        """``_re_key`` matches PRIMARY KEY lines with optional USING and
        KEY_BLOCK_SIZE clauses, and rejects a doubled ``=``.
        """
        regex = self.parser._re_key

        accepted = (
            ' PRIMARY KEY (`id`),',
            ' PRIMARY KEY USING BTREE (`id`),',
            ' PRIMARY KEY (`id`) USING BTREE,',
            ' PRIMARY KEY (`id`)',
            ' PRIMARY KEY USING BTREE (`id`)',
            ' PRIMARY KEY (`id`) USING BTREE',
            ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE 16',
            ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE=16',
            ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE = 16',
        )
        for line in accepted:
            assert regex.match(line)

        assert not regex.match(
            ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE = = 16')

    def test_fk_reflection(self):
        """``_re_constraint`` captures the name, columns, target and the
        ON DELETE / ON UPDATE actions of a FOREIGN KEY line.
        """
        regex = self.parser._re_constraint

        cascade_both = regex.match(
            ' CONSTRAINT `addresses_user_id_fkey` '
            'FOREIGN KEY (`user_id`) '
            'REFERENCES `users` (`id`) '
            'ON DELETE CASCADE ON UPDATE CASCADE')
        eq_(cascade_both.groups(),
            ('addresses_user_id_fkey', '`user_id`',
             '`users`', '`id`', None, 'CASCADE', 'CASCADE'))

        # A two-word action (SET NULL) must be captured whole.
        set_null_on_update = regex.match(
            ' CONSTRAINT `addresses_user_id_fkey` '
            'FOREIGN KEY (`user_id`) '
            'REFERENCES `users` (`id`) '
            'ON DELETE CASCADE ON UPDATE SET NULL')
        eq_(set_null_on_update.groups(),
            ('addresses_user_id_fkey', '`user_id`',
             '`users`', '`id`', None, 'CASCADE', 'SET NULL'))