Questo contenuto non è disponibile nella lingua selezionata.

Chapter 3. Sink connectors


Debezium provides sink connectors that can consume events from sources such as Apache Kafka topics. A sink connector standardizes the format of the data, and then persists the event data to a configured sink repository. Other systems, applications, or users can then access the events from the data sink.

Because the sink connector applies a consistent structure to the event data that it consumes, downstream applications that read from the data sink can more easily interpret and process that data.

Debezium provides the following sink connectors:

3.1. Debezium sink connector for JDBC

The Debezium JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver. This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.

3.1.1. How the Debezium JDBC connector works

The Debezium JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime. The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database. The connector supports idempotent write operations by using upsert semantics and basic schema evolution.

The Debezium JDBC connector provides the following features:

By default, Debezium source connectors produce complex, hierarchical change events. When Debezium connectors are used with other JDBC sink connector implementations, you might need to apply the ExtractNewRecordState single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation. If you run the Debezium JDBC sink connector, it’s not necessary to deploy the SMT, because the Debezium sink connector can consume native Debezium change events directly, without the use of a transformation.

When the JDBC sink connector consumes a complex change event from a Debezium source connector, it extracts the values from the after section of the original insert or update event. When a delete event is consumed by the sink connector, no part of the event’s payload is consulted.

Important

The Debezium JDBC sink connector is not designed to read from schema change topics. If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the topics or topics.regex properties so that the connector does not consume from schema change topics.

3.1.1.2. Description of Debezium JDBC connector at-least-once delivery

The Debezium JDBC sink connector guarantees that events that it consumes from Kafka topics are processed at least once.

3.1.1.3. Description of Debezium JDBC use of multiple tasks

You can run the Debezium JDBC sink connector across multiple Kafka Connect tasks. To run the connector across multiple tasks, set the tasks.max configuration property to the number of tasks that you want the connector to use. The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task. Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.

3.1.1.4. Description of Debezium JDBC connector data and column type mappings

To enable the Debezium JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event. The connector supports a wide range of column type mappings across different database dialects. To correctly convert the destination column type from the type metadata in an event field, the connector applies the data type mappings that are defined for the source database. You can enhance the way that the connector resolves data types for a column by setting the column.propagate.source.type or datatype.propagate.source.type options in the source connector configuration. When you enable these options, Debezium includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.

For the Debezium JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a Struct. In addition, the payload of the source message must be a Struct that has either a flattened structure with no nested struct types, or a nested struct layout that conforms to Debezium’s complex, hierarchical structure.

If the structure of the events in the Kafka topic do not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.

By default, the Debezium JDBC sink connector does not transform any of the fields in the source event into the primary key for the event. Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics. To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:

Expand
ModeDescription

none

No primary key fields are specified when creating the table.

kafka

The primary key consists of the following three columns:

  • __connect_topic
  • __connect_partition
  • __connect_offset

The values for these columns are sourced from the coordinates of the Kafka event.

record_key

The primary key is composed of the Kafka event’s key.

If the primary key is a primitive type, specify the name of the column to be used by setting the primary.key.fields property. If the primary key is a struct type, the fields in the struct are mapped as columns of the primary key. You can use the primary.key.fields property to restrict the primary key to a subset of columns.

record_value

The primary key is composed of the Kafka event’s value.

Because the value of a Kafka event is always a Struct, by default, all of the fields in the value become columns of the primary key. To use a subset of fields in the primary key, set the primary.key.fields property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.

record_header

The primary key is composed of the Kafka event’s headers.

Kafka event’s headers contains could contain multiple header that each one could be Struct or primitives data types, the connectors makes a Struct of these headers. Hence, all fields in this Struct become columns of the primary key. To use a subset of fields in the primary key, set the primary.key.fields property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.

Important

Some database dialects might throw an exception if you set the primary.key.mode to kafka and set schema.evolution to basic. This exception occurs when a dialect maps a STRING data type mapping to a variable length string data type such as TEXT or CLOB, and the dialect does not allow primary key columns to have unbounded lengths. To avoid this problem, apply the following settings in your environment:

  • Do not set schema.evolution to basic.
  • Create the database table and primary key mappings in advance.
Important

If a column maps to a data type that isn’t permitted as a primary key for your target database, an explicit list of columns will be necessary in primary.key.fields excluding such columns. Consult your specific database vendor’s documentation for what data types are and are not permissible.

The Debezium JDBC sink connector can delete rows in the destination database when a DELETE or tombstone event is consumed. By default, the JDBC sink connector does not enable delete mode.

If you want to the connector to remove rows, you must explicitly set delete.enabled=true in the connector configuration. To use this mode you must also set primary.key.fields to a value other than none. The preceding configuration is necessary, because deletes are executed based on the primary key mapping, so if a destination table has no primary key mapping, the connector is unable to delete rows.

3.1.1.7. Enabling the connector to perform idempotent writes

The Debezium JDBC sink connector can perform idempotent writes, enabling it to replay the same records repeatedly and not change the final database state.

To enable the connector to perform idempotent writes, you must be explicitly set the insert.mode for the connector to upsert. An upsert operation is applied as either an update or an insert, depending on whether the specified primary key already exists.

If the primary key value already exists, the operation updates values in the row. If the specified primary key value doesn’t exist, an insert adds a new row.

Each database dialect handles idempotent writes differently, because there is no SQL standard for upsert operations. The following table shows the upsert DML syntax for the database dialects that Debezium supports:

Expand
DialectUpsert Syntax

Db2

MERGE …​

MySQL

INSERT …​ ON DUPLICATE KEY UPDATE …​

Oracle

MERGE …​

PostgreSQL

INSERT …​ ON CONFLICT …​ DO UPDATE SET …​

SQL Server

MERGE …​

3.1.1.8. Schema evolution modes for the Debezium JDBC connector

You can use the following schema evolution modes with the Debezium JDBC sink connector:

Expand
ModeDescription

none

The connector does not perform any DDL schema evolution.

basic

The connector automatically detects fields that are in the event payload but that do not exist in the destination table. The connector alters the destination table to add the new fields.

When schema.evolution is set to basic, the connector automatically creates or alters the destination database table according to the structure of the incoming event.

When an event is received from a topic for the first time, and the destination table does not yet exist, the Debezium JDBC sink connector uses the event’s key, or the schema structure of the record to resolve the column structure of the table. If schema evolution is enabled, the connector prepares and executes a CREATE TABLE SQL statement before it applies the DML event to the destination table.

When the Debezium JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event’s key or its schema structure to identify which columns are new, and must be added to the database table. If schema evolution is enabled, the connector prepares and executes an ALTER TABLE SQL statement before it applies the DML event to the destination table. Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.

The schema of each field determines whether a column is NULL or NOT NULL. The schema also defines the default values for each column. If the connector attempts to create a table with a nullability setting or a default value that don’t want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event. To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modifies the column state defined in the source database.

A field’s data type is resolved based on a predefined set of mappings. For more information, see JDBC field types.

Important

When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema. If you want a field to be removed from the destination table, use one of the following options:

  • Remove the field manually.
  • Drop the column.
  • Assign a default value to the field.
  • Define the field a nullable.

The Debezium JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database. By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table. The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings. As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.

For example, if the destination database dialect is Oracle and the event’s topic is orders, the destination table will be created as ORDERS because Oracle defaults to uppercase names when the name is not quoted. Similarly, if the destination database dialect is PostgreSQL and the event’s topic is ORDERS, the destination table will be created as orders because PostgreSQL defaults to lower-case names when the name is not quoted.

To explicitly preserve the case of the table and field names that are present in a Kafka event, in the connector configuration, set the value of the quote.identifiers property to true. When this options is set, when an incoming event is for a topic called orders, and the destination database dialect is Oracle, the connector creates a table with the name orders, because the constructed SQL defines the name of the table as "orders". Enabling quoting results in the same behavior when the connector creates column names.

Connection Idle Timeouts

The JDBC sink connector for Debezium leverages a connection pool to enhance performance. Connection pools are engineered to establish an initial set of connections, maintain a specified number of connections, and efficiently allocate connections to the application as required. However, a challenge arises when connections linger idle in the pool, potentially triggering timeouts if they remain inactive beyond the configured idle timeout threshold of the database.

To mitigate the potential for idle connection threads to trigger timeouts, connection pools offer a mechanism that periodically validates the activity of each connection. This validation ensures that connections remain active, and prevents the database from flagging them as idle. In the event of a network disruption, if Debezium attempts to use a terminated connection, the connector prompts the pool to generate a new connection.

By default, the Debezium JDBC sink connector does not conduct idle timeout tests. However, you can configure the connector to request the pool to perform timeout tests at a specified interval by setting the hibernate.c3p0.idle_test_period property. For example:

Example timeout configuration

{
  "hibernate.c3p0.idle_test_period": "300"
}
Copy to Clipboard Toggle word wrap

The Debezium JDBC sink connector uses the Hibernate C3P0 connection pool. You can customize the CP30 connection pool by setting properties in the hibernate.c3p0.*` configuration namespace. In the preceding example, the setting of the hibernate.c3p0.idle_test_period property configures the connection pool to perform idle timeout tests every 300 seconds. After you apply the configuration, the connection pool begins to assess unused connections every five minutes.

3.1.2. How the Debezium JDBC connector maps data types

Before the Debezium JDBC sink connector sends data to a sink database, it converts data types in the original source record to a corresponding type in the target system. Appropriate data type mapping ensures that the original data is accurately represented in the destination database.

The Debezium JDBC sink connector resolves a column’s data type by using a logical or primitive type-mapping system. Primitive types include values such as integers, floating points, Booleans, strings, and bytes. Typically, Kafka messages use a specific Kafka Connect Schema type code to represent primitive data types.

By contrast, to represent more complex data, Debezium uses logical schema names, which provide logical groupings of named fields that can represent a range of data types (strings, arrays, JSON, XML, and so forth). These structured types take into account the semantic meaning of the data and provide a more opinionated interpretation about how to serialize the underlying primitive types. Logical types are useful in representing values that have a specific encoding, such as numbers that represent the time since the epoch.

The following examples show representative structures of primitive and logical data types:

Example 3.1. Primitive field schema

{
  "schema": {
    "type": "INT64"
  }
}
Copy to Clipboard Toggle word wrap

Example 3.2. Logical field schema

{
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
}
Copy to Clipboard Toggle word wrap

Kafka Connect is not the only source for these complex, logical types. In fact, when a Debezium source connector emits a change event, it can assign similar logical types to event fields to represent such data types as timestamps, dates, and even JSON data.

The Debezium JDBC sink connector uses these primitive and logical types to resolve a column’s type to a JDBC SQL code, which represents a column’s type. These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column’s type to a logical data type for the dialect in use. The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between Debezium and JDBC SQL types. The actual final column type varies with for each database type.

Expand
Table 3.1. Mappings between Kafka Connect primitive data types and column data types
Primitive TypeJDBC SQL Type

INT8

Types.TINYINT

INT16

Types.SMALLINT

INT32

Types.INTEGER

INT64

Types.BIGINT

FLOAT32

Types.FLOAT

FLOAT64

Types.DOUBLE

BOOLEAN

Types.BOOLEAN

STRING

Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR

BYTES

Types.VARBINARY

Note

The connector does not support BOOLEAN data in Oracle 23. If you configure the JDBC connector to use an Oracle 23 database as the sink target, you cannot rely on schema evolution to automatically create a field that uses the Oracle 23 BOOLEAN data type. The connector maps BOOLEAN data to BIT data types, which are universal across Oracle versions. As a workaround, if you need a BOOLEAN data type for an Oracle 23 sink, add the field manually to the target table.

Expand
Table 3.2. Mappings between Kafka Connect logical data types and column data types
Logical TypeJDBC SQL Type

org.apache.kafka.connect.data.Decimal

Types.DECIMAL

org.apache.kafka.connect.data.Date

Types.DATE

org.apache.kafka.connect.data.Time

Types.TIMESTAMP

org.apache.kafka.connect.data.Timestamp

Types.TIMESTAMP

3.1.2.3. Mappings between Debezium logical data types and JDBC column types

Expand
Table 3.3. Mappings between Debezium logical types and column data types
Logical TypeJDBC SQL Type

io.debezium.time.Date

Types.DATE

io.debezium.time.Time

Types.TIMESTAMP

io.debezium.time.MicroTime

Types.TIMESTAMP

io.debezium.time.NanoTime

Types.TIMESTAMP

io.debezium.time.ZonedTime

Types.TIME_WITH_TIMEZONE

io.debezium.time.Timestamp

Types.TIMESTAMP

io.debezium.time.MicroTimestamp

Types.TIMESTAMP

io.debezium.time.NanoTimestamp

Types.TIMESTAMP

io.debezium.time.ZonedTimestamp

Types.TIMESTAMP_WITH_TIMEZONE

io.debezium.data.VariableScaleDecimal

Types.DOUBLE

Important

If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without timezones.

Expand
Table 3.4. Mappings between Debezium dialect-specific logical types and column data types
Logical TypeMySQL SQL TypePostgreSQL SQL TypeSQL Server SQL Type

io.debezium.data.Bits

bit(n)

bit(n) or bit varying

varbinary(n)

io.debezium.data.Enum

enum

Types.VARCHAR

n/a

io.debezium.data.Json

json

json

n/a

io.debezium.data.EnumSet

set

n/a

n/a

io.debezium.time.Year

year(n)

n/a

n/a

io.debezium.time.MicroDuration

n/a

interval

n/a

io.debezium.data.Ltree

n/a

ltree

n/a

io.debezium.data.Uuid

n/a

uuid

n/a

io.debezium.data.Xml

n/a

xml

xml

3.1.2.5. Mappings between Debezium vector data types and RDBMS column types

The Debezium JDBC sink connector supports direct mapping of logical vector data types from source events to sink destinations, provided that the target database supports a comparable representation.

If a vector field in the source has a logical name, Debezium uses it to determine the appropriate mapping. For certain databases, the connector recognizes the following logical names under the io.debezium.data.vector.* namespace:

  • FloatVector
  • DoubleVector
  • SparseVector

When the connector processes a message, if these logical types are present, the connector checks for a corresponding mapping in the target database. If a mapping exists, it applies it; otherwise, it defaults to serializing the data as a string, based on the Kafka Connect schema type.

For example, a connector that is configured to send data to a PostgreSQL database sink maps fields with the io.debezium.data.FloatVector logical name to the halfvector column type, using the special override mapping. By contrast, if no direct mapping is available in the sink database, as for example with an Oracle sink, the connector defaults to serializing vector data in its raw string format.

Support for the special vector types is not available for all releases of the supported databases. Sink databases must meet the following requirements to support direct mapping of logical vector types:

  • MariaDB version 11.7 or later
  • MySQL version 9.0 or later
  • PostgreSQL requires the pgvector extension

Earlier versions of MariaDB or MySQL, and PostgreSQL without pgvector, do not support special vector types.

Expand
Table 3.5. Mappings between Debezium Vector Types and Column Data Types
Logical TypeDb2MySQLPostgreSQLOracleSQL Server

io.debezium.data.DoubleVector

Unsupported

'vector'

vector

Unsupported

Unsupported

io.debezium.data.FloatVector

Unsupported

'vector'

halfvec

Unsupported

Unsupported

io.debezium.data.SparseVector

Unsupported

Unsupported

sparsevec

Unsupported

Unsupported

Note

If there is not a direct mapping between one of the vector logical types and your target relational database’s column types, you can use the VectorToJsonConverter to convert the vector logical type to JSON so that it can be written to any target relational database.

Column and data type propagation

In addition to the primitive and logical mappings shown in the preceding tables, if the source of the change events is a Debezium source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation. Column and data type propagation helps to determine how the structure and data types of incoming data are translated and applied to the sink destination. As discussed earlier in Data and column type mappings, you can enforce propagation by setting one of the following properties in the source connector configuration:

  • column.propagate.source.type
  • datatype.propagate.source.type

The Debezium JDBC sink connector applies only the values with the higher precedence.

To illustrate how propagation affects how the connector maps data types, let’s look at an example. The following example shows a field schema that might be included within a change event:

Example 3.3. Debezium change event field schema with column or data type propagation enabled

{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}
Copy to Clipboard Toggle word wrap

Because the source connector that emitted the event was configured to use column or data type propagation, the event includes parameters that specify column type and length.

If propagation were not enabled for the source connector, the type and length parameters would be absent, and the Debezium JDBC sink connector would default to mapping the INT8 value in the type field to a column type of Types.SMALLINT. Depending on the SQL dialect of the target database, the connector would then resolve the JDBC Types.SMALLINT type to any of several logical types, For example, for a MySQL sink target, the JDBC connector defaults to converting Types.SMALLINT to a TINYINT column type with no specified length.

However, because type propagation is enabled in the source connector, the event that it emitted includes the type and length, providing more specific mapping instructions. Thus, rather than using the default mapping, the Debezium JDBC sink connector uses the given parameter values to refine the mapping, and creates a column in the sink database with the type TINYINT(1).

Note

Typically, the effect of using column or data type propagation is most significant when the source and sink databases are of the same type. Because the source and sink database share the same underlying schema structure, it’s easier to map data types between them without having to apply complex transformations or interpretations.

3.1.3. Transformations that modify the format of events before processing

The Debezium JDBC connector provides several transformations that can be added to the connector configuration to modify the consumed events in-flight before they’re processed by the connector.

3.1.3.1. Transformation that alter topic names

The Debezium JDBC connector provides two naming transformations that you can apply to change the naming styles of topic or field names within an event. Optionally, you can also use the naming transformations to apply a prefix or suffix to the transformed name.

You can configure naming transformations to use one of the following naming styles:

camel_case
Removes all period (.) and underscore (_) characters, and converts the immediate next character to uppercase. For example, the value inventory.customers is changed to inventoryCustomers.
snake_case
Removes all period (.) characters and replaces them with underscore (_) characters. Additionally, all numeric sequences are prefixed with an underscore (_), and all uppercase characters are converted to lowercase and then prefixed with an underscore (_) character. For example, the value public.inventory becomes public_inventory, while TopicWith123Numbers becomes topic_with_123_numbers.
upper_case
Converts to uppercase. For example, the value public.inventory would become PUBLIC.INVENTORY.
lower_case
Converts to lowercase. For example, the value PUBLIC.INVENTORY would become public.inventory.

The connector provides the following naming transformations:

CollectionNameTransformation

The CollectionNameTransformation provides a way to change the case of the topic names before the event is consumed. This transformation has the following configuration properties:

Expand
PropertyDefaultDescription

collection.naming.style

none

Specifies the naming style to apply to the event’s topic.

collection.naming.prefix

empty

Specifies the prefix that is applied to the topic name after transformation.

collection.naming.suffix

empty

Specifies the suffix that is applied to the topic name after transformation.

The following configuration example illustrates using the CollectionNameTransformation SMT to set the topic name to uppercase and prefix the topic name with ADT_. For example, given a topic called public.inventory, the topic name becomes ADT_PUBLIC.INVENTORY.

Example CollectionNameTransformation configuration

{
  "transforms": "topic-uppercase",
  "transforms.topic-uppercase.type": "io.debezium.connector.jdbc.transforms.CollectionNameTransformation",
  "transforms.topic-uppercase.collection.naming.style": "upper_case",
  "transforms.topic-uppercase.collection.naming.prefix": "ADT_"
}
Copy to Clipboard Toggle word wrap

FieldNameTransformation

The FieldNameTransformation provides a way to change the case of field names before the event is consumed. If the event is a Debezium source connector event, only the fields within the before and after sections are changed. When the event is not a Debezium source connector event, all top-level field names are changed.

This transformation has the following configuration properties:

Expand
PropertyDefaultDescription

column.naming.style

none

Specifies the naming style to apply to the field name.

column.naming.prefix

empty

Specifies the prefix that is applied to the field name after transformation.

column.naming.suffix

empty

Specifies the suffix that is applied to the field name after transformation.

The following example shows how to configure the FieldNameTransformation SMT to convert field names name to lowercase and prefix the names with the string adt_. After you apply the SMT to an event that includes a field with the name ID, the field is renamed to adt_id.

Example FieldNameTransformation configuration

{
  "transforms": "topic-lowercase",
  "transforms.topic-lowercase.type": "io.debezium.connector.jdbc.transforms.FieldNameTransformation",
  "transforms.topic-lowercase.collection.naming.style": "lower_case",
  "transforms.topic-lowercase.collection.naming.prefix": "adt_"
}
Copy to Clipboard Toggle word wrap

3.1.4. Deployment of Debezium JDBC connectors

You can use either of the following methods to deploy a Debezium JDBC connector:

Important

Due to licensing requirements, the Debezium JDBC connector archive does not include the drivers that Debezium requires to connect to the Db2 and Oracle databases. To enable the connector to access these databases, you must add the drivers to your connector environment. For information about how to obtain drivers that are not supplied with the connector, see Obtaining drivers not included in the connector archive.

3.1.4.1. Obtaining JDBC drivers not included in the connector archive

Due to licensing requirements, the JDBC driver files that Debezium requires to connect to Db2 Database and Oracle Database are not included in the Debezium JDBC connector archive. These drivers are available for download from Maven Central. Depending on the deployment method that you use, you can use one of the following metods to retrieve the drivers:

You use Streams for Apache Kafka to add the connector to your Kafka Connect image
Add the Maven Central location for the driver to builds.plugins.artifact.url in the KafkaConnect custom resource as shown in Section 3.1.4.3, “Using Streams for Apache Kafka to deploy a Debezium JDBC connector”.
You use a Containerfile to build a container image for the connector
In the Containerfile, insert a curl command that specifies the URL for downloading the driver file from Maven Central. For more information, see Section 3.1.4.4, “Deploying a Debezium JDBC connector by building a custom Kafka Connect container image from a Containerfile”.

3.1.4.2. JDBC connector deployment using Streams for Apache Kafka

The preferred method for deploying a Debezium connector is to use Streams for Apache Kafka to build a Kafka Connect container image that includes the connector plug-in.

During the deployment process, you create and use the following custom resources (CRs):

  • A KafkaConnect CR that defines your Kafka Connect instance and includes information about the connector artifacts needs to include in the image.
  • A KafkaConnector CR that provides details that include information the connector uses to access the source database. After Streams for Apache Kafka starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.

In the build specification for the Kafka Connect image, you can specify the connectors that are available to deploy. For each connector plug-in, you can also specify other components that you want to make available for deployment. For example, you can add Apicurio Registry artifacts, or the Debezium scripting component. When Streams for Apache Kafka builds the Kafka Connect image, it downloads the specified artifacts, and incorporates them into the image.

The spec.build.output parameter in the KafkaConnect CR specifies where to store the resulting Kafka Connect container image. Container images can be stored in a container registry, such as quay.io, or in an OpenShift ImageStream. To store images in an ImageStream, you must create the ImageStream before you deploy Kafka Connect. ImageStreams are not created automatically.

Note

If you use a KafkaConnect resource to create a cluster, afterwards you cannot use the Kafka Connect REST API to create or update connectors. You can still use the REST API to retrieve information.

Additional resources

3.1.4.3. Using Streams for Apache Kafka to deploy a Debezium JDBC connector

You can use the build configuration in Streams for Apache Kafka to automatically build a Kafka Connect container image to OpenShift. The build image includes the Debezium connector plug-ins that you specify.

During the build process, the Streams for Apache Kafka Operator transforms input parameters in a KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository or from another configured HTTP server.

The newly created container is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect cluster. After Streams for Apache Kafka builds the Kafka Connect image, you create KafkaConnector custom resources to start the connectors that are included in the build.

Prerequisites

  • You have access to an OpenShift cluster in which the cluster Operator is installed.
  • The Streams for Apache Kafka Operator is running.
  • An Apache Kafka cluster is deployed as documented in Deploying and Managing Streams for Apache Kafka on OpenShift.
  • Kafka Connect is deployed on Streams for Apache Kafka
  • You have a Kafka topic from which the connector can read change event records.
  • A destination database is installed and is configured to accept JDBC connections.
  • You have a Red Hat build of Debezium license.
  • The OpenShift oc CLI client is installed or you have access to the OpenShift Container Platform web console.
  • Depending on how you intend to store the Kafka Connect build image, you need registry permissions or you must create an ImageStream resource:

    To store the build image in an image registry, such as Red Hat Quay.io or Docker Hub
    • An account and permissions to create and manage images in the registry.
    To store the build image as a native OpenShift ImageStream

Procedure

  1. Log in to the OpenShift cluster.
  2. Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one.
    For example, create a KafkaConnect CR with the name dbz-jdbc-connect.yaml that specifies the annotations and image properties, as shown in the following excerpt. In the example that follows, the custom resource is configured to download the following artifacts:

    • The Debezium JDBC connector archive.
    • A JDBC driver that is required to connect to an Oracle or Db2 sink database. You can omit this entry for other sink destinations.

      A dbz-jdbc-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: debezium-kafka-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true"  //  
      1
      
      spec:
        version: 4.0
        replicas: 1
        bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
        build:  
      2
      
          output:  
      3
      
            type: imagestream  
      4
      
            image: debezium-streams-connect:latest
          plugins:   
      5
      
            - name: debezium-jdbc-connector
              artifacts:
                - type: zip  
      6
      
                  url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-jdbc/3.2.4.Final-redhat-00000/debezium-connector-jdbc-3.2.4.Final-redhat-00000-plugin.zip  
      7
      
                - type: jar
                  url: https://repo1.maven.org/maven2/com/oracle/ojdbc/ojdbc11/21.15.0.0/ojdbc11-21.15.0.0.jar  
      8
      
        ...
      Copy to Clipboard Toggle word wrap

      Expand
      Table 3.6. Descriptions of Kafka Connect configuration settings
      ItemDescription

      1

      Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster.

      2

      The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts.

      3

      The build.output specifies the registry in which the newly built image is stored.

      4

      Specifies the name and image name for the image output. Valid values for output.type are docker to push into a container registry such as Quay, or imagestream to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the Streams for Apache Kafka Build schema reference in {NameConfiguringStreamsOpenShift}.

      5

      The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name, and information for about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component.

      6

      The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field.

      7

      The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server.

      8

      (For Db2 or Oracle sinks only) Specifies the location of the JDBC JDBC driver in Maven Central. The drivers that are required for Debezium to connect to these databases are not included in the Debezium connector archives.

      The example provides the Maven URL for the Oracle Database JDBC driver. The Db2 JDBC driver is available at the following Maven location: https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.9.0/jcc-11.5.9.0.jar

  3. Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:

    oc create -f dbz-jdbc-connect.yaml
    Copy to Clipboard Toggle word wrap

    Based on the configuration specified in the custom resource, the Streams Operator prepares a Kafka Connect image to deploy.
    After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.

  4. Create a KafkaConnector resource to define an instance of each connector that you want to deploy.
    For example, create the following KafkaConnector CR, and save it as orders-to-postgresql-jdbc-connector.yaml

    orders-to-postgresql-jdbc-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: orders-topic-to-postgresql-via-jdbc-sink-connector 
    1
    
      labels: strimzi.io/cluster: debezium-kafka-connect-cluster
    spec:
      class: io.debezium.connector.jdbc.JdbcSinkConnector  
    2
    
      tasksMax: 1  
    3
    
       config:  
    4
    
        connection.url: jdbc://postgresql://<host>:<port>/<database-name> 
    5
    
        connection.username: <database-user>  
    6
    
        connection.password: <database-pwd>  
    7
    
        insert.mode: upsert
        delete.enabled: true
        topics: orders 
    8
    
        primary.key.mode: record_key
        schema.evolution: basic
        use.time.zone: UTC
    Copy to Clipboard Toggle word wrap

    Expand
    Table 3.7. Descriptions of connector configuration settings
    ItemDescription

    1

    The name of the connector to register with the Kafka Connect cluster.

    2

    The name of the connector class.

    3

    The number of tasks that can operate concurrently.

    4

    The connector’s configuration.

    5

    The JDBC connection URL for the sink database. The URL specifies the port number and any authentication properties that are required to connect to the database. For example, jdbc:oracle:thin:@myhost.example.com:1521/myservice

    6

    The name of the account that Debezium uses to connect to the database.

    7

    The password that Debezium uses to connect to the database user account.

    8

    Specifies a comma-separated list of Kafka topics that the connector reads. Events from each topic are streamed to tables with the same name in the sink database.

  5. Create the connector resource by running the following command:

    oc create -n <namespace> -f <kafkaConnector>.yaml
    Copy to Clipboard Toggle word wrap

    For example,

    oc create -n debezium -f jdbc-inventory-connector.yaml
    Copy to Clipboard Toggle word wrap

    The connector is registered to the Kafka Connect cluster and starts to run against the database that is specified by spec.config.database.dbname in the KafkaConnector CR. After the connector pod is ready, Debezium is running.

You can deploy a Debezium JDBC connector by building a custom Kafka Connect container image that contains the Debezium connector archive, and then push this container image to a container registry. Afterwards, you create the following custom resources (CRs) to define the connector configuration:

  • A KafkaConnect CR that defines your Kafka Connect instance. The image property in the CR specifies the name of the container image that you create to run your Debezium connector. You apply this CR to the OpenShift instance where Red Hat Streams for Apache Kafka is deployed. Streams for Apache Kafka offers operators and images that bring Apache Kafka to OpenShift.
  • A KafkaConnector CR that defines your Debezium JDBC connector. Apply this CR to the same OpenShift instance where you applied the KafkaConnect CR.
Note

The deployment method described in this section is deprecated and is scheduled for removal in future versions of the documentation.

Prerequisites

  • A destination database is installed and is configured to accept JDBC connections.
  • Streams for Apache Kafka is deployed on OpenShift and is running Apache Kafka and Kafka Connect. For more information, see Deploying and Managing Streams for Apache Kafka on OpenShift.
  • Podman or Docker is installed.
  • You have a Kafka topic from which the connector can read change event records.
  • A destination database is installed and is configured to accept JDBC connections.
  • If you want the JDBC connector to send data to a Db2 or Oracle database, the Kafka Connect server has access to Maven Central to download the JDBC drivers for those databases. You can also use a local copy of the driver, or one that is available from a local Maven repository or other HTTP server.
  • You have an account and permissions to create and manage containers in the container registry (such as quay.io or docker.io) to which you plan to add the container that will run your Debezium connector.

Procedure

  1. Create the Debezium JDBC connector container on Kafka Connect:

    1. Create a Containerfile that uses registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.0 as the base image. For example, from a terminal window, enter the following command:
cat <<EOF >debezium-jdbc-connector-container.yaml 
1

FROM registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium 
2

RUN cd /opt/kafka/plugins/debezium/ \
&& curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-jdbc/3.2.4.Final-redhat-00000/debezium-connector-jdbc-3.2.4.Final-redhat-00000-plugin.zip \
&& unzip debezium-connector-jdbc-3.2.4.Final-redhat-00000-plugin.zip \
&& rm debezium-connector-jdbc-3.2.4.Final-redhat-00000-plugin.zip
RUN cd /opt/kafka/plugins/debezium/ \
&& curl -O https://repo1.maven.org/maven2/com/oracle/ojdbc/ojdbc11/21.15.0.0/ojdbc11-21.15.0.0.jar
USER 1001
EOF
Copy to Clipboard Toggle word wrap
 +
.Descriptions of Containerfile settings for building a custom Kafka Connect container image
Copy to Clipboard Toggle word wrap
Expand
ItemDescription

1

You can specify any file name that you want.

2

Specifies the path to your Kafka Connect plug-ins directory. If your Kafka Connect plug-ins directory is in a different location, replace this path with the actual path of your directory.

+ The command creates a Containerfile with the name debezium-jdbc-connector-container.yaml in the current directory.

  1. Build the container image from the debezium-jdbc-connector-container.yaml Containerfile that you created in the previous step. From the directory that contains the file, open a terminal window and enter one of the following commands:

    podman build -t debezium-jdbc-connector-container:latest .
    Copy to Clipboard Toggle word wrap
    docker build -t debezium-jdbc-connector-container:latest .
    Copy to Clipboard Toggle word wrap

    The preceding commands build a container image with the name debezium-jdbc-connector-container.

  2. Push your custom image to a container registry, such as quay.io or an internal container registry. The container registry must be available to the OpenShift instance where you want to deploy the image. Enter one of the following commands:

    podman push <myregistry.io>/debezium-jdbc-connector-container:latest
    Copy to Clipboard Toggle word wrap
    docker push <myregistry.io>/debezium-jdbc-connector-container:latest
    Copy to Clipboard Toggle word wrap
  3. Create a new Debezium Oracle KafkaConnect custom resource (CR). For example, create a KafkaConnect CR with the name dbz-connect.yaml that specifies annotations and image properties. The following example shows an excerpt from a dbz-connect.yaml file that describes a KafkaConnect custom resource.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 
    1
    
    spec:
      image: debezium-jdbc-connector-container 
    2
    
    
      ...
    Copy to Clipboard Toggle word wrap
Expand
ItemDescription

1

You must set metadata.annotations to true to permit the Cluster Operator to use KafkaConnector resources to configure Debezium connectors in the cluster.

2

spec.image specifies the name of the image that you created to run your Debezium connector. This property overrides the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable in the Cluster Operator.

  1. Apply the KafkaConnect CR to the OpenShift Kafka Connect environment by entering the following command:

    oc create -f dbz-connect.yaml
    Copy to Clipboard Toggle word wrap

    The command adds a Kafka Connect instance that specifies the name of the image that you created to run your Debezium connector.

    1. Create a KafkaConnector custom resource that configures your Debezium JDBC connector instance.

      You configure a Debezium JDBC connector in a .yaml file that specifies the configuration properties for the connector.

      The following example shows an excerpt from a dbz-connect.yaml file that sets a few of the key properties for a KafkaConnect custom resource.
      The connector establishes a JDBC connection to a PostgreSQL server sink on port 5432.

      For information about the full range of available connector properties, see Descriptions of Debezium JDBC connector configuration properties.

      Example 3.4. jdbc-connector.yaml

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        labels:
        strimzi.io/cluster: my-connect-cluster 
      1
      
        name: orders-topic-to-postgresql-via-jdbc-sink-connector 
      2
      
      ...
      spec:
        class: io.debezium.connector.jdbc.JdbcSinkConnector 
      3
      
      ...
        config:
          connection.url: jdbc:<_jdbc_>://<_database-host_>:5432/<_database-name_>  
      4
      
          connection.username: "<_database-user_>" 
      5
      
          connection.password: "<_database-password_>" 
      6
      
          topics: orders 
      7
      Copy to Clipboard Toggle word wrap
Expand
ItemDescription

1

The connector name that is registered with the Kafka Connect service.

2

The name of the Streams for Apache Kafka cluster.

3

The name of the Debezium JDBC connector class.

4

The JDBC address of the sink database.

5

The name of the account that Debezium uses to connect to the database.

6

The password that Debezium uses to authenticate to the database user account.

7

Specifies a comma-separated list of Kafka topics that the connector reads. Events from each topic are streamed to tables with the same name in the sink database.

  1. Create your connector instance with Kafka Connect. For example, if you saved your KafkaConnector resource in the jdbc-connector.yaml file, you would run the following command:

    oc apply -f jdbc-connector.yaml
    Copy to Clipboard Toggle word wrap

    The preceding command registers orders-topic-to-postgresql-via-jdbc-sink-connector. The connector starts and begins to read from the orders topic, as specified in the KafkaConnector CR.

3.1.5. Descriptions of Debezium JDBC connector configuration properties

The Debezium JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs. Many properties have default values. Information about the properties is organized as follows:

3.1.5.1. JDBC connector Kafka consumer properties

Expand
PropertyDefaultDescription

name

No default

Unique name for the connector. A failure results if you attempt to reuse this name when registering a connector. This property is required by all Kafka Connect connectors.

class

No default

The name of the Java class for the connector. For the Debezium JDBC connector, specify the value io.debezium.connector.jdbc.JdbcSinkConnector.

tasks.max

1

Maximum number of tasks to use for this connector.

topics

No default

List of topics to consume, separated by commas. Do not use this property in combination with the topics.regex property.

topics.regex

No default

A regular expression that specifies the topics to consume. Internally, the regular expression is compiled to a java.util.regex.Pattern. Do not use this property in combination with the topics property.

3.1.5.2. JDBC connector connection properties

Expand
PropertyDefaultDescription

connection.provider

org.hibernate.c3p0.internal.C3P0ConnectionProvider

The connection provider implementation to use.

connection.url

No default

The JDBC connection URL used to connect to the database.

connection.username

No default

The name of the database user account that the connector uses to connect to the database.

connection.password

No default

The password that the connector uses to connect to the database.

connection.pool.min_size

5

Specifies the minimum number of connections in the pool.

connection.pool.max_size

32

Specifies the maximum number of concurrent connections that the pool maintains.

connection.pool.acquire_increment

32

Specifies the number of connections that the connector attempts to acquire if the connection pool exceeds its maximum size.

connection.pool.timeout

1800

Specifies the number of seconds that an unused connection is kept before it is discarded.

connection.restart.on.errors

false

Specifies whether the connector retries after a transient JDBC connection error.

When enabled (true), the connector treats connection issues (such as socket closures or timeouts) as retriable, allowing it to retry processing instead of failing the task. This reduces downtime and improves resilience against temporary disruptions.

Note

Setting this option to true can reduce downtime. However, in master-replica environments with asynchronous replication, it may lead to data loss if retries occur before all changes are fully replicated.

Use with caution where strong data consistency is required.

3.1.5.3. JDBC connector runtime properties

Expand
PropertyDefaultDescription

use.time.zone

UTC

Specifies the timezone used when inserting JDBC temporal values.

delete.enabled

false

Specifies whether the connector processes DELETE or tombstone events and removes the corresponding row from the database. Use of this option requires that you set the primary.key.mode to record.key.

truncate.enabled

false

Specifies whether the connector processes TRUNCATE events and truncates the corresponding tables from the database.

Note

Although support for TRUNCATE statements has been available in Db2 since version 9.7, currently, the JDBC connector is unable to process standard TRUNCATE events that the Db2 connector emits.

To ensure that the JDBC connector can process TRUNCATE events received from Db2, perform the truncation by using an alternative to the standard TRUNCATE TABLE statement. For example:

ALTER TABLE <table_name> ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

The user account that submits the preceding query requires ALTER privileges on the table to be truncated.

insert.mode

insert

Specifies the strategy used to insert events into the database. The following options are available:

insert
Specifies that all events should construct INSERT-based SQL statements. Use this option only when no primary key is used, or when you can be certain that no updates can occur to rows with existing primary key values.
update
Specifies that all events should construct UPDATE-based SQL statements. Use this option only when you can be certain that the connector receives only events that apply to existing rows.
upsert
Specifies that the connector adds events to the table using upsert semantics. That is, if the primary key does not exist, the connector performs an INSERT operation, and if the key does exist, the connector performs an UPDATE operation. When idempotent writes are required, the connector should be configured to use this option.

primary.key.mode

none

Specifies how the connector resolves the primary key columns from the event.

none
Specifies that no primary key columns are created.
kafka

Specifies that the connector uses Kafka coordinates as the primary key columns. The key coordinates are defined from the topic name, partition, and offset of the event, and are mapped to columns with the following names:

  • __connect_topic
  • __connect_partition
  • __connect_offset
record_key
Specifies that the primary key columns are sourced from the event’s record key. If the record key is a primitive type, the primary.key.fields property is required to specify the name of the primary key column. If the record key is a struct type, the primary.key.fields property is optional, and can be used to specify a subset of columns from the event’s key as the table’s primary key.
record_value
Specifies that the primary key columns is sourced from the event’s value. You can set the primary.key.fields property to define the primary key as a subset of fields from the event’s value; otherwise all fields are used by default.

primary.key.fields

No default

Either the name of the primary key column or a comma-separated list of fields to derive the primary key from.

When primary.key.mode is set to record_key and the event’s key is a primitive type, it is expected that this property specifies the column name to be used for the key.

When the primary.key.mode is set to record_key with a non-primitive key, or record_value, it is expected that this property specifies a comma-separated list of field names from either the key or value. If the primary.key.mode is set to record_key with a non-primitive key, or record_value, and this property is not specified, the connector derives the primary key from all fields of either the record key or record value, depending on the specified mode.

quote.identifiers

false

Specifies whether generated SQL statements use quotation marks to delimit table and column names. See the JDBC quoting case-sensitivity section for more details.

schema.evolution

none

Specifies how the connector evolves the destination table schemas. For more information, see Section 3.1.1.8, “Schema evolution modes for the Debezium JDBC connector”. The following options are available:

none
Specifies that the connector does not evolve the destination schema.
basic
Specifies that basic evolution occurs. The connector adds missing columns to the table by comparing the incoming event’s record schema to the database table structure.

collection.name.format

${topic}

Specifies a string pattern that the connector uses to construct the names of destination tables.
When the property is set to its default value, ${topic}, after the connector reads an event from Kafka, it writes the event record to a destination table with a name that matches the name of the source topic.

You can also configure this property to extract values from specific fields in incoming event records and then use those values to dynamically generate the names of target tables. This ability to generate table names from values in the message source would otherwise require the use of a custom Kafka Connect single message transformation (SMT).

To configure the property to dynamically generate the names of destination tables, set its value to a pattern such as ${source._field_}. When you specify this type of pattern, the connector extracts values from the source block of the Debezium change event, and then uses those values to construct the table name. For example, you might set the value of the property to the pattern ${source.schema}_${source.table}. Based on this pattern, if the connector reads an event in which the schema field in the source block contains the value, user, and the table field contains the value, tab, the connector writes the event record to a table with the name user_tab.

dialect.postgres.postgis.schema

public

Specifies the schema name where the PostgreSQL PostGIS extension is installed. The default is public; however, if the PostGIS extension was installed in another schema, this property should be used to specify the alternate schema name.

dialect.sqlserver.identity.insert

false

Specifies whether the connector automatically sets an IDENTITY_INSERT before an INSERT or UPSERT operation into the identity column of SQL Server tables, and then unsets it immediately after the operation. When the default setting (false) is in effect, an INSERT or UPSERT operation into the IDENTITY column of a table results in a SQL exception.

batch.size

500

Specifies how many records to attempt to batch together into the destination table.

Note

Note that if you set consumer.max.poll.records in the Connect worker properties to a value lower than batch.size, batch processing will be caped by consumer.max.poll.records and the desired batch.size won’t be reached. You can also configure the connector’s underlying consumer’s max.poll.records using consumer.override.max.poll.records in the connector configuration.

use.reduction.buffer

false

Specifies whether to enable the Debezium JDBC connector’s reduction buffer.

Choose one of the following settings:

false
(default) The connector writes each change event that it consumes from Kafka as a separate logical SQL change.
true
The connector uses the reduction buffer to reduce change events before it writes them to the sink database. That is, if multiple events refer to the same primary key, the connector consolidates the SQL queries and writes only a single logical SQL change, based on the row state that is reported in the most recent offset record.
Choose this option to reduce the SQL load on the target database.

To optimize query processing in a PostgreSQL sink database when the reduction buffer is enabled, you must also enable the database to execute the batched queries by adding the reWriteBatchedInserts parameter to the JDBC connection URL.

field.include.list

empty string

An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:_fieldName_.

If you include this property in the configuration, do not set the field.exclude.list property.

field.exclude.list

empty string

An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:_fieldName_.

If you include this property in the configuration, do not set the field.include.list property.

flush.max.retries

5

Specifies the maximum number of retries that the connector performs after an attempt to flush changes to the target database results in certain database errors. If the number of retries exceeds the retry value, the sink connector enters a FAILED state.

flush.retry.delay.ms

1000

Specifies the number of milliseconds that the connector waits to retry a flush operation that failed.

Note

When you set both the flush.retry.delay.ms and flush.max.retries properties, it can affect the behavior of the Kafka max.poll.interval.ms property. To prevent the connector from rebalancing, set the total retry time (flush.retry.delay.ms * flush.max.retries) to a value that is less than the value of max.poll.interval.ms (default is 5 minutes).

3.1.5.4. JDBC connector extendable properties

Expand
PropertyDefaultDescription

column.naming.strategy

io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy

Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from incoming event field names.

The default behavior is to use the field name as the column name without any transformation.

collection.naming.strategy

io.debezium.connector.jdbc.nnaming.DefaultCollectionNamingStrategy

Specifies the fully-qualified class name of a CollectionNamingStrategy implementation that the connector uses to resolve table names from incoming event topic names.

The default behavior is to:

  • Sanitize the topic name by replacing dots (.) with underscores (_).
  • Replace the ${topic} placeholder in the collection.name.format configuration property with the event’s topic.

3.1.5.5. JDBC connector hibernate.* passthrough properties

Kafka Connect supports passthrough configuration, enabling you to modify the behavior of an underlying system by passing certain properties directly from the connector configuration. By default, some Hibernate properties are exposed via the JDBC connector connection properties (for example, connection.url, connection.username, and connection.pool.*_size), and through the connector’s runtime properties (for example, use.time.zone, quote.identifiers).

If you want to customize other Hibernate behavior, you can take advantage of the passthrough mechanism by adding properties that use the hibernate.* namespace to the connector configuration. For example, to assist Hibernate in resolving the type and version of the target database, you can add the hibernate.dialect property and set it to the fully qualified class name of the database, for example, org.hibernate.dialect.MariaDBDialect.

3.1.6. JDBC connector frequently asked questions

Is the ExtractNewRecordState single message transformation required?
No, that is actually one of the differentiating factors of the Debezium JDBC connector from other implementations. While the connector is capable of ingesting flattened events like its competitors, it can also ingest Debezium’s complex change event structure natively, without requiring any specific type of transformation.
If a column’s type is changed, or if a column is renamed or dropped, is this handled by schema evolution?
No, the Debezium JDBC connector does not make any changes to existing columns. The schema evolution supported by the connector is quite basic. It simply compares the fields in the event structure to the table’s column list, and then adds any fields that are not yet defined as columns in the table. If a column’s type or default value change, the connector does not adjust them in the destination database. If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however existing rows with data in the old column remain unchanged. These types of schema changes should be handled manually.
If a column’s type does not resolve to the type that I want, how can I enforce mapping to a different data type?
The Debezium JDBC connector uses a sophisticated type system to resolve a column’s data type. For details about how this type system resolves a specific field’s schema definition to a JDBC type, see the Section 3.1.1.4, “Description of Debezium JDBC connector data and column type mappings” section. If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?
In order to add a prefix or a suffix to the destination table name, adjust the collection.name.format connector configuration property to apply the prefix or suffix that you want. For example, to prefix all table names with jdbc_, specify the collection.name.format configuration property with a value of jdbc_${topic}. If the connector is subscribed to a topic called orders, the resulting table is created as jdbc_orders.
Why are some columns automatically quoted, even though identifier quoting is not enabled?
In some situations, specific column or table names might be explicitly quoted, even when quote.identifiers is not enabled. This is often necessary when the column or table name starts with or uses a specific convention that would otherwise be considered illegal syntax. For example, when the primary.key.mode is set to kafka, some databases only permit column names to begin with an underscore if the column’s name is quoted. Quoting behavior is dialect-specific, and varies among different types of database.

3.2. Debezium sink connector for MongoDB (Developer Preview)

The Debezium MongoDB sink connector captures change event records from Apache Kafka topics and then transforms the records into MongoDB documents that it writes to collections in a specified MongoDB sink database. For applications that require high scalability and fast data retrieval, propagating change data to a cluster-based MongoDB environment, which uses such features as sharding and replica sets to optimize read operations, can significantly improve retrieval performance. The connector can process only change events that originate from a Debezium relational database connector.

Important

The Debezium MongoDB sink connector is Developer Preview software only. Developer Preview software is not supported by Red Hat in any way and is not functionally complete or production-ready. Do not use Developer Preview software for production or business-critical workloads. Developer Preview software provides early access to upcoming product software in advance of its possible inclusion in a Red Hat product offering. Customers can use this software to test functionality and provide feedback during the development process. This software might not have any documentation, is subject to change or removal at any time, and has received limited testing. Red Hat might provide ways to submit feedback on Developer Preview software without an associated SLA. For more information about the support scope of Red Hat Developer Preview software, see Developer Preview Support Scope.

For information about the MongoDB versions that are compatible with this connector, see the Debezium Supported Configurations page.

Information about the Debezium sink connector for MongoDB and and instructions about how to use it is available in the following topics:

3.2.1. Architecture of the Debezium MongoDB sink connector

Use the Debezium MongoDB sink connector to stream change data capture (CDC) event records from Kafka topics to a MongoDB sink database. The connector subscribes to Kafka topics that are populated with event messages produced by Debezium relational database source connectors. Each event message describes a database operation (insert, update, or delete) in a structured format that captures the details of the event. The connector transforms incoming change event records into MongoDB document format, and then writes the resulting documents into the target MongoDB collection.

After it receives an event, the connector parses the event payload and determines which MongoDB collection to send it to. Depending on the event type that is specified in the event payload, the connector then performs one of the following operations in the target collection:

Expand
Event type in payloadResulting operation

INSERT

Create document

UPDATE

Modify document with the specified identifier.

DELETE

Remove document with the specified identifier.

The connector uses the MongoDB Java driver to interact with the MongoDB database.

The mapping between topics and MongoDB collections is derived from the connector configuration. The document key serves as a unique identifier for the document, ensuring that updates, inserts, and deletions are propagated to the correct MongoDB document and collection, and that operations are applied in the correct order.

Through this process of mapping event messages to MongoDB documents, the connector is able to mirror the state of tables in your relational database to collections in a MongoDB database.

3.2.2. Limitations of the Debezium MongoDB sink connector

The Debezium MongoDB sink connector has the following limitations:

Relational database / RDBMS source connectors only

The MongoDB sink connector can consume only change events that originate from the Debezium connectors for the following relational databases:

  • MariaDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

The connector cannot process change event messages from any other Debezium connectors, including the Debezium MongoDB source connector.

Schema evolution
Although the connector can process basic schema changes, advanced schema evolution scenarios might require manual intervention or specific configuration. Because MongoDB is schemaless, it has only a limited ability to handle schema evolution.
Transaction Support
The connector processes individual change events in chronological order, based on the order in which operations are committed in the source system. Although MongoDB supports transactions, the Debezium MongoDB connector does not provide transactional guarantees across multiple CDC events or across multiple documents within a single sink task.

3.2.3. Quickstart of a Debezium MongoDB sink connector

Deploy a basic instance of the MongoDB sink connector for testing.

Prerequisites

The following components are available and running in your environment:

  • Kafka cluster
  • Kafka Connect
  • A MongoDB instance.
  • Debezium relational database connector
  • Debezium MongoDB connector

Procedure

  1. Configure and start a Debezium source connector, for example, a Debezium PostgreSQL Connector, to stream changes from a relational database to Kafka.
  2. Configure and start the Debezium MongoDB sink connector to consume events that the source connector emits to Kafka, and send them to a MongoDB sink database.

    The following example provides a minimal configuration for a Debezium MongoDB sink connector. Replace the placeholders in the example with the actual values for your environment.

    {
      "name": "mongodb-sink-connector",
      "config": {
        "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
        "topics.regex": "server1\.inventory\..*",
        "mongodb.connection.string": "mongodb://localhost:27017",
        "sink.database": "debezium"
      }
    }
    Copy to Clipboard Toggle word wrap

3.2.4. Configuration of the Debezium MongoDB sink connector

The MongoDB sink connector accepts a variety of configuration options, as described in following tables.

Expand
Table 3.8. Required Kafka Connect sink connector configuration properties
PropertyDefaultDescription

connector.class

No default value

Must be set to io.debezium.connector.mongodb.sink.MongoDbSinkConnector.

tasks.max

1

Maximum number of tasks.

topics or topics.regex

No default value

List of Kafka topics to consume from. If you set this value to topics.regex, the connector consumes from all topics that match the regular expression.

Expand
Table 3.9. Required MongoDB connection properties
PropertyDefaultDescription

mongodb.connection.string

No default value

MongoDB connection string (URI) that the sink uses to connect to MongoDB. This URI follows the standard MongoDB connection string format.

Example: mongodb://localhost:27017/?replicaSet=my-replica-set

sink.database

No default value

Name of the target MongoDB database.

Expand
Table 3.10. Sink behavior configuration
PropertyDefaultDescription

collection.naming.strategy

io.debezium.sink.naming.DefaultCollectionNamingStrategy

Specifies the strategy that the connector uses to derive the name of the target MongoDB collection from the name of the Kafka topic.

Specify one of the following values:

io.debezium.sink.naming.DefaultCollectionNamingStrategy
The connector takes the table name directly from the topic name, replacing dot (period) characters in the name of the source topic with underscores.
Custom implementation
You can provide your own CollectionNameStrategy implementation.

collection.name.format

${topic}

Template for deriving the target collection name from the Kafka topic name.

column.naming.strategy

io.debezium.sink.naming.DefaultColumnNamingStrategy

Specifies the strategy that the connector uses to name columns in the target collection.

Specify one of the following values:

io.debezium.sink.naming.DefaultColumnNamingStrategy
Use the original field name as the column name.
Custom implementation
Specify a custom CollectionNameStrategy implementation.
Expand
Table 3.11. Common sink options
PropertyDefaultDescription

field.include.list

empty string

An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName.
If you include this property in the configuration, do not set the field.exclude.list property.

field.exclude.list

empty string

An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName.

If you include this property in the configuration, do not set the field.include.list property.

batch.size

2048

Maximum number of records to write in a single batch.

3.2.5. Example configuration for the Debezium MongoDB sink connector

The following example shows how you might configure the connector to read change events from three specific topics from the dbserver1.inventory database to modify a collection in the MongoDB sink database named debezium.

{
    "name": "mongodb-sink-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
        "topics": "dbserver1.inventory.customers,dbserver1.inventory.orders,dbserver1.inventory.products",
        "mongodb.connection.string": "mongodb://localhost:27017",
        "sink.database": "debezium"
    }
}
Copy to Clipboard Toggle word wrap

3.2.6. Monitoring the Debezium MongoDB sink connector

This release of the connector does not expose any metrics.

3.2.7. Key field mapping of the Debezium MongoDB sink connector

When the connector processes events, it maps data to specific fields in the target MongoDB document.

  • Keys from Debezium change events, such as Kafka message keys, are mapped to the MongoDB _id field by default.
  • Values are mapped into MongoDB documents.
  • Updates and deletes are resolved based on the key field mapping.

The following example shows an event key in a Kafka topic:

{
    "userId": 1,
    "orderId": 1
}
Copy to Clipboard Toggle word wrap

Based on the mapping logic, the preceding key is mapped to the _id field in a MongoDB document, as showin in the following example:

{
    "_id": {
        "userId": 1,
        "orderId": 1
    }
}
Copy to Clipboard Toggle word wrap

3.2.8. Using CloudEvents with the Debezium MongoDB sink connector

The Debezium MongoDB sink connector can consume records serialized as CloudEvents. Debezium can emit change events in CloudEvents format, so that the event payload is encapsulated in a standardized envelope.

When you enable CloudEvents on the source connector, the MongoDB sink connector parses the CloudEvents envelope.

The actual Debezium event payload is extracted from the data section.

The event is then applied to the target MongoDB collection, following the standard insert, update, or delete semantics.

This process makes it possible to integrate Debezium with broader event-driven systems while still persisting the resulting events in MongoDB.

Expand
Table 3.12. CloudEvents sink options
PropertyDefaultDescription

cloud.events.schema.name.pattern

.*CloudEvents\.Envelope$

Regular expression pattern to identify CloudEvents messages by matching the schema name with this pattern.

Torna in cima
Red Hat logoGithubredditYoutubeTwitter

Formazione

Prova, acquista e vendi

Community

Informazioni sulla documentazione di Red Hat

Aiutiamo gli utenti Red Hat a innovarsi e raggiungere i propri obiettivi con i nostri prodotti e servizi grazie a contenuti di cui possono fidarsi. Esplora i nostri ultimi aggiornamenti.

Rendiamo l’open source più inclusivo

Red Hat si impegna a sostituire il linguaggio problematico nel codice, nella documentazione e nelle proprietà web. Per maggiori dettagli, visita il Blog di Red Hat.

Informazioni su Red Hat

Forniamo soluzioni consolidate che rendono più semplice per le aziende lavorare su piattaforme e ambienti diversi, dal datacenter centrale all'edge della rete.

Theme

© 2025 Red Hat