1
0
mirror of https://github.com/django/django.git synced 2025-05-15 11:26:31 +00:00

[5.2.x] Used addCleanup() instead of try-finally blocks in inspectdb tests.

Backport of 2722cb61ccae84f593e6d2c28814e3c628743994 from main.
This commit is contained in:
Baptiste Mispelon 2025-04-28 20:14:09 +02:00 committed by Natalia
parent 231eb7acc3
commit af241dfa7d

View File

@ -29,6 +29,17 @@ def special_table_only(table_name):
return table_name.startswith("inspectdb_special")
def cursor_execute(*queries):
"""
Execute SQL queries using a new cursor on the default database connection.
"""
results = []
with connection.cursor() as cursor:
for q in queries:
results.append(cursor.execute(q))
return results
class InspectDBTestCase(TestCase):
unique_re = re.compile(r".*unique_together = \((.+),\).*")
@ -415,29 +426,21 @@ class InspectDBTestCase(TestCase):
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_unsupported_unique_together(self):
"""Unsupported index types (COALESCE here) are skipped."""
with connection.cursor() as c:
c.execute(
"CREATE UNIQUE INDEX Findex ON %s "
"(id, people_unique_id, COALESCE(message_id, -1))"
% PeopleMoreData._meta.db_table
)
try:
out = StringIO()
call_command(
"inspectdb",
table_name_filter=lambda tn: tn.startswith(
PeopleMoreData._meta.db_table
),
stdout=out,
)
output = out.getvalue()
self.assertIn("# A unique constraint could not be introspected.", output)
self.assertEqual(
self.unique_re.findall(output), ["('id', 'people_unique')"]
)
finally:
with connection.cursor() as c:
c.execute("DROP INDEX Findex")
cursor_execute(
"CREATE UNIQUE INDEX Findex ON %s "
"(id, people_unique_id, COALESCE(message_id, -1))"
% PeopleMoreData._meta.db_table
)
self.addCleanup(cursor_execute, "DROP INDEX Findex")
out = StringIO()
call_command(
"inspectdb",
table_name_filter=lambda tn: tn.startswith(PeopleMoreData._meta.db_table),
stdout=out,
)
output = out.getvalue()
self.assertIn("# A unique constraint could not be introspected.", output)
self.assertEqual(self.unique_re.findall(output), ["('id', 'people_unique')"])
@skipUnless(
connection.vendor == "sqlite",
@ -492,178 +495,156 @@ class InspectDBTransactionalTests(TransactionTestCase):
def test_include_views(self):
"""inspectdb --include-views creates models for database views."""
with connection.cursor() as cursor:
cursor.execute(
"CREATE VIEW inspectdb_people_view AS "
"SELECT id, name FROM inspectdb_people"
)
cursor_execute(
"CREATE VIEW inspectdb_people_view AS "
"SELECT id, name FROM inspectdb_people"
)
self.addCleanup(cursor_execute, "DROP VIEW inspectdb_people_view")
out = StringIO()
view_model = "class InspectdbPeopleView(models.Model):"
view_managed = "managed = False # Created from a view."
try:
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
stdout=out,
)
no_views_output = out.getvalue()
self.assertNotIn(view_model, no_views_output)
self.assertNotIn(view_managed, no_views_output)
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
include_views=True,
stdout=out,
)
with_views_output = out.getvalue()
self.assertIn(view_model, with_views_output)
self.assertIn(view_managed, with_views_output)
finally:
with connection.cursor() as cursor:
cursor.execute("DROP VIEW inspectdb_people_view")
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
stdout=out,
)
no_views_output = out.getvalue()
self.assertNotIn(view_model, no_views_output)
self.assertNotIn(view_managed, no_views_output)
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
include_views=True,
stdout=out,
)
with_views_output = out.getvalue()
self.assertIn(view_model, with_views_output)
self.assertIn(view_managed, with_views_output)
@skipUnlessDBFeature("can_introspect_materialized_views")
def test_include_materialized_views(self):
"""inspectdb --include-views creates models for materialized views."""
with connection.cursor() as cursor:
cursor.execute(
"CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
"SELECT id, name FROM inspectdb_people"
)
cursor_execute(
"CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
"SELECT id, name FROM inspectdb_people"
)
self.addCleanup(
cursor_execute, "DROP MATERIALIZED VIEW inspectdb_people_materialized"
)
out = StringIO()
view_model = "class InspectdbPeopleMaterialized(models.Model):"
view_managed = "managed = False # Created from a view."
try:
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
stdout=out,
)
no_views_output = out.getvalue()
self.assertNotIn(view_model, no_views_output)
self.assertNotIn(view_managed, no_views_output)
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
include_views=True,
stdout=out,
)
with_views_output = out.getvalue()
self.assertIn(view_model, with_views_output)
self.assertIn(view_managed, with_views_output)
finally:
with connection.cursor() as cursor:
cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized")
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
stdout=out,
)
no_views_output = out.getvalue()
self.assertNotIn(view_model, no_views_output)
self.assertNotIn(view_managed, no_views_output)
call_command(
"inspectdb",
table_name_filter=inspectdb_views_only,
include_views=True,
stdout=out,
)
with_views_output = out.getvalue()
self.assertIn(view_model, with_views_output)
self.assertIn(view_managed, with_views_output)
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_include_partitions(self):
"""inspectdb --include-partitions creates models for partitions."""
with connection.cursor() as cursor:
cursor.execute(
"""\
CREATE TABLE inspectdb_partition_parent (name text not null)
PARTITION BY LIST (left(upper(name), 1))
cursor_execute(
"""
)
cursor.execute(
"""\
CREATE TABLE inspectdb_partition_child
PARTITION OF inspectdb_partition_parent
FOR VALUES IN ('A', 'B', 'C')
CREATE TABLE inspectdb_partition_parent (name text not null)
PARTITION BY LIST (left(upper(name), 1))
""",
"""
)
CREATE TABLE inspectdb_partition_child
PARTITION OF inspectdb_partition_parent
FOR VALUES IN ('A', 'B', 'C')
""",
)
self.addCleanup(
cursor_execute,
"DROP TABLE IF EXISTS inspectdb_partition_child",
"DROP TABLE IF EXISTS inspectdb_partition_parent",
)
out = StringIO()
partition_model_parent = "class InspectdbPartitionParent(models.Model):"
partition_model_child = "class InspectdbPartitionChild(models.Model):"
partition_managed = "managed = False # Created from a partition."
try:
call_command(
"inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
)
no_partitions_output = out.getvalue()
self.assertIn(partition_model_parent, no_partitions_output)
self.assertNotIn(partition_model_child, no_partitions_output)
self.assertNotIn(partition_managed, no_partitions_output)
call_command(
"inspectdb",
table_name_filter=inspectdb_tables_only,
include_partitions=True,
stdout=out,
)
with_partitions_output = out.getvalue()
self.assertIn(partition_model_parent, with_partitions_output)
self.assertIn(partition_model_child, with_partitions_output)
self.assertIn(partition_managed, with_partitions_output)
finally:
with connection.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
no_partitions_output = out.getvalue()
self.assertIn(partition_model_parent, no_partitions_output)
self.assertNotIn(partition_model_child, no_partitions_output)
self.assertNotIn(partition_managed, no_partitions_output)
call_command(
"inspectdb",
table_name_filter=inspectdb_tables_only,
include_partitions=True,
stdout=out,
)
with_partitions_output = out.getvalue()
self.assertIn(partition_model_parent, with_partitions_output)
self.assertIn(partition_model_child, with_partitions_output)
self.assertIn(partition_managed, with_partitions_output)
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_foreign_data_wrapper(self):
with connection.cursor() as cursor:
cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
cursor.execute(
"CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
)
cursor.execute(
"""
CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
petal_length real,
petal_width real,
sepal_length real,
sepal_width real
) SERVER inspectdb_server OPTIONS (
program 'echo 1,2,3,4',
format 'csv'
)
"""
cursor_execute(
"CREATE EXTENSION IF NOT EXISTS file_fdw",
"CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw",
"""
CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
petal_length real,
petal_width real,
sepal_length real,
sepal_width real
) SERVER inspectdb_server OPTIONS (
program 'echo 1,2,3,4',
format 'csv'
)
""",
)
self.addCleanup(
cursor_execute,
"DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table",
"DROP SERVER IF EXISTS inspectdb_server",
"DROP EXTENSION IF EXISTS file_fdw",
)
out = StringIO()
foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
foreign_table_managed = "managed = False"
try:
call_command(
"inspectdb",
table_name_filter=inspectdb_tables_only,
stdout=out,
)
output = out.getvalue()
self.assertIn(foreign_table_model, output)
self.assertIn(foreign_table_managed, output)
finally:
with connection.cursor() as cursor:
cursor.execute(
"DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
)
cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
cursor.execute("DROP EXTENSION IF EXISTS file_fdw")
call_command(
"inspectdb",
table_name_filter=inspectdb_tables_only,
stdout=out,
)
output = out.getvalue()
self.assertIn(foreign_table_model, output)
self.assertIn(foreign_table_managed, output)
@skipUnlessDBFeature("create_test_table_with_composite_primary_key")
def test_composite_primary_key(self):
table_name = "test_table_composite_pk"
with connection.cursor() as cursor:
cursor.execute(
connection.features.create_test_table_with_composite_primary_key
)
cursor_execute(connection.features.create_test_table_with_composite_primary_key)
self.addCleanup(cursor_execute, "DROP TABLE %s" % table_name)
out = StringIO()
if connection.vendor == "sqlite":
field_type = connection.features.introspected_field_types["AutoField"]
else:
field_type = connection.features.introspected_field_types["IntegerField"]
try:
call_command("inspectdb", table_name, stdout=out)
output = out.getvalue()
self.assertIn(
"pk = models.CompositePrimaryKey('column_1', 'column_2')",
output,
)
self.assertIn(f"column_1 = models.{field_type}()", output)
self.assertIn(
"column_2 = models.%s()"
% connection.features.introspected_field_types["IntegerField"],
output,
)
finally:
with connection.cursor() as cursor:
cursor.execute("DROP TABLE %s" % table_name)
call_command("inspectdb", table_name, stdout=out)
output = out.getvalue()
self.assertIn(
"pk = models.CompositePrimaryKey('column_1', 'column_2')",
output,
)
self.assertIn(f"column_1 = models.{field_type}()", output)
self.assertIn(
"column_2 = models.%s()"
% connection.features.introspected_field_types["IntegerField"],
output,
)