mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-10 02:39:59 -04:00
18abd47af3
Applied on top of a pure run of black -l 79 in I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9, this set of changes resolves all remaining flake8 conditions for those codes we have enabled in setup.cfg. Included are resolutions for all remaining flake8 issues including shadowed builtins, long lines, import order, unused imports, duplicate imports, and docstring issues. Change-Id: I4f72d3ba1380dd601610ff80b8fb06a2aff8b0fe
158 lines
4.5 KiB
Python
158 lines
4.5 KiB
Python
from sqlalchemy import bindparam
|
|
from sqlalchemy import Column
|
|
from sqlalchemy import Integer
|
|
from sqlalchemy import MetaData
|
|
from sqlalchemy import Sequence
|
|
from sqlalchemy import String
|
|
from sqlalchemy import Table
|
|
from sqlalchemy import testing
|
|
from sqlalchemy import text
|
|
from sqlalchemy.testing import AssertsExecutionResults
|
|
from sqlalchemy.testing import eq_
|
|
from sqlalchemy.testing import fixtures
|
|
|
|
|
|
class FoundRowsTest(fixtures.TestBase, AssertsExecutionResults):
|
|
|
|
"""tests rowcount functionality"""
|
|
|
|
__requires__ = ("sane_rowcount",)
|
|
__backend__ = True
|
|
|
|
@classmethod
|
|
def setup_class(cls):
|
|
global employees_table, metadata
|
|
metadata = MetaData(testing.db)
|
|
|
|
employees_table = Table(
|
|
"employees",
|
|
metadata,
|
|
Column(
|
|
"employee_id",
|
|
Integer,
|
|
Sequence("employee_id_seq", optional=True),
|
|
primary_key=True,
|
|
),
|
|
Column("name", String(50)),
|
|
Column("department", String(1)),
|
|
)
|
|
metadata.create_all()
|
|
|
|
def setup(self):
|
|
global data
|
|
data = [
|
|
("Angela", "A"),
|
|
("Andrew", "A"),
|
|
("Anand", "A"),
|
|
("Bob", "B"),
|
|
("Bobette", "B"),
|
|
("Buffy", "B"),
|
|
("Charlie", "C"),
|
|
("Cynthia", "C"),
|
|
("Chris", "C"),
|
|
]
|
|
|
|
i = employees_table.insert()
|
|
i.execute(*[{"name": n, "department": d} for n, d in data])
|
|
|
|
def teardown(self):
|
|
employees_table.delete().execute()
|
|
|
|
@classmethod
|
|
def teardown_class(cls):
|
|
metadata.drop_all()
|
|
|
|
def test_basic(self):
|
|
s = employees_table.select()
|
|
r = s.execute().fetchall()
|
|
|
|
assert len(r) == len(data)
|
|
|
|
def test_update_rowcount1(self):
|
|
# WHERE matches 3, 3 rows changed
|
|
department = employees_table.c.department
|
|
r = employees_table.update(department == "C").execute(department="Z")
|
|
assert r.rowcount == 3
|
|
|
|
def test_update_rowcount2(self):
|
|
# WHERE matches 3, 0 rows changed
|
|
department = employees_table.c.department
|
|
r = employees_table.update(department == "C").execute(department="C")
|
|
assert r.rowcount == 3
|
|
|
|
@testing.skip_if(
|
|
testing.requires.oracle5x, "unknown DBAPI error fixed in later version"
|
|
)
|
|
@testing.requires.sane_rowcount_w_returning
|
|
def test_update_rowcount_return_defaults(self):
|
|
department = employees_table.c.department
|
|
stmt = (
|
|
employees_table.update(department == "C")
|
|
.values(name=employees_table.c.department + "Z")
|
|
.return_defaults()
|
|
)
|
|
|
|
r = stmt.execute()
|
|
assert r.rowcount == 3
|
|
|
|
def test_raw_sql_rowcount(self):
|
|
# test issue #3622, make sure eager rowcount is called for text
|
|
with testing.db.connect() as conn:
|
|
result = conn.execute(
|
|
"update employees set department='Z' where department='C'"
|
|
)
|
|
eq_(result.rowcount, 3)
|
|
|
|
def test_text_rowcount(self):
|
|
# test issue #3622, make sure eager rowcount is called for text
|
|
with testing.db.connect() as conn:
|
|
result = conn.execute(
|
|
text(
|
|
"update employees set department='Z' "
|
|
"where department='C'"
|
|
)
|
|
)
|
|
eq_(result.rowcount, 3)
|
|
|
|
def test_delete_rowcount(self):
|
|
# WHERE matches 3, 3 rows deleted
|
|
department = employees_table.c.department
|
|
r = employees_table.delete(department == "C").execute()
|
|
assert r.rowcount == 3
|
|
|
|
@testing.requires.sane_multi_rowcount
|
|
def test_multi_update_rowcount(self):
|
|
stmt = (
|
|
employees_table.update()
|
|
.where(employees_table.c.name == bindparam("emp_name"))
|
|
.values(department="C")
|
|
)
|
|
|
|
r = testing.db.execute(
|
|
stmt,
|
|
[
|
|
{"emp_name": "Bob"},
|
|
{"emp_name": "Cynthia"},
|
|
{"emp_name": "nonexistent"},
|
|
],
|
|
)
|
|
|
|
eq_(r.rowcount, 2)
|
|
|
|
@testing.requires.sane_multi_rowcount
|
|
def test_multi_delete_rowcount(self):
|
|
stmt = employees_table.delete().where(
|
|
employees_table.c.name == bindparam("emp_name")
|
|
)
|
|
|
|
r = testing.db.execute(
|
|
stmt,
|
|
[
|
|
{"emp_name": "Bob"},
|
|
{"emp_name": "Cynthia"},
|
|
{"emp_name": "nonexistent"},
|
|
],
|
|
)
|
|
|
|
eq_(r.rowcount, 2)
|