PostgreSQLのソースコードのサブディレクトリ
contrib/test_decoding
にサンプル出力プラグインがあります。
出力プラグインは、出力プラグインの名前をライブラリのベース名として持つ共有ライブラリを動的にロードすることによってロードされます。
必要な出力プラグインコールバックを提供し、そのライブラリが実際に出力プラグインであることを示すために、_PG_output_plugin_initという名前の関数を作成しなければなりません。
この関数には、各々のアクションに対応するコールバック関数へのポインタを持つ構造体が渡されます。
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
コールバック関数のbegin_cb、change_cb、および、commit_cbは必須ですが、startup_cb、filter_by_origin_cb、truncate_cb、および、shutdown_cbは必須ではありません。
truncate_cbが設定されていないけれども、TRUNCATEがデコードされることになった場合、この動作は無視されます。
更新データをデコード、整形、出力するために、出力関数を呼び出すことを含め、出力プラグインはバックエンドの通常のインフラストラクチャのほとんどを利用できます。
テーブルは、initdbで作られ、pg_catalogスキーマに含まれているか、以下のコマンドでユーザ定義のカタログテーブルであると印が付けられている限り、読み込み専用のアクセスが許可されます。
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
トランザクションIDの割り当てが発生するような動作は許可されていません。
そのような動作としては、テーブルへの書き込み、DDLの変更操作、txid_current()の呼び出しなどがあります。
出力プラグインコールバックは、かなり自由な形式で消費者にデータを渡すことができます。
SQLで変更データを見るような場合、任意のかたちでデータを返すことのできるデータ型(たとえばbytea)は扱いにくいです。
出力プラグインがサーバエンコーディングのテキストデータのみを含むことにするには、
OutputPluginOptions.output_typeに
OUTPUT_PLUGIN_BINARY_OUTPUTではなく、OUTPUT_PLUGIN_TEXTUAL_OUTPUTを設定することによって宣言できます。
この場合、textdatumが格納することができるように、すべてのデータはサーバエンコーディングでエンコードされていなければなりません。
出力プラグインには、必要に応じて発生した更新に関する通知が様々なコールバックを通じて送られます。
同時に実行されたトランザクションは、コミットした順番にデコードされます。
指定したトランザクションに含まれる更新だけがbeginとcommitの間のコールバックによってデコードされます。
明示的あるいは暗黙的にロールバックされたトランザクションは、決してデコードされません。
成功したセーブポイントは、実行された順番にセーブポイントが実行されたトランザクションの中に折り込まれます。
ディスクに安全に書きだされたトランザクションだけがデコードされます。
そのため、synchronous_commitがoffの場合には、直後に呼び出されたpg_logical_slot_get_changes()がそのCOMMITをデコードしないことがあります。
ストリームに投入可能な更新の数に関係なく、レプリケーションスロットが作られるか、ストリームの変更がリクエストされた場合にオプションのstartup_cbコールバック呼び出されます。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
is_init パラメータは、レプリケーションスロットが作られる際にはtrue、それ以外ではfalseになります。
optionsは、出力プラグインが書き込む以下の構造体を指します。
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
output_typeはOUTPUT_PLUGIN_TEXTUAL_OUTPUTかOUTPUT_PLUGIN_BINARY_OUTPUTのどちらかです。
49.6.3も参照してください。
receive_rewritesが真なら、何らかDDL操作時のヒープ書き換えで生じた変更に対して、出力プラグインも呼ばれます。
これはDDLレプリケーションを処理するプラグインを対象としていますが、これらは特別な処理を必要とします。
開始コールバックでは、ctx->output_plugin_optionsで指定されるオプションを検証しましょう。
出力プラグインが状態を持つ必要がある場合には、ctx->output_plugin_privateを利用できます。
以前アクティブだったレプリケーションスロットが使われなくなったら、いつでもshutdown_cbコールバックが呼び出され、出力プラグインのプライベートリソースが解放されます。
スロットは削除される必要はありません。単にストリームが停止します。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
必須であるbegin_cbコールバックは、コミットしたトランザクションの開始がデコードされる際に必ず呼び出されます。
アボートしたトランザクションとその内容は決してデコードされません。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
txn引数は、コミット時のタイムスタンプやトランザクションIDなどのトランザクションに関するメタ情報を含みます。
必須であるcommit_cbコールバックは、トランザクションのコミットがデコードされる際に必ず呼び出されます。
行が更新された場合は、それぞれの行に対してchange_cbコールバックが、commit_cbの前に呼び出されます。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
トランザクション内のINSERT、UPDATE、DELETEの更新に対して、必須コールバックであるchange_cbが呼び出されます。
元の更新コマンドが複数の行を一度に更新する場合は、それぞれの行に対してこのコールバックが呼び出されます。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
ctxとtxnは、begin_cb、commit_cbコールバックでは同じ内容になります。
これに加えてrelationは行が属するリレーションを指定し、行の変更を記述するchangeパラメータが渡されます。
unloggedテーブル(UNLOGGED参照)と(TEMPORARYまたはTEMP参照)以外のユーザ定義テーブルだけが、ロジカルデコーディングを使って更新データを取得できます。
truncate_cbコールバックは、TRUNCATEコマンドに対して呼ばれます。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
パラメータはchange_cbコールバックと似ています。
しかしながら、外部キーで結びついたテーブル群のTRUNCATE動作は一緒に実行される必要があるため、このコールバックは単一リレーションではなく、リレーションの配列を受け取ります。
詳しくはTRUNCATE文の説明を参照してください。
オプションのfilter_by_origin_cbコールバックは、origin_idからリプレイされたデータがアウトプットプラグインの対象となるかどうかを判定するために呼び出されます。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
ctxパラメータは、他のコールバックと同じ内容を持ちます。
オリジンの情報だけが得られます。
渡されたノードで発生した変更が無関係であることを伝えるには、trueを返します。
これにより、その変更は無視されることになります。
無視されたトランザクション変更に関わる他のコールバックは呼び出されません。
これは、カスケード、あるいは双方向レプリケーションソリューションを実装する際に有用です。 オリジンでフィルターすることにより、そのような構成で、同じ変更のレプリケーションが往復するのを防ぐことができます。 トランザクションや変更もオリジンに関する情報を持っていますが、このコールバックでフィルターするほうがずっと効率的です。
オプションのmessage_cbコールバックは、ロジカルデコーディングメッセージがデコードされる度に呼び出されます。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
txnパラメータは、コミット時のタイムスタンプとXIDのような、トランザクションに関するメタ情報を含んでいます。
ただし、そのメッセージがトランザクション扱いではなく、メッセージをログしたトランザクションにXIDが割り当てられてない場合はNULLになることに注意してください。
lsnは、メッセージに対応するWALの位置です。
transactionalは、メッセージがトランザクションとして送られたものかどうかを表しています。
prefixはnull終端された任意の接頭辞で、現在のプラグインが興味のあるメッセージを特定するために利用できます。
最後に、messageパラメータは、大きさがmessage_sizeの、実際のメッセージを保持します。
出力プラグインが利用を考慮している接頭辞が一意になるように、特に注意を払ってください。 拡張の名前か、出力プラグインの名前を使うのが良い場合が多いです。
begin_cb、commit_cb、change_cbコールバックにおいて、出力プラグインは実際にデータ出力するためにctx->outのStringInfo出力バッファに書き込みます。
出力バッファに書き込む前に、OutputPluginPrepareWrite(ctx, last_write)を呼び出します。
また、書き込みバッファにデータを書き終えたら、OutputPluginWrite(ctx, last_write)を呼び出してデータの書き込みを実施します。
last_write引数により、その書き込みがコールバックの最終的な書き込みであるかどうかを指定します。
以下の例では、出力プラグインにおいて消費者に向けてデータを出力する方法を示します。
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);