- the zblog example is obsolete, the tests don't really test it

and a key feature of its mapping (the deferred col outside of the select)
doesn't work anyway.
- add a token "deferred on selectable" test to test_mapper.
This commit is contained in:
Mike Bayer
2011-01-16 20:18:32 -05:00
parent 7325ba60bc
commit a9a6766790
7 changed files with 22 additions and 380 deletions
+22
View File
@@ -1892,6 +1892,28 @@ class DeferredTest(_fixtures.FixtureTest):
eq_(o1.description, 'order 1')
self.sql_count_(0, go)
@testing.resolve_artifact_names
def test_map_selectable_wo_deferred(self):
"""test mapping to a selectable with deferred cols,
the selectable doesn't include the deferred col.
"""
order_select = sa.select([
orders.c.id,
orders.c.user_id,
orders.c.address_id,
orders.c.description,
orders.c.isopen]).alias()
mapper(Order, order_select, properties={
'description':deferred(order_select.c.description)
})
sess = Session()
o1 = sess.query(Order).order_by(Order.id).first()
assert 'description' not in o1.__dict__
eq_(o1.description, 'order 1')
@testing.resolve_artifact_names
def test_deep_options(self):
mapper(Item, items, properties=dict(
View File
-35
View File
@@ -1,35 +0,0 @@
import datetime
__all__ = ['Blog', 'Post', 'Topic', 'TopicAssociation', 'Comment']
class Blog(object):
def __init__(self, owner=None):
self.owner = owner
class Post(object):
topics = set
def __init__(self, user=None, headline=None, summary=None):
self.user = user
self.datetime = datetime.datetime.today()
self.headline = headline
self.summary = summary
self.comments = []
self.comment_count = 0
class Topic(object):
def __init__(self, keyword=None, description=None):
self.keyword = keyword
self.description = description
class TopicAssociation(object):
def __init__(self, post=None, topic=None, is_primary=False):
self.post = post
self.topic = topic
self.is_primary = is_primary
class Comment(object):
def __init__(self, subject=None, body=None):
self.subject = subject
self.datetime = datetime.datetime.today()
self.body = body
-135
View File
@@ -1,135 +0,0 @@
"""mapper.py - defines mappers for domain objects, mapping operations"""
from test.zblog import tables, user
from test.zblog.blog import *
from sqlalchemy import *
from sqlalchemy.orm import *
import sqlalchemy.util as util
def zblog_mappers():
# User mapper. Here, we redefine the names of some of the columns to
# different property names. normally the table columns are all sucked in
# automatically.
mapper(user.User, tables.users, properties={
'id':tables.users.c.user_id,
'name':tables.users.c.user_name,
'group':tables.users.c.groupname,
'crypt_password':tables.users.c.password,
})
# blog mapper. this contains a reference to the user mapper, and also
# installs a "backreference" on that relationship to handle it in both
# ways. this will also attach a 'blogs' property to the user mapper.
mapper(Blog, tables.blogs, properties={
'id':tables.blogs.c.blog_id,
'owner':relationship(user.User, lazy='joined',
backref=backref('blogs', cascade="all, delete-orphan")),
})
# topic mapper. map all topic columns to the Topic class.
mapper(Topic, tables.topics)
# TopicAssocation mapper. This is an "association" object, which is
# similar to a many-to-many relationship except extra data is associated
# with each pair of related data. because the topic_xref table doesnt
# have a primary key, the "primary key" columns of a TopicAssociation are
# defined manually here.
mapper(TopicAssociation,tables.topic_xref,
primary_key=[tables.topic_xref.c.post_id,
tables.topic_xref.c.topic_id],
properties={
'topic':relationship(Topic, lazy='joined'),
})
# Post mapper, these are posts within a blog.
# since we want the count of comments for each post, create a select that
# will get the posts and count the comments in one query.
posts_with_ccount = select(
[c for c in tables.posts.c if c.key != 'body'] + [
func.count(tables.comments.c.comment_id).label('comment_count')
],
from_obj = [
outerjoin(tables.posts, tables.comments)
],
group_by=[
c for c in tables.posts.c if c.key != 'body'
]
) .alias('postswcount')
# then create a Post mapper on that query.
# we have the body as "deferred" so that it loads only when needed, the
# user as a Lazy load, since the lazy load will run only once per user and
# its usually only one user's posts is needed per page, the owning blog is
# a lazy load since its also probably loaded into the identity map
# already, and topics is an eager load since that query has to be done per
# post in any case.
mapper(Post, posts_with_ccount, properties={
'id':posts_with_ccount.c.post_id,
'body':deferred(tables.posts.c.body),
'user':relationship(user.User, lazy='select',
backref=backref('posts', cascade="all, delete-orphan")),
'blog':relationship(Blog, lazy='select',
backref=backref('posts', cascade="all, delete-orphan")),
'topics':relationship(TopicAssociation, lazy='joined',
cascade="all, delete-orphan",
backref='post')
}, order_by=[desc(posts_with_ccount.c.datetime)])
# comment mapper. This mapper is handling a hierarchical relationship on
# itself, and contains a lazy reference both to its parent comment and its
# list of child comments.
mapper(Comment, tables.comments, properties={
'id':tables.comments.c.comment_id,
'post':relationship(Post, lazy='select',
backref=backref('comments',
cascade="all, delete-orphan")),
'user':relationship(user.User, lazy='joined',
backref=backref('comments',
cascade="all, delete-orphan")),
'parent':relationship(Comment,
primaryjoin=(tables.comments.c.parent_comment_id ==
tables.comments.c.comment_id),
foreign_keys=[tables.comments.c.comment_id],
lazy='select', uselist=False),
'replies':relationship(Comment,
primaryjoin=(tables.comments.c.parent_comment_id ==
tables.comments.c.comment_id),
lazy='select', uselist=True, cascade="all"),
})
# we define one special find-by for the comments of a post, which is going to
# make its own "noload" mapper and organize the comments into their correct
# hierarchy in one pass. hierarchical data normally needs to be loaded by
# separate queries for each set of children, unless you use a proprietary
# extension like CONNECT BY.
def find_by_post(post):
"""returns a hierarchical collection of comments based on a given criterion.
Uses a mapper that does not lazy load replies or parents, and instead
organizes comments into a hierarchical tree when the result is produced.
"""
q = session().query(Comment).options(noload('replies'), noload('parent'))
comments = q.select_by(post_id=post.id)
result = []
d = {}
for c in comments:
d[c.id] = c
if c.parent_comment_id is None:
result.append(c)
c.parent=None
else:
parent = d[c.parent_comment_id]
parent.replies.append(c)
c.parent = parent
return result
Comment.find_by_post = staticmethod(find_by_post)
def start_session():
"""creates a new session for the start of a request."""
trans.session = create_session(bind_to=zblog.database.engine )
def session():
return trans.session
-53
View File
@@ -1,53 +0,0 @@
"""application table metadata objects are described here."""
from sqlalchemy import *
from test.lib.schema import Table, Column
metadata = MetaData()
users = Table('users', metadata,
Column('user_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_name', String(30), nullable=False),
Column('fullname', String(100), nullable=False),
Column('password', String(40), nullable=False),
Column('groupname', String(20), nullable=False),
)
blogs = Table('blogs', metadata,
Column('blog_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('name', String(100), nullable=False),
Column('description', String(500))
)
posts = Table('posts', metadata,
Column('post_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
Column('headline', String(500)),
Column('summary', String(255)),
Column('body', Text),
)
topics = Table('topics', metadata,
Column('topic_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('keyword', String(50), nullable=False),
Column('description', String(500))
)
topic_xref = Table('topic_post_xref', metadata,
Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False),
Column('is_primary', Boolean, nullable=False),
Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False)
)
comments = Table('comments', metadata,
Column('comment_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')),
Column('subject', String(500)),
Column('body', Text),
)
-116
View File
@@ -1,116 +0,0 @@
from sqlalchemy import *
from sqlalchemy.orm import *
from test.lib import *
from test.zblog import mappers, tables
from test.zblog.user import *
from test.zblog.blog import *
class ZBlogTest(TestBase, AssertsExecutionResults):
@classmethod
def create_tables(cls):
tables.metadata.drop_all(bind=testing.db)
tables.metadata.create_all(bind=testing.db)
@classmethod
def drop_tables(cls):
tables.metadata.drop_all(bind=testing.db)
@classmethod
def setup_class(cls):
cls.create_tables()
@classmethod
def teardown_class(cls):
cls.drop_tables()
def teardown(self):
pass
def setup(self):
pass
class SavePostTest(ZBlogTest):
@classmethod
def setup_class(cls):
super(SavePostTest, cls).setup_class()
mappers.zblog_mappers()
global blog_id, user_id
s = create_session(bind=testing.db)
user = User('zbloguser', "Zblog User", "hello", group=administrator)
blog = Blog(owner=user)
blog.name = "this is a blog"
s.add(user)
s.add(blog)
s.flush()
blog_id = blog.id
user_id = user.id
s.close()
@classmethod
def teardown_class(cls):
clear_mappers()
super(SavePostTest, cls).teardown_class()
def test_attach_noautoflush(self):
"""Test pending backref behavior."""
s = create_session(bind=testing.db, autoflush=False)
s.begin()
try:
blog = s.query(Blog).get(blog_id)
post = Post(headline="asdf asdf", summary="asdfasfd")
s.add(post)
post.blog_id=blog_id
post.blog = blog
assert post in blog.posts
finally:
s.rollback()
def test_attach_autoflush(self):
s = create_session(bind=testing.db, autoflush=True)
s.begin()
try:
blog = s.query(Blog).get(blog_id)
user = s.query(User).get(user_id)
post = Post(headline="asdf asdf", summary="asdfasfd", user=user)
s.add(post)
post.blog_id=blog_id
post.blog = blog
assert post in blog.posts
finally:
s.rollback()
def testoptimisticorphans(self):
"""test that instances in the session with un-loaded parents will not
get marked as "orphans" and then deleted """
s = create_session(bind=testing.db)
s.begin()
try:
blog = s.query(Blog).get(blog_id)
post = Post(headline="asdf asdf", summary="asdfasfd")
post.blog = blog
user = s.query(User).get(user_id)
post.user = user
s.add(post)
s.flush()
s.expunge_all()
user = s.query(User).get(user_id)
blog = s.query(Blog).get(blog_id)
post = blog.posts[0]
comment = Comment(subject="some subject", body="some body")
comment.post = post
comment.user = user
s.flush()
s.expunge_all()
assert s.query(Post).get(post.id) is not None
finally:
s.rollback()
-41
View File
@@ -1,41 +0,0 @@
"""user.py - handles user login and validation"""
import random, string
import sys
if sys.version_info < (2, 5):
from sha import sha
else:
from hashlib import sha1 as sha
administrator = 'admin'
user = 'user'
groups = [user, administrator]
def cryptpw(password, salt=None):
if salt is None:
salt = "".join([chr(random.randint(ord('a'), ord('z'))),
chr(random.randint(ord('a'), ord('z')))])
return sha((password+ salt).encode('ascii')).hexdigest()
def checkpw(password, dbpw):
return cryptpw(password, dbpw[:2]) == dbpw
class User(object):
def __init__(self, name=None, fullname=None, password=None, group=user):
self.name = name
self.fullname = fullname
self.password = password
self.group = group
def is_administrator(self):
return self.group == administrator
def _set_password(self, password):
if password:
self.crypt_password=cryptpw(password)
password = property(lambda s: None, _set_password)
def checkpw(self, password):
return checkpw(password, self.crypt_password)