diff --git a/doc/build/changelog/unreleased_13/4793.rst b/doc/build/changelog/unreleased_13/4793.rst new file mode 100644 index 0000000000..2227ba17fd --- /dev/null +++ b/doc/build/changelog/unreleased_13/4793.rst @@ -0,0 +1,13 @@ +.. change: + :tags: bug, sqlite + :tickets: 4793 + + Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that + reflection features to detect for table existence, list of table columns, + and list of foreign keys, would default to any table in any attached + database, when no schema name was given and the table did not exist in the + base schema. The fix explicitly runs PRAGMA for the 'main' schema and then + the 'temp' schema if the 'main' returned no rows, to maintain the behavior + of tables + temp tables in the "no schema" namespace, attached tables only + in the "schema" namespace. + diff --git a/doc/build/orm/tutorial.rst b/doc/build/orm/tutorial.rst index 283125c823..686b774b97 100644 --- a/doc/build/orm/tutorial.rst +++ b/doc/build/orm/tutorial.rst @@ -210,7 +210,9 @@ the actual ``CREATE TABLE`` statement: >>> Base.metadata.create_all(engine) SELECT ... - PRAGMA table_info("users") + PRAGMA main.table_info("users") + () + PRAGMA temp.table_info("users") () CREATE TABLE users ( id INTEGER NOT NULL, name VARCHAR, diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index f8bf2c6208..4ea194c750 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -1999,17 +1999,26 @@ class SQLiteDialect(default.DefaultDialect): def _get_table_pragma(self, connection, pragma, table_name, schema=None): quote = self.identifier_preparer.quote_identifier if schema is not None: - statement = "PRAGMA %s." % quote(schema) + statements = ["PRAGMA %s." % quote(schema)] else: - statement = "PRAGMA " + # because PRAGMA looks in all attached databases if no schema + # given, need to specify "main" schema, however since we want + # 'temp' tables in the same namespace as 'main', need to run + # the PRAGMA twice + statements = ["PRAGMA main.", "PRAGMA temp."] + qtable = quote(table_name) - statement = "%s%s(%s)" % (statement, pragma, qtable) - cursor = connection.execute(statement) - if not cursor._soft_closed: - # work around SQLite issue whereby cursor.description - # is blank when PRAGMA returns no rows: - # http://www.sqlite.org/cvstrac/tktview?tn=1884 - result = cursor.fetchall() + for statement in statements: + statement = "%s%s(%s)" % (statement, pragma, qtable) + cursor = connection.execute(statement) + if not cursor._soft_closed: + # work around SQLite issue whereby cursor.description + # is blank when PRAGMA returns no rows: + # http://www.sqlite.org/cvstrac/tktview?tn=1884 + result = cursor.fetchall() + else: + result = [] + if result: + return result else: - result = [] - return result + return [] diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index e529a0753c..a1297a5804 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -24,6 +24,8 @@ from ...engine.reflection import Inspector from ...schema import DDL from ...schema import Index from ...sql.elements import quoted_name +from ...testing import is_false +from ...testing import is_true metadata, users = None, None @@ -40,11 +42,39 @@ class HasTableTest(fixtures.TablesTest): Column("id", Integer, primary_key=True), Column("data", String(50)), ) + if testing.requires.schemas.enabled: + Table( + "test_table_s", + metadata, + Column("id", Integer, primary_key=True), + Column("data", String(50)), + schema=config.test_schema, + ) def test_has_table(self): with config.db.begin() as conn: - assert config.db.dialect.has_table(conn, "test_table") - assert not config.db.dialect.has_table(conn, "nonexistent_table") + is_true(config.db.dialect.has_table(conn, "test_table")) + is_false(config.db.dialect.has_table(conn, "test_table_s")) + is_false(config.db.dialect.has_table(conn, "nonexistent_table")) + + @testing.requires.schemas + def test_has_table_schema(self): + with config.db.begin() as conn: + is_false( + config.db.dialect.has_table( + conn, "test_table", schema=config.test_schema + ) + ) + is_true( + config.db.dialect.has_table( + conn, "test_table_s", schema=config.test_schema + ) + ) + is_false( + config.db.dialect.has_table( + conn, "nonexistent_table", schema=config.test_schema + ) + ) class ComponentReflectionTest(fixtures.TablesTest): diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index c9332b7fac..bd24878ae9 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -694,6 +694,9 @@ class AttachedDBTest(fixtures.TestBase): def _fixture(self): meta = self.metadata self.conn = testing.db.connect() + Table("created", meta, Column("foo", Integer), Column("bar", String)) + Table("local_only", meta, Column("q", Integer), Column("p", Integer)) + ct = Table( "created", meta, @@ -702,6 +705,14 @@ class AttachedDBTest(fixtures.TestBase): schema="test_schema", ) + Table( + "another_created", + meta, + Column("bat", Integer), + Column("hoho", String), + schema="test_schema", + ) + meta.create_all(self.conn) return ct @@ -717,15 +728,59 @@ class AttachedDBTest(fixtures.TestBase): insp = inspect(self.conn) eq_(insp.get_table_names("test_schema"), []) + def test_column_names(self): + self._fixture() + insp = inspect(self.conn) + eq_( + [ + d["name"] + for d in insp.get_columns("created", schema="test_schema") + ], + ["id", "name"], + ) + eq_( + [d["name"] for d in insp.get_columns("created", schema=None)], + ["foo", "bar"], + ) + + eq_( + [ + d["name"] + for d in insp.get_columns("nonexistent", schema="test_schema") + ], + [], + ) + eq_( + [ + d["name"] + for d in insp.get_columns("another_created", schema=None) + ], + [], + ) + eq_( + [ + d["name"] + for d in insp.get_columns("local_only", schema="test_schema") + ], + [], + ) + eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"]) + def test_table_names_present(self): self._fixture() insp = inspect(self.conn) - eq_(insp.get_table_names("test_schema"), ["created"]) + eq_( + set(insp.get_table_names("test_schema")), + {"created", "another_created"}, + ) def test_table_names_system(self): self._fixture() insp = inspect(self.conn) - eq_(insp.get_table_names("test_schema"), ["created"]) + eq_( + set(insp.get_table_names("test_schema")), + {"created", "another_created"}, + ) def test_schema_names(self): self._fixture()