Fix a couple unquoted string interpolations in migrate.

This commit is contained in:
Charles Leifer
2026-03-09 19:00:48 -05:00
parent 56173ffddb
commit 5b1ceb0728
2 changed files with 29 additions and 22 deletions
+28 -18
View File
@@ -176,7 +176,7 @@ def make_index_name(table_name, columns):
index_name = '_'.join((table_name,) + tuple(columns))
if len(index_name) > 64:
index_hash = hashlib.md5(index_name.encode('utf-8')).hexdigest()
index_name = '%s_%s' % (index_name[:56], index_hash[:7])
index_name = '%s_%s' % (index_name[:51], index_hash[:12])
return index_name
@@ -462,20 +462,21 @@ class PostgresqlMigrator(SchemaMigrator):
SELECT pg_attribute.attname
FROM pg_index, pg_class, pg_attribute
WHERE
pg_class.oid = '%s'::regclass AND
pg_class.oid = %s::regclass AND
indrelid = pg_class.oid AND
pg_attribute.attrelid = pg_class.oid AND
pg_attribute.attnum = any(pg_index.indkey) AND
indisprimary;
"""
cursor = self.database.execute_sql(query % tbl)
cursor = self.database.execute_sql(query, (tbl,))
return [row[0] for row in cursor.fetchall()]
@operation
def set_search_path(self, schema_name):
return (self
.make_context()
.literal('SET search_path TO %s' % schema_name))
.literal('SET search_path TO ')
.sql(Entity(schema_name)))
@operation
def rename_table(self, old_name, new_name):
@@ -572,7 +573,8 @@ class MySQLMigrator(SchemaMigrator):
.sql(Entity(new_name)))
def _get_column_definition(self, table, column_name):
cursor = self.database.execute_sql('DESCRIBE `%s`;' % table)
table_safe = table.replace('`', '``')
cursor = self.database.execute_sql('DESCRIBE `%s`;' % table_safe)
rows = cursor.fetchall()
for row in rows:
column = MySQLColumn(*row)
@@ -594,7 +596,7 @@ class MySQLMigrator(SchemaMigrator):
if not result:
raise AttributeError(
'Unable to find foreign key constraint for '
'"%s" on table "%s".' % (table, column_name))
'"%s" on table "%s".' % (column_name, table))
return result[0]
@operation
@@ -633,13 +635,13 @@ class MySQLMigrator(SchemaMigrator):
@operation
def drop_not_null(self, table, column):
column = self._get_column_definition(table, column)
if column.is_pk:
column_def = self._get_column_definition(table, column)
if column_def.is_pk:
raise ValueError('Primary keys can not be null')
return (self
._alter_table(self.make_context(), table)
.literal(' MODIFY ')
.sql(column.sql(is_null=True)))
.sql(column_def.sql(is_null=True)))
@operation
def rename_column(self, table, old_name, new_name):
@@ -703,7 +705,8 @@ class SqliteMigrator(SchemaMigrator):
fk_re = re.compile(r'FOREIGN KEY\s+\("?([\w]+)"?\)\s+', re.I)
def _get_column_names(self, table):
res = self.database.execute_sql('select * from "%s" limit 1' % table)
quoted = table.replace('"', '""')
res = self.database.execute_sql('select * from "%s" limit 1' % quoted)
return [item[0] for item in res.description]
def _get_create_table(self, table):
@@ -727,9 +730,6 @@ class SqliteMigrator(SchemaMigrator):
# Get the indexes and SQL to re-create indexes.
indexes = self.database.get_indexes(table)
# Find any foreign keys we may need to remove.
self.database.get_foreign_keys(table)
# Make sure the create_table does not contain any newlines or tabs,
# allowing the regex to work correctly.
create_table = re.sub(r'\s+', ' ', create_table)
@@ -789,9 +789,9 @@ class SqliteMigrator(SchemaMigrator):
# Update the name of the new CREATE TABLE query.
temp_table = table + '__tmp__'
rgx = re.compile('("?)%s("?)' % table, re.I)
rgx = re.compile(r'("?)%s("?)' % re.escape(table), re.I)
create = rgx.sub(
'\\1%s\\2' % temp_table,
r'\1%s\2' % temp_table,
raw_create)
# Create the new table.
@@ -854,7 +854,8 @@ class SqliteMigrator(SchemaMigrator):
# Strip out any junk after the column name.
clean = []
for column in columns:
if re.match(r'%s(?:[\'"`\]]?\s|$)' % column_to_update, column):
if re.match(r'%s(?:[\'"`\]]?\s|$)' % re.escape(column_to_update),
column):
column = new_column + column[len(column_to_update):]
clean.append(column)
@@ -880,6 +881,14 @@ class SqliteMigrator(SchemaMigrator):
.literal(' TO ')
.sql(Entity(new_name)))
def _rename(column_name, column_def):
# Only replace the leading column name identifier to avoid
# corrupting type names, defaults, or constraints that happen
# to contain the column name as a substring.
return re.sub(
r'(["\'`]?)%s\1' % re.escape(column_name),
r'\g<1>%s\1' % new_name,
column_def,
count=1)
return column_def.replace(column_name, new_name)
return self._update_column(table, old_name, _rename)
@@ -903,7 +912,7 @@ class SqliteMigrator(SchemaMigrator):
default = default()
if (isinstance(default, str) and not default.endswith((')', "'"))
and not default.isdigit()):
default = "'%s'" % default
default = "'%s'" % default.replace("'", "''")
def _add_default(column_name, column_def):
# Try to handle SQL functions and string literals, otherwise quote.
return column_def + ' DEFAULT %s' % default
@@ -912,7 +921,8 @@ class SqliteMigrator(SchemaMigrator):
@operation
def drop_column_default(self, table, column):
def _drop_default(column_name, column_def):
col = re.sub(r'DEFAULT\s+[\w"\'\(\)]+(\s|$)', '', column_def, flags=re.I)
col = re.sub(r'DEFAULT\s+[\w"\'\(\)]+(\s|$)', '', column_def,
flags=re.I)
return col.strip()
return self._update_column(table, column, _drop_default)
+1 -4
View File
@@ -809,9 +809,6 @@ class TestSchemaMigration(ModelTestCase):
('PRAGMA "main".index_info("index_model_first_name_last_name")',
None),
# Get foreign keys.
('PRAGMA "main".foreign_key_list("index_model")', None),
# Drop any temporary table, if it exists.
('DROP TABLE IF EXISTS "index_model__tmp__"', []),
@@ -866,7 +863,7 @@ class TestSchemaMigration(ModelTestCase):
('PRAGMA "main".index_list("page")', None),
('PRAGMA "main".index_info("page_name")', None),
('PRAGMA "main".index_info("page_user_id")', None),
('PRAGMA "main".foreign_key_list("page")', None),
#('PRAGMA "main".foreign_key_list("page")', None),
# Clear out a temp table and create it w/o the user_id FK.
('DROP TABLE IF EXISTS "page__tmp__"', []),