Implementing a Postgres job queue in less than an hour
I recently listened to the Rust in Production podcast with the founder of tembo.io. The guest and host talked about their PGMQ project, a job queue implemented in Postgres, and went into some details of how they built it using the amazing pgrx library for writing Postgres extensions in Rust.
I wouldn’t really recommend using Postgres as a message broker or a task queue for reasons I’ll get to later, but I figured I might learn something by trying to implement a very simple job queue myself. It took two hours on a Saturday afternoon, and I came up with this!
Schema First
The job queue schema is straightforward:
-- Job status
CREATE TYPE job_status AS ENUM ('pending', 'in_progress', 'done', 'failed');
-- Simple Job table
CREATE TABLE jobs (
id SERIAL PRIMARY KEY,
status job_status NOT NULL DEFAULT 'pending', -- Tracks the job lifecycle
payload JSONB, -- Chose payload as `jsonb` but could have gone with `bytea` instead.
visible_at TIMESTAMP DEFAULT now(), -- SQS visibility timeout (will come back to this later)
retry_count INT DEFAULT 0, -- Tracks retry attempts
created_at TIMESTAMP DEFAULT now(),
updated_at TIMESTAMP DEFAULT now()
);
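One thing I’d probably add for anything beyond a toy workload is a partial index matching the dequeue query we’ll write below, since it filters on status and orders by created_at. A sketch, not something the two-hour version needed:
-- Hypothetical supporting index for the dequeue query
CREATE INDEX idx_jobs_dequeue
ON jobs (created_at)
WHERE status IN ('pending', 'in_progress');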
Project Structure
The first decision I made was to use Cornucopia, a Rust project I’ve been wanting to try for a while. If you’re familiar with sqlc in Go, Cornucopia follows a similar philosophy: you write plain SQL queries, and it generates type-safe Rust code for you. No ORM nonsense, no weird Prisma schemas. For all its downsides, SQL is an amazing interface/language for expressing data queries. At some point you have to accept that the database you are using is a SQL engine, and no matter how many layers of abstraction and libraries you put in front of it, all of that nonsense has to be transpiled down to SQL anyway.
Here’s how cornucopia works:
- You write your SQL queries in .sql files with special annotations
- Cornucopia generates a Rust module with strongly-typed functions and structs
- It works seamlessly with deadpool-postgres and tokio-postgres as database drivers
Using cornucopia is basically the best of both worlds: you write plain SQL queries and you get type-checked Rust structs back. No matter how complex the query gets, it generates a single function that you can call by passing in a client-like object and the parameters.
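For completeness, the wiring around it is just a small deadpool-postgres pool whose clients get handed to the generated functions. A minimal sketch, with placeholder connection settings rather than the real config:
use deadpool_postgres::{Config, Pool, Runtime};
use tokio_postgres::NoTls;

// Build a small connection pool; a pooled client is what gets passed into the
// generated Cornucopia query functions. Settings below are placeholders.
fn make_pool() -> anyhow::Result<Pool> {
    let mut cfg = Config::new();
    cfg.host = Some("localhost".into());
    cfg.dbname = Some("jobs".into());
    cfg.user = Some("postgres".into());
    cfg.password = Some("postgres".into());
    Ok(cfg.create_pool(Some(Runtime::Tokio1), NoTls)?)
}

// Later: let client = pool.get().await?; then pass &client to the generated queries.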
Here’s what our query file looks like:
--! enqueue
INSERT INTO jobs (payload)
VALUES (:payload);
--! dequeue
WITH next_job AS (
SELECT id
FROM jobs
WHERE
retry_count < :max_retries
AND (
status = 'pending'
OR (status = 'in_progress' AND visible_at <= now())
)
ORDER BY created_at
LIMIT :batch_size
FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET status = 'in_progress',
updated_at = now(),
visible_at = now() + interval '60 seconds',
retry_count = retry_count + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.*;
--! ack
UPDATE jobs
SET status = 'done',
updated_at = now()
WHERE id = :job_id;
--! ack_batch
UPDATE jobs
SET status = 'done',
updated_at = now()
WHERE id = ANY(:job_id);
--! nack
UPDATE jobs
SET status = 'failed',
updated_at = now()
WHERE id = :job_id;
Enqueue: The Simple Part
Enqueueing jobs is straightforward - just insert a new row with the payload.
The generated Cornucopia code provides a type-safe enqueue() function that binds our payload and executes the INSERT.
use cornucopia_async::GenericClient;
use tokio::time::{sleep, Duration};
// `enqueue` comes from the Cornucopia-generated queries module.

async fn enqueue_all<C>(client: C) -> anyhow::Result<()>
where
    C: GenericClient,
{
    loop {
        let data = r#"
        {
            "version": 1,
            "type": "send_message",
            "payload": {
                "message": "hi"
            }
        }"#;
        // Parse the raw string into a JSON value so it is stored as a jsonb
        // object (serde_json::to_value on a &str would store a JSON string).
        let data: serde_json::Value = serde_json::from_str(data)?;
        enqueue().bind(&client, &data).await?;
        sleep(Duration::from_millis(10)).await;
    }
}
Done!
Dequeue: The Tricky Part
Dequeuing is where things get interesting. In a distributed system with multiple workers, we need to handle race conditions - we can’t have two workers processing the same job. This is where PostgreSQL’s row-level locking shines.
Let’s look at the dequeue query:
--! dequeue
WITH next_job AS (
SELECT id
FROM jobs
WHERE
retry_count < :max_retries
AND (
status = 'pending'
OR (status = 'in_progress' AND visible_at <= now())
)
ORDER BY created_at
LIMIT :batch_size
FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET status = 'in_progress',
updated_at = now(),
visible_at = now() + interval '60 seconds',
retry_count = retry_count + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.*;
Let’s break down the query:
1. The CTE (Common Table Expression)
The WITH next_job AS (...) creates a temporary result set that we’ll use in our UPDATE. This is crucial because it lets us first find and lock the jobs we want, then update only those specific jobs.
2. The WHERE Clause Logic
WHERE
retry_count < :max_retries
AND (
status = 'pending'
OR (status = 'in_progress' AND visible_at <= now())
)
This finds jobs that are eligible for processing:
- They haven’t exceeded their retry limit
- They are either:
  - brand new (status = 'pending')
  - or stuck in progress past their visibility timeout
This is pretty straightforward SQL so far.
3. The Magic of FOR UPDATE SKIP LOCKED
This is the real PostgreSQL magic. Let’s understand each part:
- FOR UPDATE: This tells PostgreSQL to acquire a row-level lock on every row returned by the SELECT. These locks are held until the transaction commits or rolls back.
- SKIP LOCKED: This is the secret sauce! It tells PostgreSQL: “If you encounter a row that’s already locked by another transaction, don’t wait for it - just skip it and move on to the next one.”
Let’s visualize what happens when multiple workers try to dequeue simultaneously with a batch_size of 1:
Time 0ms:
Jobs Table: [Job1(pending), Job2(pending), Job3(pending)]
Time 1ms:
Worker A: BEGIN; SELECT ... FOR UPDATE SKIP LOCKED;
Worker A: Locks Job1, returns Job1
Time 2ms:
Worker B: BEGIN; SELECT ... FOR UPDATE SKIP LOCKED;
Worker B: Tries Job1 (locked!), skips it, locks Job2, returns Job2
Time 3ms:
Worker C: BEGIN; SELECT ... FOR UPDATE SKIP LOCKED;
Worker C: Tries Job1 (locked!), tries Job2 (locked!), locks Job3, returns Job3
Each worker automatically gets a different job without any explicit coordination!
4. The UPDATE part
Once we have our locked rows, the UPDATE part of the query does the rest:
UPDATE jobs
SET status = 'in_progress',
updated_at = now(),
visible_at = now() + interval '60 seconds',
retry_count = retry_count + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.*;
This atomically:
- Changes the status to ‘in_progress’
- Sets a visibility timeout 60 seconds in the future (more about this in visibility section)
- Increments the retry count
- Returns the full job to the worker
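To put the dequeue query in context, here is a rough sketch of what a worker loop around it could look like. It assumes the Cornucopia-generated dequeue function exposes the usual .bind(...).all() builder; the max_retries and batch_size values are placeholders, and acking/nacking is covered further down:
// Rough worker loop: claim a batch, process it, back off when the queue is empty.
// `dequeue` is the Cornucopia-generated function for the query above.
async fn worker_loop<C>(client: C, worker_idx: usize) -> anyhow::Result<()>
where
    C: GenericClient,
{
    // Placeholder values; the exact Rust types depend on what Cornucopia inferred.
    let max_retries = 3i32;
    let batch_size = 10i64;
    loop {
        // Rows locked by other workers are silently skipped (SKIP LOCKED).
        let jobs = dequeue().bind(&client, &max_retries, &batch_size).all().await?;
        if jobs.is_empty() {
            // Nothing eligible right now, poll again shortly.
            sleep(Duration::from_millis(100)).await;
            continue;
        }
        println!("[WORKER-{}] claimed {} job(s)", worker_idx, jobs.len());
        // ... do the actual work here, then ACK or NACK (see below).
    }
}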
Visibility Timeouts
The visible_at field is the secret sauce that makes our job queue resilient to worker crashes.
Imagine this scenario:
- Worker A dequeues a job and starts processing
- Worker A crashes for some weird reason
- The job is stuck in in_progress forever - no other worker can pick it up.
The solution: when a worker dequeues a job, it doesn’t just change the status to ‘in_progress’. It also sets a future timestamp in visible_at:
visible_at = now() + interval '60 seconds', -- This should be a param
retry_count = retry_count + 1
This basically means:
“For the next 60 seconds I’ll try to process this job. If I don’t finish by then, someone else can have this job”.
A more complete implementation would probably need a timeout extension mechanism that lets the worker push the visibility window further out. But that’s a problem for future me.
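For reference, such an extension query could be as simple as this (just a sketch in the same style as the other queries, not something I implemented):
--! extend_visibility
UPDATE jobs
SET visible_at = now() + interval '60 seconds',
    updated_at = now()
WHERE id = :job_id
  AND status = 'in_progress';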
Walking Through Different Scenarios
Scenario 1: Happy Path
Time 0:00 - Job created: status='pending', visible_at=now()
Time 0:01 - Worker dequeues: status='in_progress', visible_at=now()+60s
Time 0:30 - Worker completes: status='done'
Scenario 2: Worker Crash
Time 0:00 - Job created: status='pending', visible_at=now()
Time 0:01 - Worker A dequeues: status='in_progress', visible_at='1:01'
Time 0:05 - Worker A crashes (job still 'in_progress')
Time 0:30 - Worker B tries dequeue: Job not visible (visible_at > now())
Time 1:02 - Worker B dequeues: Job visible again! retry_count incremented
Notice what we DON’T have:
- No background process checking for stuck jobs
- No heartbeat mechanism
- No complex state management
Everything is handled declaratively in the SQL query. The database becomes the single source of truth, and time marches forward automatically. Me like this!
Acking and Nacking: Job Completion
Once a worker has processed a job, it needs to either acknowledge success (ACK) or failure (NACK):
// simulate work
match work().await {
Ok(_) => {
ack_batch().bind(&client, &job_ids).await.unwrap();
}
Err(_) => {
println!("[WORKER-{}] Failed. NACK batch", worker_idx);
for id in job_ids {
nack().bind(&client, &id).await.unwrap();
}
}
}
The ACK operation marks jobs as ‘done’:
--! ack_batch
UPDATE jobs
SET status = 'done',
updated_at = now()
WHERE id = ANY(:job_id);
The NACK operation marks jobs as ‘failed’:
--! nack
UPDATE jobs
SET status = 'failed',
updated_at = now()
WHERE id = :job_id;
Notice that I added a batch ACK variant which is more efficient when processing multiple jobs at once.
Now there is a very subtle issue with our ACK implementation. It didn’t occur to me while implementing the system, so let’s look at this scenario:
Slow Worker
Time 0:00 - Job created: status='pending'
Time 0:01 - Worker dequeues: visible_at='0:61'
Time 1:30 - Worker still processing (took 90 seconds)
Time 1:31 - Worker tries to ACK... but another worker might have grabbed it! Job is already ACKED
We have built a system with at-least-once delivery semantics: a job can end up being delivered and processed more than once. If a job might take longer than the visibility timeout, you need to either:
- Increase the timeout globally
- Implement visibility timeout extension
- Make your ACK operation check that it still owns the job, using something like a unique worker_id field (future me, do this!) - see the sketch below
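Roughly, that last option could look like this, assuming we add a worker_id column that the dequeue query stamps with the claiming worker’s id (again, not implemented here):
--! ack_owned
UPDATE jobs
SET status = 'done',
    updated_at = now()
WHERE id = :job_id
  AND status = 'in_progress'
  AND worker_id = :worker_id; -- only succeeds if we still own the job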
Retry Mechanism
The retry mechanism is very simple:
- Retry Count: Each time a job is dequeued, its retry_count is incremented
- Max Retries: Jobs are only eligible for dequeue if retry_count < max_retries
- Visibility Timeout: When a job is dequeued, it gets a 60-second visibility timeout
Here’s how it works in practice:
- Job is created with status='pending', retry_count=0
- Worker dequeues it: status='in_progress', retry_count=1, visible_at=now()+60s
- If the worker crashes, after 60 seconds the job becomes eligible for dequeue again
- Another worker picks it up: retry_count=2
- This continues until either the job succeeds or hits max_retries
In a production setting, you would also have some kind of DLQ to pick up jobs that have failed more than max_retries times and do something with them…
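A periodic sweep along these lines could do it (just a sketch; it assumes a jobs_dlq table with the same columns as jobs):
-- Move exhausted or failed jobs into a dead-letter table
WITH dead AS (
    DELETE FROM jobs
    WHERE status = 'failed'
       OR (status <> 'done' AND retry_count >= :max_retries)
    RETURNING *
)
INSERT INTO jobs_dlq
SELECT * FROM dead;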
Why Not Use PostgreSQL as a Job Queue?
Honestly, I learned a ton of PostgreSQL features while implementing this, neat tricks that will be useful in other scenarios. But there are good reasons to use dedicated job queue systems in production:
- Performance: Polling-based systems don’t scale as well as push-based message brokers. Workers have to poll, which means constant requests to the database and extra traffic. I thought about adding a notification layer using Postgres’ LISTEN/NOTIFY feature, but then you end up with dual state that you need to keep in sync.
- Features: Dedicated systems offer features like dead letter queues, message routing, priorities, multiple queues/topics, delays…
- Overhead: Using your primary database for queuing can impact OLTP performance. Running a simple benchmark used 25% CPU.
- Observability: Purpose-built queue systems come with better observability tooling: UIs, dedicated metrics, etc.
That said, for small to medium applications where you already have PostgreSQL and want to avoid additional infrastructure, this approach can be perfectly ok. The implementation is simple, reliable, and PostgreSQL’s ACID guarantees are rock solid!
Building a job queue in PostgreSQL was fun! Again, I wouldn’t recommend this for high-scale production use, but it’s a great learning exercise and could work well for smaller applications. The entire implementation is under 100 lines of Rust code plus some SQL - not bad for a distributed job queue!
Remember, the best way to understand a technology is to build a simpler version yourself! Go have fun…