mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-14 20:57:19 -04:00
0b95f0055b
References: #4600 Change-Id: I2a62ddfe00bc562720f0eae700a497495d7a987a
50 lines
1.3 KiB
Python
50 lines
1.3 KiB
Python
from sqlalchemy import event
|
|
from sqlalchemy.pool import QueuePool
|
|
from sqlalchemy.testing import AssertsExecutionResults
|
|
from sqlalchemy.testing import fixtures
|
|
from sqlalchemy.testing import profiling
|
|
|
|
# Pool under test. Starts out unset and is rebound to a fresh QueuePool by
# QueuePoolTest.setup_test() (via ``global pool``) before the tests use it.
pool = None
|
|
|
|
|
|
class QueuePoolTest(fixtures.TestBase, AssertsExecutionResults):
    """Function-call-count profiling of ``QueuePool.connect()``.

    Each test runs against the module-level ``pool``, which
    ``setup_test()`` rebinds to a freshly built ``QueuePool`` that has a
    "connect" event listener attached.
    """

    __requires__ = ("cpython", "python_profiling_backend")

    class Connection:
        """Stand-in connection used as the pool's ``creator``.

        Both methods are deliberate no-ops.
        """

        def rollback(self):
            """Do nothing."""

        def close(self):
            """Do nothing."""

    def setup_test(self):
        """Rebind the module-level ``pool`` to a new ``QueuePool``."""
        global pool

        # Same construction arguments for the warm-up pool and the real one.
        pool_kw = {
            "creator": self.Connection,
            "pool_size": 3,
            "max_overflow": -1,
        }

        # Build and connect a throwaway pool first: this has the effect of
        # initializing the class-level event listeners on Pool, if they are
        # not present already.
        warmup_pool = QueuePool(**pool_kw)
        warmup_pool.connect()

        pool = QueuePool(**pool_kw)

        # Real-world scenario: the pool under test has a "connect" handler
        # registered, even though the handler itself does nothing.
        @event.listens_for(pool, "connect")
        def do_connect(dbapi_conn, conn_record):
            pass

    @profiling.function_call_count()
    def test_first_connect(self):
        """Profile the first ``connect()`` issued against the fresh pool."""
        pool.connect()

    def test_second_connect(self):
        """Profile a ``connect()`` issued after an earlier checkout was closed."""
        # The first checkout and its close() happen outside the profiled
        # region; only the call made inside go() is counted.
        first_conn = pool.connect()
        first_conn.close()

        @profiling.function_call_count()
        def go():
            return pool.connect()

        go()
|