Topic Based Exchange - RabbitMQ

Working of RabbitMQ

Before diving directly into what topic-based exchanges are and how they work, it is obligatory to have a sound understanding of how messages are published and subscribed.
The first thing to understand is that messages are not published directly to a Queue. Instead, the Producer sends messages through an Exchange.
You can think of an Exchange as a mail delivery person ensuring that the message ends up in the correct Queue.

How a message is routed depends on several things including:

  • the exchange type - specifies routing rules
  • routing keys and
  • header attributes.

These all act as addresses for messages.

From a Queue's perspective, you can check exchanges and routing rules
that are linked to this specific Queue. These links are called Bindings. While the Routing Key is like an address for the message, A Binding links the Queue to an Exchange.

In RabbitMQ there are four main types of Exchanges: Direct, Topic, Fanout, and Headers

This is mainly what the exchange looks for when deciding how to route the message to Queues:

Credits: Bikram Kundu

For local development, Existing exchanges, queues, channels, and types can be seen in the management interface or through "rabbitmqadmin."


Local Interface: http://localhost:15672/#/exchanges

Publishing & Subscribing using a topic-based exchange

Topic exchanges route messages to one or more queues based on the match between a message routing key and the pattern that was used to link a queue to an exchange.

Let's demonstrate the pub/sub procedure in python:

We'll use Pika python AMQP Client Library for the forthcoming operations. Install it using:

pip install pika

import pika

class Publisher:
    def __init__(self, config):
        self.config = config

    def publish(self, routing_key, message):
        connection = self.create_connection()
        channel =

        Creates/verifies an exchange.
            exchange=self.config["exchange"], exchange_type="topic"

        # Publish 'message' to the 'exchange' matching
        # the provided routing key
            exchange=self.config["exchange"], routing_key=routing_key, 
        print(f" [x] Sent message {message} for {routing_key})

    # Create new connection
    def create_connection(self):
        param = pika.ConnectionParameters(
            host=self.config["host"], port=self.config["port"]
        return pika.BlockingConnection(param)

config = {"host": "localhost", "port": 5672, "exchange": "my_exchange"}
publisher = Publisher(config)
publisher.publish('black', 'single word routing key')
publisher.publish('black.mamba', 'one more word added')
publisher.publish('', 'one or more words')

The above script helps us achieve:

  • Create connection and channel to RMQ host.
  • Declaring an exchange: my_exchange.
  • Publish different messages using various patterns of routing keys.

Execute for declaring exchange:

> python

import pika
import sys
class Subscriber:
    def __init__(self, queueName, bindingKey, config):
        self.queueName = queueName
        self.bindingKey = bindingKey
        self.config = config
        self.connection = self._create_connection()

    def __del__(self):
    def _create_connection(self):
        port = self.config['port'])
        return pika.BlockingConnection(parameters)

    def on_message_callback(self, channel, method, properties, body):
        print("body -", body)

    def setup(self):
        print("setup started")
        channel =
        # This method creates or checks a queue
        print("Queue declared")

        # Binds the queue to the specified exchange

        on_message_callback=self.on_message_callback, auto_ack=True)

        print(" [*] Waiting for data for "  + self.queueName + ". To exit press CTRL+C")
        except KeyboardInterrupt:
config = { 'host': 'localhost','port': 5672, 'exchange' : 'my_exchange'}

if len(sys.argv) < 2:
    print('Usage: ' + __file__ + ' <QueueName> <BindingKey>')
    queueName = sys.argv[1]
    #key in the form exchange.*
    key = sys.argv[2]
    subscriber = Subscriber(queueName, key, config)

Execute it using:

python testQueue black.#

The above script helps us achieve:

  • Create connection and channel to RMQ host.
  • Bind queue:testQueue to exchange my_exchange using binding key black.#.
  • Consume data and trigger function on_message_callback when the publisher emits the message.

Re-execute publisher for publishing data:

> python
[x] Sent message 'single word routing key' for 'black'
[x] Sent message 'One more word added' for 'black.mamba'
[x] Sent message 'one or more words' for ''

Subscriber's output:

setup started
Queue declared
 [*] Waiting for data for testQueue. To exit press CTRL+C
body - b'single word routing key'
body - b'One more word added'
body - b'one or more words'
Ending Note: Topic-based exchange in RabbitMQ can be used for solving complex use cases like triggering and executing long-running background tasks, communicating among services, scaling up-down during peak hours, to make reliable backups, etc.
Kartik Kapgate

Kartik Kapgate

Software Engineer | Backend