From b8e6e1b43b0bccec178dcc6708c5c8295abfb2e5 Mon Sep 17 00:00:00 2001
From: Simon Charette <charette.s@gmail.com>
Date: Thu, 23 Jun 2016 11:52:14 -0400
Subject: [PATCH] Fixed #26500 -- Added SKIP LOCKED support to
 select_for_update().

Thanks Tim for the review.
---
 django/db/backends/base/features.py       |  1 +
 django/db/backends/base/operations.py     |  4 +-
 django/db/backends/oracle/features.py     |  1 +
 django/db/backends/postgresql/features.py |  5 ++
 django/db/models/query.py                 |  5 +-
 django/db/models/sql/compiler.py          | 11 +++--
 django/db/models/sql/query.py             |  2 +
 docs/ref/databases.txt                    |  6 +--
 docs/ref/models/querysets.txt             | 23 ++++++---
 docs/releases/1.11.txt                    |  7 ++-
 tests/select_for_update/tests.py          | 60 +++++++++++++++++++----
 11 files changed, 98 insertions(+), 27 deletions(-)

diff --git a/django/db/backends/base/features.py b/django/db/backends/base/features.py
index 484b272294..9bb0ed6848 100644
--- a/django/db/backends/base/features.py
+++ b/django/db/backends/base/features.py
@@ -36,6 +36,7 @@ class BaseDatabaseFeatures(object):
     allow_sliced_subqueries = True
     has_select_for_update = False
     has_select_for_update_nowait = False
+    has_select_for_update_skip_locked = False
 
     supports_select_related = True
 
diff --git a/django/db/backends/base/operations.py b/django/db/backends/base/operations.py
index 4e5814e58f..1ed11b178e 100644
--- a/django/db/backends/base/operations.py
+++ b/django/db/backends/base/operations.py
@@ -177,12 +177,14 @@ class BaseDatabaseOperations(object):
         """
         return []
 
-    def for_update_sql(self, nowait=False):
+    def for_update_sql(self, nowait=False, skip_locked=False):
         """
         Returns the FOR UPDATE SQL clause to lock rows for an update operation.
         """
         if nowait:
             return 'FOR UPDATE NOWAIT'
+        elif skip_locked:
+            return 'FOR UPDATE SKIP LOCKED'
         else:
             return 'FOR UPDATE'
 
diff --git a/django/db/backends/oracle/features.py b/django/db/backends/oracle/features.py
index 84e18cf9ba..956f7a051b 100644
--- a/django/db/backends/oracle/features.py
+++ b/django/db/backends/oracle/features.py
@@ -13,6 +13,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
     uses_savepoints = True
     has_select_for_update = True
     has_select_for_update_nowait = True
+    has_select_for_update_skip_locked = True
     can_return_id_from_insert = True
     allow_sliced_subqueries = False
     supports_subqueries_in_group_by = False
diff --git a/django/db/backends/postgresql/features.py b/django/db/backends/postgresql/features.py
index 218a7645de..a9e1c77480 100644
--- a/django/db/backends/postgresql/features.py
+++ b/django/db/backends/postgresql/features.py
@@ -1,5 +1,6 @@
 from django.db.backends.base.features import BaseDatabaseFeatures
 from django.db.utils import InterfaceError
+from django.utils.functional import cached_property
 
 
 class DatabaseFeatures(BaseDatabaseFeatures):
@@ -31,3 +32,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
     greatest_least_ignores_nulls = True
     can_clone_databases = True
     supports_temporal_subtraction = True
+
+    @cached_property
+    def has_select_for_update_skip_locked(self):
+        return self.connection.pg_version >= 90500
diff --git a/django/db/models/query.py b/django/db/models/query.py
index 07f1fc414f..0a0965fe47 100644
--- a/django/db/models/query.py
+++ b/django/db/models/query.py
@@ -835,15 +835,18 @@ class QuerySet(object):
         else:
             return self._filter_or_exclude(None, **filter_obj)
 
-    def select_for_update(self, nowait=False):
+    def select_for_update(self, nowait=False, skip_locked=False):
         """
         Returns a new QuerySet instance that will select objects with a
         FOR UPDATE lock.
         """
+        if nowait and skip_locked:
+            raise ValueError('The nowait option cannot be used with skip_locked.')
         obj = self._clone()
         obj._for_write = True
         obj.query.select_for_update = True
         obj.query.select_for_update_nowait = nowait
+        obj.query.select_for_update_skip_locked = skip_locked
         return obj
 
     def select_related(self, *fields):
diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py
index cedb140143..5eaac1fc74 100644
--- a/django/db/models/sql/compiler.py
+++ b/django/db/models/sql/compiler.py
@@ -445,13 +445,16 @@ class SQLCompiler(object):
                         "select_for_update cannot be used outside of a transaction."
                     )
 
-                # If we've been asked for a NOWAIT query but the backend does
-                # not support it, raise a DatabaseError otherwise we could get
-                # an unexpected deadlock.
                 nowait = self.query.select_for_update_nowait
+                skip_locked = self.query.select_for_update_skip_locked
+                # If we've been asked for a NOWAIT/SKIP LOCKED query but the
+                # backend does not support it, raise a DatabaseError otherwise
+                # we could get an unexpected deadlock.
                 if nowait and not self.connection.features.has_select_for_update_nowait:
                     raise DatabaseError('NOWAIT is not supported on this database backend.')
-                result.append(self.connection.ops.for_update_sql(nowait=nowait))
+                elif skip_locked and not self.connection.features.has_select_for_update_skip_locked:
+                    raise DatabaseError('SKIP LOCKED is not supported on this database backend.')
+                result.append(self.connection.ops.for_update_sql(nowait=nowait, skip_locked=skip_locked))
 
             return ' '.join(result), tuple(params)
         finally:
diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py
index d182242ce4..426087f8af 100644
--- a/django/db/models/sql/query.py
+++ b/django/db/models/sql/query.py
@@ -167,6 +167,7 @@ class Query(object):
         self.distinct_fields = []
         self.select_for_update = False
         self.select_for_update_nowait = False
+        self.select_for_update_skip_locked = False
 
         self.select_related = False
         # Arbitrary limit for select_related to prevents infinite recursion.
@@ -286,6 +287,7 @@ class Query(object):
         obj.distinct_fields = self.distinct_fields[:]
         obj.select_for_update = self.select_for_update
         obj.select_for_update_nowait = self.select_for_update_nowait
+        obj.select_for_update_skip_locked = self.select_for_update_skip_locked
         obj.select_related = self.select_related
         obj.values_select = self.values_select[:]
         obj._annotations = self._annotations.copy() if self._annotations is not None else None
diff --git a/docs/ref/databases.txt b/docs/ref/databases.txt
index 35a3dbcbe8..983b7525c3 100644
--- a/docs/ref/databases.txt
+++ b/docs/ref/databases.txt
@@ -569,9 +569,9 @@ both MySQL and Django will attempt to convert the values from UTC to local time.
 Row locking with ``QuerySet.select_for_update()``
 -------------------------------------------------
 
-MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
-statement. If ``select_for_update()`` is used with ``nowait=True`` then a
-``DatabaseError`` will be raised.
+MySQL does not support the ``NOWAIT`` and ``SKIP LOCKED`` options to the
+``SELECT ... FOR UPDATE`` statement. If ``select_for_update()`` is used with
+``nowait=True`` or ``skip_locked=True`` then a ``DatabaseError`` will be raised.
 
 Automatic typecasting can cause unexpected results
 --------------------------------------------------
diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt
index 3f4fce1a26..8d2e3ebb3b 100644
--- a/docs/ref/models/querysets.txt
+++ b/docs/ref/models/querysets.txt
@@ -1528,7 +1528,7 @@ For example::
 ``select_for_update()``
 ~~~~~~~~~~~~~~~~~~~~~~~
 
-.. method:: select_for_update(nowait=False)
+.. method:: select_for_update(nowait=False, skip_locked=False)
 
 Returns a queryset that will lock rows until the end of the transaction,
 generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
@@ -1546,16 +1546,19 @@ selected rows, the query will block until the lock is released. If this is
 not the behavior you want, call ``select_for_update(nowait=True)``. This will
 make the call non-blocking. If a conflicting lock is already acquired by
 another transaction, :exc:`~django.db.DatabaseError` will be raised when the
-queryset is evaluated.
+queryset is evaluated. You can also ignore locked rows by using
+``select_for_update(skip_locked=True)`` instead. The ``nowait`` and
+``skip_locked`` are mutually exclusive and attempts to call
+``select_for_update()`` with both options enabled will result in a
+:exc:`ValueError`.
 
 Currently, the ``postgresql``, ``oracle``, and ``mysql`` database
-backends support ``select_for_update()``. However, MySQL has no support for the
-``nowait`` argument. Obviously, users of external third-party backends should
-check with their backend's documentation for specifics in those cases.
+backends support ``select_for_update()``. However, MySQL doesn't support the
+``nowait`` and ``skip_locked`` arguments.
 
-Passing ``nowait=True`` to ``select_for_update()`` using database backends that
-do not support ``nowait``, such as MySQL, will cause a
-:exc:`~django.db.DatabaseError` to be raised. This is in order to prevent code
+Passing ``nowait=True`` or ``skip_locked=True`` to ``select_for_update()``
+using database backends that do not support these options, such as MySQL, will
+cause a :exc:`~django.db.DatabaseError` to be raised. This prevents code from
 unexpectedly blocking.
 
 Evaluating a queryset with ``select_for_update()`` in autocommit mode on
@@ -1580,6 +1583,10 @@ raised if ``select_for_update()`` is used in autocommit mode.
     ``select_for_update()`` you should use
     :class:`~django.test.TransactionTestCase`.
 
+.. versionchanged:: 1.11
+
+    The ``skip_locked`` argument was added.
+
 ``raw()``
 ~~~~~~~~~
 
diff --git a/docs/releases/1.11.txt b/docs/releases/1.11.txt
index 1c7509b062..128ef2c203 100644
--- a/docs/releases/1.11.txt
+++ b/docs/releases/1.11.txt
@@ -150,7 +150,9 @@ CSRF
 Database backends
 ~~~~~~~~~~~~~~~~~
 
-* ...
+* Added the ``skip_locked`` argument to :meth:`.QuerySet.select_for_update()`
+  on PostgreSQL 9.5+ and Oracle to execute queries with
+  ``FOR UPDATE SKIP LOCKED``.
 
 Email
 ~~~~~
@@ -297,6 +299,9 @@ Database backend API
   support the :lookup:`time` lookup. It accepts a ``field_name`` and ``tzname``
   arguments and returns the SQL necessary to cast a datetime value to time value.
 
+* To enable ``FOR UPDATE SKIP LOCKED`` support, set
+  ``DatabaseFeatures.has_select_for_update_skip_locked = True``.
+
 Dropped support for PostgreSQL 9.2 and PostGIS 2.0
 --------------------------------------------------
 
diff --git a/tests/select_for_update/tests.py b/tests/select_for_update/tests.py
index 594dc0eb83..55ead7ef8f 100644
--- a/tests/select_for_update/tests.py
+++ b/tests/select_for_update/tests.py
@@ -52,10 +52,10 @@ class SelectForUpdateTests(TransactionTestCase):
         self.new_connection.rollback()
         self.new_connection.set_autocommit(True)
 
-    def has_for_update_sql(self, queries, nowait=False):
+    def has_for_update_sql(self, queries, **kwargs):
         # Examine the SQL that was executed to determine whether it
         # contains the 'SELECT..FOR UPDATE' stanza.
-        for_update_sql = connection.ops.for_update_sql(nowait)
+        for_update_sql = connection.ops.for_update_sql(**kwargs)
         return any(for_update_sql in query['sql'] for query in queries)
 
     @skipUnlessDBFeature('has_select_for_update')
@@ -78,6 +78,16 @@ class SelectForUpdateTests(TransactionTestCase):
             list(Person.objects.all().select_for_update(nowait=True))
         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True))
 
+    @skipUnlessDBFeature('has_select_for_update_skip_locked')
+    def test_for_update_sql_generated_skip_locked(self):
+        """
+        Test that the backend's FOR UPDATE SKIP LOCKED variant appears in
+        generated SQL when select_for_update is invoked.
+        """
+        with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
+            list(Person.objects.all().select_for_update(skip_locked=True))
+        self.assertTrue(self.has_for_update_sql(ctx.captured_queries, skip_locked=True))
+
     @skipUnlessDBFeature('has_select_for_update_nowait')
     def test_nowait_raises_error_on_block(self):
         """
@@ -99,6 +109,25 @@ class SelectForUpdateTests(TransactionTestCase):
         self.end_blocking_transaction()
         self.assertIsInstance(status[-1], DatabaseError)
 
+    @skipUnlessDBFeature('has_select_for_update_skip_locked')
+    def test_skip_locked_skips_locked_rows(self):
+        """
+        If skip_locked is specified, the locked row is skipped resulting in
+        Person.DoesNotExist.
+        """
+        self.start_blocking_transaction()
+        status = []
+        thread = threading.Thread(
+            target=self.run_select_for_update,
+            args=(status,),
+            kwargs={'skip_locked': True},
+        )
+        thread.start()
+        time.sleep(1)
+        thread.join()
+        self.end_blocking_transaction()
+        self.assertIsInstance(status[-1], Person.DoesNotExist)
+
     @skipIfDBFeature('has_select_for_update_nowait')
     @skipUnlessDBFeature('has_select_for_update')
     def test_unsupported_nowait_raises_error(self):
@@ -110,6 +139,17 @@ class SelectForUpdateTests(TransactionTestCase):
         with self.assertRaises(DatabaseError):
             list(Person.objects.all().select_for_update(nowait=True))
 
+    @skipIfDBFeature('has_select_for_update_skip_locked')
+    @skipUnlessDBFeature('has_select_for_update')
+    def test_unsupported_skip_locked_raises_error(self):
+        """
+        DatabaseError is raised if a SELECT...FOR UPDATE SKIP LOCKED is run on
+        a database backend that supports FOR UPDATE but not SKIP LOCKED.
+        """
+        with self.assertRaisesMessage(DatabaseError, 'SKIP LOCKED is not supported on this database backend.'):
+            with transaction.atomic():
+                Person.objects.select_for_update(skip_locked=True).get()
+
     @skipUnlessDBFeature('has_select_for_update')
     def test_for_update_requires_transaction(self):
         """
@@ -130,7 +170,7 @@ class SelectForUpdateTests(TransactionTestCase):
         with self.assertRaises(transaction.TransactionManagementError):
             list(people)
 
-    def run_select_for_update(self, status, nowait=False):
+    def run_select_for_update(self, status, **kwargs):
         """
         Utility method that runs a SELECT FOR UPDATE against all
         Person instances. After the select_for_update, it attempts
@@ -143,12 +183,10 @@ class SelectForUpdateTests(TransactionTestCase):
             # We need to enter transaction management again, as this is done on
             # per-thread basis
             with transaction.atomic():
-                people = list(
-                    Person.objects.all().select_for_update(nowait=nowait)
-                )
-                people[0].name = 'Fred'
-                people[0].save()
-        except DatabaseError as e:
+                person = Person.objects.select_for_update(**kwargs).get()
+                person.name = 'Fred'
+                person.save()
+        except (DatabaseError, Person.DoesNotExist) as e:
             status.append(e)
         finally:
             # This method is run in a separate thread. It uses its own
@@ -248,3 +286,7 @@ class SelectForUpdateTests(TransactionTestCase):
         with transaction.atomic():
             person = Person.objects.select_for_update().get(name='Reinhardt')
         self.assertEqual(person.name, 'Reinhardt')
+
+    def test_nowait_and_skip_locked(self):
+        with self.assertRaisesMessage(ValueError, 'The nowait option cannot be used with skip_locked.'):
+            Person.objects.select_for_update(nowait=True, skip_locked=True)