Learning Notes #51 – Postgres as a Queue using SKIP LOCKED

Yesterday, I came across a blog post from inferable.ai, https://www.inferable.ai/blog/posts/postgres-skip-locked, which walks through using Postgres as a queue. In this post, I jot down notes on the approach for future reference.

PostgreSQL is a robust relational database that can be used for more than just storing structured data. With the SKIP LOCKED feature introduced in PostgreSQL 9.5, you can efficiently turn a PostgreSQL table into a job queue for distributed processing.

Why Use PostgreSQL as a Queue?

Using PostgreSQL as a queue can be advantageous for a few reasons:

  • Familiarity: If you’re already using PostgreSQL, there’s no need for an additional message broker.
  • Durability: PostgreSQL ensures ACID compliance, offering reliability for your job processing.
  • Simplicity: No need to manage another component like RabbitMQ or Kafka.

Implementing a Queue with SKIP LOCKED

1. Create a Queue Table

To start, you need a table to store the jobs:


CREATE TABLE job_queue (
    id SERIAL PRIMARY KEY,
    job_data JSONB NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

This table has the following columns:

  • id: A unique identifier for each job.
  • job_data: The data or payload for the job.
  • status: Tracks the job’s state (‘pending’, ‘in_progress’, or ‘completed’).
  • created_at: Timestamp of job creation.

2. Insert Jobs into the Queue

Adding jobs is straightforward:


INSERT INTO job_queue (job_data)
VALUES ('{"task": "send_email", "email": "user@example.com"}');
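From application code, the same insert can be issued with a bound parameter rather than a hand-written JSON string. The following is only a minimal sketch: enqueue_job is a hypothetical helper name, and it assumes a DB-API cursor that uses the %s placeholder style and returns rows as tuples (psycopg2's default cursor behaves this way).

```python
import json


def enqueue_job(cursor, payload):
    """Insert one job and return its generated id.

    Assumption: `cursor` is a DB-API cursor with %s placeholders that
    returns tuples (not a dict-style cursor). `enqueue_job` is a
    hypothetical helper name, not part of any library.
    """
    cursor.execute(
        "INSERT INTO job_queue (job_data) VALUES (%s) RETURNING id",
        # Serialize the dict to JSON text; PostgreSQL casts it to JSONB.
        (json.dumps(payload),),
    )
    return cursor.fetchone()[0]
```

Letting the driver bind the value avoids quoting mistakes when the payload itself contains quote characters. The caller is still responsible for committing the transaction.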

3. Fetch Jobs for Processing with SKIP LOCKED

Workers fetch jobs from the queue using SELECT ... FOR UPDATE SKIP LOCKED so that they do not block one another:

WITH next_job AS (
    SELECT id, job_data
    FROM job_queue
    WHERE status = 'pending'
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE job_queue
SET status = 'in_progress'
FROM next_job
WHERE job_queue.id = next_job.id
RETURNING job_queue.id, job_queue.job_data;

Key Points:

  • FOR UPDATE locks the selected row to prevent other workers from picking it up.
  • SKIP LOCKED ensures locked rows are skipped, enabling concurrent workers to operate without waiting.
  • LIMIT 1 restricts each fetch to a single job, so a worker claims one job at a time.
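To build intuition for these points, here is a toy in-memory analogy with no database involved. It is not how PostgreSQL implements row locks; InMemoryQueue and its methods are made-up names, and a non-blocking threading.Lock stands in for FOR UPDATE SKIP LOCKED.

```python
import threading


class InMemoryQueue:
    """Toy stand-in for job_queue: one lock per row, no database involved."""

    def __init__(self, job_ids):
        self.locks = {job_id: threading.Lock() for job_id in job_ids}
        self.status = {job_id: "pending" for job_id in job_ids}

    def claim(self):
        """Return the first pending job whose lock is free, or None."""
        for job_id in sorted(self.locks):
            if self.status[job_id] != "pending":
                continue
            # Non-blocking acquire: a held lock is skipped, not waited on.
            if self.locks[job_id].acquire(blocking=False):
                return job_id
        return None

    def finish(self, job_id):
        """Mark the job completed and release its lock (like a COMMIT)."""
        self.status[job_id] = "completed"
        self.locks[job_id].release()


queue = InMemoryQueue([1, 2, 3])
first = queue.claim()   # "worker A" locks job 1
second = queue.claim()  # "worker B" skips the locked job 1 and takes job 2
```

Neither claim waits on the other: the second call moves past the locked row and picks up the next pending one, which is the behaviour SKIP LOCKED gives concurrent workers.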

4. Mark Jobs as Completed

Once a worker finishes processing a job, it should update the job’s status:


UPDATE job_queue
SET status = 'completed'
WHERE id = $1; -- Replace $1 with the job ID

5. Delete Old or Processed Jobs

To keep the table clean, you can periodically remove completed jobs:


DELETE FROM job_queue
WHERE status = 'completed' AND created_at < NOW() - INTERVAL '30 days';

Example Worker Implementation

Here’s an example of a worker implemented in Python using psycopg2:


import time  # needed for time.sleep() in the polling loop

import psycopg2
from psycopg2.extras import RealDictCursor

connection = psycopg2.connect("dbname=yourdb user=youruser")

while True:
    with connection.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(
            """
            WITH next_job AS (
                SELECT id, job_data
                FROM job_queue
                WHERE status = 'pending'
                FOR UPDATE SKIP LOCKED
                LIMIT 1
            )
            UPDATE job_queue
            SET status = 'in_progress'
            FROM next_job
            WHERE job_queue.id = next_job.id
            RETURNING job_queue.id, job_queue.job_data;
            """
        )

        job = cursor.fetchone()
        if job:
            print(f"Processing job {job['id']}: {job['job_data']}")

            # Simulate job processing
            cursor.execute("UPDATE job_queue SET status = 'completed' WHERE id = %s", (job['id'],))

        else:
            print("No jobs available. Sleeping...")
            time.sleep(5)

    connection.commit()
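The loop above claims and completes each job inside a single transaction, so the row lock is held while the job runs and the ‘in_progress’ status is never visible to other sessions. One possible variation, sketched here under assumptions, commits the claim first and records the outcome in a second transaction. process_one and handler are hypothetical names, the ORDER BY id is an addition for oldest-first pickup, and the connection is assumed to be psycopg2-style with a default tuple cursor.

```python
CLAIM_SQL = """
WITH next_job AS (
    SELECT id
    FROM job_queue
    WHERE status = 'pending'
    ORDER BY id              -- assumption: oldest job first
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE job_queue
SET status = 'in_progress'
FROM next_job
WHERE job_queue.id = next_job.id
RETURNING job_queue.id, job_queue.job_data;
"""


def process_one(connection, handler):
    """Claim one job, run handler(job_data), and record the outcome.

    Returns the job id, or None when no pending job could be claimed.
    """
    with connection.cursor() as cursor:
        cursor.execute(CLAIM_SQL)
        row = cursor.fetchone()
        connection.commit()  # publish the claim and release the row lock
        if row is None:
            return None
        job_id, job_data = row
        try:
            handler(job_data)
            new_status = "completed"
        except Exception:
            new_status = "pending"  # hand the job back for another attempt
        cursor.execute(
            "UPDATE job_queue SET status = %s WHERE id = %s",
            (new_status, job_id),
        )
        connection.commit()
        return job_id
```

The trade-off is that a worker that crashes mid-job now leaves the row in ‘in_progress’ instead of rolling back to ‘pending’, so this shape depends on some timeout-based recovery. A job whose handler always fails would also be retried forever unless an attempt counter is added.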

Considerations

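  1. Transaction Isolation: The default READ COMMITTED level is usually sufficient for this pattern. Under REPEATABLE READ or SERIALIZABLE, a claim can fail with a serialization error when another transaction has modified the row, so workers must be prepared to retry.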
  2. Row Locking: SKIP LOCKED only skips rows locked by other transactions, not those locked within the same transaction.
  3. Performance: Regularly archive or delete old jobs to prevent the table from growing indefinitely. Consider indexing the status column to improve query performance.
  4. Fault Tolerance: Ensure that workers handle crashes or timeouts gracefully. Use a timeout mechanism to revert jobs stuck in the ‘in_progress’ state.
  5. Scaling: Distribute workers across multiple nodes to handle a higher job throughput.
  6. The SKIP LOCKED clause only applies to row-level locks – the required ROW SHARE table-level lock is still taken normally.
  7. Using SKIP LOCKED provides an inconsistent view of the data by design. This is why it’s perfect for queue-like tables where we want to distribute work, but not suitable for general purpose work where consistency is required.
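The timeout mechanism mentioned under Fault Tolerance could look roughly like the sketch below. It rests on a loud assumption: a started_at column that the table defined earlier does not have, and that the claim query would need to set to NOW(). requeue_stale_jobs is a hypothetical helper name, and the cursor is assumed to be a psycopg2-style DB-API cursor.

```python
# Assumption: this column is NOT part of the original table definition.
ADD_STARTED_AT_SQL = "ALTER TABLE job_queue ADD COLUMN started_at TIMESTAMP"


def requeue_stale_jobs(cursor, timeout_minutes=15):
    """Return jobs stuck in 'in_progress' to 'pending'; return the count.

    Relies on the hypothetical `started_at` column being set to NOW()
    by whichever query claims the job.
    """
    cursor.execute(
        "UPDATE job_queue "
        "SET status = 'pending' "
        "WHERE status = 'in_progress' "
        "AND started_at < NOW() - %s * INTERVAL '1 minute'",
        (timeout_minutes,),
    )
    return cursor.rowcount
```

A helper like this would be run periodically from a scheduler or a dedicated maintenance worker. The timeout should comfortably exceed the longest expected job, since a job that is merely slow would otherwise be handed to a second worker.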