mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-09 10:20:00 -04:00
20cdc64588
become an externally usable package but still remains within the main sqlalchemy parent package. In this system, we use a somewhat ugly hack to get the noseplugin imported outside of the "sqlalchemy" package, while still making it available within sqlalchemy for use by third-party libraries.
73 lines
2.3 KiB
Python
73 lines
2.3 KiB
Python
from sqlalchemy import *
|
|
from sqlalchemy.testing import fixtures, AssertsExecutionResults
|
|
from sqlalchemy import testing
|
|
|
|
|
|
class FoundRowsTest(fixtures.TestBase, AssertsExecutionResults):
    """tests rowcount functionality

    The fixtures build an ``employees`` table and fill it with nine rows,
    three in each of the departments 'A', 'B' and 'C'.  The tests then
    check the ``rowcount`` reported for UPDATE and DELETE statements whose
    WHERE clause matches the three rows of department 'C'.

    """

    __requires__ = ('sane_rowcount', )

    @classmethod
    def setup_class(cls):
        """Define the ``employees`` table and create it in the test database."""
        # The table and its metadata are published as module globals; the
        # other fixtures and the tests refer to them by bare name.
        global employees_table, metadata
        metadata = MetaData(testing.db)

        employees_table = Table(
            'employees', metadata,
            Column('employee_id', Integer,
                   Sequence('employee_id_seq', optional=True),
                   primary_key=True),
            Column('name', String(50)),
            Column('department', String(1)),
        )
        metadata.create_all()

    def setup(self):
        """Insert the nine (name, department) rows the tests operate on."""
        # ``data`` is a module global as well: ``testbasic`` compares the
        # number of selected rows against its length.
        global data
        data = [
            ('Angela', 'A'),
            ('Andrew', 'A'),
            ('Anand', 'A'),
            ('Bob', 'B'),
            ('Bobette', 'B'),
            ('Buffy', 'B'),
            ('Charlie', 'C'),
            ('Cynthia', 'C'),
            ('Chris', 'C'),
        ]

        i = employees_table.insert()
        # One parameter dictionary per row, passed as positional arguments.
        i.execute(*[{'name': n, 'department': d} for n, d in data])

    def teardown(self):
        """Delete every row from ``employees``."""
        employees_table.delete().execute()

    @classmethod
    def teardown_class(cls):
        """Drop the tables created in ``setup_class``."""
        metadata.drop_all()

    def testbasic(self):
        """Selecting the whole table returns as many rows as were inserted."""
        s = employees_table.select()
        r = s.execute().fetchall()

        assert len(r) == len(data)

    def test_update_rowcount1(self):
        """UPDATE that changes all three matched rows reports rowcount 3."""
        # WHERE matches 3, 3 rows changed
        department = employees_table.c.department
        r = employees_table.update(department == 'C').execute(department='Z')
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("expecting 3, dialect reports %s" % r.rowcount)
        assert r.rowcount == 3

    def test_update_rowcount2(self):
        """UPDATE that leaves the matched rows unchanged still reports 3.

        The new value equals the existing one, so no row actually changes;
        the assertion expects rowcount to count the rows matched by the
        WHERE clause rather than the rows modified.
        """
        # WHERE matches 3, 0 rows changed
        department = employees_table.c.department
        r = employees_table.update(department == 'C').execute(department='C')
        print("expecting 3, dialect reports %s" % r.rowcount)
        assert r.rowcount == 3

    def test_delete_rowcount(self):
        """DELETE of the three matched rows reports rowcount 3."""
        # WHERE matches 3, 3 rows deleted
        department = employees_table.c.department
        r = employees_table.delete(department == 'C').execute()
        print("expecting 3, dialect reports %s" % r.rowcount)
        assert r.rowcount == 3
|
|
|