This text is a continuation of How To Use RabbitMQ and Python’s Puka to Deliver Messages to Multiple Consumers and requires the same software bundle up and running properly. Also, the same definitions are used throughout the article and we assume that the reader is acquainted with subjects from the former text.
We have already described the fanout exchange, which delivers messages to every queue bound to that exchange with no additional rules in place. It is a very useful mechanism, but it lacks flexibility. It is often undesirable to receive everything a producer emits to the exchange. RabbitMQ offers two different exchange types that can be used to implement more complex scenarios. One of these is the direct exchange.
The direct exchange offers a simple, key-based routing mechanism in RabbitMQ. It is somewhat similar to the nameless exchange used in the very first example, in which a message was delivered to the queue whose name was equal to the routing key of the message. However, whereas the nameless exchange needed no explicit queue bindings, with a direct exchange the bindings are crucial and mandatory.
When using a direct exchange, each message published to that exchange must have a routing key specified, which is an arbitrary string, e.g. Texas. The message will then be delivered to all queues that have been bound to this exchange with the same routing key (all queues that were explicitly declared as interested in messages with the Texas routing key).
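The matching rule described above can be sketched in plain Python, without a broker. This is only a toy model under stated assumptions, not RabbitMQ code: the `bindings` list, the queue names, and the `route` function are hypothetical names introduced for illustration. The point it shows is that a direct exchange compares the routing key of a message with the key of each binding for exact equality.

```python
# Toy model of direct-exchange routing; this is NOT RabbitMQ code.
# Each binding is a hypothetical (queue_name, binding_key) pair on one exchange.
bindings = [
    ("queue_a", "Texas"),
    ("queue_b", "Texas"),
    ("queue_b", "Ohio"),
]


def route(bindings, routing_key):
    """Return the queues whose binding key equals the routing key exactly."""
    matched = []
    for queue_name, binding_key in bindings:
        # Exact string comparison: no patterns, no case folding
        if binding_key == routing_key and queue_name not in matched:
            matched.append(queue_name)
    return matched


print(route(bindings, "Texas"))  # both queues are bound with the key "Texas"
print(route(bindings, "Ohio"))   # only queue_b is bound with the key "Ohio"
print(route(bindings, "texas"))  # nothing matches: the comparison is case-sensitive
```

In this model a message whose routing key matches no binding is simply routed nowhere, which mirrors the default behaviour of a broker discarding a message that cannot be routed to any queue.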
The biggest difference between the basic nameless exchange and the direct exchange is that the latter needs bindings: no queue receives messages from that exchange until it has been bound to it. That in turn results in three great advantages:
One queue can be bound to listen to many different routing keys on the same exchange
One queue can be bound to listen on many different exchanges at once
Many queues can be bound to listen to the same routing key on an exchange
Let’s imagine a big city hub: a rail and bus station in one, with many destinations reachable by both means of transportation. And let’s imagine that the station wants to dispatch departure notifications using RabbitMQ. The task is to inform everyone interested that a bus or train to Seattle, Tooele, or Boston departs soon.
Such a program would define a direct exchange named departures, to which all interested customers could subscribe their queues. Then messages containing the departure time would be published to that exchange with the routing key containing the destination. For example:
Message to departures exchange with routing key Tooele and body 2014-01-03 15:23
Message to departures exchange with routing key Boston and body 2014-01-03 15:41
Message to departures exchange with routing key Seattle and body 2014-01-03 15:55
Since one queue may be bound to many routing keys at once, and many queues can be bound to the same key, we could easily have:
One customer interested in Tooele only
One customer interested in Boston only
Another customer interested in Tooele and Boston at the same time
All of them could wait for information at the same time, and each would receive the proper messages through our direct exchange.
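The scenario with the three customers can be walked through with a small in-memory model. This is a sketch only and does not talk to RabbitMQ; the customer names, the `bind` helper, and the `inboxes` dictionary are assumptions made up for the example. It replays the three sample messages from the list above against the three sets of bindings.

```python
# Toy in-memory model of the station scenario; all names are illustrative only.
from collections import defaultdict

# destination (routing key) -> customers whose queues are bound with that key
bindings = defaultdict(list)


def bind(customer, destinations):
    """Bind one customer's queue to every destination it is interested in."""
    for destination in destinations:
        bindings[destination].append(customer)


bind("customer_1", ["Tooele"])
bind("customer_2", ["Boston"])
bind("customer_3", ["Tooele", "Boston"])

# The three sample messages: (routing key, body)
departures = [
    ("Tooele", "2014-01-03 15:23"),
    ("Boston", "2014-01-03 15:41"),
    ("Seattle", "2014-01-03 15:55"),
]

# customer -> list of (destination, time) notifications received
inboxes = defaultdict(list)
for destination, departure_time in departures:
    # .get() avoids creating an empty binding entry for unbound keys
    for customer in bindings.get(destination, []):
        inboxes[customer].append((destination, departure_time))

for customer in sorted(inboxes):
    print("%s: %s" % (customer, inboxes[customer]))
```

In this model the first two customers each receive one notification, the third receives both the Tooele and the Boston departure, and the Seattle message reaches nobody because no queue is bound with that key.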
To simplify the task slightly for the example, let’s write a basic notification dispatcher that will accept one command line parameter. It will specify the destination and the application will send the current time to all interested consumers.
Create a sample Python script named direct_notify.py
vim direct_notify.py
and paste the script contents:
import puka
import datetime
import time
import sys
# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)
# create a direct exchange named departures
exchange_promise = producer.exchange_declare(exchange='departures', type='direct')
producer.wait(exchange_promise)
# send current time to destination specified with command line argument
message = "%s" % datetime.datetime.now()
message_promise = producer.basic_publish(exchange='departures', routing_key=sys.argv[1], body=message)
producer.wait(message_promise)
print "Departure to %s at %s" % (sys.argv[1], message)
producer.close()
Press :wq to save the file and quit.
Running the script with one parameter should print the destination used and the current time. The output should look like:
root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#
Let’s go through the script step by step:
A producer client is created and connected to the local RabbitMQ instance. From now on it can communicate with RabbitMQ freely.
A direct exchange named departures is created. It does not need a routing key specified at creation, as each message published to that exchange can have a different key assigned to it. After that step the exchange exists on the RabbitMQ server and can be used to bind queues to it and send messages through it.
A message containing the current time is published to that exchange, using the command line parameter as the routing key. In the sample run Tooele is used as the parameter, and hence as both the departure destination and the routing key.
Note: for simplicity, the script does not check whether the mandatory command line argument is supplied! It will not work properly if executed without parameters.
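One possible way to add the missing check is sketched below. It is not part of the original script: the `get_destination` helper is a hypothetical name, and treating a blank argument as missing is an assumption of this example.

```python
def get_destination(argv):
    """Return the destination from an argv-style list, or None if it is absent.

    argv[0] is the script name, so the destination is expected at argv[1].
    A blank argument is treated as missing (an assumption of this sketch).
    """
    if len(argv) < 2 or not argv[1].strip():
        return None
    return argv[1]


print(get_destination(["direct_notify.py", "Tooele"]))  # destination supplied
print(get_destination(["direct_notify.py"]))            # destination missing
```

In direct_notify.py the helper could be called with sys.argv before connecting, and the script could print a usage message and stop with sys.exit when it returns None, instead of failing later with an IndexError.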
This example consumer application will act as a public transport customer interested in one or more of the destinations reachable from the station.
Create a sample Python script named direct_watch.py
vim direct_watch.py
and paste the script contents:
import puka
import sys
# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)
# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']
# bind the queue to all routing keys specified by command line arguments
for destination in sys.argv[1:]:
    print "Watching departure times for %s" % destination
    bind_promise = consumer.queue_bind(exchange='departures', queue=queue, routing_key=destination)
    consumer.wait(bind_promise)
# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)
while True:
    message = consumer.wait(message_promise)
    print "Departure for %s at %s" % (message['routing_key'], message['body'])
consumer.close()
Press :wq to save the file and quit.
Running the script with one parameter Tooele should announce that the script watches departure times for Tooele, whereas running it with more than one parameter should announce watching departure times for many destinations.
root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(...)
root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(...)
root@rabbitmq:~#
Let’s go through the script step by step to explain what it does:
A consumer client is created and connected to the local RabbitMQ instance. From now on it can communicate with RabbitMQ freely.
A temporary queue for this particular consumer is created, with a name auto-generated by RabbitMQ. The queue will be destroyed after the script ends.
The queue is bound to the departures exchange on all routing keys (destinations) specified using command line parameters, and each destination is printed on the screen for information.
The script starts waiting for messages on the queue. It will receive all messages matching the bound routing keys: when run with Tooele as the single parameter, only messages for Tooele; when run with both Tooele and Boston, messages for either of them. Each departure time will be printed on the screen.
To check whether both scripts work as expected, open three terminal windows to the server. One will be used as a public transport station to send notifications. Another two will serve as customers waiting for departures.
In the first terminal, run the direct_notify.py script once with any parameter:
root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#
Important: the direct_notify.py script must be executed at least once before any consumers, as the exchange must be created before binding queues to it. After execution the exchange stays on the RabbitMQ server and can be used freely.
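A possible way to relax this ordering requirement is to let the consumer declare the exchange itself before binding. The sketch below is an assumption layered on top of the article, not part of the original scripts: `prepare_bindings` is a hypothetical helper, and it uses only the client calls already shown in the two scripts (exchange_declare, queue_bind, and wait). It relies on the fact that declaring an exchange that already exists with the same type does not fail.

```python
def prepare_bindings(client, queue, destinations, exchange="departures"):
    """Declare the direct exchange, then bind `queue` to every destination.

    `client` is assumed to be an already connected puka.Client, as in
    direct_watch.py. This helper is hypothetical and only a sketch.
    """
    # Re-declaring an existing exchange with the same type is harmless,
    # so the consumer no longer depends on the producer having run first.
    declare_promise = client.exchange_declare(exchange=exchange, type="direct")
    client.wait(declare_promise)

    for destination in destinations:
        bind_promise = client.queue_bind(exchange=exchange, queue=queue,
                                         routing_key=destination)
        client.wait(bind_promise)
```

In direct_watch.py the binding loop could then be replaced by a call such as prepare_bindings(consumer, queue, sys.argv[1:]). Note that declaring the exchange with a different type than the existing one would be rejected by the server, so both scripts would have to agree on the direct type.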
In the second terminal, run the direct_watch.py script with one parameter - Tooele:
root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(...)
root@rabbitmq:~#
In the third terminal, run the direct_watch.py script with two parameters - Tooele and Boston:
root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(...)
root@rabbitmq:~#
Then, back in the first terminal, send three departure notifications. One to Tooele, one to Boston and one to Chicago:
root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~# python direct_notify.py Boston
Departure to Boston at 2014-02-18 15:57:31.035000
root@rabbitmq:~# python direct_notify.py Chicago
Departure to Chicago at 2014-02-18 15:57:35.035000
root@rabbitmq:~#
The first notification should be received by both consumers, since both of them wait for departures to Tooele. The second one should reach only the consumer waiting for departures to Boston. The third one should not be received by either consumer, since neither of them waits for departures to Chicago.
This is the expected behaviour. These simple examples illustrate how to dispatch messages that only certain consumers, selected by a routing key, will receive.
Direct routing does not offer complete control over where the messages will be delivered, but it is a big step up from the fanout exchange used in the previous examples, which blindly delivers messages everywhere. With the direct exchange many real-world messaging scenarios can be served, and the process is not terribly difficult.
The primary goal of this text was to introduce basic direct routing using a simple, real-world situation. Many other uses are covered in detail in the official RabbitMQ documentation, which is a great resource for RabbitMQ users and administrators.
Article Submitted by: Mateusz Papiernik (http://maticomp.net)