mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-06-07 00:01:12 -04:00
Merge branch 'master' into rel_0_9
This commit is contained in:
Vendored
+5
-8
@@ -60,14 +60,11 @@
|
||||
|
||||
The :func:`.cast` function, when given a plain literal value,
|
||||
will now apply the given type to the given literal value on the
|
||||
bind parameter side according
|
||||
to the type given to the cast. This essentially replaces what would
|
||||
normally be the detected type of the literal value. This only
|
||||
takes effect if the auto-detected type of the literal value is either
|
||||
"nulltype" (e.g. couldn't detect)
|
||||
or a type that is of the same "affinity" as the cast type.
|
||||
The net change here is that the :func:`.cast` function includes more
|
||||
of the functionality already present in the :func:`.type_coerce` function.
|
||||
bind parameter side according to the type given to the cast,
|
||||
in the same manner as that of the :func:`.type_coerce` function.
|
||||
However unlike :func:`.type_coerce`, this only takes effect if a
|
||||
non-clauseelement value is passed to :func:`.cast`; an existing typed
|
||||
construct will retain its type.
|
||||
|
||||
.. change::
|
||||
:tags: bug, postgresql
|
||||
|
||||
@@ -1766,14 +1766,7 @@ class Cast(ColumnElement):
|
||||
|
||||
"""
|
||||
self.type = type_api.to_instance(type_)
|
||||
self.clause = _literal_as_binds(expression, None)
|
||||
if isinstance(self.clause, BindParameter) and (
|
||||
self.clause.type._isnull
|
||||
or self.clause.type._type_affinity is self.type._type_affinity
|
||||
):
|
||||
self.clause = self.clause._clone()
|
||||
self.clause.type = self.type
|
||||
|
||||
self.clause = _literal_as_binds(expression, type_=self.type)
|
||||
self.typeclause = TypeClause(self.type)
|
||||
|
||||
def _copy_internals(self, clone=_clone, **kw):
|
||||
@@ -2785,7 +2778,6 @@ def _only_column_elements(element, name):
|
||||
"'%s'; got: '%s', type %s" % (name, element, type(element)))
|
||||
return element
|
||||
|
||||
|
||||
def _literal_as_binds(element, name=None, type_=None):
|
||||
if hasattr(element, '__clause_element__'):
|
||||
return element.__clause_element__()
|
||||
|
||||
@@ -82,6 +82,10 @@ class InsertBehaviorTest(fixtures.TablesTest):
|
||||
test_needs_autoincrement=True),
|
||||
Column('data', String(50))
|
||||
)
|
||||
Table('manual_pk', metadata,
|
||||
Column('id', Integer, primary_key=True, autoincrement=False),
|
||||
Column('data', String(50))
|
||||
)
|
||||
|
||||
def test_autoclose_on_insert(self):
|
||||
if requirements.returning.enabled:
|
||||
@@ -124,13 +128,13 @@ class InsertBehaviorTest(fixtures.TablesTest):
|
||||
|
||||
@requirements.insert_from_select
|
||||
def test_insert_from_select(self):
|
||||
table = self.tables.autoinc_pk
|
||||
table = self.tables.manual_pk
|
||||
config.db.execute(
|
||||
table.insert(),
|
||||
[
|
||||
dict(data="data1"),
|
||||
dict(data="data2"),
|
||||
dict(data="data3"),
|
||||
dict(id=1, data="data1"),
|
||||
dict(id=2, data="data2"),
|
||||
dict(id=3, data="data3"),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
+208
-104
@@ -404,110 +404,6 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL):
|
||||
eq_(a.foo, 'foo')
|
||||
eq_(a.dialect_specific_args['bar'], 'bar')
|
||||
|
||||
@testing.provide_metadata
|
||||
def test_type_coerce(self):
|
||||
"""test ad-hoc usage of custom types with type_coerce()."""
|
||||
|
||||
self._test_type_coerce_cast(type_coerce)
|
||||
|
||||
@testing.provide_metadata
|
||||
@testing.fails_on("oracle",
|
||||
"oracle doesn't like CAST in the VALUES of an INSERT")
|
||||
def test_cast(self):
|
||||
"""test ad-hoc usage of custom types with cast()."""
|
||||
|
||||
self._test_type_coerce_cast(cast)
|
||||
|
||||
def _test_type_coerce_cast(self, coerce_fn):
|
||||
metadata = self.metadata
|
||||
class MyType(types.TypeDecorator):
|
||||
impl = String
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
return util.text_type(value)[0:-8]
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
return value + "BIND_OUT"
|
||||
|
||||
t = Table('t', metadata, Column('data', String(50)))
|
||||
metadata.create_all()
|
||||
|
||||
t.insert().values(data=coerce_fn('d1BIND_OUT', MyType)).execute()
|
||||
|
||||
eq_(
|
||||
select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
|
||||
[('d1BIND_OUT', )]
|
||||
)
|
||||
|
||||
# test coerce from nulltype - e.g. use an object that
|
||||
# doens't match to a known type
|
||||
class MyObj(object):
|
||||
def __str__(self):
|
||||
return "THISISMYOBJ"
|
||||
|
||||
eq_(
|
||||
testing.db.execute(
|
||||
select([coerce_fn(MyObj(), MyType)])
|
||||
).fetchall(),
|
||||
[('THIBIND_OUT',)]
|
||||
)
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(),
|
||||
[('d1', 'd1BIND_OUT')]
|
||||
)
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).
|
||||
alias().select().execute().fetchall(),
|
||||
[('d1', 'd1BIND_OUT')]
|
||||
)
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(coerce_fn(t.c.data, MyType) == 'd1BIND_OUT').\
|
||||
execute().fetchall(),
|
||||
[('d1', 'd1BIND_OUT')]
|
||||
)
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(t.c.data == coerce_fn('d1BIND_OUT', MyType)).\
|
||||
execute().fetchall(),
|
||||
[('d1', 'd1BIND_OUT')]
|
||||
)
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(t.c.data == coerce_fn(None, MyType)).\
|
||||
execute().fetchall(),
|
||||
[]
|
||||
)
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(coerce_fn(t.c.data, MyType) == None).\
|
||||
execute().fetchall(),
|
||||
[]
|
||||
)
|
||||
|
||||
eq_(
|
||||
testing.db.scalar(
|
||||
select([coerce_fn(literal('d1BIND_OUT'), MyType)])
|
||||
),
|
||||
'd1BIND_OUT'
|
||||
)
|
||||
|
||||
class MyFoob(object):
|
||||
def __clause_element__(self):
|
||||
return t.c.data
|
||||
|
||||
eq_(
|
||||
testing.db.execute(
|
||||
select([t.c.data, coerce_fn(MyFoob(), MyType)])
|
||||
).fetchall(),
|
||||
[('d1', 'd1BIND_OUT')]
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
@@ -608,6 +504,214 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL):
|
||||
Column('goofy9', MyNewIntSubClass, nullable=False),
|
||||
)
|
||||
|
||||
class TypeCoerceCastTest(fixtures.TablesTest):
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
class MyType(types.TypeDecorator):
|
||||
impl = String
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
return "BIND_IN" + str(value)
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
return value + "BIND_OUT"
|
||||
|
||||
cls.MyType = MyType
|
||||
|
||||
Table('t', metadata,
|
||||
Column('data', String(50))
|
||||
)
|
||||
|
||||
@testing.fails_on("oracle",
|
||||
"oracle doesn't like CAST in the VALUES of an INSERT")
|
||||
def test_insert_round_trip_cast(self):
|
||||
self._test_insert_round_trip(cast)
|
||||
|
||||
def test_insert_round_trip_type_coerce(self):
|
||||
self._test_insert_round_trip(type_coerce)
|
||||
|
||||
def _test_insert_round_trip(self, coerce_fn):
|
||||
MyType = self.MyType
|
||||
t = self.tables.t
|
||||
|
||||
t.insert().values(data=coerce_fn('d1', MyType)).execute()
|
||||
|
||||
eq_(
|
||||
select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
|
||||
[('BIND_INd1BIND_OUT', )]
|
||||
)
|
||||
|
||||
def test_coerce_from_nulltype_cast(self):
|
||||
self._test_coerce_from_nulltype(cast)
|
||||
|
||||
def test_coerce_from_nulltype_type_coerce(self):
|
||||
self._test_coerce_from_nulltype(type_coerce)
|
||||
|
||||
def _test_coerce_from_nulltype(self, coerce_fn):
|
||||
MyType = self.MyType
|
||||
|
||||
# test coerce from nulltype - e.g. use an object that
|
||||
# doens't match to a known type
|
||||
class MyObj(object):
|
||||
def __str__(self):
|
||||
return "THISISMYOBJ"
|
||||
|
||||
eq_(
|
||||
testing.db.execute(
|
||||
select([coerce_fn(MyObj(), MyType)])
|
||||
).fetchall(),
|
||||
[('BIND_INTHISISMYOBJBIND_OUT',)]
|
||||
)
|
||||
|
||||
@testing.fails_on("oracle",
|
||||
"oracle doesn't like CAST in the VALUES of an INSERT")
|
||||
def test_vs_non_coerced_cast(self):
|
||||
self._test_vs_non_coerced(cast)
|
||||
|
||||
def test_vs_non_coerced_type_coerce(self):
|
||||
self._test_vs_non_coerced(type_coerce)
|
||||
|
||||
def _test_vs_non_coerced(self, coerce_fn):
|
||||
MyType = self.MyType
|
||||
t = self.tables.t
|
||||
|
||||
t.insert().values(data=coerce_fn('d1', MyType)).execute()
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(),
|
||||
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
|
||||
)
|
||||
|
||||
@testing.fails_on("oracle",
|
||||
"oracle doesn't like CAST in the VALUES of an INSERT")
|
||||
def test_vs_non_coerced_alias_cast(self):
|
||||
self._test_vs_non_coerced_alias(cast)
|
||||
|
||||
def test_vs_non_coerced_alias_type_coerce(self):
|
||||
self._test_vs_non_coerced_alias(type_coerce)
|
||||
|
||||
def _test_vs_non_coerced_alias(self, coerce_fn):
|
||||
MyType = self.MyType
|
||||
t = self.tables.t
|
||||
|
||||
t.insert().values(data=coerce_fn('d1', MyType)).execute()
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).
|
||||
alias().select().execute().fetchall(),
|
||||
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
|
||||
)
|
||||
|
||||
@testing.fails_on("oracle",
|
||||
"oracle doesn't like CAST in the VALUES of an INSERT")
|
||||
def test_vs_non_coerced_where_cast(self):
|
||||
self._test_vs_non_coerced_where(cast)
|
||||
|
||||
def test_vs_non_coerced_where_type_coerce(self):
|
||||
self._test_vs_non_coerced_where(type_coerce)
|
||||
|
||||
def _test_vs_non_coerced_where(self, coerce_fn):
|
||||
MyType = self.MyType
|
||||
|
||||
t = self.tables.t
|
||||
t.insert().values(data=coerce_fn('d1', MyType)).execute()
|
||||
|
||||
# coerce on left side
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(coerce_fn(t.c.data, MyType) == 'd1').\
|
||||
execute().fetchall(),
|
||||
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
|
||||
)
|
||||
|
||||
# coerce on right side
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(t.c.data == coerce_fn('d1', MyType)).\
|
||||
execute().fetchall(),
|
||||
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
|
||||
)
|
||||
|
||||
@testing.fails_on("oracle",
|
||||
"oracle doesn't like CAST in the VALUES of an INSERT")
|
||||
def test_coerce_none_cast(self):
|
||||
self._test_coerce_none(cast)
|
||||
|
||||
def test_coerce_none_type_coerce(self):
|
||||
self._test_coerce_none(type_coerce)
|
||||
|
||||
def _test_coerce_none(self, coerce_fn):
|
||||
MyType = self.MyType
|
||||
|
||||
t = self.tables.t
|
||||
t.insert().values(data=coerce_fn('d1', MyType)).execute()
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(t.c.data == coerce_fn(None, MyType)).\
|
||||
execute().fetchall(),
|
||||
[]
|
||||
)
|
||||
|
||||
eq_(
|
||||
select([t.c.data, coerce_fn(t.c.data, MyType)]).\
|
||||
where(coerce_fn(t.c.data, MyType) == None).\
|
||||
execute().fetchall(),
|
||||
[]
|
||||
)
|
||||
|
||||
@testing.fails_on("oracle",
|
||||
"oracle doesn't like CAST in the VALUES of an INSERT")
|
||||
def test_resolve_clause_element_cast(self):
|
||||
self._test_resolve_clause_element(cast)
|
||||
|
||||
def test_resolve_clause_element_type_coerce(self):
|
||||
self._test_resolve_clause_element(type_coerce)
|
||||
|
||||
def _test_resolve_clause_element(self, coerce_fn):
|
||||
MyType = self.MyType
|
||||
|
||||
t = self.tables.t
|
||||
t.insert().values(data=coerce_fn('d1', MyType)).execute()
|
||||
|
||||
class MyFoob(object):
|
||||
def __clause_element__(self):
|
||||
return t.c.data
|
||||
|
||||
eq_(
|
||||
testing.db.execute(
|
||||
select([t.c.data, coerce_fn(MyFoob(), MyType)])
|
||||
).fetchall(),
|
||||
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
|
||||
)
|
||||
|
||||
def test_cast_existing_typed(self):
|
||||
MyType = self.MyType
|
||||
coerce_fn = cast
|
||||
|
||||
# when cast() is given an already typed value,
|
||||
# the type does not take effect on the value itself.
|
||||
eq_(
|
||||
testing.db.scalar(
|
||||
select([coerce_fn(literal('d1'), MyType)])
|
||||
),
|
||||
'd1BIND_OUT'
|
||||
)
|
||||
|
||||
def test_type_coerce_existing_typed(self):
|
||||
MyType = self.MyType
|
||||
coerce_fn = type_coerce
|
||||
# type_coerce does upgrade the given expression to the
|
||||
# given type.
|
||||
eq_(
|
||||
testing.db.scalar(
|
||||
select([coerce_fn(literal('d1'), MyType)])
|
||||
),
|
||||
'BIND_INd1BIND_OUT'
|
||||
)
|
||||
|
||||
|
||||
|
||||
class VariantTest(fixtures.TestBase, AssertsCompiledSQL):
|
||||
def setup(self):
|
||||
class UTypeOne(types.UserDefinedType):
|
||||
|
||||
Reference in New Issue
Block a user