Learning Notes #22 – Claim Check Pattern | Cloud Pattern
Today, I learned about the Claim Check Pattern, which describes how to handle a large message sent to a queue. Every message broker has a defined message size limit; if a message exceeds that limit, the broker will reject it.
The Claim Check Pattern is an architectural design that addresses this by managing large payloads in a decoupled and efficient manner. In this blog, I jot down notes on my learning for my future self.
What is the Claim Check Pattern?
The Claim Check Pattern is a messaging pattern used in distributed systems to manage large messages efficiently. Instead of transmitting bulky data directly between services, this pattern extracts and stores the payload in a dedicated storage system (e.g., object storage or a database).
A lightweight reference or βclaim checkβ is then sent through the message queue, which the receiving service can use to retrieve the full data from the storage.
This pattern is inspired by the physical process of checking in luggage at an airport: you hand over your luggage, receive a claim check (a token), and later use it to retrieve your belongings.
How Does the Claim Check Pattern Work?
The process typically involves the following steps:
- Data Submission: the sender service splits a message into two parts:
- Metadata: A small piece of information that provides context about the data.
- Payload: The main body of data that is too large or sensitive to send through the message queue.
- Storing the Payload
- The sender uploads the payload to a storage service (e.g., AWS S3, Azure Blob Storage, or Google Cloud Storage).
- The storage service returns a unique identifier (e.g., a URL or object key).
- Sending the Claim Check
- The sender service places the metadata and the unique identifier (claim check) onto the message queue.
- Receiving the Claim Check
- The receiver service consumes the message from the queue, extracts the claim check, and retrieves the payload from the storage system.
- Processing
- The receiver processes the payload alongside the metadata as required.
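The steps above can be sketched end to end without any cloud services. Here is a minimal, self-contained sketch that uses an in-memory dict in place of a real object store and a plain list in place of a broker queue (the names `blob_store`, `queue`, `send`, and `receive` are illustrative, not a real API):

```python
import json
import uuid

# Stand-ins for a real object store and message broker (illustrative only).
blob_store = {}   # maps object keys to payload bytes
queue = []        # FIFO list of JSON-encoded messages

def send(metadata, payload):
    """Sender: store the payload, enqueue only the metadata plus a claim check."""
    object_key = str(uuid.uuid4())
    blob_store[object_key] = payload                  # step 2: store the payload
    message = {"metadata": metadata, "claim_check": object_key}
    queue.append(json.dumps(message))                 # step 3: send the claim check

def receive():
    """Receiver: consume the message, redeem the claim check for the payload."""
    message = json.loads(queue.pop(0))                # step 4: receive the claim check
    payload = blob_store[message["claim_check"]]      # retrieve the payload
    return message["metadata"], payload               # step 5: process both

send({"content_type": "text/plain"}, b"a very large payload...")
metadata, payload = receive()
print(metadata["content_type"], len(payload))  # text/plain 23
```

Note that only the small JSON message ever travels through `queue`; the bulky payload never touches the messaging layer.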
Use Cases
1. Media Processing Pipelines – In video transcoding systems, raw video files can be uploaded to storage while metadata (e.g., video format and length) is passed through the message queue.
2. IoT Systems – IoT devices generate large datasets. Using the Claim Check Pattern ensures efficient transmission and processing of these data chunks.
3. Data Processing Workflows – In big data systems, datasets can be stored in object storage while processing metadata flows through orchestration tools like Apache Airflow.
4. Event-Driven Architectures – For systems using event-driven models, large event payloads can be offloaded to storage to avoid overloading the messaging layer.
Example with RabbitMQ
1. Sender Service
```python
import json

import boto3
import pika

s3 = boto3.client('s3')
bucket_name = 'my-bucket'
object_key = 'data/large-file.txt'

# Upload the large payload to S3 and build the claim check
s3.upload_file('large-file.txt', bucket_name, object_key)
claim_check = f's3://{bucket_name}/{object_key}'

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a queue
channel.queue_declare(queue='claim_check_queue')

# Send the claim check as JSON (a format the consumer can safely parse)
message = {
    'metadata': 'Some metadata',
    'claim_check': claim_check,
}
channel.basic_publish(exchange='',
                      routing_key='claim_check_queue',
                      body=json.dumps(message))
connection.close()
```
2. Consumer
```python
import json

import boto3
import pika

s3 = boto3.client('s3')

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the same queue as the sender
channel.queue_declare(queue='claim_check_queue')

# Callback function to process messages
def callback(ch, method, properties, body):
    # json.loads instead of eval: eval would execute arbitrary code from the queue
    message = json.loads(body)
    claim_check = message['claim_check']
    bucket_name, object_key = claim_check.replace('s3://', '').split('/', 1)
    s3.download_file(bucket_name, object_key, 'retrieved-large-file.txt')
    print("Payload retrieved and processed.")

# Consume messages
channel.basic_consume(queue='claim_check_queue',
                      on_message_callback=callback,
                      auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
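The consumer splits the `s3://bucket/key` URI by hand with `replace` and `split`. A small helper built on the standard library's `urllib.parse` (a hedged alternative I wrote for these notes, not part of the original example) makes that parsing more robust and validates the claim check before use:

```python
from urllib.parse import urlparse

def parse_claim_check(uri: str) -> tuple[str, str]:
    """Split an 's3://bucket/key' claim check into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc or not parsed.path:
        raise ValueError(f"not a valid S3 claim check: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")

bucket, key = parse_claim_check("s3://my-bucket/data/large-file.txt")
print(bucket, key)  # my-bucket data/large-file.txt
```

Inside the callback, `bucket_name, object_key = parse_claim_check(claim_check)` would then replace the manual string surgery and reject malformed messages early.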