Files
sqlalchemy/test/aaa_profiling/test_memusage.py
T
Mike Bayer 03e0f776be - The warning emitted by the Unicode and String types
with convert_unicode=True no longer embeds the actual
value passed.   This so that the Python warning
registry does not continue to grow in size, the warning
is emitted once as per the warning filter settings,
and large string values don't pollute the output.
[ticket:1822]
2010-06-13 16:09:13 -04:00

569 lines
17 KiB
Python

from sqlalchemy.test.testing import eq_
from sqlalchemy.orm import mapper, relationship, create_session,\
clear_mappers, sessionmaker, class_mapper
from sqlalchemy.orm.mapper import _mapper_registry
from sqlalchemy.orm.session import _sessions
from sqlalchemy.util import jython
import operator
from sqlalchemy.test import testing, engines
from sqlalchemy import MetaData, Integer, String, ForeignKey, \
PickleType, create_engine, Unicode
from sqlalchemy.test.schema import Table, Column
import sqlalchemy as sa
from sqlalchemy.sql import column
from sqlalchemy.test.util import gc_collect
import gc
import weakref
from test.orm import _base
if jython:
from nose import SkipTest
raise SkipTest("Profiling not supported on this platform")
class A(_base.ComparableEntity):
pass
class B(_base.ComparableEntity):
pass
def profile_memory(func):
# run the test 50 times. if length of gc.get_objects()
# keeps growing, assert false
def profile(*args):
gc_collect()
samples = [0 for x in range(0, 50)]
for x in range(0, 50):
func(*args)
gc_collect()
samples[x] = len(gc.get_objects())
print "sample gc sizes:", samples
assert len(_sessions) == 0
for x in samples[-4:]:
if x != samples[-5]:
flatline = False
break
else:
flatline = True
# object count is bigger than when it started
if not flatline and samples[-1] > samples[0]:
for x in samples[1:-2]:
# see if a spike bigger than the endpoint exists
if x > samples[-1]:
break
else:
assert False, repr(samples) + " " + repr(flatline)
return profile
def assert_no_mappers():
clear_mappers()
gc_collect()
assert len(_mapper_registry) == 0
class EnsureZeroed(_base.ORMTest):
def setup(self):
_sessions.clear()
_mapper_registry.clear()
class MemUsageTest(EnsureZeroed):
# ensure a pure growing test trips the assertion
@testing.fails_if(lambda: True)
def test_fixture(self):
class Foo(object):
pass
x = []
@profile_memory
def go():
x[-1:] = [Foo(), Foo(), Foo(), Foo(), Foo(), Foo()]
go()
def test_session(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30)))
table2 = Table("mytable2", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30)),
Column('col3', Integer, ForeignKey("mytable.col1")))
metadata.create_all()
m1 = mapper(A, table1, properties={
"bs":relationship(B, cascade="all, delete",
order_by=table2.c.col1)},
order_by=table1.c.col1)
m2 = mapper(B, table2)
m3 = mapper(A, table1, non_primary=True)
@profile_memory
def go():
sess = create_session()
a1 = A(col2="a1")
a2 = A(col2="a2")
a3 = A(col2="a3")
a1.bs.append(B(col2="b1"))
a1.bs.append(B(col2="b2"))
a3.bs.append(B(col2="b3"))
for x in [a1,a2,a3]:
sess.add(x)
sess.flush()
sess.expunge_all()
alist = sess.query(A).all()
eq_(
[
A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
A(col2="a2", bs=[]),
A(col2="a3", bs=[B(col2="b3")])
],
alist)
for a in alist:
sess.delete(a)
sess.flush()
go()
metadata.drop_all()
del m1, m2, m3
assert_no_mappers()
@testing.crashes('sqlite', ':memory: connection not suitable here')
def test_orm_many_engines(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30)))
table2 = Table("mytable2", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30)),
Column('col3', Integer, ForeignKey("mytable.col1")))
metadata.create_all()
m1 = mapper(A, table1, properties={
"bs":relationship(B, cascade="all, delete",
order_by=table2.c.col1)},
order_by=table1.c.col1,
_compiled_cache_size=10
)
m2 = mapper(B, table2,
_compiled_cache_size=10
)
m3 = mapper(A, table1, non_primary=True)
@profile_memory
def go():
engine = engines.testing_engine(
options={'logging_name':'FOO',
'pool_logging_name':'BAR'}
)
sess = create_session(bind=engine)
a1 = A(col2="a1")
a2 = A(col2="a2")
a3 = A(col2="a3")
a1.bs.append(B(col2="b1"))
a1.bs.append(B(col2="b2"))
a3.bs.append(B(col2="b3"))
for x in [a1,a2,a3]:
sess.add(x)
sess.flush()
sess.expunge_all()
alist = sess.query(A).all()
eq_(
[
A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
A(col2="a2", bs=[]),
A(col2="a3", bs=[B(col2="b3")])
],
alist)
for a in alist:
sess.delete(a)
sess.flush()
sess.close()
engine.dispose()
go()
metadata.drop_all()
del m1, m2, m3
assert_no_mappers()
def test_many_updates(self):
metadata = MetaData(testing.db)
wide_table = Table('t', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
*[Column('col%d' % i, Integer) for i in range(10)]
)
class Wide(object):
pass
mapper(Wide, wide_table, _compiled_cache_size=10)
metadata.create_all()
session = create_session()
w1 = Wide()
session.add(w1)
session.flush()
session.close()
del session
counter = [1]
@profile_memory
def go():
session = create_session()
w1 = session.query(Wide).first()
x = counter[0]
dec = 10
while dec > 0:
# trying to count in binary here,
# works enough to trip the test case
if pow(2, dec) < x:
setattr(w1, 'col%d' % dec, counter[0])
x -= pow(2, dec)
dec -= 1
session.flush()
session.close()
counter[0] += 1
try:
go()
finally:
metadata.drop_all()
def test_unicode_warnings(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', Unicode(30)))
metadata.create_all()
i = [1]
@testing.emits_warning()
@profile_memory
def go():
# execute with a non-unicode object.
# a warning is emitted, this warning shouldn't
# clog up memory.
testing.db.execute(table1.select().where(table1.c.col2=='foo%d' % i[0]))
i[0] += 1
try:
go()
finally:
metadata.drop_all()
def test_mapper_reset(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30)))
table2 = Table("mytable2", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30)),
Column('col3', Integer, ForeignKey("mytable.col1")))
@profile_memory
def go():
m1 = mapper(A, table1, properties={
"bs":relationship(B, order_by=table2.c.col1)
})
m2 = mapper(B, table2)
m3 = mapper(A, table1, non_primary=True)
sess = create_session()
a1 = A(col2="a1")
a2 = A(col2="a2")
a3 = A(col2="a3")
a1.bs.append(B(col2="b1"))
a1.bs.append(B(col2="b2"))
a3.bs.append(B(col2="b3"))
for x in [a1,a2,a3]:
sess.add(x)
sess.flush()
sess.expunge_all()
alist = sess.query(A).order_by(A.col1).all()
eq_(
[
A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
A(col2="a2", bs=[]),
A(col2="a3", bs=[B(col2="b3")])
],
alist)
for a in alist:
sess.delete(a)
sess.flush()
sess.close()
clear_mappers()
metadata.create_all()
try:
go()
finally:
metadata.drop_all()
assert_no_mappers()
def test_with_inheritance(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30))
)
table2 = Table("mytable2", metadata,
Column('col1', Integer, ForeignKey('mytable.col1'),
primary_key=True, test_needs_autoincrement=True),
Column('col3', String(30)),
)
@profile_memory
def go():
class A(_base.ComparableEntity):
pass
class B(A):
pass
mapper(A, table1,
polymorphic_on=table1.c.col2,
polymorphic_identity='a')
mapper(B, table2,
inherits=A,
polymorphic_identity='b')
sess = create_session()
a1 = A()
a2 = A()
b1 = B(col3='b1')
b2 = B(col3='b2')
for x in [a1,a2,b1, b2]:
sess.add(x)
sess.flush()
sess.expunge_all()
alist = sess.query(A).order_by(A.col1).all()
eq_(
[
A(), A(), B(col3='b1'), B(col3='b2')
],
alist)
for a in alist:
sess.delete(a)
sess.flush()
# dont need to clear_mappers()
del B
del A
metadata.create_all()
try:
go()
finally:
metadata.drop_all()
assert_no_mappers()
def test_with_manytomany(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30))
)
table2 = Table("mytable2", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', String(30)),
)
table3 = Table('t1tot2', metadata,
Column('t1', Integer, ForeignKey('mytable.col1')),
Column('t2', Integer, ForeignKey('mytable2.col1')),
)
@profile_memory
def go():
class A(_base.ComparableEntity):
pass
class B(_base.ComparableEntity):
pass
mapper(A, table1, properties={
'bs':relationship(B, secondary=table3,
backref='as', order_by=table3.c.t1)
})
mapper(B, table2)
sess = create_session()
a1 = A(col2='a1')
a2 = A(col2='a2')
b1 = B(col2='b1')
b2 = B(col2='b2')
a1.bs.append(b1)
a2.bs.append(b2)
for x in [a1,a2]:
sess.add(x)
sess.flush()
sess.expunge_all()
alist = sess.query(A).order_by(A.col1).all()
eq_(
[
A(bs=[B(col2='b1')]), A(bs=[B(col2='b2')])
],
alist)
for a in alist:
sess.delete(a)
sess.flush()
# dont need to clear_mappers()
del B
del A
metadata.create_all()
try:
go()
finally:
metadata.drop_all()
assert_no_mappers()
# fails on newer versions of pysqlite due to unusual memory
# behvior in pysqlite itself.
# background at: http://thread.gmane.org/gmane.comp.python.db.pysqlite.user/2290
@testing.fails_if(lambda:
testing.db.dialect.name == 'sqlite' and
testing.db.dialect.dbapi.version > '2.5')
def test_join_cache(self):
metadata = MetaData(testing.db)
table1 = Table("table1", metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(30))
)
table2 = Table("table2", metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(30)),
Column('t1id', Integer, ForeignKey('table1.id'))
)
class Foo(object):
pass
class Bar(object):
pass
mapper(Foo, table1, properties={
'bars':relationship(mapper(Bar, table2))
})
metadata.create_all()
session = sessionmaker()
@profile_memory
def go():
s = table2.select()
sess = session()
sess.query(Foo).join((s, Foo.bars)).all()
sess.rollback()
try:
go()
finally:
metadata.drop_all()
def test_mutable_identity(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('col2', PickleType(comparator=operator.eq))
)
class Foo(object):
def __init__(self, col2):
self.col2 = col2
mapper(Foo, table1)
metadata.create_all()
session = sessionmaker()()
def go():
obj = [
Foo({'a':1}),
Foo({'b':1}),
Foo({'c':1}),
Foo({'d':1}),
Foo({'e':1}),
Foo({'f':1}),
Foo({'g':1}),
Foo({'h':1}),
Foo({'i':1}),
Foo({'j':1}),
Foo({'k':1}),
Foo({'l':1}),
]
session.add_all(obj)
session.commit()
testing.eq_(len(session.identity_map._mutable_attrs), 12)
testing.eq_(len(session.identity_map), 12)
obj = None
gc_collect()
testing.eq_(len(session.identity_map._mutable_attrs), 0)
testing.eq_(len(session.identity_map), 0)
try:
go()
finally:
metadata.drop_all()
def test_type_compile(self):
from sqlalchemy.dialects.sqlite.base import dialect as SQLiteDialect
cast = sa.cast(column('x'), sa.Integer)
@profile_memory
def go():
dialect = SQLiteDialect()
cast.compile(dialect=dialect)
go()