mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-17 22:22:13 -04:00
abcb9dc273
Fixed regression where the :func:`_orm.merge_frozen_result` function relied upon by the dogpile.caching example was not included in tests and began failing due to incorrect internal arguments. Fixes: #6211 Change-Id: I0b53d0f569c817994ad4827a3ddb1626fd2d082f
221 lines
6.5 KiB
Python
221 lines
6.5 KiB
Python
from sqlalchemy import exc
|
|
from sqlalchemy import select
|
|
from sqlalchemy import testing
|
|
from sqlalchemy.engine import result_tuple
|
|
from sqlalchemy.orm import aliased
|
|
from sqlalchemy.orm import loading
|
|
from sqlalchemy.orm import mapper
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy.testing import mock
|
|
from sqlalchemy.testing.assertions import assert_raises
|
|
from sqlalchemy.testing.assertions import assert_raises_message
|
|
from sqlalchemy.testing.assertions import eq_
|
|
from sqlalchemy.testing.fixtures import fixture_session
|
|
from . import _fixtures
|
|
|
|
# class GetFromIdentityTest(_fixtures.FixtureTest):
|
|
# class LoadOnIdentTest(_fixtures.FixtureTest):
|
|
|
|
|
|
class InstanceProcessorTest(_fixtures.FixtureTest):
|
|
def test_state_no_load_path_comparison(self):
|
|
# test issue #5110
|
|
User, Order, Address = self.classes("User", "Order", "Address")
|
|
users, orders, addresses = self.tables("users", "orders", "addresses")
|
|
|
|
mapper(
|
|
User,
|
|
users,
|
|
properties={
|
|
"addresses": relationship(Address, lazy="joined"),
|
|
"orders": relationship(
|
|
Order, lazy="joined", order_by=orders.c.id
|
|
),
|
|
},
|
|
)
|
|
mapper(
|
|
Order,
|
|
orders,
|
|
properties={"address": relationship(Address, lazy="joined")},
|
|
)
|
|
mapper(Address, addresses)
|
|
|
|
s = fixture_session()
|
|
|
|
def go():
|
|
eq_(
|
|
User(
|
|
id=7,
|
|
orders=[
|
|
Order(id=1, address=Address(id=1)),
|
|
Order(id=3, address=Address(id=1)),
|
|
Order(id=5, address=None),
|
|
],
|
|
),
|
|
s.query(User).populate_existing().get(7),
|
|
)
|
|
|
|
self.assert_sql_count(testing.db, go, 1)
|
|
|
|
|
|
class InstancesTest(_fixtures.FixtureTest):
|
|
run_setup_mappers = "once"
|
|
run_inserts = "once"
|
|
run_deletes = None
|
|
|
|
@classmethod
|
|
def setup_mappers(cls):
|
|
cls._setup_stock_mapping()
|
|
|
|
def test_cursor_close_w_failed_rowproc(self):
|
|
User = self.classes.User
|
|
s = fixture_session()
|
|
|
|
q = s.query(User)
|
|
|
|
ctx = q._compile_context()
|
|
cursor = mock.Mock()
|
|
ctx.compile_state._entities = [
|
|
mock.Mock(row_processor=mock.Mock(side_effect=Exception("boom")))
|
|
]
|
|
assert_raises(Exception, loading.instances, cursor, ctx)
|
|
assert cursor.close.called, "Cursor wasn't closed"
|
|
|
|
def test_row_proc_not_created(self):
|
|
User = self.classes.User
|
|
s = fixture_session()
|
|
|
|
q = s.query(User.id, User.name)
|
|
stmt = select(User.id)
|
|
|
|
assert_raises_message(
|
|
exc.NoSuchColumnError,
|
|
"Could not locate column in row for column 'users.name'",
|
|
q.from_statement(stmt).all,
|
|
)
|
|
|
|
|
|
class MergeResultTest(_fixtures.FixtureTest):
|
|
run_setup_mappers = "once"
|
|
run_inserts = "once"
|
|
run_deletes = None
|
|
|
|
@classmethod
|
|
def setup_mappers(cls):
|
|
cls._setup_stock_mapping()
|
|
|
|
def _fixture(self):
|
|
User = self.classes.User
|
|
|
|
s = fixture_session()
|
|
u1, u2, u3, u4 = (
|
|
User(id=1, name="u1"),
|
|
User(id=2, name="u2"),
|
|
User(id=7, name="u3"),
|
|
User(id=8, name="u4"),
|
|
)
|
|
s.query(User).filter(User.id.in_([7, 8])).all()
|
|
s.close()
|
|
return s, [u1, u2, u3, u4]
|
|
|
|
def test_single_entity(self):
|
|
s, (u1, u2, u3, u4) = self._fixture()
|
|
User = self.classes.User
|
|
|
|
q = s.query(User)
|
|
collection = [u1, u2, u3, u4]
|
|
it = loading.merge_result(q, collection)
|
|
eq_([x.id for x in it], [1, 2, 7, 8])
|
|
|
|
def test_single_entity_frozen(self):
|
|
s = fixture_session()
|
|
User = self.classes.User
|
|
|
|
stmt = select(User).where(User.id.in_([7, 8, 9])).order_by(User.id)
|
|
result = s.execute(stmt)
|
|
it = loading.merge_frozen_result(s, stmt, result.freeze())
|
|
eq_([x.id for x in it().scalars()], [7, 8, 9])
|
|
|
|
def test_single_column(self):
|
|
User = self.classes.User
|
|
|
|
s = fixture_session()
|
|
|
|
q = s.query(User.id)
|
|
collection = [(1,), (2,), (7,), (8,)]
|
|
it = loading.merge_result(q, collection)
|
|
eq_(list(it), [(1,), (2,), (7,), (8,)])
|
|
|
|
def test_single_column_frozen(self):
|
|
User = self.classes.User
|
|
|
|
s = fixture_session()
|
|
|
|
stmt = select(User.id).where(User.id.in_([7, 8, 9])).order_by(User.id)
|
|
result = s.execute(stmt)
|
|
it = loading.merge_frozen_result(s, stmt, result.freeze())
|
|
eq_([x.id for x in it()], [7, 8, 9])
|
|
|
|
def test_entity_col_mix_plain_tuple(self):
|
|
s, (u1, u2, u3, u4) = self._fixture()
|
|
User = self.classes.User
|
|
|
|
q = s.query(User, User.id)
|
|
collection = [(u1, 1), (u2, 2), (u3, 7), (u4, 8)]
|
|
it = loading.merge_result(q, collection)
|
|
it = list(it)
|
|
eq_([(x.id, y) for x, y in it], [(1, 1), (2, 2), (7, 7), (8, 8)])
|
|
eq_(list(it[0]._mapping.keys()), ["User", "id"])
|
|
|
|
def test_entity_col_mix_plain_tuple_frozen(self):
|
|
s = fixture_session()
|
|
User = self.classes.User
|
|
|
|
stmt = (
|
|
select(User, User.id)
|
|
.where(User.id.in_([7, 8, 9]))
|
|
.order_by(User.id)
|
|
)
|
|
result = s.execute(stmt)
|
|
|
|
it = loading.merge_frozen_result(s, stmt, result.freeze())
|
|
it = list(it())
|
|
eq_([(x.id, y) for x, y in it], [(7, 7), (8, 8), (9, 9)])
|
|
eq_(list(it[0]._mapping.keys()), ["User", "id"])
|
|
|
|
def test_entity_col_mix_keyed_tuple(self):
|
|
s, (u1, u2, u3, u4) = self._fixture()
|
|
User = self.classes.User
|
|
|
|
q = s.query(User, User.id)
|
|
|
|
row = result_tuple(["User", "id"])
|
|
|
|
def kt(*x):
|
|
return row(x)
|
|
|
|
collection = [kt(u1, 1), kt(u2, 2), kt(u3, 7), kt(u4, 8)]
|
|
it = loading.merge_result(q, collection)
|
|
it = list(it)
|
|
eq_([(x.id, y) for x, y in it], [(1, 1), (2, 2), (7, 7), (8, 8)])
|
|
eq_(list(it[0]._mapping.keys()), ["User", "id"])
|
|
|
|
def test_none_entity(self):
|
|
s, (u1, u2, u3, u4) = self._fixture()
|
|
User = self.classes.User
|
|
|
|
ua = aliased(User)
|
|
q = s.query(User, ua)
|
|
|
|
row = result_tuple(["User", "useralias"])
|
|
|
|
def kt(*x):
|
|
return row(x)
|
|
|
|
collection = [kt(u1, u2), kt(u1, None), kt(u2, u3)]
|
|
it = loading.merge_result(q, collection)
|
|
eq_(
|
|
[(x and x.id or None, y and y.id or None) for x, y in it],
|
|
[(u1.id, u2.id), (u1.id, None), (u2.id, u3.id)],
|
|
)
|