
Ingesting large files to postgres through S3

By: ashish
13 April 2022 at 20:16

One of the tasks I recently came across at my job was to ingest large files, but with the following requirements:

  1. Do some processing (like generating a hash for each row; a sketch follows this list)
  2. Upload it to S3 for audit purposes
  3. Insert it into Postgres
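
A minimal sketch of what the per-row hashing in step 1 could look like (the row_hash column name, the delimiter, and SHA-256 are illustrative assumptions, not the exact scheme used):

import hashlib

import pandas as pd

# Hedged sketch: concatenate each row's values and store a SHA-256 digest
# in a new "row_hash" column. Column name and delimiter are placeholders.
df = pd.read_csv("records.csv")
df["row_hash"] = df.astype(str).agg("|".join, axis=1).map(
    lambda s: hashlib.sha256(s.encode("utf-8")).hexdigest()
)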

Note:

Keep in mind that your Postgres database needs to support this, and an S3 bucket policy needs to exist to allow the data to be copied over.

The setup I am using is an RDS database with S3 in the same region, with the proper policies and IAM roles already created.

Read more on that here – AWS documentation
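
On the database side, that boils down to installing the aws_s3 extension before the import function can be called. A minimal sketch, assuming a psycopg2 connection (the DSN is a placeholder):

import psycopg2

# Hedged sketch: CASCADE also installs the aws_commons helper extension.
# The connection string is a placeholder, not the actual RDS details.
conn = psycopg2.connect("postgresql://user:password@host:5432/dbname")
with conn, conn.cursor() as cur:
    cur.execute("CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;")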

For the purpose of this post I will be using dummy data from eforexcel (1 million records).

The most straightforward way to do this would be to just do a df.to_sql() like this:

import pandas as pd

# Naive approach: read the whole CSV and let pandas insert it row by row.
# connection_detail is a SQLAlchemy engine/connection.
df = pd.read_csv("records.csv")
df.to_sql(
    name="test_table",
    con=connection_detail,
    schema="schema",
    if_exists="replace",
)

Something like this would take more than an hour! Let's do it in less than 5 minutes.

Now of course there are several ways to make this faster – using copy_expert, the psycopg driver, etc. (maybe a separate blog post on these), but that's not the use case I have been tasked with. Since we need to upload the file to S3 in the end for audit purposes, I will ingest the data from S3 into the DB.

Generate table metadata

Before we can assign an S3 operator to ingest the data, we need to create the table into which this data will be inserted. There are two ways that I can think of:

  1. Each column in the file is created in the DB with a high threshold length like varchar(2000)
  2. Each column is created with its length set to the maximum data length found in that column

I will be going with option 2 here.

This entire process took around 210 seconds instead of more than an hour like the last run.

Let's go over the code step by step.

Read the CSV

  1. We can pass the data directly to pandas, or stream it into buffered memory, gzip it, and upload it to S3, something like this:
import csv
import gzip
import io
import boto3

s3 = boto3.client("s3")
mem_file = io.BytesIO()  # in-memory buffer that will hold the gzipped CSV

with open("records.csv") as f:
    csv_rdr = csv.reader(f, delimiter=",")
    header = next(csv_rdr)
    with gzip.GzipFile(fileobj=mem_file, mode="wb", compresslevel=6) as gz:
        buff = io.StringIO()
        writer = csv.writer(buff)
        writer.writerow(header)
        for row in csv_rdr:
            writer.writerow(row)
        gz.write(buff.getvalue().encode("utf-8", "replace"))
    mem_file.seek(0)
    s3.put_object(Bucket="mybucket", Key="folder/file.gz", Body=mem_file)

2. Since the file is less than 50 MB, I'll go ahead and load it directly.

Create the table

Get the maximum length of the data in each column and use that to generate the table. We use pandas' to_sql() function for this and pass the dtypes.
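
A minimal sketch of this step, reusing df and connection_detail from the snippet above (the table and schema names are the same placeholders as before):

from sqlalchemy.types import VARCHAR

# Size each column to the longest value observed in that column.
dtypes = {
    col: VARCHAR(int(df[col].astype(str).str.len().max()))
    for col in df.columns
}

# head(0) creates just the empty table with the computed column sizes;
# the rows themselves are copied in from S3 in the next step.
df.head(0).to_sql(
    name="test_table",
    con=connection_detail,
    schema="schema",
    if_exists="replace",
    dtype=dtypes,
)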

Copy data from the gzipped S3 file to Postgres

Finally, we use aws_s3.table_import_from_s3 to copy the file over to the Postgres table.
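
A hedged sketch of that call, run from Python against the placeholders used earlier (the region is an assumption, and per the AWS documentation a gzipped object also needs the Content-Encoding: gzip metadata set on it):

import psycopg2

# Placeholder DSN; reuse whatever connection you already have to the RDS instance.
conn = psycopg2.connect("postgresql://user:password@host:5432/dbname")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT aws_s3.table_import_from_s3(
            'schema.test_table',                -- target table
            '',                                 -- empty column list = all columns
            '(format csv, header true)',        -- COPY options
            aws_commons.create_s3_uri('mybucket', 'folder/file.gz', 'ap-south-1')
        );
        """
    )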

Generating Signature Version 4 URLs using boto3

By: ashish
12 April 2022 at 21:16

If your application allows your users to download files directly from S3, you are bound to get this error sometime in the future when you scale to other regions – The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.

The issue has been raised on various forums and GitHub, e.g. https://github.com/rstudio/pins/issues/233 and https://stackoverflow.com/questions/57591989/amazon-web-services-s3-the-authorization-mechanism-you-have-provided-is-not-s. None of these solutions worked for me.

Here is what did work:
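
A minimal sketch, assuming the fix is to configure the boto3 client to sign with Signature Version 4 in the bucket's region (the bucket name, key, and region below are placeholders):

import boto3
from botocore.client import Config

# Force Signature Version 4 and pin the client to the bucket's region.
s3 = boto3.client(
    "s3",
    region_name="ap-south-1",  # placeholder: the bucket's region
    config=Config(signature_version="s3v4"),
)

url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-airflow-bucket", "Key": "path/to/file"},  # placeholders
    ExpiresIn=3600,
)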

Replace the Region and Airflow bucket and you’re good to go.

Code for S3 bucket creation and public access

7 January 2024 at 12:55
provider "aws" {
region = "ap-south-1"
}

resource "aws_s3_bucket" "example" {
bucket = "example-my"
}

resource "aws_s3_bucket_ownership_controls" "ownership" {
bucket = aws_s3_bucket.example.id
rule {
object_ownership = "BucketOwnerPreferred"
}
}

resource "aws_s3_bucket_public_access_block" "pb" {
bucket = aws_s3_bucket.example.id

block_public_acls = false
block_public_policy = false
ignore_public_acls = false
restrict_public_buckets = false
}

resource "aws_s3_bucket_acl" "acl" {
depends_on = [aws_s3_bucket_ownership_controls.ownership]
bucket = aws_s3_bucket.example.id
acl = "private"
}
