mirror of
				https://github.com/django/django.git
				synced 2025-10-26 23:26:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			746 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			746 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from math import ceil
 | |
| 
 | |
| from django.db import connection, models
 | |
| from django.db.models import ProtectedError, RestrictedError
 | |
| from django.db.models.deletion import Collector
 | |
| from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE
 | |
| from django.test import TestCase, skipIfDBFeature, skipUnlessDBFeature
 | |
| 
 | |
| from .models import (
 | |
|     B1, B2, B3, MR, A, Avatar, B, Base, Child, DeleteBottom, DeleteTop,
 | |
|     GenericB1, GenericB2, GenericDeleteBottom, HiddenUser, HiddenUserProfile,
 | |
|     M, M2MFrom, M2MTo, MRNull, Origin, P, Parent, R, RChild, RChildChild,
 | |
|     Referrer, S, T, User, create_a, get_default_r,
 | |
| )
 | |
| 
 | |
| 
 | |
class OnDeleteTests(TestCase):
    """Tests for what happens to referring rows when a referenced row is deleted.

    Most tests build an ``A`` instance with ``create_a(name)`` and then delete
    the object reachable through one of its relations, checking the effect of
    that relation's ``on_delete`` handler.
    """

    def setUp(self):
        # Primary key of the default ``R`` row, compared against in the
        # set-value / set-default tests below.
        self.DEFAULT = get_default_r()

    def test_auto(self):
        a = create_a('auto')
        a.auto.delete()
        self.assertFalse(A.objects.filter(name='auto').exists())

    def test_non_callable(self):
        """A non-callable ``on_delete`` is rejected with a TypeError."""
        msg = 'on_delete must be callable.'
        with self.assertRaisesMessage(TypeError, msg):
            models.ForeignKey('self', on_delete=None)
        with self.assertRaisesMessage(TypeError, msg):
            models.OneToOneField('self', on_delete=None)

    def test_auto_nullable(self):
        a = create_a('auto_nullable')
        a.auto_nullable.delete()
        self.assertFalse(A.objects.filter(name='auto_nullable').exists())

    def test_setvalue(self):
        a = create_a('setvalue')
        a.setvalue.delete()
        # Reload to observe the value written by the deletion.
        a = A.objects.get(pk=a.pk)
        self.assertEqual(self.DEFAULT, a.setvalue.pk)

    def test_setnull(self):
        a = create_a('setnull')
        a.setnull.delete()
        a = A.objects.get(pk=a.pk)
        self.assertIsNone(a.setnull)

    def test_setdefault(self):
        a = create_a('setdefault')
        a.setdefault.delete()
        a = A.objects.get(pk=a.pk)
        self.assertEqual(self.DEFAULT, a.setdefault.pk)

    def test_setdefault_none(self):
        a = create_a('setdefault_none')
        a.setdefault_none.delete()
        a = A.objects.get(pk=a.pk)
        self.assertIsNone(a.setdefault_none)

    def test_cascade(self):
        a = create_a('cascade')
        a.cascade.delete()
        self.assertFalse(A.objects.filter(name='cascade').exists())

    def test_cascade_nullable(self):
        a = create_a('cascade_nullable')
        a.cascade_nullable.delete()
        self.assertFalse(A.objects.filter(name='cascade_nullable').exists())

    def test_protect(self):
        a = create_a('protect')
        msg = (
            "Cannot delete some instances of model 'R' because they are "
            "referenced through protected foreign keys: 'A.protect'."
        )
        with self.assertRaisesMessage(ProtectedError, msg) as cm:
            a.protect.delete()
        self.assertEqual(cm.exception.protected_objects, {a})

    def test_protect_multiple(self):
        a = create_a('protect')
        b = B.objects.create(protect=a.protect)
        msg = (
            "Cannot delete some instances of model 'R' because they are "
            "referenced through protected foreign keys: 'A.protect', "
            "'B.protect'."
        )
        with self.assertRaisesMessage(ProtectedError, msg) as cm:
            a.protect.delete()
        self.assertEqual(cm.exception.protected_objects, {a, b})

    def test_protect_path(self):
        a = create_a('protect')
        a.protect.p = P.objects.create()
        a.protect.save()
        msg = (
            "Cannot delete some instances of model 'P' because they are "
            "referenced through protected foreign keys: 'R.p'."
        )
        with self.assertRaisesMessage(ProtectedError, msg) as cm:
            a.protect.p.delete()
        self.assertEqual(cm.exception.protected_objects, {a})

    def test_do_nothing(self):
        # Testing DO_NOTHING is a bit harder: It would raise IntegrityError for a normal model,
        # so we connect to pre_delete and set the fk to a known value.
        replacement_r = R.objects.create()

        def check_do_nothing(sender, **kwargs):
            obj = kwargs['instance']
            obj.donothing_set.update(donothing=replacement_r)

        models.signals.pre_delete.connect(check_do_nothing)
        # The receiver is connected for every sender; always disconnect it,
        # even when an assertion fails, so it can't leak into other tests.
        try:
            a = create_a('do_nothing')
            a.donothing.delete()
            a = A.objects.get(pk=a.pk)
            self.assertEqual(replacement_r, a.donothing)
        finally:
            models.signals.pre_delete.disconnect(check_do_nothing)

    def test_do_nothing_qscount(self):
        """
        A models.DO_NOTHING relation doesn't trigger a query.
        """
        b = Base.objects.create()
        with self.assertNumQueries(1):
            # RelToBase should not be queried.
            b.delete()
        self.assertEqual(Base.objects.count(), 0)

    def test_inheritance_cascade_up(self):
        child = RChild.objects.create()
        child.delete()
        self.assertFalse(R.objects.filter(pk=child.pk).exists())

    def test_inheritance_cascade_down(self):
        child = RChild.objects.create()
        parent = child.r_ptr
        parent.delete()
        self.assertFalse(RChild.objects.filter(pk=child.pk).exists())

    def test_cascade_from_child(self):
        a = create_a('child')
        a.child.delete()
        self.assertFalse(A.objects.filter(name='child').exists())
        self.assertFalse(R.objects.filter(pk=a.child_id).exists())

    def test_cascade_from_parent(self):
        a = create_a('child')
        R.objects.get(pk=a.child_id).delete()
        self.assertFalse(A.objects.filter(name='child').exists())
        self.assertFalse(RChild.objects.filter(pk=a.child_id).exists())

    def test_setnull_from_child(self):
        a = create_a('child_setnull')
        a.child_setnull.delete()
        self.assertFalse(R.objects.filter(pk=a.child_setnull_id).exists())

        a = A.objects.get(pk=a.pk)
        self.assertIsNone(a.child_setnull)

    def test_setnull_from_parent(self):
        a = create_a('child_setnull')
        R.objects.get(pk=a.child_setnull_id).delete()
        self.assertFalse(RChild.objects.filter(pk=a.child_setnull_id).exists())

        a = A.objects.get(pk=a.pk)
        self.assertIsNone(a.child_setnull)

    def test_o2o_setnull(self):
        a = create_a('o2o_setnull')
        a.o2o_setnull.delete()
        a = A.objects.get(pk=a.pk)
        self.assertIsNone(a.o2o_setnull)

    def test_restrict(self):
        a = create_a('restrict')
        msg = (
            "Cannot delete some instances of model 'R' because they are "
            "referenced through restricted foreign keys: 'A.restrict'."
        )
        with self.assertRaisesMessage(RestrictedError, msg) as cm:
            a.restrict.delete()
        self.assertEqual(cm.exception.restricted_objects, {a})

    def test_restrict_multiple(self):
        a = create_a('restrict')
        b3 = B3.objects.create(restrict=a.restrict)
        msg = (
            "Cannot delete some instances of model 'R' because they are "
            "referenced through restricted foreign keys: 'A.restrict', "
            "'B3.restrict'."
        )
        with self.assertRaisesMessage(RestrictedError, msg) as cm:
            a.restrict.delete()
        self.assertEqual(cm.exception.restricted_objects, {a, b3})

    def test_restrict_path_cascade_indirect(self):
        a = create_a('restrict')
        a.restrict.p = P.objects.create()
        a.restrict.save()
        msg = (
            "Cannot delete some instances of model 'P' because they are "
            "referenced through restricted foreign keys: 'A.restrict'."
        )
        with self.assertRaisesMessage(RestrictedError, msg) as cm:
            a.restrict.p.delete()
        self.assertEqual(cm.exception.restricted_objects, {a})
        # Object referenced also with CASCADE relationship can be deleted.
        a.cascade.p = a.restrict.p
        a.cascade.save()
        a.restrict.p.delete()
        self.assertFalse(A.objects.filter(name='restrict').exists())
        self.assertFalse(R.objects.filter(pk=a.restrict_id).exists())

    def test_restrict_path_cascade_direct(self):
        a = create_a('restrict')
        a.restrict.p = P.objects.create()
        a.restrict.save()
        a.cascade_p = a.restrict.p
        a.save()
        a.restrict.p.delete()
        self.assertFalse(A.objects.filter(name='restrict').exists())
        self.assertFalse(R.objects.filter(pk=a.restrict_id).exists())

    def test_restrict_path_cascade_indirect_diamond(self):
        delete_top = DeleteTop.objects.create()
        b1 = B1.objects.create(delete_top=delete_top)
        b2 = B2.objects.create(delete_top=delete_top)
        delete_bottom = DeleteBottom.objects.create(b1=b1, b2=b2)
        msg = (
            "Cannot delete some instances of model 'B1' because they are "
            "referenced through restricted foreign keys: 'DeleteBottom.b1'."
        )
        with self.assertRaisesMessage(RestrictedError, msg) as cm:
            b1.delete()
        self.assertEqual(cm.exception.restricted_objects, {delete_bottom})
        # The failed delete must not have removed anything.
        self.assertTrue(DeleteTop.objects.exists())
        self.assertTrue(B1.objects.exists())
        self.assertTrue(B2.objects.exists())
        self.assertTrue(DeleteBottom.objects.exists())
        # Object referenced also with CASCADE relationship can be deleted.
        delete_top.delete()
        self.assertFalse(DeleteTop.objects.exists())
        self.assertFalse(B1.objects.exists())
        self.assertFalse(B2.objects.exists())
        self.assertFalse(DeleteBottom.objects.exists())

    def test_restrict_gfk_no_fast_delete(self):
        delete_top = DeleteTop.objects.create()
        generic_b1 = GenericB1.objects.create(generic_delete_top=delete_top)
        generic_b2 = GenericB2.objects.create(generic_delete_top=delete_top)
        generic_delete_bottom = GenericDeleteBottom.objects.create(
            generic_b1=generic_b1,
            generic_b2=generic_b2,
        )
        msg = (
            "Cannot delete some instances of model 'GenericB1' because they "
            "are referenced through restricted foreign keys: "
            "'GenericDeleteBottom.generic_b1'."
        )
        with self.assertRaisesMessage(RestrictedError, msg) as cm:
            generic_b1.delete()
        self.assertEqual(cm.exception.restricted_objects, {generic_delete_bottom})
        # The failed delete must not have removed anything.
        self.assertTrue(DeleteTop.objects.exists())
        self.assertTrue(GenericB1.objects.exists())
        self.assertTrue(GenericB2.objects.exists())
        self.assertTrue(GenericDeleteBottom.objects.exists())
        # Object referenced also with CASCADE relationship can be deleted.
        delete_top.delete()
        self.assertFalse(DeleteTop.objects.exists())
        self.assertFalse(GenericB1.objects.exists())
        self.assertFalse(GenericB2.objects.exists())
        self.assertFalse(GenericDeleteBottom.objects.exists())
 | |
| 
 | |
| 
 | |
class DeletionTests(TestCase):
    """Tests for ``delete()`` itself: query counts, signals, and return values."""

    def test_sliced_queryset(self):
        msg = "Cannot use 'limit' or 'offset' with delete()."
        with self.assertRaisesMessage(TypeError, msg):
            M.objects.all()[0:5].delete()

    def test_pk_none(self):
        m = M()
        msg = "M object can't be deleted because its id attribute is set to None."
        with self.assertRaisesMessage(ValueError, msg):
            m.delete()

    def test_m2m(self):
        # Explicit through model (MR): deleting either side removes the row.
        m = M.objects.create()
        r = R.objects.create()
        MR.objects.create(m=m, r=r)
        r.delete()
        self.assertFalse(MR.objects.exists())

        r = R.objects.create()
        MR.objects.create(m=m, r=r)
        m.delete()
        self.assertFalse(MR.objects.exists())

        # Through model of the ``m2m`` field: same expectation.
        m = M.objects.create()
        r = R.objects.create()
        m.m2m.add(r)
        r.delete()
        through = M._meta.get_field('m2m').remote_field.through
        self.assertFalse(through.objects.exists())

        r = R.objects.create()
        m.m2m.add(r)
        m.delete()
        self.assertFalse(through.objects.exists())

        # MRNull: the through row itself survives the deletion of ``r``, but
        # ``m`` no longer reaches anything through it.
        m = M.objects.create()
        r = R.objects.create()
        MRNull.objects.create(m=m, r=r)
        r.delete()
        self.assertTrue(MRNull.objects.exists())
        self.assertFalse(m.m2m_through_null.exists())

    def test_bulk(self):
        s = S.objects.create(r=R.objects.create())
        for _ in range(2 * GET_ITERATOR_CHUNK_SIZE):
            T.objects.create(s=s)
        #   1 (select related `T` instances)
        # + 1 (select related `U` instances)
        # + 2 (delete `T` instances in batches)
        # + 1 (delete `s`)
        self.assertNumQueries(5, s.delete)
        self.assertFalse(S.objects.exists())

    def test_instance_update(self):
        deleted = []
        related_setnull_sets = []

        def pre_delete(sender, **kwargs):
            obj = kwargs['instance']
            deleted.append(obj)
            if isinstance(obj, R):
                related_setnull_sets.append([a.pk for a in obj.setnull_set.all()])

        models.signals.pre_delete.connect(pre_delete)
        # Always disconnect, even on failure, so the receiver can't leak into
        # other tests.
        try:
            a = create_a('update_setnull')
            a.setnull.delete()

            a = create_a('update_cascade')
            a.cascade.delete()

            # Every instance seen by pre_delete has had its pk cleared.
            for obj in deleted:
                self.assertIsNone(obj.pk)

            for pk_list in related_setnull_sets:
                for a in A.objects.filter(id__in=pk_list):
                    self.assertIsNone(a.setnull)
        finally:
            models.signals.pre_delete.disconnect(pre_delete)

    def test_deletion_order(self):
        pre_delete_order = []
        post_delete_order = []

        def log_pre_delete(sender, **kwargs):
            pre_delete_order.append((sender, kwargs['instance'].pk))

        def log_post_delete(sender, **kwargs):
            post_delete_order.append((sender, kwargs['instance'].pk))

        models.signals.post_delete.connect(log_post_delete)
        models.signals.pre_delete.connect(log_pre_delete)
        # Always disconnect both receivers, even when an assertion fails.
        try:
            r = R.objects.create(pk=1)
            s1 = S.objects.create(pk=1, r=r)
            s2 = S.objects.create(pk=2, r=r)
            T.objects.create(pk=1, s=s1)
            T.objects.create(pk=2, s=s2)
            RChild.objects.create(r_ptr=r)
            r.delete()
            self.assertEqual(
                pre_delete_order, [(T, 1), (T, 2), (RChild, 1), (S, 1), (S, 2), (R, 1)]
            )
            self.assertEqual(
                post_delete_order, [(T, 2), (T, 1), (RChild, 1), (S, 2), (S, 1), (R, 1)]
            )
        finally:
            models.signals.post_delete.disconnect(log_post_delete)
            models.signals.pre_delete.disconnect(log_pre_delete)

    def test_relational_post_delete_signals_happen_before_parent_object(self):
        deletions = []

        def log_post_delete(instance, **kwargs):
            # The referenced R row must still exist when S's post_delete fires.
            self.assertTrue(R.objects.filter(pk=instance.r_id))
            self.assertIs(type(instance), S)
            deletions.append(instance.id)

        r = R.objects.create(pk=1)
        S.objects.create(pk=1, r=r)

        models.signals.post_delete.connect(log_post_delete, sender=S)

        try:
            r.delete()
        finally:
            # The sender must match the one used in connect(); otherwise the
            # lookup misses and the receiver stays connected.
            models.signals.post_delete.disconnect(log_post_delete, sender=S)

        self.assertEqual(len(deletions), 1)
        self.assertEqual(deletions[0], 1)

    @skipUnlessDBFeature("can_defer_constraint_checks")
    def test_can_defer_constraint_checks(self):
        u = User.objects.create(
            avatar=Avatar.objects.create()
        )
        a = Avatar.objects.get(pk=u.avatar_id)
        # 1 query to find the users for the avatar.
        # 1 query to delete the user
        # 1 query to delete the avatar
        # The important thing is that when we can defer constraint checks there
        # is no need to do an UPDATE on User.avatar to null it out.

        # Attach a signal to make sure we will not do fast_deletes.
        calls = []

        def noop(*args, **kwargs):
            calls.append('')

        models.signals.post_delete.connect(noop, sender=User)
        try:
            self.assertNumQueries(3, a.delete)
            self.assertFalse(User.objects.exists())
            self.assertFalse(Avatar.objects.exists())
            self.assertEqual(len(calls), 1)
        finally:
            models.signals.post_delete.disconnect(noop, sender=User)

    @skipIfDBFeature("can_defer_constraint_checks")
    def test_cannot_defer_constraint_checks(self):
        u = User.objects.create(
            avatar=Avatar.objects.create()
        )
        # Attach a signal to make sure we will not do fast_deletes.
        calls = []

        def noop(*args, **kwargs):
            calls.append('')

        models.signals.post_delete.connect(noop, sender=User)
        try:
            a = Avatar.objects.get(pk=u.avatar_id)
            # The below doesn't make sense... Why do we need to null out
            # user.avatar if we are going to delete the user immediately after it,
            # and there are no more cascades.
            # 1 query to find the users for the avatar.
            # 1 query to delete the user
            # 1 query to null out user.avatar, because we can't defer the constraint
            # 1 query to delete the avatar
            self.assertNumQueries(4, a.delete)
            self.assertFalse(User.objects.exists())
            self.assertFalse(Avatar.objects.exists())
            self.assertEqual(len(calls), 1)
        finally:
            models.signals.post_delete.disconnect(noop, sender=User)

    def test_hidden_related(self):
        r = R.objects.create()
        h = HiddenUser.objects.create(r=r)
        HiddenUserProfile.objects.create(user=h)

        r.delete()
        self.assertEqual(HiddenUserProfile.objects.count(), 0)

    def test_large_delete(self):
        TEST_SIZE = 2000
        objs = [Avatar() for _ in range(TEST_SIZE)]
        Avatar.objects.bulk_create(objs)
        # Calculate the number of queries needed.
        batch_size = connection.ops.bulk_batch_size(['pk'], objs)
        # The related fetches are done in batches.
        batches = ceil(len(objs) / batch_size)
        # One query for Avatar.objects.all() and then one related fast delete for
        # each batch.
        fetches_to_mem = 1 + batches
        # The Avatar objects are going to be deleted in batches of GET_ITERATOR_CHUNK_SIZE
        queries = fetches_to_mem + TEST_SIZE // GET_ITERATOR_CHUNK_SIZE
        self.assertNumQueries(queries, Avatar.objects.all().delete)
        self.assertFalse(Avatar.objects.exists())

    def test_large_delete_related(self):
        TEST_SIZE = 2000
        s = S.objects.create(r=R.objects.create())
        for _ in range(TEST_SIZE):
            T.objects.create(s=s)

        # Clamp to at least 1 so the division below is always defined.
        batch_size = max(connection.ops.bulk_batch_size(['pk'], range(TEST_SIZE)), 1)

        # TEST_SIZE / batch_size (select related `T` instances)
        # + 1 (select related `U` instances)
        # + TEST_SIZE / GET_ITERATOR_CHUNK_SIZE (delete `T` instances in batches)
        # + 1 (delete `s`)
        expected_num_queries = ceil(TEST_SIZE / batch_size)
        expected_num_queries += ceil(TEST_SIZE / GET_ITERATOR_CHUNK_SIZE) + 2

        self.assertNumQueries(expected_num_queries, s.delete)
        self.assertFalse(S.objects.exists())
        self.assertFalse(T.objects.exists())

    def test_delete_with_keeping_parents(self):
        child = RChild.objects.create()
        parent_id = child.r_ptr_id
        child.delete(keep_parents=True)
        self.assertFalse(RChild.objects.filter(id=child.id).exists())
        self.assertTrue(R.objects.filter(id=parent_id).exists())

    def test_delete_with_keeping_parents_relationships(self):
        # One level of inheritance: rows referring to the kept parent survive.
        child = RChild.objects.create()
        parent_id = child.r_ptr_id
        parent_referent_id = S.objects.create(r=child.r_ptr).pk
        child.delete(keep_parents=True)
        self.assertFalse(RChild.objects.filter(id=child.id).exists())
        self.assertTrue(R.objects.filter(id=parent_id).exists())
        self.assertTrue(S.objects.filter(pk=parent_referent_id).exists())

        # Two levels of inheritance: both ancestors and the referrer survive.
        childchild = RChildChild.objects.create()
        parent_id = childchild.rchild_ptr.r_ptr_id
        child_id = childchild.rchild_ptr_id
        parent_referent_id = S.objects.create(r=childchild.rchild_ptr.r_ptr).pk
        childchild.delete(keep_parents=True)
        self.assertFalse(RChildChild.objects.filter(id=childchild.id).exists())
        self.assertTrue(RChild.objects.filter(id=child_id).exists())
        self.assertTrue(R.objects.filter(id=parent_id).exists())
        self.assertTrue(S.objects.filter(pk=parent_referent_id).exists())

    def test_queryset_delete_returns_num_rows(self):
        """
        QuerySet.delete() should return the number of deleted rows and a
        dictionary with the number of deletions for each object type.
        """
        Avatar.objects.bulk_create([Avatar(desc='a'), Avatar(desc='b'), Avatar(desc='c')])
        avatars_count = Avatar.objects.count()
        deleted, _ = Avatar.objects.all().delete()
        self.assertEqual(deleted, avatars_count)

        # more complex example with multiple object types
        r = R.objects.create()
        h1 = HiddenUser.objects.create(r=r)
        HiddenUser.objects.create(r=r)
        HiddenUserProfile.objects.create(user=h1)
        existed_objs = {
            R._meta.label: R.objects.count(),
            HiddenUser._meta.label: HiddenUser.objects.count(),
            HiddenUserProfile._meta.label: HiddenUserProfile.objects.count(),
        }
        deleted, deleted_objs = R.objects.all().delete()
        self.assertCountEqual(deleted_objs.keys(), existed_objs.keys())
        for k, v in existed_objs.items():
            self.assertEqual(deleted_objs[k], v)

    def test_model_delete_returns_num_rows(self):
        """
        Model.delete() should return the number of deleted rows and a
        dictionary with the number of deletions for each object type.
        """
        r = R.objects.create()
        h1 = HiddenUser.objects.create(r=r)
        h2 = HiddenUser.objects.create(r=r)
        HiddenUser.objects.create(r=r)
        HiddenUserProfile.objects.create(user=h1)
        HiddenUserProfile.objects.create(user=h2)
        m1 = M.objects.create()
        m2 = M.objects.create()
        MR.objects.create(r=r, m=m1)
        r.m_set.add(m1)
        r.m_set.add(m2)
        r.save()
        # Snapshot the per-model row counts before deleting.
        existed_objs = {
            R._meta.label: R.objects.count(),
            HiddenUser._meta.label: HiddenUser.objects.count(),
            MR._meta.label: MR.objects.count(),
            HiddenUserProfile._meta.label: HiddenUserProfile.objects.count(),
            M.m2m.through._meta.label: M.m2m.through.objects.count(),
        }
        deleted, deleted_objs = r.delete()
        self.assertEqual(deleted, sum(existed_objs.values()))
        self.assertCountEqual(deleted_objs.keys(), existed_objs.keys())
        for k, v in existed_objs.items():
            self.assertEqual(deleted_objs[k], v)

    def test_proxied_model_duplicate_queries(self):
        """
        #25685 - Deleting instances of a model with existing proxy
        classes should not issue multiple queries during cascade
        deletion of referring models.
        """
        avatar = Avatar.objects.create()
        # One query for the Avatar table and a second for the User one.
        with self.assertNumQueries(2):
            avatar.delete()

    def test_only_referenced_fields_selected(self):
        """
        Only referenced fields are selected during cascade deletion SELECT
        unless deletion signals are connected.
        """
        origin = Origin.objects.create()
        expected_sql = str(
            Referrer.objects.only(
                # Both fields are referenced by SecondReferrer.
                'id', 'unique_field',
            ).filter(origin__in=[origin]).query
        )
        with self.assertNumQueries(2) as ctx:
            origin.delete()
        self.assertEqual(ctx.captured_queries[0]['sql'], expected_sql)

        def receiver(instance, **kwargs):
            pass

        # All fields are selected if deletion signals are connected.
        for signal_name in ('pre_delete', 'post_delete'):
            with self.subTest(signal=signal_name):
                origin = Origin.objects.create()
                signal = getattr(models.signals, signal_name)
                signal.connect(receiver, sender=Referrer)
                # Disconnect before the next subtest even if this one fails,
                # so each signal is exercised in isolation.
                try:
                    with self.assertNumQueries(2) as ctx:
                        origin.delete()
                    self.assertIn(
                        connection.ops.quote_name('large_field'),
                        ctx.captured_queries[0]['sql'],
                    )
                finally:
                    signal.disconnect(receiver, sender=Referrer)
 | |
| 
 | |
| 
 | |
class FastDeleteTests(TestCase):
    """Tests pinning the number of queries issued when fast deletion applies."""

    def test_fast_delete_all(self):
        with self.assertNumQueries(1) as ctx:
            User.objects.all().delete()
        sql = ctx.captured_queries[0]['sql']
        # No subqueries is used when performing a full delete.
        self.assertNotIn('SELECT', sql)

    def test_fast_delete_fk(self):
        u = User.objects.create(
            avatar=Avatar.objects.create()
        )
        a = Avatar.objects.get(pk=u.avatar_id)
        # 1 query to fast-delete the user
        # 1 query to delete the avatar
        self.assertNumQueries(2, a.delete)
        self.assertFalse(User.objects.exists())
        self.assertFalse(Avatar.objects.exists())

    def test_fast_delete_m2m(self):
        t = M2MTo.objects.create()
        f = M2MFrom.objects.create()
        f.m2m.add(t)
        # 1 to delete f, 1 to fast-delete m2m for f
        self.assertNumQueries(2, f.delete)

    def test_fast_delete_revm2m(self):
        t = M2MTo.objects.create()
        f = M2MFrom.objects.create()
        f.m2m.add(t)
        # 1 to delete t, 1 to fast-delete t's m_set
        # Delete from the *target* side of the relation; deleting ``f`` here
        # would merely repeat test_fast_delete_m2m.
        self.assertNumQueries(2, t.delete)

    def test_fast_delete_qs(self):
        u1 = User.objects.create()
        u2 = User.objects.create()
        self.assertNumQueries(1, User.objects.filter(pk=u1.pk).delete)
        # Only the filtered user is gone.
        self.assertEqual(User.objects.count(), 1)
        self.assertTrue(User.objects.filter(pk=u2.pk).exists())

    def test_fast_delete_instance_set_pk_none(self):
        u = User.objects.create()
        # User can be fast-deleted.
        collector = Collector(using='default')
        self.assertTrue(collector.can_fast_delete(u))
        u.delete()
        self.assertIsNone(u.pk)

    def test_fast_delete_joined_qs(self):
        a = Avatar.objects.create(desc='a')
        User.objects.create(avatar=a)
        u2 = User.objects.create()
        self.assertNumQueries(1, User.objects.filter(avatar__desc='a').delete)
        # Only the user matched through the join is gone.
        self.assertEqual(User.objects.count(), 1)
        self.assertTrue(User.objects.filter(pk=u2.pk).exists())

    def test_fast_delete_inheritance(self):
        c = Child.objects.create()
        p = Parent.objects.create()
        # 1 for self, 1 for parent
        self.assertNumQueries(2, c.delete)
        self.assertFalse(Child.objects.exists())
        self.assertEqual(Parent.objects.count(), 1)
        self.assertEqual(Parent.objects.filter(pk=p.pk).count(), 1)
        # 1 for self delete, 1 for fast delete of empty "child" qs.
        self.assertNumQueries(2, p.delete)
        self.assertFalse(Parent.objects.exists())
        # 1 for self delete, 1 for fast delete of empty "child" qs.
        c = Child.objects.create()
        p = c.parent_ptr
        self.assertNumQueries(2, p.delete)
        self.assertFalse(Parent.objects.exists())
        self.assertFalse(Child.objects.exists())

    def test_fast_delete_large_batch(self):
        User.objects.bulk_create(User() for _ in range(2000))
        # No problems here - we aren't going to cascade, so we will fast
        # delete the objects in a single query.
        self.assertNumQueries(1, User.objects.all().delete)
        a = Avatar.objects.create(desc='a')
        User.objects.bulk_create(User(avatar=a) for _ in range(2000))
        # We don't hit parameter amount limits for a, so just one query for
        # that + fast delete of the related objs.
        self.assertNumQueries(2, a.delete)
        self.assertEqual(User.objects.count(), 0)

    def test_fast_delete_empty_no_update_can_self_select(self):
        """
        #25932 - Fast deleting on backends that don't have the
        `no_update_can_self_select` feature should work even if the specified
        filter doesn't match any row.
        """
        with self.assertNumQueries(1):
            self.assertEqual(
                User.objects.filter(avatar__desc='missing').delete(),
                (0, {}),
            )

    def test_fast_delete_combined_relationships(self):
        # The cascading fast-delete of SecondReferrer should be combined
        # in a single DELETE WHERE referrer_id OR unique_field.
        origin = Origin.objects.create()
        referer = Referrer.objects.create(origin=origin, unique_field=42)
        with self.assertNumQueries(2):
            referer.delete()

    def test_fast_delete_aggregation(self):
        # Fast-deleting when filtering against an aggregation result in
        # a single query containing a subquery.
        Base.objects.create()
        with self.assertNumQueries(1):
            self.assertEqual(
                Base.objects.annotate(
                    rels_count=models.Count('rels'),
                ).filter(rels_count=0).delete(),
                (1, {'delete.Base': 1}),
            )
        self.assertIs(Base.objects.exists(), False)
 |