Chapter 3. Sink connectors
Debezium provides sink connectors that can consume events from sources such as Apache Kafka topics. A sink connector standardizes the format of the data, and then persists the event data to a configured sink repository. Other systems, applications, or users can then access the events from the data sink.
Because the sink connector applies a consistent structure to the event data that it consumes, downstream applications that read from the data sink can more easily interpret and process that data.
Currently, Debezium provides the following sink connectors:
3.1. Debezium connector for JDBC
The Debezium JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver. This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.
3.1.1. How the Debezium JDBC connector works
The Debezium JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime. The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database. The connector supports idempotent write operations by using upsert semantics and basic schema evolution.
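As a rough orientation, a minimal sink configuration might look like the following sketch. The connector class and property names are those of the Debezium JDBC sink connector; the topic name, host, database name, and credential placeholders are assumptions chosen only for illustration.

```json
{
  "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
  "topics": "orders",
  "connection.url": "jdbc:postgresql://db.example.com:5432/inventory",
  "connection.username": "<username>",
  "connection.password": "<password>"
}
```

With only these settings, the connector relies on its defaults for all of the behaviors described in the following sections.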
The Debezium JDBC connector provides the following features:
3.1.1.1. Description of how the Debezium JDBC connector consumes complex change events
By default, Debezium source connectors produce complex, hierarchical change events. When Debezium connectors are used with other JDBC sink connector implementations, you might need to apply the ExtractNewRecordState single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation. If you run the Debezium JDBC sink connector, it’s not necessary to deploy the SMT, because the Debezium sink connector can consume native Debezium change events directly, without the use of a transformation.
When the JDBC sink connector consumes a complex change event from a Debezium source connector, it extracts the values from the after section of the original insert or update event. When a delete event is consumed by the sink connector, no part of the event’s payload is consulted.
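To make the structure concrete, the following is a simplified sketch of the value of a Debezium update event. The envelope fields (before, after, source, op, ts_ms) are the standard Debezium envelope; the column names and values are hypothetical, and the source block is abbreviated.

```json
{
  "before": { "id": 1001, "status": "NEW", "quantity": 2 },
  "after": { "id": 1001, "status": "SHIPPED", "quantity": 2 },
  "source": { "connector": "postgresql", "db": "inventory", "table": "orders" },
  "op": "u",
  "ts_ms": 1700000000000
}
```

For an event of this shape, the sink connector would take the id, status, and quantity values from the after section when it writes the row.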
The Debezium JDBC sink connector is not designed to read from schema change topics. If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the topics or topics.regex properties so that the connector does not consume from schema change topics.
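For example, the following fragment is a sketch of a subscription that matches only table-level topics. It assumes a hypothetical source connector whose topic prefix is inventory-server, whose data topics are named inventory-server.inventory.<table>, and whose schema change topic is named inventory-server. Under those assumptions, the regular expression does not match the schema change topic.

```json
{
  "topics.regex": "inventory-server\\.inventory\\..*"
}
```

If your topic names follow a different pattern, listing the data topics explicitly in the topics property is the simpler option.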
3.1.1.2. Description of Debezium JDBC connector at-least-once delivery
The Debezium JDBC sink connector guarantees that events that it consumes from Kafka topics are processed at least once.
3.1.1.3. Description of Debezium JDBC use of multiple tasks
You can run the Debezium JDBC sink connector across multiple Kafka Connect tasks. To run the connector across multiple tasks, set the tasks.max configuration property to the number of tasks that you want the connector to use. The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task. Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.
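As a sketch, the following fragment requests three tasks for a connector that reads three topics. The topic names are hypothetical. Because Kafka Connect distributes topic partitions among the tasks of a sink connector, the parallelism that you actually get also depends on how many partitions the topics have.

```json
{
  "tasks.max": "3",
  "topics": "orders,customers,products"
}
```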
3.1.1.4. Description of Debezium JDBC connector data and column type mappings
To enable the Debezium JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event. The connector supports a wide range of column type mappings across different database dialects. To correctly convert the destination column type from the type metadata in an event field, the connector applies the data type mappings that are defined for the source database. You can enhance the way that the connector resolves data types for a column by setting the column.propagate.source.type or datatype.propagate.source.type options in the source connector configuration. When you enable these options, Debezium includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.
For the Debezium JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a Struct. In addition, the payload of the source message must be a Struct that has either a flattened structure with no nested struct types, or a nested struct layout that conforms to Debezium’s complex, hierarchical structure.
If the structure of the events in the Kafka topic does not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.
3.1.1.5. Description of how the Debezium JDBC connector handles primary keys in source events
By default, the Debezium JDBC sink connector does not transform any of the fields in the source event into the primary key for the event. Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics. To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:
Mode | Description |
---|---|
none | No primary key fields are specified when creating the table. |
kafka | The primary key consists of three columns. The values for these columns are sourced from the coordinates of the Kafka event: its topic, partition, and offset. |
record_key | The primary key is composed of the Kafka event’s key. |
record_value | The primary key is composed of the Kafka event’s value. |
record_header | The primary key is composed of the Kafka event’s headers. |
Some database dialects might throw an exception if you set the primary.key.mode to kafka and set schema.evolution to basic. This exception occurs when a dialect maps a STRING data type to a variable length string data type such as TEXT or CLOB, and the dialect does not allow primary key columns to have unbounded lengths. To avoid this problem, apply the following settings in your environment:
- Do not set schema.evolution to basic.
- Create the database table and primary key mappings in advance.
If a column maps to a data type that isn’t permitted as a primary key for your target database, you must set primary.key.fields to an explicit list of columns that excludes such columns. Consult your database vendor’s documentation to determine which data types are permitted in primary keys.
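The following fragment sketches an explicit primary key definition that is taken from the event value. The record_value mode is one of the connector's primary key modes; the column names order_id and line_number are hypothetical and stand in for fields whose types your database accepts in a primary key.

```json
{
  "primary.key.mode": "record_value",
  "primary.key.fields": "order_id,line_number"
}
```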
3.1.1.6. Configuring the Debezium JDBC connector to delete rows when consuming DELETE or tombstone events
The Debezium JDBC sink connector can delete rows in the destination database when a DELETE or tombstone event is consumed. By default, the JDBC sink connector does not enable delete mode.
If you want the connector to remove rows, you must explicitly set delete.enabled=true in the connector configuration. To use this mode you must also set primary.key.mode to a value other than none. This configuration is necessary because deletes are executed based on the primary key mapping, so if a destination table has no primary key mapping, the connector is unable to delete rows.
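A minimal sketch of a delete-enabled configuration follows. It assumes that the primary key is derived from the Kafka record key, which is what the record_key mode selects; adjust the mode if your keys come from elsewhere.

```json
{
  "delete.enabled": "true",
  "primary.key.mode": "record_key"
}
```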
3.1.1.7. Enabling the connector to perform idempotent writes
The Debezium JDBC sink connector can perform idempotent writes, enabling it to replay the same records repeatedly and not change the final database state.
To enable the connector to perform idempotent writes, you must explicitly set the insert.mode for the connector to upsert. An upsert operation is applied as either an update or an insert, depending on whether the specified primary key already exists.
If the primary key value already exists, the operation updates values in the row. If the specified primary key value doesn’t exist, an insert adds a new row.
Each database dialect handles idempotent writes differently, because there is no SQL standard for upsert operations. The following table shows the upsert DML syntax for the database dialects that Debezium supports:
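Dialect | Upsert Syntax |
---|---|
Db2 | MERGE ... |
MySQL | INSERT ... ON DUPLICATE KEY UPDATE ... |
Oracle | MERGE ... |
PostgreSQL | INSERT ... ON CONFLICT ... DO UPDATE SET ... |
SQL Server | MERGE ... |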
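On the connector side, enabling idempotent writes comes down to a configuration such as the following sketch. It assumes that the record key identifies the row; the field name id is hypothetical.

```json
{
  "insert.mode": "upsert",
  "primary.key.mode": "record_key",
  "primary.key.fields": "id"
}
```

With a configuration like this, replaying an event that was already written results in an update of the existing row rather than a duplicate insert.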
3.1.1.8. Schema evolution modes for the Debezium JDBC connector
You can use the following schema evolution modes with the Debezium JDBC sink connector:
Mode | Description |
---|---|
none | The connector does not perform any DDL schema evolution. |
basic | The connector automatically detects fields that are in the event payload but that do not exist in the destination table. The connector alters the destination table to add the new fields. |
When schema.evolution is set to basic, the connector automatically creates or alters the destination database table according to the structure of the incoming event.
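In the connector configuration, this is a single setting, shown here as a fragment:

```json
{
  "schema.evolution": "basic"
}
```

If you prefer to manage the destination tables yourself, leave the property unset or choose the mode that performs no DDL changes, and create the tables before the connector starts.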
When an event is received from a topic for the first time, and the destination table does not yet exist, the Debezium JDBC sink connector uses the event’s key, or the schema structure of the record to resolve the column structure of the table. If schema evolution is enabled, the connector prepares and executes a CREATE TABLE SQL statement before it applies the DML event to the destination table.
When the Debezium JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event’s key or its schema structure to identify which columns are new, and must be added to the database table. If schema evolution is enabled, the connector prepares and executes an ALTER TABLE SQL statement before it applies the DML event to the destination table. Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.
The schema of each field determines whether a column is NULL or NOT NULL. The schema also defines the default values for each column. If the connector attempts to create a table with a nullability setting or a default value that you don’t want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event. To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modifies the column state defined in the source database.
A field’s data type is resolved based on a predefined set of mappings. For more information, see JDBC field types.
When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema. If you want a field to be removed from the destination table, use one of the following options:
- Remove the field manually.
- Drop the column.
- Assign a default value to the field.
- Define the field as nullable.
3.1.1.9. Specifying options to define the letter case of destination table and column names
The Debezium JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database. By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table. The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings. As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.
For example, if the destination database dialect is Oracle and the event’s topic is orders, the destination table will be created as ORDERS because Oracle defaults to upper-case names when the name is not quoted. Similarly, if the destination database dialect is PostgreSQL and the event’s topic is ORDERS, the destination table will be created as orders because PostgreSQL defaults to lower-case names when the name is not quoted.
To explicitly preserve the case of the table and field names that are present in a Kafka event, in the connector configuration, set the value of the quote.identifiers property to true. When this option is set, and an incoming event is for a topic called orders, and the destination database dialect is Oracle, the connector creates a table with the name orders, because the constructed SQL defines the name of the table as "orders". Enabling quoting results in the same behavior when the connector creates column names.
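The Oracle scenario described above corresponds to a fragment along these lines. The topic name orders is taken from the example in the text; the rest of the connector configuration is omitted.

```json
{
  "topics": "orders",
  "quote.identifiers": "true"
}
```

Keep in mind that once identifiers are created with quotes, queries against the destination tables typically need to quote them in the same case.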
Connection Idle Timeouts
The JDBC sink connector for Debezium leverages a connection pool to enhance performance. Connection pools are engineered to establish an initial set of connections, maintain a specified number of connections, and efficiently allocate connections to the application as required. However, a challenge arises when connections linger idle in the pool, potentially triggering timeouts if they remain inactive beyond the configured idle timeout threshold of the database.
To mitigate the potential for idle connection threads to trigger timeouts, connection pools offer a mechanism that periodically validates the activity of each connection. This validation ensures that connections remain active, and prevents the database from flagging them as idle. In the event of a network disruption, if Debezium attempts to use a terminated connection, the connector prompts the pool to generate a new connection.
By default, the Debezium JDBC sink connector does not conduct idle timeout tests. However, you can configure the connector to request the pool to perform timeout tests at a specified interval by setting the hibernate.c3p0.idle_test_period property. For example:
Example timeout configuration
{
"hibernate.c3p0.idle_test_period": "300"
}
The Debezium JDBC sink connector uses the Hibernate C3P0 connection pool. You can customize the C3P0 connection pool by setting properties in the hibernate.c3p0.* configuration namespace. In the preceding example, the setting of the hibernate.c3p0.idle_test_period property configures the connection pool to perform idle timeout tests every 300 seconds. After you apply the configuration, the connection pool begins to assess unused connections every five minutes.
3.1.2. How the Debezium JDBC connector maps data types
The Debezium JDBC sink connector resolves a column’s data type by using a logical or primitive type-mapping system. Primitive types include values such as integers, floating points, Booleans, strings, and bytes. Typically, these types are represented with a specific Kafka Connect Schema type code only. Logical data types are more often complex types, including values such as Struct-based types that have a fixed set of field names and schema, or values that are represented with a specific encoding, such as number of days since epoch.
The following examples show representative structures of primitive and logical data types:
Primitive field schema
{
"schema": {
"type": "INT64"
}
}
Logical field schema
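As an illustrative sketch in the same notation as the primitive example, a logical type pairs a primitive type code with a schema name. The following assumes Kafka Connect's built-in Date logical type, which is encoded as an INT32 count of days since the epoch:

```json
{
  "schema": {
    "type": "INT32",
    "name": "org.apache.kafka.connect.data.Date",
    "version": 1
  }
}
```

The name attribute is what distinguishes this field from a plain INT32 and lets the sink connector select a date column type instead of an integer one.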
Kafka Connect is not the only source for these complex, logical types. In fact, Debezium source connectors generate change events that have fields with similar logical types to represent a variety of different data types, including but not limited to, timestamps, dates, and even JSON data.
The Debezium JDBC sink connector uses these primitive and logical types to resolve a column’s type to a JDBC SQL code, which represents a column’s type. These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column’s type to a logical data type for the dialect in use. The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between Debezium and JDBC SQL types. The actual final column type varies for each database type.
- Table 3.1, “Mappings between Kafka Connect Primitives and Column Data Types”
- Table 3.2, “Mappings between Kafka Connect Logical Types and Column Data Types”
- Table 3.3, “Mappings between Debezium Logical Types and Column Data Types”
- Table 3.4, “Mappings between Debezium dialect-specific Logical Types and Column Data Types”
Table 3.1. Mappings between Kafka Connect Primitives and Column Data Types
Primitive Type | JDBC SQL Type |
---|---|
INT8 | Types.TINYINT |
INT16 | Types.SMALLINT |
INT32 | Types.INTEGER |
INT64 | Types.BIGINT |
FLOAT32 | Types.FLOAT |
FLOAT64 | Types.DOUBLE |
BOOLEAN | Types.BOOLEAN |
STRING | Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES | Types.VARBINARY |
Table 3.2. Mappings between Kafka Connect Logical Types and Column Data Types
Logical Type | JDBC SQL Type |
---|---|
org.apache.kafka.connect.data.Decimal | Types.DECIMAL |
org.apache.kafka.connect.data.Date | Types.DATE |
org.apache.kafka.connect.data.Time | Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp | Types.TIMESTAMP |
Table 3.3. Mappings between Debezium Logical Types and Column Data Types
Logical Type | JDBC SQL Type |
---|---|
io.debezium.time.Date | Types.DATE |
io.debezium.time.Time | Types.TIMESTAMP |
io.debezium.time.MicroTime | Types.TIMESTAMP |
io.debezium.time.NanoTime | Types.TIMESTAMP |
io.debezium.time.ZonedTime | Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp | Types.TIMESTAMP |
io.debezium.time.MicroTimestamp | Types.TIMESTAMP |
io.debezium.time.NanoTimestamp | Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp | Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal | Types.DOUBLE |
If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without timezones.
Table 3.4. Mappings between Debezium dialect-specific Logical Types and Column Data Types
Logical Type | MySQL SQL Type | PostgreSQL SQL Type | SQL Server SQL Type |
---|---|---|---|
io.debezium.data.Bits | | | |
io.debezium.data.Enum | | Types.VARCHAR | n/a |
io.debezium.data.Json | | | n/a |
io.debezium.data.EnumSet | | n/a | n/a |
io.debezium.time.Year | | n/a | n/a |
io.debezium.time.MicroDuration | n/a | | n/a |
io.debezium.data.Ltree | n/a | | n/a |
io.debezium.data.Uuid | n/a | | n/a |
io.debezium.data.Xml | n/a | | |
In addition to the primitive and logical mappings above, if the source of the change events is a Debezium source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation. To enforce propagation, one of the following properties must be set in the source connector configuration:
- column.propagate.source.type
- datatype.propagate.source.type
The Debezium JDBC sink connector applies the values with the higher precedence.
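As a sketch, propagation could be enabled with a fragment such as the following. Note that it belongs in the configuration of the Debezium source connector, not the sink. The regular expression .* is an assumption that matches every column; a narrower expression limits propagation to selected columns.

```json
{
  "column.propagate.source.type": ".*"
}
```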
For example, let’s say the following field schema is included in a change event:
Debezium change event field schema with column or data type propagation enabled
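A minimal sketch of such a field schema follows. It assumes an INT16 field that originates from a MySQL TINYINT(1) column, and it uses the __debezium.source.column.* parameter names that Debezium source connectors add when propagation is enabled. The specific values are illustrative.

```json
{
  "schema": {
    "type": "INT16",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}
```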
In the preceding example, if no schema parameters are set, the Debezium JDBC sink connector maps this field to a column type of Types.SMALLINT. Types.SMALLINT can have different logical database types, depending on the database dialect. For MySQL, the column type in the example converts to a TINYINT column type with no specified length. If column or data type propagation is enabled for the source connector, the Debezium JDBC sink connector uses the mapping information to refine the data type mapping process and create a column with the type TINYINT(1).
Typically, the effect of using column or data type propagation is much greater when the same type of database is used for both the source and sink database.
3.1.3. Deployment of Debezium JDBC connectors
You can use either of the following methods to deploy a Debezium JDBC connector:
- Use Streams for Apache Kafka to automatically create an image that includes the connector plug-in. This is the preferred method.
- Build a custom Kafka Connect container image from a Containerfile. This Containerfile deployment method is deprecated. The instructions for this method are scheduled for removal in future versions of the documentation.
Due to licensing requirements, the Debezium JDBC connector archive does not include the drivers that Debezium requires to connect to the Db2 and Oracle databases. To enable the connector to access these databases, you must add the drivers to your connector environment. For information about how to obtain drivers that are not supplied with the connector, see Obtaining drivers not included in the connector archive.
3.1.3.1. Obtaining JDBC drivers not included in the connector archive
Due to licensing requirements, the JDBC driver files that Debezium requires to connect to Db2 Database and Oracle Database are not included in the Debezium JDBC connector archive. These drivers are available for download from Maven Central. Depending on the deployment method that you use, you can use one of the following methods to retrieve the drivers:
- You use Streams for Apache Kafka to add the connector to your Kafka Connect image: Add the Maven Central location for the driver to builds.plugins.artifact.url in the KafkaConnect custom resource as shown in Section 3.1.3.3, “Using Streams for Apache Kafka to deploy a Debezium JDBC connector”.
- You use a Containerfile to build a container image for the connector: In the Containerfile, insert a curl command that specifies the URL for downloading the driver file from Maven Central. For more information, see Section 3.1.3.4, “Deploying a Debezium JDBC connector by building a custom Kafka Connect container image from a Containerfile”.
3.1.3.2. JDBC connector deployment using Streams for Apache Kafka
The preferred method for deploying a Debezium connector is to use Streams for Apache Kafka to build a Kafka Connect container image that includes the connector plug-in.
During the deployment process, you create and use the following custom resources (CRs):
- A KafkaConnect CR that defines your Kafka Connect instance and includes information about the connector artifacts to include in the image.
- A KafkaConnector CR that provides details that include information the connector uses to access the destination database. After Streams for Apache Kafka starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.
In the build specification for the Kafka Connect image, you can specify the connectors that are available to deploy. For each connector plug-in, you can also specify other components that you want to make available for deployment. For example, you can add Apicurio Registry artifacts, or the Debezium scripting component. When Streams for Apache Kafka builds the Kafka Connect image, it downloads the specified artifacts, and incorporates them into the image.
The spec.build.output parameter in the KafkaConnect CR specifies where to store the resulting Kafka Connect container image. Container images can be stored in a container registry, such as quay.io, or in an OpenShift ImageStream. To store images in an ImageStream, you must create the ImageStream before you deploy Kafka Connect. ImageStreams are not created automatically.
If you use a KafkaConnect resource to create a cluster, afterwards you cannot use the Kafka Connect REST API to create or update connectors. You can still use the REST API to retrieve information.
Additional resources
- Configuring Kafka Connect in Deploying and Managing Streams for Apache Kafka on OpenShift.
- Building a new container image automatically in Deploying and Managing Streams for Apache Kafka on OpenShift.
3.1.3.3. Using Streams for Apache Kafka to deploy a Debezium JDBC connector
You can use the build configuration in Streams for Apache Kafka to automatically build a Kafka Connect container image on OpenShift. The build image includes the Debezium connector plug-ins that you specify.
During the build process, the Streams for Apache Kafka Operator transforms input parameters in a KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository or from another configured HTTP server.
The newly created container is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect cluster. After Streams for Apache Kafka builds the Kafka Connect image, you create KafkaConnector custom resources to start the connectors that are included in the build.
Prerequisites
- You have access to an OpenShift cluster in which the cluster Operator is installed.
- The Streams for Apache Kafka Operator is running.
- An Apache Kafka cluster is deployed as documented in Deploying and Managing Streams for Apache Kafka on OpenShift.
- Kafka Connect is deployed on Streams for Apache Kafka.
- You have a Kafka topic from which the connector can read change event records.
- A destination database is installed and is configured to accept JDBC connections.
- You have a Red Hat build of Debezium license.
- The OpenShift oc CLI client is installed or you have access to the OpenShift Container Platform web console.
- Depending on how you intend to store the Kafka Connect build image, you need registry permissions or you must create an ImageStream resource:
  - To store the build image in an image registry, such as Red Hat Quay.io or Docker Hub: An account and permissions to create and manage images in the registry.
  - To store the build image as a native OpenShift ImageStream: An ImageStream resource is deployed to the cluster for storing new container images. You must explicitly create an ImageStream for the cluster. ImageStreams are not available by default. For more information about ImageStreams, see Managing image streams on OpenShift Container Platform.
Procedure
- Log in to the OpenShift cluster.
- Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one. For example, create a KafkaConnect CR with the name dbz-jdbc-connect.yaml that specifies the annotations and image properties, as shown in the following excerpt. In the example that follows, the custom resource is configured to download the following artifacts:
  - The Debezium JDBC connector archive.
  - A JDBC driver that is required to connect to an Oracle or Db2 sink database. You can omit this entry for other sink destinations.
A dbz-jdbc-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector
Table 3.5. Descriptions of Kafka Connect configuration settings
Item | Description |
---|---|
1 | Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster. |
2 | The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts. |
3 | The build.output specifies the registry in which the newly built image is stored. |
4 | Specifies the name and image name for the image output. Valid values for output.type are docker to push into a container registry such as Quay, or imagestream to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the Streams for Apache Kafka Build schema reference. |
5 | The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name, and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component. |
6 | The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field. |
7 | The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server. |
8 | (For Db2 or Oracle sinks only) Specifies the location of the JDBC driver in Maven Central. The drivers that are required for Debezium to connect to these databases are not included in the Debezium connector archives. The example provides the Maven URL for the Oracle Database JDBC driver. The Db2 JDBC driver is available at the following Maven location: https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.9.0/jcc-11.5.9.0.jar |
- Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:
  oc create -f dbz-jdbc-connect.yaml
  Based on the configuration specified in the custom resource, the Streams Operator prepares a Kafka Connect image to deploy.
  After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.
- Create a KafkaConnector resource to define an instance of each connector that you want to deploy.
  For example, create the following KafkaConnector CR, and save it as orders-to-postgresql-jdbc-connector.yaml
  orders-to-postgresql-jdbc-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector
Table 3.6. Descriptions of connector configuration settings
Item | Description |
---|---|
1 | The name of the connector to register with the Kafka Connect cluster. |
2 | The name of the connector class. |
3 | The number of tasks that can operate concurrently. |
4 | The connector’s configuration. |
5 | The JDBC connection URL for the sink database. The URL specifies the port number and any authentication properties that are required to connect to the database. For example, jdbc:oracle:thin:@myhost.example.com:1521/myservice |
6 | The name of the account that Debezium uses to connect to the database. |
7 | The password that Debezium uses to connect to the database user account. |
8 | Specifies a comma-separated list of Kafka topics that the connector reads. Events from each topic are streamed to tables with the same name in the sink database. |
- Create the connector resource by running the following command:
  oc create -n <namespace> -f <kafkaConnector>.yaml
  For example,
  oc create -n debezium -f orders-to-postgresql-jdbc-connector.yaml
  The connector is registered to the Kafka Connect cluster and starts to run against the database that is specified by spec.config.connection.url in the KafkaConnector CR. After the connector pod is ready, Debezium is running.
3.1.3.4. Deploying a Debezium JDBC connector by building a custom Kafka Connect container image from a Containerfile
You can deploy a Debezium JDBC connector by building a custom Kafka Connect container image that contains the Debezium connector archive, and then pushing this container image to a container registry. Afterwards, you create the following custom resources (CRs) to define the connector configuration:
- A KafkaConnect CR that defines your Kafka Connect instance. The image property in the CR specifies the name of the container image that you create to run your Debezium connector. You apply this CR to the OpenShift instance where Red Hat Streams for Apache Kafka is deployed. Streams for Apache Kafka offers operators and images that bring Apache Kafka to OpenShift.
- A KafkaConnector CR that defines your Debezium JDBC connector. Apply this CR to the same OpenShift instance where you applied the KafkaConnect CR.
The deployment method described in this section is deprecated and is scheduled for removal in future versions of the documentation.
Prerequisites
- A destination database is installed and is configured to accept JDBC connections.
- Streams for Apache Kafka is deployed on OpenShift and is running Apache Kafka and Kafka Connect. For more information, see Deploying and Managing Streams for Apache Kafka on OpenShift.
- Podman or Docker is installed.
- You have a Kafka topic from which the connector can read change event records.
- If you want the JDBC connector to send data to a Db2 or Oracle database, the Kafka Connect server has access to Maven Central to download the JDBC drivers for those databases. You can also use a local copy of the driver, or one that is available from a local Maven repository or other HTTP server.
- You have an account and permissions to create and manage containers in the container registry (such as quay.io or docker.io) to which you plan to add the container that will run your Debezium connector.
Procedure
Create the Debezium JDBC connector container on Kafka Connect:
Create a Containerfile that uses registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0 as the base image. For example, from a terminal window, enter the following command:
Descriptions of Containerfile settings for building a custom Kafka Connect container image
Item | Description |
---|---|
1 | You can specify any file name that you want. |
2 | Specifies the path to your Kafka Connect plug-ins directory. If your Kafka Connect plug-ins directory is in a different location, replace this path with the actual path of your directory. |
The command creates a Containerfile with the name debezium-jdbc-connector-container.yaml in the current directory.
Build the container image from the debezium-jdbc-connector-container.yaml Containerfile that you created in the previous step. From the directory that contains the file, open a terminal window and enter one of the following commands:
podman build -t debezium-jdbc-connector-container:latest -f debezium-jdbc-connector-container.yaml .
docker build -t debezium-jdbc-connector-container:latest -f debezium-jdbc-connector-container.yaml .
The preceding commands build a container image with the name debezium-jdbc-connector-container.
Push your custom image to a container registry, such as quay.io or an internal container registry. The container registry must be available to the OpenShift instance where you want to deploy the image. Enter one of the following commands:
podman push <myregistry.io>/debezium-jdbc-connector-container:latest
docker push <myregistry.io>/debezium-jdbc-connector-container:latest
Create a new Debezium JDBC KafkaConnect custom resource (CR). For example, create a KafkaConnect CR with the name dbz-connect.yaml that specifies annotations and image properties. The following example shows an excerpt from a dbz-connect.yaml file that describes a KafkaConnect custom resource.
Item | Description |
---|---|
1 |
You must set |
2 |
|
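A KafkaConnect resource that sets the annotations and image properties might look like the following sketch. The resource name, bootstrap address, and replica count are assumptions for illustration. The strimzi.io/use-connector-resources annotation is the standard Streams for Apache Kafka (Strimzi) annotation that allows KafkaConnector resources to manage connectors in the cluster, and is presumed here to be the annotation that the procedure refers to.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster                        # assumed resource name
  annotations:
    # Lets the Cluster Operator configure connectors from KafkaConnector resources
    strimzi.io/use-connector-resources: "true"
spec:
  # The custom image that you pushed to your registry in the previous step
  image: "<myregistry.io>/debezium-jdbc-connector-container:latest"
  bootstrapServers: "my-cluster-kafka-bootstrap:9092"   # assumed Kafka bootstrap address
  replicas: 1                                     # assumed
```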
Apply the KafkaConnect CR to the OpenShift Kafka Connect environment by entering the following command:
oc create -f dbz-connect.yaml
The command adds a Kafka Connect instance that specifies the name of the image that you created to run your Debezium connector.
Create a KafkaConnector custom resource that configures your Debezium JDBC connector instance.
You configure a Debezium JDBC connector in a .yaml file that specifies the configuration properties for the connector.
The following example shows an excerpt from a jdbc-connector.yaml file that sets a few of the key properties for a KafkaConnector custom resource. The connector establishes a JDBC connection to a PostgreSQL server sink on port 5432.
For information about the full range of available connector properties, see Descriptions of Debezium JDBC connector configuration properties.
Example 3.1. jdbc-connector.yaml
Item | Description |
---|---|
1 | The connector name that is registered with the Kafka Connect service. |
2 | The name of the Streams for Apache Kafka cluster. |
3 | The name of the Debezium JDBC connector class. |
4 | The JDBC address of the sink database. |
5 | The name of the account that Debezium uses to connect to the database. |
6 | The password that Debezium uses to authenticate to the database user account. |
7 | Specifies a comma-separated list of Kafka topics that the connector reads. Events from each topic are streamed to tables with the same name in the sink database. |
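The items in the preceding table can be read against a resource of the following shape. This is a hedged sketch: the connector name, the orders topic, and port 5432 come from this procedure, whereas the cluster label, host name, database name, and credentials are placeholders, and the class name io.debezium.connector.jdbc.JdbcSinkConnector is an assumption to verify against your connector version.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: orders-topic-to-postgresql-via-jdbc-sink-connector   # (1)
  labels:
    strimzi.io/cluster: my-connect-cluster                   # (2) assumed cluster name
spec:
  class: io.debezium.connector.jdbc.JdbcSinkConnector        # (3) assumed, verify
  config:
    # (4) PostgreSQL sink on port 5432; host and database name are placeholders
    connection.url: "jdbc:postgresql://postgres.example.com:5432/orders_db"
    connection.username: "<db_user>"                         # (5) placeholder
    connection.password: "<db_password>"                     # (6) placeholder
    topics: "orders"                                         # (7)
```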
Create your connector instance with Kafka Connect. For example, if you saved your KafkaConnector resource in the jdbc-connector.yaml file, you would run the following command:
oc apply -f jdbc-connector.yaml
The preceding command registers orders-topic-to-postgresql-via-jdbc-sink-connector. The connector starts and begins to read from the orders topic, as specified in the KafkaConnector CR.
3.1.4. Descriptions of Debezium JDBC connector configuration properties
The Debezium JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs. Many properties have default values. Information about the properties is organized as follows:
3.1.4.1. JDBC connector Kafka consumer properties
Property | Default | Description |
---|---|---|
No default | Unique name for the connector. A failure results if you attempt to reuse this name when registering a connector. This property is required by all Kafka Connect connectors. | |
No default |
The name of the Java class for the connector. For the Debezium JDBC connector, specify the value | |
1 | Maximum number of tasks to use for this connector. | |
No default |
List of topics to consume, separated by commas. Do not use this property in combination with the | |
No default |
A regular expression that specifies the topics to consume. Internally, the regular expression is compiled to a |
3.1.4.2. JDBC connector connection properties
Property | Default | Description |
---|---|---|
 | | The connection provider implementation to use. |
 | No default | The JDBC connection URL used to connect to the database. |
 | No default | The name of the database user account that the connector uses to connect to the database. |
 | No default | The password that the connector uses to connect to the database. |
 | | Specifies the minimum number of connections in the pool. |
 | | Specifies the maximum number of concurrent connections that the pool maintains. |
 | | Specifies the number of connections that the connector attempts to acquire if the connection pool exceeds its maximum size. |
 | | Specifies the number of seconds that an unused connection is kept before it is discarded. |
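As an illustration of how connection settings might appear in the spec.config section of a KafkaConnector resource, consider the following fragment. The property names connection.pool.acquire_increment and connection.pool.timeout are given from memory of the connector's property set and are assumptions here, so verify them against the property reference for your version. All values are illustrative placeholders rather than recommendations.

```yaml
spec:
  config:
    connection.url: "jdbc:postgresql://postgres.example.com:5432/orders_db"   # placeholder
    connection.username: "<db_user>"          # placeholder
    connection.password: "<db_password>"      # placeholder
    connection.pool.min_size: 5               # minimum connections kept in the pool
    connection.pool.max_size: 32              # maximum concurrent connections
    connection.pool.acquire_increment: 32     # assumed name; connections acquired at a time
    connection.pool.timeout: 1800             # assumed name; seconds before an idle connection is discarded
```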
3.1.4.3. JDBC connector runtime properties
Property | Default | Description |
---|---|---|
| Specifies the timezone used when inserting JDBC temporal values. | |
|
Specifies whether the connector processes | |
|
Specifies whether the connector processes Note
Although support for
To ensure that the JDBC connector can process
The user account that submits the preceding query requires | |
| Specifies the strategy used to insert events into the database. The following options are available:
| |
| Specifies how the connector resolves the primary key columns from the event.
| |
No default |
Either the name of the primary key column or a comma-separated list of fields to derive the primary key from. | |
| Specifies whether generated SQL statements use quotation marks to delimit table and column names. See the JDBC quoting case-sensitivity section for more details. | |
| Specifies how the connector evolves the destination table schemas. For more information, see Section 3.1.1.8, “Schema evolution modes for the Debezium JDBC connector”. The following options are available:
| |
|
Specifies a string pattern that the connector uses to construct the names of destination tables. | |
|
Specifies the schema name where the PostgreSQL PostGIS extension is installed. The default is | |
|
Specifies whether the connector automatically sets an | |
| Specifies how many records to attempt to batch together into the destination table. Note
Note that if you set | |
| Specifies whether to enable the Debezium JDBC connector’s reduction buffer. Choose one of the following settings:
To optimize query processing in a PostgreSQL sink database when the reduction buffer is enabled, you must also enable the database to execute the batched queries by adding the | |
empty string |
An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form | |
empty string |
An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form | |
5 | Specifies the maximum number of retries that the connector performs after an attempt to flush changes to the target database results in certain database errors. If the number of retries exceeds the retry value, the sink connector enters a FAILED state. | |
1000 | Specifies the number of milliseconds that the connector waits to retry a flush operation that failed. Note
When you set both the |
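The following fragment sketches how several runtime properties might be combined to obtain idempotent writes with basic schema evolution. The property names are assumptions based on the connector's commonly documented property set, because they do not appear by name in this table, and the batch size is illustrative. The retry values repeat the defaults of 5 and 1000 that the table lists.

```yaml
spec:
  config:
    insert.mode: upsert             # assumed name; write events by using upsert semantics
    primary.key.mode: record_key    # assumed name; derive the primary key from the event key
    schema.evolution: basic         # assumed name; add missing columns to destination tables
    delete.enabled: true            # assumed name; process delete events
    batch.size: 500                 # assumed name; illustrative value
    flush.max.retries: 5            # assumed name; retries after a failed flush
    flush.retry.delay.ms: 1000      # assumed name; wait between flush retries
```

With settings like these, and assuming that the retries occur at the configured interval, a flush that keeps failing would be retried for roughly five seconds before the connector enters the FAILED state.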
3.1.4.4. JDBC connector extendable properties
Property | Default | Description |
---|---|---|
|
Specifies the fully-qualified class name of a | |
|
Specifies the fully-qualified class name of a
|
3.1.4.5. JDBC connector hibernate.* passthrough properties
Kafka Connect supports passthrough configuration, enabling you to modify the behavior of an underlying system by passing certain properties directly from the connector configuration. By default, some Hibernate properties are exposed via the JDBC connector connection properties (for example, connection.url, connection.username, and connection.pool.*_size), and through the connector’s runtime properties (for example, use.time.zone, quote.identifiers).
If you want to customize other Hibernate behavior, you can take advantage of the passthrough mechanism by adding properties that use the hibernate.* namespace to the connector configuration. For example, to assist Hibernate in resolving the type and version of the target database, you can add the hibernate.dialect property and set it to the fully qualified class name of the database dialect, for example, org.hibernate.dialect.MariaDBDialect.
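For example, a connector configuration that passes the dialect through to Hibernate might include a fragment such as the following. The connection URL is a placeholder assumption. The hibernate.dialect value is the one named in the preceding paragraph.

```yaml
spec:
  config:
    connection.url: "jdbc:mariadb://mariadb.example.com:3306/orders_db"   # placeholder
    # Passed through to Hibernate because the key uses the hibernate.* namespace
    hibernate.dialect: org.hibernate.dialect.MariaDBDialect
```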
3.1.5. JDBC connector frequently asked questions
- Is the ExtractNewRecordState single message transformation required?
- No, that is actually one of the differentiating factors of the Debezium JDBC connector from other implementations. While the connector is capable of ingesting flattened events like its competitors, it can also ingest Debezium’s complex change event structure natively, without requiring any specific type of transformation.
- If a column’s type is changed, or if a column is renamed or dropped, is this handled by schema evolution?
- No, the Debezium JDBC connector does not make any changes to existing columns. The schema evolution supported by the connector is quite basic. It simply compares the fields in the event structure to the table’s column list, and then adds any fields that are not yet defined as columns in the table. If a column’s type or default value changes, the connector does not adjust it in the destination database. If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however, existing rows with data in the old column remain unchanged. These types of schema changes should be handled manually.
- If a column’s type does not resolve to the type that I want, how can I enforce mapping to a different data type?
- The Debezium JDBC connector uses a sophisticated type system to resolve a column’s data type. For details about how this type system resolves a specific field’s schema definition to a JDBC type, see Section 3.1.1.4, “Description of Debezium JDBC connector data and column type mappings”. If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
- How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?
- In order to add a prefix or a suffix to the destination table name, adjust the collection.name.format connector configuration property to apply the prefix or suffix that you want. For example, to prefix all table names with jdbc_, specify the collection.name.format configuration property with a value of jdbc_${topic}. If the connector is subscribed to a topic called orders, the resulting table is created as jdbc_orders.
- Why are some columns automatically quoted, even though identifier quoting is not enabled?
- In some situations, specific column or table names might be explicitly quoted, even when quote.identifiers is not enabled. This is often necessary when the column or table name starts with or uses a specific convention that would otherwise be considered illegal syntax. For example, when the primary.key.mode is set to kafka, some databases only permit column names to begin with an underscore if the column’s name is quoted. Quoting behavior is dialect-specific, and varies among different types of database.
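The table name prefix that the FAQ describes can be expressed in the connector configuration as in the following fragment. This is a minimal sketch that shows only the two relevant settings. The orders topic and the jdbc_ prefix are the examples used in the answer above.

```yaml
spec:
  config:
    topics: "orders"
    # Events from the orders topic are written to the jdbc_orders table
    collection.name.format: "jdbc_${topic}"
```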