mirror of
				https://github.com/django/django.git
				synced 2025-10-31 09:41:08 +00:00 
			
		
		
		
	Refs #15802 -- Reverted #7c657b24 as BaseDatabaseWrapper.close() now
has a proper "finally" clause that may need to preserve self.connection.
Backport of 25860096 from master.
		
	
		
			
				
	
	
		
			738 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			738 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import absolute_import
 | |
| 
 | |
| import sys
 | |
| try:
 | |
|     import threading
 | |
| except ImportError:
 | |
|     threading = None
 | |
| import time
 | |
| 
 | |
| from django.db import (connection, transaction,
 | |
|     DatabaseError, Error, IntegrityError, OperationalError)
 | |
| from django.test import TransactionTestCase, skipIfDBFeature, skipUnlessDBFeature
 | |
| from django.test.utils import IgnorePendingDeprecationWarningsMixin
 | |
| from django.utils import six
 | |
| from django.utils.unittest import skipIf, skipUnless
 | |
| 
 | |
| from .models import Reporter
 | |
| 
 | |
| 
 | |
| @skipUnless(connection.features.uses_savepoints,
 | |
|         "'atomic' requires transactions and savepoints.")
 | |
| class AtomicTests(TransactionTestCase):
 | |
|     """
 | |
|     Tests for the atomic decorator and context manager.
 | |
| 
 | |
|     The tests make assertions on internal attributes because there isn't a
 | |
|     robust way to ask the database for its current transaction state.
 | |
| 
 | |
|     Since the decorator syntax is converted into a context manager (see the
 | |
|     implementation), there are only a few basic tests with the decorator
 | |
|     syntax and the bulk of the tests use the context manager syntax.
 | |
|     """
 | |
| 
 | |
|     available_apps = ['transactions']
 | |
| 
 | |
|     def test_decorator_syntax_commit(self):
 | |
|         @transaction.atomic
 | |
|         def make_reporter():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|         make_reporter()
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), ['<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_decorator_syntax_rollback(self):
 | |
|         @transaction.atomic
 | |
|         def make_reporter():
 | |
|             Reporter.objects.create(first_name="Haddock")
 | |
|             raise Exception("Oops, that's his last name")
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             make_reporter()
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_alternate_decorator_syntax_commit(self):
 | |
|         @transaction.atomic()
 | |
|         def make_reporter():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|         make_reporter()
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), ['<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_alternate_decorator_syntax_rollback(self):
 | |
|         @transaction.atomic()
 | |
|         def make_reporter():
 | |
|             Reporter.objects.create(first_name="Haddock")
 | |
|             raise Exception("Oops, that's his last name")
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             make_reporter()
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_commit(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), ['<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_rollback(self):
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             with transaction.atomic():
 | |
|                 Reporter.objects.create(first_name="Haddock")
 | |
|                 raise Exception("Oops, that's his last name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_nested_commit_commit(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with transaction.atomic():
 | |
|                 Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(),
 | |
|                 ['<Reporter: Archibald Haddock>', '<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_nested_commit_rollback(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                 with transaction.atomic():
 | |
|                     Reporter.objects.create(first_name="Haddock")
 | |
|                     raise Exception("Oops, that's his last name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), ['<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_nested_rollback_commit(self):
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             with transaction.atomic():
 | |
|                 Reporter.objects.create(last_name="Tintin")
 | |
|                 with transaction.atomic():
 | |
|                     Reporter.objects.create(last_name="Haddock")
 | |
|                 raise Exception("Oops, that's his first name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_nested_rollback_rollback(self):
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             with transaction.atomic():
 | |
|                 Reporter.objects.create(last_name="Tintin")
 | |
|                 with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                     with transaction.atomic():
 | |
|                         Reporter.objects.create(first_name="Haddock")
 | |
|                     raise Exception("Oops, that's his last name")
 | |
|                 raise Exception("Oops, that's his first name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_merged_commit_commit(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with transaction.atomic(savepoint=False):
 | |
|                 Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(),
 | |
|                 ['<Reporter: Archibald Haddock>', '<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_merged_commit_rollback(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                 with transaction.atomic(savepoint=False):
 | |
|                     Reporter.objects.create(first_name="Haddock")
 | |
|                     raise Exception("Oops, that's his last name")
 | |
|         # Writes in the outer block are rolled back too.
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_merged_rollback_commit(self):
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             with transaction.atomic():
 | |
|                 Reporter.objects.create(last_name="Tintin")
 | |
|                 with transaction.atomic(savepoint=False):
 | |
|                     Reporter.objects.create(last_name="Haddock")
 | |
|                 raise Exception("Oops, that's his first name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_merged_rollback_rollback(self):
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             with transaction.atomic():
 | |
|                 Reporter.objects.create(last_name="Tintin")
 | |
|                 with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                     with transaction.atomic(savepoint=False):
 | |
|                         Reporter.objects.create(first_name="Haddock")
 | |
|                     raise Exception("Oops, that's his last name")
 | |
|                 raise Exception("Oops, that's his first name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_reuse_commit_commit(self):
 | |
|         atomic = transaction.atomic()
 | |
|         with atomic:
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with atomic:
 | |
|                 Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(),
 | |
|                 ['<Reporter: Archibald Haddock>', '<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_reuse_commit_rollback(self):
 | |
|         atomic = transaction.atomic()
 | |
|         with atomic:
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                 with atomic:
 | |
|                     Reporter.objects.create(first_name="Haddock")
 | |
|                     raise Exception("Oops, that's his last name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), ['<Reporter: Tintin>'])
 | |
| 
 | |
|     def test_reuse_rollback_commit(self):
 | |
|         atomic = transaction.atomic()
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             with atomic:
 | |
|                 Reporter.objects.create(last_name="Tintin")
 | |
|                 with atomic:
 | |
|                     Reporter.objects.create(last_name="Haddock")
 | |
|                 raise Exception("Oops, that's his first name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_reuse_rollback_rollback(self):
 | |
|         atomic = transaction.atomic()
 | |
|         with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|             with atomic:
 | |
|                 Reporter.objects.create(last_name="Tintin")
 | |
|                 with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                     with atomic:
 | |
|                         Reporter.objects.create(first_name="Haddock")
 | |
|                     raise Exception("Oops, that's his last name")
 | |
|                 raise Exception("Oops, that's his first name")
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_force_rollback(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             # atomic block shouldn't rollback, but force it.
 | |
|             self.assertFalse(transaction.get_rollback())
 | |
|             transaction.set_rollback(True)
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_prevent_rollback(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             sid = transaction.savepoint()
 | |
|             # trigger a database error inside an inner atomic without savepoint
 | |
|             with self.assertRaises(DatabaseError):
 | |
|                 with transaction.atomic(savepoint=False):
 | |
|                     connection.cursor().execute(
 | |
|                             "SELECT no_such_col FROM transactions_reporter")
 | |
|             # prevent atomic from rolling back since we're recovering manually
 | |
|             self.assertTrue(transaction.get_rollback())
 | |
|             transaction.set_rollback(False)
 | |
|             transaction.savepoint_rollback(sid)
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), ['<Reporter: Tintin>'])
 | |
| 
 | |
| 
 | |
| class AtomicInsideTransactionTests(AtomicTests):
 | |
|     """All basic tests for atomic should also pass within an existing transaction."""
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.atomic = transaction.atomic()
 | |
|         self.atomic.__enter__()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         self.atomic.__exit__(*sys.exc_info())
 | |
| 
 | |
| 
 | |
| @skipIf(connection.features.autocommits_when_autocommit_is_off,
 | |
|         "This test requires a non-autocommit mode that doesn't autocommit.")
 | |
| class AtomicWithoutAutocommitTests(AtomicTests):
 | |
|     """All basic tests for atomic should also pass when autocommit is turned off."""
 | |
| 
 | |
|     def setUp(self):
 | |
|         transaction.set_autocommit(False)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         # The tests access the database after exercising 'atomic', initiating
 | |
|         # a transaction ; a rollback is required before restoring autocommit.
 | |
|         transaction.rollback()
 | |
|         transaction.set_autocommit(True)
 | |
| 
 | |
| 
 | |
| @skipIf(connection.features.autocommits_when_autocommit_is_off,
 | |
|         "This test requires a non-autocommit mode that doesn't autocommit.")
 | |
| class AtomicInsideLegacyTransactionManagementTests(AtomicTests):
 | |
| 
 | |
|     def setUp(self):
 | |
|         transaction.enter_transaction_management()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         # The tests access the database after exercising 'atomic', making the
 | |
|         # connection dirty; a rollback is required to make it clean.
 | |
|         transaction.rollback()
 | |
|         transaction.leave_transaction_management()
 | |
| 
 | |
| 
 | |
| @skipUnless(connection.features.uses_savepoints,
 | |
|         "'atomic' requires transactions and savepoints.")
 | |
| class AtomicMergeTests(TransactionTestCase):
 | |
|     """Test merging transactions with savepoint=False."""
 | |
| 
 | |
|     available_apps = ['transactions']
 | |
| 
 | |
|     def test_merged_outer_rollback(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with transaction.atomic(savepoint=False):
 | |
|                 Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|                 with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                     with transaction.atomic(savepoint=False):
 | |
|                         Reporter.objects.create(first_name="Tournesol")
 | |
|                         raise Exception("Oops, that's his last name")
 | |
|                 # The third insert couldn't be roll back. Temporarily mark the
 | |
|                 # connection as not needing rollback to check it.
 | |
|                 self.assertTrue(transaction.get_rollback())
 | |
|                 transaction.set_rollback(False)
 | |
|                 self.assertEqual(Reporter.objects.count(), 3)
 | |
|                 transaction.set_rollback(True)
 | |
|             # The second insert couldn't be roll back. Temporarily mark the
 | |
|             # connection as not needing rollback to check it.
 | |
|             self.assertTrue(transaction.get_rollback())
 | |
|             transaction.set_rollback(False)
 | |
|             self.assertEqual(Reporter.objects.count(), 3)
 | |
|             transaction.set_rollback(True)
 | |
|         # The first block has a savepoint and must roll back.
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     def test_merged_inner_savepoint_rollback(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Tintin")
 | |
|             with transaction.atomic():
 | |
|                 Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|                 with six.assertRaisesRegex(self, Exception, "Oops"):
 | |
|                     with transaction.atomic(savepoint=False):
 | |
|                         Reporter.objects.create(first_name="Tournesol")
 | |
|                         raise Exception("Oops, that's his last name")
 | |
|                 # The third insert couldn't be roll back. Temporarily mark the
 | |
|                 # connection as not needing rollback to check it.
 | |
|                 self.assertTrue(transaction.get_rollback())
 | |
|                 transaction.set_rollback(False)
 | |
|                 self.assertEqual(Reporter.objects.count(), 3)
 | |
|                 transaction.set_rollback(True)
 | |
|             # The second block has a savepoint and must roll back.
 | |
|             self.assertEqual(Reporter.objects.count(), 1)
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), ['<Reporter: Tintin>'])
 | |
| 
 | |
| 
 | |
| @skipUnless(connection.features.uses_savepoints,
 | |
|         "'atomic' requires transactions and savepoints.")
 | |
| class AtomicErrorsTests(TransactionTestCase):
 | |
| 
 | |
|     available_apps = ['transactions']
 | |
| 
 | |
|     def test_atomic_prevents_setting_autocommit(self):
 | |
|         autocommit = transaction.get_autocommit()
 | |
|         with transaction.atomic():
 | |
|             with self.assertRaises(transaction.TransactionManagementError):
 | |
|                 transaction.set_autocommit(not autocommit)
 | |
|         # Make sure autocommit wasn't changed.
 | |
|         self.assertEqual(connection.autocommit, autocommit)
 | |
| 
 | |
|     def test_atomic_prevents_calling_transaction_methods(self):
 | |
|         with transaction.atomic():
 | |
|             with self.assertRaises(transaction.TransactionManagementError):
 | |
|                 transaction.commit()
 | |
|             with self.assertRaises(transaction.TransactionManagementError):
 | |
|                 transaction.rollback()
 | |
| 
 | |
|     def test_atomic_prevents_calling_transaction_management_methods(self):
 | |
|         with transaction.atomic():
 | |
|             with self.assertRaises(transaction.TransactionManagementError):
 | |
|                 transaction.enter_transaction_management()
 | |
|             with self.assertRaises(transaction.TransactionManagementError):
 | |
|                 transaction.leave_transaction_management()
 | |
| 
 | |
|     def test_atomic_prevents_queries_in_broken_transaction(self):
 | |
|         r1 = Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|         with transaction.atomic():
 | |
|             r2 = Reporter(first_name="Cuthbert", last_name="Calculus", id=r1.id)
 | |
|             with self.assertRaises(IntegrityError):
 | |
|                 r2.save(force_insert=True)
 | |
|             # The transaction is marked as needing rollback.
 | |
|             with self.assertRaises(transaction.TransactionManagementError):
 | |
|                 r2.save(force_update=True)
 | |
|         self.assertEqual(Reporter.objects.get(pk=r1.pk).last_name, "Haddock")
 | |
| 
 | |
|     @skipIfDBFeature('atomic_transactions')
 | |
|     def test_atomic_allows_queries_after_fixing_transaction(self):
 | |
|         r1 = Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|         with transaction.atomic():
 | |
|             r2 = Reporter(first_name="Cuthbert", last_name="Calculus", id=r1.id)
 | |
|             with self.assertRaises(IntegrityError):
 | |
|                 r2.save(force_insert=True)
 | |
|             # Mark the transaction as no longer needing rollback.
 | |
|             transaction.set_rollback(False)
 | |
|             r2.save(force_update=True)
 | |
|         self.assertEqual(Reporter.objects.get(pk=r1.pk).last_name, "Calculus")
 | |
| 
 | |
|     @skipUnlessDBFeature('test_db_allows_multiple_connections')
 | |
|     def test_atomic_prevents_queries_in_broken_transaction_after_client_close(self):
 | |
|         with transaction.atomic():
 | |
|             Reporter.objects.create(first_name="Archibald", last_name="Haddock")
 | |
|             connection.close()
 | |
|             # The connection is closed and the transaction is marked as
 | |
|             # needing rollback. This will raise an InterfaceError on databases
 | |
|             # that refuse to create cursors on closed connections (PostgreSQL)
 | |
|             # and a TransactionManagementError on other databases.
 | |
|             with self.assertRaises(Error):
 | |
|                 Reporter.objects.create(first_name="Cuthbert", last_name="Calculus")
 | |
|         # The connection is usable again .
 | |
|         self.assertEqual(Reporter.objects.count(), 0)
 | |
| 
 | |
| 
 | |
| @skipUnless(connection.vendor == 'mysql', "MySQL-specific behaviors")
 | |
| class AtomicMySQLTests(TransactionTestCase):
 | |
| 
 | |
|     available_apps = ['transactions']
 | |
| 
 | |
|     @skipIf(threading is None, "Test requires threading")
 | |
|     def test_implicit_savepoint_rollback(self):
 | |
|         """MySQL implicitly rolls back savepoints when it deadlocks (#22291)."""
 | |
| 
 | |
|         other_thread_ready = threading.Event()
 | |
| 
 | |
|         def other_thread():
 | |
|             try:
 | |
|                 with transaction.atomic():
 | |
|                     Reporter.objects.create(id=1, first_name="Tintin")
 | |
|                     other_thread_ready.set()
 | |
|                     # We cannot synchronize the two threads with an event here
 | |
|                     # because the main thread locks. Sleep for a little while.
 | |
|                     time.sleep(1)
 | |
|                     # 2) ... and this line deadlocks. (see below for 1)
 | |
|                     Reporter.objects.exclude(id=1).update(id=2)
 | |
|             finally:
 | |
|                 # This is the thread-local connection, not the main connection.
 | |
|                 connection.close()
 | |
| 
 | |
|         other_thread = threading.Thread(target=other_thread)
 | |
|         other_thread.start()
 | |
|         other_thread_ready.wait()
 | |
| 
 | |
|         with six.assertRaisesRegex(self, OperationalError, 'Deadlock found'):
 | |
|             # Double atomic to enter a transaction and create a savepoint.
 | |
|             with transaction.atomic():
 | |
|                 with transaction.atomic():
 | |
|                     # 1) This line locks... (see above for 2)
 | |
|                     Reporter.objects.create(id=1, first_name="Tintin")
 | |
| 
 | |
|         other_thread.join()
 | |
| 
 | |
| 
 | |
| class AtomicMiscTests(TransactionTestCase):
 | |
| 
 | |
|     available_apps = []
 | |
| 
 | |
|     def test_wrap_callable_instance(self):
 | |
|         # Regression test for #20028
 | |
|         class Callable(object):
 | |
|             def __call__(self):
 | |
|                 pass
 | |
|         # Must not raise an exception
 | |
|         transaction.atomic(Callable())
 | |
| 
 | |
| 
 | |
| class TransactionTests(IgnorePendingDeprecationWarningsMixin, TransactionTestCase):
 | |
| 
 | |
|     available_apps = ['transactions']
 | |
| 
 | |
|     def create_a_reporter_then_fail(self, first, last):
 | |
|         a = Reporter(first_name=first, last_name=last)
 | |
|         a.save()
 | |
|         raise Exception("I meant to do that")
 | |
| 
 | |
|     def remove_a_reporter(self, first_name):
 | |
|         r = Reporter.objects.get(first_name="Alice")
 | |
|         r.delete()
 | |
| 
 | |
|     def manually_managed(self):
 | |
|         r = Reporter(first_name="Dirk", last_name="Gently")
 | |
|         r.save()
 | |
|         transaction.commit()
 | |
| 
 | |
|     def manually_managed_mistake(self):
 | |
|         r = Reporter(first_name="Edward", last_name="Woodward")
 | |
|         r.save()
 | |
|         # Oops, I forgot to commit/rollback!
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_autocommit(self):
 | |
|         """
 | |
|         The default behavior is to autocommit after each save() action.
 | |
|         """
 | |
|         self.assertRaises(Exception,
 | |
|             self.create_a_reporter_then_fail,
 | |
|             "Alice", "Smith"
 | |
|         )
 | |
| 
 | |
|         # The object created before the exception still exists
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_autocommit_decorator(self):
 | |
|         """
 | |
|         The autocommit decorator works exactly the same as the default behavior.
 | |
|         """
 | |
|         autocomitted_create_then_fail = transaction.autocommit(
 | |
|             self.create_a_reporter_then_fail
 | |
|         )
 | |
|         self.assertRaises(Exception,
 | |
|             autocomitted_create_then_fail,
 | |
|             "Alice", "Smith"
 | |
|         )
 | |
|         # Again, the object created before the exception still exists
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_autocommit_decorator_with_using(self):
 | |
|         """
 | |
|         The autocommit decorator also works with a using argument.
 | |
|         """
 | |
|         autocomitted_create_then_fail = transaction.autocommit(using='default')(
 | |
|             self.create_a_reporter_then_fail
 | |
|         )
 | |
|         self.assertRaises(Exception,
 | |
|             autocomitted_create_then_fail,
 | |
|             "Alice", "Smith"
 | |
|         )
 | |
|         # Again, the object created before the exception still exists
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success(self):
 | |
|         """
 | |
|         With the commit_on_success decorator, the transaction is only committed
 | |
|         if the function doesn't throw an exception.
 | |
|         """
 | |
|         committed_on_success = transaction.commit_on_success(
 | |
|             self.create_a_reporter_then_fail)
 | |
|         self.assertRaises(Exception, committed_on_success, "Dirk", "Gently")
 | |
|         # This time the object never got saved
 | |
|         self.assertEqual(Reporter.objects.count(), 0)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success_with_using(self):
 | |
|         """
 | |
|         The commit_on_success decorator also works with a using argument.
 | |
|         """
 | |
|         using_committed_on_success = transaction.commit_on_success(using='default')(
 | |
|             self.create_a_reporter_then_fail
 | |
|         )
 | |
|         self.assertRaises(Exception,
 | |
|             using_committed_on_success,
 | |
|             "Dirk", "Gently"
 | |
|         )
 | |
|         # This time the object never got saved
 | |
|         self.assertEqual(Reporter.objects.count(), 0)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success_succeed(self):
 | |
|         """
 | |
|         If there aren't any exceptions, the data will get saved.
 | |
|         """
 | |
|         Reporter.objects.create(first_name="Alice", last_name="Smith")
 | |
|         remove_comitted_on_success = transaction.commit_on_success(
 | |
|             self.remove_a_reporter
 | |
|         )
 | |
|         remove_comitted_on_success("Alice")
 | |
|         self.assertEqual(list(Reporter.objects.all()), [])
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success_exit(self):
 | |
|         @transaction.autocommit()
 | |
|         def gen_reporter():
 | |
|             @transaction.commit_on_success
 | |
|             def create_reporter():
 | |
|                 Reporter.objects.create(first_name="Bobby", last_name="Tables")
 | |
| 
 | |
|             create_reporter()
 | |
|             # Much more formal
 | |
|             r = Reporter.objects.get()
 | |
|             r.first_name = "Robert"
 | |
|             r.save()
 | |
| 
 | |
|         gen_reporter()
 | |
|         r = Reporter.objects.get()
 | |
|         self.assertEqual(r.first_name, "Robert")
 | |
| 
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_manually_managed(self):
 | |
|         """
 | |
|         You can manually manage transactions if you really want to, but you
 | |
|         have to remember to commit/rollback.
 | |
|         """
 | |
|         manually_managed = transaction.commit_manually(self.manually_managed)
 | |
|         manually_managed()
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_manually_managed_mistake(self):
 | |
|         """
 | |
|         If you forget, you'll get bad errors.
 | |
|         """
 | |
|         manually_managed_mistake = transaction.commit_manually(
 | |
|             self.manually_managed_mistake
 | |
|         )
 | |
|         self.assertRaises(transaction.TransactionManagementError,
 | |
|             manually_managed_mistake)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_manually_managed_with_using(self):
 | |
|         """
 | |
|         The commit_manually function also works with a using argument.
 | |
|         """
 | |
|         using_manually_managed_mistake = transaction.commit_manually(using='default')(
 | |
|             self.manually_managed_mistake
 | |
|         )
 | |
|         self.assertRaises(transaction.TransactionManagementError,
 | |
|             using_manually_managed_mistake
 | |
|         )
 | |
| 
 | |
| 
 | |
| class TransactionRollbackTests(IgnorePendingDeprecationWarningsMixin, TransactionTestCase):
 | |
| 
 | |
|     available_apps = ['transactions']
 | |
| 
 | |
|     def execute_bad_sql(self):
 | |
|         cursor = connection.cursor()
 | |
|         cursor.execute("INSERT INTO transactions_reporter (first_name, last_name) VALUES ('Douglas', 'Adams');")
 | |
| 
 | |
|     @skipUnlessDBFeature('requires_rollback_on_dirty_transaction')
 | |
|     def test_bad_sql(self):
 | |
|         """
 | |
|         Regression for #11900: If a function wrapped by commit_on_success
 | |
|         writes a transaction that can't be committed, that transaction should
 | |
|         be rolled back. The bug is only visible using the psycopg2 backend,
 | |
|         though the fix is generally a good idea.
 | |
|         """
 | |
|         execute_bad_sql = transaction.commit_on_success(self.execute_bad_sql)
 | |
|         self.assertRaises(IntegrityError, execute_bad_sql)
 | |
|         transaction.rollback()
 | |
| 
 | |
| class TransactionContextManagerTests(IgnorePendingDeprecationWarningsMixin, TransactionTestCase):
 | |
| 
 | |
|     available_apps = ['transactions']
 | |
| 
 | |
|     def create_reporter_and_fail(self):
 | |
|         Reporter.objects.create(first_name="Bob", last_name="Holtzman")
 | |
|         raise Exception
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_autocommit(self):
 | |
|         """
 | |
|         The default behavior is to autocommit after each save() action.
 | |
|         """
 | |
|         with self.assertRaises(Exception):
 | |
|             self.create_reporter_and_fail()
 | |
|         # The object created before the exception still exists
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_autocommit_context_manager(self):
 | |
|         """
 | |
|         The autocommit context manager works exactly the same as the default
 | |
|         behavior.
 | |
|         """
 | |
|         with self.assertRaises(Exception):
 | |
|             with transaction.autocommit():
 | |
|                 self.create_reporter_and_fail()
 | |
| 
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_autocommit_context_manager_with_using(self):
 | |
|         """
 | |
|         The autocommit context manager also works with a using argument.
 | |
|         """
 | |
|         with self.assertRaises(Exception):
 | |
|             with transaction.autocommit(using="default"):
 | |
|                 self.create_reporter_and_fail()
 | |
| 
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success(self):
 | |
|         """
 | |
|         With the commit_on_success context manager, the transaction is only
 | |
|         committed if the block doesn't throw an exception.
 | |
|         """
 | |
|         with self.assertRaises(Exception):
 | |
|             with transaction.commit_on_success():
 | |
|                 self.create_reporter_and_fail()
 | |
| 
 | |
|         self.assertEqual(Reporter.objects.count(), 0)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success_with_using(self):
 | |
|         """
 | |
|         The commit_on_success context manager also works with a using argument.
 | |
|         """
 | |
|         with self.assertRaises(Exception):
 | |
|             with transaction.commit_on_success(using="default"):
 | |
|                 self.create_reporter_and_fail()
 | |
| 
 | |
|         self.assertEqual(Reporter.objects.count(), 0)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success_succeed(self):
 | |
|         """
 | |
|         If there aren't any exceptions, the data will get saved.
 | |
|         """
 | |
|         Reporter.objects.create(first_name="Alice", last_name="Smith")
 | |
|         with transaction.commit_on_success():
 | |
|             Reporter.objects.filter(first_name="Alice").delete()
 | |
| 
 | |
|         self.assertQuerysetEqual(Reporter.objects.all(), [])
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_commit_on_success_exit(self):
 | |
|         with transaction.autocommit():
 | |
|             with transaction.commit_on_success():
 | |
|                 Reporter.objects.create(first_name="Bobby", last_name="Tables")
 | |
| 
 | |
|             # Much more formal
 | |
|             r = Reporter.objects.get()
 | |
|             r.first_name = "Robert"
 | |
|             r.save()
 | |
| 
 | |
|         r = Reporter.objects.get()
 | |
|         self.assertEqual(r.first_name, "Robert")
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_manually_managed(self):
 | |
|         """
 | |
|         You can manually manage transactions if you really want to, but you
 | |
|         have to remember to commit/rollback.
 | |
|         """
 | |
|         with transaction.commit_manually():
 | |
|             Reporter.objects.create(first_name="Libby", last_name="Holtzman")
 | |
|             transaction.commit()
 | |
|         self.assertEqual(Reporter.objects.count(), 1)
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_manually_managed_mistake(self):
 | |
|         """
 | |
|         If you forget, you'll get bad errors.
 | |
|         """
 | |
|         with self.assertRaises(transaction.TransactionManagementError):
 | |
|             with transaction.commit_manually():
 | |
|                 Reporter.objects.create(first_name="Scott", last_name="Browning")
 | |
| 
 | |
|     @skipUnlessDBFeature('supports_transactions')
 | |
|     def test_manually_managed_with_using(self):
 | |
|         """
 | |
|         The commit_manually function also works with a using argument.
 | |
|         """
 | |
|         with self.assertRaises(transaction.TransactionManagementError):
 | |
|             with transaction.commit_manually(using="default"):
 | |
|                 Reporter.objects.create(first_name="Walter", last_name="Cronkite")
 | |
| 
 | |
|     @skipUnlessDBFeature('requires_rollback_on_dirty_transaction')
 | |
|     def test_bad_sql(self):
 | |
|         """
 | |
|         Regression for #11900: If a block wrapped by commit_on_success
 | |
|         writes a transaction that can't be committed, that transaction should
 | |
|         be rolled back. The bug is only visible using the psycopg2 backend,
 | |
|         though the fix is generally a good idea.
 | |
|         """
 | |
|         with self.assertRaises(IntegrityError):
 | |
|             with transaction.commit_on_success():
 | |
|                 cursor = connection.cursor()
 | |
|                 cursor.execute("INSERT INTO transactions_reporter (first_name, last_name) VALUES ('Douglas', 'Adams');")
 | |
|         transaction.rollback()
 |