❌

Reading view

There are new articles available, click to refresh the page.

Learning Notes #24 – Competing Consumer | Messaging Queue Patterns

Today, i learnt about competing consumer, its a simple concept of consuming messages with many consumers. In this blog, i jot down notes on competing consumer for better understanding.

The competing consumer pattern is a commonly used design paradigm in distributed systems for handling workloads efficiently. It addresses the challenge of distributing tasks among multiple consumers to ensure scalability, reliability, and better resource utilization. In this blog, we’ll delve into the details of this pattern, its implementation, and its benefits.

What is the Competing Consumer Pattern?

The competing consumer pattern involves multiple consumers that independently compete to process messages or tasks from a shared queue. This pattern is particularly effective in scenarios where the rate of incoming tasks is variable or high, as it allows multiple consumers to process tasks concurrently.

Key Components

  1. Producer: The component that generates tasks or messages.
  2. Queue: A shared storage medium (often a message broker) that holds tasks until a consumer is ready to process them.
  3. Consumer: The component that processes tasks. Multiple consumers operate concurrently and compete for tasks in the queue.
  4. Message Broker: Middleware (e.g., RabbitMQ, Kafka) that manages the queue and facilitates communication between producers and consumers.

How It Works (Message as Tasks)

  1. Task Generation
    • Producers create tasks and push them into the queue.
    • Tasks can represent anything, such as processing an image, sending an email, or handling a database operation.
  2. Task Storage
    • The queue temporarily stores tasks until they are picked up by consumers.
    • Queues often support features like message persistence and delivery guarantees to enhance reliability.
  3. Task Processing
    • Consumers pull tasks from the queue and process them independently.
    • Each consumer works on one task at a time, and no two consumers process the same task simultaneously.
  4. Task Completion
    • Upon successful processing, the consumer acknowledges the task’s completion to the message broker.
    • The message broker then removes the task from the queue.

Handling Poison Messages

A poison message is a task or message that a consumer repeatedly fails to process. Poison messages can cause delays, block the queue, or crash consumers if not handled appropriately.

Strategies for Handling Poison Messages

  1. Retry Mechanism
    • Allow a fixed number of retries for a task before marking it as failed.
    • Use exponential backoff to reduce the load on the system during retries.
  2. Dead Letter Queue (DLQ)
    • Configure a Dead Letter Queue to store messages that cannot be processed after a predefined number of attempts.
    • Poison messages in the DLQ can be analyzed or reprocessed manually.
  3. Logging and Alerting
    • Log details about the poison message for further debugging.
    • Set up alerts to notify administrators when a poison message is encountered.
  4. Idempotent Consumers
    • Design consumers to handle duplicate processing gracefully. This prevents issues if a message is retried multiple times.

RabbitMQ Example

Producer


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

messages = ["Task 1", "Task 2", "Task 3"]

for message in messages:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # Makes the message persistent
        )
    )
    print(f"[x] Sent {message}")

connection.close()

Dead Letter Exchange


channel.queue_declare(queue='task_queue', durable=True, arguments={
    'x-dead-letter-exchange': 'dlx_exchange'
})
channel.exchange_declare(exchange='dlx_exchange', exchange_type='fanout')
channel.queue_declare(queue='dlq', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dlq')

Consumer Code


import pika
import time

def callback(ch, method, properties, body):
    try:
        print(f"[x] Received {body}")
        # Simulate task processing
        if body == b"Task 2":
            raise ValueError("Cannot process this message")
        time.sleep(1)
        print(f"[x] Processed {body}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"[!] Failed to process message: {body}, error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print('[*] Waiting for messages. To exit press CTRL+C')

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

Benefits of the Competing Consumer Pattern

  1. Scalability Adding more consumers allows the system to handle higher workloads.
  2. Fault Tolerance If a consumer fails, other consumers can continue processing tasks.
  3. Resource Optimization Consumers can be distributed across multiple machines to balance the load.
  4. Asynchronous Processing Decouples task generation from task processing, enabling asynchronous workflows.

Challenges and Considerations

  1. Message Duplication – In some systems, messages may be delivered more than once. Implement idempotent processing to handle duplicates.
  2. Load Balancing – Ensure tasks are evenly distributed among consumers to avoid bottlenecks.
  3. Queue Overload – High task rates may lead to queue overflow. Use rate limiting or scale your infrastructure to prevent this.
  4. Monitoring and Metrics – Implement monitoring to track queue sizes, processing rates, and consumer health.
  5. Poison Messages – Implement a robust strategy for handling poison messages, such as using a DLQ or retry mechanism.

References

  1. https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html
  2. https://dev.to/willvelida/the-competing-consumers-pattern-4h5n
  3. https://medium.com/event-driven-utopia/competing-consumers-pattern-explained-b338d54eff2b

❌