Files
sqlalchemy/test/dialect/postgres.py
T
Mike Bayer d6b9757778 - added unit test for exception formatting
- Deprecated the hardcoded TIMESTAMP function, which when
used as func.TIMESTAMP(value) would render "TIMESTAMP value".
This breaks on some platforms as Postgres doesn't allow
bind parameters to be used in this context.  The hard-coded
uppercase is also inappropriate and there's lots of other
PG casts that we'd need to support.  So instead, use
text constructs i.e. select(["timestamp '12/05/09'"]).
2009-05-26 01:00:46 +00:00

1026 lines
40 KiB
Python

import testenv; testenv.configure_for_tests()
import datetime
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy import exc
from sqlalchemy.databases import postgres
from sqlalchemy.engine.strategies import MockEngineStrategy
from testlib import *
from sqlalchemy.sql import table, column
class SequenceTest(TestBase, AssertsCompiledSQL):
def test_basic(self):
seq = Sequence("my_seq_no_schema")
dialect = postgres.PGDialect()
assert dialect.identifier_preparer.format_sequence(seq) == "my_seq_no_schema"
seq = Sequence("my_seq", schema="some_schema")
assert dialect.identifier_preparer.format_sequence(seq) == "some_schema.my_seq"
seq = Sequence("My_Seq", schema="Some_Schema")
assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'
class CompileTest(TestBase, AssertsCompiledSQL):
__dialect__ = postgres.dialect()
def test_update_returning(self):
dialect = postgres.dialect()
table1 = table('mytable',
column('myid', Integer),
column('name', String(128)),
column('description', String(128)),
)
u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
u = update(table1, values=dict(name='foo'), postgres_returning=[table1])
self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\
"RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
u = update(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name)", dialect=dialect)
def test_insert_returning(self):
dialect = postgres.dialect()
table1 = table('mytable',
column('myid', Integer),
column('name', String(128)),
column('description', String(128)),
)
i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
i = insert(table1, values=dict(name='foo'), postgres_returning=[table1])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\
"RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
i = insert(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name)", dialect=dialect)
def test_extract(self):
t = table('t', column('col1'))
for field in 'year', 'month', 'day':
self.assert_compile(
select([extract(field, t.c.col1)]),
"SELECT EXTRACT(%s FROM t.col1::timestamp) AS anon_1 "
"FROM t" % field)
class ReturningTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
@testing.exclude('postgres', '<', (8, 2), '8.3+ feature')
def test_update_returning(self):
meta = MetaData(testing.db)
table = Table('tables', meta,
Column('id', Integer, primary_key=True),
Column('persons', Integer),
Column('full', Boolean)
)
table.create()
try:
table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
result = table.update(table.c.persons > 4, dict(full=True), postgres_returning=[table.c.id]).execute()
self.assertEqual(result.fetchall(), [(1,)])
result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
self.assertEqual(result2.fetchall(), [(1,True),(2,False)])
finally:
table.drop()
@testing.exclude('postgres', '<', (8, 2), '8.3+ feature')
def test_insert_returning(self):
meta = MetaData(testing.db)
table = Table('tables', meta,
Column('id', Integer, primary_key=True),
Column('persons', Integer),
Column('full', Boolean)
)
table.create()
try:
result = table.insert(postgres_returning=[table.c.id]).execute({'persons': 1, 'full': False})
self.assertEqual(result.fetchall(), [(1,)])
@testing.fails_on('postgres', 'Known limitation of psycopg2')
def test_executemany():
# return value is documented as failing with psycopg2/executemany
result2 = table.insert(postgres_returning=[table]).execute(
[{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
self.assertEqual(result2.fetchall(), [(2, 2, False), (3,3,True)])
test_executemany()
result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False})
self.assertEqual([dict(row) for row in result3], [{'double_id':8}])
result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, true) returning persons')
self.assertEqual([dict(row) for row in result4], [{'persons': 10}])
finally:
table.drop()
class InsertTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
def setUpAll(self):
global metadata
metadata = MetaData(testing.db)
def tearDown(self):
metadata.drop_all()
metadata.tables.clear()
def test_compiled_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
metadata.create_all()
ins = table.insert(values={'data':bindparam('x')}).compile()
ins.execute({'x':"five"}, {'x':"seven"})
assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]
def test_sequence_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, Sequence('my_seq'), primary_key=True),
Column('data', String(30)))
metadata.create_all()
self._assert_data_with_sequence(table, "my_seq")
def test_opt_sequence_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement(table)
def test_autoincrement_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement(table)
def test_noautoincrement_insert(self):
table = Table('testtable', metadata,
Column('id', Integer, primary_key=True, autoincrement=False),
Column('data', String(30)))
metadata.create_all()
self._assert_data_noautoincrement(table)
def _assert_data_autoincrement(self, table):
def go():
# execute with explicit id
r = table.insert().execute({'id':30, 'data':'d1'})
assert r.last_inserted_ids() == [30]
# execute with prefetch id
r = table.insert().execute({'data':'d2'})
assert r.last_inserted_ids() == [1]
# executemany with explicit ids
table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
# executemany, uses SERIAL
table.insert().execute({'data':'d5'}, {'data':'d6'})
# single execute, explicit id, inline
table.insert(inline=True).execute({'id':33, 'data':'d7'})
# single execute, inline, uses SERIAL
table.insert(inline=True).execute({'data':'d8'})
# note that the test framework doesnt capture the "preexecute" of a seqeuence
# or default. we just see it in the bind params.
self.assert_sql(testing.db, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':30, 'data':'d1'}
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':1, 'data':'d2'}
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
),
(
"INSERT INTO testtable (data) VALUES (:data)",
[{'data':'d5'}, {'data':'d6'}]
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'id':33, 'data':'d7'}]
),
(
"INSERT INTO testtable (data) VALUES (:data)",
[{'data':'d8'}]
),
])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(1, 'd2'),
(31, 'd3'),
(32, 'd4'),
(2, 'd5'),
(3, 'd6'),
(33, 'd7'),
(4, 'd8'),
]
table.delete().execute()
# test the same series of events using a reflected
# version of the table
m2 = MetaData(testing.db)
table = Table(table.name, m2, autoload=True)
def go():
table.insert().execute({'id':30, 'data':'d1'})
r = table.insert().execute({'data':'d2'})
assert r.last_inserted_ids() == [5]
table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
table.insert().execute({'data':'d5'}, {'data':'d6'})
table.insert(inline=True).execute({'id':33, 'data':'d7'})
table.insert(inline=True).execute({'data':'d8'})
self.assert_sql(testing.db, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':30, 'data':'d1'}
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':5, 'data':'d2'}
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
),
(
"INSERT INTO testtable (data) VALUES (:data)",
[{'data':'d5'}, {'data':'d6'}]
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'id':33, 'data':'d7'}]
),
(
"INSERT INTO testtable (data) VALUES (:data)",
[{'data':'d8'}]
),
])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(5, 'd2'),
(31, 'd3'),
(32, 'd4'),
(6, 'd5'),
(7, 'd6'),
(33, 'd7'),
(8, 'd8'),
]
table.delete().execute()
def _assert_data_with_sequence(self, table, seqname):
def go():
table.insert().execute({'id':30, 'data':'d1'})
table.insert().execute({'data':'d2'})
table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
table.insert().execute({'data':'d5'}, {'data':'d6'})
table.insert(inline=True).execute({'id':33, 'data':'d7'})
table.insert(inline=True).execute({'data':'d8'})
self.assert_sql(testing.db, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':30, 'data':'d1'}
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{'id':1, 'data':'d2'}
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
),
(
"INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
[{'data':'d5'}, {'data':'d6'}]
),
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'id':33, 'data':'d7'}]
),
(
"INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
[{'data':'d8'}]
),
])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(1, 'd2'),
(31, 'd3'),
(32, 'd4'),
(2, 'd5'),
(3, 'd6'),
(33, 'd7'),
(4, 'd8'),
]
# cant test reflection here since the Sequence must be
# explicitly specified
def _assert_data_noautoincrement(self, table):
table.insert().execute({'id':30, 'data':'d1'})
try:
table.insert().execute({'data':'d2'})
assert False
except exc.IntegrityError, e:
assert "violates not-null constraint" in str(e)
try:
table.insert().execute({'data':'d2'}, {'data':'d3'})
assert False
except exc.IntegrityError, e:
assert "violates not-null constraint" in str(e)
table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
table.insert(inline=True).execute({'id':33, 'data':'d4'})
assert table.select().execute().fetchall() == [
(30, 'd1'),
(31, 'd2'),
(32, 'd3'),
(33, 'd4'),
]
table.delete().execute()
# test the same series of events using a reflected
# version of the table
m2 = MetaData(testing.db)
table = Table(table.name, m2, autoload=True)
table.insert().execute({'id':30, 'data':'d1'})
try:
table.insert().execute({'data':'d2'})
assert False
except exc.IntegrityError, e:
assert "violates not-null constraint" in str(e)
try:
table.insert().execute({'data':'d2'}, {'data':'d3'})
assert False
except exc.IntegrityError, e:
assert "violates not-null constraint" in str(e)
table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
table.insert(inline=True).execute({'id':33, 'data':'d4'})
assert table.select().execute().fetchall() == [
(30, 'd1'),
(31, 'd2'),
(32, 'd3'),
(33, 'd4'),
]
class DomainReflectionTest(TestBase, AssertsExecutionResults):
"Test PostgreSQL domains"
__only_on__ = 'postgres'
def setUpAll(self):
con = testing.db.connect()
for ddl in ('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42',
'CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0'):
try:
con.execute(ddl)
except exc.SQLError, e:
if not "already exists" in str(e):
raise e
con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')
def tearDownAll(self):
con = testing.db.connect()
con.execute('DROP TABLE testtable')
con.execute('DROP TABLE alt_schema.testtable')
con.execute('DROP TABLE crosschema')
con.execute('DROP DOMAIN testdomain')
con.execute('DROP DOMAIN alt_schema.testdomain')
def test_table_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)
def test_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
self.assertEquals(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value")
self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
def test_table_is_reflected_alt_schema(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True, schema='alt_schema')
self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)
def test_schema_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True, schema='alt_schema')
self.assertEquals(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
def test_crosschema_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('crosschema', metadata, autoload=True)
self.assertEquals(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
def test_unknown_types(self):
from sqlalchemy.databases import postgres
ischema_names = postgres.ischema_names
postgres.ischema_names = {}
try:
m2 = MetaData(testing.db)
self.assertRaises(exc.SAWarning, Table, "testtable", m2, autoload=True)
@testing.emits_warning('Did not recognize type')
def warns():
m3 = MetaData(testing.db)
t3 = Table("testtable", m3, autoload=True)
assert t3.c.answer.type.__class__ == sa.types.NullType
finally:
postgres.ischema_names = ischema_names
class MiscTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
def test_date_reflection(self):
m1 = MetaData(testing.db)
t1 = Table('pgdate', m1,
Column('date1', DateTime(timezone=True)),
Column('date2', DateTime(timezone=False))
)
m1.create_all()
try:
m2 = MetaData(testing.db)
t2 = Table('pgdate', m2, autoload=True)
assert t2.c.date1.type.timezone is True
assert t2.c.date2.type.timezone is False
finally:
m1.drop_all()
def test_pg_weirdchar_reflection(self):
meta1 = MetaData(testing.db)
subject = Table("subject", meta1,
Column("id$", Integer, primary_key=True),
)
referer = Table("referer", meta1,
Column("id", Integer, primary_key=True),
Column("ref", Integer, ForeignKey('subject.id$')),
)
meta1.create_all()
try:
meta2 = MetaData(testing.db)
subject = Table("subject", meta2, autoload=True)
referer = Table("referer", meta2, autoload=True)
print str(subject.join(referer).onclause)
self.assert_((subject.c['id$']==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
def test_checksfor_sequence(self):
meta1 = MetaData(testing.db)
t = Table('mytable', meta1,
Column('col1', Integer, Sequence('fooseq')))
try:
testing.db.execute("CREATE SEQUENCE fooseq")
t.create(checkfirst=True)
finally:
t.drop(checkfirst=True)
def test_distinct_on(self):
t = Table('mytable', MetaData(testing.db),
Column('id', Integer, primary_key=True),
Column('a', String(8)))
self.assertEquals(
str(t.select(distinct=t.c.a)),
'SELECT DISTINCT ON (mytable.a) mytable.id, mytable.a \n'
'FROM mytable')
self.assertEquals(
str(t.select(distinct=['id','a'])),
'SELECT DISTINCT ON (id, a) mytable.id, mytable.a \n'
'FROM mytable')
self.assertEquals(
str(t.select(distinct=[t.c.id, t.c.a])),
'SELECT DISTINCT ON (mytable.id, mytable.a) mytable.id, mytable.a \n'
'FROM mytable')
def test_schema_reflection(self):
"""note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
meta1 = MetaData(testing.db)
users = Table('users', meta1,
Column('user_id', Integer, primary_key = True),
Column('user_name', String(30), nullable = False),
schema="alt_schema"
)
addresses = Table('email_addresses', meta1,
Column('address_id', Integer, primary_key = True),
Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
Column('email_address', String(20)),
schema="alt_schema"
)
meta1.create_all()
try:
meta2 = MetaData(testing.db)
addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
users = Table('users', meta2, mustexist=True, schema="alt_schema")
print users
print addresses
j = join(users, addresses)
print str(j.onclause)
self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
finally:
meta1.drop_all()
def test_schema_reflection_2(self):
meta1 = MetaData(testing.db)
subject = Table("subject", meta1,
Column("id", Integer, primary_key=True),
)
referer = Table("referer", meta1,
Column("id", Integer, primary_key=True),
Column("ref", Integer, ForeignKey('subject.id')),
schema="alt_schema")
meta1.create_all()
try:
meta2 = MetaData(testing.db)
subject = Table("subject", meta2, autoload=True)
referer = Table("referer", meta2, schema="alt_schema", autoload=True)
print str(subject.join(referer).onclause)
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
def test_schema_reflection_3(self):
meta1 = MetaData(testing.db)
subject = Table("subject", meta1,
Column("id", Integer, primary_key=True),
schema='alt_schema_2'
)
referer = Table("referer", meta1,
Column("id", Integer, primary_key=True),
Column("ref", Integer, ForeignKey('alt_schema_2.subject.id')),
schema="alt_schema")
meta1.create_all()
try:
meta2 = MetaData(testing.db)
subject = Table("subject", meta2, autoload=True, schema="alt_schema_2")
referer = Table("referer", meta2, schema="alt_schema", autoload=True)
print str(subject.join(referer).onclause)
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
def test_schema_roundtrips(self):
meta = MetaData(testing.db)
users = Table('users', meta,
Column('id', Integer, primary_key=True),
Column('name', String(50)), schema='alt_schema')
users.create()
try:
users.insert().execute(id=1, name='name1')
users.insert().execute(id=2, name='name2')
users.insert().execute(id=3, name='name3')
users.insert().execute(id=4, name='name4')
self.assertEquals(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
self.assertEquals(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
users.delete().where(users.c.id==3).execute()
self.assertEquals(users.select().where(users.c.name=='name3').execute().fetchall(), [])
users.update().where(users.c.name=='name4').execute(name='newname')
self.assertEquals(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')])
finally:
users.drop()
def test_preexecute_passivedefault(self):
"""test that when we get a primary key column back
from reflecting a table which has a default value on it, we pre-execute
that DefaultClause upon insert."""
try:
meta = MetaData(testing.db)
testing.db.execute("""
CREATE TABLE speedy_users
(
speedy_user_id SERIAL PRIMARY KEY,
user_name VARCHAR NOT NULL,
user_password VARCHAR NOT NULL
);
""", None)
t = Table("speedy_users", meta, autoload=True)
r = t.insert().execute(user_name='user', user_password='lala')
assert r.last_inserted_ids() == [1]
l = t.select().execute().fetchall()
assert l == [(1, 'user', 'lala')]
finally:
testing.db.execute("drop table speedy_users", None)
@testing.emits_warning()
def test_index_reflection(self):
""" Reflecting partial & expression-based indexes should warn """
import warnings
def capture_warnings(*args, **kw):
capture_warnings._orig_showwarning(*args, **kw)
capture_warnings.warnings.append(args)
capture_warnings._orig_showwarning = warnings.warn
capture_warnings.warnings = []
m1 = MetaData(testing.db)
t1 = Table('party', m1,
Column('id', String(10), nullable=False),
Column('name', String(20), index=True),
Column('aname', String(20))
)
m1.create_all()
testing.db.execute("""
create index idx1 on party ((id || name))
""", None)
testing.db.execute("""
create unique index idx2 on party (id) where name = 'test'
""", None)
testing.db.execute("""
create index idx3 on party using btree
(lower(name::text), lower(aname::text))
""")
try:
m2 = MetaData(testing.db)
warnings.warn = capture_warnings
t2 = Table('party', m2, autoload=True)
wrn = capture_warnings.warnings
assert str(wrn[0][0]) == (
"Skipped unsupported reflection of expression-based index idx1")
assert str(wrn[1][0]) == (
"Predicate of partial index idx2 ignored during reflection")
assert len(t2.indexes) == 2
# Make sure indexes are in the order we expect them in
tmp = [(idx.name, idx) for idx in t2.indexes]
tmp.sort()
r1, r2 = [idx[1] for idx in tmp]
assert r1.name == 'idx2'
assert r1.unique == True
assert r2.unique == False
assert [t2.c.id] == r1.columns
assert [t2.c.name] == r2.columns
finally:
warnings.warn = capture_warnings._orig_showwarning
m1.drop_all()
def test_create_partial_index(self):
tbl = Table('testtbl', MetaData(), Column('data',Integer))
idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))
executed_sql = []
mock_strategy = MockEngineStrategy()
mock_conn = mock_strategy.create('postgres://', executed_sql.append)
idx.create(mock_conn)
assert executed_sql == ['CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10']
class TimezoneTest(TestBase, AssertsExecutionResults):
"""Test timezone-aware datetimes.
psycopg will return a datetime with a tzinfo attached to it, if postgres
returns it. python then will not let you compare a datetime with a tzinfo
to a datetime that doesnt have one. this test illustrates two ways to
have datetime types with and without timezone info.
"""
__only_on__ = 'postgres'
def setUpAll(self):
global tztable, notztable, metadata
metadata = MetaData(testing.db)
# current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
tztable = Table('tztable', metadata,
Column("id", Integer, primary_key=True),
Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
Column("name", String(20)),
)
notztable = Table('notztable', metadata,
Column("id", Integer, primary_key=True),
Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
Column("name", String(20)),
)
metadata.create_all()
def tearDownAll(self):
metadata.drop_all()
def test_with_timezone(self):
# get a date with a tzinfo
somedate = testing.db.connect().scalar(func.current_timestamp().select())
tztable.insert().execute(id=1, name='row1', date=somedate)
c = tztable.update(tztable.c.id==1).execute(name='newname')
print tztable.select(tztable.c.id==1).execute().fetchone()
def test_without_timezone(self):
# get a date without a tzinfo
somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
notztable.insert().execute(id=1, name='row1', date=somedate)
c = notztable.update(notztable.c.id==1).execute(name='newname')
print notztable.select(tztable.c.id==1).execute().fetchone()
class ArrayTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
def setUpAll(self):
global metadata, arrtable
metadata = MetaData(testing.db)
arrtable = Table('arrtable', metadata,
Column('id', Integer, primary_key=True),
Column('intarr', postgres.PGArray(Integer)),
Column('strarr', postgres.PGArray(String(convert_unicode=True)), nullable=False)
)
metadata.create_all()
def tearDownAll(self):
metadata.drop_all()
def test_reflect_array_column(self):
metadata2 = MetaData(testing.db)
tbl = Table('arrtable', metadata2, autoload=True)
self.assertTrue(isinstance(tbl.c.intarr.type, postgres.PGArray))
self.assertTrue(isinstance(tbl.c.strarr.type, postgres.PGArray))
self.assertTrue(isinstance(tbl.c.intarr.type.item_type, Integer))
self.assertTrue(isinstance(tbl.c.strarr.type.item_type, String))
def test_insert_array(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
results = arrtable.select().execute().fetchall()
self.assertEquals(len(results), 1)
self.assertEquals(results[0]['intarr'], [1,2,3])
self.assertEquals(results[0]['strarr'], ['abc','def'])
arrtable.delete().execute()
def test_array_where(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
arrtable.insert().execute(intarr=[4,5,6], strarr='ABC')
results = arrtable.select().where(arrtable.c.intarr == [1,2,3]).execute().fetchall()
self.assertEquals(len(results), 1)
self.assertEquals(results[0]['intarr'], [1,2,3])
arrtable.delete().execute()
def test_array_concat(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall()
self.assertEquals(len(results), 1)
self.assertEquals(results[0][0], [1,2,3,4,5,6])
arrtable.delete().execute()
def test_array_subtype_resultprocessor(self):
arrtable.insert().execute(intarr=[4,5,6], strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6']])
arrtable.insert().execute(intarr=[1,2,3], strarr=[u'm\xe4\xe4', u'm\xf6\xf6'])
results = arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
self.assertEquals(len(results), 2)
self.assertEquals(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
self.assertEquals(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])
arrtable.delete().execute()
def test_array_mutability(self):
class Foo(object): pass
footable = Table('foo', metadata,
Column('id', Integer, primary_key=True),
Column('intarr', postgres.PGArray(Integer), nullable=True)
)
mapper(Foo, footable)
metadata.create_all()
sess = create_session()
foo = Foo()
foo.id = 1
foo.intarr = [1,2,3]
sess.add(foo)
sess.flush()
sess.expunge_all()
foo = sess.query(Foo).get(1)
self.assertEquals(foo.intarr, [1,2,3])
foo.intarr.append(4)
sess.flush()
sess.expunge_all()
foo = sess.query(Foo).get(1)
self.assertEquals(foo.intarr, [1,2,3,4])
foo.intarr = []
sess.flush()
sess.expunge_all()
self.assertEquals(foo.intarr, [])
foo.intarr = None
sess.flush()
sess.expunge_all()
self.assertEquals(foo.intarr, None)
# Errors in r4217:
foo = Foo()
foo.id = 2
sess.add(foo)
sess.flush()
class TimeStampTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
@testing.uses_deprecated()
def test_timestamp(self):
engine = testing.db
connection = engine.connect()
s = select([func.TIMESTAMP("12/25/07").label("ts")])
result = connection.execute(s).fetchone()
self.assertEqual(result[0], datetime.datetime(2007, 12, 25, 0, 0))
class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
def setUpAll(self):
global ss_engine
ss_engine = engines.testing_engine(options={'server_side_cursors':True})
def tearDownAll(self):
ss_engine.dispose()
def test_uses_ss(self):
result = ss_engine.execute("select 1")
assert result.cursor.name
result = ss_engine.execute(text("select 1"))
assert result.cursor.name
result = ss_engine.execute(select([1]))
assert result.cursor.name
def test_roundtrip(self):
test_table = Table('test_table', MetaData(ss_engine),
Column('id', Integer, primary_key=True),
Column('data', String(50))
)
test_table.create(checkfirst=True)
try:
test_table.insert().execute(data='data1')
nextid = ss_engine.execute(Sequence('test_table_id_seq'))
test_table.insert().execute(id=nextid, data='data2')
self.assertEquals(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2')])
test_table.update().where(test_table.c.id==2).values(data=test_table.c.data + ' updated').execute()
self.assertEquals(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2 updated')])
test_table.delete().execute()
self.assertEquals(test_table.count().scalar(), 0)
finally:
test_table.drop(checkfirst=True)
class SpecialTypesTest(TestBase, ComparesTables):
"""test DDL and reflection of PG-specific types """
__only_on__ = 'postgres'
__excluded_on__ = (('postgres', '<', (8, 3, 0)),)
def setUpAll(self):
global metadata, table
metadata = MetaData(testing.db)
table = Table('sometable', metadata,
Column('id', postgres.PGUuid, primary_key=True),
Column('flag', postgres.PGBit),
Column('addr', postgres.PGInet),
Column('addr2', postgres.PGMacAddr),
Column('addr3', postgres.PGCidr)
)
metadata.create_all()
def tearDownAll(self):
metadata.drop_all()
def test_reflection(self):
m = MetaData(testing.db)
t = Table('sometable', m, autoload=True)
self.assert_tables_equal(table, t)
class MatchTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'postgres'
__excluded_on__ = (('postgres', '<', (8, 3, 0)),)
def setUpAll(self):
global metadata, cattable, matchtable
metadata = MetaData(testing.db)
cattable = Table('cattable', metadata,
Column('id', Integer, primary_key=True),
Column('description', String(50)),
)
matchtable = Table('matchtable', metadata,
Column('id', Integer, primary_key=True),
Column('title', String(200)),
Column('category_id', Integer, ForeignKey('cattable.id')),
)
metadata.create_all()
cattable.insert().execute([
{'id': 1, 'description': 'Python'},
{'id': 2, 'description': 'Ruby'},
])
matchtable.insert().execute([
{'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
{'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
{'id': 3, 'title': 'Programming Matz''s Ruby', 'category_id': 2},
{'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
{'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
])
def tearDownAll(self):
metadata.drop_all()
def test_expression(self):
self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)")
def test_simple_match(self):
results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
self.assertEquals([2, 5], [r.id for r in results])
def test_simple_match_with_apostrophe(self):
results = matchtable.select().where(matchtable.c.title.match("Matz''s")).execute().fetchall()
self.assertEquals([3], [r.id for r in results])
def test_simple_derivative_match(self):
results = matchtable.select().where(matchtable.c.title.match('nutshells')).execute().fetchall()
self.assertEquals([5], [r.id for r in results])
def test_or_match(self):
results1 = matchtable.select().where(or_(matchtable.c.title.match('nutshells'),
matchtable.c.title.match('rubies'))
).order_by(matchtable.c.id).execute().fetchall()
self.assertEquals([3, 5], [r.id for r in results1])
results2 = matchtable.select().where(matchtable.c.title.match('nutshells | rubies'),
).order_by(matchtable.c.id).execute().fetchall()
self.assertEquals([3, 5], [r.id for r in results2])
def test_and_match(self):
results1 = matchtable.select().where(and_(matchtable.c.title.match('python'),
matchtable.c.title.match('nutshells'))
).execute().fetchall()
self.assertEquals([5], [r.id for r in results1])
results2 = matchtable.select().where(matchtable.c.title.match('python & nutshells'),
).execute().fetchall()
self.assertEquals([5], [r.id for r in results2])
def test_match_across_joins(self):
results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id,
or_(cattable.c.description.match('Ruby'),
matchtable.c.title.match('nutshells')))
).order_by(matchtable.c.id).execute().fetchall()
self.assertEquals([1, 3, 5], [r.id for r in results])
if __name__ == "__main__":
testenv.main()