mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-07 01:10:52 -04:00
058c230cea
This change could be added to .git-blame-ignore-revs Change-Id: I7ba10052b26bc3c178d23fb50a1123d0aae965ca
380 lines
11 KiB
Python
380 lines
11 KiB
Python
import sqlalchemy
|
|
from sqlalchemy import Column
|
|
from sqlalchemy import Enum
|
|
from sqlalchemy import ForeignKey
|
|
from sqlalchemy import inspect
|
|
from sqlalchemy import Integer
|
|
from sqlalchemy import MetaData
|
|
from sqlalchemy import select
|
|
from sqlalchemy import String
|
|
from sqlalchemy import Table
|
|
from sqlalchemy import testing
|
|
from sqlalchemy.ext.declarative import ConcreteBase
|
|
from sqlalchemy.orm import aliased
|
|
from sqlalchemy.orm import join as ormjoin
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy.testing import eq_
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing import profiling
|
|
from sqlalchemy.util import classproperty
|
|
|
|
|
|
class EnumTest(fixtures.TestBase):
|
|
__requires__ = ("cpython", "python_profiling_backend")
|
|
|
|
def setup_test(self):
|
|
class SomeEnum:
|
|
# Implements PEP 435 in the minimal fashion needed by SQLAlchemy
|
|
|
|
_members = {}
|
|
|
|
@classproperty
|
|
def __members__(cls):
|
|
"""simulate a very expensive ``__members__`` getter"""
|
|
for i in range(10):
|
|
x = {}
|
|
x.update({k: v for k, v in cls._members.items()}.copy())
|
|
return x.copy()
|
|
|
|
def __init__(self, name, value):
|
|
self.name = name
|
|
self.value = value
|
|
self._members[name] = self
|
|
setattr(self.__class__, name, self)
|
|
|
|
for i in range(400):
|
|
SomeEnum("some%d" % i, i)
|
|
|
|
self.SomeEnum = SomeEnum
|
|
|
|
@profiling.function_call_count()
|
|
def test_create_enum_from_pep_435_w_expensive_members(self):
|
|
Enum(self.SomeEnum, omit_aliases=False)
|
|
|
|
|
|
class CacheKeyTest(fixtures.TestBase):
|
|
__requires__ = ("cpython", "python_profiling_backend")
|
|
|
|
@testing.fixture(scope="class")
|
|
def mapping_fixture(self):
|
|
# note in order to work nicely with "fixture" we are emerging
|
|
# a whole new model of setup/teardown, since pytest "fixture"
|
|
# sort of purposely works badly with setup/teardown
|
|
|
|
registry = sqlalchemy.orm.registry()
|
|
|
|
metadata = MetaData()
|
|
parent = Table(
|
|
"parent",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("data", String(20)),
|
|
)
|
|
child = Table(
|
|
"child",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("data", String(20)),
|
|
Column(
|
|
"parent_id", Integer, ForeignKey("parent.id"), nullable=False
|
|
),
|
|
)
|
|
|
|
class Parent(testing.entities.BasicEntity):
|
|
pass
|
|
|
|
class Child(testing.entities.BasicEntity):
|
|
pass
|
|
|
|
registry.map_imperatively(
|
|
Parent,
|
|
parent,
|
|
properties={"children": relationship(Child, backref="parent")},
|
|
)
|
|
registry.map_imperatively(Child, child)
|
|
|
|
registry.configure()
|
|
|
|
yield Parent, Child
|
|
|
|
registry.dispose()
|
|
|
|
@testing.fixture(scope="function")
|
|
def stmt_fixture_one(self, mapping_fixture):
|
|
Parent, Child = mapping_fixture
|
|
|
|
return [
|
|
(
|
|
select(Parent.id, Child.id)
|
|
.select_from(ormjoin(Parent, Child, Parent.children))
|
|
.where(Child.id == 5)
|
|
)
|
|
for i in range(100)
|
|
]
|
|
|
|
@profiling.function_call_count(variance=0.15, warmup=2)
|
|
def test_statement_key_is_cached(self, stmt_fixture_one):
|
|
current_key = None
|
|
for stmt in stmt_fixture_one:
|
|
key = stmt._generate_cache_key()
|
|
assert key is not None
|
|
if current_key:
|
|
eq_(key, current_key)
|
|
else:
|
|
current_key = key
|
|
|
|
def test_statement_key_is_not_cached(
|
|
self, stmt_fixture_one, mapping_fixture
|
|
):
|
|
Parent, Child = mapping_fixture
|
|
|
|
# run a totally different statement so that everything cache
|
|
# related not specific to the statement is warmed up
|
|
some_other_statement = (
|
|
select(Parent.id, Child.id)
|
|
.join_from(Parent, Child, Parent.children)
|
|
.where(Parent.id == 5)
|
|
)
|
|
some_other_statement._generate_cache_key()
|
|
|
|
@profiling.function_call_count(variance=0.15, warmup=0)
|
|
def go():
|
|
current_key = None
|
|
for stmt in stmt_fixture_one:
|
|
key = stmt._generate_cache_key()
|
|
assert key is not None
|
|
if current_key:
|
|
eq_(key, current_key)
|
|
else:
|
|
current_key = key
|
|
|
|
go()
|
|
|
|
|
|
class CCLookupTest(fixtures.RemoveORMEventsGlobally, fixtures.TestBase):
|
|
__requires__ = ("cpython", "python_profiling_backend")
|
|
|
|
@testing.fixture
|
|
def t1(self, metadata):
|
|
return Table(
|
|
"t1",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True),
|
|
Column("x1", Integer),
|
|
Column("x2", Integer),
|
|
Column("x3", Integer),
|
|
Column("x4", Integer),
|
|
Column("x5", Integer),
|
|
Column("x6", Integer),
|
|
Column("x7", Integer),
|
|
Column("x8", Integer),
|
|
Column("x9", Integer),
|
|
Column("x10", Integer),
|
|
)
|
|
|
|
@testing.fixture
|
|
def inheritance_model(self, decl_base):
|
|
class Employee(ConcreteBase, decl_base):
|
|
__tablename__ = "employee"
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String(50))
|
|
|
|
x1 = Column(Integer)
|
|
x2 = Column(Integer)
|
|
x3 = Column(Integer)
|
|
x4 = Column(Integer)
|
|
x5 = Column(Integer)
|
|
x6 = Column(Integer)
|
|
x7 = Column(Integer)
|
|
x8 = Column(Integer)
|
|
x9 = Column(Integer)
|
|
x10 = Column(Integer)
|
|
x11 = Column(Integer)
|
|
x12 = Column(Integer)
|
|
x13 = Column(Integer)
|
|
x14 = Column(Integer)
|
|
x15 = Column(Integer)
|
|
x16 = Column(Integer)
|
|
|
|
__mapper_args__ = {
|
|
"polymorphic_identity": "employee",
|
|
"concrete": True,
|
|
}
|
|
|
|
class Manager(Employee):
|
|
__tablename__ = "manager"
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String(50))
|
|
manager_data = Column(String(40))
|
|
|
|
m1 = Column(Integer)
|
|
m2 = Column(Integer)
|
|
m3 = Column(Integer)
|
|
m4 = Column(Integer)
|
|
m5 = Column(Integer)
|
|
m6 = Column(Integer)
|
|
m7 = Column(Integer)
|
|
m8 = Column(Integer)
|
|
m9 = Column(Integer)
|
|
m10 = Column(Integer)
|
|
m11 = Column(Integer)
|
|
m12 = Column(Integer)
|
|
m13 = Column(Integer)
|
|
m14 = Column(Integer)
|
|
m15 = Column(Integer)
|
|
m16 = Column(Integer)
|
|
|
|
__mapper_args__ = {
|
|
"polymorphic_identity": "manager",
|
|
"concrete": True,
|
|
}
|
|
|
|
class Engineer(Employee):
|
|
__tablename__ = "engineer"
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String(50))
|
|
engineer_info = Column(String(40))
|
|
|
|
e1 = Column(Integer)
|
|
e2 = Column(Integer)
|
|
e3 = Column(Integer)
|
|
e4 = Column(Integer)
|
|
e5 = Column(Integer)
|
|
e6 = Column(Integer)
|
|
e7 = Column(Integer)
|
|
e8 = Column(Integer)
|
|
e9 = Column(Integer)
|
|
e10 = Column(Integer)
|
|
e11 = Column(Integer)
|
|
e12 = Column(Integer)
|
|
e13 = Column(Integer)
|
|
e14 = Column(Integer)
|
|
e15 = Column(Integer)
|
|
e16 = Column(Integer)
|
|
|
|
__mapper_args__ = {
|
|
"polymorphic_identity": "engineer",
|
|
"concrete": True,
|
|
}
|
|
|
|
decl_base.registry.configure()
|
|
|
|
return Employee
|
|
|
|
@testing.combinations(
|
|
("require_embedded",), ("no_embedded",), argnames="require_embedded"
|
|
)
|
|
def test_corresponding_column_isolated(self, t1, require_embedded):
|
|
subq = select(t1).union_all(select(t1)).subquery()
|
|
|
|
target = subq.c.x7
|
|
src = t1.c.x7
|
|
|
|
subq.c
|
|
|
|
require_embedded = require_embedded == "require_embedded"
|
|
|
|
@profiling.function_call_count(variance=0.15, warmup=1)
|
|
def go():
|
|
assert (
|
|
subq.corresponding_column(
|
|
src, require_embedded=require_embedded
|
|
)
|
|
is target
|
|
)
|
|
|
|
go()
|
|
|
|
@testing.combinations(
|
|
("require_embedded",), ("no_embedded",), argnames="require_embedded"
|
|
)
|
|
def test_gen_subq_to_table_single_corresponding_column(
|
|
self, t1, require_embedded
|
|
):
|
|
src = t1.c.x7
|
|
|
|
require_embedded = require_embedded == "require_embedded"
|
|
|
|
@profiling.function_call_count(variance=0.15, warmup=1)
|
|
def go():
|
|
subq = select(t1).union_all(select(t1)).subquery()
|
|
|
|
target = subq.c.x7
|
|
assert (
|
|
subq.corresponding_column(
|
|
src, require_embedded=require_embedded
|
|
)
|
|
is target
|
|
)
|
|
|
|
go()
|
|
|
|
@testing.combinations(
|
|
("require_embedded",), ("no_embedded",), argnames="require_embedded"
|
|
)
|
|
def test_gen_subq_to_table_many_corresponding_column(
|
|
self, t1, require_embedded
|
|
):
|
|
require_embedded = require_embedded == "require_embedded"
|
|
|
|
@profiling.function_call_count(variance=0.15, warmup=1)
|
|
def go():
|
|
subq = select(t1).union_all(select(t1)).subquery()
|
|
|
|
for name in ("x%d" % i for i in range(1, 10)):
|
|
target = subq.c[name]
|
|
src = t1.c[name]
|
|
|
|
assert (
|
|
subq.corresponding_column(
|
|
src, require_embedded=require_embedded
|
|
)
|
|
is target
|
|
)
|
|
|
|
go()
|
|
|
|
@testing.combinations(
|
|
("require_embedded",), ("no_embedded",), argnames="require_embedded"
|
|
)
|
|
def test_gen_subq_aliased_class_select(
|
|
self, t1, require_embedded, inheritance_model
|
|
):
|
|
A = inheritance_model
|
|
|
|
require_embedded = require_embedded == "require_embedded"
|
|
|
|
@profiling.function_call_count(variance=0.15, warmup=1)
|
|
def go():
|
|
a1a1 = aliased(A)
|
|
a1a2 = aliased(A)
|
|
subq = select(a1a1).union_all(select(a1a2)).subquery()
|
|
|
|
a1 = aliased(A, subq)
|
|
|
|
inspect(a1).__clause_element__()
|
|
|
|
go()
|
|
|
|
@testing.combinations(
|
|
("require_embedded",), ("no_embedded",), argnames="require_embedded"
|
|
)
|
|
def test_gen_subq_aliased_class_select_cols(
|
|
self, t1, require_embedded, inheritance_model
|
|
):
|
|
A = inheritance_model
|
|
|
|
require_embedded = require_embedded == "require_embedded"
|
|
|
|
@profiling.function_call_count(variance=0.15, warmup=1)
|
|
def go():
|
|
a1a1 = aliased(A)
|
|
a1a2 = aliased(A)
|
|
subq = select(a1a1).union_all(select(a1a2)).subquery()
|
|
|
|
a1 = aliased(A, subq)
|
|
|
|
select(a1.x1, a1.x2, a1.x3, a1.x4)
|
|
|
|
go()
|