PostgreSQLのソースコードのサブディレクトリ
contrib/test_decoding
にサンプル出力プラグインがあります。
出力プラグインは、出力プラグインの名前をライブラリのベース名として持つ共有ライブラリを動的にロードすることによってロードされます。
必要な出力プラグインコールバックを提供し、そのライブラリが実際に出力プラグインであることを示すために、_PG_output_plugin_init
という名前の関数を作成しなければなりません。
この関数には、各々のアクションに対応するコールバック関数へのポインタを持つ構造体が渡されます。
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
コールバック関数のbegin_cb
、change_cb
、および、commit_cb
は必須ですが、startup_cb
、filter_by_origin_cb
、truncate_cb
、および、shutdown_cb
は必須ではありません。
truncate_cb
が設定されていないけれども、TRUNCATE
がデコードされることになった場合、この動作は無視されます。
更新データをデコード、整形、出力するために、出力関数を呼び出すことを含め、出力プラグインはバックエンドの通常のインフラストラクチャのほとんどを利用できます。
テーブルは、initdb
で作られ、pg_catalog
スキーマに含まれているか、以下のコマンドでユーザ定義のカタログテーブルであると印が付けられている限り、読み込み専用のアクセスが許可されます。
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
トランザクションIDの割り当てが発生するような動作は許可されていません。
そのような動作としては、テーブルへの書き込み、DDLの変更操作、txid_current()
の呼び出しなどがあります。
出力プラグインコールバックは、かなり自由な形式で消費者にデータを渡すことができます。
SQLで変更データを見るような場合、任意のかたちでデータを返すことのできるデータ型(たとえばbytea
)は扱いにくいです。
出力プラグインがサーバエンコーディングのテキストデータのみを含むことにするには、
OutputPluginOptions.output_type
に
OUTPUT_PLUGIN_BINARY_OUTPUT
ではなく、OUTPUT_PLUGIN_TEXTUAL_OUTPUT
を設定することによって宣言できます。
この場合、text
datumが格納することができるように、すべてのデータはサーバエンコーディングでエンコードされていなければなりません。
出力プラグインには、必要に応じて発生した更新に関する通知が様々なコールバックを通じて送られます。
同時に実行されたトランザクションは、コミットした順番にデコードされます。
指定したトランザクションに含まれる更新だけがbegin
とcommit
の間のコールバックによってデコードされます。
明示的あるいは暗黙的にロールバックされたトランザクションは、決してデコードされません。
成功したセーブポイントは、実行された順番にセーブポイントが実行されたトランザクションの中に折り込まれます。
ディスクに安全に書きだされたトランザクションだけがデコードされます。
そのため、synchronous_commit
がoff
の場合には、直後に呼び出されたpg_logical_slot_get_changes()
がそのCOMMIT
をデコードしないことがあります。
ストリームに投入可能な更新の数に関係なく、レプリケーションスロットが作られるか、ストリームの変更がリクエストされた場合にオプションのstartup_cb
コールバック呼び出されます。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
is_init
パラメータは、レプリケーションスロットが作られる際にはtrue、それ以外ではfalseになります。
options
は、出力プラグインが書き込む以下の構造体を指します。
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
はOUTPUT_PLUGIN_TEXTUAL_OUTPUT
かOUTPUT_PLUGIN_BINARY_OUTPUT
のどちらかです。
48.6.3も参照してください。
receive_rewrites
が真なら、何らかDDL操作時のヒープ書き換えで生じた変更に対して、出力プラグインも呼ばれます。
これはDDLレプリケーションを処理するプラグインを対象としていますが、これらは特別な処理を必要とします。
開始コールバックでは、ctx->output_plugin_options
で指定されるオプションを検証しましょう。
出力プラグインが状態を持つ必要がある場合には、ctx->output_plugin_private
を利用できます。
以前アクティブだったレプリケーションスロットが使われなくなったら、いつでもshutdown_cb
コールバックが呼び出され、出力プラグインのプライベートリソースが解放されます。
スロットは削除される必要はありません。単にストリームが停止します。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
必須であるbegin_cb
コールバックは、コミットしたトランザクションの開始がデコードされる際に必ず呼び出されます。
アボートしたトランザクションとその内容は決してデコードされません。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
txn
引数は、コミット時のタイムスタンプやトランザクションIDなどのトランザクションに関するメタ情報を含みます。
必須であるcommit_cb
コールバックは、トランザクションのコミットがデコードされる際に必ず呼び出されます。
行が更新された場合は、それぞれの行に対してchange_cb
コールバックが、commit_cb
の前に呼び出されます。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
トランザクション内のINSERT
、UPDATE
、DELETE
の更新に対して、必須コールバックであるchange_cb
が呼び出されます。
元の更新コマンドが複数の行を一度に更新する場合は、それぞれの行に対してこのコールバックが呼び出されます。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
ctx
とtxn
は、begin_cb
、commit_cb
コールバックでは同じ内容になります。
これに加えてrelation
は行が属するリレーションを指定し、行の変更を記述するchange
パラメータが渡されます。
unloggedテーブル(UNLOGGED
参照)と(TEMPORARY
またはTEMP
参照)以外のユーザ定義テーブルだけが、ロジカルデコーディングを使って更新データを取得できます。
truncate_cb
コールバックは、TRUNCATE
コマンドに対して呼ばれます。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
パラメータはchange_cb
コールバックと似ています。
しかしながら、外部キーで結びついたテーブル群のTRUNCATE
動作は一緒に実行される必要があるため、このコールバックは単一リレーションではなく、リレーションの配列を受け取ります。
詳しくはTRUNCATE文の説明を参照してください。
オプションのfilter_by_origin_cb
コールバックは、origin_id
からリプレイされたデータがアウトプットプラグインの対象となるかどうかを判定するために呼び出されます。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
ctx
パラメータは、他のコールバックと同じ内容を持ちます。
オリジンの情報だけが得られます。
渡されたノードで発生した変更が無関係であることを伝えるには、trueを返します。
これにより、その変更は無視されることになります。
無視されたトランザクション変更に関わる他のコールバックは呼び出されません。
これは、カスケード、あるいは双方向レプリケーションソリューションを実装する際に有用です。 オリジンでフィルターすることにより、そのような構成で、同じ変更のレプリケーションが往復するのを防ぐことができます。 トランザクションや変更もオリジンに関する情報を持っていますが、このコールバックでフィルターするほうがずっと効率的です。
オプションのmessage_cb
コールバックは、ロジカルデコーディングメッセージがデコードされる度に呼び出されます。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
txn
パラメータは、コミット時のタイムスタンプとXIDのような、トランザクションに関するメタ情報を含んでいます。
ただし、そのメッセージがトランザクション扱いではなく、メッセージをログしたトランザクションにXIDが割り当てられてない場合はNULLになることに注意してください。
lsn
は、メッセージに対応するWALの位置です。
transactional
は、メッセージがトランザクションとして送られたものかどうかを表しています。
prefix
はnull終端された任意の接頭辞で、現在のプラグインが興味のあるメッセージを特定するために利用できます。
最後に、message
パラメータは、大きさがmessage_size
の、実際のメッセージを保持します。
出力プラグインが利用を考慮している接頭辞が一意になるように、特に注意を払ってください。 拡張の名前か、出力プラグインの名前を使うのが良い場合が多いです。
begin_cb
、commit_cb
、change_cb
コールバックにおいて、出力プラグインは実際にデータ出力するためにctx->out
のStringInfo
出力バッファに書き込みます。
出力バッファに書き込む前に、OutputPluginPrepareWrite(ctx, last_write)
を呼び出します。
また、書き込みバッファにデータを書き終えたら、OutputPluginWrite(ctx, last_write)
を呼び出してデータの書き込みを実施します。
last_write
引数により、その書き込みがコールバックの最終的な書き込みであるかどうかを指定します。
以下の例では、出力プラグインにおいて消費者に向けてデータを出力する方法を示します。
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);