Files
sqlalchemy/test/sql/testtypes.py
T
Mike Bayer 7d74fc7785 - added "pickleable" module to test suite to have cPickle-compatible
test objects
- added copy_function, compare_function arguments to InstrumentedAttribute
- added MutableType mixin, copy_value/compare_values methods to TypeEngine,
PickleType
- ColumnProperty and DeferredProperty propigate the TypeEngine copy/compare
methods to the attribute instrumentation
- cleanup of UnitOfWork, removed unused methods
- UnitOfWork "dirty" list is calculated across the total collection of persistent
objects when called, no longer has register_dirty.
- attribute system can still report "modified" status fairly quickly, but does
extra work for InstrumentedAttributes that have detected a "mutable" type where
catching the __set__() event is not enough (i.e. PickleTypes)
- attribute tracking modified to be more intelligent about detecting
changes, particularly with mutable types.  TypeEngine objects now
take a greater role in defining how to compare two scalar instances,
including the addition of a MutableType mixin which is implemented by
PickleType.  unit-of-work now tracks the "dirty" list as an expression
of all persistent objects where the attribute manager detects changes.
The basic issue thats fixed is detecting changes on PickleType
objects, but also generalizes type handling and "modified" object
checking to be more complete and extensible.
2006-09-23 20:26:20 +00:00

338 lines
14 KiB
Python

from testbase import PersistTest, AssertMixin
import testbase
import pickleable
from sqlalchemy import *
import string,datetime, re, sys
import sqlalchemy.engine.url as url
import sqlalchemy.types
db = testbase.db
class MyType(types.TypeEngine):
def get_col_spec(self):
return "VARCHAR(100)"
def convert_bind_param(self, value, engine):
return "BIND_IN"+ value
def convert_result_value(self, value, engine):
return value + "BIND_OUT"
def adapt(self, typeobj):
return typeobj()
class MyDecoratedType(types.TypeDecorator):
impl = String
def convert_bind_param(self, value, engine):
return "BIND_IN"+ value
def convert_result_value(self, value, engine):
return value + "BIND_OUT"
def copy(self):
return MyDecoratedType()
class MyUnicodeType(types.Unicode):
def convert_bind_param(self, value, engine):
return "UNI_BIND_IN"+ value
def convert_result_value(self, value, engine):
return value + "UNI_BIND_OUT"
def copy(self):
return MyUnicodeType(self.impl.length)
class AdaptTest(PersistTest):
def testadapt(self):
e1 = url.URL('postgres').get_module().dialect()
e2 = url.URL('mysql').get_module().dialect()
e3 = url.URL('sqlite').get_module().dialect()
type = String(40)
t1 = type.dialect_impl(e1)
t2 = type.dialect_impl(e2)
t3 = type.dialect_impl(e3)
assert t1 != t2
assert t2 != t3
assert t3 != t1
def testdecorator(self):
t1 = Unicode(20)
t2 = Unicode()
assert isinstance(t1.impl, String)
assert not isinstance(t1.impl, TEXT)
assert (t1.impl.length == 20)
assert isinstance(t2.impl, TEXT)
assert t2.impl.length is None
class OverrideTest(PersistTest):
"""tests user-defined types, including a full type as well as a TypeDecorator"""
def testprocessing(self):
global users
users.insert().execute(user_id = 2, goofy = 'jack', goofy2='jack', goofy3='jack', goofy4='jack')
users.insert().execute(user_id = 3, goofy = 'lala', goofy2='lala', goofy3='lala', goofy4='lala')
users.insert().execute(user_id = 4, goofy = 'fred', goofy2='fred', goofy3='fred', goofy4='fred')
l = users.select().execute().fetchall()
print repr(l)
self.assert_(l == [(2, 'BIND_INjackBIND_OUT', 'BIND_INjackBIND_OUT', 'BIND_INjackBIND_OUT', u'UNI_BIND_INjackUNI_BIND_OUT'), (3, 'BIND_INlalaBIND_OUT', 'BIND_INlalaBIND_OUT', 'BIND_INlalaBIND_OUT', u'UNI_BIND_INlalaUNI_BIND_OUT'), (4, 'BIND_INfredBIND_OUT', 'BIND_INfredBIND_OUT', 'BIND_INfredBIND_OUT', u'UNI_BIND_INfredUNI_BIND_OUT')])
def setUpAll(self):
global users
users = Table('type_users', db,
Column('user_id', Integer, primary_key = True),
# totall custom type
Column('goofy', MyType, nullable = False),
# decorated type with an argument, so its a String
Column('goofy2', MyDecoratedType(50), nullable = False),
# decorated type without an argument, it will adapt_args to TEXT
Column('goofy3', MyDecoratedType, nullable = False),
Column('goofy4', MyUnicodeType, nullable = False),
)
users.create()
def tearDownAll(self):
global users
users.drop()
class ColumnsTest(AssertMixin):
def testcolumns(self):
expectedResults = { 'int_column': 'int_column INTEGER',
'smallint_column': 'smallint_column SMALLINT',
'varchar_column': 'varchar_column VARCHAR(20)',
'numeric_column': 'numeric_column NUMERIC(12, 3)',
'float_column': 'float_column NUMERIC(25, 2)'
}
if not db.name=='sqlite':
expectedResults['float_column'] = 'float_column FLOAT(25)'
print db.engine.__module__
testTable = Table('testColumns', db,
Column('int_column', Integer),
Column('smallint_column', Smallinteger),
Column('varchar_column', String(20)),
Column('numeric_column', Numeric(12,3)),
Column('float_column', Float(25)),
)
for aCol in testTable.c:
self.assertEquals(expectedResults[aCol.name], db.dialect.schemagenerator(db, None).get_column_specification(aCol))
class UnicodeTest(AssertMixin):
"""tests the Unicode type. also tests the TypeDecorator with instances in the types package."""
def setUpAll(self):
global unicode_table
unicode_table = Table('unicode_table', db,
Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True),
Column('unicode_data', Unicode(250)),
Column('plain_data', String(250))
)
unicode_table.create()
def tearDownAll(self):
unicode_table.drop()
def testbasic(self):
rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
unicodedata = rawdata.decode('utf-8')
unicode_table.insert().execute(unicode_data=unicodedata, plain_data=rawdata)
x = unicode_table.select().execute().fetchone()
self.echo(repr(x['unicode_data']))
self.echo(repr(x['plain_data']))
self.assert_(isinstance(x['unicode_data'], unicode) and x['unicode_data'] == unicodedata)
if isinstance(x['plain_data'], unicode):
# SQLLite returns even non-unicode data as unicode
self.assert_(db.name == 'sqlite')
self.echo("its sqlite !")
else:
self.assert_(not isinstance(x['plain_data'], unicode) and x['plain_data'] == rawdata)
def testengineparam(self):
"""tests engine-wide unicode conversion"""
prev_unicode = db.engine.dialect.convert_unicode
try:
db.engine.dialect.convert_unicode = True
rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
unicodedata = rawdata.decode('utf-8')
unicode_table.insert().execute(unicode_data=unicodedata, plain_data=rawdata)
x = unicode_table.select().execute().fetchone()
self.echo(repr(x['unicode_data']))
self.echo(repr(x['plain_data']))
self.assert_(isinstance(x['unicode_data'], unicode) and x['unicode_data'] == unicodedata)
self.assert_(isinstance(x['plain_data'], unicode) and x['plain_data'] == unicodedata)
finally:
db.engine.dialect.convert_unicode = prev_unicode
class BinaryTest(AssertMixin):
def setUpAll(self):
global binary_table
binary_table = Table('binary_table', db,
Column('primary_id', Integer, primary_key=True),
Column('data', Binary),
Column('data_slice', Binary(100)),
Column('misc', String(30)),
# construct PickleType with non-native pickle module, since cPickle uses relative module
# loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
# to the 'types' module
Column('pickled', PickleType)
)
binary_table.create()
def tearDownAll(self):
binary_table.drop()
def testbinary(self):
testobj1 = pickleable.Foo('im foo 1')
testobj2 = pickleable.Foo('im foo 2')
stream1 =self.get_module_stream('sqlalchemy.sql')
stream2 =self.get_module_stream('sqlalchemy.schema')
binary_table.insert().execute(primary_id=1, misc='sql.pyc', data=stream1, data_slice=stream1[0:100], pickled=testobj1)
binary_table.insert().execute(primary_id=2, misc='schema.pyc', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
l = binary_table.select().execute().fetchall()
print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
self.assert_(list(stream1) == list(l[0]['data']))
self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
self.assert_(list(stream2) == list(l[1]['data']))
self.assert_(testobj1 == l[0]['pickled'])
self.assert_(testobj2 == l[1]['pickled'])
def get_module_stream(self, name):
mod = __import__(name)
for token in name.split('.')[1:]:
mod = getattr(mod, token)
f = mod.__file__
f = re.sub('\.py$', '\.pyc', f)
# put a number less than the typical MySQL default BLOB size
return file(f).read(59473)
class DateTest(AssertMixin):
def setUpAll(self):
global users_with_date, insert_data
insert_data = [
[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), datetime.time(12,20,2)],
[8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), datetime.time(0,0,0)],
[9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839), datetime.date(1970,4,1), datetime.time(23,59,59,999)],
[10, 'colber', None, None, None]
]
fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time']
collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime(timezone=False)),
Column('user_date', Date), Column('user_time', Time)]
if db.engine.name == 'mysql' or db.engine.name == 'mssql':
# strip microseconds -- not supported by this engine (should be an easier way to detect this)
for d in insert_data:
if d[2] is not None:
d[2] = d[2].replace(microsecond=0)
if d[4] is not None:
d[4] = d[4].replace(microsecond=0)
try:
db.type_descriptor(types.TIME).get_col_spec()
except:
# don't test TIME type -- not supported by this engine
insert_data = [d[:-1] for d in insert_data]
fnames = fnames[:-1]
collist = collist[:-1]
users_with_date = Table('query_users_with_date', db, redefine = True, *collist)
users_with_date.create()
insert_dicts = [dict(zip(fnames, d)) for d in insert_data]
for idict in insert_dicts:
users_with_date.insert().execute(**idict) # insert the data
def tearDownAll(self):
users_with_date.drop()
def testdate(self):
global insert_data
l = map(list, users_with_date.select().execute().fetchall())
self.assert_(l == insert_data, 'DateTest mismatch: got:%s expected:%s' % (l, insert_data))
def testtextdate(self):
x = db.text("select user_datetime from query_users_with_date", typemap={'user_datetime':DateTime}).execute().fetchall()
print repr(x)
self.assert_(isinstance(x[0][0], datetime.datetime))
#x = db.text("select * from query_users_with_date where user_datetime=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
#print repr(x)
class TimezoneTest(AssertMixin):
"""test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it,
if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime
that doesnt have one. this test illustrates two ways to have datetime types with and without timezone
info. """
def setUpAll(self):
global tztable, notztable, metadata
metadata = BoundMetaData(testbase.db)
# current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
tztable = Table('tztable', metadata,
Column("id", Integer, primary_key=True),
Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
Column("name", String(20)),
)
notztable = Table('notztable', metadata,
Column("id", Integer, primary_key=True),
Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
Column("name", String(20)),
)
metadata.create_all()
def tearDownAll(self):
metadata.drop_all()
@testbase.supported('postgres')
def testtz(self):
# get a date with a tzinfo
somedate = testbase.db.connect().scalar(func.current_timestamp().select())
tztable.insert().execute(id=1, name='row1', date=somedate)
c = tztable.update(tztable.c.id==1).execute(name='newname')
x = c.last_updated_params()
print x['date'] == somedate
@testbase.supported('postgres')
def testnotz(self):
# get a date without a tzinfo
somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
notztable.insert().execute(id=1, name='row1', date=somedate)
c = notztable.update(notztable.c.id==1).execute(name='newname')
x = c.last_updated_params()
print x['date'] == somedate
class BooleanTest(AssertMixin):
def setUpAll(self):
global bool_table
metadata = BoundMetaData(testbase.db)
bool_table = Table('booltest', metadata,
Column('id', Integer, primary_key=True),
Column('value', Boolean))
bool_table.create()
def tearDownAll(self):
bool_table.drop()
def testbasic(self):
bool_table.insert().execute(id=1, value=True)
bool_table.insert().execute(id=2, value=False)
bool_table.insert().execute(id=3, value=True)
bool_table.insert().execute(id=4, value=True)
bool_table.insert().execute(id=5, value=True)
res = bool_table.select(bool_table.c.value==True).execute().fetchall()
print res
assert(res==[(1, True),(3, True),(4, True),(5, True)])
res2 = bool_table.select(bool_table.c.value==False).execute().fetchall()
print res2
assert(res2==[(2, False)])
if __name__ == "__main__":
testbase.main()