L'exemple suivant explique le contrôle du décodage logique en utilisant l'interface SQL.
Avant de pouvoir utiliser le décodage logique, il est nécessaire de
positionner le paramètre wal_level à la valeur
logical
et le paramètre max_replication_slots à 1 au moins. Il sera
alors possible de se connecter à la base de données cible (dans l'exemple
suivant, postgres
) en tant que superutilisateur.
postgres=# -- Créer un slot nommé 'regression_slot' utilisant le plugin de sortie 'test_decoding' postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true); slot_name | lsn -----------------+--------------- regression_slot | 0/16B1970 (1 row) postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn -----------------+---------------+-----------+----------+--------+-------------+----------------- regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440 (1 row) postgres=# -- Il n'y a pas encore de changement à voir postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----+-----+------ (0 rows) postgres=# CREATE TABLE data(id serial primary key, data text); CREATE TABLE postgres=# -- le DDL n'est pas répliqué, donc seule la transaction est visible postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+-------------- 0/BA2DA58 | 10297 | BEGIN 10297 0/BA5A5A0 | 10297 | COMMIT 10297 (2 rows) postgres=# -- Une fois les changements lus, ils sont consommés et ne seront pas renvoyés postgres=# -- dans un appel ultérieur : postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----+-----+------ (0 rows) postgres=# BEGIN; postgres=*# INSERT INTO data(data) VALUES('1'); postgres=*# INSERT INTO data(data) VALUES('2'); postgres=*# COMMIT; postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A688 | 10298 | BEGIN 10298 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1' 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2' 0/BA5A8A8 | 10298 | COMMIT 10298 (4 rows) postgres=# INSERT INTO data(data) VALUES('3'); postgres=# -- Vous pouvez aussi jeter un œil dans le flux sans consommer les changements postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A8E0 | 10299 | BEGIN 10299 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' 0/BA5A990 | 10299 | COMMIT 10299 (3 rows) postgres=# -- L'appel suivant va renvoyer les mêmes changements postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A8E0 | 10299 | BEGIN 10299 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' 0/BA5A990 | 10299 | COMMIT 10299 (3 rows) postgres=# -- des options peuvent être passées au plugin de sortir pour influer sur le formatage postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on'); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A8E0 | 10299 | BEGIN 10299 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04) (3 rows) postgres=# -- Il ne faut pas oublier de détruire un slot une fois qu'on n'en a plus besoin postgres=# -- afin qu'il ne consomme plus de ressources sur le serveur : postgres=# SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot ----------------------- (1 row)
Les exemples suivants montrent comment le décodage logique est contrôlé avec
le protocole de réplication en flux, en utilisant l'outil pg_recvlogical fourni avec la distribution PostgreSQL. Il
requiert que l'authentification du client soit configuré pour autoriser une
connexion de réplication (voir Section 26.2.5.1) et que le paramètre
max_wal_senders
soit configuré suffisamment haut pour
qu'une nouvelle connexion soit acceptée. Le deuxième exemple montre comment
envoyer en flux les transactions en deux phases. Avant d'utiliser des
commandes à deux phases, vous devez configurer le paramètre max_prepared_transactions à au moins 1.
Exemple 1 : $ pg_recvlogical -d postgres --slot=test --create-slot $ pg_recvlogical -d postgres --slot=test --start -f - Control+Z $ psql -d postgres -c "INSERT INTO data(data) VALUES('4');" $ fg BEGIN 693 table public.data: INSERT: id[integer]:4 data[text]:'4' COMMIT 693 Control+C $ pg_recvlogical -d postgres --slot=test --drop-slot Exemple 2 : $ pg_recvlogical -d postgres --slot=test --create-slot --two-phase $ pg_recvlogical -d postgres --slot=test --start -f - Control+Z $ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';" $ fg BEGIN 694 table public.data: INSERT: id[integer]:5 data[text]:'5' PREPARE TRANSACTION 'test', txid 694 Control+Z $ psql -d postgres -c "COMMIT PREPARED 'test';" $ fg COMMIT PREPARED 'test', txid 694 Control+C $ pg_recvlogical -d postgres --slot=test --drop-slot
L'exemple suivant montre l'interface SQL pouvant être utilisée pour décoder
les transactions préparées. Avant d'utiliser les commandes du Two-Phase
Commit, vous devez configurer le paramètre
max_prepared_transactions
à la valeur 1 au minimum.
Vous devez aussi avoir configuré le paramètre two-phase à la valeur true
lors de la création du slot en utilisant
pg_create_logical_replication_slot
. Notez que la
transaction entière sera envoyée dans le flux après la validation de la
transaction si elle n'est pas déjà décodée.
postgres=# BEGIN; postgres=*# INSERT INTO data(data) VALUES('5'); postgres=*# PREPARE TRANSACTION 'test_prepared1'; postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+--------------------------------------------------------- 0/1689DC0 | 529 | BEGIN 529 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5' 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529 (3 rows) postgres=# COMMIT PREPARED 'test_prepared1'; postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+-------------------------------------------- 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529 (4 row) postgres=#-- On peut aussi faire un rollback de la transaction préparée postgres=# BEGIN; postgres=*# INSERT INTO data(data) VALUES('6'); postgres=*# PREPARE TRANSACTION 'test_prepared2'; postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+--------------------------------------------------------- 0/168A180 | 530 | BEGIN 530 0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6' 0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530 (3 rows) postgres=# ROLLBACK PREPARED 'test_prepared2'; postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+---------------------------------------------- 0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530 (1 row)