Files
sqlalchemy/test/engine/test_processors.py
T
Mike Bayer ccadbec825 use .fromisoformat() for sqlite datetime, date, time parsing
SQLite datetime, date, and time datatypes now use Python standard lib
``fromisoformat()`` methods in order to parse incoming datetime, date, and
time string values. This improves performance vs. the previous regular
expression-based approach, and also automatically accommodates datetime
and time formats that contain either a six-digit "microseconds" format or a
three-digit "milliseconds" format.

Fixes: #7029
Change-Id: I67aab4fe5ee3055e5996050cf4564981413cc221
2022-04-03 14:47:52 -04:00

297 lines
9.2 KiB
Python

import datetime
import re
from types import MappingProxyType
from sqlalchemy import exc
from sqlalchemy.engine import processors
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.util import immutabledict
class _BooleanProcessorTest(fixtures.TestBase):
    """Shared ``int_to_boolean`` checks for a processors implementation.

    Every test reads the converter from ``self.module``, so a concrete
    subclass has to assign the module under test to ``cls.module``.
    """

    def test_int_to_bool_none(self):
        """``None`` is returned as ``None``."""
        to_bool = self.module.int_to_boolean
        eq_(to_bool(None), None)

    def test_int_to_bool_zero(self):
        """``0`` converts to ``False``."""
        to_bool = self.module.int_to_boolean
        eq_(to_bool(0), False)

    def test_int_to_bool_one(self):
        """``1`` converts to ``True``."""
        to_bool = self.module.int_to_boolean
        eq_(to_bool(1), True)

    def test_int_to_bool_positive_int(self):
        """A positive integer other than one converts to ``True``."""
        to_bool = self.module.int_to_boolean
        eq_(to_bool(12), True)

    def test_int_to_bool_negative_int(self):
        """A negative integer converts to ``True``."""
        to_bool = self.module.int_to_boolean
        eq_(to_bool(-4), True)
class CyBooleanProcessorTest(_BooleanProcessorTest):
    """Boolean processor checks run on ``sqlalchemy.cyextension``."""

    # Requirement marker read by the testing framework; its exact
    # semantics are defined outside this file.
    __requires__ = ("cextensions",)

    @classmethod
    def setup_test_class(cls):
        """Point ``cls.module`` at ``sqlalchemy.cyextension.processors``."""
        # Imported locally so the extension is only needed when this
        # class is actually set up.
        from sqlalchemy.cyextension import processors as cy_processors

        cls.module = cy_processors
class _DateProcessorTest(fixtures.TestBase):
    """Shared checks for the string-to-date/time result processors.

    The ``str_to_datetime``, ``str_to_time`` and ``str_to_date`` functions
    are read from ``self.module``, which a concrete subclass must assign.
    The two ``*_custom_reg`` tests instead use the module-level
    ``processors`` import directly.
    """

    def test_iso_datetime(self):
        """ISO strings parse to the matching datetime/time/date values.

        Covers three-digit and six-digit fractional seconds as well as
        values with no fractional part.
        """
        mod = self.module

        datetime_cases = (
            (
                "2022-04-03 17:12:34.353",
                datetime.datetime(2022, 4, 3, 17, 12, 34, 353000),
            ),
            (
                "2022-04-03 17:12:34.353123",
                datetime.datetime(2022, 4, 3, 17, 12, 34, 353123),
            ),
            (
                "2022-04-03 17:12:34",
                datetime.datetime(2022, 4, 3, 17, 12, 34),
            ),
        )
        for text, expected in datetime_cases:
            eq_(mod.str_to_datetime(text), expected)

        time_cases = (
            ("17:12:34.353123", datetime.time(17, 12, 34, 353123)),
            ("17:12:34.353", datetime.time(17, 12, 34, 353000)),
            ("17:12:34", datetime.time(17, 12, 34)),
        )
        for text, expected in time_cases:
            eq_(mod.str_to_time(text), expected)

        parsed_date = mod.str_to_date("2022-04-03")
        eq_(parsed_date, datetime.date(2022, 4, 3))

    def test_date_no_string(self):
        """A non-string argument to ``str_to_date`` raises ``TypeError``."""
        parse_date = self.module.str_to_date
        with expect_raises_message(
            TypeError, "fromisoformat: argument must be str"
        ):
            parse_date(2012)

    def test_datetime_no_string_custom_reg(self):
        """A regex-built datetime processor rejects a non-string value."""
        pattern = re.compile(
            r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)(?:\.(\d+))?"
        )
        # Built before entering the context so only the call itself is
        # expected to raise.
        parse = processors.str_to_datetime_processor_factory(
            pattern, datetime.datetime
        )
        with expect_raises_message(
            ValueError,
            "Couldn't parse datetime string '2012' - value is not a string",
        ):
            parse(2012)

    def test_time_no_string_custom_reg(self):
        """A regex-built time processor rejects a non-string value."""
        pattern = re.compile(r"^(\d+):(\d+):(\d+)(?:\.(\d{6}))?$")
        parse = processors.str_to_datetime_processor_factory(
            pattern, datetime.time
        )
        with expect_raises_message(
            ValueError,
            "Couldn't parse time string '2012' - value is not a string",
        ):
            parse(2012)

    def test_date_invalid_string(self):
        """A malformed string passed to ``str_to_date`` raises."""
        parse_date = self.module.str_to_date
        with expect_raises_message(
            ValueError, "Invalid isoformat string: '5:a'"
        ):
            parse_date("5:a")

    def test_datetime_invalid_string(self):
        """A malformed string passed to ``str_to_datetime`` raises."""
        parse_datetime = self.module.str_to_datetime
        with expect_raises_message(
            ValueError, "Invalid isoformat string: '5:a'"
        ):
            parse_datetime("5:a")

    def test_time_invalid_string(self):
        """A malformed string passed to ``str_to_time`` raises."""
        parse_time = self.module.str_to_time
        with expect_raises_message(
            ValueError, "Invalid isoformat string: '5:a'"
        ):
            parse_time("5:a")
class PyDateProcessorTest(_DateProcessorTest):
    """Date processor checks run on ``sqlalchemy.engine._py_processors``."""

    @classmethod
    def setup_test_class(cls):
        """Point ``cls.module`` at ``sqlalchemy.engine._py_processors``."""
        from sqlalchemy.engine import _py_processors as py_processors

        cls.module = py_processors
class CyDateProcessorTest(_DateProcessorTest):
    """Date processor checks run on ``sqlalchemy.cyextension``."""

    # Requirement marker read by the testing framework; its exact
    # semantics are defined outside this file.
    __requires__ = ("cextensions",)

    @classmethod
    def setup_test_class(cls):
        """Point ``cls.module`` at ``sqlalchemy.cyextension.processors``."""
        from sqlalchemy.cyextension import processors as cy_processors

        cls.module = cy_processors
class _DistillArgsTest(fixtures.TestBase):
    """Shared checks for the parameter "distill" helpers.

    Exercises ``_distill_params_20`` and ``_distill_raw_params`` on the
    module that a concrete subclass assigns to ``cls.module``.

    Expected values are always written as separate literals rather than
    reusing the input object, so an implementation that mutated its
    argument would still be detected.
    """

    def test_distill_20_none(self):
        """``None`` distills to an empty tuple."""
        distilled = self.module._distill_params_20(None)
        eq_(distilled, ())

    def test_distill_20_empty_sequence(self):
        """An empty tuple or list distills to an equal empty value."""
        distill = self.module._distill_params_20
        cases = (
            ((), ()),
            ([], []),
        )
        for params, expected in cases:
            eq_(distill(params), expected)

    def test_distill_20_sequence_sequence(self):
        """A tuple or list of tuples distills to an equal value."""
        distill = self.module._distill_params_20
        cases = (
            (((1, 2, 3),), ((1, 2, 3),)),
            ([(1, 2, 3)], [(1, 2, 3)]),
            (((1, 2), (2, 3)), ((1, 2), (2, 3))),
            ([(1, 2), (2, 3)], [(1, 2), (2, 3)]),
        )
        for params, expected in cases:
            eq_(distill(params), expected)

    def test_distill_20_sequence_dict(self):
        """A tuple or list of mappings distills to an equal value."""
        distill = self.module._distill_params_20
        cases = (
            (({"a": 1},), ({"a": 1},)),
            ([{"a": 1}, {"a": 2}], [{"a": 1}, {"a": 2}]),
            (
                (MappingProxyType({"a": 1}),),
                (MappingProxyType({"a": 1}),),
            ),
        )
        for params, expected in cases:
            eq_(distill(params), expected)

    def test_distill_20_sequence_error(self):
        """Sequences whose first item is no tuple or dict are rejected."""
        distill = self.module._distill_params_20
        message = "List argument must consist only of tuples or dictionaries"
        rejected = (
            (1, 2, 3),
            ([1, 2, 3],),
            [1, 2, 3],
            ["a", "b"],
        )
        for bad_params in rejected:
            with expect_raises_message(exc.ArgumentError, message):
                distill(bad_params)

    def test_distill_20_dict(self):
        """A single mapping distills to a one-element list holding it."""
        distill = self.module._distill_params_20
        cases = (
            ({"foo": "bar"}, [{"foo": "bar"}]),
            (
                immutabledict({"foo": "bar"}),
                [immutabledict({"foo": "bar"})],
            ),
            (
                MappingProxyType({"foo": "bar"}),
                [MappingProxyType({"foo": "bar"})],
            ),
        )
        for params, expected in cases:
            eq_(distill(params), expected)

    def test_distill_20_error(self):
        """A plain string or integer is rejected as parameters."""
        distill = self.module._distill_params_20
        message = "mapping or list expected for parameters"
        for bad_params in ("foo", 1):
            with expect_raises_message(exc.ArgumentError, message):
                distill(bad_params)

    def test_distill_raw_none(self):
        """``None`` distills to an empty tuple."""
        distilled = self.module._distill_raw_params(None)
        eq_(distilled, ())

    def test_distill_raw_empty_list(self):
        """An empty list distills to an empty list."""
        distilled = self.module._distill_raw_params([])
        eq_(distilled, [])

    def test_distill_raw_list_sequence(self):
        """A list of tuples distills to an equal list."""
        distill_raw = self.module._distill_raw_params
        cases = (
            ([(1, 2, 3)], [(1, 2, 3)]),
            ([(1, 2), (2, 3)], [(1, 2), (2, 3)]),
        )
        for params, expected in cases:
            eq_(distill_raw(params), expected)

    def test_distill_raw_list_dict(self):
        """A list of mappings distills to an equal list."""
        distill_raw = self.module._distill_raw_params
        cases = (
            ([{"a": 1}, {"a": 2}], [{"a": 1}, {"a": 2}]),
            (
                [MappingProxyType({"a": 1})],
                [MappingProxyType({"a": 1})],
            ),
        )
        for params, expected in cases:
            eq_(distill_raw(params), expected)

    def test_distill_raw_sequence_error(self):
        """Lists whose first item is no tuple or dict are rejected."""
        distill_raw = self.module._distill_raw_params
        message = "List argument must consist only of tuples or dictionaries"
        rejected = (
            [1, 2, 3],
            [[1, 2, 3]],
            ["a", "b"],
        )
        for bad_params in rejected:
            with expect_raises_message(exc.ArgumentError, message):
                distill_raw(bad_params)

    def test_distill_raw_tuple(self):
        """A bare tuple distills to a one-element list holding it."""
        distill_raw = self.module._distill_raw_params
        cases = (
            ((), [()]),
            ((1, 2, 3), [(1, 2, 3)]),
        )
        for params, expected in cases:
            eq_(distill_raw(params), expected)

    def test_distill_raw_dict(self):
        """A single mapping distills to a one-element list holding it."""
        distill_raw = self.module._distill_raw_params
        cases = (
            ({"foo": "bar"}, [{"foo": "bar"}]),
            (
                immutabledict({"foo": "bar"}),
                [immutabledict({"foo": "bar"})],
            ),
            (
                MappingProxyType({"foo": "bar"}),
                [MappingProxyType({"foo": "bar"})],
            ),
        )
        for params, expected in cases:
            eq_(distill_raw(params), expected)

    def test_distill_raw_error(self):
        """A plain string or integer is rejected as parameters."""
        distill_raw = self.module._distill_raw_params
        message = "mapping or sequence expected for parameters"
        for bad_params in ("foo", 1):
            with expect_raises_message(exc.ArgumentError, message):
                distill_raw(bad_params)
class PyDistillArgsTest(_DistillArgsTest):
    """Distill-parameter checks run on ``sqlalchemy.engine._py_util``."""

    @classmethod
    def setup_test_class(cls):
        """Point ``cls.module`` at ``sqlalchemy.engine._py_util``."""
        from sqlalchemy.engine import _py_util as py_util

        cls.module = py_util
class CyDistillArgsTest(_DistillArgsTest):
    """Distill-parameter checks run on ``sqlalchemy.cyextension.util``."""

    # Requirement marker read by the testing framework; its exact
    # semantics are defined outside this file.
    __requires__ = ("cextensions",)

    @classmethod
    def setup_test_class(cls):
        """Point ``cls.module`` at ``sqlalchemy.cyextension.util``."""
        from sqlalchemy.cyextension import util as cy_util

        cls.module = cy_util