Chapter 3. Sink connectors
Debezium provides sink connectors that can consume events from sources such as Apache Kafka topics. A sink connector standardizes the format of the data, and then persists the event data to a configured sink repository. Other systems, applications, or users can then access the events from the data sink.
Because the sink connector applies a consistent structure to the event data that it consumes, downstream applications that read from the data sink can more easily interpret and process that data.
Debezium provides the following sink connectors:
- Debezium sink connector for JDBC
- Debezium sink connector for MongoDB (Developer Preview)
3.1. Debezium sink connector for JDBC
The Debezium JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver. This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.
3.1.1. How the Debezium JDBC connector works
The Debezium JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime. The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database. The connector supports idempotent write operations by using upsert semantics and basic schema evolution.
The Debezium JDBC connector provides the following features:
- Consumption of native Debezium change events without requiring the ExtractNewRecordState transformation.
- At-least-once delivery of consumed events.
- Support for multiple Kafka Connect tasks.
- Data and column type mappings for multiple database dialects.
- Configurable primary key handling.
- Deletion of rows when DELETE or tombstone events are consumed.
- Idempotent writes that use upsert semantics.
- Basic schema evolution.
- Configurable quoting and case handling for destination table and column names.
3.1.1.1. Description of how the Debezium JDBC connector consumes complex change events
By default, Debezium source connectors produce complex, hierarchical change events. When Debezium connectors are used with other JDBC sink connector implementations, you might need to apply the ExtractNewRecordState single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation. If you run the Debezium JDBC sink connector, it's not necessary to deploy the SMT, because the Debezium sink connector can consume native Debezium change events directly, without the use of a transformation.
When the JDBC sink connector consumes a complex change event from a Debezium source connector, it extracts the values from the after section of the original insert or update event. When a delete event is consumed by the sink connector, no part of the event's payload is consulted.
The Debezium JDBC sink connector is not designed to read from schema change topics. If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the topics or topics.regex properties so that the connector does not consume from schema change topics.
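For example, the following configuration sketch subscribes the connector only to data topics; the topic names are illustrative, and the schema change topic, which for Debezium source connectors is typically named after the topic prefix alone (for example, dbserver1), is deliberately not listed:
{
  "topics": "dbserver1.inventory.customers,dbserver1.inventory.orders"
}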
3.1.1.2. Description of Debezium JDBC connector at-least-once delivery
The Debezium JDBC sink connector guarantees that events that it consumes from Kafka topics are processed at least once.
3.1.1.3. Description of Debezium JDBC use of multiple tasks
You can run the Debezium JDBC sink connector across multiple Kafka Connect tasks. To run the connector across multiple tasks, set the tasks.max configuration property to the number of tasks that you want the connector to use. The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task. Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.
3.1.1.4. Description of Debezium JDBC connector data and column type mappings
To enable the Debezium JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event. The connector supports a wide range of column type mappings across different database dialects. To correctly convert the destination column type from the type metadata in an event field, the connector applies the data type mappings that are defined for the source database. You can enhance the way that the connector resolves data types for a column by setting the column.propagate.source.type or datatype.propagate.source.type options in the source connector configuration. When you enable these options, Debezium includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.
For the Debezium JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a Struct. In addition, the payload of the source message must be a Struct that has either a flattened structure with no nested struct types, or a nested struct layout that conforms to Debezium's complex, hierarchical structure.
If the structure of the events in the Kafka topic does not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.
3.1.1.5. Description of how the Debezium JDBC connector handles primary keys in source events
By default, the Debezium JDBC sink connector does not transform any of the fields in the source event into the primary key for the event. Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics. To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:
Mode | Description |
---|---|
none | No primary key fields are specified when creating the table. |
kafka | The primary key consists of the following three columns: __connect_topic, __connect_partition, and __connect_offset. The values for these columns are sourced from the coordinates of the Kafka event. |
record_key | The primary key is composed of the Kafka event's key. |
record_value | The primary key is composed of the Kafka event's value. |
record_header | The primary key is composed of the Kafka event's headers. |
Some database dialects might throw an exception if you set the primary.key.mode to kafka and set schema.evolution to basic. This exception occurs when a dialect maps a STRING data type to a variable-length string data type such as TEXT or CLOB, and the dialect does not allow primary key columns to have unbounded lengths. To avoid this problem, apply the following settings in your environment:
- Do not set schema.evolution to basic.
- Create the database table and primary key mappings in advance.
If a column maps to a data type that is not permitted as a primary key for your target database, set primary.key.fields to an explicit list of columns that excludes such columns. Consult your database vendor's documentation to determine which data types are and are not permissible as primary keys.
3.1.1.6. Configuring the Debezium JDBC connector to delete rows when consuming DELETE or tombstone events
The Debezium JDBC sink connector can delete rows in the destination database when a DELETE or tombstone event is consumed. By default, the JDBC sink connector does not enable delete mode.
If you want the connector to remove rows, you must explicitly set delete.enabled=true in the connector configuration. To use this mode you must also set primary.key.mode to a value other than none. The preceding configuration is necessary because deletes are executed based on the primary key mapping; if a destination table has no primary key mapping, the connector is unable to delete rows.
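For example, a minimal configuration sketch that enables delete processing might look like the following; the primary key mode shown is illustrative and must match the structure of your events:
{
  "delete.enabled": "true",
  "primary.key.mode": "record_key"
}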
3.1.1.7. Enabling the connector to perform idempotent writes
The Debezium JDBC sink connector can perform idempotent writes, enabling it to replay the same records repeatedly without changing the final database state.
To enable the connector to perform idempotent writes, you must explicitly set the insert.mode for the connector to upsert. An upsert operation is applied as either an update or an insert, depending on whether the specified primary key already exists.
If the primary key value already exists, the operation updates values in the row. If the specified primary key value doesn't exist, an insert adds a new row.
Each database dialect handles idempotent writes differently, because there is no SQL standard for upsert operations. The following table shows the upsert DML syntax for the database dialects that Debezium supports:
Dialect | Upsert Syntax |
---|---|
Db2 | MERGE … |
MySQL | INSERT … ON DUPLICATE KEY UPDATE … |
Oracle | MERGE … |
PostgreSQL | INSERT … ON CONFLICT … DO UPDATE SET … |
SQL Server | MERGE … |
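For example, the following configuration sketch enables upsert-based idempotent writes; the primary key mode shown is illustrative and must match the structure of your events:
{
  "insert.mode": "upsert",
  "primary.key.mode": "record_key"
}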
3.1.1.8. Schema evolution modes for the Debezium JDBC connector
You can use the following schema evolution modes with the Debezium JDBC sink connector:
Mode | Description |
---|---|
none | The connector does not perform any DDL schema evolution. |
basic | The connector automatically detects fields that are in the event payload but that do not exist in the destination table. The connector alters the destination table to add the new fields. |
When schema.evolution is set to basic, the connector automatically creates or alters the destination database table according to the structure of the incoming event.
When an event is received from a topic for the first time, and the destination table does not yet exist, the Debezium JDBC sink connector uses the event's key, or the schema structure of the record, to resolve the column structure of the table. If schema evolution is enabled, the connector prepares and executes a CREATE TABLE SQL statement before it applies the DML event to the destination table.
When the Debezium JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event's key or its schema structure to identify which columns are new, and must be added to the database table. If schema evolution is enabled, the connector prepares and executes an ALTER TABLE SQL statement before it applies the DML event to the destination table. Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.
The schema of each field determines whether a column is NULL or NOT NULL. The schema also defines the default values for each column. If the connector attempts to create a table with a nullability setting or a default value that you do not want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event. To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or you can modify the column definition in the source database.
A field’s data type is resolved based on a predefined set of mappings. For more information, see JDBC field types.
When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema. If you want a field to be removed from the destination table, use one of the following options:
- Remove the field manually.
- Drop the column.
- Assign a default value to the field.
- Define the field as nullable.
3.1.1.9. Specifying options to define the letter case of destination table and column names
The Debezium JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database. By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table. The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings. As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.
For example, if the destination database dialect is Oracle and the event's topic is orders, the destination table will be created as ORDERS, because Oracle defaults to uppercase names when the name is not quoted. Similarly, if the destination database dialect is PostgreSQL and the event's topic is ORDERS, the destination table will be created as orders, because PostgreSQL defaults to lowercase names when the name is not quoted.
To explicitly preserve the case of the table and field names that are present in a Kafka event, in the connector configuration, set the value of the quote.identifiers property to true. When this option is set, if an incoming event is for a topic called orders, and the destination database dialect is Oracle, the connector creates a table with the name orders, because the constructed SQL defines the name of the table as "orders". Enabling quoting results in the same behavior when the connector creates column names.
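For example, the following configuration sketch enables identifier quoting so that the connector preserves the original case of table and column names in the SQL that it generates:
{
  "quote.identifiers": "true"
}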
Connection Idle Timeouts
The JDBC sink connector for Debezium leverages a connection pool to enhance performance. Connection pools are engineered to establish an initial set of connections, maintain a specified number of connections, and efficiently allocate connections to the application as required. However, a challenge arises when connections linger idle in the pool, potentially triggering timeouts if they remain inactive beyond the configured idle timeout threshold of the database.
To mitigate the potential for idle connection threads to trigger timeouts, connection pools offer a mechanism that periodically validates the activity of each connection. This validation ensures that connections remain active, and prevents the database from flagging them as idle. In the event of a network disruption, if Debezium attempts to use a terminated connection, the connector prompts the pool to generate a new connection.
By default, the Debezium JDBC sink connector does not conduct idle timeout tests. However, you can configure the connector to request the pool to perform timeout tests at a specified interval by setting the hibernate.c3p0.idle_test_period property. For example:
Example timeout configuration
{ "hibernate.c3p0.idle_test_period": "300" }
{
"hibernate.c3p0.idle_test_period": "300"
}
The Debezium JDBC sink connector uses the Hibernate C3P0 connection pool. You can customize the C3P0 connection pool by setting properties in the hibernate.c3p0.* configuration namespace. In the preceding example, the setting of the hibernate.c3p0.idle_test_period property configures the connection pool to perform idle timeout tests every 300 seconds. After you apply the configuration, the connection pool begins to assess unused connections every five minutes.
3.1.2. How the Debezium JDBC connector maps data types
Before the Debezium JDBC sink connector sends data to a sink database, it converts data types in the original source record to a corresponding type in the target system. Appropriate data type mapping ensures that the original data is accurately represented in the destination database.
The Debezium JDBC sink connector resolves a column’s data type by using a logical or primitive type-mapping system. Primitive types include values such as integers, floating points, Booleans, strings, and bytes. Typically, Kafka messages use a specific Kafka Connect Schema type code to represent primitive data types.
By contrast, to represent more complex data, Debezium uses logical schema names, which provide logical groupings of named fields that can represent a range of data types (strings, arrays, JSON, XML, and so forth). These structured types take into account the semantic meaning of the data and provide a more opinionated interpretation about how to serialize the underlying primitive types. Logical types are useful in representing values that have a specific encoding, such as numbers that represent the time since the epoch.
The following examples show representative structures of primitive and logical data types:
Example 3.1. Primitive field schema
{ "schema": { "type": "INT64" } }
{
"schema": {
"type": "INT64"
}
}
Example 3.2. Logical field schema
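A minimal sketch of such a schema, assuming the org.apache.kafka.connect.data.Date logical type, which encodes its value as an INT32, looks similar to the following:
{
  "schema": {
    "type": "INT32",
    "name": "org.apache.kafka.connect.data.Date"
  }
}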
Kafka Connect is not the only source for these complex, logical types. In fact, when a Debezium source connector emits a change event, it can assign similar logical types to event fields to represent such data types as timestamps, dates, and even JSON data.
The Debezium JDBC sink connector uses these primitive and logical types to resolve a column’s type to a JDBC SQL code, which represents a column’s type. These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column’s type to a logical data type for the dialect in use. The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between Debezium and JDBC SQL types. The actual final column type varies for each database type.
- Section 3.1.2.1, “Mappings between Kafka Connect primitive data types and sink database column types”
- Section 3.1.2.2, “Mappings between Kafka Connect logical data types and sink database column types”
- Section 3.1.2.3, “Mappings between Debezium logical data types and JDBC column types”
- Section 3.1.2.4, “Mappings between Debezium dialect-specific logical data types and RDBMS column data types”
- Section 3.1.2.5, “Mappings between Debezium vector data types and RDBMS column types”
3.1.2.1. Mappings between Kafka Connect primitive data types and sink database column types
Primitive Type | JDBC SQL Type |
---|---|
INT8 | Types.TINYINT |
INT16 | Types.SMALLINT |
INT32 | Types.INTEGER |
INT64 | Types.BIGINT |
FLOAT32 | Types.FLOAT |
FLOAT64 | Types.DOUBLE |
BOOLEAN | Types.BOOLEAN |
STRING | Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES | Types.VARBINARY |
The connector does not support BOOLEAN data in Oracle 23. If you configure the JDBC connector to use an Oracle 23 database as the sink target, you cannot rely on schema evolution to automatically create a field that uses the Oracle 23 BOOLEAN data type. The connector maps BOOLEAN data to BIT data types, which are universal across Oracle versions. As a workaround, if you need a BOOLEAN data type for an Oracle 23 sink, add the field manually to the target table.
3.1.2.2. Mappings between Kafka Connect logical data types and sink database column types
Logical Type | JDBC SQL Type |
---|---|
org.apache.kafka.connect.data.Decimal | Types.DECIMAL |
org.apache.kafka.connect.data.Date | Types.DATE |
org.apache.kafka.connect.data.Time | Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp | Types.TIMESTAMP |
3.1.2.3. Mappings between Debezium logical data types and JDBC column types
Logical Type | JDBC SQL Type |
---|---|
io.debezium.time.Date | Types.DATE |
io.debezium.time.Time | Types.TIMESTAMP |
io.debezium.time.MicroTime | Types.TIMESTAMP |
io.debezium.time.NanoTime | Types.TIMESTAMP |
io.debezium.time.ZonedTime | Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp | Types.TIMESTAMP |
io.debezium.time.MicroTimestamp | Types.TIMESTAMP |
io.debezium.time.NanoTimestamp | Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp | Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal | Types.DOUBLE |
If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without timezones.
3.1.2.4. Mappings between Debezium dialect-specific logical data types and RDBMS column data types
Logical Type | MySQL SQL Type | PostgreSQL SQL Type | SQL Server SQL Type |
---|---|---|---|
io.debezium.data.Bits | bit(n) | bit(n) or bit varying | varbinary(n) |
io.debezium.data.Enum | enum | Types.VARCHAR | n/a |
io.debezium.data.Json | json | json | n/a |
io.debezium.data.EnumSet | set | n/a | n/a |
io.debezium.time.Year | year | n/a | n/a |
io.debezium.time.MicroDuration | n/a | interval | n/a |
io.debezium.data.Ltree | n/a | ltree | n/a |
io.debezium.data.Uuid | n/a | uuid | n/a |
io.debezium.data.Xml | n/a | xml | xml |
3.1.2.5. Mappings between Debezium vector data types and RDBMS column types
The Debezium JDBC sink connector supports direct mapping of logical vector data types from source events to sink destinations, provided that the target database supports a comparable representation.
If a vector field in the source has a logical name, Debezium uses it to determine the appropriate mapping. For certain databases, the connector recognizes the following logical names under the io.debezium.data.vector.* namespace:
- FloatVector
- DoubleVector
- SparseVector
When the connector processes a message, if these logical types are present, the connector checks for a corresponding mapping in the target database. If a mapping exists, it applies it; otherwise, it defaults to serializing the data as a string, based on the Kafka Connect schema type.
For example, a connector that is configured to send data to a PostgreSQL database sink maps fields with the io.debezium.data.FloatVector logical name to the halfvec column type, using the special override mapping. By contrast, if no direct mapping is available in the sink database, as for example with an Oracle sink, the connector defaults to serializing vector data in its raw string format.
Support for the special vector types is not available for all releases of the supported databases. Sink databases must meet the following requirements to support direct mapping of logical vector types:
- MariaDB version 11.7 or later
- MySQL version 9.0 or later
- PostgreSQL requires the pgvector extension
Earlier versions of MariaDB or MySQL, and PostgreSQL without pgvector, do not support special vector types.
Logical Type | Db2 | MySQL | PostgreSQL | Oracle | SQL Server |
---|---|---|---|---|---|
io.debezium.data.DoubleVector | Unsupported | 'vector' | vector | Unsupported | Unsupported |
io.debezium.data.FloatVector | Unsupported | 'vector' | halfvec | Unsupported | Unsupported |
io.debezium.data.SparseVector | Unsupported | Unsupported | sparsevec | Unsupported | Unsupported |
If there is not a direct mapping between one of the vector logical types and your target relational database’s column types, you can use the VectorToJsonConverter to convert the vector logical type to JSON so that it can be written to any target relational database.
Column and data type propagation
In addition to the primitive and logical mappings shown in the preceding tables, if the source of the change events is a Debezium source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation. Column and data type propagation helps to determine how the structure and data types of incoming data are translated and applied to the sink destination. As discussed earlier in Data and column type mappings, you can enforce propagation by setting one of the following properties in the source connector configuration:
- column.propagate.source.type
- datatype.propagate.source.type
The Debezium JDBC sink connector applies only the values with the higher precedence.
To illustrate how propagation affects how the connector maps data types, let’s look at an example. The following example shows a field schema that might be included within a change event:
Example 3.3. Debezium change event field schema with column or data type propagation enabled
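A sketch of such a field schema, assuming the standard Debezium propagation parameters and a source column of type TINYINT(1), looks similar to the following:
{
  "schema": {
    "type": "INT16",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}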
Because the source connector that emitted the event was configured to use column or data type propagation, the event includes parameters that specify the column type and length.
If propagation were not enabled for the source connector, the type and length parameters would be absent, and the Debezium JDBC sink connector would default to mapping the INT16 value in the type field to a column type of Types.SMALLINT. Depending on the SQL dialect of the target database, the connector would then resolve the JDBC Types.SMALLINT type to any of several logical types. For example, for a MySQL sink target, the JDBC connector defaults to converting Types.SMALLINT to a TINYINT column type with no specified length.
However, because type propagation is enabled in the source connector, the event that it emitted includes the type and length parameters, providing more specific mapping instructions. Thus, rather than using the default mapping, the Debezium JDBC sink connector uses the given parameter values to refine the mapping, and creates a column in the sink database with the type TINYINT(1).
Typically, the effect of using column or data type propagation is most significant when the source and sink databases are of the same type. Because the source and sink database share the same underlying schema structure, it’s easier to map data types between them without having to apply complex transformations or interpretations.
3.1.3. Transformations that modify the format of events before processing
The Debezium JDBC connector provides several transformations that can be added to the connector configuration to modify the consumed events in-flight before they’re processed by the connector.
3.1.3.1. Transformations that alter topic names
The Debezium JDBC connector provides two naming transformations that you can apply to change the naming styles of topic or field names within an event. Optionally, you can also use the naming transformations to apply a prefix or suffix to the transformed name.
You can configure naming transformations to use one of the following naming styles:
camel_case
- Removes all period (.) and underscore (_) characters, and converts the immediate next character to uppercase. For example, the value inventory.customers is changed to inventoryCustomers.
snake_case
- Removes all period (.) characters and replaces them with underscore (_) characters. Additionally, all numeric sequences are prefixed with an underscore (_), and all uppercase characters are converted to lowercase and then prefixed with an underscore (_) character. For example, the value public.inventory becomes public_inventory, while TopicWith123Numbers becomes topic_with_123_numbers.
upper_case
- Converts to uppercase. For example, the value public.inventory would become PUBLIC.INVENTORY.
lower_case
- Converts to lowercase. For example, the value PUBLIC.INVENTORY would become public.inventory.
The connector provides the following naming transformations:
- CollectionNameTransformation
The CollectionNameTransformation provides a way to change the case of the topic names before the event is consumed. This transformation has the following configuration properties:
Property | Default | Description |
---|---|---|
 | none | Specifies the naming style to apply to the event's topic. |
 | empty | Specifies the prefix that is applied to the topic name after transformation. |
 | empty | Specifies the suffix that is applied to the topic name after transformation. |
The following configuration example illustrates using the CollectionNameTransformation SMT to set the topic name to uppercase and prefix the topic name with ADT_. For example, given a topic called public.inventory, the topic name becomes ADT_PUBLIC.INVENTORY.
Example CollectionNameTransformation configuration
- FieldNameTransformation
The FieldNameTransformation provides a way to change the case of field names before the event is consumed. If the event is a Debezium source connector event, only the fields within the before and after sections are changed. When the event is not a Debezium source connector event, all top-level field names are changed. This transformation has the following configuration properties:
Property | Default | Description |
---|---|---|
 | none | Specifies the naming style to apply to the field name. |
 | empty | Specifies the prefix that is applied to the field name after transformation. |
 | empty | Specifies the suffix that is applied to the field name after transformation. |
The following example shows how to configure the FieldNameTransformation SMT to convert field names to lowercase and prefix the names with the string adt_. After you apply the SMT to an event that includes a field with the name ID, the field is renamed to adt_id.
Example FieldNameTransformation configuration
3.1.4. Deployment of Debezium JDBC connectors
You can use either of the following methods to deploy a Debezium JDBC connector:
- Use Streams for Apache Kafka to automatically create an image that includes the connector plug-in. This is the preferred method.
- Build a custom Kafka Connect container image from a Containerfile. This Containerfile deployment method is deprecated. The instructions for this method are scheduled for removal in future versions of the documentation.
Due to licensing requirements, the Debezium JDBC connector archive does not include the drivers that Debezium requires to connect to the Db2 and Oracle databases. To enable the connector to access these databases, you must add the drivers to your connector environment. For information about how to obtain drivers that are not supplied with the connector, see Obtaining drivers not included in the connector archive.
3.1.4.1. Obtaining JDBC drivers not included in the connector archive
Due to licensing requirements, the JDBC driver files that Debezium requires to connect to Db2 Database and Oracle Database are not included in the Debezium JDBC connector archive. These drivers are available for download from Maven Central. Depending on the deployment method that you use, you can use one of the following methods to retrieve the drivers:
- You use Streams for Apache Kafka to add the connector to your Kafka Connect image
- Add the Maven Central location for the driver to build.plugins.artifacts.url in the KafkaConnect custom resource as shown in Section 3.1.4.3, “Using Streams for Apache Kafka to deploy a Debezium JDBC connector”.
- You use a Containerfile to build a container image for the connector
- In the Containerfile, insert a curl command that specifies the URL for downloading the driver file from Maven Central. For more information, see Section 3.1.4.4, “Deploying a Debezium JDBC connector by building a custom Kafka Connect container image from a Containerfile”.
3.1.4.2. JDBC connector deployment using Streams for Apache Kafka
The preferred method for deploying a Debezium connector is to use Streams for Apache Kafka to build a Kafka Connect container image that includes the connector plug-in.
During the deployment process, you create and use the following custom resources (CRs):
- A KafkaConnect CR that defines your Kafka Connect instance and includes information about the connector artifacts to include in the image.
- A KafkaConnector CR that provides details that include information the connector uses to access the sink database. After Streams for Apache Kafka starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.
In the build specification for the Kafka Connect image, you can specify the connectors that are available to deploy. For each connector plug-in, you can also specify other components that you want to make available for deployment. For example, you can add Apicurio Registry artifacts, or the Debezium scripting component. When Streams for Apache Kafka builds the Kafka Connect image, it downloads the specified artifacts, and incorporates them into the image.
The spec.build.output parameter in the KafkaConnect CR specifies where to store the resulting Kafka Connect container image. Container images can be stored in a container registry, such as quay.io, or in an OpenShift ImageStream. To store images in an ImageStream, you must create the ImageStream before you deploy Kafka Connect. ImageStreams are not created automatically.
If you use a KafkaConnect resource to create a cluster, afterwards you cannot use the Kafka Connect REST API to create or update connectors. You can still use the REST API to retrieve information.
Additional resources
- Configuring Kafka Connect in Deploying and Managing Streams for Apache Kafka on OpenShift.
- Building a new container image automatically in Deploying and Managing Streams for Apache Kafka on OpenShift.
3.1.4.3. Using Streams for Apache Kafka to deploy a Debezium JDBC connector
You can use the build configuration in Streams for Apache Kafka to automatically build a Kafka Connect container image for OpenShift. The newly built image includes the Debezium connector plug-ins that you specify.
During the build process, the Streams for Apache Kafka Operator transforms input parameters in a KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository or from another configured HTTP server.
The newly created container is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect cluster. After Streams for Apache Kafka builds the Kafka Connect image, you create KafkaConnector custom resources to start the connectors that are included in the build.
Prerequisites
- You have access to an OpenShift cluster in which the cluster Operator is installed.
- The Streams for Apache Kafka Operator is running.
- An Apache Kafka cluster is deployed as documented in Deploying and Managing Streams for Apache Kafka on OpenShift.
- Kafka Connect is deployed on Streams for Apache Kafka.
- You have a Kafka topic from which the connector can read change event records.
- A destination database is installed and is configured to accept JDBC connections.
- You have a Red Hat build of Debezium license.
- The OpenShift oc CLI client is installed or you have access to the OpenShift Container Platform web console.
Depending on how you intend to store the Kafka Connect build image, you need registry permissions or you must create an ImageStream resource:
- To store the build image in an image registry, such as Red Hat Quay.io or Docker Hub
- An account and permissions to create and manage images in the registry.
- To store the build image as a native OpenShift ImageStream
- An ImageStream resource is deployed to the cluster for storing new container images. You must explicitly create an ImageStream for the cluster. ImageStreams are not available by default. For more information about ImageStreams, see Managing image streams on OpenShift Container Platform.
Procedure
- Log in to the OpenShift cluster.
Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one. For example, create a KafkaConnect CR with the name dbz-jdbc-connect.yaml that specifies the annotations and image properties, as shown in the following excerpt. In the example that follows, the custom resource is configured to download the following artifacts:
- The Debezium JDBC connector archive.
- A JDBC driver that is required to connect to an Oracle or Db2 sink database. You can omit this entry for other sink destinations.
A dbz-jdbc-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector is sketched below; Table 3.6 describes the numbered configuration settings.
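The following is a hedged sketch of such a CR; the cluster name, bootstrap address, image name, and artifact URLs are placeholders or assumptions that you must adapt to your environment. The numbered comments correspond to the items in Table 3.6.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"  # (1)
spec:
  replicas: 1
  bootstrapServers: <kafka-cluster-name>-kafka-bootstrap:9093
  build:  # (2)
    output:  # (3)
      type: imagestream  # (4)
      image: debezium-streams-connect:latest
    plugins:  # (5)
      - name: debezium-connector-jdbc
        artifacts:
          - type: zip  # (6)
            url: <URL of the Debezium JDBC connector archive>  # (7)
          - type: jar
            url: <URL of the Oracle or Db2 JDBC driver in Maven Central>  # (8)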
Table 3.6. Descriptions of Kafka Connect configuration settings
Item | Description |
---|---|
1 | Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster. |
2 | The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts. |
3 | The build.output specifies the registry in which the newly built image is stored. |
4 | Specifies the name and image name for the image output. Valid values for output.type are docker to push into a container registry such as Quay, or imagestream to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the Streams for Apache Kafka Build schema reference in Configuring Streams for Apache Kafka on OpenShift. |
5 | The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name, and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component. |
6 | The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field. |
7 | The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server. |
8 | (For Db2 or Oracle sinks only) Specifies the location of the JDBC driver in Maven Central. The drivers that are required for Debezium to connect to these databases are not included in the Debezium connector archives. The Db2 JDBC driver is available at the following Maven location: https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.9.0/jcc-11.5.9.0.jar |
Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:
oc create -f dbz-jdbc-connect.yaml
Based on the configuration specified in the custom resource, the Streams Operator prepares a Kafka Connect image to deploy. After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.
Create a KafkaConnector resource to define an instance of each connector that you want to deploy. For example, create the following KafkaConnector CR, and save it as orders-to-postgresql-jdbc-connector.yaml.
An orders-to-postgresql-jdbc-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector is sketched below; Table 3.7 describes the numbered configuration settings.
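The following is a hedged sketch of such a CR; the cluster label and connection values are placeholders or assumptions that you must adapt to your environment. The numbered comments correspond to the items in Table 3.7.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: orders-to-postgresql-jdbc-connector  # (1)
  labels:
    strimzi.io/cluster: debezium-kafka-connect-cluster
spec:
  class: io.debezium.connector.jdbc.JdbcSinkConnector  # (2)
  tasksMax: 1  # (3)
  config:  # (4)
    connection.url: jdbc:postgresql://<postgresql-host>:5432/<database>  # (5)
    connection.username: <username>  # (6)
    connection.password: <password>  # (7)
    topics: orders  # (8)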
Table 3.7. Descriptions of connector configuration settings
Item | Description |
---|---|
1 | The name of the connector to register with the Kafka Connect cluster. |
2 | The name of the connector class. |
3 | The number of tasks that can operate concurrently. |
4 | The connector's configuration. |
5 | The JDBC connection URL for the sink database. The URL specifies the port number and any authentication properties that are required to connect to the database. For example, jdbc:oracle:thin:@myhost.example.com:1521/myservice |
6 | The name of the account that Debezium uses to connect to the database. |
7 | The password that Debezium uses to connect to the database user account. |
8 | Specifies a comma-separated list of Kafka topics that the connector reads. Events from each topic are streamed to tables with the same name in the sink database. |
Create the connector resource by running the following command:
oc create -n <namespace> -f <kafkaConnector>.yaml
For example,
oc create -n debezium -f orders-to-postgresql-jdbc-connector.yaml
The connector is registered to the Kafka Connect cluster and starts to run against the sink database that is specified by spec.config.connection.url in the KafkaConnector CR. After the connector pod is ready, Debezium is running.
3.1.4.4. Deploying a Debezium JDBC connector by building a custom Kafka Connect container image from a Containerfile
You can deploy a Debezium JDBC connector by building a custom Kafka Connect container image that contains the Debezium connector archive, and then push this container image to a container registry. Afterwards, you create the following custom resources (CRs) to define the connector configuration:
- A KafkaConnect CR that defines your Kafka Connect instance. The image property in the CR specifies the name of the container image that you create to run your Debezium connector. You apply this CR to the OpenShift instance where Red Hat Streams for Apache Kafka is deployed. Streams for Apache Kafka offers operators and images that bring Apache Kafka to OpenShift.
- A KafkaConnector CR that defines your Debezium JDBC connector. Apply this CR to the same OpenShift instance where you applied the KafkaConnect CR.
The deployment method described in this section is deprecated and is scheduled for removal in future versions of the documentation.
Prerequisites
- A destination database is installed and is configured to accept JDBC connections.
- Streams for Apache Kafka is deployed on OpenShift and is running Apache Kafka and Kafka Connect. For more information, see Deploying and Managing Streams for Apache Kafka on OpenShift.
- Podman or Docker is installed.
- You have a Kafka topic from which the connector can read change event records.
- If you want the JDBC connector to send data to a Db2 or Oracle database, the Kafka Connect server has access to Maven Central to download the JDBC drivers for those databases. You can also use a local copy of the driver, or one that is available from a local Maven repository or other HTTP server.
- You have an account and permissions to create and manage containers in the container registry (such as quay.io or docker.io) to which you plan to add the container that will run your Debezium connector.
Procedure
Create the Debezium JDBC connector container on Kafka Connect:
- Create a Containerfile that uses registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.0 as the base image. For example, from a terminal window, enter the following command:
Descriptions of Containerfile settings for building a custom Kafka Connect container image
Item | Description |
---|---|
1 | You can specify any file name that you want. |
2 | Specifies the path to your Kafka Connect plug-ins directory. If your Kafka Connect plug-ins directory is in a different location, replace this path with the actual path of your directory. |
The command creates a Containerfile with the name debezium-jdbc-connector-container.yaml in the current directory.
Build the container image from the debezium-jdbc-connector-container.yaml Containerfile that you created in the previous step. From the directory that contains the file, open a terminal window and enter one of the following commands:
podman build -t debezium-jdbc-connector-container:latest .
docker build -t debezium-jdbc-connector-container:latest .
The preceding commands build a container image with the name debezium-jdbc-connector-container.
Push your custom image to a container registry, such as quay.io or an internal container registry. The container registry must be available to the OpenShift instance where you want to deploy the image. Enter one of the following commands:
podman push <myregistry.io>/debezium-jdbc-connector-container:latest
docker push <myregistry.io>/debezium-jdbc-connector-container:latest
Create a new KafkaConnect custom resource (CR). For example, create a KafkaConnect CR with the name dbz-connect.yaml that specifies annotations and image properties. The following example shows an excerpt from a dbz-connect.yaml file that describes a KafkaConnect custom resource.
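The following is a hedged sketch of such a CR; the cluster name and registry path are placeholders that you must adapt to your environment, and the numbered comments correspond to the items in the table that follows.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"  # (1)
spec:
  replicas: 1
  image: <myregistry.io>/debezium-jdbc-connector-container:latest  # (2)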
Item | Description |
---|---|
1 | You must set the strimzi.io/use-connector-resources annotation to "true" so that the Cluster Operator can use KafkaConnector resources to configure connectors in this Kafka Connect cluster. |
2 | The image property specifies the name of the container image that you created to run your Debezium connector. |
Apply the KafkaConnect CR to the OpenShift Kafka Connect environment by entering the following command:
oc create -f dbz-connect.yaml
The command adds a Kafka Connect instance that specifies the name of the image that you created to run your Debezium connector.
Create a KafkaConnector custom resource that configures your Debezium JDBC connector instance. You configure a Debezium JDBC connector in a .yaml file that specifies the configuration properties for the connector. The following example shows an excerpt from a jdbc-connector.yaml file that sets a few of the key properties for a KafkaConnector custom resource. The connector establishes a JDBC connection to a PostgreSQL server sink on port 5432. For information about the full range of available connector properties, see Descriptions of Debezium JDBC connector configuration properties.
Example 3.4. jdbc-connector.yaml
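The following is a hedged sketch of such a file; the cluster label and connection values are placeholders or assumptions that you must adapt to your environment, and the numbered comments correspond to the items in the table that follows.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: orders-topic-to-postgresql-via-jdbc-sink-connector  # (1)
  labels:
    strimzi.io/cluster: debezium-kafka-connect-cluster  # (2)
spec:
  class: io.debezium.connector.jdbc.JdbcSinkConnector  # (3)
  tasksMax: 1
  config:
    connection.url: jdbc:postgresql://<postgresql-host>:5432/<database>  # (4)
    connection.username: <username>  # (5)
    connection.password: <password>  # (6)
    topics: orders  # (7)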
Item | Description |
---|---|
1 | The connector name that is registered with the Kafka Connect service. |
2 | The name of the Streams for Apache Kafka cluster. |
3 | The name of the Debezium JDBC connector class. |
4 | The JDBC address of the sink database. |
5 | The name of the account that Debezium uses to connect to the database. |
6 | The password that Debezium uses to authenticate to the database user account. |
7 | Specifies a comma-separated list of Kafka topics that the connector reads. Events from each topic are streamed to tables with the same name in the sink database. |
Create your connector instance with Kafka Connect. For example, if you saved your KafkaConnector resource in the jdbc-connector.yaml file, you would run the following command:
oc apply -f jdbc-connector.yaml
The preceding command registers orders-topic-to-postgresql-via-jdbc-sink-connector. The connector starts and begins to read from the orders topic, as specified in the KafkaConnector CR.
3.1.5. Descriptions of Debezium JDBC connector configuration properties
The Debezium JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs. Many properties have default values. Information about the properties is organized as follows:
- Section 3.1.5.1, “JDBC connector Kafka consumer properties”
- Section 3.1.5.2, “JDBC connector connection properties”
- Section 3.1.5.3, “JDBC connector runtime properties”
- Section 3.1.5.4, “JDBC connector extendable properties”
- Section 3.1.5.5, “JDBC connector hibernate.* passthrough properties”
3.1.5.1. JDBC connector Kafka consumer properties
Property | Default | Description |
---|---|---|
name | No default | Unique name for the connector. A failure results if you attempt to reuse this name when registering a connector. This property is required by all Kafka Connect connectors. |
connector.class | No default | The name of the Java class for the connector. For the Debezium JDBC connector, specify the value io.debezium.connector.jdbc.JdbcSinkConnector. |
tasks.max | 1 | Maximum number of tasks to use for this connector. |
topics | No default | List of topics to consume, separated by commas. Do not use this property in combination with the topics.regex property. |
topics.regex | No default | A regular expression that specifies the topics to consume. Internally, the regular expression is compiled to a java.util.regex.Pattern. Do not use this property in combination with the topics property. |
3.1.5.2. JDBC connector connection properties
Property | Default | Description |
---|---|---|
connection.provider | org.hibernate.c3p0.internal.C3P0ConnectionProvider | The connection provider implementation to use. |
connection.url | No default | The JDBC connection URL used to connect to the database. |
connection.username | No default | The name of the database user account that the connector uses to connect to the database. |
connection.password | No default | The password that the connector uses to connect to the database. |
connection.pool.min_size | 5 | Specifies the minimum number of connections in the pool. |
connection.pool.max_size | 32 | Specifies the maximum number of concurrent connections that the pool maintains. |
connection.pool.acquire_increment | 32 | Specifies the number of connections that the connector attempts to acquire if the connection pool exceeds its maximum size. |
connection.pool.timeout | 1800 | Specifies the number of seconds that an unused connection is kept before it is discarded. |
connection.restart.on.errors | false | Specifies whether the connector retries after a transient JDBC connection error. When enabled (true), the connector attempts to re-establish the connection and continue processing instead of failing. Note: Setting this option to true can affect data consistency; use with caution where strong data consistency is required. |
3.1.5.3. JDBC connector runtime properties
Property | Default | Description |
---|---|---|
use.time.zone | UTC | Specifies the timezone used when inserting JDBC temporal values. |
delete.enabled | false | Specifies whether the connector processes DELETE or tombstone events and removes the corresponding row from the database. To use this option, you must also configure a primary key mapping. For more information, see Section 3.1.1.6, “Configuring the Debezium JDBC connector to delete rows when consuming DELETE or tombstone events”. |
truncate.enabled | false | Specifies whether the connector processes TRUNCATE events and removes all rows from the corresponding table. Note: Although support for TRUNCATE statements has been available in Db2 for some time, the JDBC connector is currently unable to process the standard TRUNCATE events that the Db2 source connector emits. To ensure that the JDBC connector can process truncate operations that originate in Db2, perform the truncation by issuing an alternative query, for example, ALTER TABLE <table_name> ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE. The user account that submits the preceding query requires ALTER privileges on the table that is truncated. |
insert.mode | insert | Specifies the strategy used to insert events into the database. The following options are available: insert (each event results in an INSERT statement), update (each event results in an UPDATE statement), and upsert (rows are inserted if the primary key does not exist, or updated if it does). |
primary.key.mode | none | Specifies how the connector resolves the primary key columns from the event. The available options are none, kafka, record_key, record_value, and record_header. For more information, see Section 3.1.1.5, “Description of how the Debezium JDBC connector handles primary keys in source events”. |
primary.key.fields | No default | Either the name of the primary key column or a comma-separated list of fields to derive the primary key from. |
quote.identifiers | false | Specifies whether generated SQL statements use quotation marks to delimit table and column names. See Section 3.1.1.9, “Specifying options to define the letter case of destination table and column names” for more details. |
schema.evolution | none | Specifies how the connector evolves the destination table schemas. For more information, see Section 3.1.1.8, “Schema evolution modes for the Debezium JDBC connector”. The following options are available: none and basic. |
collection.name.format | ${topic} | Specifies a string pattern that the connector uses to construct the names of destination tables. The ${topic} placeholder is replaced by the name of the source topic. |
dialect.postgres.postgis.schema | public | Specifies the schema name where the PostgreSQL PostGIS extension is installed. The default is public; if the PostGIS extension is installed in a different schema, use this property to specify that schema name. |
dialect.sqlserver.identity.insert | false | Specifies whether the connector automatically sets IDENTITY_INSERT before an insert or upsert operation into an identity column of a SQL Server table, and unsets it immediately afterwards. |
batch.size | 500 | Specifies how many records to attempt to batch together into the destination table. Note that if you set consumer.max.poll.records in the Connect worker properties to a value that is lower than batch.size, batch processing is capped by consumer.max.poll.records, and the specified batch.size is never reached. |
use.reduction.buffer | false | Specifies whether to enable the Debezium JDBC connector’s reduction buffer. Choose one of the following settings: false (the default) writes each consumed change event as a separate logical SQL change; true reduces events that share the same primary key before writing, so that only the last state of each row is applied. To optimize query processing in a PostgreSQL sink database when the reduction buffer is enabled, you must also enable the database to execute the batched queries by adding the reWriteBatchedInserts parameter to the JDBC connection URL. |
field.include.list | empty string | An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName. |
field.exclude.list | empty string | An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName. |
flush.max.retries | 5 | Specifies the maximum number of retries that the connector performs after an attempt to flush changes to the target database results in certain database errors. If the number of retries exceeds the retry value, the sink connector enters a FAILED state. |
flush.retry.delay.ms | 1000 | Specifies the number of milliseconds that the connector waits to retry a flush operation that failed. Note: When you set both the flush.max.retries and flush.retry.delay.ms properties, the connector retries a failed flush up to flush.max.retries times, waiting flush.retry.delay.ms milliseconds between attempts. |
3.1.5.4. JDBC connector extendable properties
Property | Default | Description |
---|---|---|
collection.naming.strategy | io.debezium.connector.jdbc.naming.DefaultCollectionNamingStrategy | Specifies the fully-qualified class name of a CollectionNamingStrategy implementation that the connector uses to resolve collection (table) names from incoming event topic names. |
column.naming.strategy | io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy | Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from incoming event field names. |
3.1.5.5. JDBC connector hibernate.* passthrough properties
Kafka Connect supports passthrough configuration, enabling you to modify the behavior of an underlying system by passing certain properties directly from the connector configuration. By default, some Hibernate properties are exposed via the JDBC connector connection properties (for example, connection.url, connection.username, and connection.pool.*_size), and through the connector’s runtime properties (for example, use.time.zone and quote.identifiers).
If you want to customize other Hibernate behavior, you can take advantage of the passthrough mechanism by adding properties that use the hibernate.* namespace to the connector configuration. For example, to assist Hibernate in resolving the type and version of the target database, you can add the hibernate.dialect property and set it to the fully qualified class name of the database dialect, for example, org.hibernate.dialect.MariaDBDialect.
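For example, a minimal passthrough sketch that pins the Hibernate dialect for a MariaDB sink looks like the following:
{
  "hibernate.dialect": "org.hibernate.dialect.MariaDBDialect"
}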
3.1.6. JDBC connector frequently asked questions
- Is the ExtractNewRecordState single message transformation required?
- No, that is actually one of the differentiating factors of the Debezium JDBC connector from other implementations. While the connector is capable of ingesting flattened events like its competitors, it can also ingest Debezium’s complex change event structure natively, without requiring any specific type of transformation.
- If a column’s type is changed, or if a column is renamed or dropped, is this handled by schema evolution?
- No, the Debezium JDBC connector does not make any changes to existing columns. The schema evolution supported by the connector is quite basic. It simply compares the fields in the event structure to the table’s column list, and then adds any fields that are not yet defined as columns in the table. If a column’s type or default value change, the connector does not adjust them in the destination database. If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however existing rows with data in the old column remain unchanged. These types of schema changes should be handled manually.
- If a column’s type does not resolve to the type that I want, how can I enforce mapping to a different data type?
- The Debezium JDBC connector uses a sophisticated type system to resolve a column’s data type. For details about how this type system resolves a specific field’s schema definition to a JDBC type, see the Section 3.1.1.4, “Description of Debezium JDBC connector data and column type mappings” section. If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
- How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?
- In order to add a prefix or a suffix to the destination table name, adjust the collection.name.format connector configuration property to apply the prefix or suffix that you want. For example, to prefix all table names with jdbc_, specify the collection.name.format configuration property with a value of jdbc_${topic}. If the connector is subscribed to a topic called orders, the resulting table is created as jdbc_orders.
. - Why are some columns automatically quoted, even though identifier quoting is not enabled?
- In some situations, specific column or table names might be explicitly quoted, even when quote.identifiers is not enabled. This is often necessary when the column or table name starts with or uses a specific convention that would otherwise be considered illegal syntax. For example, when the primary.key.mode is set to kafka, some databases only permit column names to begin with an underscore if the column’s name is quoted. Quoting behavior is dialect-specific, and varies among different types of database.
3.2. Debezium sink connector for MongoDB (Developer Preview)
The Debezium MongoDB sink connector captures change event records from Apache Kafka topics and then transforms the records into MongoDB documents that it writes to collections in a specified MongoDB sink database. For applications that require high scalability and fast data retrieval, propagating change data to a cluster-based MongoDB environment, which uses such features as sharding and replica sets to optimize read operations, can significantly improve retrieval performance. The connector can process only change events that originate from a Debezium relational database connector.
The Debezium MongoDB sink connector is Developer Preview software only. Developer Preview software is not supported by Red Hat in any way and is not functionally complete or production-ready. Do not use Developer Preview software for production or business-critical workloads. Developer Preview software provides early access to upcoming product software in advance of its possible inclusion in a Red Hat product offering. Customers can use this software to test functionality and provide feedback during the development process. This software might not have any documentation, is subject to change or removal at any time, and has received limited testing. Red Hat might provide ways to submit feedback on Developer Preview software without an associated SLA. For more information about the support scope of Red Hat Developer Preview software, see Developer Preview Support Scope.
For information about the MongoDB versions that are compatible with this connector, see the Debezium Supported Configurations page.
Information about the Debezium sink connector for MongoDB and instructions about how to use it are available in the following topics:
- Section 3.2.1, “Architecture of the Debezium MongoDB sink connector”
- Section 3.2.2, “Limitations of the Debezium MongoDB sink connector”
- Section 3.2.3, “Quickstart of a Debezium MongoDB sink connector”
- Section 3.2.4, “Configuration of the Debezium MongoDB sink connector”
- Section 3.2.5, “Example configuration for the Debezium MongoDB sink connector”
- Section 3.2.6, “Monitoring the Debezium MongoDB sink connector”
- Section 3.2.7, “Key field mapping of the Debezium MongoDB sink connector”
- Section 3.2.8, “Using CloudEvents with the Debezium MongoDB sink connector”
3.2.1. Architecture of the Debezium MongoDB sink connector
Use the Debezium MongoDB sink connector to stream change data capture (CDC) event records from Kafka topics to a MongoDB sink database. The connector subscribes to Kafka topics that are populated with event messages produced by Debezium relational database source connectors. Each event message describes a database operation (insert, update, or delete) in a structured format that captures the details of the event. The connector transforms incoming change event records into MongoDB document format, and then writes the resulting documents into the target MongoDB collection.
After it receives an event, the connector parses the event payload and determines which MongoDB collection to send it to. Depending on the event type that is specified in the event payload, the connector then performs one of the following operations in the target collection:
Event type in payload | Resulting operation |
---|---|
INSERT | Create a document. |
UPDATE | Modify the document with the specified identifier. |
DELETE | Remove the document with the specified identifier. |
The connector uses the MongoDB Java driver to interact with the MongoDB database.
The mapping between topics and MongoDB collections is derived from the connector configuration. The document key serves as a unique identifier for the document, ensuring that updates, inserts, and deletions are propagated to the correct MongoDB document and collection, and that operations are applied in the correct order.
Through this process of mapping event messages to MongoDB documents, the connector is able to mirror the state of tables in your relational database to collections in a MongoDB database.
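As an illustration, consider the following abridged change event value; the table columns (id, name) are hypothetical, and fields that are not relevant to the mapping are omitted.
{
  "op": "c",
  "before": null,
  "after": {
    "id": 1001,
    "name": "Sally"
  }
}
Because the op field identifies an insert (c), the connector creates a new document in the mapped collection from the contents of the after section, and derives the document identifier from the event key, as described in Section 3.2.7, “Key field mapping of the Debezium MongoDB sink connector”.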
3.2.2. Limitations of the Debezium MongoDB sink connector
The Debezium MongoDB sink connector has the following limitations:
- Relational database (RDBMS) source connectors only
The MongoDB sink connector can consume only change events that originate from the Debezium connectors for the following relational databases:
- MariaDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
The connector cannot process change event messages from any other Debezium connectors, including the Debezium MongoDB source connector.
- Schema evolution
- Although the connector can process basic schema changes, advanced schema evolution scenarios might require manual intervention or specific configuration. Because MongoDB is schemaless, it has only a limited ability to handle schema evolution.
- Transaction support
- The connector processes individual change events in chronological order, based on the order in which operations are committed in the source system. Although MongoDB supports transactions, the Debezium MongoDB connector does not provide transactional guarantees across multiple CDC events or across multiple documents within a single sink task.
3.2.3. Quickstart of a Debezium MongoDB sink connector
Deploy a basic instance of the MongoDB sink connector for testing.
Prerequisites
The following components are available and running in your environment:
- Kafka cluster
- Kafka Connect
- A MongoDB instance
- Debezium relational database source connector
- Debezium MongoDB sink connector
Procedure
- Configure and start a Debezium source connector, for example, a Debezium PostgreSQL connector, to stream changes from a relational database to Kafka.
- Configure and start the Debezium MongoDB sink connector to consume events that the source connector emits to Kafka, and send them to a MongoDB sink database.
The following example provides a minimal configuration for a Debezium MongoDB sink connector. Replace the placeholders in the example with the actual values for your environment.
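The following sketch, written as Kafka Connect properties, shows one possible minimal configuration. The connector class and the connection-related property names are assumptions about the Debezium MongoDB sink connector; verify them against Section 3.2.4, “Configuration of the Debezium MongoDB sink connector” and your connector version, and replace the angle-bracket placeholders with values for your environment.
# Connector name and connector class; confirm the exact class name for your Debezium version.
name=mongodb-sink-connector
connector.class=io.debezium.connector.mongodb.sink.MongoDbSinkConnector
tasks.max=1
# Kafka topic that the Debezium source connector writes to (placeholder value).
topics=<source-topic>
# MongoDB connection string and target database (placeholder values); property names assumed.
mongodb.connection.string=mongodb://<mongodb-host>:27017
sink.database=<sink-database>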
3.2.4. Configuration of the Debezium MongoDB sink connector
The MongoDB sink connector accepts a variety of configuration options, as described in the following tables.
Property | Default | Description |
---|---|---|
| No default value | Must be set to |
| 1 | Maximum number of tasks. |
| No default value | List of Kafka topics to consume from. If you set this value to |
Property | Default | Description |
---|---|---|
| No default value | MongoDB connection string (URI) that the sink uses to connect to MongoDB. The URI follows the standard MongoDB connection string format. |
| No default value | Name of the target MongoDB database. |
Property | Default | Description |
---|---|---|
| | Specifies the strategy that the connector uses to derive the name of the target MongoDB collection from the name of the Kafka topic. Specify one of the following values: |
| | Template for deriving the target collection name from the Kafka topic name. |
| | Specifies the strategy that the connector uses to name columns in the target collection. Specify one of the following values: |
Property | Default | Description |
---|---|---|
| empty string | An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form |
| empty string | An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form. If you include this property in the configuration, do not set the |
| 2048 | Maximum number of records to write in a single batch. |
3.2.5. Example configuration for the Debezium MongoDB sink connector
The following example shows how you might configure the connector to read change events from three topics that originate from the dbserver1.inventory database, and use them to modify a collection in the MongoDB sink database named debezium.
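The following sketch, written as Kafka Connect properties, shows one way that such a configuration might look. The three topic names are hypothetical, and the connector class and connection-related property names are assumptions to verify against Section 3.2.4, “Configuration of the Debezium MongoDB sink connector”.
# Assumed connector class; verify the exact class name for your Debezium version.
name=inventory-mongodb-sink
connector.class=io.debezium.connector.mongodb.sink.MongoDbSinkConnector
tasks.max=1
# Hypothetical topic names; substitute the three topics that your source connector actually produces.
topics=dbserver1.inventory.customers,dbserver1.inventory.orders,dbserver1.inventory.products
# Placeholder connection string; property names assumed.
mongodb.connection.string=mongodb://<mongodb-host>:27017
sink.database=debezium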
3.2.6. Monitoring the Debezium MongoDB sink connector
This release of the connector does not expose any metrics.
3.2.7. Key field mapping of the Debezium MongoDB sink connector
When the connector processes events, it maps data to specific fields in the target MongoDB document.
- Keys from Debezium change events, such as Kafka message keys, are mapped to the MongoDB _id field by default.
- Values are mapped into MongoDB documents.
- Updates and deletes are resolved based on the key field mapping.
The following example shows an event key in a Kafka topic:
{ "userId": 1, "orderId": 1 }
{
"userId": 1,
"orderId": 1
}
Based on the mapping logic, the preceding key is mapped to the _id field in a MongoDB document, as shown in the following example:
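The exact document shape can vary with the connector version; the following sketch assumes that the fields of a composite key are nested inside the _id field.
{
  "_id": {
    "userId": 1,
    "orderId": 1
  }
}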
3.2.8. Using CloudEvents with the Debezium MongoDB sink connector
The Debezium MongoDB sink connector can consume records serialized as CloudEvents. Debezium can emit change events in CloudEvents format, so that the event payload is encapsulated in a standardized envelope.
When you enable CloudEvents on the source connector, the MongoDB sink connector parses the CloudEvents envelope and extracts the actual Debezium event payload from the data section. The event is then applied to the target MongoDB collection, following the standard insert, update, or delete semantics. This process makes it possible to integrate Debezium with broader event-driven systems while still persisting the resulting events in MongoDB.
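For orientation, the following abridged sketch shows a change event wrapped in a CloudEvents envelope; the attribute values are placeholders, and the data section is shortened to the fields that matter for the sink connector.
{
  "specversion": "1.0",
  "id": "<event-id>",
  "source": "<source-connector-identifier>",
  "type": "<debezium-data-change-event-type>",
  "datacontenttype": "application/json",
  "data": {
    "op": "u",
    "after": {
      "id": 1001,
      "name": "Sally"
    }
  }
}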
Property | Default | Description |
---|---|---|
| | Regular expression pattern to identify CloudEvents messages by matching the schema name with this pattern. |