Implementing a Postgres job queue in less than an hour

I recently listened to the Rust in Production podcast episode with the founder of tembo.io. The guest and host talked about PGMQ, their job queue implemented in Postgres, and went into some details of how they built it using the amazing pgrx library for writing Postgres extensions in Rust. I wouldn’t really recommend using Postgres as a message broker or a task queue for reasons I’ll get to later, but I figured I might learn something by trying to implement a very simple job queue myself. It took two hours on a Saturday afternoon, and here’s what I came up with!

Schema First

The job queue schema is straightforward:

-- Job status
CREATE TYPE job_status AS ENUM ('pending', 'in_progress', 'done', 'failed');

-- Simple Job table
CREATE TABLE jobs (
    id SERIAL PRIMARY KEY,
    status job_status NOT NULL DEFAULT 'pending', -- Tracks the job lifecycle
    payload JSONB, -- Chose payload as `jsonb` but could have gone with `bytea` instead.
    visible_at TIMESTAMP DEFAULT now(), -- SQS-style visibility timeout (will come back to this later)
    retry_count INT DEFAULT 0, -- Tracks retry attempts
    created_at TIMESTAMP DEFAULT now(),
    updated_at TIMESTAMP DEFAULT now()
);
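
If you want to poke at this yourself, applying the schema from Rust takes only a couple of lines with tokio-postgres. Here's a minimal sketch (the connection string and the schema.sql path are placeholders; a proper migration tool would work just as well):

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder connection string - adjust to your local setup.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres dbname=jobs", NoTls).await?;

    // tokio-postgres hands you the connection object separately; it drives the
    // socket I/O and needs to run on its own task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // schema.sql holds the CREATE TYPE / CREATE TABLE statements above.
    client.batch_execute(include_str!("../schema.sql")).await?;
    Ok(())
}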

Project Structure

The first decision I made was to use Cornucopia, a Rust project I’ve been wanting to try for a while. If you’re familiar with sqlc in Go, Cornucopia follows a similar philosophy: you write plain SQL queries, and it generates type-safe Rust code for you. No ORM nonsense, no weird Prisma schemas. For all its downsides, SQL is an amazing interface/language for expressing data queries. At some point you have to accept that the database you are using is a SQL engine, and no matter how many layers of abstraction and libraries you put in front of it, all of that nonsense has to be transpiled back into SQL anyway.

Here’s how Cornucopia works:

  1. You write your SQL queries in .sql files with special annotations
  2. Cornucopia generates a Rust module with strongly-typed functions and structs
  3. It works seamlessly with deadpool-postgres and tokio-postgres as database drivers

Using Cornucopia is basically the best of both worlds: you write plain SQL queries and you get type-checked Rust structs. No matter how complex the query gets, it generates a single function that you can call by passing in a client-like object and the parameters.

Here’s what our query file looks like:

--! enqueue
INSERT INTO jobs (payload)
VALUES (:payload);

--! dequeue
WITH next_job AS (
    SELECT id
    FROM jobs
    WHERE
        retry_count < :max_retries
        AND (
            status = 'pending'
            OR (status = 'in_progress' AND visible_at <= now())
        )
    ORDER BY created_at
    LIMIT :batch_size
    FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET status = 'in_progress',
    updated_at = now(),
    visible_at = now() + interval '60 seconds',
    retry_count = retry_count + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.*;

--! ack
UPDATE jobs
SET status = 'done',
    updated_at = now()
WHERE id = :job_id;

--! ack_batch
UPDATE jobs
SET status = 'done',
    updated_at = now()
WHERE id = ANY(:job_ids);


--! nack
UPDATE jobs
SET status = 'failed',
    updated_at = now()
WHERE id = :job_id;

Enqueue: The Simple Part

Enqueueing jobs is straightforward - just insert a new row with the payload.

The generated Cornucopia code provides a type-safe enqueue() function that binds our payload and executes the INSERT.

async fn enqueue_all<C>(client: C) -> anyhow::Result<()>
where
    C: GenericClient,
{
    loop {
        let data = r#"
               {
                   "version": 1,
                   "type": "send_message",
                   "payload": {
                        "message": "hi"
                   }
               }"#;

        // Parse the JSON text into a serde_json::Value so the payload is stored as
        // structured JSON in the JSONB column rather than as one big escaped string.
        let data: serde_json::Value = serde_json::from_str(data)?;
        enqueue().bind(&client, &data).await?;

        sleep(Duration::from_millis(10)).await;
    }
}

Done!

Dequeue: The Tricky Part

Dequeuing is where things get interesting. In a distributed system with multiple workers, we need to handle race conditions - we can’t have two workers processing the same job. This is where PostgreSQL’s row-level locking shines.

Let’s look at the dequeue query:

--! dequeue
WITH next_job AS (
    SELECT id
    FROM jobs
    WHERE
        retry_count < :max_retries
        AND (
            status = 'pending'
            OR (status = 'in_progress' AND visible_at <= now())
        )
    ORDER BY created_at
    LIMIT :batch_size
    FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET status = 'in_progress',
    updated_at = now(),
    visible_at = now() + interval '60 seconds',
    retry_count = retry_count + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.*;

Let’s break down the query:

1. The CTE (Common Table Expression)

The WITH next_job AS (...) creates a temporary result set that we’ll use in our UPDATE. This is crucial because it lets us first find and lock the jobs we want, then update only those specific rows.

2. The WHERE Clause Logic

WHERE
    retry_count < :max_retries
    AND (
        status = 'pending'
        OR (status = 'in_progress' AND visible_at <= now())
    )

This finds jobs that are eligible for processing:

  • Haven’t exceeded their retry limit
  • Are either:
    • Brand new (status = 'pending')
    • OR stuck in progress past their visibility timeout.

This is pretty straightforward SQL so far.

3. The Magic of FOR UPDATE SKIP LOCKED

This is the real PostgreSQL magic. Let’s understand each part:

  • FOR UPDATE: This tells PostgreSQL to acquire a row-level lock on every row returned by the SELECT. These locks are held until the transaction commits or rolls back.

  • SKIP LOCKED: This is the secret sauce! It tells PostgreSQL: “If you encounter a row that’s already locked by another transaction, don’t wait for it - just skip it and move on to the next one.”

Since our dequeue is a single statement, these row locks are only held for the duration of that statement’s (implicit) transaction; the longer-lived claim on a job is the status = 'in_progress' and visible_at update itself.

Let’s visualize what happens when multiple workers try to dequeue simultaneously with a batch_size of 1:

Time 0ms:
  Jobs Table: [Job1(pending), Job2(pending), Job3(pending)]

Time 1ms:
  Worker A: BEGIN; SELECT ... FOR UPDATE SKIP LOCKED;
  Worker A: Locks Job1, returns Job1

Time 2ms:
  Worker B: BEGIN; SELECT ... FOR UPDATE SKIP LOCKED;
  Worker B: Tries Job1 (locked!), skips it, locks Job2, returns Job2

Time 3ms:
  Worker C: BEGIN; SELECT ... FOR UPDATE SKIP LOCKED;
  Worker C: Tries Job1 (locked!), tries Job2 (locked!), locks Job3, returns Job3

Each worker automatically gets a different job without any explicit coordination!

4. The UPDATE part

Once we have our locked rows, the UPDATE part of the query:

UPDATE jobs
SET status = 'in_progress',
    updated_at = now(),
    visible_at = now() + interval '60 seconds',
    retry_count = retry_count + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.*;

This atomically:

  1. Changes the status to ‘in_progress’
  2. Sets a visibility timeout 60 seconds in the future (more about this in the Visibility Timeouts section)
  3. Increments the retry count
  4. Returns the full job to the worker
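
Putting the consuming side together, a worker loop on top of the generated dequeue() looks roughly like this. This is a sketch, not the exact generated API: I’m assuming Cornucopia exposes an all() method returning the matched rows, that the generated row struct has an id field, and that the parameter types match what it infers for :max_retries and :batch_size. work(), ack_batch() and nack() are the same helpers used in the ACK/NACK section further down.

async fn worker_loop<C>(client: C, worker_idx: usize) -> anyhow::Result<()>
where
    C: GenericClient,
{
    // Types are a guess at what Cornucopia infers for the two parameters.
    let max_retries = 5i32;
    let batch_size = 10i64;

    loop {
        // Parameters bind in the order they appear in the query: :max_retries, :batch_size.
        let jobs = dequeue().bind(&client, &max_retries, &batch_size).all().await?;

        if jobs.is_empty() {
            // Nothing eligible right now - back off before polling again.
            sleep(Duration::from_millis(500)).await;
            continue;
        }

        let job_ids: Vec<i32> = jobs.iter().map(|job| job.id).collect();
        println!("[WORKER-{worker_idx}] picked up {} job(s)", job_ids.len());

        // Simulate work, then ACK the batch on success or NACK each job on failure.
        match work().await {
            Ok(_) => {
                ack_batch().bind(&client, &job_ids).await?;
            }
            Err(_) => {
                println!("[WORKER-{worker_idx}] Failed. NACK batch");
                for id in job_ids {
                    nack().bind(&client, &id).await?;
                }
            }
        }
    }
}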

Visibility Timeouts

The visible_at field is what makes our job queue resilient to worker crashes. Imagine this scenario:

  1. Worker A dequeues a job and starts processing
  2. Worker A crashes for some weird reason
  3. The job is stuck in in_progress forever
  4. No other worker can pick it up.

The solution: when a worker dequeues a job, it doesn’t just change the status to ‘in_progress’. It also sets a future timestamp in visible_at:

visible_at = now() + interval '60 seconds', -- This should be a param
retry_count = retry_count + 1

This basically means:

“For the next 60 seconds I’ll try to process this job. If I don’t finish by then, someone else can have this job”.

A more complete implementation would probably need some mechanism for the worker to extend the visibility timeout while it is still working on a job. But that’s a problem for future me.
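
For the record, such an extension could be one more query that pushes visible_at further out while the worker is still alive. A hypothetical sketch using a raw tokio-postgres call (extend_visibility is not part of the implementation above, and it could just as well be another Cornucopia-annotated query):

use tokio_postgres::Client;

// Hypothetical helper: push the visibility timeout of a job we are still working on.
async fn extend_visibility(client: &Client, job_id: i32) -> anyhow::Result<bool> {
    let updated = client
        .execute(
            "UPDATE jobs
             SET visible_at = now() + interval '60 seconds',
                 updated_at = now()
             WHERE id = $1 AND status = 'in_progress'",
            &[&job_id],
        )
        .await?;
    // 0 rows means the job is no longer in progress (already ACKed or NACKed).
    Ok(updated == 1)
}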

Walking Through Different Scenarios

Scenario 1: Happy Path

Time 0:00 - Job created: status='pending', visible_at=now()
Time 0:01 - Worker dequeues: status='in_progress', visible_at=now()+60s
Time 0:30 - Worker completes: status='done'

Scenario 2: Worker Crash

Time 0:00 - Job created: status='pending', visible_at=now()
Time 0:01 - Worker A dequeues: status='in_progress', visible_at='1:01'
Time 0:05 - Worker A crashes (job still 'in_progress')
Time 0:30 - Worker B tries dequeue: Job not visible (visible_at > now())
Time 1:02 - Worker B dequeues: Job visible again! retry_count incremented

Notice what we DON’T have:

  • No background process checking for stuck jobs
  • No heartbeat mechanism
  • No complex state management

Everything is handled declaratively in the SQL query. The database becomes the single source of truth, and time marches forward automatically. Me like this!

Acking and Nacking: Job Completion

Once a worker has processed a job, it needs to either acknowledge success (ACK) or failure (NACK):

// simulate work
match work().await {
    Ok(_) => {
        ack_batch().bind(&client, &job_ids).await.unwrap();
    }
    Err(_) => {
        println!("[WORKER-{}] Failed. NACK batch", worker_idx);
        for id in job_ids {
            nack().bind(&client, &id).await.unwrap();
        }
    }
}

The ACK operation marks jobs as ‘done’:

--! ack_batch
UPDATE jobs
SET status = 'done',
    updated_at = now()
WHERE id = ANY(:job_ids);

The NACK operation marks jobs as ‘failed’:

--! nack
UPDATE jobs
SET status = 'failed',
    updated_at = now()
WHERE id = :job_id;

Notice that I added a batch ACK variant which is more efficient when processing multiple jobs at once.

Now, there is a very subtle issue with our ACK implementation. It didn’t occur to me while implementing the system. Let’s look at this scenario:

Slow Worker

Time 0:00 - Job created: status='pending'
Time 0:01 - Worker dequeues: visible_at='1:01'
Time 1:30 - Worker still processing (took 90 seconds)
Time 1:31 - Worker tries to ACK... but another worker may have grabbed the job at 1:01 and already processed (and ACKed) it!

We have built a system with at-least-once delivery semantics: a job can end up being processed more than once. If a job might take longer than the visibility timeout, you need to either:

  1. Increase the timeout globally
  2. Implement visibility timeout extension
  3. Make your ACK operation check that it still owns the job using something like a unique worker_id field. (Future me, do this! See the sketch below.)
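
For option 3, the idea would be to add a worker_id column (it’s not in the schema above), stamp it in the dequeue UPDATE, and make the ACK conditional on it. A hypothetical sketch with a raw query:

use tokio_postgres::Client;

// Hypothetical: assumes the jobs table gains a worker_id column that dequeue
// stamps with the id of the worker claiming the job.
async fn ack_if_owned(client: &Client, job_id: i32, worker_id: &str) -> anyhow::Result<bool> {
    let updated = client
        .execute(
            "UPDATE jobs
             SET status = 'done', updated_at = now()
             WHERE id = $1 AND worker_id = $2 AND status = 'in_progress'",
            &[&job_id, &worker_id],
        )
        .await?;
    // If another worker re-dequeued the job, worker_id no longer matches and we touch 0 rows.
    Ok(updated == 1)
}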

Retry Mechanism

The retry mechanism is very simple:

  1. Retry Count: Each time a job is dequeued, its retry_count is incremented
  2. Max Retries: Jobs are only eligible for dequeue if retry_count < max_retries
  3. Visibility Timeout: When a job is dequeued, it gets a 60-second visibility timeout

Here’s how it works in practice:

  • Job is created with status='pending', retry_count=0
  • Worker dequeues it: status='in_progress', retry_count=1, visible_at=now()+60s
  • If worker crashes, after 60 seconds the job becomes eligible for dequeue again
  • Another worker picks it up: retry_count=2
  • This continues until either the job succeeds or hits max_retries

In a production setting, you would also have some kind of DLQ to pick up jobs that have failed more than max_retries times and do something with them…
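
The crudest possible version of that would be a periodic sweep that flags exhausted jobs so they stop lingering as 'in_progress' and can be inspected (or copied into a dedicated dead-letter table). A hypothetical sketch:

use tokio_postgres::Client;

// Hypothetical sweep: mark jobs that ran out of retries as failed so they can be
// inspected or shipped off to a dead-letter table by some other process.
async fn sweep_dead_jobs(client: &Client, max_retries: i32) -> anyhow::Result<u64> {
    let swept = client
        .execute(
            "UPDATE jobs
             SET status = 'failed', updated_at = now()
             WHERE retry_count >= $1
               AND status = 'in_progress'
               AND visible_at <= now()",
            &[&max_retries],
        )
        .await?;
    Ok(swept)
}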

Why Not Use PostgreSQL as a Job Queue?

Honestly, I learned a ton of PostgreSQL features while implementing this, neat tricks that will be useful in other scenarios. But there are good reasons to use dedicated job queue systems in production:

  1. Performance: Polling-based systems don’t scale as well as push-based message brokers. Workers have to poll, which means a constant stream of requests to the database and extra traffic. I thought about adding a notification layer using Postgres’s LISTEN/NOTIFY, but then you end up with two sources of state that you need to keep in sync.
  2. Features: Dedicated systems offer features like dead letter queues, message routing, priorities, multiple queues/topics, delays…
  3. Overhead: Using your primary database for queuing can impact OLTP performance. Running a simple benchmark already used 25% CPU.
  4. Observability: Purpose-built queue systems have better observability tooling - you get UIs, dedicated metrics, etc.

That said, for small to medium applications where you already have PostgreSQL and want to avoid additional infrastructure, this approach can be perfectly ok. The implementation is simple, reliable, and PostgreSQL’s ACID guarantees are rock solid!

Building a job queue in PostgreSQL was fun! Again, I wouldn’t recommend this for high-scale production use, but it’s a great learning exercise and could work well for smaller applications. The entire implementation is under 100 lines of Rust code plus some SQL - not bad for a distributed job queue!

Remember, the best way to understand a technology is to build a simpler version yourself! Go have fun…