Merge "Use simple growth scale with any max size for BufferedRowResultProxy"

This commit is contained in:
mike bayer
2019-10-30 18:18:48 +00:00
committed by Gerrit Code Review
4 changed files with 61 additions and 47 deletions
+12
View File
@@ -0,0 +1,12 @@
.. change::
:tags: usecase, postgresql
:tickets: 4914
The maximum buffer size for the :class:`.BufferedRowResultProxy`, which
is used by dialects such as PostgreSQL when ``stream_results=True``, can
now be set to a number greater than 1000 and the buffer will grow to
that size. Previously, the buffer would not go beyond 1000 even if the
value were set larger. The growth of the buffer is also now based
on a simple multiplying factor currently set to 5. Pull request courtesy
Soumaya Mauthoor.
@@ -137,7 +137,8 @@ The following DBAPI-specific options are respected when used with
interpreted by the :class:`.BufferedRowResultProxy`, and if omitted the
buffer will grow to ultimately store 1000 rows at a time.
.. versionadded:: 1.0.6
.. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than
1000, and the buffer will grow to that size.
.. _psycopg2_executemany_mode:
+9 -24
View File
@@ -1486,10 +1486,8 @@ class BufferedRowResultProxy(ResultProxy):
The pre-fetching behavior fetches only one row initially, and then
grows its buffer size by a fixed amount with each successive need
for additional rows up to a size of 1000.
The size argument is configurable using the ``max_row_buffer``
execution option::
for additional rows up the ``max_row_buffer`` size, which defaults
to 1000::
with psycopg2_engine.connect() as conn:
@@ -1497,7 +1495,7 @@ class BufferedRowResultProxy(ResultProxy):
stream_results=True, max_row_buffer=50
).execute("select * from table")
.. versionadded:: 1.0.6 Added the ``max_row_buffer`` option.
.. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
.. seealso::
@@ -1506,34 +1504,21 @@ class BufferedRowResultProxy(ResultProxy):
def _init_metadata(self):
self._max_row_buffer = self.context.execution_options.get(
"max_row_buffer", None
"max_row_buffer", 1000
)
self._growth_factor = 5
self.__buffer_rows()
super(BufferedRowResultProxy, self)._init_metadata()
# this is a "growth chart" for the buffering of rows.
# each successive __buffer_rows call will use the next
# value in the list for the buffer size until the max
# is reached
size_growth = {
1: 5,
5: 10,
10: 20,
20: 50,
50: 100,
100: 250,
250: 500,
500: 1000,
}
def __buffer_rows(self):
if self.cursor is None:
return
size = getattr(self, "_bufsize", 1)
self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
self._bufsize = self.size_growth.get(size, size)
if self._max_row_buffer is not None:
self._bufsize = min(self._max_row_buffer, self._bufsize)
if size < self._max_row_buffer:
self._bufsize = min(
self._max_row_buffer, size * self._growth_factor
)
def _soft_close(self, **kw):
self.__rowbuffer.clear()
+38 -22
View File
@@ -1942,31 +1942,47 @@ class AlternateResultProxyTest(fixtures.TablesTest):
r = conn.execute(stmt)
eq_(r.scalar(), "HI THERE")
def test_buffered_row_growth(self):
@testing.fixture
def row_growth_fixture(self):
with self._proxy_fixture(_result.BufferedRowResultProxy):
with self.engine.connect() as conn:
conn.execute(
self.table.insert(),
[{"x": i, "y": "t_%d" % i} for i in range(15, 1200)],
[{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],
)
result = conn.execute(self.table.select())
checks = {0: 5, 1: 10, 9: 20, 135: 250, 274: 500, 1351: 1000}
for idx, row in enumerate(result, 0):
if idx in checks:
eq_(result._bufsize, checks[idx])
le_(len(result._BufferedRowResultProxy__rowbuffer), 1000)
yield conn
def test_max_row_buffer_option(self):
with self._proxy_fixture(_result.BufferedRowResultProxy):
with self.engine.connect() as conn:
conn.execute(
self.table.insert(),
[{"x": i, "y": "t_%d" % i} for i in range(15, 1200)],
)
result = conn.execution_options(max_row_buffer=27).execute(
self.table.select()
)
for idx, row in enumerate(result, 0):
if idx in (16, 70, 150, 250):
eq_(result._bufsize, 27)
le_(len(result._BufferedRowResultProxy__rowbuffer), 27)
@testing.combinations(
("no option", None, {0: 5, 1: 25, 9: 125, 135: 625, 274: 1000}),
("lt 1000", 27, {0: 5, 16: 27, 70: 27, 150: 27, 250: 27}),
(
"gt 1000",
1500,
{0: 5, 1: 25, 9: 125, 135: 625, 274: 1500, 1351: 1500},
),
(
"gt 1500",
2000,
{0: 5, 1: 25, 9: 125, 135: 625, 274: 2000, 1351: 2000},
),
id_="iaa",
argnames="max_row_buffer,checks",
)
def test_buffered_row_growth(
self, row_growth_fixture, max_row_buffer, checks
):
if max_row_buffer:
result = row_growth_fixture.execution_options(
max_row_buffer=max_row_buffer
).execute(self.table.select())
else:
result = row_growth_fixture.execute(self.table.select())
assertion = {}
max_size = max(checks.values())
for idx, row in enumerate(result, 0):
if idx in checks:
assertion[idx] = result._bufsize
le_(len(result._BufferedRowResultProxy__rowbuffer), max_size)
eq_(checks, assertion)