Chapter 8. Configuring Debezium connectors for your application
When default Debezium connector behavior is not right for your application, you can use the following Debezium features to configure the behavior you need.
- The ByLogicalTableRouter SMT re-routes data change event records to topics that you specify.
- The ExtractNewRecordState SMT flattens the complex structure of a data change event record into the simplified format that might be required by some Kafka consumers.
- Configuring Avro serialization for PostgreSQL, MongoDB, or SQL Server connectors makes it easier for change event record consumers to adapt to a changing record schema.
- The CloudEventsConverter enables a Debezium connector to emit change event records that conform to the CloudEvents specification.
8.1. Routing change event records to topics that you specify
Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter. To do this, Debezium provides the ByLogicalTableRouter single message transformation (SMT). Configure this transformation in the Debezium connector’s Kafka Connect configuration. Configuration options enable you to specify the following:
- An expression for identifying the records to re-route
- An expression that resolves to the destination topic
- How to ensure a unique key among the records being re-routed to the destination topic
It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.
The ByLogicalTableRouter transformation is a Kafka Connect SMT.
The following topics provide details:
- Section 8.1.1, “Use case for routing records to topics that you specify”
- Section 8.1.2, “Example of routing records for multiple tables to one topic”
- Section 8.1.3, “Ensuring unique keys across records routed to the same topic”
- Section 8.1.4, “Options for configuring ByLogicalTableRouter transformation”
8.1.1. Use case for routing records to topics that you specify
The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.
Logical tables
A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: db_shard1.my_table and db_shard2.my_table. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.
Partitioned PostgreSQL tables
When the Debezium PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the ByLogicalTableRouter SMT. Because each key in a partitioned table is guaranteed to be unique, configure key.enforce.uniqueness=false so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.
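For example, a minimal configuration sketch for routing all partitions of a partitioned table to a single topic might look like the following. The table and topic names are illustrative and are not taken from this guide; only the SMT class and option names come from Debezium:
transforms=PartitionRoute
transforms.PartitionRoute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.PartitionRoute.topic.regex=(.*)measurements_p.*
transforms.PartitionRoute.topic.replacement=$1measurements
transforms.PartitionRoute.key.enforce.uniqueness=false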
8.1.2. Example of routing records for multiple tables to one topic
To route change event records for multiple physical tables to the same topic, configure the ByLogicalTableRouter transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the ByLogicalTableRouter SMT requires you to specify regular expressions that determine:
- The tables for which to route records. These tables must all have the same schema.
- The destination topic name.
For example, configuration in a .properties file looks like this:
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex
- Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic. In the example, the regular expression, (.*)customers_shard(.*), matches records for changes to tables whose names include the customers_shard string. This would re-route records for tables with the following names:
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
topic.replacement
- Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the myserver.mydb.customers_all_shards topic.
8.1.3. Ensuring unique keys across records routed to the same topic
A Debezium change event key uses the table columns that make up the table’s primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the myserver.mydb.customers_shard1 table might have the same key value as a row in the myserver.mydb.customers_shard2 table.
To ensure that each event key is unique across the tables whose change event records go to the same topic, the ByLogicalTableRouter transformation inserts a field into change event keys. By default, the name of the inserted field is __dbz__physicalTableIdentifier. The value of the inserted field is the default destination topic name.
If you want to, you can configure the ByLogicalTableRouter transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names. For example:
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id
This example adds the shard_id field to the key structure in routed records.
If you want to adjust the value of the key’s new field, configure both of these options:
key.field.regex
- Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.
key.field.replacement
- Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.
For example:
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2
With this configuration, suppose that the default destination topic names are:
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
The transformation uses the values in the second captured group, the shard numbers, as the value of the key’s new field. In this example, the inserted key field’s values would be 1, 2, or 3.
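To illustrate, the key of a routed record might then look like the following sketch. This assumes an original primary key column named id, which is not part of the example above:
{ "id": 42, "shard_id": "2" }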
If your tables contain globally unique keys and you do not need to change the key structure, you can set the key.enforce.uniqueness property to false:
...
transforms.Reroute.key.enforce.uniqueness=false
...
8.1.4. Options for configuring ByLogicalTableRouter transformation
| Property | Default | Description |
| --- | --- | --- |
| topic.regex | | Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic. |
| topic.replacement | | Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for topic.regex. |
| key.enforce.uniqueness | true | Indicates whether to add a field to the record’s change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. |
| key.field.name | __dbz__physicalTableIdentifier | Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default. |
| key.field.regex | | Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, you must also specify key.field.replacement. |
| key.field.replacement | | Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for key.field.regex. |
8.2. Extracting source record after state from Debezium change events
A Debezium data change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information. However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values. To provide this kind of record, Debezium provides the ExtractNewRecordState single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.
The ExtractNewRecordState transformation is a Kafka Connect SMT.
The transformation is available only to SQL database connectors.
The following topics provide details:
- Section 8.2.1, “Description of Debezium change event structure”
- Section 8.2.2, “Behavior of Debezium ExtractNewRecordState transformation”
- Section 8.2.3, “Configuration of ExtractNewRecordState transformation”
- Section 8.2.4, “Example of adding metadata to the Kafka record”
- Section 8.2.5, “Options for configuring ExtractNewRecordState transformation”
8.2.1. Description of Debezium change event structure
Debezium generates data change events that have a complex structure. Each event consists of three parts:
- Metadata, which includes but is not limited to:
  - The operation that made the change
  - Source information such as the names of the database and table where the change was made
  - Time stamp for when the change was made
  - Optional transaction information
- Row data before the change
- Row data after the change
For example, the structure of an UPDATE change event looks like this:
{ "op": "u", "source": { ... }, "ts_ms" : "...", "before" : { "field1" : "oldvalue1", "field2" : "oldvalue2" }, "after" : { "field1" : "newvalue1", "field2" : "newvalue2" } }
This complex format provides the most information about changes happening in the system. However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:
{ "field1" : "newvalue1", "field2" : "newvalue2" }
To provide the needed Kafka record format for consumers, configure the ExtractNewRecordState SMT.
8.2.2. Behavior of Debezium ExtractNewRecordState transformation
The ExtractNewRecordState SMT extracts the after field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its after field to create a simple Kafka record.
You can configure the ExtractNewRecordState SMT for a Debezium connector or for a sink connector that consumes messages emitted by a Debezium connector. The advantage of configuring ExtractNewRecordState for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
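For example, a minimal sketch of configuring the SMT on a sink connector rather than on the Debezium source connector might look like the following. The sink connector class, topic, and connection URL are illustrative placeholders; only the transforms lines refer to Debezium:
name=my-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=myserver.mydb.customers_all_shards
connection.url=jdbc:postgresql://replica:5432/inventory
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState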
You can configure the transformation to do any of the following:
- Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata.
- Keep Kafka records that contain change events for DELETE operations in the stream. The default behavior is that the SMT drops Kafka records for DELETE operation change events because most consumers cannot yet handle them.
A database DELETE operation causes Debezium to generate two Kafka records:
- A record that contains "op": "d", the before row data, and some other fields.
- A tombstone record that has the same key as the deleted row and a value of null. This record is a marker for Apache Kafka. It indicates that log compaction can remove all records that have this key.
Instead of dropping the record that contains the before row data, you can configure the ExtractNewRecordState SMT to do one of the following:
- Keep the record in the stream and edit it to have only the "value": "null" field.
- Keep the record in the stream and edit it to have a value field that contains the key/value pairs that were in the before field with an added "__deleted": "true" entry.
Similarly, instead of dropping the tombstone record, you can configure the ExtractNewRecordState SMT to keep the tombstone record in the stream.
8.2.3. Configuration of ExtractNewRecordState transformation
Configure the Debezium ExtractNewRecordState SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, in a .properties file, you would specify something like the following:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
As for any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
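For example, the following sketch chains the ByLogicalTableRouter and ExtractNewRecordState SMTs described in this chapter; the alias names are illustrative, and Kafka Connect applies the transformations in the listed order:
transforms=Reroute,unwrap
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState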
The following .properties example sets several ExtractNewRecordState options:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
- Keeps tombstone records for DELETE operations in the event stream.
delete.handling.mode=rewrite
- For DELETE operations, edits the Kafka record by flattening the value field that was in the change event. The value field directly contains the key/value pairs that were in the before field. The SMT adds __deleted and sets it to true, for example:
"value": { "pk": 2, "cola": null, "__deleted": "true" }
add.fields=table,lsn
- Adds change event metadata for the table and lsn fields to the simplified Kafka record.
8.2.4. Example of adding metadata to the Kafka record
The ExtractNewRecordState SMT can add original change event metadata to the simplified Kafka record. For example, you might want the simplified record’s header or value to contain any of the following:
- The type of operation that made the change
- The name of the database or table that was changed
- Connector-specific fields such as the Postgres LSN field
To add metadata to the simplified Kafka record’s header, specify the add.headers option. To add metadata to the simplified Kafka record’s value, specify the add.fields option. Each of these options takes a comma-separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite
With that configuration, a simplified Kafka record would contain something like the following:
{ ... "__op" : "c", "__table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ... }
Also, simplified Kafka records would have a __db header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadata to a simplified Kafka record that is for a DELETE operation, you must also configure delete.handling.mode=rewrite.
8.2.5. Options for configuring ExtractNewRecordState transformation
The following table describes the options that you can specify for the ExtractNewRecordState SMT.
| Property | Default | Description |
| --- | --- | --- |
| drop.tombstones | true | Debezium generates a tombstone record for each DELETE operation. The default behavior is that the SMT removes tombstone records from the stream. To keep tombstone records in the stream, set this option to false. |
| delete.handling.mode | drop | Debezium generates a change event record for each DELETE operation. The default behavior is that the SMT removes these records from the stream. To keep change event records for DELETE operations in the stream, set this option to none or rewrite. |
| route.by.field | | To use row data to determine the topic to route the record to, set this option to an after field attribute. The SMT routes the record to the topic whose name matches the value of the specified field. |
| add.fields | | Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record’s value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms. |
| add.headers | | Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example source.ts_ms. |
8.3. Avro Serialization
Using Avro to serialize record keys and values is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
A Debezium connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record. For each change event record, the Debezium connector does the following:
- Applies configured transformations
- Serializes the record key and value into a binary form by using the configured Kafka Connect converters
- Writes the record to the correct Kafka topic
You can specify converters for each individual Debezium connector instance. Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents. The default behavior is that the JSON converter includes the record’s message schema, which makes each record very verbose. The Getting Started with Debezium guide shows what the records look like when both payload and schemas are included. If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:
- key.converter.schemas.enable
- value.converter.schemas.enable
Setting these properties to false excludes the verbose schema information from each record.
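For example, a minimal sketch of connector configuration properties that use the Kafka Connect JSON converter without embedded schemas might look like this:
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false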
Alternatively, you can serialize the record keys and values by using Apache Avro. The Avro binary format is compact and efficient. Avro schemas make it possible to ensure that each record has the correct structure. Avro’s schema evolution mechanism enables schemas to evolve. This is essential for Debezium connectors, which dynamically generate each record’s schema to match the structure of the database table that was changed. Over time, change event records written to the same Kafka topic might have different versions of the same schema. Avro serialization makes it easier for change event record consumers to adapt to a changing record schema.
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. For information about setting up this registry, see the documentation for Red Hat Integration - Service Registry.
8.3.1. About the Red Hat Integration - Service Registry
Red Hat Integration - Service Registry provides several components that work with Avro:
- An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.
- An API and schema registry that tracks:
  - Avro schemas that are used in Kafka topics
  - Where the Avro converter sends the generated Avro schemas
  Since the Avro schemas are stored in this registry, each record needs to contain only a tiny schema identifier. This makes each record even smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.
- Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.
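For example, a minimal sketch of consumer properties for a Kafka consumer application that deserializes change event records with the Avro Serdes might look like the following. The deserializer class and registry URL shown here are assumptions based on the Apicurio Registry Serdes artifact and might differ in your environment:
bootstrap.servers=my-cluster-kafka-bootstrap:9092
group.id=inventory-consumer
key.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
apicurio.registry.url=http://apicurio:8080/api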
To use the Service Registry with Debezium, add Service Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.
The Service Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.
8.3.2. Deployment overview
To deploy a Debezium connector that uses Avro serialization, there are three main tasks:
- Deploy a Red Hat Integration - Service Registry instance by following the instructions in Getting Started with Service Registry.
- Install the Avro converter by downloading the Debezium Service Registry Kafka Connect zip file and extracting it into the Debezium connector’s directory.
- Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
8.3.3. Deploying with Debezium containers
In your environment, you might want to use a provided Debezium container to deploy Debezium connectors that use Avro serialization. Follow the procedure here to do that. In this procedure, you build a custom Kafka Connect container image for Debezium, and you configure the Debezium connector to use the Avro converter.
Prerequisites
- You have cluster administrator access to an OpenShift cluster.
- You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.
Procedure
Deploy an instance of Service Registry. See Getting Started with Service Registry, Installing Service Registry from the OpenShift OperatorHub, which provides instructions for:
- Installing AMQ Streams
- Setting up AMQ Streams storage
- Installing Service Registry
Extract the Debezium connector archive(s) to create a directory structure for the connector plug-in(s). If you downloaded and extracted the archive for each Debezium connector, the structure looks like this:
tree ./my-plugins/
./my-plugins/
├── debezium-connector-mongodb
|   ├── ...
├── debezium-connector-mysql
│   ├── ...
├── debezium-connector-postgres
│   ├── ...
└── debezium-connector-sqlserver
    ├── ...
Add the Avro converter to the directory that contains the Debezium connector that you want to configure to use Avro serialization:
- Go to the Red Hat Integration download site and download the Service Registry Kafka Connect zip file.
- Extract the archive into the desired Debezium connector directory.
To configure more than one type of Debezium connector to use Avro serialization, extract the archive into the directory for each relevant connector type. While this duplicates the files, it removes the possibility of conflicting dependencies.
Create and publish a custom image for running Debezium connectors that are configured to use the Avro converter:
Create a new Dockerfile by using registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0 as the base image. In the following example, you would replace my-plugins with the name of your plug-ins directory:
FROM registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the /opt/kafka/plugins directory.
Build the docker container image. For example, if you saved the docker file that you created in the previous step as debezium-container-with-avro, then you would run the following command from the directory that contains the Dockerfile:
docker build -t debezium-container-with-avro:latest .
Push your custom image to your container registry, for example:
docker push debezium-container-with-avro:latest
Point to the new container image. Do one of the following:
- Edit the KafkaConnect.spec.image property of the KafkaConnect custom resource. If set, this property overrides the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable in the Cluster Operator. For example:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  #...
  image: debezium-container-with-avro
- In the install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml file, edit the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable to point to the new container image and reinstall the Cluster Operator. If you edit this file, you will need to apply it to your OpenShift cluster.
Deploy each Debezium connector that is configured to use the Avro converter. For each Debezium connector:
Create a Debezium connector instance. The following inventory-connector.yaml file example creates a KafkaConnector custom resource that defines a MySQL connector instance that is configured to use the Avro converter:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    database.server.name: dbserver1
    database.whitelist: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: http://apicurio:8080/api
    key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio:8080/api
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
Apply the connector instance, for example:
oc apply -f inventory-connector.yaml
This registers inventory-connector and the connector starts to run against the inventory database.
Verify that the connector was created and has started to track changes in the specified database. You can verify the connector instance by watching the Kafka Connect log output as, for example, inventory-connector starts.
Display the Kafka Connect log output:
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
Review the log output to verify that the initial snapshot has been executed. You should see something like the following lines:
...
2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
Taking the snapshot involves a number of steps:
...
2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
After completing the snapshot, Debezium begins tracking changes in, for example, the inventory database’s binlog for change events:
...
2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
...
8.3.4. Naming
As stated in the Avro documentation, names must adhere to the following rules:
- Start with [A-Za-z_]
- Subsequently contain only [A-Za-z0-9_] characters
Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, sanitize.field.names, that you can set to true if you have columns that do not adhere to Avro rules for names. Setting sanitize.field.names to true allows serialization of non-conformant fields without having to actually modify your schema.
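For example, a minimal sketch of enabling this in a connector’s .properties configuration, with the rest of the connector configuration omitted:
...
sanitize.field.names=true
...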
8.4. Exporting CloudEvents
CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a MongoDB, MySQL, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.
Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
The CloudEvents specification defines:
- A set of standardized event attributes
- Rules for defining custom attributes
- Encoding rules for mapping event formats to serialized representations such as JSON
- Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP
To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter, which is a Kafka Connect message converter.
Currently, only structured mapping mode is supported. The CloudEvents change event envelope must be JSON and the data format must be JSON. It is expected that a future Debezium release will support binary mapping mode.
8.4.1. Example event format
The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.
{ "id" : "name:test_server;lsn:29274832;txId:565", 1 "source" : "/debezium/postgresql/test_server", 2 "specversion" : "1.0", 3 "type" : "io.debezium.postgresql.datachangeevent", 4 "time" : "2020-01-13T13:55:39.738Z", 5 "datacontenttype" : "application/json", 6 "iodebeziumop" : "r", 7 "iodebeziumversion" : "1.1.2.Final", 8 "iodebeziumconnector" : "postgresql", "iodebeziumname" : "test_server", "iodebeziumtsms" : "1578923739738", "iodebeziumsnapshot" : "true", "iodebeziumdb" : "postgres", "iodebeziumschema" : "s1", "iodebeziumtable" : "a", "iodebeziumtxId" : "565", "iodebeziumlsn" : "29274832", "iodebeziumxmin" : null, "iodebeziumtxid": "565", 9 "iodebeziumtxtotalorder": "1", "iodebeziumtxdatacollectionorder": "1", "data" : { 10 "before" : null, "after" : { "pk" : 1, "name" : "Bob" } } }
1. Unique ID that the connector generates for the change event based on the change event’s content.
2. The source of the event, which is the logical name of the database as specified by the database.server.name property in the connector’s configuration.
3. The CloudEvents specification version.
4. Connector type that generated the change event. The format of this field is io.debezium.CONNECTOR_TYPE.datachangeevent. The value of CONNECTOR_TYPE is mongodb, mysql, postgresql, or sqlserver.
5. Time of the change in the source database.
6. Describes the content type of the data attribute, which is JSON.
7. An operation identifier. Possible values are r for read, c for create, u for update, or d for delete.
8. All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using the iodebezium prefix for the attribute name.
9. When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using the iodebeziumtx prefix for the attribute name.
10. The actual data change itself. Depending on the operation and the connector, the data might contain before, after and/or patch fields.
8.4.2. Example configuration
Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. Following is an example of configuring CloudEventsConverter. In this example, you could omit the specification of serializer.type because json is the default.
... "value.converter": "io.debezium.converters.CloudEventsConverter", "value.converter.serializer.type" : "json", ...
CloudEventsConverter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys, for example, you might specify StringConverter, LongConverter, or JsonConverter.
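For example, the following sketch combines the CloudEvents value converter with a plain string converter for record keys; whether StringConverter is appropriate depends on your key structure:
...
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",
...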
8.4.3. Configuration properties
When you configure a Debezium connector to use the CloudEvents converter, you can specify the following properties.
| Property | Default | Description |
| --- | --- | --- |
| serializer.type | json | The encoding type to use for the CloudEvents envelope structure. |
| data.serializer.type | json | The encoding type to use for the data attribute. |
| json. ... | N/A | Any configuration properties to be passed through to the underlying converter when using JSON. The json. prefix is removed. |
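For example, the following sketch sets both encoding types explicitly; both values shown are the defaults, so this configuration is equivalent to omitting them:
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type": "json",
"value.converter.data.serializer.type": "json",
...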