Files
sqlalchemy/test/sql/test_labels.py
T
2012-11-23 11:09:57 -05:00

461 lines
17 KiB
Python

from sqlalchemy import *
from sqlalchemy import exc as exceptions
from sqlalchemy import testing
from sqlalchemy.engine import default
from sqlalchemy.sql import table, column
from sqlalchemy.testing import assert_raises, eq_
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Table, Column
# Identifier-length limit used by the tests in this module: it is assigned to
# ``max_identifier_length`` on the dialects the tests compile and execute
# against, so that the long table/column names used here get truncated.
IDENT_LENGTH = 29
class LabelTypeTest(fixtures.TestBase):
    """Checks the ``type`` attribute of labeled expressions."""

    def test_type(self):
        """A label on a column, or on a scalar subquery of a column, has
        the type of that column."""
        metadata = MetaData()
        sometable = Table(
            'sometable', metadata,
            Column('col1', Integer),
            Column('col2', Float)
        )

        column_label = sometable.c.col1.label('hi')
        assert isinstance(column_label.type, Integer)

        scalar_label = select([sometable.c.col2]).as_scalar().label('lala')
        assert isinstance(scalar_label.type, Float)
class LongLabelsTest(fixtures.TablesTest, AssertsCompiledSQL):
    """Tests of label, alias and bind-parameter naming when table and
    column names are long relative to the identifier-length limit."""

    # Fixture flags consumed by the TablesTest base class, which is defined
    # outside this file.  Presumably: insert the fixture rows once for the
    # whole class and do not delete them between tests -- TODO confirm.
    run_inserts = 'once'
    run_deletes = None
@classmethod
def define_tables(cls, metadata):
    """Create the two long-named fixture tables on ``metadata`` and expose
    them as ``cls.tables.table1`` and ``cls.tables.table2``.

    Both tables have the same two long-named columns; the name of the
    second table is exactly 29 characters long.
    """
    def long_named_columns():
        # Build a fresh pair of Column objects for each table.
        return [
            Column('this_is_the_primarykey_column', Integer,
                   primary_key=True,
                   test_needs_autoincrement=True),
            Column('this_is_the_data_column', String(30)),
        ]

    cls.tables.table1 = Table(
        'some_large_named_table', metadata, *long_named_columns())
    cls.tables.table2 = Table(
        'table_with_exactly_29_characs', metadata, *long_named_columns())
@classmethod
def insert_data(cls):
    """Populate the fixture tables.

    ``table1`` receives four rows ``(n, 'data<n>')`` for n = 1..4, each
    inserted with its own ``execute()`` call; ``table2`` receives the
    single row ``(1, 'data')``.
    """
    table1 = cls.tables.table1
    table2 = cls.tables.table2

    for pk in range(1, 5):
        testing.db.execute(
            table1.insert(),
            this_is_the_primarykey_column=pk,
            this_is_the_data_column='data%d' % pk
        )

    # The parameter key has to match the column name exactly.  It used to
    # be spelled 'this_is_the_primary_key_column', which names no column
    # of table2, so the explicit primary key value was not applied and
    # the row presumably only received pk 1 through autoincrement
    # -- TODO confirm against the engine's handling of unknown keys.
    testing.db.execute(
        table2.insert(),
        {'this_is_the_primarykey_column': 1,
         'this_is_the_data_column': 'data'}
    )
@classmethod
def setup_class(cls):
    """Run the base-class setup, then remember the test dialect's
    ``max_identifier_length`` on ``cls.maxlen`` and replace it with
    IDENT_LENGTH."""
    super(LongLabelsTest, cls).setup_class()

    dialect = testing.db.dialect
    cls.maxlen = dialect.max_identifier_length
    dialect.max_identifier_length = IDENT_LENGTH
@classmethod
def teardown_class(cls):
    """Put the saved ``cls.maxlen`` back on the test dialect, then run the
    base-class teardown."""
    dialect = testing.db.dialect
    dialect.max_identifier_length = cls.maxlen

    super(LongLabelsTest, cls).teardown_class()
def test_too_long_name_disallowed(self):
    """Creating or dropping a table with an over-long name raises
    IdentifierError, whether invoked on the MetaData or on the Table."""
    metadata = MetaData(testing.db)
    too_long = Table(
        'this_name_is_too_long_for_what_were_doing_in_this_test',
        metadata,
        Column('foo', Integer)
    )

    ddl_calls = (
        metadata.create_all,
        metadata.drop_all,
        too_long.create,
        too_long.drop,
    )
    for ddl_call in ddl_calls:
        assert_raises(exceptions.IdentifierError, ddl_call)
def test_basic_result(self):
    """Rows of a ``use_labels`` SELECT on table1 can be indexed by the
    table's Column objects."""
    table1 = self.tables.table1
    pk_col = table1.c.this_is_the_primarykey_column
    data_col = table1.c.this_is_the_data_column

    stmt = table1.select(use_labels=True, order_by=[pk_col])

    fetched = []
    for row in testing.db.execute(stmt):
        fetched.append((row[pk_col], row[data_col]))

    expected = [(1, 'data1'), (2, 'data2'), (3, 'data3'), (4, 'data4')]
    eq_(fetched, expected)
def test_result_limit(self):
    """Rows of a ``use_labels`` SELECT with LIMIT can be indexed by the
    table's Column objects."""
    table1 = self.tables.table1
    pk_col = table1.c.this_is_the_primarykey_column
    data_col = table1.c.this_is_the_data_column

    # Some dialects, such as oracle (and possibly ms-sql in a future
    # version), generate a subquery for limits/offsets.  Make sure the
    # generated result map corresponds to the selected table and not to
    # the select query.
    stmt = table1.select(use_labels=True, order_by=[pk_col])
    stmt = stmt.limit(2)

    fetched = [
        (row[pk_col], row[data_col])
        for row in testing.db.execute(stmt)
    ]
    eq_(fetched, [(1, 'data1'), (2, 'data2')])
@testing.requires.offset
def test_result_limit_offset(self):
    """Rows of a ``use_labels`` SELECT with LIMIT and OFFSET can be
    indexed by the table's Column objects."""
    table1 = self.tables.table1
    pk_col = table1.c.this_is_the_primarykey_column
    data_col = table1.c.this_is_the_data_column

    ordered = table1.select(use_labels=True, order_by=[pk_col])
    window = ordered.limit(2).offset(1)

    fetched = []
    for row in testing.db.execute(window):
        fetched.append((row[pk_col], row[data_col]))

    eq_(fetched, [(2, 'data2'), (3, 'data3')])
def test_table_alias_1(self):
    """An anonymous alias of the 29-character table compiles to a
    truncated alias name; when testing against oracle the alias is
    expected without the ``AS`` keyword."""
    table2 = self.tables.table2

    # The two expected statements differ only in how the alias is
    # introduced at the end of the FROM clause.
    if testing.against('oracle'):
        alias_clause = 'table_with_exactly_29_c_1'
    else:
        alias_clause = 'AS table_with_exactly_29_c_1'

    expected_sql = (
        'SELECT '
        'table_with_exactly_29_c_1.'
        'this_is_the_primarykey_column, '
        'table_with_exactly_29_c_1.this_is_the_data_column '
        'FROM '
        'table_with_exactly_29_characs '
    ) + alias_clause

    self.assert_compile(table2.alias().select(), expected_sql)
def test_table_alias_2(self):
    """A join from table1 to an anonymous alias of table2 compiles, on a
    default dialect limited to IDENT_LENGTH, with the truncated alias
    name in the columns, the ON clause and the WHERE clause."""
    table1 = self.tables.table1
    table2 = self.tables.table2
    t2_alias = table2.alias()

    limited_dialect = default.DefaultDialect()
    limited_dialect.max_identifier_length = IDENT_LENGTH

    onclause = (
        table1.c.this_is_the_data_column ==
        t2_alias.c.this_is_the_data_column
    )
    stmt = select([table1, t2_alias])
    stmt = stmt.select_from(table1.join(t2_alias, onclause))
    stmt = stmt.where(t2_alias.c.this_is_the_data_column == 'data3')

    expected_sql = (
        'SELECT '
        'some_large_named_table.this_is_the_primarykey_column, '
        'some_large_named_table.this_is_the_data_column, '
        'table_with_exactly_29_c_1.this_is_the_primarykey_column, '
        'table_with_exactly_29_c_1.this_is_the_data_column '
        'FROM '
        'some_large_named_table '
        'JOIN '
        'table_with_exactly_29_characs '
        'AS '
        'table_with_exactly_29_c_1 '
        'ON '
        'some_large_named_table.this_is_the_data_column = '
        'table_with_exactly_29_c_1.this_is_the_data_column '
        'WHERE '
        'table_with_exactly_29_c_1.this_is_the_data_column = '
        ':this_is_the_data_column_1'
    )
    self.assert_compile(stmt, expected_sql, dialect=limited_dialect)
def test_table_alias_3(self):
    """Selecting from an anonymous alias of table2 on the test database
    returns the fixture row."""
    table2 = self.tables.table2

    aliased_select = table2.alias().select()
    first_row = testing.db.execute(aliased_select).first()

    eq_(first_row, (1, 'data'))
def test_colbinds(self):
    """Comparisons against the long-named primary key column, whose
    values are rendered as bind parameters, select the expected rows."""
    table1 = self.tables.table1
    pk_col = table1.c.this_is_the_primarykey_column

    result = table1.select(pk_col == 4).execute()
    eq_(result.fetchall(), [(4, 'data4')])

    # Two rows come back here, so order them explicitly: without an
    # ORDER BY the database is free to return them in either order and
    # the comparison below would depend on that.
    result = table1.select(
        or_(pk_col == 4, pk_col == 2),
        order_by=[pk_col]
    ).execute()
    eq_(result.fetchall(), [(2, 'data2'), (4, 'data4')])
@testing.provide_metadata
def test_insert_no_pk(self):
    """Insert into a long-named table, whose primary key column carries a
    Sequence, while supplying only the data column."""
    pk_column = Column(
        'this_is_the_primarykey_column', Integer,
        Sequence('this_is_some_large_seq'),
        primary_key=True
    )
    data_column = Column('this_is_the_data_column', String(30))

    other_table = Table(
        'some_other_large_named_table', self.metadata,
        pk_column,
        data_column
    )
    other_table.create(testing.db, checkfirst=True)

    testing.db.execute(
        other_table.insert(),
        this_is_the_data_column='data1'
    )
@testing.requires.subqueries
def test_subquery(self):
    """Selecting from a subquery on table1 aliased as 'foo' returns the
    row with primary key 4."""
    table1 = self.tables.table1

    pk_is_4 = table1.c.this_is_the_primarykey_column == 4
    foo = table1.select(pk_is_4).alias('foo')

    rows = list(testing.db.execute(select([foo])))
    eq_(rows, [(4, u'data4')])
@testing.requires.subqueries
def test_anon_alias(self):
    """A ``use_labels`` SELECT from an anonymously aliased subquery
    compiles with truncated labels on a default dialect limited to
    IDENT_LENGTH, and returns the expected row on the test database."""
    table1 = self.tables.table1

    limited_dialect = default.DefaultDialect()
    limited_dialect.max_identifier_length = IDENT_LENGTH

    anon_subq = table1.select(
        table1.c.this_is_the_primarykey_column == 4).alias()
    labeled = select([anon_subq], use_labels=True)

    expected_sql = (
        'SELECT '
        'anon_1.this_is_the_primarykey_column '
        'AS anon_1_this_is_the_prim_1, '
        'anon_1.this_is_the_data_column '
        'AS anon_1_this_is_the_data_2 '
        'FROM ('
        'SELECT '
        'some_large_named_table.'
        'this_is_the_primarykey_column '
        'AS this_is_the_primarykey_column, '
        'some_large_named_table.this_is_the_data_column '
        'AS this_is_the_data_column '
        'FROM '
        'some_large_named_table '
        'WHERE '
        'some_large_named_table.this_is_the_primarykey_column '
        '= :this_is_the_primarykey__1'
        ') '
        'AS anon_1'
    )
    self.assert_compile(labeled, expected_sql, dialect=limited_dialect)

    rows = list(testing.db.execute(labeled))
    eq_(rows, [(4, u'data4')])
def test_adjustable(self):
    """Compile two statements with default dialects whose
    ``label_length`` is 10 and then 4, checking the generated names.

    The first statement selects from a subquery aliased 'foo'; the second
    selects, with ``use_labels``, from an anonymously aliased subquery.
    """
    table1 = self.tables.table1

    # --- statement 1: explicitly named alias -------------------------
    foo = table1.select(
        table1.c.this_is_the_primarykey_column == 4).alias('foo')
    from_foo = select([foo])

    named_alias_cases = [
        (10,
         'SELECT '
         'foo.this_1, foo.this_2 '
         'FROM ('
         'SELECT '
         'some_large_named_table.this_is_the_primarykey_column '
         'AS this_1, '
         'some_large_named_table.this_is_the_data_column '
         'AS this_2 '
         'FROM '
         'some_large_named_table '
         'WHERE '
         'some_large_named_table.this_is_the_primarykey_column '
         '= :this_1'
         ') '
         'AS foo'),
        (4,
         'SELECT '
         'foo._1, foo._2 '
         'FROM ('
         'SELECT '
         'some_large_named_table.this_is_the_primarykey_column '
         'AS _1, '
         'some_large_named_table.this_is_the_data_column '
         'AS _2 '
         'FROM '
         'some_large_named_table '
         'WHERE '
         'some_large_named_table.this_is_the_primarykey_column '
         '= :_1'
         ') '
         'AS foo'),
    ]
    for label_length, expected_sql in named_alias_cases:
        self.assert_compile(
            from_foo, expected_sql,
            dialect=default.DefaultDialect(label_length=label_length))

    # --- statement 2: anonymous alias, use_labels ---------------------
    anon_subq = table1.select(
        table1.c.this_is_the_primarykey_column == 4).alias()
    from_anon = select([anon_subq], use_labels=True)

    anon_alias_cases = [
        (10,
         'SELECT '
         'anon_1.this_2 AS anon_1, '
         'anon_1.this_4 AS anon_3 '
         'FROM ('
         'SELECT '
         'some_large_named_table.this_is_the_primarykey_column '
         'AS this_2, '
         'some_large_named_table.this_is_the_data_column '
         'AS this_4 '
         'FROM '
         'some_large_named_table '
         'WHERE '
         'some_large_named_table.this_is_the_primarykey_column '
         '= :this_1'
         ') '
         'AS anon_1'),
        (4,
         'SELECT '
         '_1._2 AS _1, '
         '_1._4 AS _3 '
         'FROM ('
         'SELECT '
         'some_large_named_table.this_is_the_primarykey_column '
         'AS _2, '
         'some_large_named_table.this_is_the_data_column '
         'AS _4 '
         'FROM '
         'some_large_named_table '
         'WHERE '
         'some_large_named_table.this_is_the_primarykey_column '
         '= :_1'
         ') '
         'AS _1'),
    ]
    for label_length, expected_sql in anon_alias_cases:
        self.assert_compile(
            from_anon, expected_sql,
            dialect=default.DefaultDialect(label_length=label_length))
def test_adjustable_result_schema_column(self):
    """With an engine created with ``label_length=10``, a row selected
    from a subquery over the schema-level table1 is addressable both by
    the full column name and by the shortened label ``this_1``."""
    table1 = self.tables.table1

    q = table1.select(
        table1.c.this_is_the_primarykey_column == 4).alias('foo')
    x = select([q])

    e = testing_engine(options={'label_length': 10})
    # Share the main test engine's connection pool.
    e.pool = testing.db.pool

    row = e.execute(x).first()
    eq_(row.this_is_the_primarykey_column, 4)
    eq_(row.this_1, 4)
    eq_(row['this_1'], 4)

    # Run the very same statement a second time on the same engine.
    # Earlier code rebuilt ``q`` at this point without rebuilding ``x``,
    # so the new alias was never used; that dead assignment is removed.
    # NOTE(review): if a freshly built statement was intended here, ``x``
    # has to be rebuilt as well -- confirm the intent.
    row = e.execute(x).first()
    eq_(row.this_is_the_primarykey_column, 4)
    eq_(row.this_1, 4)
def test_adjustable_result_lightweight_column(self):
    """With an engine created with ``label_length=10``, a row selected
    from a subquery over a lightweight ``table()`` / ``column()``
    construct is addressable by the full column name and by ``this_1``."""
    lightweight = table(
        'some_large_named_table',
        column('this_is_the_primarykey_column'),
        column('this_is_the_data_column')
    )

    pk_is_4 = lightweight.c.this_is_the_primarykey_column == 4
    foo = lightweight.select(pk_is_4).alias('foo')
    stmt = select([foo])

    engine = testing_engine(options={'label_length': 10})
    engine.pool = testing.db.pool

    first_row = engine.execute(stmt).first()
    eq_(first_row.this_is_the_primarykey_column, 4)
    eq_(first_row.this_1, 4)
def test_table_plus_column_exceeds_length(self):
    """Test that truncation occurs when the table name and the column
    name only exceed the maximum once they are concatenated."""
    compile_dialect = default.DefaultDialect(label_length=30)

    metadata = MetaData()
    a_table = Table(
        'thirty_characters_table_xxxxxx',
        metadata,
        Column('id', Integer, primary_key=True)
    )
    other_table = Table(
        'other_thirty_characters_table_',
        metadata,
        Column('id', Integer, primary_key=True),
        Column(
            'thirty_characters_table_id',
            Integer,
            ForeignKey('thirty_characters_table_xxxxxx.id'),
            primary_key=True
        )
    )
    anon = a_table.alias()

    expected_sql = (
        'SELECT '
        'other_thirty_characters_table_.id '
        'AS other_thirty_characters__1, '
        'other_thirty_characters_table_.thirty_characters_table_id '
        'AS other_thirty_characters__2, '
        'thirty_characters_table__1.id '
        'AS thirty_characters_table__3 '
        'FROM '
        'other_thirty_characters_table_ '
        'LEFT OUTER JOIN '
        'thirty_characters_table_xxxxxx AS thirty_characters_table__1 '
        'ON thirty_characters_table__1.id = '
        'other_thirty_characters_table_.thirty_characters_table_id'
    )

    # The statement is built and compiled two separate times; each pass
    # must render the same SQL.
    for _ in range(2):
        joined = other_table.outerjoin(anon)
        stmt = select([other_table, anon]).select_from(joined)
        self.assert_compile(
            stmt.apply_labels(),
            expected_sql,
            dialect=compile_dialect
        )