7. Create & Use Celery Tasks
Celery tasks are functions that can run either (1) as normal, blocking Python code or (2) as delayed, non-blocking work.
Normal, blocking execution
```python
import time


def hello_world(num=1):
    time.sleep(num)
    print(f"hello world {num}")


hello_world(num=1)
hello_world(num=2)
hello_world(num=3)
```
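You probably know blocking execution well by now, and the function above is an example of it: each call has to finish before the next one starts, so the three calls take about 1 + 2 + 3 = 6 seconds in total.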
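To see the blocking behaviour concretely, here is a small timing sketch that uses only the standard library. It reuses `hello_world` with a hypothetical `unit` parameter (not in the original) that scales the sleep so the demo finishes quickly. Because each call waits for the previous one, the total elapsed time is at least the sum of all the sleeps.

```python
import time


def hello_world(num=1, unit=1.0):
    # `unit` is a hypothetical scaling factor added so the demo runs fast
    time.sleep(num * unit)
    print(f"hello world {num}")


def run_blocking(unit=0.05):
    """Call hello_world three times in a row and return the elapsed seconds."""
    start = time.perf_counter()
    for num in (1, 2, 3):
        hello_world(num=num, unit=unit)  # each call blocks until it finishes
    return time.perf_counter() - start


elapsed = run_blocking()
print(f"elapsed: {elapsed:.2f}s")  # at least (1 + 2 + 3) * 0.05 = 0.3 seconds
```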
Delayed non-blocking execution
```python
import time

# shared_task is ideal for Django
from celery import shared_task


@shared_task
def hello_world(num=1):
    time.sleep(num)
    print(f"hello world {num}")


hello_world.delay(num=1)
hello_world.delay(num=2)
hello_world.delay(num=3)
hello_world.apply_async(kwargs={'num': 4}, countdown=10)
```
Here are the important things to note about the code above:

- `@shared_task`: this decorator registers the function so Celery knows about the task. We'll have a real example below.
- `.delay()`: since the `@shared_task` decorator is applied to the `hello_world` function, we can call this method. This is how we "delay" the execution of the function: it runs as soon as a Celery worker can pick it up (we'll implement this more later).
- `.apply_async()`: `.delay()` is actually a shortcut to `apply_async`. We have a lot more control over execution using `apply_async`, such as the `countdown=10` above, which runs the task roughly 10 seconds from now.
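The relationship between `.delay()` and `.apply_async()` can be modelled with a toy class. `ToyTask` is hypothetical and is not Celery's implementation; it only records the message that would be sent, to show that `delay(*args, **kwargs)` forwards to `apply_async(args, kwargs)` and that execution options such as `countdown` are only reachable through `apply_async`.

```python
class ToyTask:
    """Hypothetical stand-in that mimics how a task's delay() forwards to apply_async()."""

    def __init__(self, func):
        self.func = func
        self.sent = []  # records what would have been sent to the broker

    def apply_async(self, args=None, kwargs=None, **options):
        # Real Celery would serialize a message and send it to the broker;
        # here we only record it. `options` stands in for countdown, eta, expires, ...
        message = {"args": tuple(args or ()), "kwargs": dict(kwargs or {}), "options": options}
        self.sent.append(message)
        return message

    def delay(self, *args, **kwargs):
        # The "star arguments shortcut": every keyword becomes a task kwarg,
        # so there is no way to pass execution options through delay()
        return self.apply_async(args, kwargs)


def hello_world(num=1):
    print(f"hello world {num}")


task = ToyTask(hello_world)
task.delay(num=1)
task.apply_async(kwargs={"num": 4}, countdown=10)
print(task.sent)
```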
7.1. Call Tasks Cheat Sheet
These examples are taken directly from the Celery docs:
- `T.delay(arg, kwarg=value)`: star arguments shortcut to `.apply_async` (`.delay(*args, **kwargs)` calls `.apply_async(args, kwargs)`).
- `T.apply_async((arg,), {'kwarg': value})`
- `T.apply_async(countdown=10)`: executes in 10 seconds from now.
- `T.apply_async(eta=now + timedelta(seconds=10))`: executes in 10 seconds from now, specified using `eta`.
- `T.apply_async(countdown=60, expires=120)`: executes in one minute from now, but expires after 2 minutes.
- `T.apply_async(expires=now + timedelta(days=2))`: expires in 2 days, set using `datetime`.
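The `eta` and `expires` entries above rely on a `now` datetime plus a `timedelta`. A minimal sketch of building those values, assuming a timezone-aware UTC "now"; `schedule_window` is a hypothetical helper, not part of Celery.

```python
from datetime import datetime, timedelta, timezone


def schedule_window(countdown_seconds, expires_seconds, now=None):
    """Return (eta, expires) datetimes equivalent to countdown/expires in seconds."""
    # Use a timezone-aware "now" so the datetimes are unambiguous
    if now is None:
        now = datetime.now(timezone.utc)
    eta = now + timedelta(seconds=countdown_seconds)
    expires = now + timedelta(seconds=expires_seconds)
    return eta, expires


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
eta, expires = schedule_window(60, 120, now=now)
print(eta.isoformat())      # 2024-01-01T12:01:00+00:00
print(expires.isoformat())  # 2024-01-01T12:02:00+00:00
# With Celery these could be passed as T.apply_async(eta=eta, expires=expires),
# roughly equivalent to T.apply_async(countdown=60, expires=120) from the cheat sheet.
```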
7.2. Stocks app
In Create Django App, we started our `stocks` app with the following:

```bash
python manage.py startapp stocks
```
Do this now if you haven’t already.
7.3. stocks/tasks.py
Create a `tasks.py` file in your `stocks` app. When Celery is integrated with Django (typically with `app.autodiscover_tasks()` in your Celery configuration), each app's `tasks.py` is the default place Celery looks for the tasks your worker will be able to run.
```python
from celery import shared_task

from .scraper import StockTickerScraper


@shared_task
def perform_scrape(ticker, service='echo'):
    # Pass the task's arguments through instead of hard-coding the service
    name, price = StockTickerScraper(service=service, ticker=ticker).scrape()
    print(name, price)
    return name, price
```
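One detail about `perform_scrape` returning `name, price`: Celery's default serializer is JSON, and JSON has no tuple type, so when the result round-trips through a result backend you should expect a list back rather than a tuple. The sketch below reproduces that round trip with the standard `json` module only; `fake_scrape_result` and its values are hypothetical placeholders, not real scraper output.

```python
import json


def fake_scrape_result():
    # Hypothetical placeholder values standing in for StockTickerScraper(...).scrape()
    name, price = "Apple Inc.", 123.45
    return name, price


original = fake_scrape_result()
payload = json.dumps(original)   # what a JSON serializer would store
restored = json.loads(payload)   # what you would get back later

print(type(original).__name__, original)  # tuple ('Apple Inc.', 123.45)
print(type(restored).__name__, restored)  # list ['Apple Inc.', 123.45]

# Unpacking works the same either way
name, price = restored
```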
7.4. Start the Celery Worker

```bash
celery -A time_tasks worker --beat --scheduler django --loglevel=info
```
7.5. Test our perform_scrape task
Open a new command line (i.e., Terminal or PowerShell) so that the worker from Start the Celery Worker keeps running alongside what we'll do below.
```bash
python manage.py shell
```

Standard (blocking) execution:

```python
from stocks.tasks import perform_scrape

results = perform_scrape("aapl", service="echo")
print(results)
```

Delayed execution:

```python
results = perform_scrape.delay("GOOG", service="echo")
print(results)
```

`apply_async` delayed execution (`args` must be a tuple or list, hence the trailing comma):

```python
results = perform_scrape.apply_async(args=("GOOG",), kwargs={"service": "echo"})
print(results)
```
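A common slip with `apply_async` is writing `args=("GOOG")`: the parentheses do not create a tuple, so the value is just the string `"GOOG"`. Celery expects `args` to be a list or tuple, so such a call errors out rather than sending the task as intended. This stdlib-only sketch, using a hypothetical `perform_scrape_stub` with the same signature as the task, shows why the trailing comma matters: spread as positional arguments, the bare string becomes four one-character arguments.

```python
def perform_scrape_stub(ticker, service="echo"):
    # Hypothetical stand-in with the same signature as the perform_scrape task
    return ticker, service


good_args = ("GOOG",)  # one-element tuple: note the trailing comma
bad_args = ("GOOG")    # just the string "GOOG"; the parentheses do nothing

print(type(good_args).__name__, type(bad_args).__name__)  # tuple str
print(perform_scrape_stub(*good_args, service="echo"))    # ('GOOG', 'echo')

try:
    # The string is spread into 'G', 'O', 'O', 'G', which does not fit the signature
    perform_scrape_stub(*bad_args, service="echo")
    raised = False
except TypeError as exc:
    raised = True
    print("TypeError:", exc)
```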
Did `print(results)` work the way you thought it would? Probably not, at least for the delayed calls.

For the delayed execution calls, you will see an `AsyncResult` instance (printed as the task's id) instead of the result of the executed function. This is what allows the task to be run later, canceled later, or interrupted later. "Later" is the key word, because a delayed task is now run by the Celery worker, not by your shell.

If the worker isn't running, delayed execution still works; the tasks simply wait in the broker's queue and will not run until a worker is running again.
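The behaviour described above (a handle returned immediately, work queued until a worker picks it up) can be modelled with the standard library alone. This is a toy analogy, not how Celery is implemented: `queue.Queue` stands in for the broker, a thread stands in for the worker process, and the hypothetical `ToyResult` loosely mirrors Celery's `AsyncResult` (`id`, `ready()`, `get()`).

```python
import itertools
import queue
import threading


class ToyResult:
    """Hypothetical stand-in for the handle .delay() returns (Celery's is AsyncResult)."""

    def __init__(self, task_id):
        self.id = task_id
        self._done = threading.Event()
        self._value = None

    def ready(self):
        return self._done.is_set()

    def get(self, timeout=None):
        # Block until a worker has stored the result
        if not self._done.wait(timeout):
            raise TimeoutError(f"task {self.id} not finished")
        return self._value

    def __str__(self):
        return self.id


_ids = itertools.count(1)
broker = queue.Queue()  # stands in for the message broker


def delay(func, *args, **kwargs):
    """Queue the call and return a handle immediately, without running func."""
    result = ToyResult(f"task-{next(_ids)}")
    broker.put((func, args, kwargs, result))
    return result


def worker():
    """Process queued calls until a None sentinel arrives."""
    while True:
        item = broker.get()
        if item is None:
            break
        func, args, kwargs, result = item
        result._value = func(*args, **kwargs)
        result._done.set()


def scrape(ticker):
    # Hypothetical task body
    return ticker.upper(), 100.0


r = delay(scrape, "goog")
print(r, r.ready())  # task-1 False (no worker yet, the call just waits in the queue)

t = threading.Thread(target=worker)
t.start()
print(r.get(timeout=5))  # ('GOOG', 100.0)
broker.put(None)  # tell the worker to stop
t.join()
```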
7.6. So why not just use async / await?
For non-blocking execution, `async`/`await` might be a better option in Python and Django 3.1+, but that's not always the case. Delayed execution managed by Celery and, more importantly, by a worker process offloads all of the task's work to another process or even another server.
Here are a few good examples where delayed execution via Celery is ideal:
Web scraping (scheduled or merely delayed)
Machine learning (training, pre-processing, etc.)
Sending usage reports (or any other reports that take time to generate)
Here are a few good examples where `async`/`await` execution might be preferred:
Sending a confirmation email
Updating a shopping cart
Retrieving directions
Bulk retrieval of users to remove
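For short, I/O-bound jobs like these, `async`/`await` can overlap the waiting inside a single process, with no broker or worker involved. A minimal sketch using `asyncio` from the standard library; `send_confirmation_email` is hypothetical and only sleeps to simulate network latency, so treat it as an illustration of the concurrency model rather than a real mail client.

```python
import asyncio
import time


async def send_confirmation_email(address):
    # Hypothetical stand-in: asyncio.sleep simulates waiting on a mail server
    await asyncio.sleep(0.1)
    return f"sent to {address}"


async def main(addresses):
    # Start all sends at once; the waits overlap instead of running back to back
    return await asyncio.gather(*(send_confirmation_email(a) for a in addresses))


start = time.perf_counter()
results = asyncio.run(main(["a@example.com", "b@example.com", "c@example.com"]))
elapsed = time.perf_counter() - start
print(results)
print(f"elapsed: {elapsed:.2f}s")  # close to 0.1s rather than 0.3s, because the waits overlap
```

Contrast this with the blocking `hello_world` calls at the top of the chapter: there, three waits add up; here, they run concurrently. For CPU-heavy work such as model training, though, overlapping waits does not help, which is where offloading to a Celery worker pays off.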