Chapter 11. Configuring Debezium connectors for your application
When the default Debezium connector behavior is not right for your application, you can use the following Debezium features to configure the behavior you need.
- Kafka Connect automatic topic creation
- Enables Connect to create topics at runtime, and apply configuration settings to those topics based on their names.
- Avro serialization
- Support for configuring Debezium PostgreSQL, MongoDB, or SQL Server connectors to use Avro to serialize message keys and values, making it easier for change event record consumers to adapt to a changing record schema.
- Configuring notifications to report connector status
- Provides a mechanism to expose status information about a connector through a configurable set of channels.
- CloudEvents converter
- Enables a Debezium connector to emit change event records that conform to the CloudEvents specification.
- Sending signals to a Debezium connector
- Provides a way to modify the behavior of a connector, or trigger an action, such as initiating an ad hoc snapshot.
11.1. Customization of Kafka Connect automatic topic creation
Kafka provides two mechanisms for creating topics automatically. You can enable automatic topic creation for the Kafka broker, and, beginning with Kafka 2.6.0, you can also enable Kafka Connect to create topics. The Kafka broker uses the auto.create.topics.enable property to control automatic topic creation. In Kafka Connect, the topic.creation.enable property specifies whether Kafka Connect is permitted to create topics. In both cases, the default settings for the properties enable automatic topic creation.
When automatic topic creation is enabled, if a Debezium source connector emits a change event record for a table for which no target topic already exists, the topic is created at runtime as the event record is ingested into Kafka.
Differences between automatic topic creation at the broker and in Kafka Connect
Topics that the broker creates are limited to sharing a single default configuration. The broker cannot apply unique configurations to different topics or sets of topics. By contrast, Kafka Connect can apply any of several configurations when creating topics, setting the replication factor, number of partitions, and other topic-specific settings as specified in the Debezium connector configuration. The connector configuration defines a set of topic creation groups, and associates a set of topic configuration properties with each group.
The broker configuration and the Kafka Connect configuration are independent of each other. Kafka Connect can create topics regardless of whether you disable topic creation at the broker. If you enable automatic topic creation at both the broker and in Kafka Connect, the Connect configuration takes precedence, and the broker creates topics only if none of the settings in the Kafka Connect configuration apply.
See the following topics for more information:
- Section 11.1.1, “Disabling automatic topic creation for the Kafka broker”
- Section 11.1.2, “Configuring automatic topic creation in Kafka Connect”
- Section 11.1.3, “Configuration of automatically created topics”
- Section 11.1.3.1, “Topic creation groups”
- Section 11.1.3.2, “Topic creation group configuration properties”
- Section 11.1.3.3, “Specifying the configuration for the Debezium default topic creation group”
- Section 11.1.3.4, “Specifying the configuration for Debezium custom topic creation groups”
- Section 11.1.3.5, “Registering Debezium custom topic creation groups”
11.1.1. Disabling automatic topic creation for the Kafka broker
By default, the Kafka broker configuration enables the broker to create topics at runtime if the topics do not already exist. Topics created by the broker cannot be configured with custom properties. If you use a Kafka version earlier than 2.6.0, and you want to create topics with specific configurations, you must disable automatic topic creation at the broker, and then explicitly create the topics, either manually or through a custom deployment process.
Procedure
- In the broker configuration, set the value of auto.create.topics.enable to false.
11.1.2. Configuring automatic topic creation in Kafka Connect
Automatic topic creation in Kafka Connect is controlled by the topic.creation.enable property. The default value for the property is true, enabling automatic topic creation, as shown in the following example:
topic.creation.enable = true
The setting for the topic.creation.enable property applies to all workers in the Connect cluster.
Kafka Connect automatic topic creation requires you to define the configuration properties that Kafka Connect applies when creating topics. You specify topic configuration properties in the Debezium connector configuration by defining topic groups, and then specifying the properties to apply to each group. The connector configuration defines a default topic creation group, and, optionally, one or more custom topic creation groups. Custom topic creation groups use lists of topic name patterns to specify the topics to which the group’s settings apply.
For details about how Kafka Connect matches topics to topic creation groups, see Topic creation groups. For more information about how configuration properties are assigned to groups, see Topic creation group configuration properties.
By default, topics that Kafka Connect creates are named based on the pattern server.schema.table, for example, dbserver.myschema.inventory.
Procedure
- To prevent Kafka Connect from creating topics automatically, set the value of topic.creation.enable to false in the Kafka Connect custom resource, as in the following example:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  ...
spec:
  config:
    topic.creation.enable: "false"
Kafka Connect automatic topic creation requires the replication.factor and partitions properties to be set for at least the default topic creation group. Groups can also obtain the values for these required properties from the default values of the Kafka broker.
11.1.3. Configuration of automatically created topics
For Kafka Connect to create topics automatically, it requires information from the source connector about the configuration properties to apply when creating topics. You define the properties that control topic creation in the configuration for each Debezium connector. As Kafka Connect creates topics for event records that a connector emits, the resulting topics obtain their configuration from the applicable group. The configuration applies to event records emitted by that connector only.
11.1.3.1. Topic creation groups
A set of topic properties is associated with a topic creation group. Minimally, you must define a default topic creation group and specify its configuration properties. Beyond that, you can optionally define one or more custom topic creation groups and specify unique properties for each.
When you create custom topic creation groups, you define the member topics for each group based on topic name patterns. You can specify naming patterns that describe the topics to include in or exclude from each group. The include and exclude properties contain comma-separated lists of regular expressions that define topic name patterns. For example, if you want a group to include all topics that start with the string dbserver1.inventory, set the value of its topic.creation.inventory.include property to dbserver1\\.inventory\\.*.
If you specify both include and exclude properties for a custom topic group, the exclusion rules take precedence and override the inclusion rules.
11.1.3.2. Topic creation group configuration properties
The default topic creation group and each custom group are associated with a unique set of configuration properties. You can configure a group to include any of the Kafka topic-level configuration properties. For example, you can specify the cleanup policy for old topic segments, the retention time, or the topic compression type for a topic group. You must define at least a minimum set of properties to describe the configuration of the topics to be created.
If no custom groups are registered, or if the name of a topic to be created does not match the include patterns of any registered group, Kafka Connect uses the configuration of the default group to create that topic.
For general information about configuring topics, see Kafka topic creation recommendations in Installing Debezium on OpenShift.
11.1.3.3. Specifying the configuration for the Debezium default topic creation group
Before you can use Kafka Connect automatic topic creation, you must create a default topic creation group and define a configuration for it. The configuration for the default topic creation group is applied to any topics with names that do not match the include list pattern of a custom topic creation group.
Prerequisites
In the Kafka Connect custom resource, the use-connector-resources value in metadata.annotations specifies that the Cluster Operator uses KafkaConnector custom resources to configure connectors in the cluster. For example:
...
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
...
Procedure
To define properties for the topic.creation.default group, add them to spec.config in the connector custom resource, as shown in the following example:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  ...
  config:
    ...
    topic.creation.default.replication.factor: 3 # (1)
    topic.creation.default.partitions: 10 # (2)
    topic.creation.default.cleanup.policy: compact # (3)
    topic.creation.default.compression.type: lz4 # (4)
    ...
You can include any Kafka topic-level configuration property in the configuration for the default group.
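Item | Description |
---|---|
1 | topic.creation.default.replication.factor defines the replication factor for topics that the default group creates. This property is required for the default group. Custom groups that do not set it fall back to the value of the default group. |
2 | topic.creation.default.partitions defines the number of partitions for topics that the default group creates. This property is required for the default group. Custom groups that do not set it fall back to the value of the default group. |
3 | topic.creation.default.cleanup.policy maps to the Kafka topic-level cleanup.policy property, which defines the log retention policy, for example, compact or delete. |
4 | topic.creation.default.compression.type maps to the Kafka topic-level compression.type property, which defines how messages in the topic are compressed. |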
Custom groups fall back to the default group settings only for the required replication.factor and partitions properties. If the configuration for a custom topic group leaves other properties undefined, the values specified in the default group are not applied.
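As a minimal sketch of the fallback rule above, the following hypothetical Python function resolves the settings for one group from a flat connector configuration. It copies only replication.factor and partitions from the default group and leaves every other property unset. It does not model the case in which the default group itself takes these values from the Kafka broker defaults.

```python
REQUIRED_WITH_FALLBACK = ("replication.factor", "partitions")


def effective_topic_config(group: str, config: dict) -> dict:
    """Resolve the topic properties that apply to one topic creation group."""
    prefix = f"topic.creation.{group}."
    default_prefix = "topic.creation.default."
    # The group's own properties, without its include/exclude patterns.
    resolved = {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix) and key[len(prefix):] not in ("include", "exclude")
    }
    # Only the required properties fall back to the default group.
    for prop in REQUIRED_WITH_FALLBACK:
        if prop not in resolved and default_prefix + prop in config:
            resolved[prop] = config[default_prefix + prop]
    return resolved


config = {
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "10",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.inventory.partitions": "20",
    "topic.creation.inventory.cleanup.policy": "compact",
}
print(effective_topic_config("inventory", config))
# {'partitions': '20', 'cleanup.policy': 'compact', 'replication.factor': '3'}
```

In this example the inventory group inherits replication.factor from the default group but not compression.type, which matches the rule stated above.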
11.1.3.4. Specifying the configuration for Debezium custom topic creation groups
You can define multiple custom topic groups, each with its own configuration.
Procedure
To define a custom topic group, add a topic.creation.<group_name>.include property to spec.config in the connector custom resource, followed by the configuration properties that you want to apply to topics in the custom group. The following example shows an excerpt of a custom resource that defines the custom topic creation groups inventory and applicationlogs:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  ...
spec:
  ...
  config:
    ...
    # (1)
    topic.creation.inventory.include: dbserver1\\.inventory\\.* # (2)
    topic.creation.inventory.partitions: 20
    topic.creation.inventory.cleanup.policy: compact
    topic.creation.inventory.delete.retention.ms: 7776000000
    # (3)
    topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* # (4)
    topic.creation.applicationlogs.exclude: dbserver1\\.logs\\.applog-old-.* # (5)
    topic.creation.applicationlogs.replication.factor: 1
    topic.creation.applicationlogs.partitions: 20
    topic.creation.applicationlogs.cleanup.policy: delete
    topic.creation.applicationlogs.retention.ms: 7776000000
    topic.creation.applicationlogs.compression.type: lz4
    ...
  ...
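Item | Description |
---|---|
1 | Defines the configuration for the inventory group. |
2 | The include property defines a regular expression that selects the topics whose names start with dbserver1.inventory. The configuration that is defined for the inventory group is applied only to topics with names that match this expression. |
3 | Defines the configuration for the applicationlogs group. |
4 | The include property defines a regular expression that matches topics whose names start with dbserver1.logs.applog-. The configuration that is defined for the applicationlogs group is applied only to topics with names that match this expression. |
5 | The exclude property defines a regular expression that matches topics whose names start with dbserver1.logs.applog-old-. Because exclusion rules take precedence, the applicationlogs configuration is not applied to these topics, even though their names also match the include pattern. |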
11.1.3.5. Registering Debezium custom topic creation groups
After you specify the configuration for any custom topic creation groups, register the groups.
Procedure
Register custom groups by adding the topic.creation.groups property to the connector custom resource, and specifying a comma-separated list of custom topic creation groups. The following excerpt from a connector custom resource registers the custom topic creation groups inventory and applicationlogs:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  ...
spec:
  ...
  config:
    topic.creation.groups: inventory,applicationlogs
    ...
Completed configuration
The following example shows a completed configuration that includes the configuration for a default topic group, along with the configurations for an inventory and an applicationlogs custom topic creation group:
Example: Configuration for a default topic creation group and two custom groups
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  ...
spec:
  ...
  config:
    ...
    topic.creation.default.replication.factor: 3
    topic.creation.default.partitions: 10
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: lz4
    topic.creation.groups: inventory,applicationlogs
    topic.creation.inventory.include: dbserver1\\.inventory\\.*
    topic.creation.inventory.partitions: 20
    topic.creation.inventory.cleanup.policy: compact
    topic.creation.inventory.delete.retention.ms: 7776000000
    topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.*
    topic.creation.applicationlogs.exclude: dbserver1\\.logs\\.applog-old-.*
    topic.creation.applicationlogs.replication.factor: 1
    topic.creation.applicationlogs.partitions: 20
    topic.creation.applicationlogs.cleanup.policy: delete
    topic.creation.applicationlogs.retention.ms: 7776000000
    topic.creation.applicationlogs.compression.type: lz4
    ...
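To check a configuration like the one above before you apply it, you might split the flat topic.creation.<group>.<property> keys into per-group settings. The following Python sketch does that and, as an assumption based on the procedure in Section 11.1.3.4, requires every group that is listed in topic.creation.groups to define an include pattern. The function name is hypothetical, and the sketch simply ignores settings for groups that are not registered.

```python
def topic_creation_groups(config: dict) -> dict:
    """Split topic.creation.<group>.<property> keys into per-group dictionaries."""
    registered = [
        g.strip() for g in config.get("topic.creation.groups", "").split(",") if g.strip()
    ]
    groups = {"default": {}}
    groups.update({name: {} for name in registered})
    for key, value in config.items():
        if not key.startswith("topic.creation.") or key == "topic.creation.groups":
            continue
        group, _, prop = key[len("topic.creation."):].partition(".")
        if group in groups and prop:
            groups[group][prop] = value  # settings of unregistered groups are ignored
    for name in registered:
        if "include" not in groups[name]:
            raise ValueError(f"registered group {name!r} defines no include pattern")
    return groups


example = {
    "topic.creation.default.replication.factor": "3",
    "topic.creation.groups": "inventory",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
    "topic.creation.inventory.partitions": "20",
}
print(topic_creation_groups(example))
```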
11.2. Configuring Debezium connectors to use Avro serialization
A Debezium connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record. For each change event record, the Debezium connector completes the following actions:
- Applies configured transformations.
- Serializes the record key and value into a binary form by using the configured Kafka Connect converters.
- Writes the record to the correct Kafka topic.
You can specify converters for each individual Debezium connector instance. Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents. By default, the JSON converter includes the record’s message schema, which makes each record very verbose. The Getting Started with Debezium guide shows what the records look like when both payload and schemas are included. If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:
- key.converter.schemas.enable
- value.converter.schemas.enable
Setting these properties to false excludes the verbose schema information from each record.
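To see why the schema information makes records verbose, the following Python sketch serializes the same row both ways. With schemas.enable set to true, the Kafka Connect JSON converter writes a document that has schema and payload fields; with it set to false, only the payload is written. The row and the schema here are hypothetical and far smaller than a real Debezium change event, whose schema also describes the before, after, and source fields.

```python
import json

# Hypothetical row and a hand-written, simplified Kafka Connect schema for it.
row = {"id": 1001, "first_name": "Sally"}
schema = {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Value",
    "optional": False,
    "fields": [
        {"field": "id", "type": "int32", "optional": False},
        {"field": "first_name", "type": "string", "optional": False},
    ],
}

# value.converter.schemas.enable=true: each value is wrapped with its schema.
with_schema = json.dumps({"schema": schema, "payload": row})
# value.converter.schemas.enable=false: only the payload is written.
without_schema = json.dumps(row)

print(f"with schema: {len(with_schema)} characters, without: {len(without_schema)} characters")
```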
Alternatively, you can serialize the record keys and values by using Apache Avro. The Avro binary format is compact and efficient. Avro schemas make it possible to ensure that each record has the correct structure. Avro’s schema evolution mechanism lets a schema change over time while consumers can still read records that were written with earlier versions of that schema. This is essential for Debezium connectors, which dynamically generate each record’s schema to match the structure of the database table that was changed. Over time, change event records written to the same Kafka topic might have different versions of the same schema. Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. For information about setting up this registry, see the documentation for Installing and deploying Red Hat build of Apicurio Registry on OpenShift.
11.2.1. About the Apicurio Registry
Red Hat build of Apicurio Registry provides the following components that work with Avro:
- An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.
- An API and schema registry that tracks the Avro schemas that are used in Kafka topics. The Avro converter sends the Avro schemas that it generates to this registry. Because the Avro schemas are stored in the registry, each record needs to contain only a tiny schema identifier, which makes each record even smaller. For an I/O-bound system like Kafka, this means more total throughput for producers and consumers.
- Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.
To use the Apicurio Registry with Debezium, add Apicurio Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.
The Apicurio Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.
To use the converters that Apicurio Registry provides, you must provide a value for apicurio.registry.url.
11.2.2. Overview of deploying a Debezium connector that uses Avro serialization
To deploy a Debezium connector that uses Avro serialization, you must complete three main tasks:
- Deploy a Red Hat build of Apicurio Registry instance by following the instructions in Installing and deploying Red Hat build of Apicurio Registry on OpenShift.
- Install the Avro converter by downloading the Debezium Service Registry Kafka Connect zip file and extracting it into the Debezium connector’s directory.
- Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
key.converter.apicurio.registry.auto-register=true
key.converter.apicurio.registry.find-latest=true
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
value.converter.apicurio.registry.find-latest=true
schema.name.adjustment.mode=avro
Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
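Because the key and value converters take the same settings, a small helper can generate both from one registry URL so that the two halves cannot drift apart. The following Python sketch is a hypothetical helper that renders the properties listed above; the property names are the ones shown in this section, and the URL is the example value.

```python
APICURIO_AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter"


def avro_converter_properties(registry_url: str) -> str:
    """Render matching key and value converter settings as properties lines."""
    lines = []
    for side in ("key", "value"):
        prefix = f"{side}.converter"
        lines += [
            f"{prefix}={APICURIO_AVRO_CONVERTER}",
            f"{prefix}.apicurio.registry.url={registry_url}",
            f"{prefix}.apicurio.registry.auto-register=true",
            f"{prefix}.apicurio.registry.find-latest=true",
        ]
    lines.append("schema.name.adjustment.mode=avro")
    return "\n".join(lines)


print(avro_converter_properties("http://apicurio:8080/apis/registry/v2"))
```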
11.2.3. Deploying connectors that use Avro in Debezium containers
In your environment, you might want to use a provided Debezium container to deploy Debezium connectors that use Avro serialization. Complete the following procedure to build a custom Kafka Connect container image for Debezium, and configure the Debezium connector to use the Avro converter.
Prerequisites
- You have Docker installed and sufficient rights to create and manage containers.
- You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.
Procedure
Deploy an instance of Apicurio Registry. See Installing and deploying Red Hat build of Apicurio Registry on OpenShift, which provides instructions for:
- Installing Apicurio Registry
- Installing AMQ Streams
- Setting up AMQ Streams storage
Extract the Debezium connector archives to create a directory structure for the connector plug-ins. If you downloaded and extracted the archives for multiple Debezium connectors, the resulting directory structure looks like the one in the following example:
tree ./my-plugins/
./my-plugins/
├── debezium-connector-mongodb
│   ├── ...
├── debezium-connector-mysql
│   ├── ...
├── debezium-connector-postgres
│   ├── ...
└── debezium-connector-sqlserver
    ├── ...
Add the Avro converter to the directory that contains the Debezium connector that you want to configure to use Avro serialization:
- Go to the Red Hat build of Debezium download site and download the Apicurio Registry Kafka Connect zip file.
- Extract the archive into the desired Debezium connector directory.
To configure more than one type of Debezium connector to use Avro serialization, extract the archive into the directory for each relevant connector type. Although extracting the archive to each directory duplicates the files, by doing so you remove the possibility of conflicting dependencies.
Create and publish a custom image for running Debezium connectors that are configured to use the Avro converter:
Create a new Dockerfile by using registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 as the base image. In the following example, replace my-plugins with the name of your plug-ins directory:
FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the /opt/kafka/plugins directory.
Build the docker container image. For example, if you saved the Dockerfile that you created in the previous step as debezium-container-with-avro, run the following command from the directory that contains that file and your plug-ins directory:
docker build -t debezium-container-with-avro:latest -f debezium-container-with-avro .
Tag your custom image for your container registry and push it, for example:
docker tag debezium-container-with-avro:latest <myregistry.io>/debezium-container-with-avro:latest
docker push <myregistry.io>/debezium-container-with-avro:latest
Point to the new container image. Do one of the following:
- Edit the KafkaConnect.spec.image property of the KafkaConnect custom resource. If set, this property overrides the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable in the Cluster Operator. For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  #...
  image: debezium-container-with-avro
- In the install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml file, edit the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable to point to the new container image, and then reinstall the Cluster Operator. If you edit this file, apply it to your OpenShift cluster.
Deploy each Debezium connector that is configured to use the Avro converter. For each Debezium connector:
Create a Debezium connector instance. The following inventory-connector.yaml file example creates a KafkaConnector custom resource that defines a MySQL connector instance that is configured to use the Avro converter:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    topic.prefix: dbserver1
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
    schema.name.adjustment.mode: avro
    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register: true
    key.converter.apicurio.registry.find-latest: true
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register: true
    value.converter.apicurio.registry.find-latest: true
Apply the connector instance, for example:
oc apply -f inventory-connector.yaml
This registers inventory-connector, and the connector starts to run against the inventory database.
Verify that the connector was created and has started to track changes in the specified database. You can verify the connector instance by watching the Kafka Connect log output as, for example, inventory-connector starts.
Display the Kafka Connect log output:
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
Review the log output to verify that the initial snapshot has been executed. You should see something like the following lines:
...
2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
Taking the snapshot involves a number of steps:
...
2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
After completing the snapshot, Debezium begins tracking changes in, for example, the inventory database’s binlog for change events:
...
2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
...
11.2.4. About Avro name requirements
As stated in the Avro documentation, names must adhere to the following rules:
- Start with [A-Za-z_]
- Subsequently contain only [A-Za-z0-9_] characters
Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, field.name.adjustment.mode, that you can set to avro if you have columns that do not adhere to Avro rules for names. Setting field.name.adjustment.mode to avro allows serialization of non-conformant fields without having to actually modify your schema.
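The following Python sketch checks a name against the two rules above and shows one simple way to make a non-conformant column name valid, by replacing each offending character with an underscore. This is an approximation for illustration only; it is not Debezium's implementation of field.name.adjustment.mode, and the function names are hypothetical.

```python
import re

AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name: str) -> bool:
    """True if name starts with [A-Za-z_] and continues with [A-Za-z0-9_] only."""
    return AVRO_NAME.fullmatch(name) is not None


def adjust_avro_name(name: str) -> str:
    """Replace each character that breaks the Avro rules with an underscore."""
    adjusted = []
    for i, ch in enumerate(name):
        ok = ch.isascii() and (ch.isalpha() or ch == "_" or (i > 0 and ch.isdigit()))
        adjusted.append(ch if ok else "_")
    return "".join(adjusted)


for column in ("order_id", "first-name", "1st_choice"):
    print(column, is_valid_avro_name(column), adjust_avro_name(column))
# order_id True order_id
# first-name False first_name
# 1st_choice False _st_choice
```

Replacing characters one for one can make two different column names collide (for example, first-name and first.name), which is one reason to prefer renaming columns when that is possible.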
11.3. Emitting Debezium change event records in CloudEvents format
CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a Db2, MongoDB, MySQL, Oracle, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.
Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
The CloudEvents specification defines:
- A set of standardized event attributes
- Rules for defining custom attributes
- Encoding rules for mapping event formats to serialized representations such as JSON or Apache Avro
- Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP
To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter, which is a Kafka Connect message converter.
Currently, only structured mapping mode can be used. The CloudEvents change event envelope can be JSON or Avro, and for each envelope type you can use JSON or Avro as the data format.
For information about using Avro, see Section 11.2, “Configuring Debezium connectors to use Avro serialization”.
11.3.1. Example Debezium change event records in CloudEvents format
The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.
{
  "id" : "name:test_server;lsn:29274832;txId:565", // (1)
  "source" : "/debezium/postgresql/test_server", // (2)
  "specversion" : "1.0", // (3)
  "type" : "io.debezium.postgresql.datachangeevent", // (4)
  "time" : "2020-01-13T13:55:39.738Z", // (5)
  "datacontenttype" : "application/json", // (6)
  "iodebeziumop" : "r", // (7)
  "iodebeziumversion" : "2.5.4.Final", // (8)
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565", // (9)
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : { // (10)
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
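Item | Description |
---|---|
1 | Unique ID that the connector generates for the change event based on the change event’s content. |
2 | The source of the event, which is built from the connector type and the logical name of the database (test_server in this example). |
3 | The CloudEvents specification version. |
4 | Connector type that generated the change event. The format of this field is io.debezium.CONNECTOR_TYPE.datachangeevent, where CONNECTOR_TYPE is the connector type, for example, postgresql. |
5 | Time of the change in the source database. |
6 | Describes the content type of the data attribute, which is JSON in this example. |
7 | An operation identifier. Possible values include r for read, c for create, u for update, and d for delete. |
8 | All source attributes of the Debezium change event are mapped to CloudEvents extension attributes whose names use the iodebezium prefix. |
9 | When enabled in the connector, each transaction attribute of the Debezium change event is mapped to a CloudEvents extension attribute whose name uses the iodebeziumtx prefix. |
10 | The actual data change. Depending on the operation and the connector, the data might contain fields such as before and after. |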
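To make the mapping in the table concrete, the following Python sketch wraps a Debezium change event value in a CloudEvents-style envelope. It is an illustration under simplifying assumptions, not the CloudEventsConverter: it omits the id attribute and transaction metadata, keeps source values in their original types, and derives extension attribute names by adding the iodebezium prefix and dropping every character that is not a lowercase ASCII letter or digit, because the CloudEvents specification allows only those characters in attribute names. The to_cloudevent function is hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone


def to_cloudevent(event: dict, connector: str) -> dict:
    """Wrap a Debezium change event value in a CloudEvents-style envelope."""
    source = event["source"]
    ts_ms = source["ts_ms"]
    event_time = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc) + timedelta(
        milliseconds=ts_ms % 1000
    )
    envelope = {
        "source": f"/debezium/{connector}/{source['name']}",
        "specversion": "1.0",
        "type": f"io.debezium.{connector}.datachangeevent",
        "time": event_time.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        "datacontenttype": "application/json",
        "iodebeziumop": event["op"],
    }
    for key, value in source.items():
        # Attribute names may contain only lowercase ASCII letters and digits.
        envelope["iodebezium" + re.sub(r"[^a-z0-9]", "", key.lower())] = value
    envelope["data"] = {"before": event.get("before"), "after": event.get("after")}
    return envelope


change = {
    "op": "r",
    "before": None,
    "after": {"pk": 1, "name": "Bob"},
    "source": {"connector": "postgresql", "name": "test_server", "ts_ms": 1578923739738,
               "db": "postgres", "schema": "s1", "table": "a"},
}
print(to_cloudevent(change, "postgresql")["time"])  # 2020-01-13T13:55:39.738Z
```

The naming rule is why a source field such as ts_ms appears as iodebeziumtsms in the example record.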
The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.
{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro", // (1)
  "dataschema" : "http://my-registry/schemas/ids/1", // (2)
  "iodebeziumop" : "r",
  "iodebeziumversion" : "2.5.4.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg==" // (3)
}
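Item | Description |
---|---|
1 | Indicates that the `data` attribute contains Avro binary data. |
2 | URI of the schema to which the Avro data adheres. |
3 | The `data` attribute contains base64-encoded Avro binary data. |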
You can also use Avro for the envelope as well as for the `data` attribute.
11.3.2. Example of configuring Debezium CloudEvents converter
Configure `io.debezium.converters.CloudEventsConverter` in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:
- Use JSON as the envelope.
- Use the schema registry at `http://my-registry/schemas/ids/1` to serialize the `data` attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json", 1
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
Item | Description |
---|---|
1 | Specifying the `serializer.type` is optional, because `json` is the default. |
The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify `key.converter` if you want to operate on record keys. For example, you might specify `StringConverter`, `LongConverter`, `JsonConverter`, or `AvroConverter`.
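A connector configuration is a flat set of string keys and values, so the converter settings from the preceding example, together with a key converter, can also be assembled in code, for example, before you submit the configuration to Kafka Connect. The following Java sketch is illustrative only: the class and method names are hypothetical, and `org.apache.kafka.connect.storage.StringConverter` is just one possible choice of key converter.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: assembles converter-related connector configuration entries.
public class CloudEventsConverterConfig {

    public static Map<String, String> converterEntries(String registryUrl) {
        // LinkedHashMap keeps the entries in insertion order for readable output.
        Map<String, String> config = new LinkedHashMap<>();
        config.put("value.converter", "io.debezium.converters.CloudEventsConverter");
        // JSON envelope; this entry can be omitted when JSON is what you want.
        config.put("value.converter.serializer.type", "json");
        // Serialize the data attribute as Avro by using the schema registry.
        config.put("value.converter.data.serializer.type", "avro");
        config.put("value.converter.avro.schema.registry.url", registryUrl);
        // The CloudEvents converter handles record values only; keys use their own converter.
        // StringConverter is an example choice, not a requirement.
        config.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        return config;
    }

    public static void main(String[] args) {
        converterEntries("http://my-registry/schemas/ids/1")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```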
11.3.3. Configuration of sources of metadata and some CloudEvents fields
By default, the `metadata.source` property consists of three parts, as seen in the following example:
"value,id:generate,type:generate"
The first part specifies the source for retrieving a record’s metadata; the permitted values are `value` and `header`. The next parts specify how to obtain the `id` and `type` fields of a CloudEvent; the permitted values are `generate` and `header`.
Obtaining record metadata
To construct a CloudEvent, the converter requires source, operation, and transaction metadata. Generally, the converter can retrieve the metadata from a record’s value. However, in some cases a record is processed before it reaches the converter in a way that removes the metadata from its value, for example, when the record is processed by the Outbox Event Router SMT. To preserve the required metadata, you can use the following approach to pass the metadata in the record headers.
Procedure
- Implement a mechanism for recording the metadata in the record’s headers before the record reaches the converter, for example, by using the `HeaderFrom` SMT.
- Set the value of the converter’s `metadata.source` property to `header`.
The following example shows the configuration for a connector that uses the Outbox Event Router SMT and the `HeaderFrom` SMT:
...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
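To use the `HeaderFrom` transformation, you might need to filter out tombstone and heartbeat messages. The preceding example does this by setting `tombstones.on.delete` to `false` and by negating the `isHeartbeat` predicate, so that the `HeaderFrom` SMT is not applied to records in heartbeat topics.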
The `header` value of the `metadata.source` property is a global setting. As a result, even if you omit parts of the property’s value, such as the `id` and `type` sources, the converter uses `header` as the source for the omitted parts.
Obtaining id and type of a CloudEvent
By default, the CloudEvents converter automatically generates values for the `id` and `type` fields of a CloudEvent. You can customize the way that the converter populates these fields by changing the defaults and specifying the fields' values in the appropriate headers. For example:
"value.converter.metadata.source": "value,id:header,type:header"
With the preceding configuration in effect, you could configure upstream functions to add `id` and `type` headers with the values that you want to pass to the CloudEvents converter.
If you want to provide values only for the `id` header, use:
"value.converter.metadata.source": "value,id:header,type:generate"
To provide metadata, `id`, and `type` in headers, use the short syntax:
"value.converter.metadata.source": "header"
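The following Java sketch shows one way to interpret the `metadata.source` syntax described in this section. It is not the converter's own parsing code. Two behaviors are assumptions: omitted `id` and `type` parts fall back to the documented default, `generate`, and a first part of `header` makes all three sources `header`, following the statement that `header` is a global setting.

```java
// Sketch only: interprets the documented metadata.source syntax; not the converter's own parser.
public class MetadataSourceSpec {

    /** Where record metadata, the CloudEvent id, and the CloudEvent type come from. */
    public record Sources(String metadata, String id, String type) {}

    public static Sources parse(String spec) {
        String[] parts = spec.split(",");
        String global = parts[0].trim();
        if (!global.equals("value") && !global.equals("header")) {
            throw new IllegalArgumentException("First part must be value or header: " + global);
        }
        // Documented defaults for the id and type parts.
        String id = "generate";
        String type = "generate";
        for (int i = 1; i < parts.length; i++) {
            String[] fieldAndSource = parts[i].trim().split(":", 2);
            if (fieldAndSource.length != 2) {
                throw new IllegalArgumentException("Expected <field>:<source>, got: " + parts[i]);
            }
            String source = fieldAndSource[1].trim();
            if (!source.equals("generate") && !source.equals("header")) {
                throw new IllegalArgumentException("Source must be generate or header: " + source);
            }
            switch (fieldAndSource[0].trim()) {
                case "id" -> id = source;
                case "type" -> type = source;
                default -> throw new IllegalArgumentException("Unknown field: " + fieldAndSource[0]);
            }
        }
        if (global.equals("header")) {
            // Assumption: "header" is a global setting, so id and type also come from headers.
            return new Sources("header", "header", "header");
        }
        return new Sources(global, id, type);
    }

    public static void main(String[] args) {
        System.out.println(parse("value,id:header,type:generate"));
        System.out.println(parse("header"));
    }
}
```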
11.3.4. Debezium CloudEvents converter configuration options
When you configure a Debezium connector to use the CloudEvents converter, you can specify the following options.
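Option | Default | Description |
---|---|---|
serializer.type | json | The encoding type to use for the CloudEvents envelope structure. The value can be `json` or `avro`. |
data.serializer.type | json | The encoding type to use for the `data` attribute. The value can be `json` or `avro`. |
json. ... | N/A | Any configuration options to be passed through to the underlying converter when using JSON. The `json.` prefix is removed. |
avro. ... | N/A | Any configuration options to be passed through to the underlying converter when using Avro. The `avro.` prefix is removed. For example, for Avro `data`, you would specify the `avro.schema.registry.url` option. |
schema.name.adjustment.mode | none | Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The value can be `none` or `avro`. |
schema.cloudevents.name | none | Specifies CloudEvents schema name under which the schema is registered in a Schema Registry. The setting is ignored when `serializer.type` is `json`, in which case a record’s value is schemaless. |
extension.attributes.enable | true | Specifies whether the converter includes extension attributes when it generates a cloud event. The value can be `true` or `false`. |
metadata.source | value,id:generate,type:generate | A comma-separated list that specifies the sources from which the converter retrieves metadata (source, operation, transaction), along with the names of the CloudEvent `id` and `type` fields and the sources from which the converter obtains their values. For configuration examples, see Configuration of sources of metadata and some CloudEvents fields. |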
11.4. Configuring notifications to report connector status
Debezium notifications provide a mechanism to obtain status information about the connector. Notifications can be sent to the following channels:
- SinkNotificationChannel
- Sends notifications through the Connect API to a configured topic.
- LogNotificationChannel
- Notifications are appended to the log.
- JmxNotificationChannel
- Notifications are exposed as an attribute in a JMX bean.
For details about Debezium notifications, see the following sections.
11.4.1. Description of the format of Debezium notifications
Notification messages contain the following information:
Property | Description |
---|---|
id | A unique identifier that is assigned to the notification. For incremental snapshot notifications, the `id` is the same as the ID that was sent with the `execute-snapshot` signal. |
aggregate_type | The data type of the aggregate root to which a notification is related. In domain-driven design, exported events should always refer to an aggregate. |
type | Provides status information about the event specified in the `aggregate_type` field. |
additional_data | A Map<String,String> with detailed information about the notification. For an example, see Debezium notifications about the progress of incremental snapshots. |
timestamp | The time when the notification was created. The value represents the number of milliseconds since the UNIX epoch. |
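If an application consumes notifications, for example, from the sink channel's topic, it needs a local representation of these fields. The following Java record is a hypothetical consumer-side model (not a Debezium class). Only the field names come from the preceding table. It also shows how to turn the epoch-millisecond `timestamp` into an `Instant`. Parsing the JSON itself is left out, and the sample values come from the initial snapshot notification example in the next section.

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical consumer-side model of a Debezium notification; field names follow the table above.
public record DebeziumNotification(
        String id,
        String aggregateType,               // aggregate_type, for example "Initial Snapshot"
        String type,                        // status, for example "STARTED" or "COMPLETED"
        Map<String, String> additionalData, // additional_data
        long timestamp) {                   // milliseconds since the UNIX epoch

    public DebeziumNotification {
        // Defensive, immutable copy (Map.copyOf rejects null keys and values).
        additionalData = Map.copyOf(additionalData);
    }

    /** Converts the epoch-millisecond timestamp to an Instant. */
    public Instant createdAt() {
        return Instant.ofEpochMilli(timestamp);
    }

    /** Returns the connector_name entry of additional_data, or null if it is absent. */
    public String connectorName() {
        return additionalData.get("connector_name");
    }

    public static void main(String[] args) {
        // The example JSON carries the timestamp as a string, so it is parsed here.
        DebeziumNotification n = new DebeziumNotification(
                "5563ae14-49f8-4579-9641-c1bbc2d76f99",
                "Initial Snapshot",
                "COMPLETED",
                Map.of("connector_name", "myConnector"),
                Long.parseLong("1695817046353"));
        System.out.println(n.connectorName() + " " + n.type() + " at " + n.createdAt());
    }
}
```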
11.4.2. Types of Debezium notifications
Debezium notifications deliver information about the progress of initial snapshots or incremental snapshots.
Debezium notifications about the status of an initial snapshot
The following example shows a typical notification that provides the status of an initial snapshot:
{
"id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
"aggregate_type": "Initial Snapshot",
"type": "COMPLETED", 1
"additional_data" : {
"connector_name": "myConnector"
},
"timestamp": "1695817046353"
}
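Item | Description |
---|---|
1 | The `type` field reports the status of the initial snapshot, in this example, `COMPLETED`. For other statuses, see the following table. |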
The following table shows examples of the different payloads that might be present in notifications that report the status of initial snapshots:
Status | Payload |
---|---|
STARTED | { "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f", "aggregate_type":"Initial Snapshot", "type":"STARTED", "additional_data":{ "connector_name":"my-connector" }, "timestamp": "1695817046353" } |
IN_PROGRESS | { "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d", "aggregate_type":"Initial Snapshot", "type":"IN_PROGRESS", "additional_data":{ "connector_name":"my-connector", "data_collections":"table1, table2", "current_collection_in_progress":"table1" }, "timestamp": "1695817046353" } |
TABLE_SCAN_COMPLETED | { "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d", "aggregate_type":"Initial Snapshot", "type":"TABLE_SCAN_COMPLETED", "additional_data":{ "connector_name":"my-connector", "data_collection":"table1, table2", "scanned_collection":"table1", "total_rows_scanned":"100", "status":"SUCCEEDED" }, "timestamp": "1695817046353" } In this payload, the `additional_data.status` field reports the result of the table scan, for example, `SUCCEEDED`. |
COMPLETED | { "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f", "aggregate_type":"Initial Snapshot", "type":"COMPLETED", "additional_data":{ "connector_name":"my-connector" }, "timestamp": "1695817046353" } |
ABORTED | { "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f", "aggregate_type":"Initial Snapshot", "type":"ABORTED", "additional_data":{ "connector_name":"my-connector" }, "timestamp": "1695817046353" } |
SKIPPED | { "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f", "aggregate_type":"Initial Snapshot", "type":"SKIPPED", "additional_data":{ "connector_name":"my-connector" }, "timestamp": "1695817046353" } |
11.4.2.1. Example: Debezium notifications that report on the progress of incremental snapshots
The following table shows examples of the different payloads that might be present in notifications that report the status of incremental snapshots:
Status | Payload |
---|---|
Start | { "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f", "aggregate_type":"Incremental Snapshot", "type":"STARTED", "additional_data":{ "connector_name":"my-connector", "data_collections":"table1, table2" }, "timestamp": "1695817046353" } |
Paused | { "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51", "aggregate_type":"Incremental Snapshot", "type":"PAUSED", "additional_data":{ "connector_name":"my-connector", "data_collections":"table1, table2" }, "timestamp": "1695817046353" } |
Resumed | { "id":"a9468204-769d-430f-96d2-b0933d4839f3", "aggregate_type":"Incremental Snapshot", "type":"RESUMED", "additional_data":{ "connector_name":"my-connector", "data_collections":"table1, table2" }, "timestamp": "1695817046353" } |
Stopped | { "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c", "aggregate_type":"Incremental Snapshot", "type":"ABORTED", "additional_data":{ "connector_name":"my-connector" }, "timestamp": "1695817046353" } |
Processing chunk | { "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744", "aggregate_type":"Incremental Snapshot", "type":"IN_PROGRESS", "additional_data":{ "connector_name":"my-connector", "data_collections":"table1, table2", "current_collection_in_progress":"table1", "maximum_key":"100", "last_processed_key":"50" }, "timestamp": "1695817046353" } |
Snapshot completed for a table | { "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d", "aggregate_type":"Incremental Snapshot", "type":"TABLE_SCAN_COMPLETED", "additional_data":{ "connector_name":"my-connector", "data_collection":"table1, table2", "scanned_collection":"table1", "total_rows_scanned":"100", "status":"SUCCEEDED" }, "timestamp": "1695817046353" } In this payload, the `additional_data.status` field reports the result of the table scan, for example, `SUCCEEDED`. |
Completed | { "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d", "aggregate_type":"Incremental Snapshot", "type":"COMPLETED", "additional_data":{ "connector_name":"my-connector" }, "timestamp": "1695817046353" } |
11.4.3. Enabling Debezium to emit events to notification channels
To enable Debezium to emit notifications, specify a list of notification channels by setting the `notification.enabled.channels` configuration property. By default, the following notification channels are available:
- `sink`
- `log`
- `jmx`
To use the `sink` notification channel, you must also set the `notification.sink.topic.name` configuration property to the name of the topic where you want Debezium to send notifications.
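The rule that the `sink` channel also needs `notification.sink.topic.name` can be checked before you deploy a configuration. The following Java sketch is a hypothetical helper (not part of Debezium) that reports problems in a map of connector properties. It treats only the three default channels as known.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pre-deployment check for the notification properties described above.
public class NotificationConfigCheck {

    // The channels that are available by default.
    private static final Set<String> DEFAULT_CHANNELS = Set.of("sink", "log", "jmx");

    public static List<String> validate(Map<String, String> config) {
        List<String> problems = new ArrayList<>();
        List<String> enabled = Arrays.stream(
                        config.getOrDefault("notification.enabled.channels", "").split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .toList();
        for (String channel : enabled) {
            if (!DEFAULT_CHANNELS.contains(channel)) {
                problems.add("Not one of the default channels: " + channel);
            }
        }
        // The sink channel needs a destination topic.
        if (enabled.contains("sink")
                && config.getOrDefault("notification.sink.topic.name", "").isBlank()) {
            problems.add("sink is enabled but notification.sink.topic.name is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate(Map.of("notification.enabled.channels", "sink,log")));
    }
}
```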
11.4.3.1. Enabling Debezium notifications to report events exposed through JMX beans
To enable Debezium to report events that are exposed through JMX beans, complete the following configuration steps:
- Enable the JMX MBean Server to expose the notification bean.
- Add `jmx` to the `notification.enabled.channels` property in the connector configuration.
- Connect your preferred JMX client to the MBean Server.
Notifications are exposed through the `Notifications` attribute of a bean with the name `debezium.<connector-type>.management.notifications.<server>`.
The following image shows a notification that reports the start of an incremental snapshot:
![Fields in the JMX `Notifications` attribute](https://access.redhat.com/webassets/avalon/d/Red_Hat_build_of_Debezium-2.5.4-Debezium_User_Guide-en-US/images/67d4ba8032b9367278d97b4e2df87cc9/jmx-notification-attribute.png)
To discard a notification, call the `reset` operation on the bean.
The notifications are also exposed as JMX notifications of type `debezium.notification`. To enable an application to listen for the JMX notifications that an MBean emits, subscribe the application to the notifications.
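The following Java sketch shows a JMX listener that accepts only notifications of type `debezium.notification`. It uses only the JDK `javax.management` API. To keep it runnable on its own, it delivers notifications from a local `NotificationBroadcasterSupport` instead of a connector's MBean, and it makes no assumption about the layout of the notification message. Against a live connector, you would register the same listener and filter through `MBeanServerConnection.addNotificationListener(ObjectName, NotificationListener, NotificationFilter, Object)`, using the `ObjectName` of the notifications bean that your JMX client displays. That remote wiring is not shown.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;

// Sketch: a JMX listener that accepts only notifications of type "debezium.notification".
// The demo uses a local NotificationBroadcasterSupport instead of a connector's MBean.
public class DebeziumJmxListener implements NotificationListener {

    private final List<String> received = new CopyOnWriteArrayList<>();

    /** Passes notifications whose type starts with "debezium.notification". */
    public static NotificationFilterSupport filter() {
        NotificationFilterSupport filter = new NotificationFilterSupport();
        filter.enableType("debezium.notification");
        return filter;
    }

    @Override
    public void handleNotification(Notification notification, Object handback) {
        // This sketch stores the message unchanged; it does not parse its content.
        received.add(notification.getMessage());
    }

    public List<String> received() {
        return received;
    }

    /** Sends one matching and one non-matching notification; returns what the listener kept. */
    public static List<String> demo() {
        // The no-argument constructor delivers notifications on the sending thread.
        NotificationBroadcasterSupport broadcaster = new NotificationBroadcasterSupport();
        DebeziumJmxListener listener = new DebeziumJmxListener();
        broadcaster.addNotificationListener(listener, filter(), null);
        broadcaster.sendNotification(new Notification("debezium.notification", "demo", 1L, "snapshot started"));
        broadcaster.sendNotification(new Notification("jmx.attribute.change", "demo", 2L, "ignored"));
        return listener.received();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```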
11.5. Sending signals to a Debezium connector
The Debezium signaling mechanism provides a way to modify the behavior of a connector, or to trigger a one-time action, such as initiating an ad hoc snapshot of a table. To use signals to trigger a connector to perform a specified action, you can configure the connector to use one or more of the following channels:
- SourceSignalChannel
- You can issue a SQL command to add a signal message to a specialized signaling data collection. The signaling data collection, which you create on the source database, is designated exclusively for communicating with Debezium.
- KafkaSignalChannel
- You submit signal messages to a configurable Kafka topic.
- JmxSignalChannel
- You submit signals through the JMX `signal` operation.
- FileSignalChannel
- You can use a file to send signals. When Debezium detects that a new logging record or ad hoc snapshot record is added to the channel, it reads the signal and initiates the requested operation.
Signaling is available for use with the following Debezium connectors:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
You can specify which channels are enabled by setting the `signal.enabled.channels` configuration property. The property lists the names of the channels that are enabled. By default, Debezium provides the following channels: `source` and `kafka`. The `source` channel is enabled by default, because it is required for incremental snapshot signals.
11.5.1. Enabling Debezium source signaling channel
By default, the Debezium source signaling channel is enabled.
You must explicitly configure signaling for each connector that you want to use it with.
Procedure
- On the source database, create a signaling data collection table for sending signals to the connector. For information about the required structure of the signaling data collection, see Structure of a signaling data collection.
- For source databases such as Db2 or SQL Server that implement a native change data capture (CDC) mechanism, enable CDC for the signaling table.
- Add the name of the signaling data collection to the Debezium connector configuration.
  In the connector configuration, add the property `signal.data.collection`, and set its value to the fully-qualified name of the signaling data collection that you created in Step 1.
  For example, `signal.data.collection = inventory.debezium_signals`.
  The format for the fully-qualified name of the signaling collection depends on the connector.
  The following list shows the naming formats to use for each connector:
  - Db2
  - `<schemaName>.<tableName>`
  - MongoDB
  - `<databaseName>.<collectionName>`
  - MySQL
  - `<databaseName>.<tableName>`
  - Oracle
  - `<databaseName>.<schemaName>.<tableName>`
  - PostgreSQL
  - `<schemaName>.<tableName>`
  - SQL Server
  - `<databaseName>.<schemaName>.<tableName>`
  For more information about setting the `signal.data.collection` property, see the table of configuration properties for your connector.
11.5.1.1. Required structure of a Debezium signaling data collection
A signaling data collection, or signaling table, stores signals that you send to a connector to trigger a specified operation. The structure of the signaling table must conform to the following standard format.
- Contains three fields (columns).
- Fields are arranged in a specific order, as shown in Table 1.
Field | Type | Description |
---|---|---|
id | string | An arbitrary unique string that identifies a signal instance. |
type | string | Specifies the type of signal to send. |
data | string | Specifies JSON-formatted parameters to pass to a signal action. |
The field names in a data collection are arbitrary. The preceding table provides suggested names. If you use a different naming convention, ensure that the values in each field are consistent with the expected content.
11.5.1.2. Creating a Debezium signaling data collection
You create a signaling table by submitting a standard SQL DDL query to the source database.
Prerequisites
- You have sufficient access privileges to create a table on the source database.
Procedure
- Submit a SQL query to the source database to create a table that is consistent with the required structure, as shown in the following example:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
The amount of space that you allocate to the `VARCHAR` parameter of the `id` column must be sufficient to accommodate the size of the ID strings of signals sent to the signaling table. If the size of an ID exceeds the available space, the connector cannot process the signal.
The following example shows a `CREATE TABLE` command that creates a three-column `debezium_signal` table:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
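The following Java sketch shows how an application might write a signal row to a table like `debezium_signal` through plain JDBC. It assumes the three-column layout shown above, a JDBC driver for your database on the classpath, and auto-commit (or a caller-managed transaction). Because an ID that does not fit the `id` column prevents the connector from processing the signal, the sketch checks the length against the `VARCHAR(42)` width used in the example. A random UUID (36 characters) fits. The class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.UUID;

// Sketch: writes one row to a signaling table with the (id, type, data) layout shown above.
public class SignalTableWriter {

    // Must match the width of the id column; 42 matches the debezium_signal example.
    static final int MAX_ID_LENGTH = 42;

    /** Rejects IDs that would not fit the id column. */
    public static String requireFits(String id) {
        if (id.length() > MAX_ID_LENGTH) {
            throw new IllegalArgumentException(
                    "Signal id has " + id.length() + " characters; the id column holds " + MAX_ID_LENGTH);
        }
        return id;
    }

    /** The table name is concatenated, so pass only trusted, fixed names. */
    public static String insertSql(String table) {
        return "INSERT INTO " + table + " (id, type, data) VALUES (?, ?, ?)";
    }

    /** Inserts a signal with a random UUID as its ID and returns the ID. */
    public static String send(Connection connection, String table, String type, String data)
            throws SQLException {
        String id = requireFits(UUID.randomUUID().toString());
        try (PreparedStatement statement = connection.prepareStatement(insertSql(table))) {
            statement.setString(1, id);
            statement.setString(2, type);
            if (data == null) {
                statement.setNull(3, Types.VARCHAR); // the data column is nullable
            } else {
                statement.setString(3, data);
            }
            statement.executeUpdate();
        }
        return id;
    }
}
```

For example, `SignalTableWriter.send(connection, "debezium_signal", "log", "{\"message\": \"Signal message at offset {}\"}")` would insert a logging signal of the kind described in Logging signals.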
11.5.2. Enabling the Debezium Kafka signaling channel
You can enable the Kafka signaling channel by adding it to the `signal.enabled.channels` configuration property, and then adding the name of the topic that receives signals to the `signal.kafka.topic` property. After you enable the signaling channel, a Kafka consumer is created to consume signals that are sent to the configured signal topic.
Additional configuration available for the consumer
- Db2 connector Kafka signal configuration properties
- MongoDB connector Kafka signal configuration properties
- MySQL connector Kafka signal configuration properties
- Oracle connector Kafka signal configuration properties
- PostgreSQL connector Kafka signal configuration properties
- SQL Server connector Kafka signal configuration properties
To use Kafka signaling to trigger ad hoc incremental snapshots for most connectors, you must first enable a `source` signaling channel in the connector configuration. The source channel implements a watermarking mechanism to deduplicate events that might be captured by an incremental snapshot and then captured again after streaming resumes. Enabling the source channel is not required when using a signaling channel to trigger an incremental snapshot of a read-only MySQL database that has GTIDs enabled. For more information, see MySQL read only incremental snapshot.
Message format
The key of the Kafka message must match the value of the `topic.prefix` connector configuration option.
The value is a JSON object with `type` and `data` fields.
When the signal type is set to `execute-snapshot`, the `data` field must include the fields that are listed in the following table:
Field | Default | Value |
---|---|---|
type | INCREMENTAL | The type of the snapshot to run. Currently Debezium supports the `incremental` and `blocking` types. |
data-collections | N/A | An array of comma-separated regular expressions that match the fully-qualified names of the data collections to include in the snapshot. |
additional-condition | N/A | An optional string that specifies a condition that the connector evaluates to designate a subset of records to include in a snapshot. Note: This property is deprecated and should be replaced by the `additional-conditions` property. |
additional-conditions | N/A | An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. |
The following example shows a typical `execute-snapshot` Kafka message:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
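The following Java sketch builds the key and value of the preceding message. The key is the connector's `topic.prefix`, and the value is a JSON object with `type` and `data` fields. To stay dependency-free, it writes the JSON by hand with minimal string escaping. Actually sending the message would require a Kafka producer, for example, a `ProducerRecord` built from the topic named in `signal.kafka.topic`, the key, and the value. That part is not shown. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: builds the key and value of an execute-snapshot message for the Kafka signaling topic.
// Hand-rolled JSON keeps the example dependency-free.
public class KafkaSnapshotSignal {

    public record Message(String key, String value) {}

    // Minimal JSON string escaping for backslashes and double quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static Message executeSnapshot(String topicPrefix, List<String> dataCollections, String snapshotType) {
        String collections = dataCollections.stream()
                .map(KafkaSnapshotSignal::quote)
                .collect(Collectors.joining(", ", "[", "]"));
        String value = "{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": " + collections
                + ", \"type\": " + quote(snapshotType) + "}}";
        // The message key must equal the connector's topic.prefix value.
        return new Message(topicPrefix, value);
    }

    public static void main(String[] args) {
        Message m = executeSnapshot("test_connector", List.of("schema1.table1", "schema1.table2"), "INCREMENTAL");
        System.out.println("Key = " + m.key() + "  Value = " + m.value());
    }
}
```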
11.5.3. Enabling the Debezium JMX signaling channel
You can enable the JMX signaling channel by adding `jmx` to the `signal.enabled.channels` property in the connector configuration, and then enabling the JMX MBean Server to expose the signaling bean.
Procedure
- Use your preferred JMX client (for example, JConsole or JDK Mission Control) to connect to the MBean server.
- Search for the MBean `debezium.<connector-type>.management.signals.<server>`. The MBean exposes `signal` operations that accept the following input parameters:
  - p0
  - The ID of the signal.
  - p1
  - The type of the signal, for example, `execute-snapshot`.
  - p2
  - A JSON data field that contains additional information about the specified signal type.
- Send an `execute-snapshot` signal by providing values for the input parameters.
  In the JSON data field, include the information that is listed in the following table:
Table 11.9. Execute snapshot data fields
Field | Default | Value |
---|---|---|
type | incremental | The type of the snapshot to run. Currently Debezium supports the `incremental` and `blocking` types. |
data-collections | N/A | An array of comma-separated regular expressions that match the fully-qualified names of the tables to include in the snapshot. Specify the names by using the same format as is required for the `signal.data.collection` configuration option. |
additional-condition | N/A | An optional string that specifies a condition that the connector evaluates to designate a subset of records to include in a snapshot. Note: This property is deprecated and should be replaced by the `additional-conditions` property. |
additional-conditions | N/A | An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. |
You can set the following properties for each additional condition:
- data-collection
- The fully-qualified name of the data collection that the filter applies to. You can apply different filters to each data collection.
- filter
- Specifies column values that must be present in a database record for the snapshot to include it, for example, `"color='blue'"`. The snapshot process evaluates records in the data collection against the `filter` value and captures only records that contain matching values. The specific values that you assign to the `filter` property depend on the type of ad hoc snapshot:
  - For incremental snapshots, you specify a search condition fragment, such as `"color='blue'"`, that the snapshot appends to the condition clause of a query.
  - For blocking snapshots, you specify a full `SELECT` statement, such as the one that you might set in the `snapshot.select.statement.overrides` property.
The following image shows an example of how to use JConsole to send a signal:
![Using JConsole to send an `execute-snapshot` signal](https://access.redhat.com/webassets/avalon/d/Red_Hat_build_of_Debezium-2.5.4-Debezium_User_Guide-en-US/images/2046520f6982a869e291b3c7e9ad5cd5/jmx-signal-operation.png)
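A JMX client sends the signal by invoking the `signal` operation with three string arguments (p0, p1, p2). The following Java sketch makes that call through the JDK `javax.management` API. To keep it runnable without a connector, it registers a local stand-in MBean under a hypothetical `ObjectName` and invokes that instead. A remote client would obtain an `MBeanServerConnection`, for example, through `JMXConnectorFactory.connect(new JMXServiceURL(...))`, and call the same `invoke` method with the `ObjectName` of the connector's signaling bean as shown by your JMX client.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

// Sketch: invokes a three-argument "signal" operation through the JMX API.
// A local stand-in MBean replaces the connector's signaling bean; the ObjectName is hypothetical.
public class JmxSignalDemo {

    /** Stand-in for the signaling operation: p0 = signal id, p1 = signal type, p2 = JSON data. */
    public interface SignalOps {
        void signal(String id, String type, String data);
    }

    /** Records each call so that the demo can show what arrived. */
    public static class RecordingSignals implements SignalOps {
        private final List<String> seen = new ArrayList<>();

        @Override
        public void signal(String id, String type, String data) {
            seen.add(id + "|" + type + "|" + data);
        }
    }

    public static List<String> run() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        RecordingSignals impl = new RecordingSignals();
        try {
            ObjectName name = new ObjectName("example.debezium:type=management,context=signals,server=demo");
            server.registerMBean(new StandardMBean(impl, SignalOps.class), name);
            try {
                // A remote client makes the same call through MBeanServerConnection.invoke(...).
                server.invoke(name, "signal",
                        new Object[] {"ad-hoc-1", "execute-snapshot",
                                "{\"data-collections\": [\"schema1.table1\"], \"type\": \"incremental\"}"},
                        new String[] {String.class.getName(), String.class.getName(), String.class.getName()});
            } finally {
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX call failed", e);
        }
        return impl.seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```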
11.5.4. Types of Debezium signal actions
You can use signaling to initiate the following actions:
Some signals are not compatible with all connectors.
11.5.4.1. Logging signals
You can request a connector to add an entry to the log by creating a signaling table entry with the `log` signal type. After processing the signal, the connector prints the specified message to the log. Optionally, you can configure the signal so that the resulting message includes the streaming coordinates.
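Column | Value | Description |
---|---|---|
id | | An arbitrary unique string that identifies the signal instance. |
type | log | The action type of the signal. |
data | {"message": "Signal message at offset {}"} | The `message` parameter specifies the string to print to the log. If the message contains a `{}` placeholder, the connector replaces it with the streaming coordinates. |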
11.5.4.2. Ad hoc snapshot signals
You can request a connector to initiate an ad hoc snapshot by creating a signal with the `execute-snapshot` signal type. After processing the signal, the connector runs the requested snapshot operation.
Unlike the initial snapshot that a connector runs after it first starts, an ad hoc snapshot occurs during runtime, after the connector has already begun to stream change events from a database. You can initiate ad hoc snapshots at any time.
Ad hoc snapshots are available for the following Debezium connectors:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
The following table shows an example of an ad hoc snapshot signal record:
Column | Value |
---|---|
id | |
type | execute-snapshot |
data | {"data-collections": ["public.MyFirstTable", "public.MySecondTable"]} |
The following table shows an example of an ad hoc snapshot message for the Kafka signaling channel:
Key | Value |
---|---|
test_connector | {"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}} |
For more information about ad hoc snapshots, see the Snapshots topic in the documentation for your connector.
Ad hoc snapshot stop signals
You can request a connector to stop an in-progress ad hoc snapshot by creating a signal table entry with the `stop-snapshot` signal type. After processing the signal, the connector stops the current in-progress snapshot operation.
You can stop ad hoc snapshots for the following Debezium connectors:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
The following table shows an example of a stop-snapshot signal record:
Column | Value |
---|---|
id | |
type | stop-snapshot |
data | {"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]} |
You must specify the `type` of the signal. The `data-collections` field is optional. Leave the `data-collections` field blank to request the connector to stop all activity in the current snapshot. If you want the incremental snapshot to proceed, but you want to exclude specific collections from the snapshot, provide a comma-separated list of the names of the collections or regular expressions to exclude. After the connector processes the signal, the incremental snapshot proceeds, but it excludes data from the collections that you specify.
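The following Java sketch builds the `data` value of a `stop-snapshot` signal for both cases described above: with a list of collections to exclude, and without one. It writes the JSON by hand and escapes backslashes, which often appear in regular expressions. Representing "blank" by omitting the `data-collections` field entirely is an assumption of this sketch. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: builds the data field of a stop-snapshot signal with hand-rolled JSON.
public class StopSnapshotData {

    // Minimal JSON string escaping; regular expressions often contain backslashes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static String build(String snapshotType, List<String> excludedCollections) {
        StringBuilder json = new StringBuilder("{\"type\":").append(quote(snapshotType));
        if (!excludedCollections.isEmpty()) {
            // Names or regular expressions of collections to drop; the snapshot itself continues.
            json.append(", \"data-collections\": ")
                .append(excludedCollections.stream()
                        .map(StopSnapshotData::quote)
                        .collect(Collectors.joining(", ", "[", "]")));
        }
        // Without a data-collections entry, the connector stops all activity in the current snapshot.
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        System.out.println(build("INCREMENTAL", List.of("public.MyFirstTable")));
        System.out.println(build("INCREMENTAL", List.of()));
    }
}
```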
11.5.4.3. Incremental snapshots
Incremental snapshots are a specific type of ad hoc snapshot. In an incremental snapshot, the connector captures the baseline state of the tables that you specify, similar to an initial snapshot. However, unlike an initial snapshot, an incremental snapshot captures tables in chunks, rather than all at once. The connector uses a watermarking method to track the progress of the snapshot.
By capturing the initial state of the specified tables in chunks rather than in a single monolithic operation, incremental snapshots provide the following advantages over the initial snapshot process:
- While the connector captures the baseline state of the specified tables, streaming of near real-time events from the transaction log continues uninterrupted.
- If the incremental snapshot process is interrupted, it can be resumed from the point at which it stopped.
- You can initiate an incremental snapshot at any time.
Incremental snapshot pause signals
You can request a connector to pause an in-progress incremental snapshot by creating a signal table entry with the `pause-snapshot` signal type. After processing the signal, the connector pauses the current in-progress snapshot operation. You cannot specify a data collection, because the snapshot pauses at whatever position it has reached when the connector processes the signal.
You can pause incremental snapshots for the following Debezium connectors:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
Column | Value |
---|---|
id | |
type | pause-snapshot |
You must specify the `type` of the signal. The `data` field is ignored.
Incremental snapshot resume signals
You can request a connector to resume a paused incremental snapshot by creating a signal table entry with the `resume-snapshot` signal type. After processing the signal, the connector resumes the previously paused snapshot operation.
You can resume incremental snapshots for the following Debezium connectors:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
Column | Value |
---|---|
id | |
type | resume-snapshot |
You must specify the `type` of the signal. The `data` field is ignored.
For more information about incremental snapshots, see the Snapshots topic in the documentation for your connector.
11.5.4.4. Blocking snapshot signals
You can request a connector to initiate an ad hoc blocking snapshot by creating a signal with the `execute-snapshot` signal type and with the `data.type` value set to `blocking`. After processing the signal, the connector runs the requested snapshot operation.
Unlike the initial snapshot that a connector runs after it first starts, an ad hoc blocking snapshot occurs during runtime. While the blocking snapshot runs, the connector stops streaming change events from the database. You can initiate ad hoc blocking snapshots at any time.
Blocking snapshots are available for the following Debezium connectors:
- Db2
- MySQL
- Oracle
- PostgreSQL
- SQL Server
The following table shows an example of a blocking snapshot signal record:
Column | Value |
---|---|
id | |
type | execute-snapshot |
data | {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]} |
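The following table shows an example of a blocking snapshot message for the Kafka signaling channel:
Key | Value |
---|---|
test_connector | {"type":"execute-snapshot","data": {"type": "blocking"}} |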
For more information about blocking snapshots, see the Snapshots topic in the documentation for your connector.
11.5.4.5. Defining a custom signal action
Custom actions enable you to extend the Debezium signaling framework to trigger actions that are not available in the default implementation. You can use a custom action with multiple connectors.
To define a custom signal action, you must implement the following interface:
@FunctionalInterface
public interface SignalAction<P extends Partition> {

    /**
     * @param signalPayload the content of the signal
     * @return true if the signal was processed
     */
    boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}
The `io.debezium.pipeline.signal.actions.SignalAction` interface exposes a single method with one parameter, which represents the message payload sent through the signaling channel.
After you define a custom signal action, use the following SPI interface to make the custom action available to the signaling mechanism: `io.debezium.pipeline.signal.actions.SignalActionProvider`.
public interface SignalActionProvider {

    /**
     * Create a map of signal action where the key is the name of the action.
     *
     * @param dispatcher the event dispatcher instance
     * @param connectorConfig the connector config
     * @return a concrete action
     */
    <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}
Your implementation must return a map of signal actions. Set each map key to the name of the action. The key is used as the `type` of the signal.
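The relationship between a provider, its action map, and the signal `type` can be shown without the Debezium libraries. In the following self-contained Java sketch, `SignalPayload`, `SignalAction`, and `SignalActionProvider` are simplified local stand-ins that only mirror the shape of the Debezium interfaces above. They are not the `io.debezium` types, and the `dispatch` method is a hypothetical simplification, not Debezium's dispatcher. In a real project, you implement the Debezium interfaces and list the provider class in the `META-INF/services` file described in the next section.

```java
import java.util.Map;
import java.util.function.Consumer;

// Self-contained sketch of the custom signal action pattern. SignalPayload, SignalAction, and
// SignalActionProvider below are simplified LOCAL stand-ins, not the io.debezium types.
public class CustomSignalActionSketch {

    /** Stand-in for the payload; the real Debezium payload type carries more context. */
    public record SignalPayload(String id, String type, String data) {}

    /** Mirrors the single-method shape of the Debezium SignalAction interface. */
    @FunctionalInterface
    public interface SignalAction {
        boolean arrived(SignalPayload payload) throws InterruptedException;
    }

    /** Mirrors SignalActionProvider: each map key becomes a signal type. */
    public interface SignalActionProvider {
        Map<String, SignalAction> createActions();
    }

    /** Hypothetical provider that registers a single "print" action. */
    public static class PrintActionProvider implements SignalActionProvider {
        private final Consumer<String> out;

        public PrintActionProvider(Consumer<String> out) {
            this.out = out;
        }

        @Override
        public Map<String, SignalAction> createActions() {
            return Map.of("print", payload -> {
                out.accept("signal " + payload.id() + ": " + payload.data());
                return true; // report the signal as processed
            });
        }
    }

    /** Simplified dispatch: selects the action whose key equals the signal's type. */
    public static boolean dispatch(Map<String, SignalAction> actions, SignalPayload payload) {
        SignalAction action = actions.get(payload.type());
        if (action == null) {
            return false; // no action registered under this type
        }
        try {
            return action.arrived(payload);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, SignalAction> actions = new PrintActionProvider(System.out::println).createActions();
        dispatch(actions, new SignalPayload("1", "print", "{}"));
    }
}
```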
11.5.4.6. Debezium core module dependencies
A custom actions Java project has compile dependencies on the Debezium core module. Include the following compile dependency in your project’s `pom.xml` file:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
Declare your provider implementation in the `META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider` file.
11.5.4.7. Deploying a custom signal action
Prerequisites
- You have a custom actions Java program.
Procedure
- To use a custom action with a Debezium connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each Debezium connector that you want to use it with.
  For example, in a typical deployment, the Debezium connector files are stored in subdirectories of a Kafka Connect directory (`/kafka/connect`), with each connector JAR in its own subdirectory (`/kafka/connect/debezium-connector-db2`, `/kafka/connect/debezium-connector-mysql`, and so forth).
  To use a custom action with multiple connectors, you must place a copy of the custom signal action JAR file in the subdirectory for each connector.
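Copying the JAR into every connector subdirectory is easy to script. The following Java sketch uses only `java.nio.file`. It assumes the layout described above, where each connector lives in a subdirectory named `debezium-connector-*` under the Kafka Connect directory, and it overwrites an existing copy of the JAR. The class name and command-line arguments are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Sketch: copies a custom signal action JAR into every Debezium connector subdirectory,
// assuming the layout described above (for example, /kafka/connect/debezium-connector-mysql).
public class DeployCustomAction {

    public static List<Path> deploy(Path customJar, Path connectDir) {
        List<Path> copied = new ArrayList<>();
        // The glob selects only the Debezium connector subdirectories, not other plugins.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(connectDir, "debezium-connector-*")) {
            for (Path dir : entries) {
                if (!Files.isDirectory(dir)) {
                    continue;
                }
                Path target = dir.resolve(customJar.getFileName());
                // Replace an older copy so that every connector gets the same version.
                Files.copy(customJar, target, StandardCopyOption.REPLACE_EXISTING);
                copied.add(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copied;
    }

    public static void main(String[] args) {
        // Example arguments: my-custom-actions.jar /kafka/connect
        deploy(Path.of(args[0]), Path.of(args[1])).forEach(System.out::println);
    }
}
```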