此内容没有您所选择的语言版本。
Chapter 9. Configuring Debezium connectors for your application
When default Debezium connector behavior is not right for your application, you can use the following Debezium features to configure the behavior you need.
- Topic router SMT
- Single message transform that re-routes data change event records to topics that you specify.
- Content-based router SMT
- Single message transform that evaluates data change event record content and re-routes event records to particular topics according to content.
- Kafka Connect automatic topic creation
- Enables Connect to create topics at runtime, and apply configuration settings to those topics based on their names.
- Filter SMT
- Single message transform that uses an expression that you specify to evaluate data change event records. The connector streams only those events that evaluate to true.
- Event flattening SMT
- Single message transform that flattens the complex structure of a data change event record into the simplified format that might be required by some Kafka consumers.
- Avro serialization
- Support for configuring Debezium PostgreSQL, MongoDB, or SQL Server connectors to use Avro to serialize message keys and value, making it easier for change event record consumers to adapt to a changing record schema.
- Outbox event router SMT
- Single message transform that provides support for the outbox pattern.
- CloudEvents converter
- Enables a Debezium connector to emit change event records that conform to the CloudEvents specification.
9.1. Routing Debezium event records to topics that you specify
Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter. To do this, Debezium provides the topic routing single message transformation (SMT). Configure this transformation in the Debezium connector’s Kafka Connect configuration. Configuration options enable you to specify the following:
- An expression for identifying the records to re-route
- An expression that resolves to the destination topic
- How to ensure a unique key among the records being re-routed to the destination topic
It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.
The topic routing transformation is a Kafka Connect SMT.
The following topics provide details:
- Section 9.1.1, “Use case for routing Debezium records to topics that you specify”
- Section 9.1.2, “Example of routing Debezium records for multiple tables to one topic”
- Section 9.1.3, “Ensuring unique keys across Debezium records routed to the same topic”
- Section 9.1.4, “Options for configuring Debezium topic routing transformation”
9.1.1. Use case for routing Debezium records to topics that you specify
The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.
Logical tables
A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: db_shard1.my_table
and db_shard2.my_table
. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.
Partitioned PostgreSQL tables
When the Debezium PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the topic routing SMT. Because each key in a partitioned table is guaranteed to be unique, configure key.enforce.uniqueness=false
so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.
9.1.2. Example of routing Debezium records for multiple tables to one topic
To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:
- The tables for which to route records. These tables must all have the same schema.
- The destination topic name.
For example, configuration in a .properties
file looks like this:
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex
Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.
In the example, the regular expression,
(.*)customers_shard(.*)
matches records for changes to tables whose names include thecustomers_shard
string. This would re-route records for tables with the following names:myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
topic.replacement
-
Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the
myserver.mydb.customers_all_shards
topic.
9.1.3. Ensuring unique keys across Debezium records routed to the same topic
A Debezium change event key uses the table columns that make up the table’s primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the myserver.mydb.customers_shard1
table might have the same key value as a row in the myserver.mydb.customers_shard2
table.
To ensure that each event key is unique across the tables whose change event records go to the same topic, the topic routing transformation inserts a field into change event keys. By default, the name of the inserted field is __dbz__physicalTableIdentifier
. The value of the inserted field is the default destination topic name.
If you want to, you can configure the topic routing transformation to insert a different field into the key. To do this, specify the key.field.name
option and set it to a field name that does not clash with existing primary key field names. For example:
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards transforms.Reroute.key.field.name=shard_id
This example adds the shard_id
field to the key structure in routed records.
If you want to adjust the value of the key’s new field, configure both of these options:
key.field.regex
- Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.
key.field.replacement
- Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.
For example:
transforms.Reroute.key.field.regex=(.*)customers_shard(.*) transforms.Reroute.key.field.replacement=$2
With this configuration, suppose that the default destination topic names are:
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
The transformation uses the values in the second captured group, the shard numbers, as the value of the key’s new field. In this example, the inserted key field’s values would be 1
, 2
, or 3
.
If your tables contain globally unique keys and you do not need to change the key structure, you can set the key.enforce.uniqueness
option to false
:
... transforms.Reroute.key.enforce.uniqueness=false ...
9.1.4. Options for configuring Debezium topic routing transformation
The following table describes topic routing SMT configuration options.
Option | Default | Description |
---|---|---|
Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic. | ||
Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for | ||
|
Indicates whether to add a field to the record’s change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. | |
|
Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, | |
Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, | ||
Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for |
9.2. Routing change event records to topics according to event content
By default, Debezium streams all of the change events that it reads from a table to a single static topic. However, there might be situations in which you might want to reroute selected events to other topics, based on the event content. The process of routing messages based on their content is described in the Content-based routing messaging pattern. To apply this pattern in Debezium, you use the content-based routing single message transform (SMT) to write expressions that are evaluated for each event. Depending how an event is evaluated, the SMT either routes the event message to the original destination topic, or reroutes it to the topic that you specify in the expression.
The Debezium content-based routing SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
While it is possible to use Java to create a custom SMT to encode routing logic, using a custom-coded SMT has its drawbacks. For example:
- It is necessary to compile the transformation up front and deploy it to Kafka Connect.
- Every change needs code recompilation and redeployment, leading to inflexible operations.
The content-based routing SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).
Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add to your Debezium connector plug-in directories, along any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR 223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.
9.2.1. Setting up the Debezium content-based-routing SMT
For security reasons, the content-based routing SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.4.2.Final.tar.gz
. To use the content-based routing SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.
After the routing SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the routing SMT.
Procedure
-
From a browser, open the Red Hat Integration download site, and download the Debezium scripting SMT archive (
debezium-scripting-1.4.2.Final.tar.gz
). - Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
- Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
- Restart the Kafka Connect process to pick up the new JAR files.
9.2.2. Example: Debezium basic content-based routing configuration
To configure a Debezium connector to route change event records based on the event content, you configure the ContentBasedRouter
SMT in the Kafka Connect configuration for the connector.
Configuration of the content-based routing SMT requires you to specify a regular expression that defines the filtering criteria. In the configuration, you create a regular expression that defines routing criteria. The expression defines a pattern for evaluating event records. It also specifies the name of a destination topic where events that match the pattern are routed. The pattern that you specify might designate an event type, such as a table insert, update, or delete operation. You might also define a pattern that matches a value in a specific column or row.
For example, to reroute all update (u
) records to an updates
topic, you might add the following configuration to your connector configuration:
... transforms=route transforms.route.type=io.debezium.transforms.ContentBasedRouter transforms.route.language=jsr223.groovy transforms.route.topic.expression=value.op == 'u' ? 'updates' : null ...
The preceding example specifies the use of the Groovy
expression language.
Records that do not match the pattern are routed to the default topic.
9.2.3. Variables for use in Debezium content-based routing expressions
Debezium binds certain variables into the evaluation context for the SMT. When you create expressions to specify conditions to control the routing destination, the SMT can look up and interpret the values of these variables to evaluate conditions in an expression.
The following table lists the variables that Debezium binds into the evaluation context for the content-based routing SMT:
Name | Description | Type |
---|---|---|
| A key of the message. |
|
| A value of the message. |
|
| Schema of the message key. |
|
| Schema of the message value. |
|
| Name of the target topic. | String |
|
A Java map of message headers. The key field is the header name. The
|
|
An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT dispositions the message. When the routing condition in an expression evaluates to true
, the message is retained. When the routing condition evaluates to false
, the message is removed.
Expressions should not result in any side-effects. That is, they should not modify any variables that they pass.
9.2.4. Configuration of content-based routing conditions for other scripting languages
The way that you express content-based routing conditions depends on the scripting language that you use. For example, as shown in the basic configuration example, when you use Groovy
as the expression language, the following expression reroutes all update (u
) records to the updates
topic, while routing other records to the default topic:
value.op == 'u' ? 'updates' : null
Other languages use different methods to express the same condition.
The Debezium MongoDB connector emits the after
and patch
fields as serialized JSON documents rather than as structures. To use the ContentBasedRouting SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState
SMT.
You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json
artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'
.
Javascript
When you use JavaScript as the expression language, you can call the Struct#get()
method to specify the content-based routing condition, as in the following example:
value.get('op') == 'u' ? 'updates' : null
Javascript with Graal.js
When you create content-based routing conditions by using JavaScript with Graal.js, you use an approach that is similar to the one use with Groovy. For example:
value.op == 'u' ? 'updates' : null
9.2.5. Options for configuring the content-based routing transformation
Property | Default | Description |
An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply the condition logic. If the name of the destination topic matches the value in | ||
The language in which the expression is written. Must begin with | ||
The expression to be evaluated for every message. Must evaluate to a | ||
|
Specifies how the transformation handles
|
9.3. Customization of Kafka Connect automatic topic creation
Kafka provides two mechanisms for creating topics automatically. You can enable automatic topic creation for the Kafka broker, and, beginning with Kafka 2.6.0, you can also enable Kafka Connect to create topics. The Kafka broker uses the auto.create.topics.enable
property to control automatic topic creation. In Kafka Connect, the topic.creation.enable
property specifies whether Kafka Connect is permitted to create topics. In both cases, the default settings for the properties enables automatic topic creation.
When automatic topic creation is enabled, if a Debezium source connector emits a change event record for a table for which no target topic already exists, the topic is created at runtime as the event record is ingested into Kafka.
Differences between automatic topic creation at the broker and in Kafka Connect
Topics that the broker creates are limited to sharing a single default configuration. The broker cannot apply unique configurations to different topics or sets of topics. By contrast, Kafka Connect can apply any of several configurations when creating topics, setting the replication factor, number of partitions, and other topic-specific settings as specified in the Debezium connector configuration. The connector configuration defines a set of topic creation groups, and associates a set of topic configuration properties with each group.
The broker configuration and the Kafka Connect configuration are independent of each other. Kafka Connect can create topics regardless of whether you disable topic creation at the broker. If you enable automatic topic creation at both the broker and in Kafka Connect, the Connect configuration takes precedence, and the broker creates topics only if none of the settings in the Kafka Connect configuration apply.
See the following topics for more information:
- Section 9.3.1, “Disabling automatic topic creation for the Kafka broker”
- Section 9.3.2, “Configuring automatic topic creation in Kafka Connect”
- Section 9.3.3, “Configuration of automatically created topics”
- Section 9.3.3.1, “Topic creation groups”
- Section 9.3.3.2, “Topic creation group configuration properties”
- Section 9.3.3.3, “Specifying the configuration for the Debezium default topic creation group”
- Section 9.3.3.4, “Specifying the configuration for Debezium custom topic creation groups”
- Section 9.3.3.5, “Registering Debezium custom topic creation groups”
9.3.1. Disabling automatic topic creation for the Kafka broker
By default, the Kafka broker configuration enables the broker to create topics at runtime if the topics do not already exist. Topics created by the broker cannot be configured with custom properties. If you use a Kafka version earlier than 2.6.0, and you want to create topics with specific configurations, you must to disable automatic topic creation at the broker, and then explicitly create the topics, either manually, or through a custom deployment process.
Procedure
-
In the broker configuration, set the value of
auto.create.topics.enable
tofalse
.
9.3.2. Configuring automatic topic creation in Kafka Connect
Automatic topic creation in Kafka Connect is controlled by the topic.creation.enable
property. The default value for the property is true
, enabling automatic topic creation, as shown in the following example:
topic.creation.enable = true
The setting for the topic.creation.enable
property applies to all workers in the Connect cluster.
Kafka Connect automatic topic creation requires you to define the configuration properties that Kafka Connect applies when creating topics. You specify topic configuration properties in the Debezium connector configuration by defining topic groups, and then specifying the properties to apply to each group. The connector configuration defines a default topic creation group, and, optionally, one or more custom topic creation groups. Custom topic creation groups use lists of topic name patterns to specify the topics to which the group’s settings apply.
For details about how Kafka Connect matches topics to topic creation groups, see Topic creation groups. For more information about how configuration properties are assigned to groups, see Topic creation group configuration properties.
By default, topics that Kafka Connect creates are named based on the pattern server.schema.table
, for example, dbserver.myschema.inventory
.
Procedure
-
To prevent Kafka Connect from creating topics automatically, set the value of
topic.creation.enable
tofalse
in the Kafka Connect custom resource, as in the following example:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster ... spec: config: topic.creation.enable: "false"
Kafka Connect automatic topic creation requires the replication.factor
and partitions
properties to be set for at least the default
topic creation group. It is valid for groups to obtain the values for the required properties from the default values for the Kafka broker.
9.3.3. Configuration of automatically created topics
For Kafka Connect to create topics automatically, it requires information from the source connector about the configuration properties to apply when creating topics. You define the properties that control topic creation in the configuration for each Debezium connector. As Kafka Connect creates topics for event records that a connector emits, the resulting topics obtain their configuration from the applicable group. The configuration applies to event records emitted by that connector only.
9.3.3.1. Topic creation groups
A set of topic properties is associated with a topic creation group. Minimally, you must define a default
topic creation group and specify its configuration properties. Beyond that you can optionally define one or more custom topic creation groups and specify unique properties for each.
When you create custom topic creation groups, you define the member topics for each group based on topic name patterns. You can specify naming patterns that describe the topics to include or exclude from each group. The include
and exclude
properties contain comma-separated lists of regular expressions that define topic name patterns. For example, if you want a group to include all topics that start with the string dbserver1.inventory
, set the value of its topic.creation.inventory.include
property to dbserver1\\.inventory\\.*
.
If you specify both include
and exclude
properties for a custom topic group, the exclusion rules take precedence, and override the inclusion rules.
9.3.3.2. Topic creation group configuration properties
The default
topic creation group and each custom group is associated with a unique set of configuration properties. You can configure a group to include any of the Kafka topic-level configuration properties. For example, you can specify the cleanup policy for old topic segments, retention time, or the topic compression type for a topic group. You must define at least a minimum set of properties to describe the configuration of the topics to be created.
If no custom groups are registered, or if the include
patterns for any registered groups don’t match the names of any topics to be created, then Kafka Connect uses the configuration of the default
group to create topics.
For general information about configuring topics, see Kafka topic creation recommendations in Installing Debezium on OpenShift.
9.3.3.3. Specifying the configuration for the Debezium default topic creation group
Before you can use Kafka Connect automatic topic creation, you must create a default topic creation group and define a configuration for it. The configuration for the default topic creation group is applied to any topics with names that do not match the include
list pattern of a custom topic creation group.
Prerequisites
In the Kafka Connect custom resource, the
use-connector-resources
value inmetadata.annotations
specifies that the cluster Operator uses KafkaConnector custom resources to configure connectors in the cluster. For example:... metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" ...
Procedure
To define properties for the
topic.creation.default
group, add them tospec.config
in the connector custom resource, as shown in the following example:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: ... config: ... topic.creation.default.replication.factor: 3 1 topic.creation.default.partitions: 10 2 topic.creation.default.cleanup.policy: compact 3 topic.creation.default.compression.type: lz4 4 ...
You can include any Kafka topic-level configuration property in the configuration for the
default
group.
Item | Description |
---|---|
1 |
|
2 |
|
3 |
|
4 |
|
Custom groups fall back to the default
group settings only for the required replication.factor
and partitions
properties. If the configuration for a custom topic group leaves other properties undefined, the values specified in the default
group are not applied.
9.3.3.4. Specifying the configuration for Debezium custom topic creation groups
You can define multiple custom topic groups, each with its own configuration.
Procedure
To define a custom topic group, add a
topic.creation.<group_name>.include
property tospec.config
in the connector custom resource, followed by the configuration properties that you want to apply to topics in the custom group.The following example shows an excerpt of a custom resource that defines the custom topic creation groups
inventory
andapplicationlogs
:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... 1 topic.creation.inventory.include: dbserver1\\.inventory\\.* 2 topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 3 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4 topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* 5 topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ... ...
Item | Description |
---|---|
1 |
Defines the configuration for the |
2 |
|
3 |
Defines the configuration for the |
4 |
|
5 |
|
9.3.3.5. Registering Debezium custom topic creation groups
After you specify the configuration for any custom topic creation groups, register the groups.
Procedure
Register custom groups by adding the
topic.creation.groups
property to the connector custom resource, and specifying a comma-separated list of custom topic creation groups.The following excerpt from a connector custom resource registers the custom topic creation groups
inventory
andapplicationlogs
:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: topic.creation.groups: inventory,applicationlogs ...
Completed configuration
The following example shows a completed configuration that includes the configuration for a default
topic group, along with the configurations for an inventory
and an applicationlogs
custom topic creation group:
Example: Configuration for a default topic creation group and two custom groups
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... topic.creation.default.replication.factor: 3, topic.creation.default.partitions: 10, topic.creation.default.cleanup.policy: compact topic.creation.default.compression.type: lz4 topic.creation.groups: inventory,applicationlogs topic.creation.inventory.include: dbserver1\\.inventory\\.* topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ...
9.4. Filtering Debezium change event records
By default, Debezium delivers every data change event that it receives to the Kafka broker. However, in many cases, you might be interested in only a subset of the events emitted by the producer. To enable you to process only the records that are relevant to you, Debezium provides the filter single message transform (SMT).
The Debezium filter SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
While it is possible to use Java to create a custom SMT to encode filtering logic, using a custom-coded SMT has its drawbacks. For example:
- It is necessary to compile the transformation up front and deploy it to Kafka Connect.
- Every change needs code recompilation and redeployment, leading to inflexible operations.
The filter SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).
Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add to your Debezium connector plug-in directories, along any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.
9.4.1. Setting up the Debezium filter SMT
For security reasons, the filter SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.4.2.Final.tar.gz
. To use the filter SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.
After the filter SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the filter SMT.
Procedure
-
From a browser, open the Red Hat Integration download site, and download the Debezium scripting SMT archive (
debezium-scripting-1.4.2.Final.tar.gz
). - Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
- Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
- Restart the Kafka Connect process to pick up the new JAR files.
9.4.2. Example: Debezium basic filter SMT configuration
You configure the filter transformation in the Debezium connector’s Kafka Connect configuration. In the configuration, you specify the events that you are interested in by defining filter conditions that are based on business rules. As the filter SMT processes the event stream, it evaluates each event against the configured filter conditions. Only events that meet the criteria of the filter conditions are passed to the broker.
To configure a Debezium connector to filter change event records, configure the Filter
SMT in the Kafka Connect configuration for the Debezium connector. Configuration of the filter SMT requires you to specify a regular expression that defines the filtering criteria.
For example, you might add the following configuration in your connector configuration.
... transforms=filter transforms.filter.type=io.debezium.transforms.Filter transforms.filter.language=jsr223.groovy transforms.filter.condition=value.op == 'u' && value.before.id == 2 ...
The preceding example specifies the use of the Groovy
expression language. The regular expression value.op == 'u' && value.before.id == 2
removes all messages, except those that represent update (u
) records with id
values that are equal to 2
.
9.4.3. Variables for use in filter expressions
Debezium binds certain variables into the evaluation context for the filter SMT. When you create expressions to specify filter conditions, you can use the variables that Debezium binds into the evaluation context. By binding variables, Debezium enables the SMT to look up and interpret their values as it evaluates the conditions in an expression.
The following table lists the variables that Debezium binds into the evaluation context for the filter SMT:
Name | Description | Type |
---|---|---|
| A key of the message. |
|
| A value of the message. |
|
| Schema of the message key. |
|
| Schema of the message value. |
|
| Name of the target topic. | String |
|
A Java map of message headers. The key field is the header name. The
|
|
An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT dispositions the message. When the filter condition in an expression evaluates to true
, the message is retained. When the filter condition evaluates to false
, the message is removed.
Expressions should not result in any side-effects. That is, they should not modify any variables that they pass.
9.4.4. Filter condition configuration for other scripting languages
The way that you express filtering conditions depends on the scripting language that you use.
For example, as shown in the basic configuration example, when you use Groovy
as the expression language, the following expression removes all messages, except for update records that have id
values set to 2
:
value.op == 'u' && value.before.id == 2
Other languages use different methods to express the same condition.
The Debezium MongoDB connector emits the after
and patch
fields as serialized JSON documents rather than as structures. To use the filter SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState
SMT.
You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json
artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'
.
Javascript
If you use JavaScript as the expression language, you can call the Struct#get()
method to specify the filtering condition, as in the following example:
value.get('op') == 'u' && value.get('before').get('id') == 2
Javascript with Graal.js
If you use JavaScript with Graal.js to define filtering conditions, you use an approach that is similar to the one that you use with Groovy. For example:
value.op == 'u' && value.before.id == 2
9.4.5. Options for configuring filter transformation
The following table lists the configuration options that you can use with the filter SMT.
Property | Default | Description |
An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply filtering logic. If the name of the destination topic matches the value in | ||
The language in which the expression is written. Must begin with | ||
The expression to be evaluated for every message. Must evaluate to a Boolean value where a result of | ||
|
Specifies how the transformation handles
|
9.5. Extracting source record after
state from Debezium change events
A Debezium data change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information. However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values. To provide this kind of record, Debezium provides the event flattening single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.
The event flattening transformation is a Kafka Connect SMT.
This transformation is available to only SQL database connectors.
The following topics provide details:
- Section 9.5.1, “Description of Debezium change event structure”
- Section 9.5.2, “Behavior of Debezium event flattening transformation”
- Section 9.5.3, “Configuration of Debezium event flattening transformation”
- Section 9.5.4, “Example of adding Debezium metadata to the Kafka record”
- Section 9.5.5, “Options for configuring Debezium event flattening transformation”
9.5.1. Description of Debezium change event structure
Debezium generates data change events that have a complex structure. Each event consists of three parts:
Metadata, which includes but is not limited to:
- The operation that made the change
- Source information such as the names of the database and table where the change was made
- Time stamp for when the change was made
- Optional transaction information
- Row data before the change
- Row data after the change
For example, part of the structure of an UPDATE
change event looks like this:
{ "op": "u", "source": { ... }, "ts_ms" : "...", "before" : { "field1" : "oldvalue1", "field2" : "oldvalue2" }, "after" : { "field1" : "newvalue1", "field2" : "newvalue2" } }
This complex format provides the most information about changes happening in the system. However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:
{ "field1" : "newvalue1", "field2" : "newvalue2" }
To provide the needed Kafka record format for consumers, configure the event flattening SMT.
9.5.2. Behavior of Debezium event flattening transformation
The event flattening SMT extracts the after
field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its after
field to create a simple Kafka record.
You can configure the event flattening SMT for a Debezium connector or for a sink connector that consumes messages emitted by a Debezium connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do any of the following:
- Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata.
-
Keep Kafka records that contain change events for
DELETE
operations in the stream. The default behavior is that the SMT drops Kafka records forDELETE
operation change events because most consumers cannot yet handle them.
A database DELETE
operation causes Debezium to generate two Kafka records:
-
A record that contains
"op": "d",
thebefore
row data, and some other fields. -
A tombstone record that has the same key as the deleted row and a value of
null
. This record is a marker for Apache Kafka. It indicates that log compaction can remove all records that have this key.
Instead of dropping the record that contains the before
row data, you can configure the event flattening SMT to do one of the following:
-
Keep the record in the stream and edit it to have only the
"value": "null"
field. -
Keep the record in the stream and edit it to have a
value
field that contains the key/value pairs that were in thebefore
field with an added"__deleted": "true"
entry.
Similary, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.
9.5.3. Configuration of Debezium event flattening transformation
Configure the Debezium event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, in a .properties
file, you would specify something like the following:
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
As for any Kafka Connect connector configuration, you can set transforms=
to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
The following .properties
example sets several event flattening SMT options:
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.drop.tombstones=false transforms.unwrap.delete.handling.mode=rewrite transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
-
Keeps tombstone records for
DELETE
operations in the event stream. delete.handling.mode=rewrite
For
DELETE
operations, edits the Kafka record by flattening thevalue
field that was in the change event. Thevalue
field directly contains the key/value pairs that were in thebefore
field. The SMT adds__deleted
and sets it totrue
, for example:"value": { "pk": 2, "cola": null, "__deleted": "true" }
add.fields=table,lsn
-
Adds change event metadata for the
table
andlsn
fields to the simplified Kafka record.
9.5.4. Example of adding Debezium metadata to the Kafka record
The event flattening SMT can add original, change event metadata to the simplified Kafka record. For example, you might want the simplified record’s header or value to contain any of the following:
- The type of operation that made the change
- The name of the database or table that was changed
- Connector-specific fields such as the Postgres LSN field
To add metadata to the simplified Kafka record’s header, specify the add.header
option. To add metadata to the simplified Kafka record’s value, specify the add.fields
option. Each of these options takes a comma separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.add.fields=op,table,lsn,source.ts_ms transforms.unwrap.add.headers=db transforms.unwrap.delete.handling.mode=rewrite
With that configuration, a simplified Kafka record would contain something like the following:
{ ... "__op" : "c", "__table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ... }
Also, simplified Kafka records would have a __db
header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadata to a simplified Kafka record that is for a DELETE
operation, you must also configure delete.handling.mode=rewrite
.
9.5.5. Options for configuring Debezium event flattening transformation
The following table describes the options that you can specify to configure the event flattening SMT.
Option | Default | Description |
---|---|---|
|
Debezium generates a tombstone record for each | |
|
Debezium generates a change event record for each | |
To use row data to determine the topic to route the record to, set this option to an | ||
__ (double-underscore) | Set this optional string to prefix a field. | |
Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record’s value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example | ||
__ (double-underscore) | Set this optional string to prefix a header. | |
Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example |
9.6. Configuring Debezium connectors to use Avro serialization
A Debezium connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record. For each change event record, the Debezium connector completes the following actions:
- Applies configured transformations.
- Serializes the record key and value into a binary form by using the configured Kafka Connect converters.
- Writes the record to the correct Kafka topic.
You can specify converters for each individual Debezium connector instance. Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents. The default behavior is that the JSON converter includes the record’s message schema, which makes each record very verbose. The Getting Started with Debezium guide shows what the records look like when both payload and schemas are included. If you want records to be serialized with JSON, consider setting the following connector configuration properties to false
:
-
key.converter.schemas.enable
-
value.converter.schemas.enable
Setting these properties to false
excludes the verbose schema information from each record.
Alternatively, you can serialize the record keys and values by using Apache Avro. The Avro binary format is compact and efficient. Avro schemas make it possible to ensure that each record has the correct structure. Avro’s schema evolution mechanism enables schemas to evolve. This is essential for Debezium connectors, which dynamically generate each record’s schema to match the structure of the database table that was changed. Over time, change event records written to the same Kafka topic might have different versions of the same schema. Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. For information about setting up this registry, see the documentation for Red Hat Integration - Service Registry.
9.6.1. About the Service Registry
Red Hat Integration - Service Registry provides the following components that work with Avro:
- An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.
An API and schema registry that tracks:
- Avro schemas that are used in Kafka topics.
- Where the Avro converter sends the generated Avro schemas.
Because the Avro schemas are stored in this registry, each record needs to contain only a tiny schema identifier. This makes each record even smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.
- Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.
To use the Service Registry with Debezium, add Service Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.
The Service Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.
9.6.2. Overview of deploying a Debezium connector that uses Avro serialization
To deploy a Debezium connector that uses Avro serialization, you must complete three main tasks:
- Deploy a Red Hat Integration - Service Registry instance by following the instructions in Getting Started with Service Registry.
- Install the Avro converter by downloading the Debezium Service Registry Kafka Connect zip file and extracting it into the Debezium connector’s directory.
Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:
key.converter=io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url=http://apicurio:8080/api key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy value.converter=io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url=http://apicurio:8080/api value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
9.6.3. Deploying connectors that use Avro in Debezium containers
In your environment, you might want to use a provided Debezium container to deploy Debezium connectors that use Avro serialization. Complete the following procedure to build a custom Kafka Connect container image for Debezium, and configure the Debezium connector to use the Avro converter.
Prerequisites
- You have Docker installed and sufficient rights to create and manage containers.
- You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.
Procedure
Deploy an instance of Service Registry. See Getting Started with Service Registry, Installing Service Registry from the OpenShift OperatorHub, which provides instructions for:
- Installing AMQ Streams
- Setting up AMQ Streams storage
- Installing Service Registry
Extract the Debezium connector archives to create a directory structure for the connector plug-ins. If you downloaded and extracted the archives for multiple Debezium connectors, the resulting directory structure looks like the one in the following example:
tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb | ├── ... ├── debezium-connector-mysql │ ├── ... ├── debezium-connector-postgres │ ├── ... └── debezium-connector-sqlserver ├── ...
Add the Avro converter to the directory that contains the Debezium connector that you want to configure to use Avro serialization:
- Go to the Red Hat Integration download site and download the Service Registry Kafka Connect zip file.
- Extract the archive into the desired Debezium connector directory.
To configure more than one type of Debezium connector to use Avro serialization, extract the archive into the directory for each relevant connector type. Although extracting the archive to each directory duplicates the files, by doing so you remove the possibility of conflicting dependencies.
Create and publish a custom image for running Debezium connectors that are configured to use the Avro converter:
Create a new
Dockerfile
by usingregistry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0
as the base image. In the following example, replace my-plugins with the name of your plug-ins directory:FROM registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the
/opt/kafka/plugins
directory.Build the docker container image. For example, if you saved the docker file that you created in the previous step as
debezium-container-with-avro
, then you would run the following command:docker build -t debezium-container-with-avro:latest
Push your custom image to your container registry, for example:
docker push <myregistry.io>/debezium-container-with-avro:latest
Point to the new container image. Do one of the following:
Edit the
KafkaConnect.spec.image
property of theKafkaConnect
custom resource. If set, this property overrides theSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
variable in the Cluster Operator. For example:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... image: debezium-container-with-avro
-
In the
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
file, edit theSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
variable to point to the new container image and reinstall the Cluster Operator. If you edit this file you will need to apply it to your OpenShift cluster.
Deploy each Debezium connector that is configured to use the Avro converter. For each Debezium connector:
Create a Debezium connector instance. The following
inventory-connector.yaml
file example creates aKafkaConnector
custom resource that defines a MySQL connector instance that is configured to use the Avro converter:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: database.hostname: mysql database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 database.server.name: dbserver1 database.include.list: inventory database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 database.history.kafka.topic: schema-changes.inventory key.converter: io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url: http://apicurio:8080/api key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy value.converter: io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url: http://apicurio:8080/api value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
Apply the connector instance, for example:
oc apply -f inventory-connector.yaml
This registers
inventory-connector
and the connector starts to run against theinventory
database.
Verify that the connector was created and has started to track changes in the specified database. You can verify the connector instance by watching the Kafka Connect log output as, for example,
inventory-connector
starts.Display the Kafka Connect log output:
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
Review the log output to verify that the initial snapshot has been executed. You should see something like the following lines:
... 2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
Taking the snapshot involves a number of steps:
... 2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ... 2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
After completing the snapshot, Debezium begins tracking changes in, for example, the
inventory
database’sbinlog
for change events:... 2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306] Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5) 2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306] ...
9.6.4. About Avro name requirements
As stated in the Avro documentation, names must adhere to the following rules:
-
Start with
[A-Za-z_]
-
Subsequently contains only
[A-Za-z0-9_]
characters
Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, sanitize.field.names
that you can set to true
if you have columns that do not adhere to Avro rules for names. Setting sanitize.field.names
to true
allows serialization of non-conformant fields without having to actually modify your schema.
9.7. Configuring Debezium connectors to use the outbox pattern
The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.
To implement the outbox pattern in a Debezium application, configure a Debezium connector to:
- Capture changes in an outbox table
- Apply the Debezium outbox event router single message transformation (SMT)
A Debezium connector that is configured to apply the outbox SMT should capture changes in only an outbox table. A connector can capture changes in more than one outbox table only if each outbox table has the same structure.
The Debezium outbox event router SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
See Reliable Microservices Data Exchange With the Outbox Pattern to learn about why the outbox pattern is useful and how it works.
The outbox event router SMT does not support the MongoDB connector.
The following topics provide details:
- Section 9.7.1, “Example of a Debezium outbox message”
- Section 9.7.2, “Outbox table structure expected by Debezium outbox event router SMT”
- Section 9.7.3, “Basic Debezium outbox event router SMT configuration”
- Section 9.7.4, “Using Avro as the payload format in Debezium outbox messages”
- Section 9.7.5, “Emitting additional fields in Debezium outbox messages”
- Section 9.7.6, “Options for configuring outbox event router transformation”
9.7.1. Example of a Debezium outbox message
To learn about how to configure the Debezium outbox event router SMT, consider the following example of a Debezium outbox message:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
A Debezium connector that is configured to apply the outbox event router SMT generates the above message by transforming a Debezium raw message like this:
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f" # Kafka Message Headers: "" # Kafka Message Timestamp: 1556890294484 { "before": null, "after": { "id": "406c07f3-26f0-4eea-a50c-109940064b8f", "aggregateid": "1", "aggregatetype": "Order", "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}", "timestamp": 1556890294344, "type": "OrderCreated" }, "source": { "version": "0.9.3.Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null }, "op": "c", "ts_ms": 1556890294484 }
This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox table structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.
9.7.2. Outbox table structure expected by Debezium outbox event router SMT
To apply the default outbox event router SMT configuration, your outbox table is assumed to have the following columns:
Column | Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null type | character varying(255) | not null payload | jsonb |
Column | Effect |
---|---|
|
Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages. |
|
Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default |
|
Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions. |
| A user-defined value that helps categorize or organize events. |
| The representation of the event itself. The default structure is JSON. The content in this field becomes one of these:
To obtain the event payload from a different outbox table column, set the |
9.7.3. Basic Debezium outbox event router SMT configuration
To configure a Debezium connector to support the outbox pattern, configure the outbox.EventRouter
SMT. For example, the basic configuration in a .properties
file looks like this:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
9.7.4. Using Avro as the payload format in Debezium outbox messages
The outbox event router SMT supports arbitrary payload formats. The payload
column value in an outbox table is passed on transparently. An alternative to working with JSON is to use Avro. This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.
How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer
class to serialize GenericRecord
instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.ByteBufferConverter
By default, the payload
column value (the Avro data) is the only message value. Configuration of ByteBufferConverter
as the value converter propagates the payload
column value as-is into the Kafka message value.
The Debezium connectors may be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). These events cannot be serialized by the ByteBufferConverter
so additional configuration must be provided so the converter knows how to serialize these events. As an example, the following configuration illustrates using the Apache Kafka JsonConverter
with no schemas:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.ByteBufferConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable=false
The delegate Converter
implementation is specified by the delegate.converter.type
option. If any extra configuration options are needed by the converter, they can also be specified, such as the disablement of schemas shown above using schemas.enable=false
.
9.7.5. Emitting additional fields in Debezium outbox messages
Your outbox table might contain columns whose values you want to add to the emitted outbox messages. For example, consider an outbox table that has a value of purchase-order
in the aggregatetype
column and another column, eventType
, whose possible values are order-created
and order-shipped
. To emit the eventType
column value in the outbox message header, configure the SMT like this:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=type:header:eventType
To emit the eventType
column value in the outbox message envelope, configure the SMT like this:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=type:envelope:eventType
9.7.6. Options for configuring outbox event router transformation
The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.
Option | Default | Group | Description |
---|---|---|---|
| Table | Specifies the outbox table column that contains the unique event ID. | |
| Table | Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions. | |
Table | By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox table column that contains the timestamp that you want to be in emitted outbox messages. | ||
| Table | Specifies the outbox table column that contains the event payload. | |
| Table | Specifies the outbox table column that contains the payload ID. | |
Table, Envelope | Specifies one or more outbox table columns that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:
To specify an alias for the column, specify a trio with the alias as the third value, for example:
The second value is the placement and it must always be Configuration examples are in emitting additional fields in Debezium outbox messages. | ||
Table, Schema | When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc. | ||
| Router | Specifies the name of a column in the outbox table. The default behavior is that the value in this column becomes a part of the name of the topic to which the connector emits the outbox messages. An example is in the description of the expected outbox table. | |
| Router |
Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox table records. This regular expression is part of the setting of the | |
| Router |
Specifies the name of the topic to which the connector emits outbox messages. The default topic name is
| |
| Router |
Indicates whether an empty or | |
| Debezium |
Determines the behavior of the SMT when there is an
All changes in an outbox table are expected to be | |
| Tracing | The name of the field containing tracing span context. | |
| Tracing | The operation name representing the Debezium processing span. | |
| Tracing |
When |
Distributed tracing
The extension has support for the distributed tracing. See tracing documentation for more details.
9.8. Emitting Debezium change event records in CloudEvents format
CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a MongoDB, MySQL, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.
Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
The CloudEvents specification defines:
- A set of standardized event attributes
- Rules for defining custom attributes
- Encoding rules for mapping event formats to serialized representations such as JSON or Avro
- Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP
To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter
, which is a Kafka Connect message converter.
Currently, only structured mapping mode is supported. The CloudEvents change event envelope can be JSON or Avro and each envelope type supports JSON or Avro as the data
format. It is expected that a future Debezium release will support binary mapping mode.
Information about emitting change events in CloudEvents format is organized as follows:
For information about using Avro, see:
9.8.1. Example Debezium change event records in CloudEvents format
The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data
format.
{ "id" : "name:test_server;lsn:29274832;txId:565", 1 "source" : "/debezium/postgresql/test_server", 2 "specversion" : "1.0", 3 "type" : "io.debezium.postgresql.datachangeevent", 4 "time" : "2020-01-13T13:55:39.738Z", 5 "datacontenttype" : "application/json", 6 "iodebeziumop" : "r", 7 "iodebeziumversion" : "1.4.2.Final", 8 "iodebeziumconnector" : "postgresql", "iodebeziumname" : "test_server", "iodebeziumtsms" : "1578923739738", "iodebeziumsnapshot" : "true", "iodebeziumdb" : "postgres", "iodebeziumschema" : "s1", "iodebeziumtable" : "a", "iodebeziumtxId" : "565", "iodebeziumlsn" : "29274832", "iodebeziumxmin" : null, "iodebeziumtxid": "565", 9 "iodebeziumtxtotalorder": "1", "iodebeziumtxdatacollectionorder": "1", "data" : { 10 "before" : null, "after" : { "pk" : 1, "name" : "Bob" } } }
- 1 1 1
- Unique ID that the connector generates for the change event based on the change event’s content.
- 2 2 2
- The source of the event, which is the logical name of the database as specified by the
database.server.name
property in the connector’s configuration. - 3 3 3
- The CloudEvents specification version.
- 4 4 4
- Connector type that generated the change event. The format of this field is
io.debezium.CONNECTOR_TYPE.datachangeevent
. The value ofCONNECTOR_TYPE
ismongodb
,mysql
,postgresql
, orsqlserver
. - 5 5
- Time of the change in the source database.
- 6
- Describes the content type of the
data
attribute, which is JSON in this example. The only alternative is Avro. - 7
- An operation identifier. Possible values are
r
for read,c
for create,u
for update, ord
for delete. - 8
- All
source
attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using theiodebezium
prefix for the attribute name. - 9
- When enabled in the connector, each
transaction
attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using theiodebeziumtx
prefix for the attribute name. - 10
- The actual data change itself. Depending on the operation and the connector, the data might contain
before
,after
and/orpatch
fields.
The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data
format.
{ "id" : "name:test_server;lsn:33227720;txId:578", "source" : "/debezium/postgresql/test_server", "specversion" : "1.0", "type" : "io.debezium.postgresql.datachangeevent", "time" : "2020-01-13T14:04:18.597Z", "datacontenttype" : "application/avro", 1 "dataschema" : "http://my-registry/schemas/ids/1", 2 "iodebeziumop" : "r", "iodebeziumversion" : "1.4.2.Final", "iodebeziumconnector" : "postgresql", "iodebeziumname" : "test_server", "iodebeziumtsms" : "1578924258597", "iodebeziumsnapshot" : "true", "iodebeziumdb" : "postgres", "iodebeziumschema" : "s1", "iodebeziumtable" : "a", "iodebeziumtxId" : "578", "iodebeziumlsn" : "33227720", "iodebeziumxmin" : null, "iodebeziumtxid": "578", "iodebeziumtxtotalorder": "1", "iodebeziumtxdatacollectionorder": "1", "data" : "AAAAAAEAAgICAg==" 3 }
It is also possible to use Avro for the envelope as well as the data
attribute.
9.8.2. Example of configuring Debezium CloudEvents converter
Configure io.debezium.converters.CloudEventsConverter
in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:
- Use JSON as the envelope.
-
Use the schema registry at
http://my-registry/schemas/ids/1
to serialize thedata
attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json", 1
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
- 1
- Specifying the
serializer.type
is optional, becausejson
is the default.
The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter
if you want to operate on record keys. For example, you might specify StringConverter
, LongConverter
, JsonConverter
, or AvroConverter
.
9.8.3. Debezium CloudEvents converter configuration options
When you configure a Debezium connector to use the CloudEvent converter you can specify the following options.
Option | Default | Description |
|
The encoding type to use for the CloudEvents envelope structure. The value can be | |
|
The encoding type to use for the | |
N/A |
Any configuration options to be passed through to the underlying converter when using JSON. The | |
N/A |
Any configuration options to be passed through to the underlying converter when using Avro. The |
Revised on 2021-05-13 02:59:10 UTC