mirror of
https://github.com/sqlalchemy/sqlalchemy.git
synced 2026-05-22 00:22:01 -04:00
Always include a schema name in SQLite PRAGMA
Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that
reflection features to detect for table existence, list of table columns,
and list of foreign keys, would default to any table in any attached
database, when no schema name was given and the table did not exist in the
base schema. The fix explicitly runs PRAGMA for the 'main' schema and then
the 'temp' schema if the 'main' returned no rows, to maintain the behavior
of tables + temp tables in the "no schema" namespace, attached tables only
in the "schema" namespace.
Fixes: #4793
Change-Id: I75bc03ef42581c46b98987510d2d2e701df07412
(cherry picked from commit e091775f1c)
This commit is contained in:
+13
@@ -0,0 +1,13 @@
|
||||
.. change:
|
||||
:tags: bug, sqlite
|
||||
:tickets: 4793
|
||||
|
||||
Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that
|
||||
reflection features to detect for table existence, list of table columns,
|
||||
and list of foreign keys, would default to any table in any attached
|
||||
database, when no schema name was given and the table did not exist in the
|
||||
base schema. The fix explicitly runs PRAGMA for the 'main' schema and then
|
||||
the 'temp' schema if the 'main' returned no rows, to maintain the behavior
|
||||
of tables + temp tables in the "no schema" namespace, attached tables only
|
||||
in the "schema" namespace.
|
||||
|
||||
Vendored
+3
-1
@@ -210,7 +210,9 @@ the actual ``CREATE TABLE`` statement:
|
||||
|
||||
>>> Base.metadata.create_all(engine)
|
||||
SELECT ...
|
||||
PRAGMA table_info("users")
|
||||
PRAGMA main.table_info("users")
|
||||
()
|
||||
PRAGMA temp.table_info("users")
|
||||
()
|
||||
CREATE TABLE users (
|
||||
id INTEGER NOT NULL, name VARCHAR,
|
||||
|
||||
@@ -1999,17 +1999,26 @@ class SQLiteDialect(default.DefaultDialect):
|
||||
def _get_table_pragma(self, connection, pragma, table_name, schema=None):
|
||||
quote = self.identifier_preparer.quote_identifier
|
||||
if schema is not None:
|
||||
statement = "PRAGMA %s." % quote(schema)
|
||||
statements = ["PRAGMA %s." % quote(schema)]
|
||||
else:
|
||||
statement = "PRAGMA "
|
||||
# because PRAGMA looks in all attached databases if no schema
|
||||
# given, need to specify "main" schema, however since we want
|
||||
# 'temp' tables in the same namespace as 'main', need to run
|
||||
# the PRAGMA twice
|
||||
statements = ["PRAGMA main.", "PRAGMA temp."]
|
||||
|
||||
qtable = quote(table_name)
|
||||
statement = "%s%s(%s)" % (statement, pragma, qtable)
|
||||
cursor = connection.execute(statement)
|
||||
if not cursor._soft_closed:
|
||||
# work around SQLite issue whereby cursor.description
|
||||
# is blank when PRAGMA returns no rows:
|
||||
# http://www.sqlite.org/cvstrac/tktview?tn=1884
|
||||
result = cursor.fetchall()
|
||||
for statement in statements:
|
||||
statement = "%s%s(%s)" % (statement, pragma, qtable)
|
||||
cursor = connection.execute(statement)
|
||||
if not cursor._soft_closed:
|
||||
# work around SQLite issue whereby cursor.description
|
||||
# is blank when PRAGMA returns no rows:
|
||||
# http://www.sqlite.org/cvstrac/tktview?tn=1884
|
||||
result = cursor.fetchall()
|
||||
else:
|
||||
result = []
|
||||
if result:
|
||||
return result
|
||||
else:
|
||||
result = []
|
||||
return result
|
||||
return []
|
||||
|
||||
@@ -24,6 +24,8 @@ from ...engine.reflection import Inspector
|
||||
from ...schema import DDL
|
||||
from ...schema import Index
|
||||
from ...sql.elements import quoted_name
|
||||
from ...testing import is_false
|
||||
from ...testing import is_true
|
||||
|
||||
|
||||
metadata, users = None, None
|
||||
@@ -40,11 +42,39 @@ class HasTableTest(fixtures.TablesTest):
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
if testing.requires.schemas.enabled:
|
||||
Table(
|
||||
"test_table_s",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
schema=config.test_schema,
|
||||
)
|
||||
|
||||
def test_has_table(self):
|
||||
with config.db.begin() as conn:
|
||||
assert config.db.dialect.has_table(conn, "test_table")
|
||||
assert not config.db.dialect.has_table(conn, "nonexistent_table")
|
||||
is_true(config.db.dialect.has_table(conn, "test_table"))
|
||||
is_false(config.db.dialect.has_table(conn, "test_table_s"))
|
||||
is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_has_table_schema(self):
|
||||
with config.db.begin() as conn:
|
||||
is_false(
|
||||
config.db.dialect.has_table(
|
||||
conn, "test_table", schema=config.test_schema
|
||||
)
|
||||
)
|
||||
is_true(
|
||||
config.db.dialect.has_table(
|
||||
conn, "test_table_s", schema=config.test_schema
|
||||
)
|
||||
)
|
||||
is_false(
|
||||
config.db.dialect.has_table(
|
||||
conn, "nonexistent_table", schema=config.test_schema
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class ComponentReflectionTest(fixtures.TablesTest):
|
||||
|
||||
@@ -694,6 +694,9 @@ class AttachedDBTest(fixtures.TestBase):
|
||||
def _fixture(self):
|
||||
meta = self.metadata
|
||||
self.conn = testing.db.connect()
|
||||
Table("created", meta, Column("foo", Integer), Column("bar", String))
|
||||
Table("local_only", meta, Column("q", Integer), Column("p", Integer))
|
||||
|
||||
ct = Table(
|
||||
"created",
|
||||
meta,
|
||||
@@ -702,6 +705,14 @@ class AttachedDBTest(fixtures.TestBase):
|
||||
schema="test_schema",
|
||||
)
|
||||
|
||||
Table(
|
||||
"another_created",
|
||||
meta,
|
||||
Column("bat", Integer),
|
||||
Column("hoho", String),
|
||||
schema="test_schema",
|
||||
)
|
||||
|
||||
meta.create_all(self.conn)
|
||||
return ct
|
||||
|
||||
@@ -717,15 +728,59 @@ class AttachedDBTest(fixtures.TestBase):
|
||||
insp = inspect(self.conn)
|
||||
eq_(insp.get_table_names("test_schema"), [])
|
||||
|
||||
def test_column_names(self):
|
||||
self._fixture()
|
||||
insp = inspect(self.conn)
|
||||
eq_(
|
||||
[
|
||||
d["name"]
|
||||
for d in insp.get_columns("created", schema="test_schema")
|
||||
],
|
||||
["id", "name"],
|
||||
)
|
||||
eq_(
|
||||
[d["name"] for d in insp.get_columns("created", schema=None)],
|
||||
["foo", "bar"],
|
||||
)
|
||||
|
||||
eq_(
|
||||
[
|
||||
d["name"]
|
||||
for d in insp.get_columns("nonexistent", schema="test_schema")
|
||||
],
|
||||
[],
|
||||
)
|
||||
eq_(
|
||||
[
|
||||
d["name"]
|
||||
for d in insp.get_columns("another_created", schema=None)
|
||||
],
|
||||
[],
|
||||
)
|
||||
eq_(
|
||||
[
|
||||
d["name"]
|
||||
for d in insp.get_columns("local_only", schema="test_schema")
|
||||
],
|
||||
[],
|
||||
)
|
||||
eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"])
|
||||
|
||||
def test_table_names_present(self):
|
||||
self._fixture()
|
||||
insp = inspect(self.conn)
|
||||
eq_(insp.get_table_names("test_schema"), ["created"])
|
||||
eq_(
|
||||
set(insp.get_table_names("test_schema")),
|
||||
{"created", "another_created"},
|
||||
)
|
||||
|
||||
def test_table_names_system(self):
|
||||
self._fixture()
|
||||
insp = inspect(self.conn)
|
||||
eq_(insp.get_table_names("test_schema"), ["created"])
|
||||
eq_(
|
||||
set(insp.get_table_names("test_schema")),
|
||||
{"created", "another_created"},
|
||||
)
|
||||
|
||||
def test_schema_names(self):
|
||||
self._fixture()
|
||||
|
||||
Reference in New Issue
Block a user