Chapter 7. Applying transformations to modify messages exchanged with Apache Kafka


Debezium provides several single message transformations (SMTs) that you can use to modify change event records. You can configure a connector to apply a transformation that modifies records before it sends them to Apache Kafka. You can also apply the Debezium SMTs to a sink connector to modify records that Kafka Connect reads from a Kafka topic before they reach the connector.

If you want to apply transformations selectively to specific messages only, you can configure a Kafka Connect predicate to define the conditions for applying the SMT.

Debezium provides the following SMTs:

Topic router SMT
Reroutes change event records to specific topics based on a regular expression that is applied to the original topic name.
Content-based router SMT
Reroutes specified change event records based on the event content.
Event Record Changes SMT
Enhances an event message to identify the fields whose values change or remain unchanged after a database operation.
Message filtering SMT
Enables you to propagate a subset of event records to the destination Kafka topic. The transformation applies a regular expression to the change event records that a connector emits, based on the content of the event record. Only records that match the expression are written to the target topic. Other records are ignored.
HeaderToValue SMT
Extracts specified header fields from event records, and then copies or moves the header fields to values in the event record.
New record state extraction SMT
Flattens the complex structure of a Debezium change event record into a simplified format. The simplified structure enables processing by sink connectors that cannot consume the original structure.
MongoDB new record state extraction SMT
Simplifies the complex structure of Debezium MongoDB connector change event records. The simplified structure enables processing by sink connectors that cannot consume the original event structure.
Outbox event router SMT
Provides support for the outbox pattern to enable safe and reliable data exchange among multiple services.
MongoDB outbox event router SMT
Provides support for using the outbox pattern with the MongoDB connector to enable safe and reliable data exchange among multiple services.
Partition routing SMT
Routes events to specific destination partitions based on the values of one or more specified payload fields.
Time zone converter SMT
Converts Debezium and Kafka Connect timestamp fields in event records to a specified time zone.

7.1. Applying transformations selectively with SMT predicates

When you configure a single message transformation (SMT) for a connector, you can define a predicate for the transformation. The predicate specifies how to apply the transformation conditionally to a subset of the messages that the connector processes. You can assign predicates to transformations that you configure for source connectors, such as Debezium, or to sink connectors.

7.1.1. About SMT predicates

Debezium provides several single message transformations (SMTs) that you can use to modify event records before Kafka Connect saves the records to Kafka topics. By default, when you configure one of these SMTs for a Debezium connector, Kafka Connect applies that transformation to every record that the connector emits. However, there might be instances in which you want to apply a transformation selectively, so that it modifies only that subset of change event messages that share a common characteristic.

For example, for a Debezium connector, you might want to run the transformation only on event messages from a specific table or that include a specific header key. In environments that run Apache Kafka 2.6 or greater, you can append a predicate statement to a transformation to instruct Kafka Connect to apply the SMT only to certain records. In the predicate, you specify a condition that Kafka Connect uses to evaluate each message that it processes. When a Debezium connector emits a change event message, Kafka Connect checks the message against the configured predicate condition. If the condition is true for the event message, Kafka Connect applies the transformation, and then writes the message to a Kafka topic. Messages that do not match the condition are sent to Kafka unmodified.

The situation is similar for predicates that you define for a sink connector SMT. The connector reads messages from a Kafka topic and Kafka Connect evaluates the messages against the predicate condition. If a message matches the condition, Kafka Connect applies the transformation and then passes the messages to the sink connector.

After you define a predicate, you can reuse it and apply it to multiple transforms. Predicates also include a negate option that inverts the predicate, so that Kafka Connect applies the transformation only to records that do not match the condition that is defined in the predicate statement. You can use the negate option to pair the predicate with other transforms that should run only when the condition is not met.
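
As a minimal sketch of reusing one predicate across two transforms, the following fragment applies the outbox event router to records that match the predicate, and a second transform to every record that does not match it. The alias names (outbox, unwrap, IsOutboxTable) and the choice of the new record state extraction SMT as the second transform are assumptions made for illustration only.

```properties
# Two transforms share a single predicate definition.
transforms=outbox,unwrap

# Applied only to records whose topic name matches the predicate.
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.predicate=IsOutboxTable

# Applied only to records whose topic name does NOT match the predicate.
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.predicate=IsOutboxTable
transforms.unwrap.negate=true

# The predicate is defined once and referenced by alias.
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*
```

Kafka Connect applies transforms in the order in which they are listed, and evaluates the predicate against each record as it arrives at that transform.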

Predicate elements

Predicates include the following elements:

  • predicates prefix
  • Alias (for example, IsOutboxTable)
  • Type (for example, org.apache.kafka.connect.transforms.predicates.TopicNameMatches). Kafka Connect provides a set of default predicate types, which you can supplement by defining your own custom predicates.
  • Condition statement and any additional configuration properties, depending on the type of predicate (for example, a regex naming pattern)

Default predicate types

The following predicate types are available by default:

HasHeaderKey
Specifies a key name in the header in the event message that you want Kafka Connect to evaluate. The predicate evaluates to true for any records that include a header key that has the specified name.
RecordIsTombstone

Matches Kafka tombstone records. The predicate evaluates to true for any record that has a null value. Use this predicate in combination with a filter SMT to remove tombstone records. This predicate has no configuration parameters.

A tombstone in Kafka is a record that has a key with a 0-byte, null payload. When a Debezium connector processes a delete operation in the source database, the connector emits two change events for the delete operation:

  • A delete operation ("op" : "d") event that provides the previous value of the database record.
  • A tombstone event that has the same key, but a null value.

    The tombstone represents a delete marker for the row. When log compaction is enabled for Kafka, during compaction Kafka removes all events that share the same key as the tombstone. Log compaction occurs periodically, and the delete.retention.ms setting for the topic controls how long Kafka retains the tombstones themselves, so that consumers have time to read them.

    Although it is possible to configure Debezium so that it does not emit tombstone events, it’s best to permit Debezium to emit tombstones to maintain the expected behavior during log compaction. Suppressing tombstones prevents Kafka from removing records for a deleted key during log compaction. If your environment includes sink connectors that cannot process tombstones, you can configure the sink connector to use an SMT with the RecordIsTombstone predicate to filter out the tombstone records.

TopicNameMatches
A regular expression that specifies the name of a topic that you want Kafka Connect to match. The predicate is true for connector records in which the topic name matches the specified regular expression. Use this predicate to apply an SMT to records based on the name of the source table.
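
The HasHeaderKey type described above might be configured as in the following sketch. It assumes that records carry a header named event_timestamp and that you want to copy that header into the record value with the HeaderToValue SMT only when the header is present. The header name, the target field name, and the aliases are hypothetical.

```properties
transforms=copyHeader
transforms.copyHeader.type=io.debezium.transforms.HeaderToValue
transforms.copyHeader.headers=event_timestamp
transforms.copyHeader.fields=timestamp
transforms.copyHeader.operation=copy
# Run the transform only for records that include the header.
transforms.copyHeader.predicate=HasTimestampHeader

predicates=HasTimestampHeader
predicates.HasTimestampHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
# HasHeaderKey takes the header key in its name property.
predicates.HasTimestampHeader.name=event_timestamp
```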

7.1.2. Defining SMT predicates

By default, Kafka Connect applies each single message transformation in the Debezium connector configuration to every change event record that it receives from Debezium. Beginning with Apache Kafka 2.6, you can define an SMT predicate for a transformation in the connector configuration that controls how Kafka Connect applies the transformation. The predicate statement defines the conditions under which Kafka Connect applies the transformation to event records emitted by Debezium. Kafka Connect evaluates the predicate statement and then applies the SMT selectively to the subset of records that match the condition that is defined in the predicate. Configuring Kafka Connect predicates is similar to configuring transforms. You specify a predicate alias, associate the alias with a transform, and then define the type and configuration for the predicate.

Prerequisites

  • The Debezium environment runs Apache Kafka 2.6 or greater.
  • An SMT is configured for the Debezium connector.

Procedure

  1. In the Debezium connector configuration, specify a predicate alias for the predicates parameter, for example, IsOutboxTable.
  2. Associate the predicate alias with the transform that you want to apply conditionally, by appending the predicate alias to the transform alias in the connector configuration:

    transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>

    For example:

    transforms.outbox.predicate=IsOutboxTable
  3. Configure the predicate by specifying its type and providing values for configuration parameters.

    1. For the type, specify one of the following default types that are available in Kafka Connect:

      • HasHeaderKey
      • RecordIsTombstone
      • TopicNameMatches

        For example:

        predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
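    2. For the TopicNameMatches predicate, specify a regular expression for the topic name that you want to match by setting the pattern property. For the HasHeaderKey predicate, specify the header key that you want to match by setting the name property.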

      For example:

      predicates.IsOutboxTable.pattern=outbox.event.*
  4. If you want to negate a condition, append the negate keyword to the transform alias and set it to true.

    For example:

    transforms.outbox.negate=true

    The preceding property inverts the set of records that the predicate matches, so that Kafka Connect applies the transform to any record that does not match the condition specified in the predicate.

Example: TopicNameMatches predicate for the outbox event router transformation

The following example shows a Debezium connector configuration that applies the outbox event router transformation only to messages that Debezium emits to the Kafka outbox.event.order topic.

Because the TopicNameMatches predicate evaluates to true only for messages from the outbox table (outbox.event.*), the transformation is not applied to messages that originate from other tables in the database.

transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*

7.1.3. Ignoring tombstone events

You can control whether Debezium emits tombstone events, and how long Kafka retains them. Depending on your data pipeline, you might want to set the tombstones.on.delete property for a connector so that Debezium does not emit tombstone events.

Whether you enable Debezium to emit tombstones depends on how topics are consumed in your environment and on the characteristics of the sink consumer. Some sink connectors rely on tombstone events to remove records from downstream data stores. If your sink connectors rely on tombstone records in this way, configure Debezium to emit them.

When you configure Debezium to generate tombstones, further configuration is required to ensure that sink connectors receive the tombstone events. The retention policy for a topic must be set so that the connector has time to read event messages before Kafka removes them during log compaction. The length of time that a topic retains tombstones before compaction is controlled by the delete.retention.ms property for the topic.
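
As an illustration of the retention settings mentioned above, the following sketch shows topic-level properties for a compacted change event topic. These are Kafka topic settings rather than connector properties, and the 24-hour value is only an example that matches the Kafka default; choose a value that suits how quickly your consumers read the topic.

```properties
# Topic-level settings (not connector properties) for a compacted topic.
cleanup.policy=compact
# Retain tombstones for 24 hours (86400000 ms) so that consumers can read them.
delete.retention.ms=86400000
```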

By default, the tombstones.on.delete property for a connector is set to true so that the connector generates a tombstone after each delete event. If you set the property to false to prevent Debezium from saving tombstone records to Kafka topics, the absence of tombstone records might lead to unintended consequences. Kafka relies on tombstones during log compaction to remove records that are related to a deleted key.

If you need to support sink connectors or downstream Kafka consumers that cannot process records with null values, rather than preventing Debezium from emitting tombstones, consider configuring an SMT for the connector with a predicate that uses the RecordIsTombstone predicate type to remove tombstone messages before consumers read them.
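
A minimal sketch of this approach, assuming that you add it to the configuration of the sink connector, pairs the Kafka Connect Filter transformation with the RecordIsTombstone predicate. The alias names dropTombstones and IsTombstone are hypothetical.

```properties
transforms=dropTombstones
# Filter drops every record for which the associated predicate is true.
transforms.dropTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate=IsTombstone

predicates=IsTombstone
# RecordIsTombstone has no configuration parameters of its own.
predicates.IsTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```

Because the filter runs in the sink connector, the tombstones remain in the Kafka topic and log compaction continues to work as expected.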

Procedure

  • To prevent Debezium from emitting tombstone events for deleted database records, set the connector option tombstones.on.delete to false.

    For example:

    "tombstones.on.delete": "false"

7.2. Routing Debezium event records to topics that you specify

Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter. To do this, Debezium provides the topic routing single message transformation (SMT). Configure this transformation in the Debezium connector’s Kafka Connect configuration. Configuration options enable you to specify the following:

  • An expression for identifying the records to re-route
  • An expression that resolves to the destination topic
  • How to ensure a unique key among the records being re-routed to the destination topic

It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.

The topic routing transformation is a Kafka Connect SMT.


7.2.1. Use case for routing Debezium records to topics that you specify

The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.

Logical tables

A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: db_shard1.my_table and db_shard2.my_table. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.

Partitioned PostgreSQL tables

When the Debezium PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the topic routing SMT. Because each key in a partitioned table is guaranteed to be unique, configure key.enforce.uniqueness=false so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.
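
The following sketch shows how such a configuration might look. It assumes hypothetical partition topics named myserver.public.orders_p1, myserver.public.orders_p2, and so on, and routes them all to a single myserver.public.orders topic.

```properties
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
# Hypothetical partition topics: myserver.public.orders_p1, myserver.public.orders_p2, ...
transforms.Reroute.topic.regex=(.*)orders_p(.*)
# $1 holds everything before "orders_p", for example "myserver.public."
transforms.Reroute.topic.replacement=$1orders
# Keys are already unique across partitions, so do not add a key field.
transforms.Reroute.key.enforce.uniqueness=false
```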

7.2.2. Example of routing Debezium records for multiple tables to one topic

To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:

  • The tables for which to route records. These tables must all have the same schema.
  • The destination topic name.

The connector configuration in the following example sets several options for the topic routing SMT:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards

topic.regex
Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.

In the example, the regular expression (.*)customers_shard(.*) matches records for changes to tables whose names include the customers_shard string. This would re-route records for tables with the following names:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the myserver.mydb.customers_all_shards topic.
schema.name.adjustment.mode
Specifies how the message key schema names derived from the resulting topic name should be adjusted for compatibility with the message converter used by the connector. The value can be none (default) or avro.

Customizing the configuration

To customize the configuration you can define an SMT predicate statement that specifies the tables that you want the transformation to process, or not to process. A predicate might be useful if you configure the SMT to route tables that match a regular expression, and you do not want the SMT to reroute one particular table that matches the expression.
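
As a sketch of that scenario, the following fragment reroutes the sharded customer tables from the earlier example but leaves records for one shard on their original topic. The predicate alias IsExcludedShard and the choice of customers_shard3 as the excluded table are assumptions for illustration.

```properties
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
# Apply the router to every record EXCEPT those that match the predicate.
transforms.Reroute.predicate=IsExcludedShard
transforms.Reroute.negate=true

predicates=IsExcludedShard
predicates.IsExcludedShard.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Each unescaped dot matches any single character, which is acceptable for this sketch.
predicates.IsExcludedShard.pattern=myserver.mydb.customers_shard3
```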

7.2.3. Ensuring unique keys across Debezium records routed to the same topic

A Debezium change event key uses the table columns that make up the table’s primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the myserver.mydb.customers_shard1 table might have the same key value as a row in the myserver.mydb.customers_shard2 table.

To ensure that each event key is unique across the tables whose change event records go to the same topic, the topic routing transformation inserts a field into change event keys. By default, the name of the inserted field is __dbz__physicalTableIdentifier. The value of the inserted field is the default destination topic name.

If you want to, you can configure the topic routing transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names. For example:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id

This example adds the shard_id field to the key structure in routed records.

If you want to adjust the value of the key’s new field, configure both of these options:

key.field.regex
Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.
key.field.replacement
Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.

For example:

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

With this configuration, suppose that the default destination topic names are:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

The transformation uses the values in the second captured group, the shard numbers, as the value of the key’s new field. In this example, the inserted key field’s values would be 1, 2, or 3.
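
Putting the preceding options together, a complete configuration might look like the following sketch. The comments trace one record through the transformation under the topic names assumed in this section.

```properties
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

# For a record whose default topic is myserver.mydb.customers_shard2:
#   group 1 = "myserver.mydb."   group 2 = "2"
#   destination topic: myserver.mydb.customers_all_shards
#   field added to the key: shard_id = "2"
```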

If your tables contain globally unique keys and you do not need to change the key structure, you can set the key.enforce.uniqueness option to false:

...
transforms.Reroute.key.enforce.uniqueness=false
...

7.2.4. Options for applying the topic routing transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

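To apply the SMT selectively, you can configure an SMT predicate for the transformation, as described in Section 7.1, or narrow the topic.regex option so that it matches only the topics that carry the intended data change messages.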

7.2.5. Options for configuring Debezium topic routing transformation

The following table describes topic routing SMT configuration options.

Table 7.1. Topic routing SMT configuration options
| Option | Default | Description |
|---|---|---|
| topic.regex | | Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic. |
| topic.replacement | | Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for topic.regex. To refer to a group, specify $1, $2, and so on. |
| key.enforce.uniqueness | true | Indicates whether to add a field to the record's change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. Specify false if you do not want the transformation to add a key field. For example, if you are routing records from a partitioned PostgreSQL table to one topic, you can configure key.enforce.uniqueness=false because unique keys are guaranteed in partitioned PostgreSQL tables. |
| key.field.name | __dbz__physicalTableIdentifier | Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default. |
| key.field.regex | | Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default. |
| key.field.replacement | | Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for key.field.regex. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default. |
| schema.name.adjustment.mode | none | Specifies how the message key schema names derived from the resulting topic name should be adjusted for compatibility with the message converter used by the connector. Valid values: none (default) does not apply any adjustment; avro replaces the characters that cannot be used in an Avro type name with an underscore. |
| logical.table.cache.size | 16 | The maximum number of entries that the SMT holds in its LRU cache. The cache stores the old and new schemas for the logical table key and value, as well as the derived key and the topic regex result, to speed up source record transformation. |

7.3. Routing change event records to topics according to event content

By default, Debezium streams all of the change events that it reads from a table to a single static topic. However, there might be situations in which you want to reroute selected events to other topics, based on the event content. The process of routing messages based on their content is described in the Content-based routing messaging pattern. To apply this pattern in Debezium, you use the content-based routing single message transform (SMT) to write expressions that are evaluated for each event. Depending on how an event is evaluated, the SMT either routes the event message to the original destination topic, or reroutes it to the topic that you specify in the expression.

While it is possible to use Java to create a custom SMT to encode routing logic, using a custom-coded SMT has its drawbacks. For example:

  • It is necessary to compile the transformation up front and deploy it to Kafka Connect.
  • Every change needs code recompilation and redeployment, leading to inflexible operations.

The content-based routing SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).

Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language. Depending on the method that you use to deploy Debezium, you can automatically download the required artifacts from Maven Central, or you can manually download the artifacts, and then add them to your Debezium connector plug-in directories, along with any other JAR files used by the language implementation.

7.3.1. Setting up the Debezium content-based-routing SMT

For security reasons, the content-based routing SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-2.7.3.Final.tar.gz.

If you deploy the Debezium connector by building a custom Kafka Connect container image from a Dockerfile, to use the content-based routing SMT, you must explicitly add the SMT artifact to your Kafka Connect environment. When you use Streams for Apache Kafka to deploy the connector, it can download the required artifacts automatically based on configuration parameters that you specify in the Kafka Connect custom resource.

IMPORTANT: After the routing SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the routing SMT.

The following procedure applies if you build your Kafka Connect container image from a Dockerfile. If you use Streams for Apache Kafka to create the Kafka Connect image, follow the instructions in the deployment topic for your connector.

Procedure

  1. From a browser, open the Software Downloads page, and download the Debezium scripting SMT archive (debezium-scripting-2.7.3.Final.tar.gz).
  2. Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
  3. Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
  4. Restart the Kafka Connect process to pick up the new JAR files.

The Groovy language needs the following libraries on the classpath:

  • groovy
  • groovy-json (optional)
  • groovy-jsr223

The JavaScript language needs the following libraries on the classpath:

  • graalvm.js
  • graalvm.js.scriptengine

7.3.2. Example: Debezium basic content-based routing configuration

To configure a Debezium connector to route change event records based on the event content, you configure the ContentBasedRouter SMT in the Kafka Connect configuration for the connector.

Configuration of the content-based routing SMT requires you to specify an expression that defines the routing criteria. The expression defines a pattern for evaluating event records. It also specifies the name of a destination topic where events that match the pattern are routed. The pattern that you specify might designate an event type, such as a table insert, update, or delete operation. You might also define a pattern that matches a value in a specific column or row.

For example, to reroute all update (u) records to an updates topic, you might add the following configuration to your connector configuration:

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...

The preceding example specifies the use of the Groovy expression language.

Records that do not match the pattern are routed to the default topic.

Customizing the configuration

The preceding example shows a simple SMT configuration that is designed to process only DML events, which contain an op field. Other types of messages that a connector might emit (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes) do not contain this field. To avoid processing failures, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
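
One way to do this, sketched below, is to attach a TopicNameMatches predicate so that the router sees only records from data change topics. The alias IsDataChangeTopic and the myserver.mydb topic naming are assumptions. Adjust the pattern to the topic names in your environment.

```properties
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
# Evaluate the expression only for records from data change topics.
transforms.route.predicate=IsDataChangeTopic

predicates=IsDataChangeTopic
predicates.IsDataChangeTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Hypothetical naming: data change topics are assumed to start with myserver.mydb.
predicates.IsDataChangeTopic.pattern=myserver.mydb.*
```

A topic name predicate does not exclude tombstones that are written to the data change topics. The null.handling.mode option of the SMT controls how those records are handled.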

7.3.3. Variables for use in Debezium content-based routing expressions

Debezium binds certain variables into the evaluation context for the SMT. When you create expressions to specify conditions to control the routing destination, the SMT can look up and interpret the values of these variables to evaluate conditions in an expression.

The following table lists the variables that Debezium binds into the evaluation context for the content-based routing SMT:

Table 7.2. Content-based routing expression variables
| Name | Description | Type |
|---|---|---|
| key | A key of the message. | org.apache.kafka.connect.data.Struct |
| value | A value of the message. | org.apache.kafka.connect.data.Struct |
| keySchema | Schema of the message key. | org.apache.kafka.connect.data.Schema |
| valueSchema | Schema of the message value. | org.apache.kafka.connect.data.Schema |
| topic | Name of the target topic. | String |
| headers | A Java map of message headers. The key field is the header name. The headers variable exposes the following properties: value (of type Object) and schema (of type org.apache.kafka.connect.data.Schema). | java.util.Map<String, io.debezium.transforms.scripting.RecordHeader> |

An expression can invoke arbitrary methods on its variables. Expressions should resolve to a String value that determines where the SMT routes the message. When the expression evaluates to a non-null value, the message is rerouted to the topic that the value names. When the expression evaluates to null, the message is routed to the default topic.

Expressions should not result in any side-effects. That is, they should not modify any variables that are passed to them.
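
As a small sketch of an expression that uses more than one of the bound variables, the following Groovy configuration builds the destination name from the topic variable. The .deletes suffix is a hypothetical naming choice, and the expression assumes that the SMT processes only data change events, which contain an op field.

```properties
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Delete events go to "<current topic>.deletes"; a null result keeps the default topic.
transforms.route.topic.expression=value.op == 'd' ? topic + '.deletes' : null
```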

7.3.4. Options for applying the content-based routing transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it's best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. To apply the SMT selectively, you can configure an SMT predicate for the transformation, as described in Section 7.1, or use the topic.regex configuration option for the SMT.
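
As a sketch of the topic.regex approach, the following fragment limits the routing logic to topics whose names match a pattern. The myserver.mydb naming is an assumption. Records on other topics pass through unmodified.

```properties
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Hypothetical naming: evaluate the expression only for topics that start with myserver.mydb.
transforms.route.topic.regex=myserver.mydb.*
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
```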

7.3.5. Configuration of content-based routing conditions for other scripting languages

The way that you express content-based routing conditions depends on the scripting language that you use. For example, as shown in the basic configuration example, when you use Groovy as the expression language, the following expression reroutes all update (u) records to the updates topic, while routing other records to the default topic:

value.op == 'u' ? 'updates' : null

Other languages use different methods to express the same condition.

Tip

The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures.
To use the ContentBasedRouting SMT with the MongoDB connector, you must first unwind the array fields in the JSON into separate documents.
You can use a JSON parser within an expression to generate separate output documents for each array item. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

JavaScript

When you use JavaScript as the expression language, you can call the Struct#get() method to specify the content-based routing condition, as in the following example:

value.get('op') == 'u' ? 'updates' : null

JavaScript with Graal.js

When you create content-based routing conditions by using JavaScript with Graal.js, you use an approach that is similar to the one that you use with Groovy. For example:

value.op == 'u' ? 'updates' : null

7.3.6. Options for configuring the content-based routing transformation

Property

Default

Description

topic.regex

 

An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply the condition logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the condition logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified.

language

 

The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy, or jsr223.graal.js. Debezium supports bootstrapping through the JSR 223 API ("Scripting for the Java™ Platform") only.

topic.expression

 

The expression to be evaluated for every message. Must evaluate to a String value. A non-null result reroutes the message to the topic with that name. A null result routes the message to the default topic.

null.handling.mode

keep

Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options:

keep
(Default) Pass the messages through.
drop
Remove the messages completely.
evaluate
Apply the condition logic to the messages.
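
The way that the options in the preceding table interact can be sketched in a few lines of Python. The following model is an illustration only, not Debezium code: the function names are hypothetical, the order in which the null check and the topic.regex check are applied is an assumption, and the sketch assumes that topic.regex must match the whole topic name.

```python
import re

DROPPED = None  # stands for "the record is removed from the stream"


def updates_or_default(value):
    """Python counterpart of the expression: value.op == 'u' ? 'updates' : null"""
    return "updates" if value.get("op") == "u" else None


def route(topic, value, topic_expression, topic_regex=None, null_handling_mode="keep"):
    """Return the destination topic for a record, or DROPPED if it is removed."""
    if value is None:  # tombstone message
        if null_handling_mode == "keep":
            return topic          # pass the message through
        if null_handling_mode == "drop":
            return DROPPED        # remove the message completely
        # "evaluate": fall through; the expression itself must cope with None
    if topic_regex is not None and re.fullmatch(topic_regex, topic) is None:
        return topic              # topic does not match: pass unmodified
    new_topic = topic_expression(value)
    # A non-null result reroutes; a null result keeps the default topic.
    return topic if new_topic is None else new_topic
```

For example, route("db.customers", {"op": "u"}, updates_or_default) yields the updates topic, whereas a create event, or a tombstone handled in the default keep mode, stays on db.customers.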

7.4. Extracting field-level changes from Debezium event records

A Debezium data change event has a complex structure that provides a wealth of information. However, in some cases, before a downstream consumer can process Debezium change event messages, it requires additional information about field-level changes that result from the original database change. To enhance event messages with details about how a database operation modifies fields in the source database, Debezium provides the ExtractChangedRecordState single message transformation (SMT).

The event changes transformation is a Kafka Connect SMT.

7.4.1. Description of Debezium change event structure

Debezium generates data change events that have a complex structure. Each event consists of the following parts:

  • Metadata, which includes but is not limited to the following types:

    • The type of operation that changed the data.
    • Source information, such as the names of the database and the table in which the change occurred.
    • Timestamp that identifies when the change was made.
    • Optional transaction information.
  • Row data before a change.
  • Row data after a change.

The following example shows part of the structure of a typical Debezium UPDATE change event:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"ts_us" : "...",
	"ts_ns" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

The complex format of the message in the preceding example provides detailed information about changes that occur in the source database. However, the format might not be suitable for some downstream consumers. Sink connectors, or other parts of the Kafka ecosystem might expect the message to explicitly identify the fields that a database operation changes or leaves unchanged. The ExtractChangedRecordState SMT adds headers to the change event message to identify the fields that are modified by a database operation, and the fields that remain unchanged.

7.4.2. Behavior of the Debezium event changes SMT

The event changes SMT extracts the before and after fields from a Debezium UPDATE change event in a Kafka record. The transformation examines the before and after event state structures to identify the fields that are altered by an operation, and those that remain unchanged. Depending on the connector configuration, the transformation then produces a modified event message that adds message headers to list the changed fields, the unchanged fields, or both. If the event represents an INSERT or DELETE, this single message transformation has no effect.

You can configure the event changes SMT for a Debezium connector, or for a sink connector that consumes messages emitted by a Debezium connector. Configure the event changes SMT for a sink connector if you want Apache Kafka to retain the entire original Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.

Depending on your use case, you can configure the transformation to modify the original message by performing one or both of the following tasks:

  • Identify the fields that are changed by an UPDATE event by listing them in the user-configured header.changed.name header.
  • Identify the fields that are not changed by an UPDATE event by listing them in the user-configured header.unchanged.name header.
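
The comparison that the SMT performs can be modeled with a short Python sketch. This is a simplified illustration under stated assumptions, not the Debezium implementation: events are plain dictionaries, the function name is hypothetical, and the header values are represented as Python lists of field names.

```python
def changed_state_headers(event, changed_name=None, unchanged_name=None):
    """Model of the headers that are added for an UPDATE event.

    changed_name and unchanged_name stand in for the header.changed.name
    and header.unchanged.name options. If neither is set, no header is added.
    """
    headers = {}
    if event.get("op") != "u":
        return headers  # INSERT and DELETE events are left untouched
    before, after = event["before"], event["after"]
    changed = [name for name in after if before.get(name) != after[name]]
    unchanged = [name for name in after if before.get(name) == after[name]]
    if changed_name:
        headers[changed_name] = changed
    if unchanged_name:
        headers[unchanged_name] = unchanged
    return headers


update_event = {
    "op": "u",
    "before": {"field1": "oldvalue1", "field2": "samevalue"},
    "after": {"field1": "newvalue1", "field2": "samevalue"},
}
print(changed_state_headers(update_event, "Changed", "Unchanged"))
```

In this sample, field1 is reported as changed and field2 as unchanged. An event whose op is not u produces no headers in the model.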

7.4.3. Configuration of the Debezium event changes SMT

You configure the Debezium event changes SMT for a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, which doesn’t add any headers, add the transformation to the connector configuration, as in the following example:

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState

As with any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.

The connector configuration in the following example sets several options for the event changes SMT:

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed.name=Changed
transforms.changes.header.unchanged.name=Unchanged

header.changed.name
The Kafka message header name to use for storing a comma-separated list of the fields that are changed by a database operation.
header.unchanged.name
The Kafka message header name to use for storing a comma-separated list of the fields that remain unchanged after a database operation.

Customizing the configuration

The connector might emit many types of event messages (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.

7.4.4. Options for applying the event changes transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.

7.4.5. Descriptions of the configuration options for the Debezium event changes SMT

The following table describes the options that you can specify to configure the event changes SMT.

Table 7.3. Descriptions of event changes SMT configuration options
Option

Default

Description

header.changed.name

 

The Kafka message header name to use for storing a comma-separated list of the fields that are changed by a database operation.

header.unchanged.name

 

The Kafka message header name to use for storing a comma-separated list of the fields that remain unchanged after a database operation.

7.5. Filtering Debezium change event records

By default, Debezium delivers every data change event that it receives to the Kafka broker. However, in many cases, you might be interested in only a subset of the events emitted by the producer. To enable you to process only the records that are relevant to you, Debezium provides the filter single message transformation (SMT).

While it is possible to use Java to create a custom SMT to encode filtering logic, using a custom-coded SMT has its drawbacks. For example:

  • It is necessary to compile the transformation up front and deploy it to Kafka Connect.
  • Every change needs code recompilation and redeployment, leading to inflexible operations.

The filter SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).

Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language. Depending on the method that you use to deploy Debezium, you can automatically download the required artifacts from Maven Central, or you can manually download the artifacts, and then add them to your Debezium connector plug-in directories, along with any other JAR files used by the language implementation.

7.5.1. Setting up the Debezium filter SMT

For security reasons, the filter SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-2.7.3.Final.tar.gz.

If you deploy the Debezium connector by building a custom Kafka Connect container image from a Dockerfile, to use the filter SMT, you must explicitly download the SMT archive and deploy the files alongside the connector plug-in. When you use Streams for Apache Kafka to deploy the connector, it can download the required artifacts automatically based on configuration parameters that you specify in the Kafka Connect custom resource.

Important

After the filter SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the filter SMT.

The following procedure applies if you build your Kafka Connect container image from a Dockerfile. If you use Streams for Apache Kafka to create the Kafka Connect image, follow the instructions in the deployment topic for your connector.

Procedure

  1. From a browser, open the Software Downloads page, and download the Debezium scripting SMT archive (debezium-scripting-2.7.3.Final.tar.gz).
  2. Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
  3. Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
  4. Restart the Kafka Connect process to pick up the new JAR files.

The Groovy language needs the following libraries on the classpath:

  • groovy
  • groovy-json (optional)
  • groovy-jsr223

The JavaScript language needs the following libraries on the classpath:

  • graalvm.js
  • graalvm.js.scriptengine

7.5.2. Example: Debezium basic filter SMT configuration

You configure the filter transformation in the Debezium connector’s Kafka Connect configuration. In the configuration, you specify the events that you are interested in by defining filter conditions that are based on business rules. As the filter SMT processes the event stream, it evaluates each event against the configured filter conditions. Only events that meet the criteria of the filter conditions are passed to the broker.

To configure a Debezium connector to filter change event records, configure the filter SMT in the Kafka Connect configuration for the Debezium connector. Configuration of the filter SMT requires you to specify an expression that defines the filtering criteria.

For example, you might add the following configuration in your connector configuration.

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

The preceding example specifies the use of the Groovy expression language. The expression value.op == 'u' && value.before.id == 2 removes all messages, except those that represent update (u) records with id values that are equal to 2.

Customizing the configuration

The preceding example shows a simple SMT configuration that is designed to process only DML events, which contain an op field. Other types of messages that a connector might emit (heartbeat messages, tombstone messages, or metadata messages about schema changes and transactions) do not contain this field. To avoid processing failures, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
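
The following Python sketch illustrates why selective application matters. It is a model only, not Debezium or Kafka Connect code: records are (topic, value) pairs of plain dictionaries, and the topic names, the is_data_topic rule, and the function names are hypothetical. The applies_to argument plays the role of an SMT predicate.

```python
def condition(value):
    """Python counterpart of: value.op == 'u' && value.before.id == 2"""
    return value["op"] == "u" and value["before"]["id"] == 2


def is_data_topic(topic):
    # Hypothetical naming rule: data change topics start with "server1."
    return topic.startswith("server1.")


def filter_records(records, condition, applies_to=lambda topic: True):
    """Keep records that satisfy the condition; skip the check when applies_to is false."""
    kept = []
    for topic, value in records:
        if applies_to(topic) and not condition(value):
            continue  # the filter removes this record
        kept.append((topic, value))
    return kept


records = [
    ("server1.inventory.customers", {"op": "u", "before": {"id": 2}, "after": {"id": 2}}),
    ("server1.inventory.customers", {"op": "c", "before": None, "after": {"id": 3}}),
    ("__debezium-heartbeat.server1", {"ts_ms": 1559033904863}),  # no op field
]

# With the predicate, the heartbeat record bypasses the condition and is kept.
print(filter_records(records, condition, is_data_topic))
```

Calling filter_records(records, condition) without the predicate raises a KeyError in this model, because the heartbeat value has no op field. That failure is the counterpart of the processing failures that the preceding paragraph describes.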

7.5.3. Variables for use in filter expressions

Debezium binds certain variables into the evaluation context for the filter SMT. When you create expressions to specify filter conditions, you can use the variables that Debezium binds into the evaluation context. By binding variables, Debezium enables the SMT to look up and interpret their values as it evaluates the conditions in an expression.

The following table lists the variables that Debezium binds into the evaluation context for the filter SMT:

Table 7.4. Filter expression variables
Name

Description

Type

key

The key of the message.

org.apache.kafka.connect.data.Struct

value

The value of the message.

org.apache.kafka.connect.data.Struct

keySchema

Schema of the message key.

org.apache.kafka.connect.data.Schema

valueSchema

Schema of the message value.

org.apache.kafka.connect.data.Schema

topic

Name of the target topic.

String

header

A Java map of message headers. The key field is the header name. The header variable exposes the following properties:

  • value (of type Object)
  • schema (of type org.apache.kafka.connect.data.Schema)

java.util.Map<String, io.debezium.transforms.scripting.RecordHeader>

An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT handles the message. When the filter condition in an expression evaluates to true, the message is retained. When the filter condition evaluates to false, the message is removed.

Expressions should not result in any side effects. That is, they should not modify any of the variables that are passed to them.

7.5.4. Options for applying the filter transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. To apply the SMT selectively, you can configure an SMT predicate for the transformation, or use the topic.regex configuration option for the SMT.

7.5.5. Filter condition configuration for other scripting languages

The way that you express filtering conditions depends on the scripting language that you use.

For example, as shown in the basic configuration example, when you use Groovy as the expression language, the following expression removes all messages, except for update records that have id values set to 2:

value.op == 'u' && value.before.id == 2

Other languages use different methods to express the same condition.

Tip

The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures.
To use the filter SMT with the MongoDB connector, you must first unwind the array fields in the JSON into separate documents.
You can use a JSON parser within an expression to generate separate output documents for each array item. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

JavaScript

If you use JavaScript as the expression language, you can call the Struct#get() method to specify the filtering condition, as in the following example:

value.get('op') == 'u' && value.get('before').get('id') == 2

JavaScript with Graal.js

If you use JavaScript with Graal.js to define filtering conditions, you use an approach that is similar to the one that you use with Groovy. For example:

value.op == 'u' && value.before.id == 2

7.5.6. Options for configuring the filter transformation

The following table lists the configuration options that you can use with the filter SMT.

Table 7.5. filter SMT configuration options

Property

Default

Description

topic.regex

 

An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply filtering logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the filter logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified.

language

 

The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy, or jsr223.graal.js. Debezium supports bootstrapping through the JSR 223 API ("Scripting for the Java™ Platform") only.

condition

 

The expression to be evaluated for every message. Must evaluate to a Boolean value where a result of true keeps the message, and a result of false removes it.

null.handling.mode

keep

Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options:

keep
(Default) Pass the messages through.
drop
Remove the messages completely.
evaluate
Apply the filter condition to the messages.

7.6. Converting message headers into event record values

The HeaderToValue SMT extracts specified header fields from event records, and then copies or moves the header fields to values in the event record. The move option removes the fields from the header entirely before adding them as values in the payload. You can configure the SMT to manipulate multiple headers in the original message. You can use dot notation to specify a node within the payload in which you want to nest the header field. For more information about configuring the SMT, see the following example.

7.6.1. Example: Basic configuration of the Debezium HeaderToValue SMT

To extract message headers in an event record into the record value, configure the HeaderToValue SMT in the Kafka Connect configuration for a connector. You can configure the transformation to either remove the original headers or to copy them. To remove header fields from the record, configure the SMT to use the move operation. To retain the header fields in the original record, configure the SMT to use the copy operation. For example, to move the headers event_timestamp and key from the header of an event message into the record value, add the following lines to your connector configuration:

transforms=moveHeadersToValue
transforms.moveHeadersToValue.type=io.debezium.transforms.HeaderToValue
transforms.moveHeadersToValue.headers=event_timestamp,key
transforms.moveHeadersToValue.fields=timestamp,source.id
transforms.moveHeadersToValue.operation=move

The following example shows the headers and values of an event record before and after the transformation is applied.

Example 7.1. Effect of applying the HeaderToValue SMT

Event record before it is processed by the HeaderToValue transformation
Header before the SMT processes the event record
{
    "header_x": 0,
    "event_timestamp": 1626102708861,
    "key": 100
}
Value before the SMT processes the event record
{
        "before": null,
        "after": {
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": {
            "version": "2.1.3.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863091,
            "ts_ns": 1559033904863091000,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1559033904863,
        "ts_us": 1559033904863741,
        "ts_ns": 1559033904863741697
    }
Event record after it is processed by the HeaderToValue transformation
Header after the SMT removes the specified field
{
    "header_x": 0
}
Value after the SMT moves header fields into the value
{
        "before": null,
        "after": {
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": {
            "version": "2.1.3.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863091,
            "ts_ns": 1559033904863091000,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null,
            "id": 100
        },
        "op": "c",
        "ts_ms": 1559033904863,
        "ts_us": 1559033904863741,
        "ts_ns": 1559033904863741697,
        "timestamp": 1626102708861
    }
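
The mapping from header names to field paths, including nesting through dot notation, can be modeled with a short Python sketch. This is an illustration under stated assumptions rather than the Debezium implementation: headers and values are plain dictionaries, the function name is hypothetical, and each header is assumed to be written to the field at the same position in the fields list.

```python
import copy


def header_to_value(headers, value, header_names, field_names, operation):
    """Copy or move the named headers into the record value.

    header_names and field_names stand in for the headers and fields options.
    A dot in a field name nests the value under the named node.
    """
    if operation not in ("move", "copy"):
        raise ValueError("operation must be 'move' or 'copy'")
    new_headers = dict(headers)
    new_value = copy.deepcopy(value)  # leave the input record untouched
    for header_name, field_path in zip(header_names, field_names):
        if header_name not in headers:
            continue
        *parents, leaf = field_path.split(".")
        node = new_value
        for parent in parents:
            node = node.setdefault(parent, {})
        node[leaf] = headers[header_name]
        if operation == "move":
            del new_headers[header_name]
    return new_headers, new_value


headers = {"header_x": 0, "event_timestamp": 1626102708861, "key": 100}
value = {"op": "c", "source": {"table": "customers"}}
print(header_to_value(headers, value,
                      ["event_timestamp", "key"], ["timestamp", "source.id"], "move"))
```

With the move operation, only header_x remains in the headers of the model, and the value gains a top-level timestamp field and a nested source.id field. With copy, the value changes in the same way and the headers stay as they were.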

7.6.2. Options for configuring the HeaderToValue transformation

The following table lists the configuration options that you can use with the HeaderToValue SMT.

Table 7.6. HeaderToValue SMT configuration options

Property

Description

Type

Default

Valid Values

Importance

headers

A comma-separated list of header names in the record whose values are to be copied or moved to the record value.

list

No default value

non-empty list

high

fields

A comma-separated list of field names, in the same order as the header names listed in the headers configuration property. Use dot notation to instruct the SMT to nest fields within specific nodes of the message payload. For information about how to configure the SMT to use dot notation, see the example that appears earlier in this topic.

list

No default value

non-empty list

high

operation

Specifies one of the following options:

move
The SMT moves header fields to values in the event record, and removes the fields from the header.
copy
The SMT copies header fields to values in the event record, and retains the original header fields.

string

No default value

move or copy

high

7.7. Extracting source record after state from Debezium change events

Debezium connectors emit data change messages to represent each operation that they capture from a source database. The messages that a connector sends to Apache Kafka have a complex structure that faithfully represents the details of the original database event.

Although this complex message format accurately details information about changes that happen in the system, the format might not be suitable for some downstream consumers. Sink connectors, or other parts of the Kafka ecosystem might require messages that are formatted so that field names and values are presented in a simplified, flattened structure.

To simplify the format of the event records that the Debezium connectors produce, you can use the Debezium event flattening single message transformation (SMT). Configure the transformation to support consumers that require Kafka records to be in a format that is simpler than the default format that the connector produces. Depending on your particular use case, you can apply the SMT to a Debezium connector, or to a sink connector that consumes messages that the Debezium connector produces. To enable Apache Kafka to retain the Debezium change event messages in their original format, configure the SMT for a sink connector.

The event flattening transformation is a Kafka Connect SMT.

Note

The information in this chapter describes the event flattening single message transformation (SMT) for Debezium SQL-based database connectors. For information about an equivalent SMT for the Debezium MongoDB connector, see MongoDB New Document State Extraction.

7.7.1. Description of Debezium change event structure

Debezium generates data change events that have a complex structure. Each event consists of three parts:

  • Metadata, which includes but is not limited to:

    • The type of operation that changed the data.
    • Source information, such as the names of the database and the table in which the change occurred.
    • Timestamp that identifies when the change was made.
    • Optional transaction information.
  • Row data before the change
  • Row data after the change

The following example shows part of the message structure for an UPDATE change event:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"ts_us" : "...",
	"ts_ns" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

For more information about the change event structure for a connector, see the documentation for the connector.

After the event flattening SMT processes the message in the previous example, it simplifies the message format, resulting in the message in the following example:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

7.7.2. Behavior of Debezium event flattening transformation

The event flattening SMT extracts the after field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its after field to create a simple Kafka record.

You can configure the event flattening SMT for a Debezium connector or for a sink connector that consumes messages emitted by a Debezium connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.

You can configure the transformation to do any of the following:

  • Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata.
  • Keep Kafka records that contain change events for DELETE operations in the stream. The default behavior is that the SMT drops Kafka records for DELETE operation change events because most consumers cannot yet handle them.

A database DELETE operation causes Debezium to generate two Kafka records:

  • A record that contains "op": "d", the before row data, and some other fields.
  • A tombstone record that has the same key as the deleted row and a value of null. This record is a marker for Apache Kafka. It indicates that log compaction can remove all records that have this key.

Instead of dropping the record that contains the before row data, you can configure the event flattening SMT to do one of the following:

  • Keep the record in the stream and edit it to have only the "value": "null" field.
  • Keep the record in the stream and edit it to have a value field that contains the key/value pairs that were in the before field with an added "__deleted": "true" entry.

Similarly, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.
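
The behavior described in this section can be summarized in a small Python model. It is a sketch under stated assumptions, not the Debezium implementation: events are plain dictionaries, a tombstone is represented by None, the DROPPED sentinel and the function name are hypothetical, and only the DELETE handling that is described above is modeled.

```python
DROPPED = object()  # sentinel: the record is removed from the stream


def flatten(event, delete_handling_mode="drop", drop_tombstones=True):
    """Return the simplified record value, or DROPPED if the record is removed."""
    if event is None:  # tombstone record
        return DROPPED if drop_tombstones else None
    if event["op"] != "d":
        return dict(event["after"])  # keep only the after state
    if delete_handling_mode == "drop":
        return DROPPED
    if delete_handling_mode == "none":
        return None  # record stays in the stream with a null value
    if delete_handling_mode == "rewrite":
        # before state plus a marker that the row was deleted
        return {**event["before"], "__deleted": "true"}
    raise ValueError("unknown delete.handling.mode: " + delete_handling_mode)


update_event = {
    "op": "u",
    "before": {"field1": "oldvalue1", "field2": "oldvalue2"},
    "after": {"field1": "newvalue1", "field2": "newvalue2"},
}
delete_event = {"op": "d", "before": {"pk": 2, "cola": None}, "after": None}

print(flatten(update_event))
print(flatten(delete_event, delete_handling_mode="rewrite"))
```

The defaults of the model mirror the default behavior of the SMT: both the DELETE record and the tombstone are dropped unless you configure the transformation to keep them.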

7.7.3. Configuration of Debezium event flattening transformation

Configure the Debezium event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. For example, to obtain the default behavior of the transformation, add it to the connector configuration without specifying any options, as in the following example:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

As with any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.

The following .properties example sets several event flattening SMT options:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn

drop.tombstones=false
Keeps tombstone records for DELETE operations in the event stream.
delete.handling.mode=rewrite

For DELETE operations, edits the Kafka record by flattening the value field that was in the change event. The value field directly contains the key/value pairs that were in the before field. The SMT adds __deleted and sets it to true, for example:

"value": {
  "pk": 2,
  "cola": null,
  "__deleted": "true"
}

add.fields=table,lsn
Adds change event metadata for the table and lsn fields to the simplified Kafka record.

Customizing the configuration

The connector might emit many types of event messages (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.

7.7.4. Example of adding Debezium metadata to the Kafka record

You can configure the event flattening SMT to add original change event metadata to the simplified Kafka record. For example, you might want the simplified record’s header or value to contain any of the following:

  • The type of operation that made the change
  • The name of the database or table that was changed
  • Connector-specific fields such as the Postgres LSN field

To add metadata to the simplified Kafka record’s header, specify the add.headers option. To add metadata to the simplified Kafka record’s value, specify the add.fields option. Each of these options takes a comma-separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite

With that configuration, a simplified Kafka record would contain something like the following:

{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}

Also, simplified Kafka records would have a __db header.

In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.

To add metadata to a simplified Kafka record that is for a DELETE operation, you must also configure delete.handling.mode=rewrite.
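
The naming rule for added metadata fields can be expressed as a short Python sketch. This is an illustration only: the function names are hypothetical, and the lookup rule for names that are given without a struct (top-level event fields first, then the source block) is an assumption made for the example.

```python
def metadata_name(spec):
    """Prefix with a double underscore; join struct and field with an underscore."""
    return "__" + spec.replace(".", "_")


def lookup(event, spec):
    """Find the metadata value for an add.fields entry (assumed lookup rule)."""
    if "." in spec:
        struct, field = spec.split(".", 1)
        return event[struct][field]
    if spec in event:
        return event[spec]
    return event["source"][spec]


def add_metadata(flat_value, event, add_fields):
    """Return a copy of the simplified value with the metadata fields added."""
    result = dict(flat_value)
    for spec in add_fields:
        result[metadata_name(spec)] = lookup(event, spec)
    return result


event = {
    "op": "c",
    "ts_ms": 1,
    "source": {"table": "MY_TABLE", "lsn": 123456789, "ts_ms": 2},
    "after": {"id": 1},
}
print(add_metadata(event["after"], event, ["op", "table", "lsn", "source.ts_ms"]))
```

In this model, the entry source.ts_ms becomes the field __source_ts_ms, which keeps it distinct from the top-level ts_ms field of the event.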

7.7.5. Options for applying the event flattening transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.

7.7.6. Options for configuring Debezium event flattening transformation

The following table describes the options that you can specify to configure the event flattening SMT.

Table 7.7. Descriptions of event flattening SMT configuration options
Option | Default | Description

drop.tombstones

true

Debezium generates a tombstone record for each DELETE operation. By default, the event flattening SMT removes tombstone records from the stream. To keep tombstone records in the stream, specify drop.tombstones=false.

Note

This option is scheduled for removal in a future release. In its place, use the delete.tombstone.handling.mode option.

delete.handling.mode

drop

Debezium generates a change event record for each DELETE operation. By default, the event flattening SMT removes these records from the stream. To keep Kafka records for DELETE operations in the stream, set delete.handling.mode to none or rewrite.

Specify none to keep the change event record in the stream. The record contains only "value": "null".

Specify rewrite to keep the change event record in the stream and edit it so that its value field contains the key/value pairs from the before field, with __deleted: true added to the value. This provides another way to indicate that the record has been deleted.

When you specify rewrite, the updated simplified records for DELETE operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the Debezium connector creates.

Note

This option is scheduled for removal in a future release. In its place, use the delete.tombstone.handling.mode option.

delete.tombstone.handling.mode

No default

Debezium generates a change event record for each DELETE operation. This setting determines how the event flattening SMT handles DELETE events from the stream.

Note

The setting for this option takes precedence over any conflicting settings that you might configure for the deprecated drop.tombstones or delete.handling.mode options.

Set one of the following options:

drop
The SMT removes both the DELETE event and the TOMBSTONE record from the stream.
tombstone (default)
The SMT retains TOMBSTONE records in the stream. The TOMBSTONE record contains only the following value: "value": "null".
rewrite

The SMT retains the change event record in the stream and makes the following changes:

  • Adds a value field to the record that contains the key/value pairs from the before field of the original record.
  • Adds __deleted: true to the value of the record.
  • Removes TOMBSTONE records.

    This setting provides another way to indicate that the record has been deleted.

rewrite-with-tombstone
The SMT behaves as it does when you select the rewrite option, except that it also retains TOMBSTONE records.

route.by.field

 

To use row data to determine the topic to route the record to, set this option to an after field attribute. The SMT routes the record to the topic whose name matches the value of the specified after field attribute. For a DELETE operation, set this option to a before field attribute.

For example, configuration of route.by.field=destination routes records to the topic whose name is the value of after.destination. The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made.

If you are configuring the event flattening SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure route.by.field to re-route the event.

add.fields.prefix

__ (double-underscore)

Set this optional string to prefix a field.

add.fields

 

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record’s value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms.

Optionally, you can override the name of a field by specifying an entry in the format <field name>:<new field name>, for example, version:VERSION, connector:CONNECTOR, or source.ts_ms:EVENT_TIMESTAMP. The new field name is case-sensitive.

When the SMT adds metadata fields to the simplified record’s value, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not in the change event record, the SMT still adds the field to the record’s value.

add.headers.prefix

__ (double-underscore)

Set this optional string to prefix a header.

add.headers

 

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms.

Optionally, you can override the name of a field by specifying an entry in the format <field name>:<new field name>, for example, version:VERSION, connector:CONNECTOR, or source.ts_ms:EVENT_TIMESTAMP. The new field name is case-sensitive.

When the SMT adds metadata fields to the simplified record’s header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not in the change event record, the SMT does not add the field to the header.

drop.fields.header.name

 

The Kafka message header name to use for listing field names in the source message that you want to drop from the output message.

drop.fields.from.key

false

Specifies whether you want the SMT to remove fields that are listed in drop.fields.header.name from the event’s key.

drop.fields.keep.schema.compatible

true

Specifies whether you want the SMT to remove non-optional fields that are included in the drop.fields.header.name configuration property.

By default, the SMT only removes fields that are marked optional.
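
The four delete.tombstone.handling.mode settings in the preceding table differ only in which records remain in the stream after a DELETE operation. The following Python sketch summarizes that behavior as described in the table. It is a simplified model, not the SMT's code: the function name is hypothetical, a tombstone is modeled as None, and __deleted is modeled as a Boolean even though this section does not specify its serialized type.

```python
def handle_delete(mode: str, before: dict) -> list:
    """Return the record values kept in the stream for one DELETE operation.

    A rewritten delete record is modeled as a dict, a tombstone as None.
    """
    # rewrite: value holds the key/value pairs from before, plus __deleted.
    rewritten = {**before, "__deleted": True}
    tombstone = None
    outcomes = {
        "drop": [],                                       # nothing is kept
        "tombstone": [tombstone],                         # tombstone only
        "rewrite": [rewritten],                           # rewritten record only
        "rewrite-with-tombstone": [rewritten, tombstone]  # both are kept
    }
    if mode not in outcomes:
        raise ValueError(f"unknown mode: {mode}")
    return outcomes[mode]


for mode in ("drop", "tombstone", "rewrite", "rewrite-with-tombstone"):
    print(mode, handle_delete(mode, {"id": 1, "name": "example"}))
```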

7.8. Extracting the source document after state from Debezium MongoDB change events

The Debezium MongoDB connector emits data change messages to represent each operation that occurs in a MongoDB collection. The complex structure of these event messages faithfully represents the details of the original database event. However, some downstream consumers might not be able to process the messages in their original format. For example, to represent nested documents in a data collection, the connector emits an event message in a format that includes nested fields. To support sink connectors, or other consumers that cannot process the hierarchical format of the original messages, you can use the Debezium MongoDB event flattening (ExtractNewDocumentState) single message transformation (SMT). The SMT simplifies the structure of the original messages, and can modify messages in other ways to make data easier to process.

The event flattening transformation is a Kafka Connect SMT.

Note

The information in this chapter describes the event flattening single message transformation (SMT) for Debezium MongoDB connectors only. For information about an equivalent SMT for use with relational databases, see the documentation for the New Record State Extraction SMT.

The following topics provide details:

7.8.1. Description of Debezium MongoDB change event structure

The Debezium MongoDB connector generates change events that have a complex structure. Each event message includes the following parts:

Source metadata

Includes, but is not limited to, the following fields:

  • Type of the operation that changed data in the collection (create/insert, update, or delete).
  • Name of the database and collection in which the change occurred.
  • Timestamp that identifies when the change was made.
  • Optional transaction information.
Document data
before data

This field is present in environments that run MongoDB 6.0 and later when the capture.mode for the Debezium connector is set to one of the following values:

  • change_streams_with_pre_image
  • change_streams_update_full_with_pre_image

    For more information, see MongoDB pre-image support.

after data

JSON strings that represent the values that are present in a document after the current operation. The presence of an after field in an event message depends on the type of event and the connector configuration. A create event for a MongoDB insert operation always contains an after field, regardless of the capture.mode setting. For update events, the after field is present only when capture.mode is set to one of the following values:

  • change_streams_update_full
  • change_streams_update_full_with_pre_image

    Note

    The after value in a change event message does not necessarily represent the state of a document immediately following the event. The value is not calculated dynamically; instead, after the connector captures a change event, it queries the collection to retrieve the current value of the document.

    For example, imagine a situation in which multiple operations, a, b, and c, modify a document in quick succession. When the connector processes change a, it queries the collection for the full document. In the meantime, changes b and c occur. When the connector receives a response to its query for the full document for change a, it might receive a version of the document that is based on the subsequent changes for b or c. For more information, see the documentation for the capture.mode property.

The following fragment shows the basic structure of a create change event that the connector emits after a MongoDB insert operation:

{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue2\"}",
  "source": { ... }
}

The complex format of the after field in the preceding example provides detailed information about changes that occur in the source database. However, some consumers cannot process messages that contain nested values. To convert the complex nested fields of the original message into a simpler, more universally compatible structure, use the event flattening SMT for MongoDB. The SMT flattens the structure of nested fields in a message, as shown in the following example:

{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}

For more information about the default structure of messages produced by the Debezium MongoDB connector, see the connector documentation.
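
As a rough illustration of the conversion shown above, the following Python sketch parses the JSON string in the after field and returns it as the new message value. It is a simplified model only: the real SMT produces typed Kafka Connect records with schemas, which this sketch does not attempt. The function name extract_after is hypothetical.

```python
import json

# A create event shaped like the fragment above; source details are omitted.
event = {
    "op": "c",
    "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue2\"}",
    "source": {},
}


def extract_after(change_event: dict) -> dict:
    """Parse the JSON string in the after field and return it as the new value."""
    return json.loads(change_event["after"])


print(extract_after(event))
# {'field1': 'newvalue1', 'field2': 'newvalue2'}
```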

7.8.2. Behavior of the Debezium MongoDB event flattening transformation

The event flattening SMT for MongoDB extracts the after field from create or update change event messages emitted by the Debezium MongoDB connector. After the SMT processes the original change event message, it generates a simplified version that contains only the contents of the after field.

Depending on your use case, you can apply the ExtractNewDocumentState SMT to the Debezium MongoDB connector, or to a sink connector that consumes messages that the Debezium connector produces. If you apply the SMT to the Debezium MongoDB connector, the SMT modifies messages that the connector emits before they are sent to Apache Kafka. To ensure that Kafka retains the complete Debezium change event message in its original format, apply the SMT to a sink connector.

When you use the event flattening SMT to process a message emitted from a MongoDB connector, the SMT converts the structure of the records in the original message into properly typed Kafka Connect records that can be consumed by a typical sink connector. For example, the SMT converts the JSON strings that represent the after information in the original message into schema structures that any consumer can process.

Optionally, you can configure the event flattening SMT for MongoDB to modify messages in other ways during processing. For more information, see the configuration topic.

7.8.3. Configuration of the Debezium MongoDB event flattening transformation

Configure the event flattening (ExtractNewDocumentState) SMT for MongoDB for sink connectors that consume the messages emitted by the Debezium MongoDB connector.

The following topics provide details:

7.8.3.1. Example: Basic configuration of the Debezium MongoDB event flattening transformation

To obtain the default behavior of the SMT, add the SMT to the configuration of a sink connector without specifying any options, as in the following example:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

As with any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases. Kafka Connect applies the transformations that you specify in the order in which they are listed.

You can set multiple options for a connector that uses the MongoDB event flattening SMT. The following example shows a configuration that sets the drop.tombstones, delete.handling.mode, and add.headers options for a connector:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.add.headers=op

For more information about the configuration options in the preceding example, see the configuration topic.

Customizing the configuration

The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
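
One way to express such a predicate is with the predicate support that Kafka Connect provides. The following Python sketch builds a connector configuration as a dictionary, in the form that could be serialized to JSON for the Kafka Connect REST API. The predicate alias isDataChange and the topic pattern are hypothetical placeholders; adjust them to match the topic names in your environment.

```python
import json

connector_config = {
    "transforms": "unwrap",
    "transforms.unwrap.type":
        "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    # Apply the SMT only to records for which the named predicate is true.
    "transforms.unwrap.predicate": "isDataChange",
    "predicates": "isDataChange",
    # TopicNameMatches is a predicate that ships with Kafka Connect.
    "predicates.isDataChange.type":
        "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    # Hypothetical pattern: match only the topics that carry data change
    # events, so that heartbeat and transaction metadata topics are skipped.
    "predicates.isDataChange.pattern": r"dbserver1\.inventory\..*",
}

print(json.dumps(connector_config, indent=2))
```

To invert a predicate, Kafka Connect also accepts a transforms.<alias>.negate=true setting, which applies the SMT only to records that do not match.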

7.8.4. Options for encoding arrays in MongoDB event messages

By default, the event flattening SMT converts MongoDB arrays into arrays that are compatible with Apache Kafka Connect, or Apache Avro schemas. While MongoDB arrays can contain multiple types of elements, all elements in a Kafka array must be of the same type.

To ensure that the SMT encodes arrays in a way that meets the needs of your environment, you can specify the array.encoding configuration option. The following example shows the configuration for setting the array encoding:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>

Depending on the configuration, the SMT processes each instance of an array in the source message by using one of the following encoding methods:

array encoding
If array.encoding is set to array (the default), the SMT uses the array datatype to encode arrays in the original message. To ensure correct processing, all elements in an array instance must be of the same type. This option is restrictive, but it enables downstream clients to easily process arrays.
document encoding
If array.encoding is set to document, the SMT converts each array in the source into a struct of structs, in a manner that is similar to BSON serialization. The main struct contains fields named _0, _1, _2, and so on, where each field name represents the index of an element in the original array. The SMT populates each of these index fields with the values that it retrieves for the equivalent element in the source array. Index names are prefixed with underscores, because Avro encoding prohibits field names that begin with a numeric character.

The following example shows how the Debezium MongoDB connector represents a database document that contains an array that includes heterogeneous data types:

Example 7.2. Example: Document encoding of an array that contains multiple data types

{
    "_id": 1,
    "a1": [
        {
            "a": 1,
            "b": "none"
        },
        {
            "a": "c",
            "d": "something"
        }
    ]
}

If the array.encoding is set to document, the SMT converts the preceding document into the following format:

{
    "_id": 1,
    "a1": {
        "_0": {
            "a": 1,
            "b": "none"
        },
        "_1": {
            "a": "c",
            "d": "something"
        }
    }
}

The document encoding option enables the SMT to process arbitrary arrays that are composed of heterogeneous elements. However, before you use this option, always verify that the sink connector and other downstream consumers are capable of processing arrays that contain multiple data types.
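
The document encoding described above can be modeled with a short recursive function. The following Python sketch is an illustration of the naming scheme only, not the SMT's implementation. The function name is hypothetical.

```python
def encode_arrays_as_documents(value):
    """Recursively replace each list with a dict keyed by _0, _1, _2, ..."""
    if isinstance(value, list):
        # Each key is the element's index, prefixed with an underscore.
        return {f"_{i}": encode_arrays_as_documents(v) for i, v in enumerate(value)}
    if isinstance(value, dict):
        return {k: encode_arrays_as_documents(v) for k, v in value.items()}
    return value


doc = {"_id": 1, "a1": [{"a": 1, "b": "none"}, {"a": "c", "d": "something"}]}
print(encode_arrays_as_documents(doc))
# {'_id': 1, 'a1': {'_0': {'a': 1, 'b': 'none'}, '_1': {'a': 'c', 'd': 'something'}}}
```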

7.8.5. Flattening nested structures in a MongoDB event message

When a database operation involves an embedded document, the Debezium MongoDB connector emits a Kafka event record that has a structure that reflects the hierarchical structure of the original document. That is, the event message represents nested documents as a set of nested field structures. In environments where downstream connectors cannot process messages that contain nested structures, you can configure the event flattening SMT to flatten hierarchical structures in the message. A flat message structure is better suited to table-like storage.

To configure the SMT to flatten nested structures, set the flatten.struct configuration option to true. In the converted message, field names are constructed to be consistent with the document source. The SMT renames each flattened field by concatenating the name of the parent document field with the name of the nested document field. A delimiter that is defined by the flatten.struct.delimiter option separates the components of the name. The default value of flatten.struct.delimiter is an underscore character (_).

The following example shows the configuration for specifying whether the SMT flattens nested structures:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>

The following example shows an event message that is emitted by the MongoDB connector. The message includes a field for a document a that contains two nested fields, b and c:

{
    "_id": 1,
    "a": {
        "b": 1,
        "c": "none"
    },
    "d": 100
}

The message in the following example shows the output after the SMT for MongoDB flattens the nested structures in the preceding message:

{
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
}

In the resulting message, the b and c fields that were nested in the original message are flattened and renamed. The renamed fields are formed by concatenating the name of the parent document a with the names of the nested fields: a_b and a_c. The components of the new field names are separated by an underscore character, as defined by the setting of the flatten.struct.delimiter configuration property.
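
The renaming rule can be sketched as a small recursive function. The following Python example is an illustrative model of the behavior described in this section, not the SMT's code. The function name is hypothetical, and the delimiter parameter stands in for the flatten.struct.delimiter option.

```python
def flatten_struct(doc: dict, delimiter: str = "_", parent: str = "") -> dict:
    """Flatten nested dicts by joining parent and child names with a delimiter."""
    flat = {}
    for name, value in doc.items():
        full_name = f"{parent}{delimiter}{name}" if parent else name
        if isinstance(value, dict):
            # Recurse into the nested document, carrying the joined name.
            flat.update(flatten_struct(value, delimiter, full_name))
        else:
            flat[full_name] = value
    return flat


message = {"_id": 1, "a": {"b": 1, "c": "none"}, "d": 100}
print(flatten_struct(message))
# {'_id': 1, 'a_b': 1, 'a_c': 'none', 'd': 100}
print(flatten_struct(message, delimiter="."))
# {'_id': 1, 'a.b': 1, 'a.c': 'none', 'd': 100}
```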

7.8.6. How the Debezium MongoDB connector reports the names of fields removed by $unset operations

In MongoDB, the $unset operator and the $rename operator both remove fields from a document. Because MongoDB collections are schemaless, after an update removes fields from a document, it’s not possible to infer the name of the missing field from the updated document. To support sink connectors or other consumers that might require information about removed fields, Debezium emits update messages that include a removedFields element that lists the names of the deleted fields.

The following example shows part of an update message for an operation that results in the removal of the field a:

"payload": {
  "op": "u",
  "ts_ms": "...",
  "ts_us" : "...",
  "ts_ns" : "...",
  "before": "{ ... }",
  "after": "{ ... }",
  "updateDescription": {
    "removedFields": ["a"],
    "updatedFields": null,
    "truncatedArrays": null
  }
}

In the preceding example, the before and after fields represent the state of the source document before and after the document was updated. These fields are present in the event message that a connector emits only if the capture.mode for the connector is set as described in the following list:

before field

Provides the state of the document before the change. This field is present only when capture.mode is set to one of the following values:

  • change_streams_with_pre_image
  • change_streams_update_full_with_pre_image
after field

Provides the full state of the document after a change. This field is present only when capture.mode is set to one of the following values:

  • change_streams_update_full
  • change_streams_update_full_with_pre_image

Assuming a connector that is configured to capture full documents, when the ExtractNewDocumentState SMT receives an update message for an $unset event, the SMT re-encodes the message by representing the removed field as a null value, as shown in the following example:

{
    "id": 1,
    "a": null
}

For connectors that are not configured to capture full documents, when the SMT receives an update event for an $unset operation, it produces the following output message:

{
   "a": null
}
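
The two outputs above can be approximated with the following Python sketch. It is a simplified model that assumes the payload layout shown earlier in this section: it handles only removedFields and ignores updatedFields and truncatedArrays. The function name is hypothetical.

```python
import json


def flatten_update(payload: dict) -> dict:
    """Return a flattened value in which each removed field is set to None."""
    after = payload.get("after")
    # Without full-document capture there is no after field to start from.
    result = json.loads(after) if after else {}
    description = payload.get("updateDescription") or {}
    for name in description.get("removedFields") or []:
        result[name] = None
    return result


update_description = {"removedFields": ["a"], "updatedFields": None, "truncatedArrays": None}

full = {"op": "u", "after": "{\"id\": 1}", "updateDescription": update_description}
print(flatten_update(full))
# {'id': 1, 'a': None}

partial = {"op": "u", "updateDescription": update_description}
print(flatten_update(partial))
# {'a': None}
```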

7.8.7. Determining the type of the original database operation

After the SMT flattens an event message, the resulting message no longer indicates whether the operation that generated the event was of type create, update or initial snapshot read. Typically, you can identify delete operations by configuring the connectors to expose information about the tombstone or rewrite events that accompany a deletion. For more information about configuring the connector to expose information about tombstones and rewrites in event messages, see the drop.tombstones and delete.handling.mode properties.

To report the type of a database operation in an event message, the SMT can add an op field to one of the following elements:

  • The event message body.
  • A message header.

For example, to add a header property that shows the type of the original operation, add the transform, and then add the add.headers property to the connector configuration, as in the following example:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op

Based on the preceding configuration, the SMT reports the event type by adding an op header to the message and assigning it a string value to identify the type of the operation. The assigned string value is based on the op field value in the original MongoDB change event message.

7.8.8. Using the MongoDB event flattening SMT to add Debezium metadata to Kafka records

The event flattening SMT for MongoDB can add metadata fields from the original change event message to the simplified message. The added metadata fields are prefixed with a double underscore ("__"). Adding metadata to the event record makes it possible to include content such as the name of the collection in which a change event occurred, or to include connector-specific fields, such as a replica set name. Currently, the SMT can add fields from the following change event sub-structures only: source, transaction and updateDescription.

For more information about the MongoDB change event structure, see the MongoDB connector documentation.

For example, you might specify the following configuration to add the replica set name (rs) and the collection name for a change event to the final flattened event record:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection

The preceding configuration results in the following content being added to the flattened record:

{ "__rs" : "rs0", "__collection" : "my-collection", ... }

If you want the SMT to add metadata fields to delete events, set the value of the delete.handling.mode option to rewrite.

7.8.9. Options for applying the MongoDB extract new document state transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.

7.8.10. Configuration options for the Debezium event flattening transformation for MongoDB

The following table describes the configuration options for the MongoDB event flattening SMT.

Property | Default | Description

array.encoding

array

Specifies the format that the SMT uses when it encodes arrays that it reads from the original event message. Set one of the following options:

array
The SMT uses the array datatype to encode MongoDB arrays into a format that is compatible with Apache Kafka Connect or Apache Avro schemas. If you set this option, verify that the elements in each array instance are of the same type. Although MongoDB allows arrays to contain multiple data types, some downstream clients cannot process arrays that contain elements of different types.
document
The SMT converts each MongoDB array into a struct of structs, in a manner that is similar to BSON serialization. The main struct contains fields with the names _0, _1, _2, and so forth. To comply with Avro naming standards, the SMT prefixes the numeric name of each index field with an underscore. Each of the numeric field names represents the index of an element in the original array. The SMT populates each of these index fields with the value that it retrieves from the source document for the designated array element.

For more information about the array.encoding option, see the options for encoding arrays in MongoDB event messages.

flatten.struct

false

The SMT flattens structures (structs) in the original event message by concatenating the names of nested properties in the message, separated by a configurable delimiter, to form a simple field name.

flatten.struct.delimiter

_

When flatten.struct is set to true, specifies the delimiter that the transformation inserts between field names that it concatenates from the input record to generate field names in the output record.

drop.tombstones

true

Debezium generates a tombstone record for each delete operation. By default, the event flattening SMT removes tombstone records from the stream. To retain tombstone records in the stream, specify drop.tombstones=false.

Note

This option is scheduled for removal in a future release. In its place, use the delete.tombstone.handling.mode option.

delete.handling.mode

drop

Specifies how the SMT handles the change event records that Debezium generates for delete operations. Set one of the following options:

drop
The SMT removes records for delete operations from the event stream.
none
The SMT retains the original change event record in the event stream. The record contains only "value": "null".
rewrite
The SMT retains a modified version of the change event record in the stream. To provide another way to indicate that the record was deleted, the modified record includes a value field that contains the key/value pairs from the original record, and adds __deleted: true to the value.

If you set the rewrite option, you might find that the updated, simplified records for DELETE operations are sufficient for tracking deleted records. In such a case, you might want the SMT to drop tombstone records.

Note

This option is scheduled for removal in a future release. In its place, use the delete.tombstone.handling.mode option.

delete.tombstone.handling.mode

No default

Debezium generates a change event record for each DELETE operation. This setting determines how the MongoDB event flattening SMT handles DELETE events in the stream.

Note

The setting for this option takes precedence over any conflicting settings that you might configure for the deprecated drop.tombstones or delete.handling.mode options.

Set one of the following options:

drop
The SMT removes both DELETE events and TOMBSTONE records from the stream.
tombstone (default)
The SMT retains TOMBSTONE records in the stream. The TOMBSTONE record contains only the following value: "value": "null".
rewrite

The SMT retains the change event record in the stream and makes the following changes:

  • Adds a value field to the record that contains the key/value pairs from the before field of the original record.
  • Adds __deleted: true to the value of the record.
  • Removes TOMBSTONE records.

    This setting provides another way to indicate that the record has been deleted.

rewrite-with-tombstone
The SMT behaves as it does when you select the rewrite option, except that it also retains TOMBSTONE records.

delete.tombstone.handling.mode.rewrite-with-id

false

When set to true and delete.tombstone.handling.mode is rewrite, the SMT copies the id field from the key and includes it as _id in the payload for delete events.

add.headers.prefix

__ (double-underscore)

Set this optional string to prefix a header.

add.headers

No default

Specifies a comma-separated list, with no spaces, of metadata fields that you want the SMT to add to the header of simplified messages. When the original message contains duplicate field names, you can identify the specific field to modify by providing the name of the struct together with the name of the field, for example, source.ts_ms.

Optionally, you can override the original name of a field and assign it a new name by adding an entry in the following format to the list:

<field_name>:<new_field_name>.

For example:

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

The new name values that you specify are case-sensitive.

When the SMT adds metadata fields to the header of the simplified message, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not in the original change event message, the SMT does not add the field to the header.

add.fields.prefix

__ (double-underscore)

Specifies an optional string to prefix to a field name.

add.fields

No default

Set this option to a comma-separated list, with no spaces, of metadata fields to add to the value element of the simplified Kafka message. When the original message contains duplicate field names, you can identify the specific field to modify by providing the name of the struct together with the name of the field, for example, source.ts_ms.

Optionally, you can override the original name of a field and assign it a new name by adding an entry in the following format to the list:

<field_name>:<new_field_name>.

For example:

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

The new name values that you specify are case-sensitive.

When the SMT adds metadata fields to the value element of the simplified message, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.

If you specify a field that is not present in the original change event message, the SMT still adds the specified field to the value element of the modified message.
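
As a minimal sketch of how these options might be combined, the following fragment adds one metadata field to the header and two metadata fields to the value of simplified messages. The transformation alias unwrap and the choice of fields are assumptions made for illustration. The fragment also assumes that the SMT in use is the MongoDB new record state extraction SMT (ExtractNewDocumentState) and that the header option is named add.headers.

```properties
# "unwrap" is a hypothetical alias for the transformation.
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
# Copy the operation type into a message header.
transforms.unwrap.add.headers=op
# Copy the operation type and the source timestamp into the message value.
# The struct name (source) disambiguates ts_ms from the top-level ts_ms field.
transforms.unwrap.add.fields=op,source.ts_ms
```

With the default double-underscore prefix, the naming rules described above imply that the value of a simplified message would contain the fields __op and __source_ts_ms.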

Known limitations

  • Because MongoDB is a schemaless database, to ensure consistent column definitions when you use Debezium to stream changes to a schema-based relational database, fields within a collection that have the same name must store the same type of data.
  • Configure the SMT to produce messages in the format that is compatible with the sink connector. If a sink connector requires a "flat" message structure, but it receives a message that encodes an array in the source MongoDB document as a struct of structs, the sink connector cannot process the message.

7.9. Configuring Debezium connectors to use the outbox pattern

The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

  • Capture changes in an outbox table
  • Apply the Debezium outbox event router single message transformation (SMT)

A Debezium connector that is configured to apply the outbox SMT should capture changes that occur in an outbox table only. For more information, see Options for applying the transformation selectively.

A connector can capture changes in more than one outbox table only if each outbox table has the same structure.

See Reliable Microservices Data Exchange With the Outbox Pattern to learn about why the outbox pattern is useful and how it works.

Note

The outbox event router SMT is not compatible with the MongoDB connector.

MongoDB users can run the MongoDB outbox event router SMT.

The following topics provide details:

7.9.1. Example of a Debezium outbox message

To understand how the Debezium outbox event router SMT is configured, review the following example of a Debezium outbox message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

A Debezium connector that is configured to apply the outbox event router SMT generates the preceding message by transforming a raw Debezium change event message such as the following:

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "2.7.3.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484,
  "ts_us": 1556890294484651,
  "ts_ns": 1556890294484651402
}

This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox table structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.

7.9.2. Outbox table structure expected by Debezium outbox event router SMT

To apply the default outbox event router SMT configuration, your outbox table is assumed to have the following columns:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |

Table 7.8. Descriptions of expected outbox table columns

Column | Effect

id

Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages.

To obtain the unique ID of the event from a different outbox table column, set the table.field.event.id SMT option in the connector configuration.

aggregatetype

Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default ${routedByValue} variable in the route.topic.replacement SMT option.

For example, in a default configuration, the route.by.field SMT option is set to aggregatetype and the route.topic.replacement SMT option is set to outbox.event.${routedByValue}. Suppose that your application adds two records to the outbox table. In the first record, the value in the aggregatetype column is customers. In the second record, the value in the aggregatetype column is orders. The connector emits the first record to the outbox.event.customers topic. The connector emits the second record to the outbox.event.orders topic.

To obtain this value from a different outbox table column, set the route.by.field SMT option in the connector configuration.

aggregateid

Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

To obtain the event key from a different outbox table column, set the table.field.event.key SMT option in the connector configuration.

payload

A representation of the outbox change event. The default structure is JSON. By default, the Kafka message value consists solely of the payload value. However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope that encapsulates both the payload and the additional fields, and each field is represented separately. For more information, see Emitting messages with additional fields.

To obtain the event payload from a different outbox table column, set the table.field.event.payload SMT option in the connector configuration.

Additional custom columns

Any additional columns from the outbox table can be added to outbox events either within the payload section or as a message header.

For example, an eventType column might convey a user-defined value that helps to categorize or organize events.
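
If an existing outbox table does not use the default column names, the options named in the preceding table can point the SMT at the actual columns. The following fragment is a sketch under that assumption. The column names event_id, aggregate_id, aggregate_type, and event_body are hypothetical and stand in for whatever names your table uses.

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Column that holds the unique event ID (emitted as the id header).
transforms.outbox.table.field.event.id=event_id
# Column that holds the event key (used as the Kafka message key).
transforms.outbox.table.field.event.key=aggregate_id
# Column that holds the event payload (used as the Kafka message value).
transforms.outbox.table.field.event.payload=event_body
# Column whose value becomes part of the destination topic name.
transforms.outbox.route.by.field=aggregate_type
```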

7.9.3. Basic Debezium outbox event router SMT configuration

To configure a Debezium connector to support the outbox pattern, configure the outbox.EventRouter SMT. To obtain the default behavior of the SMT, add it to the connector configuration without specifying any options, as in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

Customizing the configuration

The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes). To apply the transformation only to events that originate in the outbox table, define an SMT predicate statement that selectively applies the transformation to those events only.

7.9.4. Options for applying the Outbox event router transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. You can use one of the following methods to configure the connector to apply the SMT selectively:
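
One such method is the SMT predicate that is mentioned in the preceding section. As a sketch only, the following fragment applies the outbox event router SMT to records whose topic name matches a regular expression. TopicNameMatches is a predicate that is included with Kafka Connect. The predicate alias isOutboxTable and the pattern are assumptions, and the pattern assumes that the connector writes outbox table changes to a topic whose name ends in outboxevent.

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Apply the SMT only when the predicate evaluates to true.
transforms.outbox.predicate=isOutboxTable

predicates=isOutboxTable
predicates.isOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Hypothetical pattern; it must match the complete topic name.
predicates.isOutboxTable.pattern=.*outboxevent
```

Records that do not satisfy the predicate, such as heartbeat or transaction metadata messages, pass through without being processed by the SMT.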

7.9.5. Payload serialization format

The outbox event router SMT supports arbitrary payload formats. The SMT passes on payload column values that it reads from the outbox table without modification. The way that the SMT converts these column values into Kafka message fields depends on how you configure the SMT. Common payload formats for serializing data are JSON and Avro.

7.9.5.1. Using JSON as the serialization format

The default serialization format for the outbox event router SMT is JSON. To use this format, the data type of the source column must be JSON (for example, jsonb in PostgreSQL).

7.9.5.1.1. Expanding escaped JSON String as JSON

When a Debezium outbox message represents the payload as a JSON String, the resulting Kafka message escapes the string as in the following example:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

The outbox event router enables you to expand the message content to "real" JSON, deducing the companion schema from the JSON document. The resulting Kafka message is formatted as in the following example:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

To enable this expansion, set the table.expand.json.payload option to true, and use the JsonConverter, as shown in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter

7.9.5.2. Using Apache Avro as the payload format in Debezium outbox messages

Apache Avro is a common framework for serializing data. Using Avro can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.

How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter

By default, the payload column value (the Avro data) is the only message value. When data is stored in Avro format, the column format must be set to a binary data type, such as bytea in PostgreSQL. The value converter for the SMT must also be set to BinaryDataConverter, so that it propagates the binary value of the payload column as-is into the Kafka message value.

The Debezium connectors may be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). These events cannot be serialized by the BinaryDataConverter so additional configuration must be provided so the converter knows how to serialize these events. As an example, the following configuration illustrates using the Apache Kafka JsonConverter with no schemas:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

The delegate Converter implementation is specified by the delegate.converter.type option. If any extra configuration options are needed by the converter, they can also be specified, such as the disablement of schemas shown above using schemas.enable=false.

The following example illustrates how to configure the SMT to use a delegate converter with an Apicurio Registry to convert data into Avro format:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter
value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.delegate.converter.apicurio.registry.auto-register=true
value.converter.delegate.converter.registry.find-latest=true

Finally, the following example illustrates how to configure the SMT to use a delegate converter with a Confluent Schema Registry to convert data into Avro format:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter
value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO
value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS}
value.converter.delegate.converter.type.schema.registry.url={URL}

Note

In the preceding configuration examples, because the AvroConverter is configured as a delegate converter, third-party libraries are required. Information about how to add third-party libraries to the classpath is beyond the scope of this document.

7.9.6. Emitting additional fields in Debezium outbox messages

Your outbox table might contain columns whose values you want to add to the emitted outbox messages. For example, consider an outbox table that has a value of purchase-order in the aggregatetype column and another column, eventType, whose possible values are order-created and order-shipped. Additional fields can be added with the syntax column:placement:alias.

The allowed values for placement are:

  • header
  • envelope
  • partition

To emit the eventType column value in the outbox message header, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type

The result will be a header on the Kafka message with type as its key, and the value of the eventType column as its value.

To emit the eventType column value in the outbox message envelope, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type

To control which partition the outbox message is produced on, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition

Note that for the partition placement, adding an alias will have no effect.
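
Because the option accepts a comma-separated list, several placements can presumably be combined in a single setting. The following fragment is a sketch of that idea, not a tested configuration. The columns tenantId and partitionColumn are hypothetical.

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# eventType        -> message header named "type" (alias)
# tenantId         -> envelope field, keeps its column name (no alias)
# partitionColumn  -> target partition (an alias would have no effect)
transforms.outbox.table.fields.additional.placement=eventType:header:type,tenantId:envelope,partitionColumn:partition
```

When any column is placed in the envelope, the Kafka message value is no longer the bare payload. Instead, it contains an envelope that holds the payload and the additional fields.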

7.9.7. Options for configuring outbox event router transformation

The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.

Table 7.9. Descriptions of outbox event router SMT configuration options

Option | Default | Group | Description

table.op.invalid.behavior

warn

Table

Determines the behavior of the SMT when there is an UPDATE operation on the outbox table. Possible settings are:

  • warn - The SMT logs a warning and continues to the next outbox table record.
  • error - The SMT logs an error and continues to the next outbox table record.
  • fatal - The SMT logs an error and the connector stops processing.

All changes in an outbox table are expected to be INSERT operations. That is, an outbox table functions as a queue; updates to records in an outbox table are not allowed. The SMT automatically filters out DELETE operations on an outbox table.

table.field.event.id

id

Table

Specifies the outbox table column that contains the unique event ID. This ID will be stored in the emitted event’s headers under the id key.

table.field.event.key

aggregateid

Table

Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

table.field.event.timestamp

 

Table

By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox table column that contains the timestamp that you want to be in emitted outbox messages.

table.field.event.payload

payload

Table

Specifies the outbox table column that contains the event payload.

table.expand.json.payload

false

Table

Specifies whether the SMT expands a String payload into JSON. If the payload has no content, or if a parsing error occurs, the SMT keeps the content as is.

For more details, see Expanding escaped JSON String as JSON.

table.json.payload.null.behavior

ignore

Table

When the table.expand.json.payload option is enabled, determines how the SMT handles null values in the JSON payload of an outbox table record. Possible settings are:

  • ignore - Ignore the null value.
  • optional_bytes - Keep the null value, and treat it as the Kafka Connect optional bytes type.

table.fields.additional.placement

 

Table, Envelope

Specifies one or more outbox table columns that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:

id:header,my-field:envelope

To specify an alias for the column, specify a trio with the alias as the third value, for example:

id:header,my-field:envelope:my-alias

The second value is the placement, and it must always be header, envelope, or partition.

Configuration examples are in emitting additional fields in Debezium outbox messages.

table.fields.additional.error.on.missing

true

Table, Envelope

Specifies whether this transformation throws an error if a field specified by the table.fields.additional.placement property is not found in the Outbox payload.

table.field.event.schema.version

 

Table, Schema

When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc.

route.by.field

aggregatetype

Router

Specifies the name of a column in the outbox table. The default behavior is that the value in this column becomes a part of the name of the topic to which the connector emits the outbox messages. An example is in the description of the expected outbox table.

route.topic.regex

(?<routedByValue>.*)

Router

Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox table records. This regular expression is part of the setting of the route.topic.replacement SMT option.

The default behavior is that the SMT replaces the default ${routedByValue} variable in the setting of the route.topic.replacement SMT option with the setting of the route.by.field outbox SMT option.

route.topic.replacement

outbox.event.${routedByValue}

Router

Specifies the name of the topic to which the connector emits outbox messages. The default topic name is outbox.event. followed by the aggregatetype column value in the outbox table record. For example, if the aggregatetype value is customers, the topic name is outbox.event.customers.

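To change the topic name, set the route.by.field option to a different column, or set the route.topic.replacement option to a different topic name.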

route.tombstone.on.empty.payload

false

Router

Indicates whether an empty or null payload causes the connector to emit a tombstone event.
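
To illustrate how several of the options in the preceding table might be combined, the following fragment overrides a few defaults. It is a sketch under stated assumptions: the created_at column is hypothetical, and the topic naming scheme is chosen only as an example.

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Stop the connector if an UPDATE occurs on the outbox table.
transforms.outbox.table.op.invalid.behavior=fatal
# Use a table column, rather than the Debezium event timestamp,
# as the timestamp of the emitted message (hypothetical column name).
transforms.outbox.table.field.event.timestamp=created_at
# Route to <aggregatetype>.events instead of outbox.event.<aggregatetype>.
transforms.outbox.route.topic.replacement=${routedByValue}.events
# Emit a tombstone when the payload is empty or null.
transforms.outbox.route.tombstone.on.empty.payload=true
```

With these settings and the default route.by.field, a record whose aggregatetype value is customers would be emitted to the customers.events topic.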

7.10. Configuring Debezium MongoDB connectors to use the outbox pattern

Note

This SMT is for use with the Debezium MongoDB connector only. For information about using the outbox event router SMT for relational databases, see Outbox event router.

The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

  • Capture changes in an outbox collection
  • Apply the Debezium MongoDB outbox event router single message transformation (SMT)

A Debezium connector that is configured to apply the MongoDB outbox SMT should capture changes that occur in an outbox collection only. For more information, see Options for applying the transformation selectively.

A connector can capture changes in more than one outbox collection only if each outbox collection has the same structure.

Note

To use this SMT, operations on the actual business collections and the insert into the outbox collection must be done as part of a multi-document transaction, which MongoDB has supported since version 4.0. This requirement prevents potential data inconsistencies between the business collections and the outbox collection. A future update is planned to support additional configurations for storing outbox events as a sub-document of the existing collection, rather than in an independent outbox collection. Such a configuration would enable updating existing data and inserting an outbox event in a single ACID operation without multi-document transactions.

For more information about the outbox pattern, see Reliable Microservices Data Exchange With the Outbox Pattern.

The following topics provide details:

7.10.1. Example of a Debezium MongoDB outbox message

To understand how to configure the Debezium MongoDB outbox event router SMT, consider the following example of a Debezium outbox message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "b2730779e1f596e275826f08"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

A Debezium connector that is configured to apply the MongoDB outbox event router SMT generates the preceding message by transforming a raw Debezium change event message as in the following example:

# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" }
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "patch": null,
  "after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}",
  "source": {
    "version": "2.7.3.Final",
    "connector": "mongodb",
    "name": "fulfillment",
    "ts_ms": 1558965508000,
    "ts_us": 1558965508000000,
    "ts_ns": 1558965508000000000,
    "snapshot": false,
    "db": "inventory",
    "rs": "rs0",
    "collection": "customers",
    "ord": 31,
    "h": 1546547425148721999
  },
  "op": "c",
  "ts_ms": 1556890294484,
  "ts_us": 1556890294484452,
  "ts_ns": 1556890294484452697
}

This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox collection structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.

7.10.2. Outbox collection structure expected by Debezium MongoDB outbox event router SMT

To apply the default MongoDB outbox event router SMT configuration, your outbox collection is assumed to have the following fields:

{
  "_id": "objectId",
  "aggregatetype": "string",
  "aggregateid": "objectId",
  "type": "string",
  "payload": "object"
}

Table 7.10. Descriptions of expected outbox collection fields

Field | Effect

_id

Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages.

To obtain the unique ID of the event from a different outbox collection field, set the collection.field.event.id SMT option in the connector configuration.

aggregatetype

Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default ${routedByValue} variable in the route.topic.replacement SMT option.

For example, in a default configuration, the route.by.field SMT option is set to aggregatetype and the route.topic.replacement SMT option is set to outbox.event.${routedByValue}. Suppose that your application adds two documents to the outbox collection. In the first document, the value in the aggregatetype field is customers. In the second document, the value in the aggregatetype field is orders. The connector emits the first document to the outbox.event.customers topic. The connector emits the second document to the outbox.event.orders topic.

To obtain this value from a different outbox collection field, set the route.by.field SMT option in the connector configuration.

aggregateid

Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

To obtain the event key from a different outbox collection field, set the collection.field.event.key SMT option in the connector configuration.

payload

A representation of the outbox change event. The default structure is JSON. By default, the Kafka message value consists solely of the payload value. However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope that encapsulates both the payload and the additional fields, and each field is represented separately. For more information, see Emitting messages with additional fields.

To obtain the event payload from a different outbox collection field, set the collection.field.event.payload SMT option in the connector configuration.

Additional custom fields

Any additional fields from the outbox collection can be added to outbox events either within the payload section or as a message header.

For example, an eventType field might convey a user-defined value that helps to categorize or organize events.
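
If an existing outbox collection does not use the default field names, the options named in the preceding table can point the SMT at the actual fields. The following fragment is a sketch under that assumption. The field names eventId, orderId, body, and category are hypothetical.

```properties
transforms=outbox
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
# Field that holds the unique event ID (emitted as the id header).
transforms.outbox.collection.field.event.id=eventId
# Field that holds the event key (used as the Kafka message key).
transforms.outbox.collection.field.event.key=orderId
# Field that holds the event payload (used as the Kafka message value).
transforms.outbox.collection.field.event.payload=body
# Field whose value becomes part of the destination topic name.
transforms.outbox.route.by.field=category
```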

7.10.3. Basic Debezium MongoDB outbox event router SMT configuration

To configure a Debezium MongoDB connector to support the outbox pattern, configure the outbox.MongoEventRouter SMT. To obtain the default behavior of the SMT, add it to the connector configuration without specifying any options, as in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter

Customizing the configuration

The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions). To apply the transformation only to events that originate in the outbox collection, define an SMT predicate statement that selectively applies the transformation to those events only.

7.10.4. Options for applying the MongoDB outbox event router transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. You can use one of the following methods to configure the connector to apply the SMT selectively:
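
One such method is the SMT predicate that is mentioned in the preceding section. As a sketch only, the following fragment applies the MongoDB outbox event router SMT to records whose topic name matches a regular expression. TopicNameMatches is a predicate that is included with Kafka Connect. The predicate alias isOutboxCollection and the pattern are assumptions, and the pattern assumes that the outbox collection is named outboxevent.

```properties
transforms=outbox
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
# Apply the SMT only when the predicate evaluates to true.
transforms.outbox.predicate=isOutboxCollection

predicates=isOutboxCollection
predicates.isOutboxCollection.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Hypothetical pattern; it must match the complete topic name.
predicates.isOutboxCollection.pattern=.*outboxevent
```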

7.10.5. Using Avro as the payload format in Debezium MongoDB outbox messages

The MongoDB outbox event router SMT supports arbitrary payload formats. The payload field value in an outbox collection is passed on transparently. An alternative to working with JSON is to use Avro. This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.

How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter

By default, the payload field value (the Avro data) is the only message value. Configuration of ByteArrayConverter as the value converter propagates the payload field value as-is into the Kafka message value.

Note that this differs from the BinaryDataConverter suggested for other SMTs. This is due to the different approach MongoDB takes to storing byte arrays internally.

The Debezium connectors may be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). These events cannot be serialized by the ByteArrayConverter so additional configuration must be provided so the converter knows how to serialize these events. As an example, the following configuration illustrates using the Apache Kafka JsonConverter with no schemas:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

The delegate Converter implementation is specified by the delegate.converter.type option. If any extra configuration options are needed by the converter, they can also be specified, such as the disablement of schemas shown above using schemas.enable=false.

7.10.6. Emitting additional fields in Debezium MongoDB outbox messages

Your outbox collection might contain fields whose values you want to add to the emitted outbox messages. For example, consider an outbox collection that has a value of purchase-order in the aggregatetype field and another field, eventType, whose possible values are order-created and order-shipped. Additional fields can be added with the syntax field:placement:alias.

The allowed values for placement are:

  • header
  • envelope
  • partition

To emit the eventType field value in the outbox message header, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.fields.additional.placement=eventType:header:type

The result will be a header on the Kafka message with type as its key, and the value of the eventType field as its value.

To emit the eventType field value in the outbox message envelope, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.fields.additional.placement=eventType:envelope:type

To control which partition the outbox message is produced on, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.fields.additional.placement=partitionField:partition

Note that for the partition placement, adding an alias will have no effect.
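The field:placement:alias syntax can be illustrated with a small parser. The following Python sketch is for illustration only and is not part of Debezium; the names AdditionalField and parse_additional_placement are hypothetical. It assumes that entries are comma-separated and that an alias is ignored for the partition placement, as described above.

```python
from typing import List, NamedTuple, Optional

ALLOWED_PLACEMENTS = {"header", "envelope", "partition"}


class AdditionalField(NamedTuple):
    field: str
    placement: str
    alias: Optional[str]


def parse_additional_placement(spec: str) -> List[AdditionalField]:
    """Parse a comma-separated list of field:placement[:alias] entries."""
    result = []
    for entry in spec.split(","):
        parts = [part.strip() for part in entry.split(":")]
        if len(parts) not in (2, 3):
            raise ValueError(f"expected field:placement[:alias], got {entry!r}")
        field, placement = parts[0], parts[1]
        if placement not in ALLOWED_PLACEMENTS:
            raise ValueError(f"unknown placement {placement!r} in {entry!r}")
        # An alias has no effect for the partition placement, so drop it.
        has_alias = len(parts) == 3 and placement != "partition"
        result.append(AdditionalField(field, placement, parts[2] if has_alias else None))
    return result


if __name__ == "__main__":
    for item in parse_additional_placement("eventType:header:type,partitionField:partition"):
        print(item)
```

Validating the placement value up front mirrors the restriction on allowed values, so a typo such as eventType:headers is reported instead of being silently ignored.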

7.10.7. Expanding escaped JSON String as JSON

By default, the payload of the Debezium outbox message is represented as a string. When the original source of the string is in JSON format, the resulting Kafka message uses escape sequences to represent the string, as shown in the following example:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

You can configure the outbox event router to expand the message content, converting the escaped JSON back to its original, unescaped JSON format. In the converted string, the companion schema is deduced from the original JSON document. The following example shows the expanded JSON in the resulting Kafka message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

To enable string conversion in the transformation, set the value of collection.expand.json.payload to true and use the StringConverter as shown in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter
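The difference between the escaped and the expanded form can be reproduced outside of Kafka Connect. The following Python sketch only illustrates the idea and does not use any Debezium code; the expand_payload function is a hypothetical stand-in for what the SMT does when collection.expand.json.payload is true, including the documented fallback of keeping the content as is when it cannot be parsed.

```python
import json

order = {"customerId": 123, "orderDate": "2019-01-31T12:13:01"}

# Without expansion, the payload is a plain string. Serializing that string
# as a JSON value escapes the inner quotes.
payload_as_string = json.dumps(order)
unexpanded_message = json.dumps(payload_as_string)


def expand_payload(payload):
    """Return the parsed JSON, or the original value if parsing fails."""
    try:
        return json.loads(payload)
    except (TypeError, ValueError):
        return payload


# With expansion, the string is parsed back into a structure before it is
# serialized, so no escape sequences appear in the output.
expanded_message = json.dumps(expand_payload(payload_as_string))

if __name__ == "__main__":
    print(unexpanded_message)
    print(expanded_message)
```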

7.10.8. Options for configuring outbox event router transformation

The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.

Table 7.11. Descriptions of outbox event router SMT configuration options
Option

Default

Group

Description

collection.op.invalid.behavior

warn

Collection

Determines the behavior of the SMT when there is an update operation on the outbox collection. Possible settings are:

  • warn - The SMT logs a warning and continues to the next outbox collection document.
  • error - The SMT logs an error and continues to the next outbox collection document.
  • fatal - The SMT logs an error and the connector stops processing.

All changes in an outbox collection are expected to be insert or delete operations. That is, an outbox collection functions as a queue; updates to documents in an outbox collection are not allowed. The SMT automatically filters out delete operations (for removing processed outbox events) on an outbox collection.

collection.field.event.id

_id

Collection

Specifies the outbox collection field that contains the unique event ID. This ID will be stored in the emitted event’s headers under the id key.

collection.field.event.key

aggregateid

Collection

Specifies the outbox collection field that contains the event key. When this field contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

collection.field.event.timestamp

 

Collection

By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox collection field that contains the timestamp that you want to be in emitted outbox messages.

collection.field.event.payload

payload

Collection

Specifies the outbox collection field that contains the event payload.

collection.expand.json.payload

false

Collection

Specifies whether the SMT expands a String payload into JSON. If the payload has no content, or if a parsing error occurs, the content is kept "as is".

For more details, see the Expanding escaped JSON String as JSON section.

collection.fields.additional.placement

 

Collection, Envelope

Specifies one or more outbox collection fields that you want to add to outbox message headers or envelopes, or use to select the destination partition. Specify a comma-separated list of pairs. In each pair, specify the name of a field and the placement of its value: header, envelope, or partition. Separate the values in the pair with a colon, for example:

id:header,my-field:envelope

To specify an alias for the field, specify a trio with the alias as the third value, for example:

id:header,my-field:envelope:my-alias

The second value is the placement, and it must always be header, envelope, or partition.

For configuration examples, see Emitting additional fields in Debezium MongoDB outbox messages.

collection.field.event.schema.version

 

Collection, Schema

When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc.

route.by.field

aggregatetype

Router

Specifies the name of a field in the outbox collection. By default, the value specified in this field becomes a part of the name of the topic to which the connector emits the outbox messages. For an example, see the description of the expected outbox collection.

route.topic.regex

(?<routedByValue>.*)

Router

Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox collection documents. This regular expression is part of the setting of the route.topic.replacement SMT option.

When this property is set to the default Router value, the SMT replaces the default ${routedByValue} variable that is set in the route.topic.replacement property with the value that is specified for the route.by.field property.

route.topic.replacement

outbox.event.${routedByValue}

Router

Specifies the name of the topic to which the connector emits outbox messages. The default topic name is prefixed by the string outbox.event., followed by the value of the aggregatetype field that is assigned to the outbox collection document.

For example, if the aggregatetype value is customers, then the connector emits outbox messages to the topic outbox.event.customers.

To change the default name of the destination topic, perform one of the following actions:

route.tombstone.on.empty.payload

false

Router

Indicates whether an empty or null payload causes the connector to emit a tombstone event.
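The way that the route.by.field, route.topic.regex, and route.topic.replacement options in the preceding table combine can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the SMT's implementation: the route_topic function is hypothetical, and the default values are translated into Python's regular expression dialect, which writes the named group as (?P<name>...) and the replacement reference as \g<name> instead of the Java forms (?<name>...) and ${name}.

```python
import re
from typing import Optional


def route_topic(route_by_value: str,
                regex: str = r"(?P<routedByValue>.*)",
                replacement: str = r"outbox.event.\g<routedByValue>") -> Optional[str]:
    """Apply the routing regex to the route.by.field value and build the topic name."""
    match = re.fullmatch(regex, route_by_value)
    if match is None:
        # The value does not match the expression, so no topic name is derived.
        return None
    return match.expand(replacement)


if __name__ == "__main__":
    # The value of the aggregatetype field becomes the last part of the topic name.
    print(route_topic("customers"))
    print(route_topic("purchase-order"))
```

With the defaults, the whole field value is captured in the routedByValue group, so the topic name is the fixed prefix followed by that value.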

7.11. Routing records to partitions based on payload fields

By default, when Debezium detects a change in a data collection, the change event that it emits is sent to a topic that uses a single Apache Kafka partition. As described in Customization of Kafka Connect automatic topic creation, you can customize the default configuration to route events to multiple partitions, based on a hash of the primary key.

However, in some cases, you might also want Debezium to route events to a specific topic partition. The partition routing SMT enables you to route events to specific destination partitions based on the values of one or more specified payload fields. To calculate the destination partition, Debezium uses a hash of the specified field values.

7.11.1. Example: Basic configuration of the Debezium partition routing SMT

You configure the partition routing transformation in the Debezium connector’s Kafka Connect configuration. The configuration specifies the following parameters:

partition.payload.fields
Specifies the fields in the event payload that the SMT uses to calculate the destination partition. You can use dot notation to specify nested payload fields.
partition.topic.num
Specifies the number of partitions in the destination topic.
partition.hash.function
Specifies the hash function that the SMT applies to the specified fields to determine the number of the destination partition.

By default, Debezium routes all change event records for a configured data collection to a single Apache Kafka topic. Connectors do not direct event records to specific partitions in the topic.

To configure a Debezium connector to route events to a specific partition, configure the PartitionRouting SMT in the Kafka Connect configuration for the Debezium connector.

For example, you might add the following configuration in your connector configuration.

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

Based on the preceding configuration, whenever the SMT receives a message that is bound for a topic with a name that begins with the prefix fulfillment, it redirects the message to a specific topic partition.

The SMT computes the target partition from a hash of the value of the name field in the message payload. By specifying the allTopic predicate, the configuration selectively applies the SMT. The change prefix is a special keyword that enables the SMT to automatically refer to elements in the payload that describe the before or after states of the data. If a specified field is not present in the event message, the SMT ignores it. If none of the fields exist in the message, then the transformation ignores the event message entirely, and delivers the original version of the message to the default destination topic.

The number of partitions specified by the partition.topic.num setting in the SMT configuration must match the number of partitions specified by the Kafka Connect configuration. For example, in the preceding configuration example, the value specified by the Kafka Connect property topic.creation.default.partitions matches the partition.topic.num value in the SMT configuration.

Consider the following Products table:

Table 7.12. Products table

id  | name               | description                                             | weight
101 | scooter            | Small 2-wheel scooter                                   | 3.14
102 | car battery        | 12V car battery                                         | 8.1
103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8
104 | hammer             | 12oz carpenter’s hammer                                 | 0.75
105 | hammer             | 14oz carpenter’s hammer                                 | 0.875
106 | hammer             | 16oz carpenter’s hammer                                 | 1.0
107 | rocks              | box of assorted rocks                                   | 5.3
108 | jacket             | water resistent black wind breaker                      | 0.1
109 | spare tire         | 24 inch spare tire                                      | 22.2

Based on the configuration, the SMT routes change events for the records whose name field has the value hammer to the same partition. That is, the items with id values 104, 105, and 106 are routed to the same partition.
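The reason that equal field values always land in the same partition is that the partition number is derived from a hash of the value. The following Python sketch shows the principle only. It assumes a hypothetical partition_for helper and uses zlib.crc32 as a stand-in hash, so the partition numbers that it prints do not correspond to the numbers that the SMT computes with the java or murmur hash functions.

```python
import zlib
from typing import Iterable


def partition_for(values: Iterable[object], num_partitions: int) -> int:
    """Map payload field values to a partition number by hashing them."""
    # zlib.crc32 is a stable stand-in hash for this illustration; the SMT uses
    # the function selected by partition.hash.function instead.
    total = sum(zlib.crc32(str(value).encode("utf-8")) for value in values)
    return total % num_partitions


# A subset of the Products table: id -> name
products = {101: "scooter", 104: "hammer", 105: "hammer", 106: "hammer", 107: "rocks"}

# Equivalent of partition.payload.fields=change.name with partition.topic.num=2
assignments = {pid: partition_for([name], 2) for pid, name in products.items()}

if __name__ == "__main__":
    for pid, partition in assignments.items():
        print(pid, products[pid], "-> partition", partition)
```

Because the hash depends only on the field value, the three hammer records share a partition regardless of their id values, while records with other names can fall on either partition.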

7.11.2. Example: Advanced configuration of the Debezium partition routing SMT

Suppose that you want to route events from two data collections (t1, t2) to the same topic (for example, my_topic), and you want to partition events from data collection t1 by using field f1, and partition events from data collection t2 by using field f2.

You could apply the following configuration:

transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic

The preceding configuration does not specify how to re-route events so that they are sent to a specific destination topic. For information about how to send events to topics other than their default destination topics, see the Topic Routing SMT.

7.11.3. Migrating from the Debezium ComputePartition SMT

The Debezium ComputePartition SMT has been discontinued. The information in the following section describes how to migrate from the ComputePartition SMT to the new PartitionRouting SMT.

Assuming that the configuration sets the same number of partitions for all topics, replace the following ComputePartition configuration with the PartitionRouting SMT. The following examples provide a comparison of the two configurations.

Example: Legacy ComputePartition configuration

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...

Replace the preceding ComputePartition configuration with the following PartitionRouting configuration.

Example: PartitionRouting configuration that replaces the earlier ComputePartition configuration

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

If the SMT emits events to topics that do not share the same number of partitions, you must specify unique partition.num.mappings values for each topic. In the following example, the topic for the legacy products collection is configured with 3 partitions, and the topic for the orders data collection is configured with 2 partitions:

Example: Legacy ComputePartition configuration that sets unique partition values for different topics

...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...

Replace the preceding ComputePartition configuration with the following PartitionRouting configuration:

Example: PartitionRouting configuration that sets unique partition.topic.num values for different topics

...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=orders

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...
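The mapping from the legacy options to per-topic PartitionRouting transforms is mechanical, so it can be scripted. The following Python sketch is a hypothetical helper, not a tool that ships with Debezium. It assumes that each topic name is the topic.prefix value followed by the data collection name, as in the preceding example, and it invents its own transform and predicate aliases by replacing the dots in the collection name with underscores.

```python
from typing import Dict

ROUTING_TYPE = "io.debezium.transforms.partitions.PartitionRouting"
PREDICATE_TYPE = "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"


def _parse_mappings(value: str) -> Dict[str, str]:
    """Turn 'a.b:x,c.d:y' into {'a.b': 'x', 'c.d': 'y'}."""
    return dict(item.strip().split(":", 1) for item in value.split(","))


def migrate_compute_partition(field_mappings: str,
                              num_mappings: str,
                              topic_prefix: str) -> Dict[str, str]:
    """Build PartitionRouting properties, one transform per data collection."""
    fields = _parse_mappings(field_mappings)
    nums = _parse_mappings(num_mappings)
    config: Dict[str, str] = {}
    aliases = []
    for collection, field in fields.items():
        # Hypothetical alias scheme: inventory.products -> inventory_products
        alias = collection.replace(".", "_")
        aliases.append(alias)
        config[f"transforms.{alias}.type"] = ROUTING_TYPE
        config[f"transforms.{alias}.partition.payload.fields"] = f"change.{field}"
        config[f"transforms.{alias}.partition.topic.num"] = nums[collection]
        # Each transform is restricted to its own topic by a predicate of the same name.
        config[f"transforms.{alias}.predicate"] = alias
        config[f"predicates.{alias}.type"] = PREDICATE_TYPE
        config[f"predicates.{alias}.pattern"] = f"{topic_prefix}.{collection}"
    config["transforms"] = ",".join(aliases)
    config["predicates"] = ",".join(aliases)
    return config


if __name__ == "__main__":
    props = migrate_compute_partition(
        "inventory.products:name,inventory.orders:purchaser",
        "inventory.products:3,inventory.orders:2",
        "fulfillment",
    )
    for key in sorted(props):
        print(f"{key}={props[key]}")
```

Generating one transform and one predicate per collection keeps each partition.topic.num value tied to exactly one topic, which is the property that the manual migration has to preserve.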

7.11.4. Options for configuring the partition routing transformation

The following table lists the configuration options that you can set for the partition routing SMT.

Table 7.13. Partition routing SMT (PartitionRouting) configuration options

Property

Default

Description

partition.payload.fields

 

Specifies the fields in the event payload that the SMT uses to calculate the target partition. Use dot notation to specify nested payload fields. To access fields related to data collections, you can use after, before, or change. The change field is a special field that results in the SMT automatically populating content in the after or before elements, depending on the type of operation. If a specified field is not present in a record, the SMT skips it. For example: after.name,source.table,change.name

partition.topic.num

 

The number of partitions for the topic on which this SMT acts. Use the TopicNameMatches predicate to filter records by topic.

partition.hash.function

java

Specifies the hash function that the SMT applies to the specified fields to determine the number of the destination partition. Possible values are:

  • java - The standard Java Object::hashCode function.
  • murmur - The latest version of the MurmurHash function, MurmurHash3.

This option is optional. If you do not specify a value, or if you specify an invalid value, the SMT uses the default value.

7.12. Converting timezone values in Debezium event records

When Debezium emits event records, the timezone values for timestamp fields in the record can vary, depending on the type and configuration of the data source. To maintain data consistency and precision within data processing pipelines and applications, you can use the Timezone Converter SMT to ensure that event records use a consistent timezone to represent timestamp data.

The SMT converts the value of the specified field to the target timezone by using the converted.timezone configuration option. You can specify the target timezone as a geographic timezone, for example, America/New_York, or as a UTC offset, such as +02:00. It is assumed that the fields of the record are in UTC. Along with the specified timezone, the SMT also provides configuration options to include or exclude specific fields from timezone conversion using the include.list and exclude.list configuration options.

The SMT supports all Debezium and Kafka Connect temporal and non-temporal types.

Note

To comply with daylight saving time, you must specify a geographic timezone in the converted.timezone configuration option. If you specify a UTC offset, the transform applies a fixed offset from UTC that is not accurate for regions that observe daylight saving time. Providing a fixed UTC offset is useful when converting timestamp fields to a specific timezone that does not observe daylight saving time.
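The difference between a geographic timezone and a fixed offset can be seen with the Python standard library. The following sketch is independent of Debezium and only illustrates the behavior that the note describes; the dates are arbitrary, and the zoneinfo module requires timezone data to be available on the system.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

# Two arbitrary UTC instants, one in northern winter and one in summer.
winter = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
summer = datetime(2024, 7, 15, 12, 0, tzinfo=timezone.utc)

geographic = ZoneInfo("America/New_York")   # follows daylight saving rules
fixed = timezone(timedelta(hours=-5))       # always UTC-05:00

if __name__ == "__main__":
    for label, instant in (("winter", winter), ("summer", summer)):
        print(label,
              instant.astimezone(geographic).isoformat(),
              instant.astimezone(fixed).isoformat())
```

The geographic zone yields an offset of -05:00 in January and -04:00 in July, whereas the fixed offset stays at -05:00 in both cases and is therefore one hour off during the summer.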

Note

The include.list and exclude.list configuration options are mutually exclusive. You must specify only one of the options.

The SMT can also convert event metadata fields in the source information block, such as ts_ms, to the target timezone. To convert the metadata fields, you must include the source prefix in the field name that you specify in the include.list or exclude.list configuration option.

Note

The schema for timestamp fields in the source information block, such as ts_ms, is currently set to INT64, which is not a timestamp type. Future releases aim to support the conversion of such fields by introducing compatibility for a timestamp schema.

7.12.1. Example: Basic Debezium timezone converter SMT configuration

Configure the TimezoneConverter SMT in the Kafka Connect configuration for a connector to convert the time-based fields in an event record to a target timezone.

For example, to convert all timestamp fields in an event record from UTC to the Pacific/Easter timezone, add the following lines to your connector configuration:

transforms=convertTimezone
transforms.convertTimezone.type=io.debezium.transforms.TimezoneConverter
transforms.convertTimezone.converted.timezone=Pacific/Easter

7.12.1.1. Effect of applying the TimezoneConverter SMT to a Debezium event record

The following examples show how the TimezoneConverter transformation modifies the timestamp fields in an event record. The first example shows a Debezium event record that is not processed by the transformation; the record retains its original timestamp values. The next example shows the same event record after the transformation is applied. Per the configuration specified in the basic configuration example, the SMT converts the original UTC values of timestamp fields in the source message to Pacific/Easter timezone values.

Example 7.3. Event record value before processing by the TimezoneConverter transformation

The value of the created_at field shows the UTC time.

{
    "before": null,
    "after": {
        "id": 1,
        "first_name": "Anne",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "created_at": "2011-01-11T16:40:30.123456789+00:00"
    },
    "source": {
        "version": "2.7.3.Final",
        "connector": "postgresql",
        "name": "PostgreSQL_server",
        "ts_ms": 1559033904863,
        "ts_us": 1559033904863000,
        "ts_ns": 1559033904863000000,
        "snapshot": true,
        "db": "postgres",
        "sequence": "[\"24023119\",\"24023128\"]",
        "schema": "public",
        "table": "customers",
        "txId": 555,
        "lsn": 24023128,
        "xmin": null
    },
    "op": "c",
    "ts_ms": 1559033904863,
    "ts_us": 1559033904863875,
    "ts_ns": 1559033904863875124
}

Example 7.4. Event record value after processing by the TimezoneConverter transformation

The SMT converts the original UTC value of the created_at field to the time in the target Pacific/Easter timezone that is specified in the Basic configuration example. The SMT also adds an event_timestamp field.

{
    "before": null,
    "after": {
        "id": 1,
        "first_name": "Anne",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "created_at": "2011-01-11T11:40:30.123456789-05:00"
    },
    "source": {
        "version": "2.7.3.Final",
        "connector": "postgresql",
        "name": "PostgreSQL_server",
        "ts_ms": 1559033904863,
        "ts_us": 1559033904863752,
        "ts_ns": 1559033904863752000,
        "snapshot": true,
        "db": "postgres",
        "sequence": "[\"24023119\",\"24023128\"]",
        "schema": "public",
        "table": "customers",
        "txId": 555,
        "lsn": 24023128,
        "xmin": null,
        "id": 100
    },
    "op": "c",
    "ts_ms": 1559033904863,
    "ts_us": 1559033904863971,
    "ts_ns": 1559033904863971541,
    "event_timestamp": 1626102708861
}
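The conversion of the created_at value can be checked independently with the Python standard library. The following sketch is not Debezium code; it only reproduces the arithmetic for the example value. Python datetime objects carry microsecond precision, so the sketch truncates the nanosecond fraction of the original value, and the zoneinfo module requires timezone data to be available on the system.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

# created_at from the unprocessed record, truncated to microseconds.
created_at_utc = datetime(2011, 1, 11, 16, 40, 30, 123456, tzinfo=timezone.utc)

# Equivalent of converted.timezone=Pacific/Easter
converted = created_at_utc.astimezone(ZoneInfo("Pacific/Easter"))

if __name__ == "__main__":
    print(created_at_utc.isoformat())
    print(converted.isoformat())
```

The converted value represents the same instant as the original. Only the wall-clock time and the offset change, which matches the -05:00 offset in the processed record.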

7.12.2. Example: Advanced Debezium timezone converter SMT configuration

Instead of converting all timestamp fields in an event record, you can configure the SMT to convert specific fields only. The following example shows how you might use the include.list option in the SMT configuration to convert only the created_at and updated_at timestamp fields in an event record. The following configuration uses a fixed offset, rather than a geographic timezone designator, to convert the time from UTC to +05:30.

transforms=convertTimezone
transforms.convertTimezone.type=io.debezium.transforms.TimezoneConverter
transforms.convertTimezone.converted.timezone=+05:30
transforms.convertTimezone.include.list=source:customers:created_at,customers:updated_at

In some cases, you might want to exclude specific timestamp fields from timezone conversion. For example, to exclude the updated_at timestamp field in an event record from timezone conversion, use the exclude.list configuration option as in the following example:

transforms=convertTimezone
transforms.convertTimezone.type=io.debezium.transforms.TimezoneConverter
transforms.convertTimezone.converted.timezone=+05:30
transforms.convertTimezone.exclude.list=source:customers:updated_at

7.12.3. Options for configuring the Debezium timezone converter transformation

The following table lists the configuration options for the TimezoneConverter SMT.

Table 7.14. TimezoneConverter SMT configuration options

Property

Description

Type

Importance

converted.timezone

A string that specifies the target timezone to which the timestamp fields should be converted. The target timezone can be specified as a geographic timezone, such as America/New_York, or as a UTC offset, for example, +02:00.

string

high

include.list

A comma-separated list of rules that specify the fields that the SMT includes for timezone conversion. Specify rules by using one of the following formats:

source:<tablename>
Matches Debezium change events with source information blocks that have the specified table name. The SMT converts all time-based fields in the matched table.
source:<tablename>:<fieldname>
Matches Debezium change events with source information blocks that have the specified table name. The SMT converts only fields in the specified table that have the specified field name. fieldname can be prefixed with before, after, or source to include the appropriate field in the event record. If no prefix is specified, both before and after fields are converted.
topic:<topicname>
Matches events from the specified topic name, converting all time-based fields in the event record.
topic:<topicname>:<fieldname>
Matches events from the specified topic name, and converts values for the specified fields only. fieldname can be prefixed with before, after, or source to include the appropriate field in the event record. If no prefix is specified, both before and after fields are converted.
<matchname>:<fieldname>
Applies a heuristic matching algorithm to match against the table name of the source information block, if present; otherwise, matches against the topic name. The SMT converts values for the specified field name only. fieldname can be prefixed with before, after, or source to include the appropriate field in the event record. If no prefix is specified, both before and after fields are converted.

list

medium

exclude.list

A comma-separated list of rules that specify the fields to exclude from timezone conversion. Specify rules by using one of the following formats:

source:<tablename>
Matches Debezium change events with source information blocks that have the specified table name. The SMT excludes all time-based fields in the matched table from conversion.
source:<tablename>:<fieldname>
Matches Debezium change events with source information blocks that have the specified table name. The SMT excludes from conversion fields in the specified table that match the specified field name. fieldname can be prefixed with before, after, or source to exclude the appropriate field in the event record. If no prefix is specified, both before and after fields are excluded from conversion.
topic:<topicname>
Matches events from the specified topic name, and excludes from conversion all time-based fields in the topic.
topic:<topicname>:<fieldname>
Matches events from the specified topic name, and excludes from conversion any fields in the topic that have the specified name. fieldname can be prefixed with before, after, or source to exclude the appropriate field in the event record. If no prefix is specified, both before and after fields are excluded from conversion.
<matchname>:<fieldname>
Applies a heuristic matching algorithm to match against the table name of the source information block, if present; otherwise, matches against the topic name. The SMT excludes from conversion only fields that have the specified name. fieldname can be prefixed with before, after, or source to exclude the appropriate field in the event record. If no prefix is specified, both before and after fields are excluded from conversion.

list

medium
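The rule formats that the include.list and exclude.list options accept can be summarized with a small parser. The following Python sketch is a hypothetical illustration and does not reflect how the SMT is implemented; the Rule type and the parse_rules and check_exclusive functions are invented for this example. The sketch treats a rule that starts with source or topic as an explicit match and any other two-part rule as the heuristic matchname:fieldname form.

```python
from typing import List, NamedTuple, Optional


class Rule(NamedTuple):
    kind: str             # "source", "topic", or "match" (heuristic matching)
    name: str             # table name, topic name, or match name
    field: Optional[str]  # None means all time-based fields


def parse_rules(value: str) -> List[Rule]:
    """Parse a comma-separated include.list or exclude.list value."""
    rules = []
    for raw in value.split(","):
        parts = raw.strip().split(":")
        if parts[0] in ("source", "topic") and len(parts) in (2, 3):
            field = parts[2] if len(parts) == 3 else None
            rules.append(Rule(parts[0], parts[1], field))
        elif len(parts) == 2:
            rules.append(Rule("match", parts[0], parts[1]))
        else:
            raise ValueError(f"unsupported rule format: {raw!r}")
    return rules


def check_exclusive(include_list: str, exclude_list: str) -> None:
    """The two options are mutually exclusive, so reject a config that sets both."""
    if include_list and exclude_list:
        raise ValueError("specify include.list or exclude.list, not both")


if __name__ == "__main__":
    for rule in parse_rules("source:customers:created_at,customers:updated_at"):
        print(rule)
```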
