Normal view

There are new articles available, click to refresh the page.
Before yesterdayMain stream

RSVP for K6 : Load Testing Made Easy in Tamil

5 February 2025 at 10:57

Ensuring your applications perform well under high traffic is crucial. Join us for an interactive K6 Bootcamp, where we’ll explore performance testing, load testing strategies, and real-world use cases to help you build scalable and resilient systems.

🎯 What is K6 and Why Should You Learn It?

Modern applications must handle thousands (or millions!) of users without breaking. K6 is an open-source, developer-friendly performance testing tool that helps you

✅ Simulate real-world traffic and identify performance bottlenecks.
✅ Write tests in JavaScript – no need for complex tools!
✅ Run efficient load tests on APIs, microservices, and web applications.
✅ Integrate with CI/CD pipelines to automate performance testing.
✅ Gain deep insights with real-time performance metrics.

By mastering K6, you’ll gain the skills to predict failures before they happen, optimize performance, and build systems that scale with confidence!

📌 Bootcamp Details

📅 Date: Feb 23 2024 – Sunday
🕒 Time: 10:30 AM
🌐 Mode: Online (Link Will be shared in Email after RSVP)
🗣 Language: தமிழ்

🎓 Who Should Attend?

  • Developers – Ensure APIs and services perform well under load.
  • QA Engineers – Validate system reliability before production.
  • SREs / DevOps Engineers – Continuously test performance in CI/CD pipelines.

RSVP Now

🔥 Don’t miss this opportunity to master load testing with K6 and take your performance engineering skills to the next level!

Got questions? Drop them in the comments or reach out to me. See you at the bootcamp! 🚀

Our Previous Monthly meets – https://www.youtube.com/watch?v=cPtyuSzeaa8&list=PLiutOxBS1MizPGGcdfXF61WP5pNUYvxUl&pp=gAQB

Our Previous Sessions,

  1. Python – https://www.youtube.com/watch?v=lQquVptFreE&list=PLiutOxBS1Mizte0ehfMrRKHSIQcCImwHL&pp=gAQB
  2. Docker – https://www.youtube.com/watch?v=nXgUBanjZP8&list=PLiutOxBS1Mizi9IRQM-N3BFWXJkb-hQ4U&pp=gAQB
  3. Postgres – https://www.youtube.com/watch?v=04pE5bK2-VA&list=PLiutOxBS1Miy3PPwxuvlGRpmNo724mAlt&pp=gAQB

RabbitMQ – All You Need To Know To Start Building Scalable Platforms

1 February 2025 at 02:39

  1. Introduction
  2. What is a Message Queue ?
  3. So Problem Solved !!! Not Yet
  4. RabbitMQ: Installation
  5. RabbitMQ: An Introduction (Optional)
    1. What is RabbitMQ?
    2. Why Use RabbitMQ?
    3. Key Features and Use Cases
  6. Building Blocks of Message Broker
    1. Connection & Channels
    2. Queues – Message Store
    3. Exchanges – Message Distributor and Binding
  7. Producing, Consuming and Acknowledging
  8. Problem #1 – Task Queue for Background Job Processing
    1. Context
    2. Problem
    3. Proposed Solution
  9. Problem #2 – Broadcasting NEWS to all subscribers
    1. Problem
    2. Solution Overview
    3. Step 1: Producer (Publisher)
    4. Step 2: Consumers (Subscribers)
      1. Consumer 1: Mobile App Notifications
      2. Consumer 2: Email Alerts
      3. Consumer 3: Web Notifications
      4. How It Works
  10. Intermediate Resources
    1. Prefetch Count
    2. Request Reply Pattern
    3. Dead Letter Exchange
    4. Alternate Exchanges
    5. Lazy Queues
    6. Quorom Queues
    7. Change Data Capture
    8. Handling Backpressure in Distributed Systems
    9. Choreography Pattern
    10. Outbox Pattern
    11. Queue Based Loading
    12. Two Phase Commit Protocol
    13. Competing Consumer
    14. Retry Pattern
    15. Can We Use Database as a Queue
  11. Let’s Connect

Introduction

Let’s take the example of an online food ordering system like Swiggy or Zomato. Suppose a user places an order through the mobile app. If the application follows a synchronous approach, it would first send the order request to the restaurant’s system and then wait for confirmation. If the restaurant is busy, the app will have to keep waiting until it receives a response.

If the restaurant’s system crashes or temporarily goes offline, the order will fail, and the user may have to restart the process.

This approach leads to a poor user experience, increases the chances of failures, and makes the system less scalable, as multiple users waiting simultaneously can cause a bottleneck.

In a traditional synchronous communication model, one service directly interacts with another and waits for a response before proceeding. While this approach is simple and works for small-scale applications, it introduces several challenges, especially in systems that require high availability and scalability.

The main problems with synchronous communication include slow performance, system failures, and scalability issues. If the receiving service is slow or temporarily unavailable, the sender has no choice but to wait, which can degrade the overall performance of the application.

Moreover, if the receiving service crashes, the entire process fails, leading to potential data loss or incomplete transactions.

In this book, we are going to solve how this can be solved with a message queue.

What is a Message Queue ?

A message queue is a system that allows different parts of an application (or different applications) to communicate with each other asynchronously by sending and receiving messages.

It acts like a buffer or an intermediary where messages are stored until the receiving service is ready to process them.

How It Works

  1. A producer (sender) creates a message and sends it to the queue.
  2. The message sits in the queue until a consumer (receiver) picks it up.
  3. The consumer processes the message and removes it from the queue.

This process ensures that the sender does not have to wait for the receiver to be available, making the system faster, more reliable, and scalable.

Real-Life Example

Imagine a fast-food restaurant where customers place orders at the counter. Instead of waiting at the counter for their food, customers receive a token number and move aside. The kitchen prepares the order in the background, and when it’s ready, the token number is called for pickup.

In this analogy,

  • The counter is the producer (sending orders).
  • The queue is the token system (storing orders).
  • The kitchen is the consumer (processing orders).
  • The customer picks up the food when ready (message is consumed).

Similarly, in applications, a message queue helps decouple systems, allowing them to work at their own pace without blocking each other. RabbitMQ, Apache Kafka, and Redis are popular message queue systems used in modern software development. 🚀

So Problem Solved !!! Not Yet

It seems like problem is solved, but the message life cycle in the queue is need to handled.

  • Message Routing & Binding (Optional) – How a message is routed ?. If an exchange is used, the message is routed based on predefined rules.
  • Message Storage (Queue Retention) – How long a message stays in the queue. The message stays in the queue until a consumer picks it up.
  • If the consumer successfully processes the message, it sends an acknowledgment (ACK), and the message is removed. If the consumer fails, the message requeues or moves to a dead-letter queue (DLQ).
  • Messages that fail multiple times, are not acknowledged, or expire may be moved to a Dead-Letter Queue for further analysis.
  • Messages stored only in memory can be lost if RabbitMQ crashes.
  • Messages not consumed within their TTL expire.
  • If a consumer fails to acknowledge a message, it may be reprocessed twice.
  • Messages failing multiple times may be moved to a DLQ.
  • Too many messages in the queue due to slow consumers can cause system slowdowns.
  • Network failures can disrupt message delivery between producers, RabbitMQ, and consumers.
  • Messages with corrupt or bad data may cause repeated consumer failures.

To handle all the above problems, we need a tool. Stable, Battle tested, Reliable tool. RabbitMQ is one kind of that tool. In this book we will cover the basics of RabbitMQ.

RabbitMQ: Installation

For RabbitMQ Installation please refer to https://www.rabbitmq.com/docs/download. In this book we will go with RabbitMQ docker.

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management


RabbitMQ: An Introduction (Optional)

What is RabbitMQ?

Imagine you’re sending messages between friends, but instead of delivering them directly, you drop them in a mailbox, and your friend picks them up when they are ready. RabbitMQ acts like this mailbox, but for computer programs. It helps applications communicate asynchronously, meaning they don’t have to wait for each other to process data.

RabbitMQ is a message broker, which means it handles and routes messages between different parts of an application. It ensures that messages are delivered efficiently, even when some components are running at different speeds or go offline temporarily.

Why Use RabbitMQ?

Modern applications often consist of multiple services that need to exchange data. Sometimes, one service produces data faster than another can consume it. Instead of forcing the slower service to catch up or making the faster service wait, RabbitMQ allows the fast service to place messages in a queue. The slow service can then process them at its own pace.

Some key benefits of using RabbitMQ include,

  • Decoupling services: Components communicate via messages rather than direct calls, reducing dependencies.
  • Scalability: RabbitMQ allows multiple consumers to process messages in parallel.
  • Reliability: It supports message durability and acknowledgments, preventing message loss.
  • Flexibility: Works with many programming languages and integrates well with different systems.
  • Efficient Load Balancing: Multiple consumers can share the message load to prevent overload on a single component.

Key Features and Use Cases

RabbitMQ is widely used in different applications, including

  • Chat applications: Messages are queued and delivered asynchronously to users.
  • Payment processing: Orders are placed in a queue and processed sequentially.
  • Event-driven systems: Used for microservices communication and event notification.
  • IoT systems: Devices publish data to RabbitMQ, which is then processed by backend services.
  • Job queues: Background tasks such as sending emails or processing large files.

Building Blocks of Message Broker

Connection & Channels

In RabbitMQ, connections and channels are fundamental concepts for communication between applications and the broker,

Connections: A connection is a TCP link between a client (producer or consumer) and the RabbitMQ broker. Each connection consumes system resources and is relatively expensive to create and maintain.

Channels: A channel is a virtual communication path inside a connection. It allows multiple logical streams of data over a single TCP connection, reducing overhead. Channels are lightweight and preferred for performing operations like publishing and consuming messages.

Queues – Message Store

A queue is a message buffer that temporarily holds messages until a consumer retrieves and processes them.

1. Queues operate on a FIFO (First In, First Out) basis, meaning messages are processed in the order they arrive (unless priorities or other delivery strategies are set).

2. Queues persist messages if they are declared as durable and the messages are marked as persistent, ensuring reliability even if RabbitMQ restarts.

3. Multiple consumers can subscribe to a queue, and messages can be distributed among them in a round-robin manner.

Consumption by multiple consumers,

Can also be broadcasted,

4. If no consumers are available, messages remain in the queue until a consumer connects.

Analogy: Think of a queue as a to-do list where tasks (messages) are stored until someone (a worker/consumer) picks them up and processes them.

Exchanges – Message Distributor and Binding

An exchange is responsible for routing messages to one or more queues based on routing rules.

When a producer sends a message, it doesn’t go directly to a queue but first reaches an exchange, which decides where to forward it.🔥

The blue color line is called as Binding. A binding is the link between the exchange and the queue, guiding messages to the right place.

RabbitMQ supports different types of exchanges

Direct Exchange (direct)

  • Routes messages to queues based on an exact match between the routing key and the queue’s binding key.
  • Example: Sending messages to a specific queue based on a severity level (info, error, warning).


Fanout Exchange (fanout)

  • Routes messages to all bound queues, ignoring routing keys.
  • Example: Broadcasting notifications to multiple services at once.

Topic Exchange (topic)

  • Routes messages based on pattern matching using * (matches one word) and # (matches multiple words).
  • Example: Routing logs where log.info goes to one queue, log.error goes to another, and log.* captures all.

Headers Exchange (headers)

  • Routes messages based on message headers instead of routing keys.
  • Example: Delivering messages based on metadata like device: mobile or region: US.

Analogy: An exchange is like a traffic controller that decides which road (queue) a vehicle (message) should take based on predefined rules.

Binding

A binding is a link between an exchange and a queue that defines how messages should be routed.

  • When a queue is bound to an exchange with a binding key, messages with a matching routing key are delivered to that queue.
  • A queue can have multiple bindings to different exchanges, allowing it to receive messages from multiple sources.

Example:

  • A queue named error_logs can be bound to a direct exchange with a binding key error.
  • Another queue, all_logs, can be bound to the same exchange with a binding key # (wildcard in a topic exchange) to receive all logs.

Analogy: A binding is like a GPS route guiding messages (vehicles) from the exchange (traffic controller) to the right queue (destination).

Producing, Consuming and Acknowledging

RabbitMQ follows the producer-exchange-queue-consumer model,

  • Producing messages (Publishing): A producer creates a message and sends it to RabbitMQ, which routes it to the correct queue.
  • Consuming messages (Subscribing): A consumer listens for messages from the queue and processes them.
  • Acknowledgment: The consumer sends an acknowledgment (ack) after successfully processing a message.
  • Durability: Ensures messages and queues survive RabbitMQ restarts.

Why do we need an Acknowledgement ?

  1. Ensures message reliability – Prevents messages from being lost if a consumer crashes.
  2. Prevents message loss – Messages are redelivered if no ACK is received.
  3. Avoids unintentional message deletion – Messages stay in the queue until properly processed.
  4. Supports at-least-once delivery – Ensures every message is processed at least once.
  5. Enables load balancing – Distributes messages fairly among multiple consumers.
  6. Allows manual control – Consumers can acknowledge only after successful processing.
  7. Handles redelivery – Messages can be requeued and sent to another consumer if needed.

Problem #1 – Task Queue for Background Job Processing

Context

A company runs an image processing application where users upload images that need to be resized, watermarked, and optimized before they can be served. Processing these images synchronously would slow down the user experience, so the company decides to implement an asynchronous task queue using RabbitMQ.

Problem

  • Users upload large images that require multiple processing steps.
  • Processing each image synchronously blocks the application, leading to slow response times.
  • High traffic results in queue buildup, making it challenging to scale the system efficiently.

Proposed Solution

1. Producer Service

  • Publishes image processing tasks to a RabbitMQ exchange (task_exchange).
  • Sends the image filename as the message body to the queue (image_queue).

2. Worker Consumers

  • Listen for new image processing tasks from the queue.
  • Process each image (resize, watermark, optimize, etc.).
  • Acknowledge completion to ensure no duplicate processing.

3. Scalability

  • Multiple workers can run in parallel to process images faster.

producer.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_declare(queue='image_queue')

# Bind queue to exchange
channel.queue_bind(exchange='task_exchange', queue='image_queue', routing_key='image_task')

# List of images to process
images = ["image1.jpg", "image2.jpg", "image3.jpg"]

for image in images:
    channel.basic_publish(exchange='task_exchange', routing_key='image_task', body=image)
    print(f" [x] Sent {image}")

connection.close()

consumer.py

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_declare(queue='image_queue')

# Bind queue to exchange
channel.queue_bind(exchange='task_exchange', queue='image_queue', routing_key='image_task')

def process_image(ch, method, properties, body):
    print(f" [x] Processing {body.decode()}")
    time.sleep(2)  # Simulate processing time
    print(f" [x] Finished {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Start consuming
channel.basic_consume(queue='image_queue', on_message_callback=process_image)
print(" [*] Waiting for image tasks. To exit press CTRL+C")
channel.start_consuming()

Problem #2 – Broadcasting NEWS to all subscribers

Problem

A news application wants to send breaking news alerts to all subscribers, regardless of their location or interest.

Use a fanout exchange (news_alerts_exchange) to broadcast messages to all connected queues, ensuring all users receive the alert.

🔹 Example

  • mobile_app_queue (for users receiving push notifications)
  • email_alert_queue (for users receiving email alerts)
  • web_notification_queue (for users receiving notifications on the website)

Solution Overview

  • We create a fanout exchange called news_alerts_exchange.
  • Multiple queues (mobile_app_queue, email_alert_queue, and web_notification_queue) are bound to this exchange.
  • A producer publishes messages to the exchange.
  • Each consumer listens to its respective queue and receives the alert.

Step 1: Producer (Publisher)

This script publishes a breaking news alert to the fanout exchange.

import pika

# Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Declare a fanout exchange
channel.exchange_declare(exchange="news_alerts_exchange", exchange_type="fanout")

# Publish a message
message = "Breaking News: Major event happening now!"
channel.basic_publish(exchange="news_alerts_exchange", routing_key="", body=message)

print(f" [x] Sent: {message}")

# Close connection
connection.close()

Step 2: Consumers (Subscribers)

Each consumer listens to its respective queue and processes the alert.

Consumer 1: Mobile App Notifications

import pika

# Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Declare exchange
channel.exchange_declare(exchange="news_alerts_exchange", exchange_type="fanout")

# Declare a queue (auto-delete if no consumers)
queue_name = "mobile_app_queue"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange="news_alerts_exchange", queue=queue_name)

# Callback function
def callback(ch, method, properties, body):
    print(f" [Mobile App] Received: {body.decode()}")

# Consume messages
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(" [*] Waiting for news alerts...")
channel.start_consuming()

Consumer 2: Email Alerts

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="news_alerts_exchange", exchange_type="fanout")

queue_name = "email_alert_queue"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange="news_alerts_exchange", queue=queue_name)

def callback(ch, method, properties, body):
    print(f" [Email Alert] Received: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(" [*] Waiting for news alerts...")
channel.start_consuming()

Consumer 3: Web Notifications

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="news_alerts_exchange", exchange_type="fanout")

queue_name = "web_notification_queue"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange="news_alerts_exchange", queue=queue_name)

def callback(ch, method, properties, body):
    print(f" [Web Notification] Received: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(" [*] Waiting for news alerts...")
channel.start_consuming()

How It Works

  1. The producer sends a news alert to the fanout exchange (news_alerts_exchange).
  2. All queues (mobile_app_queue, email_alert_queue, web_notification_queue) bound to the exchange receive the message.
  3. Each consumer listens to its queue and processes the alert.

This setup ensures all users receive the alert simultaneously across different platforms. 🚀

Intermediate Resources

Prefetch Count

Prefetch is a mechanism that defines how many messages can be delivered to a consumer at a time before the consumer sends an acknowledgment back to the broker. This ensures that the consumer does not get overwhelmed with too many unprocessed messages, which could lead to high memory usage and potential performance issues.

To Know More: https://parottasalna.com/2024/12/29/learning-notes-16-prefetch-count-rabbitmq/

Request Reply Pattern

The Request-Reply Pattern is a fundamental communication style in distributed systems, where a requester sends a message to a responder and waits for a reply. It’s widely used in systems that require synchronous communication, enabling the requester to receive a response for further processing.

To Know More: https://parottasalna.com/2024/12/28/learning-notes-15-request-reply-pattern-rabbitmq/

Dead Letter Exchange

A dead letter is a message that cannot be delivered to its intended queue or is rejected by a consumer. Common scenarios where messages are dead lettered include,

  1. Message Rejection: A consumer explicitly rejects a message without requeuing it.
  2. Message TTL (Time-To-Live) Expiry: The message remains in the queue longer than its TTL.
  3. Queue Length Limit: The queue has reached its maximum capacity, and new messages are dropped.
  4. Routing Failures: Messages that cannot be routed to any queue from an exchange.

To Know More: https://parottasalna.com/2024/12/28/learning-notes-14-dead-letter-exchange-rabbitmq/

Alternate Exchanges

An alternate exchange in RabbitMQ is a fallback exchange configured for another exchange. If a message cannot be routed to any queue bound to the primary exchange, RabbitMQ will publish the message to the alternate exchange instead. This mechanism ensures that undeliverable messages are not lost but can be processed in a different way, such as logging, alerting, or storing them for later inspection.

To Know More: https://parottasalna.com/2024/12/27/learning-notes-12-alternate-exchanges-rabbitmq/

Lazy Queues

  • Lazy Queues are designed to store messages primarily on disk rather than in memory.
  • They are optimized for use cases involving large message backlogs where minimizing memory usage is critical.

To Know More: https://parottasalna.com/2024/12/26/learning-notes-10-lazy-queues-rabbitmq/

Quorom Queues

  • Quorum Queues are distributed queues built on the Raft consensus algorithm.
  • They are designed for high availability, durability, and data safety by replicating messages across multiple nodes in a RabbitMQ cluster.
  • Its a replacement of Mirrored Queues.

To Know More: https://parottasalna.com/2024/12/25/learning-notes-9-quorum-queues-rabbitmq/

Change Data Capture

CDC stands for Change Data Capture. It’s a technique that listens to a database and captures every change that happens in it. These changes can then be sent to other systems to,

  • Keep data in sync across multiple databases.
  • Power real-time analytics dashboards.
  • Trigger notifications for certain database events.
  • Process data streams in real time.

To Know More: https://parottasalna.com/2025/01/19/learning-notes-63-change-data-capture-what-does-it-do/

Handling Backpressure in Distributed Systems

Backpressure occurs when a downstream system (consumer) cannot keep up with the rate of data being sent by an upstream system (producer). In distributed systems, this can arise in scenarios such as

  • A message queue filling up faster than it is drained.
  • A database struggling to handle the volume of write requests.
  • A streaming system overwhelmed by incoming data.

To Know More: https://parottasalna.com/2025/01/07/learning-notes-45-backpressure-handling-in-distributed-systems/

Choreography Pattern

In the Choreography Pattern, services communicate directly with each other via asynchronous events, without a central controller. Each service is responsible for a specific part of the workflow and responds to events produced by other services. This pattern allows for a more autonomous and loosely coupled system.

To Know More: https://parottasalna.com/2025/01/05/learning-notes-38-choreography-pattern-cloud-pattern/

Outbox Pattern

The Outbox Pattern is a proven architectural solution to this problem, helping developers manage data consistency, especially when dealing with events, messaging systems, or external APIs.

To Know More: https://parottasalna.com/2025/01/03/learning-notes-31-outbox-pattern-cloud-pattern/

Queue Based Loading

The Queue-Based Loading Pattern leverages message queues to decouple and coordinate tasks between producers (such as applications or services generating data) and consumers (services or workers processing that data). By using queues as intermediaries, this pattern allows systems to manage workloads efficiently, ensuring seamless and scalable operation.

To Know More: https://parottasalna.com/2025/01/03/learning-notes-30-queue-based-loading-cloud-patterns/

Two Phase Commit Protocol

The Two-Phase Commit (2PC) protocol is a distributed algorithm used to ensure atomicity in transactions spanning multiple nodes or databases. Atomicity ensures that either all parts of a transaction are committed or none are, maintaining consistency in distributed systems.

To Know More: https://parottasalna.com/2025/01/03/learning-notes-29-two-phase-commit-protocol-acid-in-distributed-systems/

Competing Consumer

The competing consumer pattern involves multiple consumers that independently compete to process messages or tasks from a shared queue. This pattern is particularly effective in scenarios where the rate of incoming tasks is variable or high, as it allows multiple consumers to process tasks concurrently.

To Know More: https://parottasalna.com/2025/01/01/learning-notes-24-competing-consumer-messaging-queue-patterns/

Retry Pattern

The Retry Pattern is a design strategy used to manage transient failures by retrying failed operations. Instead of immediately failing an operation after an error, the pattern retries it with an optional delay or backoff strategy. This is particularly useful in distributed systems where failures are often temporary.

To Know More: https://parottasalna.com/2024/12/31/learning-notes-23-retry-pattern-cloud-patterns/

Can We Use Database as a Queue

Developers try to use their RDBMS as a way to do background processing or service communication. While this can often appear to ‘get the job done’, there are a number of limitations and concerns with this approach.

There are two divisions to any asynchronous processing: the service(s) that create processing tasks and the service(s) that consume and process these tasks accordingly.

To Know More: https://parottasalna.com/2024/06/15/can-we-use-database-as-queue-in-asynchronous-process/

Let’s Connect

Telegram: https://t.me/parottasalna/1

LinkedIn: https://www.linkedin.com/in/syedjaferk/

Whatsapp Channel: https://whatsapp.com/channel/0029Vavu8mF2v1IpaPd9np0s

Youtube: https://www.youtube.com/@syedjaferk

Github: https://github.com/syedjaferk/

RSVP for RabbitMQ: Build Scalable Messaging Systems in Tamil

24 January 2025 at 11:21

Hi All,

Invitation to RabbitMQ Session

🔹 Topic: RabbitMQ: Asynchronous Communication
🔹 Date: Feb 2 Sunday
🔹 Time: 10:30 AM to 1 PM
🔹 Venue: Online. Will be shared in mail after RSVP.

Join us for an in-depth session on RabbitMQ in தமிழ், where we’ll explore,

  • Message queuing fundamentals
  • Connections, channels, and virtual hosts
  • Exchanges, queues, and bindings
  • Publisher confirmations and consumer acknowledgments
  • Use cases and live demos

Whether you’re a developer, DevOps enthusiast, or curious learner, this session will empower you with the knowledge to build scalable and efficient messaging systems.

📌 Don’t miss this opportunity to level up your messaging skills!

RSVP closed !

Our Previous Monthly meetshttps://www.youtube.com/watch?v=cPtyuSzeaa8&list=PLiutOxBS1MizPGGcdfXF61WP5pNUYvxUl&pp=gAQB

Our Previous Sessions,

  1. Python – https://www.youtube.com/watch?v=lQquVptFreE&list=PLiutOxBS1Mizte0ehfMrRKHSIQcCImwHL&pp=gAQB
  2. Docker – https://www.youtube.com/watch?v=nXgUBanjZP8&list=PLiutOxBS1Mizi9IRQM-N3BFWXJkb-hQ4U&pp=gAQB
  3. Postgres – https://www.youtube.com/watch?v=04pE5bK2-VA&list=PLiutOxBS1Miy3PPwxuvlGRpmNo724mAlt&pp=gAQB

Our Social Handles,

Learning Notes #63 – Change Data Capture. What does it do ?

19 January 2025 at 16:22

Few days back i came across a concept of CDC. Like a notifier of database events. Instead of polling, this enables event to be available in a queue, which can be consumed by many consumers. In this blog, i try to explain the concepts, types in a theoretical manner.

You run a library. Every day, books are borrowed, returned, or new books are added. What if you wanted to keep a live record of all these activities so you always know the exact state of your library?

This is essentially what Change Data Capture (CDC) does for your databases. It’s a way to track changes (like inserts, updates, or deletions) in your database tables and send them to another system, like a live dashboard or a backup system. (Might be a bad example. Don’t lose hope. Continue …)

CDC is widely used in modern technology to power,

  • Real-Time Analytics: Live dashboards that show sales, user activity, or system performance.
  • Data Synchronization: Keeping multiple databases or microservices in sync.
  • Event-Driven Architectures: Triggering notifications, workflows, or downstream processes based on database changes.
  • Data Pipelines: Streaming changes to data lakes or warehouses for further processing.
  • Backup and Recovery: Incremental backups by capturing changes instead of full data dumps.

It’s a critical part of tools like Debezium, Kafka, and cloud services such as AWS Database Migration Service (DMS) and Azure Data Factory. CDC enables companies to move towards real-time data-driven decision-making.

What is CDC?

CDC stands for Change Data Capture. It’s a technique that listens to a database and captures every change that happens in it. These changes can then be sent to other systems to,

  • Keep data in sync across multiple databases.
  • Power real-time analytics dashboards.
  • Trigger notifications for certain database events.
  • Process data streams in real time.

In short, CDC ensures your data is always up-to-date wherever it’s needed.

Why is CDC Useful?

Imagine you have an online store. Whenever someone,

  • Places an order,
  • Updates their shipping address, or
  • Cancels an order,

you need these changes to be reflected immediately across,

  • The shipping system.
  • The inventory system.
  • The email notification service.

Instead of having all these systems query the database (this is one of main reasons) constantly (which is slow and inefficient), CDC automatically streams these changes to the relevant systems.

This means,

  1. Real-Time Updates: Systems receive changes instantly.
  2. Improved Performance: Your database isn’t overloaded with repeated queries.
  3. Consistency: All systems stay in sync without manual intervention.

How Does CDC Work?

Note: I haven’t yet tried all these. But conceptually having a feeling.

CDC relies on tracking changes in your database. There are a few ways to do this,

1. Query-Based CDC

This method repeatedly checks the database for changes. For example:

  • Every 5 minutes, it queries the database: “What changed since my last check?”
  • Any new or modified data is identified and processed.

Drawbacks: This can miss changes if the timing isn’t right, and it’s not truly real-time (Long Polling).

2. Log-Based CDC

Most modern databases (like PostgreSQL or MySQL) keep logs of every operation. Log-based CDC listens to these logs and captures changes as they happen.

Advantages

  • It’s real-time.
  • It’s lightweight since it doesn’t query the database directly.

3. Trigger-Based CDC

In this method, the database uses triggers to log changes into a separate table. Whenever a change occurs, a trigger writes a record of it.

Advantages: Simple to set up.

Drawbacks: Can slow down the database if not carefully managed.

Tools That Make CDC Easy

Several tools simplify CDC implementation. Some popular ones are,

  1. Debezium: Open-source and widely used for log-based CDC with databases like PostgreSQL, MySQL, and MongoDB.
  2. Striim: A commercial tool for real-time data integration.
  3. AWS Database Migration Service (DMS): A cloud-based CDC service.
  4. StreamSets: Another tool for real-time data movement.

These tools integrate with databases, capture changes, and deliver them to systems like RabbitMQ, Kafka, or cloud storage.

To help visualize CDC, think of,

  • Social Media Feeds: When someone likes or comments on a post, you see the update instantly. This is CDC in action.
  • Bank Notifications: Whenever you make a transaction, your bank app updates instantly. Another example of CDC.

In upcoming blogs, will include Debezium implementation with CDC.

Learning Notes #38 – Choreography Pattern | Cloud Pattern

5 January 2025 at 12:21

Today i learnt about Choreography pattern, where each and every service is communicating using a messaging queue. In this blog, i jot down notes on choreography pattern for my future self.

What is the Choreography Pattern?

In the Choreography Pattern, services communicate directly with each other via asynchronous events, without a central controller. Each service is responsible for a specific part of the workflow and responds to events produced by other services. This pattern allows for a more autonomous and loosely coupled system.

Key Features

  • High scalability and independence of services.
  • Decentralized control.
  • Services respond to events they subscribe to.

When to Use the Choreography Pattern

  • Event-Driven Systems: When workflows can be modeled as events triggering responses.
  • High Scalability: When services need to operate independently and scale autonomously.
  • Loose Coupling: When minimizing dependencies between services is critical.

Benefits of the Choreography Pattern

  1. Decentralized Control: No single point of failure or bottleneck.
  2. Increased Flexibility: Services can be added or modified without affecting others.
  3. Better Scalability: Services operate independently and scale based on their workloads.
  4. Resilience: The system can handle partial failures more gracefully, as services continue independently.

Example: E-Commerce Order Fulfillment

Problem

A fictional e-commerce platform needs to manage the following workflow:

  1. Accepting an order.
  2. Validating payment.
  3. Reserving inventory.
  4. Sending notifications to the customer.

Each step is handled by an independent service.

Solution

Using the Choreography Pattern, each service listens for specific events and publishes new events as needed. The workflow emerges naturally from the interaction of these services.

Implementation

Step 1: Define the Workflow as Events

  • OrderPlaced: Triggered when a customer places an order.
  • PaymentProcessed: Triggered after successful payment.
  • InventoryReserved: Triggered after reserving inventory.
  • NotificationSent: Triggered when the customer is notified.

Step 2: Implement Services

Each service subscribes to events and performs its task.

shared_utility.py

import pika
import json

def publish_event(exchange, event_type, data):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')
    message = json.dumps({"event_type": event_type, "data": data})
    channel.basic_publish(exchange=exchange, routing_key='', body=message)
    connection.close()

def subscribe_to_event(exchange, callback):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')
    queue = channel.queue_declare('', exclusive=True).method.queue
    channel.queue_bind(exchange=exchange, queue=queue)
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
    print(f"Subscribed to events on exchange '{exchange}'")
    channel.start_consuming()

Order Service


from shared_utils import publish_event

def place_order(order_id, customer):
    print(f"Placing order {order_id} for {customer}")
    publish_event("order_exchange", "OrderPlaced", {"order_id": order_id, "customer": customer})

if __name__ == "__main__":
    # Simulate placing an order
    place_order(order_id=101, customer="John Doe")

Payment Service


from shared_utils import publish_event, subscribe_to_event
import time

def handle_order_placed(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "OrderPlaced":
        order_id = event["data"]["order_id"]
        print(f"Processing payment for order {order_id}")
        time.sleep(1)  # Simulate payment processing
        publish_event("payment_exchange", "PaymentProcessed", {"order_id": order_id})

if __name__ == "__main__":
    subscribe_to_event("order_exchange", handle_order_placed)

Inventory Service


from shared_utils import publish_event, subscribe_to_event
import time

def handle_payment_processed(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "PaymentProcessed":
        order_id = event["data"]["order_id"]
        print(f"Reserving inventory for order {order_id}")
        time.sleep(1)  # Simulate inventory reservation
        publish_event("inventory_exchange", "InventoryReserved", {"order_id": order_id})

if __name__ == "__main__":
    subscribe_to_event("payment_exchange", handle_payment_processed)

Notification Service


from shared_utils import subscribe_to_event
import time

def handle_inventory_reserved(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "InventoryReserved":
        order_id = event["data"]["order_id"]
        print(f"Notifying customer for order {order_id}")
        time.sleep(1)  # Simulate notification
        print(f"Customer notified for order {order_id}")

if __name__ == "__main__":
    subscribe_to_event("inventory_exchange", handle_inventory_reserved)

Step 3: Run the Workflow

  1. Start RabbitMQ using Docker as described above.
  2. Run the services in the following order:
    • Notification Service: python notification_service.py
    • Inventory Service: python inventory_service.py
    • Payment Service: python payment_service.py
    • Order Service: python order_service.py
  3. Place an order by running the Order Service. The workflow will propagate through the services as events are handled.

Key Considerations

  1. Event Bus: Use an event broker like RabbitMQ, Kafka, or AWS SNS to manage communication between services.
  2. Event Versioning: Include versioning to handle changes in event formats over time.
  3. Idempotency: Ensure services handle repeated events gracefully to avoid duplication.
  4. Monitoring and Tracing: Use tools like OpenTelemetry to trace and debug distributed workflows.
  5. Error Handling:
    • Dead Letter Queues (DLQs) to capture failed events.
    • Retries with backoff for transient errors.

Advantages of the Choreography Pattern

  1. Loose Coupling: Services interact via events without direct knowledge of each other.
  2. Resilience: Failures in one service don’t block the entire workflow.
  3. High Autonomy: Services operate independently and can be deployed or scaled separately.
  4. Dynamic Workflows: Adding new services to the workflow requires subscribing them to relevant events.

Challenges of the Choreography Pattern

  1. Complex Debugging: Tracing errors across distributed services can be difficult.
  2. Event Storms: Poorly designed workflows may generate excessive events, overwhelming the system.
  3. Coordination Overhead: Decentralized logic can lead to inconsistent behavior if not carefully managed.

Orchestrator vs. Choreography: When to Choose?

  • Use Orchestrator Pattern when workflows are complex, require central control, or involve many dependencies.
  • Use Choreography Pattern when you need high scalability, loose coupling, or event-driven workflows.

Learning Notes #30 – Queue Based Loading | Cloud Patterns

3 January 2025 at 14:47

Today, i learnt about Queue Based Loading pattern, which helps to manage intermittent peak load to a service via queues. Basically decoupling Tasks from Services. In this blog i jot down notes on this pattern for my future self.

In today’s digital landscape, applications are expected to handle large-scale operations efficiently. Whether it’s processing massive data streams, ensuring real-time responsiveness, or integrating with multiple third-party services, scalability and reliability are paramount. One pattern that elegantly addresses these challenges is the Queue-Based Loading Pattern.

What Is the Queue-Based Loading Pattern?

The Queue-Based Loading Pattern leverages message queues to decouple and coordinate tasks between producers (such as applications or services generating data) and consumers (services or workers processing that data). By using queues as intermediaries, this pattern allows systems to manage workloads efficiently, ensuring seamless and scalable operation.

Key Components of the Pattern

  1. Producers: Producers are responsible for generating tasks or data. They send these tasks to a message queue instead of directly interacting with consumers. Examples include:
    • Web applications logging user activity.
    • IoT devices sending sensor data.
  2. Message Queue: The queue acts as a buffer, storing tasks until consumers are ready to process them. Popular tools for implementing queues include RabbitMQ, Apache Kafka, AWS SQS, and Redis.
  3. Consumers: Consumers retrieve messages from the queue and process them asynchronously. They are typically designed to handle tasks independently and at their own pace.
  4. Processing Logic: This is the core functionality that processes the tasks retrieved by consumers. For example, resizing images, sending notifications, or updating a database.

How It Works

  1. Task Generation: Producers push tasks to the queue as they are generated.
  2. Message Storage: The queue stores tasks in a structured manner (FIFO, priority-based, etc.) and ensures reliable delivery.
  3. Task Consumption: Consumers pull tasks from the queue, process them, and optionally acknowledge completion.
  4. Scalability: New consumers can be added dynamically to handle increased workloads, ensuring the system remains responsive.

Benefits of the Queue-Based Loading Pattern

  1. Decoupling: Producers and consumers operate independently, reducing tight coupling and improving system maintainability.
  2. Scalability: By adding more consumers, systems can easily scale to handle higher workloads.
  3. Fault Tolerance: If a consumer fails, messages remain in the queue, ensuring no data is lost.
  4. Load Balancing: Tasks are distributed evenly among consumers, preventing any single consumer from becoming a bottleneck.
  5. Asynchronous Processing: Consumers can process tasks in the background, freeing producers to continue generating data without delay.

Issues and Considerations

  1. Rate Limiting: Implement logic to control the rate at which services handle messages to prevent overwhelming the target resource. Test the system under load and adjust the number of queues or service instances to manage demand effectively.
  2. One-Way Communication: Message queues are inherently one-way. If tasks require responses, you may need to implement a separate mechanism for replies.
  3. Autoscaling Challenges: Be cautious when autoscaling consumers, as it can lead to increased contention for shared resources, potentially reducing the effectiveness of load leveling.
  4. Traffic Variability: Consider the variability of incoming traffic to avoid situations where tasks pile up faster than they are processed, creating a perpetual backlog.
  5. Queue Persistence: Ensure your queue is durable and capable of persisting messages. Crashes or system limits could lead to dropped messages, risking data loss.

Use Cases

  1. Email and Notification Systems: Sending bulk emails or push notifications without overloading the main application.
  2. Data Pipelines: Ingesting, transforming, and analyzing large datasets in real-time or batch processing.
  3. Video Processing: Queues facilitate tasks like video encoding and thumbnail generation.
  4. Microservices Communication: Ensures reliable and scalable communication between microservices.

Best Practices

  1. Message Durability: Configure your queue to persist messages to disk, ensuring they are not lost during system failures.
  2. Monitoring and Metrics: Use monitoring tools to track queue lengths, processing rates, and consumer health.
  3. Idempotency: Design consumers to handle duplicate messages gracefully.
  4. Error Handling and Dead Letter Queues (DLQs): Route failed messages to DLQs for later analysis and reprocessing.

Learning Notes #29 – Two Phase Commit Protocol | ACID in Distributed Systems

3 January 2025 at 13:45

Today, i learnt about compensating transaction pattern which leads to two phase commit protocol which helps in maintaining the Atomicity of a distributed transactions. Distributed transactions are hard.

In this blog, i jot down notes on Two Phase Commit protocol for better understanding.

The Two-Phase Commit (2PC) protocol is a distributed algorithm used to ensure atomicity in transactions spanning multiple nodes or databases. Atomicity ensures that either all parts of a transaction are committed or none are, maintaining consistency in distributed systems.

Why Two-Phase Commit?

In distributed systems, a transaction might involve several independent nodes, each maintaining its own database. Without a mechanism like 2PC, failures in one node can leave the system in an inconsistent state.

For example, consider an e-commerce platform where a customer places an order.

The transaction involves updating the inventory in one database, recording the payment in another, and generating a shipment request in a third system. If the payment database successfully commits but the inventory database fails, the system becomes inconsistent, potentially causing issues like double selling or incomplete orders. 2PC mitigates this by providing a coordinated protocol to commit or abort transactions across all nodes.

The Phases of 2PC

The protocol operates in two main phases

1. Prepare Phase (Voting Phase)

The coordinator node initiates the transaction and prepares to commit it across all participating nodes.

  1. Request to Prepare: The coordinator sends a PREPARE request to all participant nodes.
  2. Vote: Each participant checks if it can commit the transaction (e.g., no constraints violated, resources available). It logs its decision (YES or NO) locally and sends its vote to the coordinator. If any participant votes NO, the transaction cannot be committed.

2. Commit Phase (Decision Phase)

Based on the votes received in the prepare phase, the coordinator decides the final outcome.

Commit Decision:

If all participants vote YES, the coordinator logs a COMMIT decision, sends COMMIT messages to all participants, and participants apply the changes and confirm with an acknowledgment.

Abort Decision:

If any participant votes NO, the coordinator logs an ABORT decision, sends ABORT messages to all participants, and participants roll back any changes made during the transaction.

Implementation:

For a simple implementation of 2PC, we can try out the below flow using RabbitMQ as a medium for Co-Ordinator.

Basically, we need not to write this from scratch, we have tools,

1. Relational Databases

Most relational databases have built-in support for distributed transactions and 2PC.

  • PostgreSQL: Implements distributed transactions using foreign data wrappers (FDWs) with PREPARE TRANSACTION and COMMIT PREPARED.
  • MySQL: Supports XA transactions, which follow the 2PC protocol.
  • Oracle Database: Offers robust distributed transaction support using XA.
  • Microsoft SQL Server: Provides distributed transactions through MS-DTC.

2. Distributed Transaction Managers

These tools manage distributed transactions across multiple systems.

  • Atomikos: A popular Java-based transaction manager supporting JTA/XA for distributed systems.
  • Bitronix: Another lightweight transaction manager for Java applications supporting JTA/XA.
  • JBoss Transactions (Narayana): A robust Java transaction manager that supports 2PC, often used in conjunction with JBoss servers.

3. Message Brokers

Message brokers provide transaction capabilities with 2PC.

  • RabbitMQ: Supports the 2PC protocol using transactional channels.
  • Apache Kafka: Supports transactions, ensuring “exactly-once” semantics across producers and consumers.
  • ActiveMQ: Provides distributed transaction support through JTA integration

4. Workflow Engines

Workflow engines can orchestrate 2PC across distributed systems.

  • Apache Camel: Can coordinate 2PC transactions using its transaction policy.
  • Camunda: Provides BPMN-based orchestration that can include transactional boundaries.
  • Zeebe: Supports distributed transaction workflows in modern architectures.

Key Properties of 2PC

  1. Atomicity: Ensures all-or-nothing transaction behavior.
  2. Consistency: Guarantees system consistency across all nodes.
  3. Durability: Uses logs to ensure decisions survive node failures.

Challenges of 2PC

  1. Blocking Nature: If the coordinator fails during the commit phase, participants must wait indefinitely unless a timeout or external mechanism is implemented.
  2. Performance Overhead: Multiple message exchanges and logging operations introduce latency.
  3. Single Point of Failure: The coordinator’s failure can stall the entire transaction.

Learning Notes #12 – Alternate Exchanges | RabbitMQ

27 December 2024 at 10:36

Today i learnt about Alternate Exchange, which provide a way to handle undeliverable messages. In this blog, i share the notes on what alternate exchanges are, why they are useful, and how to implement them in your RabbitMQ setup.

What Are Alternate Exchanges?

In the normal flow, producer will send a message to the exchange and if the queue is binded correctly then it will be placed in the correct queue.

An alternate exchange in RabbitMQ is a fallback exchange configured for another exchange. If a message cannot be routed to any queue bound to the primary exchange, RabbitMQ will publish the message to the alternate exchange instead. This mechanism ensures that undeliverable messages are not lost but can be processed in a different way, such as logging, alerting, or storing them for later inspection.

When this scenario happens

A message goes to an alternate exchange in RabbitMQ in the following scenarios:

1. No Binding for the Routing Key

  • The primary exchange does not have any queue bound to it with the routing key specified in the message.
  • Example: A message with routing key invalid_key is sent to a direct exchange that has no queue bound to invalid_key.

2. Unbound Queues:

  • Even if a queue exists, it is not bound to the primary exchange or the specific routing key used in the message.
  • Example: A queue exists for the primary exchange but is not explicitly bound to any routing key.

3. Exchange Type Mismatch

  • The exchange type (e.g., direct, fanout, topic) does not match the routing pattern of the message.
  • Example: A message is sent with a specific routing key to a fanout exchange that delivers to all bound queues regardless of the key.

4. Misconfigured Bindings

  • Bindings exist but do not align with the routing requirements of the message.
  • Example: A topic exchange has a binding for user.* but receives a message with the routing key order.processed.

5. Queue Deletion After Binding

  • A queue was bound to the exchange but is deleted or unavailable at runtime.
  • Example: A message with a valid routing key arrives, but the corresponding queue is no longer active.

6. TTL (Time-to-Live) Expired Queues

  • Messages routed to a queue with a time-to-live setting expire before being consumed and are re-routed to an alternate exchange if dead-lettering is enabled.
  • Example: A primary exchange routes messages to a TTL-bound queue, and expired messages are forwarded to the alternate exchange.

7. Exchange Misconfiguration

  • The primary exchange is operational, but its configurations prevent messages from being delivered to any queue.
  • Example: A missing or incorrect alternate-exchange argument setup leads to misrouting.

Use Cases for Alternate Exchanges

  • Error Handling: Route undeliverable messages to a dedicated queue for later inspection or reprocessing.
  • Logging: Keep track of messages that fail routing for auditing purposes.
  • Dead Letter Queues: Use alternate exchanges to implement dead-letter queues to analyze why messages could not be routed.
  • Load Balancing: Forward undeliverable messages to another exchange for alternative processing

How to Implement Alternate Exchanges in Python

Let’s walk through the steps to configure and use alternate exchanges in RabbitMQ using Python.

Scenario 1: Handling Messages with Valid and Invalid Routing Keys

producer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the alternate exchange
channel.exchange_declare(exchange='alternate_exchange', exchange_type='fanout')

# Declare a queue and bind it to the alternate exchange
channel.queue_declare(queue='unroutable_queue')
channel.queue_bind(exchange='alternate_exchange', queue='unroutable_queue')

# Declare the primary exchange with an alternate exchange argument
channel.exchange_declare(
    exchange='primary_exchange',
    exchange_type='direct',
    arguments={'alternate-exchange': 'alternate_exchange'}
)

# Declare and bind a queue to the primary exchange
channel.queue_declare(queue='valid_queue')
channel.queue_bind(exchange='primary_exchange', queue='valid_queue', routing_key='key1')

# Publish a message with a valid routing key
channel.basic_publish(
    exchange='primary_exchange',
    routing_key='key1',
    body='Message with a valid routing key'
)

print("Message with valid routing key sent to 'valid_queue'.")

# Publish a message with an invalid routing key
channel.basic_publish(
    exchange='primary_exchange',
    routing_key='invalid_key',
    body='Message with an invalid routing key'
)

print("Message with invalid routing key sent to 'alternate_exchange'.")

# Close the connection
connection.close()

consumer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Consume messages from the alternate queue
method_frame, header_frame, body = channel.basic_get(queue='unroutable_queue', auto_ack=True)
if method_frame:
    print(f"Received message from alternate queue: {body.decode()}")
else:
    print("No messages in the alternate queue")

# Close the connection
connection.close()

Scenario 2: Logging Unroutable Messages

producer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the alternate exchange
channel.exchange_declare(exchange='logging_exchange', exchange_type='fanout')

# Declare a logging queue and bind it to the logging exchange
channel.queue_declare(queue='logging_queue')
channel.queue_bind(exchange='logging_exchange', queue='logging_queue')

# Declare the primary exchange with a logging alternate exchange argument
channel.exchange_declare(
    exchange='primary_logging_exchange',
    exchange_type='direct',
    arguments={'alternate-exchange': 'logging_exchange'}
)

# Publish a message with an invalid routing key
channel.basic_publish(
    exchange='primary_logging_exchange',
    routing_key='invalid_logging_key',
    body='Message for logging'
)

print("Message with invalid routing key sent to 'logging_exchange'.")

# Close the connection
connection.close()

consumer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Consume messages from the logging queue
method_frame, header_frame, body = channel.basic_get(queue='logging_queue', auto_ack=True)
if method_frame:
    print(f"Logged message: {body.decode()}")
else:
    print("No messages in the logging queue")

# Close the connection
connection.close()

Learning Notes #10 – Lazy Queues | RabbitMQ

26 December 2024 at 06:54

What Are Lazy Queues?

  • Lazy Queues are designed to store messages primarily on disk rather than in memory.
  • They are optimized for use cases involving large message backlogs where minimizing memory usage is critical.

Key Characteristics

  1. Disk-Based Storage – Messages are stored on disk immediately upon arrival, rather than being held in memory.
  2. Low Memory Usage – Only minimal metadata for messages is kept in memory.
  3. Scalability – Can handle millions of messages without consuming significant memory.
  4. Message Retrieval – Retrieving messages is slower because messages are fetched from disk.
  5. Durability – Messages persist on disk, reducing the risk of data loss during RabbitMQ restarts.

Trade-offs

  • Latency: Fetching messages from disk is slower than retrieving them from memory.
  • Throughput: Not suitable for high-throughput, low-latency applications.

Choose Lazy Queues if

  • You need to handle very large backlogs of messages.
  • Memory is a constraint in your system.Latency and throughput are less critical.

Implementation

Pre-requisites

1. Install and run RabbitMQ on your local machine.


docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

2. Install the pika library


pip install pika

Producer (producer.py)

This script sends a persistent message to a Lazy Queue.

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare a Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Publish a message
message = "Hello from the Producer via Custom Exchange!"
channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent message
)

print(f"Message sent to Lazy Queue via Exchange: {message}")

# Close the connection
connection.close()

Consumer (consumer.py)

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare the Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Callback function to process messages
def callback(ch, method, properties, body):
    print(f"Received message: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Acknowledge the message

# Start consuming messages
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)

print("Waiting for messages. To exit, press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print("Stopped consuming.")

# Close the connection
connection.close()

Explanation

  1. Producer
    • Defines a custom exchange (custom_exchange) of type direct.
    • Declares a Lazy Queue (lazy_queue_example).
    • Binds the queue to the exchange using a routing key (custom_routing_key).
    • Publishes a persistent message via the custom exchange and routing key.
  2. Consumer
    • Declares the same exchange and Lazy Queue to ensure they exist.
    • Consumes messages routed to the queue through the custom exchange and routing key.
  3. Custom Exchange and Binding
    • The direct exchange type routes messages based on an exact match of the routing key.
    • Binding ensures the queue receives messages published to the exchange with the specified key.
  4. Lazy Queue Behavior
    • Messages are stored directly on disk to minimize memory usage.

Learning Notes #9 – Quorum Queues | RabbitMQ

25 December 2024 at 16:42

What Are Quorum Queues?

  • Quorum Queues are distributed queues built on the Raft consensus algorithm.
  • They are designed for high availability, durability, and data safety by replicating messages across multiple nodes in a RabbitMQ cluster.
  • Its a replacement of Mirrored Queues.

Key Characteristics

  1. Replication:
    • Messages are replicated across a quorum (a majority of nodes).
    • A quorum consists of an odd number of replicas (e.g., 3, 5, 7) to ensure a majority can elect a leader during failovers.
  2. Leader-Follower Architecture:
    • Each Quorum Queue has one leader and multiple followers.
    • The leader handles all write and read operations, while followers replicate messages and provide redundancy.
  3. Durability:
    • Messages are written to disk on all quorum nodes, ensuring persistence even if nodes fail.
  4. High Availability:
    • If the leader node fails, RabbitMQ elects a new leader from the remaining quorum, ensuring continued operation.
  5. Consistency:
    • Quorum Queues prioritize consistency over availability.
    • Messages are acknowledged only after replication is successful on a majority of nodes.
  6. Message Ordering:
    • Message ordering is preserved during normal operations but may be disrupted during leader failovers.

Use Cases

  • Mission-Critical Applications – Systems where message loss is unacceptable (e.g., financial transactions, order processing).
  • Distributed Systems – Environments requiring high availability and fault tolerance.
  • Data Safety – Applications prioritizing consistency over throughput (e.g., event logs, audit trails).

Setups

Using rabbitmqctl


rabbitmqctl add_queue quorum_queue --type quorum

Using python


channel.queue_declare(queue="quorum_queue", arguments={"x-queue-type": "quorum"})

References:

  1. https://www.rabbitmq.com/docs/quorum-queues

Learning Notes #7 – AMQP Protocol and RabbitMQ | An Overview

24 December 2024 at 18:22

Today, i learned about AMQP Protocol, Components of RabbitMQ (Connections, Channels, Queues, Exchanges, Bindings and Different Types of Exchanges, Acknowledgement and Publisher Confirmation). I learned these all from CloudAMQP In this blog, you will find a crisp details on these topics.

1. Overview of AMQP Protocol

  • Advanced Message Queuing Protocol (AMQP) is an open standard for messaging middleware. It enables systems to exchange messages in a reliable and flexible manner.
  • Key components:
    • Producers: Applications that send messages.
    • Consumers: Applications that receive messages.
    • Broker: Middleware (e.g., RabbitMQ) that manages message exchanges.
    • Message: A unit of data transferred between producer and consumer.

2. How AMQP Works in RabbitMQ

  • RabbitMQ implements AMQP to facilitate message exchange. It acts as the broker, managing queues, exchanges, and bindings.
  • AMQP Operations:
    1. Producer sends a message to an exchange.
    2. The exchange routes the message to one or more queues based on bindings.
    3. Consumer retrieves the message from the queue.

3. Connections and Channels

Connections

A connection is a persistent, long-lived TCP connection between a client application and the RabbitMQ broker. Connections are relatively resource-intensive because they involve socket communication and the overhead of establishing and maintaining the connection. Each connection is uniquely identified by the broker and can be shared across multiple threads or processes.

When an application establishes a connection to RabbitMQ, it uses it as a gateway to interact with the broker. This includes creating channels, declaring queues and exchanges, publishing messages, and consuming messages. Connections should ideally be reused across the application to reduce overhead and optimize resource usage.

Channels

A channel is a lightweight, logical communication pathway established within a connection. Channels provide a way to perform multiple operations concurrently over a single connection. They are less resource-intensive than connections and are designed to handle operations such as queue declarations, message publishing, and consuming.

Using channels allows applications to:

  1. Scale efficiently: Instead of opening multiple connections, applications can open multiple channels over a single connection.
  2. Isolate operations: Each channel operates independently. For instance, one channel can consume messages while another publishes.

How They Work Together

When a client connects to RabbitMQ, it first establishes a connection. Within that connection, it can open multiple channels. Each channel operates as a virtual connection, allowing concurrent tasks without needing separate TCP connections.


import pika

# Establish a connection to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# Create multiple channels on the same connection
channel1 = connection.channel()
channel2 = connection.channel()

# Declare queues on each channel
channel1.queue_declare(queue='queue1')
channel2.queue_declare(queue='queue2')

# Publish messages on different channels
channel1.basic_publish(exchange='', routing_key='queue1', body='Message for Queue 1')
channel2.basic_publish(exchange='', routing_key='queue2', body='Message for Queue 2')

print("Messages sent to both queues!")

# Close the connection
connection.close()

Best Practices (Not Tried; Got this from the video)

  1. Reusing Connections: Establish one connection per application or service and share it across threads or processes for efficiency.
  2. Using Channels for Parallelism: Open separate channels for different operations like publishing and consuming.
  3. Graceful Cleanup: Always close channels and connections when done to avoid resource leaks.

4. Queues

  • Act as message storage.
  • Can be:
    • Durable: Survives broker restarts.
    • Exclusive: Used by a single connection.
    • Auto-delete: Deleted when the last consumer disconnects.

# Declaring a durable queue
channel.queue_declare(queue='durable_queue', durable=True)

# Sending a persistent message
channel.basic_publish(
    exchange='',  # Default exchange
    routing_key='durable_queue',
    body='Persistent message',
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)

5. Exchanges

An exchange in RabbitMQ is a routing mechanism that determines how messages sent by producers are directed to queues. Exchanges act as intermediaries between producers and queues, enabling flexible and efficient message routing based on routing rules and patterns.

Types of Exchanges

RabbitMQ supports four types of exchanges, each with its unique routing mechanism:

1. Direct Exchange

  • Routes messages to queues based on an exact match of the routing key.
  • If the routing key in the message matches the binding key of a queue, the message is routed to that queue.
  • Use case: Task queues where each task has a specific destination.

Example:

  • Queue queue1 is bound to the exchange with the routing key info.
  • A message with the routing key info is routed to queue1.

channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
channel.queue_declare(queue='direct_queue')
channel.queue_bind(exchange='direct_exchange', queue='direct_queue', routing_key='info')
channel.basic_publish(exchange='direct_exchange', routing_key='info', body='Direct message')

2. Fanout Exchange

  • Broadcasts messages to all queues bound to the exchange, ignoring routing keys.
  • Use case: Broadcasting events to multiple consumers, such as notifications or logs.

Example:

  • All queues bound to the exchange receive the same message.

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')
channel.queue_declare(queue='queue1')
channel.queue_declare(queue='queue2')

channel.queue_bind(exchange='fanout_exchange', queue='queue1')
channel.queue_bind(exchange='fanout_exchange', queue='queue2')

channel.basic_publish(exchange='fanout_exchange', routing_key='', body='Broadcast message')

3. Topic Exchange

  • Routes messages to queues based on pattern matching of routing keys.
  • Routing keys are dot-separated words, and queues can bind with patterns using wildcards:
    • * matches exactly one word.
    • # matches zero or more words.
  • Use case: Complex routing scenarios, such as logging systems with multiple log levels and sources.

Example:

  • Queue queue1 is bound with the pattern logs.info.*.
  • A message with the routing key logs.info.app1 is routed to queue1.

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
channel.queue_declare(queue='topic_queue')

channel.queue_bind(exchange='topic_exchange', queue='topic_queue', routing_key='logs.info.*')

channel.basic_publish(exchange='topic_exchange', routing_key='logs.info.app1', body='Topic message')

4. Headers Exchange

  • Routes messages based on message header attributes instead of routing keys.
  • Headers can specify conditions like x-match:
    • x-match = all: All specified headers must match.
    • x-match = any: At least one specified header must match.
  • Use case: Advanced filtering scenarios.

Example:

  • Queue queue1 is bound with headers format=json and type=report.

channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')
channel.queue_declare(queue='headers_queue')

channel.queue_bind(
    exchange='headers_exchange',
    queue='headers_queue',
    arguments={'format': 'json', 'type': 'report', 'x-match': 'all'}
)

channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',
    body='Headers message',
    properties=pika.BasicProperties(headers={'format': 'json', 'type': 'report'})
)

Exchange Lifecycle

  1. Declaration: Exchanges must be explicitly declared before use. If an exchange is not declared and a producer tries to publish a message to it, an error will occur.
  2. Binding: Queues are bound to exchanges with routing keys or header arguments.
  3. Publishing: Producers publish messages to exchanges with optional routing keys.

Durable and Non-Durable Exchanges

  • Durable Exchange: Survives broker restarts. Useful for critical applications.
  • Non-Durable Exchange: Deleted when the broker restarts. Suitable for transient tasks.

# Declare a durable exchange
channel.exchange_declare(exchange='durable_exchange', exchange_type='direct', durable=True)

Default Exchange

RabbitMQ provides a built-in default exchange (unnamed exchange) that routes messages directly to a queue with a name matching the routing key.


channel.queue_declare(queue='default_queue')
channel.basic_publish(exchange='', routing_key='default_queue', body='Default exchange message')

Best Practices for Exchanges

  • Use durable exchanges for critical applications that require persistence across broker restarts.
  • Use direct exchanges for targeted delivery when routing keys are predictable.
  • Use fanout exchanges for broadcasting to multiple queues.
  • Use topic exchanges for complex routing needs, especially with hierarchical routing keys.
  • Use headers exchanges for advanced filtering based on metadata.

6. Bindings

Bindings connect queues to exchanges with routing rules.


# Binding a queue with a routing key
channel.queue_bind(exchange='direct_logs', queue='error_logs', routing_key='error')

7. Consumer Acknowledgments

Two acknowledgment types:

  • Manual: Consumer explicitly sends an acknowledgment.
  • Automatic: RabbitMQ assumes successful processing.
# Auto ACK
channel.basic_consume(queue='test_queue', on_message_callback=lambda ch, method, properties, body: print(body), auto_ack=True)

# Manual ACK
def callback(ch, method, properties, body):
    print(f"Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='test_queue', on_message_callback=callback)
channel.start_consuming()

8. Publisher Confirmations

  • Guarantees that RabbitMQ successfully received a message.
  • Enable publisher confirms for robust systems.

# Enable delivery confirmation
channel.confirm_delivery()

# Publish and handle confirmation
try:
    channel.basic_publish(exchange='direct_logs', routing_key='error', body='Message with confirmation')
    print("Message successfully published!")
except pika.exceptions.UnroutableError:
    print("Message could not be routed.")

9. Virtual Hosts (vhosts)

  • Logical partitions to segregate exchanges, queues, and users.
  • Use vhosts for multi-tenant setups.

Tomm, I am planning to explore more on RabbitMQ. Let’s see tomm.

Learning Notes #5 – Message Queues | RabbitMQ

22 December 2024 at 12:05

Github: https://github.com/syedjaferk/rabbitmq_message_queues

Imagine you own a busy online store. Customers place orders, payments are processed, inventory is updated, and confirmation emails are sent.

If these steps happen one after another in real-time (synchronous), your website could slow down or even crash under high demand. This is where message queues come in to picture. They help different parts of your system communicate smoothly and handle tasks efficiently, even during a rush. Its one of the solution for asynchronous communication.

What is a Message Queue?

A message queue is a software system that enables different parts of an application to send and receive messages asynchronously. Messages are temporarily stored in a queue until the recipient is ready to process them.

For example, think of it as a waiting line at a busy coffee shop. Each order (or message) waits in line until it’s picked up and handled by a coffee maker (or worker). The beauty of a message queue is that the coffee shop (producer) can keep taking orders without waiting for the coffee maker (consumer) to finish the current one.

Here’s how it works:

  • The producer sends messages to the queue.
  • The queue stores the messages.
  • The consumer picks up messages one by one to process them.

RabbitMQ is one kind of tool which helps in enabling async communication.

Key Components of RabbitMQ (a Popular Message Queue System)

  1. Producer: The sender of messages. For example, your website sending an order to the queue.
  2. Queue: The holding area for messages, like a to-do list. Each order waits here until processed.
  3. Consumer: The worker that processes messages. For example, the service that charges a credit card.
  4. Exchange: Think of this as a traffic controller. It decides which queue gets each message based on rules you set.
  5. Message: The data being sent, such as order details (customer name, items, total price).
  6. Acknowledgements (ACKs): A signal from the consumer to RabbitMQ saying, “Message processed successfully!”.

How a Message Queue Solves Real Problems

Scenario: Imagine your online store uses a message queue during a holiday rush.

  1. Placing Orders
    • Customers place orders on your website (producer).
    • Orders are sent to the RabbitMQ queue.
  2. Processing Payments
    • The payment service (consumer) picks up orders from the queue, one by one, to charge credit cards.
  3. Sending Emails
    • Once payment is successful, another consumer sends confirmation emails.
  4. Updating Inventory
    • A third consumer updates the inventory system.

Without a queue: All these tasks would happen one after the other, causing delays and potential failures.

With a queue: Each task works independently and efficiently, ensuring smooth operations.

Simple RabbitMQ Example

Step1: I am spinning up a RabbitMQ from a Docker


docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

Step 2: Producer Code (Sending Messages)


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue')

channel.basic_publish(exchange='',
                      routing_key='order_queue',
                      body='Order #12345')
print("[x] Sent 'Order #12345'")
connection.close()

Explanation

  1. pika.ConnectionParameters('localhost'): Connects to RabbitMQ running locally.
  2. channel.queue_declare(queue='order_queue'): Ensures the queue exists. If it doesn’t, RabbitMQ will create it.
  3. channel.basic_publish(...): Publishes a message (in this case, “Order #12345”) to the specified queue.
  4. connection.close(): Cleans up and closes the connection.

Step 3: Consumer Code (Processing Message)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue')

def callback(ch, method, properties, body):
    print(f"[x] Processed {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='order_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Explanation

  1. channel.queue_declare(queue='order_queue'): Ensures the consumer is listening to the correct queue.
  2. callback: A function that processes each message. Here, it prints the message content and acknowledges it.
  3. channel.basic_consume(...): Binds the callback function to the queue, so the consumer processes messages as they arrive.
  4. channel.start_consuming(): Starts the consumer, waiting for messages indefinitely.

Best Practices (Not Tried – Just Got it from Course page.)

  1. Keep Messages Small: Only send necessary data to avoid delays.
  2. Use Dead Letter Queues: Handle failed messages separately to keep the main queue clear.
  3. Monitor Performance: Watch queue sizes and processing times to prevent backlogs.
  4. Scale Consumers: Add more workers during busy times to process messages faster.
  5. Secure Your System: Use encryption and authentication to protect sensitive data.

❌
❌