Learning Notes #38 – Choreography Pattern | Cloud Pattern

5 January 2025 at 12:21

Today I learnt about the Choreography pattern, in which services communicate with each other through a message queue. In this blog, I jot down notes on the Choreography pattern for my future self.

What is the Choreography Pattern?

In the Choreography Pattern, services communicate with one another through asynchronous events, typically via a message broker, without a central controller. Each service is responsible for a specific part of the workflow and responds to events produced by other services. This pattern allows for a more autonomous and loosely coupled system.

Key Features

  • High scalability and independence of services.
  • Decentralized control.
  • Services respond to events they subscribe to.

When to Use the Choreography Pattern

  • Event-Driven Systems: When workflows can be modeled as events triggering responses.
  • High Scalability: When services need to operate independently and scale autonomously.
  • Loose Coupling: When minimizing dependencies between services is critical.

Benefits of the Choreography Pattern

  1. Decentralized Control: No single point of failure or bottleneck.
  2. Increased Flexibility: Services can be added or modified without affecting others.
  3. Better Scalability: Services operate independently and scale based on their workloads.
  4. Resilience: The system can handle partial failures more gracefully, as services continue independently.

Example: E-Commerce Order Fulfillment

Problem

A fictional e-commerce platform needs to manage the following workflow:

  1. Accepting an order.
  2. Validating payment.
  3. Reserving inventory.
  4. Sending notifications to the customer.

Each step is handled by an independent service.

Solution

Using the Choreography Pattern, each service listens for specific events and publishes new events as needed. The workflow emerges naturally from the interaction of these services.
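To see how the workflow can emerge without a coordinator, here is a minimal in-memory sketch. The EventBus class is a hypothetical stand-in for a broker (it is not part of pika or RabbitMQ), and handlers run synchronously here, which a real broker would not do.

```python
from collections import defaultdict


class EventBus:
    """Hypothetical in-memory stand-in for a message broker."""

    def __init__(self):
        self.handlers = defaultdict(list)  # event type -> subscribed handlers
        self.log = []                      # event types in publication order

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    def publish(self, event_type, data):
        self.log.append(event_type)
        for handler in self.handlers[event_type]:
            handler(data)


bus = EventBus()

# Each "service" knows only the event it listens for and the event it emits.
bus.subscribe("OrderPlaced", lambda d: bus.publish("PaymentProcessed", d))
bus.subscribe("PaymentProcessed", lambda d: bus.publish("InventoryReserved", d))
bus.subscribe("InventoryReserved", lambda d: bus.publish("NotificationSent", d))

# Placing one order triggers the rest of the chain.
bus.publish("OrderPlaced", {"order_id": 101})
print(bus.log)
```

No component in this sketch holds the full sequence of steps; the order of events in the log is purely a result of who subscribes to what.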

Implementation

Step 1: Define the Workflow as Events

  • OrderPlaced: Triggered when a customer places an order.
  • PaymentProcessed: Triggered after successful payment.
  • InventoryReserved: Triggered after reserving inventory.
  • NotificationSent: Triggered when the customer is notified.

Step 2: Implement Services

Each service subscribes to events and performs its task.

shared_utils.py

import pika
import json

def publish_event(exchange, event_type, data):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')
    message = json.dumps({"event_type": event_type, "data": data})
    channel.basic_publish(exchange=exchange, routing_key='', body=message)
    connection.close()

def subscribe_to_event(exchange, callback):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')
    queue = channel.queue_declare('', exclusive=True).method.queue
    channel.queue_bind(exchange=exchange, queue=queue)
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
    print(f"Subscribed to events on exchange '{exchange}'")
    channel.start_consuming()

Order Service


from shared_utils import publish_event

def place_order(order_id, customer):
    print(f"Placing order {order_id} for {customer}")
    publish_event("order_exchange", "OrderPlaced", {"order_id": order_id, "customer": customer})

if __name__ == "__main__":
    # Simulate placing an order
    place_order(order_id=101, customer="John Doe")

Payment Service


import json
import time

from shared_utils import publish_event, subscribe_to_event

def handle_order_placed(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "OrderPlaced":
        order_id = event["data"]["order_id"]
        print(f"Processing payment for order {order_id}")
        time.sleep(1)  # Simulate payment processing
        publish_event("payment_exchange", "PaymentProcessed", {"order_id": order_id})

if __name__ == "__main__":
    subscribe_to_event("order_exchange", handle_order_placed)

Inventory Service


import json
import time

from shared_utils import publish_event, subscribe_to_event

def handle_payment_processed(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "PaymentProcessed":
        order_id = event["data"]["order_id"]
        print(f"Reserving inventory for order {order_id}")
        time.sleep(1)  # Simulate inventory reservation
        publish_event("inventory_exchange", "InventoryReserved", {"order_id": order_id})

if __name__ == "__main__":
    subscribe_to_event("payment_exchange", handle_payment_processed)

Notification Service


import json
import time

from shared_utils import subscribe_to_event

def handle_inventory_reserved(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "InventoryReserved":
        order_id = event["data"]["order_id"]
        print(f"Notifying customer for order {order_id}")
        time.sleep(1)  # Simulate notification
        print(f"Customer notified for order {order_id}")

if __name__ == "__main__":
    subscribe_to_event("inventory_exchange", handle_inventory_reserved)

Step 3: Run the Workflow

  1. Start RabbitMQ, for example with Docker: docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
  2. Run the services in the following order:
    • Notification Service: python notification_service.py
    • Inventory Service: python inventory_service.py
    • Payment Service: python payment_service.py
    • Order Service: python order_service.py
  3. Place an order by running the Order Service. The workflow will propagate through the services as events are handled.

Key Considerations

  1. Event Bus: Use an event broker like RabbitMQ, Kafka, or AWS SNS to manage communication between services.
  2. Event Versioning: Include versioning to handle changes in event formats over time.
  3. Idempotency: Ensure services handle repeated events gracefully to avoid duplication.
  4. Monitoring and Tracing: Use tools like OpenTelemetry to trace and debug distributed workflows.
  5. Error Handling:
    • Dead Letter Queues (DLQs) to capture failed events.
    • Retries with backoff for transient errors.
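As a rough illustration of the versioning and idempotency points, the sketch below tracks processed event IDs so that a redelivered event is ignored. The event_id and version fields, and the in-memory set, are assumptions made for this example; the services above do not include them, and a real service would keep the IDs in a durable store.

```python
processed_ids = set()  # assumption: a durable store in a real service
reserved_orders = []   # stands in for the side effect (reserving stock)


def handle_payment_processed(event):
    """Apply the event once; return False if it was already handled."""
    event_id = event["event_id"]
    if event_id in processed_ids:
        return False  # duplicate delivery, skip the side effect
    reserved_orders.append(event["data"]["order_id"])
    processed_ids.add(event_id)
    return True


event = {
    "event_id": "evt-1",  # hypothetical unique ID set by the publisher
    "version": 1,         # hypothetical schema version
    "event_type": "PaymentProcessed",
    "data": {"order_id": 101},
}

first = handle_payment_processed(event)
second = handle_payment_processed(event)  # simulated redelivery
print(first, second, reserved_orders)
```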

Advantages of the Choreography Pattern

  1. Loose Coupling: Services interact via events without direct knowledge of each other.
  2. Resilience: Failures in one service don't block the entire workflow.
  3. High Autonomy: Services operate independently and can be deployed or scaled separately.
  4. Dynamic Workflows: Adding new services to the workflow requires subscribing them to relevant events.

Challenges of the Choreography Pattern

  1. Complex Debugging: Tracing errors across distributed services can be difficult.
  2. Event Storms: Poorly designed workflows may generate excessive events, overwhelming the system.
  3. Coordination Overhead: Decentralized logic can lead to inconsistent behavior if not carefully managed.

Orchestrator vs. Choreography: When to Choose?

  • Use Orchestrator Pattern when workflows are complex, require central control, or involve many dependencies.
  • Use Choreography Pattern when you need high scalability, loose coupling, or event-driven workflows.

Learning Notes #30 – Queue Based Loading | Cloud Patterns

3 January 2025 at 14:47

Today, I learnt about the Queue-Based Loading pattern, which helps manage intermittent peak load on a service via queues, basically decoupling tasks from services. In this blog, I jot down notes on this pattern for my future self.

In today's digital landscape, applications are expected to handle large-scale operations efficiently. Whether it's processing massive data streams, ensuring real-time responsiveness, or integrating with multiple third-party services, scalability and reliability are paramount. One pattern that elegantly addresses these challenges is the Queue-Based Loading Pattern.

What Is the Queue-Based Loading Pattern?

The Queue-Based Loading Pattern leverages message queues to decouple and coordinate tasks between producers (such as applications or services generating data) and consumers (services or workers processing that data). By using queues as intermediaries, this pattern allows systems to manage workloads efficiently, ensuring seamless and scalable operation.

Key Components of the Pattern

  1. Producers: Producers are responsible for generating tasks or data. They send these tasks to a message queue instead of directly interacting with consumers. Examples include:
    • Web applications logging user activity.
    • IoT devices sending sensor data.
  2. Message Queue: The queue acts as a buffer, storing tasks until consumers are ready to process them. Popular tools for implementing queues include RabbitMQ, Apache Kafka, AWS SQS, and Redis.
  3. Consumers: Consumers retrieve messages from the queue and process them asynchronously. They are typically designed to handle tasks independently and at their own pace.
  4. Processing Logic: This is the core functionality that processes the tasks retrieved by consumers. For example, resizing images, sending notifications, or updating a database.

How It Works

  1. Task Generation: Producers push tasks to the queue as they are generated.
  2. Message Storage: The queue stores tasks in a structured manner (FIFO, priority-based, etc.) and ensures reliable delivery.
  3. Task Consumption: Consumers pull tasks from the queue, process them, and optionally acknowledge completion.
  4. Scalability: New consumers can be added dynamically to handle increased workloads, ensuring the system remains responsive.
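The four steps above can be sketched with Python's standard library alone. This is only an in-process illustration, assuming queue.Queue as the buffer and threads as consumers; a real deployment would use a broker such as RabbitMQ or SQS between separate processes.

```python
import queue
import threading

task_queue = queue.Queue()      # the buffer between producer and consumers
results = []
results_lock = threading.Lock()


def worker():
    """Consumer: pull tasks until a None sentinel arrives."""
    while True:
        task = task_queue.get()
        if task is None:
            task_queue.task_done()
            break
        with results_lock:
            results.append(task * task)  # stand-in for real processing
        task_queue.task_done()


NUM_WORKERS = 3  # adding consumers is how the pattern scales
workers = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()

# Producer: push a burst of tasks without waiting for them to be processed.
for n in range(20):
    task_queue.put(n)

# One sentinel per worker, then wait for the queue to drain.
for _ in workers:
    task_queue.put(None)
task_queue.join()
for w in workers:
    w.join()

print(sorted(results))
```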

Benefits of the Queue-Based Loading Pattern

  1. Decoupling: Producers and consumers operate independently, reducing tight coupling and improving system maintainability.
  2. Scalability: By adding more consumers, systems can easily scale to handle higher workloads.
  3. Fault Tolerance: If a consumer fails, messages remain in the queue, ensuring no data is lost.
  4. Load Balancing: Tasks are distributed evenly among consumers, preventing any single consumer from becoming a bottleneck.
  5. Asynchronous Processing: Consumers can process tasks in the background, freeing producers to continue generating data without delay.

Issues and Considerations

  1. Rate Limiting: Implement logic to control the rate at which services handle messages to prevent overwhelming the target resource. Test the system under load and adjust the number of queues or service instances to manage demand effectively.
  2. One-Way Communication: Message queues are inherently one-way. If tasks require responses, you may need to implement a separate mechanism for replies.
  3. Autoscaling Challenges: Be cautious when autoscaling consumers, as it can lead to increased contention for shared resources, potentially reducing the effectiveness of load leveling.
  4. Traffic Variability: Consider the variability of incoming traffic to avoid situations where tasks pile up faster than they are processed, creating a perpetual backlog.
  5. Queue Persistence: Ensure your queue is durable and capable of persisting messages. Crashes or system limits could lead to dropped messages, risking data loss.
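The traffic variability point comes down to simple arithmetic: a queue absorbs short bursts, but if arrivals stay above processing capacity the backlog never drains. The helper below is a hypothetical back-of-the-envelope model (fixed capacity per tick, no failures), not a measurement tool.

```python
def backlog_over_time(arrivals, capacity_per_tick):
    """Return the queue length after each tick for the given arrival counts."""
    backlog = 0
    history = []
    for arriving in arrivals:
        # Work left over is what arrived plus what was waiting, minus capacity.
        backlog = max(0, backlog + arriving - capacity_per_tick)
        history.append(backlog)
    return history


# A short burst: the backlog grows, then drains once traffic calms down.
burst = backlog_over_time([5, 20, 20, 5, 5, 5], capacity_per_tick=10)

# Sustained overload: the backlog grows without bound.
overload = backlog_over_time([12, 12, 12, 12], capacity_per_tick=10)

print(burst)
print(overload)
```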

Use Cases

  1. Email and Notification Systems: Sending bulk emails or push notifications without overloading the main application.
  2. Data Pipelines: Ingesting, transforming, and analyzing large datasets in real-time or batch processing.
  3. Video Processing: Queues facilitate tasks like video encoding and thumbnail generation.
  4. Microservices Communication: Ensures reliable and scalable communication between microservices.

Best Practices

  1. Message Durability: Configure your queue to persist messages to disk, ensuring they are not lost during system failures.
  2. Monitoring and Metrics: Use monitoring tools to track queue lengths, processing rates, and consumer health.
  3. Idempotency: Design consumers to handle duplicate messages gracefully.
  4. Error Handling and Dead Letter Queues (DLQs): Route failed messages to DLQs for later analysis and reprocessing.
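The error-handling practice can be sketched as a retry loop that gives up after a fixed number of attempts and moves the message aside. MAX_ATTEMPTS, the drain function, and the in-memory deque are assumptions for illustration; brokers typically provide dead-lettering as a configurable feature rather than something you code by hand.

```python
from collections import deque

MAX_ATTEMPTS = 3  # assumption: retry budget per message


def drain(messages, process):
    """Process messages, retrying failures and dead-lettering the rest."""
    main_queue = deque((msg, 1) for msg in messages)
    done, dead_letters = [], []
    while main_queue:
        msg, attempt = main_queue.popleft()
        try:
            process(msg)
            done.append(msg)
        except Exception:
            if attempt < MAX_ATTEMPTS:
                main_queue.append((msg, attempt + 1))  # requeue for a retry
            else:
                dead_letters.append(msg)  # keep it for later analysis
    return done, dead_letters


def process(msg):
    if msg == "bad":
        raise ValueError("cannot process")


done, dead = drain(["a", "bad", "b"], process)
print(done, dead)
```

A production version would also wait between retries (backoff) instead of requeueing immediately.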

Learning Notes #29 – Two Phase Commit Protocol | ACID in Distributed Systems

3 January 2025 at 13:45

Today, I learnt about the compensating transaction pattern, which led me to the Two-Phase Commit protocol, which helps maintain the atomicity of distributed transactions. Distributed transactions are hard.

In this blog, I jot down notes on the Two-Phase Commit protocol for better understanding.

The Two-Phase Commit (2PC) protocol is a distributed algorithm used to ensure atomicity in transactions spanning multiple nodes or databases. Atomicity ensures that either all parts of a transaction are committed or none are, maintaining consistency in distributed systems.

Why Two-Phase Commit?

In distributed systems, a transaction might involve several independent nodes, each maintaining its own database. Without a mechanism like 2PC, failures in one node can leave the system in an inconsistent state.

For example, consider an e-commerce platform where a customer places an order.

The transaction involves updating the inventory in one database, recording the payment in another, and generating a shipment request in a third system. If the payment database successfully commits but the inventory database fails, the system becomes inconsistent, potentially causing issues like double selling or incomplete orders. 2PC mitigates this by providing a coordinated protocol to commit or abort transactions across all nodes.

The Phases of 2PC

The protocol operates in two main phases:

1. Prepare Phase (Voting Phase)

The coordinator node initiates the transaction and prepares to commit it across all participating nodes.

  1. Request to Prepare: The coordinator sends a PREPARE request to all participant nodes.
  2. Vote: Each participant checks if it can commit the transaction (e.g., no constraints violated, resources available). It logs its decision (YES or NO) locally and sends its vote to the coordinator. If any participant votes NO, the transaction cannot be committed.

2. Commit Phase (Decision Phase)

Based on the votes received in the prepare phase, the coordinator decides the final outcome.

Commit Decision:

If all participants vote YES, the coordinator logs a COMMIT decision, sends COMMIT messages to all participants, and participants apply the changes and confirm with an acknowledgment.

Abort Decision:

If any participant votes NO, the coordinator logs an ABORT decision, sends ABORT messages to all participants, and participants roll back any changes made during the transaction.
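The two phases can be sketched in a few lines. This is a toy, single-process model under strong assumptions: the Participant class and two_phase_commit function are hypothetical names, there is no network, no logging to disk, and no failure or timeout handling, all of which a real implementation needs.

```python
class Participant:
    """Hypothetical participant that votes in phase 1 and obeys in phase 2."""

    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "INIT"

    def prepare(self):
        # Vote YES only if the local work can be made durable.
        self.state = "PREPARED" if self.can_commit else "ABORTED"
        return self.can_commit

    def commit(self):
        self.state = "COMMITTED"

    def abort(self):
        self.state = "ABORTED"


def two_phase_commit(participants):
    # Phase 1 (prepare/voting): collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    # Phase 2 (decision): commit only if every vote was YES.
    if all(votes):
        for p in participants:
            p.commit()
        return "COMMIT"
    for p in participants:
        p.abort()
    return "ABORT"


happy = [Participant("inventory"), Participant("payment"), Participant("shipping")]
happy_outcome = two_phase_commit(happy)

failing = [Participant("inventory", can_commit=False), Participant("payment")]
failing_outcome = two_phase_commit(failing)

print(happy_outcome, [p.state for p in happy])
print(failing_outcome, [p.state for p in failing])
```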

Implementation:

For a simple implementation of 2PC, we can use RabbitMQ as the medium between the coordinator and the participants.

In practice, we need not write this from scratch; existing tools already provide it:

1. Relational Databases

Most relational databases have built-in support for distributed transactions and 2PC.

  • PostgreSQL: Provides PREPARE TRANSACTION and COMMIT PREPARED as 2PC building blocks; a coordinator (an external transaction manager or the application) drives them across servers.
  • MySQL: Supports XA transactions, which follow the 2PC protocol.
  • Oracle Database: Offers robust distributed transaction support using XA.
  • Microsoft SQL Server: Provides distributed transactions through MS-DTC.

2. Distributed Transaction Managers

These tools manage distributed transactions across multiple systems.

  • Atomikos: A popular Java-based transaction manager supporting JTA/XA for distributed systems.
  • Bitronix: Another lightweight transaction manager for Java applications supporting JTA/XA.
  • JBoss Transactions (Narayana): A robust Java transaction manager that supports 2PC, often used in conjunction with JBoss servers.

3. Message Brokers

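Message brokers provide transactional capabilities, though not all of them are 2PC in the strict sense.

  • RabbitMQ: Offers transactional channels (tx.select, tx.commit, tx.rollback) that group publishes and acknowledgements on a single channel. This is not XA-style 2PC by itself; RabbitMQ more often serves as the messaging medium between a coordinator and participants.
  • Apache Kafka: Supports transactions, enabling "exactly-once" semantics across producers and consumers.
  • ActiveMQ: Provides distributed transaction support through JTA integration.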

4. Workflow Engines

Workflow engines can orchestrate 2PC across distributed systems.

  • Apache Camel: Can coordinate 2PC transactions using its transaction policy.
  • Camunda: Provides BPMN-based orchestration that can include transactional boundaries.
  • Zeebe: Supports distributed transaction workflows in modern architectures.

Key Properties of 2PC

  1. Atomicity: Ensures all-or-nothing transaction behavior.
  2. Consistency: Guarantees system consistency across all nodes.
  3. Durability: Uses logs to ensure decisions survive node failures.

Challenges of 2PC

  1. Blocking Nature: If the coordinator fails during the commit phase, participants must wait indefinitely unless a timeout or external mechanism is implemented.
  2. Performance Overhead: Multiple message exchanges and logging operations introduce latency.
  3. Single Point of Failure: The coordinator's failure can stall the entire transaction.

Learning Notes #12 – Alternate Exchanges | RabbitMQ

27 December 2024 at 10:36

Today I learnt about alternate exchanges, which provide a way to handle undeliverable messages. In this blog, I share notes on what alternate exchanges are, why they are useful, and how to implement them in your RabbitMQ setup.

What Are Alternate Exchanges?

In the normal flow, a producer sends a message to an exchange, and if a queue is bound correctly, the message is placed in that queue.

An alternate exchange in RabbitMQ is a fallback exchange configured for another exchange. If a message cannot be routed to any queue bound to the primary exchange, RabbitMQ will publish the message to the alternate exchange instead. This mechanism ensures that undeliverable messages are not lost but can be processed in a different way, such as logging, alerting, or storing them for later inspection.
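The fallback behavior can be modeled with a toy router. The Exchange class below is a hypothetical in-memory model written for this note, not RabbitMQ's implementation or pika's API; it only shows the decision "route if a binding matches, otherwise hand the message to the alternate exchange".

```python
class Exchange:
    """Toy model of a direct or fanout exchange with an optional fallback."""

    def __init__(self, kind, alternate=None):
        self.kind = kind            # "direct" or "fanout"
        self.alternate = alternate  # another Exchange, or None
        self.bindings = []          # list of (routing_key, queue) pairs

    def bind(self, queue, routing_key=""):
        self.bindings.append((routing_key, queue))

    def publish(self, routing_key, body):
        if self.kind == "fanout":
            targets = [q for _, q in self.bindings]  # key is ignored
        else:
            targets = [q for key, q in self.bindings if key == routing_key]
        if targets:
            for q in targets:
                q.append(body)
            return True
        if self.alternate is not None:
            # Unroutable here: republish to the fallback exchange.
            return self.alternate.publish(routing_key, body)
        return False  # no fallback configured: the message is dropped


valid_queue, unroutable_queue = [], []

alternate = Exchange("fanout")
alternate.bind(unroutable_queue)

primary = Exchange("direct", alternate=alternate)
primary.bind(valid_queue, routing_key="key1")

primary.publish("key1", "routed normally")
primary.publish("invalid_key", "rescued by the alternate exchange")
print(valid_queue, unroutable_queue)
```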

When Does This Scenario Happen?

A message goes to an alternate exchange in RabbitMQ in the following scenarios:

1. No Binding for the Routing Key

  • The primary exchange does not have any queue bound to it with the routing key specified in the message.
  • Example: A message with routing key invalid_key is sent to a direct exchange that has no queue bound to invalid_key.

2. Unbound Queues

  • Even if a queue exists, it is not bound to the primary exchange or the specific routing key used in the message.
  • Example: A queue exists for the primary exchange but is not explicitly bound to any routing key.

3. Exchange Type Mismatch

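  • The publisher assumes a different exchange type than the one declared, so the routing key matches no binding.
  • Example: A queue is bound with the pattern user.* to a direct exchange. Direct exchanges compare keys exactly, so a message with the routing key user.created matches nothing. (A fanout exchange, by contrast, ignores the routing key and delivers to every bound queue, so it only falls back when no queue is bound at all.)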

4. Misconfigured Bindings

  • Bindings exist but do not align with the routing requirements of the message.
  • Example: A topic exchange has a binding for user.* but receives a message with the routing key order.processed.

5. Queue Deletion After Binding

  • A queue was bound to the exchange but is deleted or unavailable at runtime.
  • Example: A message with a valid routing key arrives, but the corresponding queue is no longer active.

6. TTL (Time-to-Live) Expired Messages

  • Messages that expire in a queue with a time-to-live setting are not routed to the alternate exchange. If dead-lettering is enabled on the queue, they are republished to its dead-letter exchange (the x-dead-letter-exchange argument), a separate mechanism that is often confused with alternate exchanges.
  • Example: A primary exchange routes messages to a queue with a TTL, and expired messages are forwarded to that queue's dead-letter exchange.

7. Exchange Misconfiguration

  • The primary exchange is operational, but its configurations prevent messages from being delivered to any queue.
  • Example: A missing or incorrect alternate-exchange argument setup leads to misrouting.
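Scenario 4 depends on how topic patterns match routing keys: * stands for exactly one dot-separated word and # for zero or more words. The function below is a small sketch of those rules written for this note (topic_matches is a hypothetical helper, not a pika or RabbitMQ function), handy for checking whether a key would be routed or fall through to the alternate exchange.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP-style topic pattern matches the routing key."""
    return _match(pattern.split("."), routing_key.split("."))


def _match(pattern_words, key_words):
    if not pattern_words:
        return not key_words  # both exhausted means a full match
    head, rest = pattern_words[0], pattern_words[1:]
    if head == "#":
        # '#' may absorb zero or more words; try every split point.
        return any(_match(rest, key_words[i:]) for i in range(len(key_words) + 1))
    if not key_words:
        return False
    if head == "*" or head == key_words[0]:
        return _match(rest, key_words[1:])
    return False


print(topic_matches("user.*", "user.created"))
print(topic_matches("user.*", "order.processed"))
print(topic_matches("user.#", "user.profile.updated"))
```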

Use Cases for Alternate Exchanges

  • Error Handling: Route undeliverable messages to a dedicated queue for later inspection or reprocessing.
  • Logging: Keep track of messages that fail routing for auditing purposes.
  • Unroutable-Message Queues: Pair an alternate exchange with a dedicated queue, similar in spirit to a dead-letter queue, to analyze why messages could not be routed.
  • Load Balancing: Forward undeliverable messages to another exchange for alternative processing.

How to Implement Alternate Exchanges in Python

Let's walk through the steps to configure and use alternate exchanges in RabbitMQ using Python.

Scenario 1: Handling Messages with Valid and Invalid Routing Keys

producer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the alternate exchange
channel.exchange_declare(exchange='alternate_exchange', exchange_type='fanout')

# Declare a queue and bind it to the alternate exchange
channel.queue_declare(queue='unroutable_queue')
channel.queue_bind(exchange='alternate_exchange', queue='unroutable_queue')

# Declare the primary exchange with an alternate exchange argument
channel.exchange_declare(
    exchange='primary_exchange',
    exchange_type='direct',
    arguments={'alternate-exchange': 'alternate_exchange'}
)

# Declare and bind a queue to the primary exchange
channel.queue_declare(queue='valid_queue')
channel.queue_bind(exchange='primary_exchange', queue='valid_queue', routing_key='key1')

# Publish a message with a valid routing key
channel.basic_publish(
    exchange='primary_exchange',
    routing_key='key1',
    body='Message with a valid routing key'
)

print("Message with valid routing key sent to 'valid_queue'.")

# Publish a message with an invalid routing key
channel.basic_publish(
    exchange='primary_exchange',
    routing_key='invalid_key',
    body='Message with an invalid routing key'
)

print("Message with invalid routing key sent to 'alternate_exchange'.")

# Close the connection
connection.close()

consumer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Consume messages from the alternate queue
method_frame, header_frame, body = channel.basic_get(queue='unroutable_queue', auto_ack=True)
if method_frame:
    print(f"Received message from alternate queue: {body.decode()}")
else:
    print("No messages in the alternate queue")

# Close the connection
connection.close()

Scenario 2: Logging Unroutable Messages

producer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the alternate exchange
channel.exchange_declare(exchange='logging_exchange', exchange_type='fanout')

# Declare a logging queue and bind it to the logging exchange
channel.queue_declare(queue='logging_queue')
channel.queue_bind(exchange='logging_exchange', queue='logging_queue')

# Declare the primary exchange with a logging alternate exchange argument
channel.exchange_declare(
    exchange='primary_logging_exchange',
    exchange_type='direct',
    arguments={'alternate-exchange': 'logging_exchange'}
)

# Publish a message with an invalid routing key
channel.basic_publish(
    exchange='primary_logging_exchange',
    routing_key='invalid_logging_key',
    body='Message for logging'
)

print("Message with invalid routing key sent to 'logging_exchange'.")

# Close the connection
connection.close()

consumer.py

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Consume messages from the logging queue
method_frame, header_frame, body = channel.basic_get(queue='logging_queue', auto_ack=True)
if method_frame:
    print(f"Logged message: {body.decode()}")
else:
    print("No messages in the logging queue")

# Close the connection
connection.close()

Learning Notes #10 – Lazy Queues | RabbitMQ

26 December 2024 at 06:54

What Are Lazy Queues?

  • Lazy Queues are designed to store messages primarily on disk rather than in memory.
  • They are optimized for use cases involving large message backlogs where minimizing memory usage is critical.

Key Characteristics

  1. Disk-Based Storage – Messages are stored on disk immediately upon arrival, rather than being held in memory.
  2. Low Memory Usage – Only minimal metadata for messages is kept in memory.
  3. Scalability – Can handle millions of messages without consuming significant memory.
  4. Message Retrieval – Retrieving messages is slower because messages are fetched from disk.
  5. Durability – Messages persist on disk, reducing the risk of data loss during RabbitMQ restarts.

Trade-offs

  • Latency: Fetching messages from disk is slower than retrieving them from memory.
  • Throughput: Not suitable for high-throughput, low-latency applications.

Choose Lazy Queues if

  • You need to handle very large backlogs of messages.
  • Memory is a constraint in your system.
  • Latency and throughput are less critical.

Implementation

Pre-requisites

1. Install and run RabbitMQ on your local machine.


docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

2. Install the pika library


pip install pika

Producer (producer.py)

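This script sends a persistent message to a Lazy Queue. Note that since RabbitMQ 3.12, classic queues always behave like lazy queues and the x-queue-mode argument is ignored, so with the 4.0 image above the setting has no additional effect.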

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare a Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Publish a message
message = "Hello from the Producer via Custom Exchange!"
channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent message
)

print(f"Message sent to Lazy Queue via Exchange: {message}")

# Close the connection
connection.close()

Consumer (consumer.py)

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare the Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Callback function to process messages
def callback(ch, method, properties, body):
    print(f"Received message: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Acknowledge the message

# Start consuming messages
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)

print("Waiting for messages. To exit, press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print("Stopped consuming.")

# Close the connection
connection.close()

Explanation

  1. Producer
    • Defines a custom exchange (custom_exchange) of type direct.
    • Declares a Lazy Queue (lazy_queue_example).
    • Binds the queue to the exchange using a routing key (custom_routing_key).
    • Publishes a persistent message via the custom exchange and routing key.
  2. Consumer
    • Declares the same exchange and Lazy Queue to ensure they exist.
    • Consumes messages routed to the queue through the custom exchange and routing key.
  3. Custom Exchange and Binding
    • The direct exchange type routes messages based on an exact match of the routing key.
    • Binding ensures the queue receives messages published to the exchange with the specified key.
  4. Lazy Queue Behavior
    • Messages are stored directly on disk to minimize memory usage.

Learning Notes #9 – Quorum Queues | RabbitMQ

25 December 2024 at 16:42

What Are Quorum Queues?

  • Quorum Queues are distributed queues built on the Raft consensus algorithm.
  • They are designed for high availability, durability, and data safety by replicating messages across multiple nodes in a RabbitMQ cluster.
  • They are the replacement for classic mirrored queues.

Key Characteristics

  1. Replication:
    • Messages are replicated across a quorum (a majority of nodes).
    • Clusters typically use an odd number of replicas (e.g., 3, 5, 7) so that a clear majority exists to elect a leader during failovers.
  2. Leader-Follower Architecture:
    • Each Quorum Queue has one leader and multiple followers.
    • The leader handles all write and read operations, while followers replicate messages and provide redundancy.
  3. Durability:
    • Messages are written to disk on all quorum nodes, ensuring persistence even if nodes fail.
  4. High Availability:
    • If the leader node fails, RabbitMQ elects a new leader from the remaining quorum, ensuring continued operation.
  5. Consistency:
    • Quorum Queues prioritize consistency over availability.
    • Messages are acknowledged only after replication is successful on a majority of nodes.
  6. Message Ordering:
    • Message ordering is preserved during normal operations but may be disrupted during leader failovers.
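The replication and consistency points rest on majority arithmetic, which is easy to check. The two helpers below are hypothetical names for this note; they show why an even replica count adds cost without adding fault tolerance.

```python
def majority(replicas):
    """Smallest number of replicas that forms a quorum."""
    return replicas // 2 + 1


def tolerated_failures(replicas):
    """How many replicas can be lost while a quorum still exists."""
    return replicas - majority(replicas)


for n in (3, 4, 5, 7):
    print(f"{n} replicas: quorum={majority(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

Going from 3 to 4 replicas raises the quorum from 2 to 3 but still tolerates only one failure, which is why odd sizes are the usual recommendation.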

Use Cases

  • Mission-Critical Applications – Systems where message loss is unacceptable (e.g., financial transactions, order processing).
  • Distributed Systems – Environments requiring high availability and fault tolerance.
  • Data Safety – Applications prioritizing consistency over throughput (e.g., event logs, audit trails).

Setups

Using rabbitmqadmin


rabbitmqadmin declare queue name=quorum_queue durable=true arguments='{"x-queue-type": "quorum"}'

Using python


# Quorum queues must be declared durable
channel.queue_declare(queue="quorum_queue", durable=True, arguments={"x-queue-type": "quorum"})

References:

  1. https://www.rabbitmq.com/docs/quorum-queues

Learning Notes #7 – AMQP Protocol and RabbitMQ | An Overview

24 December 2024 at 18:22

Today, I learned about the AMQP protocol and the components of RabbitMQ (connections, channels, queues, exchanges, bindings, the different types of exchanges, acknowledgements, and publisher confirms). I learned all of this from CloudAMQP. In this blog, you will find crisp details on these topics.

1. Overview of AMQP Protocol

  • Advanced Message Queuing Protocol (AMQP) is an open standard for messaging middleware. It enables systems to exchange messages in a reliable and flexible manner.
  • Key components:
    • Producers: Applications that send messages.
    • Consumers: Applications that receive messages.
    • Broker: Middleware (e.g., RabbitMQ) that manages message exchanges.
    • Message: A unit of data transferred between producer and consumer.

2. How AMQP Works in RabbitMQ

  • RabbitMQ implements AMQP to facilitate message exchange. It acts as the broker, managing queues, exchanges, and bindings.
  • AMQP Operations:
    1. Producer sends a message to an exchange.
    2. The exchange routes the message to one or more queues based on bindings.
    3. Consumer retrieves the message from the queue.

3. Connections and Channels

Connections

A connection is a persistent, long-lived TCP connection between a client application and the RabbitMQ broker. Connections are relatively resource-intensive because each one needs a socket plus the overhead of establishing and maintaining it (handshake, authentication, heartbeats). The broker identifies each connection uniquely. Whether one connection can safely be shared across threads depends on the client library (pika's connection adapters are not thread-safe, for example), and a connection cannot be shared across processes.

When an application establishes a connection to RabbitMQ, it uses it as a gateway to interact with the broker. This includes creating channels, declaring queues and exchanges, publishing messages, and consuming messages. Connections should ideally be reused across the application to reduce overhead and optimize resource usage.

Channels

A channel is a lightweight, logical communication pathway established within a connection. Channels provide a way to perform multiple operations concurrently over a single connection. They are less resource-intensive than connections and are designed to handle operations such as queue declarations, message publishing, and consuming.

Using channels allows applications to:

  1. Scale efficiently: Instead of opening multiple connections, applications can open multiple channels over a single connection.
  2. Isolate operations: Each channel operates independently. For instance, one channel can consume messages while another publishes.

How They Work Together

When a client connects to RabbitMQ, it first establishes a connection. Within that connection, it can open multiple channels. Each channel operates as a virtual connection, allowing concurrent tasks without needing separate TCP connections.


import pika

# Establish a connection to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# Create multiple channels on the same connection
channel1 = connection.channel()
channel2 = connection.channel()

# Declare queues on each channel
channel1.queue_declare(queue='queue1')
channel2.queue_declare(queue='queue2')

# Publish messages on different channels
channel1.basic_publish(exchange='', routing_key='queue1', body='Message for Queue 1')
channel2.basic_publish(exchange='', routing_key='queue2', body='Message for Queue 2')

print("Messages sent to both queues!")

# Close the connection
connection.close()

Best Practices (Not Tried; Got this from the video)

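  1. Reusing Connections: Establish one long-lived connection per application or service and reuse it instead of opening a new one per operation. With pika, keep each connection on a single thread, because its connections are not thread-safe.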
  2. Using Channels for Parallelism: Open separate channels for different operations like publishing and consuming.
  3. Graceful Cleanup: Always close channels and connections when done to avoid resource leaks.

4. Queues

  • Act as message storage.
  • Can be:
    • Durable: Survives broker restarts.
    • Exclusive: Used by a single connection.
    • Auto-delete: Deleted when the last consumer disconnects.

# Declaring a durable queue
channel.queue_declare(queue='durable_queue', durable=True)

# Sending a persistent message
channel.basic_publish(
    exchange='',  # Default exchange
    routing_key='durable_queue',
    body='Persistent message',
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)
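
As a rough sketch, the three queue properties listed above map onto keyword arguments of pika's `queue_declare`. The function below expects an open pika channel; the queue names are made up, and `RecordingChannel` is a hypothetical stand-in that only records calls so the sketch runs without a broker.

```python
def declare_example_queues(channel):
    """Declare one queue per property; `channel` is an open pika channel."""
    # Durable: the queue definition survives a broker restart.
    channel.queue_declare(queue="orders_durable", durable=True)
    # Exclusive: usable only by the declaring connection. An empty name
    # asks the broker to generate one.
    channel.queue_declare(queue="", exclusive=True)
    # Auto-delete: removed once its last consumer unsubscribes.
    channel.queue_declare(queue="live_updates", auto_delete=True)


class RecordingChannel:
    """Hypothetical stand-in for a channel that only records calls."""

    def __init__(self):
        self.calls = []

    def queue_declare(self, queue, **flags):
        self.calls.append((queue, flags))


channel = RecordingChannel()
declare_example_queues(channel)
for name, flags in channel.calls:
    print(repr(name), flags)
```

Against a real broker, pass `connection.channel()` instead of the recording stub.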

5. Exchanges

An exchange in RabbitMQ is a routing mechanism that determines how messages sent by producers are directed to queues. Exchanges act as intermediaries between producers and queues, enabling flexible and efficient message routing based on routing rules and patterns.

Types of Exchanges

RabbitMQ supports four types of exchanges, each with its unique routing mechanism:

1. Direct Exchange

  • Routes messages to queues based on an exact match of the routing key.
  • If the routing key in the message matches the binding key of a queue, the message is routed to that queue.
  • Use case: Task queues where each task has a specific destination.

Example:

  • Queue queue1 is bound to the exchange with the routing key info.
  • A message with the routing key info is routed to queue1.

channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
channel.queue_declare(queue='direct_queue')
channel.queue_bind(exchange='direct_exchange', queue='direct_queue', routing_key='info')
channel.basic_publish(exchange='direct_exchange', routing_key='info', body='Direct message')

2. Fanout Exchange

  • Broadcasts messages to all queues bound to the exchange, ignoring routing keys.
  • Use case: Broadcasting events to multiple consumers, such as notifications or logs.

Example:

  • All queues bound to the exchange receive the same message.

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')
channel.queue_declare(queue='queue1')
channel.queue_declare(queue='queue2')

channel.queue_bind(exchange='fanout_exchange', queue='queue1')
channel.queue_bind(exchange='fanout_exchange', queue='queue2')

channel.basic_publish(exchange='fanout_exchange', routing_key='', body='Broadcast message')

3. Topic Exchange

  • Routes messages to queues based on pattern matching of routing keys.
  • Routing keys are dot-separated words, and queues can bind with patterns using wildcards:
    • * matches exactly one word.
    • # matches zero or more words.
  • Use case: Complex routing scenarios, such as logging systems with multiple log levels and sources.

Example:

  • Queue queue1 is bound with the pattern logs.info.*.
  • A message with the routing key logs.info.app1 is routed to queue1.

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
channel.queue_declare(queue='topic_queue')

channel.queue_bind(exchange='topic_exchange', queue='topic_queue', routing_key='logs.info.*')

channel.basic_publish(exchange='topic_exchange', routing_key='logs.info.app1', body='Topic message')
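
To make the wildcard rules concrete, here is a small pure-Python matcher. It is a sketch of the matching rules described above, not RabbitMQ's actual implementation, and the function name `topic_matches` is made up for the example.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a dot-separated routing key matches a binding pattern."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)          # pattern used up: key must be too
        if p[i] == "#":
            # '#' may absorb zero or more words; try every possibility.
            return any(match(i + 1, t) for t in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)  # '*' or a literal consumes one word
        return False

    return match(0, 0)


print(topic_matches("logs.info.*", "logs.info.app1"))   # True
print(topic_matches("logs.info.*", "logs.info"))        # False
print(topic_matches("logs.#", "logs"))                  # True
```

Note the difference between the two wildcards: `logs.info.*` needs exactly one more word, while `logs.#` also matches the bare key `logs`.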

4. Headers Exchange

  • Routes messages based on message header attributes instead of routing keys.
  • Headers can specify conditions like x-match:
    • x-match = all: All specified headers must match.
    • x-match = any: At least one specified header must match.
  • Use case: Advanced filtering scenarios.

Example:

  • Queue queue1 is bound with headers format=json and type=report.

channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')
channel.queue_declare(queue='headers_queue')

channel.queue_bind(
    exchange='headers_exchange',
    queue='headers_queue',
    arguments={'format': 'json', 'type': 'report', 'x-match': 'all'}
)

channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',
    body='Headers message',
    properties=pika.BasicProperties(headers={'format': 'json', 'type': 'report'})
)
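
The difference between `x-match = all` and `x-match = any` can be sketched in a few lines of plain Python. This is a simplified model (it compares values for equality only and skips binding arguments that start with `x-`); the function name `headers_match` is made up for the example.

```python
def headers_match(binding_args: dict, message_headers: dict) -> bool:
    """Simplified model of headers-exchange matching."""
    mode = binding_args.get("x-match", "all")
    # Arguments starting with "x-" (such as x-match) are not matched.
    required = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [message_headers.get(k) == v for k, v in required.items()]
    return all(hits) if mode == "all" else any(hits)


binding_all = {"format": "json", "type": "report", "x-match": "all"}
binding_any = {"format": "json", "type": "report", "x-match": "any"}

print(headers_match(binding_all, {"format": "json", "type": "report"}))  # True
print(headers_match(binding_all, {"format": "json", "type": "log"}))     # False
print(headers_match(binding_any, {"format": "json", "type": "log"}))     # True
```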

Exchange Lifecycle

  1. Declaration: Exchanges must be declared before use (the default exchange and the built-in amq.* exchanges already exist). If a producer publishes to an exchange that does not exist, the broker closes the channel with a 404 NOT_FOUND error.
  2. Binding: Queues are bound to exchanges with routing keys or header arguments.
  3. Publishing: Producers publish messages to exchanges with optional routing keys.

Durable and Non-Durable Exchanges

  • Durable Exchange: Survives broker restarts. Useful for critical applications.
  • Non-Durable Exchange: Deleted when the broker restarts. Suitable for transient tasks.

# Declare a durable exchange
channel.exchange_declare(exchange='durable_exchange', exchange_type='direct', durable=True)

Default Exchange

RabbitMQ provides a built-in default exchange (unnamed exchange) that routes messages directly to a queue with a name matching the routing key.


channel.queue_declare(queue='default_queue')
channel.basic_publish(exchange='', routing_key='default_queue', body='Default exchange message')

Best Practices for Exchanges

  • Use durable exchanges for critical applications that require persistence across broker restarts.
  • Use direct exchanges for targeted delivery when routing keys are predictable.
  • Use fanout exchanges for broadcasting to multiple queues.
  • Use topic exchanges for complex routing needs, especially with hierarchical routing keys.
  • Use headers exchanges for advanced filtering based on metadata.

6. Bindings

Bindings connect queues to exchanges with routing rules.


# Binding a queue with a routing key
channel.queue_bind(exchange='direct_logs', queue='error_logs', routing_key='error')

7. Consumer Acknowledgments

Two acknowledgment types:

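  • Manual: The consumer explicitly sends an acknowledgment once it has finished processing the message.
  • Automatic: RabbitMQ treats a message as acknowledged as soon as it is delivered, so a message is lost if the consumer crashes mid-processing.

# Auto ACK
channel.basic_consume(
    queue='test_queue',
    on_message_callback=lambda ch, method, properties, body: print(body),
    auto_ack=True,
)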

# Manual ACK
def callback(ch, method, properties, body):
    print(f"Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='test_queue', on_message_callback=callback)
channel.start_consuming()
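
Why manual acknowledgments matter is easiest to see when a consumer dies mid-task. The toy model below is a sketch, not RabbitMQ's implementation, and the `ToyQueue` class is made up for the example. It keeps unacknowledged messages aside and puts them back when the consumer is lost.

```python
from collections import deque


class ToyQueue:
    """Toy model of delivery tags and acks (hypothetical, illustration only)."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}      # delivery tag -> message awaiting an ack
        self.next_tag = 1

    def deliver(self, auto_ack=False):
        body = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        if not auto_ack:
            self.unacked[tag] = body   # held until the consumer acks
        return tag, body

    def ack(self, tag):
        del self.unacked[tag]

    def consumer_lost(self):
        # Unacked messages return to the front of the queue for redelivery.
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


manual = ToyQueue(["a", "b"])
manual.deliver()               # consumer receives 'a' but never acks
manual.consumer_lost()
print(list(manual.ready))      # ['a', 'b']: 'a' will be redelivered

auto = ToyQueue(["a", "b"])
auto.deliver(auto_ack=True)    # 'a' counts as done on delivery
auto.consumer_lost()
print(list(auto.ready))        # ['b']: 'a' is gone
```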

8. Publisher Confirmations

  • Confirms to the publisher that the broker has received a message and taken responsibility for it.
  • Enable publisher confirms when losing a message on the way to the broker is not acceptable.

# Enable delivery confirmation
channel.confirm_delivery()

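# Publish and handle confirmation
# mandatory=True makes pika raise UnroutableError when no queue is bound for the routing key
try:
    channel.basic_publish(exchange='direct_logs', routing_key='error', body='Message with confirmation', mandatory=True)
    print("Message successfully published!")
except pika.exceptions.UnroutableError:
    print("Message could not be routed.")
except pika.exceptions.NackError:
    print("Message was rejected (nacked) by the broker.")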

9. Virtual Hosts (vhosts)

  • Logical partitions to segregate exchanges, queues, and users.
  • Use vhosts for multi-tenant setups.
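
A client picks its vhost when it connects. One common way is an AMQP URL, where the vhost is the path segment; the default vhost `/` has to be percent-encoded as `%2F`. The helper below is a small sketch using only the standard library; `amqp_url`, the tenant name, and the credentials are made up for the example.

```python
from urllib.parse import quote


def amqp_url(user, password, host, vhost, port=5672):
    """Build an AMQP URL; the vhost is one path segment, so '/' becomes %2F."""
    return (
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )


print(amqp_url("guest", "guest", "localhost", "/"))
print(amqp_url("app_a", "secret", "localhost", "tenant_a"))
```

With pika, a URL like this can be passed to `pika.URLParameters`, or the vhost can be given directly through the `virtual_host` argument of `pika.ConnectionParameters`.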

Tomorrow, I plan to explore more of RabbitMQ. Let's see how it goes.

Learning Notes #5 – Message Queues | RabbitMQ

22 December 2024 at 12:05

Github: https://github.com/syedjaferk/rabbitmq_message_queues

Imagine you own a busy online store. Customers place orders, payments are processed, inventory is updated, and confirmation emails are sent.

If these steps happen one after another in real time (synchronously), your website could slow down or even crash under high demand. This is where message queues come into the picture. They help different parts of your system communicate smoothly and handle tasks efficiently, even during a rush. A message queue is one solution for asynchronous communication.

What is a Message Queue?

A message queue is a software system that enables different parts of an application to send and receive messages asynchronously. Messages are temporarily stored in a queue until the recipient is ready to process them.

For example, think of it as a waiting line at a busy coffee shop. Each order (or message) waits in line until it’s picked up and handled by a coffee maker (or worker). The beauty of a message queue is that the coffee shop (producer) can keep taking orders without waiting for the coffee maker (consumer) to finish the current one.

Here’s how it works:

  • The producer sends messages to the queue.
  • The queue stores the messages.
  • The consumer picks up messages one by one to process them.
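
The same idea can be tried inside a single Python process with the standard library, before bringing in a broker. This is only an analogy for the producer/queue/consumer split; the names `orders`, `consumer`, and `processed` are made up for the example.

```python
import queue
import threading

orders = queue.Queue()   # the waiting line
processed = []


def consumer():
    # Pick up messages one by one until the stop marker (None) arrives.
    while True:
        order = orders.get()
        if order is None:
            break
        processed.append(f"processed {order}")


worker = threading.Thread(target=consumer)
worker.start()

# The producer keeps taking orders without waiting for the consumer.
for n in range(1, 4):
    orders.put(f"order #{n}")
orders.put(None)         # tell the consumer to stop

worker.join()
print(processed)
```

A broker such as RabbitMQ plays the role of `orders` here, except that producer and consumer can live in different processes or on different machines.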

RabbitMQ is one such tool: a message broker that enables asynchronous communication between services.

Key Components of RabbitMQ (a Popular Message Queue System)

  1. Producer: The sender of messages. For example, your website sending an order to the queue.
  2. Queue: The holding area for messages, like a to-do list. Each order waits here until processed.
  3. Consumer: The worker that processes messages. For example, the service that charges a credit card.
  4. Exchange: Think of this as a traffic controller. It decides which queue gets each message based on rules you set.
  5. Message: The data being sent, such as order details (customer name, items, total price).
  6. Acknowledgements (ACKs): A signal from the consumer to RabbitMQ saying, "Message processed successfully!".

How a Message Queue Solves Real Problems

Scenario: Imagine your online store uses a message queue during a holiday rush.

  1. Placing Orders
    • Customers place orders on your website (producer).
    • Orders are sent to the RabbitMQ queue.
  2. Processing Payments
    • The payment service (consumer) picks up orders from the queue, one by one, to charge credit cards.
  3. Sending Emails
    • Once payment is successful, another consumer sends confirmation emails.
  4. Updating Inventory
    • A third consumer updates the inventory system.

Without a queue: All these tasks would happen one after the other, causing delays and potential failures.

With a queue: Each task works independently and efficiently, ensuring smooth operations.

Simple RabbitMQ Example

Step 1: Spin up RabbitMQ with Docker


docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

Step 2: Producer Code (Sending Messages)


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue')

channel.basic_publish(exchange='',
                      routing_key='order_queue',
                      body='Order #12345')
print("[x] Sent 'Order #12345'")
connection.close()

Explanation

  1. pika.ConnectionParameters('localhost'): Connects to RabbitMQ running locally.
  2. channel.queue_declare(queue='order_queue'): Ensures the queue exists. If it doesn’t, RabbitMQ will create it.
  3. channel.basic_publish(...): Publishes a message (in this case, "Order #12345") to the specified queue.
  4. connection.close(): Cleans up and closes the connection.

Step 3: Consumer Code (Processing Message)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue')

def callback(ch, method, properties, body):
    print(f"[x] Processed {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='order_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Explanation

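  1. channel.queue_declare(queue='order_queue'): Ensures the queue exists before consuming. The declaration is idempotent as long as the arguments match, so both producer and consumer can safely declare it.
  2. callback: A function that processes each message. Here, it prints the message content and acknowledges it.
  3. channel.basic_consume(...): Registers the callback as a consumer on the queue, so messages are processed as they arrive. auto_ack defaults to False, which is why the callback acknowledges each message itself.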
  4. channel.start_consuming(): Starts the consumer, waiting for messages indefinitely.

Best Practices (Not Tried – Taken from the Course Page)

  1. Keep Messages Small: Only send necessary data to avoid delays.
  2. Use Dead Letter Queues: Handle failed messages separately to keep the main queue clear.
  3. Monitor Performance: Watch queue sizes and processing times to prevent backlogs.
  4. Scale Consumers: Add more workers during busy times to process messages faster.
  5. Secure Your System: Use encryption and authentication to protect sensitive data.
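
For the dead letter queue point, here is a rough sketch of how the wiring might look with pika. It relies on the `x-dead-letter-exchange` and `x-dead-letter-routing-key` queue arguments; the function name, the `dlx` exchange, and the queue names are made up, and `RecordingChannel` is a hypothetical stand-in so the sketch runs without a broker. I have not tried this against a live setup.

```python
def declare_with_dead_letter_queue(channel, queue, dlx="dlx", dlq=None):
    """Declare `queue` so that rejected or expired messages end up in a DLQ."""
    dlq = dlq or f"{queue}.dead"
    # Exchange and queue that receive the dead-lettered messages.
    channel.exchange_declare(exchange=dlx, exchange_type="direct")
    channel.queue_declare(queue=dlq, durable=True)
    channel.queue_bind(exchange=dlx, queue=dlq, routing_key=dlq)
    # Main queue: messages rejected with requeue=False (or expired)
    # are republished to `dlx` with the routing key `dlq`.
    channel.queue_declare(
        queue=queue,
        durable=True,
        arguments={
            "x-dead-letter-exchange": dlx,
            "x-dead-letter-routing-key": dlq,
        },
    )


class RecordingChannel:
    """Hypothetical stand-in for a channel that only records calls."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def record(**kwargs):
            self.calls.append((name, kwargs))
        return record


channel = RecordingChannel()
declare_with_dead_letter_queue(channel, "orders_with_dlq")
for method, kwargs in channel.calls:
    print(method, kwargs)
```

The sketch uses a new queue name on purpose: redeclaring an existing queue such as order_queue with different arguments is refused by the broker.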
