Chapter 12. Applying transformations to modify messages exchanged with Apache Kafka


Debezium provides several single message transformations (SMTs) that you can use to modify change event records. You can configure a connector to apply a transformation that modifies records before it sends them to Apache Kafka. You can also apply the Debezium SMTs to a sink connector to modify records after Kafka Connect reads them from a Kafka topic and before it passes them to the sink connector.

If you want to apply transformations selectively to specific messages only, you can configure a Kafka Connect predicate to define the conditions for applying the SMT.

Debezium provides the following SMTs:

Topic router SMT
Reroutes change event records to specific topics based on a regular expression that is applied to the original topic name.
Content-based router SMT
Reroutes specified change event records based on the event content.
Message filtering SMT
Enables you to propagate a subset of event records to the destination Kafka topic. The transformation evaluates a condition expression against the content of each change event record that a connector emits. Only records that match the expression are written to the target topic. Other records are ignored.
New record state extraction SMT
Flattens the complex structure of a Debezium change event record into a simplified format. The simplified structure enables processing by sink connectors that cannot consume the original structure.
Outbox event router SMT
Provides support for the outbox pattern to enable safe and reliable data exchange among multiple services.
MongoDB outbox event router SMT
Provides support for using the outbox pattern with the MongoDB connector to enable safe and reliable data exchange among multiple services.

12.1. Applying transformations selectively with SMT predicates

When you configure a single message transformation (SMT) for a connector, you can define a predicate for the transformation. The predicate specifies how to apply the transformation conditionally to a subset of the messages that the connector processes. You can assign predicates to transformations that you configure for source connectors, such as Debezium, or to sink connectors.

12.1.1. About SMT predicates

Debezium provides several single message transformations (SMTs) that you can use to modify event records before Kafka Connect saves the records to Kafka topics. By default, when you configure one of these SMTs for a Debezium connector, Kafka Connect applies that transformation to every record that the connector emits. However, there might be instances in which you want to apply a transformation selectively, so that it modifies only that subset of change event messages that share a common characteristic.

For example, for a Debezium connector, you might want to run the transformation only on event messages from a specific table or that include a specific header key. In environments that run Apache Kafka 2.6 or greater, you can append a predicate statement to a transformation to instruct Kafka Connect to apply the SMT only to certain records. In the predicate, you specify a condition that Kafka Connect uses to evaluate each message that it processes. When a Debezium connector emits a change event message, Kafka Connect checks the message against the configured predicate condition. If the condition is true for the event message, Kafka Connect applies the transformation, and then writes the message to a Kafka topic. Messages that do not match the condition are sent to Kafka unmodified.

The situation is similar for predicates that you define for a sink connector SMT. The connector reads messages from a Kafka topic and Kafka Connect evaluates the messages against the predicate condition. If a message matches the condition, Kafka Connect applies the transformation and then passes the messages to the sink connector.

After you define a predicate, you can reuse it and apply it to multiple transforms. Predicates also include a negate option that inverts the predicate, so that Kafka Connect applies the transformation only to records that do not match the condition that is defined in the predicate statement. You can use the negate option to pair a predicate with a second transform that applies to the records that the condition excludes.
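
As a minimal sketch of predicate reuse and negation, the following configuration defines one predicate and attaches it to two transforms. The aliases outbox, unwrap, and IsOutboxTable, and the outbox.event.* naming pattern, are assumptions chosen for illustration; adapt them to your environment.

```properties
# Two transforms share a single predicate definition.
transforms=outbox,unwrap
# Apply the outbox event router only to records whose topic matches the predicate.
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.predicate=IsOutboxTable
# Apply the new record state extraction SMT to the remaining records
# by negating the same predicate.
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.predicate=IsOutboxTable
transforms.unwrap.negate=true
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*
```

Kafka Connect evaluates the predicate separately for each transform in the chain, so the intent here is that outbox records pass through the event router while other records are flattened.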

Predicate elements

Predicates include the following elements:

  • predicates prefix
  • Alias (for example, IsOutboxTable)
  • Type (for example, org.apache.kafka.connect.transforms.predicates.TopicNameMatches). Kafka Connect provides a set of default predicate types, which you can supplement by defining your own custom predicates.
  • Condition statement and any additional configuration properties, depending on the type of predicate (for example, a regex naming pattern)

Default predicate types

The following predicate types are available by default:

HasHeaderKey
Specifies a key name in the header in the event message that you want Kafka Connect to evaluate. The predicate evaluates to true for any records that include a header key that has the specified name.
RecordIsTombstone

Matches Kafka tombstone records. The predicate evaluates to true for any record that has a null value. Use this predicate in combination with a filter SMT to remove tombstone records. This predicate has no configuration parameters.

A tombstone in Kafka is a record that has a key with a 0-byte, null payload. When a Debezium connector processes a delete operation in the source database, the connector emits two change events for the delete operation:

  • A delete operation ("op" : "d") event that provides the previous value of the database record.
  • A tombstone event that has the same key, but a null value.

    The tombstone represents a delete marker for the row. When log compaction is enabled for Kafka, during compaction Kafka removes all earlier events that share the same key as the tombstone. Log compaction occurs periodically, and the delete.retention.ms setting for the topic controls how long Kafka retains the tombstones themselves before it removes them.

    Although it is possible to configure Debezium so that it does not emit tombstone events, it’s best to permit Debezium to emit tombstones to maintain the expected behavior during log compaction. Suppressing tombstones prevents Kafka from removing records for a deleted key during log compaction. If your environment includes sink connectors that cannot process tombstones, you can configure the sink connector to use an SMT with the RecordIsTombstone predicate to filter out the tombstone records.

TopicNameMatches
A regular expression that specifies the name of a topic that you want Kafka Connect to match. The predicate is true for connector records in which the topic name matches the specified regular expression. Use this predicate to apply an SMT to records based on the name of the source table.
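
For illustration, a HasHeaderKey predicate might be configured as in the following sketch. The aliases unwrap and HasTraceHeader and the header key trace-id are hypothetical. The name property is the configuration parameter that the HasHeaderKey predicate type uses to identify the header key.

```properties
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Apply the transform only to records that carry the (hypothetical) trace-id header.
transforms.unwrap.predicate=HasTraceHeader
predicates=HasTraceHeader
predicates.HasTraceHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.HasTraceHeader.name=trace-id
```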

12.1.2. Defining SMT predicates

By default, Kafka Connect applies each single message transformation in the Debezium connector configuration to every change event record that it receives from Debezium. Beginning with Apache Kafka 2.6, you can define an SMT predicate for a transformation in the connector configuration that controls how Kafka Connect applies the transformation. The predicate statement defines the conditions under which Kafka Connect applies the transformation to event records emitted by Debezium. Kafka Connect evaluates the predicate statement and then applies the SMT selectively to the subset of records that match the condition that is defined in the predicate. Configuring Kafka Connect predicates is similar to configuring transforms. You specify a predicate alias, associate the alias with a transform, and then define the type and configuration for the predicate.

Prerequisites

  • The Debezium environment runs Apache Kafka 2.6 or greater.
  • An SMT is configured for the Debezium connector.

Procedure

  1. In the Debezium connector configuration, specify a predicate alias for the predicates parameter, for example, IsOutboxTable.
  2. Associate the predicate alias with the transform that you want to apply conditionally, by appending the predicate alias to the transform alias in the connector configuration:

    transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>

    For example:

    transforms.outbox.predicate=IsOutboxTable
  3. Configure the predicate by specifying its type and providing values for configuration parameters.

    1. For the type, specify one of the following default types that are available in Kafka Connect:

      • HasHeaderKey
      • RecordIsTombstone
      • TopicNameMatches

        For example:

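        predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    2. For the TopicNameMatches predicate, set the pattern property to a regular expression for the topic name that you want to match. For the HasHeaderKey predicate, set the name property to the header key that you want to match. The RecordIsTombstone predicate has no configuration parameters.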

      For example:

      predicates.IsOutboxTable.pattern=outbox.event.*
  4. If you want to negate a condition, append the negate keyword to the transform alias and set it to true.

    For example:

    transforms.outbox.negate=true

    The preceding property inverts the set of records that the predicate matches, so that Kafka Connect applies the transform to any record that does not match the condition specified in the predicate.

Example: TopicNameMatches predicate for the outbox event router transformation

The following example shows a Debezium connector configuration that applies the outbox event router transformation only to messages that Debezium emits to topics whose names match outbox.event.*, such as the Kafka outbox.event.order topic.

Because the TopicNameMatches predicate evaluates to true only for messages from the outbox table (outbox.event.*), the transformation is not applied to messages that originate from other tables in the database.

transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*

12.1.3. Ignoring tombstone events

You can control whether Debezium emits tombstone events, and how long Kafka retains them. Depending on your data pipeline, you might want to set the tombstones.on.delete property for a connector so that Debezium does not emit tombstone events.

Whether you enable Debezium to emit tombstones depends on how topics are consumed in your environment and on the characteristics of the sink consumer. Some sink connectors rely on tombstone events to remove records from downstream data stores. In such cases, configure Debezium to emit them.

When you configure Debezium to generate tombstones, further configuration is required to ensure that sink connectors receive the tombstone events. The retention policy for a topic must be set so that the connector has time to read event messages before Kafka removes them during log compaction. The length of time that a topic retains tombstones before compaction is controlled by the delete.retention.ms property for the topic.

By default, the tombstones.on.delete property for a connector is set to true so that the connector generates a tombstone after each delete event. If you set the property to false to prevent Debezium from saving tombstone records to Kafka topics, the absence of tombstone records might lead to unintended consequences. Kafka relies on tombstones during log compaction to remove records that are related to a deleted key.

If you need to support sink connectors or downstream Kafka consumers that cannot process records with null values, rather than preventing Debezium from emitting tombstones, consider configuring an SMT for the connector with a predicate that uses the RecordIsTombstone predicate type to remove tombstone messages before consumers read them.
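
The approach described in the preceding paragraph can be sketched as the following sink connector configuration fragment. The aliases dropTombstones and IsTombstone are hypothetical. The sketch assumes that the Kafka Connect Filter transformation, which drops every record for which its predicate is true, is available in your Kafka Connect environment (Apache Kafka 2.6 or greater).

```properties
# Sink connector configuration fragment (sketch).
transforms=dropTombstones
# The Filter SMT drops each record for which the associated predicate is true.
transforms.dropTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate=IsTombstone
predicates=IsTombstone
# RecordIsTombstone has no configuration parameters.
predicates.IsTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```

Because the filter runs in the sink connector, tombstones remain in the Kafka topic and are still available for log compaction.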

Procedure

  • To prevent Debezium from emitting tombstone events for deleted database records, set the connector option tombstones.on.delete to false.

    For example:

    "tombstones.on.delete": "false"

12.2. Routing Debezium event records to topics that you specify

Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter. To do this, Debezium provides the topic routing single message transformation (SMT). Configure this transformation in the Debezium connector’s Kafka Connect configuration. Configuration options enable you to specify the following:

  • An expression for identifying the records to re-route
  • An expression that resolves to the destination topic
  • How to ensure a unique key among the records being re-routed to the destination topic

It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.

The topic routing transformation is a Kafka Connect SMT.


12.2.1. Use case for routing Debezium records to topics that you specify

The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.

Logical tables

A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: db_shard1.my_table and db_shard2.my_table. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.

Partitioned PostgreSQL tables

When the Debezium PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the topic routing SMT. Because each key in a partitioned table is guaranteed to be unique, configure key.enforce.uniqueness=false so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.
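
As a sketch of this case, the following configuration routes records from all partitions of one table to a single topic. The partition topic names (myserver.public.orders_p1, myserver.public.orders_p2, and so on) and the resulting myserver.public.orders topic are hypothetical; adjust the expressions to match the partition naming in your database.

```properties
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
# Hypothetical partition topics: myserver.public.orders_p1, myserver.public.orders_p2, ...
transforms.Reroute.topic.regex=(.*)orders_p(.*)
# $1 refers to the first captured group, here the "myserver.public." prefix.
transforms.Reroute.topic.replacement=$1orders
# Keys are already unique across partitions, so do not add a key field.
transforms.Reroute.key.enforce.uniqueness=false
```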

12.2.2. Example of routing Debezium records for multiple tables to one topic

To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:

  • The tables for which to route records. These tables must all have the same schema.
  • The destination topic name.

For example, configuration in a .properties file looks like this:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards

topic.regex

Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.

In the example, the regular expression (.*)customers_shard(.*) matches records for changes to tables whose names include the customers_shard string. This re-routes records for tables with the following names:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the myserver.mydb.customers_all_shards topic.

12.2.3. Ensuring unique keys across Debezium records routed to the same topic

A Debezium change event key uses the table columns that make up the table’s primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the myserver.mydb.customers_shard1 table might have the same key value as a row in the myserver.mydb.customers_shard2 table.

To ensure that each event key is unique across the tables whose change event records go to the same topic, the topic routing transformation inserts a field into change event keys. By default, the name of the inserted field is __dbz__physicalTableIdentifier. The value of the inserted field is the default destination topic name.

If you want to, you can configure the topic routing transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names. For example:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id

This example adds the shard_id field to the key structure in routed records.

If you want to adjust the value of the key’s new field, configure both of these options:

key.field.regex
Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.
key.field.replacement
Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.

For example:

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

With this configuration, suppose that the default destination topic names are:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

The transformation uses the values in the second captured group, the shard numbers, as the value of the key’s new field. In this example, the inserted key field’s values would be 1, 2, or 3.

If your tables contain globally unique keys and you do not need to change the key structure, you can set the key.enforce.uniqueness option to false:

...
transforms.Reroute.key.enforce.uniqueness=false
...

12.2.4. Options for applying the topic routing transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

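You can use one of the following methods to configure the connector to apply the SMT selectively:

  • Configure an SMT predicate for the transformation.
  • Use the topic.regex configuration option for the SMT.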
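
For example, an SMT predicate might be attached to the topic routing transformation as in the following sketch. The predicate alias IsShardTopic and the topic naming pattern are assumptions for illustration. The TopicNameMatches predicate type is one of the default Kafka Connect predicate types.

```properties
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
# Apply the SMT only to records whose topic name matches the predicate,
# so that heartbeat, schema change, and transaction messages bypass it.
transforms.Reroute.predicate=IsShardTopic
predicates=IsShardTopic
predicates.IsShardTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsShardTopic.pattern=myserver.mydb.customers_shard.*
```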

12.2.5. Options for configuring Debezium topic routing transformation

The following table describes topic routing SMT configuration options.

Table 12.1. Topic routing SMT configuration options
Option | Default | Description
topic.regex | (none) | Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.
topic.replacement | (none) | Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for topic.regex. To refer to a group, specify $1, $2, and so on.
key.enforce.uniqueness | true | Indicates whether to add a field to the record’s change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. Specify false if you do not want the transformation to add a key field. For example, if you are routing records from a partitioned PostgreSQL table to one topic, you can configure key.enforce.uniqueness=false because unique keys are guaranteed in partitioned PostgreSQL tables.
key.field.name | __dbz__physicalTableIdentifier | Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default.
key.field.regex | (none) | Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default.
key.field.replacement | (none) | Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for key.field.regex. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default.

12.3. Routing change event records to topics according to event content

By default, Debezium streams all of the change events that it reads from a table to a single static topic. However, there might be situations in which you want to reroute selected events to other topics, based on the event content. The process of routing messages based on their content is described in the Content-based routing messaging pattern. To apply this pattern in Debezium, you use the content-based routing single message transform (SMT) to write expressions that are evaluated for each event. Depending on how an event is evaluated, the SMT either routes the event message to the original destination topic, or reroutes it to the topic that you specify in the expression.

Important

The Debezium content-based routing SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

While it is possible to use Java to create a custom SMT to encode routing logic, using a custom-coded SMT has its drawbacks. For example:

  • It is necessary to compile the transformation up front and deploy it to Kafka Connect.
  • Every change needs code recompilation and redeployment, leading to inflexible operations.

The content-based routing SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).

Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add it to your Debezium connector plug-in directories, along with any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR 223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.

12.3.1. Setting up the Debezium content-based-routing SMT

For security reasons, the content-based routing SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.7.2.Final.tar.gz. To use the content-based routing SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.

Important

After the routing SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the routing SMT.

Procedure

  1. From a browser, open the Red Hat Integration download site, and download the Debezium scripting SMT archive (debezium-scripting-1.7.2.Final.tar.gz).
  2. Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
  3. Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
  4. Restart the Kafka Connect process to pick up the new JAR files.

The Groovy language needs the following libraries on the classpath:

  • groovy
  • groovy-json (optional)
  • groovy-jsr223

The JavaScript language needs the following libraries on the classpath:

  • graalvm.js
  • graalvm.js.scriptengine

12.3.2. Example: Debezium basic content-based routing configuration

To configure a Debezium connector to route change event records based on the event content, you configure the ContentBasedRouter SMT in the Kafka Connect configuration for the connector.

Configuration of the content-based routing SMT requires you to specify an expression that defines the routing criteria. The expression defines a pattern for evaluating event records. It also specifies the name of a destination topic where events that match the pattern are routed. The pattern that you specify might designate an event type, such as a table insert, update, or delete operation. You might also define a pattern that matches a value in a specific column or row.

For example, to reroute all update (u) records to an updates topic, you might add the following configuration to your connector configuration:

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...

The preceding example specifies the use of the Groovy expression language.

Records that do not match the pattern are routed to the default topic.
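
As a further sketch, an expression can test a column value instead of the operation type. The status column, the value cancelled, and the cancelled-orders topic name are hypothetical. The safe navigation operator (?.) is Groovy syntax that guards against events in which the after field is null, such as delete events.

```properties
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Hypothetical column and topic names. A null result leaves the record on its default topic.
transforms.route.topic.expression=value.after?.status == 'cancelled' ? 'cancelled-orders' : null
```

This expression assumes that every evaluated message has an after field, so it is best combined with one of the methods for applying the SMT selectively.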

12.3.3. Variables for use in Debezium content-based routing expressions

Debezium binds certain variables into the evaluation context for the SMT. When you create expressions to specify conditions to control the routing destination, the SMT can look up and interpret the values of these variables to evaluate conditions in an expression.

The following table lists the variables that Debezium binds into the evaluation context for the content-based routing SMT:

Table 12.2. Content-based routing expression variables
Name | Description | Type
key | A key of the message. | org.apache.kafka.connect.data.Struct
value | A value of the message. | org.apache.kafka.connect.data.Struct
keySchema | Schema of the message key. | org.apache.kafka.connect.data.Schema
valueSchema | Schema of the message value. | org.apache.kafka.connect.data.Schema
topic | Name of the target topic. | String
headers | A Java map of message headers. The key field is the header name. The headers variable exposes the following properties: value (of type Object) and schema (of type org.apache.kafka.connect.data.Schema). | java.util.Map<String, io.debezium.transforms.scripting.RecordHeader>

An expression can invoke arbitrary methods on its variables. Expressions should resolve to a String value that determines how the SMT routes the message. When the expression evaluates to a non-null value, the SMT reroutes the message to the topic that the value names. When the expression evaluates to null, the SMT routes the message to the default topic.

Expressions should not result in any side effects. That is, they should not modify any variables that are passed to them.
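
The following sketch shows how an expression might combine two of the bound variables, value and topic. The .deletes suffix is a hypothetical naming choice, not a Debezium convention.

```properties
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Hypothetical rule: send delete events to a topic named after the current topic.
# The expression only reads the variables; it does not modify them.
transforms.route.topic.expression=value.op == 'd' ? topic + '.deletes' : null
```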

12.3.4. Options for applying the content-based routing transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. You can use one of the following methods to configure the connector to apply the SMT selectively:

  • Configure an SMT predicate for the transformation.
  • Use the topic.regex configuration option for the SMT.

12.3.5. Configuration of content-based routing conditions for other scripting languages

The way that you express content-based routing conditions depends on the scripting language that you use. For example, as shown in the basic configuration example, when you use Groovy as the expression language, the following expression reroutes all update (u) records to the updates topic, while routing other records to the default topic:

value.op == 'u' ? 'updates' : null

Other languages use different methods to express the same condition.

Tip

The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures. To use the ContentBasedRouter SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState SMT.

You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

JavaScript

When you use JavaScript as the expression language, you can call the Struct#get() method to specify the content-based routing condition, as in the following example:

value.get('op') == 'u' ? 'updates' : null

JavaScript with Graal.js

When you create content-based routing conditions by using JavaScript with Graal.js, you use an approach that is similar to the one used with Groovy. For example:

value.op == 'u' ? 'updates' : null
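
In a connector configuration, the Graal.js variant of the basic example might look like the following sketch. It assumes that the GraalVM JavaScript libraries listed in the setup section are on the classpath, and it reuses the route alias from the Groovy example.

```properties
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
# Select the GraalVM JavaScript engine instead of Groovy.
transforms.route.language=jsr223.graal.js
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
```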

12.3.6. Options for configuring the content-based routing transformation

Property | Default | Description
topic.regex | (none) | An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply the condition logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the condition logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified.
language | (none) | The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy or jsr223.graal.js. Debezium supports bootstrapping through the JSR 223 API ("Scripting for the Java™ Platform") only.
topic.expression | (none) | The expression to be evaluated for every message. Must evaluate to a String value where a result of non-null reroutes the message to a new topic, and a null value routes the message to the default topic.
null.handling.mode | keep | Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options: keep (default), which passes the messages through; drop, which removes the messages completely; evaluate, which applies the condition logic to the messages.
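
The following sketch combines several of these options in one configuration. The topic name myserver.mydb.orders is hypothetical, and the choice of drop for null.handling.mode is only an illustration of a non-default setting.

```properties
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Evaluate the expression only for records destined for this (hypothetical) topic.
transforms.route.topic.regex=myserver.mydb.orders
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
# Remove tombstone messages instead of passing them through (the default is keep).
transforms.route.null.handling.mode=drop
```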

12.4. Filtering Debezium change event records

By default, Debezium delivers every data change event that it receives to the Kafka broker. However, in many cases, you might be interested in only a subset of the events emitted by the producer. To enable you to process only the records that are relevant to you, Debezium provides the filter single message transform (SMT).

Important

The Debezium filter SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

While it is possible to use Java to create a custom SMT to encode filtering logic, using a custom-coded SMT has its drawbacks. For example:

  • It is necessary to compile the transformation up front and deploy it to Kafka Connect.
  • Every change needs code recompilation and redeployment, leading to inflexible operations.

The filter SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).

Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add it to your Debezium connector plug-in directories, along with any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR 223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.

12.4.1. Setting up the Debezium filter SMT

For security reasons, the filter SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.7.2.Final.tar.gz. To use the filter SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.

Important

After the filter SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the filter SMT.

Procedure

  1. From a browser, open the Red Hat Integration download site, and download the Debezium scripting SMT archive (debezium-scripting-1.7.2.Final.tar.gz).
  2. Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
  3. Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
  4. Restart the Kafka Connect process to pick up the new JAR files.

The Groovy language needs the following libraries on the classpath:

  • groovy
  • groovy-json (optional)
  • groovy-jsr223

The JavaScript language needs the following libraries on the classpath:

  • graalvm.js
  • graalvm.js.scriptengine

12.4.2. Example: Debezium basic filter SMT configuration

You configure the filter transformation in the Debezium connector’s Kafka Connect configuration. In the configuration, you specify the events that you are interested in by defining filter conditions that are based on business rules. As the filter SMT processes the event stream, it evaluates each event against the configured filter conditions. Only events that meet the criteria of the filter conditions are passed to the broker.

To configure a Debezium connector to filter change event records, configure the Filter SMT in the Kafka Connect configuration for the Debezium connector. Configuration of the filter SMT requires you to specify an expression, written in a supported scripting language, that defines the filtering criteria.

For example, you might add the following configuration in your connector configuration.

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

The preceding example specifies the use of the Groovy expression language. The expression value.op == 'u' && value.before.id == 2 removes all messages, except those that represent update (u) records with id values that are equal to 2.
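To make the effect of the condition concrete, the following Python sketch applies an equivalent predicate to a few sample events. The events and the function name keep are hypothetical, and plain dictionaries stand in for the Kafka Connect Struct values that the SMT actually evaluates.

```python
def keep(value):
    # Python counterpart of: value.op == 'u' && value.before.id == 2
    # The "and" short-circuits, so "before" is read only for update events.
    return value["op"] == "u" and value["before"]["id"] == 2


# Hypothetical change events: one create and two updates.
events = [
    {"op": "c", "before": None, "after": {"id": 2}},
    {"op": "u", "before": {"id": 1}, "after": {"id": 1}},
    {"op": "u", "before": {"id": 2}, "after": {"id": 2}},
]

# Only events for which the condition is true are passed on.
kept = [event for event in events if keep(event)]
```

Of the three sample events, only the last one, an update whose before state has id 2, remains in kept.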

12.4.3. Variables for use in filter expressions

Debezium binds certain variables into the evaluation context for the filter SMT. When you create expressions to specify filter conditions, you can use the variables that Debezium binds into the evaluation context. By binding variables, Debezium enables the SMT to look up and interpret their values as it evaluates the conditions in an expression.

The following table lists the variables that Debezium binds into the evaluation context for the filter SMT:

Table 12.3. Filter expression variables

Name

Description

Type

key

A key of the message.

org.apache.kafka.connect.data.Struct

value

A value of the message.

org.apache.kafka.connect.data.Struct

keySchema

Schema of the message key.

org.apache.kafka.connect.data.Schema

valueSchema

Schema of the message value.

org.apache.kafka.connect.data.Schema

topic

Name of the target topic.

String

headers

A Java map of message headers. The key field is the header name. The headers variable exposes the following properties:

  • value (of type Object)
  • schema (of type org.apache.kafka.connect.data.Schema)

java.util.Map<String, io.debezium.transforms.scripting.RecordHeader>

An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT handles the message. When the filter condition in an expression evaluates to true, the message is retained. When the filter condition evaluates to false, the message is removed.

Expressions should not result in any side effects. That is, they should not modify any of the variables that are passed to them.

12.4.4. Options for applying the filter transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. You can use one of the following methods to configure the connector to apply the SMT selectively:

  • Configure an SMT predicate for the transformation.
  • Use the topic.regex configuration option for the SMT.

12.4.5. Filter condition configuration for other scripting languages

The way that you express filtering conditions depends on the scripting language that you use.

For example, as shown in the basic configuration example, when you use Groovy as the expression language, the following expression removes all messages, except for update records that have id values set to 2:

value.op == 'u' && value.before.id == 2

Other languages use different methods to express the same condition.

Tip

The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures. To use the filter SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState SMT.

You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

JavaScript

If you use JavaScript as the expression language, you can call the Struct#get() method to specify the filtering condition, as in the following example:

value.get('op') == 'u' && value.get('before').get('id') == 2

JavaScript with Graal.js

If you use JavaScript with Graal.js to define filtering conditions, you use an approach that is similar to the one that you use with Groovy. For example:

value.op == 'u' && value.before.id == 2

12.4.6. Options for configuring filter transformation

The following table lists the configuration options that you can use with the filter SMT.

Table 12.4. filter SMT configuration options

Property

Default

Description

topic.regex

 

An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply filtering logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the filter logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified.

language

 

The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy, or jsr223.graal.js. Debezium supports bootstrapping through the JSR 223 API ("Scripting for the Java™ Platform") only.

condition

 

The expression to be evaluated for every message. Must evaluate to a Boolean value where a result of true keeps the message, and a result of false removes it.

null.handling.mode

keep

Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options:

keep
(Default) Pass the messages through.
drop
Remove the messages completely.
evaluate
Apply the filter condition to the messages.

12.5. Extracting source record after state from Debezium change events

A Debezium data change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information. However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values. To provide this kind of record, Debezium provides the event flattening single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.

The event flattening transformation is a Kafka Connect SMT.

This transformation is available only to SQL database connectors.

12.5.1. Description of Debezium change event structure

Debezium generates data change events that have a complex structure. Each event consists of three parts:

  • Metadata, which includes but is not limited to:

    • The operation that made the change
    • Source information such as the names of the database and table where the change was made
    • Time stamp for when the change was made
    • Optional transaction information
  • Row data before the change
  • Row data after the change

For example, part of the structure of an UPDATE change event looks like this:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

This complex format provides the most information about changes happening in the system. However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

To provide the needed Kafka record format for consumers, configure the event flattening SMT.

12.5.2. Behavior of Debezium event flattening transformation

The event flattening SMT extracts the after field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its after field to create a simple Kafka record.

You can configure the event flattening SMT for a Debezium connector or for a sink connector that consumes messages emitted by a Debezium connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.

You can configure the transformation to do any of the following:

  • Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata.
  • Keep Kafka records that contain change events for DELETE operations in the stream. The default behavior is that the SMT drops Kafka records for DELETE operation change events because most consumers cannot yet handle them.

A database DELETE operation causes Debezium to generate two Kafka records:

  • A record that contains "op": "d", the before row data, and some other fields.
  • A tombstone record that has the same key as the deleted row and a value of null. This record is a marker for Apache Kafka. It indicates that log compaction can remove all records that have this key.

Instead of dropping the record that contains the before row data, you can configure the event flattening SMT to do one of the following:

  • Keep the record in the stream and edit it to have only the "value": "null" field.
  • Keep the record in the stream and edit it to have a value field that contains the key/value pairs that were in the before field with an added "__deleted": "true" entry.

Similarly, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.
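The behavior described in this section can be summarized in a short Python sketch. It models only what is stated above, with dictionaries standing in for Kafka records; the function name flatten and its parameter names are hypothetical, chosen to mirror the drop.tombstones and delete.handling.mode options.

```python
def flatten(record, drop_tombstones=True, delete_handling_mode="drop"):
    """Return the simplified record, or None if the record is dropped.

    record is a dict with "key" and "value"; the value is a change event
    dict, or None for a tombstone.
    """
    key, value = record["key"], record["value"]
    if value is None:
        # Tombstone: dropped by default, kept as-is when drop_tombstones is False.
        return None if drop_tombstones else record
    if value["op"] == "d":
        if delete_handling_mode == "drop":
            return None
        if delete_handling_mode == "none":
            # Keep the record, but with a null value.
            return {"key": key, "value": None}
        if delete_handling_mode == "rewrite":
            # Keep the before state and mark the row as deleted.
            return {"key": key, "value": {**value["before"], "__deleted": "true"}}
        raise ValueError(f"unknown delete handling mode: {delete_handling_mode}")
    # Creates and updates: the after state becomes the whole value.
    return {"key": key, "value": value["after"]}
```

With the defaults, both records that a DELETE operation produces are removed from the stream; changing either parameter keeps the corresponding record.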

12.5.3. Configuration of Debezium event flattening transformation

Configure the Debezium event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, in a .properties file, you would specify something like the following:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

As for any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.

The following .properties example sets several event flattening SMT options:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn

drop.tombstones=false
Keeps tombstone records for DELETE operations in the event stream.

delete.handling.mode=rewrite
For DELETE operations, edits the Kafka record by flattening the value field that was in the change event. The value field directly contains the key/value pairs that were in the before field. The SMT adds __deleted and sets it to true, for example:

"value": {
  "pk": 2,
  "cola": null,
  "__deleted": "true"
}

add.fields=table,lsn
Adds change event metadata for the table and lsn fields to the simplified Kafka record.

12.5.4. Example of adding Debezium metadata to the Kafka record

The event flattening SMT can add original change event metadata to the simplified Kafka record. For example, you might want the simplified record’s header or value to contain any of the following:

  • The type of operation that made the change
  • The name of the database or table that was changed
  • Connector-specific fields such as the Postgres LSN field

To add metadata to the simplified Kafka record’s header, specify the add.headers option. To add metadata to the simplified Kafka record’s value, specify the add.fields option. Each of these options takes a comma-separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite

With that configuration, a simplified Kafka record would contain something like the following:

{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}

Also, simplified Kafka records would have a __db header.

In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.

To add metadata to a simplified Kafka record that is for a DELETE operation, you must also configure delete.handling.mode=rewrite.
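The naming rule for added metadata fields can be illustrated with a small Python sketch. The function names and the sample event are hypothetical, and the way that lookup resolves an unqualified field name (top level of the event first, then the source struct) is an assumption made so that the example is self-contained; it is not taken from the SMT.

```python
def metadata_name(field, prefix="__"):
    """Name that a metadata field gets in the simplified record."""
    # "source.ts_ms" becomes "__source_ts_ms": the prefix, then the struct
    # name and the field name joined by an underscore.
    return prefix + field.replace(".", "_")


def lookup(change_event, field):
    # Assumed resolution order for this sketch only.
    if "." in field:
        struct, name = field.split(".", 1)
        return change_event[struct][name]
    if field in change_event:
        return change_event[field]
    return change_event["source"][field]


def add_fields(flat_value, change_event, fields, prefix="__"):
    """Copy the listed metadata fields into a copy of the flattened value."""
    result = dict(flat_value)
    for field in fields.split(","):
        result[metadata_name(field, prefix)] = lookup(change_event, field)
    return result


# Hypothetical change event that carries ts_ms at two levels.
event = {
    "op": "c",
    "ts_ms": 2,
    "source": {"table": "MY_TABLE", "lsn": 123456789, "ts_ms": 1},
    "after": {"id": 1},
}
simplified = add_fields(event["after"], event, "op,table,lsn,source.ts_ms")
```

Here simplified gains the keys __op, __table, __lsn, and __source_ts_ms, which matches the field names in the preceding example record.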

12.5.5. Options for applying the event flattening transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.

12.5.6. Options for configuring Debezium event flattening transformation

The following table describes the options that you can specify to configure the event flattening SMT.

Table 12.5. Descriptions of event flattening SMT configuration options

Option

Default

Description

drop.tombstones

true

Debezium generates a tombstone record for each DELETE operation. The default behavior is that the event flattening SMT removes tombstone records from the stream. To keep tombstone records in the stream, specify drop.tombstones=false.

delete.handling.mode

drop

Debezium generates a change event record for each DELETE operation. The default behavior is that the event flattening SMT removes these records from the stream. To keep Kafka records for DELETE operations in the stream, set delete.handling.mode to none or rewrite.

Specify none to keep the change event record in the stream. The record contains only "value": "null".

Specify rewrite to keep the change event record in the stream and edit the record to have a value field that contains the key/value pairs that were in the before field and also add __deleted: true to the value. This is another way to indicate that the record has been deleted.

When you specify rewrite, the updated simplified records for DELETE operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the Debezium connector creates.

route.by.field

 

To use row data to determine the topic to route the record to, set this option to an after field attribute. The SMT routes the record to the topic whose name matches the value of the specified after field attribute. For a DELETE operation, set this option to a before field attribute.

For example, configuration of route.by.field=destination routes records to the topic whose name is the value of after.destination. The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made.

If you are configuring the event flattening SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure route.by.field to re-route the event.

add.fields.prefix

__ (double-underscore)

Set this optional string to prefix a field.

add.fields

 

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record’s value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms.

Optionally, you can override the field name by using the format <field name>:<new field name>, for example, version:VERSION, connector:CONNECTOR, or source.ts_ms:EVENT_TIMESTAMP. The new field name is case-sensitive.

When the SMT adds metadata fields to the simplified record’s value, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not in the change event record, the SMT still adds the field to the record’s value.

add.headers.prefix

__ (double-underscore)

Set this optional string to prefix a header.

add.headers

 

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms.

Optionally, you can override the field name by using the format <field name>:<new field name>, for example, version:VERSION, connector:CONNECTOR, or source.ts_ms:EVENT_TIMESTAMP. The new field name is case-sensitive.

When the SMT adds metadata fields to the simplified record’s header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not in the change event record, the SMT does not add the field to the header.

12.6. Configuring Debezium connectors to use the outbox pattern

The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

  • Capture changes in an outbox table
  • Apply the Debezium outbox event router single message transformation (SMT)

A Debezium connector that is configured to apply the outbox SMT should capture changes that occur in an outbox table only. For more information, see Options for applying the transformation selectively.

A connector can capture changes in more than one outbox table only if each outbox table has the same structure.

See Reliable Microservices Data Exchange With the Outbox Pattern to learn about why the outbox pattern is useful and how it works.

Note

The outbox event router SMT is not compatible with the MongoDB connector.

MongoDB users can run the MongoDB outbox event router SMT.

12.6.1. Example of a Debezium outbox message

To understand how the Debezium outbox event router SMT is configured, review the following example of a Debezium outbox message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

A Debezium connector that is configured to apply the outbox event router SMT generates the above message by transforming a Debezium raw message like this:

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "1.7.2.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484
}

This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox table structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.

12.6.2. Outbox table structure expected by Debezium outbox event router SMT

To apply the default outbox event router SMT configuration, your outbox table is assumed to have the following columns:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |

Table 12.6. Descriptions of expected outbox table columns

Column

Effect

id

Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages.

To obtain the unique ID of the event from a different outbox table column, set the table.field.event.id SMT option in the connector configuration.

aggregatetype

Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default ${routedByValue} variable in the route.topic.replacement SMT option.

For example, in a default configuration, the route.by.field SMT option is set to aggregatetype and the route.topic.replacement SMT option is set to outbox.event.${routedByValue}. Suppose that your application adds two records to the outbox table. In the first record, the value in the aggregatetype column is customers. In the second record, the value in the aggregatetype column is orders. The connector emits the first record to the outbox.event.customers topic. The connector emits the second record to the outbox.event.orders topic.

To obtain this value from a different outbox table column, set the route.by.field SMT option in the connector configuration.

aggregateid

Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

To obtain the event key from a different outbox table column, set the table.field.event.key SMT option in the connector configuration.

payload

A representation of the outbox change event. The default structure is JSON. By default, the Kafka message value consists solely of the payload value. However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope that encapsulates both the payload and the additional fields, and each field is represented separately. For more information, see Emitting messages with additional fields.

To obtain the event payload from a different outbox table column, set the table.field.event.payload SMT option in the connector configuration.

Additional custom columns

Any additional columns from the outbox table can be added to outbox events either within the payload section or as a message header.

One example could be a column eventType which conveys a user-defined value that helps to categorize or organize events.
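The roles of the default columns can be sketched in Python as a mapping from a raw change event to an outbox message. The function name to_outbox_message and the dictionary layout of the result are hypothetical. Lowercasing the routed value is an assumption inferred from the earlier example, in which the aggregatetype value Order produces the topic outbox.event.order.

```python
def to_outbox_message(change_event, topic_replacement="outbox.event.${routedByValue}"):
    """Build an outbox message from the after state of an outbox table row."""
    row = change_event["after"]
    # Assumption: the routed value is lowercased, as the earlier example suggests.
    routed_by_value = row["aggregatetype"].lower()
    return {
        # aggregatetype selects the destination topic.
        "topic": topic_replacement.replace("${routedByValue}", routed_by_value),
        # aggregateid becomes the message key.
        "key": row["aggregateid"],
        # id is emitted as a message header.
        "headers": {"id": row["id"]},
        # payload becomes the message value, passed through unchanged.
        "value": row["payload"],
    }


# Hypothetical raw change event for a row that was inserted into the outbox table.
raw_event = {
    "before": None,
    "after": {
        "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
        "aggregateid": "1",
        "aggregatetype": "Order",
        "payload": '{"id": 1, "customerId": 123}',
        "type": "OrderCreated",
    },
    "op": "c",
}
message = to_outbox_message(raw_event)
```

Because the key is derived from aggregateid, all events for the same aggregate share a key, which is what keeps them in order within a Kafka partition.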

12.6.3. Basic Debezium outbox event router SMT configuration

To configure a Debezium connector to support the outbox pattern, configure the outbox.EventRouter SMT. For example, the basic configuration in a .properties file looks like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

12.6.4. Options for applying the Outbox event router transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.

12.6.5. Using Avro as the payload format in Debezium outbox messages

The outbox event router SMT supports arbitrary payload formats. The payload column value in an outbox table is passed on transparently. An alternative to working with JSON is to use Avro. This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.

How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter

By default, the payload column value (the Avro data) is the only message value. Configuration of ByteBufferConverter as the value converter propagates the payload column value as-is into the Kafka message value.

The Debezium connectors may be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). The ByteBufferConverter cannot serialize these events, so you must provide additional configuration that tells the converter how to serialize them. As an example, the following configuration illustrates using the Apache Kafka JsonConverter with no schemas:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

The delegate Converter implementation is specified by the delegate.converter.type option. If any extra configuration options are needed by the converter, they can also be specified, such as the disablement of schemas shown above using schemas.enable=false.

12.6.6. Emitting additional fields in Debezium outbox messages

Your outbox table might contain columns whose values you want to add to the emitted outbox messages. For example, consider an outbox table that has a value of purchase-order in the aggregatetype column and another column, eventType, whose possible values are order-created and order-shipped. To emit the eventType column value in the outbox message header, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:header:eventType

To emit the eventType column value in the outbox message envelope, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:envelope:eventType
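The placement value in the two preceding examples has the form column:placement:alias. The following Python sketch shows one way to read such a value and apply it to an outbox table row. The function names are hypothetical, and support for a comma-separated list of entries and for omitting the alias are assumptions that go beyond what the examples show.

```python
def parse_placement(setting):
    """Split a placement setting into (column, placement, alias) tuples."""
    placements = []
    for entry in setting.split(","):
        parts = entry.strip().split(":")
        column, placement = parts[0], parts[1]
        # Assumption: without an alias, the column name is used as-is.
        alias = parts[2] if len(parts) > 2 else column
        if placement not in ("header", "envelope"):
            raise ValueError(f"unsupported placement: {placement}")
        placements.append((column, placement, alias))
    return placements


def place_additional_fields(row, setting):
    """Return the (headers, envelope) additions for one outbox table row."""
    headers, envelope = {}, {}
    for column, placement, alias in parse_placement(setting):
        target = headers if placement == "header" else envelope
        target[alias] = row[column]
    return headers, envelope


# Hypothetical outbox table row.
row = {"aggregatetype": "purchase-order", "type": "order-created"}
header_result = place_additional_fields(row, "type:header:eventType")
envelope_result = place_additional_fields(row, "type:envelope:eventType")
```

In the first call the value order-created appears under the name eventType in the headers; in the second call it appears under the same name in the envelope.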

12.6.7. Expanding escaped JSON String as JSON

You may have noticed that the Debezium outbox message contains the payload represented as a String. So when this string is actually JSON, it appears escaped in the resulting Kafka message, as shown below:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

You can configure the outbox event router to expand the message content, converting the escaped JSON back to its original, unescaped JSON format. The companion schema is deduced from the JSON document itself. The following example shows the expanded JSON in the resulting Kafka message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

To enable this transformation, set table.expand.json.payload to true and use the StringConverter, as shown in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter

12.6.8. Options for configuring outbox event router transformation

The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.

Table 12.7. Descriptions of outbox event router SMT configuration options
Option | Default | Group | Description

table.op.invalid.behavior

warn

Table

Determines the behavior of the SMT when there is an UPDATE operation on the outbox table. Possible settings are:

  • warn - The SMT logs a warning and continues to the next outbox table record.
  • error - The SMT logs an error and continues to the next outbox table record.
  • fatal - The SMT logs an error and the connector stops processing.

All changes in an outbox table are expected to be INSERT operations. That is, an outbox table functions as a queue; updates to records in an outbox table are not allowed. The SMT automatically filters out DELETE operations on an outbox table.

table.field.event.id

id

Table

Specifies the outbox table column that contains the unique event ID. This ID will be stored in the emitted event’s headers under the id key.

table.field.event.key

aggregateid

Table

Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

table.field.event.timestamp

(no default)

Table

By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox table column that contains the timestamp that you want to be in emitted outbox messages.

table.field.event.payload

payload

Table

Specifies the outbox table column that contains the event payload.

table.field.event.payload.id

aggregateid

Table

Specifies the outbox table column that contains the payload ID. This ID will be used as the emitted event’s key.

This option is deprecated. Use table.field.event.key instead.

table.expand.json.payload

false

Table

Specifies whether to expand a String payload that contains JSON. If the payload has no content, or if a parsing error occurs, the content is kept as is.

For more details, see Expanding escaped JSON String as JSON.

table.fields.additional.placement

(no default)

Table, Envelope

Specifies one or more outbox table columns that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:

id:header,my-field:envelope

To specify an alias for the column, specify a trio with the alias as the third value, for example:

id:header,my-field:envelope:my-alias

The second value is the placement and it must always be header or envelope.

For configuration examples, see Emitting additional fields in Debezium outbox messages.

table.field.event.schema.version

(no default)

Table, Schema

When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc.

route.by.field

aggregatetype

Router

Specifies the name of a column in the outbox table. By default, the value in this column becomes a part of the name of the topic to which the connector emits the outbox messages. For an example, see the description of the expected outbox table.

route.topic.regex

(?<routedByValue>.*)

Router

Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox table records. This regular expression is part of the setting of the route.topic.replacement SMT option.

The default behavior is that the SMT replaces the default ${routedByValue} variable in the setting of the route.topic.replacement SMT option with the setting of the route.by.field outbox SMT option.

route.topic.replacement

outbox.event.${routedByValue}

Router

Specifies the name of the topic to which the connector emits outbox messages. The default topic name is outbox.event. followed by the aggregatetype column value in the outbox table record. For example, if the aggregatetype value is customers, the topic name is outbox.event.customers.

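To change the topic name, you can set the route.by.field option to a different column, or set the route.topic.regex option to a different regular expression.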

route.tombstone.on.empty.payload

false

Router

Indicates whether an empty or null payload causes the connector to emit a tombstone event.
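
You set the options in the preceding table on the transformation by using the transforms.<alias>. prefix, as in the earlier examples in this section. As an illustration only, the following sketch combines several of the options; the created_at column name and the orders. topic prefix are assumptions, not defaults:

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Stop the connector if an UPDATE occurs on the outbox table.
transforms.outbox.table.op.invalid.behavior=fatal
# Hypothetical column that holds the timestamp to use in emitted messages.
transforms.outbox.table.field.event.timestamp=created_at
# Emit to orders.<aggregatetype value> instead of outbox.event.<aggregatetype value>.
transforms.outbox.route.topic.replacement=orders.${routedByValue}
# Emit a tombstone event when the payload is empty or null.
transforms.outbox.route.tombstone.on.empty.payload=true
```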

12.7. Configuring Debezium MongoDB connectors to use the outbox pattern

Note

This SMT is for use with the Debezium MongoDB connector only. For information about using the outbox event router SMT for relational databases, see Outbox event router.

The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

  • Capture changes in an outbox collection
  • Apply the Debezium MongoDB outbox event router single message transformation (SMT)

A Debezium connector that is configured to apply the MongoDB outbox SMT should capture changes that occur in an outbox collection only. For more information, see Options for applying the transformation selectively.

A connector can capture changes in more than one outbox collection only if each outbox collection has the same structure.
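
As a rough sketch of restricting capture to the outbox collection, the following configuration limits the MongoDB connector to a single collection and applies the SMT. The inventory.outbox namespace is an assumption for illustration, and the connection settings that a real connector requires are omitted:

```properties
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# Hypothetical namespace (database.collection) of the outbox collection.
collection.include.list=inventory.outbox
transforms=outbox
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
```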

Note

To use this SMT, operations on the actual business collections and the insert into the outbox collection must be performed as part of a multi-document transaction, a feature that MongoDB has supported since version 4.0, to prevent potential data inconsistencies between the business collections and the outbox collection. A future update is planned to support additional configuration for storing outbox events as a sub-document of the existing collection, rather than in an independent outbox collection, so that you can update existing data and insert an outbox event in a single ACID transaction without multi-document transactions.

For more information about the outbox pattern, see Reliable Microservices Data Exchange With the Outbox Pattern.

12.7.1. Example of a Debezium MongoDB outbox message

To understand how to configure the Debezium MongoDB outbox event router SMT, consider the following example of a Debezium outbox message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "b2730779e1f596e275826f08"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

A Debezium connector that is configured to apply the MongoDB outbox event router SMT generates the preceding message by transforming a raw Debezium change event message as in the following example:

# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" }
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "patch": null,
  "after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}",
  "source": {
    "version": "1.7.2.Final",
    "connector": "mongodb",
    "name": "fulfillment",
    "ts_ms": 1558965508000,
    "snapshot": false,
    "db": "inventory",
    "rs": "rs0",
    "collection": "customers",
    "ord": 31,
    "h": 1546547425148721999
  },
  "op": "c",
  "ts_ms": 1556890294484
}

This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox collection structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.

12.7.2. Outbox collection structure expected by Debezium MongoDB outbox event router SMT

To apply the default MongoDB outbox event router SMT configuration, your outbox collection is assumed to have the following fields:

{
  "_id": "objectId",
  "aggregatetype": "string",
  "aggregateid": "objectId",
  "type": "string",
  "payload": "object"
}

Table 12.8. Descriptions of expected outbox collection fields
Field | Effect

_id

Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages.

To obtain the unique ID of the event from a different outbox collection field, set the collection.field.event.id SMT option in the connector configuration.

aggregatetype

Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default ${routedByValue} variable in the route.topic.replacement SMT option.

For example, in a default configuration, the route.by.field SMT option is set to aggregatetype and the route.topic.replacement SMT option is set to outbox.event.${routedByValue}. Suppose that your application adds two documents to the outbox collection. In the first document, the value in the aggregatetype field is customers. In the second document, the value in the aggregatetype field is orders. The connector emits the first document to the outbox.event.customers topic. The connector emits the second document to the outbox.event.orders topic.

To obtain this value from a different outbox collection field, set the route.by.field SMT option in the connector configuration.

aggregateid

Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

To obtain the event key from a different outbox collection field, set the collection.field.event.key SMT option in the connector configuration.

payload

A representation of the outbox change event. The default structure is JSON. By default, the Kafka message value consists solely of the payload value. However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope that encapsulates both the payload and the additional fields, and each field is represented separately. For more information, see Emitting additional fields in Debezium MongoDB outbox messages.

To obtain the event payload from a different outbox collection field, set the collection.field.event.payload SMT option in the connector configuration.

Additional custom fields

Any additional fields from the outbox collection can be added to outbox events either within the payload section or as a message header.

One example could be a field eventType which conveys a user-defined value that helps to categorize or organize events.

12.7.3. Basic Debezium MongoDB outbox event router SMT configuration

To configure a Debezium MongoDB connector to support the outbox pattern, configure the outbox.MongoEventRouter SMT. The following example shows the basic configuration for the SMT in a .properties file:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter

12.7.4. Options for applying the MongoDB outbox event router transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it is best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. You can use one of the following methods to configure the connector to apply the SMT selectively:

  • Configure an SMT predicate for the transformation.
  • Use the route.topic.regex configuration option for the SMT.
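
As a minimal sketch of the predicate approach, the following configuration applies the MongoDB outbox event router only to records whose topic name matches a pattern. The predicate alias isOutboxCollection and the pattern .*[.]outbox are assumptions for illustration; adjust the pattern to the topic name that your connector uses for the outbox collection:

```properties
transforms=outbox
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
# Apply the SMT only when the predicate below evaluates to true.
transforms.outbox.predicate=isOutboxCollection
# Hypothetical predicate alias; TopicNameMatches is provided by Kafka Connect.
predicates=isOutboxCollection
predicates.isOutboxCollection.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Assumed pattern: matches any topic whose full name ends in ".outbox".
predicates.isOutboxCollection.pattern=.*[.]outbox
```

With a configuration like this one, records on topics that do not match the pattern bypass the SMT and are passed on unchanged.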

12.7.5. Using Avro as the payload format in Debezium MongoDB outbox messages

The MongoDB outbox event router SMT supports arbitrary payload formats. The payload field value in an outbox collection is passed on transparently. An alternative to working with JSON is to use Avro. This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.

How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteBufferConverter

By default, the payload field value (the Avro data) is the only message value. Configuration of ByteBufferConverter as the value converter propagates the payload field value as-is into the Kafka message value.

Debezium connectors can be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). The ByteBufferConverter cannot serialize these events, so you must provide additional configuration that tells the converter how to serialize them. For example, the following configuration uses the Apache Kafka JsonConverter with schemas disabled:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteBufferConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

The delegate.converter.type option specifies the delegate Converter implementation. If the delegate converter requires additional configuration options, you can specify them as well, as the preceding example does by setting schemas.enable=false to disable schemas.

12.7.6. Emitting additional fields in Debezium MongoDB outbox messages

Your outbox collection might contain fields whose values you want to add to the emitted outbox messages. For example, consider an outbox collection that has a value of purchase-order in the aggregatetype field and another field, eventType, whose possible values are order-created and order-shipped. To emit the eventType field value in the outbox message header, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.fields.additional.placement=eventType:header:type

To emit the eventType field value in the outbox message envelope, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.fields.additional.placement=eventType:envelope:type

12.7.7. Expanding escaped JSON String as JSON

By default, the payload of the Debezium outbox message is represented as a string. When the original source of the string is in JSON format, the resulting Kafka message uses escape sequences to represent the string, as shown in the following example:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

You can configure the outbox event router to expand the message content, converting the escaped JSON back to its original, unescaped JSON format. In the converted string, the companion schema is deduced from the original JSON document. The following example shows the expanded JSON in the resulting Kafka message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

To enable string conversion in the transformation, set the value of collection.expand.json.payload to true and use the StringConverter as shown in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter

12.7.8. Options for configuring outbox event router transformation

The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.

Table 12.9. Descriptions of outbox event router SMT configuration options
Option | Default | Group | Description

collection.op.invalid.behavior

warn

Collection

Determines the behavior of the SMT when there is an update operation on the outbox collection. Possible settings are:

  • warn - The SMT logs a warning and continues to the next outbox collection document.
  • error - The SMT logs an error and continues to the next outbox collection document.
  • fatal - The SMT logs an error and the connector stops processing.

All changes in an outbox collection are expected to be insert or delete operations. That is, an outbox collection functions as a queue; updates to documents in an outbox collection are not allowed. The SMT automatically filters out delete operations (which remove processed outbox events) on an outbox collection.

collection.field.event.id

_id

Collection

Specifies the outbox collection field that contains the unique event ID. This ID will be stored in the emitted event’s headers under the id key.

collection.field.event.key

aggregateid

Collection

Specifies the outbox collection field that contains the event key. When this field contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

collection.field.event.timestamp

(no default)

Collection

By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox collection field that contains the timestamp that you want to be in emitted outbox messages.

collection.field.event.payload

payload

Collection

Specifies the outbox collection field that contains the event payload.

collection.expand.json.payload

false

Collection

Specifies whether to expand a String payload that contains JSON. If the payload has no content, or if a parsing error occurs, the content is kept as is.

For more details, see Expanding escaped JSON String as JSON.

collection.fields.additional.placement

(no default)

Collection, Envelope

Specifies one or more outbox collection fields that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a field and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:

id:header,my-field:envelope

To specify an alias for the field, specify a trio with the alias as the third value, for example:

id:header,my-field:envelope:my-alias

The second value is the placement and it must always be header or envelope.

For configuration examples, see Emitting additional fields in Debezium MongoDB outbox messages.

collection.field.event.schema.version

(no default)

Collection, Schema

When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc.

route.by.field

aggregatetype

Router

Specifies the name of a field in the outbox collection. By default, the value specified in this field becomes a part of the name of the topic to which the connector emits the outbox messages. For an example, see the description of the expected outbox collection.

route.topic.regex

(?<routedByValue>.*)

Router

Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox collection documents. This regular expression is part of the setting of the route.topic.replacement SMT option.

The default behavior is that the SMT replaces the default ${routedByValue} variable in the setting of the route.topic.replacement SMT option with the setting of the route.by.field outbox SMT option.

route.topic.replacement

outbox.event.${routedByValue}

Router

Specifies the name of the topic to which the connector emits outbox messages. The default topic name is outbox.event. followed by the aggregatetype field value in the outbox collection document. For example, if the aggregatetype value is customers, the topic name is outbox.event.customers.

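To change the topic name, you can set the route.by.field option to a different field, or set the route.topic.regex option to a different regular expression.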

route.tombstone.on.empty.payload

false

Router

Indicates whether an empty or null payload causes the connector to emit a tombstone event.
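
You set the options in the preceding table on the transformation by using the transforms.<alias>. prefix, as in the earlier examples in this section. As an illustration only, the following sketch combines several of the options; the createdAt field name and the orders. topic prefix are assumptions, not defaults:

```properties
transforms=outbox
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
# Stop the connector if an update occurs on the outbox collection.
transforms.outbox.collection.op.invalid.behavior=fatal
# Hypothetical field that holds the timestamp to use in emitted messages.
transforms.outbox.collection.field.event.timestamp=createdAt
# Emit to orders.<aggregatetype value> instead of outbox.event.<aggregatetype value>.
transforms.outbox.route.topic.replacement=orders.${routedByValue}
# Emit a tombstone event when the payload is empty or null.
transforms.outbox.route.tombstone.on.empty.payload=true
```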

Revised on 2022-04-13 09:46:09 UTC
