Chapter 7. Debezium connector for PostgreSQL


The Debezium PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. For information about the PostgreSQL versions that are compatible with the connector, see the Debezium Supported Configurations page.

The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.

Information and procedures for using a Debezium PostgreSQL connector are organized as follows:

7.1. Overview of Debezium PostgreSQL connector

PostgreSQL’s logical decoding feature was introduced in version 9.4. It is a mechanism that allows the extraction of the changes that were committed to the transaction log and the processing of these changes in a user-friendly manner with the help of an output plug-in. The output plug-in enables clients to consume the changes.

The PostgreSQL connector contains two main parts that work together to read and process database changes:

  • pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. This is the only supported logical decoding output plug-in in this Debezium release. This plug-in is maintained by the PostgreSQL community, and used by PostgreSQL itself for logical replication. This plug-in is always present so no additional libraries need to be installed. The Debezium connector interprets the raw replication event stream directly into change events.
  • Java code (the actual Kafka Connect connector) that reads the changes produced by the logical decoding output plug-in by using PostgreSQL’s streaming replication protocol and the PostgreSQL JDBC driver.

The connector produces a change event for every row-level insert, update, and delete operation that it captures, and sends the change event records for each table to a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.

PostgreSQL normally purges write-ahead log (WAL) segments after some period of time. This means that the connector does not have the complete history of all changes that have been made to the database. Therefore, when the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing a consistent snapshot of each of the database schemas. After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made. This way, the connector starts with a consistent view of all of the data, and does not omit any changes that were made while the snapshot was being taken.

The connector is tolerant of failures. As the connector reads changes and produces events, it records the WAL position for each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart the connector continues reading the WAL where it last left off. This includes snapshots. If the connector stops during a snapshot, the connector begins a new snapshot when it restarts.

Important

The connector relies on and reflects the PostgreSQL logical decoding feature, which has the following limitations:

  • Logical decoding does not support DDL changes. This means that the connector is unable to report DDL change events back to consumers.
  • Logical decoding replication slots are supported on only primary servers. When there is a cluster of PostgreSQL servers, the connector can run on only the active primary server. It cannot run on hot or warm standby replicas. If the primary server fails or is demoted, the connector stops. After the primary server has recovered, you can restart the connector. If a different PostgreSQL server has been promoted to primary, adjust the connector configuration before restarting the connector.

Behavior when things go wrong describes how the connector responds if there is a problem.

Important

Debezium currently supports databases with UTF-8 character encoding only. With a single byte character encoding, it is not possible to correctly process strings that contain extended ASCII code characters.

7.2. How Debezium PostgreSQL connectors work

To optimally configure and run a Debezium PostgreSQL connector, it is helpful to understand how the connector performs snapshots, streams change events, determines Kafka topic names, and uses metadata.

Details are in the following topics:

7.2.1. Security for PostgreSQL connector

To use the Debezium connector to stream changes from a PostgreSQL database, the connector must operate with specific privileges in the database. Although one way to grant the necessary privileges is to provide the user with superuser privileges, doing so potentially exposes your PostgreSQL data to unauthorized access. Rather than granting excessive privileges to the Debezium user, it is best to create a dedicated Debezium replication user to which you grant specific privileges.

For more information about configuring privileges for the Debezium PostgreSQL user, see Setting up permissions. For more information about PostgreSQL logical replication security, see the PostgreSQL documentation.

7.2.2. How Debezium PostgreSQL connectors perform database snapshots

Most PostgreSQL servers are configured to not retain the complete history of the database in the WAL segments. This means that the PostgreSQL connector would be unable to see the entire history of the database by reading only the WAL. Consequently, the first time that the connector starts, it performs an initial consistent snapshot of the database. The default behavior for performing a snapshot consists of the following steps. You can change this behavior by setting the snapshot.mode connector configuration property to a value other than initial.

  1. Start a transaction with a SERIALIZABLE, READ ONLY, DEFERRABLE isolation level to ensure that subsequent reads in this transaction are against a single consistent version of the data. Any changes to the data due to subsequent INSERT, UPDATE, and DELETE operations by other clients are not visible to this transaction.
  2. Read the current position in the server’s transaction log.
  3. Scan the database tables and schemas, generate a READ event for each row and write that event to the appropriate table-specific Kafka topic.
  4. Commit the transaction.
  5. Record the successful completion of the snapshot in the connector offsets.

If the connector fails, is rebalanced, or stops after Step 1 begins but before Step 5 completes, upon restart the connector begins a new snapshot. After the connector completes its initial snapshot, the PostgreSQL connector continues streaming from the position that it read in Step 2. This ensures that the connector does not miss any updates. If the connector stops again for any reason, upon restart, the connector continues streaming changes from where it previously left off.
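The following Python sketch lays out the five steps as the SQL statements a client could issue. It is illustrative only: the function name snapshot_statements is hypothetical, the statements are standard PostgreSQL 10+ SQL, and they are not taken from the connector's source code.

```python
from typing import List


def snapshot_statements(tables: List[str]) -> List[str]:
    """Return SQL statements that mirror the snapshot steps, in order."""
    statements = [
        # Step 1: open a transaction with one consistent, read-only view.
        "BEGIN",
        "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE",
        # Step 2: note the WAL position from which streaming resumes later.
        "SELECT pg_current_wal_lsn()",
    ]
    # Step 3: scan each captured table; every returned row becomes a READ event.
    statements += [f"SELECT * FROM {table}" for table in tables]
    # Step 4: commit. Step 5 (recording completion in the connector offsets)
    # happens in Kafka Connect and has no SQL counterpart.
    statements.append("COMMIT")
    return statements


for statement in snapshot_statements(["inventory.products", "inventory.orders"]):
    print(statement)
```

Because the position is read inside the same transaction that scans the tables, every change committed after that position is absent from the scan and is picked up later by streaming.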

Table 7.1. Options for the snapshot.mode connector configuration property
Option | Description

always

The connector always performs a snapshot when it starts. After the snapshot completes, the connector continues streaming changes from the position that it read in step 2 of the preceding sequence. This mode is useful in these situations:

  • It is known that some WAL segments have been deleted and are no longer available.
  • After a cluster failure, a new primary has been promoted. The always snapshot mode ensures that the connector does not miss any changes that were made after the new primary had been promoted but before the connector was restarted on the new primary.

never

The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the point in time when the PostgreSQL logical replication slot was created on the server. The never snapshot mode is useful only when you know all data of interest is still reflected in the WAL.

initial_only

The connector performs a database snapshot and stops before streaming any change event records. If the connector had started but did not complete a snapshot before stopping, the connector restarts the snapshot process and stops when the snapshot completes.

exported

Deprecated. All snapshot modes are lockless.
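As a rough illustration of where snapshot.mode is set, the following Python sketch assembles a Kafka Connect registration payload and rejects values that the table does not list. The connector name, host, credentials, and database name are placeholders, and the helper connector_config is hypothetical.

```python
import json

# Values listed in the snapshot.mode table, plus the default, initial.
SNAPSHOT_MODES = {"initial", "always", "never", "initial_only", "exported"}


def connector_config(snapshot_mode: str = "initial") -> dict:
    """Build a connector registration payload with the given snapshot mode."""
    if snapshot_mode not in SNAPSHOT_MODES:
        raise ValueError(f"unsupported snapshot.mode: {snapshot_mode}")
    return {
        "name": "inventory-connector",  # placeholder connector name
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "database.hostname": "localhost",  # placeholder
            "database.port": "5432",
            "database.user": "debezium",  # placeholder
            "database.password": "changeme",  # placeholder
            "database.dbname": "postgres",
            "topic.prefix": "fulfillment",
            "snapshot.mode": snapshot_mode,
        },
    }


print(json.dumps(connector_config("always"), indent=2))
```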

7.2.2.1. Ad hoc snapshots

By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only.

However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. The following changes in a database might be cause for performing an ad hoc snapshot:

  • The connector configuration is modified to capture a different set of tables.
  • Kafka topics are deleted and must be rebuilt.
  • Data corruption occurs due to a configuration error or some other problem.

You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad-hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table.

When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled.

Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database.

You specify the tables to capture by sending an execute-snapshot message to the signaling table. Set the type of the execute-snapshot signal to incremental, and provide the names of the tables to include in the snapshot, as described in the following table:

Table 7.2. Example of an ad hoc execute-snapshot signal record
Field | Default | Value

type

incremental

Specifies the type of snapshot that you want to run.
Setting the type is optional. Currently, you can request only incremental snapshots.

data-collections

N/A

An array that contains regular expressions matching the fully-qualified names of the tables to be snapshotted.
The format of the names is the same as for the signal.data.collection configuration option.

additional-condition

N/A

An optional string, which specifies a condition based on the column(s) of the table(s), to capture a subset of the contents of the table(s).

Triggering an ad hoc snapshot

You initiate an ad hoc snapshot by adding an entry with the execute-snapshot signal type to the signaling table. After the connector processes the message, it begins the snapshot operation. The snapshot process reads the first and last primary key values and uses those values as the start and end point for each table. Based on the number of entries in the table, and the configured chunk size, Debezium divides the table into chunks, and proceeds to snapshot each chunk, in succession, one at a time.

Currently, the execute-snapshot action type triggers incremental snapshots only. For more information, see Incremental snapshots.
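A minimal sketch of how the data portion of an execute-snapshot signal could be assembled from the fields described above. The helper name execute_snapshot_data is hypothetical; only the field names come from this section.

```python
import json
from typing import List, Optional


def execute_snapshot_data(data_collections: List[str],
                          additional_condition: Optional[str] = None) -> str:
    """Serialize the data field of an execute-snapshot signal."""
    payload = {
        "data-collections": data_collections,
        # Optional; incremental is currently the only supported type.
        "type": "incremental",
    }
    if additional_condition is not None:
        payload["additional-condition"] = additional_condition
    return json.dumps(payload)


print(execute_snapshot_data(["schema1.table1", "schema2.table2"]))
```

Generating the JSON with a serializer, rather than concatenating strings, avoids unbalanced braces and quoting mistakes in the signal.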

7.2.2.2. Incremental snapshots

To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector.

In an incremental snapshot, instead of capturing the full state of a database all at once, as in an initial snapshot, Debezium captures each table in phases, in a series of configurable chunks. You can specify the tables that you want the snapshot to capture and the size of each chunk. The chunk size determines the number of rows that the snapshot collects during each fetch operation on the database. The default chunk size for incremental snapshots is 1024 rows.

As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process:

  • You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other.
  • If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning.
  • You can run an incremental snapshot on demand at any time, and repeat the process as needed to adapt to database updates. For example, you might re-run a snapshot after you modify the connector configuration to add a table to its table.include.list property.

Incremental snapshot process

When you run an incremental snapshot, Debezium sorts each table by primary key and then splits the table into chunks based on the configured chunk size. Working chunk by chunk, it then captures each table row in a chunk. For each row that it captures, the snapshot emits a READ event. That event represents the value of the row when the snapshot for the chunk began.
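The chunking described above can be pictured with a small Python sketch. It works on an in-memory list of rows, which is a simplification: the connector reads each chunk from the database with queries. The function name snapshot_chunks and the event shape are illustrative assumptions.

```python
from typing import Dict, Iterator, List


def snapshot_chunks(rows: List[Dict], key: str, chunk_size: int) -> Iterator[List[Dict]]:
    """Yield READ events chunk by chunk, ordered by primary key."""
    ordered = sorted(rows, key=lambda row: row[key])
    for start in range(0, len(ordered), chunk_size):
        chunk = ordered[start:start + chunk_size]
        # Each captured row is emitted as a READ event (op "r").
        yield [{"op": "r", "after": row} for row in chunk]


rows = [{"id": 3}, {"id": 1}, {"id": 2}]
for chunk in snapshot_chunks(rows, key="id", chunk_size=2):
    print([event["after"]["id"] for event in chunk])
```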

As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka.

How Debezium resolves collisions among records with the same primary key

In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka.

Snapshot window

To assist in resolving collisions between late-arriving READ events and streamed events that modify the same table row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key.

For each data collection, Debezium emits two types of events, and stores the records for both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change.

As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot window, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersedes the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the table’s Kafka topic.

The connector repeats the process for each snapshot chunk.
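The buffering scheme for one chunk can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not the connector's implementation: events are plain dictionaries, the primary key column is named pk, and run_snapshot_window is a hypothetical name.

```python
from typing import Dict, Iterable, List


def run_snapshot_window(chunk_reads: List[Dict], streamed: Iterable[Dict],
                        key: str = "pk") -> List[Dict]:
    """Return events in the order they would reach Kafka for one chunk."""
    # Window opens: buffer the chunk's READ events by primary key.
    buffer = {event["after"][key]: event for event in chunk_reads}
    emitted = []
    for event in streamed:
        # Deletes carry the key in "before"; inserts and updates in "after".
        row = event["after"] or event["before"]
        # A streamed change supersedes a buffered READ for the same key.
        buffer.pop(row[key], None)
        emitted.append(event)
    # Window closes: emit the READ events that had no colliding change.
    emitted.extend(buffer.values())
    return emitted


reads = [{"op": "r", "after": {"pk": 1, "v": "a"}},
         {"op": "r", "after": {"pk": 2, "v": "b"}}]
changes = [{"op": "u", "before": {"pk": 2, "v": "b"}, "after": {"pk": 2, "v": "c"}}]
print([(e["op"], (e["after"] or e["before"])["pk"]) for e in run_snapshot_window(reads, changes)])
```

In this model the READ for key 2 is dropped because the update already carries the newer value, while the READ for key 1 survives until the window closes.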

Warning

The Debezium connector for PostgreSQL does not support schema changes while an incremental snapshot is running. If a schema change is performed before the incremental snapshot starts but after the signal is sent, the pass-through configuration option database.autosave is set to conservative so that the schema change is processed correctly.

7.2.2.3. Triggering an incremental snapshot

Currently, the only way to initiate an incremental snapshot is to send an ad hoc snapshot signal to the signaling table on the source database.

You submit a signal to the signaling table as a SQL INSERT query.

After Debezium detects the change in the signaling table, it reads the signal, and runs the requested snapshot operation.

The query that you submit specifies the tables to include in the snapshot, and, optionally, specifies the kind of snapshot operation. Currently, the only valid option for snapshot operations is the default value, incremental.

To specify the tables to include in the snapshot, provide a data-collections array that lists the tables or an array of regular expressions used to match tables, for example,

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium detects that no action is required and does not perform a snapshot.

Note

If the name of a table that you want to include in a snapshot contains a dot (.) in the name of the database, schema, or table, to add the table to the data-collections array, you must escape each part of the name in double quotes.

For example, to include a table that exists in the public schema and that has the name My.Table, use the following format: "public"."My.Table".
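A small Python sketch of the quoting rule in the preceding note. The helper quote_data_collection is hypothetical, and it does not handle names that themselves contain double quotes.

```python
import json


def quote_data_collection(*parts: str) -> str:
    """Wrap each part of a table name in double quotes and join with dots."""
    return ".".join(f'"{part}"' for part in parts)


name = quote_data_collection("public", "My.Table")
print(name)

# When the quoted name is placed in the signal's JSON, the serializer
# escapes the embedded double quotes.
signal_data = json.dumps({"data-collections": [name]})
print(signal_data)
```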

Prerequisites

Procedure

  1. Send a SQL query to add the ad hoc incremental snapshot request to the signaling table:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', 'execute-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    For example,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental", 5
        "additional-condition":"color=blue"}'); 6

    The values of the id, type, and data parameters in the command correspond to the fields of the signaling table.

    The following table describes the parameters in the example:

    Table 7.3. Descriptions of fields in a SQL command for sending an incremental snapshot signal to the signaling table
    Item | Value | Description

    1

    myschema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database.

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request.
    Use this string to correlate log messages with entries in the signaling table. Debezium does not use this string. Rather, during the snapshot, Debezium generates its own id string as a watermarking signal.

    3

    execute-snapshot

    The type parameter specifies the operation that the signal is intended to trigger.

    4

    data-collections

    A required component of the data field of a signal that specifies an array of table names or regular expressions to match table names to include in the snapshot.
    The array lists regular expressions which match tables by their fully-qualified names, using the same format as you use to specify the name of the connector’s signaling table in the signal.data.collection configuration property.

    5

    incremental

    An optional type component of the data field of a signal that specifies the kind of snapshot operation to run.
    Currently, the only valid option is the default value, incremental.
    If you do not specify a value, the connector runs an incremental snapshot.

    6

    additional-condition

    An optional string, which specifies a condition based on the column(s) of the table(s), to capture a subset of the contents of the tables. For more information about the additional-condition parameter, see Ad hoc incremental snapshots with additional-condition.

Ad hoc incremental snapshots with additional-condition

If you want a snapshot to include only a subset of the content in a table, you can modify the signal request by appending an additional-condition parameter to the snapshot signal.

The SQL query for a typical snapshot takes the following form:

SELECT * FROM <tableName> ....

By adding an additional-condition parameter, you append a WHERE condition to the SQL query, as in the following example:

SELECT * FROM <tableName> WHERE <additional-condition> ....

The following example shows a SQL query to send an ad hoc incremental snapshot request with an additional condition to the signaling table:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', 'execute-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

For example, suppose you have a products table that contains the following columns:

  • id (primary key)
  • color
  • quantity

If you want an incremental snapshot of the products table to include only the data items where color=blue, you can use the following SQL statement to trigger the snapshot:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

The additional-condition parameter also enables you to pass conditions that are based on more than one column. For example, using the products table from the previous example, you can submit a query that triggers an incremental snapshot that includes the data of only those items for which color=blue and quantity>10:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
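When a client program submits the signal, bound parameters keep the JSON payload separate from the SQL text. The following Python sketch shows the idea, assuming an in-memory SQLite table as a stand-in for the signaling table so that the example runs anywhere. Against PostgreSQL, the same INSERT would go through a PostgreSQL driver, whose placeholder syntax may differ.

```python
import json
import sqlite3

# Stand-in for the source database: an in-memory table with the same three
# columns (id, type, data) as the signaling table. The column types are
# assumptions made for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE debezium_signal (id TEXT PRIMARY KEY, type TEXT, data TEXT)")

data = json.dumps({
    "data-collections": ["schema1.products"],
    "type": "incremental",
    "additional-condition": "color=blue AND quantity>10",
})

# Bound parameters prevent quotes in the JSON from breaking the statement.
conn.execute(
    "INSERT INTO debezium_signal (id, type, data) VALUES (?, ?, ?)",
    ("ad-hoc-1", "execute-snapshot", data),
)
conn.commit()

row = conn.execute(
    "SELECT type, data FROM debezium_signal WHERE id = ?", ("ad-hoc-1",)
).fetchone()
print(row[0], json.loads(row[1])["additional-condition"])
```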

The following example shows the JSON for an incremental snapshot event that is captured by a connector.

Example: Incremental snapshot event message

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

Item | Field name | Description

1

snapshot

Specifies the type of snapshot operation to run.
Currently, the only valid option is the default value, incremental.
Specifying a type value in the SQL query that you submit to the signaling table is optional.
If you do not specify a value, the connector runs an incremental snapshot.

2

op

Specifies the event type.
The value for snapshot events is r, signifying a READ operation.
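A consumer can use these two fields to tell incremental snapshot records apart from streamed changes. The following Python sketch assumes the event value has already been deserialized into a dictionary shaped like the preceding example; the function name is hypothetical.

```python
def is_incremental_snapshot_read(event: dict) -> bool:
    """True when the event is a READ emitted by an incremental snapshot."""
    source = event.get("source") or {}
    return event.get("op") == "r" and source.get("snapshot") == "incremental"


event = {
    "before": None,
    "after": {"pk": "1", "value": "New data"},
    "source": {"snapshot": "incremental"},
    "op": "r",
}
print(is_incremental_snapshot_read(event))
```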

7.2.2.4. Stopping an incremental snapshot

You can also stop an incremental snapshot by sending a signal to the signaling table on the source database. You submit a stop snapshot signal to the table by sending a SQL INSERT query.

After Debezium detects the change in the signaling table, it reads the signal, and stops the incremental snapshot operation if it’s in progress.

The query that you submit specifies the snapshot operation type, incremental, and, optionally, the tables to remove from the snapshot that is currently running.

Prerequisites

Procedure

  1. Send a SQL query to the signaling table to stop the ad hoc incremental snapshot:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    For example,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    The values of the id, type, and data parameters in the signal command correspond to the fields of the signaling table.

    The following table describes the parameters in the example:

    Table 7.4. Descriptions of fields in a SQL command for sending a stop incremental snapshot signal to the signaling table
    Item | Value | Description

    1

    myschema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database.

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request.
    Use this string to correlate log messages with entries in the signaling table. Debezium does not use this string.

    3

    stop-snapshot

    The type parameter specifies the operation that the signal is intended to trigger.

    4

    data-collections

    An optional component of the data field of a signal that specifies an array of table names or regular expressions to match table names to remove from the snapshot.
    The array lists regular expressions which match tables by their fully-qualified names, using the same format as you use to specify the name of the connector’s signaling table in the signal.data.collection configuration property. If this component of the data field is omitted, the signal stops the entire incremental snapshot that is in progress.

    5

    incremental

    A required component of the data field of a signal that specifies the kind of snapshot operation that is to be stopped.
    Currently, the only valid option is incremental.
    If you do not specify a type value, the signal fails to stop the incremental snapshot.
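For the stop-snapshot signal, the requirements are the reverse of those for execute-snapshot: type is required and data-collections is optional. A minimal Python sketch, with the hypothetical helper stop_snapshot_data, might build the data field as follows.

```python
import json
from typing import List, Optional


def stop_snapshot_data(data_collections: Optional[List[str]] = None) -> str:
    """Serialize the data field of a stop-snapshot signal."""
    # type is required; without it the signal fails to stop the snapshot.
    payload = {"type": "incremental"}
    if data_collections:
        # Remove only these tables from the running snapshot.
        payload["data-collections"] = data_collections
    # With no data-collections, the entire running snapshot is stopped.
    return json.dumps(payload)


print(stop_snapshot_data(["schema1.table1"]))
print(stop_snapshot_data())
```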

7.2.3. How Debezium PostgreSQL connectors stream change event records

The PostgreSQL connector typically spends the vast majority of its time streaming changes from the PostgreSQL server to which it is connected. This mechanism relies on PostgreSQL’s replication protocol. This protocol enables clients to receive changes from the server as they are committed in the server’s transaction log at certain positions, which are referred to as Log Sequence Numbers (LSNs).

Whenever the server commits a transaction, a separate server process invokes a callback function from the logical decoding plug-in. This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of Debezium plug-in) and writes them on an output stream, which can then be consumed by clients.

The Debezium PostgreSQL connector acts as a PostgreSQL client. When the connector receives changes it transforms the events into Debezium create, update, or delete events that include the LSN of the event. The PostgreSQL connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.

Periodically, Kafka Connect records the most recent offset in another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. For the PostgreSQL connector, the LSN recorded in each change event is the offset.

When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset. When the connector restarts, it sends a request to the PostgreSQL server to send the events starting just after that position.
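To make the resume behavior concrete, the following Python sketch compares positions the way a restart does. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, and the sketch converts that text to a single integer. The event list and the helper names are illustrative; in practice the server, not the client, selects the events that follow the requested position.

```python
from typing import Dict, List


def parse_lsn(text: str) -> int:
    """Convert PostgreSQL's textual LSN (for example 16/B374D848) to an integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def events_to_replay(events: List[Dict], last_offset_lsn: str) -> List[Dict]:
    """Return the events positioned just after the last recorded offset."""
    start = parse_lsn(last_offset_lsn)
    return [event for event in events if parse_lsn(event["lsn"]) > start]


wal = [{"lsn": "0/16B3748"}, {"lsn": "0/16B3790"}, {"lsn": "1/0"}]
print(events_to_replay(wal, "0/16B3748"))
```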

Note

The PostgreSQL connector retrieves schema information as part of the events sent by the logical decoding plug-in. However, the connector does not retrieve information about which columns compose the primary key. The connector obtains this information from the JDBC metadata (side channel). If the primary key definition of a table changes (by adding, removing or renaming primary key columns), there is a tiny period of time when the primary key information from JDBC is not synchronized with the change event that the logical decoding plug-in generates. During this tiny period, a message could be created with an inconsistent key structure. To prevent this inconsistency, update primary key structures as follows:

  1. Put the database or an application into a read-only mode.
  2. Let Debezium process all remaining events.
  3. Stop Debezium.
  4. Update the primary key definition in the relevant table.
  5. Put the database or the application into read/write mode.
  6. Restart Debezium.

PostgreSQL 10+ logical decoding support (pgoutput)

As of PostgreSQL 10, there is a logical replication stream mode, called pgoutput, that is natively supported by PostgreSQL. This means that a Debezium PostgreSQL connector can consume that replication stream without the need for additional plug-ins. This is particularly valuable for environments where installation of plug-ins is not supported or not allowed.

For more information, see Setting up PostgreSQL.

7.2.4. Default names of Kafka topics that receive Debezium PostgreSQL change event records

By default, the PostgreSQL connector writes change events for all INSERT, UPDATE, and DELETE operations that occur in a table to a single Apache Kafka topic that is specific to that table. The connector uses the following convention to name change event topics:

topicPrefix.schemaName.tableName

The following list provides definitions for the components of the default name:

topicPrefix
The topic prefix as specified by the topic.prefix configuration property.
schemaName
The name of the database schema in which the change event occurred.
tableName
The name of the database table in which the change event occurred.

For example, suppose that fulfillment is the topic prefix in the configuration for a connector that is capturing changes in a PostgreSQL installation that has a postgres database and an inventory schema that contains four tables: products, products_on_hand, customers, and orders. The connector would stream records to these four Kafka topics:

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

Now suppose that the tables are not part of a specific schema but were created in the default public PostgreSQL schema. The names of the Kafka topics would be:

  • fulfillment.public.products
  • fulfillment.public.products_on_hand
  • fulfillment.public.customers
  • fulfillment.public.orders
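The naming convention described above can be sketched in a few lines of Python. The helper name default_topic_name is hypothetical and is not part of Debezium; it only joins the three components that the convention lists.

```python
def default_topic_name(topic_prefix: str, schema_name: str, table_name: str) -> str:
    """Join the three components of the default topic name with dots.

    Illustrative helper only; the connector builds these names internally.
    """
    return ".".join([topic_prefix, schema_name, table_name])


tables = ["products", "products_on_hand", "customers", "orders"]
topics = [default_topic_name("fulfillment", "inventory", t) for t in tables]
print(topics)
```

Passing public instead of inventory as the schema name yields the second set of topic names.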

The connector applies similar naming conventions to label its transaction metadata topics.

If the default topic names do not meet your requirements, you can configure custom topic names. To configure custom topic names, you specify regular expressions in the logical topic routing SMT. For more information about using the logical topic routing SMT to customize topic naming, see Topic routing.

7.2.5. Debezium PostgreSQL connector-generated events that represent transaction boundaries

Debezium can generate events that represent transaction boundaries and that enrich data change event messages.

Limits on when Debezium receives transaction metadata

Debezium registers and receives metadata only for transactions that occur after you deploy the connector. Metadata for transactions that occur before you deploy the connector is not available.

For every transaction BEGIN and END, Debezium generates an event that contains the following fields:

status
BEGIN or END.
id
String representation of the unique transaction identifier. The identifier is composed of the PostgreSQL transaction ID and the LSN of the given operation, separated by a colon; that is, the format is txID:LSN.
ts_ms
The time of a transaction boundary event (BEGIN or END event) at the data source. If the data source does not provide Debezium with the event time, then the field instead represents the time at which Debezium processes the event.
event_count (for END events)
Total number of events emitted by the transaction.
data_collections (for END events)
An array of pairs of data_collection and event_count elements that indicates the number of events that the connector emits for changes that originate from a data collection.

Example

{
  "status": "BEGIN",
  "id": "571:53195829",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "571:53195832",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

Unless overridden via the topic.transaction option, transaction events are written to the topic named <topic.prefix>.transaction.
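As a rough illustration of how a consumer might read these boundary events, the following Python sketch splits an id into its txID and LSN parts and checks that the per-collection counts of an END event add up to event_count. The function names are hypothetical, and the sketch assumes that both parts of the id are decimal integers, as they are in the example above.

```python
import json
from typing import Tuple


def split_transaction_id(tx_id: str) -> Tuple[int, int]:
    """Split an id of the form txID:LSN into two integers."""
    tx, lsn = tx_id.split(":", 1)
    return int(tx), int(lsn)


def end_counts_add_up(event: dict) -> bool:
    """Return True if the per-collection counts of an END event sum to event_count."""
    if event["status"] != "END":
        raise ValueError("expected an END event")
    per_collection = sum(c["event_count"] for c in event["data_collections"])
    return per_collection == event["event_count"]


# The END event from the example above.
end_event = json.loads("""
{
  "status": "END",
  "id": "571:53195832",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {"data_collection": "s1.a", "event_count": 1},
    {"data_collection": "s2.a", "event_count": 1}
  ]
}
""")

print(split_transaction_id(end_event["id"]))  # (571, 53195832)
print(end_counts_add_up(end_event))           # True
```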

Change data event enrichment

When transaction metadata is enabled, the data message Envelope is enriched with a new transaction field. This field provides information about every event in the form of a composite of fields:

id
String representation of unique transaction identifier.
total_order
The absolute position of the event among all events generated by the transaction.
data_collection_order
The per-data collection position of the event among all events that were emitted by the transaction.

The following example shows an enriched message:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
   ...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "571:53195832",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
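One way a consumer could use the transaction field is to buffer enriched events until the matching END event arrives, and then process them in total_order. The following Python sketch is an assumption-laden illustration, not Debezium code: the TransactionBuffer class and its methods are hypothetical, and the buffer groups events on the txID part of the id only, because the boundary event examples show different LSN parts for the BEGIN and END events of the same transaction.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def tx_number(transaction_id: str) -> str:
    """Return the txID part of an id in txID:LSN format."""
    return transaction_id.split(":", 1)[0]


class TransactionBuffer:
    """Collects enriched change events until the END event of their transaction arrives."""

    def __init__(self) -> None:
        self._events: Dict[str, List[dict]] = defaultdict(list)

    def add(self, event: dict) -> None:
        tx = event.get("transaction")
        if tx is None:
            return  # transaction metadata is not enabled for this event
        self._events[tx_number(tx["id"])].append(event)

    def finish(self, end_event: dict) -> Optional[List[dict]]:
        """Return the events of the transaction in total_order,
        or None if fewer than event_count events have been buffered so far."""
        key = tx_number(end_event["id"])
        events = self._events.get(key, [])
        if len(events) < end_event["event_count"]:
            return None
        self._events.pop(key, None)
        # The example message carries total_order as a string, so convert before sorting.
        return sorted(events, key=lambda e: int(e["transaction"]["total_order"]))


buffer = TransactionBuffer()
end = {"status": "END", "id": "571:53195832", "event_count": 2}

buffer.add({"op": "c", "transaction": {"id": "571:53195832", "total_order": "2", "data_collection_order": "1"}})
first_try = buffer.finish(end)  # one event is still missing

buffer.add({"op": "c", "transaction": {"id": "571:53195832", "total_order": "1", "data_collection_order": "1"}})
ordered = buffer.finish(end)
```

Because data change events and transaction boundary events are written to different topics, the END event can be read before all data events of the transaction have been consumed; this is why finish returns None instead of raising an error.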

7.3. Descriptions of Debezium PostgreSQL connector data change events

The Debezium PostgreSQL connector generates a data change event for each row-level INSERT, UPDATE, and DELETE operation. Each event contains a key and a value. The structure of the key and the value depends on the table that was changed.

Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.

The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A schema field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
Table 7.5. Overview of change event basic content
Item | Field name | Description

1

schema

The first schema field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key’s payload portion. In other words, the first schema field describes the structure of the primary key, or the unique key if the table does not have a primary key, for the table that was changed.

It is possible to override the table’s primary key by setting the message.key.columns connector configuration property. In this case, the first schema field describes the structure of the key identified by that property.

2

payload

The first payload field is part of the event key. It has the structure described by the previous schema field and it contains the key for the row that was changed.

3

schema

The second schema field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value’s payload portion. In other words, the second schema describes the structure of the row that was changed. Typically, this schema contains nested schemas.

4

payload

The second payload field is part of the event value. It has the structure described by the previous schema field and it contains the actual data for the row that was changed.

By default, the connector streams change event records to topics with names that are the same as the event’s originating table.

Note

Starting with Kafka 0.10, Kafka can optionally record the event key and value with the timestamp at which the message was created (recorded by the producer) or written to the log by Kafka.

Warning

The PostgreSQL connector ensures that all Kafka Connect schema names adhere to the Avro schema name format. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or _. Each remaining character in the logical server name and each character in the schema and table names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or _. If a name contains an invalid character, the connector replaces that character with an underscore.

This can lead to unexpected conflicts if the logical server name, a schema name, or a table name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
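The following Python sketch illustrates how such a conflict can arise. It is a simplified model of the replacement rule described above, not the connector's actual implementation: the function name sanitize_component is hypothetical, and the special rule for the first character of the logical server name is omitted for brevity.

```python
import re

# Every character that is not a Latin letter, a digit, or an underscore.
_INVALID = re.compile(r"[^A-Za-z0-9_]")


def sanitize_component(name: str) -> str:
    """Replace each invalid character in a name component with an underscore."""
    return _INVALID.sub("_", name)


a = sanitize_component("order-items")
b = sanitize_component("order items")
print(a, b, a == b)  # order_items order_items True
```

Two tables that differ only in an invalid character end up with the same sanitized name, which is the conflict that the warning describes.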

Details are in the following topics:

7.3.1. About keys in Debezium PostgreSQL change events

For a given table, the change event’s key has a structure that contains a field for each column in the primary key of the table at the time the event was created. Alternatively, if the table has REPLICA IDENTITY set to FULL or USING INDEX, there is a field for each unique key constraint.

Consider a customers table defined in the public database schema and the example of a change event key for that table.

Example table

CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

Example change event key

If the topic.prefix connector configuration property has the value PostgreSQL_server, every change event for the customers table while it has this definition has the same key structure, which in JSON looks like this:

{
  "schema": { 1
    "type": "struct",
    "name": "PostgreSQL_server.public.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
          {
              "name": "id",
              "index": "0",
              "schema": {
                  "type": "INT32",
                  "optional": "false"
              }
          }
      ]
  },
  "payload": { 5
      "id": "1"
  },
}
Table 7.6. Description of change event key
Item | Field name | Description

1

schema

The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’s payload portion.

2

PostgreSQL_server.public.customers.Key

Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the primary key for the table that was changed. Key schema names have the format topic-prefix.schema-name.table-name.Key. In this example:

  • PostgreSQL_server is the topic prefix of the connector that generated this event.
  • public is the database schema that contains the table that was changed.
  • customers is the table that was updated.

3

optional

Indicates whether the event key must contain a value in its payload field. In this example, a value in the key’s payload is required. A value in the key’s payload field is optional when a table does not have a primary key.

4

fields

Specifies each field that is expected in the payload, including each field’s name, index, and schema.

5

payload

Contains the key for the row for which this change event was generated. In this example, the key contains a single id field whose value is 1.

Note

Although the column.exclude.list and column.include.list connector configuration properties allow you to capture only a subset of table columns, all columns in a primary or unique key are always included in the event’s key.

Warning

If the table does not have a primary or unique key, then the change event’s key is null. The rows in a table without a primary or unique key constraint cannot be uniquely identified.
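A consumer therefore has to be prepared for three shapes of key: a null key, a key wrapped in schema and payload sections, and a bare payload when the converter is configured not to emit schemas. The following Python sketch shows one way to normalize them. The function name key_payload is hypothetical, and the sketch assumes that the JSON converter is in use.

```python
import json
from typing import Optional


def key_payload(raw_key: Optional[bytes]) -> Optional[dict]:
    """Return the payload part of a JSON-encoded change event key.

    Returns None when the record has no key, which is the case for tables
    without a primary or unique key.
    """
    if raw_key is None:
        return None
    key = json.loads(raw_key)
    # With schemas enabled, the key is wrapped in schema and payload sections.
    if isinstance(key, dict) and "schema" in key and "payload" in key:
        return key["payload"]
    # Otherwise the converter emitted only the payload.
    return key


wrapped = b'{"schema": {"type": "struct"}, "payload": {"id": 1}}'
print(key_payload(wrapped))        # {'id': 1}
print(key_payload(b'{"id": 1}'))   # {'id': 1}
print(key_payload(None))           # None
```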

7.3.2. About values in Debezium PostgreSQL change events

The value in a change event is a bit more complicated than the key. Like the key, the value has a schema section and a payload section. The schema section contains the schema that describes the Envelope structure of the payload section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.

Consider the same sample table that was used to show an example of a change event key:

CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

The value portion of a change event for a change to this table varies according to the REPLICA IDENTITY setting and the operation that the event is for.

Details follow in these sections:

Replica identity

REPLICA IDENTITY is a PostgreSQL-specific table-level setting that determines the amount of information that is available to the logical decoding plug-in for UPDATE and DELETE events. More specifically, the setting of REPLICA IDENTITY controls what (if any) information is available for the previous values of the table columns involved, whenever an UPDATE or DELETE event occurs.

There are 4 possible values for REPLICA IDENTITY:

  • DEFAULT - The default behavior is that UPDATE and DELETE events contain the previous values for the primary key columns of a table if that table has a primary key. For an UPDATE event, only the primary key columns with changed values are present.

    If a table does not have a primary key, the connector does not emit UPDATE or DELETE events for that table. For a table without a primary key, the connector emits only create events. Typically, a table without a primary key is used for appending messages to the end of the table, which means that UPDATE and DELETE events are not useful.

  • NOTHING - Emitted events for UPDATE and DELETE operations do not contain any information about the previous value of any table column.
  • FULL - Emitted events for UPDATE and DELETE operations contain the previous values of all columns in the table.
  • INDEX index-name - Emitted events for UPDATE and DELETE operations contain the previous values of the columns contained in the specified index. UPDATE events also contain the indexed columns with the updated values.
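To switch a table between these settings, you run an ALTER TABLE ... REPLICA IDENTITY statement in PostgreSQL. The following Python sketch only builds the statement text for each of the four values; it does not connect to a database. The function name and the index name customers_email_idx are hypothetical, and the identifier check is deliberately strict so that the sketch never has to quote names.

```python
import re
from typing import Optional

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def replica_identity_statement(table: str, mode: str, index_name: Optional[str] = None) -> str:
    """Build an ALTER TABLE statement that sets REPLICA IDENTITY for a table."""
    mode = mode.upper()
    for identifier in (table, index_name):
        if identifier is not None and not _IDENTIFIER.match(identifier):
            raise ValueError(f"unsupported identifier: {identifier!r}")
    if mode in ("DEFAULT", "NOTHING", "FULL"):
        return f"ALTER TABLE {table} REPLICA IDENTITY {mode}"
    if mode == "INDEX":
        if index_name is None:
            raise ValueError("INDEX mode requires an index name")
        return f"ALTER TABLE {table} REPLICA IDENTITY USING INDEX {index_name}"
    raise ValueError(f"unknown REPLICA IDENTITY mode: {mode}")


print(replica_identity_statement("customers", "FULL"))
print(replica_identity_statement("customers", "INDEX", "customers_email_idx"))
```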

create events

The following example shows the value portion of a change event that the connector generates for an operation that creates data in the customers table:

{
    "schema": { 1
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value", 2
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source", 3
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "PostgreSQL_server.inventory.customers.Envelope" 4
    },
    "payload": { 5
        "before": null, 6
        "after": { 7
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { 8
            "version": "2.1.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c", 9
        "ts_ms": 1559033904863 10
    }
}
Table 7.7. Descriptions of create event value fields
Item | Field name | Description

1

schema

The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular table.

2

name

In the schema section, each name field specifies the schema for a field in the value’s payload.

PostgreSQL_server.inventory.customers.Value is the schema for the payload’s before and after fields. This schema is specific to the customers table.

Names of schemas for before and after fields are of the form logicalName.schemaName.tableName.Value, which ensures that the schema name is unique in the database. This means that when using the Avro converter, the resulting Avro schema for each table in each logical source has its own evolution and history.

3

name

io.debezium.connector.postgresql.Source is the schema for the payload’s source field. This schema is specific to the PostgreSQL connector. The connector uses it for all events that it generates.

4

name

PostgreSQL_server.inventory.customers.Envelope is the schema for the overall structure of the payload, where PostgreSQL_server is the connector name, inventory is the database, and customers is the table.

5

payload

The value’s actual data. This is the information that the change event is providing.

It may appear that the JSON representations of the events are much larger than the rows they describe. This is because the JSON representation must include the schema and the payload portions of the message. However, by using the Avro converter, you can significantly decrease the size of the messages that the connector streams to Kafka topics.

6

before

An optional field that specifies the state of the row before the event occurred. When the op field is c for create, as it is in this example, the before field is null since this change event is for new content.

Note

Whether or not this field is available is dependent on the REPLICA IDENTITY setting for each table.

7

after

An optional field that specifies the state of the row after the event occurred. In this example, the after field contains the values of the new row’s id, first_name, last_name, and email columns.

8

source

Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:

  • Debezium version
  • Connector type and name
  • Database and table that contain the new row
  • Stringified JSON array of additional offset information. The first value is always the last committed LSN, the second value is always the current LSN. Either value may be null.
  • Schema name
  • If the event was part of a snapshot
  • ID of the transaction in which the operation was performed
  • Offset of the operation in the database log
  • Timestamp for when the change was made in the database

9

op

Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. Valid values are:

  • c = create
  • u = update
  • d = delete
  • r = read (applies to only snapshots)
  • t = truncate
  • m = message

10

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.
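The lag calculation can be sketched as follows in Python. The function name lag_ms is hypothetical, and the timestamps in the sample payload are illustrative values rather than output from a real connector.

```python
from typing import Optional


def lag_ms(payload: dict) -> Optional[int]:
    """Return payload.ts_ms minus payload.source.ts_ms in milliseconds.

    Returns None when either timestamp is missing, because payload.ts_ms is optional.
    """
    processed = payload.get("ts_ms")
    changed = payload.get("source", {}).get("ts_ms")
    if processed is None or changed is None:
        return None
    return processed - changed


payload = {
    "source": {"ts_ms": 1559033904863},  # time of the change in the database
    "op": "c",
    "ts_ms": 1559033904961,              # time at which the connector processed the event
}
print(lag_ms(payload))  # 98
```

The result is only as accurate as the clock synchronization between the database host and the JVM that runs the Kafka Connect task.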

update events

The value of a change event for an update in the sample customers table has the same schema as a create event for that table. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in an update event. Here is an example of a change event value in an event that the connector generates for an update in the customers table:

{
    "schema": { ... },
    "payload": {
        "before": { 1
            "id": 1
        },
        "after": { 2
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { 3
            "version": "2.1.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "u", 4
        "ts_ms": 1465584025523  5
    }
}
Table 7.8. Descriptions of update event value fields
Item | Field name | Description

1

before

An optional field that contains values that were in the row before the database commit. In this example, only the primary key column, id, is present because the table’s REPLICA IDENTITY setting is, by default, DEFAULT.

For an update event to contain the previous values of all columns in the row, you would have to change the customers table by running ALTER TABLE customers REPLICA IDENTITY FULL.

2

after

An optional field that specifies the state of the row after the event occurred. In this example, the first_name value is now Anne Marie.

3

source

Mandatory field that describes the source metadata for the event. The source field structure has the same fields as in a create event, but some values are different. The source metadata includes:

  • Debezium version
  • Connector type and name
  • Database and table that contain the updated row
  • Schema name
  • If the event was part of a snapshot (always false for update events)
  • ID of the transaction in which the operation was performed
  • Offset of the operation in the database log
  • Timestamp for when the change was made in the database

4

op

Mandatory string that describes the type of operation. In an update event value, the op field value is u, signifying that this row changed because of an update.

5

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

Note

Updating the columns for a row’s primary/unique key changes the value of the row’s key. When a key changes, Debezium outputs three events: a DELETE event and a tombstone event with the old key for the row, followed by an event with the new key for the row. Details are in the next section.

Primary key updates

An UPDATE operation that changes a row’s primary key field(s) is known as a primary key change. For a primary key change, in place of sending an UPDATE event record, the connector sends a DELETE event record for the old key and a CREATE event record for the new (updated) key. These events have the usual structure and content, and in addition, each one has a message header related to the primary key change:

  • The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.
  • The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.
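A consumer can use these headers to tell a primary key change apart from an unrelated delete followed by an insert. The following Python sketch shows the idea. It assumes that the consumer has already decoded the Kafka record headers into a dictionary; the function name and the representation of the header values are hypothetical, because this section does not specify how the key is serialized in the header.

```python
from typing import Optional

NEW_KEY_HEADER = "__debezium.newkey"
OLD_KEY_HEADER = "__debezium.oldkey"


def primary_key_change_role(op: str, headers: dict) -> Optional[str]:
    """Classify an event as one half of a primary key change, if it is one.

    Returns "delete-of-old-key", "create-of-new-key", or None for ordinary events.
    """
    if op == "d" and NEW_KEY_HEADER in headers:
        return "delete-of-old-key"
    if op == "c" and OLD_KEY_HEADER in headers:
        return "create-of-new-key"
    return None


# Header values are shown as plain dictionaries for illustration only.
print(primary_key_change_role("d", {NEW_KEY_HEADER: {"id": 2}}))  # delete-of-old-key
print(primary_key_change_role("c", {OLD_KEY_HEADER: {"id": 1}}))  # create-of-new-key
print(primary_key_change_role("d", {}))                           # None
```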

delete events

The value in a delete change event has the same schema portion as create and update events for the same table. The payload portion in a delete event for the sample customers table looks like this:

{
    "schema": { ... },
    "payload": {
        "before": { 1
            "id": 1
        },
        "after": null, 2
        "source": { 3
            "version": "2.1.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "d", 4
        "ts_ms": 1465581902461 5
    }
}
Table 7.9. Descriptions of delete event value fields
Item | Field name | Description

1

before

Optional field that specifies the state of the row before the event occurred. In a delete event value, the before field contains the values that were in the row before it was deleted with the database commit.

In this example, the before field contains only the primary key column because the table’s REPLICA IDENTITY setting is DEFAULT.

2

after

Optional field that specifies the state of the row after the event occurred. In a delete event value, the after field is null, signifying that the row no longer exists.

3

source

Mandatory field that describes the source metadata for the event. In a delete event value, the source field structure is the same as for create and update events for the same table. Many source field values are also the same. In a delete event value, the ts_ms and lsn field values, as well as other values, might have changed. But the source field in a delete event value provides the same metadata:

  • Debezium version
  • Connector type and name
  • Database and table that contained the deleted row
  • Schema name
  • If the event was part of a snapshot (always false for delete events)
  • ID of the transaction in which the operation was performed
  • Offset of the operation in the database log
  • Timestamp for when the change was made in the database

4

op

Mandatory string that describes the type of operation. The op field value is d, signifying that this row was deleted.

5

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

A delete change event record provides a consumer with the information it needs to process the removal of this row.

Warning

For a consumer to be able to process a delete event generated for a table that does not have a primary key, set the table’s REPLICA IDENTITY to FULL. When a table does not have a primary key and the table’s REPLICA IDENTITY is set to DEFAULT or NOTHING, a delete event has no before field.

PostgreSQL connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.

Tombstone events

When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be null. To make this possible, the PostgreSQL connector follows a delete event with a special tombstone event that has the same key but a null value.
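The effect of the tombstone can be illustrated with a small Python model of a compacted topic. This is a simplified simulation, not Kafka's compaction algorithm: it keeps only the latest value for each key and drops a key entirely when its latest value is null. The function name compact and the shortened event values are hypothetical.

```python
from typing import Dict, Iterable, Optional, Tuple


def compact(log: Iterable[Tuple[str, Optional[dict]]]) -> Dict[str, dict]:
    """Return what remains per key after compaction of a log in offset order."""
    latest: Dict[str, dict] = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # tombstone: the key can be removed entirely
        else:
            latest[key] = value    # a newer value supersedes older ones
    return latest


without_tombstone = [("1", {"op": "c"}), ("1", {"op": "u"}), ("1", {"op": "d"})]
with_tombstone = without_tombstone + [("1", None)]

print(compact(without_tombstone))  # {'1': {'op': 'd'}}
print(compact(with_tombstone))     # {}
```

Without the tombstone, the delete event itself remains in the topic as the latest message for the key; with the tombstone, nothing for that key has to be kept.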

truncate events

A truncate change event signals that a table has been truncated. In this case, the message key is null, and the message value looks like this:

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "2.1.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "t", 2
        "ts_ms": 1559033904961 3
    }
}
Table 7.10. Descriptions of truncate event value fields
Item | Field name | Description

1

source

Mandatory field that describes the source metadata for the event. In a truncate event value, the source field structure is the same as for create, update, and delete events for the same table, and it provides this metadata:

  • Debezium version
  • Connector type and name
  • Database and table that was truncated
  • Schema name
  • If the event was part of a snapshot (always false for truncate events)
  • ID of the transaction in which the operation was performed
  • Offset of the operation in the database log
  • Timestamp for when the change was made in the database

2

op

Mandatory string that describes the type of operation. The op field value is t, signifying that this table was truncated.

3

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

If a single TRUNCATE statement applies to multiple tables, the connector emits one truncate change event record for each truncated table.

Because truncate events represent a change made to an entire table and do not have a message key, there are no ordering guarantees between the other change events for a table (create, update, and so on) and the truncate events for that table, unless you are working with topics that have a single partition. For instance, a consumer might receive an update event only after a truncate event for that table, when those events are read from different partitions.
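To show how the op values fit together, the following Python sketch applies create, read, update, delete, and truncate payloads to an in-memory copy of the customers table. It is a minimal illustration under stated assumptions: the function name apply_change is hypothetical, rows are keyed on the id primary key column of the sample table, and the events are assumed to arrive in order, which, as noted above, is only guaranteed for truncate events when the topic has a single partition.

```python
from typing import Dict


def apply_change(rows: Dict[int, dict], payload: dict) -> None:
    """Apply the payload of one change event value to an in-memory table."""
    op = payload["op"]
    if op in ("c", "r", "u"):
        after = payload["after"]
        rows[after["id"]] = after             # insert or overwrite the row
    elif op == "d":
        rows.pop(payload["before"]["id"], None)
    elif op == "t":
        rows.clear()                          # the whole table was truncated
    # Other op values, such as m for message events, do not change table content.


customers: Dict[int, dict] = {}
apply_change(customers, {"op": "c", "before": None, "after": {"id": 1, "first_name": "Anne"}})
apply_change(customers, {"op": "u", "before": {"id": 1}, "after": {"id": 1, "first_name": "Anne Marie"}})
after_update = dict(customers)
apply_change(customers, {"op": "t"})
print(after_update)  # {1: {'id': 1, 'first_name': 'Anne Marie'}}
print(customers)     # {}
```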

message events

This event type is supported only through the pgoutput plug-in on PostgreSQL 14+ (see the PostgreSQL documentation).

A message event signals that a generic logical decoding message has been inserted directly into the WAL, typically with the pg_logical_emit_message function. In this case, the message key is a Struct with a single field named prefix, which carries the prefix that was specified when the message was inserted. The message value looks like this for transactional messages:

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "2.1.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "",
            "table": "",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "m", 2
        "ts_ms": 1559033904961, 3
        "message": { 4
            "prefix": "foo",
            "content": "Ymfy"
        }
    }
}

Unlike other event types, non-transactional messages will not have any associated BEGIN or END transaction events. The message value looks like this for non-transactional messages:

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "2.1.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "",
            "table": "",
            "lsn": 46523128,
            "xmin": null
        },
        "op": "m", 2
        "ts_ms": 1559033904961, 3
        "message": { 4
            "prefix": "foo",
            "content": "Ymfy"
        }
    }
}

Table 7.11. Descriptions of message event value fields
Item | Field name | Description

1

source

Mandatory field that describes the source metadata for the event. In a message event value, the source field structure does not include table or schema information, and includes txId only if the message event is transactional. The source field provides this metadata:

  • Debezium version
  • Connector type and name
  • Database name
  • Schema name (always "" for message events)
  • Table name (always "" for message events)
  • If the event was part of a snapshot (always false for message events)
  • ID of the transaction in which the operation was performed (null for non-transactional message events)
  • Offset of the operation in the database log
  • Transactional messages: Timestamp for when the message was inserted into the WAL
  • Non-transactional messages: Timestamp for when the connector encounters the message

2

op

Mandatory string that describes the type of operation. The op field value is m, signifying that this is a message event.

3

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

For transactional message events, the ts_ms attribute of the source object indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

For non-transactional message events, the source object’s ts_ms indicates the time at which the connector encounters the message event, while payload.ts_ms indicates the time at which the connector processed the event. This difference exists because the commit timestamp is not present in PostgreSQL’s generic logical message format, and non-transactional logical messages are not preceded by a BEGIN event (which carries timestamp information).

4

message

Field that contains the message metadata
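As a hedged illustration of how a consumer might read these fields, the following Python sketch extracts the prefix and content from a message event and uses the presence of txId to tell transactional from non-transactional messages. The function name and sample events are hypothetical. The sketch also assumes that content arrives as a base64 string, which depends on how the connector and converter are configured.

```python
import base64


def describe_message_event(event: dict) -> dict:
    """Summarize a logical decoding message event (op == "m")."""
    payload = event["payload"]
    if payload.get("op") != "m":
        raise ValueError("not a logical decoding message event")
    message = payload["message"]
    return {
        "prefix": message["prefix"],
        # Assumption: content is base64 text; other binary handling modes differ.
        "content": base64.b64decode(message["content"]),
        # txId is present only for transactional messages.
        "transactional": payload["source"].get("txId") is not None,
    }


# Hypothetical events; "YmFy" is the base64 encoding of b"bar".
transactional = {
    "payload": {
        "source": {"txId": 556, "lsn": 46523128},
        "op": "m",
        "message": {"prefix": "foo", "content": "YmFy"},
    }
}
non_transactional = {
    "payload": {
        "source": {"lsn": 46523128},
        "op": "m",
        "message": {"prefix": "foo", "content": "YmFy"},
    }
}

first = describe_message_event(transactional)
second = describe_message_event(non_transactional)
```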

7.4. How Debezium PostgreSQL connectors map data types

The PostgreSQL connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the PostgreSQL data type of the column. The following sections describe how the connector maps PostgreSQL data types to a literal type and a semantic type in event fields.

  • literal type describes how the value is literally represented using Kafka Connect schema types: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, and STRUCT.
  • semantic type describes how the Kafka Connect schema captures the meaning of the field using the name of the Kafka Connect schema for the field.

If the default data type conversions do not meet your needs, you can create a custom converter for the connector.

Details are in the following sections:

Basic types

The following table describes how the connector maps basic types.

Table 7.12. Mappings for PostgreSQL basic data types
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

BOOLEAN

BOOLEAN

n/a

BIT(1)

BOOLEAN

n/a

BIT( > 1)

BYTES

io.debezium.data.Bits

The length schema parameter contains an integer that represents the number of bits. The resulting byte[] contains the bits in little-endian form and is sized to contain the specified number of bits. For example, numBytes = n/8 + (n % 8 == 0 ? 0 : 1) where n is the number of bits.

BIT VARYING[(M)]

BYTES

io.debezium.data.Bits

The length schema parameter contains an integer that represents the number of bits (2^31 - 1 in case no length is given for the column). The resulting byte[] contains the bits in little-endian form and is sized based on the content. The specified size (M) is stored in the length parameter of the io.debezium.data.Bits type.

SMALLINT, SMALLSERIAL

INT16

n/a

INTEGER, SERIAL

INT32

n/a

BIGINT, BIGSERIAL, OID

INT64

n/a

REAL

FLOAT32

n/a

DOUBLE PRECISION

FLOAT64

n/a

CHAR[(M)]

STRING

n/a

VARCHAR[(M)]

STRING

n/a

CHARACTER[(M)]

STRING

n/a

CHARACTER VARYING[(M)]

STRING

n/a

TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE

STRING

io.debezium.time.ZonedTimestamp

A string representation of a timestamp with timezone information, where the timezone is GMT.

TIMETZ, TIME WITH TIME ZONE

STRING

io.debezium.time.ZonedTime

A string representation of a time value with timezone information, where the timezone is GMT.

INTERVAL [P]

INT64

io.debezium.time.MicroDuration
(default)

The approximate number of microseconds for a time interval using the 365.25 / 12.0 formula for days per month average.

INTERVAL [P]

STRING

io.debezium.time.Interval
(when interval.handling.mode is set to string)

The string representation of the interval value that follows the pattern P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S, for example, P1Y2M3DT4H5M6.78S.

BYTEA

BYTES or STRING

n/a

Either the raw bytes (the default), a base64-encoded string, a base64-url-safe-encoded string, or a hex-encoded string, based on the connector’s binary handling mode setting.

Debezium supports only the hex value of the PostgreSQL bytea_output configuration. For more information about PostgreSQL binary data types, see the PostgreSQL documentation.

JSON, JSONB

STRING

io.debezium.data.Json

Contains the string representation of a JSON document, array, or scalar.

XML

STRING

io.debezium.data.Xml

Contains the string representation of an XML document.

UUID

STRING

io.debezium.data.Uuid

Contains the string representation of a PostgreSQL UUID value.

POINT

STRUCT

io.debezium.data.geometry.Point

Contains a structure with two FLOAT64 fields, (x,y). Each field represents the coordinates of a geometric point.

LTREE

STRING

io.debezium.data.Ltree

Contains the string representation of a PostgreSQL LTREE value.

CITEXT

STRING

n/a

INET

STRING

n/a

INT4RANGE

STRING

n/a

Range of integer.

INT8RANGE

STRING

n/a

Range of bigint.

NUMRANGE

STRING

n/a

Range of numeric.

TSRANGE

STRING

n/a

Contains the string representation of a timestamp range without a time zone.

TSTZRANGE

STRING

n/a

Contains the string representation of a timestamp range with the local system time zone.

DATERANGE

STRING

n/a

Contains the string representation of a date range. It always has an exclusive upper-bound.

ENUM

STRING

io.debezium.data.Enum

Contains the string representation of the PostgreSQL ENUM value. The set of allowed values is maintained in the allowed schema parameter.
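Two rows in the preceding table involve small calculations: the byte length of an io.debezium.data.Bits value and the approximate microsecond count of an io.debezium.time.MicroDuration value. The following Python sketch works through both. The function names are illustrative, and the interval arithmetic is an assumption based on the 365.25 / 12.0 days-per-month average stated in the table, not a copy of the connector's code.

```python
DAYS_PER_MONTH_AVG = 365.25 / 12.0  # 30.4375 days, as stated for MicroDuration


def bits_num_bytes(n_bits: int) -> int:
    """Bytes needed for n bits: n/8 + (n % 8 == 0 ? 0 : 1)."""
    return n_bits // 8 + (0 if n_bits % 8 == 0 else 1)


def bits_to_int(raw: bytes) -> int:
    """Interpret a Bits byte array, given in little-endian form, as an integer."""
    return int.from_bytes(raw, byteorder="little")


def approximate_interval_micros(years=0, months=0, days=0,
                                hours=0, minutes=0, seconds=0.0) -> int:
    """Approximate an interval in microseconds using the average month length."""
    total_days = (years * 12 + months) * DAYS_PER_MONTH_AVG + days
    total_seconds = ((total_days * 24 + hours) * 60 + minutes) * 60 + seconds
    return int(total_seconds * 1_000_000)


one_month = approximate_interval_micros(months=1)
one_year = approximate_interval_micros(years=1)
```

Because a month is averaged to 30.4375 days, a value such as one_month is an approximation. Consumers that need exact calendar arithmetic might prefer the string representation that is available when interval.handling.mode is set to string.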

Temporal types

Other than PostgreSQL’s TIMESTAMPTZ and TIMETZ data types, which contain time zone information, how temporal types are mapped depends on the value of the time.precision.mode connector configuration property. The following sections describe these mappings:

time.precision.mode=adaptive

When the time.precision.mode property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that events exactly represent the values in the database.

Table 7.13. Mappings when time.precision.mode is adaptive
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

DATE

INT32

io.debezium.time.Date

Represents the number of days since the epoch.

TIME(1), TIME(2), TIME(3)

INT32

io.debezium.time.Time

Represents the number of milliseconds past midnight, and does not include timezone information.

TIME(4), TIME(5), TIME(6)

INT64

io.debezium.time.MicroTime

Represents the number of microseconds past midnight, and does not include timezone information.

TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3)

INT64

io.debezium.time.Timestamp

Represents the number of milliseconds since the epoch, and does not include timezone information.

TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

Represents the number of microseconds since the epoch, and does not include timezone information.
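A consumer can use the semantic type name to decide how to interpret each integer. The following Python sketch is one possible way to do that for the mappings in the preceding table. The dispatch table and function name are illustrative assumptions, and the results are naive (zone-less) Python values.

```python
from datetime import date, datetime, timedelta

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DATETIME = datetime(1970, 1, 1)

# Map each semantic type (schema name) to a function that decodes its integer value.
DECODERS = {
    "io.debezium.time.Date":
        lambda v: EPOCH_DATE + timedelta(days=v),
    "io.debezium.time.Time":
        lambda v: (EPOCH_DATETIME + timedelta(milliseconds=v)).time(),
    "io.debezium.time.MicroTime":
        lambda v: (EPOCH_DATETIME + timedelta(microseconds=v)).time(),
    "io.debezium.time.Timestamp":
        lambda v: EPOCH_DATETIME + timedelta(milliseconds=v),
    "io.debezium.time.MicroTimestamp":
        lambda v: EPOCH_DATETIME + timedelta(microseconds=v),
}


def decode_temporal(semantic_type: str, value):
    """Decode a temporal field; unknown types and nulls pass through unchanged."""
    if value is None:
        return None
    decoder = DECODERS.get(semantic_type)
    return decoder(value) if decoder else value


day = decode_temporal("io.debezium.time.Date", 17702)
clock = decode_temporal("io.debezium.time.MicroTime", 54796945104)
```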

time.precision.mode=adaptive_time_microseconds

When the time.precision.mode configuration property is set to adaptive_time_microseconds, the connector determines the literal type and semantic type for temporal types based on the column’s data type definition. This ensures that events exactly represent the values in the database, except all TIME fields are captured as microseconds.

Table 7.14. Mappings when time.precision.mode is adaptive_time_microseconds
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

DATE

INT32

io.debezium.time.Date

Represents the number of days since the epoch.

TIME([P])

INT64

io.debezium.time.MicroTime

Represents the time value in microseconds and does not include timezone information. PostgreSQL allows precision P to be in the range 0-6 to store up to microsecond precision.

TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3)

INT64

io.debezium.time.Timestamp

Represents the number of milliseconds past the epoch, and does not include timezone information.

TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

Represents the number of microseconds past the epoch, and does not include timezone information.

time.precision.mode=connect

When the time.precision.mode configuration property is set to connect, the connector uses Kafka Connect logical types. This may be useful when consumers can handle only the built-in Kafka Connect logical types and are unable to handle variable-precision time values. However, because PostgreSQL supports microsecond precision, the events that a connector generates in the connect time precision mode lose precision when the database column has a fractional second precision value that is greater than 3.

Table 7.15. Mappings when time.precision.mode is connect
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

DATE

INT32

org.apache.kafka.connect.data.Date

Represents the number of days since the epoch.

TIME([P])

INT64

org.apache.kafka.connect.data.Time

Represents the number of milliseconds since midnight, and does not include timezone information. PostgreSQL allows P to be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision when P is greater than 3.

TIMESTAMP([P])

INT64

org.apache.kafka.connect.data.Timestamp

Represents the number of milliseconds since the epoch, and does not include timezone information. PostgreSQL allows P to be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision when P is greater than 3.
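To make the loss of precision concrete, the following Python sketch reduces a microsecond value to milliseconds. It assumes that the sub-millisecond digits are dropped rather than rounded, which the text above does not specify. The function name is hypothetical.

```python
def to_connect_millis(micros: int) -> int:
    """Reduce a microsecond count to milliseconds (assumption: digits are dropped)."""
    return micros // 1000


# Microseconds since the epoch for a TIMESTAMP(6) value.
micro_value = 1529507596945104
milli_value = to_connect_millis(micro_value)

# Number of microseconds that the millisecond representation cannot carry.
lost_micros = micro_value - milli_value * 1000
```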

TIMESTAMP type

The TIMESTAMP type represents a timestamp without time zone information. Such columns are converted into an equivalent Kafka Connect value based on UTC. For example, the TIMESTAMP value "2018-06-20 15:13:16.945104" is represented by an io.debezium.time.MicroTimestamp with the value "1529507596945104" when time.precision.mode is not set to connect.

The timezone of the JVM running Kafka Connect and Debezium does not affect this conversion.

PostgreSQL supports using +/-infinite values in TIMESTAMP columns. These special values are converted to timestamps with value 9223372036825200000 in case of positive infinity or -9223372036832400000 in case of negative infinity. This behavior mimics the standard behavior of the PostgreSQL JDBC driver. For reference, see the org.postgresql.PGStatement interface.
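The following Python sketch shows one way that a consumer might decode an io.debezium.time.MicroTimestamp value while guarding against the two infinity values quoted above. The function name and the choice to return the strings "infinity" and "-infinity" are assumptions for illustration only.

```python
from datetime import datetime, timedelta

# Special values quoted in the text for +/-infinity TIMESTAMP values.
POSITIVE_INFINITY = 9223372036825200000
NEGATIVE_INFINITY = -9223372036832400000

EPOCH = datetime(1970, 1, 1)  # naive value, interpreted as UTC


def decode_micro_timestamp(value: int):
    """Return a naive UTC datetime, or a marker string for the infinity values."""
    if value == POSITIVE_INFINITY:
        return "infinity"
    if value == NEGATIVE_INFINITY:
        return "-infinity"
    return EPOCH + timedelta(microseconds=value)


decoded = decode_micro_timestamp(1529507596945104)
```

Checking the special values first matters because they are far outside the range that Python's datetime type can represent.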

Decimal types

The setting of the PostgreSQL connector configuration property decimal.handling.mode determines how the connector maps decimal types.

When the decimal.handling.mode property is set to precise, the connector uses the Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL, NUMERIC and MONEY columns. This is the default mode.

Table 7.16. Mappings when decimal.handling.mode is precise
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

The scale schema parameter contains an integer representing how many digits the decimal point was shifted.

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

The scale schema parameter contains an integer representing how many digits the decimal point was shifted.

MONEY[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

The scale schema parameter contains an integer representing how many digits the decimal point was shifted. The scale schema parameter is determined by the money.fraction.digits connector configuration property.

There is an exception to this rule. When the NUMERIC or DECIMAL types are used without scale constraints, the values coming from the database have a different (variable) scale for each value. In this case, the connector uses io.debezium.data.VariableScaleDecimal, which contains both the value and the scale of the transferred value.

Table 7.17. Mappings of DECIMAL and NUMERIC types when there are no scale constraints
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

NUMERIC

STRUCT

io.debezium.data.VariableScaleDecimal

Contains a structure with two fields: scale of type INT32 that contains the scale of the transferred value and value of type BYTES containing the original value in an unscaled form.

DECIMAL

STRUCT

io.debezium.data.VariableScaleDecimal

Contains a structure with two fields: scale of type INT32 that contains the scale of the transferred value and value of type BYTES containing the original value in an unscaled form.
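On the consumer side, both representations reduce to an unscaled integer plus a scale. The following Python sketch decodes them, assuming that the BYTES value holds the unscaled number as a big-endian two's-complement integer, which is the encoding that the Kafka Connect Decimal logical type uses. The function names are hypothetical, and the input is assumed to be raw bytes (with the JSON converter, the bytes would first need to be base64-decoded).

```python
from decimal import Decimal


def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Rebuild a decimal from its unscaled big-endian bytes and its scale."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(ch) for ch in str(abs(unscaled)))
    # The tuple constructor is exact and ignores the context precision.
    return Decimal((sign, digits, -scale))


def decode_variable_scale_decimal(struct: dict) -> Decimal:
    """Decode a VariableScaleDecimal struct with 'scale' and 'value' fields."""
    return decode_connect_decimal(struct["value"], struct["scale"])


price = decode_connect_decimal(b"\x30\x39", 2)  # unscaled 12345, scale 2
tiny = decode_variable_scale_decimal({"scale": 3, "value": b"\x01"})
```

For columns with a fixed scale, the scale comes from the schema parameter. For unconstrained NUMERIC or DECIMAL columns, each value carries its own scale in the struct.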

When the decimal.handling.mode property is set to double, the connector represents all DECIMAL, NUMERIC and MONEY values as Java double values and encodes them as shown in the following table.

Table 7.18. Mappings when decimal.handling.mode is double
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name)

NUMERIC[(M[,D])]

FLOAT64

 

DECIMAL[(M[,D])]

FLOAT64

 

MONEY[(M[,D])]

FLOAT64

 

The last possible setting for the decimal.handling.mode configuration property is string. In this case, the connector represents DECIMAL, NUMERIC and MONEY values as their formatted string representation, and encodes them as shown in the following table.

Table 7.19. Mappings when decimal.handling.mode is string
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name)

NUMERIC[(M[,D])]

STRING

 

DECIMAL[(M[,D])]

STRING

 

MONEY[(M[,D])]

STRING

 

PostgreSQL supports NaN (not a number) as a special value that can be stored in DECIMAL/NUMERIC columns. When decimal.handling.mode is set to double or string, the connector encodes NaN as Double.NaN or as the string constant NAN, respectively.

HSTORE type

The setting of the PostgreSQL connector configuration property hstore.handling.mode determines how the connector maps HSTORE values.

When the hstore.handling.mode property is set to json (the default), the connector represents HSTORE values as string representations of JSON values and encodes them as shown in the following table. When the hstore.handling.mode property is set to map, the connector uses the MAP schema type for HSTORE values.

Table 7.20. Mappings for HSTORE data type
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

HSTORE

STRING

io.debezium.data.Json

Example: output representation using the JSON converter is {"key" : "val"}

HSTORE

MAP

n/a

Example: output representation using the JSON converter is {"key" : "val"}

Domain types

PostgreSQL supports user-defined types that are based on other underlying types. When such column types are used, Debezium exposes the column’s representation based on the full type hierarchy.

Important

Capturing changes in columns that use PostgreSQL domain types requires special consideration. When a column is defined to contain a domain type that extends one of the default database types and the domain type defines a custom length or scale, the generated schema inherits that defined length or scale.

When a column is defined to contain a domain type that extends another domain type that defines a custom length or scale, the generated schema does not inherit the defined length or scale because that information is not available in the PostgreSQL driver’s column metadata.

Network address types

PostgreSQL has data types that can store IPv4, IPv6, and MAC addresses. It is better to use these types instead of plain text types to store network addresses. Network address types offer input error checking and specialized operators and functions.

Table 7.21. Mappings for network address types
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes

INET

STRING

n/a

IPv4 and IPv6 hosts and networks

CIDR

STRING

n/a

IPv4 and IPv6 networks

MACADDR

STRING

n/a

MAC addresses

MACADDR8

STRING

n/a

MAC addresses in EUI-64 format

PostGIS types

The PostgreSQL connector supports all PostGIS data types.

Table 7.22. Mappings of PostGIS data types
PostGIS data type | Literal type (schema type) | Semantic type (schema name) and Notes

GEOMETRY
(planar)

STRUCT

io.debezium.data.geometry.Geometry

Contains a structure with two fields:

  • srid (INT32) - Spatial Reference System Identifier that defines what type of geometry object is stored in the structure.
  • wkb (BYTES) - A binary representation of the geometry object encoded in the Well-Known-Binary format.

For format details, see Open Geospatial Consortium Simple Features Access specification.

GEOGRAPHY
(spherical)

STRUCT

io.debezium.data.geometry.Geography

Contains a structure with two fields:

  • srid (INT32) - Spatial Reference System Identifier that defines what type of geography object is stored in the structure.
  • wkb (BYTES) - A binary representation of the geometry object encoded in the Well-Known-Binary format.

For format details, see Open Geospatial Consortium Simple Features Access specification.

Toasted values

PostgreSQL has a hard limit on the page size. This means that values that are larger than around 8 KB need to be stored by using TOAST storage. This impacts replication messages that are coming from the database. Values that were stored by using the TOAST mechanism and that have not been changed are not included in the message, unless they are part of the table’s replica identity. There is no safe way for Debezium to read the missing value out-of-band directly from the database, as this would potentially lead to race conditions. Consequently, Debezium follows these rules to handle toasted values:

  • Tables with REPLICA IDENTITY FULL - TOAST column values are part of the before and after fields in change events just like any other column.
  • Tables with REPLICA IDENTITY DEFAULT - When receiving an UPDATE event from the database, any unchanged TOAST column value that is not part of the replica identity is not contained in the event. Similarly, when receiving a DELETE event, no TOAST columns, if any, are in the before field. As Debezium cannot safely provide the column value in this case, the connector returns a placeholder value as defined by the connector configuration property, unavailable.value.placeholder.
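A consumer that maintains a copy of each row can treat the placeholder as "value unchanged" instead of overwriting good data with it. The following Python sketch illustrates the idea. The function name is hypothetical, and the default placeholder string shown here is an assumption that should be checked against the connector's unavailable.value.placeholder setting.

```python
# Assumed default of unavailable.value.placeholder; verify for your deployment.
DEFAULT_PLACEHOLDER = "__debezium_unavailable_value"


def apply_update(cached_row: dict, after: dict,
                 placeholder: str = DEFAULT_PLACEHOLDER) -> dict:
    """Merge an update event's after state into a cached row.

    Columns whose value equals the placeholder keep their cached value,
    because the connector could not supply the unchanged TOAST value.
    """
    merged = dict(cached_row)
    for column, value in after.items():
        if value == placeholder:
            continue  # unchanged TOAST column: keep what we already have
        merged[column] = value
    return merged


cached = {"id": 1, "title": "old", "body": "a very large document"}
update_after = {"id": 1, "title": "new", "body": DEFAULT_PLACEHOLDER}
current = apply_update(cached, update_after)
```

This approach works only if the consumer has already seen the full row, for example from the initial snapshot. Otherwise, setting REPLICA IDENTITY FULL on the table avoids the problem at the source.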

Default values

If a default value is specified for a column in the database schema, the PostgreSQL connector will attempt to propagate this value to the Kafka schema whenever possible. Most common data types are supported, including:

  • BOOLEAN
  • Numeric types (INT, FLOAT, NUMERIC, etc.)
  • Text types (CHAR, VARCHAR, TEXT, etc.)
  • Temporal types (DATE, TIME, INTERVAL, TIMESTAMP, TIMESTAMPTZ)
  • JSON, JSONB, XML
  • UUID

Note that for temporal types, parsing of the default value is provided by PostgreSQL libraries; therefore, any string representation which is normally supported by PostgreSQL should also be supported by the connector.

In the case that the default value is generated by a function rather than being directly specified in-line, the connector will instead export the equivalent of 0 for the given data type. These values include:

  • FALSE for BOOLEAN
  • 0 with appropriate precision, for numeric types
  • Empty string for text/XML types
  • {} for JSON types
  • 1970-01-01 for DATE, TIMESTAMP, TIMESTAMPTZ types
  • 00:00 for TIME
  • EPOCH for INTERVAL
  • 00000000-0000-0000-0000-000000000000 for UUID

This support currently extends only to explicit usage of functions. For example, CURRENT_TIMESTAMP(6) is supported with parentheses, but CURRENT_TIMESTAMP is not.

Important

Support for the propagation of default values exists primarily to allow for safe schema evolution when using the PostgreSQL connector with a schema registry which enforces compatibility between schema versions. Due to this primary concern, as well as the refresh behaviours of the different plug-ins, the default value present in the Kafka schema is not guaranteed to always be in-sync with the default value in the database schema.

  • Default values may appear 'late' in the Kafka schema, depending on when and how a given plug-in triggers a refresh of the in-memory schema. If the default changes multiple times between refreshes, some values might be skipped and never appear in the Kafka schema.
  • Default values may appear 'early' in the Kafka schema, if a schema refresh is triggered while the connector has records waiting to be processed. This is due to the column metadata being read from the database at refresh time, rather than being present in the replication message. This may occur if the connector is behind and a refresh occurs, or on connector start if the connector was stopped for a time while updates continued to be written to the source database.

This behaviour may be unexpected, but it is still safe. Only the schema definition is affected, while the real values present in the message will remain consistent with what was written to the source database.

7.5. Setting up PostgreSQL to run a Debezium connector

This release of Debezium supports only the native pgoutput logical replication stream. To set up PostgreSQL so that it uses the pgoutput plug-in, you must enable a replication slot, and configure a user with sufficient privileges to perform the replication.

Details are in the following topics:

7.5.1. Configuring a replication slot for the Debezium pgoutput plug-in

PostgreSQL’s logical decoding uses replication slots. To configure a replication slot, specify the following in the postgresql.conf file:

wal_level=logical
max_wal_senders=1
max_replication_slots=1

These settings instruct the PostgreSQL server as follows:

  • wal_level - Use logical decoding with the write-ahead log.
  • max_wal_senders - Use a maximum of one separate process for processing WAL changes.
  • max_replication_slots - Allow a maximum of one replication slot to be created for streaming WAL changes.

Replication slots are guaranteed to retain all WAL entries that are required for Debezium even during Debezium outages. Consequently, it is important to closely monitor replication slots to avoid:

  • Too much disk consumption
  • Any conditions, such as catalog bloat, that can happen if a replication slot stays unused for too long

For more information, see the PostgreSQL documentation for replication slots.
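When monitoring disk consumption, it can help to turn LSN values into byte counts. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash. The following Python sketch parses that form and estimates how much WAL a slot retains. It assumes that you have already read the slot's restart_lsn from the pg_replication_slots view and the current write position from pg_current_wal_lsn(). The function names are hypothetical.

```python
def parse_lsn(text: str) -> int:
    """Convert an LSN such as '16/B374D848' into a 64-bit byte position."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Approximate number of WAL bytes that the slot forces the server to keep."""
    return parse_lsn(current_lsn) - parse_lsn(restart_lsn)


position = parse_lsn("0/2C5E2F8")
retained = retained_wal_bytes("1/0", "0/FFFFFF00")
```

A value that keeps growing over time suggests that the connector is not confirming progress and that the slot deserves attention.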

Note

Familiarity with the mechanics and configuration of the PostgreSQL write-ahead log is helpful for using the Debezium PostgreSQL connector.

7.5.2. Setting up PostgreSQL permissions for the Debezium connector

Setting up a PostgreSQL server to run a Debezium connector requires a database user that can perform replications. Replication can be performed only by a database user that has appropriate permissions and only for a configured number of hosts.

Although, by default, superusers have the necessary REPLICATION and LOGIN roles, as mentioned in Security, it is best not to provide the Debezium replication user with elevated privileges. Instead, create a Debezium user that has the minimum required privileges.

Prerequisites

  • PostgreSQL administrative permissions.

Procedure

  1. To provide a user with replication permissions, define a PostgreSQL role that has at least the REPLICATION and LOGIN permissions, and then grant that role to the user. For example:

    CREATE ROLE <name> REPLICATION LOGIN;

7.5.3. Setting privileges to enable Debezium to create PostgreSQL publications

Debezium streams change events for PostgreSQL source tables from publications that are created for the tables. Publications contain a filtered set of change events that are generated from one or more tables. The data in each publication is filtered based on the publication specification. The specification can be created by the PostgreSQL database administrator or by the Debezium connector. To permit the Debezium PostgreSQL connector to create publications and specify the data to replicate to them, the connector must operate with specific privileges in the database.

There are several options for determining how publications are created. In general, it is best to manually create publications for the tables that you want to capture, before you set up the connector. However, you can configure your environment in a way that permits Debezium to create publications automatically, and to specify the data that is added to them.

Debezium uses include list and exclude list properties to specify how data is inserted in the publication. For more information about the options for enabling Debezium to create publications, see publication.autocreate.mode.

For Debezium to create a PostgreSQL publication, it must run as a user that has the following privileges:

  • Replication privileges in the database to add the table to a publication.
  • CREATE privileges on the database to add publications.
  • SELECT privileges on the tables to copy the initial table data. Table owners automatically have SELECT permission for the table.

To add tables to a publication, the user must be an owner of the table. But because the source table already exists, you need a mechanism to share ownership with the original owner. To enable shared ownership, you create a PostgreSQL replication group, and then add the existing table owner and the replication user to the group.

Procedure

  1. Create a replication group.

    CREATE ROLE <replication_group>;

  2. Add the original owner of the table to the group.

    GRANT <replication_group> TO <original_owner>;

  3. Add the Debezium replication user to the group.

    GRANT <replication_group> TO <replication_user>;

  4. Transfer ownership of the table to <replication_group>.

    ALTER TABLE <table_name> OWNER TO <replication_group>;

For Debezium to specify the capture configuration, the value of publication.autocreate.mode must be set to filtered.

7.5.4. Configuring PostgreSQL to allow replication with the Debezium connector host

To enable Debezium to replicate PostgreSQL data, you must configure the database to permit replication with the host that runs the PostgreSQL connector. To specify the clients that are permitted to replicate with the database, add entries to the PostgreSQL host-based authentication file, pg_hba.conf. For more information about the pg_hba.conf file, see the PostgreSQL documentation.

Procedure

  • Add entries to the pg_hba.conf file to specify the Debezium connector hosts that can replicate with the database host.

    pg_hba.conf file example:

    local   replication     <youruser>                          trust   1
    host    replication     <youruser>  127.0.0.1/32            trust   2
    host    replication     <youruser>  ::1/128                 trust   3

    1
    Instructs the server to allow replication for <youruser> locally, that is, on the server machine.
    2
    Instructs the server to allow <youruser> on localhost to receive replication changes using IPv4.
    3
    Instructs the server to allow <youruser> on localhost to receive replication changes using IPv6.

Note

For more information about network masks, see the PostgreSQL documentation.

7.5.5. Configuring PostgreSQL to manage Debezium WAL disk space consumption

In certain cases, it is possible for PostgreSQL disk space consumed by WAL files to spike or increase out of usual proportions. There are several possible reasons for this situation:

  • The LSN up to which the connector has received data is available in the confirmed_flush_lsn column of the server’s pg_replication_slots view. Data that is older than this LSN is no longer available, and the database is responsible for reclaiming the disk space.

    Also in the pg_replication_slots view, the restart_lsn column contains the LSN of the oldest WAL that the connector might require. If the value for confirmed_flush_lsn is regularly increasing and the value of restart_lsn lags, then the database needs to reclaim the space.

    The database typically reclaims disk space in batch blocks. This is expected behavior and no action by a user is necessary.

  • There are many updates in a database that is being tracked but only a tiny number of updates are related to the table(s) and schema(s) for which the connector is capturing changes. This situation can be easily solved with periodic heartbeat events. Set the heartbeat.interval.ms connector configuration property.
  • The PostgreSQL instance contains multiple databases and one of them is a high-traffic database. Debezium captures changes in another database that is low-traffic in comparison to the other database. Debezium then cannot confirm the LSN as replication slots work per-database and Debezium is not invoked. As WAL is shared by all databases, the amount used tends to grow until an event is emitted by the database for which Debezium is capturing changes. To overcome this, it is necessary to:

    • Enable periodic heartbeat record generation with the heartbeat.interval.ms connector configuration property.
    • Regularly emit change events from the database for which Debezium is capturing changes.

    To emit such events, a separate process can periodically update a table in that database, either by inserting a new row or by repeatedly updating the same row. PostgreSQL then invokes Debezium, which confirms the latest LSN and allows the database to reclaim the WAL space. This task can be automated by means of the heartbeat.action.query connector configuration property.
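As a rough sketch of the two heartbeat properties named above, the following Python snippet assembles a connector configuration fragment and prints it as JSON. The helper function, the 10-second interval, and the debezium_heartbeat table in the query are all hypothetical. Only the property names come from this section.

```python
import json
from typing import Optional


def heartbeat_settings(interval_ms: int, action_query: Optional[str] = None) -> dict:
    """Build the heartbeat part of a connector configuration."""
    settings = {"heartbeat.interval.ms": str(interval_ms)}
    if action_query:
        # Statement that the connector runs on each heartbeat, so that the
        # captured database itself produces a change.
        settings["heartbeat.action.query"] = action_query
    return settings


# Hypothetical table and interval, shown for illustration only.
config_fragment = heartbeat_settings(
    10000,
    "INSERT INTO debezium_heartbeat (ts) VALUES (now())",
)
print(json.dumps(config_fragment, indent=2))
```

If the action query writes to a table, that table would need to exist in the captured database and be writable by the connector's database user.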

7.6. Deployment of Debezium PostgreSQL connectors

You can use either of the following methods to deploy a Debezium PostgreSQL connector:

7.6.1. PostgreSQL connector deployment using AMQ Streams

Beginning with Debezium 1.7, the preferred method for deploying a Debezium connector is to use AMQ Streams to build a Kafka Connect container image that includes the connector plug-in.

During the deployment process, you create and use the following custom resources (CRs):

  • A KafkaConnect CR that defines your Kafka Connect instance and includes information about the connector artifacts to include in the image.
  • A KafkaConnector CR that provides details that include information the connector uses to access the source database. After AMQ Streams starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.

In the build specification for the Kafka Connect image, you can specify the connectors that are available to deploy. For each connector plug-in, you can also specify other components that you want to make available for deployment. For example, you can add Service Registry artifacts, or the Debezium scripting component. When AMQ Streams builds the Kafka Connect image, it downloads the specified artifacts, and incorporates them into the image.

The spec.build.output parameter in the KafkaConnect CR specifies where to store the resulting Kafka Connect container image. Container images can be stored in a Docker registry, or in an OpenShift ImageStream. To store images in an ImageStream, you must create the ImageStream before you deploy Kafka Connect. ImageStreams are not created automatically.
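As a rough illustration of the two storage options, the following sketch creates the ImageStream ahead of time and prints an alternative output fragment for a registry. The namespace debezium, the registry path, and the secret name are placeholders that do not come from this guide; the ImageStream name matches the image name used in the KafkaConnect example later in this section.

```shell
# Name of the ImageStream; it matches the image name used in spec.build.output
# (debezium-streams-connect:latest in the KafkaConnect example in this section).
IMAGESTREAM_NAME="debezium-streams-connect"

# Create the ImageStream before deploying Kafka Connect. Not called here,
# because it needs a cluster login. The default namespace "debezium" is an
# assumption; pass your own project name as the first argument.
create_imagestream() {
  oc create imagestream "$IMAGESTREAM_NAME" -n "${1:-debezium}"
}

# Print an alternative spec.build.output fragment that pushes the image to a
# container registry instead. Registry path and secret name are placeholders.
docker_output_fragment() {
  cat <<'EOF'
output:
  type: docker
  image: <myregistry.io>/<my-org>/debezium-streams-connect:latest
  pushSecret: <registry-credentials-secret>
EOF
}
```

Run create_imagestream once per project before you apply the KafkaConnect CR. With the docker output type, no ImageStream is needed, but the cluster needs credentials for the registry.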

Note

If you use a KafkaConnect resource to create a cluster, afterwards you cannot use the Kafka Connect REST API to create or update connectors. You can still use the REST API to retrieve information.
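Read-only access to the REST API might look like the following sketch. It assumes that the REST API listens on port 8083 behind a service named <cluster>-connect-api, which is how AMQ Streams usually exposes it, and that the command runs from a pod in the same namespace. Verify both assumptions in your environment.

```shell
# Build the URL of a read-only Kafka Connect REST endpoint.
# Arguments: Kafka Connect cluster name, connector name.
connect_status_url() {
  echo "http://$1-connect-api:8083/connectors/$2/status"
}

# Fetch the status of one connector with a GET request. Not called here,
# because it needs network access to the Kafka Connect service.
get_connector_status() {
  curl -s "$(connect_status_url "$1" "$2")"
}
```

Requests that create or change connectors, such as POST or PUT, are the ones to avoid, because the Operator manages connectors from the KafkaConnector resources.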

7.6.2. Using AMQ Streams to deploy a Debezium PostgreSQL connector

With earlier versions of AMQ Streams, to deploy Debezium connectors on OpenShift, you were required to first build a Kafka Connect image for the connector. The current preferred method for deploying connectors on OpenShift is to use a build configuration in AMQ Streams to automatically build a Kafka Connect container image that includes the Debezium connector plug-ins that you want to use.

During the build process, the AMQ Streams Operator transforms input parameters in a KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository or another configured HTTP server.

The newly created container is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect cluster. After AMQ Streams builds the Kafka Connect image, you create KafkaConnector custom resources to start the connectors that are included in the build.

Prerequisites

  • You have access to an OpenShift cluster on which the cluster Operator is installed.
  • The AMQ Streams Operator is running.
  • An Apache Kafka cluster is deployed as documented in Deploying and Upgrading AMQ Streams on OpenShift.
  • Kafka Connect is deployed on AMQ Streams.
  • You have a Red Hat Integration license.
  • The OpenShift oc CLI client is installed or you have access to the OpenShift Container Platform web console.
  • Depending on how you intend to store the Kafka Connect build image, you need registry permissions or you must create an ImageStream resource:

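    To store the build image in an image registry, such as Red Hat Quay.io or Docker Hub
    • An account and permissions to create and manage images in the registry.
    To store the build image as a native OpenShift ImageStream
    • An ImageStream resource is deployed to the cluster. You must create the ImageStream explicitly, because ImageStreams are not created automatically.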

Procedure

  1. Log in to the OpenShift cluster.
  2. Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one. For example, create a KafkaConnect CR with the name dbz-connect.yaml that specifies the metadata.annotations and spec.build properties. The following example shows an excerpt from a dbz-connect.yaml file that describes a KafkaConnect custom resource.

    Example 7.1. A dbz-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector

    In the example that follows, the custom resource is configured to download the following artifacts:

    • The Debezium PostgreSQL connector archive.
    • The Service Registry archive. The Service Registry is an optional component. Add the Service Registry component only if you intend to use Avro serialization with the connector.
    • The Debezium scripting SMT archive and the associated scripting engine that you want to use with the Debezium connector. The SMT archive and scripting language dependencies are optional components. Add these components only if you intend to use the Debezium content-based routing SMT or filter SMT.
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" # 1
    spec:
      version: 3.3.1
      build: # 2
        output: # 3
          type: imagestream # 4
          image: debezium-streams-connect:latest
        plugins: # 5
          - name: debezium-connector-postgres
            artifacts:
              - type: zip # 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.1.4.Final-redhat-00001/debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip # 7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.3.0.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.3.0.Final-redhat-<build-number>.zip # 8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.1.4.Final-redhat-00001/debezium-scripting-2.1.4.Final-redhat-00001.zip # 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar # 10
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json/3.0.11/groovy-json-3.0.11.jar

      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093

      ...
    Table 7.23. Descriptions of Kafka Connect configuration settings

    Item  Description
    1     Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster.
    2     The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts.
    3     The build.output specifies the registry in which the newly built image is stored.
    4     Specifies the output type and image name for the image output. Valid values for output.type are docker to push into a container registry such as Docker Hub or Quay, or imagestream to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the AMQ Streams Build schema reference in Configuring AMQ Streams on OpenShift.
    5     The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name, and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component.
    6     The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. The type value must match the type of the file that is referenced in the url field.
    7     The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. Debezium connector artifacts are available in the Red Hat Maven repository. The OpenShift cluster must have access to the specified server.
    8     (Optional) Specifies the artifact type and url for downloading the Service Registry component. Include the Service Registry artifact only if you want the connector to use Apache Avro to serialize event keys and values with the Service Registry, instead of using the default JSON converter.
    9     (Optional) Specifies the artifact type and url for the Debezium scripting SMT archive to use with the Debezium connector. Include the scripting SMT only if you intend to use the Debezium content-based routing SMT or filter SMT. To use the scripting SMT, you must also deploy a JSR 223-compliant scripting implementation, such as Groovy.
    10    (Optional) Specifies the artifact type and url for the JAR files of a JSR 223-compliant scripting implementation, which is required by the Debezium scripting SMT.

    Important

    If you use AMQ Streams to incorporate the connector plug-in into your Kafka Connect image, for each of the required scripting language components, artifacts.url must specify the location of a JAR file, and the value of artifacts.type must also be set to jar. Invalid values cause the connector to fail at runtime.

    To enable use of the Apache Groovy language with the scripting SMT, the custom resource in the example retrieves JAR files for the following libraries:

    • groovy
    • groovy-jsr223 (scripting agent)
    • groovy-json (module for parsing JSON strings)

    As an alternative, the Debezium scripting SMT also supports the use of the JSR 223 implementation of GraalVM JavaScript.

  3. Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:

    oc create -f dbz-connect.yaml

    Based on the configuration specified in the custom resource, the Streams Operator prepares a Kafka Connect image to deploy.
    After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.

  4. Create a KafkaConnector resource to define an instance of each connector that you want to deploy.
    For example, create the following KafkaConnector CR, and save it as postgresql-inventory-connector.yaml.

    Example 7.2. postgresql-inventory-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-postgresql # 1
    spec:
      class: io.debezium.connector.postgresql.PostgresConnector # 2
      tasksMax: 1 # 3
      config: # 4
        database.hostname: postgresql.debezium-postgresql.svc.cluster.local # 5
        database.port: 5432 # 6
        database.user: debezium # 7
        database.password: dbz # 8
        database.dbname: mydatabase # 9
        topic.prefix: inventory-connector-postgresql # 10
        table.include.list: public.inventory # 11

        ...
    Table 7.24. Descriptions of connector configuration settings

    Item  Description
    1     The name of the connector to register with the Kafka Connect cluster.
    2     The name of the connector class.
    3     The number of tasks that can operate concurrently.
    4     The connector’s configuration.
    5     The address of the host database instance.
    6     The port number of the database instance.
    7     The name of the account that Debezium uses to connect to the database.
    8     The password that Debezium uses to connect to the database user account.
    9     The name of the database to capture changes from.
    10    The topic prefix for the database instance or cluster. The specified name must be formed only from alphanumeric characters or underscores. Because the topic prefix is used as the prefix for any Kafka topics that receive change events from this connector, the name must be unique among the connectors in the cluster. This namespace is also used in the names of related Kafka Connect schemas, and the namespaces of a corresponding Avro schema if you integrate the connector with the Avro converter.
    11    The list of tables from which the connector captures change events.

  5. Create the connector resource by running the following command:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    For example,

    oc create -n debezium -f postgresql-inventory-connector.yaml

    The connector is registered to the Kafka Connect cluster and starts to run against the database that is specified by spec.config.database.dbname in the KafkaConnector CR. After the connector pod is ready, Debezium is running.

You are now ready to verify the Debezium PostgreSQL deployment.
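Two checks can help when you work through the preceding procedure. The sketch below is illustrative: artifact_type_matches is a hypothetical helper that compares an artifacts.type value with the file extension in artifacts.url, and wait_until_ready wraps oc wait with an assumed default namespace of debezium and an arbitrary 600-second timeout.

```shell
# Return success if the artifact type declared in the KafkaConnect CR
# (zip, tgz, or jar) matches the file extension at the end of the URL.
artifact_type_matches() {
  case "$1:$2" in
    zip:*.zip|tgz:*.tgz|jar:*.jar) return 0 ;;
    *) return 1 ;;
  esac
}

# Block until the Operator reports the given resource as Ready.
# Not called here, because it needs a cluster login.
wait_until_ready() {
  oc wait "$1" --for=condition=Ready --timeout=600s -n "${2:-debezium}"
}
```

For example, artifact_type_matches jar https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar succeeds, while pairing type jar with a .zip URL fails. After you apply the resources, wait_until_ready kafkaconnect/debezium-kafka-connect-cluster and wait_until_ready kafkaconnector/inventory-connector-postgresql return when each resource is ready.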

7.6.3. Deploying a Debezium PostgreSQL connector by building a custom Kafka Connect container image from a Dockerfile

To deploy a Debezium PostgreSQL connector, you need to build a custom Kafka Connect container image that contains the Debezium connector archive and push this container image to a container registry. You then need to create two custom resources (CRs):

  • A KafkaConnect CR that defines your Kafka Connect instance. The image property in the CR specifies the name of the container image that you create to run your Debezium connector. You apply this CR to the OpenShift instance where Red Hat AMQ Streams is deployed. AMQ Streams offers operators and images that bring Apache Kafka to OpenShift.
  • A KafkaConnector CR that defines your Debezium PostgreSQL connector. Apply this CR to the same OpenShift instance where you applied the KafkaConnect CR.

Prerequisites

  • PostgreSQL is running and you performed the steps to set up PostgreSQL to run a Debezium connector.
  • AMQ Streams is deployed on OpenShift and is running Apache Kafka and Kafka Connect. For more information, see Deploying and Upgrading AMQ Streams on OpenShift.
  • Podman or Docker is installed.
  • You have an account and permissions to create and manage containers in the container registry (such as quay.io or docker.io) to which you plan to add the container that will run your Debezium connector.

Procedure

  1. Create the Debezium PostgreSQL container for Kafka Connect:

    1. Create a Dockerfile that uses registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12 as the base image. For example, from a terminal window, enter the following command:

      cat <<EOF >debezium-container-for-postgresql.yaml # 1
      FROM registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12
      USER root:root
      # 2
      RUN mkdir -p /opt/kafka/plugins/debezium
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.1.4.Final-redhat-00001/debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip \
      && unzip debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip \
      && rm debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF

      Item  Description
      1     You can specify any file name that you want.
      2     Specifies the path to your Kafka Connect plug-ins directory. If your Kafka Connect plug-ins directory is in a different location, replace this path with the actual path of your directory.

      The command creates a Dockerfile with the name debezium-container-for-postgresql.yaml in the current directory.

    2. Build the container image from the debezium-container-for-postgresql.yaml Dockerfile that you created in the previous step. From the directory that contains the file, open a terminal window and enter one of the following commands. Because the file is not named Dockerfile, the -f option points the build at it:

      podman build -f debezium-container-for-postgresql.yaml -t debezium-container-for-postgresql:latest .
      docker build -f debezium-container-for-postgresql.yaml -t debezium-container-for-postgresql:latest .

      The build command builds a container image with the name debezium-container-for-postgresql.

    3. Push your custom image to a container registry such as quay.io or an internal container registry. The container registry must be available to the OpenShift instance where you want to deploy the image. Enter one of the following commands:

      podman push <myregistry.io>/debezium-container-for-postgresql:latest
      docker push <myregistry.io>/debezium-container-for-postgresql:latest

    4. Create a new Debezium PostgreSQL KafkaConnect custom resource (CR). For example, create a KafkaConnect CR with the name dbz-connect.yaml that specifies annotations and image properties. The following example shows an excerpt from a dbz-connect.yaml file that describes a KafkaConnect custom resource.

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" # 1
      spec:
        image: debezium-container-for-postgresql # 2

        ...

      Item  Description
      1     metadata.annotations indicates to the Cluster Operator that KafkaConnector resources are used to configure connectors in this Kafka Connect cluster.
      2     spec.image specifies the name of the image that you created to run your Debezium connector. This property overrides the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable in the Cluster Operator.

    5. Apply your KafkaConnect CR to the OpenShift Kafka instance by running the following command:

      oc create -f dbz-connect.yaml

      This updates your Kafka Connect environment in OpenShift to add a Kafka Connect instance that specifies the name of the image that you created to run your Debezium connector.

  2. Create a KafkaConnector custom resource that configures your Debezium PostgreSQL connector instance.

    You configure a Debezium PostgreSQL connector in a .yaml file that specifies the configuration properties for the connector. The connector configuration might instruct Debezium to produce events for a subset of the schemas and tables, or it might set properties so that Debezium ignores, masks, or truncates values in specified columns that are sensitive, too large, or not needed. For the complete list of the configuration properties that you can set for the Debezium PostgreSQL connector, see PostgreSQL connector properties.

    The following example shows an excerpt from a custom resource that configures a Debezium connector that connects to a PostgreSQL server host, 192.168.99.100, on port 5432. This host has a database named sampledb, a schema named public, and inventory-connector-postgresql is the server’s logical name.

    inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: inventory-connector-postgresql # 1
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: io.debezium.connector.postgresql.PostgresConnector
      tasksMax: 1 # 2
      config: # 3
        database.hostname: 192.168.99.100 # 4
        database.port: 5432
        database.user: debezium
        database.password: dbz
        database.dbname: sampledb
        topic.prefix: inventory-connector-postgresql # 5
        schema.include.list: public # 6
        plugin.name: pgoutput # 7

        ...

    1  The name of the connector.
    2  Only one task should operate at any one time. Because the PostgreSQL connector reads the PostgreSQL server’s write-ahead log (WAL), using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
    3  The connector’s configuration.
    4  The name of the database host that is running the PostgreSQL server. In this example, the database host name is 192.168.99.100.
    5  A unique topic prefix. The topic prefix is the logical identifier for the PostgreSQL server or cluster of servers. This name is used as the prefix for all Kafka topics that receive change event records.
    6  The connector captures changes in only the public schema. It is possible to configure the connector to capture changes in only the tables that you choose. For more information, see table.include.list.
    7  The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. Although pgoutput is the only supported value for PostgreSQL 10 and later, it is not the default, so you must explicitly set plugin.name to pgoutput.

  3. Create your connector instance with Kafka Connect. For example, if you saved your KafkaConnector resource in the inventory-connector.yaml file, you would run the following command:

    oc apply -f inventory-connector.yaml

    This registers inventory-connector-postgresql and the connector starts to run against the sampledb database as defined in the KafkaConnector CR.

Results

After the connector starts, it performs a consistent snapshot of the PostgreSQL server databases that the connector is configured for. The connector then starts generating data change events for row-level operations and streaming change event records to Kafka topics.
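The build and push steps in the preceding procedure can be combined so that the image is tagged with the registry name before it is pushed. The following is a sketch under assumptions: CONTAINER_TOOL is a hypothetical environment variable for choosing podman or docker, and the registry path that you pass in is a placeholder for your own registry.

```shell
# Compose a fully qualified image reference: <registry>/<name>:<tag>.
# The tag defaults to "latest".
image_ref() {
  echo "$1/$2:${3:-latest}"
}

# Build from the Dockerfile created in the procedure and push the result.
# CONTAINER_TOOL may be podman or docker. Not called here, because it needs
# a container engine and permission to push to the registry.
build_and_push() {
  tool=${CONTAINER_TOOL:-podman}
  ref=$(image_ref "$1" debezium-container-for-postgresql)
  "$tool" build -f debezium-container-for-postgresql.yaml -t "$ref" . &&
    "$tool" push "$ref"
}
```

For example, build_and_push quay.io/my-org builds and pushes quay.io/my-org/debezium-container-for-postgresql:latest, where my-org is a made-up organization name. The same reference is then a natural value for spec.image in the KafkaConnect CR, so that OpenShift pulls the image from the registry.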

7.6.4. Verifying that the Debezium PostgreSQL connector is running

If the connector starts correctly without errors, it creates a topic for each table that the connector is configured to capture. Downstream applications can subscribe to these topics to retrieve information about events that occur in the source database.

To verify that the connector is running, you perform the following operations from the OpenShift Container Platform web console, or through the OpenShift CLI tool (oc):

  • Verify the connector status.
  • Verify that the connector generates topics.
  • Verify that topics are populated with events for read operations ("op":"r") that the connector generates during the initial snapshot of each table.

Prerequisites

  • A Debezium connector is deployed to AMQ Streams on OpenShift.
  • The OpenShift oc CLI client is installed.
  • You have access to the OpenShift Container Platform web console.

Procedure

  1. Check the status of the KafkaConnector resource by using one of the following methods:

    • From the OpenShift Container Platform web console:

      1. Navigate to Home → Search.
      2. On the Search page, click Resources to open the Select Resource box, and then type KafkaConnector.
      3. From the KafkaConnectors list, click the name of the connector that you want to check, for example inventory-connector-postgresql.
      4. In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
    • From a terminal window:

      1. Enter the following command:

        oc describe KafkaConnector <connector-name> -n <project>

        For example,

        oc describe KafkaConnector inventory-connector-postgresql -n debezium

        The command returns status information that is similar to the following output:

        Example 7.3. KafkaConnector resource status

        Name:         inventory-connector-postgresql
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-postgresql
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-postgresql.inventory
            inventory-connector-postgresql.inventory.addresses
            inventory-connector-postgresql.inventory.customers
            inventory-connector-postgresql.inventory.geom
            inventory-connector-postgresql.inventory.orders
            inventory-connector-postgresql.inventory.products
            inventory-connector-postgresql.inventory.products_on_hand
        Events:  <none>
  2. Verify that the connector created Kafka topics:

    • From the OpenShift Container Platform web console:

      1. Navigate to Home → Search.
      2. On the Search page, click Resources to open the Select Resource box, and then type KafkaTopic.
      3. From the KafkaTopics list, click the name of the topic that you want to check, for example, inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d.
      4. In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
    • From a terminal window:

      1. Enter the following command:

        oc get kafkatopics

        The command returns status information that is similar to the following output:

        Example 7.4. KafkaTopic resource status

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-postgresql--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. Check topic content.

    • From a terminal window, enter the following command:

    oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
          --bootstrap-server localhost:9092 \
          --from-beginning \
          --property print.key=true \
          --topic=<topic-name>

    For example,

    oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
          --bootstrap-server localhost:9092 \
          --from-beginning \
          --property print.key=true \
          --topic=inventory-connector-postgresql.inventory.products_on_hand

    The format for specifying the topic name is the same as the oc describe command returns in Step 1, for example, inventory-connector-postgresql.inventory.addresses.

    For each event in the topic, the command returns information that is similar to the following output:

    Example 7.5. Content of a Debezium change event

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

    In the preceding example, the payload value shows that the connector snapshot generated a read ("op":"r") event from the table inventory.products_on_hand. The "before" state of the record is null, indicating that no previous value exists for the record. The "after" state shows a quantity of 3 for the item with product_id 101.
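To confirm that a topic contains snapshot read events without reading each record by eye, you can filter the consumer output for the op field. The helpers below are a rough sketch that depends on the compact JSON layout shown in the preceding example; the function names are made up for this illustration, and a JSON-aware tool is more robust if one is available.

```shell
# Print the operation code (for example r, c, u, or d) of each change event
# read from standard input, one code per line. Only the payload contains the
# text "op":"<code>", so schema entries such as "field":"op" are not matched.
event_ops() {
  grep -o '"op":"[a-z]"' | sed 's/.*:"\(.\)"/\1/'
}

# Count the events for each operation type.
count_event_ops() {
  event_ops | sort | uniq -c
}
```

Pipe the output of the kafka-console-consumer.sh command from Step 3 into count_event_ops. Directly after the initial snapshot, all of the events counted are expected to have the code r.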

7.6.5. Descriptions of Debezium PostgreSQL connector configuration properties

The Debezium PostgreSQL connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. The properties are described in the tables that follow.

The following configuration properties are required unless a default value is available.

Table 7.25. Required connector configuration properties
Property | Default | Description

name

No default

Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors.

connector.class

No default

The name of the Java class for the connector. Always use a value of io.debezium.connector.postgresql.PostgresConnector for the PostgreSQL connector.

tasks.max

1

The maximum number of tasks that should be created for this connector. The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable.

plugin.name

decoderbufs

The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server.

The only supported value is pgoutput. You must explicitly set plugin.name to pgoutput.

slot.name

debezium

The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the Debezium connector that you are configuring.

Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."
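
The naming rule quoted above can be checked before a connector is registered. The following is a minimal sketch, assuming only the character rule stated in the quotation. The function name is hypothetical and the check is not part of Debezium.

```python
import re

# Assumption: this pattern mirrors only the rule quoted above
# (lower-case letters, numbers, underscore). It is not taken from Debezium.
SLOT_NAME_PATTERN = re.compile(r"[a-z0-9_]+")


def is_valid_slot_name(name: str) -> bool:
    """Return True if every character in the name is allowed in a slot name."""
    return SLOT_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_slot_name("debezium"))          # True
print(is_valid_slot_name("inventory_slot_1"))  # True
print(is_valid_slot_name("Inventory-Slot"))    # False: upper case and hyphen
```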

slot.drop.on.stop

false

Whether or not to delete the logical replication slot when the connector stops in a graceful, expected way. The default behavior is that the replication slot remains configured for the connector when the connector stops. When the connector restarts, having the same replication slot enables the connector to start processing where it left off.

Set to true only in testing or development environments. Dropping the slot allows the database to discard WAL segments. When the connector restarts, it either performs a new snapshot or continues from a persistent offset in the Kafka Connect offsets topic.

publication.name

dbz_publication

The name of the PostgreSQL publication created for streaming changes when using pgoutput.

This publication is created at start-up if it does not already exist and it includes all tables. Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time.

If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined.

database.hostname

No default

IP address or hostname of the PostgreSQL database server.

database.port

5432

Integer port number of the PostgreSQL database server.

database.user

No default

Name of the PostgreSQL database user for connecting to the PostgreSQL database server.

database.password

No default

Password to use when connecting to the PostgreSQL database server.

database.dbname

No default

The name of the PostgreSQL database from which to stream the changes.

topic.prefix

No default

Topic prefix that provides a namespace for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Use only alphanumeric characters, hyphens, dots, and underscores in the topic prefix.

Warning

Do not change the value of this property. If you change the value, then after a restart the connector no longer emits events to the original topics. Instead, it emits subsequent events to topics whose names are based on the new value.

schema.include.list

No default

An optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes. Any schema name not included in schema.include.list is excluded from having its changes captured. By default, all non-system schemas have their changes captured.

To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the schema; it does not match substrings that might be present in a schema name.
If you include this property in the configuration, do not also set the schema.exclude.list property.
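
The anchored matching described above applies to all of the include and exclude list properties. The following is a rough sketch of the idea, using Python's re.fullmatch as a stand-in for the connector's matching. The helper name and the sample list value are hypothetical. The sketch splits on every comma and does not model case handling or differences between regular expression dialects.

```python
import re


def matches_any(identifier: str, patterns_csv: str) -> bool:
    """Anchored match: a pattern must match the entire identifier."""
    # Simplification: split on every comma, so patterns must not contain commas.
    patterns = [p.strip() for p in patterns_csv.split(",") if p.strip()]
    return any(re.fullmatch(p, identifier) for p in patterns)


include_list = "inventory,sales_.*"  # hypothetical schema.include.list value

print(matches_any("inventory", include_list))      # True
print(matches_any("inventory_old", include_list))  # False: a substring match is not enough
print(matches_any("sales_eu", include_list))       # True
```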

schema.exclude.list

No default

An optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes. Any schema whose name is not included in schema.exclude.list has its changes captured, with the exception of system schemas.

To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the schema; it does not match substrings that might be present in a schema name.
If you include this property in the configuration, do not set the schema.include.list property.

table.include.list

No default

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you want to capture. When this property is set, the connector captures changes only from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes in every non-system table in each schema whose changes are being captured.

To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
If you include this property in the configuration, do not also set the table.exclude.list property.

table.exclude.list

No default

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture. Each identifier is of the form schemaName.tableName. When this property is set, the connector captures changes from every table that you do not specify.

To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
If you include this property in the configuration, do not set the table.include.list property.

column.include.list

No default

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.

To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the expression is used to match the entire name string of the column; it does not match substrings that might be present in a column name.
If you include this property in the configuration, do not also set the column.exclude.list property.

column.exclude.list

No default

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.

To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the expression is used to match the entire name string of the column; it does not match substrings that might be present in a column name.
If you include this property in the configuration, do not set the column.include.list property.

time.precision.mode

adaptive

Time, date, and timestamps can be represented with different kinds of precision:

adaptive captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.

adaptive_time_microseconds captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type. An exception is TIME type fields, which are always captured as microseconds.

connect always represents time and timestamp values by using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns' precision. For more information, see temporal values.

decimal.handling.mode

precise

Specifies how the connector should handle values for DECIMAL and NUMERIC columns:

precise represents values by using java.math.BigDecimal, which encodes them in binary form in change events.

double represents values by using double values, which might result in a loss of precision but which is easier to use.

string encodes values as formatted strings, which are easy to consume but semantic information about the real type is lost. For more information, see Decimal types.

hstore.handling.mode

map

Specifies how the connector should handle values for hstore columns:

map represents values by using MAP.

json represents values by using a JSON string. This setting encodes values as formatted strings such as {"key" : "val"}. For more information, see PostgreSQL HSTORE type.

interval.handling.mode

numeric

Specifies how the connector should handle values for interval columns:

numeric represents intervals by using an approximate number of microseconds.

string represents intervals exactly by using the string pattern representation P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S. For example: P1Y2M3DT4H5M6.78S. For more information, see PostgreSQL basic types.

database.sslmode

disable

Whether to use an encrypted connection to the PostgreSQL server. Options include:

disable uses an unencrypted connection.

require uses a secure (encrypted) connection, and fails if one cannot be established.

verify-ca behaves like require but also verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, or fails if no valid matching CA certificates are found.

verify-full behaves like verify-ca but also verifies that the server certificate matches the host to which the connector is trying to connect. For more information, see the PostgreSQL documentation.

database.sslcert

No default

The path to the file that contains the SSL certificate for the client. For more information, see the PostgreSQL documentation.

database.sslkey

No default

The path to the file that contains the SSL private key of the client. For more information, see the PostgreSQL documentation.

database.sslpassword

No default

The password to access the client private key from the file specified by database.sslkey. For more information, see the PostgreSQL documentation.

database.sslrootcert

No default

The path to the file that contains the root certificate(s) against which the server is validated. For more information, see the PostgreSQL documentation.

database.tcpKeepAlive

true

Enable TCP keep-alive probe to verify that the database connection is still alive. For more information, see the PostgreSQL documentation.

tombstones.on.delete

true

Controls whether a delete event is followed by a tombstone event.

true - a delete operation is represented by a delete event and a subsequent tombstone event.

false - only a delete event is emitted.

After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in case log compaction is enabled for the topic.
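
The effect of this setting on the records that reach a topic can be pictured as follows. This is a simplified sketch: the envelope is reduced to three fields, the function name is hypothetical, and a tombstone is modelled as a record that has the same key and a null (None) value.

```python
from typing import Any, Dict, List, Optional, Tuple

# A record is modelled as a (key, value) pair. This is a simplification.
Record = Tuple[Dict[str, Any], Optional[Dict[str, Any]]]


def records_for_delete(key: Dict[str, Any], before: Dict[str, Any],
                       tombstones_on_delete: bool = True) -> List[Record]:
    """Return the records emitted for one deleted row."""
    delete_event = {"op": "d", "before": before, "after": None}
    records: List[Record] = [(key, delete_event)]
    if tombstones_on_delete:
        # Tombstone: same key, null value, so that log compaction can
        # remove all earlier records for this key.
        records.append((key, None))
    return records


key = {"product_id": 101}
row = {"product_id": 101, "quantity": 3}
print(len(records_for_delete(key, row, True)))   # 2
print(len(records_for_delete(key, row, False)))  # 1
```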

column.truncate.to.<length>.chars

n/a

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Set this property if you want to truncate the data in a set of columns when it exceeds the number of characters specified by the length in the property name. Set length to a positive integer value, for example, column.truncate.to.20.chars.

The fully-qualified name of a column observes the following format: <schemaName>.<tableName>.<columnName>. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name.

You can specify multiple properties with different lengths in a single configuration.

column.mask.with.<length>.chars

n/a

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Set this property if you want the connector to mask the values for a set of columns, for example, if they contain sensitive data. Set length to a positive integer to replace data in the specified columns with the number of asterisk (*) characters specified by the length in the property name. Set length to 0 (zero) to replace data in the specified columns with an empty string.

The fully-qualified name of a column observes the following format: schemaName.tableName.columnName. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name.

You can specify multiple properties with different lengths in a single configuration.
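
The effect of the truncation and masking properties on a single column value can be sketched as follows. The function names are hypothetical, and the sketch covers only non-null string values as described above.

```python
def truncate_value(value: str, length: int) -> str:
    """Keep at most `length` characters of the value."""
    return value[:length]


def mask_value(value: str, length: int) -> str:
    """Replace the value with `length` asterisks; a length of 0 gives an empty string."""
    return "*" * length


print(truncate_value("a long customer comment", 6))  # a long
print(mask_value("4111-1111-1111-1111", 10))          # **********
print(repr(mask_value("secret", 0)))                  # ''
```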

column.mask.hash.<hashAlgorithm>.with.salt.<salt>; column.mask.hash.v2.<hashAlgorithm>.with.salt.<salt>

n/a

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>.
To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. In the resulting change event record, the values for the specified columns are replaced with pseudonyms.

A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation.

In the following example, CzQMA0cB5K is a randomly selected salt.

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

If necessary, the pseudonym is automatically shortened to the length of the column. The connector configuration can include multiple properties that specify different hash algorithms and salts.

Depending on the hashAlgorithm used, the salt selected, and the actual data set, the resulting data set might not be completely masked.

Hashing strategy version 2 should be used to ensure fidelity if the value is being hashed in different places or systems.
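
The following sketch shows why hashing preserves referential integrity: the same input always yields the same pseudonym, so masked values can still be joined. It is an illustration only. The function name is hypothetical, and the byte layout (salt followed by the UTF-8 value, hex output) is an assumption that is not claimed to reproduce the pseudonyms that Debezium emits under either hashing strategy.

```python
import hashlib


def pseudonym(value: str, algorithm: str, salt: str, column_length: int) -> str:
    """Hash the salted value and shorten the result to the column length.

    Assumption: salt + value, UTF-8 encoded, hex digest. Illustrative only.
    """
    digest = hashlib.new(algorithm, (salt + value).encode("utf-8")).hexdigest()
    return digest[:column_length]


# "sha256" is the hashlib name for the algorithm that Java calls SHA-256.
first = pseudonym("Alice", "sha256", "CzQMA0cB5K", 20)
second = pseudonym("Alice", "sha256", "CzQMA0cB5K", 20)
other = pseudonym("Bob", "sha256", "CzQMA0cB5K", 20)

print(first == second)  # True: equal inputs give equal pseudonyms
print(first == other)   # False
print(len(first))       # 20
```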

column.propagate.source.type

n/a

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns for which you want the connector to emit extra parameters that represent column metadata. When this property is set, the connector adds the following fields to the schema of event records:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

These parameters propagate a column’s original type name, length (for variable-width types), and scale, respectively.
Enabling the connector to emit this extra data can assist in properly sizing specific numeric or character-based columns in sink databases.

The fully-qualified name of a column observes one of the following formats: databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.
To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name.

datatype.propagate.source.type

n/a

An optional, comma-separated list of regular expressions that specify the fully-qualified names of data types that are defined for columns in a database. When this property is set, for columns with matching data types, the connector emits event records that include the following extra fields in their schema:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

These parameters propagate a column’s original type name, length (for variable-width types), and scale, respectively.
Enabling the connector to emit this extra data can assist in properly sizing specific numeric or character-based columns in sink databases.

The fully-qualified name of a data type observes one of the following formats: databaseName.tableName.typeName, or databaseName.schemaName.tableName.typeName.
To match the name of a data type, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the data type; the expression does not match substrings that might be present in a type name.

For the list of PostgreSQL-specific data type names, see the PostgreSQL data type mappings.

message.key.columns

empty string

A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.

By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format:

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

To base a table key on multiple column names, insert commas between the column names.

Each fully-qualified table name is a regular expression in the following format:

<schemaName>.<tableName>

The property can include entries for multiple tables. Use a semicolon to separate table entries in the list.

The following example sets the message key for the table inventory.customers and for the purchaseorders tables in any schema:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

For the table inventory.customers, the columns pk1 and pk2 are specified as the message key. For the purchaseorders tables in any schema, the columns pk3 and pk4 serve as the message key.

There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number that are required to specify a unique key.
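
The entry format described above can be read as a small lookup table from table patterns to key columns. The following sketch parses a property value and resolves the key columns for a table. The function names are hypothetical, re.fullmatch stands in for the connector's anchored matching, and returning the first matching entry is an assumption.

```python
import re
from typing import List, Optional, Tuple


def parse_message_key_columns(value: str) -> List[Tuple[str, List[str]]]:
    """Split 'tableRegex:col,col;tableRegex:col' into (regex, columns) pairs."""
    entries = []
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        table_regex, _, columns = entry.rpartition(":")
        entries.append((table_regex, [c.strip() for c in columns.split(",")]))
    return entries


def key_columns_for(table: str, value: str) -> Optional[List[str]]:
    """Return the key columns of the first entry whose pattern matches the table."""
    for table_regex, columns in parse_message_key_columns(value):
        if re.fullmatch(table_regex, table):
            return columns
    return None  # no custom key configured for this table


config = "inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4"
print(key_columns_for("inventory.customers", config))   # ['pk1', 'pk2']
print(key_columns_for("sales.purchaseorders", config))  # ['pk3', 'pk4']
print(key_columns_for("inventory.products", config))    # None
```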

publication.autocreate.mode

all_tables

Applies only when streaming changes by using the pgoutput plug-in. The setting determines how creation of a publication should work. Specify one of the following values:

all_tables - If a publication exists, the connector uses it. If a publication does not exist, the connector creates a publication for all tables in the database for which the connector is capturing changes. For the connector to create a publication, it must access the database through a database user account that has permission to create publications and perform replications. The connector creates the publication by using the following SQL command: CREATE PUBLICATION <publication_name> FOR ALL TABLES;

disabled - The connector does not attempt to create a publication. A database administrator or the user configured to perform replications must have created the publication before running the connector. If the connector cannot find the publication, the connector throws an exception and stops.

filtered - If a publication exists, the connector uses it. If no publication exists, the connector creates a new publication for tables that match the current filter configuration as specified by the schema.include.list, schema.exclude.list, table.include.list, and table.exclude.list connector configuration properties. For example: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>. If the publication exists, the connector updates the publication for tables that match the current filter configuration. For example: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>.
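
The three modes can be summarized as a decision over the mode and whether the publication already exists. The following sketch returns the statement, if any, that the descriptions above imply. The function name and error type are hypothetical, and the code does not connect to a database.

```python
from typing import List, Optional


def publication_statement(mode: str, publication: str, exists: bool,
                          tables: List[str]) -> Optional[str]:
    """Return the SQL implied by the mode, or None if nothing is created or changed."""
    if mode == "disabled":
        if not exists:
            # Described above: the connector throws an exception and stops.
            raise RuntimeError(f"publication {publication} not found")
        return None
    if mode == "all_tables":
        if exists:
            return None
        return f"CREATE PUBLICATION {publication} FOR ALL TABLES;"
    if mode == "filtered":
        table_list = ", ".join(tables)
        if exists:
            return f"ALTER PUBLICATION {publication} SET TABLE {table_list};"
        return f"CREATE PUBLICATION {publication} FOR TABLE {table_list};"
    raise ValueError(f"unknown publication.autocreate.mode: {mode}")


tables = ["inventory.customers", "inventory.orders"]  # hypothetical filter result
print(publication_statement("all_tables", "dbz_publication", False, tables))
print(publication_statement("filtered", "dbz_publication", True, tables))
```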

binary.handling.mode

bytes

Specifies how binary (bytea) columns should be represented in change events:

bytes represents binary data as a byte array.

base64 represents binary data as base64-encoded strings.

base64-url-safe represents binary data as base64-url-safe-encoded strings.

hex represents binary data as hex-encoded (base16) strings.

schema.name.adjustment.mode

none

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings:

  • none does not apply any adjustment.
  • avro replaces the characters that cannot be used in the Avro type name with underscores.

money.fraction.digits

2

Specifies how many decimal digits should be used when converting the PostgreSQL money type to java.math.BigDecimal, which represents the values in change events. Applicable only when decimal.handling.mode is set to precise.

message.prefix.include.list

No default

An optional, comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you want the connector to capture. By default, the connector captures all logical decoding messages. When this property is set, the connector captures only logical decoding messages with the prefixes specified by the property. All other logical decoding messages are excluded.

To match the name of a message prefix, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire message prefix string; the expression does not match substrings that might be present in a prefix.

If you include this property in the configuration, do not also set the message.prefix.exclude.list property.

For information about the structure of message events and about their ordering semantics, see message events.

message.prefix.exclude.list

No default

An optional, comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you do not want the connector to capture. When this property is set, the connector does not capture logical decoding messages that use the specified prefixes. All other messages are captured.
To exclude all logical decoding messages, set the value of this property to .*.

To match the name of a message prefix, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire message prefix string; the expression does not match substrings that might be present in a prefix.

If you include this property in the configuration, do not also set the message.prefix.include.list property.

For information about the structure of message events and about their ordering semantics, see message events.

The following advanced configuration properties have defaults that work in most situations and therefore rarely need to be specified in the connector’s configuration.

Table 7.26. Advanced connector configuration properties
Property | Default | Description

converters

No default

Enumerates a comma-separated list of the symbolic names of the custom converter instances that the connector can use. For example,

isbn

You must set the converters property to enable the connector to use a custom converter.

For each converter that you configure for a connector, you must also add a .type property, which specifies the fully-qualified name of the class that implements the converter interface. The .type property uses the following format:

<converterSymbolicName>.type

For example,

isbn.type: io.debezium.test.IsbnConverter

If you want to further control the behavior of a configured converter, you can add one or more configuration parameters to pass values to the converter. To associate any additional configuration parameter with a converter, prefix the parameter names with the symbolic name of the converter.
For example,

isbn.schema.name: io.debezium.postgresql.type.Isbn

snapshot.mode

initial

Specifies the criteria for performing a snapshot when the connector starts:

initial - The connector performs a snapshot only when no offsets have been recorded for the logical server name.

always - The connector performs a snapshot each time the connector starts.

never - The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the point in time when the PostgreSQL logical replication slot was created on the server. The never snapshot mode is useful only when you know all data of interest is still reflected in the WAL.

initial_only - The connector performs an initial snapshot and then stops, without processing any subsequent changes.

exported - deprecated


For more information, see the table of snapshot.mode options.

snapshot.include.collection.list

All tables specified in table.include.list

An optional, comma-separated list of regular expressions that match the fully-qualified names (<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector’s table.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never.
This property does not affect the behavior of incremental snapshots.

To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the table; it does not match substrings that might be present in a table name.

snapshot.lock.timeout.ms

10000

Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If the connector cannot acquire table locks in this time interval, the snapshot fails. For details, see How the connector performs snapshots.

snapshot.select.statement.overrides

No default

Specifies the table rows to include in a snapshot. Use the property if you want a snapshot to include only a subset of the rows in a table. This property affects snapshots only. It does not apply to events that the connector reads from the log.

The property contains a comma-separated list of fully-qualified table names in the form <schemaName>.<tableName>. For example,

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

For each table in the list, add a further configuration property that specifies the SELECT statement for the connector to run on the table when it takes a snapshot. The specified SELECT statement determines the subset of table rows to include in the snapshot. Use the following format to specify the name of this SELECT statement property:

snapshot.select.statement.overrides.<schemaName>.<tableName>. For example, snapshot.select.statement.overrides.customers.orders.

Example:

For a customers.orders table that includes the soft-delete column delete_flag, add the following properties if you want a snapshot to include only those records that are not soft-deleted:

"snapshot.select.statement.overrides": "customers.orders",
"snapshot.select.statement.overrides.customers.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"

In the resulting snapshot, the connector includes only the records for which delete_flag = 0.

event.processing.failure.handling.mode

fail

Specifies how the connector should react to exceptions during processing of events:

fail propagates the exception, indicates the offset of the problematic event, and causes the connector to stop.

warn logs the offset of the problematic event, skips that event, and continues processing.

skip skips the problematic event and continues processing.
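
The three modes differ only in what happens after an event fails. The following sketch models that choice. The function name, the return value, and the offset string are hypothetical, and the code is not the connector's implementation.

```python
import logging

log = logging.getLogger("failure-handling-sketch")


def handle_failure(mode: str, offset: str, error: Exception) -> str:
    """Apply the configured mode to one failed event."""
    if mode == "fail":
        # Propagate the exception and report the offset; processing stops.
        raise RuntimeError(f"event at offset {offset} failed") from error
    if mode == "warn":
        # Log the offset, skip the event, and continue.
        log.warning("skipping event at offset %s: %s", offset, error)
        return "skipped"
    if mode == "skip":
        # Skip the event silently and continue.
        return "skipped"
    raise ValueError(f"unknown mode: {mode}")


print(handle_failure("skip", "0/16B3748", ValueError("bad event")))  # skipped
```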

max.batch.size

2048

Positive integer value that specifies the maximum size of each batch of events that the connector processes.

max.queue.size

8192

Positive integer value that specifies the maximum number of records that the blocking queue can hold. When Debezium reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka. The blocking queue can provide backpressure for reading change events from the database in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable. Events that are held in the queue are disregarded when the connector periodically records offsets. Always set the value of max.queue.size to be larger than the value of max.batch.size.

max.queue.size.in.bytes

0

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value.
If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size=1000, and max.queue.size.in.bytes=5000, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.
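
The interaction of the two limits can be sketched as a single check that a writer would perform before adding a record. The function name is hypothetical. The sketch treats a byte limit of 0 as "no limit", as described above.

```python
def queue_is_full(record_count: int, byte_count: int,
                  max_queue_size: int = 8192,
                  max_queue_size_in_bytes: int = 0) -> bool:
    """Return True when writing to the queue would be blocked."""
    if record_count >= max_queue_size:
        return True
    # A value of 0 means that no byte limit is configured.
    if max_queue_size_in_bytes > 0 and byte_count >= max_queue_size_in_bytes:
        return True
    return False


# Values from the example above: max.queue.size=1000, max.queue.size.in.bytes=5000
print(queue_is_full(1000, 1200, 1000, 5000))  # True: record limit reached
print(queue_is_full(10, 5000, 1000, 5000))    # True: byte limit reached
print(queue_is_full(10, 1200, 1000, 5000))    # False
```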

poll.interval.ms

500

Positive integer value that specifies the number of milliseconds the connector should wait for new change events to appear before it starts processing a batch of events. Defaults to 500 milliseconds.

include.unknown.datatypes

false

Specifies connector behavior when the connector encounters a field whose data type is unknown. The default behavior is that the connector omits the field from the change event and logs a warning.

Set this property to true if you want the change event to contain an opaque binary representation of the field. This lets consumers decode the field. You can control the exact representation by setting the binary handling mode property.

Note

Consumers risk backward compatibility issues when include.unknown.datatypes is set to true. Not only may the database-specific binary representation change between releases, but if the data type is eventually supported by Debezium, the data type will be sent downstream in a logical type, which would require adjustments by consumers. In general, when encountering unsupported data types, create a feature request so that support can be added.

database.initial.statements

No default

A semicolon-separated list of SQL statements that the connector executes when it establishes a JDBC connection to the database. To use a semicolon as a character and not as a delimiter, specify two consecutive semicolons, ;;.

The connector may establish JDBC connections at its own discretion. Consequently, this property is useful for configuration of session parameters only, and not for executing DML statements.

The connector does not execute these statements when it creates a connection for reading the transaction log.
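The delimiter and escape rule for database.initial.statements can be illustrated with a small splitter. This is a sketch of the documented rule only, assuming that empty statements are ignored and that surrounding whitespace is trimmed; it is not the connector's parser, and `split_initial_statements` is a hypothetical name.

```python
def split_initial_statements(value):
    """Split on single semicolons; treat ';;' as a literal semicolon."""
    statements = []
    current = []
    i = 0
    while i < len(value):
        ch = value[i]
        if ch == ";":
            if i + 1 < len(value) and value[i + 1] == ";":
                # Two consecutive semicolons stand for one literal semicolon.
                current.append(";")
                i += 2
                continue
            statement = "".join(current).strip()
            if statement:  # assumption: empty statements are skipped
                statements.append(statement)
            current = []
        else:
            current.append(ch)
        i += 1
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements


print(split_initial_statements("SET search_path TO app; SET statement_timeout = 0"))
```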

status.update.interval.ms

10000

Frequency for sending replication connection status updates to the server, given in milliseconds.
The property also controls how frequently the database status is checked to detect a dead connection in case the database was shut down.

heartbeat.interval.ms

0

Controls how frequently the connector sends heartbeat messages to a Kafka topic. The default behavior is that the connector does not send heartbeat messages.

Heartbeat messages are useful for monitoring whether the connector is receiving change events from the database. Heartbeat messages might help decrease the number of change events that need to be re-sent when a connector restarts. To send heartbeat messages, set this property to a positive integer, which indicates the number of milliseconds between heartbeat messages.

Heartbeat messages are needed when there are many updates in a database that is being tracked but only a tiny number of updates are related to the table(s) and schema(s) for which the connector is capturing changes. In this situation, the connector reads from the database transaction log as usual but rarely emits change records to Kafka. This means that no offset updates are committed to Kafka and the connector does not have an opportunity to send the latest retrieved LSN to the database. The database retains WAL files that contain events that have already been processed by the connector. Sending heartbeat messages enables the connector to send the latest retrieved LSN to the database, which allows the database to reclaim disk space being used by no longer needed WAL files.

heartbeat.action.query

No default

Specifies a query that the connector executes on the source database when the connector sends a heartbeat message.

This is useful for resolving the situation described in WAL disk space consumption, where capturing changes from a low-traffic database on the same host as a high-traffic database prevents Debezium from processing WAL records and thus acknowledging WAL positions with the database. To address this situation, create a heartbeat table in the low-traffic database, and set this property to a statement that inserts records into that table, for example:

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

This allows the connector to receive changes from the low-traffic database and acknowledge their LSNs, which prevents unbounded WAL growth on the database host.
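As a rough illustration, the two heartbeat properties might be combined as follows. The snippet only assembles the property map and prints it as JSON. The interval of 10000 milliseconds is an assumed value chosen for the example, and the table name is taken from the statement above.

```python
import json

# Sketch of the heartbeat-related part of a connector configuration.
# The interval is an example value, not a recommendation.
heartbeat_settings = {
    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": (
        "INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')"
    ),
}

print(json.dumps(heartbeat_settings, indent=2))
```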

schema.refresh.mode

columns_diff

Specify the conditions that trigger a refresh of the in-memory schema for a table.

columns_diff is the safest mode. It ensures that the in-memory schema stays in sync with the database table’s schema at all times.

columns_diff_exclude_unchanged_toast instructs the connector to refresh the in-memory schema cache if there is a discrepancy with the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy.

This setting can significantly improve connector performance if there are frequently-updated tables that have TOASTed data that are rarely part of updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.

snapshot.delay.ms

No default

An interval in milliseconds that the connector should wait before performing a snapshot when the connector starts. If you are starting multiple connectors in a cluster, this property is useful for avoiding snapshot interruptions, which might cause re-balancing of connectors.

snapshot.fetch.size

10240

During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.

slot.stream.params

No default

Semicolon-separated list of parameters to pass to the configured logical decoding plug-in. For example, add-tables=public.table,public.table2;include-lsn=true.

sanitize.field.names

true if connector configuration sets the key.converter or value.converter property to the Avro converter.

false if not.

Indicates whether field names are sanitized to adhere to Avro naming requirements.

slot.max.retries

6

If connecting to a replication slot fails, this is the maximum number of consecutive attempts to connect.

slot.retry.delay.ms

10000 (10 seconds)

The number of milliseconds to wait between retry attempts when the connector fails to connect to a replication slot.
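The way slot.max.retries and slot.retry.delay.ms work together can be sketched as a bounded retry loop. This is a simplified model, not the connector's code: it assumes that slot.max.retries counts the total number of attempts, and the `connect` callable, the `ConnectionError` exception type, and the function name are all hypothetical.

```python
import time


def connect_with_retries(connect, max_retries=6, retry_delay_ms=10000, sleep=time.sleep):
    """Call connect() up to max_retries times, pausing between failed attempts."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(1, max_retries + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_retries:
                # Out of attempts: surface the last failure to the caller.
                raise
            # Wait for the configured delay before the next attempt.
            sleep(retry_delay_ms / 1000.0)
```

Passing `sleep` as a parameter keeps the sketch easy to exercise without real delays.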

unavailable.value.placeholder

__debezium_unavailable_value

Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database. If the setting of unavailable.value.placeholder starts with the hex: prefix, the rest of the string is expected to represent hexadecimally encoded octets. For more information, see toasted values.
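The hex: convention can be illustrated with a short decoder. This is a sketch under assumptions: the function name is hypothetical, and encoding a plain (non-hex) setting as UTF-8 is an assumption made for the example rather than documented connector behavior.

```python
def placeholder_bytes(setting):
    """Return the placeholder as bytes, decoding the part after 'hex:' if present."""
    prefix = "hex:"
    if setting.startswith(prefix):
        # The remainder is interpreted as hexadecimally encoded octets.
        return bytes.fromhex(setting[len(prefix):])
    # Assumption: a plain setting is used as-is, shown here as UTF-8 bytes.
    return setting.encode("utf-8")


print(placeholder_bytes("__debezium_unavailable_value"))
```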

provide.transaction.metadata

false

Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify true if you want the connector to do this. For more information, see Transaction metadata.

flush.lsn.source

true

Determines whether the connector commits the LSN of the processed records in the source PostgreSQL database so that the WAL logs can be deleted. Specify false if you do not want the connector to do this. If you set this property to false, Debezium does not acknowledge the LSN, and as a result the WAL logs are not cleared, which might result in disk space issues. In that case, you are expected to handle acknowledgement of the LSN outside of Debezium.

retriable.restart.connector.wait.ms

10000 (10 seconds)

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

skipped.operations

t

A comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/create, u for updates, d for deletes, t for truncates, and none to not skip any operations. By default, truncate operations are skipped.

signal.data.collection

No default value

Fully-qualified name of the data collection that is used to send signals to the connector.
Use the following format to specify the collection name:
<schemaName>.<tableName>

incremental.snapshot.chunk.size

1024

The maximum number of rows that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment.

xmin.fetch.interval.ms

0

How often, in milliseconds, the XMIN is read from the replication slot. The XMIN value provides the lower bound of where a new replication slot could start from. The default value of 0 disables XMIN tracking.

topic.naming.strategy

io.debezium.schema.SchemaTopicNamingStrategy

The name of the TopicNamingStrategy class that is used to determine the topic names for data change, schema change, transaction, and heartbeat events, among others. The default is SchemaTopicNamingStrategy.

topic.delimiter

.

Specifies the delimiter for topic names. The default is a period (.).

topic.cache.size

10000

The number of topic names that can be held in a bounded concurrent hash map. The connector uses this cache to determine the topic name that corresponds to a given data collection.

topic.heartbeat.prefix

__debezium-heartbeat

Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern:

<topic.heartbeat.prefix>.<topic.prefix>

For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.

topic.transaction

transaction

Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern:

<topic.prefix>.<topic.transaction>

For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.
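The heartbeat and transaction topic name patterns can be sketched as two small helpers. The function names are hypothetical, and the sketch assumes that the segments are joined with the value of topic.delimiter; the defaults mirror the default values listed for the properties above.

```python
def heartbeat_topic(topic_prefix, heartbeat_prefix="__debezium-heartbeat", delimiter="."):
    """Heartbeat topic: the heartbeat prefix followed by the topic prefix."""
    return f"{heartbeat_prefix}{delimiter}{topic_prefix}"


def transaction_topic(topic_prefix, transaction="transaction", delimiter="."):
    """Transaction metadata topic: the topic prefix followed by the transaction suffix."""
    return f"{topic_prefix}{delimiter}{transaction}"


print(heartbeat_topic("fulfillment"))
print(transaction_topic("fulfillment"))
```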

Pass-through connector configuration properties

The connector also supports pass-through configuration properties that are used when creating the Kafka producer and consumer.

Be sure to consult the Kafka documentation for all of the configuration properties for Kafka producers and consumers. The PostgreSQL connector does use the new consumer configuration properties.

7.7. Monitoring Debezium PostgreSQL connector performance

The Debezium PostgreSQL connector provides two types of metrics that are in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect provide.

  • Snapshot metrics provide information about connector operation while performing a snapshot.
  • Streaming metrics provide information about connector operation when the connector is capturing changes and streaming change event records.

Debezium monitoring documentation provides details for how to expose these metrics by using JMX.

7.7.1. Monitoring Debezium during snapshots of PostgreSQL databases

The MBean is debezium.postgres:type=connector-metrics,context=snapshot,server=<postgresql.server.name>.

Snapshot metrics are exposed only while a snapshot operation is active, or if a snapshot has occurred since the connector last started.

The following table lists the snapshot metrics that are available.

Attributes

Type

Description

LastEvent

string

The last snapshot event that the connector has read.

MilliSecondsSinceLastEvent

long

The number of milliseconds since the connector has read and processed the most recent event.

TotalNumberOfEventsSeen

long

The total number of events that this connector has seen since last started or reset.

NumberOfEventsFiltered

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

CapturedTables

string[]

The list of tables that are captured by the connector.

QueueTotalCapacity

int

The length of the queue used to pass events between the snapshotter and the main Kafka Connect loop.

QueueRemainingCapacity

int

The free capacity of the queue used to pass events between the snapshotter and the main Kafka Connect loop.

TotalTableCount

int

The total number of tables that are being included in the snapshot.

RemainingTableCount

int

The number of tables that the snapshot has yet to copy.

SnapshotRunning

boolean

Whether the snapshot was started.

SnapshotPaused

boolean

Whether the snapshot was paused.

SnapshotAborted

boolean

Whether the snapshot was aborted.

SnapshotCompleted

boolean

Whether the snapshot completed.

SnapshotDurationInSeconds

long

The total number of seconds that the snapshot has taken so far, even if it is not complete. The total includes any time during which the snapshot was paused.

SnapshotPausedDurationInSeconds

long

The total number of seconds that the snapshot was paused. If the snapshot was paused several times, the paused time adds up.

RowsScanned

Map<String, Long>

Map containing the number of rows scanned for each table in the snapshot. Tables are incrementally added to the Map during processing. Updates every 10,000 rows scanned and upon completing a table.

MaxQueueSizeInBytes

long

The maximum buffer of the queue in bytes. This metric is available if max.queue.size.in.bytes is set to a positive long value.

CurrentQueueSizeInBytes

long

The current volume, in bytes, of records in the queue.

The connector also provides the following additional snapshot metrics when an incremental snapshot is executed:

Attributes

Type

Description

ChunkId

string

The identifier of the current snapshot chunk.

ChunkFrom

string

The lower bound of the primary key set defining the current chunk.

ChunkTo

string

The upper bound of the primary key set defining the current chunk.

TableFrom

string

The lower bound of the primary key set of the currently snapshotted table.

TableTo

string

The upper bound of the primary key set of the currently snapshotted table.

7.7.2. Monitoring Debezium PostgreSQL connector record streaming

The MBean is debezium.postgres:type=connector-metrics,context=streaming,server=<postgresql.server.name>.

The following table lists the streaming metrics that are available.

Attributes

Type

Description

LastEvent

string

The last streaming event that the connector has read.

MilliSecondsSinceLastEvent

long

The number of milliseconds since the connector has read and processed the most recent event.

TotalNumberOfEventsSeen

long

The total number of events that this connector has seen since the last start or metrics reset.

TotalNumberOfCreateEventsSeen

long

The total number of create events that this connector has seen since the last start or metrics reset.

TotalNumberOfUpdateEventsSeen

long

The total number of update events that this connector has seen since the last start or metrics reset.

TotalNumberOfDeleteEventsSeen

long

The total number of delete events that this connector has seen since the last start or metrics reset.

NumberOfEventsFiltered

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

CapturedTables

string[]

The list of tables that are captured by the connector.

QueueTotalCapacity

int

The length of the queue used to pass events between the streamer and the main Kafka Connect loop.

QueueRemainingCapacity

int

The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop.

Connected

boolean

Flag that denotes whether the connector is currently connected to the database server.

MilliSecondsBehindSource

long

The number of milliseconds between the last change event’s timestamp and the connector processing it. The value incorporates any differences between the clocks on the machines where the database server and the connector are running.

NumberOfCommittedTransactions

long

The number of processed transactions that were committed.

SourceEventPosition

Map<String, String>

The coordinates of the last received event.

LastTransactionId

string

Transaction identifier of the last processed transaction.

MaxQueueSizeInBytes

long

The maximum buffer of the queue in bytes. This metric is available if max.queue.size.in.bytes is set to a positive long value.

CurrentQueueSizeInBytes

long

The current volume, in bytes, of records in the queue.
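One way to turn the queue metrics into a single signal is to compute the fraction of the queue that is in use. The helper below is a hypothetical sketch that takes the QueueTotalCapacity and QueueRemainingCapacity values as plain integers; how those values are read from JMX is out of scope here.

```python
def queue_utilization(total_capacity, remaining_capacity):
    """Return the fraction of the queue in use, from 0.0 (empty) to 1.0 (full)."""
    if total_capacity <= 0:
        raise ValueError("total_capacity must be positive")
    return (total_capacity - remaining_capacity) / total_capacity


# Example values only: a queue of 8192 slots with 2048 slots free.
print(queue_utilization(8192, 2048))
```

A value that stays close to 1.0 suggests that the connector reads events faster than it can write them to Kafka.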

7.8. How Debezium PostgreSQL connectors handle faults and problems

Debezium is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully, Debezium provides exactly-once delivery of every change event record.

If a fault does happen, the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In these abnormal situations, Debezium, like Kafka, provides at-least-once delivery of change events.

Details are in the following sections:

Configuration and startup errors

In the following situations, the connector fails when trying to start, reports an error/exception in the log, and stops running:

  • The connector’s configuration is invalid.
  • The connector cannot successfully connect to PostgreSQL by using the specified connection parameters.
  • The connector is restarting from a previously-recorded position in the PostgreSQL WAL (by using the LSN) and PostgreSQL no longer has that history available.

In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the PostgreSQL problem, restart the connector.

PostgreSQL becomes unavailable

When the connector is running, the PostgreSQL server that it is connected to could become unavailable for any number of reasons. If this happens, the connector fails with an error and stops. When the server is available again, restart the connector.

The PostgreSQL connector externally stores the last processed offset in the form of a PostgreSQL LSN. After a connector restarts and connects to a server instance, the connector communicates with the server to continue streaming from that particular offset. This offset is available as long as the Debezium replication slot remains intact. Never drop a replication slot on the primary server or you will lose data. For information about failure cases in which a slot has been removed, see the next section.

Cluster failures

As of release 12, PostgreSQL allows logical replication slots only on primary servers. This means that you can point a Debezium PostgreSQL connector to only the active primary server of a database cluster. Also, replication slots themselves are not propagated to replicas. If the primary server goes down, a new primary must be promoted.

Note

Some managed PostgreSQL services (AWS RDS and GCP CloudSQL, for example) implement replication to a standby via disk replication. This means that the replication slot does get replicated and will remain available after a failover.

The new primary must have a replication slot that is configured for use by the pgoutput plug-in and the database in which you want to capture changes. Only then can you point the connector to the new server and restart the connector.

There are important caveats when failovers occur, and you should pause Debezium until you can verify that you have an intact replication slot that has not lost data. After a failover:

  • There must be a process that re-creates the Debezium replication slot before allowing the application to write to the new primary. This is crucial. Without this process, your application can miss change events.
  • You might need to verify that Debezium was able to read all changes in the slot before the old primary failed.

One reliable method of recovering and verifying whether any changes were lost is to recover a backup of the failed primary to the point immediately before it failed. While this can be administratively difficult, it allows you to inspect the replication slot for any unconsumed changes.

Kafka Connect process stops gracefully

Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process’s connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.

Kafka Connect process crashes

If the Kafka Connect process stops unexpectedly, any connector tasks that it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, PostgreSQL connectors resume from the last offset that was recorded by the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.

Because there is a chance that some events might be duplicated during a recovery from failure, consumers should always anticipate some duplicate events. Debezium changes are idempotent, so a sequence of events always results in the same state.

In each change event record, Debezium connectors insert source-specific information about the origin of the event, including the PostgreSQL server’s time of the event, the ID of the server transaction, and the position in the write-ahead log where the transaction changes were written. Consumers can keep track of this information, especially the LSN, to determine whether an event is a duplicate.
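A consumer-side duplicate check based on this source information might look like the following sketch. It assumes that each event is available as a dictionary whose source block exposes the transaction ID and LSN under the keys `txId` and `lsn`; those key names, the class name, and the bounded-memory eviction policy are assumptions made for illustration.

```python
from collections import OrderedDict


class SeenEvents:
    """Remember recently seen (transaction ID, LSN) pairs to detect replays."""

    def __init__(self, capacity=10000):
        self._seen = OrderedDict()
        self._capacity = capacity

    def is_duplicate(self, event):
        source = event["source"]
        key = (source["txId"], source["lsn"])  # assumed field names
        if key in self._seen:
            return True
        self._seen[key] = None
        # Evict the oldest entry so that memory use stays bounded.
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)
        return False
```

The capacity should cover at least the number of events that can be replayed after a crash, which depends on the offset flush period and the change volume.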

Kafka becomes unavailable

As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.

Connector is stopped for a duration

If the connector is gracefully stopped, the database can continue to be used. Any changes are recorded in the PostgreSQL WAL. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.

A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a Debezium connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in PostgreSQL.
