Files
sqlalchemy/test/base/test_tutorials.py
T
Federico Caselli dce11383f8 Improve sql formatting
change {opensql} to {printsql} in prints, add missing markers

Change-Id: I07b72e6620bb64e329d6b641afa27631e91c4f16
2023-01-11 20:24:29 +01:00

253 lines
7.7 KiB
Python

from __future__ import annotations
import doctest
import logging
import os
import re
import sys
from sqlalchemy.testing import config
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import requires
class DocTest(fixtures.TestBase):
__requires__ = ("python310", "insert_returning", "insertmanyvalues")
__only_on__ = "sqlite+pysqlite"
def _setup_logger(self):
rootlogger = logging.getLogger("sqlalchemy.engine.Engine")
class MyStream:
def write(self, string):
sys.stdout.write(string)
sys.stdout.flush()
def flush(self):
pass
self._handler = handler = logging.StreamHandler(MyStream())
handler.setFormatter(logging.Formatter("%(message)s"))
rootlogger.addHandler(handler)
def _teardown_logger(self):
rootlogger = logging.getLogger("sqlalchemy.engine.Engine")
rootlogger.removeHandler(self._handler)
def _setup_create_table_patcher(self):
from sqlalchemy.sql import ddl
self.orig_sort = ddl.sort_tables_and_constraints
def our_sort(tables, **kw):
return self.orig_sort(sorted(tables, key=lambda t: t.key), **kw)
ddl.sort_tables_and_constraints = our_sort
def _teardown_create_table_patcher(self):
from sqlalchemy.sql import ddl
ddl.sort_tables_and_constraints = self.orig_sort
def setup_test(self):
self._setup_logger()
self._setup_create_table_patcher()
def teardown_test(self):
self._teardown_create_table_patcher()
self._teardown_logger()
def _run_doctest(self, *fnames):
here = os.path.dirname(__file__)
sqla_base = os.path.normpath(os.path.join(here, "..", ".."))
optionflags = (
doctest.ELLIPSIS
| doctest.NORMALIZE_WHITESPACE
| doctest.IGNORE_EXCEPTION_DETAIL
| _get_allow_unicode_flag()
)
runner = doctest.DocTestRunner(
verbose=None,
optionflags=optionflags,
checker=_get_unicode_checker(),
)
parser = doctest.DocTestParser()
globs = {"print_function": print}
for fname in fnames:
path = os.path.join(sqla_base, "doc/build", fname)
if not os.path.exists(path):
config.skip_test("Can't find documentation file %r" % path)
buf = []
with open(path, encoding="utf-8") as file_:
def load_include(m):
fname = m.group(1)
sub_path = os.path.join(os.path.dirname(path), fname)
with open(sub_path, encoding="utf-8") as file_:
for i, line in enumerate(file_, 1):
buf.append((i, line))
return fname
def run_buf(fname, is_include):
if not buf:
return
test = parser.get_doctest(
"".join(line for _, line in buf),
globs,
fname,
fname,
buf[0][0],
)
buf[:] = []
runner.run(test, clear_globs=False)
globs.update(test.globs)
doctest_enabled = True
for line_counter, line in enumerate(file_, 1):
line = re.sub(
r"{(?:stop|sql|opensql|execsql|printsql)}", "", line
)
include = re.match(r"\.\. doctest-include (.+\.rst)", line)
if include:
run_buf(fname, False)
include_fname = load_include(include)
run_buf(include_fname, True)
doctest_disable = re.match(
r"\.\. doctest-(enable|disable)", line
)
if doctest_disable:
doctest_enabled = doctest_disable.group(1) == "enable"
if doctest_enabled:
buf.append((line_counter, line))
else:
buf.append((line_counter, "\n"))
run_buf(fname, False)
runner.summarize()
assert not runner.failures
@requires.has_json_each
def test_20_style(self):
self._run_doctest(
"tutorial/index.rst",
"tutorial/engine.rst",
"tutorial/dbapi_transactions.rst",
"tutorial/metadata.rst",
"tutorial/data.rst",
"tutorial/data_insert.rst",
"tutorial/data_select.rst",
"tutorial/data_update.rst",
"tutorial/orm_data_manipulation.rst",
"tutorial/orm_related_objects.rst",
)
def test_core_operators(self):
self._run_doctest("core/operators.rst")
def test_orm_queryguide_select(self):
self._run_doctest(
"orm/queryguide/_plain_setup.rst",
"orm/queryguide/select.rst",
"orm/queryguide/api.rst",
"orm/queryguide/_end_doctest.rst",
)
def test_orm_queryguide_inheritance(self):
self._run_doctest(
"orm/queryguide/inheritance.rst",
)
@requires.update_from
def test_orm_queryguide_dml(self):
self._run_doctest(
"orm/queryguide/dml.rst",
)
def test_orm_large_collections(self):
self._run_doctest(
"orm/large_collections.rst",
)
def test_orm_queryguide_columns(self):
self._run_doctest(
"orm/queryguide/columns.rst",
)
def test_orm_quickstart(self):
self._run_doctest("orm/quickstart.rst")
# unicode checker courtesy pytest
def _get_unicode_checker():
"""
Returns a doctest.OutputChecker subclass that takes in account the
ALLOW_UNICODE option to ignore u'' prefixes in strings. Useful
when the same doctest should run in Python 2 and Python 3.
An inner class is used to avoid importing "doctest" at the module
level.
"""
if hasattr(_get_unicode_checker, "UnicodeOutputChecker"):
return _get_unicode_checker.UnicodeOutputChecker()
import doctest
import re
class UnicodeOutputChecker(doctest.OutputChecker):
"""
Copied from doctest_nose_plugin.py from the nltk project:
https://github.com/nltk/nltk
"""
_literal_re = re.compile(r"(\W|^)[uU]([rR]?[\'\"])", re.UNICODE)
def check_output(self, want, got, optionflags):
res = doctest.OutputChecker.check_output(
self, want, got, optionflags
)
if res:
return True
if not (optionflags & _get_allow_unicode_flag()):
return False
else: # pragma: no cover
# the code below will end up executed only in Python 2 in
# our tests, and our coverage check runs in Python 3 only
def remove_u_prefixes(txt):
return re.sub(self._literal_re, r"\1\2", txt)
want = remove_u_prefixes(want)
got = remove_u_prefixes(got)
res = doctest.OutputChecker.check_output(
self, want, got, optionflags
)
return res
_get_unicode_checker.UnicodeOutputChecker = UnicodeOutputChecker
return _get_unicode_checker.UnicodeOutputChecker()
def _get_allow_unicode_flag():
"""
Registers and returns the ALLOW_UNICODE flag.
"""
import doctest
return doctest.register_optionflag("ALLOW_UNICODE")
# increase number to force pipeline run. 1