1
0
mirror of https://github.com/django/django.git synced 2025-10-10 23:39:11 +00:00
django/django/tasks/__init__.py
Jake Howard 4289966d1b Fixed #35859 -- Added background Tasks framework interface.
This work implements what was defined in DEP 14
(https://github.com/django/deps/blob/main/accepted/0014-background-workers.rst).

Thanks to Raphael Gaschignard, Eric Holscher, Ran Benita, Sarah Boyce,
Jacob Walls, and Natalia Bidart for the reviews.
2025-09-16 17:28:32 -03:00

46 lines
1.2 KiB
Python

from django.utils.connection import BaseConnectionHandler, ConnectionProxy
from django.utils.module_loading import import_string
from . import checks, signals # NOQA
from .base import (
DEFAULT_TASK_BACKEND_ALIAS,
DEFAULT_TASK_QUEUE_NAME,
Task,
TaskContext,
TaskResult,
TaskResultStatus,
task,
)
from .exceptions import InvalidTaskBackend
# Names exported by ``from django.tasks import *``: the constants, classes, and
# ``task`` callable imported from ``.base`` above, plus the two module-level
# objects (``task_backends`` and ``default_task_backend``) assigned at the
# bottom of this module.
__all__ = [
"DEFAULT_TASK_BACKEND_ALIAS",
"DEFAULT_TASK_QUEUE_NAME",
"default_task_backend",
"task",
"task_backends",
"Task",
"TaskContext",
"TaskResult",
"TaskResultStatus",
]
class TaskBackendHandler(BaseConnectionHandler):
    """Handler that creates task backend instances from per-alias settings."""

    # Name of the setting that holds the per-alias backend configuration.
    settings_name = "TASKS"
    # Exception type associated with this handler.
    exception_class = InvalidTaskBackend

    def create_connection(self, alias):
        """
        Build and return the task backend configured for ``alias``.

        The alias's ``"BACKEND"`` entry is a dotted path that is imported to
        obtain the backend class, which is then called with the alias and the
        alias's full settings mapping. Raise ``InvalidTaskBackend`` (chained
        to the original error) if the dotted path cannot be imported.
        """
        config = self.settings[alias]
        backend = config["BACKEND"]
        try:
            backend_class = import_string(backend)
        except ImportError as e:
            raise InvalidTaskBackend(f"Could not find backend '{backend}': {e}") from e
        else:
            # Instantiation stays outside the ``try`` body so that errors
            # raised by the backend's own constructor propagate unchanged.
            return backend_class(alias=alias, params=config)
# Module-level TaskBackendHandler instance, created at import time.
task_backends = TaskBackendHandler()
# Proxy constructed from the handler above and the default backend alias.
# NOTE(review): presumably forwards attribute access to
# task_backends[DEFAULT_TASK_BACKEND_ALIAS] -- confirm against
# django.utils.connection.ConnectionProxy.
default_task_backend = ConnectionProxy(task_backends, DEFAULT_TASK_BACKEND_ALIAS)