7. Create & Use Celery Tasks

Celery tasks are functions that can run in one of two ways: (1) as normal, blocking Python calls or (2) as delayed, non-blocking calls that a worker executes later.

Normal, blocking execution

import time

def hello_world(num=1):
    time.sleep(num)
    print(f"hello world {num}")

    
hello_world(num=1)
hello_world(num=2)
hello_world(num=3)

I would imagine you know blocking execution really well by now. The above function is an example of that.
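
To make the cost of blocking calls visible, here is a minimal sketch that times the three calls back to back. The scale parameter is a hypothetical addition (not part of the original function) that shrinks the sleeps so the demo finishes quickly; the total should still come out to roughly the sum of the individual waits.

```python
import time


def hello_world(num=1, scale=0.01):
    # `scale` is a hypothetical knob added for this sketch so the demo
    # finishes quickly; the original function sleeps for `num` seconds.
    time.sleep(num * scale)
    return f"hello world {num}"


start = time.perf_counter()
# Each call has to finish before the next one can start.
messages = [hello_world(num=n) for n in (1, 2, 3)]
elapsed = time.perf_counter() - start

print(messages)
print(f"elapsed: {elapsed:.3f}s")  # roughly the sum of all three sleeps
```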

Delayed non-blocking execution

import time
# shared_task is ideal for Django
from celery import shared_task


@shared_task
def hello_world(num=1):
    time.sleep(num)
    print(f"hello world {num}")

    
hello_world.delay(num=1)
hello_world.delay(num=2)
hello_world.delay(num=3)
hello_world.apply_async(kwargs={'num': 4}, countdown=10)

Here are the important things to note about the above code:

  • @shared_task: this decorator registers the function as a Celery task so that workers know about it. We’ll have a real example below.

  • .delay(): because hello_world is wrapped by the @shared_task decorator, we can call this method on it. This is how we “delay” the execution of the function: the call is queued and runs as soon as a Celery worker can pick it up (we’ll implement this later).

  • .apply_async(): .delay() is actually a shortcut to .apply_async(). Calling .apply_async() directly gives us a lot more control over execution, such as the countdown=10 used above.
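
The relationship between the two methods can be sketched with a toy class. ToyTask is a hypothetical stand-in written for this example, not Celery's implementation: it only records what would be sent to the broker, which is enough to show that .delay() forwards its star arguments to .apply_async() and leaves no room for options such as countdown.

```python
class ToyTask:
    """Hypothetical stand-in for a Celery task; it only records calls."""

    def __init__(self, func):
        self.func = func
        self.sent = []  # messages that would go to the broker

    def apply_async(self, args=None, kwargs=None, **options):
        # Real Celery would serialize this and publish it to the broker.
        message = {
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        }
        self.sent.append(message)
        return message

    def delay(self, *args, **kwargs):
        # Star-argument shortcut: there is no room for execution options.
        return self.apply_async(args, kwargs)

    def __call__(self, *args, **kwargs):
        # Calling the task directly runs it right away, in this process.
        return self.func(*args, **kwargs)


@ToyTask
def hello_world(num=1):
    return f"hello world {num}"


hello_world.delay(num=1)
hello_world.apply_async(kwargs={"num": 4}, countdown=10)
print(hello_world.sent)
print(hello_world(num=2))
```

Only the second recorded message carries an option (countdown), which is the extra control .apply_async() offers.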

7.1. Call Tasks Cheat Sheet

These are pulled directly from the Celery docs:

  • T.delay(arg, kwarg=value): Star arguments shortcut to .apply_async. (.delay(*args, **kwargs) calls .apply_async(args, kwargs)).

  • T.apply_async((arg,), {'kwarg': value})

  • T.apply_async(countdown=10): executes in 10 seconds from now.

  • T.apply_async(eta=now + timedelta(seconds=10)): executes in 10 seconds from now, specified using eta

  • T.apply_async(countdown=60, expires=120): executes in one minute from now, but expires after 2 minutes.

  • T.apply_async(expires=now + timedelta(days=2)): expires in 2 days, set using datetime.
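
The cheat sheet uses now without defining it. As a small sketch, assuming a timezone-aware UTC datetime is wanted, the eta and expires values could be built like this. The final call is left as a comment because T, arg and value are placeholders from the cheat sheet rather than real names.

```python
from datetime import datetime, timedelta, timezone

# Timezone-aware "now"; an aware datetime avoids ambiguity about the zone.
now = datetime.now(timezone.utc)

eta = now + timedelta(seconds=10)  # earliest time the task may run
expires = now + timedelta(days=2)  # after this, a worker discards the task

options = {"eta": eta, "expires": expires}
print(options)

# With a real task T these would be passed straight through, e.g.:
# T.apply_async(args=(arg,), kwargs={"kwarg": value}, **options)
```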

7.2. Stocks app

In Create Django App, we started our stocks app with the following:

python manage.py startapp stocks

Do this now if you haven’t already.

7.3. stocks.tasks.py

Create a tasks.py file in your stocks app. When Celery is managed by Django, tasks.py is the default module Celery looks for in each app to find the tasks that your worker will be able to run.

from celery import shared_task

from .scraper import StockTickerScraper


@shared_task
def perform_scrape(ticker, service='echo'):
    # Pass the caller's service through instead of hard-coding 'echo'.
    name, price = StockTickerScraper(service=service, ticker=ticker).scrape()
    print(name, price)
    return name, price

7.4. Start the Celery Worker

celery -A time_tasks worker --beat --scheduler django --loglevel=info

7.5. Test our perform_scrape task

Open a new command line (i.e. terminal or PowerShell) so that the worker from Start the Celery Worker keeps running while we do the following.

python manage.py shell

standard/blocking

from stocks.tasks import perform_scrape

results = perform_scrape("aapl", service='echo')
print(results)

delayed execution

results = perform_scrape.delay("GOOG", service='echo')

print(results)

apply_async delayed execution

results = perform_scrape.apply_async(args=("GOOG",), kwargs={"service": "echo"})

print(results)

Did print(results) work the way you thought it would? I would assume not.

For the delayed execution calls, you will see an AsyncResult instance (printed as the task’s ID) instead of the result of the executed function. The primary reason for this is to allow the task to be run later, canceled later, or interrupted later. Later is the key word because the delayed execution task will now be run by the Celery worker, not by the shell you called it from.
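
Getting a handle back instead of a value is not unique to Celery. As a rough analogy using only the standard library (concurrent.futures is swapped in for Celery here, and perform_scrape is a hypothetical stand-in for the real task), submitting work returns a handle right away and the value has to be requested separately. With Celery the comparable request is results.get(), which needs a result backend to be configured.

```python
from concurrent.futures import ThreadPoolExecutor


def perform_scrape(ticker, service="echo"):
    # Hypothetical stand-in: the real task calls StockTickerScraper.
    return ticker.upper(), service


with ThreadPoolExecutor(max_workers=1) as pool:
    handle = pool.submit(perform_scrape, "goog", service="echo")
    print(handle)            # a Future object, not the returned tuple
    value = handle.result()  # blocks until the work has finished
    print(value)
```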

If the worker isn’t running, delayed execution still works: the calls are queued, and the tasks simply will not run until a worker is running again.
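
This queue-first behaviour can be sketched with the standard library alone. In the toy model below, queue.Queue stands in for the message broker and a thread stands in for the Celery worker (both are assumptions made for illustration): the three calls are queued while no worker exists and only run once one starts.

```python
import queue
import threading

broker = queue.Queue()  # stand-in for the message broker
finished = []           # what the worker has completed so far

# Enqueue three "tasks" while no worker is running.
for num in (1, 2, 3):
    broker.put({"task": "hello_world", "kwargs": {"num": num}})

waiting_before = broker.qsize()  # all three are still waiting
done_before = list(finished)     # nothing has run yet


def worker():
    # Drain the queue, then stop; a real Celery worker keeps listening.
    while True:
        try:
            message = broker.get_nowait()
        except queue.Empty:
            return
        finished.append(f"hello world {message['kwargs']['num']}")


thread = threading.Thread(target=worker)
thread.start()  # the "worker" comes online
thread.join()

print(waiting_before, done_before, finished)
```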

7.6. So why not just use async / await

For non-blocking execution, async / await might be a better option within Python & Django 3.1+, but that’s not always the case. Delayed execution managed by Celery hands all of the work for the task to a worker, which runs as a separate process or even on another server.
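
For contrast, here is a minimal async / await sketch of the same hello_world idea. The scale parameter is a hypothetical knob to keep the demo short. The three coroutines wait concurrently, but every one of them reports the same process ID: nothing has been offloaded, which is the part a Celery worker adds.

```python
import asyncio
import os
import time


async def hello_world(num=1, scale=0.01):
    # `scale` is a hypothetical knob so the sketch finishes quickly.
    await asyncio.sleep(num * scale)  # yields to the event loop while waiting
    return num, os.getpid()


async def main():
    # Run the three coroutines concurrently on one event loop.
    return await asyncio.gather(*(hello_world(n) for n in (1, 2, 3)))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.3f}s")  # typically closer to the longest wait than the sum
```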

Here are a few good examples where delayed execution via Celery is ideal:

  • Web scraping (both scheduled or merely delayed)

  • Machine learning (training, pre-processing, etc)

  • Sending usage reports (or really any other reports that require time to generate)

Here are a few good examples where async / await execution might be the preferred choice:

  • Sending a confirmation email

  • Updating a shopping cart

  • Retrieving directions

  • Bulk retrieval of users to remove