django-utils v0.3.0 documentation

Queue

A simple task queue and consumer to make processing tasks out-of-band painless. Ideal for sending email, checking items for spam, generating thumbnails, etc.

The Queue module is divided up into several components, but for almost all use-cases, you will only use the queue_command() decorator found in djutils.queue.decorators.

Executing tasks out-of-process

For the simple case, you need only write a function, decorate it with the queue_command() decorator, and call it as you normally would. Instead of executing immediately and potentially blocking, the call is enqueued and control returns to the caller right away.

### app/commands.py ###

from djutils.queue.decorators import queue_command

@queue_command
def churn_data(model_instance, some_data, another_value):
    # this function will do some expensive processing, and so
    # should happen outside the request/response cycle.

    important_results = model_instance.process_data(some_data, another_value)
    model_instance.propogate_results(important_results)

Here’s how you might call your function:

### app/views.py ###

from django.http import HttpResponse

from app.commands import churn_data

def data_processing_view(request, some_val, another_val):
    # assume we load up an object based on some parameter passed in
    # to the view.  also, the view gives us a payload of data.  we
    # want the object to churn that data in the background using the
    # function above:
    churn_data(my_object, request.POST['payload'], another_val)
    return HttpResponse('Churning in background task added to queue')

Whenever the view gets called, the function will be enqueued for execution. Meanwhile the QueueDaemon consumer will pick it up and execute it in a separate process.

When the consumer picks up the message, it will churn your data!
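To make the flow above concrete, here is a minimal, self-contained sketch of the enqueue-then-consume pattern. It is not djutils' implementation: the in-memory `pending` deque, the `registry` dict, the `enqueue_on_call` decorator, and `consume_one` are all hypothetical names standing in for the queue backend, queue_command(), and the consumer.

```python
import pickle
from collections import deque

# Hypothetical stand-ins: a real deployment uses a queue backend and a
# separate consumer process, not an in-memory deque.
pending = deque()
registry = {}


def enqueue_on_call(func):
    """Register func and return a wrapper that enqueues the call instead of running it."""
    registry[func.__name__] = func

    def wrapper(*args, **kwargs):
        # Serialize the function name and arguments; nothing executes here.
        pending.append(pickle.dumps((func.__name__, args, kwargs)))

    return wrapper


def consume_one():
    """Pop the oldest message and execute it; return None if the queue is empty."""
    if not pending:
        return None
    name, args, kwargs = pickle.loads(pending.popleft())
    # The lookup only works if the function was registered in this process.
    return registry[name](*args, **kwargs)


@enqueue_on_call
def add(a, b):
    return a + b


add(2, 3)               # returns immediately; the call is only enqueued
result = consume_one()  # the "consumer" runs the task later
```

The registry lookup in `consume_one` hints at why the consumer has to import your decorated functions before it can run them.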

Warning

Every argument you pass to a decorated function must be pickle-able. Anything that can be pickled is allowed.
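If you are unsure whether a value qualifies, a quick check with the standard-library pickle module can help. This is a sketch under the assumption that messages are serialized with pickle, as the warning implies; `is_picklable` is a hypothetical helper, not part of djutils.

```python
import pickle


def is_picklable(value):
    """Return True if value survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


ok = is_picklable({"id": 42, "tags": ["a", "b"]})
bad = is_picklable(lambda x: x)  # lambdas cannot be pickled
```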

Warning

Your decorated functions must be loaded into memory by the consumer. To ensure that this happens, it is good practice to put all queue_command() decorated functions in a module named commands.py so that autodiscovery will pick them up.

Executing tasks on a schedule

Sometimes it may be necessary to run a certain bit of code every so often, irrespective of some triggering event. If you’ve used the Linux crontab before, then you’re already familiar with the idea.

djutils provides two functions to help write periodic commands:

from djutils.queue.decorators import periodic_command, crontab

@periodic_command(crontab(hour='0', minute='0'))
def send_daily_digest():
    # send out a daily email at midnight
    pass

@periodic_command(crontab(day_of_week='0', hour='5,17', minute='0'))
def send_sunday_editions():
    # send out an email every sunday, once at 5am, once at 5pm
    pass

Remember to put any periodic commands you write in a file named commands.py to ensure that they’re picked up by the consumer.

Warning

Functions decorated with @periodic_command should not accept any parameters.

Note

Tasks can be run with a minimum resolution of 1 minute.

Note

The periodic_command() decorator is a bit different from the queue_command() decorator. Rather than causing the function to be enqueued when called, it leaves the function to execute normally when called directly. The purpose of the decorator is to create a PeriodicQueueCommand and register it with the global invoker. The invoker then handles running any PeriodicQueueCommand instances according to schedule.
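The register-then-invoke idea can be sketched in a few lines. This is an illustration only, not the library's code: `periodic_registry`, `run_periodically`, `run_due`, and `at_midnight` are hypothetical names, and the real invoker and PeriodicQueueCommand class are not shown in this document.

```python
from datetime import datetime

# Hypothetical registry of (validate_datetime, function) pairs.
periodic_registry = []


def run_periodically(validate_datetime):
    """Register the decorated function; calling it directly still runs it normally."""
    def decorator(func):
        periodic_registry.append((validate_datetime, func))
        return func  # returned unchanged, so direct calls are not enqueued
    return decorator


def run_due(now):
    """Run every registered function whose schedule matches `now`; return their names."""
    ran = []
    for validate_datetime, func in periodic_registry:
        if validate_datetime(now):
            func()
            ran.append(func.__name__)
    return ran


def at_midnight(dt):
    """Schedule test: True only at 00:00."""
    return dt.hour == 0 and dt.minute == 0


@run_periodically(at_midnight)
def send_daily_digest():
    pass  # the real work would go here


ran_at_midnight = run_due(datetime(2024, 1, 1, 0, 0))
ran_at_noon = run_due(datetime(2024, 1, 1, 12, 0))
```

In this sketch something would need to call `run_due` once a minute, which matches the one-minute resolution noted above.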

djutils.queue.decorators.queue_command(func)

Function decorator that causes the decorated function to be enqueued for execution when called.

Usage:

from djutils.queue.decorators import queue_command

@queue_command
def run_this_out_of_process(some_val, another_val):
    # whenever called, will be run by the consumer instead of in-process
    pass

djutils.queue.decorators.periodic_command(validate_datetime)

Decorator to execute a function on a specific schedule. This is a bit different from queue_command() in that it does not cause items to be enqueued when called, but rather causes a PeriodicQueueCommand to be registered with the global invoker.

Since the command is called at a given schedule, it cannot be “triggered” by a run-time event. As such, there should never be any need for parameters, since nothing can vary between executions.

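The validate_datetime parameter is a function that accepts a datetime and returns True when the command should run, such as the test function returned by crontab().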

Usage:

from djutils.queue.decorators import crontab, periodic_command

@periodic_command(crontab(day='1', hour='0', minute='0'))
def run_at_first_of_month():
    # run this function at midnight on the first of the month
    pass

djutils.queue.decorators.crontab(month='*', day='*', day_of_week='*', hour='*', minute='*')

Convert a “crontab”-style set of parameters into a test function that will return True when the given datetime matches the parameters set forth in the crontab.

Acceptable inputs:

  • * = every distinct value
  • */n = every n-th value, e.g. hour='*/4' matches 0, 4, 8, 12, 16, 20
  • m-n = every value from m through n
  • m,n = m and n only
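The input formats listed above can be expanded into sets of integers and tested against a datetime. The following is a rough sketch of that idea, not the library's implementation: `expand_field` and `make_matcher` are hypothetical names, ranges are assumed to be inclusive, and day_of_week is assumed to use 0 for Sunday, as the earlier send_sunday_editions example suggests.

```python
import re
from datetime import datetime


def expand_field(spec, allowed):
    """Expand one crontab-style field into the set of matching integers."""
    allowed = list(allowed)
    values = set()
    for piece in str(spec).split(","):
        if piece == "*":
            values.update(allowed)
        elif re.fullmatch(r"\*/(\d+)", piece):
            step = int(piece[2:])
            values.update(allowed[::step])
        elif re.fullmatch(r"(\d+)-(\d+)", piece):
            low, high = map(int, piece.split("-"))
            values.update(range(low, high + 1))  # assumed inclusive
        elif piece.isdigit():
            values.add(int(piece))
        else:
            raise ValueError("unsupported crontab piece: %r" % piece)
    return values


def make_matcher(month="*", day="*", day_of_week="*", hour="*", minute="*"):
    """Build a test function that returns True when a datetime matches every field."""
    months = expand_field(month, range(1, 13))
    days = expand_field(day, range(1, 32))
    weekdays = expand_field(day_of_week, range(7))
    hours = expand_field(hour, range(24))
    minutes = expand_field(minute, range(60))

    def matches(dt):
        # Assumption: 0 means Sunday; datetime.weekday() uses 0 = Monday.
        weekday = (dt.weekday() + 1) % 7
        return (dt.month in months and dt.day in days and weekday in weekdays
                and dt.hour in hours and dt.minute in minutes)

    return matches


every_four_hours = make_matcher(hour="*/4", minute="0")
hit = every_four_hours(datetime(2024, 1, 1, 8, 0))
miss = every_four_hours(datetime(2024, 1, 1, 9, 0))
# 7 January 2024 is a Sunday.
sunday_5pm = make_matcher(day_of_week="0", hour="5,17", minute="0")(
    datetime(2024, 1, 7, 17, 0))
```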

Autodiscovery

The djutils.queue.registry stores references to all QueueCommand classes (this includes any function decorated with queue_command()). The consumer needs to “discover” your commands in order to process them, so it is recommended that you put all code that needs to be processed via the Queue in files named commands.py, much like Django’s admin processes files named admin.py.

To manually discover commands, execute:

>>> from djutils import queue; queue.autodiscover()
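For readers unfamiliar with the admin.py-style convention, the general technique is to walk a list of application packages and try to import a conventionally named submodule from each. The sketch below shows that technique in isolation. It is an assumption about the approach, not a copy of queue.autodiscover(), and `discover_modules` is a hypothetical name.

```python
import importlib


def discover_modules(app_names, module_name="commands"):
    """Try to import `<app>.<module_name>` for each app; return the names that loaded."""
    loaded = []
    for app in app_names:
        target = "%s.%s" % (app, module_name)
        try:
            importlib.import_module(target)
        except ImportError:
            continue  # this app has no such module
        loaded.append(target)
    return loaded


# Demonstration with standard-library packages so the sketch runs anywhere:
# `json` has a `decoder` submodule, `no_such_app_xyz` does not exist.
found = discover_modules(["json", "no_such_app_xyz"], module_name="decoder")
```

Importing the module is enough for decorators at module level to run and register their functions. Note that catching ImportError this broadly also hides import failures inside an existing module, so a production version would likely be more careful.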

Consuming Messages

The djutils.queue.bin.consumer module contains the daemon that will consume your queue. This is a “proper” Linux daemon, and is based on the Python code found in this blog post.

To run the consumer, you will need to ensure that two environment variables are properly set:

  • PYTHONPATH: a list of directories in which to find Python packages
  • DJANGO_SETTINGS_MODULE: the location of the settings file your Django project uses

Useful consumer switches

“-t” or “--threads”
controls how many worker threads to use. If your tasks are CPU bound you probably won’t see much benefit from multiple threads due to the GIL, but if you plan on doing I/O in your tasks multi-threading can give you a big boost!
“-n” or “--no-periodic”
turns off the periodic task scheduler. If you have no periodic tasks feel free to turn this off. Also, if you plan on running multiple consumers, only one should be enqueueing periodic tasks.
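
The point about threads and I/O can be seen with a small standalone experiment. This sketch does not use the consumer at all: `fake_io_task` and `run_with_threads` are hypothetical names, and time.sleep stands in for a network or disk wait.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_task(seconds):
    """Simulate an I/O wait; sleeping releases the GIL so other threads can run."""
    time.sleep(seconds)
    return seconds


def run_with_threads(num_threads, num_tasks=4, seconds=0.2):
    """Run num_tasks simulated I/O tasks on num_threads workers; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        list(pool.map(fake_io_task, [seconds] * num_tasks))
    return time.perf_counter() - start


one_thread = run_with_threads(1)
four_threads = run_with_threads(4)
```

With one worker the waits happen back to back, while with four they overlap. A CPU-bound task would not see the same improvement from threads.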

Example assuming you use virtualenv

# assume your cwd is the root dir of virtualenv
export DJANGO_SETTINGS_MODULE=mysite.settings
./bin/python ./src/djutils/djutils/queue/bin/consumer.py start -l ./logs/queue.log -p ./run/queue.pid

-- stopping --

./bin/python ./src/djutils/djutils/queue/bin/consumer.py stop -l ./logs/queue.log -p ./run/queue.pid

Example running as root

sudo su
export PYTHONPATH=/path/to/site/:/path/to/djutils/:$PYTHONPATH
export DJANGO_SETTINGS_MODULE=mysite.settings
python djutils/bin/consumer.py start

-- stopping --

python djutils/bin/consumer.py stop

Backends

Currently I’ve only written two backends: djutils.queue.backends.database.DatabaseQueue, which stores messages in the database using Django’s ORM, and djutils.queue.backends.redis_backend.RedisQueue, which uses Redis to store messages. I plan on adding additional backends, but if you’d like to write your own there are just a few methods that need to be implemented.

class BaseQueue(object)
djutils.queue.backends.base.__init__(self, name, connection)

Initialize the Queue - this happens once when the module is loaded

djutils.queue.backends.base.write(self, data)

Push ‘data’ onto the queue

djutils.queue.backends.base.read(self)

Pop data from the queue. An empty queue should not raise an Exception!

djutils.queue.backends.base.flush(self)

Delete everything from the queue

djutils.queue.backends.base.__len__(self)

Number of items in the queue
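
As an illustration of the interface above, here is a hedged sketch of a backend that keeps messages in memory. `InMemoryQueue` is hypothetical and is written as a plain class so that it runs without djutils installed. A real backend would presumably subclass BaseQueue and use storage that the web process and the consumer can both reach. Returning None from an empty read and first-in-first-out ordering are assumptions, since the documentation only says an empty queue should not raise.

```python
from collections import deque


class InMemoryQueue(object):
    """Hypothetical backend keeping messages in a deque (lost on restart)."""

    def __init__(self, name, connection):
        self.name = name
        self.connection = connection  # unused by this in-memory sketch
        self._messages = deque()

    def write(self, data):
        """Push data onto the queue."""
        self._messages.append(data)

    def read(self):
        """Pop the oldest item; return None instead of raising when empty."""
        try:
            return self._messages.popleft()
        except IndexError:
            return None

    def flush(self):
        """Delete everything from the queue."""
        self._messages.clear()

    def __len__(self):
        return len(self._messages)


queue = InMemoryQueue("example", "")
queue.write("first")
queue.write("second")
first = queue.read()
remaining = len(queue)
queue.flush()
empty_read = queue.read()
```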

class DatabaseQueue(BaseQueue)
QUEUE_CLASS = 'djutils.queue.backends.database.DatabaseQueue'
QUEUE_CONNECTION = '' # <-- no connection needed as it uses django's ORM

class RedisQueue(BaseQueue)
QUEUE_CLASS = 'djutils.queue.backends.redis_backend.RedisQueue'
QUEUE_CONNECTION = '10.0.0.75:6379:0' # host, port, database-number

class RedisBlockingQueue(RedisQueue)

An experimental queue that uses Redis’ blocking right pop operation (BRPOP) to pull messages from the queue rather than polling for updates. It should work identically to RedisQueue in all other regards, including configuration.
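
The Redis connection setting shown above packs host, port, and database number into one colon-separated string. A small sketch of how such a string can be split follows. `parse_connection` is a hypothetical helper for illustration, and how the Redis backends actually parse the value is not shown in this document.

```python
def parse_connection(connection):
    """Split 'host:port:database-number' into (host, port, database)."""
    host, port, database = connection.split(":")
    return host, int(port), int(database)


host, port, database = parse_connection("10.0.0.75:6379:0")
```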