Chapter 11. Configuring Debezium connectors for your application


When the default Debezium connector behavior is not right for your application, you can use the following Debezium features to configure the behavior you need.

Kafka Connect automatic topic creation
Enables Connect to create topics at runtime, and apply configuration settings to those topics based on their names.
Avro serialization
Support for configuring Debezium PostgreSQL, MongoDB, or SQL Server connectors to use Avro to serialize message keys and values, making it easier for change event record consumers to adapt to a changing record schema.
CloudEvents converter
Enables a Debezium connector to emit change event records that conform to the CloudEvents specification.
Sending signals to a Debezium connector
Provides a way to modify the behavior of a connector, or trigger an action, such as initiating an ad hoc snapshot.

11.1. Customization of Kafka Connect automatic topic creation

Kafka provides two mechanisms for creating topics automatically. You can enable automatic topic creation for the Kafka broker, and, beginning with Kafka 2.6.0, you can also enable Kafka Connect to create topics. The Kafka broker uses the auto.create.topics.enable property to control automatic topic creation. In Kafka Connect, the topic.creation.enable property specifies whether Kafka Connect is permitted to create topics. In both cases, the default settings for the properties enable automatic topic creation.

When automatic topic creation is enabled, if a Debezium source connector emits a change event record for a table for which no target topic already exists, the topic is created at runtime as the event record is ingested into Kafka.

Differences between automatic topic creation at the broker and in Kafka Connect

Topics that the broker creates are limited to sharing a single default configuration. The broker cannot apply unique configurations to different topics or sets of topics. By contrast, Kafka Connect can apply any of several configurations when creating topics, setting the replication factor, number of partitions, and other topic-specific settings as specified in the Debezium connector configuration. The connector configuration defines a set of topic creation groups, and associates a set of topic configuration properties with each group.

The broker configuration and the Kafka Connect configuration are independent of each other. Kafka Connect can create topics regardless of whether you disable topic creation at the broker. If you enable automatic topic creation at both the broker and in Kafka Connect, the Connect configuration takes precedence, and the broker creates topics only if none of the settings in the Kafka Connect configuration apply.

11.1.1. Disabling automatic topic creation for the Kafka broker

By default, the Kafka broker configuration enables the broker to create topics at runtime if the topics do not already exist. Topics created by the broker cannot be configured with custom properties. If you use a Kafka version earlier than 2.6.0, and you want to create topics with specific configurations, you must disable automatic topic creation at the broker, and then explicitly create the topics, either manually, or through a custom deployment process.

Procedure

  • In the broker configuration, set the value of auto.create.topics.enable to false.
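
On an AMQ Streams deployment, one way to apply this setting is through the config section of the Kafka custom resource. The following is a minimal sketch, not a complete resource: the cluster name my-cluster is an assumption (chosen to match the my-cluster-kafka-bootstrap address used later in this chapter), and the apiVersion might differ in your release.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster   # assumed cluster name
spec:
  kafka:
    # ...listeners, storage, and other broker settings omitted...
    config:
      # Prevent the broker from creating missing topics on demand.
      auto.create.topics.enable: "false"
```

With this setting in place, a topic exists only if Kafka Connect creates it or if you create it explicitly.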

11.1.2. Configuring automatic topic creation in Kafka Connect

Automatic topic creation in Kafka Connect is controlled by the topic.creation.enable property. The default value for the property is true, enabling automatic topic creation, as shown in the following example:

topic.creation.enable = true

The setting for the topic.creation.enable property applies to all workers in the Connect cluster.

Kafka Connect automatic topic creation requires you to define the configuration properties that Kafka Connect applies when creating topics. You specify topic configuration properties in the Debezium connector configuration by defining topic groups, and then specifying the properties to apply to each group. The connector configuration defines a default topic creation group, and, optionally, one or more custom topic creation groups. Custom topic creation groups use lists of topic name patterns to specify the topics to which the group’s settings apply.

For details about how Kafka Connect matches topics to topic creation groups, see Topic creation groups. For more information about how configuration properties are assigned to groups, see Topic creation group configuration properties.

By default, topics that Kafka Connect creates are named based on the pattern server.schema.table, for example, dbserver.myschema.inventory.

Procedure

  • To prevent Kafka Connect from creating topics automatically, set the value of topic.creation.enable to false in the Kafka Connect custom resource, as in the following example:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster

    ...

    spec:
      config:
        topic.creation.enable: "false"

Note

Kafka Connect automatic topic creation requires the replication.factor and partitions properties to be set for at least the default topic creation group. It is valid for groups to obtain the values for the required properties from the default values for the Kafka broker.
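
As a sketch of the second sentence of this note, the default group can defer both required properties to the broker by setting them to -1, the value that this chapter's description of the default group identifies as meaning "use the Kafka broker's default value". The excerpt shows only the relevant keys of a connector custom resource and is an illustration rather than a recommended configuration.

```yaml
spec:
  config:
    # -1 defers to the broker-side defaults
    # (the broker's default.replication.factor and num.partitions settings).
    topic.creation.default.replication.factor: -1
    topic.creation.default.partitions: -1
```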

11.1.3. Configuration of automatically created topics

For Kafka Connect to create topics automatically, it requires information from the source connector about the configuration properties to apply when creating topics. You define the properties that control topic creation in the configuration for each Debezium connector. As Kafka Connect creates topics for event records that a connector emits, the resulting topics obtain their configuration from the applicable group. The configuration applies to event records emitted by that connector only.

11.1.3.1. Topic creation groups

A set of topic properties is associated with a topic creation group. Minimally, you must define a default topic creation group and specify its configuration properties. Beyond that you can optionally define one or more custom topic creation groups and specify unique properties for each.

When you create custom topic creation groups, you define the member topics for each group based on topic name patterns. You can specify naming patterns that describe the topics to include or exclude from each group. The include and exclude properties contain comma-separated lists of regular expressions that define topic name patterns. For example, if you want a group to include all topics that start with the string dbserver1.inventory, set the value of its topic.creation.inventory.include property to dbserver1\\.inventory\\.*.

Note

If you specify both include and exclude properties for a custom topic group, the exclusion rules take precedence, and override the inclusion rules.
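
The precedence rule can be illustrated with a short connector configuration excerpt. This is a sketch under stated assumptions: the two topic names in the comments are hypothetical, and the patterns are written with [.] for a literal dot inside single quotes, a choice made here only to avoid any dependence on how backslashes are escaped in a given file format.

```yaml
spec:
  config:
    topic.creation.default.replication.factor: 3
    topic.creation.default.partitions: 10
    topic.creation.groups: applicationlogs
    topic.creation.applicationlogs.include: 'dbserver1[.]logs[.]applog-.*'
    topic.creation.applicationlogs.exclude: 'dbserver1[.]logs[.]applog-old-.*'
    topic.creation.applicationlogs.partitions: 20
    # Expected grouping for two hypothetical topic names:
    #   dbserver1.logs.applog-2021      matches include only         -> applicationlogs group
    #   dbserver1.logs.applog-old-2019  matches include and exclude  -> excluded, so the
    #                                   default group's settings apply
```

Because the exclusion wins, the second topic is created with 10 partitions from the default group rather than the 20 partitions defined for applicationlogs.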

11.1.3.2. Topic creation group configuration properties

The default topic creation group and each custom group is associated with a unique set of configuration properties. You can configure a group to include any of the Kafka topic-level configuration properties. For example, you can specify the cleanup policy for old topic segments, retention time, or the topic compression type for a topic group. You must define at least a minimum set of properties to describe the configuration of the topics to be created.

If no custom groups are registered, or if the include patterns for any registered groups don’t match the names of any topics to be created, then Kafka Connect uses the configuration of the default group to create topics.

For general information about configuring topics, see Kafka topic creation recommendations in Installing Debezium on OpenShift.

11.1.3.3. Specifying the configuration for the Debezium default topic creation group

Before you can use Kafka Connect automatic topic creation, you must create a default topic creation group and define a configuration for it. The configuration for the default topic creation group is applied to any topics with names that do not match the include list pattern of a custom topic creation group.

Prerequisites

  • In the Kafka Connect custom resource, the use-connector-resources value in metadata.annotations specifies that the cluster Operator uses KafkaConnector custom resources to configure connectors in the cluster. For example:

     ...
        metadata:
          name: my-connect-cluster
          annotations:
            strimzi.io/use-connector-resources: "true"
     ...

Procedure

  • To define properties for the topic.creation.default group, add them to spec.config in the connector custom resource, as shown in the following example:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
    ...
    
       config:
    ...
         topic.creation.default.replication.factor: 3  # 1
         topic.creation.default.partitions: 10  # 2
         topic.creation.default.cleanup.policy: compact  # 3
         topic.creation.default.compression.type: lz4  # 4
    ...

    You can include any Kafka topic-level configuration property in the configuration for the default group.

Table 11.1. Connector configuration for the default topic creation group

Item  Description

1     topic.creation.default.replication.factor defines the replication factor for topics created by the default group.
      replication.factor is mandatory for the default group but optional for custom groups. Custom groups fall back to the default group’s value if not set. Use -1 to use the Kafka broker’s default value.

2     topic.creation.default.partitions defines the number of partitions for topics created by the default group.
      partitions is mandatory for the default group but optional for custom groups. Custom groups fall back to the default group’s value if not set. Use -1 to use the Kafka broker’s default value.

3     topic.creation.default.cleanup.policy is mapped to the cleanup.policy property of the topic level configuration parameters and defines the log retention policy.

4     topic.creation.default.compression.type is mapped to the compression.type property of the topic level configuration parameters and defines how messages are compressed on hard disk.

Note

Custom groups fall back to the default group settings only for the required replication.factor and partitions properties. If the configuration for a custom topic group leaves other properties undefined, the values specified in the default group are not applied.
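
The fallback behavior described in this note can be sketched with a custom group that sets only an include pattern and one optional property. The group name inventory follows the examples in this chapter; the pattern is illustrative and uses [.] for a literal dot to avoid escaping concerns.

```yaml
spec:
  config:
    topic.creation.default.replication.factor: 3
    topic.creation.default.partitions: 10
    topic.creation.default.compression.type: lz4
    topic.creation.groups: inventory
    topic.creation.inventory.include: 'dbserver1[.]inventory[.].*'
    topic.creation.inventory.cleanup.policy: compact
    # For topics in the inventory group:
    #   replication.factor -> 3   (required property, falls back to the default group)
    #   partitions         -> 10  (required property, falls back to the default group)
    #   compression.type   -> not taken from the default group; set it on the
    #                         inventory group explicitly if you need lz4 there
```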

11.1.3.4. Specifying the configuration for Debezium custom topic creation groups

You can define multiple custom topic groups, each with its own configuration.

Procedure

  • To define a custom topic group, add a topic.creation.<group_name>.include property to spec.config in the connector custom resource, followed by the configuration properties that you want to apply to topics in the custom group.

    The following example shows an excerpt of a custom resource that defines the custom topic creation groups inventory and applicationlogs:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
    ...
        # 1
        topic.creation.inventory.include: dbserver1\\.inventory\\.*  # 2
        topic.creation.inventory.partitions: 20
        topic.creation.inventory.cleanup.policy: compact
        topic.creation.inventory.delete.retention.ms: 7776000000
    
        # 3
        topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.*  # 4
        topic.creation.applicationlogs.exclude: dbserver1\\.logs\\.applog-old-.*  # 5
        topic.creation.applicationlogs.replication.factor: 1
        topic.creation.applicationlogs.partitions: 20
        topic.creation.applicationlogs.cleanup.policy: delete
        topic.creation.applicationlogs.retention.ms: 7776000000
        topic.creation.applicationlogs.compression.type: lz4
    ...
    ...

Table 11.2. Connector configuration for custom inventory and applicationlogs topic creation groups

Item  Description

1     Defines the configuration for the inventory group.
      The replication.factor and partitions properties are optional for custom groups. If no value is set, custom groups fall back to the value set for the default group. Set the value to -1 to use the value that is set for the Kafka broker.

2     topic.creation.inventory.include defines a regular expression to match all topics that start with dbserver1.inventory.. The configuration that is defined for the inventory group is applied only to topics with names that match the specified regular expression.

3     Defines the configuration for the applicationlogs group.
      The replication.factor and partitions properties are optional for custom groups. If no value is set, custom groups fall back to the value set for the default group. Set the value to -1 to use the value that is set for the Kafka broker.

4     topic.creation.applicationlogs.include defines a regular expression to match all topics that start with dbserver1.logs.applog-. The configuration that is defined for the applicationlogs group is applied only to topics with names that match the specified regular expression. Because an exclude property is also defined for this group, the topics that match the include regular expression might be further restricted by that exclude property.

5     topic.creation.applicationlogs.exclude defines a regular expression to match all topics that start with dbserver1.logs.applog-old-. The configuration that is defined for the applicationlogs group is applied only to topics with names that do not match the given regular expression. Because an include property is also defined for this group, the configuration of the applicationlogs group is applied only to topics with names that match the specified include regular expressions and that do not match the specified exclude regular expressions.

11.1.3.5. Registering Debezium custom topic creation groups

After you specify the configuration for any custom topic creation groups, register the groups.

Procedure

  • Register custom groups by adding the topic.creation.groups property to the connector custom resource, and specifying a comma-separated list of custom topic creation groups.

    The following excerpt from a connector custom resource registers the custom topic creation groups inventory and applicationlogs:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
         topic.creation.groups: inventory,applicationlogs
    
    ...

Completed configuration

The following example shows a completed configuration that includes the configuration for a default topic group, along with the configurations for an inventory and an applicationlogs custom topic creation group:

Example: Configuration for a default topic creation group and two custom groups

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
...
spec:
...

   config:
...
    topic.creation.default.replication.factor: 3
    topic.creation.default.partitions: 10
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: lz4
    topic.creation.groups: inventory,applicationlogs
    topic.creation.inventory.include: dbserver1\\.inventory\\.*
    topic.creation.inventory.partitions: 20
    topic.creation.inventory.cleanup.policy: compact
    topic.creation.inventory.delete.retention.ms: 7776000000
    topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.*
    topic.creation.applicationlogs.exclude: dbserver1\\.logs\\.applog-old-.*
    topic.creation.applicationlogs.replication.factor: 1
    topic.creation.applicationlogs.partitions: 20
    topic.creation.applicationlogs.cleanup.policy: delete
    topic.creation.applicationlogs.retention.ms: 7776000000
    topic.creation.applicationlogs.compression.type: lz4
...

11.2. Configuring Debezium connectors to use Avro serialization

A Debezium connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record. For each change event record, the Debezium connector completes the following actions:

  1. Applies configured transformations.
  2. Serializes the record key and value into a binary form by using the configured Kafka Connect converters.
  3. Writes the record to the correct Kafka topic.

You can specify converters for each individual Debezium connector instance. Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents. By default, the JSON converter includes the record’s message schema, which makes each record very verbose. The Getting Started with Debezium guide shows what the records look like when both payload and schemas are included. If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:

  • key.converter.schemas.enable
  • value.converter.schemas.enable

Setting these properties to false excludes the verbose schema information from each record.
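
As a minimal sketch, the two properties might appear in a connector custom resource as follows. The excerpt assumes that the connector sets the JSON converter explicitly; if your Kafka Connect cluster already uses the JSON converter as its worker-level default, the two schemas.enable lines alone may be enough.

```yaml
spec:
  config:
    # JSON converter that ships with Kafka Connect
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    # Emit only the payload, without the embedded schema
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
```

The trade-off is that consumers no longer receive the schema with each record and must learn the record structure some other way.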

Alternatively, you can serialize the record keys and values by using Apache Avro. The Avro binary format is compact and efficient. Avro schemas make it possible to ensure that each record has the correct structure. Avro’s schema evolution mechanism enables schemas to evolve. This is essential for Debezium connectors, which dynamically generate each record’s schema to match the structure of the database table that was changed. Over time, change event records written to the same Kafka topic might have different versions of the same schema. Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.

To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. For information about setting up this registry, see the Red Hat Integration - Service Registry documentation.

11.2.1. About the Service Registry

Red Hat Integration - Service Registry provides the following components that work with Avro:

  • An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.
  • An API and schema registry that tracks:

    • Avro schemas that are used in Kafka topics.
    • Where the Avro converter sends the generated Avro schemas.

    Because the Avro schemas are stored in this registry, each record needs to contain only a tiny schema identifier. This makes each record even smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.

  • Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.

To use the Service Registry with Debezium, add Service Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.

Note

The Service Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.

Note

To use the converters that Service Registry provides, you must specify the apicurio.registry.url property.

11.2.2. Overview of deploying a Debezium connector that uses Avro serialization

To deploy a Debezium connector that uses Avro serialization, you must complete three main tasks:

  1. Deploy a Red Hat Integration - Service Registry instance by following the instructions in Installing and deploying Service Registry on OpenShift.
  2. Install the Avro converter by downloading the Debezium Service Registry Kafka Connect zip file and extracting it into the Debezium connector’s directory.
  3. Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:

    key.converter=io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register=true
    key.converter.apicurio.registry.find-latest=true
    value.converter=io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register=true
    value.converter.apicurio.registry.find-latest=true

Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.

11.2.3. Deploying connectors that use Avro in Debezium containers

In your environment, you might want to use a provided Debezium container to deploy Debezium connectors that use Avro serialization. Complete the following procedure to build a custom Kafka Connect container image for Debezium, and configure the Debezium connector to use the Avro converter.

Prerequisites

  • You have Docker installed and sufficient rights to create and manage containers.
  • You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.

Procedure

  1. Deploy an instance of Service Registry. See Installing and deploying Service Registry on OpenShift, which provides instructions for:

    • Installing Service Registry
    • Installing AMQ Streams
    • Setting up AMQ Streams storage
  2. Extract the Debezium connector archives to create a directory structure for the connector plug-ins. If you downloaded and extracted the archives for multiple Debezium connectors, the resulting directory structure looks like the one in the following example:

    tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── ...
    ├── debezium-connector-mysql
    │   ├── ...
    ├── debezium-connector-postgres
    │   ├── ...
    └── debezium-connector-sqlserver
        ├── ...
  3. Add the Avro converter to the directory that contains the Debezium connector that you want to configure to use Avro serialization:

    1. Go to the Red Hat Integration download site and download the Service Registry Kafka Connect zip file.
    2. Extract the archive into the desired Debezium connector directory.

    To configure more than one type of Debezium connector to use Avro serialization, extract the archive into the directory for each relevant connector type. Although extracting the archive to each directory duplicates the files, by doing so you remove the possibility of conflicting dependencies.

  4. Create and publish a custom image for running Debezium connectors that are configured to use the Avro converter:

    1. Create a new Dockerfile by using registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0 as the base image. In the following example, replace my-plugins with the name of your plug-ins directory:

      FROM registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0
      USER root:root
      COPY ./my-plugins/ /opt/kafka/plugins/
      USER 1001

      Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the /opt/kafka/plugins directory.

    2. Build the Docker container image. For example, if the Dockerfile that you created in the previous step is saved in the current directory, run the following command to build an image named debezium-container-with-avro:

      docker build -t debezium-container-with-avro:latest .

    3. Push your custom image to your container registry, for example:

      docker push <myregistry.io>/debezium-container-with-avro:latest

    4. Point to the new container image. Do one of the following:

      • Edit the KafkaConnect.spec.image property of the KafkaConnect custom resource. If set, this property overrides the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable in the Cluster Operator. For example:

        apiVersion: kafka.strimzi.io/v1beta2
        kind: KafkaConnect
        metadata:
          name: my-connect-cluster
        spec:
          #...
          image: debezium-container-with-avro
      • In the install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml file, edit the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable to point to the new container image and reinstall the Cluster Operator. If you edit this file, you must apply it to your OpenShift cluster.
  5. Deploy each Debezium connector that is configured to use the Avro converter. For each Debezium connector:

    1. Create a Debezium connector instance. The following inventory-connector.yaml file example creates a KafkaConnector custom resource that defines a MySQL connector instance that is configured to use the Avro converter:

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnector
      metadata:
        name: inventory-connector
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1
        config:
          database.hostname: mysql
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054
          database.server.name: dbserver1
          database.include.list: inventory
          database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
          database.history.kafka.topic: schema-changes.inventory
          key.converter: io.apicurio.registry.utils.converter.AvroConverter
          key.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
          key.converter.apicurio.registry.auto-register: true
          key.converter.apicurio.registry.find-latest: true
          value.converter: io.apicurio.registry.utils.converter.AvroConverter
          value.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
          value.converter.apicurio.registry.auto-register: true
          value.converter.apicurio.registry.find-latest: true
    2. Apply the connector instance, for example:

      oc apply -f inventory-connector.yaml

      This registers inventory-connector and the connector starts to run against the inventory database.

  6. Verify that the connector was created and has started to track changes in the specified database. You can verify the connector instance by watching the Kafka Connect log output as, for example, inventory-connector starts.

    1. Display the Kafka Connect log output:

      oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
    2. Review the log output to verify that the initial snapshot has been executed. You should see something like the following lines:

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      Taking the snapshot involves a number of steps:

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      After completing the snapshot, Debezium begins tracking changes in, for example, the inventory database’s binlog for change events:

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...

11.2.4. About Avro name requirements

As stated in the Avro documentation, names must adhere to the following rules:

  • Start with [A-Za-z_]
  • Subsequently contain only [A-Za-z0-9_] characters

Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, sanitize.field.names, that you can set to true if you have columns that do not adhere to Avro rules for names. Setting sanitize.field.names to true allows serialization of non-conformant fields without having to actually modify your schema.
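
A minimal sketch of enabling the property in a connector custom resource follows. The column name mentioned in the comment is hypothetical and serves only to show the kind of name that breaks the Avro rules.

```yaml
spec:
  config:
    # A column such as 2nd-address (hypothetical) starts with a digit and
    # contains a hyphen, so it is not a valid Avro field name as-is.
    sanitize.field.names: true
```

Consumers should expect the sanitized field names, not the original column names, in the resulting Avro schemas.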

11.3. Emitting Debezium change event records in CloudEvents format

CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a MongoDB, MySQL, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.

Important

Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

The CloudEvents specification defines:

  • A set of standardized event attributes
  • Rules for defining custom attributes
  • Encoding rules for mapping event formats to serialized representations such as JSON or Avro
  • Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP

To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter, which is a Kafka Connect message converter.

Currently, only structured mapping mode is supported. The CloudEvents change event envelope can be JSON or Avro and each envelope type supports JSON or Avro as the data format. It is expected that a future Debezium release will support binary mapping mode.

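Information about emitting change events in CloudEvents format is organized as follows:

  • Example Debezium change event records in CloudEvents format
  • Example of configuring Debezium CloudEvents converter
  • Debezium CloudEvents converter configuration options

For information about using Avro, see the Avro serialization section of this chapter.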

11.3.1. Example Debezium change event records in CloudEvents format

The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.

{
  "id" : "name:test_server;lsn:29274832;txId:565",   1
  "source" : "/debezium/postgresql/test_server",     2
  "specversion" : "1.0",                             3
  "type" : "io.debezium.postgresql.datachangeevent", 4
  "time" : "2020-01-13T13:55:39.738Z",               5
  "datacontenttype" : "application/json",            6
  "iodebeziumop" : "r",                              7
  "iodebeziumversion" : "1.7.2.Final",        8
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "565",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",                           9
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {                                         10
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
1
Unique ID that the connector generates for the change event based on the change event’s content.
2
The source of the event, which is the logical name of the database as specified by the database.server.name property in the connector’s configuration.
3
The CloudEvents specification version.
4
Connector type that generated the change event. The format of this field is io.debezium.CONNECTOR_TYPE.datachangeevent. The value of CONNECTOR_TYPE is mongodb, mysql, postgresql, or sqlserver.
5
Time of the change in the source database.
6
Describes the content type of the data attribute, which is JSON in this example. The only alternative is Avro.
7
An operation identifier. Possible values are r for read, c for create, u for update, or d for delete.
8
All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using the iodebezium prefix for the attribute name.
9
When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using the iodebeziumtx prefix for the attribute name.
10
The actual data change itself. Depending on the operation and the connector, the data might contain before, after, and/or patch fields.
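
As a rough illustration of how a consumer might read such a record, the following sketch parses an abbreviated copy of the preceding example and separates the standard CloudEvents attributes from the Debezium extension attributes. The helper name split_attributes is hypothetical, and the sketch assumes that the record value arrives as a JSON string.

```python
import json

# Abbreviated copy of the preceding record, without the callout numbers.
record = json.loads("""
{
  "id": "name:test_server;lsn:29274832;txId:565",
  "source": "/debezium/postgresql/test_server",
  "specversion": "1.0",
  "type": "io.debezium.postgresql.datachangeevent",
  "datacontenttype": "application/json",
  "iodebeziumop": "r",
  "iodebeziumtable": "a",
  "iodebeziumtxtotalorder": "1",
  "data": {"before": null, "after": {"pk": 1, "name": "Bob"}}
}
""")


def split_attributes(event):
    """Split an event into core attributes and iodebezium* extensions."""
    extensions = {k: v for k, v in event.items() if k.startswith("iodebezium")}
    core = {k: v for k, v in event.items() if not k.startswith("iodebezium")}
    return core, extensions


core, extensions = split_attributes(record)
connector_type = core["type"].split(".")[2]  # CONNECTOR_TYPE part of the type
print(connector_type, extensions["iodebeziumop"], core["data"]["after"])
```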

The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.

{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",            1
  "dataschema" : "http://my-registry/schemas/ids/1", 2
  "iodebeziumop" : "r",
  "iodebeziumversion" : "1.7.2.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="                        3
}
1
Indicates that the data attribute contains Avro binary data.
2
URI of the schema to which the Avro data adheres.
3
The data attribute contains base64-encoded Avro binary data.
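
To make the base64 step concrete, the following sketch decodes the data value from the preceding example. It assumes, without confirmation from this document, that the decoded bytes follow a Confluent-style registry wire format in which a magic byte and a 4-byte big-endian schema ID precede the Avro body. Decoding the Avro body itself requires the schema from the registry and is not shown.

```python
import base64

encoded = "AAAAAAEAAgICAg=="  # value of the data attribute in the example
raw = base64.b64decode(encoded)

# ASSUMPTION: registry wire format with a 1-byte magic value and a
# 4-byte big-endian schema ID in front of the Avro-encoded body.
magic = raw[0]
schema_id = int.from_bytes(raw[1:5], byteorder="big")
avro_body = raw[5:]

print(len(raw), magic, schema_id, avro_body.hex())
```

Under that assumption, the header of the example value decodes to a magic byte of 0 and a schema ID of 1.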

It is also possible to use Avro for the envelope as well as the data attribute.

11.3.2. Example of configuring Debezium CloudEvents converter

Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:

  • Use JSON as the envelope.
  • Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",          1
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
1
Specifying the serializer.type is optional, because json is the default.

The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.
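
The following sketch shows one way to assemble the converter portion of a connector configuration before you submit it to Kafka Connect. The helper cloudevents_converter_config is hypothetical. Its rule that a registry URL must be supplied whenever Avro is selected is an assumption of this sketch, and the choice of StringConverter for keys is only an example.

```python
import json


def cloudevents_converter_config(envelope="json", data="json", registry_url=None):
    """Build the converter settings for a Debezium connector configuration."""
    for fmt in (envelope, data):
        if fmt not in ("json", "avro"):
            raise ValueError(f"unsupported serializer type: {fmt}")
    config = {
        # Example key converter; the CloudEvents converter handles values only.
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.debezium.converters.CloudEventsConverter",
        "value.converter.serializer.type": envelope,
        "value.converter.data.serializer.type": data,
    }
    if "avro" in (envelope, data):
        # ASSUMPTION of this sketch: Avro always needs a registry URL.
        if registry_url is None:
            raise ValueError("Avro serialization needs a schema registry URL")
        config["value.converter.avro.schema.registry.url"] = registry_url
    return config


config = cloudevents_converter_config(
    data="avro", registry_url="http://my-registry/schemas/ids/1"
)
print(json.dumps(config, indent=2))
```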

11.3.3. Debezium CloudEvents converter configuration options

When you configure a Debezium connector to use the CloudEvents converter, you can specify the following options.

Table 11.3. Descriptions of CloudEvents converter configuration options

| Option | Default | Description |
|---|---|---|
| serializer.type | json | The encoding type to use for the CloudEvents envelope structure. The value can be json or avro. |
| data.serializer.type | json | The encoding type to use for the data attribute. The value can be json or avro. |
| json. ... | N/A | Any configuration options to be passed through to the underlying converter when using JSON. The json. prefix is removed. |
| avro. ... | N/A | Any configuration options to be passed through to the underlying converter when using Avro. The avro. prefix is removed. For example, for Avro data, you would specify the avro.schema.registry.url option. |
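
The prefix handling for the pass-through options can be pictured with the following sketch. It is a conceptual illustration, not the converter's code: the helper passthrough_options is hypothetical, and json.schemas.enable is included only as an example of an option that might be forwarded to a JSON converter.

```python
def passthrough_options(converter_options, prefix):
    """Return the options that start with prefix, with the prefix removed."""
    return {
        key[len(prefix):]: value
        for key, value in converter_options.items()
        if key.startswith(prefix)
    }


options = {
    "serializer.type": "json",
    "data.serializer.type": "avro",
    "avro.schema.registry.url": "http://my-registry/schemas/ids/1",
    "json.schemas.enable": "false",  # illustrative pass-through option
}

avro_options = passthrough_options(options, "avro.")
json_options = passthrough_options(options, "json.")
print(avro_options, json_options)
```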

11.4. Sending signals to a Debezium connector

Important

Signaling is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

The Debezium signaling mechanism provides a way to modify the behavior of a connector, or to trigger a one-time action, such as initiating an ad hoc snapshot of a table. To trigger a connector to perform a specified action, you issue a SQL command to add a signal message to a specialized signaling table, also referred to as a signaling data collection. The signaling table, which you create on the source database, is designated exclusively for communicating with Debezium. When Debezium detects that a new logging record or ad hoc snapshot record is added to the signaling table, it reads the signal, and initiates the requested operation.

Signaling is available for use with the following Debezium connectors:

  • Db2
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

11.4.1. Enabling Debezium signaling

By default, the Debezium signaling mechanism is disabled. You must explicitly enable signaling for each connector that you want to use it with.

Procedure

  1. On the source database, create a signaling data collection table for sending signals to the connector. For information about the required structure of the signaling data collection, see Structure of a signaling data collection.
  2. For source databases such as Db2 or SQL Server that implement a native change data capture (CDC) mechanism, enable CDC for the signaling table.
  3. Add the name of the signaling data collection to the Debezium connector configuration.
    In the connector configuration, add the property signal.data.collection, and set its value to the fully-qualified name of the signaling data collection that you created in Step 1.

    For example, signal.data.collection = inventory.debezium_signals.

    The format for the fully-qualified name of the signaling collection depends on the connector.
    The following example shows the naming formats to use for each connector:

    Db2
    <schemaName>.<tableName>
    MySQL
    <databaseName>.<tableName>
    PostgreSQL
    <schemaName>.<tableName>
    SQL Server
    <databaseName>.<schemaName>.<tableName>

    For more information about setting the signal.data.collection property, see the table of configuration properties for your connector.
  4. Add the signaling table to the list of tables to monitor.
    In the configuration for the Debezium connector, add the name of the data collection that you created in Step 1 to the table.include.list property.

    For more information about the table.include.list property, see the table of configuration properties for your connector.
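
The following sketch shows how the two properties from Steps 3 and 4 might appear together, along with a simple consistency check. The table inventory.customers and the helper missing_signal_table are hypothetical. The check compares literal names only, so it assumes that both properties use the same naming format (as with MySQL) and that table.include.list contains plain names rather than regular expressions.

```python
def missing_signal_table(config):
    """Return True if the signaling table is absent from table.include.list."""
    signal_table = config["signal.data.collection"]
    included = [
        name.strip()
        for name in config.get("table.include.list", "").split(",")
        if name.strip()
    ]
    return signal_table not in included


connector_config = {
    "signal.data.collection": "inventory.debezium_signals",
    # inventory.customers is a hypothetical captured table.
    "table.include.list": "inventory.customers,inventory.debezium_signals",
}

print(missing_signal_table(connector_config))
```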

11.4.1.1. Required structure of a Debezium signaling data collection

A signaling data collection, or signaling table, stores signals that you send to a connector to trigger a specified operation. The structure of the signaling table must conform to the following standard format.

  • Contains three fields (columns).
  • Fields are arranged in a specific order, as shown in Table 11.4.

Table 11.4. Required structure of a signaling data collection

| Field | Type | Description |
|---|---|---|
| id (required) | string | An arbitrary unique string that identifies a signal instance. You assign an id to each signal that you submit to the signaling table. Typically, the ID is a UUID string. You can use signal instances for logging, debugging, or de-duplication. When a signal triggers Debezium to perform an incremental snapshot, Debezium generates a signal message with an arbitrary id string. The id string that the generated message contains is unrelated to the id string in the submitted signal. |
| type (required) | string | Specifies the type of signal to send. You can use some signal types with any connector for which signaling is available, while other signal types are available for specific connectors only. |
| data (optional) | string | Specifies JSON-formatted parameters to pass to a signal action. Each signal type requires a specific set of data. |

Note

The field names in a data collection are arbitrary. The preceding table provides suggested names. If you use a different naming convention, ensure that the values in each field are consistent with the expected content.

11.4.1.2. Creating a Debezium signaling data collection

You create a signaling table by submitting a standard SQL DDL statement to the source database.

Prerequisites

  • You have sufficient access privileges to create a table on the source database.

Procedure

  • Submit a SQL statement to the source database to create a table that is consistent with the required structure, as shown in the following example:

    CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);

Note

The length that you specify for the VARCHAR type of the id column must be sufficient to accommodate the ID strings of the signals that you send to the signaling table.
If the size of an ID exceeds the available space, the connector cannot process the signal.

The following example shows a CREATE TABLE command that creates a three-column debezium_signal table:

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

11.4.2. Types of Debezium signal actions

You can use signaling to initiate the following actions:

  • Add messages to the log.
  • Trigger ad hoc snapshots.

Some signals are not compatible with all connectors.

11.4.2.1. Logging signals

You can request a connector to add an entry to the log by creating a signaling table entry with the log signal type. After processing the signal, the connector prints the specified message to the log. Optionally, you can configure the signal so that the resulting message includes the streaming coordinates.

Table 11.5. Example of a signaling record for adding a log message

| Column | Value | Description |
|---|---|---|
| id | 924e3ff8-2245-43ca-ba77-2af9af02fa07 | |
| type | log | The action type of the signal. |
| data | {"message": "Signal message at offset {}"} | The message parameter specifies the string to print to the log. If you add a placeholder ({}) to the message, it is replaced with streaming coordinates. |
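
The following sketch shows how an application might write such a record to the signaling table. It uses an in-memory SQLite database purely as a stand-in for the source database, so that the example is self-contained; in practice you would run the same INSERT through a driver for your actual database. The table name debezium_signal follows the earlier CREATE TABLE example, and the helper send_log_signal is hypothetical.

```python
import json
import sqlite3
import uuid

# ASSUMPTION: SQLite stands in for the real source database here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE debezium_signal ("
    "id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL)"
)


def send_log_signal(connection, message):
    """Insert one log signal row and return its generated ID."""
    signal_id = str(uuid.uuid4())  # 36 characters, fits VARCHAR(42)
    connection.execute(
        "INSERT INTO debezium_signal (id, type, data) VALUES (?, ?, ?)",
        (signal_id, "log", json.dumps({"message": message})),
    )
    connection.commit()
    return signal_id


signal_id = send_log_signal(conn, "Signal message at offset {}")
print(signal_id)
```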

11.4.2.2. Ad hoc snapshot signals

You can request a connector to initiate an ad hoc snapshot by creating a signaling table entry with the execute-snapshot signal type. After processing the signal, the connector runs the requested snapshot operation.

Unlike the initial snapshot that a connector runs after it first starts, an ad hoc snapshot occurs during runtime, after the connector has already begun to stream change events from a database. You can initiate ad hoc snapshots at any time.

Ad hoc snapshots are available for the following Debezium connectors:

  • Db2
  • MySQL
  • PostgreSQL
  • SQL Server

Table 11.6. Example of an ad hoc snapshot signal record

| Column | Value |
|---|---|
| id | d139b9b7-7777-4547-917d-e1775ea61d41 |
| type | execute-snapshot |
| data | {"data-collections": ["public.MyFirstTable", "public.MySecondTable"]} |

Currently, the execute-snapshot action triggers incremental snapshots only.

For more information about ad hoc snapshots, see the Snapshots topic in the documentation for your connector.
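
As an illustration of how the data field for an execute-snapshot signal can be assembled, the following sketch builds the three column values for a signal row. The helper snapshot_signal is hypothetical, and the 2048-character limit is an assumption taken from the data VARCHAR(2048) column in the earlier CREATE TABLE example.

```python
import json
import uuid

MAX_DATA_LENGTH = 2048  # ASSUMPTION: matches data VARCHAR(2048) in the example table


def snapshot_signal(tables):
    """Return an (id, type, data) row for an execute-snapshot signal."""
    if not tables:
        raise ValueError("at least one data collection is required")
    data = json.dumps({"data-collections": list(tables)})
    if len(data) > MAX_DATA_LENGTH:
        raise ValueError("data does not fit the signaling table column")
    return (str(uuid.uuid4()), "execute-snapshot", data)


row = snapshot_signal(["public.MyFirstTable", "public.MySecondTable"])
print(row[1], row[2])
```

You could then pass the returned tuple as the parameters of an INSERT statement against the signaling table.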

11.4.2.3. Incremental snapshots

Incremental snapshots are a specific type of ad hoc snapshot. In an incremental snapshot, the connector captures the baseline state of the tables that you specify, similar to an initial snapshot. However, unlike an initial snapshot, an incremental snapshot captures tables in chunks, rather than all at once. The connector uses a watermarking method to track the progress of the snapshot.

By capturing the initial state of the specified tables in chunks rather than in a single monolithic operation, incremental snapshots provide the following advantages over the initial snapshot process:

  • While the connector captures the baseline state of the specified tables, streaming of near real-time events from the transaction log continues uninterrupted.
  • If the incremental snapshot process is interrupted, it can be resumed from the point at which it stopped.
  • You can initiate an incremental snapshot at any time.

For more information about incremental snapshots, see the Snapshots topic in the documentation for your connector.
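
The chunked, resumable nature of an incremental snapshot can be pictured with the following conceptual sketch. It is not Debezium's implementation: it uses a Python dictionary as a stand-in for a table, orders rows by primary key, and omits both watermarking and the concurrent streaming of change events. The function snapshot_in_chunks and the 7-row table are hypothetical.

```python
def snapshot_in_chunks(rows, chunk_size, resume_after=None):
    """Yield (chunk, last_key) pairs from rows ordered by primary key.

    rows: dict that maps primary key to row, standing in for a table.
    resume_after: last primary key captured before an interruption, if any.
    """
    keys = sorted(k for k in rows if resume_after is None or k > resume_after)
    for start in range(0, len(keys), chunk_size):
        chunk_keys = keys[start:start + chunk_size]
        yield [rows[k] for k in chunk_keys], chunk_keys[-1]


table = {pk: {"pk": pk} for pk in range(1, 8)}  # hypothetical 7-row table

progress = None
for chunk, last_key in snapshot_in_chunks(table, chunk_size=3):
    progress = last_key  # record how far the snapshot has advanced
    if last_key >= 6:
        break  # simulate an interruption after two chunks

# Resume from the recorded position instead of starting over.
remaining = list(snapshot_in_chunks(table, chunk_size=3, resume_after=progress))
print(progress, remaining)
```

Because only the last captured key needs to be remembered, the sketch can resume after an interruption without re-reading rows that were already captured.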
