mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-14 12:47:22 -04:00
3f3d84e754
- added support for reflection of domains [ticket:570]
- types which are missing during reflection resolve to Null type
instead of raising an error
- moved reflection/types/query unit tests specific to postgres to new
postgres unittest module
196 lines
8.2 KiB
Python
196 lines
8.2 KiB
Python
from testbase import AssertMixin
|
|
import testbase
|
|
from sqlalchemy import *
|
|
from sqlalchemy.databases import postgres
|
|
import datetime
|
|
|
|
# Module-level shorthand for the database object exposed by testbase;
# DomainReflectionTest uses it for db.connect() and BoundMetaData(db).
db = testbase.db
|
|
|
|
class DomainReflectionTest(AssertMixin):
    """Test reflection of tables whose columns use PostgreSQL domains.

    Fixture objects are created once in setUpAll and removed in tearDownAll:

    * ``testdomain``             -- INTEGER NOT NULL DEFAULT 42
    * ``alt_schema.testdomain``  -- INTEGER DEFAULT 0
    * ``testtable``, ``alt_schema.testtable`` and ``crosschema``, each with
      an ``answer`` column typed by one of the domains.

    Like the other schema tests in this module, this requires an
    ``alt_schema`` schema that the test user can create objects in.
    """

    @testbase.supported('postgres')
    def setUpAll(self):
        """Open a connection and create the domains and tables under test."""
        self.con = db.connect()
        self.con.execute('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42')
        self.con.execute('CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0')
        self.con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
        self.con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
        self.con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')

    @testbase.supported('postgres')
    def tearDownAll(self):
        """Drop the fixture tables and domains, then release the connection."""
        try:
            # Tables go first: a domain cannot be dropped while a column
            # still depends on it.
            self.con.execute('DROP TABLE testtable')
            self.con.execute('DROP TABLE alt_schema.testtable')
            self.con.execute('DROP TABLE crosschema')
            self.con.execute('DROP DOMAIN testdomain')
            self.con.execute('DROP DOMAIN alt_schema.testdomain')
        finally:
            # The connection opened in setUpAll was previously never closed;
            # release it even when one of the DROP statements fails.
            self.con.close()

    @testbase.supported('postgres')
    def test_table_is_reflected(self):
        """A domain-typed column reflects with the domain's base type."""
        metadata = BoundMetaData(db)
        table = Table('testtable', metadata, autoload=True)
        self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
        self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)

    @testbase.supported('postgres')
    def test_domain_is_reflected(self):
        """The domain's DEFAULT and NOT NULL carry over to the column."""
        metadata = BoundMetaData(db)
        table = Table('testtable', metadata, autoload=True)
        self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value")
        self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")

    @testbase.supported('postgres')
    def test_table_is_reflected_alt_schema(self):
        """A table in ``alt_schema`` reflects with all three of its columns."""
        metadata = BoundMetaData(db)
        table = Table('testtable', metadata, autoload=True, schema='alt_schema')
        self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
        self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)

    @testbase.supported('postgres')
    def test_schema_domain_is_reflected(self):
        """A schema-qualified domain yields default '0' and a nullable column."""
        metadata = BoundMetaData(db)
        table = Table('testtable', metadata, autoload=True, schema='alt_schema')
        self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
        self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")

    @testbase.supported('postgres')
    def test_crosschema_domain_is_reflected(self):
        """An unqualified table may use a domain that lives in ``alt_schema``."""
        metadata = BoundMetaData(db)
        table = Table('crosschema', metadata, autoload=True)
        self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
        self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
|
|
|
|
class MiscTest(AssertMixin):
    """Assorted PostgreSQL-only reflection and DDL tests."""

    @testbase.supported('postgres')
    def test_date_reflection(self):
        """The ``timezone`` flag of DateTime columns round-trips via autoload."""
        declared_meta = BoundMetaData(testbase.db)
        Table(
            'pgdate', declared_meta,
            Column('date1', DateTime(timezone=True)),
            Column('date2', DateTime(timezone=False)),
        )
        declared_meta.create_all()
        try:
            reflected_meta = BoundMetaData(testbase.db)
            reflected = Table('pgdate', reflected_meta, autoload=True)
            assert reflected.c.date1.type.timezone is True
            assert reflected.c.date2.type.timezone is False
        finally:
            declared_meta.drop_all()

    @testbase.supported('postgres')
    def test_checksfor_sequence(self):
        """Create a table whose column uses a sequence that already exists.

        The sequence is created by hand before ``create()`` is called on the
        table; presumably this verifies that table creation copes with the
        sequence being present already.
        """
        meta1 = BoundMetaData(testbase.db)
        seq_column = Column('col1', Integer, Sequence('fooseq'))
        seq_table = Table('mytable', meta1, seq_column)
        try:
            testbase.db.execute("CREATE SEQUENCE fooseq")
            seq_table.create()
        finally:
            seq_table.drop()

    @testbase.supported('postgres')
    def test_schema_reflection(self):
        """Reflect two related tables that live in ``alt_schema``.

        note: this test requires that the 'alt_schema' schema be separate
        and accessible by the test user.
        """
        meta1 = BoundMetaData(testbase.db)
        declared_users = Table(
            'users', meta1,
            Column('user_id', Integer, primary_key=True),
            Column('user_name', String(30), nullable=False),
            schema="alt_schema"
        )
        Table(
            'email_addresses', meta1,
            Column('address_id', Integer, primary_key=True),
            Column('remote_user_id', Integer, ForeignKey(declared_users.c.user_id)),
            Column('email_address', String(20)),
            schema="alt_schema"
        )
        meta1.create_all()
        try:
            meta2 = BoundMetaData(testbase.db)
            addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
            # mustexist=True: 'users' is expected to be in meta2 already,
            # presumably pulled in while reflecting the foreign key above.
            users = Table('users', meta2, mustexist=True, schema="alt_schema")

            # Parenthesised single-argument print behaves the same as the
            # bare Python 2 print statement.
            print(users)
            print(addresses)
            joined = join(users, addresses)
            print(str(joined.onclause))
            expected_onclause = users.c.user_id == addresses.c.remote_user_id
            self.assert_(expected_onclause.compare(joined.onclause))
        finally:
            meta1.drop_all()

    @testbase.supported('postgres')
    def test_preexecute_passivedefault(self):
        """A reflected SERIAL primary key has its default pre-executed.

        When a reflected primary key column carries a default, that
        PassiveDefault is expected to run ahead of the INSERT so the new key
        is reported by ``last_inserted_ids()``.
        """
        try:
            meta = BoundMetaData(testbase.db)
            testbase.db.execute("""
             CREATE TABLE speedy_users
             (
                 speedy_user_id SERIAL PRIMARY KEY,

                 user_name VARCHAR NOT NULL,
                 user_password VARCHAR NOT NULL
             );
            """, None)

            speedy_users = Table("speedy_users", meta, autoload=True)
            result = speedy_users.insert().execute(user_name='user', user_password='lala')
            assert result.last_inserted_ids() == [1]
            rows = speedy_users.select().execute().fetchall()
            assert rows == [(1, 'user', 'lala')]
        finally:
            # Runs even if the CREATE failed, so a table left behind by an
            # earlier run is cleaned up as well.
            testbase.db.execute("drop table speedy_users", None)
|
|
|
|
class TimezoneTest(AssertMixin):
    """Test timezone-aware and timezone-naive datetimes.

    psycopg returns a datetime with a tzinfo attached when postgres supplies
    one, and Python will not compare a datetime that has a tzinfo with one
    that does not.  These tests illustrate two ways to declare datetime
    columns: one with timezone information and one without.
    """

    @testbase.supported('postgres')
    def setUpAll(self):
        """Create ``tztable`` and ``notztable`` and publish them as module globals."""
        global tztable, notztable, metadata
        metadata = BoundMetaData(testbase.db)

        # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
        tztable = Table('tztable', metadata,
            Column("id", Integer, primary_key=True),
            Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
            Column("name", String(20)),
        )
        # The naive column casts the timestamp so its onupdate value carries
        # no timezone either.
        notztable = Table('notztable', metadata,
            Column("id", Integer, primary_key=True),
            Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
            Column("name", String(20)),
        )
        metadata.create_all()

    @testbase.supported('postgres')
    def tearDownAll(self):
        """Drop both tables created in setUpAll."""
        metadata.drop_all()

    @testbase.supported('postgres')
    def test_with_timezone(self):
        """Insert a tz-aware date, update the row, and print the comparison."""
        # get a date with a tzinfo
        conn = testbase.db.connect()
        try:
            somedate = conn.scalar(func.current_timestamp().select())
        finally:
            # The connection was previously created inline and never closed.
            conn.close()
        tztable.insert().execute(id=1, name='row1', date=somedate)
        c = tztable.update(tztable.c.id==1).execute(name='newname')
        x = c.last_updated_params()
        # Printed rather than asserted, as in the original; the parenthesised
        # form behaves the same as the bare Python 2 print statement.
        print(x['date'] == somedate)

    @testbase.supported('postgres')
    def test_without_timezone(self):
        """Insert a naive date, update the row, and print the comparison."""
        # get a date without a tzinfo
        somedate = datetime.datetime(2005, 10, 20, 11, 52, 00)
        notztable.insert().execute(id=1, name='row1', date=somedate)
        c = notztable.update(notztable.c.id==1).execute(name='newname')
        x = c.last_updated_params()
        print(x['date'] == somedate)
|
|
|
|
|
|
# Hand control to testbase.main() only when this file is executed directly
# as a script, not when it is imported.
if __name__ == "__main__":
    testbase.main()
|