PostgreSQLのソースコードのサブディレクトリ
contrib/test_decoding
にサンプル出力プラグインがあります。
出力プラグインは、出力プラグインの名前をライブラリのベース名として持つ共有ライブラリを動的にロードすることによってロードされます。
必要な出力プラグインコールバックを提供し、そのライブラリが実際に出力プラグインであることを示すために、_PG_output_plugin_init
という名前の関数を作成しなければなりません。
この関数には、各々のアクションに対応するコールバック関数へのポインタを持つ構造体が渡されます。
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
コールバック関数のbegin_cb
、change_cb
、および、commit_cb
は必須ですが、startup_cb
、truncate_cb
、message_cb
、filter_by_origin_cb
、および、shutdown_cb
は必須ではありません。
truncate_cb
が設定されていないけれども、TRUNCATE
がデコードされることになった場合、この動作は無視されます。
出力プラグインは、大きな継続中(in-progress)トランザクションのストリーミングをサポートする関数を定義することもできます。
stream_start_cb
、stream_stop_cb
、stream_abort_cb
、stream_commit_cb
、stream_change_cb
は必須ですが、stream_message_cb
とstream_truncate_cb
は必須ではありません。
出力プラグインが2相コミットもサポートする場合は、stream_prepare_cb
も必須です。
出力プラグインは、PREPARE TRANSACTION
でアクションをデコードできるようにする2相コミットをサポートする関数を定義することもできます。
begin_prepare_cb
、prepare_cb
、commit_prepared_cb
、rollback_prepared_cb
コールバックは必須ですが、filter_prepare_cb
は必須ではありません。
出力プラグインが大きな進行中のトランザクションのストリーミングもサポートしている場合は、stream_prepare_cb
も必須です。
更新データをデコード、整形、出力するために、出力関数を呼び出すことを含め、出力プラグインはバックエンドの通常のインフラストラクチャのほとんどを利用できます。
テーブルは、initdb
で作られ、pg_catalog
スキーマに含まれているか、以下のコマンドでユーザ定義のカタログテーブルであると印が付けられている限り、読み込み専用のアクセスが許可されます。
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
出力プラグイン内のユーザカタログテーブルまたは通常のシステムカタログテーブルへのアクセスは、systable_*
スキャンAPIを介してのみ行う必要があることに注意してください。
heap_*
スキャンAPIを介したアクセスはエラーになります。
さらに、トランザクションIDの割り当てにつながるアクションは禁止されています。
これには、テーブルへの書き込み、DDL変更の実行、pg_current_xact_id()
の呼び出しなどが含まれます。
出力プラグインコールバックは、かなり自由な形式で消費者にデータを渡すことができます。
SQLで変更データを見るような場合、任意のかたちでデータを返すことのできるデータ型(たとえばbytea
)は扱いにくいです。
出力プラグインがサーバエンコーディングのテキストデータのみを含むことにするには、起動コールバックで、OutputPluginOptions.output_type
にOUTPUT_PLUGIN_BINARY_OUTPUT
ではなく、OUTPUT_PLUGIN_TEXTUAL_OUTPUT
を設定することによって宣言できます。
この場合、text
datumが格納することができるように、すべてのデータはサーバエンコーディングでエンコードされていなければなりません。
出力プラグインには、必要に応じて発生した更新に関する通知が様々なコールバックを通じて送られます。
同時に実行されたトランザクションは、コミットした順番にデコードされます。
指定したトランザクションに含まれる更新だけがbegin
とcommit
の間のコールバックによってデコードされます。
明示的あるいは暗黙的にロールバックされたトランザクションは、決してデコードされません。
成功したセーブポイントは、実行された順番にセーブポイントが実行されたトランザクションの中に折り込まれます。
PREPARE TRANSACTION
を使用して2相コミット用に準備されたトランザクションも、デコードに必要な出力プラグインコールバックが提供されていればデコードされます。
ROLLBACK PREPARED
コマンドを使用して、現在準備されているトランザクションが同時にアボートされる可能性があります。
その場合、このトランザクションのロジカルデコーディングもアボートされます。
そのようなトランザクションのすべての変更は、アボートが検出され、prepare_cb
コールバックが呼び出されるとスキップされます。
このように、同時にアボートされた場合でも、デコードされたROLLBACK PREPARED
を適切に処理するために十分な情報が出力プラグインに提供されます。
ディスクに安全にフラッシュされたトランザクションだけがデコードされます。
そのため、synchronous_commit
がoff
の場合には、直後に呼び出されたpg_logical_slot_get_changes()
がそのCOMMIT
をデコードしないことがあります。
ストリームに投入可能な更新の数に関係なく、レプリケーションスロットが作られるか、ストリームの変更がリクエストされた場合にオプションのstartup_cb
コールバック呼び出されます。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
is_init
パラメータは、レプリケーションスロットが作られる際にはtrue、それ以外ではfalseになります。
options
は、出力プラグインが書き込む以下の構造体を指します。
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
はOUTPUT_PLUGIN_TEXTUAL_OUTPUT
かOUTPUT_PLUGIN_BINARY_OUTPUT
のどちらかです。
49.6.3も参照してください。
receive_rewrites
が真なら、何らかDDL操作時のヒープ書き換えで生じた変更に対して、出力プラグインも呼ばれます。
これはDDLレプリケーションを処理するプラグインを対象としていますが、これらは特別な処理を必要とします。
開始コールバックでは、ctx->output_plugin_options
で指定されるオプションを検証しましょう。
出力プラグインが状態を持つ必要がある場合には、ctx->output_plugin_private
を利用できます。
以前アクティブだったレプリケーションスロットが使われなくなったら、いつでもshutdown_cb
コールバックが呼び出され、出力プラグインのプライベートリソースが解放されます。
スロットは削除される必要はありません。単にストリームが停止します。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
必須であるbegin_cb
コールバックは、コミットしたトランザクションの開始がデコードされる際に必ず呼び出されます。
アボートしたトランザクションとその内容は決してデコードされません。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
txn
引数は、コミット時のタイムスタンプやトランザクションIDなどのトランザクションに関するメタ情報を含みます。
必須であるcommit_cb
コールバックは、トランザクションのコミットがデコードされる際に必ず呼び出されます。
行が更新された場合は、それぞれの行に対してchange_cb
コールバックが、commit_cb
の前に呼び出されます。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
トランザクション内のINSERT
、UPDATE
、DELETE
の更新に対して、必須コールバックであるchange_cb
が呼び出されます。
元の更新コマンドが複数の行を一度に更新する場合は、それぞれの行に対してこのコールバックが呼び出されます。
change_cb
コールバックは、システムまたはユーザカタログテーブルにアクセスして、行変更の詳細を出力する処理を支援することができます。
準備された(まだコミットされていない)トランザクションをデコードする場合、またはコミットされていないトランザクションをデコードする場合、この変更コールバックは、まったく同じトランザクションが同時にロールバックされるためにエラーになることもあります。
この場合、このアボートされたトランザクションのロジカルデコーディングは正常に停止されます。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
ctx
とtxn
は、begin_cb
、commit_cb
コールバックでは同じ内容になります。
これに加えてrelation
は行が属するリレーションを指定し、行の変更を記述するchange
パラメータが渡されます。
ユーザ定義テーブルでは、ログを取らないテーブル(UNLOGGED
参照)ではなく、一時テーブル(TEMPORARY
またはTEMP
参照)でもないテーブルが、ロジカルデコーディングを使って更新データを取得できます。
オプションのtruncate_cb
コールバックは、TRUNCATE
コマンドに対して呼ばれます。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
パラメータはchange_cb
コールバックと似ています。
しかしながら、外部キーで結びついたテーブル群のTRUNCATE
動作は一緒に実行される必要があるため、このコールバックは単一リレーションではなく、リレーションの配列を受け取ります。
詳しくはTRUNCATE文の説明を参照してください。
オプションのfilter_by_origin_cb
コールバックは、origin_id
からリプレイされたデータが出力プラグインの対象となるかどうかを判定するために呼び出されます。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
ctx
パラメータは、他のコールバックと同じ内容を持ちます。
オリジンの情報だけが得られます。
渡されたノードで発生した変更が無関係であることを伝えるには、trueを返します。
これにより、その変更は無視されることになります。
無視されたトランザクション変更に関わる他のコールバックは呼び出されません。
これは、カスケード、あるいは双方向レプリケーションソリューションを実装する際に有用です。 オリジンでフィルタすることにより、そのような構成で、同じ変更のレプリケーションが往復するのを防ぐことができます。 トランザクションや変更もオリジンに関する情報を持っていますが、このコールバックでフィルタするほうがずっと効率的です。
オプションのmessage_cb
コールバックは、ロジカルデコーディングメッセージがデコードされる度に呼び出されます。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
txn
パラメータは、コミット時のタイムスタンプとXIDのような、トランザクションに関するメタ情報を含んでいます。
ただし、そのメッセージがトランザクション扱いではなく、メッセージをログしたトランザクションにXIDが割り当てられてない場合はNULLになることに注意してください。
lsn
は、メッセージに対応するWALの位置です。
transactional
は、メッセージがトランザクションとして送られたものかどうかを表しています。
変更コールバックと同様に、準備された(まだコミットされていない)トランザクションをデコードする場合、またはコミットされていないトランザクションをデコードする場合、このメッセージコールバックも、まったく同じトランザクションの同時ロールバックのためにエラーになることがあります。
この場合、アボートされたトランザクションのロジカルデコーディングは正常に停止されます。
prefix
はnull終端された任意の接頭辞で、現在のプラグインが興味のあるメッセージを特定するために利用できます。
最後に、message
パラメータは、大きさがmessage_size
の、実際のメッセージを保持します。
出力プラグインが利用を考慮している接頭辞が一意になるように、特に注意を払ってください。 拡張の名前か、出力プラグインの名前を使うのが良い場合が多いです。
オプションのfilter_prepare_cb
コールバックは、現在の2相コミットトランザクションの一部であるデータを、この準備段階でデコードするか、またはCOMMIT PREPARED
時に通常の1相トランザクションとしてデコードするかを決定するために呼び出されます。
デコードをスキップするように合図するには、true
を返します。
そうでなければfalse
を返します。
コールバックが定義されていない場合、false
が想定されます(すなわち、フィルタリングなしで、2相コミットを使用するすべてのトランザクションも同様に2相でデコードされます)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
ctx
パラメータは他のコールバックと同じ内容です。
パラメータxid
とgid
は、トランザクションを識別するための2つの異なる方法を提供します。
後のCOMMIT PREPARED
またはROLLBACK PREPARED
は両方の識別子を持ち、出力プラグインに何を使用するかの選択を提供します。
このコールバックは、デコードするトランザクションごとに複数回呼び出すことができ、呼び出されるたびにxid
とgid
の与えられたペアに対して同じ静的な答えを提供しなければなりません。
必須であるbegin_prepare_cb
コールバックは、準備されたトランザクションの開始がデコードされるたびに呼び出されます。
txn
パラメータの一部であるgid
フィールドをこのコールバックで使用して、プラグインがこのPREPARE
を既に受信しているかどうかをチェックできます。
この場合、エラーになるか、トランザクションの残りの変更をスキップできます。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
必須であるprepare_cb
コールバックは、2相コミット用に準備されたトランザクションがデコードされるたびに呼び出されます。
修正された行がある場合、すべての修正された行に対するchange_cb
コールバックはこの前に呼び出されています。
txn
パラメータの一部であるgid
フィールドは、このコールバックで使用できます。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
必須であるcommit_prepared_cb
コールバックは、トランザクションCOMMIT PREPARED
がデコードされるたびに呼び出されます。
txn
パラメータの一部であるgid
フィールドは、このコールバックで使用できます。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
必須であるrollback_prepared_cb
コールバックは、トランザクションROLLBACK PREPARED
がデコードされるたびに呼び出されます。
txn
パラメータの一部であるgid
フィールドは、このコールバックで使用できます。
パラメータprepare_end_lsn
とprepare_time
は、プラグインがこのPREPARE TRANSACTION
を受信したかどうかをチェックするために使用できます。
この場合、プラグインはロールバックを適用できます。
そうでない場合は、ロールバック操作をスキップできます。
gid
だけでは十分ではありません。
なぜなら、下流ノードは同じ識別子を持つ準備されたトランザクションを持つことができるからです。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
必須であるstream_start_cb
コールバックは、進行中のトランザクションからストリーム化された変更ブロックを開くときに呼び出されます。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
必須であるstream_stop_cb
コールバックは、進行中のトランザクションからのストリーミング変更ブロックを閉じるときに呼び出されます。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
必須であるstream_abort_cb
コールバックは、以前にストリームされたトランザクションを中止するために呼び出されます。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
stream_prepare_cb
コールバックは、2相コミットの一部としてストリーミングされているトランザクションを準備するために呼び出されます。
このコールバックは、出力プラグインが大きな進行中のトランザクションと2相コミットの両方をストリーミングする場合に必要です。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
必須であるstream_commit_cb
コールバックは、以前にストリーミングされたトランザクションをコミットするために呼び出されます。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
必須であるstream_change_cb
コールバックは、ストリーム化された変更のブロック(stream_start_cb
とstream_stop_cb
呼び出しで区切られます)で変更を送信するときに呼び出されます。
実際の変更は表示されません。
なぜなら、トランザクションは後の時点でアボートする可能性があり、アボートされたトランザクションの変更はデコードされないからです。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
オプションのstream_message_cb
コールバックは、ストリーム化された変更のブロック(stream_start_cb
とstream_stop_cb
コールで区切られた)で汎用メッセージを送信するときに呼び出されます。
トランザクションメッセージのメッセージ内容は表示されません。
なぜなら、トランザクションは後の時点でアボートする可能性があり、アボートされたトランザクションの変更はデコードされないからです。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
オプションのstream_truncate_cb
コールバックは、ストリーム化された変更のブロック(stream_start_cb
とstream_stop_cb
呼び出しで区切られます)内のTRUNCATE
コマンドに対して呼び出されます。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
パラメータはstream_change_cb
コールバックに類似しています。
ただし、外部キーで接続されたテーブルに対するTRUNCATE
アクションは一緒に実行する必要があるため、このコールバックは単一のリレーションではなくリレーションの配列を受け取ります。
詳細はTRUNCATE文の説明を参照してください。
begin_cb
、commit_cb
、change_cb
コールバックにおいて、出力プラグインは実際にデータ出力するためにctx->out
のStringInfo
出力バッファに書き込みます。
出力バッファに書き込む前に、OutputPluginPrepareWrite(ctx, last_write)
を呼び出します。
また、書き込みバッファにデータを書き終えたら、OutputPluginWrite(ctx, last_write)
を呼び出してデータの書き込みを実施します。
last_write
引数により、その書き込みがコールバックの最終的な書き込みであるかどうかを指定します。
以下の例では、出力プラグインにおいて消費者に向けてデータを出力する方法を示します。
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);