Chapter 9. Configuring Debezium connectors for your application
When default Debezium connector behavior is not right for your application, you can use the following Debezium features to configure the behavior you need.
- Topic router SMT re-routes data change event records to topics that you specify.
- Content-based router SMT evaluates data change event record content and re-routes event records to particular topics according to content.
- Filter SMT uses an expression that you specify to evaluate data change event records. The connector streams only those events that evaluate to true.
- Event flattening SMT flattens the complex structure of a data change event record into the simplified format that might be required by some Kafka consumers.
- Avro serialization for PostgreSQL, MongoDB, or SQL Server connectors makes it easier for change event record consumers to adapt to a changing record schema.
- Outbox event router SMT provides support for the outbox pattern.
- CloudEvents converter enables a Debezium connector to emit change event records that conform to the CloudEvents specification.
9.1. Routing change event records to topics that you specify
Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter. To do this, Debezium provides the ByLogicalTableRouter single message transformation (SMT). Configure this transformation in the Debezium connector’s Kafka Connect configuration. Configuration options enable you to specify the following:
- An expression for identifying the records to re-route
- An expression that resolves to the destination topic
- How to ensure a unique key among the records being re-routed to the destination topic
It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.
The ByLogicalTableRouter transformation is a Kafka Connect SMT.
9.1.1. Use case for routing records to topics that you specify
The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.
Logical tables
A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: db_shard1.my_table and db_shard2.my_table. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.
Partitioned PostgreSQL tables
When the Debezium PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the ByLogicalTableRouter SMT. Because each key in a partitioned table is guaranteed to be unique, configure key.enforce.uniqueness=false so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.
9.1.2. Example of routing records for multiple tables to one topic
To route change event records for multiple physical tables to the same topic, configure the ByLogicalTableRouter transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the ByLogicalTableRouter SMT requires you to specify regular expressions that determine:
- The tables for which to route records. These tables must all have the same schema.
- The destination topic name.
For example, configuration in a .properties file looks like this:
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex
- Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic. In the example, the regular expression (.*)customers_shard(.*) matches records for changes to tables whose names include the customers_shard string. This would re-route records for tables with the following names:
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
topic.replacement
- Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the myserver.mydb.customers_all_shards topic.
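The effect of topic.regex and topic.replacement on a topic name can be sketched in a few lines of Python. This is only an illustration of the regular-expression semantics, not Debezium code: the reroute helper is hypothetical, it assumes that the whole default topic name must match the expression, and it writes the group reference as \1 because Python does not use the Java-style $1 that appears in the connector configuration.

```python
import re

# Values taken from the example configuration above.
TOPIC_REGEX = r"(.*)customers_shard(.*)"
# "$1customers_all_shards" in the connector configuration; Python spells "$1" as "\1".
TOPIC_REPLACEMENT = r"\1customers_all_shards"


def reroute(default_topic: str) -> str:
    """Return the destination topic for a record (hypothetical helper).

    Assumption: a record is re-routed only when the entire default topic
    name matches the expression; otherwise it keeps its default topic.
    """
    match = re.fullmatch(TOPIC_REGEX, default_topic)
    if match is None:
        return default_topic
    # Substitute the captured groups into the replacement template.
    return match.expand(TOPIC_REPLACEMENT)


for topic in ("myserver.mydb.customers_shard1",
              "myserver.mydb.customers_shard2",
              "myserver.mydb.orders"):
    print(topic, "->", reroute(topic))
```

Under these assumptions, every customers_shard topic collapses to the same destination, while unrelated topics are left untouched.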
9.1.3. Ensuring unique keys across records routed to the same topic
A Debezium change event key uses the table columns that make up the table’s primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the myserver.mydb.customers_shard1 table might have the same key value as a row in the myserver.mydb.customers_shard2 table.
To ensure that each event key is unique across the tables whose change event records go to the same topic, the ByLogicalTableRouter transformation inserts a field into change event keys. By default, the name of the inserted field is __dbz__physicalTableIdentifier. The value of the inserted field is the default destination topic name.
If you want to, you can configure the ByLogicalTableRouter transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names. For example:
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id
This example adds the shard_id field to the key structure in routed records.
If you want to adjust the value of the key’s new field, configure both of these options:
key.field.regex
- Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.
key.field.replacement
- Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.
For example:
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2
With this configuration, suppose that the default destination topic names are:
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
The transformation uses the values in the second captured group, the shard numbers, as the value of the key’s new field. In this example, the inserted key field’s values would be 1, 2, or 3.
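To make the key handling concrete, the following Python sketch derives the inserted key field from the default topic name using the example expressions. It is an illustration rather than Debezium code: key_field_value and routed_key are hypothetical helpers, keys are modeled as plain dictionaries, \2 stands in for the Java-style $2, and falling back to the full topic name when the expression does not match is an assumption.

```python
import re

# Values taken from the example configuration above.
KEY_FIELD_REGEX = r"(.*)customers_shard(.*)"
KEY_FIELD_REPLACEMENT = r"\2"   # "$2" in the connector configuration
KEY_FIELD_NAME = "shard_id"     # transforms.Reroute.key.field.name


def key_field_value(default_topic: str) -> str:
    """Compute the value of the inserted key field (hypothetical helper)."""
    match = re.fullmatch(KEY_FIELD_REGEX, default_topic)
    if match is None:
        # Assumption: without a match, the default topic name is used as-is.
        return default_topic
    return match.expand(KEY_FIELD_REPLACEMENT)


def routed_key(original_key: dict, default_topic: str) -> dict:
    """Return a copy of the event key with the extra field added."""
    return {**original_key, KEY_FIELD_NAME: key_field_value(default_topic)}


# Two rows with the same primary key value in different shards.
key_a = routed_key({"id": 42}, "myserver.mydb.customers_shard1")
key_b = routed_key({"id": 42}, "myserver.mydb.customers_shard2")
print(key_a, key_b, key_a != key_b)
```

The two keys differ only in shard_id, which is what keeps records from different shards distinct after they are routed to one topic.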
If your tables contain globally unique keys and you do not need to change the key structure, you can set the key.enforce.uniqueness property to false:
...
transforms.Reroute.key.enforce.uniqueness=false
...
9.1.4. Options for configuring topic routing transformation
Option | Default | Description |
---|---|---|
topic.regex | | Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic. |
topic.replacement | | Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for topic.regex. To refer to a group, specify $1, $2, and so on. |
key.enforce.uniqueness | true | Indicates whether to add a field to the record’s change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. |
key.field.name | __dbz__physicalTableIdentifier | Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default. |
key.field.regex | | Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, key.enforce.uniqueness must be true, which is the default. |
key.field.replacement | | Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for key.field.regex. |
9.2. Routing change event records to topics according to event content
By default, Debezium streams all of the change events that it reads from a table to a single static topic. However, you might want to reroute selected events to other topics, based on the event content. The process of routing messages based on their content is described in the Content-based routing messaging pattern. To apply this pattern in Debezium, you use the content-based routing single message transform (SMT) to write expressions that are evaluated for each event. Depending on how an event is evaluated, the SMT either routes the event message to the original destination topic, or reroutes it to the topic that you specify in the expression.
The Debezium content-based routing SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
While it is possible to use Java to create a custom SMT to encode routing logic, using a custom-coded SMT has its drawbacks. For example:
- It is necessary to compile the transformation up front and deploy it to Kafka Connect.
- Every change needs code recompilation and redeployment, leading to inflexible operations.
The content-based routing SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).
Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add it to your Debezium connector plug-in directories, along with any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR 223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.
9.2.1. Setting up the Debezium content-based-routing SMT
For security reasons, the content-based routing SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.2.4.Final.tar.gz. To use the content-based routing SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.
After the routing SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the routing SMT.
Procedure
- Download the Debezium scripting SMT archive (debezium-scripting-1.2.4.Final.tar.gz) from https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions.
- Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
- Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
- Restart the Kafka Connect process to pick up the new JAR files.
9.2.2. Example: Debezium basic content-based routing configuration
To configure a Debezium connector to route change event records based on the event content, you configure the ContentBasedRouter SMT in the Kafka Connect configuration for the connector.
Configuration of the content-based routing SMT requires you to specify an expression that defines the routing criteria. The expression defines a pattern for evaluating event records. It also specifies the name of a destination topic where events that match the pattern are routed. The pattern that you specify might designate an event type, such as a table insert, update, or delete operation. You might also define a pattern that matches a value in a specific column or row.
For example, to reroute all update (u) records to an updates topic, you might add the following configuration to your connector configuration:
...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...
The preceding example specifies the use of the Groovy expression language.
Records that do not match the pattern are routed to the default topic.
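The routing decision that this configuration describes can be sketched in Python. This is not how the SMT is implemented; it is a stand-in that mirrors the Groovy expression from the example, with the hypothetical helpers topic_expression and route and with record values modeled as plain dictionaries.

```python
from typing import Optional


def topic_expression(value: dict) -> Optional[str]:
    """Python stand-in for the Groovy expression: value.op == 'u' ? 'updates' : null"""
    return "updates" if value.get("op") == "u" else None


def route(value: dict, default_topic: str) -> str:
    """Pick the destination topic for one record (hypothetical helper).

    A non-null expression result names the new topic; a null result leaves
    the record on its default topic.
    """
    result = topic_expression(value)
    return default_topic if result is None else result


print(route({"op": "u"}, "myserver.mydb.customers"))  # update record
print(route({"op": "c"}, "myserver.mydb.customers"))  # create record
```

Only the update record is sent to the updates topic; the create record stays on myserver.mydb.customers, which is an illustrative topic name.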
9.2.3. Variables for use in Debezium content-based routing expressions
Debezium binds certain variables into the evaluation context for the SMT. When you create expressions to specify conditions to control the routing destination, the SMT can look up and interpret the values of these variables to evaluate conditions in an expression.
The following table lists the variables that Debezium binds into the evaluation context for the content-based routing SMT:
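Name | Description | Type |
---|---|---|
key | A key of the message. | org.apache.kafka.connect.data.Struct |
value | A value of the message. | org.apache.kafka.connect.data.Struct |
keySchema | Schema of the message key. | org.apache.kafka.connect.data.Schema |
valueSchema | Schema of the message value. | org.apache.kafka.connect.data.Schema |
topic | Name of the target topic. | String |
headers | A Java map of message headers. The key field is the header name. | java.util.Map<String, io.debezium.transforms.scripting.RecordHeader> |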
An expression can invoke arbitrary methods on its variables. Expressions should resolve to a String value that names the destination topic, or to null. When an expression resolves to a topic name, the SMT reroutes the message to that topic. When an expression resolves to null, the message is routed to its default topic.
Expressions should not result in any side effects. That is, they should not modify any variables that are passed to them.
9.2.4. Configuration of content-based routing conditions for other scripting languages
The way that you express content-based routing conditions depends on the scripting language that you use. For example, as shown in this basic Debezium content-based routing SMT example, when you use Groovy as the expression language, the following expression reroutes all update (u) records to the updates topic, while routing other records to the default topic:
value.op == 'u' ? 'updates' : null
Other languages use different methods to express the same condition.
The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures. To use the ContentBasedRouter SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState SMT.
You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.
JavaScript
When you use JavaScript as the expression language, you can call the Struct#get() method to specify the content-based routing condition, as in the following example:
value.get('op') == 'u' ? 'updates' : null
JavaScript with Graal.js
When you create content-based routing conditions by using JavaScript with Graal.js, you use an approach that is similar to the one that you use with Groovy. For example:
value.op == 'u' ? 'updates' : null
9.2.5. Options for configuring the content-based routing transformation
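Property | Default | Description |
---|---|---|
topic.regex | | An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply the condition logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the condition logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified. |
language | | The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy or jsr223.graal.js. |
topic.expression | | The expression to be evaluated for every message. Must evaluate to a String value, where a non-null result reroutes the message to the topic of that name and a null result routes the message to the default topic. |
null.handling.mode | keep | Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options: keep (pass the messages through), drop (remove the messages completely), or evaluate (apply the condition logic to the messages). |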
9.3. Filtering Debezium change event records
By default, Debezium delivers every data change event that it receives to the Kafka broker. However, in many cases, you might be interested in only a subset of the events emitted by the producer. To enable you to process only the records that are relevant to you, Debezium provides the filter single message transform (SMT).
The Debezium filter SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
While it is possible to use Java to create a custom SMT to encode filtering logic, using a custom-coded SMT has its drawbacks. For example:
- It is necessary to compile the transformation up front and deploy it to Kafka Connect.
- Every change needs code recompilation and redeployment, leading to inflexible operations.
The filter SMT supports scripting languages that integrate with JSR 223 (Scripting for the Java™ Platform).
Debezium does not come with any implementations of the JSR 223 API. To use an expression language with Debezium, you must download the JSR 223 script engine implementation for the language, and add it to your Debezium connector plug-in directories, along with any other JAR files used by the language implementation. For example, for Groovy 3, you can download its JSR 223 implementation from https://groovy-lang.org/. The JSR 223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.
9.3.1. Setting up the Debezium filter SMT
For security reasons, the filter SMT is not included with the Debezium connector archives. Instead, it is provided in a separate artifact, debezium-scripting-1.2.4.Final.tar.gz. To use the filter SMT with a Debezium connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.
After the filter SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions. To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the filter SMT.
Procedure
- Download the Debezium scripting SMT archive (debezium-scripting-1.2.4.Final.tar.gz) from https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions.
- Extract the contents of the archive into the Debezium plug-in directories of your Kafka Connect environment.
- Obtain a JSR-223 script engine implementation and add its contents to the Debezium plug-in directories of your Kafka Connect environment.
- Restart the Kafka Connect process to pick up the new JAR files.
9.3.2. Example: Debezium basic filter SMT configuration
You configure the filter transformation in the Debezium connector’s Kafka Connect configuration. In the configuration, you specify the events that you are interested in by defining filter conditions that are based on business rules. As the filter SMT processes the event stream, it evaluates each event against the configured filter conditions. Only events that meet the criteria of the filter conditions are passed to the broker.
To configure a Debezium connector to filter change event records, configure the Filter SMT in the Kafka Connect configuration for the Debezium connector. Configuration of the filter SMT requires you to specify an expression that defines the filtering criteria.
For example, you might add the following configuration in your connector configuration.
...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...
The preceding example specifies the use of the Groovy expression language. The expression value.op == 'u' && value.before.id == 2 removes all messages, except those that represent update (u) records with id values that are equal to 2.
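As a rough illustration of what this condition keeps and removes, the following Python sketch applies an equivalent predicate to a few sample events. It is not the SMT implementation: keep and apply_filter are hypothetical helpers, and record values are modeled as plain dictionaries with made-up sample data.

```python
def keep(value: dict) -> bool:
    """Python stand-in for the Groovy condition: value.op == 'u' && value.before.id == 2"""
    # Create events have no "before" row data, so guard against None here.
    before = value.get("before") or {}
    return value.get("op") == "u" and before.get("id") == 2


def apply_filter(events: list) -> list:
    """Retain only the events for which the condition evaluates to true."""
    return [event for event in events if keep(event)]


events = [
    {"op": "u", "before": {"id": 2}, "after": {"id": 2}},  # retained
    {"op": "u", "before": {"id": 3}, "after": {"id": 3}},  # removed: different id
    {"op": "c", "before": None, "after": {"id": 2}},       # removed: not an update
]
print(apply_filter(events))
```

Only the first sample event satisfies both parts of the condition, so it is the only one that would be passed on.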
9.3.3. Variables for use in filter expressions
Debezium binds certain variables into the evaluation context for the filter SMT. When you create expressions to specify filter conditions, you can use the variables that Debezium binds into the evaluation context. By binding variables, Debezium enables the SMT to look up and interpret their values as it evaluates the conditions in an expression.
The following table lists the variables that Debezium binds into the evaluation context for the filter SMT:
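Name | Description | Type |
---|---|---|
key | A key of the message. | org.apache.kafka.connect.data.Struct |
value | A value of the message. | org.apache.kafka.connect.data.Struct |
keySchema | Schema of the message key. | org.apache.kafka.connect.data.Schema |
valueSchema | Schema of the message value. | org.apache.kafka.connect.data.Schema |
topic | Name of the target topic. | String |
headers | A Java map of message headers. The key field is the header name. | java.util.Map<String, io.debezium.transforms.scripting.RecordHeader> |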
An expression can invoke arbitrary methods on its variables. Expressions should resolve to a Boolean value that determines how the SMT handles the message. When the filter condition in an expression evaluates to true, the message is retained. When the filter condition evaluates to false, the message is removed.
Expressions should not result in any side effects. That is, they should not modify any variables that are passed to them.
9.3.4. Filter condition configuration for other scripting languages
The way that you express filtering conditions depends on the scripting language that you use.
For example, as shown in this basic filter SMT example, when you use Groovy as the expression language, the following expression removes all messages, except for update records that have id values set to 2:
value.op == 'u' && value.before.id == 2
Other languages use different methods to express the same condition.
The Debezium MongoDB connector emits the after and patch fields as serialized JSON documents rather than as structures. To use the filter SMT with the MongoDB connector, you must first unwind the fields by applying the ExtractNewDocumentState SMT.
You could also take the approach of using a JSON parser within the expression. For example, if you use Groovy as the expression language, add the groovy-json artifact to the classpath, and then add an expression such as (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.
JavaScript
If you use JavaScript as the expression language, you can call the Struct#get() method to specify the filtering condition, as in the following example:
value.get('op') == 'u' && value.get('before').get('id') == 2
JavaScript with Graal.js
If you use JavaScript with Graal.js to define filtering conditions, you use an approach that is similar to the one that you use with Groovy. For example:
value.op == 'u' && value.before.id == 2
9.3.5. Options for configuring filter transformation
The following table lists the configuration options that you can use with the filter SMT.
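Property | Default | Description |
---|---|---|
topic.regex | | An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply filtering logic. If the name of the destination topic matches the value in topic.regex, the transformation applies the filter logic before it passes the event to the topic. If the name of the topic does not match the value in topic.regex, the SMT passes the event to the topic unmodified. |
language | | The language in which the expression is written. Must begin with jsr223., for example, jsr223.groovy or jsr223.graal.js. |
condition | | The expression to be evaluated for every message. Must evaluate to a Boolean value where a result of true keeps the message, and a result of false removes it. |
null.handling.mode | keep | Specifies how the transformation handles null (tombstone) messages. You can specify one of the following options: keep (pass the messages through), drop (remove the messages completely), or evaluate (apply the filter condition to the messages). |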
9.4. Extracting source record after state from Debezium change events
A Debezium data change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information. However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values. To provide this kind of record, Debezium provides the ExtractNewRecordState single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.
The ExtractNewRecordState transformation is a Kafka Connect SMT.
The transformation is available to only SQL database connectors.
The following topics provide details:
- Section 9.4.1, “Description of Debezium change event structure”
- Section 9.4.2, “Behavior of Debezium ExtractNewRecordState transformation”
- Section 9.4.3, “Configuration of ExtractNewRecordState transformation”
- Section 9.4.4, “Example of adding metadata to the Kafka record”
- Section 9.4.5, “Options for configuring ExtractNewRecordState transformation”
9.4.1. Description of Debezium change event structure
Debezium generates data change events that have a complex structure. Each event consists of three parts:
- Metadata, which includes but is not limited to:
  - The operation that made the change
  - Source information such as the names of the database and table where the change was made
  - Time stamp for when the change was made
  - Optional transaction information
- Row data before the change
- Row data after the change
For example, the structure of an UPDATE change event looks like this:
{
  "op": "u",
  "source": { ... },
  "ts_ms" : "...",
  "before" : { "field1" : "oldvalue1", "field2" : "oldvalue2" },
  "after" : { "field1" : "newvalue1", "field2" : "newvalue2" }
}
This complex format provides the most information about changes happening in the system. However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:
{ "field1" : "newvalue1", "field2" : "newvalue2" }
To provide the needed Kafka record format for consumers, configure the ExtractNewRecordState SMT.
9.4.2. Behavior of Debezium ExtractNewRecordState transformation
The ExtractNewRecordState SMT extracts the after field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its after field to create a simple Kafka record.
You can configure the ExtractNewRecordState SMT for a Debezium connector or for a sink connector that consumes messages emitted by a Debezium connector. The advantage of configuring ExtractNewRecordState for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do any of the following:
- Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata.
- Keep Kafka records that contain change events for DELETE operations in the stream. The default behavior is that the SMT drops Kafka records for DELETE operation change events because most consumers cannot yet handle them.
A database DELETE operation causes Debezium to generate two Kafka records:
- A record that contains "op": "d", the before row data, and some other fields.
- A tombstone record that has the same key as the deleted row and a value of null. This record is a marker for Apache Kafka. It indicates that log compaction can remove all records that have this key.
Instead of dropping the record that contains the before row data, you can configure the ExtractNewRecordState SMT to do one of the following:
- Keep the record in the stream and edit it to have only the "value": "null" field.
- Keep the record in the stream and edit it to have a value field that contains the key/value pairs that were in the before field with an added "__deleted": "true" entry.
Similarly, instead of dropping the tombstone record, you can configure the ExtractNewRecordState SMT to keep the tombstone record in the stream.
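The behaviors described in this section can be summarized in a small Python sketch. It is a simplified model, not the SMT implementation: simplify is a hypothetical helper, record values are plain dictionaries, and the delete_handling_mode and drop_tombstones parameters are assumed to correspond to the delete.handling.mode (drop, none, rewrite) and drop.tombstones options. The sketch covers only the cases that the text above describes.

```python
from typing import Optional


def simplify(value: Optional[dict],
             delete_handling_mode: str = "drop",
             drop_tombstones: bool = True) -> list:
    """Return the record values that stay in the stream; an empty list means dropped."""
    if value is None:
        # Tombstone record: same key as the deleted row, null value.
        return [] if drop_tombstones else [None]
    if value.get("op") == "d":
        # DELETE change event: only "before" row data is available.
        if delete_handling_mode == "drop":
            return []
        if delete_handling_mode == "none":
            return [None]  # record is kept with a null value
        if delete_handling_mode == "rewrite":
            return [{**value["before"], "__deleted": "true"}]
        raise ValueError("unknown mode: " + delete_handling_mode)
    # Any other change event is replaced by its "after" row data.
    return [value["after"]]


update = {"op": "u",
          "before": {"field1": "oldvalue1", "field2": "oldvalue2"},
          "after": {"field1": "newvalue1", "field2": "newvalue2"}}
delete = {"op": "d", "before": {"pk": 2, "cola": None}, "after": None}

print(simplify(update))
print(simplify(delete, delete_handling_mode="rewrite"))
```

Returning a list makes the difference between dropping a record and keeping it with a null value explicit, which a single return value could not express.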
9.4.3. Configuration of ExtractNewRecordState transformation
Configure the Debezium ExtractNewRecordState SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, in a .properties file, you would specify something like the following:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
As for any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
The following .properties example sets several ExtractNewRecordState options:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
- Keeps tombstone records for DELETE operations in the event stream.
delete.handling.mode=rewrite
- For DELETE operations, edits the Kafka record by flattening the value field that was in the change event. The value field directly contains the key/value pairs that were in the before field. The SMT adds __deleted and sets it to true, for example:
"value": { "pk": 2, "cola": null, "__deleted": "true" }
add.fields=table,lsn
- Adds change event metadata for the table and lsn fields to the simplified Kafka record.
9.4.4. Example of adding metadata to the Kafka record
The ExtractNewRecordState SMT can add original change event metadata to the simplified Kafka record. For example, you might want the simplified record’s header or value to contain any of the following:
- The type of operation that made the change
- The name of the database or table that was changed
- Connector-specific fields such as the Postgres LSN field
To add metadata to the simplified Kafka record’s header, specify the add.headers option. To add metadata to the simplified Kafka record’s value, specify the add.fields option. Each of these options takes a comma-separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite
With that configuration, a simplified Kafka record would contain something like the following:
{ ... "__op" : "c", "__table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ... }
Also, simplified Kafka records would have a __db header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadata to a simplified Kafka record that is for a DELETE operation, you must also configure delete.handling.mode=rewrite.
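The naming rule described above can be sketched in a few lines of Python. This is only an illustration of the rule as stated in this section (a double-underscore prefix, plus an underscore between struct and field); the function names are hypothetical and are not part of Debezium.

```python
def simplified_field_name(spec: str) -> str:
    """Return the name that an add.fields/add.headers entry gets in the simplified record."""
    # "source.ts_ms" names the struct "source" and its field "ts_ms";
    # the dot becomes an underscore and the result is prefixed with "__".
    return "__" + spec.replace(".", "_")


def parse_metadata_option(value: str) -> list:
    """Parse a comma-separated option value (no spaces allowed) into record field names."""
    if " " in value:
        raise ValueError("the list must not contain spaces")
    return [simplified_field_name(item) for item in value.split(",")]


names = parse_metadata_option("op,table,lsn,source.ts_ms")
print(names)
```

For the add.fields value used in the example configuration, the sketch yields the same four field names that appear in the simplified record shown earlier.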
9.4.5. Options for configuring ExtractNewRecordState transformation
The following table describes the options that you can specify for the ExtractNewRecordState SMT.
Option | Default | Description |
---|---|---|
|
Debezium generates a tombstone record for each | |
|
Debezium generates a change event record for each | |
To use row data to determine the topic to route the record to, set this option to an | ||
Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record’s value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example | ||
Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example |
9.5. Configuring Debezium connectors to use Avro serialization
Using Avro to serialize record keys and values is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
A Debezium connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record. For each change event record, the Debezium connector does the following:
- Applies configured transformations
- Serializes the record key and value into a binary form by using the configured Kafka Connect converters
- Writes the record to the correct Kafka topic
You can specify converters for each individual Debezium connector instance. Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents. The default behavior is that the JSON converter includes the record’s message schema, which makes each record very verbose. The Getting Started with Debezium guide shows what the records look like when both payload and schemas are included. If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:
- key.converter.schemas.enable
- value.converter.schemas.enable
Setting these properties to false excludes the verbose schema information from each record.
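To get a feel for how much the embedded schema adds, the following Python sketch compares the two JSON shapes: a record wrapped in a schema/payload envelope, and the payload alone. The field names and values are made up for illustration, and the schema is far smaller than what a real change event carries.

```python
import json

# Hypothetical row data; a real change event value is considerably larger.
payload = {"id": 1001, "first_name": "Sally"}

# Simplified Kafka Connect style schema that describes the payload.
schema = {
    "type": "struct",
    "optional": False,
    "fields": [
        {"field": "id", "type": "int32", "optional": False},
        {"field": "first_name", "type": "string", "optional": False},
    ],
}

# Shape of a record when schemas.enable is true: schema and payload together.
with_schema = json.dumps({"schema": schema, "payload": payload})
# Shape of a record when schemas.enable is false: the payload only.
without_schema = json.dumps(payload)

print(len(with_schema), len(without_schema))
```

Even with this tiny two-field schema, the envelope is several times larger than the bare payload, and the schema is repeated in every record.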
Alternatively, you can serialize the record keys and values by using Apache Avro. The Avro binary format is compact and efficient. Avro schemas make it possible to ensure that each record has the correct structure. Avro’s schema evolution mechanism enables schemas to evolve. This is essential for Debezium connectors, which dynamically generate each record’s schema to match the structure of the database table that was changed. Over time, change event records written to the same Kafka topic might have different versions of the same schema. Avro serialization makes it easier for change event record consumers to adapt to a changing record schema.
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. For information about setting up this registry, see the documentation for Red Hat Integration - Service Registry.
9.5.1. About the Service Registry
Red Hat Integration - Service Registry provides several components that work with Avro:
- An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.
- An API and schema registry that tracks:
  - Avro schemas that are used in Kafka topics
  - Where the Avro converter sends the generated Avro schemas
  Since the Avro schemas are stored in this registry, each record needs to contain only a tiny schema identifier. This makes each record even smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.
- Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.
To use the Service Registry with Debezium, add Service Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.
The Service Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.
9.5.2. Overview of deploying a Debezium connector that uses Avro serialization
To deploy a Debezium connector that uses Avro serialization, there are three main tasks:
- Deploy a Red Hat Integration - Service Registry instance by following the instructions in Getting Started with Service Registry.
- Install the Avro converter by downloading the Debezium Service Registry Kafka Connect zip file and extracting it into the Debezium connector’s directory.
- Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:
  key.converter=io.apicurio.registry.utils.converter.AvroConverter
  key.converter.apicurio.registry.url=http://apicurio:8080/api
  key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
  value.converter=io.apicurio.registry.utils.converter.AvroConverter
  value.converter.apicurio.registry.url=http://apicurio:8080/api
  value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
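Because the key and value converter settings differ only in their prefix, it can be convenient to generate them rather than type them twice. The following Python sketch does that for the six properties shown above; the helper name is hypothetical, and the registry URL is the one used in this chapter's example.

```python
def avro_converter_properties(registry_url: str) -> dict:
    """Build the key/value converter properties from the example above."""
    converter = "io.apicurio.registry.utils.converter.AvroConverter"
    strategy = "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy"
    props = {}
    for side in ("key", "value"):
        props[f"{side}.converter"] = converter
        props[f"{side}.converter.apicurio.registry.url"] = registry_url
        props[f"{side}.converter.apicurio.registry.global-id"] = strategy
    return props


props = avro_converter_properties("http://apicurio:8080/api")
# Print in .properties form, one setting per line.
for name, value in props.items():
    print(f"{name}={value}")
```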
9.5.3. Deploying connectors that use Avro in Debezium containers
In your environment, you might want to use a provided Debezium container to deploy Debezium connectors that use Avro serialization. Follow the procedure here to do that. In this procedure, you build a custom Kafka Connect container image for Debezium, and you configure the Debezium connector to use the Avro converter.
Prerequisites
- You have Docker installed and sufficient rights to create and manage containers.
- You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.
Procedure
Deploy an instance of Service Registry. See Getting Started with Service Registry, Installing Service Registry from the OpenShift OperatorHub, which provides instructions for:
- Installing AMQ Streams
- Setting up AMQ Streams storage
- Installing Service Registry
Extract the Debezium connector archive(s) to create a directory structure for the connector plug-in(s). If you downloaded and extracted the archive for each Debezium connector, the structure looks like this:
tree ./my-plugins/
./my-plugins/
├── debezium-connector-mongodb
│   ├── ...
├── debezium-connector-mysql
│   ├── ...
├── debezium-connector-postgres
│   ├── ...
└── debezium-connector-sqlserver
    ├── ...
Add the Avro converter to the directory that contains the Debezium connector that you want to configure to use Avro serialization:
- Go to the Red Hat Integration download site and download the Service Registry Kafka Connect zip file.
- Extract the archive into the desired Debezium connector directory.
To configure more than one type of Debezium connector to use Avro serialization, extract the archive into the directory for each relevant connector type. While this duplicates the files, it removes the possibility of conflicting dependencies.
Create and publish a custom image for running Debezium connectors that are configured to use the Avro converter:
Create a new Dockerfile by using registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0 as the base image. In the following example, you would replace my-plugins with the name of your plug-ins directory:
FROM registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the /opt/kafka/plugins directory.
Build the docker container image. For example, if you saved the docker file that you created in the previous step as debezium-container-with-avro, then you would run the following command from the directory that contains that file and your plug-ins directory:
docker build -f debezium-container-with-avro -t debezium-container-with-avro:latest .
Push your custom image to your container registry, for example:
docker push debezium-container-with-avro:latest
Point to the new container image. Do one of the following:
- Edit the KafkaConnect.spec.image property of the KafkaConnect custom resource. If set, this property overrides the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable in the Cluster Operator. For example:
  apiVersion: kafka.strimzi.io/v1beta1
  kind: KafkaConnect
  metadata:
    name: my-connect-cluster
  spec:
    #...
    image: debezium-container-with-avro
- In the install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml file, edit the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable to point to the new container image and reinstall the Cluster Operator. If you edit this file, you must apply it to your OpenShift cluster.
Deploy each Debezium connector that is configured to use the Avro converter. For each Debezium connector:
Create a Debezium connector instance. The following inventory-connector.yaml file example creates a KafkaConnector custom resource that defines a MySQL connector instance that is configured to use the Avro converter:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    database.server.name: dbserver1
    database.whitelist: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: http://apicurio:8080/api
    key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio:8080/api
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
Apply the connector instance, for example:
oc apply -f inventory-connector.yaml
This registers inventory-connector and the connector starts to run against the inventory database.
Verify that the connector was created and has started to track changes in the specified database. You can verify the connector instance by watching the Kafka Connect log output as, for example, inventory-connector starts.
Display the Kafka Connect log output:
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
Review the log output to verify that the initial snapshot has been executed. You should see something like the following lines:
...
2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
Taking the snapshot involves a number of steps:
...
2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
After completing the snapshot, Debezium begins tracking changes in, for example, the inventory database’s binlog for change events:
...
2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
...
9.5.4. About Avro name requirements
As stated in the Avro documentation, names must adhere to the following rules:
- Start with [A-Za-z_]
- Subsequently contain only [A-Za-z0-9_] characters
Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, sanitize.field.names, that you can set to true if you have columns that do not adhere to Avro rules for names. Setting sanitize.field.names to true allows serialization of non-conformant fields without having to actually modify your schema.
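To check ahead of time which column names would need sanitizing, you can test them against the two rules above. The following Python sketch does so with a regular expression. The sanitize function is only an assumed approximation (invalid characters replaced with underscores, and an underscore prefix when the first character is not allowed); it is not taken from the Debezium implementation, and the column names are invented.

```python
import re

# The two Avro naming rules above as a single pattern.
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name: str) -> bool:
    """Return True if the whole name adheres to the Avro naming rules."""
    return AVRO_NAME.fullmatch(name) is not None


def sanitize(name: str) -> str:
    """Assumed sanitizing rule for illustration, not Debezium's actual code."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if not cleaned or not re.match(r"[A-Za-z_]", cleaned):
        cleaned = "_" + cleaned
    return cleaned


for column in ["order_date", "order-date", "1st_item"]:
    print(column, is_valid_avro_name(column), sanitize(column))
```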
9.6. Configuring Debezium connectors to use the outbox pattern
The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.
To implement the outbox pattern in a Debezium application, configure a Debezium connector to:
- Capture changes in an outbox table
- Apply the Debezium outbox event router single message transformation (SMT)
A Debezium connector that is configured to apply the outbox SMT should capture changes in only an outbox table. A connector can capture changes in more than one outbox table only if each outbox table has the same structure.
The Debezium outbox event router SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
See Reliable Microservices Data Exchange With the Outbox Pattern to learn about why the outbox pattern is useful and how it works.
The outbox event router SMT does not support the MongoDB connector.
The following topics provide details:
- Section 9.6.1, “Example of a Debezium outbox message”
- Section 9.6.2, “Outbox table structure expected by Debezium outbox event router SMT”
- Section 9.6.3, “Basic Debezium outbox event router SMT configuration”
- Section 9.6.4, “Using Avro as the payload format in Debezium outbox messages”
- Section 9.6.5, “Emitting additional fields in Debezium outbox messages”
- Section 9.6.6, “Options for configuring outbox event router transformation”
9.6.1. Example of a Debezium outbox message
To learn about how to configure the Debezium outbox event router SMT, consider the following example of a Debezium outbox message:
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
A Debezium connector that is configured to apply the outbox event router SMT generates the above message by transforming a Debezium raw message like this:
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "1.2.4.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484
}
This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox table structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.
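The relationship between the raw change event and the emitted outbox message can be sketched in Python as follows. This is a simplified model of the default behavior as it appears in the two examples above, not the SMT's actual code: the function name is hypothetical, the payload is trimmed, and lower-casing the aggregate type is an assumption made so that the result matches the outbox.event.order topic shown in the example.

```python
import json


def route_outbox_event(change_event: dict) -> dict:
    """Model how a raw change event on the outbox table maps to an outbox message."""
    row = change_event["after"]
    return {
        # Assumption: lower-cased to mirror the example topic name.
        "topic": "outbox.event." + row["aggregatetype"].lower(),
        "key": row["aggregateid"],          # event key taken from aggregateid
        "headers": {"id": row["id"]},       # unique event ID becomes a header
        "timestamp": change_event["ts_ms"], # Debezium event timestamp by default
        "value": row["payload"],            # payload column passed on as the value
    }


raw_event = {
    "before": None,
    "after": {
        "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
        "aggregateid": "1",
        "aggregatetype": "Order",
        "payload": json.dumps({"id": 1, "customerId": 123}),  # trimmed payload
        "type": "OrderCreated",
    },
    "op": "c",
    "ts_ms": 1556890294484,
}

message = route_outbox_event(raw_event)
print(message["topic"], message["key"], message["headers"])
```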
9.6.2. Outbox table structure expected by Debezium outbox event router SMT
To apply the default outbox event router SMT configuration, your outbox table is assumed to have the following columns:
Column        | Type                   | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
Column | Effect |
---|---|
id | Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages. |
aggregatetype | Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default |
aggregateid | Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions. |
type | A user-defined value that helps categorize or organize events. |
payload | The representation of the event itself. The default structure is JSON. The content in this field becomes one of these: To obtain the event payload from a different outbox table column, set the |
9.6.3. Basic Debezium outbox event router SMT configuration
To configure a Debezium connector to support the outbox pattern, configure the outbox.EventRouter SMT. For example, the basic configuration in a .properties file looks like this:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
9.6.4. Using Avro as the payload format in Debezium outbox messages
The outbox event router SMT supports arbitrary payload formats. The payload column value in an outbox table is passed on transparently. An alternative to working with JSON is to use Avro. This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.
How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter
By default, the payload column value (the Avro data) is the only message value. Configuration of ByteBufferConverter as the value converter propagates the payload column value as-is into the Kafka message value.
9.6.5. Emitting additional fields in Debezium outbox messages
Your outbox table might contain columns whose values you want to add to the emitted outbox messages. For example, consider an outbox table that has a value of purchase-order in the aggregatetype column and another column, eventType, whose possible values are order-created and order-shipped. To emit the eventType column value in the outbox message header, configure the SMT like this:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:header:eventType
To emit the eventType column value in the outbox message envelope, configure the SMT like this:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:envelope:eventType
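The value of table.fields.additional.placement is a comma-separated list of column:placement pairs or column:placement:alias trios, as the options table in the next section describes. The following Python sketch parses such a value so that you can sanity-check a setting before deploying it. The function is hypothetical and only mirrors the format described in this chapter.

```python
def parse_additional_placement(value: str) -> list:
    """Parse a table.fields.additional.placement value into its entries."""
    entries = []
    for item in value.split(","):
        parts = item.split(":")
        if len(parts) not in (2, 3):
            raise ValueError(f"expected column:placement[:alias], got {item!r}")
        column, placement = parts[0], parts[1]
        if placement not in ("header", "envelope"):
            raise ValueError(f"placement must be header or envelope, got {placement!r}")
        # Without an alias, the column name itself is used in this sketch.
        alias = parts[2] if len(parts) == 3 else column
        entries.append({"column": column, "placement": placement, "alias": alias})
    return entries


entries = parse_additional_placement("type:header:eventType")
print(entries)
```

Read this way, the header example above places the value of the type column in a header under the alias eventType.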
9.6.6. Options for configuring outbox event router transformation
The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.
Option | Default | Group | Description |
---|---|---|---|
| Table | Specifies the outbox table column that contains the unique event ID. | |
| Table | Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions. | |
Table | By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox table column that contains the timestamp that you want to be in emitted outbox messages. | ||
| Table | Specifies the outbox table column that contains the event payload. | |
| Table | Specifies the outbox table column that contains the payload ID. | |
Table, Envelope | Specifies one or more outbox table columns that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:
To specify an alias for the column, specify a trio with the alias as the third value, for example:
The second value is the placement and it must always be Configuration examples are in emitting additional fields in Debezium outbox messages. | ||
Table, Schema | When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc. | ||
| Router | Specifies the name of a column in the outbox table. The default behavior is that the value in this column becomes a part of the name of the topic to which the connector emits the outbox messages. An example is in the description of the expected outbox table. | |
| Router |
Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox table records. This regular expression is part of the setting of the | |
| Router |
Specifies the name of the topic to which the connector emits outbox messages. The default topic name is
| |
| Router |
Indicates whether an empty or | |
| Debezium |
Determines the behavior of the SMT when there is an
All changes in an outbox table are expected to be |
9.7. Emitting change event records in CloudEvents format
CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a MongoDB, MySQL, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.
Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
The CloudEvents specification defines:
- A set of standardized event attributes
- Rules for defining custom attributes
- Encoding rules for mapping event formats to serialized representations such as JSON or Avro
- Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP
To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter, which is a Kafka Connect message converter.
Currently, only structured mapping mode is supported. The CloudEvents change event envelope can be JSON or Avro and each envelope type supports JSON or Avro as the data format. It is expected that a future Debezium release will support binary mapping mode. For information about using Avro, see Section 9.5, “Configuring Debezium connectors to use Avro serialization”.
9.7.1. Example change event records in CloudEvents format
The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.
{
  "id" : "name:test_server;lsn:29274832;txId:565",   (1)
  "source" : "/debezium/postgresql/test_server",     (2)
  "specversion" : "1.0",                             (3)
  "type" : "io.debezium.postgresql.datachangeevent", (4)
  "time" : "2020-01-13T13:55:39.738Z",               (5)
  "datacontenttype" : "application/json",            (6)
  "iodebeziumop" : "r",                              (7)
  "iodebeziumversion" : "1.2.4.Final",               (8)
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "565",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",                           (9)
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {                                         (10)
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
Item | Description |
---|---|
1 | Unique ID that the connector generates for the change event based on the change event’s content. |
2 |
The source of the event, which is the logical name of the database as specified by the |
3 | The CloudEvents specification version. |
4 |
Connector type that generated the change event. The format of this field is |
5 | Time of the change in the source database. |
6 |
Describes the content type of the |
7 |
An operation identifier. Possible values are |
8 |
All |
9 |
When enabled in the connector, each |
10 |
The actual data change itself. Depending on the operation and the connector, the data might contain |
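A consumer that receives such a record in JSON form might separate the standard CloudEvents attributes from the Debezium-specific ones. The following Python sketch shows one way to do that with a trimmed copy of the record above. The helper names are hypothetical, and treating the iodebezium prefix as the marker for Debezium attributes is based only on the attribute names visible in the example.

```python
import json

PREFIX = "iodebezium"

# Trimmed copy of the example record above.
record = json.loads("""
{
  "id": "name:test_server;lsn:29274832;txId:565",
  "source": "/debezium/postgresql/test_server",
  "specversion": "1.0",
  "type": "io.debezium.postgresql.datachangeevent",
  "iodebeziumop": "r",
  "iodebeziumtable": "a",
  "data": {"before": null, "after": {"pk": 1, "name": "Bob"}}
}
""")


def split_attributes(event: dict):
    """Separate standard attributes from iodebezium* attributes (data excluded)."""
    core = {k: v for k, v in event.items() if not k.startswith(PREFIX) and k != "data"}
    debezium = {k[len(PREFIX):]: v for k, v in event.items() if k.startswith(PREFIX)}
    return core, debezium


def parse_event_id(event_id: str) -> dict:
    """Split an id such as 'name:...;lsn:...;txId:...' into its parts."""
    return dict(part.split(":", 1) for part in event_id.split(";"))


core, debezium = split_attributes(record)
print(debezium, parse_event_id(core["id"]))
```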
The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.
{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",               (1)
  "dataschema" : "http://my-registry/schemas/ids/1",    (2)
  "iodebeziumop" : "r",
  "iodebeziumversion" : "1.2.4.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="                           (3)
}
Item | Description |
---|---|
1 |
Indicates that the |
2 | URI of the schema to which the Avro data adheres. |
3 |
The |
It is also possible to use Avro for the envelope as well as the data attribute.
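When the data attribute carries Avro inside a JSON envelope, as in the second example above, the value is a base64 string. The following Python sketch decodes the sample value. Interpreting the first five bytes as a one-byte marker followed by a 4-byte big-endian schema ID is an assumption about the registry's framing, not something this chapter states; verify it against the serializer that you actually use. Decoding the remaining bytes requires the Avro schema and an Avro library, which this sketch omits.

```python
import base64
import struct

# The data attribute value from the example record above.
data = "AAAAAAEAAgICAg=="
raw = base64.b64decode(data)

# Assumed framing: 1 marker byte, then a 4-byte big-endian schema ID.
marker, schema_id = struct.unpack(">bI", raw[:5])
avro_body = raw[5:]  # the Avro-encoded record, not decoded here

print(len(raw), marker, schema_id, avro_body.hex())
```

Under that assumption, the schema ID read from the sample is 1, which lines up with the dataschema URI in the example.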
9.7.2. Example of configuring CloudEventsConverter
Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure CloudEventsConverter to emit change event records that have the following characteristics:
- Use JSON as the envelope.
- Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
Specification of serializer.type is optional, because json is the default.
CloudEventsConverter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.
9.7.3. CloudEventsConverter configuration properties
When you configure a Debezium connector to use the CloudEvents converter, you can specify the following properties.
Property | Default | Description |
|
The encoding type to use for the CloudEvents envelope structure. The value can be | |
|
The encoding type to use for the | |
N/A |
Any configuration properties to be passed through to the underlying converter when using JSON. The | |
N/A |
Any configuration properties to be passed through to the underlying converter when using Avro. The |
Revised on 2020-10-20 00:34:17 UTC