Un exemple de plugin de sortie peut être trouvé dans le sous-répertoire
contrib/test_decoding
de l'arborescence du code source de PostgreSQL.
Un plugin de sortie est initialisé en chargeant dynamiquement une bibliothèque
partagée avec comme nom de base le nom du plugin de sortie. Le chemin de
recherche de bibliothèque habituel est utilisé pour localiser cette
bibliothèque. Pour fournir les callbacks de plugins de sortie requis et
pour indiquer que la bibliothèque est effectivement un plugin de sortie,
elle doit fournir une fonction nommée
_PG_output_plugin_init
. Une structure est passée à
cette fonction qui doit la remplir avec les pointeurs des fonctions de
callback pour chaque action individuelle.
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
Les fonctions callback begin_cb
,
change_cb
et commit_cb
sont
obligatoires, alors que startup_cb
,
truncate_cb
, message_cb
,
filter_by_origin_cb
et shutdown_cb
sont optionnelles. Si truncate_cb
n'est pas configuré
mais que TRUNCATE
doit être décodé, l'action sera
ignorée.
Un plugin de sortie peut aussi définir les fonctions pour accepter le flux
de grosses transactions en cours. Les callbacks
stream_start_cb
,
stream_stop_cb
, stream_abort_cb
,
stream_commit_cb
et stream_change_cb
sont obligatoires, alors que stream_message_cb
et
stream_truncate_cb
sont optionnelles. La callback
stream_prepare_cb
est aussi requise si le plugin de
sortie accepte aussi les validations en deux phases.
Un plugin de sortie peut aussi définir les fonctions pour accepter les
validations en deux phases, ce qui permet de décoder un PREPARE
TRANSACTION
. Les callbacks
begin_prepare_cb
, prepare_cb
,
commit_prepared_cb
et
rollback_prepared_cb
sont obligatoires, alors que
filter_prepare_cb
est facultatif.
La callback stream_prepare_cb
est aussi requise si le
plugin de sortie accepte le flux de grosses transactions en cours.
Pour décoder, formater et afficher les changements, les plugins de sortie
peuvent utiliser une grande partie de l'infrastructure habituelle des
processus clients, y compris l'appel aux fonctions de sortie. Les accès
en lecture seule aux relations est permis du moment que les relations
accédées ont été créées par initdb
dans le schéma
pg_catalog
, ou ont été marquées comme tables du
catalogue pour l'utilisateur en utilisant :
ALTER TABLE table_catalogue_utilisateur SET (user_catalog_table = true); CREATE TABLE autre_table_catalogue(data text) WITH (user_catalog_table = true);
Notez que l'accès aux tables utilisateurs ou aux tables systèmes dans le
plugin de sortie doit se faire uniquement via les API de parcours
systable_*
. L'accès via les API de parcours
heap_*
renverra une erreur. De plus, toute action
amenant à l'affectation d'un identifiant de transaction est prohibée.
Ceci, parmi d'autres choses, inclut l'écriture dans les tables, la réalisation
de modifications DDL, et l'appel à pg_current_xact_id()
.
Les fonctions callbacks des plugins en sortie peuvent renvoyer des données
au consommateur dans des formats pratiquement arbitraires. Pour certains
cas d'utilisation, comme la visualisation des changements en SQL, le
renvoi des données dans un type de données qui peut contenir des données
arbitraires (par exemple du bytea
) est complexe. Si le
plugin en sortie renvoit seulement les données au format texte dans
l'encodage du serveur, il peut déclarer cela en configurant
OutputPluginOptions.output_type
à
OUTPUT_PLUGIN_TEXTUAL_OUTPUT
au lieu de
OUTPUT_PLUGIN_BINARY_OUTPUT
dans la fonction callback de
démarrage. Dans ce cas, toutes les données doivent être dans
l'encodage du serveur pour qu'un champ de type text
puisse
les contenir. Ceci est vérifié dans les constructions comprenant les
assertions.
Un plugin de sortie est notifié des changements arrivant au travers de différents callbacks qu'il doit fournir.
Les transactions concurrentes sont décodées dans l'ordre dans lequel elles
sont validées, et seuls les changements appartenant à une transaction
spécifique sont décodés entre les callbacks begin
et
commit
. Les transactions qui ont été explicitement ou
implicitement annulées ne sont jamais décodées. Les savepoints validés
sont inclus dans la transaction les contenant, dans l'ordre dans lequel
ils ont été effectués dans la transaction. Une transaction qui est
préparée pour une validation en deux phases avec PREPARE
TRANSACTION
sera aussi décodée si les callbacks du plugin de
sortie nécessaires sont fournies pour le décodage. Il est possible que la
transaction préparée courante en cours de décodage soit annulée en
parallèle via une commande ROLLBACK PREPARED
. Dans ce
cas, le décodage logique de cette transaction sera lui-aussi annulé. Tous
les changements d'une telle transaction seront ignorés une fois que
l'annulation est détectée et que la fonction callback
prepare_cb
est appelée. De ce fait, même dans le cas
d'une annulation en parallèle, suffisamment d'informations sont fournies
au plugin de sortie pour gérer correctement un ROLLBACK
PREPARED
une fois qu'il est décodé.
Seules les transactions qui ont été synchronisées sur disque de manière
sûre seront décodées. Cela peut amener à ce qu'un
COMMIT
ne soit pas immédiatement décodé lors d'un
appel à pg_logical_slot_get_changes()
juste après
celui-ci quand synchronous_commit
est positionné à
off
.
Le callback facultatif startup_cb
est appelé chaque
fois qu'un slot de réplication est créé ou qu'on lui demande de fournir
les flux de changement, indépendamment du nombre de changements qui sont
prêt à être fournis.
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
Le paramètre is_init
sera positioné à true quand le
slot de réplication est créé, et à false sinon.
options
pointe vers une structure d'options que
le plugin de sortie peut positionner :
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
doit être positionné soit à
OUTPUT_PLUGIN_TEXTUAL_OUTPUT
ou à
OUTPUT_PLUGIN_BINARY_OUTPUT
. Voir aussi Section 47.6.3. Si
receive_rewrites
vaut true, le plugin de sortie sera
aussi appelé pour les modifications réalisées par des réécritures du
fichier HEAP lors de certaines opérations DDL. Ceci est intéressant pour
les plugins qui gèrent la réplication DDL mais ils nécessitent une
gestion particulière.
Le callback de démarrage devrait valider les options présentes dans
ctx->output_plugin_options
. Si le plugin de sortie
a besoin d'avoir un état, il peut utiliser
ctx->output_plugin_private
pour le stocker.
Le callback facultatif shutdown_cb
est appelé chaque
fois qu'un slot de réplication anciennement actif n'est plus utilisé et
peut être utilisé pour désallouer les ressources privées du plugin de
sortie. Le slot n'est pas nécessairement supprimé, le flux est juste
arrêté.
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
Le callback obligatoire begin_cb
est appelé chaque
fois que le début d'une transaction validée a été décodé. Les
transactions annulées et leur contenu ne sont pas décodés.
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
Le paramètre txn
contient des métadonnées sur la
transaction, comme l'heure à laquelle elle a été validée et son XID.
Le callback obligatoire commit_cb
est appelé chaque
fois qu'une transaction validée a été décodée. Le callback
change_cb
aura été appelé avant cela pour chacune
des lignes modifiées, s'il y en a eu.
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
Le callback obligatoire change_cb
est appelé pour
chacune des modifications de ligne au sein d'une transaction, qu'il
s'agisse d'un INSERT
, UPDATE
ou
DELETE
. Même si la commande d'origine a modifié
plusieurs ligne en une seule instruction, le callback sera appelé pour
chaque ligne individuellement. Le callback
change_cb
pourrait accéder aux tables systèmes et
utilisateurs pour aider au traitement en sortie des détails de
modification de la ligne. Dans le cas du décodage d'une transaction
préparée (mais pas encore validée) ou du décodage d'une transaction non
validée, ce callback de modification pourrait aussi renvoyer une erreur
à cause de l'annulation simultanée de cette même transaction. Dans ce
cas, le décodage logique de cette transaction annulée est arrêté
proprement.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
Les paramètres ctx
et txn
ont le même contenu que pour les callbacks begin_cb
et commit_cb
, mais en plus le descripteur de
relation relation
pointe vers la relation à
laquelle appartient la ligne et une structure
change
décrivant les modifications de la ligne y est
passée.
Seuls les changements dans les tables définies par les utilisateurs qui
sont journalisées (voir UNLOGGED
) et
non temporaires (voir TEMPORARY
ou TEMP
)
peuvent être extraits avec le décodage logique.
La fonction callback truncate_cb
optionnelle est
appelée pour la commande TRUNCATE
.
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
Les paramètres sont identiques à ceux du callback
change_cb
. Néanmoins, comme les actions du
TRUNCATE
sur les tables liées par clés étrangères
doivent être exécutées ensemble, ce callback reçoit un tableau de
relations au lieu d'une seule relation. Voir la description de
l'instruction TRUNCATE pour les détails.
La fonction optionnelle filter_by_origin_cb
est
appelée pour déterminer si les données rejouées à partir de
origin_id
ont un intérêt pour le plugin de
sortie.
typedef bool (*LogicalDecodeFilterByOriginCB) ( struct LogicalDecodingContext *ctx, RepNodeId origin_id );
Le paramètre ctx
a le même contenu que pour les
autres fonctions. Aucune information n'est disponible à part l'origine. Pour
signaler que les changements provenant du nœud sont hors de propos, elle
renvoie true, ce qui permet de les filtrer. Elle renvoie false dans les
autres cas. Les autres fonctions ne seront pas appelées pour les
transactions et changements qui ont été filtrés.
Ceci est utile pour implémenter des solutions de réplication en cascade ou des solutions de réplication multi-directionnelles. Filtrer par rapport à l'origine perment d'empêcher la réplication dans les deux sens des mêmes modifications dans ce type de configuration. Quand les transactions et les modifications contiennent aussi des informations sur l'origine, le filtre via cette fonction est beaucoup plus efficace.
La fonction (callback) message_cb
est appelée quand
un message de décodage logique a été décodé.
typedef void (*LogicalDecodeMessageCB) ( struct LogicalDecodingContext *, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message );
Le paramètre txn
contient des méta-informations
sur la transaction, comme l'horodatage de la validation de la transaction
et son identifiant (XID). Notez néanmoins qu'il peut être NULL
quand le message n'est pas transactionnel et que le XID n'a pas encore
été affecté dans la transaction qui a tracé le message. Le
lsn
a la position du message dans les WAL. Le
paramètre transactional
indique si le message a
été envoyé de façon transactionnelle ou non. De façon similaire au
callback de changement, dans le cas du décodage d'une transaction
préparée (mais pas encore validée) ou du décodage d'une transaction non
validée, ce callback de message pourrait renvoyer une erreur en cas
d'annulation simultanée de cette même transaction. Dans ce cas, le
décodage logique de cette transaction annulée est stoppé proprement. Le
paramètre prefix
est un préfixe arbitraire
terminé par un caractère nul qui peut être utilisé pour identifier les
messages intéressants pour le plugin courant. Et enfin, le paramètre
message
contient le message réel de taille
message_size
.
Une attention particulière doit être portée à l'unicité du préfixe que le plugin de sortie trouve intéressant. Utiliser le nom de l'extension ou du plugin de sortie est souvent un bon choix.
La fonction callback facultative filter_prepare_cb
est appelée pour déterminer si les données qui font partie de la
transaction de validation en deux phases en cours doivent être
considérées pour le décoage à cette étape préparatoire ou plus tard
comme une transaction standard en une phase au moment du COMMIT
PREPARED
. Pour signaler que le décodage doit être ignoré,
renvoyez true
; false
dans le
cas contraire. Quand la fonction callback est définie,
false
est supposé (autrement dit, il n'y a pas de
filtrage et toutes les transactions utilisant la validation en deux phases
sont décodées aussi en deux phases).
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
Le paramètre ctx
a le même contenu que pour les
autres fonctions callbacks. Les paramètres xid
et
gid
fournissent deux façons différentes
d'identifier la transaction. La commande COMMIT
PREPARED
ou ROLLBACK PREPARED
ultérieure
intègre les identifiants, autorisant au plugin de sortie le choix de
celui à utiliser.
La fonction callback pourrait être appelée plusieurs fois par transaction
pour décoder, et doit fournir la même réponse statique pour une paire
donnée de xid
et de gid
à
chaque fois qu'elle est appelée.
La fonction callback requise begin_prepare_cb
est
appelée à chaque fois que le début d'une transaction préparée a été
décodée. Le champ gid
, qui fait partie du
paramètre txn
, peut être utilisé dans cette
fonction callback pour vérifier si le plugin a déjà reçu cette commande
PREPARE
, auquel cas elle peut renvoyer une erreur ou
ignorer les changements restants de la transaction.
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
La fonction callback requise prepare_cb
est appelée
à chaque fois qu'une transaction préparée pour une validation en deux
pahses a été décodée. La fonction callback
change_cb
sera appelée avant celle-ci pour toutes
les lignes modifiées, à condition que des lignes aient été modifiées. Le
champ gid
, qui fait partie du paramètre
txn
, peut être utilisé dans cette fonction
callback.
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
La fonction callback requise commit_prepared_cb
est
appelée quand une commande COMMIT PREPARED
d'une
transaction a été décodée. Le champ gid
, qui fait
partie du paramètre txn
, peut être utilisé dans
cette fonction callback.
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
La fonction callback requise rollback_prepared_cb
est appelée à chaque fois qu'une transaction ROLLBACK
PREPARED
a été décodée. Le champ gid
,
qui fait partie du paramètre txn
, peut être
utilisé dans cette fonction callback. Les paramètres
prepare_end_lsn
et
prepare_time
peuvent être utilisés pour vérifier
si le plugin a reça la commande PREPARE TRANSACTION
auquel cas il peut appliquer l'annulation. Sinon, il peut ignorer
l'opération d'annulation. Le paramètre gid
seul
n'est pas suffisant parce que le nœud en amont peut avoir une
transaction préparée avec le même identifiant.
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
La fonction callback stream_start_cb
obligatoire est
appelée à l'ouverture d'un bloc de changements en flux à partir d'une
transaction en cours.
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
La fonction callback stream_stop_cb
obligatoire est appelée lors
de la fermeture d'un bloc de changements en flux provenant d'une
transaction en cours.
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
La fonction callback stream_abort_cb
obligatoire est
appelée lors de l'annulation d'une transaction en flux précédente.
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
La fonction callback stream_prepare_cb
est appelée
pour préparer une transaction en flux précédente pour faire partie d'une
validation en deux phases. Ce callback est obligatoire quand le plugin de
sortie accepte à la fois le flux de grosses transactions en cours et les
validations en deux phases.
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
La fonction callback stream_commit_cb
obligatoire est appelée
pour valider une transaction en flux précédente.
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
La fonction callback stream_change_cb
obligatoire est appelée
pour un changement dans un bloc de changements de flux (démarqué par des
appels à stream_start_cb
et
stream_stop_cb
). Les modifications réelles ne sont
pas affichées car la transaction peut annuler plus tard et que nous ne
décodons par les changements pour les transactions annulées.
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
La fonction callback stream_message_cb
optionnalle
est appelée
lors de l'envoi d'un message générique dans un bloc de changements en
flux (démarqué par des appels à stream_start_cb
et
stream_stop_cb
). Le contenu du message pour des
messages transactionnels n'est pas affiché car la transaction peut
s'annuler plus tard et que nous ne décodons pas les changements pour les
transactions annulées.
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
La fonction callback stream_truncate_cb
optionnelle
est appelée pour une commande
TRUNCATE
dans un bloc de changements en flux
(démarqué par des appels à stream_start_cb
et
stream_stop_cb
).
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
Les paramètres sont analogues à ceux de la fonction callback
stream_change_cb
. Néanmoins, comme les actions de
la commande TRUNCATE
sur des tables connectées par
des clés étrangères doivent s'exécuter ensemble, cette fonction callback
reçoit un tableau de relations au lieu d'une seule relation. Voir la
description de l'instruction TRUNCATE pour les
détails.
Pour pouvoir produire une sortie, les plugins de sortie peuvent écrire des
données dans le tampon de sortie StringInfo
dans
ctx->out
dans les callbacks
begin_cb
, commit_cb
ou
change_cb
. Avant d'écrire dans le tampon de sortie,
OutputPluginWrite(ctx, last_write)
doit avoir été
appelé pour effectuer l'écriture. last_write
indique si une écriture particulière était la dernière écriture du
callback.
L'exemple suivant montre comment sortir des données pour le consommateur d'un plugin de sortie :
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);