
Learning Notes #38 – Choreography Pattern | Cloud Pattern

5 January 2025 at 12:21

Today I learned about the Choreography pattern, in which services coordinate by exchanging events through a message queue instead of being driven by a central controller. In this post, I jot down notes on the pattern for my future self.

What is the Choreography Pattern?

In the Choreography Pattern, services coordinate by publishing and reacting to asynchronous events, without a central controller. Each service is responsible for a specific part of the workflow and responds to events produced by other services. Because no service calls another directly, the system stays autonomous and loosely coupled.
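To make the idea concrete for myself, here is a minimal in-process sketch. The EventBus class and the billing_service and shipping_service handlers are hypothetical names I made up for illustration; a real system would use a broker, and delivery would be asynchronous rather than a direct function call as it is here.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class EventBus:
    """Hypothetical in-process stand-in for a message broker."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event_type: str, data: dict) -> None:
        # Deliver the event to every handler subscribed to this event type.
        for handler in list(self._subscribers[event_type]):
            handler(data)


bus = EventBus()
log: List[str] = []


def billing_service(data: dict) -> None:
    # Reacts to OrderPlaced and announces its own result as a new event.
    log.append(f"billing: charged order {data['order_id']}")
    bus.publish("PaymentProcessed", data)


def shipping_service(data: dict) -> None:
    # Knows nothing about billing; it only knows the event it cares about.
    log.append(f"shipping: packed order {data['order_id']}")


bus.subscribe("OrderPlaced", billing_service)
bus.subscribe("PaymentProcessed", shipping_service)

bus.publish("OrderPlaced", {"order_id": 101})
print(log)
```

Nothing in this sketch decides the order of the steps; the sequence falls out of which service subscribes to which event.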

Key Features

  • High scalability and independence of services.
  • Decentralized control.
  • Services respond to events they subscribe to.

When to Use the Choreography Pattern

  • Event-Driven Systems: When workflows can be modeled as events triggering responses.
  • High Scalability: When services need to operate independently and scale autonomously.
  • Loose Coupling: When minimizing dependencies between services is critical.

Benefits of the Choreography Pattern

  1. Decentralized Control: No single point of failure or bottleneck.
  2. Increased Flexibility: Services can be added or modified without affecting others.
  3. Better Scalability: Services operate independently and scale based on their workloads.
  4. Resilience: The system can handle partial failures more gracefully, as services continue independently.

Example: E-Commerce Order Fulfillment

Problem

A fictional e-commerce platform needs to manage the following workflow:

  1. Accepting an order.
  2. Validating payment.
  3. Reserving inventory.
  4. Sending notifications to the customer.

Each step is handled by an independent service.

Solution

Using the Choreography Pattern, each service listens for specific events and publishes new events as needed. The workflow emerges naturally from the interaction of these services.

Implementation

Step 1: Define the Workflow as Events

  • OrderPlaced: Triggered when a customer places an order.
  • PaymentProcessed: Triggered after successful payment.
  • InventoryReserved: Triggered after reserving inventory.
  • NotificationSent: Triggered when the customer is notified.
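The events above could be carried in a small envelope like the one below. This is only a sketch: the Event class and its method names are my own assumptions, chosen so that the JSON matches the {"event_type": ..., "data": ...} shape used by the services in the next step.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical envelope: the event name plus its payload."""

    event_type: str
    data: dict

    def to_json(self) -> str:
        # Serialize to the JSON body that would be sent through the broker.
        return json.dumps(asdict(self))

    @staticmethod
    def from_json(raw: str) -> "Event":
        payload = json.loads(raw)
        return Event(event_type=payload["event_type"], data=payload["data"])


order_placed = Event("OrderPlaced", {"order_id": 101, "customer": "John Doe"})
wire_format = order_placed.to_json()
restored = Event.from_json(wire_format)
print(wire_format)
```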

Step 2: Implement Services

Each service subscribes to events and performs its task.

shared_utils.py

import pika
import json

def publish_event(exchange, event_type, data):
    # Opens a short-lived connection for every event: simple, but not efficient.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # A fanout exchange copies every message to all queues bound to it.
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')
    message = json.dumps({"event_type": event_type, "data": data})
    channel.basic_publish(exchange=exchange, routing_key='', body=message)
    connection.close()

def subscribe_to_event(exchange, callback):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')
    # An exclusive, server-named queue is private to this consumer
    # and is deleted when the consumer disconnects.
    queue = channel.queue_declare('', exclusive=True).method.queue
    channel.queue_bind(exchange=exchange, queue=queue)
    # auto_ack=True acknowledges on delivery, so a message is lost if the handler crashes.
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
    print(f"Subscribed to events on exchange '{exchange}'")
    channel.start_consuming()  # Blocks and dispatches messages to the callback

Order Service


from shared_utils import publish_event

def place_order(order_id, customer):
    print(f"Placing order {order_id} for {customer}")
    publish_event("order_exchange", "OrderPlaced", {"order_id": order_id, "customer": customer})

if __name__ == "__main__":
    # Simulate placing an order
    place_order(order_id=101, customer="John Doe")

Payment Service


import json
import time

from shared_utils import publish_event, subscribe_to_event

def handle_order_placed(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "OrderPlaced":
        order_id = event["data"]["order_id"]
        print(f"Processing payment for order {order_id}")
        time.sleep(1)  # Simulate payment processing
        publish_event("payment_exchange", "PaymentProcessed", {"order_id": order_id})

if __name__ == "__main__":
    subscribe_to_event("order_exchange", handle_order_placed)

Inventory Service


import json
import time

from shared_utils import publish_event, subscribe_to_event

def handle_payment_processed(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "PaymentProcessed":
        order_id = event["data"]["order_id"]
        print(f"Reserving inventory for order {order_id}")
        time.sleep(1)  # Simulate inventory reservation
        publish_event("inventory_exchange", "InventoryReserved", {"order_id": order_id})

if __name__ == "__main__":
    subscribe_to_event("payment_exchange", handle_payment_processed)

Notification Service


import json
import time

from shared_utils import subscribe_to_event

def handle_inventory_reserved(ch, method, properties, body):
    event = json.loads(body)
    if event["event_type"] == "InventoryReserved":
        order_id = event["data"]["order_id"]
        print(f"Notifying customer for order {order_id}")
        time.sleep(1)  # Simulate notification
        print(f"Customer notified for order {order_id}")

if __name__ == "__main__":
    subscribe_to_event("inventory_exchange", handle_inventory_reserved)

Step 3: Run the Workflow

  1. Start RabbitMQ locally (for example, with the official Docker image) and install pika.
  2. Run the services in the following order, each in its own terminal, so that every subscriber is listening before the first event is published:
    • Notification Service: python notification_service.py
    • Inventory Service: python inventory_service.py
    • Payment Service: python payment_service.py
    • Order Service: python order_service.py
  3. Running the Order Service places an order. The workflow then propagates through the other services as each event is handled.

Key Considerations

  1. Event Bus: Use an event broker like RabbitMQ, Kafka, or AWS SNS to manage communication between services.
  2. Event Versioning: Include versioning to handle changes in event formats over time.
  3. Idempotency: Ensure services handle repeated events gracefully to avoid duplication.
  4. Monitoring and Tracing: Use tools like OpenTelemetry to trace and debug distributed workflows.
  5. Error Handling:
    • Dead Letter Queues (DLQs) to capture failed events.
    • Retries with backoff for transient errors.

Advantages of the Choreography Pattern

  1. Loose Coupling: Services interact via events without direct knowledge of each other.
  2. Resilience: Failures in one service don’t block the entire workflow.
  3. High Autonomy: Services operate independently and can be deployed or scaled separately.
  4. Dynamic Workflows: Adding a new service to the workflow only requires subscribing it to the relevant events.

Challenges of the Choreography Pattern

  1. Complex Debugging: Tracing errors across distributed services can be difficult.
  2. Event Storms: Poorly designed workflows may generate excessive events, overwhelming the system.
  3. Coordination Overhead: Decentralized logic can lead to inconsistent behavior if not carefully managed.
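One common way to ease the debugging problem is to carry a correlation ID through every event of a workflow. The sketch below is a hand-rolled illustration, not the OpenTelemetry API; the make_event helper, the correlation_id field, and the in-memory trace dictionary are all assumptions of mine.

```python
import uuid
from typing import Dict, List, Optional

# correlation ID -> event types seen for that workflow, in order
trace: Dict[str, List[str]] = {}


def make_event(event_type: str, data: dict, parent: Optional[dict] = None) -> dict:
    # Reuse the parent's correlation ID so the whole chain can be grouped;
    # start a fresh ID when the event begins a new workflow.
    correlation_id = parent["correlation_id"] if parent else str(uuid.uuid4())
    event = {"event_type": event_type, "data": data, "correlation_id": correlation_id}
    trace.setdefault(correlation_id, []).append(event_type)
    return event


order_placed = make_event("OrderPlaced", {"order_id": 101})
payment = make_event("PaymentProcessed", {"order_id": 101}, parent=order_placed)
inventory = make_event("InventoryReserved", {"order_id": 101}, parent=payment)
other_order = make_event("OrderPlaced", {"order_id": 102})

print(trace[order_placed["correlation_id"]])
```

Filtering logs by the correlation ID then shows every step of a single order, even though the steps ran in different services.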

Orchestrator vs. Choreography: When to Choose?

  • Use Orchestrator Pattern when workflows are complex, require central control, or involve many dependencies.
  • Use Choreography Pattern when you need high scalability, loose coupling, or event-driven workflows.
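For contrast, an orchestrated version of the same order workflow might look like the sketch below. The function names are hypothetical, and the steps are plain function calls standing in for calls to separate services.

```python
from typing import List

steps_run: List[str] = []


def process_payment(order_id: int) -> None:
    steps_run.append(f"payment:{order_id}")


def reserve_inventory(order_id: int) -> None:
    steps_run.append(f"inventory:{order_id}")


def notify_customer(order_id: int) -> None:
    steps_run.append(f"notification:{order_id}")


def orchestrate_order(order_id: int) -> None:
    # The orchestrator owns the sequence: it calls each step in turn and is
    # the single place where ordering or branching logic would live.
    for step in (process_payment, reserve_inventory, notify_customer):
        step(order_id)


orchestrate_order(101)
print(steps_run)
```

Here the sequence is written down in one function, which is easier to read and debug but makes that function a dependency of every step. In choreography the same sequence exists only implicitly, in the subscriptions.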

Learning Notes #10 – Lazy Queues | RabbitMQ

26 December 2024 at 06:54

What Are Lazy Queues?

  • Lazy Queues are designed to store messages primarily on disk rather than in memory.
  • They are optimized for use cases involving large message backlogs where minimizing memory usage is critical.

Key Characteristics

  1. Disk-Based Storage – Messages are stored on disk immediately upon arrival, rather than being held in memory.
  2. Low Memory Usage – Only minimal metadata for messages is kept in memory.
  3. Scalability – Can handle millions of messages without consuming significant memory.
  4. Message Retrieval – Retrieving messages is slower because messages are fetched from disk.
  5. Durability – Messages are written to disk early, but only persistent messages in a durable queue survive a RabbitMQ restart.

Trade-offs

  • Latency: Fetching messages from disk is slower than retrieving them from memory.
  • Throughput: Not suitable for high-throughput, low-latency applications.

Choose Lazy Queues if

  • You need to handle very large backlogs of messages.
  • Memory is a constraint in your system.
  • Latency and throughput are less critical.

Implementation

Pre-requisites

1. Install and run RabbitMQ on your local machine.


docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

2. Install the pika library


pip install pika

Producer (producer.py)

This script sends a persistent message to a Lazy Queue.

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare a Lazy Queue
# (RabbitMQ 3.12 and later ignore x-queue-mode; classic queues there already behave lazily)
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Publish a message
message = "Hello from the Producer via Custom Exchange!"
channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent message
)

print(f"Message sent to Lazy Queue via Exchange: {message}")

# Close the connection
connection.close()

Consumer (consumer.py)

import pika

# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")

# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"

# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)

# Declare the Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)

# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

# Callback function to process messages
def callback(ch, method, properties, body):
    print(f"Received message: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Acknowledge the message

# Start consuming messages
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)

print("Waiting for messages. To exit, press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print("Stopped consuming.")

# Close the connection
connection.close()

Explanation

  1. Producer
    • Defines a custom exchange (custom_exchange) of type direct.
    • Declares a Lazy Queue (lazy_queue_example).
    • Binds the queue to the exchange using a routing key (custom_routing_key).
    • Publishes a persistent message via the custom exchange and routing key.
  2. Consumer
    • Declares the same exchange and Lazy Queue to ensure they exist.
    • Consumes messages routed to the queue through the custom exchange and routing key.
  3. Custom Exchange and Binding
    • The direct exchange type routes messages based on an exact match of the routing key.
    • Binding ensures the queue receives messages published to the exchange with the specified key.
  4. Lazy Queue Behavior
    • Messages are stored directly on disk to minimize memory usage.
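The exact-match routing of a direct exchange can be pictured with a toy model like the one below. The DirectExchange class is purely illustrative and is not how RabbitMQ is implemented; it only mimics the binding and routing behavior described above, using the same names as the producer and consumer scripts.

```python
from collections import defaultdict
from typing import Dict, List


class DirectExchange:
    """Toy model of a direct exchange (illustration only)."""

    def __init__(self) -> None:
        self._bindings: Dict[str, List[str]] = defaultdict(list)  # routing key -> queue names
        self.queues: Dict[str, List[str]] = defaultdict(list)     # queue name -> messages

    def bind(self, queue: str, routing_key: str) -> None:
        self._bindings[routing_key].append(queue)

    def publish(self, routing_key: str, body: str) -> int:
        # Deliver only to queues whose binding key equals the routing key exactly.
        targets = self._bindings.get(routing_key, [])
        for queue in targets:
            self.queues[queue].append(body)
        return len(targets)  # 0 means the message was unroutable and is discarded here


exchange = DirectExchange()
exchange.bind("lazy_queue_example", "custom_routing_key")

delivered = exchange.publish("custom_routing_key", "Hello from the Producer")
dropped = exchange.publish("another_key", "Nobody is bound to this key")
print(delivered, dropped, exchange.queues["lazy_queue_example"])
```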
