Chapter 3. Sink connectors


Debezium provides sink connectors that can consume events from sources such as Apache Kafka topics. A sink connector standardizes the format of the data, and then persists the event data to a configured sink repository. Other systems, applications, or users can then access the events from the data sink.

Because the sink connector applies a consistent structure to the event data that it consumes, downstream applications that read from the data sink can more easily interpret and process that data.

Currently, Debezium provides the following sink connectors:

3.1. Debezium connector for JDBC

The Debezium JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver. This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.

3.1.1. How the Debezium JDBC connector works

The Debezium JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime. The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database. The connector supports idempotent write operations by using upsert semantics and basic schema evolution.

The Debezium JDBC connector provides the following features:

3.1.1.1. Description of how the Debezium JDBC connector consumes complex change events

By default, Debezium source connectors produce complex, hierarchical change events. When Debezium connectors are used with other JDBC sink connector implementations, you might need to apply the ExtractNewRecordState single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation. If you run the Debezium JDBC sink connector, it’s not necessary to deploy the SMT, because the Debezium sink connector can consume native Debezium change events directly, without the use of a transformation.

When the JDBC sink connector consumes a complex change event from a Debezium source connector, it extracts the values from the after section of the original insert or update event. When a delete event is consumed by the sink connector, no part of the event’s payload is consulted.

Important

The Debezium JDBC sink connector is not designed to read from schema change topics. If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the topics or topics.regex properties so that the connector does not consume from schema change topics.
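
As a rough illustration, the following fragment subscribes the sink connector only to data topics by using a regular expression. The topic prefix inventory and the schema name public are hypothetical. A schema change topic that is named after the prefix alone (inventory) would not match this pattern. Adjust the pattern to the topic naming in your environment.

```json
{
  "topics.regex": "inventory\\.public\\..*"
}
```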

3.1.1.2. Description of Debezium JDBC connector at-least-once delivery

The Debezium JDBC sink connector guarantees that events that it consumes from Kafka topics are processed at least once.

3.1.1.3. Description of Debezium JDBC use of multiple tasks

You can run the Debezium JDBC sink connector across multiple Kafka Connect tasks. To run the connector across multiple tasks, set the tasks.max configuration property to the number of tasks that you want the connector to use. The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task. Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.
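
For example, a connector that reads from three topics might be configured as in the following sketch. The topic names are hypothetical, and the most useful value for tasks.max depends on the number of topics and partitions in your environment.

```json
{
  "tasks.max": "3",
  "topics": "orders,customers,products"
}
```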

3.1.1.4. Description of Debezium JDBC connector data and column type mappings

To enable the Debezium JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event. The connector supports a wide range of column type mappings across different database dialects. To correctly convert the destination column type from the type metadata in an event field, the connector applies the data type mappings that are defined for the source database. You can enhance the way that the connector resolves data types for a column by setting the column.propagate.source.type or datatype.propagate.source.type options in the source connector configuration. When you enable these options, Debezium includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.

For the Debezium JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a Struct. In addition, the payload of the source message must be a Struct that has either a flattened structure with no nested struct types, or a nested struct layout that conforms to Debezium’s complex, hierarchical structure.

If the structure of the events in the Kafka topic does not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.

3.1.1.5. Description of how the Debezium JDBC connector handles primary keys in source events

By default, the Debezium JDBC sink connector does not transform any of the fields in the source event into the primary key for the event. Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics. To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:

Mode    Description

none

No primary key fields are specified when creating the table.

kafka

The primary key consists of the following three columns:

  • __connect_topic
  • __connect_partition
  • __connect_offset

The values for these columns are sourced from the coordinates of the Kafka event.

record_key

The primary key is composed of the Kafka event’s key.

If the primary key is a primitive type, specify the name of the column to be used by setting the primary.key.fields property. If the primary key is a struct type, the fields in the struct are mapped as columns of the primary key. You can use the primary.key.fields property to restrict the primary key to a subset of columns.

record_value

The primary key is composed of the Kafka event’s value.

Because the value of a Kafka event is always a Struct, by default, all of the fields in the value become columns of the primary key. To use a subset of fields in the primary key, set the primary.key.fields property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.

record_header

The primary key is composed of the Kafka event’s headers.

The headers of a Kafka event can include multiple entries, each of which can be a Struct or a primitive data type. The connector combines these headers into a single Struct and, by default, all fields in that Struct become columns of the primary key. To use a subset of fields in the primary key, set the primary.key.fields property to specify a comma-separated list of the fields from which you want to derive the primary key columns.

Important

Some database dialects might throw an exception if you set the primary.key.mode to kafka and set schema.evolution to basic. This exception occurs when a dialect maps a STRING data type to a variable-length string data type such as TEXT or CLOB, and the dialect does not allow primary key columns to have unbounded lengths. To avoid this problem, apply the following settings in your environment:

  • Do not set schema.evolution to basic.
  • Create the database table and primary key mappings in advance.

Important

If a column maps to a data type that your target database does not permit in a primary key, you must set primary.key.fields to an explicit list of columns that excludes that column. Consult your database vendor’s documentation to determine which data types are permitted in primary keys.
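
The following minimal sketch shows one way to derive the primary key from a subset of the fields in the event’s value. The field names order_id and line_number are hypothetical.

```json
{
  "primary.key.mode": "record_value",
  "primary.key.fields": "order_id,line_number"
}
```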

3.1.1.6. Configuring the Debezium JDBC connector to delete rows when consuming DELETE or tombstone events

The Debezium JDBC sink connector can delete rows in the destination database when a DELETE or tombstone event is consumed. By default, the JDBC sink connector does not enable delete mode.

If you want the connector to remove rows, you must explicitly set delete.enabled=true in the connector configuration. To use this mode, you must also set primary.key.mode to a value other than none. This configuration is necessary because deletes are executed based on the primary key mapping; if a destination table has no primary key mapping, the connector cannot delete rows.
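
A minimal sketch of a configuration that enables deletes follows. It assumes that the event key carries the primary key of the source row, so record_key is used as the primary key mode.

```json
{
  "delete.enabled": "true",
  "primary.key.mode": "record_key"
}
```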

3.1.1.7. Enabling the connector to perform idempotent writes

The Debezium JDBC sink connector can perform idempotent writes, enabling it to replay the same records repeatedly and not change the final database state.

To enable the connector to perform idempotent writes, you must explicitly set the insert.mode for the connector to upsert. An upsert operation is applied as either an update or an insert, depending on whether the specified primary key already exists.

If the primary key value already exists, the operation updates values in the row. If the specified primary key value doesn’t exist, an insert adds a new row.

Each database dialect handles idempotent writes differently, because there is no SQL standard for upsert operations. The following table shows the upsert DML syntax for the database dialects that Debezium supports:

Dialect       Upsert syntax
Db2           MERGE …
MySQL         INSERT … ON DUPLICATE KEY UPDATE …
Oracle        MERGE …
PostgreSQL    INSERT … ON CONFLICT … DO UPDATE SET …
SQL Server    MERGE …
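
In the connector configuration, the settings for idempotent writes might look like the following sketch. The choice of record_key as the primary key mode is an assumption; any mode that produces a stable primary key fits the description above.

```json
{
  "insert.mode": "upsert",
  "primary.key.mode": "record_key"
}
```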

3.1.1.8. Schema evolution modes for the Debezium JDBC connector

You can use the following schema evolution modes with the Debezium JDBC sink connector:

Mode    Description

none

The connector does not perform any DDL schema evolution.

basic

The connector automatically detects fields that are in the event payload but that do not exist in the destination table. The connector alters the destination table to add the new fields.

When schema.evolution is set to basic, the connector automatically creates or alters the destination database table according to the structure of the incoming event.

When an event is received from a topic for the first time, and the destination table does not yet exist, the Debezium JDBC sink connector uses the event’s key, or the schema structure of the record to resolve the column structure of the table. If schema evolution is enabled, the connector prepares and executes a CREATE TABLE SQL statement before it applies the DML event to the destination table.

When the Debezium JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event’s key or its schema structure to identify which columns are new, and must be added to the database table. If schema evolution is enabled, the connector prepares and executes an ALTER TABLE SQL statement before it applies the DML event to the destination table. Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.

The schema of each field determines whether a column is NULL or NOT NULL. The schema also defines the default values for each column. If the connector attempts to create a table with a nullability setting or a default value that you don’t want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event. To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modify the column state defined in the source database.

A field’s data type is resolved based on a predefined set of mappings. For more information, see Section 3.1.2, “How the Debezium JDBC connector maps data types”.

Important

When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema. If you want a field to be removed from the destination table, use one of the following options:

  • Remove the field manually.
  • Drop the column.
  • Assign a default value to the field.
  • Define the field as nullable.
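
As an illustration of the requirement for new fields in the preceding note, a field that is declared optional and that carries a default value might appear as follows in an event schema that is serialized by the Kafka Connect JSON converter. The field name discount and its default value are hypothetical.

```json
{
  "type": "int32",
  "optional": true,
  "default": 0,
  "field": "discount"
}
```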

3.1.1.9. Specifying options to define the letter case of destination table and column names

The Debezium JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database. By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table. The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings. As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.

For example, if the destination database dialect is Oracle and the event’s topic is orders, the destination table will be created as ORDERS because Oracle defaults to upper-case names when the name is not quoted. Similarly, if the destination database dialect is PostgreSQL and the event’s topic is ORDERS, the destination table will be created as orders because PostgreSQL defaults to lower-case names when the name is not quoted.

To explicitly preserve the case of the table and field names that are present in a Kafka event, set the value of the quote.identifiers property to true in the connector configuration. With this option set, if an incoming event is for a topic called orders and the destination database dialect is Oracle, the connector creates a table with the name orders, because the constructed SQL defines the name of the table as "orders". Enabling quoting results in the same behavior when the connector creates column names.
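
A minimal configuration fragment that enables identifier quoting might look as follows. Only the quote.identifiers property comes from the description above; the rest of the connector configuration is omitted.

```json
{
  "quote.identifiers": "true"
}
```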

Connection Idle Timeouts

The JDBC sink connector for Debezium leverages a connection pool to enhance performance. Connection pools are engineered to establish an initial set of connections, maintain a specified number of connections, and efficiently allocate connections to the application as required. However, a challenge arises when connections linger idle in the pool, potentially triggering timeouts if they remain inactive beyond the configured idle timeout threshold of the database.

To mitigate the potential for idle connection threads to trigger timeouts, connection pools offer a mechanism that periodically validates the activity of each connection. This validation ensures that connections remain active, and prevents the database from flagging them as idle. In the event of a network disruption, if Debezium attempts to use a terminated connection, the connector prompts the pool to generate a new connection.

By default, the Debezium JDBC sink connector does not conduct idle timeout tests. However, you can configure the connector to request the pool to perform timeout tests at a specified interval by setting the hibernate.c3p0.idle_test_period property. For example:

Example timeout configuration

{
  "hibernate.c3p0.idle_test_period": "300"
}

The Debezium JDBC sink connector uses the Hibernate C3P0 connection pool. You can customize the C3P0 connection pool by setting properties in the hibernate.c3p0.* configuration namespace. In the preceding example, the hibernate.c3p0.idle_test_period property configures the connection pool to perform idle timeout tests every 300 seconds. After you apply the configuration, the connection pool assesses unused connections every five minutes.

3.1.2. How the Debezium JDBC connector maps data types

The Debezium JDBC sink connector resolves a column’s data type by using a logical or primitive type-mapping system. Primitive types include values such as integers, floating points, Booleans, strings, and bytes. Typically, these types are represented with a specific Kafka Connect Schema type code only. Logical data types are more often complex types, including values such as Struct-based types that have a fixed set of field names and schema, or values that are represented with a specific encoding, such as number of days since epoch.

The following examples show representative structures of primitive and logical data types:

Primitive field schema

{
  "schema": {
    "type": "INT64"
  }
}

Logical field schema

{
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
}

Kafka Connect is not the only source for these complex, logical types. In fact, Debezium source connectors generate change events that have fields with similar logical types to represent a variety of different data types, including but not limited to, timestamps, dates, and even JSON data.
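
For comparison, a field that a Debezium source connector emits for a timestamp with microsecond precision might carry a schema such as the following sketch, which uses the same simplified notation as the preceding examples.

```json
{
  "schema": {
    "type": "INT64",
    "name": "io.debezium.time.MicroTimestamp"
  }
}
```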

The Debezium JDBC sink connector uses these primitive and logical types to resolve a column’s type to a JDBC SQL code, which represents a column’s type. These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column’s type to a logical data type for the dialect in use. The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between Debezium and JDBC SQL types. The actual final column type varies for each database type.

Table 3.1. Mappings between Kafka Connect Primitives and Column Data Types
Primitive type    JDBC SQL type
INT8              Types.TINYINT
INT16             Types.SMALLINT
INT32             Types.INTEGER
INT64             Types.BIGINT
FLOAT32           Types.FLOAT
FLOAT64           Types.DOUBLE
BOOLEAN           Types.BOOLEAN
STRING            Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR
BYTES             Types.VARBINARY

Table 3.2. Mappings between Kafka Connect Logical Types and Column Data Types
Logical type                               JDBC SQL type
org.apache.kafka.connect.data.Decimal      Types.DECIMAL
org.apache.kafka.connect.data.Date         Types.DATE
org.apache.kafka.connect.data.Time         Types.TIMESTAMP
org.apache.kafka.connect.data.Timestamp    Types.TIMESTAMP

Table 3.3. Mappings between Debezium Logical Types and Column Data Types
Logical type                             JDBC SQL type
io.debezium.time.Date                    Types.DATE
io.debezium.time.Time                    Types.TIMESTAMP
io.debezium.time.MicroTime               Types.TIMESTAMP
io.debezium.time.NanoTime                Types.TIMESTAMP
io.debezium.time.ZonedTime               Types.TIME_WITH_TIMEZONE
io.debezium.time.Timestamp               Types.TIMESTAMP
io.debezium.time.MicroTimestamp          Types.TIMESTAMP
io.debezium.time.NanoTimestamp           Types.TIMESTAMP
io.debezium.time.ZonedTimestamp          Types.TIMESTAMP_WITH_TIMEZONE
io.debezium.data.VariableScaleDecimal    Types.DOUBLE

Important

If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without timezones.

Table 3.4. Mappings between Debezium dialect-specific Logical Types and Column Data Types
Logical type                      MySQL SQL type    PostgreSQL SQL type      SQL Server SQL type
io.debezium.data.Bits             bit(n)            bit(n) or bit varying    varbinary(n)
io.debezium.data.Enum             enum              Types.VARCHAR            n/a
io.debezium.data.Json             json              json                     n/a
io.debezium.data.EnumSet          set               n/a                      n/a
io.debezium.time.Year             year(n)           n/a                      n/a
io.debezium.time.MicroDuration    n/a               interval                 n/a
io.debezium.data.Ltree            n/a               ltree                    n/a
io.debezium.data.Uuid             n/a               uuid                     n/a
io.debezium.data.Xml              n/a               xml                      xml

In addition to the primitive and logical mappings above, if the source of the change events is a Debezium source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation. To enforce propagation, one of the following properties must be set in the source connector configuration:

  • column.propagate.source.type
  • datatype.propagate.source.type

The Debezium JDBC sink connector applies the values with the higher precedence.

For example, let’s say the following field schema is included in a change event:

Debezium change event field schema with column or data type propagation enabled

{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}

In the preceding example, if no schema parameters are set, the Debezium JDBC sink connector maps this field to a column type of Types.SMALLINT. Types.SMALLINT can have different logical database types, depending on the database dialect. For MySQL, the column type in the example converts to a TINYINT column type with no specified length. If column or data type propagation is enabled for the source connector, the Debezium JDBC sink connector uses the mapping information to refine the data type mapping process and create a column with the type TINYINT(1).

Note

Typically, the effect of using column or data type propagation is much greater when the same type of database is used for both the source and sink database.
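
As a sketch, propagation could be enabled in the source connector configuration as follows. The property accepts a comma-separated list of regular expressions; the value .* is only an illustration that matches every column, and a narrower pattern is usually more appropriate.

```json
{
  "column.propagate.source.type": ".*"
}
```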

3.1.3. Deployment of Debezium JDBC connectors

To deploy a Debezium JDBC connector, you install the Debezium JDBC connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.

Prerequisites

Procedure

  1. Download the Debezium JDBC connector plug-in archive.
  2. Extract the files into your Kafka Connect environment.
  3. Optionally download the JDBC driver from Maven Central and extract the downloaded driver file to the directory that contains the JDBC sink connector JAR file.

    Note

    Drivers for Oracle and Db2 are not included with the JDBC sink connector. You must download the drivers and install them manually.

  4. Add the driver JAR files to the path where the JDBC sink connector has been installed.
  5. Make sure that the path where you install the JDBC sink connector is part of the Kafka Connect plugin.path.
  6. Restart the Kafka Connect process to pick up the new JAR files.

3.1.3.1. Debezium JDBC connector configuration

Typically, you register a Debezium JDBC connector by submitting a JSON request that specifies the configuration properties for the connector. The following example shows a JSON request for registering an instance of the Debezium JDBC sink connector that consumes events from a topic called orders with the most common configuration settings:

Example: Debezium JDBC connector configuration

{
    "name": "jdbc-connector",  1
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  2
        "tasks.max": "1",  3
        "connection.url": "jdbc:postgresql://localhost/db",  4
        "connection.username": "pguser",  5
        "connection.password": "pgpassword",  6
        "insert.mode": "upsert",  7
        "delete.enabled": "true",  8
        "primary.key.mode": "record_key",  9
        "schema.evolution": "basic",  10
        "database.time_zone": "UTC",  11
        "topics": "orders" 12
    }
}

Descriptions of JDBC connector configuration settings

Item    Description
1       The name that is assigned to the connector when you register it with the Kafka Connect service.
2       The name of the JDBC sink connector class.
3       The maximum number of tasks to create for this connector.
4       The JDBC URL that the connector uses to connect to the sink database that it writes to.
5       The name of the database user that is used for authentication.
6       The password of the database user that is used for authentication.
7       The insert.mode that the connector uses.
8       Enables the deletion of records in the database. For more information, see the delete.enabled configuration property.
9       Specifies the method used to resolve primary key columns. For more information, see the primary.key.mode configuration property.
10      Enables the connector to evolve the destination database’s schema. For more information, see the schema.evolution configuration property.
11      Specifies the timezone used when writing temporal field types.
12      List of topics to consume, separated by commas.

For a complete list of configuration properties that you can set for the Debezium JDBC connector, see JDBC connector properties.

You can send this configuration with a POST command to a running Kafka Connect service. The service records the configuration and starts one or more sink connector tasks that perform the following operations:

  • Connects to the database.
  • Consumes events from subscribed Kafka topics.
  • Writes the events to the configured database.

3.1.4. Descriptions of Debezium JDBC connector configuration properties

The Debezium JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs. Many properties have default values. Information about the properties is organized as follows:

Table 3.5. JDBC connector generic properties
Property    Default    Description

name

No default

Unique name for the connector. A failure results if you attempt to reuse this name when registering a connector. This property is required by all Kafka Connect connectors.

connector.class

No default

The name of the Java class for the connector. For the Debezium JDBC connector, specify the value io.debezium.connector.jdbc.JdbcSinkConnector.

tasks.max

1

Maximum number of tasks to use for this connector.

topics

No default

List of topics to consume, separated by commas. Do not use this property in combination with the topics.regex property.

topics.regex

No default

A regular expression that specifies the topics to consume. Internally, the regular expression is compiled to a java.util.regex.Pattern. Do not use this property in combination with the topics property.

Table 3.6. JDBC connector connection properties
Property    Default    Description

connection.provider

org.hibernate.c3p0.internal.C3P0ConnectionProvider

The connection provider implementation to use.

connection.url

No default

The JDBC connection URL used to connect to the database.

connection.username

No default

The name of the database user account that the connector uses to connect to the database.

connection.password

No default

The password that the connector uses to connect to the database.

connection.pool.min_size

5

Specifies the minimum number of connections in the pool.

connection.pool.max_size

32

Specifies the maximum number of concurrent connections that the pool maintains.

connection.pool.acquire_increment

32

Specifies the number of connections that the connector attempts to acquire at one time when the connections in the pool are exhausted.

connection.pool.timeout

1800

Specifies the number of seconds that an unused connection is kept before it is discarded.
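
The following fragment sketches how the pool properties in the preceding table might be combined. The values are illustrative only and are not tuning recommendations.

```json
{
  "connection.pool.min_size": "2",
  "connection.pool.max_size": "16",
  "connection.pool.acquire_increment": "2",
  "connection.pool.timeout": "600"
}
```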

Table 3.7. JDBC connector runtime properties
Property    Default    Description

database.time_zone

UTC

Specifies the timezone used when inserting JDBC temporal values.

delete.enabled

false

Specifies whether the connector processes DELETE or tombstone events and removes the corresponding row from the database. Use of this option requires that you set the primary.key.mode to record_key.

truncate.enabled

false

Specifies whether the connector processes TRUNCATE events and truncates the corresponding tables from the database.

Note

Although support for TRUNCATE statements has been available in Db2 since version 9.7, currently, the JDBC connector is unable to process standard TRUNCATE events that the Db2 connector emits.

To ensure that the JDBC connector can process TRUNCATE events received from Db2, perform the truncation by using an alternative to the standard TRUNCATE TABLE statement. For example:

ALTER TABLE <table_name> ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

The user account that submits the preceding query requires ALTER privileges on the table to be truncated.

insert.mode

insert

Specifies the strategy used to insert events into the database. The following options are available:

insert
Specifies that all events should construct INSERT-based SQL statements. Use this option only when no primary key is used, or when you can be certain that no updates can occur to rows with existing primary key values.
update
Specifies that all events should construct UPDATE-based SQL statements. Use this option only when you can be certain that the connector receives only events that apply to existing rows.
upsert
Specifies that the connector adds events to the table using upsert semantics. That is, if the primary key does not exist, the connector performs an INSERT operation, and if the key does exist, the connector performs an UPDATE operation. When idempotent writes are required, the connector should be configured to use this option.

primary.key.mode

none

Specifies how the connector resolves the primary key columns from the event.

none
Specifies that no primary key columns are created.
kafka

Specifies that the connector uses Kafka coordinates as the primary key columns. The key coordinates are defined from the topic name, partition, and offset of the event, and are mapped to columns with the following names:

  • __connect_topic
  • __connect_partition
  • __connect_offset
record_key
Specifies that the primary key columns are sourced from the event’s record key. If the record key is a primitive type, the primary.key.fields property is required to specify the name of the primary key column. If the record key is a struct type, the primary.key.fields property is optional, and can be used to specify a subset of columns from the event’s key as the table’s primary key.
record_value
Specifies that the primary key columns are sourced from the event’s value. You can set the primary.key.fields property to define the primary key as a subset of fields from the event’s value; otherwise all fields are used by default.

primary.key.fields

No default

Either the name of the primary key column or a comma-separated list of fields to derive the primary key from.

When primary.key.mode is set to record_key and the event’s key is a primitive type, it is expected that this property specifies the column name to be used for the key.

When the primary.key.mode is set to record_key with a non-primitive key, or record_value, it is expected that this property specifies a comma-separated list of field names from either the key or value. If the primary.key.mode is set to record_key with a non-primitive key, or record_value, and this property is not specified, the connector derives the primary key from all fields of either the record key or record value, depending on the specified mode.

quote.identifiers

false

Specifies whether generated SQL statements use quotation marks to delimit table and column names. See the Section 3.1.1.9, “Specifying options to define the letter case of destination table and column names” section for more details.

schema.evolution

none

Specifies how the connector evolves the destination table schemas. For more information, see Section 3.1.1.8, “Schema evolution modes for the Debezium JDBC connector”. The following options are available:

none
Specifies that the connector does not evolve the destination schema.
basic
Specifies that basic evolution occurs. The connector adds missing columns to the table by comparing the incoming event’s record schema to the database table structure.

table.name.format

${topic}

Specifies a string pattern that the connector uses to construct the names of destination tables.
When the property is set to its default value, ${topic}, after the connector reads an event from Kafka, it writes the event record to a destination table with a name that matches the name of the source topic.

You can also configure this property to extract values from specific fields in incoming event records and then use those values to dynamically generate the names of target tables. This ability to generate table names from values in the message source would otherwise require the use of a custom Kafka Connect single message transformation (SMT).

To configure the property to dynamically generate the names of destination tables, set its value to a pattern such as ${source.field}, where field is the name of a field in the source block of the Debezium change event. When you specify this type of pattern, the connector extracts values from the source block, and then uses those values to construct the table name. For example, you might set the value of the property to the pattern ${source.schema}_${source.table}. Based on this pattern, if the connector reads an event in which the schema field in the source block contains the value user and the table field contains the value tab, the connector writes the event record to a table with the name user_tab.
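The placeholder substitution described above can be sketched in Python. This is an illustration only: format_table_name is a hypothetical helper, the source block is modeled as a flat dict, and any sanitizing that the connector applies to the resulting name is left out.

```python
import re

# Matches placeholders of the form ${name}.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def format_table_name(pattern, topic, source):
    """Expand ${topic} and ${source.<field>} placeholders in a name pattern."""

    def substitute(match):
        name = match.group(1)
        if name == "topic":
            return topic
        if name.startswith("source."):
            # Look up the named field in the event's source block.
            return str(source[name[len("source."):]])
        raise ValueError(f"unsupported placeholder: {name}")

    return PLACEHOLDER.sub(substitute, pattern)
```

With the pattern ${source.schema}_${source.table} and a source block in which schema is user and table is tab, the helper returns user_tab. With the pattern jdbc_${topic} and the topic orders, it returns jdbc_orders.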

dialect.postgres.postgis.schema

public

Specifies the schema name where the PostgreSQL PostGIS extension is installed. The default is public; however, if the PostGIS extension was installed in another schema, this property should be used to specify the alternate schema name.

dialect.sqlserver.identity.insert

false

Specifies whether the connector automatically sets an IDENTITY_INSERT before an INSERT or UPSERT operation into the identity column of SQL Server tables, and then unsets it immediately after the operation. When the default setting (false) is in effect, an INSERT or UPSERT operation into the IDENTITY column of a table results in a SQL exception.
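As a rough illustration of this behavior, the following Python sketch shows the order of statements when the option is enabled. SET IDENTITY_INSERT is standard SQL Server syntax, but the exact statements that the connector issues are an assumption here, and wrap_identity_insert is a hypothetical helper.

```python
def wrap_identity_insert(table, statement, enabled):
    """Return the statements to run, in order, for one write operation."""
    if not enabled:
        # Default (false): the write is sent as-is and fails on IDENTITY columns.
        return [statement]
    return [
        f"SET IDENTITY_INSERT {table} ON",   # allow explicit identity values
        statement,
        f"SET IDENTITY_INSERT {table} OFF",  # restore the setting immediately
    ]
```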

batch.size

500

Specifies how many records the connector attempts to batch together when it writes to the destination table.

Note

If you set consumer.max.poll.records in the Connect worker properties to a value lower than batch.size, batch processing is capped by consumer.max.poll.records, and the desired batch.size is not reached. You can also configure the max.poll.records setting of the connector’s underlying consumer by setting consumer.override.max.poll.records in the connector configuration.
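The interaction between the two settings amounts to taking the smaller value. The following Python sketch shows the arithmetic and a connector configuration fragment that raises both limits together; the helper name and the value 1000 are illustrative assumptions.

```python
def effective_batch_size(batch_size, max_poll_records):
    """A batch cannot hold more records than one consumer poll returns."""
    return min(batch_size, max_poll_records)


# Hypothetical connector configuration fragment: raise the consumer's poll
# limit along with batch.size so that the larger batch can actually be filled.
connector_config = {
    "batch.size": "1000",
    "consumer.override.max.poll.records": "1000",
}
```

For example, with batch.size set to 1000 and max.poll.records left at the Kafka consumer default of 500, the connector can batch at most 500 records at a time.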

use.reduction.buffer

false

Specifies whether to enable the Debezium JDBC connector’s reduction buffer.

Choose one of the following settings:

false
(default) The connector writes each change event that it consumes from Kafka as a separate logical SQL change.
true
The connector uses the reduction buffer to reduce change events before it writes them to the sink database. That is, if multiple events refer to the same primary key, the connector consolidates the SQL queries and writes only a single logical SQL change, based on the row state that is reported in the most recent offset record.
Choose this option to reduce the SQL load on the target database.

To optimize query processing in a PostgreSQL sink database when the reduction buffer is enabled, you must also enable the database to execute the batched queries by adding the reWriteBatchedInserts parameter to the JDBC connection URL.
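The idea behind the reduction buffer can be sketched in a few lines of Python. This is a conceptual model, not the connector's code: events are reduced to (primary key, row state) pairs, and reduce_events is a hypothetical name.

```python
def reduce_events(events):
    """Keep only the most recent row state for each primary key."""
    latest = {}
    for key, row in events:
        # A later event for the same key overwrites the earlier one.
        latest[key] = row
    return list(latest.items())
```

Given three events in which the first and third refer to primary key 1, only two logical changes remain, and key 1 carries the state from the third event. For a PostgreSQL sink, the driver parameter is added to the connection URL as in jdbc:postgresql://localhost:5432/inventory?reWriteBatchedInserts=true, where the host, port, and database name are placeholders.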

field.include.list

empty string

An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName.

If you include this property in the configuration, do not set the field.exclude.list property.

field.exclude.list

empty string

An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName.

If you include this property in the configuration, do not set the field.include.list property.
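The two field list properties can be modeled with the following Python sketch. It assumes that an unqualified fieldName applies to events from every topic and that a topicName:fieldName entry applies only to that topic; the function names are hypothetical, and event values are modeled as flat dicts.

```python
def parse_field_list(raw):
    """Split a comma-separated list into (topic, field) pairs.

    The topic is None for entries that are not topic-qualified.
    """
    entries = []
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue
        topic, sep, field = item.rpartition(":")
        entries.append((topic if sep else None, field))
    return entries


def filter_fields(topic, value, include="", exclude=""):
    """Apply field.include.list or field.exclude.list to one event value."""
    if include and exclude:
        raise ValueError("set only one of field.include.list and field.exclude.list")

    def matches(name, entries):
        return any(f == name and t in (None, topic) for t, f in entries)

    if include:
        entries = parse_field_list(include)
        return {k: v for k, v in value.items() if matches(k, entries)}
    if exclude:
        entries = parse_field_list(exclude)
        return {k: v for k, v in value.items() if not matches(k, entries)}
    return dict(value)
```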

Table 3.8. JDBC connector extendable properties
Property | Default | Description

column.naming.strategy

i.d.c.j.n.DefaultColumnNamingStrategy

Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from event field names.

By default, the connector uses the field name as the column name.

table.naming.strategy

i.d.c.j.n.DefaultTableNamingStrategy

Specifies the fully-qualified class name of a TableNamingStrategy implementation that the connector uses to resolve table names from incoming event topic names.

The default behavior is to:

  • Replace the ${topic} placeholder in the table.name.format configuration property with the event’s topic.
  • Sanitize the table name by replacing dots (.) with underscores (_).
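The two default steps can be sketched as follows. This is a simplified model with a hypothetical function name. Whether the connector sanitizes the topic before or after substitution is not stated here, so the order used in the sketch is an assumption.

```python
def default_table_name(table_name_format, topic):
    """Model of the default strategy: sanitize the topic, then substitute it."""
    sanitized_topic = topic.replace(".", "_")
    return table_name_format.replace("${topic}", sanitized_topic)
```

Under these assumptions, the topic inventory.public.orders with the default format ${topic} resolves to the table name inventory_public_orders.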

JDBC connector hibernate.* passthrough properties

Kafka Connect supports passthrough configuration, enabling you to modify the behavior of an underlying system by passing certain properties directly from the connector configuration. By default, some Hibernate properties are exposed via the JDBC connector connection properties (for example, connection.url, connection.username, and connection.pool.*_size), and through the connector’s runtime properties (for example, database.time_zone, quote.identifiers).

If you want to customize other Hibernate behavior, you can take advantage of the passthrough mechanism by adding properties that use the hibernate.* namespace to the connector configuration. For example, to assist Hibernate in resolving the type and version of the target database, you can add the hibernate.dialect property and set it to the fully qualified class name of the database dialect, such as org.hibernate.dialect.MariaDBDialect.
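As an illustration, the following Python sketch models a connector configuration that includes a passthrough property and shows which entries fall into the hibernate.* namespace. The connection URL, host, and database name are placeholders, and hibernate_passthrough is a hypothetical helper, not part of the connector.

```python
# Hypothetical connector configuration, modeled as a dict of string properties.
connector_config = {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mariadb://localhost:3306/inventory",  # placeholder
    "quote.identifiers": "true",
    # Passed through to Hibernate unchanged:
    "hibernate.dialect": "org.hibernate.dialect.MariaDBDialect",
}


def hibernate_passthrough(config):
    """Select the properties that belong to the hibernate.* namespace."""
    return {k: v for k, v in config.items() if k.startswith("hibernate.")}
```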

3.1.5. JDBC connector frequently asked questions

Is the ExtractNewRecordState single message transformation required?
No, that is actually one of the differentiating factors of the Debezium JDBC connector from other implementations. While the connector is capable of ingesting flattened events like its competitors, it can also ingest Debezium’s complex change event structure natively, without requiring any specific type of transformation.
If a column’s type is changed, or if a column is renamed or dropped, is this handled by schema evolution?
No, the Debezium JDBC connector does not make any changes to existing columns. The schema evolution supported by the connector is quite basic. It simply compares the fields in the event structure to the table’s column list, and then adds any fields that are not yet defined as columns in the table. If a column’s type or default value changes, the connector does not adjust it in the destination database. If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however, existing rows with data in the old column remain unchanged. These types of schema changes should be handled manually.
If a column’s type does not resolve to the type that I want, how can I enforce mapping to a different data type?
The Debezium JDBC connector uses a sophisticated type system to resolve a column’s data type. For details about how this type system resolves a specific field’s schema definition to a JDBC type, see Section 3.1.1.4, “Description of Debezium JDBC connector data and column type mappings”. If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?
In order to add a prefix or a suffix to the destination table name, adjust the table.name.format connector configuration property to apply the prefix or suffix that you want. For example, to prefix all table names with jdbc_, specify the table.name.format configuration property with a value of jdbc_${topic}. If the connector is subscribed to a topic called orders, the resulting table is created as jdbc_orders.
Why are some columns automatically quoted, even though identifier quoting is not enabled?
In some situations, specific column or table names might be explicitly quoted, even when quote.identifiers is not enabled. This is often necessary when the column or table name starts with or uses a specific convention that would otherwise be considered illegal syntax. For example, when the primary.key.mode is set to kafka, some databases only permit column names to begin with an underscore if the column’s name is quoted. Quoting behavior is dialect-specific, and varies among different types of database.