mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-15 13:17:24 -04:00
- docstring on polymorphic_mapping
- applied Simon Wittber's ActiveMapper version_id_col patch for [ticket:348]
This commit is contained in:
@@ -206,7 +206,9 @@ class ActiveMapperMeta(type):
|
||||
autoload = False
|
||||
_metadata = getattr(sys.modules[cls.__module__],
|
||||
"__metadata__", metadata)
|
||||
|
||||
version_id_col = None
|
||||
version_id_col_object = None
|
||||
|
||||
if 'mapping' in dict:
|
||||
found_pk = False
|
||||
|
||||
@@ -223,7 +225,10 @@ class ActiveMapperMeta(type):
|
||||
if '__autoload__' == name:
|
||||
autoload = True
|
||||
continue
|
||||
|
||||
|
||||
if '__version_id_col__' == name:
|
||||
version_id_col = value
|
||||
|
||||
if name.startswith('__'): continue
|
||||
|
||||
if isinstance(value, column):
|
||||
@@ -263,12 +268,16 @@ class ActiveMapperMeta(type):
|
||||
cls.columns = cls.table._columns
|
||||
|
||||
# check for inheritence
|
||||
if version_id_col is not None:
|
||||
version_id_col_object = getattr(cls.table.c, version_id_col, None)
|
||||
assert(version_id_col_object is not None, "version_id_col (%s) does not exist." % version_id_col)
|
||||
|
||||
if hasattr(bases[0], "mapping"):
|
||||
cls._base_mapper= bases[0].mapper
|
||||
assign_mapper(objectstore.context, cls, cls.table,
|
||||
inherits=cls._base_mapper)
|
||||
inherits=cls._base_mapper, version_id_col=version_id_col_object)
|
||||
else:
|
||||
assign_mapper(objectstore.context, cls, cls.table)
|
||||
assign_mapper(objectstore.context, cls, cls.table, version_id_col=version_id_col_object)
|
||||
cls.relations = relations
|
||||
ActiveMapperMeta.classes[clsname] = cls
|
||||
|
||||
|
||||
@@ -23,6 +23,9 @@ class CascadeOptions(object):
|
||||
|
||||
|
||||
def polymorphic_union(table_map, typecolname, aliasname='p_union'):
|
||||
"""create a UNION statement used by a polymorphic mapper.
|
||||
|
||||
See the SQLAlchemy advanced mapping docs for an example of how this is used."""
|
||||
colnames = util.Set()
|
||||
colnamemaps = {}
|
||||
types = {}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import testbase
|
||||
from sqlalchemy.ext.activemapper import ActiveMapper, column, one_to_many, one_to_one, many_to_many, objectstore
|
||||
from sqlalchemy import and_, or_, clear_mappers, backref
|
||||
from sqlalchemy import and_, or_, clear_mappers, backref, create_session, exceptions
|
||||
from sqlalchemy import ForeignKey, String, Integer, DateTime, Table, Column
|
||||
from datetime import datetime
|
||||
import sqlalchemy
|
||||
@@ -14,6 +14,7 @@ class testcase(testbase.PersistTest):
|
||||
|
||||
class Person(ActiveMapper):
|
||||
class mapping:
|
||||
__version_id_col__ = 'row_version'
|
||||
full_name = column(String)
|
||||
first_name = column(String)
|
||||
middle_name = column(String)
|
||||
@@ -24,6 +25,7 @@ class testcase(testbase.PersistTest):
|
||||
home_phone = column(String)
|
||||
cell_phone = column(String)
|
||||
work_phone = column(String)
|
||||
row_version = column(Integer, default=0)
|
||||
prefs_id = column(Integer, foreign_key=ForeignKey('preferences.id'))
|
||||
addresses = one_to_many('Address', colname='person_id', backref='person', order_by=['state', 'city', 'postal_code'])
|
||||
preferences = one_to_one('Preferences', colname='pref_id', backref='person')
|
||||
@@ -137,7 +139,41 @@ class testcase(testbase.PersistTest):
|
||||
self.assertEquals(person.id, p1.id)
|
||||
self.assertEquals(len(person.addresses), 2)
|
||||
self.assertEquals(person.addresses[0].postal_code, '30338')
|
||||
|
||||
|
||||
@testbase.unsupported('mysql')
|
||||
def test_update(self):
|
||||
p1 = self.create_person_one()
|
||||
objectstore.flush()
|
||||
objectstore.clear()
|
||||
|
||||
person = Person.select()[0]
|
||||
person.gender = 'F'
|
||||
objectstore.flush()
|
||||
objectstore.clear()
|
||||
self.assertEquals(person.row_version, 2)
|
||||
|
||||
person = Person.select()[0]
|
||||
person.gender = 'M'
|
||||
objectstore.flush()
|
||||
objectstore.clear()
|
||||
self.assertEquals(person.row_version, 3)
|
||||
|
||||
#TODO: check that a concurrent modification raises exception
|
||||
p1 = Person.select()[0]
|
||||
s1 = objectstore.session
|
||||
s2 = create_session()
|
||||
objectstore.context.current = s2
|
||||
p2 = Person.select()[0]
|
||||
p1.first_name = "jack"
|
||||
p2.first_name = "ed"
|
||||
objectstore.flush()
|
||||
try:
|
||||
objectstore.context.current = s1
|
||||
objectstore.flush()
|
||||
assert False
|
||||
except exceptions.ConcurrentModificationError:
|
||||
pass
|
||||
|
||||
|
||||
def test_delete(self):
|
||||
p1 = self.create_person_one()
|
||||
|
||||
Reference in New Issue
Block a user