Learning Notes #51 - Postgres as a Queue using SKIP LOCKED
Yesterday, I came across a blog post from inferable.ai (https://www.inferable.ai/blog/posts/postgres-skip-locked) that walks through using Postgres as a queue. In this post, I jot down notes on using Postgres as a queue for future reference.
PostgreSQL is a robust relational database that can be used for more than just storing structured data. With the SKIP LOCKED
feature introduced in PostgreSQL 9.5, you can efficiently turn a PostgreSQL table into a job queue for distributed processing.
Why Use PostgreSQL as a Queue?
Using PostgreSQL as a queue can be advantageous because:
- Familiarity: If you're already using PostgreSQL, there's no need for an additional message broker.
- Durability: PostgreSQL ensures ACID compliance, offering reliability for your job processing.
- Simplicity: No need to manage another component like RabbitMQ or Kafka.
Implementing a Queue with SKIP LOCKED
1. Create a Queue Table
To start, you need a table to store the jobs:

CREATE TABLE job_queue (
    id SERIAL PRIMARY KEY,
    job_data JSONB NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
This table has the following columns:
- id: A unique identifier for each job.
- job_data: The data or payload for the job.
- status: Tracks the job's state ('pending', 'in_progress', or 'completed').
- created_at: Timestamp of job creation.
2. Insert Jobs into the Queue
Adding jobs is straightforward:
INSERT INTO job_queue (job_data) VALUES ('{"task": "send_email", "email": "user@example.com"}');
3. Fetch Jobs for Processing with SKIP LOCKED
Workers will fetch jobs from the queue using SELECT ... FOR UPDATE SKIP LOCKED to avoid contention:
WITH next_job AS (
    SELECT id, job_data
    FROM job_queue
    WHERE status = 'pending'
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE job_queue
SET status = 'in_progress'
FROM next_job
WHERE job_queue.id = next_job.id
RETURNING job_queue.id, job_queue.job_data;
Key Points:
- FOR UPDATE locks the selected row to prevent other workers from picking it up.
- SKIP LOCKED ensures locked rows are skipped, enabling concurrent workers to operate without waiting.
- LIMIT 1 processes one job at a time per worker (a batch variant is sketched below).
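If claiming one row per round trip becomes a bottleneck, the same pattern can claim a small batch per worker. This is a variation on the query above rather than something from the original post, and the batch size of 10 is an arbitrary example:

WITH next_jobs AS (
    SELECT id
    FROM job_queue
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT 10               -- arbitrary batch size; tune per workload
    FOR UPDATE SKIP LOCKED
)
UPDATE job_queue
SET status = 'in_progress'
FROM next_jobs
WHERE job_queue.id = next_jobs.id
RETURNING job_queue.id, job_queue.job_data;

Each worker then processes the rows it gets back before asking for more, trading a little latency for fewer round trips.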
4. Mark Jobs as Completed
Once a worker finishes processing a job, it should update the job's status:
UPDATE job_queue SET status = 'completed' WHERE id = $1; -- Replace $1 with the job ID
5. Delete Old or Processed Jobs
To keep the table clean, you can periodically remove completed jobs:
DELETE FROM job_queue WHERE status = 'completed' AND created_at < NOW() - INTERVAL '30 days';
Example Worker Implementation
Here's an example of a worker implemented in Python using psycopg2:
import time

import psycopg2
from psycopg2.extras import RealDictCursor

connection = psycopg2.connect("dbname=yourdb user=youruser")

while True:
    with connection.cursor(cursor_factory=RealDictCursor) as cursor:
        # Claim the next pending job; SKIP LOCKED lets concurrent workers
        # pass over rows that another worker has already locked.
        cursor.execute(
            """
            WITH next_job AS (
                SELECT id, job_data
                FROM job_queue
                WHERE status = 'pending'
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE job_queue
            SET status = 'in_progress'
            FROM next_job
            WHERE job_queue.id = next_job.id
            RETURNING job_queue.id, job_queue.job_data;
            """
        )
        job = cursor.fetchone()

        if job:
            print(f"Processing job {job['id']}: {job['job_data']}")
            # Simulate job processing, then mark the job as done.
            cursor.execute(
                "UPDATE job_queue SET status = 'completed' WHERE id = %s",
                (job['id'],),
            )
        else:
            print("No jobs available. Sleeping...")
            time.sleep(5)

    # Commit each iteration so the row lock is released.
    connection.commit()
Considerations
- Transaction Isolation: Use the REPEATABLE READ or SERIALIZABLE isolation level cautiously to avoid unnecessary locks.
- Row Locking: SKIP LOCKED only skips rows locked by other transactions, not those locked within the same transaction.
- Performance: Regularly archive or delete old jobs to prevent the table from growing indefinitely, and consider indexing the status column to improve query performance (an index sketch follows this list).
- Fault Tolerance: Ensure that workers handle crashes or timeouts gracefully. Use a timeout mechanism to revert jobs stuck in the 'in_progress' state (a requeue sketch follows this list).
- Scaling: Distribute workers across multiple nodes to handle higher job throughput.
- The SKIP LOCKED clause only applies to row-level locks; the required ROW SHARE table-level lock is still taken normally.
- Using SKIP LOCKED provides an inconsistent view of the data by design. This is why it's perfect for queue-like tables where we want to distribute work, but not suitable for general-purpose work where consistency is required.
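For the performance note above, one common approach (not prescribed by the original post) is a partial index covering only the rows the claim query actually scans, so lookups stay fast even as completed jobs accumulate; the index name here is arbitrary:

-- Index only the 'pending' rows that workers search for.
CREATE INDEX idx_job_queue_pending
    ON job_queue (created_at)
    WHERE status = 'pending';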
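For the fault-tolerance note, a simple timeout mechanism is a periodic sweep that pushes stale 'in_progress' jobs back to 'pending'. This sketch assumes a hypothetical started_at column that the claim query sets when it marks a job 'in_progress' (the table in step 1 does not have one), and the 10-minute cutoff is an arbitrary choice:

-- Hypothetical column for tracking when a job was claimed:
--   ALTER TABLE job_queue ADD COLUMN started_at TIMESTAMP;
-- The claim query would also need to SET started_at = NOW().
UPDATE job_queue
SET status = 'pending',
    started_at = NULL
WHERE status = 'in_progress'
  AND started_at < NOW() - INTERVAL '10 minutes';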