Blog

PostgreSQL and RabbitMQ

Introduction

I want to share the experience of using PostgreSQL and RabbitMQ. The matter is that recently we had a question of guaranteed, transactional delivery of messages from DB to the queue of RabbitMQ. In addition, it was required to do this as quickly as possible. I think you are aware of the fact that PostgreSQL has a remarkable opportunity for extensibility. And in particular, he has an extension that can work with RabbitMQ. It is called pg_amqp.

Installation

pg_amqp provides stored procedures in PostgreSQL for sending to amqp. The extension works fine at the application logic level: by rolling back the transaction to PostgreSQL – the data in amqp will not get caught. And if the commit – get.
Install the extension:
git clone https://github.com/omniti-labs/pg_amqp.git
cd pg_amqp
make
make install
Add this line to my postgresql.conf file:
shared_preload_libraries = 'pg_amqp.so'
And create extension:
CREATE EXTENSION amqp;
In the database was added the scheme “amqp”. In the schema we have once table and four functions:
CREATE TABLE amqp.broker
(broker_id serial NOT NULL,
 host text NOT NULL,
 port integer NOT NULL DEFAULT 5672,
 vhost text,
 username text,
 password text,
 CONSTRAINT broker_pkey PRIMARY KEY (broker_id, host, port));
broker_id – serial number

host – RabbitMQ hosts

port – RabbitMQ port

vhost – virtual host

username – RabbitMQ user name

password – RabbitMQ user password

amqp.publish(broker_id integer, exchange character varying, routing_key character varying,
 message character varying, delivery_mode integer DEFAULT NULL::integer, content_type character varying DEFAULT NULL::character varying, reply_to character varying DEFAULT NULL::character varying,
 correlation_id character varying DEFAULT NULL::character varying)

amqp.autonomous_publish(broker_id integer, exchange character varying, routing_key character varying,
 message character varying, delivery_mode integer DEFAULT NULL::integer, content_type character varying DEFAULT NULL::character varying, reply_to character varying DEFAULT NULL::character varying,
 correlation_id character varying DEFAULT NULL::character varying)

amqp.exchange_declare(broker_id integer, exchange character varying, exchange_type character varying,
 passive boolean, durable boolean, auto_delete boolean DEFAULT false)

amqp.disconnect(broker_id integer)
Adding broker:
INSERT INTO amqp.broker(host, port, vhost, username, password)
 VALUES ('10.0.0.100', 5672, '/', 'user', 'pass');
We will use pg_amqp for push notifications. For example:
BEGIN;
UPDATE products SET name = milk, quantity = 10 , price= 10 RETURNING amqp.publish(1, 'pushDBEx', '', CONCAT(name,' now costs', price::TEXT, ' dollars' );
SELECT amqp.disconnect(1);
COMMIT;
We will use pg_amqp for autonomous transactions. For example:
BEGIN; 
INSERT INTO products (name, quantity,price) VALUES ('milk', 10, 10 );
SELECT amqp.publish(1, 'pushDBEx', '', CONCAT(name,' now costs', price::TEXT, ' dollars' );
SELECT amqp.disconnect(1); 
ROLLBACK;
Despite the rollback recording, it will still be sent to the queue.
In fact, the extension does not guarantee that the message will fall into amqp. Inside, only a sequential transaction commit first in PostgreSQL, and then in amqp. And if the connection with amqp is lost between the two commits, the message will be lost. Despite the fact that the probability of such an event is small, the lost packages will be.
For those who are not allowed to lose 0.01% of packets, we can advise additionally to save messages in the PostgreSQL table and send again messages to another queue with a slight delay. Thus, by checking the data of several queues, you can reduce the probability of message loss.

Conclusion

PostgreSQL has an interesting extension pg_amqp to work with the RabbitMQ queue. This can be useful if you need to be sure to send the results of executing database commands, as well as for autonomous transactions (such as auditing) in PostgreSQL.
PostgreSQL