❌

Normal view

There are new articles available, click to refresh the page.
Before yesterdayMain stream

Learning Notes #5 – Message Queues | RabbitMQ

22 December 2024 at 12:05

Github: https://github.com/syedjaferk/rabbitmq_message_queues

Imagine you own a busy online store. Customers place orders, payments are processed, inventory is updated, and confirmation emails are sent.

If these steps happen one after another in real-time (synchronous), your website could slow down or even crash under high demand. This is where message queues come in to picture. They help different parts of your system communicate smoothly and handle tasks efficiently, even during a rush. Its one of the solution for asynchronous communication.

What is a Message Queue?

A message queue is a software system that enables different parts of an application to send and receive messages asynchronously. Messages are temporarily stored in a queue until the recipient is ready to process them.

For example, think of it as a waiting line at a busy coffee shop. Each order (or message) waits in line until it’s picked up and handled by a coffee maker (or worker). The beauty of a message queue is that the coffee shop (producer) can keep taking orders without waiting for the coffee maker (consumer) to finish the current one.

Here’s how it works:

  • The producer sends messages to the queue.
  • The queue stores the messages.
  • The consumer picks up messages one by one to process them.

RabbitMQ is one kind of tool which helps in enabling async communication.

Key Components of RabbitMQ (a Popular Message Queue System)

  1. Producer: The sender of messages. For example, your website sending an order to the queue.
  2. Queue: The holding area for messages, like a to-do list. Each order waits here until processed.
  3. Consumer: The worker that processes messages. For example, the service that charges a credit card.
  4. Exchange: Think of this as a traffic controller. It decides which queue gets each message based on rules you set.
  5. Message: The data being sent, such as order details (customer name, items, total price).
  6. Acknowledgements (ACKs): A signal from the consumer to RabbitMQ saying, β€œMessage processed successfully!”.

How a Message Queue Solves Real Problems

Scenario: Imagine your online store uses a message queue during a holiday rush.

  1. Placing Orders
    • Customers place orders on your website (producer).
    • Orders are sent to the RabbitMQ queue.
  2. Processing Payments
    • The payment service (consumer) picks up orders from the queue, one by one, to charge credit cards.
  3. Sending Emails
    • Once payment is successful, another consumer sends confirmation emails.
  4. Updating Inventory
    • A third consumer updates the inventory system.

Without a queue: All these tasks would happen one after the other, causing delays and potential failures.

With a queue: Each task works independently and efficiently, ensuring smooth operations.

Simple RabbitMQ Example

Step1: I am spinning up a RabbitMQ from a Docker


docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

Step 2: Producer Code (Sending Messages)


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue')

channel.basic_publish(exchange='',
                      routing_key='order_queue',
                      body='Order #12345')
print("[x] Sent 'Order #12345'")
connection.close()

Explanation

  1. pika.ConnectionParameters('localhost'): Connects to RabbitMQ running locally.
  2. channel.queue_declare(queue='order_queue'): Ensures the queue exists. If it doesn’t, RabbitMQ will create it.
  3. channel.basic_publish(...): Publishes a message (in this case, β€œOrder #12345”) to the specified queue.
  4. connection.close(): Cleans up and closes the connection.

Step 3: Consumer Code (Processing Message)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue')

def callback(ch, method, properties, body):
    print(f"[x] Processed {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='order_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Explanation

  1. channel.queue_declare(queue='order_queue'): Ensures the consumer is listening to the correct queue.
  2. callback: A function that processes each message. Here, it prints the message content and acknowledges it.
  3. channel.basic_consume(...): Binds the callback function to the queue, so the consumer processes messages as they arrive.
  4. channel.start_consuming(): Starts the consumer, waiting for messages indefinitely.

Best Practices (Not Tried – Just Got it from Course page.)

  1. Keep Messages Small: Only send necessary data to avoid delays.
  2. Use Dead Letter Queues: Handle failed messages separately to keep the main queue clear.
  3. Monitor Performance: Watch queue sizes and processing times to prevent backlogs.
  4. Scale Consumers: Add more workers during busy times to process messages faster.
  5. Secure Your System: Use encryption and authentication to protect sensitive data.

❌
❌