此内容没有您所选择的语言版本。

Chapter 9. Configuring Debezium connectors for your application


When default Debezium connector behavior is not right for your application, you can use the following Debezium features to configure the behavior you need.

Topic router SMT
Single message transform that re-routes data change event records to topics that you specify.
Content-based router SMT
Single message transform that evaluates data change event record content and re-routes event records to particular topics according to content.
Kafka Connect automatic topic creation
Enables Connect to create topics at runtime, and apply configuration settings to those topics based on their names.
Filter SMT
Single message transform that uses an expression that you specify to evaluate data change event records. The connector streams only those events that evaluate to true.
Event flattening SMT
Single message transform that flattens the complex structure of a data change event record into the simplified format that might be required by some Kafka consumers.
Avro serialization
Support for configuring Debezium PostgreSQL, MongoDB, or SQL Server connectors to use Avro to serialize message keys and value, making it easier for change event record consumers to adapt to a changing record schema.
Outbox event router SMT
Single message transform that provides support for the outbox pattern.
CloudEvents converter
Enables a Debezium connector to emit change event records that conform to the CloudEvents specification.

9.1. Routing Debezium event records to topics that you specify

Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter. To do this, Debezium provides the topic routing single message transformation (SMT). Configure this transformation in the Debezium connector’s Kafka Connect configuration. Configuration options enable you to specify the following:

  • An expression for identifying the records to re-route
  • An expression that resolves to the destination topic
  • How to ensure a unique key among the records being re-routed to the destination topic

It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.

The topic routing transformation is a Kafka Connect SMT.

The following topics provide details:

9.1.1. Use case for routing Debezium records to topics that you specify

The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.

Logical tables

A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: db_shard1.my_table and db_shard2.my_table. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.

Partitioned PostgreSQL tables

When the Debezium PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the topic routing SMT. Because each key in a partitioned table is guaranteed to be unique, configure key.enforce.uniqueness=false so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.

9.1.2. Example of routing Debezium records for multiple tables to one topic

To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:

  • The tables for which to route records. These tables must all have the same schema.
  • The destination topic name.

For example, configuration in a .properties file looks like this:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex

Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.

In the example, the regular expression, (.*)customers_shard(.*) matches records for changes to tables whose names include the customers_shard string. This would re-route records for tables with the following names:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the myserver.mydb.customers_all_shards topic.

9.1.3. Ensuring unique keys across Debezium records routed to the same topic

A Debezium change event key uses the table columns that make up the table’s primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the myserver.mydb.customers_shard1 table might have the same key value as a row in the myserver.mydb.customers_shard2 table.

To ensure that each event key is unique across the tables whose change event records go to the same topic, the topic routing transformation inserts a field into change event keys. By default, the name of the inserted field is __dbz__physicalTableIdentifier. The value of the inserted field is the default destination topic name.

If you want to, you can configure the topic routing transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names. For example:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id

This example adds the shard_id field to the key structure in routed records.

If you want to adjust the value of the key’s new field, configure both of these options:

key.field.regex
Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.
key.field.replacement
Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.

For example:

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

With this configuration, suppose that the default destination topic names are:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

The transformation uses the values in the second captured group, the shard numbers, as the value of the key’s new field. In this example, the inserted key field’s values would be 1, 2, or 3.

If your tables contain globally unique keys and you do not need to change the key structure, you can set the key.enforce.uniqueness option to false:

...
transforms.Reroute.key.enforce.uniqueness=false
...

9.1.4. Options for configuring Debezium topic routing transformation

The following table describes topic routing SMT configuration options.

Table 9.1. Topic routing SMT configuration options
OptionDefaultDescription

topic.regex

 

Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.

topic.replacement

 

Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for topic.regex. To refer to a group, specify $1, $2, and so on.

key.enforce.uniqueness

true

Indicates whether to add a field to the record’s change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables.

Specify false if you do not want the transformation to add a key field. For example, if you are routing records from a partitioned PostgreSQL table to one topic, you can configure key.enforce.uniqueness=false because unique keys are guaranteed in partitioned PostgreSQL tables.

key.field.name

__dbz__physicalTableIdentifier

Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default.

key.field.regex

 

Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default.

key.field.replacement

 

Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for key.field.regex. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default.

9.2. Routing change event records to topics according to event content

By default, Debezium streams all of the change events that it reads from a table to a single static topic. However, there might be situations in which you might want to reroute selected events to other topics, based on the event content. The process of routing messages based on their content is described in the Content-based routing messaging pattern. To apply this pattern in Debezium, you use the content-based routing single message transform (SMT) to write expressions that are evaluated for each event. Depending how an event is evaluated, the SMT either routes the event message to the original destination topic, or reroutes it to the topic that you specify in the expression.

Important

The Debezium content-based routing SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

While it is possible to use Java to create a custom SMT to encode routing logic, using a custom-coded SMT has its drawbacks. For example:

  • It is necessary to compile the transformation up front and deploy it to Kafka Connect.
  • Every change needs code recompilation and redeployment, leading to inflexible operations.

The content-based routing SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).

Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add to your Debezium connector plug-in directories, along any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR 223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.

9.2.1. Setting up the Debezium content-based-routing SMT

For security reasons, the content-based routing SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.4.2.Final.tar.gz. To use the content-based routing SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.

Important

After the routing SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the routing SMT.

Procedure

  1. From a browser, open the Red Hat Integration download site, and download the Debezium scripting SMT archive (debezium-scripting-1.4.2.Final.tar.gz).
  2. Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
  3. Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
  4. Restart the Kafka Connect process to pick up the new JAR files.

9.2.2. Example: Debezium basic content-based routing configuration

To configure a Debezium connector to route change event records based on the event content, you configure the ContentBasedRouter SMT in the Kafka Connect configuration for the connector.

Configuration of the content-based routing SMT requires you to specify a regular expression that defines the filtering criteria. In the configuration, you create a regular expression that defines routing criteria. The expression defines a pattern for evaluating event records. It also specifies the name of a destination topic where events that match the pattern are routed. The pattern that you specify might designate an event type, such as a table insert, update, or delete operation. You might also define a pattern that matches a value in a specific column or row.

For example, to reroute all update (u) records to an updates topic, you might add the following configuration to your connector configuration:

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...

The preceding example specifies the use of the Groovy expression language.

Records that do not match the pattern are routed to the default topic.

9.2.3. Variables for use in Debezium content-based routing expressions

Debezium binds certain variables into the evaluation context for the SMT. When you create expressions to specify conditions to control the routing destination, the SMT can look up and interpret the values of these variables to evaluate conditions in an expression.

The following table lists the variables that Debezium binds into the evaluation context for the content-based routing SMT:

Table 9.2. Content-based routing expression variables
NameDescriptionType

key

A key of the message.

org.apache.kafka.connect​.data​.Struct

value

A value of the message.

org.apache.kafka.connect​.data​.Struct

keySchema

Schema of the message key.

org.apache.kafka.connect​.data​.Schema

valueSchema

Schema of the message value.

org.apache.kafka.connect​.data​.Schema

topic

Name of the target topic.

String

headers

A Java map of message headers. The key field is the header name. The headers variable exposes the following properties:

  • value (of type Object)
  • schema (of type org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String,​ io.debezium​.transforms​.scripting​.RecordHeader>

An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT dispositions the message. When the routing condition in an expression evaluates to true, the message is retained. When the routing condition evaluates to false, the message is removed.

Expressions should not result in any side-effects. That is, they should not modify any variables that they pass.

9.2.4. Configuration of content-based routing conditions for other scripting languages

The way that you express content-based routing conditions depends on the scripting language that you use. For example, as shown in the basic configuration example, when you use Groovy as the expression language, the following expression reroutes all update (u) records to the updates topic, while routing other records to the default topic:

value.op == 'u' ? 'updates' : null

Other languages use different methods to express the same condition.

Tip

The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures. To use the ContentBasedRouting SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState SMT.

You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

Javascript

When you use JavaScript as the expression language, you can call the Struct#get() method to specify the content-based routing condition, as in the following example:

value.get('op') == 'u' ? 'updates' : null

Javascript with Graal.js

When you create content-based routing conditions by using JavaScript with Graal.js, you use an approach that is similar to the one use with Groovy. For example:

value.op == 'u' ? 'updates' : null

9.2.5. Options for configuring the content-based routing transformation

Property

Default

Description

topic.regex

 

An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply the condition logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the condition logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified.

language

 

The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy, or jsr223.graal.js. Debezium supports bootstrapping through the JSR 223 API ("Scripting for the Java ™ Platform") only.

topic.expression

 

The expression to be evaluated for every message. Must evaluate to a String value where a result of non-null reroutes the message to a new topic, and a null value routes the message to the default topic.

null.handling.mode

keep

Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options:

keep
(Default) Pass the messages through.
drop
Remove the messages completely.
evaluate
Apply the condition logic to the messages.

9.3. Customization of Kafka Connect automatic topic creation

Kafka provides two mechanisms for creating topics automatically. You can enable automatic topic creation for the Kafka broker, and, beginning with Kafka 2.6.0, you can also enable Kafka Connect to create topics. The Kafka broker uses the auto.create.topics.enable property to control automatic topic creation. In Kafka Connect, the topic.creation.enable property specifies whether Kafka Connect is permitted to create topics. In both cases, the default settings for the properties enables automatic topic creation.

When automatic topic creation is enabled, if a Debezium source connector emits a change event record for a table for which no target topic already exists, the topic is created at runtime as the event record is ingested into Kafka.

Differences between automatic topic creation at the broker and in Kafka Connect

Topics that the broker creates are limited to sharing a single default configuration. The broker cannot apply unique configurations to different topics or sets of topics. By contrast, Kafka Connect can apply any of several configurations when creating topics, setting the replication factor, number of partitions, and other topic-specific settings as specified in the Debezium connector configuration. The connector configuration defines a set of topic creation groups, and associates a set of topic configuration properties with each group.

The broker configuration and the Kafka Connect configuration are independent of each other. Kafka Connect can create topics regardless of whether you disable topic creation at the broker. If you enable automatic topic creation at both the broker and in Kafka Connect, the Connect configuration takes precedence, and the broker creates topics only if none of the settings in the Kafka Connect configuration apply.

See the following topics for more information:

9.3.1. Disabling automatic topic creation for the Kafka broker

By default, the Kafka broker configuration enables the broker to create topics at runtime if the topics do not already exist. Topics created by the broker cannot be configured with custom properties. If you use a Kafka version earlier than 2.6.0, and you want to create topics with specific configurations, you must to disable automatic topic creation at the broker, and then explicitly create the topics, either manually, or through a custom deployment process.

Procedure

  • In the broker configuration, set the value of auto.create.topics.enable to false.

9.3.2. Configuring automatic topic creation in Kafka Connect

Automatic topic creation in Kafka Connect is controlled by the topic.creation.enable property. The default value for the property is true, enabling automatic topic creation, as shown in the following example:

topic.creation.enable = true

The setting for the topic.creation.enable property applies to all workers in the Connect cluster.

Kafka Connect automatic topic creation requires you to define the configuration properties that Kafka Connect applies when creating topics. You specify topic configuration properties in the Debezium connector configuration by defining topic groups, and then specifying the properties to apply to each group. The connector configuration defines a default topic creation group, and, optionally, one or more custom topic creation groups. Custom topic creation groups use lists of topic name patterns to specify the topics to which the group’s settings apply.

For details about how Kafka Connect matches topics to topic creation groups, see Topic creation groups. For more information about how configuration properties are assigned to groups, see Topic creation group configuration properties.

By default, topics that Kafka Connect creates are named based on the pattern server.schema.table, for example, dbserver.myschema.inventory.

Procedure

  • To prevent Kafka Connect from creating topics automatically, set the value of topic.creation.enable to false in the Kafka Connect custom resource, as in the following example:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster

...

spec:
  config:
    topic.creation.enable: "false"
Note

Kafka Connect automatic topic creation requires the replication.factor and partitions properties to be set for at least the default topic creation group. It is valid for groups to obtain the values for the required properties from the default values for the Kafka broker.

9.3.3. Configuration of automatically created topics

For Kafka Connect to create topics automatically, it requires information from the source connector about the configuration properties to apply when creating topics. You define the properties that control topic creation in the configuration for each Debezium connector. As Kafka Connect creates topics for event records that a connector emits, the resulting topics obtain their configuration from the applicable group. The configuration applies to event records emitted by that connector only.

9.3.3.1. Topic creation groups

A set of topic properties is associated with a topic creation group. Minimally, you must define a default topic creation group and specify its configuration properties. Beyond that you can optionally define one or more custom topic creation groups and specify unique properties for each.

When you create custom topic creation groups, you define the member topics for each group based on topic name patterns. You can specify naming patterns that describe the topics to include or exclude from each group. The include and exclude properties contain comma-separated lists of regular expressions that define topic name patterns. For example, if you want a group to include all topics that start with the string dbserver1.inventory, set the value of its topic.creation.inventory.include property to dbserver1\\.inventory\\.*.

Note

If you specify both include and exclude properties for a custom topic group, the exclusion rules take precedence, and override the inclusion rules.

9.3.3.2. Topic creation group configuration properties

The default topic creation group and each custom group is associated with a unique set of configuration properties. You can configure a group to include any of the Kafka topic-level configuration properties. For example, you can specify the cleanup policy for old topic segments, retention time, or the topic compression type for a topic group. You must define at least a minimum set of properties to describe the configuration of the topics to be created.

If no custom groups are registered, or if the include patterns for any registered groups don’t match the names of any topics to be created, then Kafka Connect uses the configuration of the default group to create topics.

For general information about configuring topics, see Kafka topic creation recommendations in Installing Debezium on OpenShift.

9.3.3.3. Specifying the configuration for the Debezium default topic creation group

Before you can use Kafka Connect automatic topic creation, you must create a default topic creation group and define a configuration for it. The configuration for the default topic creation group is applied to any topics with names that do not match the include list pattern of a custom topic creation group.

Prerequisites

  • In the Kafka Connect custom resource, the use-connector-resources value in metadata.annotations specifies that the cluster Operator uses KafkaConnector custom resources to configure connectors in the cluster. For example:

     ...
        metadata:
          name: my-connect-cluster
          annotations: strimzi.io/use-connector-resources: "true"
     ...

Procedure

  • To define properties for the topic.creation.default group, add them to spec.config in the connector custom resource, as shown in the following example:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
    ...
    
       config:
    ...
         topic.creation.default.replication.factor: 3  1
         topic.creation.default.partitions: 10  2
         topic.creation.default.cleanup.policy: compact  3
         topic.creation.default.compression.type: lz4  4
    ...

    You can include any Kafka topic-level configuration property in the configuration for the default group.

Table 9.3. Connector configuration for the default topic creation group
ItemDescription

1

topic.creation.default.replication.factor defines the replication factor for topics created by the default group.
replication.factor is mandatory for the default group but optional for custom groups. Custom groups will fall back to the default group’s value if not set. Use -1 to use the Kafka broker’s default value.

2

topic.creation.default.partitions defines the number of partitions for topics created by the default group.
partitions is mandatory for the default group but optional for custom groups. Custom groups will fall back to the default group’s value if not set. Use -1 to use the Kafka broker’s default value.

3

topic.creation.default.cleanup.policy is mapped to the cleanup.policy property of the topic level configuration parameters and defines the log retention policy.

4

topic.creation.default.compression.type is mapped to the compression.type property of the topic level configuration parameters and defines how messages are compressed on hard disk.

Note

Custom groups fall back to the default group settings only for the required replication.factor and partitions properties. If the configuration for a custom topic group leaves other properties undefined, the values specified in the default group are not applied.

9.3.3.4. Specifying the configuration for Debezium custom topic creation groups

You can define multiple custom topic groups, each with its own configuration.

Procedure

  • To define a custom topic group, add a topic.creation.<group_name>.include property to spec.config in the connector custom resource, followed by the configuration properties that you want to apply to topics in the custom group.

    The following example shows an excerpt of a custom resource that defines the custom topic creation groups inventory and applicationlogs:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
    ... 1
        topic.creation.inventory.include: dbserver1\\.inventory\\.*  2
        topic.creation.inventory.partitions: 20
        topic.creation.inventory.cleanup.policy: compact
        topic.creation.inventory.delete.retention.ms: 7776000000
    
        3
        topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4
        topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*  5
        topic.creation.applicationlogs.replication.factor: 1
        topic.creation.applicationlogs.partitions: 20
        topic.creation.applicationlogs.cleanup.policy: delete
        topic.creation.applicationlogs.retention.ms: 7776000000
        topic.creation.applicationlogs.compression.type: lz4
    ...
    ...
Table 9.4. Connector configuration for custom inventory and applicationlogs topic creation groups
ItemDescription

1

Defines the configuration for the inventory group.
The replication.factor and partitions properties are optional for custom groups. If no value is set, custom groups fall back to the value set for the default group. Set the value to -1 to use the value that is set for the Kafka broker.

2

topic.creation.inventory.include defines a regular expression to match all topics that start with dbserver1.inventory.. The configuration that is defined for the inventory group is applied only to topics with names that match the specified regular expression.

3

Defines the configuration for the applicationlogs group.
The replication.factor and partitions properties are optional for custom groups. If no value is set, custom groups fall back to the value set for the default group. Set the value to -1 to use the value that is set for the Kafka broker.

4

topic.creation.applicationlogs.include defines a regular expression to match all topics that start with dbserver1.logs.applog-. The configuration that is defined for the applicationlogs group is applied only to topics with names that match the specified regular expression. Because an exclude property is also defined for this group, the topics that match the include regular expression might be further restricted by the that exclude property.

5

topic.creation.applicationlogs.exclude defines a regular expression to match all topics that start with dbserver1.logs.applog-old-. The configuration that is defined for the applicationlogs group is applied only to topics with name that do not match the given regular expression. Because an include property is also defined for this group, the configuration of the applicationlogs group is applied only to topics with names that match the specified include regular expressions and that do not match the specified exclude regular expressions.

9.3.3.5. Registering Debezium custom topic creation groups

After you specify the configuration for any custom topic creation groups, register the groups.

Procedure

  • Register custom groups by adding the topic.creation.groups property to the connector custom resource, and specifying a comma-separated list of custom topic creation groups.

    The following excerpt from a connector custom resource registers the custom topic creation groups inventory and applicationlogs:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
         topic.creation.groups: inventory,applicationlogs
    
    ...

Completed configuration

The following example shows a completed configuration that includes the configuration for a default topic group, along with the configurations for an inventory and an applicationlogs custom topic creation group:

Example: Configuration for a default topic creation group and two custom groups

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
...
spec:
...

   config:
...
    topic.creation.default.replication.factor: 3,
    topic.creation.default.partitions: 10,
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: lz4
    topic.creation.groups: inventory,applicationlogs
    topic.creation.inventory.include: dbserver1\\.inventory\\.*
    topic.creation.inventory.partitions: 20
    topic.creation.inventory.cleanup.policy: compact
    topic.creation.inventory.delete.retention.ms: 7776000000
    topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.*
    topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*
    topic.creation.applicationlogs.replication.factor: 1
    topic.creation.applicationlogs.partitions: 20
    topic.creation.applicationlogs.cleanup.policy: delete
    topic.creation.applicationlogs.retention.ms: 7776000000
    topic.creation.applicationlogs.compression.type: lz4
...

9.4. Filtering Debezium change event records

By default, Debezium delivers every data change event that it receives to the Kafka broker. However, in many cases, you might be interested in only a subset of the events emitted by the producer. To enable you to process only the records that are relevant to you, Debezium provides the filter single message transform (SMT).

Important

The Debezium filter SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

While it is possible to use Java to create a custom SMT to encode filtering logic, using a custom-coded SMT has its drawbacks. For example:

  • It is necessary to compile the transformation up front and deploy it to Kafka Connect.
  • Every change needs code recompilation and redeployment, leading to inflexible operations.

The filter SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).

Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add to your Debezium connector plug-in directories, along any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.

9.4.1. Setting up the Debezium filter SMT

For security reasons, the filter SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.4.2.Final.tar.gz. To use the filter SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.

Important

After the filter SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the filter SMT.

Procedure

  1. From a browser, open the Red Hat Integration download site, and download the Debezium scripting SMT archive (debezium-scripting-1.4.2.Final.tar.gz).
  2. Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
  3. Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
  4. Restart the Kafka Connect process to pick up the new JAR files.

9.4.2. Example: Debezium basic filter SMT configuration

You configure the filter transformation in the Debezium connector’s Kafka Connect configuration. In the configuration, you specify the events that you are interested in by defining filter conditions that are based on business rules. As the filter SMT processes the event stream, it evaluates each event against the configured filter conditions. Only events that meet the criteria of the filter conditions are passed to the broker.

To configure a Debezium connector to filter change event records, configure the Filter SMT in the Kafka Connect configuration for the Debezium connector. Configuration of the filter SMT requires you to specify a regular expression that defines the filtering criteria.

For example, you might add the following configuration in your connector configuration.

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

The preceding example specifies the use of the Groovy expression language. The regular expression value.op == 'u' && value.before.id == 2 removes all messages, except those that represent update (u) records with id values that are equal to 2.

9.4.3. Variables for use in filter expressions

Debezium binds certain variables into the evaluation context for the filter SMT. When you create expressions to specify filter conditions, you can use the variables that Debezium binds into the evaluation context. By binding variables, Debezium enables the SMT to look up and interpret their values as it evaluates the conditions in an expression.

The following table lists the variables that Debezium binds into the evaluation context for the filter SMT:

Table 9.5. Filter expression variables
NameDescriptionType

key

A key of the message.

org.apache.kafka.connect​.data​.Struct

value

A value of the message.

org.apache.kafka.connect.data​.Struct

keySchema

Schema of the message key.

org.apache.kafka.connect​.data​.Schema

valueSchema

Schema of the message value.

org.apache.kafka.connect​.data​.Schema

topic

Name of the target topic.

String

headers

A Java map of message headers. The key field is the header name. The headers variable exposes the following properties:

  • value (of type Object)
  • schema (of type org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String, ​io.debezium.transforms​.scripting​.RecordHeader>

An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT dispositions the message. When the filter condition in an expression evaluates to true, the message is retained. When the filter condition evaluates to false, the message is removed.

Expressions should not result in any side-effects. That is, they should not modify any variables that they pass.

9.4.4. Filter condition configuration for other scripting languages

The way that you express filtering conditions depends on the scripting language that you use.

For example, as shown in the basic configuration example, when you use Groovy as the expression language, the following expression removes all messages, except for update records that have id values set to 2:

value.op == 'u' && value.before.id == 2

Other languages use different methods to express the same condition.

Tip

The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures. To use the filter SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState SMT.

You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

Javascript

If you use JavaScript as the expression language, you can call the Struct#get() method to specify the filtering condition, as in the following example:

value.get('op') == 'u' && value.get('before').get('id') == 2

Javascript with Graal.js

If you use JavaScript with Graal.js to define filtering conditions, you use an approach that is similar to the one that you use with Groovy. For example:

value.op == 'u' && value.before.id == 2

9.4.5. Options for configuring filter transformation

The following table lists the configuration options that you can use with the filter SMT.

Table 9.6. filter SMT configuration options

Property

Default

Description

topic.regex

 

An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply filtering logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the filter logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified.

language

 

The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy, or jsr223.graal.js. Debezium supports bootstrapping through the JSR 223 API ("Scripting for the Java ™ Platform") only.

condition

 

The expression to be evaluated for every message. Must evaluate to a Boolean value where a result of true keeps the message, and a result of false removes it.

null.handling.mode

keep

Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options:

keep
(Default) Pass the messages through.
drop
Remove the messages completely.
evaluate
Apply the filter condition to the messages.

9.5. Extracting source record after state from Debezium change events

A Debezium data change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information. However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values. To provide this kind of record, Debezium provides the event flattening single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.

The event flattening transformation is a Kafka Connect SMT.

This transformation is available to only SQL database connectors.

The following topics provide details:

9.5.1. Description of Debezium change event structure

Debezium generates data change events that have a complex structure. Each event consists of three parts:

  • Metadata, which includes but is not limited to:

    • The operation that made the change
    • Source information such as the names of the database and table where the change was made
    • Time stamp for when the change was made
    • Optional transaction information
  • Row data before the change
  • Row data after the change

For example, part of the structure of an UPDATE change event looks like this:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

This complex format provides the most information about changes happening in the system. However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

To provide the needed Kafka record format for consumers, configure the event flattening SMT.

9.5.2. Behavior of Debezium event flattening transformation

The event flattening SMT extracts the after field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its after field to create a simple Kafka record.

You can configure the event flattening SMT for a Debezium connector or for a sink connector that consumes messages emitted by a Debezium connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.

You can configure the transformation to do any of the following:

  • Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata.
  • Keep Kafka records that contain change events for DELETE operations in the stream. The default behavior is that the SMT drops Kafka records for DELETE operation change events because most consumers cannot yet handle them.

A database DELETE operation causes Debezium to generate two Kafka records:

  • A record that contains "op": "d", the before row data, and some other fields.
  • A tombstone record that has the same key as the deleted row and a value of null. This record is a marker for Apache Kafka. It indicates that log compaction can remove all records that have this key.

Instead of dropping the record that contains the before row data, you can configure the event flattening SMT to do one of the following:

  • Keep the record in the stream and edit it to have only the "value": "null" field.
  • Keep the record in the stream and edit it to have a value field that contains the key/value pairs that were in the before field with an added "__deleted": "true" entry.

Similary, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.

9.5.3. Configuration of Debezium event flattening transformation

Configure the Debezium event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, in a .properties file, you would specify something like the following:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

As for any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.

The following .properties example sets several event flattening SMT options:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
Keeps tombstone records for DELETE operations in the event stream.
delete.handling.mode=rewrite

For DELETE operations, edits the Kafka record by flattening the value field that was in the change event. The value field directly contains the key/value pairs that were in the before field. The SMT adds __deleted and sets it to true, for example:

"value": {
  "pk": 2,
  "cola": null,
  "__deleted": "true"
}
add.fields=table,lsn
Adds change event metadata for the table and lsn fields to the simplified Kafka record.

9.5.4. Example of adding Debezium metadata to the Kafka record

The event flattening SMT can add original, change event metadata to the simplified Kafka record. For example, you might want the simplified record’s header or value to contain any of the following:

  • The type of operation that made the change
  • The name of the database or table that was changed
  • Connector-specific fields such as the Postgres LSN field

To add metadata to the simplified Kafka record’s header, specify the add.header option. To add metadata to the simplified Kafka record’s value, specify the add.fields option. Each of these options takes a comma separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite

With that configuration, a simplified Kafka record would contain something like the following:

{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}

Also, simplified Kafka records would have a __db header.

In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.

To add metadata to a simplified Kafka record that is for a DELETE operation, you must also configure delete.handling.mode=rewrite.

9.5.5. Options for configuring Debezium event flattening transformation

The following table describes the options that you can specify to configure the event flattening SMT.

Table 9.7. Descriptions of event flattening SMT configuration options
OptionDefaultDescription

drop.tombstones

true

Debezium generates a tombstone record for each DELETE operation. The default behavior is that event flattening SMT removes tombstone records from the stream. To keep tombstone records in the stream, specify drop.tombstones=false.

delete.handling​.mode

drop

Debezium generates a change event record for each DELETE operation. The default behavior is that event flattening SMT removes these records from the stream. To keep Kafka records for DELETE operations in the stream, set delete.handling.mode to none or rewrite.

Specify none to keep the change event record in the stream. The record contains only "value": "null".

Specify rewrite to keep the change event record in the stream and edit the record to have a value field that contains the key/value pairs that were in the before field and also add __deleted: true to the value. This is another way to indicate that the record has been deleted.

When you specify rewrite, the updated simplified records for DELETE operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the Debezium connector creates.

route.by.field

 

To use row data to determine the topic to route the record to, set this option to an after field attribute. The SMT routes the record to the topic whose name matches the value of the specified after field attribute. For a DELETE operation, set this option to a before field attribute.

For example, configuration of route.by.field=destination routes records to the topic whose name is the value of after.destination. The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made.

If you are configuring the event flattening SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure route.by.field to re-route the event.

add.fields.prefix

__ (double-underscore)

Set this optional string to prefix a field.

add.fields

 

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record’s value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms.

Optionally, you can override the field name via <field name>:<new field name>, e.g. like so: new field name like version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP. Please note that the new field name is case-sensitive.

When the SMT adds metadata fields to the simplified record’s value, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not in the change event record, the SMT still adds the field to the record’s value.

add.headers.prefix

__ (double-underscore)

Set this optional string to prefix a header.

add.headers

 

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms.

Optionally, you can override the field name via <field name>:<new field name>, e.g. like so: new field name like version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP. Please note that the new field name is case-sensitive.

When the SMT adds metadata fields to the simplified record’s header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not in the change event record, the SMT does not add the field to the header.

9.6. Configuring Debezium connectors to use Avro serialization

A Debezium connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record. For each change event record, the Debezium connector completes the following actions:

  1. Applies configured transformations.
  2. Serializes the record key and value into a binary form by using the configured Kafka Connect converters.
  3. Writes the record to the correct Kafka topic.

You can specify converters for each individual Debezium connector instance. Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents. The default behavior is that the JSON converter includes the record’s message schema, which makes each record very verbose. The Getting Started with Debezium guide shows what the records look like when both payload and schemas are included. If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:

  • key.converter.schemas.enable
  • value.converter.schemas.enable

Setting these properties to false excludes the verbose schema information from each record.

Alternatively, you can serialize the record keys and values by using Apache Avro. The Avro binary format is compact and efficient. Avro schemas make it possible to ensure that each record has the correct structure. Avro’s schema evolution mechanism enables schemas to evolve. This is essential for Debezium connectors, which dynamically generate each record’s schema to match the structure of the database table that was changed. Over time, change event records written to the same Kafka topic might have different versions of the same schema. Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.

To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. For information about setting up this registry, see the documentation for Red Hat Integration - Service Registry.

9.6.1. About the Service Registry

Red Hat Integration - Service Registry provides the following components that work with Avro:

  • An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.
  • An API and schema registry that tracks:

    • Avro schemas that are used in Kafka topics.
    • Where the Avro converter sends the generated Avro schemas.

    Because the Avro schemas are stored in this registry, each record needs to contain only a tiny schema identifier. This makes each record even smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.

  • Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.

To use the Service Registry with Debezium, add Service Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.

Note

The Service Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.

9.6.2. Overview of deploying a Debezium connector that uses Avro serialization

To deploy a Debezium connector that uses Avro serialization, you must complete three main tasks:

  1. Deploy a Red Hat Integration - Service Registry instance by following the instructions in Getting Started with Service Registry.
  2. Install the Avro converter by downloading the Debezium Service Registry Kafka Connect zip file and extracting it into the Debezium connector’s directory.
  3. Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:

    key.converter=io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url=http://apicurio:8080/api
    key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
    value.converter=io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/api
    value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy

Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.

9.6.3. Deploying connectors that use Avro in Debezium containers

In your environment, you might want to use a provided Debezium container to deploy Debezium connectors that use Avro serialization. Complete the following procedure to build a custom Kafka Connect container image for Debezium, and configure the Debezium connector to use the Avro converter.

Prerequisites

  • You have Docker installed and sufficient rights to create and manage containers.
  • You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.

Procedure

  1. Deploy an instance of Service Registry. See Getting Started with Service Registry, Installing Service Registry from the OpenShift OperatorHub, which provides instructions for:

    • Installing AMQ Streams
    • Setting up AMQ Streams storage
    • Installing Service Registry
  2. Extract the Debezium connector archives to create a directory structure for the connector plug-ins. If you downloaded and extracted the archives for multiple Debezium connectors, the resulting directory structure looks like the one in the following example:

    tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    |   ├── ...
    ├── debezium-connector-mysql
    │   ├── ...
    ├── debezium-connector-postgres
    │   ├── ...
    └── debezium-connector-sqlserver
        ├── ...
  3. Add the Avro converter to the directory that contains the Debezium connector that you want to configure to use Avro serialization:

    1. Go to the Red Hat Integration download site and download the Service Registry Kafka Connect zip file.
    2. Extract the archive into the desired Debezium connector directory.

    To configure more than one type of Debezium connector to use Avro serialization, extract the archive into the directory for each relevant connector type. Although extracting the archive to each directory duplicates the files, by doing so you remove the possibility of conflicting dependencies.

  4. Create and publish a custom image for running Debezium connectors that are configured to use the Avro converter:

    1. Create a new Dockerfile by using registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0 as the base image. In the following example, replace my-plugins with the name of your plug-ins directory:

      FROM registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0
      USER root:root
      COPY ./my-plugins/ /opt/kafka/plugins/
      USER 1001

      Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the /opt/kafka/plugins directory.

    2. Build the docker container image. For example, if you saved the docker file that you created in the previous step as debezium-container-with-avro, then you would run the following command:

      docker build -t debezium-container-with-avro:latest

    3. Push your custom image to your container registry, for example:

      docker push <myregistry.io>/debezium-container-with-avro:latest

    4. Point to the new container image. Do one of the following:

      • Edit the KafkaConnect.spec.image property of the KafkaConnect custom resource. If set, this property overrides the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable in the Cluster Operator. For example:

        apiVersion: kafka.strimzi.io/v1beta1
        kind: KafkaConnect
        metadata:
          name: my-connect-cluster
        spec:
          #...
          image: debezium-container-with-avro
      • In the install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml file, edit the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable to point to the new container image and reinstall the Cluster Operator. If you edit this file you will need to apply it to your OpenShift cluster.
  5. Deploy each Debezium connector that is configured to use the Avro converter. For each Debezium connector:

    1. Create a Debezium connector instance. The following inventory-connector.yaml file example creates a KafkaConnector custom resource that defines a MySQL connector instance that is configured to use the Avro converter:

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnector
      metadata:
        name: inventory-connector
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1
        config:
          database.hostname: mysql
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054
          database.server.name: dbserver1
          database.include.list: inventory
          database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
          database.history.kafka.topic: schema-changes.inventory
          key.converter: io.apicurio.registry.utils.converter.AvroConverter
          key.converter.apicurio.registry.url: http://apicurio:8080/api
          key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
          value.converter: io.apicurio.registry.utils.converter.AvroConverter
          value.converter.apicurio.registry.url: http://apicurio:8080/api
          value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
    2. Apply the connector instance, for example:

      oc apply -f inventory-connector.yaml

      This registers inventory-connector and the connector starts to run against the inventory database.

  6. Verify that the connector was created and has started to track changes in the specified database. You can verify the connector instance by watching the Kafka Connect log output as, for example, inventory-connector starts.

    1. Display the Kafka Connect log output:

      oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
    2. Review the log output to verify that the initial snapshot has been executed. You should see something like the following lines:

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      Taking the snapshot involves a number of steps:

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      After completing the snapshot, Debezium begins tracking changes in, for example, the inventory database’s binlog for change events:

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...

9.6.4. About Avro name requirements

As stated in the Avro documentation, names must adhere to the following rules:

  • Start with [A-Za-z_]
  • Subsequently contains only [A-Za-z0-9_] characters

Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, sanitize.field.names that you can set to true if you have columns that do not adhere to Avro rules for names. Setting sanitize.field.names to true allows serialization of non-conformant fields without having to actually modify your schema.

9.7. Configuring Debezium connectors to use the outbox pattern

The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

  • Capture changes in an outbox table
  • Apply the Debezium outbox event router single message transformation (SMT)

A Debezium connector that is configured to apply the outbox SMT should capture changes in only an outbox table. A connector can capture changes in more than one outbox table only if each outbox table has the same structure.

Important

The Debezium outbox event router SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

See Reliable Microservices Data Exchange With the Outbox Pattern to learn about why the outbox pattern is useful and how it works.

Note

The outbox event router SMT does not support the MongoDB connector.

The following topics provide details:

9.7.1. Example of a Debezium outbox message

To learn about how to configure the Debezium outbox event router SMT, consider the following example of a Debezium outbox message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

A Debezium connector that is configured to apply the outbox event router SMT generates the above message by transforming a Debezium raw message like this:

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "0.9.3.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484
}

This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox table structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.

9.7.2. Outbox table structure expected by Debezium outbox event router SMT

To apply the default outbox event router SMT configuration, your outbox table is assumed to have the following columns:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
Table 9.8. Descriptions of expected outbox table columns
ColumnEffect

id

Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages.

To obtain the unique ID of the event from a different outbox table column, set the table.field.event.id SMT option in the connector configuration.

aggregatetype

Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default ${routedByValue} variable in the route.topic.replacement SMT option.

For example, in a default configuration, the route.by.field SMT option is set to aggregatetype and the route.topic.replacement SMT option is set to outbox.event.${routedByValue}. Suppose that your application adds two records to the outbox table. In the first record, the value in the aggregatetype column is customers. In the second record, the value in the aggregatetype column is orders. The connector emits the first record to the outbox.event.customers topic. The connector emits the second record to the outbox.event.orders topic.

To obtain this value from a different outbox table column, set the route.by.field SMT option in the connector configuration.

aggregateid

Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

To obtain the event key from a different outbox table column, set the table.field.event.key SMT option in the connector configuration.

type

A user-defined value that helps categorize or organize events.

payload

The representation of the event itself. The default structure is JSON. The content in this field becomes one of these:

  • Part of the outbox message payload.
  • If other metadata, including eventType is delivered as headers, the payload becomes the message itself without encapsulation in an envelope.

To obtain the event payload from a different outbox table column, set the table.field.event.payload SMT option in the connector configuration.

9.7.3. Basic Debezium outbox event router SMT configuration

To configure a Debezium connector to support the outbox pattern, configure the outbox.EventRouter SMT. For example, the basic configuration in a .properties file looks like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

9.7.4. Using Avro as the payload format in Debezium outbox messages

The outbox event router SMT supports arbitrary payload formats. The payload column value in an outbox table is passed on transparently. An alternative to working with JSON is to use Avro. This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.

How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter

By default, the payload column value (the Avro data) is the only message value. Configuration of ByteBufferConverter as the value converter propagates the payload column value as-is into the Kafka message value.

The Debezium connectors may be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). These events cannot be serialized by the ByteBufferConverter so additional configuration must be provided so the converter knows how to serialize these events. As an example, the following configuration illustrates using the Apache Kafka JsonConverter with no schemas:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

The delegate Converter implementation is specified by the delegate.converter.type option. If any extra configuration options are needed by the converter, they can also be specified, such as the disablement of schemas shown above using schemas.enable=false.

9.7.5. Emitting additional fields in Debezium outbox messages

Your outbox table might contain columns whose values you want to add to the emitted outbox messages. For example, consider an outbox table that has a value of purchase-order in the aggregatetype column and another column, eventType, whose possible values are order-created and order-shipped. To emit the eventType column value in the outbox message header, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:header:eventType

To emit the eventType column value in the outbox message envelope, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:envelope:eventType

9.7.6. Options for configuring outbox event router transformation

The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.

Table 9.9. Descriptions of outbox event router SMT configuration options
OptionDefaultGroupDescription

table.field.event.id

id

Table

Specifies the outbox table column that contains the unique event ID.

table.field.event.key

aggregateid

Table

Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

table.field.event.timestamp

 

Table

By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox table column that contains the timestamp that you want to be in emitted outbox messages.

table.field.event.payload

payload

Table

Specifies the outbox table column that contains the event payload.

table.field.event.payload.id

aggregateid

Table

Specifies the outbox table column that contains the payload ID.

table.fields.additional.placement

 

Table, Envelope

Specifies one or more outbox table columns that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:

id:header,my-field:envelope

To specify an alias for the column, specify a trio with the alias as the third value, for example:

id:header,my-field:envelope:my-alias

The second value is the placement and it must always be header or envelope.

Configuration examples are in emitting additional fields in Debezium outbox messages.

table.field.event.schema.version

 

Table, Schema

When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc.

route.by.field

aggregatetype

Router

Specifies the name of a column in the outbox table. The default behavior is that the value in this column becomes a part of the name of the topic to which the connector emits the outbox messages. An example is in the description of the expected outbox table.

route.topic.regex

(?<routedByValue>.*)

Router

Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox table records. This regular expression is part of the setting of the route.topic.replacement SMT option.

The default behavior is that the SMT replaces the default ${routedByValue} variable in the setting of the route.topic.replacement SMT option with the setting of the route.by.field outbox SMT option.

route.topic.replacement

outbox.event​.${routedByValue}

Router

Specifies the name of the topic to which the connector emits outbox messages. The default topic name is outbox.event. followed by the aggregatetype column value in the outbox table record. For example, if the aggregatetype value is customers, the topic name is outbox.event.customers.

To change the topic name, you can:

  • Set the route.by.field option to a different column.
  • Set the route.topic.regex option to a different regular expression.

route.tombstone.on.empty.payload

false

Router

Indicates whether an empty or null payload causes the connector to emit a tombstone event.

debezium.op.invalid.behavior

warn

Debezium

Determines the behavior of the SMT when there is an UPDATE operation on the outbox table. Possible settings are:

  • warn - The SMT logs a warning and continues to the next outbox table record.
  • error - The SMT logs an error and continues to the next outbox table record.
  • fatal - The SMT logs an error and the connector stops processing.

All changes in an outbox table are expected to be INSERT operations. That is, an outbox table functions as a queue; updates to records in an outbox table are not allowed. The SMT automatically filters out DELETE operations on an outbox table.

tracing.span.context.field

tracingspancontext

Tracing

The name of the field containing tracing span context.

tracing.operation.name

debezium-read

Tracing

The operation name representing the Debezium processing span.

tracing.with.context.field.only

false

Tracing

When true only events that have serialized context field should be traced.

Distributed tracing

The extension has support for the distributed tracing. See tracing documentation for more details.

9.8. Emitting Debezium change event records in CloudEvents format

CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a MongoDB, MySQL, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.

Important

Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

The CloudEvents specification defines:

  • A set of standardized event attributes
  • Rules for defining custom attributes
  • Encoding rules for mapping event formats to serialized representations such as JSON or Avro
  • Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP

To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter, which is a Kafka Connect message converter.

Currently, only structured mapping mode is supported. The CloudEvents change event envelope can be JSON or Avro and each envelope type supports JSON or Avro as the data format. It is expected that a future Debezium release will support binary mapping mode.

Information about emitting change events in CloudEvents format is organized as follows:

For information about using Avro, see:

9.8.1. Example Debezium change event records in CloudEvents format

The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.

{
  "id" : "name:test_server;lsn:29274832;txId:565",   1
  "source" : "/debezium/postgresql/test_server",     2
  "specversion" : "1.0",                             3
  "type" : "io.debezium.postgresql.datachangeevent", 4
  "time" : "2020-01-13T13:55:39.738Z",               5
  "datacontenttype" : "application/json",            6
  "iodebeziumop" : "r",                              7
  "iodebeziumversion" : "1.4.2.Final",        8
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "565",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",                           9
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {                                         10
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
1 1 1
Unique ID that the connector generates for the change event based on the change event’s content.
2 2 2
The source of the event, which is the logical name of the database as specified by the database.server.name property in the connector’s configuration.
3 3 3
The CloudEvents specification version.
4 4 4
Connector type that generated the change event. The format of this field is io.debezium.CONNECTOR_TYPE.datachangeevent. The value of CONNECTOR_TYPE is mongodb, mysql, postgresql, or sqlserver.
5 5
Time of the change in the source database.
6
Describes the content type of the data attribute, which is JSON in this example. The only alternative is Avro.
7
An operation identifier. Possible values are r for read, c for create, u for update, or d for delete.
8
All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using the iodebezium prefix for the attribute name.
9
When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using the iodebeziumtx prefix for the attribute name.
10
The actual data change itself. Depending on the operation and the connector, the data might contain before, after and/or patch fields.

The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.

{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",            1
  "dataschema" : "http://my-registry/schemas/ids/1", 2
  "iodebeziumop" : "r",
  "iodebeziumversion" : "1.4.2.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="                        3
}
1
Indicates that the data attribute contains Avro binary data.
2
URI of the schema to which the Avro data adheres.
3
The data attribute contains base64-encoded Avro binary data.

It is also possible to use Avro for the envelope as well as the data attribute.

9.8.2. Example of configuring Debezium CloudEvents converter

Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:

  • Use JSON as the envelope.
  • Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",          1
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
1
Specifying the serializer.type is optional, because json is the default.

The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.

9.8.3. Debezium CloudEvents converter configuration options

When you configure a Debezium connector to use the CloudEvent converter you can specify the following options.

Table 9.10. Descriptions of CloudEvents converter configuration options

Option

Default

Description

serializer.type

json

The encoding type to use for the CloudEvents envelope structure. The value can be json or avro.

data​.serializer.type

json

The encoding type to use for the data attribute. The value can be json or avro.

json. ...

N/A

Any configuration options to be passed through to the underlying converter when using JSON. The json. prefix is removed.

avro. ...

N/A

Any configuration options to be passed through to the underlying converter when using Avro. The avro. prefix is removed. For example, for Avro data, you would specify the avro.schema.registry.url option.

Revised on 2021-05-13 02:59:10 UTC

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.