Introducing Queues — Creating a Pipeline in the Cloud

April 3rd, 2013 by Ken Elkabany

Queues provide an interface for Dataflow Programming that is built on top of our job system.

While a distributed queue data structure with push, pop, and ack capabilities is provided, the key benefit is the ability to attach a handler to a queue for scalable processing of a queue’s messages. The handler in turn can feed its output messages to other queues.

In other words, you’re probably used to the queue data structure:

[Figure: a standard queue data structure with push and pop]

Our queues link the data structure with a message handler, f, which we call an attachment:

[Figure: a queue whose messages are consumed by an attached handler f, with results pushed to an output queue]

f(msg) is any Python function you define that takes a single argument at a time: a message from the input queue. Its return value is pushed into the output queue.

By the end of this post, you’ll be able to:

  • Create a distributed, fault-tolerant pipeline of queues and processors.
  • Scale each component to achieve a throughput of thousands of messages per second.
  • See it all through automatically-generated visualizations.
  • Pay only when you have messages in your pipeline.
  • Do it all, with only Python, and not a single server.

If you’re a developer who just wants to RTFM, see our documentation.

Diving In

Let’s see queues in action. You’ll need to have the latest client installed, released today (4/3).

First, let’s push and pop from a queue in your console to get comfortable:

>>> # import our library
>>> import cloud
>>> q = cloud.queue.get('numbers')
>>> # adds 3 messages to the queue
>>> q.push([1,2,3])
>>> # pops up to 10 messages
>>> q.pop()
[2, 1, 3]

Note that the queue did not dequeue in perfect FIFO order; this is expected. Our queues are designed for high throughput and high parallelism with minimal queue backlog, which makes guaranteed FIFO ordering less relevant.

Attach

Now let’s attach a function that increments all numbers in the input queue:

# declare the input and output queues
input_q = cloud.queue.get('numbers')
output_q = cloud.queue.get('bigger-numbers')

# create handler function
def increment(x):
    return x + 1

# attach the handler to the queue
input_q.attach(increment, output_q)

How did that work? We’re using the same automagic dependency transfer we use in our job system to send your increment function to us along with any dependencies it might have.
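
To make the transfer concrete, here's a small sketch (the queue names and helper function below are made up for illustration): any module-level imports, constants, or helper functions your handler relies on are shipped along with it.

import cloud

SUFFIX = '!'   # module-level constant: transferred with the handler

def shout(text):
    """Helper used by the handler; it's transferred automatically too."""
    return text.upper() + SUFFIX

def handler(msg):
    # relies on shout() and SUFFIX, neither of which needs manual packaging
    return shout(msg)

cloud.queue.get('words').attach(handler, cloud.queue.get('shouted-words'))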

Visualization

From the Queues Dashboard, we can see an auto-generated layout of our pipeline based on the attachment we made:

[Figure: auto-generated pipeline layout showing the numbers queue feeding the increment attachment, which outputs to bigger-numbers]

Message Processing by Attachment

Let’s increment 1,000 numbers:

# range(1000) is a list of numbers from 0 to 999
input_q.push(range(1000))

In the background, our systems have created a job (visible in your Job Dashboard) that applies the increment function to every number in the numbers queue, and outputs the result to the bigger-numbers queue. If you're unfamiliar with our job framework, don't worry; queues abstract away most of the details. But if you're interested, see our Primer.

After ~10 seconds, you'll see that all messages have been processed. On the live throughput chart, a single point represents the average throughput during a 10-second window; the interpolation lines are cosmetic. Below, the single point at 100 msgs/second indicates that 1,000 messages were processed during that window. In actuality, we got roughly 150 msgs/second for 7 seconds.

[Figure: live throughput chart with a single point at 100 msgs/second]

As a sanity check, we can also check the size of the queues:

>>> input_q.count()
0
>>> output_q.count()
1000
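
As an extra spot check, you can pop a few of the processed messages and confirm they've been incremented. The output below is illustrative; the exact values returned and their order will vary:

>>> output_q.pop()
[3, 1, 5, 2, 8, 4, 10, 6, 9, 7]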

Increasing Throughput

What if you want to increase your throughput past 150 msgs/second? Set max_parallel_jobs for the attachment. You can do this from the Queue Dashboard or from the attach call:

# attach the handler to the queue
input_q.attach(increment, output_q, _max_parallel_jobs=5)

Now, assuming there are messages in the queue, you’ll see a throughput of 750 msgs/second!

[Figure: live throughput chart showing roughly 750 msgs/second]

If you click on “view” jobs, you can see a list of the five jobs attached to the queue. For those familiar with our job processing framework, you can now see that our Queue processors are built on top of jobs.

The takeaway: you just increased your throughput 5-fold by changing one number, and without any server management or scaling.

Creating an Image Processing Pipeline

To showcase the power of queues, we’re going to create the following pipeline:

[Figure: the image processing pipeline we're about to build]

This is an auto-generated visualization available in the Queues Dashboard. Rectangles are queues; circles are attachments.

The inputs to this pipeline are URLs, which should be pushed to the img-urls queue. The pipeline downloads the image, and does the following image operations:

  • Resize to 150px (thumbnail)
  • Resize to 400px (medium size)
  • Apply a Sepia-tone filter to the medium size image

For each generated image, an HTTP callback is made to an endpoint of your choice. Note that separating the three image operations into three attachments with different input queues isn’t the most efficient (you’d probably want to combine them into one operation), but it’s done for illustrative purposes.

You can download this pipeline from our repository: basic-example/queue/imgpipeline/pipeline.py

Step 1: Scraping Images

We’re going to use the following function as our attachment:

import os
import Image
import urllib2
from StringIO import StringIO

import cloud

def scrape_to_bucket(target):
    """Downloads image from url, and saves to bucket. *target* should
    be a dict with keys id (image id), and url (location of image).

    Returns a dict with keys id (image id), path (obj key), and
    transforms (empty list)."""

    id = target['id']
    url = target['url']

    # path to save image in bucket
    obj_path = 'imgs/{id}/original.png'.format(id=id)

    # extract extension from url
    ext = os.path.splitext(url)[-1]

    # open connection to image
    u = urllib2.urlopen(url)

    # if image isn't already a PNG, convert it
    # (note: splitext returns the extension with its leading dot)
    if ext.lower() != '.png':
        i = Image.open(StringIO(u.read()))
        data = StringIO()
        i.save(data, 'png')
        data = data.getvalue()
    else:
        data = u.read()

    u.close()

    # add image to bucket
    cloud.bucket.putf(data, obj_path)

    return {'id': id,
            'path': obj_path,
            'transforms': []}

If you’re unfamiliar with Buckets, just think of them as a key->value object store. We use them here to conveniently store and retrieve objects to and from memory. However, buckets are not necessary, and are completely unrelated to queues. You can modify scrape_to_bucket() so it saves images into your own Amazon S3 account, database, or anywhere else.
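
To get a feel for the interface, here's a minimal sketch using the same putf/getf calls the pipeline relies on (the object path is made up):

>>> # store a string under a key, then read it back
>>> cloud.bucket.putf('hello world', 'example/greeting.txt')
>>> cloud.bucket.getf('example/greeting.txt').read()
'hello world'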

Here’s a sample input message we’ll use to demonstrate each operation:

{
 'id': 1,
 'url': 'http://s3.amazonaws.com/pi-user-buckets/vFvZxWVSiHeeB20rAZwnS66OLRjeU8MU4Igf2Kyl/blog/Obama_family_portrait.jpg'
}

The url points to an image of the Obama family:
Obama Family Full Size

Per the source code, the above image will be saved in your bucket. The output message pushed to the thumbnail and medium queues will be:

{
 'id': 1,
 'path': 'imgs/1/original.png',
 'transforms': []
}

You can verify this works by simply running the function on your own machine:

>>> msg = {'id': 1,
...        'url': 'http://s3.amazonaws.com/pi-user-buckets/vFvZxWVSiHeeB20rAZwnS66OLRjeU8MU4Igf2Kyl/blog/Obama_family_portrait.jpg'}
>>> scrape_to_bucket(msg)
{'id': 1, 'path': 'imgs/1/original.png', 'transforms': []}

This is another advantage of queues: because your function doesn’t need to be modified in any way to be an attachment, you can test it locally just as easily as on the cloud.

Handling Exceptions

What if the message handler throws an Exception? Maybe the URL was temporarily unavailable, but you’d like to retry it in 60 seconds. Using retry_on, max_retries, and retry_delay, you can specify which Exceptions you’d like to retry, the number of times to retry, and the amount of time between each attempt.

import urllib2

q = cloud.queue.get('img-urls')
output_qs = cloud.queue.get('thumbnail'), cloud.queue.get('medium')
bad_urls_q = cloud.queue.get('bad-urls')

q.attach(scrape_to_bucket,
         output_qs,
         retry_on=[urllib2.HTTPError, urllib2.URLError],
         max_retries=3,
         retry_delay=60,
         on_error={Exception: {'queue': bad_urls_q}})

Using the on_error keyword, the bad-urls queue will be sent messages that raised non-retryable Exceptions, as well as messages that still failed after three retries. Error messages generated by on_error include the triggered exception and the associated traceback.
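
Since bad-urls is an ordinary queue, you can drain it to inspect failures. A minimal sketch (the exact structure of the error messages isn't shown here):

# pop up to 10 error messages and look at them
for error_msg in bad_urls_q.pop():
    # each error message carries the failed payload plus exception details
    print error_msg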

You can confirm that your attachment has been set up as intended with the visualization.

Scrape Attachment

Tweaking Performance with Multi-threading

Because this scraping attachment spends most of its time waiting on network transfers, it is I/O bound and won’t effectively utilize the core it’s running on. The workaround is to run multiple identical copies of the handler, as reader threads, within each job.

To do this, set the readers_per_job keyword to the number of simultaneous threads you want running in a job. The default value is 1. Do not confuse this with max_parallel_jobs, which controls the number of jobs that may be running. For this example, we’ll set the value to 4.

import urllib2

q = cloud.queue.get('img-urls')
output_qs = cloud.queue.get('thumbnail'), cloud.queue.get('medium')
bad_urls_q = cloud.queue.get('bad-urls')

q.attach(scrape_to_bucket,
         output_qs,
         retry_on=[urllib2.HTTPError, urllib2.URLError],
         max_retries=3,
         retry_delay=60,
         on_error={Exception: {'queue': bad_urls_q}},
         readers_per_job=4)
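
With the attachment in place, you can kick off the pipeline by pushing the sample message from above into img-urls; this push is where the messages referenced in Step 2 come from:

# the sample message shown earlier
msg = {'id': 1,
       'url': 'http://s3.amazonaws.com/pi-user-buckets/vFvZxWVSiHeeB20rAZwnS66OLRjeU8MU4Igf2Kyl/blog/Obama_family_portrait.jpg'}

# q is the img-urls queue declared above
q.push([msg])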

Step 2: Resizing Images

We’re going to attach handlers to the thumbnail and medium queues to resize images to 150px and 400px, respectively. To ease the storage and retrieval of images as PIL Image objects, we’re going to use a custom-defined ImageOperation class.


class ImageOperation(object):
    """Base class for Message Handlers in Image Pipeline.

    Retrieves images from bucket, performs in-memory manipulation
    with PIL object, stores result back in bucket, and then
    outputs message with additional transform listed.

    Override operation() for custom operation."""

    name = 'identity'

    def get_image_from_bucket(self, obj_path):
        """Given *obj_path* in bucket, returns PIL Image object"""

        # get image data as string of raw bytes
        data = cloud.bucket.getf(obj_path).read()

        return Image.open(StringIO(data))

    def put_image_in_bucket(self, img, obj_path):
        """Given PIL image *img*, saves it to *obj_path* in bucket"""

        output_data = StringIO()

        # write raw image bytes to StringIO
        img.save(output_data, 'png')

        # store the image file in your bucket
        cloud.bucket.putf(output_data.getvalue(), obj_path)

    def add_modifier_to_key(self, obj_path):
        """Returns new *obj_path* that includes name of transform"""

        obj_key, obj_ext = os.path.splitext(obj_path)
        obj_path = '{key}.{name}.png'.format(key=obj_key,
                                             name=self.name)
        return obj_path

    def message_handler(self, msg):
        """Entry point for message handling. Do not override."""

        img = self.get_image_from_bucket(msg['path'])

        # apply image operation
        new_img = self.operation(img)

        msg['path'] = self.add_modifier_to_key(msg['path'])
        msg['transforms'].append(self.name)

        self.put_image_in_bucket(new_img, msg['path'])

        return msg

    def operation(self, img):
        """Method to replace for custom operation"""

        return img

Since we support instances as message handlers, we’ll subclass ImageOperation to make two message handlers: ImageThumbnail and ImageMediumSize.

class ImageThumbnail(ImageOperation):

    name = 'thumb'

    def operation(self, img):
        """Returns a thumbnail of the *img*"""

        img.thumbnail((150, 150), Image.ANTIALIAS)
        return img

class ImageMediumSize(ImageOperation):

    name = 'med'

    def operation(self, img):
        """Returns a 400px version of the *img*"""

        img.thumbnail((400, 400), Image.ANTIALIAS)
        return img

Now we’ll attach instances of these classes to their respective input queues.

# the downstream queues referenced here are covered in Steps 3 and 4
sepia_q = cloud.queue.get('sepia')
callback_q = cloud.queue.get('callback')

thumbnail_q = cloud.queue.get('thumbnail')
thumbnail_q.attach(ImageThumbnail(), [callback_q])

medium_q = cloud.queue.get('medium')
medium_q.attach(ImageMediumSize(), [sepia_q, callback_q])

If you pushed the example message of the Obama family to img-urls, then there are already messages ready for the thumbnail and medium queues. Once processed, these two objects will appear in your bucket:

imgs/1/original.thumb.png

Obama Family Thumbnail

imgs/1/original.med.png

Obama Family Medium Size
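
If you’d like to pull one of the generated images down to your machine for a look, here’s a minimal sketch using the same getf call the handlers use (the local filename is arbitrary):

>>> data = cloud.bucket.getf('imgs/1/original.thumb.png').read()
>>> with open('local_thumbnail.png', 'wb') as f:
...     f.write(data)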

Step 3: Sepia Tone

The final image operation applies a sepia-tone filter to the output of the medium-size downscale.

import ImageOps

class ImageSepia(ImageOperation):
    """Applies Sepia Filter.
    Based on: http://effbot.org/zone/pil-sepia.htm"""

    name = 'sepia'

    def __init__(self):
        self.sepia_palette = self.make_linear_ramp()

    @staticmethod
    def make_linear_ramp():
        """Generate a palette in a format acceptable for `putpalette`,
        which expects [r,g,b,r,g,b,...]"""

        ramp = []
        r, g, b = 255, 220, 162 

        for i in range(255):
            ramp.extend((r*i/255, g*i/255, b*i/255))

        return ramp

    def operation(self, img):
        """Returns a version of the *img* with Sepia applied
        for a vintage look."""

        # convert to grayscale
        orig_mode = img.mode
        if orig_mode != "L":
            img = img.convert("L")

        img = ImageOps.autocontrast(img)

        # apply sepia palette
        img.putpalette(self.sepia_palette)

        # convert back to its original mode
        if orig_mode != "L":
            img = img.convert(orig_mode)

        return img

Attaching:

sepia_q = cloud.queue.get('sepia')
sepia_q.attach(ImageSepia(), [callback_q])
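
As with the scraper, you can exercise these handlers locally before or after attaching them. A sketch, assuming the medium-size image from Step 2 is already in your bucket:

>>> op = ImageSepia()
>>> op.message_handler({'id': 1,
...                     'path': 'imgs/1/original.med.png',
...                     'transforms': ['med']})
{'id': 1, 'path': 'imgs/1/original.med.sepia.png', 'transforms': ['med', 'sepia']}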

Once again, if you pushed the sample message, there should already be a message waiting in the sepia queue. The image output to your bucket is:

Obama Family Sepia

Step 4: Callback

Each image operation outputs a message to the callback queue. You probably want your callback handler to:

  • Write to your database that the image is ready
  • Make a POST request to your website for instant notification
  • Store the image somewhere else

For simplicity, we’ll have the callback handler set the image object in your bucket as public, so that it’s accessible by anyone. Based on the above examples, the following should be straightforward:

def callback(msg):
    print msg
    cloud.bucket.make_public(msg['path'])

callback_q = cloud.queue.get('callback')
callback_q.attach(callback)

Debugging Attachments

You may have noticed that in callback(msg), we did a print msg. How would you see standard output for an attachment? The same way you would for a job: by clicking on it in the Jobs Dashboard. To see which jobs are running your attachments, click “view” jobs from the Queues Dashboard, which will take you to the Jobs Dashboard filtered for your attachment.

Using this method, you’ll get access to all the information you’re accustomed to with jobs, including a realtime feed of CPU, memory, and disk usage.

Scaling Up

With just a few tweaks, I was able to get a system throughput greater than 150 images per second. I set max_parallel_jobs to 20 for the scraping step (10 readers_per_job, c2 core), and 30 for all image operation steps. Also, I set the image operation steps to use the f2 core for faster processing.
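
For reference, here’s a rough sketch of what those settings might look like as attach calls. The keyword names mirror the ones used earlier in this post; the _type keyword for picking a core type is an assumption carried over from how jobs select core types, so check the queue documentation for the exact spelling:

# scraping step: up to 20 parallel jobs, 10 reader threads each, on c2 cores
q.attach(scrape_to_bucket, output_qs,
         readers_per_job=10,
         _max_parallel_jobs=20,
         _type='c2')      # core type keyword assumed from the job API

# each image operation step: up to 30 parallel jobs on f2 cores
thumbnail_q.attach(ImageThumbnail(), [callback_q],
                   _max_parallel_jobs=30,
                   _type='f2')   # core type keyword assumed from the job API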

Here’s a screenshot of the Queue Dashboard in action as I was testing (doesn’t show max throughput). Note how the dequeue rate is able to keep up with the enqueue rate, which is precisely what we want.

[Figure: screenshot of the Queue Dashboard during the test, with the dequeue rate tracking the enqueue rate]

Pricing

We charge for queues based on the amount of time jobs spend processing messages. Assuming you have a steady stream of messages, the maximum you’ll pay in an hour is:

max_parallel_jobs x cost per core hour

You can find the cost per core hour based on the core type you’ve chosen from our pricing page. If your queue is empty, no jobs will be running, and you won’t pay a thing!
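
For example, an attachment with max_parallel_jobs set to 5 on a core type that costs, hypothetically, $0.05 per core hour would cost at most 5 x $0.05 = $0.25 for an hour in which the queue never runs dry; if the queue sits empty half the time, you’d pay roughly half that.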

Conclusion: Let Us Clean Your Pipes

Letting us manage the full pipeline—the queues, and the processing of messages—has several advantages:

  • No Servers: You won’t have to configure or deploy a single server for storage or processing.
  • Faster Development: It takes only a few lines of code to “use the cloud” for a pipeline.
  • Reliability: Our queues and workers are distributed and replicated across multiple datacenters (AWS availability zones), and even a server failure won’t jeopardize your messages.
  • Scale Effortlessly: Tell us how many cores you want to put to work, and we make it so.
  • Cut Costs: You only pay for processing when there are messages. No idling servers.
  • Monitoring & Analytics: Take advantage of our queue analytics, and the same job monitoring interface that powers our standard service.

If you’re ready to give it a try, sign up now, and get 20 free core hours. Happy coding!
