Learning Notes #10 β Lazy Queues | RabbitMQ
26 December 2024 at 06:54
What Are Lazy Queues?
- Lazy Queues are designed to store messages primarily on disk rather than in memory.
- They are optimized for use cases involving large message backlogs where minimizing memory usage is critical.
Key Characteristics
- Disk-Based Storage β Messages are stored on disk immediately upon arrival, rather than being held in memory.
- Low Memory Usage β Only minimal metadata for messages is kept in memory.
- Scalability β Can handle millions of messages without consuming significant memory.
- Message Retrieval β Retrieving messages is slower because messages are fetched from disk.
- Durability β Messages persist on disk, reducing the risk of data loss during RabbitMQ restarts.
Trade-offs
- Latency: Fetching messages from disk is slower than retrieving them from memory.
- Throughput: Not suitable for high-throughput, low-latency applications.
Choose Lazy Queues if
- You need to handle very large backlogs of messages.
- Memory is a constraint in your system.Latency and throughput are less critical.
Implementation
Pre-requisites
1. Install and run RabbitMQ on your local machine.
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
2. Install the pika
library
pip install pika
Producer (producer.py
)
This script sends a persistent message to a Lazy Queue.
import pika # RabbitMQ connection parameters for localhost connection_params = pika.ConnectionParameters(host="localhost") # Connect to RabbitMQ connection = pika.BlockingConnection(connection_params) channel = connection.channel() # Custom Exchange and Routing Key exchange_name = "custom_exchange" routing_key = "custom_routing_key" queue_name = "lazy_queue_example" # Declare the custom exchange channel.exchange_declare( exchange=exchange_name, exchange_type="direct", # Direct exchange routes messages based on the routing key durable=True ) # Declare a Lazy Queue channel.queue_declare( queue=queue_name, durable=True, arguments={"x-queue-mode": "lazy"} # Configure the queue as lazy ) # Bind the queue to the custom exchange with the routing key channel.queue_bind( exchange=exchange_name, queue=queue_name, routing_key=routing_key ) # Publish a message message = "Hello from the Producer via Custom Exchange!" channel.basic_publish( exchange=exchange_name, routing_key=routing_key, body=message, properties=pika.BasicProperties(delivery_mode=2) # Persistent message ) print(f"Message sent to Lazy Queue via Exchange: {message}") # Close the connection connection.close()
Consumer (consumer.py
)
import pika # RabbitMQ connection parameters for localhost connection_params = pika.ConnectionParameters(host="localhost") # Connect to RabbitMQ connection = pika.BlockingConnection(connection_params) channel = connection.channel() # Custom Exchange and Routing Key exchange_name = "custom_exchange" routing_key = "custom_routing_key" queue_name = "lazy_queue_example" # Declare the custom exchange channel.exchange_declare( exchange=exchange_name, exchange_type="direct", # Direct exchange routes messages based on the routing key durable=True ) # Declare the Lazy Queue channel.queue_declare( queue=queue_name, durable=True, arguments={"x-queue-mode": "lazy"} # Configure the queue as lazy ) # Bind the queue to the custom exchange with the routing key channel.queue_bind( exchange=exchange_name, queue=queue_name, routing_key=routing_key ) # Callback function to process messages def callback(ch, method, properties, body): print(f"Received message: {body.decode()}") ch.basic_ack(delivery_tag=method.delivery_tag) # Acknowledge the message # Start consuming messages channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False) print("Waiting for messages. To exit, press CTRL+C") try: channel.start_consuming() except KeyboardInterrupt: print("Stopped consuming.") # Close the connection connection.close()
Explanation
- Producer
- Defines a custom exchange (
custom_exchange
) of typedirect
. - Declares a Lazy Queue (
lazy_queue_example
). - Binds the queue to the exchange using a routing key (
custom_routing_key
). - Publishes a persistent message via the custom exchange and routing key.
- Defines a custom exchange (
- Consumer
- Declares the same exchange and Lazy Queue to ensure they exist.
- Consumes messages routed to the queue through the custom exchange and routing key.
- Custom Exchange and Binding
- The
direct
exchange type routes messages based on an exact match of the routing key. - Binding ensures the queue receives messages published to the exchange with the specified key.
- The
- Lazy Queue Behavior
- Messages are stored directly on disk to minimize memory usage.