Introduction
@shared_task enables you to create Celery tasks that are decoupled from the Celery app instance, making them portable across different projects or apps. It also eliminates the need to import the Celery app instance, reducing boilerplate code and simplifying task definitions.
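For example, a task module never has to import the app instance. A minimal sketch, assuming a Django-style project with an our_api app:

# our_api/tasks.py -- no "from proj.celery import app" required
from celery import shared_task


@shared_task
def ping():
    return "pong"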
Before defining tasks, consider the following:
- Task execution semantics: Is it safe for the task to run more than once, or must it run at most once?
- Retry policy: Does the task need to retry in case of failure? If so, how many times?
- Task result: Is storing the task’s result required?
- Task status: Do you need to track the task’s status externally?
- Task start information: Do you need to know when the task actually starts?
Answering these questions will guide you in choosing the appropriate parameters for the @shared_task decorator.
Task Naming Advice
Always explicitly name your tasks to avoid name clashes between different parts of the app or with other libraries. For example:
@shared_task(name="api_check_availability")
def check_availability():
    pass
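Without an explicit name, Celery derives the task name from the import path, so moving or renaming the module silently changes the name. A quick illustration (the module path shown is an assumption):

@shared_task
def check_availability():
    pass


# Celery auto-generates the name from the import path:
check_availability.name  # e.g. "our_api.tasks.check_availability"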
Importing Tasks
Tasks can be imported and called using the delay() or apply_async() methods:
from our_api.tasks import check_availability
check_availability.delay()
check_availability.apply_async()
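delay() is a convenient shortcut that only forwards task arguments, whereas apply_async() also accepts execution options. A short sketch (the queue name and countdown are illustrative):

# Run on a dedicated queue, 10 seconds from now
check_availability.apply_async(queue="api", countdown=10)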
To avoid circular import issues, especially when tasks are invoked from other apps, you can call the task by its name:
# In models.py
from celery import current_app
from django.db import models, transaction


class Post(models.Model):
    title = models.CharField(max_length=255)

    def generate_og_images(self):
        pass

    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        # Send the task by name: models.py never imports tasks.py,
        # so no circular import can occur.
        transaction.on_commit(
            lambda: current_app.send_task(
                "content_generate_og_images", kwargs={"post_pk": self.pk}
            )
        )
In this example, the task content_generate_og_images is sent by name to avoid circular imports.
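The receiving side might look like the following; a hypothetical counterpart, assuming the Post model above lives in a content app:

# In content/tasks.py
from celery import shared_task


@shared_task(name="content_generate_og_images")
def generate_og_images(post_pk):
    # Imported here so that tasks.py -> models.py is the only
    # import direction, keeping the dependency graph acyclic.
    from content.models import Post

    post = Post.objects.get(pk=post_pk)
    post.generate_og_images()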
Defining Tasks
The Simplest Shared Task
If the task result is not important and you want to avoid the overhead of the result backend, you can define the task like this:
@shared_task(
    name="simple_task",
    ignore_result=True
)
def simple_task():
    pass
Here, ignore_result=True prevents the task from writing to the result backend. Since bind defaults to False, the task has no reference to itself and therefore no retry strategy.
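If most tasks in a project behave this way, the same effect can be set globally through configuration instead of per task; a sketch, assuming the app instance lives in proj/celery.py:

# proj/celery.py
from celery import Celery

app = Celery("proj")
# Equivalent to ignore_result=True on every task; individual
# tasks can still opt back in with ignore_result=False.
app.conf.task_ignore_result = True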
Shared Task with Retry Policy
To implement a retry policy for tasks, such as calling an external API, you can use parameters like max_retries, autoretry_for, and retry_backoff:
import requests


@shared_task(
    name="read_from_external_api",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True
)
def read_from_external_api(self, url):
    result = requests.get(url)
    # Raise on HTTP error statuses so autoretry_for can catch them
    result.raise_for_status()
    return result.json()
This example retries the task up to 5 times when an exception is raised, using exponential backoff capped at 500 seconds (retry_backoff_max) with jitter (retry_jitter) to spread retries out. With acks_late=True, the message is only acknowledged after the task completes, so it is redelivered if the worker crashes mid-execution.
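Because bind=True gives the task a reference to itself, you can also retry manually for finer control. A hedged sketch (the task name and the 30-second fallback are illustrative):

import requests
from celery import shared_task


@shared_task(name="read_from_external_api_manual", bind=True, max_retries=5)
def read_from_external_api_manual(self, url):
    response = requests.get(url)
    if response.status_code == 429:
        # Honor the server's Retry-After header when rate limited;
        # fall back to 30 seconds if it is absent.
        delay = int(response.headers.get("Retry-After", 30))
        raise self.retry(countdown=delay)
    response.raise_for_status()
    return response.json()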
Shared Task That Must Run At Most Once
For tasks that should only run once (e.g., an expensive API call), you can define a task that acknowledges early to prevent re-execution if the worker fails:
import requests


@shared_task(
    name="expensive_api_call",
    bind=True,
    acks_late=False,
)
def expensive_api_call(self, url):
    result = requests.post(url, data={})
    return result.json()
In this case, acks_late=False (Celery's default) ensures that the message is acknowledged as soon as the worker receives it, before execution starts. If the worker fails mid-task, the broker will not redeliver the message, so the expensive call runs at most once.