How To Use RabbitMQ and Python's Puka to Deliver Messages to Multiple Consumers

Published on February 12, 2014

Software Engineer, CTO @Makimo

Sending and receiving messages with RabbitMQ is possible only after the software itself has been installed and configured. The article "How To Install and Manage RabbitMQ" explains in detail how to get RabbitMQ working and is a good starting point for using this message broker.

Puka Python Library

All examples in this article are written in Python, using the puka library to handle the AMQP messaging protocol. Python was chosen because it is clean and easy to understand, which keeps the presentation straightforward, but since AMQP is a widely adopted protocol, any other programming language can be used to achieve similar goals.

puka can be quickly installed using pip – a Python package manager.

pip install puka

pip is not always bundled with Linux distributions. On Debian-based distributions (including Ubuntu) it can be installed using:

apt-get install python-pip

On RHEL-based distributions, such as CentOS:

yum install python-setuptools
easy_install pip

Introduction to RabbitMQ and Its Terminology

Messaging (and RabbitMQ in particular) introduces a few terms that describe the basic principles of the message broker and its mechanics.

  • A producer is a party that sends messages; hence creating a message is producing.

  • A consumer is a party that receives messages; hence receiving a message is consuming.

  • A queue is a buffer in which sent messages are stored, ready to be received. There is no limit on how many messages a single queue can hold, on how many producers can send messages to a queue, or on how many consumers can try to access it. When a message hits an existing queue, it waits there until consumed by a consumer accessing that particular queue. When a message hits a non-existent queue, it gets discarded.

  • An exchange is an entity that resides between producers and queues. The producer never sends a message directly to a queue. It sends messages to an exchange, which in turn places the message in one or more queues, depending on the exchange used. To use a real-life metaphor, an exchange is like a mailman: it handles messages so they get delivered to the proper queues (mailboxes), from which consumers can gather them.

  • A binding is a connection between a queue and an exchange. Queues bound to a certain exchange are served by that exchange. How exactly depends on the exchange itself.
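The queue behaviour described in the list can be mimicked with a few lines of plain Python. This is only a toy, in-memory sketch and says nothing about how RabbitMQ is implemented: `ToyBroker` and its method names are made up for this illustration, and the exchange step is collapsed into `publish`, which simply looks the queue up by name.

```python
from collections import deque


class ToyBroker(object):
    """In-memory stand-in for a broker (hypothetical, for illustration only)."""

    def __init__(self):
        self.queues = {}                      # queue name -> waiting messages

    def queue_declare(self, name):
        # Declaring an already existing queue leaves its contents untouched.
        self.queues.setdefault(name, deque())

    def publish(self, queue_name, body):
        queue = self.queues.get(queue_name)
        if queue is None:
            return False                      # non-existent queue: message discarded
        queue.append(body)                    # existing queue: message waits here
        return True

    def consume(self, name):
        # Hand over the oldest waiting message, or None if nothing is waiting.
        queue = self.queues[name]
        return queue.popleft() if queue else None


broker = ToyBroker()
lost = broker.publish("rabbit", "too early")      # queue not declared yet
broker.queue_declare("rabbit")
kept = broker.publish("rabbit", "Droplet test!")  # stored until consumed
first = broker.consume("rabbit")
second = broker.consume("rabbit")                 # queue is empty again
print("%r %r %r %r" % (lost, kept, first, second))
```

The first message never reaches a consumer because its queue did not exist when it was sent, which is why the examples in this article always declare a queue before publishing to it.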

All five terms will be used throughout this text. There is one more, specific to the puka Python library, which was chosen for its clarity: the promise. A promise may be understood as a request to the AMQP server that is guaranteed to finish (successfully or not) and on which the client can wait until it is completed.

While puka can work asynchronously, in our examples puka will be used as a synchronous library. That means after each request (promise) puka will wait until it gets executed before going to the next step.
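The request-then-wait pattern can be written down as a small helper. This is a minimal sketch, assuming a client object whose request methods return a promise and which offers a `wait(promise)` method, as puka's client does in the examples of this article. The helper name `request_and_wait` is hypothetical and not part of puka.

```python
def request_and_wait(client, method_name, **kwargs):
    """Issue one request and block until the server has handled it.

    `method_name` is the name of a client method that returns a promise,
    for example 'queue_declare'. This helper is an illustration only.
    """
    request = getattr(client, method_name)
    promise = request(**kwargs)      # the request is issued; a promise comes back
    return client.wait(promise)      # block here until the promise is completed
```

With a connected puka client named `producer`, a call such as `request_and_wait(producer, 'queue_declare', queue='rabbit')` would declare the queue and return only after the server has answered. The scripts in this article spell out the two steps explicitly instead, which keeps each promise visible.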

Testing RabbitMQ and Puka with a Simple Example

To test whether the message broker and puka work correctly, and to get a grip on how sending and receiving messages works in practice, create a sample Python script named rabbit_test.py

vim rabbit_test.py

and paste the script contents:

import puka

# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")

# connect sending party and wait until the connection is established
send_promise = producer.connect()
producer.wait(send_promise)

# connect receiving party and wait until the connection is established
receive_promise = consumer.connect()
consumer.wait(receive_promise)

# declare queue (queue must exist before it is used - otherwise messages sent to that queue will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)

# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
producer.wait(send_promise)

print("Message sent!")

# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)

print("Starting receiving!")

while True:
    received_message = consumer.wait(receive_promise)
    print("GOT: %r" % (received_message['body'],))

Press :wq to save the file and quit.

Running the script should print the message that was sent by the script to the RabbitMQ queue, since the test program receives the message immediately afterwards. The output should look like:

root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Droplet test!'

To explain what happens in this code, let’s go step by step:

  1. Both consumer and producer are created and connected to the same RabbitMQ server, residing on localhost.

  2. Producer declares a queue, to make sure it exists when the message is produced. Without this step, the queue might not exist, and messages could be discarded immediately.

  3. Producer sends the message to a nameless exchange (more on exchanges comes later) with a routing key specifying the queue created beforehand. The message then hits the exchange, which in turn places it in the “rabbit” queue. The message sits there until someone consumes it.

  4. Consumer accesses the “rabbit” queue and starts receiving messages stored there. Because there is one message waiting, it will get delivered immediately. It is consumed, which means it will no longer stay in the queue.

  5. The consumed message gets printed on screen.

Fanout Exchange

In the previous example, a nameless exchange has been used to deliver the message to a particular queue named “rabbit”. The nameless exchange needs a queue name to work, which means it can deliver the message only to a single queue.

There are also other types of exchanges in RabbitMQ, one of which is fanout, our primary concern in this text. A fanout exchange is a simple, blind tool that delivers messages to ALL queues it is aware of. With a fanout exchange there is no need to provide a particular queue name (in fact, any routing key supplied is ignored). Messages hitting that kind of exchange are delivered to all queues that were bound to the exchange before the message was produced. There is no limit to how many queues can be bound to the exchange.
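The delivery rule of a fanout exchange can be illustrated without a broker. The sketch below is a toy, in-memory model and not RabbitMQ code: `ToyFanout` is a made-up class, and plain Python lists stand in for queues.

```python
class ToyFanout(object):
    """In-memory sketch of a fanout exchange (hypothetical, for illustration)."""

    def __init__(self):
        self.bound_queues = []            # every queue bound so far

    def bind(self, queue):
        self.bound_queues.append(queue)

    def publish(self, body):
        # No queue name is consulted: every bound queue gets its own copy.
        for queue in self.bound_queues:
            queue.append(body)


newsletter = ToyFanout()
alice, bob = [], []                       # two subscriber queues

newsletter.bind(alice)
newsletter.publish("issue 1")             # only alice is bound at this point
newsletter.bind(bob)
newsletter.publish("issue 2")             # both queues receive this one

print("alice: %r" % alice)
print("bob: %r" % bob)
```

The second queue misses "issue 1" because it was bound only after that message had been published, which mirrors the rule that a fanout exchange delivers to the queues bound at the time a message arrives.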

Publish/Subscribe Pattern

With a fanout exchange, we can easily create a publish/subscribe pattern that works like an open-to-all newsletter. The producer, a newsletter broadcaster, sends periodic messages to an audience it may not even know (it produces a message and sends it to the newsletter fanout exchange). New subscribers apply for the newsletter (each binds its own queue to the same newsletter fanout exchange). From that moment the newsletter fanout exchange will deliver every message to all registered subscribers (queues).

While one-to-one messaging is pretty straightforward and developers often use other means of communication for it, one-to-many (where “many” is unspecified and can be anything between a few and lots) is a very popular scenario in which a message broker can be of immense help.

Writing Producer Application

The sole role of the producer application is to create a named fanout exchange and produce periodic messages (one every second) to that exchange. In a real-life scenario, messages would be produced for a reason. To simplify the example, messages will be auto-generated. This application will act as a newsletter publisher.

Create a python script named newsletter_produce.py

vim newsletter_produce.py

and paste the script contents:

import puka
import datetime
import time

# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)

# create a fanout exchange
exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
producer.wait(exchange_promise)

# send current time in a loop, once per second
while True:
    message = "%s" % datetime.datetime.now()

    message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
    producer.wait(message_promise)

    print("SENT: %s" % message)

    time.sleep(1)

Let’s go step by step with the example to explain what happens in the code.

  1. Producer client is created and connected to local RabbitMQ instance. From now on it can communicate with RabbitMQ freely.

  2. A named newsletter fanout exchange is created. After that step, the exchange exists on the RabbitMQ server and can be used to bind queues to it and send messages through it.

  3. In an endless loop, messages with the current time are produced to the newsletter exchange. Note that routing_key is empty, which means no particular queue is specified. It is the exchange that delivers the message to the proper queues.

The application, when running, broadcasts the current time to all newsletter subscribers.
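For experiments it can be convenient to publish a fixed number of messages instead of looping forever. The following is a minimal sketch of the same publishing step wrapped in a function. It assumes `client` is an already connected puka client and that the exchange has been declared. The function name `publish_times` and its parameters are hypothetical.

```python
import datetime
import time


def publish_times(client, exchange, count, interval=1.0, sleep=time.sleep):
    """Publish `count` timestamps to `exchange`, pausing between messages.

    Hypothetical helper: `client` only needs the basic_publish() and wait()
    methods used elsewhere in this article.
    """
    sent = []
    for _ in range(count):
        message = "%s" % datetime.datetime.now()
        # Empty routing key: the fanout exchange decides where the message goes.
        promise = client.basic_publish(exchange=exchange, routing_key='', body=message)
        client.wait(promise)             # block until the publish has been handled
        sent.append(message)
        sleep(interval)                  # throttle the loop
    return sent
```

Passing the sleep function as a parameter is a design choice made here so that the pause can be replaced in tests. Calling `publish_times(producer, 'newsletter', 5)` would send five timestamps roughly one second apart.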

Writing Consumer Application

The consumer application will create a temporary queue and bind it to a named fanout exchange. After that, it will start waiting for messages. Once the queue is bound to the exchange, every message sent by the producer created earlier will be received by this consumer. This application will act as a newsletter subscriber: it will be possible to run the application multiple times at once, and all the instances will still receive the broadcast messages.

Create a python script named newsletter_consume.py

vim newsletter_consume.py

and paste the script contents:

import puka

# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)

# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']

# bind the queue to newsletter exchange
bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
consumer.wait(bind_promise)

# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)

while True:
    message = consumer.wait(message_promise)
    print("GOT: %r" % message['body'])

The consumer code is a bit more complicated than the producer’s. Let’s look into it step by step:

  1. Consumer client is created and connected to the local RabbitMQ instance.

  2. A temporary queue is created. Temporary means that no name is supplied, so the queue name is auto-generated by RabbitMQ. Because the queue is declared as exclusive, it is also destroyed when the client disconnects. This is a common way of creating queues that exist only to be bound to one of the exchanges and have no other special purpose. Since it is necessary to create a queue to receive anything, it is a convenient way to avoid thinking about the queue name.

  3. The created queue is bound to the newsletter exchange. From that moment, the fanout exchange will deliver every message to that queue.

  4. In an endless loop the consumer waits on the queue, receiving every message that hits the queue and printing it on the screen.

The application, when running, receives time notifications from the newsletter publisher. It can be executed multiple times at once, and every single instance of this application will get the current time.
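The declare, bind, and consume sequence from the script above can also be grouped into two small functions, which makes it easier to reuse for other exchanges. This is a sketch under the assumption that `client` is an already connected puka client. The names `subscribe` and `receive` are hypothetical.

```python
def subscribe(client, exchange):
    """Create a temporary queue, bind it to `exchange`, and start consuming.

    Returns the promise to wait on for incoming messages.
    """
    declared = client.wait(client.queue_declare(exclusive=True))
    queue = declared['queue']                        # name generated by the broker
    client.wait(client.queue_bind(exchange=exchange, queue=queue))
    return client.basic_consume(queue=queue, no_ack=True)


def receive(client, consume_promise, count):
    """Block until `count` messages have arrived and return their bodies."""
    return [client.wait(consume_promise)['body'] for _ in range(count)]
```

For example, `receive(consumer, subscribe(consumer, 'newsletter'), 3)` would return the bodies of the next three newsletter messages. Every call to `subscribe` creates its own queue, so several subscribers each receive a full copy of the broadcast.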

Testing Both Applications

To test the newsletter publisher and its consumers, open multiple SSH sessions to the virtual server (or open multiple terminal windows, if working on a local computer). In one of the windows, run the producer application.

root@rabbitmq:~# python newsletter_produce.py

It will start displaying the current time every second:

SENT: 2014-02-11 17:24:47.309000
SENT: 2014-02-11 17:24:48.310000
SENT: 2014-02-11 17:24:49.312000
SENT: 2014-02-11 17:24:50.316000

In every other window run the consumer application:

root@rabbitmq:~# python newsletter_consume.py

Every instance of this application will receive time notifications broadcast by the producer:

GOT: '2014-02-11 17:24:47.309000'
GOT: '2014-02-11 17:24:48.310000'
GOT: '2014-02-11 17:24:49.312000'
GOT: '2014-02-11 17:24:50.316000'

It means that RabbitMQ properly registered the fanout exchange, bound the subscriber queues to this exchange, and delivered sent messages to proper queues. In other words, RabbitMQ worked as expected.

Further Reading

Publish/subscribe is a messaging pattern that is simple both in concept and in implementation and often comes in handy, but it comes nowhere near the limits of what RabbitMQ can do. There are countless ways to use RabbitMQ to solve messaging problems, including advanced message routing, message acknowledgements, security, and persistence.

The primary goal of this text was to introduce basic messaging concepts using simple examples. Many other uses are covered in detail in the official RabbitMQ documentation, which is a great resource for RabbitMQ users and administrators.

Article submitted by: Mateusz Papiernik (http://maticomp.net)

About the authors

Creating bespoke software ◦ CTO & co-founder at Makimo. I’m a software engineer & a geek. I like making impossible things possible. And I need tea.
