Learning Notes #10 – Lazy Queues | RabbitMQ

26 December 2024 at 06:54

What Are Lazy Queues?

  • Lazy Queues are designed to store messages primarily on disk rather than in memory.
  • They are optimized for use cases involving large message backlogs where minimizing memory usage is critical.

Key Characteristics

  1. Disk-Based Storage – Messages are stored on disk immediately upon arrival, rather than being held in memory.
  2. Low Memory Usage – Only minimal metadata for messages is kept in memory.
  3. Scalability – Can handle millions of messages without consuming significant memory.
  4. Message Retrieval – Retrieving messages is slower because messages are fetched from disk.
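  5. Durability – Messages are written to disk, but they survive a RabbitMQ restart only if the queue is declared durable and the messages are published as persistent (as the producer in the Implementation section does).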
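The characteristics above are switched on through an optional queue argument. Below is a minimal sketch of a helper that builds the `arguments` dict passed to `queue_declare`. The helper name `lazy_queue_arguments` and the optional length cap are illustrative assumptions, not part of the original notes; `x-queue-mode` and `x-max-length` are standard RabbitMQ queue arguments.

```python
def lazy_queue_arguments(max_length=None):
    """Build the optional-arguments dict for declaring a lazy queue.

    max_length is an optional cap on the number of messages kept in the
    queue (hypothetical convenience parameter for this sketch).
    """
    args = {"x-queue-mode": "lazy"}
    if max_length is not None:
        if max_length < 0:
            raise ValueError("max_length must be non-negative")
        args["x-max-length"] = max_length
    return args


print(lazy_queue_arguments())
print(lazy_queue_arguments(max_length=1_000_000))
```

The returned dict can be passed as `arguments=` when declaring the queue, as the producer and consumer scripts later in these notes do with a literal dict.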

Trade-offs

  • Latency: Fetching messages from disk is slower than retrieving them from memory.
  • Throughput: Disk I/O limits how quickly messages can be delivered, so Lazy Queues are a poor fit for high-throughput, low-latency applications.

Choose Lazy Queues if

  • You need to handle very large backlogs of messages.
  • Memory is a constraint in your system.
  • Latency and throughput are less critical.
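To judge whether a backlog is "very large" relative to available memory, a rough estimate helps. The sketch below multiplies message count by average body size. The function name and the example figures are assumptions for illustration, and the estimate ignores broker-side per-message overhead, so treat the result as a lower bound.

```python
def backlog_size_bytes(message_count, avg_body_bytes):
    """Rough lower bound on the space a backlog of message bodies occupies."""
    return message_count * avg_body_bytes


# Example figures (illustrative only): about 4.2 million messages of 1 KiB each.
size = backlog_size_bytes(4_194_304, 1024)
print(f"{size / 1024**3:.1f} GiB")  # 4.0 GiB
```

If that figure is a large share of the broker host's RAM, keeping the backlog on disk is the safer choice.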

Implementation

Pre-requisites

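1. Install and run RabbitMQ on your local machine. The command below starts the 4.0 image with the management UI on port 15672. Note that from RabbitMQ 3.12 onward, classic queues already behave much like Lazy Queues by default and the x-queue-mode argument is ignored, so on this image the argument used in the scripts has no additional effect; it matters on older versions.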


docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

2. Install the pika library


pip install pika
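Before running the scripts, it can help to confirm that the broker's AMQP port is accepting connections. This is a small sketch using only the standard library; the function name `broker_reachable` is an assumption for illustration, and the defaults match the port mapping in the docker command above.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers connection refused, timeouts, DNS failures
        return False


if __name__ == "__main__":
    print("AMQP port reachable:", broker_reachable())
```

This only checks that something is listening on the port; it does not perform an AMQP handshake or verify credentials.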

Producer (producer.py)

This script sends a persistent message to a Lazy Queue.

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare a Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Publish a message
message = "Hello from the Producer via Custom Exchange!"
channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent message
)

print(f"Message sent to Lazy Queue via Exchange: {message}")

# Close the connection
connection.close()
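One way to confirm that a published message actually reached the queue is a passive declare, which returns the current count of ready messages without creating or modifying the queue. The helper below is a sketch; the name `queue_depth` is an assumption, and `channel` is expected to be a pika `BlockingChannel` like the one in producer.py.

```python
def queue_depth(channel, queue_name):
    """Return the number of ready messages in an existing queue.

    A passive declare only checks that the queue exists. If it does not,
    the broker closes the channel and pika raises an exception.
    """
    result = channel.queue_declare(queue=queue_name, passive=True)
    return result.method.message_count


# Usage inside producer.py, before connection.close():
# print(queue_depth(channel, "lazy_queue_example"))
```

The count covers messages ready for delivery, so messages already handed to a consumer but not yet acknowledged are not included.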

Consumer (consumer.py)

This script consumes messages from the Lazy Queue and acknowledges each one after processing.

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare the Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Callback function to process messages
def callback(ch, method, properties, body):
    print(f"Received message: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Acknowledge the message

# Start consuming messages
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)

print("Waiting for messages. To exit, press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()  # Cancel the consumer cleanly before closing
    print("Stopped consuming.")

# Close the connection
connection.close()
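For a one-off inspection or a bounded clean-up of a backlog, messages can also be pulled on demand with `basic_get` instead of a long-running consumer. The sketch below is an alternative to the script above, not a replacement for it; the function name `drain` and the `limit` parameter are assumptions for illustration. Polling like this is generally less efficient than `basic_consume`, so it suits occasional use.

```python
def drain(channel, queue_name, limit=100):
    """Pull up to `limit` messages with basic_get, acknowledging each one.

    `channel` is expected to be a pika BlockingChannel. Returns the message
    bodies in the order they were received.
    """
    bodies = []
    for _ in range(limit):
        method, properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:  # basic_get returns (None, None, None) on an empty queue
            break
        bodies.append(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return bodies


# Usage with an open channel:
# for body in drain(channel, "lazy_queue_example", limit=10):
#     print(body.decode())
```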

Explanation

  1. Producer
    • Defines a custom exchange (custom_exchange) of type direct.
    • Declares a Lazy Queue (lazy_queue_example).
    • Binds the queue to the exchange using a routing key (custom_routing_key).
    • Publishes a persistent message via the custom exchange and routing key.
  2. Consumer
    • Declares the same exchange and Lazy Queue to ensure they exist.
    • Consumes messages routed to the queue through the custom exchange and routing key.
  3. Custom Exchange and Binding
    • The direct exchange type routes messages based on an exact match of the routing key.
    • Binding ensures the queue receives messages published to the exchange with the specified key.
  4. Lazy Queue Behavior
    • Messages are stored directly on disk to minimize memory usage.
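The exact-match routing described in item 3 can be illustrated with a toy in-memory model. This is only a sketch of the matching rule: `DirectExchangeModel` is a hypothetical class written for this note, not a RabbitMQ or pika API.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy model of direct-exchange routing: exact match on the routing key."""

    def __init__(self):
        self._bindings = defaultdict(set)  # routing key -> bound queue names

    def bind(self, queue, routing_key):
        self._bindings[routing_key].add(queue)

    def route(self, routing_key):
        """Return the queues that would receive a message with this key."""
        return sorted(self._bindings.get(routing_key, ()))


exchange = DirectExchangeModel()
exchange.bind("lazy_queue_example", "custom_routing_key")
print(exchange.route("custom_routing_key"))  # ['lazy_queue_example']
print(exchange.route("other_key"))           # []
```

A message published with a key that has no matching binding reaches no queue, which is why the producer and consumer scripts must agree on the routing key.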
