How RabbitMQ Works
Before diving into what topic-based exchanges are and how they work, it helps to have a sound understanding of how messages are published and consumed.
The first thing to understand is that messages are not published directly to a Queue. Instead, the Producer sends messages through an Exchange.
You can think of an Exchange as a mail delivery person ensuring that the message ends up in the correct Queue.
How a message is routed depends on several things including:
- the exchange type - specifies routing rules
- routing keys and
- header attributes.
These all act as addresses for messages.
From a Queue's perspective, you can check which exchanges and routing rules are linked to that specific Queue. These links are called Bindings. While the Routing Key is like an address for the message, a Binding links the Queue to an Exchange.
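In RabbitMQ there are four main types of Exchanges: Direct, Topic, Fanout, and Headers.
This is mainly what each exchange type looks at when deciding how to route a message to Queues:
- Direct: an exact match between the message's routing key and the binding key
- Topic: a pattern match between the routing key and the binding pattern
- Fanout: nothing beyond the bindings themselves, so every bound Queue gets a copy
- Headers: the message's header attributes instead of the routing key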
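As a rough illustration of how the exchange type changes the routing decision, here is a small in-memory model. ToyExchange and the queue names are hypothetical and exist only for this sketch; nothing here talks to a real broker, and only the direct and fanout types are modelled.

```python
class ToyExchange:
    """In-memory stand-in for an exchange (illustration only, not a RabbitMQ API)."""

    def __init__(self, exchange_type):
        self.exchange_type = exchange_type  # "direct" or "fanout"
        self.bindings = []  # list of (queue_name, binding_key) pairs

    def bind(self, queue_name, binding_key=""):
        # A binding links a queue to this exchange, optionally with a key
        self.bindings.append((queue_name, binding_key))

    def route(self, routing_key):
        """Return the names of the queues that would receive the message."""
        if self.exchange_type == "fanout":
            # Fanout ignores the routing key: every bound queue gets a copy
            matched = [queue for queue, _ in self.bindings]
        elif self.exchange_type == "direct":
            # Direct requires the routing key to equal the binding key
            matched = [queue for queue, key in self.bindings if key == routing_key]
        else:
            raise ValueError(f"unsupported exchange type: {self.exchange_type}")
        # A queue receives a message once, even if several bindings match
        return list(dict.fromkeys(matched))


direct = ToyExchange("direct")
direct.bind("orders", "order.created")
direct.bind("audit", "order.deleted")

fanout = ToyExchange("fanout")
fanout.bind("orders")
fanout.bind("audit")

print(direct.route("order.created"))  # only the queue bound with that exact key
print(fanout.route("order.created"))  # every bound queue
```

The same message reaches one queue through the direct exchange and both queues through the fanout exchange, which is the difference the exchange type makes.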

For local development, existing exchanges, queues, channels, and their types can be seen in the management interface or through the "rabbitmqadmin" command-line tool.
Local Interface: http://localhost:15672/#/exchanges
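The same information can also be read programmatically. The sketch below assumes the management plugin is enabled on localhost:15672 and that the default guest/guest credentials are still in place; both are assumptions about a local setup, and the function names are made up for this example. It calls the management HTTP API endpoint /api/exchanges.

```python
import base64
import json
import urllib.request


def build_exchanges_request(base_url="http://localhost:15672",
                            user="guest", password="guest"):
    """Build an authenticated GET request for the list of exchanges."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(f"{base_url}/api/exchanges")
    request.add_header("Authorization", f"Basic {token}")
    return request


def list_exchange_names(request):
    """Send the request and return the exchange names (needs a running broker)."""
    with urllib.request.urlopen(request) as response:
        exchanges = json.load(response)
    return [exchange["name"] for exchange in exchanges]
```

With a broker running locally, list_exchange_names(build_exchanges_request()) should return the exchange names, including my_exchange once it has been declared.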
Publishing & Subscribing using a topic-based exchange
Topic exchanges route messages to one or more queues based on the match between a message routing key and the pattern that was used to link a queue to an exchange.
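To make the matching rule concrete, here is a minimal pure-Python sketch of how a binding pattern is compared with a routing key. Keys are split into words on dots; in a pattern, * stands for exactly one word and # stands for zero or more words. The function name topic_matches is hypothetical, and this mirrors the broker's behaviour only as an illustration; it is not how RabbitMQ implements matching internally.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under topic rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: it matches only if the key is exhausted too
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            # Pattern still expects a word but the key has none left
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            # '*' or a literal word consumes exactly one word
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


for key in ("black", "black.mamba", "black.abc.xyz"):
    print(key, topic_matches("black.#", key), topic_matches("black.*", key))
```

With the binding key black.# all three routing keys match, whereas black.* matches only black.mamba, because * needs exactly one word after black.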
Let's demonstrate the pub/sub procedure in Python:
We'll use Pika, a Python AMQP client library, for the operations that follow. Install it using:
pip install pika
publisher.py
import pika


class Publisher:
    def __init__(self, config):
        self.config = config

    def publish(self, routing_key, message):
        connection = self.create_connection()
        channel = connection.channel()
        # Create the exchange, or verify it if it already exists
        channel.exchange_declare(
            exchange=self.config["exchange"], exchange_type="topic"
        )
        # Publish 'message' to the exchange with the provided routing key
        channel.basic_publish(
            exchange=self.config["exchange"], routing_key=routing_key,
            body=message
        )
        print(f" [x] Sent message {message!r} for {routing_key!r}")
        # Close the connection once the message has been handed over
        connection.close()

    # Create new connection
    def create_connection(self):
        param = pika.ConnectionParameters(
            host=self.config["host"], port=self.config["port"]
        )
        return pika.BlockingConnection(param)


config = {"host": "localhost", "port": 5672, "exchange": "my_exchange"}
publisher = Publisher(config)
publisher.publish('black', 'single word routing key')
publisher.publish('black.mamba', 'one more word added')
publisher.publish('black.abc.xyz', 'one or more words')
The above script does the following:
- Creates a connection and channel to the RMQ host.
- Declares an exchange: my_exchange.
- Publishes different messages using various patterns of routing keys.
Run it once to declare the exchange:
> python publisher.py
subscriber.py
import pika
import sys


class Subscriber:
    def __init__(self, queueName, bindingKey, config):
        self.queueName = queueName
        self.bindingKey = bindingKey
        self.config = config
        self.connection = self._create_connection()

    def __del__(self):
        self.connection.close()

    def _create_connection(self):
        parameters = pika.ConnectionParameters(
            host=self.config['host'], port=self.config['port']
        )
        return pika.BlockingConnection(parameters)

    def on_message_callback(self, channel, method, properties, body):
        print("body -", body)

    def setup(self):
        print("setup started")
        channel = self.connection.channel()
        # Create the queue, or verify it if it already exists
        channel.queue_declare(queue=self.queueName)
        print("Queue declared")
        # Bind the queue to the exchange using the binding key
        channel.queue_bind(
            queue=self.queueName, exchange=self.config['exchange'],
            routing_key=self.bindingKey
        )
        channel.basic_consume(
            queue=self.queueName,
            on_message_callback=self.on_message_callback, auto_ack=True
        )
        print(" [*] Waiting for data for " + self.queueName + ". To exit press CTRL+C")
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()


config = {'host': 'localhost', 'port': 5672, 'exchange': 'my_exchange'}
# Both the queue name and the binding key are required
if len(sys.argv) < 3:
    print('Usage: ' + __file__ + ' <QueueName> <BindingKey>')
    sys.exit(1)
else:
    queueName = sys.argv[1]
    # Binding key pattern, for example black.* or black.#
    key = sys.argv[2]
    subscriber = Subscriber(queueName, key, config)
    subscriber.setup()
Execute it using:
python subscriber.py testQueue black.#
The above subscriber.py script does the following:
- Creates a connection and channel to the RMQ host.
- Binds the queue testQueue to the exchange my_exchange using the binding key black.#.
- Consumes data and triggers the function on_message_callback when the publisher emits a message.
Re-run the publisher to publish the messages:
> python publisher.py
[x] Sent message 'single word routing key' for 'black'
[x] Sent message 'one more word added' for 'black.mamba'
[x] Sent message 'one or more words' for 'black.abc.xyz'
Subscriber's output:
setup started
Queue declared
[*] Waiting for data for testQueue. To exit press CTRL+C
body - b'single word routing key'
body - b'One more word added'
body - b'one or more words'
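The b'...' prefix in the output appears because Pika hands the message body to the callback as bytes. A small sketch of a callback that prints text instead is shown below; it assumes the publisher sent UTF-8 text, and the helper name format_body is made up for this example.

```python
def format_body(body: bytes) -> str:
    """Decode a message body, assuming the publisher sent UTF-8 text."""
    return body.decode("utf-8")


def on_message_callback(channel, method, properties, body):
    # Same signature as the subscriber's callback, but prints decoded text
    print("body -", format_body(body))
```

Passing this function as on_message_callback to basic_consume would print the text without the bytes prefix.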
Ending Note: Topic-based exchanges in RabbitMQ can help with complex use cases such as triggering and executing long-running background tasks, communicating among services, scaling up and down during peak hours, and making reliable backups.