PostgreSQLのソースコードのサブディレクトリ
contrib/test_decoding
にサンプル出力プラグインがあります。
出力プラグインは、出力プラグインの名前をライブラリのベース名として持つ共有ライブラリを動的にロードすることによってロードされます。
必要な出力プラグインコールバックを提供し、そのライブラリが実際に出力プラグインであることを示すために、_PG_output_plugin_initという名前の関数を作成しなければなりません。
この関数には、各々のアクションに対応するコールバック関数へのポインタを持つ構造体が渡されます。
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
コールバック関数のbegin_cb、change_cb、および、commit_cbは必須ですが、startup_cb、filter_by_origin_cb、truncate_cb、および、shutdown_cbは必須ではありません。
truncate_cbが設定されていないけれども、TRUNCATEがデコードされることになった場合、この動作は無視されます。
出力プラグインは、大きな継続中(in-progress)トランザクションのストリーミングをサポートする関数を定義することもできます。
stream_start_cb、stream_stop_cb、stream_abort_cb、stream_commit_cb、stream_change_cb、stream_prepare_cbは必須ですが、stream_message_cbとstream_truncate_cbは必須ではありません。
出力プラグインは、PREPARE TRANSACTIONでアクションをデコードできるようにする2相コミットをサポートする関数を定義することもできます。
begin_prepare_cb、prepare_cb、stream_prepare_cb、commit_prepared_cb、rollback_prepared_cbコールバックは必須ですが、filter_prepare_cbは必須ではありません。
更新データをデコード、整形、出力するために、出力関数を呼び出すことを含め、出力プラグインはバックエンドの通常のインフラストラクチャのほとんどを利用できます。
テーブルは、initdbで作られ、pg_catalogスキーマに含まれているか、以下のコマンドでユーザ定義のカタログテーブルであると印が付けられている限り、読み込み専用のアクセスが許可されます。
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
出力プラグイン内のユーザカタログテーブルまたは通常のシステムカタログテーブルへのアクセスは、systable_*スキャンAPIを介してのみ行う必要があることに注意してください。
heap_*スキャンAPIを介したアクセスはエラーになります。
さらに、トランザクションIDの割り当てにつながるアクションは禁止されています。
これには、テーブルへの書き込み、DDL変更の実行、pg_current_xact_id()の呼び出しなどが含まれます。
出力プラグインコールバックは、かなり自由な形式で消費者にデータを渡すことができます。
SQLで変更データを見るような場合、任意のかたちでデータを返すことのできるデータ型(たとえばbytea)は扱いにくいです。
出力プラグインがサーバエンコーディングのテキストデータのみを含むことにするには、起動コールバックで、OutputPluginOptions.output_typeにOUTPUT_PLUGIN_BINARY_OUTPUTではなく、OUTPUT_PLUGIN_TEXTUAL_OUTPUTを設定することによって宣言できます。
この場合、textdatumが格納することができるように、すべてのデータはサーバエンコーディングでエンコードされていなければなりません。
出力プラグインには、必要に応じて発生した更新に関する通知が様々なコールバックを通じて送られます。
同時に実行されたトランザクションは、コミットした順番にデコードされます。
指定したトランザクションに含まれる更新だけがbeginとcommitの間のコールバックによってデコードされます。
明示的あるいは暗黙的にロールバックされたトランザクションは、決してデコードされません。
成功したセーブポイントは、実行された順番にセーブポイントが実行されたトランザクションの中に折り込まれます。
PREPARE TRANSACTIONを使用して2相コミット用に準備されたトランザクションも、デコードに必要な出力プラグインコールバックが提供されていればデコードされます。
ROLLBACK PREPAREDコマンドを使用して、現在準備されているトランザクションが同時にアボートされる可能性があります。
その場合、このトランザクションのロジカルデコーディングもアボートされます。
そのようなトランザクションのすべての変更は、アボートが検出され、prepare_cbコールバックが呼び出されるとスキップされます。
このように、同時にアボートされた場合でも、デコードされたROLLBACK PREPAREDを適切に処理するために十分な情報が出力プラグインに提供されます。
ディスクに安全に書きだされたトランザクションだけがデコードされます。
そのため、synchronous_commitがoffの場合には、直後に呼び出されたpg_logical_slot_get_changes()がそのCOMMITをデコードしないことがあります。
ストリームに投入可能な更新の数に関係なく、レプリケーションスロットが作られるか、ストリームの変更がリクエストされた場合にオプションのstartup_cbコールバック呼び出されます。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
is_init パラメータは、レプリケーションスロットが作られる際にはtrue、それ以外ではfalseになります。
optionsは、出力プラグインが書き込む以下の構造体を指します。
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
output_typeはOUTPUT_PLUGIN_TEXTUAL_OUTPUTかOUTPUT_PLUGIN_BINARY_OUTPUTのどちらかです。
49.6.3も参照してください。
receive_rewritesが真なら、何らかDDL操作時のヒープ書き換えで生じた変更に対して、出力プラグインも呼ばれます。
これはDDLレプリケーションを処理するプラグインを対象としていますが、これらは特別な処理を必要とします。
開始コールバックでは、ctx->output_plugin_optionsで指定されるオプションを検証しましょう。
出力プラグインが状態を持つ必要がある場合には、ctx->output_plugin_privateを利用できます。
以前アクティブだったレプリケーションスロットが使われなくなったら、いつでもshutdown_cbコールバックが呼び出され、出力プラグインのプライベートリソースが解放されます。
スロットは削除される必要はありません。単にストリームが停止します。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
必須であるbegin_cbコールバックは、コミットしたトランザクションの開始がデコードされる際に必ず呼び出されます。
アボートしたトランザクションとその内容は決してデコードされません。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
txn引数は、コミット時のタイムスタンプやトランザクションIDなどのトランザクションに関するメタ情報を含みます。
必須であるcommit_cbコールバックは、トランザクションのコミットがデコードされる際に必ず呼び出されます。
行が更新された場合は、それぞれの行に対してchange_cbコールバックが、commit_cbの前に呼び出されます。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
トランザクション内のINSERT、UPDATE、DELETEの更新に対して、必須コールバックであるchange_cbが呼び出されます。
元の更新コマンドが複数の行を一度に更新する場合は、それぞれの行に対してこのコールバックが呼び出されます。
change_cbコールバックは、システムまたはユーザカタログテーブルにアクセスして、行変更の詳細を出力する処理を支援することができます。
準備された(まだコミットされていない)トランザクションをデコードする場合、またはコミットされていないトランザクションをデコードする場合、この変更コールバックは、まったく同じトランザクションが同時にロールバックされるためにエラーになることもあります。
この場合、このアボートされたトランザクションのロジカルデコーディングは正常に停止されます。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
ctxとtxnは、begin_cb、commit_cbコールバックでは同じ内容になります。
これに加えてrelationは行が属するリレーションを指定し、行の変更を記述するchangeパラメータが渡されます。
unloggedテーブル(UNLOGGED参照)と(TEMPORARYまたはTEMP参照)以外のユーザ定義テーブルだけが、ロジカルデコーディングを使って更新データを取得できます。
truncate_cbコールバックは、TRUNCATEコマンドに対して呼ばれます。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
パラメータはchange_cbコールバックと似ています。
しかしながら、外部キーで結びついたテーブル群のTRUNCATE動作は一緒に実行される必要があるため、このコールバックは単一リレーションではなく、リレーションの配列を受け取ります。
詳しくはTRUNCATE文の説明を参照してください。
オプションのfilter_by_origin_cbコールバックは、origin_idからリプレイされたデータがアウトプットプラグインの対象となるかどうかを判定するために呼び出されます。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
ctxパラメータは、他のコールバックと同じ内容を持ちます。
オリジンの情報だけが得られます。
渡されたノードで発生した変更が無関係であることを伝えるには、trueを返します。
これにより、その変更は無視されることになります。
無視されたトランザクション変更に関わる他のコールバックは呼び出されません。
これは、カスケード、あるいは双方向レプリケーションソリューションを実装する際に有用です。 オリジンでフィルターすることにより、そのような構成で、同じ変更のレプリケーションが往復するのを防ぐことができます。 トランザクションや変更もオリジンに関する情報を持っていますが、このコールバックでフィルターするほうがずっと効率的です。
オプションのmessage_cbコールバックは、ロジカルデコーディングメッセージがデコードされる度に呼び出されます。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
txnパラメータは、コミット時のタイムスタンプとXIDのような、トランザクションに関するメタ情報を含んでいます。
ただし、そのメッセージがトランザクション扱いではなく、メッセージをログしたトランザクションにXIDが割り当てられてない場合はNULLになることに注意してください。
lsnは、メッセージに対応するWALの位置です。
transactionalは、メッセージがトランザクションとして送られたものかどうかを表しています。
変更コールバックと同様に、準備された(まだコミットされていない)トランザクションをデコードする場合、またはコミットされていないトランザクションをデコードする場合、このメッセージコールバックも、まったく同じトランザクションの同時ロールバックのためにエラーになることがあります。
この場合、アボートされたトランザクションのロジカルデコーディングは正常に停止されます。
prefixはnull終端された任意の接頭辞で、現在のプラグインが興味のあるメッセージを特定するために利用できます。
最後に、messageパラメータは、大きさがmessage_sizeの、実際のメッセージを保持します。
出力プラグインが利用を考慮している接頭辞が一意になるように、特に注意を払ってください。 拡張の名前か、出力プラグインの名前を使うのが良い場合が多いです。
オプションのfilter_prepare_cbコールバックは、現在の2相コミットトランザクションの一部であるデータを、この準備段階でデコードするか、またはCOMMIT PREPARED時に通常の1相トランザクションとしてデコードするかを決定するために呼び出されます。
デコードをスキップするように合図するには、trueを返します。
そうでなければfalseを返します。
コールバックが定義されていない場合、falseが想定されます(すなわち、フィルタリングなしで、2相コミットを使用するすべてのトランザクションも同様に2相でデコードされます)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
ctxパラメータは他のコールバックと同じ内容です。
パラメータxidとgidは、トランザクションを識別するための2つの異なる方法を提供します。
後のCOMMIT PREPAREDまたはROLLBACK PREPAREDは両方の識別子を持ち、出力プラグインに何を使用するかの選択を提供します。
このコールバックは、デコードするトランザクションごとに複数回呼び出すことができ、呼び出されるたびにxidとgidの与えられたペアに対して同じ静的な答えを提供しなければなりません。
必須であるbegin_prepare_cbコールバックは、準備されたトランザクションの開始がデコードされるたびに呼び出されます。
txnパラメータの一部であるgidフィールドをこのコールバックで使用して、プラグインがこのPREPAREを既に受信しているかどうかをチェックできます。
この場合、エラーになるか、トランザクションの残りの変更をスキップできます。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
必須であるprepare_cbコールバックは、2相コミット用に準備されたトランザクションがデコードされるたびに呼び出されます。
修正された行がある場合、すべての修正された行に対するchange_cbコールバックはこの前に呼び出されています。
txnパラメータの一部であるgidフィールドは、このコールバックで使用できます。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
必須であるcommit_prepared_cbコールバックは、トランザクションCOMMIT PREPAREDがデコードされるたびに呼び出されます。
txnパラメータの一部であるgidフィールドは、このコールバックで使用できます。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
必須であるrollback_prepared_cbコールバックは、トランザクションROLLBACK PREPAREDがデコードされるたびに呼び出されます。
txnパラメータの一部であるgidフィールドは、このコールバックで使用できます。
パラメータprepare_end_lsnとprepare_timeは、プラグインがこのPREPARE TRANSACTIONを受信したかどうかをチェックするために使用できます。
この場合、プラグインはロールバックを適用できます。
そうでない場合は、ロールバック操作をスキップできます。
gidだけでは十分ではありません。
なぜなら、下流ノードは同じ識別子を持つ準備されたトランザクションを持つことができるからです。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
stream_start_cbコールバックは、継続中のトランザクションからストリーム化された変更のブロックを開くときに呼び出されます。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
stream_stop_cbコールバックは、継続中のトランザクションからストリーム化された変更のブロックをクローズするときに呼び出されます。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
stream_abort_cbコールバックは、以前にストリーム化されたトランザクションをアボートするために呼び出されます。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
stream_prepare_cbコールバックは、2相コミットの一部として前にストリーム化されたトランザクションを準備するために呼び出されます。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
stream_commit_cbコールバックは、以前にストリーム化されたトランザクションをコミットするために呼び出されます。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
stream_change_cbコールバックは、ストリーム化された変更のブロック(stream_start_cbとstream_stop_cb呼び出しで区切られます)で変更を送信するときに呼び出されます。
実際の変更は表示されません。
なぜなら、トランザクションは後の時点でアボートする可能性があり、アボートされたトランザクションの変更はデコードされないからです。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
stream_message_cbコールバックは、ストリーム化された変更のブロック(stream_start_cbとstream_stop_cbコールで区切られた)で汎用メッセージを送信するときに呼び出されます。
トランザクションメッセージのメッセージ内容は表示されません。
なぜなら、トランザクションは後の時点でアボートする可能性があり、アボートされたトランザクションの変更はデコードされないからです。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
stream_truncate_cbコールバックは、ストリーム化された変更のブロック(stream_start_cbとstream_stop_cb呼び出しで区切られます)内のTRUNCATEコマンドに対して呼び出されます。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
パラメータはstream_change_cbコールバックに類似しています。
ただし、外部キーで接続されたテーブルに対するTRUNCATEアクションは一緒に実行する必要があるため、このコールバックは単一のリレーションではなくリレーションの配列を受け取ります。
詳細はTRUNCATE文の説明を参照してください。
begin_cb、commit_cb、change_cbコールバックにおいて、出力プラグインは実際にデータ出力するためにctx->outのStringInfo出力バッファに書き込みます。
出力バッファに書き込む前に、OutputPluginPrepareWrite(ctx, last_write)を呼び出します。
また、書き込みバッファにデータを書き終えたら、OutputPluginWrite(ctx, last_write)を呼び出してデータの書き込みを実施します。
last_write引数により、その書き込みがコールバックの最終的な書き込みであるかどうかを指定します。
以下の例では、出力プラグインにおいて消費者に向けてデータを出力する方法を示します。
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);