From af241dfa7d9c201f2bf341976cc1740a248df37c Mon Sep 17 00:00:00 2001 From: Baptiste Mispelon Date: Mon, 28 Apr 2025 20:14:09 +0200 Subject: [PATCH] [5.2.x] Used addCleanup() instead of try-finally blocks in inspectdb tests. Backport of 2722cb61ccae84f593e6d2c28814e3c628743994 from main. --- tests/inspectdb/tests.py | 305 ++++++++++++++++++--------------------- 1 file changed, 143 insertions(+), 162 deletions(-) diff --git a/tests/inspectdb/tests.py b/tests/inspectdb/tests.py index 131bd45ce8..f7a6958b75 100644 --- a/tests/inspectdb/tests.py +++ b/tests/inspectdb/tests.py @@ -29,6 +29,17 @@ def special_table_only(table_name): return table_name.startswith("inspectdb_special") +def cursor_execute(*queries): + """ + Execute SQL queries using a new cursor on the default database connection. + """ + results = [] + with connection.cursor() as cursor: + for q in queries: + results.append(cursor.execute(q)) + return results + + class InspectDBTestCase(TestCase): unique_re = re.compile(r".*unique_together = \((.+),\).*") @@ -415,29 +426,21 @@ class InspectDBTestCase(TestCase): @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL") def test_unsupported_unique_together(self): """Unsupported index types (COALESCE here) are skipped.""" - with connection.cursor() as c: - c.execute( - "CREATE UNIQUE INDEX Findex ON %s " - "(id, people_unique_id, COALESCE(message_id, -1))" - % PeopleMoreData._meta.db_table - ) - try: - out = StringIO() - call_command( - "inspectdb", - table_name_filter=lambda tn: tn.startswith( - PeopleMoreData._meta.db_table - ), - stdout=out, - ) - output = out.getvalue() - self.assertIn("# A unique constraint could not be introspected.", output) - self.assertEqual( - self.unique_re.findall(output), ["('id', 'people_unique')"] - ) - finally: - with connection.cursor() as c: - c.execute("DROP INDEX Findex") + cursor_execute( + "CREATE UNIQUE INDEX Findex ON %s " + "(id, people_unique_id, COALESCE(message_id, -1))" + % PeopleMoreData._meta.db_table + ) + self.addCleanup(cursor_execute, "DROP INDEX Findex") + out = StringIO() + call_command( + "inspectdb", + table_name_filter=lambda tn: tn.startswith(PeopleMoreData._meta.db_table), + stdout=out, + ) + output = out.getvalue() + self.assertIn("# A unique constraint could not be introspected.", output) + self.assertEqual(self.unique_re.findall(output), ["('id', 'people_unique')"]) @skipUnless( connection.vendor == "sqlite", @@ -492,178 +495,156 @@ class InspectDBTransactionalTests(TransactionTestCase): def test_include_views(self): """inspectdb --include-views creates models for database views.""" - with connection.cursor() as cursor: - cursor.execute( - "CREATE VIEW inspectdb_people_view AS " - "SELECT id, name FROM inspectdb_people" - ) + cursor_execute( + "CREATE VIEW inspectdb_people_view AS " + "SELECT id, name FROM inspectdb_people" + ) + self.addCleanup(cursor_execute, "DROP VIEW inspectdb_people_view") out = StringIO() view_model = "class InspectdbPeopleView(models.Model):" view_managed = "managed = False # Created from a view." - try: - call_command( - "inspectdb", - table_name_filter=inspectdb_views_only, - stdout=out, - ) - no_views_output = out.getvalue() - self.assertNotIn(view_model, no_views_output) - self.assertNotIn(view_managed, no_views_output) - call_command( - "inspectdb", - table_name_filter=inspectdb_views_only, - include_views=True, - stdout=out, - ) - with_views_output = out.getvalue() - self.assertIn(view_model, with_views_output) - self.assertIn(view_managed, with_views_output) - finally: - with connection.cursor() as cursor: - cursor.execute("DROP VIEW inspectdb_people_view") + call_command( + "inspectdb", + table_name_filter=inspectdb_views_only, + stdout=out, + ) + no_views_output = out.getvalue() + self.assertNotIn(view_model, no_views_output) + self.assertNotIn(view_managed, no_views_output) + call_command( + "inspectdb", + table_name_filter=inspectdb_views_only, + include_views=True, + stdout=out, + ) + with_views_output = out.getvalue() + self.assertIn(view_model, with_views_output) + self.assertIn(view_managed, with_views_output) @skipUnlessDBFeature("can_introspect_materialized_views") def test_include_materialized_views(self): """inspectdb --include-views creates models for materialized views.""" - with connection.cursor() as cursor: - cursor.execute( - "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS " - "SELECT id, name FROM inspectdb_people" - ) + cursor_execute( + "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS " + "SELECT id, name FROM inspectdb_people" + ) + self.addCleanup( + cursor_execute, "DROP MATERIALIZED VIEW inspectdb_people_materialized" + ) out = StringIO() view_model = "class InspectdbPeopleMaterialized(models.Model):" view_managed = "managed = False # Created from a view." - try: - call_command( - "inspectdb", - table_name_filter=inspectdb_views_only, - stdout=out, - ) - no_views_output = out.getvalue() - self.assertNotIn(view_model, no_views_output) - self.assertNotIn(view_managed, no_views_output) - call_command( - "inspectdb", - table_name_filter=inspectdb_views_only, - include_views=True, - stdout=out, - ) - with_views_output = out.getvalue() - self.assertIn(view_model, with_views_output) - self.assertIn(view_managed, with_views_output) - finally: - with connection.cursor() as cursor: - cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized") + call_command( + "inspectdb", + table_name_filter=inspectdb_views_only, + stdout=out, + ) + no_views_output = out.getvalue() + self.assertNotIn(view_model, no_views_output) + self.assertNotIn(view_managed, no_views_output) + call_command( + "inspectdb", + table_name_filter=inspectdb_views_only, + include_views=True, + stdout=out, + ) + with_views_output = out.getvalue() + self.assertIn(view_model, with_views_output) + self.assertIn(view_managed, with_views_output) @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL") def test_include_partitions(self): """inspectdb --include-partitions creates models for partitions.""" - with connection.cursor() as cursor: - cursor.execute( - """\ - CREATE TABLE inspectdb_partition_parent (name text not null) - PARTITION BY LIST (left(upper(name), 1)) + cursor_execute( """ - ) - cursor.execute( - """\ - CREATE TABLE inspectdb_partition_child - PARTITION OF inspectdb_partition_parent - FOR VALUES IN ('A', 'B', 'C') + CREATE TABLE inspectdb_partition_parent (name text not null) + PARTITION BY LIST (left(upper(name), 1)) + """, """ - ) + CREATE TABLE inspectdb_partition_child + PARTITION OF inspectdb_partition_parent + FOR VALUES IN ('A', 'B', 'C') + """, + ) + self.addCleanup( + cursor_execute, + "DROP TABLE IF EXISTS inspectdb_partition_child", + "DROP TABLE IF EXISTS inspectdb_partition_parent", + ) out = StringIO() partition_model_parent = "class InspectdbPartitionParent(models.Model):" partition_model_child = "class InspectdbPartitionChild(models.Model):" partition_managed = "managed = False # Created from a partition." - try: - call_command( - "inspectdb", table_name_filter=inspectdb_tables_only, stdout=out - ) - no_partitions_output = out.getvalue() - self.assertIn(partition_model_parent, no_partitions_output) - self.assertNotIn(partition_model_child, no_partitions_output) - self.assertNotIn(partition_managed, no_partitions_output) - call_command( - "inspectdb", - table_name_filter=inspectdb_tables_only, - include_partitions=True, - stdout=out, - ) - with_partitions_output = out.getvalue() - self.assertIn(partition_model_parent, with_partitions_output) - self.assertIn(partition_model_child, with_partitions_output) - self.assertIn(partition_managed, with_partitions_output) - finally: - with connection.cursor() as cursor: - cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child") - cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent") + call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out) + no_partitions_output = out.getvalue() + self.assertIn(partition_model_parent, no_partitions_output) + self.assertNotIn(partition_model_child, no_partitions_output) + self.assertNotIn(partition_managed, no_partitions_output) + call_command( + "inspectdb", + table_name_filter=inspectdb_tables_only, + include_partitions=True, + stdout=out, + ) + with_partitions_output = out.getvalue() + self.assertIn(partition_model_parent, with_partitions_output) + self.assertIn(partition_model_child, with_partitions_output) + self.assertIn(partition_managed, with_partitions_output) @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL") def test_foreign_data_wrapper(self): - with connection.cursor() as cursor: - cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw") - cursor.execute( - "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw" - ) - cursor.execute( - """ - CREATE FOREIGN TABLE inspectdb_iris_foreign_table ( - petal_length real, - petal_width real, - sepal_length real, - sepal_width real - ) SERVER inspectdb_server OPTIONS ( - program 'echo 1,2,3,4', - format 'csv' - ) - """ + cursor_execute( + "CREATE EXTENSION IF NOT EXISTS file_fdw", + "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw", + """ + CREATE FOREIGN TABLE inspectdb_iris_foreign_table ( + petal_length real, + petal_width real, + sepal_length real, + sepal_width real + ) SERVER inspectdb_server OPTIONS ( + program 'echo 1,2,3,4', + format 'csv' ) + """, + ) + self.addCleanup( + cursor_execute, + "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table", + "DROP SERVER IF EXISTS inspectdb_server", + "DROP EXTENSION IF EXISTS file_fdw", + ) out = StringIO() foreign_table_model = "class InspectdbIrisForeignTable(models.Model):" foreign_table_managed = "managed = False" - try: - call_command( - "inspectdb", - table_name_filter=inspectdb_tables_only, - stdout=out, - ) - output = out.getvalue() - self.assertIn(foreign_table_model, output) - self.assertIn(foreign_table_managed, output) - finally: - with connection.cursor() as cursor: - cursor.execute( - "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table" - ) - cursor.execute("DROP SERVER IF EXISTS inspectdb_server") - cursor.execute("DROP EXTENSION IF EXISTS file_fdw") + call_command( + "inspectdb", + table_name_filter=inspectdb_tables_only, + stdout=out, + ) + output = out.getvalue() + self.assertIn(foreign_table_model, output) + self.assertIn(foreign_table_managed, output) @skipUnlessDBFeature("create_test_table_with_composite_primary_key") def test_composite_primary_key(self): table_name = "test_table_composite_pk" - with connection.cursor() as cursor: - cursor.execute( - connection.features.create_test_table_with_composite_primary_key - ) + cursor_execute(connection.features.create_test_table_with_composite_primary_key) + self.addCleanup(cursor_execute, "DROP TABLE %s" % table_name) out = StringIO() if connection.vendor == "sqlite": field_type = connection.features.introspected_field_types["AutoField"] else: field_type = connection.features.introspected_field_types["IntegerField"] - try: - call_command("inspectdb", table_name, stdout=out) - output = out.getvalue() - self.assertIn( - "pk = models.CompositePrimaryKey('column_1', 'column_2')", - output, - ) - self.assertIn(f"column_1 = models.{field_type}()", output) - self.assertIn( - "column_2 = models.%s()" - % connection.features.introspected_field_types["IntegerField"], - output, - ) - finally: - with connection.cursor() as cursor: - cursor.execute("DROP TABLE %s" % table_name) + call_command("inspectdb", table_name, stdout=out) + output = out.getvalue() + self.assertIn( + "pk = models.CompositePrimaryKey('column_1', 'column_2')", + output, + ) + self.assertIn(f"column_1 = models.{field_type}()", output) + self.assertIn( + "column_2 = models.%s()" + % connection.features.introspected_field_types["IntegerField"], + output, + )