Chapter 4. Viewing change events


After deploying the Debezium MySQL connector, it starts capturing changes to the inventory database.

When the connector starts, it writes events to a set of Apache Kafka topics, each of which represents one of the tables in the MySQL database. The name of each topic begins with the name of the database server, dbserver1.

The connector writes to the following Kafka topics:

dbserver1
The schema change topic to which DDL statements that apply to the tables for which changes are being captured are written.
dbserver1.inventory.products
Receives change event records for the products table in the inventory database.
dbserver1.inventory.products_on_hand
Receives change event records for the products_on_hand table in the inventory database.
dbserver1.inventory.customers
Receives change event records for the customers table in the inventory database.
dbserver1.inventory.orders
Receives change event records for the orders table in the inventory database.

The remainder of this tutorial examines the dbserver1.inventory.customers Kafka topic. As you look more closely at the topic, you’ll see how it represents different types of change events, and find information about the connector captured each event.

The tutorial contains the following sections:

4.1. Viewing a create event

By viewing the dbserver1.inventory.customers topic, you can see how the MySQL connector captured create events in the inventory database. In this case, the create events capture new customers being added to the database.

Procedure

  1. Open a new terminal and use kafka-console-consumer to consume the dbserver1.inventory.customers topic from the beginning of the topic.

    This command runs a simple consumer (kafka-console-consumer.sh) in the Pod that is running Kafka (my-cluster-kafka-0):

    $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers

    The consumer returns four messages (in JSON format), one for each row in the customers table. Each message contains the event records for the corresponding table row.

    There are two JSON documents for each event: a key and a value. The key corresponds to the row’s primary key, and the value shows the details of the row (the fields that the row contains, the value of each field, and the type of operation that was performed on the row).

  2. For the last event, review the details of the key.

    Here are the details of the key of the last event (formatted for readability):

    {
      "schema":{
        "type":"struct",
          "fields":[
            {
              "type":"int32",
              "optional":false,
              "field":"id"
            }
          ],
        "optional":false,
        "name":"dbserver1.inventory.customers.Key"
      },
      "payload":{
        "id":1004
      }
    }

    The event has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload. In this case, the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field (id of type int32).

    The payload has a single id field, with a value of 1004.

    By reviewing the key of the event, you can see that this event applies to the row in the inventory.customers table whose id primary key column had a value of 1004.

  3. Review the details of the same event’s value.

    The event’s value shows that the row was created, and describes what it contains (in this case, the id, first_name, last_name, and email of the inserted row).

    Here are the details of the value of the last event (formatted for readability):

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_us"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ns"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "2.7.3.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691,
        "ts_us": 1486500577691547,
        "ts_ns": 1486500577691547930
      }
    }

    This portion of the event is much longer, but like the event’s key, it also has a schema and a payload. The schema contains a Kafka Connect schema named dbserver1.inventory.customers.Envelope (version 1) that can contain five fields:

    op
    A required field that contains a string value describing the type of operation. Values for the MySQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a snapshot).
    before
    An optional field that, if present, contains the state of the row before the event occurred. The structure will be described by the dbserver1.inventory.customers.Value Kafka Connect schema, which the dbserver1 connector uses for all rows in the inventory.customers table.
    after
    An optional field that, if present, contains the state of the row after the event occurred. The structure is described by the same dbserver1.inventory.customers.Value Kafka Connect schema used in before.
    source
    A required field that contains a structure describing the source metadata for the event, which in the case of MySQL, contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and, if available, the MySQL server ID, and the timestamp in seconds.
    ts_ms
    An optional field that, if present, contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
    Note

    The JSON representations of the events are much longer than the rows they describe. This is because, with every event key and value, Kafka Connect ships the schema that describes the payload. Over time, this structure may change. However, having the schemas for the key and the value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.

    The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. This is the only way that each event is structured exactly like the table from where it originated at the time the event occurred. However, the Kafka topic containing all of the events for a single table might have events that correspond to each state of the table definition.

    The JSON converter includes the key and value schemas in every message, so it does produce very verbose events.

  4. Compare the event’s key and value schemas to the state of the inventory database. In the terminal that is running the MySQL command line client, run the following statement:

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)

    This shows that the event records you reviewed match the records in the database.

4.2. Updating the database and viewing the update event

Now that you have seen how the Debezium MySQL connector captured the create events in the inventory database, you will now change one of the records and see how the connector captures it.

By completing this procedure, you will learn how to find details about what changed in a database commit, and how you can compare change events to determine when the change occurred in relation to other changes.

Procedure

  1. In the terminal that is running the MySQL command line client, run the following statement:

    mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
    Query OK, 1 row affected (0.05 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
  2. View the updated customers table:

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
  3. Switch to the terminal running kafka-console-consumer to see a new fifth event.

    By changing a record in the customers table, the Debezium MySQL connector generated a new event. You should see two new JSON documents: one for the event’s key, and one for the new event’s value.

    Here are the details of the key for the update event (formatted for readability):

      {
        "schema": {
          "type": "struct",
          "name": "dbserver1.inventory.customers.Key"
          "optional": false,
          "fields": [
            {
              "field": "id",
              "type": "int32",
              "optional": false
            }
          ]
        },
        "payload": {
          "id": 1004
        }
      }

    This key is the same as the key for the previous events.

    Here is that new event’s value. There are no changes in the schema section, so only the payload section is shown (formatted for readability):

    {
      "schema": {...},
      "payload": {
        "before": {  1
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": {  2
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {  3
          "name": "2.7.3.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501486,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 364,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "u",  4
        "ts_ms": 1486501486308,  5
        "ts_us": 1486501486308910,  6
        "ts_ns": 1486501486308910814  7
      }
    }
    Table 4.1. Descriptions of fields in the payload of an update event value
    ItemDescription

    1

    The before field shows the values present in the row before the database commit. The original first_name value is Anne.

    2

    The after field shows the state of the row after the change event. The first_name value is now Anne Marie.

    3

    The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might have changed in other circumstances).

    4

    The op field value is now u, signifying that this row changed because of an update.

    5

    The ts_ms, ts_us, ts_ns field shows a timestamp that indicates when Debezium processed this event.

    By viewing the payload section, you can learn several important things about the update event:

    • By comparing the before and after structures, you can determine what actually changed in the affected row because of the commit.
    • By reviewing the source structure, you can find information about MySQL’s record of the change (providing traceability).
    • By comparing the payload section of an event to other events in the same topic (or a different topic), you can determine whether the event occurred before, after, or as part of the same MySQL commit as another event.

4.3. Deleting a record in the database and viewing the delete event

Now that you have seen how the Debezium MySQL connector captured the create and update events in the inventory database, you will now delete one of the records and see how the connector captures it.

By completing this procedure, you will learn how to find details about delete events, and how Kafka uses log compaction to reduce the number of delete events while still enabling consumers to get all of the events.

Procedure

  1. In the terminal that is running the MySQL command line client, run the following statement:

    mysql> DELETE FROM customers WHERE id=1004;
    Query OK, 1 row affected (0.00 sec)
    Note

    If the above command fails with a foreign key constraint violation, then you must remove the reference of the customer address from the addresses table using the following statement:

    mysql> DELETE FROM addresses WHERE customer_id=1004;
  2. Switch to the terminal running kafka-console-consumer to see two new events.

    By deleting a row in the customers table, the Debezium MySQL connector generated two new events.

  3. Review the key and value for the first new event.

    Here are the details of the key for the first new event (formatted for readability):

    {
      "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Key"
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "int32",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": 1004
      }
    }

    This key is the same as the key in the previous two events you looked at.

    Here is the value of the first new event (formatted for readability):

    {
      "schema": {...},
      "payload": {
        "before": {  1
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": null,  2
        "source": {  3
          "name": "2.7.3.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501558,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 725,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "d",  4
        "ts_ms": 1486501558315,  5
        "ts_us": 1486501558315901,  6
        "ts_ns": 1486501558315901687  7
      }
    }
    Table 4.2. Descriptions of fields in an event value
    ItemDescription

    1

    The before field now has the state of the row that was deleted with the database commit.

    2

    The after field is null because the row no longer exists.

    3

    The source field structure has many of the same values as before, except that the values of the ts_sec and pos fields have changed. In some circumstances, the file value might also change.

    4

    he op field value is now d, indicating that the record was deleted.

    5

    The ts_ms, ts_us, and ts_ns fields show timestamps that indicate when Debezium processed the event.

    Thus, this event provides a consumer with the information that it needs to process the removal of the row. The old values are also provided, because some consumers might require them to properly handle the removal.

  4. Review the key and value for the second new event.

    Here is the key for the second new event (formatted for readability):

      {
        "schema": {
          "type": "struct",
          "name": "dbserver1.inventory.customers.Key"
          "optional": false,
          "fields": [
            {
              "field": "id",
              "type": "int32",
              "optional": false
            }
          ]
        },
        "payload": {
          "id": 1004
        }
      }

    Once again, this key is exactly the same key as in the previous three events you looked at.

    Here is the value of that same event (formatted for readability):

    {
      "schema": null,
      "payload": null
    }

    If Kafka is set up to be log compacted, it will remove older messages from the topic if there is at least one message later in the topic with same key. This last event is called a tombstone event, because it has a key and an empty value. This means that Kafka will remove all prior messages with the same key. Even though the prior messages will be removed, the tombstone event means that consumers can still read the topic from the beginning and not miss any events.

4.4. Restarting the Kafka Connect service

Now that you have seen how the Debezium MySQL connector captures create, update, and delete events, you will now see how it can capture change events even when it is not running.

The Kafka Connect service automatically manages tasks for its registered connectors. Therefore, if it goes offline, when it restarts, it will start any non-running tasks. This means that even if Debezium is not running, it can still report changes in a database.

In this procedure, you will stop Kafka Connect, change some data in the database, and then restart Kafka Connect to see the change events.

Procedure

  1. Stop the Kafka Connect service.

    1. Open the configuration for the Kafka Connect deployment:

      $ oc edit deployment/my-connect-cluster-connect

      The deployment configuration opens:

      apiVersion: apps.openshift.io/v1
      kind: Deployment
      metadata:
        ...
      spec:
        replicas: 1
      ...
    2. Change the spec.replicas value to 0.
    3. Save the configuration.
    4. Verify that the Kafka Connect service has stopped.

      This command shows that the Kafka Connect service is completed, and that no pods are running:

      $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
      NAME                                          READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-1-dxcs9            0/1     Completed   0          7h
  2. While the Kafka Connect service is down, switch to the terminal running the MySQL client, and add a new record to the database.

    mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
  3. Restart the Kafka Connect service.

    1. Open the deployment configuration for the Kafka Connect service.

      $ oc edit deployment/my-connect-cluster-connect

      The deployment configuration opens:

      apiVersion: apps.openshift.io/v1
      kind: Deployment
      metadata:
        ...
      spec:
        replicas: 0
      ...
    2. Change the spec.replicas value to 1.
    3. Save the deployment configuration.
    4. Verify that the Kafka Connect service has restarted.

      This command shows that the Kafka Connect service is running, and that the pod is ready:

      $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
      NAME                                          READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-2-q9kkl            1/1     Running     0          74s
  4. Switch to the terminal that is running kafka-console-consumer.sh. New events pop up as they arrive.
  5. Examine the record that you created when Kafka Connect was offline (formatted for readability):

    {
      ...
      "payload":{
        "id":1005
      }
    }
    {
      ...
      "payload":{
        "before":null,
        "after":{
           "id":1005,
           "first_name":"Sarah",
           "last_name":"Thompson",
           "email":"kitt@acme.com"
        },
        "source":{
           "version":"2.7.3.Final",
           "connector":"mysql",
           "name":"dbserver1",
           "ts_ms":1582581502000,
           "snapshot":"false",
           "db":"inventory",
           "table":"customers",
           "server_id":223344,
           "gtid":null,
           "file":"mysql-bin.000004",
           "pos":364,
           "row":0,
           "thread":5,
           "query":null
        },
        "op":"c",
        "ts_ms":1582581502317
      }
    }
Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.