❌

Reading view

There are new articles available, click to refresh the page.

Learning Notes #7 – AMQP Protocol and RabbitMQ | An Overview

Today, i learned about AMQP Protocol, Components of RabbitMQ (Connections, Channels, Queues, Exchanges, Bindings and Different Types of Exchanges, Acknowledgement and Publisher Confirmation). I learned these all from CloudAMQP In this blog, you will find a crisp details on these topics.

1. Overview of AMQP Protocol

  • Advanced Message Queuing Protocol (AMQP) is an open standard for messaging middleware. It enables systems to exchange messages in a reliable and flexible manner.
  • Key components:
    • Producers: Applications that send messages.
    • Consumers: Applications that receive messages.
    • Broker: Middleware (e.g., RabbitMQ) that manages message exchanges.
    • Message: A unit of data transferred between producer and consumer.

2. How AMQP Works in RabbitMQ

  • RabbitMQ implements AMQP to facilitate message exchange. It acts as the broker, managing queues, exchanges, and bindings.
  • AMQP Operations:
    1. Producer sends a message to an exchange.
    2. The exchange routes the message to one or more queues based on bindings.
    3. Consumer retrieves the message from the queue.

3. Connections and Channels

Connections

A connection is a persistent, long-lived TCP connection between a client application and the RabbitMQ broker. Connections are relatively resource-intensive because they involve socket communication and the overhead of establishing and maintaining the connection. Each connection is uniquely identified by the broker and can be shared across multiple threads or processes.

When an application establishes a connection to RabbitMQ, it uses it as a gateway to interact with the broker. This includes creating channels, declaring queues and exchanges, publishing messages, and consuming messages. Connections should ideally be reused across the application to reduce overhead and optimize resource usage.

Channels

A channel is a lightweight, logical communication pathway established within a connection. Channels provide a way to perform multiple operations concurrently over a single connection. They are less resource-intensive than connections and are designed to handle operations such as queue declarations, message publishing, and consuming.

Using channels allows applications to:

  1. Scale efficiently: Instead of opening multiple connections, applications can open multiple channels over a single connection.
  2. Isolate operations: Each channel operates independently. For instance, one channel can consume messages while another publishes.

How They Work Together

When a client connects to RabbitMQ, it first establishes a connection. Within that connection, it can open multiple channels. Each channel operates as a virtual connection, allowing concurrent tasks without needing separate TCP connections.


import pika

# Establish a connection to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# Create multiple channels on the same connection
channel1 = connection.channel()
channel2 = connection.channel()

# Declare queues on each channel
channel1.queue_declare(queue='queue1')
channel2.queue_declare(queue='queue2')

# Publish messages on different channels
channel1.basic_publish(exchange='', routing_key='queue1', body='Message for Queue 1')
channel2.basic_publish(exchange='', routing_key='queue2', body='Message for Queue 2')

print("Messages sent to both queues!")

# Close the connection
connection.close()

Best Practices (Not Tried; Got this from the video)

  1. Reusing Connections: Establish one connection per application or service and share it across threads or processes for efficiency.
  2. Using Channels for Parallelism: Open separate channels for different operations like publishing and consuming.
  3. Graceful Cleanup: Always close channels and connections when done to avoid resource leaks.

4. Queues

  • Act as message storage.
  • Can be:
    • Durable: Survives broker restarts.
    • Exclusive: Used by a single connection.
    • Auto-delete: Deleted when the last consumer disconnects.

# Declaring a durable queue
channel.queue_declare(queue='durable_queue', durable=True)

# Sending a persistent message
channel.basic_publish(
    exchange='',  # Default exchange
    routing_key='durable_queue',
    body='Persistent message',
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)

5. Exchanges

An exchange in RabbitMQ is a routing mechanism that determines how messages sent by producers are directed to queues. Exchanges act as intermediaries between producers and queues, enabling flexible and efficient message routing based on routing rules and patterns.

Types of Exchanges

RabbitMQ supports four types of exchanges, each with its unique routing mechanism:

1. Direct Exchange

  • Routes messages to queues based on an exact match of the routing key.
  • If the routing key in the message matches the binding key of a queue, the message is routed to that queue.
  • Use case: Task queues where each task has a specific destination.

Example:

  • Queue queue1 is bound to the exchange with the routing key info.
  • A message with the routing key info is routed to queue1.

channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
channel.queue_declare(queue='direct_queue')
channel.queue_bind(exchange='direct_exchange', queue='direct_queue', routing_key='info')
channel.basic_publish(exchange='direct_exchange', routing_key='info', body='Direct message')

2. Fanout Exchange

  • Broadcasts messages to all queues bound to the exchange, ignoring routing keys.
  • Use case: Broadcasting events to multiple consumers, such as notifications or logs.

Example:

  • All queues bound to the exchange receive the same message.

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')
channel.queue_declare(queue='queue1')
channel.queue_declare(queue='queue2')

channel.queue_bind(exchange='fanout_exchange', queue='queue1')
channel.queue_bind(exchange='fanout_exchange', queue='queue2')

channel.basic_publish(exchange='fanout_exchange', routing_key='', body='Broadcast message')

3. Topic Exchange

  • Routes messages to queues based on pattern matching of routing keys.
  • Routing keys are dot-separated words, and queues can bind with patterns using wildcards:
    • * matches exactly one word.
    • # matches zero or more words.
  • Use case: Complex routing scenarios, such as logging systems with multiple log levels and sources.

Example:

  • Queue queue1 is bound with the pattern logs.info.*.
  • A message with the routing key logs.info.app1 is routed to queue1.

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
channel.queue_declare(queue='topic_queue')

channel.queue_bind(exchange='topic_exchange', queue='topic_queue', routing_key='logs.info.*')

channel.basic_publish(exchange='topic_exchange', routing_key='logs.info.app1', body='Topic message')

4. Headers Exchange

  • Routes messages based on message header attributes instead of routing keys.
  • Headers can specify conditions like x-match:
    • x-match = all: All specified headers must match.
    • x-match = any: At least one specified header must match.
  • Use case: Advanced filtering scenarios.

Example:

  • Queue queue1 is bound with headers format=json and type=report.

channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')
channel.queue_declare(queue='headers_queue')

channel.queue_bind(
    exchange='headers_exchange',
    queue='headers_queue',
    arguments={'format': 'json', 'type': 'report', 'x-match': 'all'}
)

channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',
    body='Headers message',
    properties=pika.BasicProperties(headers={'format': 'json', 'type': 'report'})
)

Exchange Lifecycle

  1. Declaration: Exchanges must be explicitly declared before use. If an exchange is not declared and a producer tries to publish a message to it, an error will occur.
  2. Binding: Queues are bound to exchanges with routing keys or header arguments.
  3. Publishing: Producers publish messages to exchanges with optional routing keys.

Durable and Non-Durable Exchanges

  • Durable Exchange: Survives broker restarts. Useful for critical applications.
  • Non-Durable Exchange: Deleted when the broker restarts. Suitable for transient tasks.

# Declare a durable exchange
channel.exchange_declare(exchange='durable_exchange', exchange_type='direct', durable=True)

Default Exchange

RabbitMQ provides a built-in default exchange (unnamed exchange) that routes messages directly to a queue with a name matching the routing key.


channel.queue_declare(queue='default_queue')
channel.basic_publish(exchange='', routing_key='default_queue', body='Default exchange message')

Best Practices for Exchanges

  • Use durable exchanges for critical applications that require persistence across broker restarts.
  • Use direct exchanges for targeted delivery when routing keys are predictable.
  • Use fanout exchanges for broadcasting to multiple queues.
  • Use topic exchanges for complex routing needs, especially with hierarchical routing keys.
  • Use headers exchanges for advanced filtering based on metadata.

6. Bindings

Bindings connect queues to exchanges with routing rules.


# Binding a queue with a routing key
channel.queue_bind(exchange='direct_logs', queue='error_logs', routing_key='error')

7. Consumer Acknowledgments

Two acknowledgment types:

  • Manual: Consumer explicitly sends an acknowledgment.
  • Automatic: RabbitMQ assumes successful processing.
# Auto ACK
channel.basic_consume(queue='test_queue', on_message_callback=lambda ch, method, properties, body: print(body), auto_ack=True)

# Manual ACK
def callback(ch, method, properties, body):
    print(f"Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='test_queue', on_message_callback=callback)
channel.start_consuming()

8. Publisher Confirmations

  • Guarantees that RabbitMQ successfully received a message.
  • Enable publisher confirms for robust systems.

# Enable delivery confirmation
channel.confirm_delivery()

# Publish and handle confirmation
try:
    channel.basic_publish(exchange='direct_logs', routing_key='error', body='Message with confirmation')
    print("Message successfully published!")
except pika.exceptions.UnroutableError:
    print("Message could not be routed.")

9. Virtual Hosts (vhosts)

  • Logical partitions to segregate exchanges, queues, and users.
  • Use vhosts for multi-tenant setups.

Tomm, I am planning to explore more on RabbitMQ. Let’s see tomm.

❌