Learning Notes #22 – Claim Check Pattern | Cloud Pattern

31 December 2024 at 17:03

Today, I learnt about the Claim Check Pattern, which describes how to pass a large message through a queue. Every message broker has a defined message size limit. If a message exceeds that limit, the broker will not accept it.

The Claim Check Pattern is an architectural design that addresses the challenge of managing large payloads in a decoupled and efficient manner. In this blog, I jot down notes on my learning for my future self.

What is the Claim Check Pattern?

The Claim Check Pattern is a messaging pattern used in distributed systems to manage large messages efficiently. Instead of transmitting bulky data directly between services, this pattern extracts and stores the payload in a dedicated storage system (e.g., object storage or a database).

A lightweight reference or "claim check" is then sent through the message queue, which the receiving service can use to retrieve the full data from the storage.

This pattern is inspired by the physical process of checking in luggage at an airport: you hand over your luggage, receive a claim check (a token), and later use it to retrieve your belongings.

How Does the Claim Check Pattern Work?

The process typically involves the following steps:

  1. Data Submission – The sender service splits a message into two parts:
    • Metadata: A small piece of information that provides context about the data.
    • Payload: The main body of data that is too large or sensitive to send through the message queue.
  2. Storing the Payload
    • The sender uploads the payload to a storage service (e.g., AWS S3, Azure Blob Storage, or Google Cloud Storage).
    • The storage service returns a unique identifier (e.g., a URL or object key).
  3. Sending the Claim Check
    • The sender service places the metadata and the unique identifier (claim check) onto the message queue.
  4. Receiving the Claim Check
    • The receiver service consumes the message from the queue, extracts the claim check, and retrieves the payload from the storage system.
  5. Processing
    • The receiver processes the payload alongside the metadata as required.
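The five steps above can be sketched with in-memory stand-ins for the storage service and the queue. This is only an illustration: `payload_store`, `message_queue`, `send`, and `receive` are hypothetical names, and a real system would use object storage and a message broker instead of a dict and a deque.

```python
import json
import uuid
from collections import deque

# Hypothetical in-memory stand-ins for object storage and a message queue.
payload_store = {}
message_queue = deque()


def send(metadata, payload):
    """Store the payload, then enqueue only the metadata and the claim check."""
    claim_check = str(uuid.uuid4())            # step 2: unique identifier
    payload_store[claim_check] = payload       # step 2: store the payload
    message = {"metadata": metadata, "claim_check": claim_check}
    message_queue.append(json.dumps(message))  # step 3: small message only
    return claim_check


def receive():
    """Consume one message and use its claim check to fetch the payload."""
    message = json.loads(message_queue.popleft())    # step 4: read the message
    payload = payload_store[message["claim_check"]]  # step 4: retrieve payload
    return message["metadata"], payload              # step 5: hand over for processing


send({"format": "mp4"}, b"x" * 1_000_000)
queued_size = len(message_queue[0])  # size of the message that travels on the queue
metadata, payload = receive()
print(queued_size < 200, len(payload))  # True 1000000
```

The queued message stays small no matter how large the payload is, which is the point of the pattern.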

Use Cases

1. Media Processing Pipelines – In video transcoding systems, raw video files can be uploaded to storage while metadata (e.g., video format and length) is passed through the message queue.

2. IoT Systems – IoT devices generate large datasets. Using the Claim Check Pattern ensures efficient transmission and processing of these data chunks.

3. Data Processing Workflows – In big data systems, datasets can be stored in object storage while processing metadata flows through orchestration tools like Apache Airflow.

4. Event-Driven Architectures – For systems using event-driven models, large event payloads can be offloaded to storage to avoid overloading the messaging layer.
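In practice a sender may only need the claim check when a payload is actually too large. The sketch below shows one way to make that decision. The `MAX_INLINE_BYTES` threshold, the `build_message` helper, and the `store_payload` callable are assumptions for illustration; the right limit depends on the broker in use.

```python
import json

# Assumed limit for illustration; check your broker's actual maximum.
MAX_INLINE_BYTES = 256 * 1024


def build_message(metadata, payload, store_payload):
    """Return a queue message, offloading the payload when it is too large.

    store_payload is a hypothetical callable that saves the bytes somewhere
    durable and returns a reference (the claim check).
    """
    if len(payload) <= MAX_INLINE_BYTES:
        # Small enough: send the payload inline (hex-encoded to stay JSON-safe).
        body = {"metadata": metadata, "payload": payload.hex()}
    else:
        # Too large: store it and send only the claim check.
        body = {"metadata": metadata, "claim_check": store_payload(payload)}
    return json.dumps(body)


stored = {}


def fake_store(data):
    """Stand-in for an upload to object storage; returns the object key."""
    key = f"object-{len(stored)}"
    stored[key] = data
    return key


small = json.loads(build_message("sensor-1", b"ok", fake_store))
large = json.loads(build_message("sensor-1", b"x" * (MAX_INLINE_BYTES + 1), fake_store))
```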

Example with RabbitMQ

1. Sender Service


import json

import boto3
import pika

s3 = boto3.client('s3')
bucket_name = 'my-bucket'
object_key = 'data/large-file.txt'

# Upload the payload to S3 (upload_file returns None and raises on failure)
s3.upload_file('large-file.txt', bucket_name, object_key)
claim_check = f's3://{bucket_name}/{object_key}'

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a queue
channel.queue_declare(queue='claim_check_queue')

# Send the claim check as JSON so the consumer can parse it safely
message = {
    'metadata': 'Some metadata',
    'claim_check': claim_check
}
channel.basic_publish(exchange='', routing_key='claim_check_queue', body=json.dumps(message))

connection.close()

2. Consumer


import json

import boto3
import pika

s3 = boto3.client('s3')

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a queue
channel.queue_declare(queue='claim_check_queue')

# Callback function to process messages
def callback(ch, method, properties, body):
    # Parse the JSON body; never eval() data received from a queue
    message = json.loads(body)
    claim_check = message['claim_check']

    # Split "s3://bucket/key" into its bucket and key parts
    bucket_name, object_key = claim_check.replace('s3://', '').split('/', 1)
    s3.download_file(bucket_name, object_key, 'retrieved-large-file.txt')
    print("Payload retrieved and processed.")

# Consume messages
channel.basic_consume(queue='claim_check_queue', on_message_callback=callback, auto_ack=True)

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
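The consumer above assumes every claim check is a well-formed s3:// reference. A small validating parser, sketched below, could reject malformed ones before any download is attempted. `parse_claim_check` is a hypothetical helper, not part of boto3 or pika.

```python
def parse_claim_check(claim_check):
    """Split an 's3://bucket/key' claim check into (bucket, key).

    Raises ValueError when the scheme, bucket, or key is missing.
    """
    prefix = "s3://"
    if not claim_check.startswith(prefix):
        raise ValueError(f"unsupported claim check: {claim_check!r}")
    # Everything up to the first "/" is the bucket; the rest is the object key.
    bucket, _, key = claim_check[len(prefix):].partition("/")
    if not bucket or not key:
        raise ValueError(f"incomplete claim check: {claim_check!r}")
    return bucket, key


bucket_name, object_key = parse_claim_check("s3://my-bucket/data/large-file.txt")
print(bucket_name, object_key)  # my-bucket data/large-file.txt
```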

References

  1. https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check
  2. https://medium.com/@dmosyan/claim-check-design-pattern-603dc1f3796d

Ingesting large files to postgres through S3

By: ashish
13 April 2022 at 20:16

One of the tasks I recently came across at my job was to ingest large files, with the following requirements:

  1. Do some processing ( like generate hash for each row )
  2. Insert it into S3 for audit purposes
  3. Insert into postgres
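The per-row hashing in step 1 could look roughly like the sketch below. The choice of SHA-256, the unit-separator join, and the `row_hash` column name are assumptions made for illustration; the post does not say which hash or layout was used.

```python
import csv
import hashlib
import io


def hash_row(row):
    """Return a SHA-256 hex digest of one CSV row (a list of strings)."""
    # Join with a separator unlikely to appear in the data, so that
    # ["ab", "c"] and ["a", "bc"] produce different digests.
    joined = "\x1f".join(row)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def add_row_hashes(reader):
    """Yield the header and every row with an extra row_hash column appended."""
    header = next(reader)
    yield header + ["row_hash"]
    for row in reader:
        yield row + [hash_row(row)]


# Tiny in-memory sample standing in for the real file.
sample = io.StringIO("id,name\n1,alice\n2,bob\n")
rows = list(add_row_hashes(csv.reader(sample)))
```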

Note:

Keep in mind that your Postgres database needs to support this, and an S3 bucket policy needs to exist to allow the data to be copied over.

The setup I am using is a RDS database with S3 in the same region and proper policies and IAM roles already created.

Read more on that here – AWS documentation

For the purpose of this post I will be using dummy data from eforexcel (1 million records).

The most straightforward way to do this would be to just do a df.to_sql like this:

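import pandas as pd

# connection_detail is the database connection (e.g. a SQLAlchemy engine), created elsewhere
df = pd.read_csv("records.csv")
df.to_sql(
    name="test_table",
    con=connection_detail,
    schema="schema",
    if_exists="replace",
)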

Something like this would take more than an hour! Let’s do it in less than 5 minutes.

Now of course there are several ways to make this faster – using copy_expert, the psycopg driver, etc. (maybe a separate blog post on these), but that’s not the use case I have been tasked with. Since we need to upload the file to S3 in the end for audit purposes, I will ingest the data from S3 into the DB.

Generate table metadata

Before we can assign an S3 operator to ingest the data, we need to create the table into which this data will be inserted. There are two ways that I can think of:

  1. Create every column with a generous fixed size, like varchar(2000)
  2. Create each column with a size equal to the longest value found in that column

I will be going with option 2 here.
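Option 2 comes down to scanning the file once, recording the longest value per column, and sizing each varchar accordingly. The sketch below does this with the standard library only. It illustrates the idea and is not the pandas-based code the post uses; `max_column_lengths` and `create_table_sql` are hypothetical helpers.

```python
import csv
import io


def max_column_lengths(reader):
    """Return {column_name: length of the longest value} for a CSV reader."""
    header = next(reader)
    lengths = {name: 0 for name in header}
    for row in reader:
        for name, value in zip(header, row):
            lengths[name] = max(lengths[name], len(value))
    return lengths


def create_table_sql(table, lengths):
    """Build a CREATE TABLE statement with one varchar(n) column per field."""
    # varchar(0) is not valid in Postgres, so empty columns get length 1.
    columns = ",\n".join(
        f'    "{name}" varchar({max(length, 1)})' for name, length in lengths.items()
    )
    return f'CREATE TABLE "{table}" (\n{columns}\n);'


# Tiny in-memory sample standing in for records.csv.
sample = io.StringIO("id,name\n1,alice\n22,bob\n")
lengths = max_column_lengths(csv.reader(sample))
ddl = create_table_sql("test_table", lengths)
print(ddl)
```

One trade-off of sizing columns this tightly is that a later file with longer values will not fit without altering the table.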

This entire process took around 210 seconds instead of more than an hour like the last run.

Let’s go over the code one by one

Read the csv

  1. We can pass the data directly to pandas or stream it into buffered memory, something like this:
import csv
import gzip
import io

import boto3

s3 = boto3.client("s3")
mem_file = io.BytesIO()  # in-memory buffer that receives the gzipped CSV

with open("records.csv", newline="") as f:
    csv_rdr = csv.reader(f, delimiter=",")
    header = next(csv_rdr)
    with gzip.GzipFile(fileobj=mem_file, mode="wb", compresslevel=6) as gz:
        buff = io.StringIO()
        writer = csv.writer(buff)
        writer.writerow(header)
        for row in csv_rdr:
            writer.writerow(row)
        gz.write(buff.getvalue().encode("utf-8", "replace"))
    # Rewind the buffer so put_object uploads it from the start
    mem_file.seek(0)
    s3.put_object(Bucket="mybucket", Key="folder/file.gz", Body=mem_file)

  2. Since the file is less than 50 MB, I’ll go ahead and load it directly.

Create the table

Get the max length of each column and use that to generate the table. We use the pandas to_sql() function for this and pass the dtypes.

Copy data from s3 gzipped file to postgres

Finally we use –

aws_s3.table_import_from_s3

to copy over the file to the postgres table.
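As a rough sketch of that last step, the import can be issued as a single SQL statement from Python. The example below only builds the statement and its parameters. The table name, bucket, key, region, and COPY options are placeholders, and the `%s` placeholders assume a psycopg2-style driver.

```python
def build_import_query(table, bucket, key, region,
                       options="(FORMAT csv, HEADER true)"):
    """Return (sql, params) for a call to aws_s3.table_import_from_s3.

    The empty column list ('') asks the function to map the file's columns
    onto the table's columns in order.
    """
    sql = (
        "SELECT aws_s3.table_import_from_s3("
        "%s, %s, %s, aws_commons.create_s3_uri(%s, %s, %s))"
    )
    return sql, (table, "", options, bucket, key, region)


# Placeholder values; replace them with your own table, bucket, key and region.
sql, params = build_import_query(
    "schema.test_table", "mybucket", "folder/file.gz", "ap-south-1"
)

# With an open psycopg2 connection `conn` (not created here):
# with conn.cursor() as cur:
#     cur.execute(sql, params)
# conn.commit()
```

For gzipped objects, the AWS documentation indicates that the S3 object should carry Content-Encoding: gzip metadata. With boto3 that would likely mean passing ContentEncoding="gzip" to put_object when uploading.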

Generating Signature Version 4 URL’s using boto3

By: ashish
12 April 2022 at 21:16

If your application allows your users to download files directly from S3, you are bound to get this error sometime in the future whenever you scale to other regions – The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.

The issue has been raised on various forums and on GitHub, e.g. https://github.com/rstudio/pins/issues/233 and https://stackoverflow.com/questions/57591989/amazon-web-services-s3-the-authorization-mechanism-you-have-provided-is-not-s. None of these solutions worked for me.

Here is what did

Replace the Region and Airflow bucket and you’re good to go.
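A minimal sketch of one common way to get Signature Version 4 presigned URLs with boto3 follows. It is not necessarily the exact code this post refers to. The region, bucket, and key in the usage comment are placeholders, and `build_s3_client` and `presigned_download_url` are hypothetical helper names.

```python
def build_s3_client(region):
    """Create an S3 client that signs requests with Signature Version 4."""
    # Imported lazily so the helpers can be defined without boto3 installed.
    import boto3
    from botocore.config import Config

    return boto3.client(
        "s3",
        region_name=region,
        config=Config(signature_version="s3v4"),
    )


def presigned_download_url(client, bucket, key, expires_in=3600):
    """Return a time-limited GET URL for one object."""
    return client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )


# Usage (needs AWS credentials; all names below are placeholders):
# client = build_s3_client("ap-south-1")
# url = presigned_download_url(client, "my-airflow-bucket", "reports/file.csv")
```

Setting the region explicitly matters here, because a Signature Version 4 signature is computed for a specific region.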

Code for S3 bucket creation and public access

7 January 2024 at 12:55
provider "aws" {
  region = "ap-south-1"
}

resource "aws_s3_bucket" "example" {
  bucket = "example-my"
}

resource "aws_s3_bucket_ownership_controls" "ownership" {
  bucket = aws_s3_bucket.example.id
  rule {
    object_ownership = "BucketOwnerPreferred"
  }
}

resource "aws_s3_bucket_public_access_block" "pb" {
  bucket = aws_s3_bucket.example.id

  block_public_acls       = false
  block_public_policy     = false
  ignore_public_acls      = false
  restrict_public_buckets = false
}

resource "aws_s3_bucket_acl" "acl" {
  depends_on = [aws_s3_bucket_ownership_controls.ownership]
  bucket     = aws_s3_bucket.example.id
  acl        = "private"
}
