Django's Tasks framework

New in Django 6.0.

For a web application, there's often more than just turning HTTP requests into HTTP responses. For some functionality, it may be beneficial to run code outside the request-response cycle.

That's where background Tasks come in.

Background Tasks offload work so that it runs outside the request-response cycle, elsewhere and potentially at a later time. This keeps requests fast, reduces latency, and improves the user experience. For example, a user shouldn't have to wait for an email to send before their page finishes loading.

Django's Tasks framework makes it easy to define and enqueue such work. It does not provide a worker mechanism to run Tasks. The actual execution must be handled by infrastructure outside Django, such as a separate process or service. You therefore need to evaluate and configure a Task backend that is capable of executing Tasks on that infrastructure.

Background Task fundamentals

When work needs to be done in the background, Django creates a Task, which is stored in the Queue Store. This Task contains all the metadata needed to execute it, as well as a unique identifier for Django to retrieve the result later.

A Worker will look at the Queue Store for new Tasks to run. When a new Task is added, a Worker claims the Task, executes it, and saves the status and result back to the Queue Store. These workers run outside the request-response lifecycle.
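The flow described above can be modelled in a few lines of plain Python. This is a conceptual sketch only: QueueStore, StoredTask, and run_worker_once are hypothetical names invented for this illustration and are not part of Django. A real backend would use durable storage and separate worker processes.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class StoredTask:
    """A record in the toy queue store (hypothetical, not a Django class)."""

    func: Callable[..., Any]
    args: tuple
    kwargs: dict
    # Unique identifier used to look the record up again later.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: str = "READY"
    return_value: Any = None


class QueueStore:
    """In-memory stand-in for the queue store."""

    def __init__(self):
        self.tasks = {}

    def enqueue(self, func, *args, **kwargs):
        # Store everything needed to run the work later and return its id.
        record = StoredTask(func=func, args=args, kwargs=kwargs)
        self.tasks[record.id] = record
        return record.id

    def claim_next(self):
        # Claim the first record that has not been started yet.
        for record in self.tasks.values():
            if record.status == "READY":
                record.status = "RUNNING"
                return record
        return None


def run_worker_once(store):
    """Claim one record, execute it, and save the status and result back."""
    record = store.claim_next()
    if record is None:
        return False
    try:
        record.return_value = record.func(*record.args, **record.kwargs)
        record.status = "SUCCESSFUL"
    except Exception:
        record.status = "FAILED"
    return True


store = QueueStore()
task_id = store.enqueue(pow, 2, 5)  # the "request" side only enqueues
run_worker_once(store)  # the "worker" side executes later
```

The enqueueing code only ever holds the identifier. The status and return value are read back from the store once the worker has finished.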

Configuring a Task backend

The Task backend determines how and where Tasks are stored for execution and how they are executed. Different Task backends have different characteristics and configuration options, which may impact the performance and reliability of your application. Django comes with built-in backends, but these are for development and testing only.

Django handles task definition, validation, queuing, and result handling, not execution, so production setups need a backend or worker process that actually runs queued work. Relevant options are listed in the Community Ecosystem page.

Task backends are configured using the TASKS setting in your settings file. Whilst most applications will only need a single backend, multiple are supported.
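As an illustration of configuring more than one backend, the sketch below registers both built-in backends under separate aliases. The "reserve" alias is an arbitrary name chosen for this example, and pairing these two backends is only meant for local experimentation, not production.

```python
# settings.py (sketch): two backend aliases side by side.
TASKS = {
    "default": {
        "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
    },
    # "reserve" is an arbitrary alias chosen for this example.
    "reserve": {
        "BACKEND": "django.tasks.backends.dummy.DummyBackend",
    },
}
```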

Immediate execution

This is the default backend if another is not specified in your settings file. The ImmediateBackend runs enqueued Tasks immediately, rather than in the background. This allows background Task functionality to be added to an application gradually, before the required infrastructure is available.

To use it, set BACKEND to "django.tasks.backends.immediate.ImmediateBackend":

TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}

The ImmediateBackend may also be useful in tests, where it removes the need to run a real background worker.

Dummy backend

The DummyBackend doesn't execute enqueued Tasks at all, instead storing results for later use. Task results will forever remain in the READY state.

This backend is not intended for use in production. It is provided as a convenience for development and testing.

To use it, set BACKEND to "django.tasks.backends.dummy.DummyBackend":

TASKS = {"default": {"BACKEND": "django.tasks.backends.dummy.DummyBackend"}}

The results for enqueued Tasks can be retrieved from the backend's results attribute:

>>> from django.tasks import default_task_backend
>>> my_task.enqueue()
>>> len(default_task_backend.results)
1

Stored results can be cleared using the clear() method:

>>> default_task_backend.clear()
>>> len(default_task_backend.results)
0

Third-party backends

As mentioned at the beginning of this section, Django includes backends suitable for development and testing only. Production systems should rely on backends that supply a worker process and durable queue implementation. To use an external Task backend with Django, use the Python import path as the BACKEND of the TASKS setting, like so:

TASKS = {
    "default": {
        "BACKEND": "path.to.backend",
    }
}

A Task backend is a class that inherits BaseTaskBackend. At a minimum, it must implement BaseTaskBackend.enqueue(). If you're building your own backend, you can use the built-in Task backends as reference implementations. You'll find the code in the django/tasks/backends/ directory of the Django source.

Async support

Django's support for asynchronous Task backends is still developing.

BaseTaskBackend has async variants of all base methods. By convention, the names of the asynchronous versions are prefixed with "a" (for example, aenqueue() is the async variant of enqueue()). The arguments for both variants are the same.

Retrieving backends

Backends can be retrieved using the task_backends connection handler:

from django.tasks import task_backends

task_backends["default"]  # The default backend
task_backends["reserve"]  # Another backend

The "default" backend is available as default_task_backend:

from django.tasks import default_task_backend

Defining Tasks

Tasks are defined using the django.tasks.task() decorator on a module-level function:

from django.core.mail import send_mail
from django.tasks import task


@task
def email_users(emails, subject, message):
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )

The return value of the decorator is a Task instance.

Task attributes can be customized via the @task decorator arguments:

from django.core.mail import send_mail
from django.tasks import task


@task(priority=2, queue_name="emails")
def email_users(emails, subject, message):
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )

By convention, Tasks are defined in a tasks.py file, but this is not enforced.

Task context

Sometimes, the running Task may need to know context about how it was enqueued, and how it is being executed. This can be accessed by taking a context argument, which is an instance of TaskContext.

To receive the Task context as an argument to your Task function, pass takes_context when defining it:

import logging
from django.core.mail import send_mail
from django.tasks import task


logger = logging.getLogger(__name__)


@task(takes_context=True)
def email_users(context, emails, subject, message):
    logger.debug(
        f"Attempt {context.attempt} to send user email. Task result id: {context.task_result.id}."
    )
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )

Modifying Tasks

Before enqueueing a Task, it may be necessary to modify some of its parameters, for example to give it a higher priority than it would normally have.

A Task instance cannot be modified directly. Instead, a modified instance can be created with the using() method, leaving the original as-is. For example:

>>> email_users.priority
0
>>> email_users.using(priority=10).priority
10
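The copy-on-modify behavior shown above can be pictured with a frozen dataclass. This is a conceptual sketch under that assumption, not Django's source: ToyTask is a hypothetical stand-in, and only the priority and queue_name attribute names are taken from this document.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class ToyTask:
    """Hypothetical stand-in for a Task; not Django's class."""

    priority: int = 0
    queue_name: str = "default"

    def using(self, **changes):
        # Return a modified copy; the original instance is left untouched.
        return replace(self, **changes)


original = ToyTask()
urgent = original.using(priority=10)

# Direct assignment on a frozen instance is rejected.
try:
    original.priority = 10
except FrozenInstanceError:
    direct_assignment_failed = True
else:
    direct_assignment_failed = False
```

Because using() returns a new instance, a one-off override such as a raised priority never leaks into other call sites that enqueue the same Task.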

Enqueueing Tasks

To add the Task to the queue store, so it will be executed, call the enqueue() method on it. If the Task takes arguments, these can be passed as-is. For example:

result = email_users.enqueue(
    emails=["user@example.com"],
    subject="You have a message",
    message="Hello there!",
)

This returns a TaskResult, which can be used to retrieve the result of the Task once it has finished executing.

To enqueue Tasks in an async context, aenqueue() is available as an async variant of enqueue().

Because both Task arguments and return values are serialized to JSON, they must be JSON-serializable:

>>> process_data.enqueue(datetime.now())
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable

Arguments must also be able to round-trip through a json.dumps()/ json.loads() cycle without changing type. For example, consider this Task:

@task()
def double_dictionary(key):
    return {key: key * 2}

With the ImmediateBackend configured as the default backend:

>>> result = double_dictionary.enqueue((1, 2, 3))
>>> result.status
FAILED
>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: unhashable type: 'list'

The double_dictionary Task fails because after the JSON round-trip the tuple (1, 2, 3) becomes the list [1, 2, 3], which cannot be used as a dictionary key.

In general, complex objects such as model instances, and built-in types such as datetime and tuple, cannot be used in Tasks without additional conversion.
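One way to catch these problems early is to check values against a JSON round-trip before enqueueing, and to convert unsupported types by hand. The following is a minimal sketch using only the standard library. The helper name survives_json_round_trip is hypothetical, and the ISO 8601 string conversion is one possible convention rather than something Django prescribes.

```python
import json
from datetime import datetime


def survives_json_round_trip(value):
    """Return True if value keeps its type and content after dumps()/loads()."""
    try:
        restored = json.loads(json.dumps(value))
    except TypeError:
        # Raised for types the json module cannot serialize, such as datetime.
        return False
    return type(restored) is type(value) and restored == value


# Convert unsupported types explicitly before enqueueing...
when = datetime(2025, 1, 2, 3, 4, 5)
payload = {"when": when.isoformat(), "ids": [1, 2, 3]}

# ...and convert them back inside the Task function.
restored = json.loads(json.dumps(payload))
restored_when = datetime.fromisoformat(restored["when"])
restored_ids = tuple(restored["ids"])
```

A tuple fails the check because it comes back as a list, while the converted payload passes unchanged.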

Transactions

For most backends, Tasks are run in a separate process, using a different database connection. If a Task is enqueued inside a transaction that has not yet committed, a worker could start processing it before the objects it depends on are visible to the worker's connection.

Consider this simplified example:

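from django.db import transaction
from django.tasks import task

# Thing is assumed to be a model defined elsewhere in your project.


@task
def my_task(thing_num):
    Thing.objects.get(num=thing_num)


with transaction.atomic():
    Thing.objects.create(num=1)
    # The Task is enqueued before the surrounding transaction commits.
    my_task.enqueue(thing_num=1)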

To prevent the scenario where my_task runs before the Thing is committed to the database, use transaction.on_commit(), binding all arguments to enqueue() via functools.partial():

from functools import partial

from django.db import transaction


with transaction.atomic():
    Thing.objects.create(num=1)
    transaction.on_commit(partial(my_task.enqueue, thing_num=1))

Task results

When enqueueing a Task, you receive a TaskResult. However, it is often useful to retrieve the result from somewhere else (for example, another request or another Task).

Each TaskResult has a unique id, which can be used to identify and retrieve the result once the code which enqueued the Task has finished.

The get_result() method can retrieve a result based on its id:

# Later, somewhere else...
result = email_users.get_result(result_id)

To retrieve a TaskResult, regardless of which kind of Task it was from, use the get_result() method on the backend:

from django.tasks import default_task_backend

result = default_task_backend.get_result(result_id)

To retrieve results in an async context, aget_result() is available as an async variant of get_result() on both the backend and Task.

Some backends, such as the built-in ImmediateBackend, do not support get_result(). Calling get_result() on these backends will raise NotImplementedError.

Updating results

A TaskResult contains the status of a Task's execution at the point it was retrieved. If the Task finishes after get_result() is called, it will not update.

To refresh the values, call the django.tasks.TaskResult.refresh() method:

>>> result.status
RUNNING
>>> result.refresh()  # or await result.arefresh()
>>> result.status
SUCCESSFUL
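Because a TaskResult does not update on its own, code that needs to wait for completion has to call refresh() repeatedly. The following is a minimal polling sketch, not part of Django: wait_until_finished and FakeResult are hypothetical names, and comparing status against the strings "SUCCESSFUL" and "FAILED" is an assumption based on the status names shown in this document.

```python
import time


def wait_until_finished(
    result, timeout=10.0, interval=0.5, finished_statuses=("SUCCESSFUL", "FAILED")
):
    """Refresh result until it reaches a finished status or the timeout expires.

    Returns True if the result finished in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        result.refresh()
        if result.status in finished_statuses:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class FakeResult:
    """Stand-in used only to exercise the helper; not a Django class."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)
        self.status = "READY"

    def refresh(self):
        # Advance to the next scripted status, or keep the last one.
        self.status = next(self._statuses, self.status)


fake = FakeResult(["RUNNING", "RUNNING", "SUCCESSFUL"])
finished = wait_until_finished(fake, timeout=1.0, interval=0.01)
```

Polling inside a request handler would block the response, so a helper like this is better suited to scripts, management commands, or tests.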

Return values

If your Task function returns something, it can be retrieved from the django.tasks.TaskResult.return_value attribute:

>>> result.status
SUCCESSFUL
>>> result.return_value
42

If the Task has not finished executing, or has failed, ValueError is raised.

>>> result.status
RUNNING
>>> result.return_value
Traceback (most recent call last):
...
ValueError: Task has not finished yet
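When a missing return value is an expected situation rather than a bug, the ValueError can be handled explicitly. The following is a small sketch: return_value_or and UnfinishedResult are hypothetical names, and the stand-in class only mimics the documented behavior of raising ValueError.

```python
def return_value_or(result, default=None):
    """Return result.return_value, or default if it is not available."""
    try:
        return result.return_value
    except ValueError:
        # Raised when the Task has not finished executing, or has failed.
        return default


class UnfinishedResult:
    """Stand-in that mimics a result whose Task is still running."""

    @property
    def return_value(self):
        raise ValueError("Task has not finished yet")


value = return_value_or(UnfinishedResult(), default="pending")
```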

Errors

If the Task doesn't succeed, and instead raises an exception, either as part of the Task or as part of running it, the exception and traceback are saved to the django.tasks.TaskResult.errors list.

Each entry in errors is a TaskError containing information about the error raised during execution:

>>> result.errors[0].exception_class
<class 'ValueError'>

Note that this is just the type of exception, and contains no other values. The traceback information is reduced to a string which you can use to help debugging:

>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable