Postgres Publish To RabbitMQ

Beau Barker edited this page Dec 3, 2025 · 5 revisions
Run the commands on this page from the db directory:

cd db

📦 1. Install pg_amqp

Clone pg_amqp:

git clone https://github.com/omniti-labs/pg_amqp postgres/pg_amqp

Install the extension in the Postgres image:

db/postgres/Dockerfile

RUN apt-get update && apt-get install -y \
 build-essential \
 postgresql-server-dev-17

# pg_amqp - Used by api schema
COPY ./pg_amqp /pg_amqp
WORKDIR /pg_amqp
RUN make
RUN make install

WORKDIR /var/lib/postgresql

Build Postgres:

docker compose build postgres

Note

You may need to fix "implicit int" compile errors in pg_amqp. The problem has been reported upstream and a fix exists, but it has not been merged yet. The fix is simple: in postgres/pg_amqp/src/pg_amqp.c, change the parameter declarations from broker_id to int broker_id on lines 140, 152, and 239.

📚 2. Load Required Extensions

Add this to a migration file:

db/postgres/migrations/01-extensions.sql

-- amqp extension for rabbitmq connection
create extension amqp;

Note

Don't wrap this file in a BEGIN/COMMIT block — create extension is non-transactional.

Run this migration to load the extension:

bin/postgres migrate
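To confirm that the extension actually loaded, a catalog query along these lines can help. It is a minimal sketch that reads only the standard pg_extension catalog; run it through whichever psql wrapper you use.

```sql
-- Returns one row if the amqp extension is installed in the current database.
select extname, extversion
from pg_extension
where extname = 'amqp';
```

An empty result means the extension was not created in the database you are connected to.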

3. Add Broker Configuration

Create a migration:

db/postgres/migrations/xx-insert_amqp_broker.sql

insert into amqp.broker (host, port, vhost, username, password) values (
  'rabbitmq', 5672, '/', 'guest', 'guest'
) on conflict do nothing;

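Apply it. The command below assumes the statement was saved as a standalone script at postgres/seed/broker.sql; if you created it as a migration as shown above, run bin/postgres migrate instead: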

bin/postgres psql < postgres/seed/broker.sql
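The publish call further down this page refers to the broker by its numeric id (1 in the example), so it is worth checking which id was assigned. The query below is a sketch; the broker_id column name is taken from pg_amqp's amqp.broker table as commonly documented and should be treated as an assumption if your version differs.

```sql
-- List the configured brokers; the password column is left out on purpose.
select broker_id, host, port, vhost, username
from amqp.broker
order by broker_id;
```

If the id shown is not 1, pass that value as the first argument to amqp.publish instead.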

🏗 4. Publish a message

We'll send a message whenever a row is inserted into a table.

db/postgres/migrations/03-jobs.sql

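-- Trigger function: publishes each inserted row as a message.
-- It must return trigger (not void) because it reads the new row.
create function start_job() returns trigger
language plpgsql as $$
begin
  perform amqp.publish(
    1,                   -- broker_id
    'amq.direct',        -- exchange
    new.command,         -- routing key
    new.payload::text,   -- body (assumes the table has a payload column)
    2,                   -- delivery_mode = 2 (persistent)
    'application/json',  -- content_type
    new.mercure_topic,   -- reply_to
    new.public_id::text  -- correlation_id (very useful)
  );
  return new;
end;
$$;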

The parameters are:

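  1. broker_id (id of the broker configuration inserted in the previous step)
  2. exchange
  3. routing_key
  4. message (the body, as text)
  5. delivery_mode (2 marks the message as persistent)
  6. content_type
  7. reply_to
  8. correlation_id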
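For the message to go out on insert, start_job() has to be attached to a table with a trigger, and Postgres only accepts a function declared returns trigger for that. The following is a minimal sketch: the table name job, the trigger name job_start, the id column, and all column types are hypothetical; only the column names command, payload, mercure_topic, and public_id come from the function above.

```sql
-- Hypothetical table holding the columns that start_job() reads.
create table job (
  id            bigint generated always as identity primary key,
  public_id     uuid  not null default gen_random_uuid(),
  command       text  not null,  -- used as the routing key
  payload       jsonb not null,  -- sent as the message body
  mercure_topic text             -- sent as reply_to
);

-- Call start_job() once for every inserted row.
create trigger job_start
after insert on job
for each row
execute function start_job();

-- Example insert (made-up values); this should publish one message
-- to amq.direct with routing key 'resize_image'.
insert into job (command, payload, mercure_topic)
values ('resize_image', '{"image_id": 42}', 'jobs/42');
```

A queue bound to amq.direct with the same routing key is needed on the RabbitMQ side for the message to be delivered anywhere.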

Grant permission to publish

db/postgres/migrations/05-grants.sql

grant execute on function
amqp.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar)
to service;
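Execute permission on the function is not sufficient by itself if the service role cannot resolve names in the amqp schema, since Postgres also requires usage on the schema. The sketch below assumes that grant is not already in place elsewhere in your migrations, then checks both privileges with the built-in has_schema_privilege and has_function_privilege functions.

```sql
-- Assumption: service has not yet been granted access to the amqp schema.
grant usage on schema amqp to service;

-- Both columns should be true once the grants are in place.
select
  has_schema_privilege('service', 'amqp', 'usage') as can_use_schema,
  has_function_privilege(
    'service',
    'amqp.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar)',
    'execute'
  ) as can_publish;
```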

▶️ 5. Run the Migrations

bin/postgres migrate
