Learning Notes #51 – Postgres as a Queue using SKIP LOCKED

11 January 2025 at 06:56

Yesterday, I came across a post on the inferable.ai blog, https://www.inferable.ai/blog/posts/postgres-skip-locked, which walks through using Postgres as a queue. In this post, I jot down notes on using Postgres as a queue for future reference.

PostgreSQL is a robust relational database that can be used for more than just storing structured data. With the SKIP LOCKED feature introduced in PostgreSQL 9.5, you can efficiently turn a PostgreSQL table into a job queue for distributed processing.

Why Use PostgreSQL as a Queue?

Using PostgreSQL as a queue can be advantageous because,

  • Familiarity: If you’re already using PostgreSQL, there’s no need for an additional message broker.
  • Durability: PostgreSQL ensures ACID compliance, offering reliability for your job processing.
  • Simplicity: No need to manage another component like RabbitMQ or Kafka.

Implementing a Queue with SKIP LOCKED

1. Create a Queue Table

To start, you need a table to store the jobs,


CREATE TABLE job_queue (
    id SERIAL PRIMARY KEY,
    job_data JSONB NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

This table has the following columns,

  • id: A unique identifier for each job.
  • job_data: The data or payload for the job.
  • status: Tracks the job’s state (‘pending’, ‘in_progress’, or ‘completed’).
  • created_at: Timestamp of job creation.

2. Insert Jobs into the Queue

Adding jobs is straightforward,


INSERT INTO job_queue (job_data)
VALUES ('{"task": "send_email", "email": "user@example.com"}');

3. Fetch Jobs for Processing with SKIP LOCKED

Workers will fetch jobs from the queue using SELECT ... FOR UPDATE SKIP LOCKED to avoid contention,

WITH next_job AS (
    SELECT id, job_data
    FROM job_queue
    WHERE status = 'pending'
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE job_queue
SET status = 'in_progress'
FROM next_job
WHERE job_queue.id = next_job.id
RETURNING job_queue.id, job_queue.job_data;

Key Points:

  • FOR UPDATE locks the selected row to prevent other workers from picking it up.
  • SKIP LOCKED ensures locked rows are skipped, enabling concurrent workers to operate without waiting.
  • LIMIT 1 processes one job at a time per worker.
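
To make the skipping behaviour concrete without a database, here is a toy in-memory sketch. It only imitates the idea: each row gets a `threading.Lock`, and a non-blocking acquire stands in for `FOR UPDATE SKIP LOCKED`. The `jobs` list and the `claim_next_job` helper are made up for illustration and are not how PostgreSQL implements row locks.

```python
import threading

# Toy stand-in for the job_queue table: one lock per row (illustrative only).
jobs = [{"id": i, "status": "pending", "lock": threading.Lock()} for i in range(1, 4)]


def claim_next_job(jobs):
    """Return the first pending job whose lock is free, keeping that lock held."""
    for job in jobs:
        # Non-blocking acquire: a row held by another worker is skipped, not waited on.
        if job["status"] == "pending" and job["lock"].acquire(blocking=False):
            return job
    return None


worker_a = claim_next_job(jobs)  # locks job 1
worker_b = claim_next_job(jobs)  # job 1 is locked, so it is skipped and job 2 is locked
print(worker_a["id"], worker_b["id"])  # 1 2
```

Neither worker waits for the other, which is the property that lets several workers poll the same table concurrently.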

4. Mark Jobs as Completed

Once a worker finishes processing a job, it should update the job’s status,


UPDATE job_queue
SET status = 'completed'
WHERE id = $1; -- Replace $1 with the job ID

5. Delete Old or Processed Jobs

To keep the table clean, you can periodically remove completed jobs,


DELETE FROM job_queue
WHERE status = 'completed' AND created_at < NOW() - INTERVAL '30 days';

Example Worker Implementation

Here’s an example of a worker implemented in Python using psycopg2


import time

import psycopg2
from psycopg2.extras import RealDictCursor

connection = psycopg2.connect("dbname=yourdb user=youruser")

while True:
    with connection.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(
            """
            WITH next_job AS (
                SELECT id, job_data
                FROM job_queue
                WHERE status = 'pending'
                FOR UPDATE SKIP LOCKED
                LIMIT 1
            )
            UPDATE job_queue
            SET status = 'in_progress'
            FROM next_job
            WHERE job_queue.id = next_job.id
            RETURNING job_queue.id, job_queue.job_data;
            """
        )

        job = cursor.fetchone()
        if job:
            print(f"Processing job {job['id']}: {job['job_data']}")

            # Simulate job processing
            cursor.execute("UPDATE job_queue SET status = 'completed' WHERE id = %s", (job['id'],))

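    # Commit so the claim and the completion take effect and the row lock is released
    connection.commit()

    if not job:
        # Sleep after the commit so no transaction stays open while the worker is idle
        print("No jobs available. Sleeping...")
        time.sleep(5)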

Considerations

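  1. Transaction Isolation: The default READ COMMITTED level suits this pattern. Use the REPEATABLE READ or SERIALIZABLE isolation level cautiously, since concurrent workers can then hit serialization failures that have to be retried.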
  2. Row Locking: SKIP LOCKED only skips rows locked by other transactions, not those locked within the same transaction.
  3. Performance: Regularly archive or delete old jobs to prevent the table from growing indefinitely. Consider indexing the status column to improve query performance.
  4. Fault Tolerance: Ensure that workers handle crashes or timeouts gracefully. Use a timeout mechanism to revert jobs stuck in the ‘in_progress’ state.
  5. Scaling: Distribute workers across multiple nodes to handle a higher job throughput.
  6. The SKIP LOCKED clause only applies to row-level locks – the required ROW SHARE table-level lock is still taken normally.
  7. Using SKIP LOCKED provides an inconsistent view of the data by design. This is why it’s perfect for queue-like tables where we want to distribute work, but not suitable for general purpose work where consistency is required.
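
For the fault-tolerance point, a timeout sweep could look roughly like the sketch below. It assumes a `started_at` timestamp column that a worker sets when it claims a job; the table defined earlier does not have that column, and the `requeue_stale_jobs` helper and the 15-minute default are likewise hypothetical. A sweep like this only matters when a worker commits the claim before doing the work. If the claim and the completion share one transaction, a crash rolls the claim back on its own.

```python
from datetime import timedelta

# ASSUMPTION: job_queue has an extra started_at TIMESTAMP column, set when a job is claimed.
REQUEUE_SQL = """
    UPDATE job_queue
    SET status = 'pending'
    WHERE status = 'in_progress'
      AND started_at < NOW() - %s
    RETURNING id;
"""


def requeue_stale_jobs(cursor, timeout=timedelta(minutes=15)):
    """Reset jobs stuck in 'in_progress' for longer than `timeout`; return their ids."""
    # psycopg2 adapts a timedelta parameter to a PostgreSQL interval.
    cursor.execute(REQUEUE_SQL, (timeout,))
    return [row[0] for row in cursor.fetchall()]
```

It would be called periodically with a regular psycopg2 cursor (rows as tuples), followed by a commit on the connection.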

Task – Annachi Kadai – Python Dictionary

3 August 2024 at 09:54
  1. Create a dictionary named student with the following keys and values, and print it.
    • "name": "Alice"
    • "age": 21
    • "major": "Computer Science"
  2. Using the student dictionary, print the values associated with the keys "name" and "major".
  3. Add a new key-value pair to the student dictionary: "gpa": 3.8. Then update the "age" to 22.
  4. Remove the key "major" from the student dictionary using the del statement. Print the dictionary to confirm the removal.
  5. Check if the key "age" exists in the student dictionary. Print True or False based on the result.
  6. Create a dictionary prices with three items, e.g., "apple": 0.5, "banana": 0.3, "orange": 0.7. Iterate over the dictionary and print each key-value pair.
  7. Use the len() function to find the number of key-value pairs in the prices dictionary. Print the result.
  8. Use the get() method to access the "gpa" in the student dictionary. Try to access a non-existing key, e.g., "graduation_year", with a default value of 2025.
  9. Create another dictionary extra_info with the following keys and values. Also merge extra_info into the student dictionary using the update() method.
    • "graduation_year": 2025
    • "hometown": "Springfield"
  10. Create a dictionary squares where the keys are numbers from 1 to 5 and the values are the squares of the keys. Use dictionary comprehension.
  11. Using the prices dictionary, print the keys and values as separate lists using the keys() and values() methods.
  12. Create a dictionary school with two nested dictionaries. Access and print the age of "student2".
    • "student1": {"name": "Alice", "age": 21}
    • "student2": {"name": "Bob", "age": 22}
  13. Use the setdefault() method to add a new key "advisor" with the value "Dr. Smith" to the student dictionary if it does not exist.
  14. Use the pop() method to remove the "hometown" key from the student dictionary and store its value in a variable. Print the variable.
  15. Use the clear() method to remove all items from the prices dictionary. Print the dictionary to confirm it’s empty.
  16. Make a copy of the student dictionary using the copy() method. Modify the copy by changing "name" to "Charlie". Print both dictionaries to see the differences.
  17. Create two lists: keys = ["name", "age", "major"] and values = ["Eve", 20, "Mathematics"]. Use the zip() function to create a dictionary from these lists.
  18. Use the items() method to iterate over the student dictionary and print each key-value pair.
  19. Given a list of fruits: ["apple", "banana", "apple", "orange", "banana", "banana"], create a dictionary fruit_count that counts the occurrences of each fruit.
  20. Use collections.defaultdict to create a dictionary word_count that counts the number of occurrences of each word in a list: ["hello", "world", "hello", "python"].

The Botanical Garden and Rose Garden: Understanding Sets

3 August 2024 at 09:36

Introduction to the Botanical Garden

We are planning to open a botanical garden with flowers that will attract visitors.

Morning: Planting Unique Flowers

One morning, the team decides to plant flowers in the garden. They ensure that each flower they plant is unique.


botanical_garden = {"Rose", "Lily", "Sunflower"}

Noon: Adding More Flowers

At noon, they find some more flowers and add them to the garden, making sure they only add flowers that aren’t already there.

Adding Elements to a Set:


# Adding more unique flowers to the botanical garden
botanical_garden.add("Jasmine")
botanical_garden.add("Hibiscus")
print(botanical_garden)
# output (order may vary): {'Rose', 'Lily', 'Sunflower', 'Jasmine', 'Hibiscus'}

Afternoon: Trying to Plant Duplicate Flowers

In the afternoon, they accidentally try to plant another Rose, but the garden’s rule prevents any duplicates from being added.

Adding Duplicate Elements:


# Attempting to add a duplicate flower
botanical_garden.add("Rose")
print(botanical_garden)
# output (order may vary): {'Rose', 'Lily', 'Sunflower', 'Jasmine', 'Hibiscus'}

Evening: Removing Unwanted Plants

As evening approaches, they decide to remove some flowers they no longer want in their garden.

Removing Elements from a Set:


# Removing a flower from the botanical garden
botanical_garden.remove("Lily")
print(botanical_garden)
# output (order may vary): {'Rose', 'Sunflower', 'Jasmine', 'Hibiscus'}
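
One detail worth a quick sketch: `remove()` raises a `KeyError` when the flower is not in the set, while `discard()` does nothing in that case. The `garden` variable below is a separate, made-up set so the story's `botanical_garden` stays untouched.

```python
garden = {"Rose", "Sunflower"}

garden.discard("Orchid")      # "Orchid" is absent: nothing happens

try:
    garden.remove("Orchid")   # "Orchid" is absent: raises KeyError
except KeyError:
    print("Orchid is not in the garden")

print(garden == {"Rose", "Sunflower"})  # True
```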

Night: Checking Flower Types

Before going to bed, they check if certain flowers are present in their botanical garden.

Checking Membership:


# Checking if certain flowers are in the garden
is_rose_in_garden = "Rose" in botanical_garden
is_tulip_in_garden = "Tulip" in botanical_garden

print(f"Is Rose in the garden? {is_rose_in_garden}")
print(f"Is Tulip in the garden? {is_tulip_in_garden}")

# Output
# Is Rose in the garden? True
# Is Tulip in the garden? False

Midnight: Comparing with Rose Garden

Late at night, they compare their botanical garden with their rose garden to see which flowers they have in common and which are unique to each garden.

Set Operations:

Intersections:


# The rose garden
rose_garden = {"Rose", "Lavender"}

# Flowers in both gardens (Intersection)
common_flowers = botanical_garden.intersection(rose_garden)
print(f"Common flowers: {common_flowers}")

# Output
# Common flowers: {'Rose'}

Difference:



# Flowers unique to their garden (Difference)
unique_flowers = botanical_garden.difference(rose_garden)
print(f"Unique flowers: {unique_flowers}")

# Output (order may vary)
# Unique flowers: {'Sunflower', 'Jasmine', 'Hibiscus'}


Union:



# All unique flowers from both gardens (Union)
all_unique_flowers = botanical_garden.union(rose_garden)
print(f"All unique flowers: {all_unique_flowers}")
# Output (order may vary): All unique flowers: {'Rose', 'Sunflower', 'Jasmine', 'Hibiscus', 'Lavender'}
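
The same three operations can also be written with operators. The short sketch below uses two small sets with made-up names (`botanical` and `roses`) so that it runs on its own.

```python
botanical = {"Rose", "Sunflower"}
roses = {"Rose", "Lavender"}

# & is intersection, - is difference, | is union
print((botanical & roses) == botanical.intersection(roses))  # True
print((botanical - roses) == botanical.difference(roses))    # True
print((botanical | roses) == botanical.union(roses))         # True
```

One difference: the methods accept any iterable (a list, for example), while the operators need both sides to be sets.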

ANNACHI KADAI – The Dictionary

3 August 2024 at 09:23

In a vibrant town in Tamil Nadu, there is a popular grocery store called Annachi Kadai. This store is always bustling with fresh deliveries of items.

The store owner, Pandian, uses a special inventory system to track the products. This system functions like a dictionary in Python, where each item is labeled with its name, and the quantity available is recorded.

Morning: Delivering Items to the Store

One bright morning, a new delivery truck arrives at the grocery store, packed with fresh items. Pandian records these new items in his inventory list.

Creating and Updating the Inventory:


# Initial delivery of items to the store
inventory = {
    "apples": 20,
    "bananas": 30,
    "carrots": 15,
    "milk": 10
}

print("Initial Inventory:", inventory)
# Output: Initial Inventory: {'apples': 20, 'bananas': 30, 'carrots': 15, 'milk': 10}

Noon: Additional Deliveries

As the day progresses, more deliveries arrive with additional items that need to be added to the inventory. Pandian updates the system with these new arrivals.

Adding New Items to the Inventory:


# Adding more items from the delivery
inventory["bread"] = 25
inventory["eggs"] = 50

print("Updated Inventory:", inventory)
# Output: Updated Inventory: {'apples': 20, 'bananas': 30, 'carrots': 15, 'milk': 10, 'bread': 25, 'eggs': 50}

Afternoon: Stocking the Shelves

In the afternoon, Pandian notices that some items are running low and restocks them by updating the quantities in the inventory system.

Updating Quantities:


# Updating item quantities after restocking shelves
inventory["apples"] += 10  # 10 more apples added
inventory["milk"] += 5     # 5 more bottles of milk added

print("Inventory after Restocking:", inventory)
# Output: Inventory after Restocking: {'apples': 30, 'bananas': 30, 'carrots': 15, 'milk': 15, 'bread': 25, 'eggs': 50}

Evening: Removing Sold-Out Items

As evening falls, some items are sold out, and Pandian needs to remove them from the inventory to reflect their unavailability.

Removing Items from the Inventory:


# Removing sold-out items
del inventory["carrots"]

print("Inventory after Removal:", inventory)
# Output: Inventory after Removal: {'apples': 30, 'bananas': 30, 'milk': 15, 'bread': 25, 'eggs': 50}

Night: Checking Inventory

Before closing the store, Pandian checks the inventory to ensure that all items are accurately recorded and none are missing.

Checking for Items:

# Checking if specific items are in the inventory
is_bananas_in_stock = "bananas" in inventory
is_oranges_in_stock = "oranges" in inventory

print(f"Are bananas in stock? {is_bananas_in_stock}")
print(f"Are oranges in stock? {is_oranges_in_stock}")
# Output: Are bananas in stock? True
# Output: Are oranges in stock? False
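
When Pandian wants the quantity rather than a True/False answer, `get()` with a default avoids a `KeyError` for items that are not stocked. A small sketch, using a separate made-up `stock` dictionary:

```python
stock = {"bananas": 30, "milk": 15}

print(stock.get("bananas", 0))  # 30
print(stock.get("oranges", 0))  # 0, since "oranges" is not a key
```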


Midnight: Reviewing Inventory

After a busy day, Pandian reviews the entire inventory to ensure all deliveries and sales are accurately recorded.

Iterating Over the Inventory:


# Reviewing the final inventory
for item, quantity in inventory.items():
    print(f"Item: {item}, Quantity: {quantity}")

# Output:
# Item: apples, Quantity: 30
# Item: bananas, Quantity: 30
# Item: milk, Quantity: 15
# Item: bread, Quantity: 25
# Item: eggs, Quantity: 50
