PostgreSQLLa base de données la plus sophistiquée au monde.
Documentation PostgreSQL 17.1 » Programmation serveur » Décodage logique (Logical Decoding) » Plugins de sortie de décodage logique

47.6. Plugins de sortie de décodage logique #

Un exemple de plugin de sortie peut être trouvé dans le sous-répertoire contrib/test_decoding de l'arborescence du code source de PostgreSQL.

47.6.1. Fonction d'initialisation #

Un plugin de sortie est initialisé en chargeant dynamiquement une bibliothèque partagée avec comme nom de base le nom du plugin de sortie. Le chemin de recherche de bibliothèque habituel est utilisé pour localiser cette bibliothèque. Pour fournir les callbacks de plugins de sortie requis et pour indiquer que la bibliothèque est effectivement un plugin de sortie, elle doit fournir une fonction nommée _PG_output_plugin_init. Une structure est passée à cette fonction qui doit la remplir avec les pointeurs des fonctions de callback pour chaque action individuelle.

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

Les fonctions callback begin_cb, change_cb et commit_cb sont obligatoires, alors que startup_cb, truncate_cb, message_cb, filter_by_origin_cb et shutdown_cb sont optionnelles. Si truncate_cb n'est pas configuré mais que TRUNCATE doit être décodé, l'action sera ignorée.

Un plugin de sortie peut aussi définir les fonctions pour accepter le flux de grosses transactions en cours. Les callbacks stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb et stream_change_cb sont obligatoires, alors que stream_message_cb et stream_truncate_cb sont optionnelles. La callback stream_prepare_cb est aussi requise si le plugin de sortie accepte aussi les validations en deux phases.

Un plugin de sortie peut aussi définir les fonctions pour accepter les validations en deux phases, ce qui permet de décoder un PREPARE TRANSACTION. Les callbacks begin_prepare_cb, prepare_cb, commit_prepared_cb et rollback_prepared_cb sont obligatoires, alors que filter_prepare_cb est facultatif. La callback stream_prepare_cb est aussi requise si le plugin de sortie accepte le flux de grosses transactions en cours.

47.6.2. Capacités #

Pour décoder, formater et afficher les changements, les plugins de sortie peuvent utiliser une grande partie de l'infrastructure habituelle des processus clients, y compris l'appel aux fonctions de sortie. Les accès en lecture seule aux relations est permis du moment que les relations accédées ont été créées par initdb dans le schéma pg_catalog, ou ont été marquées comme tables du catalogue pour l'utilisateur en utilisant :

ALTER TABLE table_catalogue_utilisateur SET (user_catalog_table = true);
CREATE TABLE autre_table_catalogue(data text) WITH (user_catalog_table = true);

Notez que l'accès aux tables utilisateurs ou aux tables systèmes dans le plugin de sortie doit se faire uniquement via les API de parcours systable_*. L'accès via les API de parcours heap_* renverra une erreur. De plus, toute action amenant à l'affectation d'un identifiant de transaction est prohibée. Ceci, parmi d'autres choses, inclut l'écriture dans les tables, la réalisation de modifications DDL, et l'appel à pg_current_xact_id().

47.6.3. Modes de sortie #

Les fonctions callbacks des plugins en sortie peuvent renvoyer des données au consommateur dans des formats pratiquement arbitraires. Pour certains cas d'utilisation, comme la visualisation des changements en SQL, le renvoi des données dans un type de données qui peut contenir des données arbitraires (par exemple du bytea) est complexe. Si le plugin en sortie renvoit seulement les données au format texte dans l'encodage du serveur, il peut déclarer cela en configurant OutputPluginOptions.output_type à OUTPUT_PLUGIN_TEXTUAL_OUTPUT au lieu de OUTPUT_PLUGIN_BINARY_OUTPUT dans la fonction callback de démarrage. Dans ce cas, toutes les données doivent être dans l'encodage du serveur pour qu'un champ de type text puisse les contenir. Ceci est vérifié dans les constructions comprenant les assertions.

47.6.4. Callbacks de plugin de sortie #

Un plugin de sortie est notifié des changements arrivant au travers de différents callbacks qu'il doit fournir.

Les transactions concurrentes sont décodées dans l'ordre dans lequel elles sont validées, et seuls les changements appartenant à une transaction spécifique sont décodés entre les callbacks begin et commit. Les transactions qui ont été explicitement ou implicitement annulées ne sont jamais décodées. Les savepoints validés sont inclus dans la transaction les contenant, dans l'ordre dans lequel ils ont été effectués dans la transaction. Une transaction qui est préparée pour une validation en deux phases avec PREPARE TRANSACTION sera aussi décodée si les callbacks du plugin de sortie nécessaires sont fournies pour le décodage. Il est possible que la transaction préparée courante en cours de décodage soit annulée en parallèle via une commande ROLLBACK PREPARED. Dans ce cas, le décodage logique de cette transaction sera lui-aussi annulé. Tous les changements d'une telle transaction seront ignorés une fois que l'annulation est détectée et que la fonction callback prepare_cb est appelée. De ce fait, même dans le cas d'une annulation en parallèle, suffisamment d'informations sont fournies au plugin de sortie pour gérer correctement un ROLLBACK PREPARED une fois qu'il est décodé.

Note

Seules les transactions qui ont été synchronisées sur disque de manière sûre seront décodées. Cela peut amener à ce qu'un COMMIT ne soit pas immédiatement décodé lors d'un appel à pg_logical_slot_get_changes() juste après celui-ci quand synchronous_commit est positionné à off.

47.6.4.1. Callback de démarrage #

Le callback facultatif startup_cb est appelé chaque fois qu'un slot de réplication est créé ou qu'on lui demande de fournir les flux de changement, indépendamment du nombre de changements qui sont prêt à être fournis.

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

Le paramètre is_init sera positioné à true quand le slot de réplication est créé, et à false sinon. options pointe vers une structure d'options que le plugin de sortie peut positionner :

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type doit être positionné soit à OUTPUT_PLUGIN_TEXTUAL_OUTPUT ou à OUTPUT_PLUGIN_BINARY_OUTPUT. Voir aussi Section 47.6.3. Si receive_rewrites vaut true, le plugin de sortie sera aussi appelé pour les modifications réalisées par des réécritures du fichier HEAP lors de certaines opérations DDL. Ceci est intéressant pour les plugins qui gèrent la réplication DDL mais ils nécessitent une gestion particulière.

Le callback de démarrage devrait valider les options présentes dans ctx->output_plugin_options. Si le plugin de sortie a besoin d'avoir un état, il peut utiliser ctx->output_plugin_private pour le stocker.

47.6.4.2. Callback d'arrêt #

Le callback facultatif shutdown_cb est appelé chaque fois qu'un slot de réplication anciennement actif n'est plus utilisé et peut être utilisé pour désallouer les ressources privées du plugin de sortie. Le slot n'est pas nécessairement supprimé, le flux est juste arrêté.

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

47.6.4.3. Callback de début de transaction #

Le callback obligatoire begin_cb est appelé chaque fois que le début d'une transaction validée a été décodé. Les transactions annulées et leur contenu ne sont pas décodés.

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);

Le paramètre txn contient des métadonnées sur la transaction, comme l'heure à laquelle elle a été validée et son XID.

47.6.4.4. Callback de fin de transaction #

Le callback obligatoire commit_cb est appelé chaque fois qu'une transaction validée a été décodée. Le callback change_cb aura été appelé avant cela pour chacune des lignes modifiées, s'il y en a eu.

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

47.6.4.5. Callback de modification #

Le callback obligatoire change_cb est appelé pour chacune des modifications de ligne au sein d'une transaction, qu'il s'agisse d'un INSERT, UPDATE ou DELETE. Même si la commande d'origine a modifié plusieurs ligne en une seule instruction, le callback sera appelé pour chaque ligne individuellement. Le callback change_cb pourrait accéder aux tables systèmes et utilisateurs pour aider au traitement en sortie des détails de modification de la ligne. Dans le cas du décodage d'une transaction préparée (mais pas encore validée) ou du décodage d'une transaction non validée, ce callback de modification pourrait aussi renvoyer une erreur à cause de l'annulation simultanée de cette même transaction. Dans ce cas, le décodage logique de cette transaction annulée est arrêté proprement.

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

Les paramètres ctx et txn ont le même contenu que pour les callbacks begin_cb et commit_cb, mais en plus le descripteur de relation relation pointe vers la relation à laquelle appartient la ligne et une structure change décrivant les modifications de la ligne y est passée.

Note

Seuls les changements dans les tables définies par les utilisateurs qui sont journalisées (voir UNLOGGED) et non temporaires (voir TEMPORARY ou TEMP) peuvent être extraits avec le décodage logique.

47.6.4.6. Callback Truncate #

La fonction callback truncate_cb optionnelle est appelée pour la commande TRUNCATE.

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

Les paramètres sont identiques à ceux du callback change_cb. Néanmoins, comme les actions du TRUNCATE sur les tables liées par clés étrangères doivent être exécutées ensemble, ce callback reçoit un tableau de relations au lieu d'une seule relation. Voir la description de l'instruction TRUNCATE pour les détails.

47.6.4.7. Fonction de filtre sur l'origine #

La fonction optionnelle filter_by_origin_cb est appelée pour déterminer si les données rejouées à partir de origin_id ont un intérêt pour le plugin de sortie.

typedef bool (*LogicalDecodeFilterByOriginCB) (
    struct LogicalDecodingContext *ctx,
    RepNodeId origin_id
);

Le paramètre ctx a le même contenu que pour les autres fonctions. Aucune information n'est disponible à part l'origine. Pour signaler que les changements provenant du nœud sont hors de propos, elle renvoie true, ce qui permet de les filtrer. Elle renvoie false dans les autres cas. Les autres fonctions ne seront pas appelées pour les transactions et changements qui ont été filtrés.

Ceci est utile pour implémenter des solutions de réplication en cascade ou des solutions de réplication multi-directionnelles. Filtrer par rapport à l'origine perment d'empêcher la réplication dans les deux sens des mêmes modifications dans ce type de configuration. Quand les transactions et les modifications contiennent aussi des informations sur l'origine, le filtre via cette fonction est beaucoup plus efficace.

47.6.4.8. Fonctions personnalisées de message générique #

La fonction (callback) message_cb est appelée quand un message de décodage logique a été décodé.

typedef void (*LogicalDecodeMessageCB) (
    struct LogicalDecodingContext *,
    ReorderBufferTXN *txn,
    XLogRecPtr message_lsn,
    bool transactional,
    const char *prefix,
    Size message_size,
    const char *message
);

Le paramètre txn contient des méta-informations sur la transaction, comme l'horodatage de la validation de la transaction et son identifiant (XID). Notez néanmoins qu'il peut être NULL quand le message n'est pas transactionnel et que le XID n'a pas encore été affecté dans la transaction qui a tracé le message. Le lsn a la position du message dans les WAL. Le paramètre transactional indique si le message a été envoyé de façon transactionnelle ou non. De façon similaire au callback de changement, dans le cas du décodage d'une transaction préparée (mais pas encore validée) ou du décodage d'une transaction non validée, ce callback de message pourrait renvoyer une erreur en cas d'annulation simultanée de cette même transaction. Dans ce cas, le décodage logique de cette transaction annulée est stoppé proprement. Le paramètre prefix est un préfixe arbitraire terminé par un caractère nul qui peut être utilisé pour identifier les messages intéressants pour le plugin courant. Et enfin, le paramètre message contient le message réel de taille message_size.

Une attention particulière doit être portée à l'unicité du préfixe que le plugin de sortie trouve intéressant. Utiliser le nom de l'extension ou du plugin de sortie est souvent un bon choix.

47.6.4.9. Fonction de filtre du PREPARE #

La fonction callback facultative filter_prepare_cb est appelée pour déterminer si les données qui font partie de la transaction de validation en deux phases en cours doivent être considérées pour le décoage à cette étape préparatoire ou plus tard comme une transaction standard en une phase au moment du COMMIT PREPARED. Pour signaler que le décodage doit être ignoré, renvoyez true ; false dans le cas contraire. Quand la fonction callback est définie, false est supposé (autrement dit, il n'y a pas de filtrage et toutes les transactions utilisant la validation en deux phases sont décodées aussi en deux phases).

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

Le paramètre ctx a le même contenu que pour les autres fonctions callbacks. Les paramètres xid et gid fournissent deux façons différentes d'identifier la transaction. La commande COMMIT PREPARED ou ROLLBACK PREPARED ultérieure intègre les identifiants, autorisant au plugin de sortie le choix de celui à utiliser.

La fonction callback pourrait être appelée plusieurs fois par transaction pour décoder, et doit fournir la même réponse statique pour une paire donnée de xid et de gid à chaque fois qu'elle est appelée.

47.6.4.10. Fonction de préparation de début de la transaction #

La fonction callback requise begin_prepare_cb est appelée à chaque fois que le début d'une transaction préparée a été décodée. Le champ gid, qui fait partie du paramètre txn, peut être utilisé dans cette fonction callback pour vérifier si le plugin a déjà reçu cette commande PREPARE, auquel cas elle peut renvoyer une erreur ou ignorer les changements restants de la transaction.

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

47.6.4.11. Fonction de transaction préparée #

La fonction callback requise prepare_cb est appelée à chaque fois qu'une transaction préparée pour une validation en deux pahses a été décodée. La fonction callback change_cb sera appelée avant celle-ci pour toutes les lignes modifiées, à condition que des lignes aient été modifiées. Le champ gid, qui fait partie du paramètre txn, peut être utilisé dans cette fonction callback.

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

47.6.4.12. Fonction pour le COMMIT PREPARED d'une transaction #

La fonction callback requise commit_prepared_cb est appelée quand une commande COMMIT PREPARED d'une transaction a été décodée. Le champ gid, qui fait partie du paramètre txn, peut être utilisé dans cette fonction callback.

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

47.6.4.13. Fonction d'annulation de transaction préparée #

La fonction callback requise rollback_prepared_cb est appelée à chaque fois qu'une transaction ROLLBACK PREPARED a été décodée. Le champ gid, qui fait partie du paramètre txn, peut être utilisé dans cette fonction callback. Les paramètres prepare_end_lsn et prepare_time peuvent être utilisés pour vérifier si le plugin a reça la commande PREPARE TRANSACTION auquel cas il peut appliquer l'annulation. Sinon, il peut ignorer l'opération d'annulation. Le paramètre gid seul n'est pas suffisant parce que le nœud en amont peut avoir une transaction préparée avec le même identifiant.

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

47.6.4.14. Fonction de début de flux #

La fonction callback stream_start_cb obligatoire est appelée à l'ouverture d'un bloc de changements en flux à partir d'une transaction en cours.

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

47.6.4.15. Fonction d'arrêt de flux #

La fonction callback stream_stop_cb obligatoire est appelée lors de la fermeture d'un bloc de changements en flux provenant d'une transaction en cours.

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

47.6.4.16. Fonction d'annulation du flux #

La fonction callback stream_abort_cb obligatoire est appelée lors de l'annulation d'une transaction en flux précédente.

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

47.6.4.17. Fonction de préparation du flux #

La fonction callback stream_prepare_cb est appelée pour préparer une transaction en flux précédente pour faire partie d'une validation en deux phases. Ce callback est obligatoire quand le plugin de sortie accepte à la fois le flux de grosses transactions en cours et les validations en deux phases.

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

47.6.4.18. Fonction de validation d'un flux #

La fonction callback stream_commit_cb obligatoire est appelée pour valider une transaction en flux précédente.

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

47.6.4.19. Fonction de changement du flux #

La fonction callback stream_change_cb obligatoire est appelée pour un changement dans un bloc de changements de flux (démarqué par des appels à stream_start_cb et stream_stop_cb). Les modifications réelles ne sont pas affichées car la transaction peut annuler plus tard et que nous ne décodons par les changements pour les transactions annulées.

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

47.6.4.20. Fonction de message du flux #

La fonction callback stream_message_cb optionnalle est appelée lors de l'envoi d'un message générique dans un bloc de changements en flux (démarqué par des appels à stream_start_cb et stream_stop_cb). Le contenu du message pour des messages transactionnels n'est pas affiché car la transaction peut s'annuler plus tard et que nous ne décodons pas les changements pour les transactions annulées.

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

47.6.4.21. Fonction troncage du flux #

La fonction callback stream_truncate_cb optionnelle est appelée pour une commande TRUNCATE dans un bloc de changements en flux (démarqué par des appels à stream_start_cb et stream_stop_cb).

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

Les paramètres sont analogues à ceux de la fonction callback stream_change_cb. Néanmoins, comme les actions de la commande TRUNCATE sur des tables connectées par des clés étrangères doivent s'exécuter ensemble, cette fonction callback reçoit un tableau de relations au lieu d'une seule relation. Voir la description de l'instruction TRUNCATE pour les détails.

47.6.5. Fonction pour produire une sortie #

Pour pouvoir produire une sortie, les plugins de sortie peuvent écrire des données dans le tampon de sortie StringInfo dans ctx->out dans les callbacks begin_cb, commit_cb ou change_cb. Avant d'écrire dans le tampon de sortie, OutputPluginWrite(ctx, last_write) doit avoir été appelé pour effectuer l'écriture. last_write indique si une écriture particulière était la dernière écriture du callback.

L'exemple suivant montre comment sortir des données pour le consommateur d'un plugin de sortie :

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);