Chapter 8. Integrating external systems with Debezium


Debezium provides a set of optional integrations that can enrich, standardize, or distribute the change events that it emits, adapting them for consumption by external systems and platforms.

Integrations operate after the connector has produced an event, but before the event is delivered to target downstream systems. By enabling an integration, you can adapt Debezium change data to meet the conventions, protocols, or observability requirements of downstream platforms.

Integrations differ from single-message transformations (SMTs) and post-processors. SMTs modify Kafka messages as they pass through Kafka Connect, and post-processors act on messages from within the Debezium context, before they are handed off to the messaging runtime. By contrast, integrations produce additional event formats or metadata that conform to the standards of a target downstream system. Integrations enhance Debezium events so that they can be consumed more easily by tools, platforms, and services that rely on common interfaces.

Each integration is optional, and you can enable and configure each one independently. Depending on your environment, you can activate one or more integrations to support standardized event formats, improve metadata visibility, or provide compatibility with external observability and governance tools. You can combine integrations with SMTs and post-processors to adapt change events to suit the needs of specific consumers across your data ecosystem.

Red Hat build of Debezium provides the following integrations:

CloudEvents converter (Technology Preview)
Enables a Debezium connector to emit change event records that conform to the CloudEvents specification. The converted events provide greater interoperability with event-driven platforms, serverless environments, and systems that are designed for CloudEvents-compliant payloads.
OpenLineage integration (Technology Preview)
Emits OpenLineage‑compatible metadata that describes the flow of change events through Debezium to enable lineage‑aware platforms to automatically trace the movement of data and assess downstream impact as source data evolves.

8.1. Emitting Debezium change event records in CloudEvents format

CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a Db2, MongoDB, MySQL, Oracle, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.

Important

Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.

The CloudEvents specification defines:

  • A set of standardized event attributes
  • Rules for defining custom attributes
  • Encoding rules for mapping event formats to serialized representations such as JSON or Apache Avro
  • Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP

To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter, which is a Kafka Connect message converter.

Currently, only structured mapping mode can be used. The CloudEvents change event envelope can be JSON or Avro, and you can use JSON or Avro as the data format for each envelope type.

8.1.1. Example Debezium change event records in CloudEvents format

The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.

{
  "id" : "name:test_server;lsn:29274832;txId:565",    (1)
  "source" : "/debezium/postgresql/test_server",    (2)
  "specversion" : "1.0",    (3)
  "type" : "io.debezium.connector.postgresql.DataChangeEvent",    (4)
  "time" : "2020-01-13T13:55:39.738Z",    (5)
  "datacontenttype" : "application/json",    (6)
  "iodebeziumop" : "r",    (7)
  "iodebeziumversion" : "3.4.3.Final",    (8)
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",    (9)
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {    (10)
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
Table 8.1. Descriptions of fields in a CloudEvents change event record
1. Unique ID that the connector generates for the change event based on the change event’s content.
2. The source of the event, which is the logical name of the database as specified by the topic.prefix property in the connector’s configuration.
3. The CloudEvents specification version.
4. Connector type that generated the change event. The format of this field is io.debezium.connector.CONNECTOR_TYPE.DataChangeEvent. Valid values for CONNECTOR_TYPE are db2, mongodb, mysql, oracle, postgresql, or sqlserver.
5. Time of the change in the source database.
6. Describes the content type of the data attribute. Possible values are json, as in this example, or avro.
7. An operation identifier. Possible values are r for read, c for create, u for update, or d for delete.
8. All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using the iodebezium prefix for the attribute name.
9. When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using the iodebeziumtx prefix for the attribute name.
10. The actual data change. Depending on the operation and the connector, the data might contain before, after, or patch fields.

The following example shows another CloudEvents change event record emitted by a PostgreSQL connector. In this example, the connector is again configured to use JSON as the CloudEvents format envelope, but this time it is configured to use Avro for the data format.

{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.connector.postgresql.DataChangeEvent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",    (1)
  "dataschema" : "http://my-registry/schemas/ids/1",    (2)
  "iodebeziumop" : "r",
  "iodebeziumversion" : "3.4.3.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="    (3)
}
Table 8.2. Descriptions of fields in a CloudEvents event record for a connector that uses Avro to format data
1. Indicates that the data attribute contains Avro binary data.
2. URI of the schema to which the Avro data adheres.
3. The data attribute contains base64-encoded Avro binary data.

It is also possible to use Avro for the envelope as well as the data attribute.
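For example, the following sketch, which is based on the converter options that are described later in this chapter, shows one possible configuration for an all-Avro CloudEvents envelope. The registry URL is the same placeholder that the preceding examples use:

...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type": "avro",
"value.converter.data.serializer.type": "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...

If your schema names contain characters that are not valid in Avro, you might also need to set value.converter.schema.name.adjustment.mode to avro.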

8.1.2. Example of configuring Debezium CloudEvents converter

Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:

  • Use JSON as the envelope.
  • Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",        
1

"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
Table 8.3. Description of fields in CloudEvents converter configuration
1. Specifying the serializer.type is optional, because json is the default.

The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.
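For example, a minimal sketch that pairs the CloudEvents value converter with a string key converter might look like the following:

...
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.debezium.converters.CloudEventsConverter"
...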

8.1.3. Configuration of sources of metadata and some CloudEvents fields

By default, the metadata.source property consists of five parts, as seen in the following example:

"value,id:generate,type:generate,traceparent:header,dataSchemaName:generate"

The first part specifies the source for retrieving a record’s metadata; the permitted values are value and header. The next parts specify how the converter populates values for the following CloudEvents fields and data schema name:

  • id
  • type
  • traceparent
  • dataSchemaName (the name under which the schema is registered in the Schema Registry)

The converter can use one of the following methods to populate each field:

generate
The converter generates a value for the field.
header
The converter obtains values for the field from a message header.
Note

The value for the traceparent CloudEvents field can be retrieved only from a header.

Obtaining record metadata

To construct a CloudEvent, the converter requires source, operation, and transaction metadata. Typically, the converter can retrieve this metadata from a record’s value. However, in some cases, before the converter receives a record, the record might be processed in a way that removes metadata from its value, for example, after the record is processed by the Outbox Event Router SMT. To preserve the required metadata, you can use the following approach to pass the metadata in the record headers.

Procedure

  1. Implement a mechanism for recording the metadata in the record’s headers before the record reaches the converter, for example, by using the HeaderFrom SMT.
  2. Set the value of the converter’s metadata.source property to header.

The following example shows the configuration for a connector that uses the Outbox Event Router SMT, and the HeaderFrom SMT:

...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
Note

To use the HeaderFrom transformation, it might be necessary to filter tombstone and heartbeat messages.

The header value of the metadata.source property is a global setting. As a result, even if you omit parts of the property’s value, such as the id and type sources, the converter applies the header source to the omitted parts.

Obtaining CloudEvent metadata

By default, the CloudEvents converter automatically generates values for the id and type fields of a CloudEvent, and generates the schema name for its data field. The traceparent CloudEvents field is included in a message only if the opentelemetry.tracing.attributes.enable setting is true. You can customize the way that the converter populates these fields by changing the defaults and specifying the fields’ values in the appropriate headers. For example:

"value.converter.metadata.source": "value,id:header,type:header,traceparent:header,dataSchemaName:header"

With the preceding configuration in effect, you could configure upstream functions to add id, type, traceparent, and dataSchemaName headers with the values that you want to pass to the CloudEvents converter.

If you want to provide a value only for the id field from a header, use:

"value.converter.metadata.source": "value,id:header,type:generate,traceparent:header,dataSchemaName:generate"

To configure the converter to obtain id, type, traceparent, and dataSchemaName metadata from headers, use the following short syntax:

"value.converter.metadata.source": "header"

To enable the converter to retrieve the data schema name from a header, you must set schema.data.name.source.header.enable to true.
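For example, the following sketch configures the converter to obtain only the data schema name from a header, while the other fields keep their default sources:

...
"value.converter.metadata.source": "value,id:generate,type:generate,traceparent:header,dataSchemaName:header",
"value.converter.schema.data.name.source.header.enable": "true"
...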

8.1.4. Debezium CloudEvents converter configuration options

When you configure a Debezium connector to use the CloudEvents converter, you can specify the following options:

Table 8.4. Descriptions of CloudEvents converter configuration options

serializer.type
Default: json. The encoding type to use for the CloudEvents envelope structure. The value can be json or avro.

data.serializer.type
Default: json. The encoding type to use for the data attribute. The value can be json or avro.

json. ...
No default. Any configuration options to be passed through to the underlying converter when using JSON. The json. prefix is removed.

avro. ...
No default. Any configuration options to be passed through to the underlying converter when using Avro. The avro. prefix is removed. For example, for Avro data, you would specify the avro.schema.registry.url option.

schema.name.adjustment.mode
Default: none. Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The value can be none or avro.

schema.cloudevents.name
No default. Specifies the CloudEvents schema name under which the schema is registered in a Schema Registry. The setting is ignored when serializer.type is json, in which case a record’s value is schemaless. If this property is not specified, the default algorithm is used to generate the schema name: ${serverName}.${databaseName}.CloudEvents.Envelope.

schema.data.name.source.header.enable
Default: false. Specifies whether the converter can retrieve the schema name of the CloudEvents data field from a header. The schema name is obtained from the dataSchemaName parameter that is specified in the metadata.source property.

opentelemetry.tracing.attributes.enable
Default: false. Specifies whether the converter includes OpenTelemetry tracing attributes when it generates a cloud event. The value can be true or false.

extension.attributes.enable
Default: true. Specifies whether the converter includes extension attributes when it generates a cloud event. The value can be true or false.

metadata.source
Default: value,id:generate,type:generate,traceparent:header,dataSchemaName:generate. A comma-separated list that specifies the sources from which the converter retrieves metadata values (source, operation, transaction) for the id, type, and traceparent CloudEvents fields, and for the dataSchemaName parameter, which specifies the name under which the schema is registered in a Schema Registry. The first element in the list is a global setting that specifies the source of the metadata; it can be value or header. The global setting is followed by a set of pairs. The first element in each pair specifies the name of a CloudEvents field (id, type, or traceparent), or the name of a data schema (dataSchemaName). The second element in the pair specifies how the converter populates the value of the field; valid values are generate or header. Separate the values in each pair with a colon, for example:

value,id:header,type:generate,traceparent:header,dataSchemaName:header

For configuration examples, see Configuration of sources of metadata and some CloudEvents fields.

8.2. Integrating OpenLineage with Debezium

Debezium provides built-in integration with OpenLineage to automatically track data lineage for Change Data Capture (CDC) operations. The OpenLineage integration provides you with comprehensive visibility into the data flow and transformations that you use in your data pipeline.

Important

The Debezium integration with OpenLineage is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.

8.2.1. About Data Lineage and OpenLineage

Data lineage tracks the flow of data through various systems, transformations, and processes. This information provides you with visibility into where data originates, how it moves, and what dependencies exist in the data pipeline. Insight into data lineage is crucial for the following activities:

  • Data governance and compliance
  • Impact analysis when making changes
  • Debugging data quality issues
  • Understanding data dependencies

OpenLineage is an open standard for data lineage that provides a unified way to collect and track lineage metadata across multiple data systems. The specification defines a common model for describing datasets, jobs, and runs, simplifying the process of building comprehensive lineage graphs across heterogeneous data infrastructures.

For more information, see the OpenLineage website and documentation.

8.2.2. OpenLineage run events for connector lifecycle tracking

Debezium emits OpenLineage run events to track connector status throughout its lifecycle, from initialization through shutdown. These events provide visibility into connector health and enable real-time monitoring of data pipeline operations.

The connector emits run events after the following status changes:

START
Reports connector initialization.
RUNNING
Emitted periodically during normal streaming operations and while individual tables are processed. These periodic events ensure continuous lineage tracking for long-running streaming CDC operations.
COMPLETE
Reports that the connector shut down gracefully.
FAIL
Reports that the connector encountered an error.
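These run events follow the general JSON structure that the OpenLineage specification defines. The following sketch is illustrative only, with placeholder values for the producer URI, run ID, and job identity; the job and dataset details that Debezium attaches are described in the sections that follow:

{
  "eventType" : "RUNNING",
  "eventTime" : "2024-01-01T12:00:00.000Z",
  "producer" : "https://example.com/debezium",
  "run" : { "runId" : "d290f1ee-6c54-4b01-90e6-d701748f0851" },
  "job" : { "namespace" : "myNamespace", "name" : "inventory.0" },
  "inputs" : [ ],
  "outputs" : [ ]
}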

8.2.3. Deployment types

The Debezium OpenLineage integration is available for the following deployment types:

Kafka Connect
You run Debezium connectors as Kafka Connect plugins.
Debezium Server
You run Debezium in standalone server mode.

The same OpenLineage event model and features are available across both deployment types, but the procedures for configuring the integration and installing dependencies differ.

8.2.4. How Debezium integrates with OpenLineage

To integrate with OpenLineage, Debezium maps events in its lifecycle to artifacts in the OpenLineage data model.

8.2.4.1. OpenLineage job mapping

The Debezium connector is mapped to an OpenLineage Job, which includes the following elements:

Name
A job inherits its name from the Debezium topic.prefix, combined with the task ID (for example, inventory.0).
Namespace
Inherited from openlineage.integration.job.namespace, if specified; otherwise defaults to the value of the topic.prefix.
Debezium connector version
The version of the Debezium connector that is running and generating lineage events.
Complete connector configuration
All connector configuration properties, enabling full reproducibility and debugging of the data pipeline.
Job metadata
Description, tags, and owners.
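Rendered in OpenLineage’s JSON form, the job portion of an emitted event might look like the following sketch. The values are placeholders taken from the configuration examples later in this chapter; documentation and ownership are standard OpenLineage job facets, and facet envelope fields are omitted for brevity:

"job" : {
  "namespace" : "myNamespace",
  "name" : "inventory.0",
  "facets" : {
    "documentation" : { "description" : "CDC connector for products database" },
    "ownership" : { "owners" : [ { "name" : "Alice Smith", "type" : "maintainer" } ] }
  }
}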

8.2.4.2. Dataset mapping for source connectors

You can configure the following dataset mappings:

Input datasets

Represent the database tables from which Debezium is configured to capture changes. The OpenLineage integration automatically creates input datasets based on the connector configuration. The integration applies the following principles when it creates dataset mappings:

  • Each table that the connector monitors becomes an input dataset.
  • Each dataset captures schema information for the corresponding source table, including the name and data type of each column.
  • DDL changes in the source table are reflected dynamically in the dataset schema.
Output datasets

Represent the Kafka topics where CDC events are written. For Kafka Connect deployments, output datasets are created when you apply the OpenLineage single message transformation (SMT). For Debezium Server deployments, output datasets are captured automatically.

Output dataset mappings are created according to the following principles:

  • Each Kafka topic that the connector produces becomes an output dataset.
  • The output dataset captures the complete CDC event structure, including metadata fields.
  • The name of the dataset is based on the connector’s topic prefix configuration.
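For illustration, the input and output datasets for a captured inventory.customers table might be rendered as in the following sketch. Host names and column types are placeholders; the column information is carried by the standard OpenLineage schema facet, and facet envelope fields are omitted for brevity:

"inputs" : [ {
  "namespace" : "postgres://postgres:5432",
  "name" : "inventory.customers",
  "facets" : {
    "schema" : {
      "fields" : [
        { "name" : "id", "type" : "int4" },
        { "name" : "name", "type" : "varchar" }
      ]
    }
  }
} ],
"outputs" : [ {
  "namespace" : "kafka://kafka:9092",
  "name" : "inventory.inventory.customers"
} ]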

8.2.4.3. Dataset mapping for sink connectors

For sink connectors, the data flow is reversed compared to source connectors.

Input datasets

Represent the Kafka topics that the sink connector reads from. These topics typically contain CDC events from Debezium source connectors. The following principles apply when defining input datasets:

  • Each Kafka topic that the sink connector consumes represents an input dataset.
  • The input dataset specifies the Kafka topic schema and metadata.
  • The namespace format follows kafka://bootstrap-server:port, where the bootstrap server is specified by the openlineage.integration.dataset.kafka.bootstrap.servers property.
Output datasets

Represent the target databases or collections to which the sink connector writes data.

The following principles apply when defining output dataset mappings:

  • Each target database table or collection represents an output dataset.
  • The output dataset specifies the schema information for the target database.
  • The namespace format depends on the target database system. For more information, see Dataset namespace formatting.

Different sink connectors are available for OpenLineage integration depending on your deployment platform.

In a Kafka Connect environment, you can configure the following Debezium sink connectors to use the OpenLineage integration:

MongoDB sink connector
Writes CDC events to MongoDB collections.
JDBC sink connector
Writes CDC events to relational database tables.

In a Debezium Server environment, you can use the OpenLineage integration only with the Kafka sink. The MongoDB sink and JDBC sink connectors are not available for use with Debezium Server.

8.2.5. Prepare to deploy the Debezium OpenLineage integration

8.2.5.1. Required dependencies

The OpenLineage integration requires several JAR files that are bundled together in the debezium-openlineage-core-libs archive.

8.2.5.2. Deploy the Debezium OpenLineage integration on Kafka Connect

Before you can use Debezium with OpenLineage in Kafka Connect, you must obtain the required dependencies.

Procedure

  1. Download the OpenLineage core archive.
  2. Extract the contents of the archive into the Debezium plug-in directories in your Kafka Connect environment.

8.2.5.3. Deploy the Debezium OpenLineage integration on Debezium Server

Before you can use Debezium Server with OpenLineage, you must obtain the required dependencies.

Procedure

  1. Download the OpenLineage core archive.
  2. Extract the contents of the archive.
  3. Copy all JAR files to the /debezium/lib directory in your Debezium Server installation.
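The following shell sketch illustrates these steps. The archive file name and target paths are assumptions; adjust them to match your download and installation layout:

# Extract the downloaded archive (the file name shown here is an assumption)
unzip debezium-openlineage-core-libs.zip -d /tmp/debezium-openlineage

# Copy all extracted JAR files into the Debezium Server lib directory
cp /tmp/debezium-openlineage/*.jar /debezium/lib/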

8.2.6. Configure the OpenLineage integration

To enable the integration, you must configure the Debezium connector and the OpenLineage client. The configuration approach differs between Kafka Connect and Debezium Server deployments.

8.2.6.1. Configuring the integration in Kafka Connect

To enable Debezium to integrate with OpenLineage in Kafka Connect, add properties to your connector configuration, as shown in the following example:

# Enable OpenLineage integration
openlineage.integration.enabled=true

# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml

# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer

8.2.6.2. Configuring the integration in Debezium Server

To enable Debezium Server to integrate with OpenLineage, add OpenLineage properties to the application.properties file, as shown in the following example. OpenLineage properties use the debezium.source. prefix.

# Enable OpenLineage integration
debezium.source.openlineage.integration.enabled=true

# Path to OpenLineage configuration file
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml

# Job metadata (optional but recommended)
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=data-engineering
debezium.source.openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer

8.2.6.3. Configuring the OpenLineage client

Create an openlineage.yml file to configure the OpenLineage client. The openlineage.yml configuration file is used in both Kafka Connect and Debezium Server deployments. Use the following example as a guide:

transport:
  type: http
  url: http://your-openlineage-server:5000
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    api_key: your-api-key

# Alternative: Console transport for testing
# transport:
#   type: console

For details about OpenLineage client configuration options, see the OpenLineage client documentation.

8.2.7. Debezium OpenLineage configuration properties

The following table lists the OpenLineage configuration properties for both deployment types.

Note

For Debezium Server, add the debezium.source. prefix to all property names (for example, debezium.source.openlineage.integration.enabled).

openlineage.integration.enabled
Enables or disables the OpenLineage integration. Required. Default: false.

openlineage.integration.config.file.path
Path to the OpenLineage YAML configuration file. Required. No default value.

openlineage.integration.job.namespace
Namespace used for the job. Optional. Defaults to the value of topic.prefix.

openlineage.integration.job.description
Human-readable job description. Optional. No default value.

openlineage.integration.job.tags
Comma-separated list of key-value tags. Optional. No default value.

openlineage.integration.job.owners
Comma-separated list of name-role ownership entries. Optional. No default value.

openlineage.integration.dataset.kafka.bootstrap.servers
Kafka bootstrap servers used to retrieve Kafka topic metadata. For source connectors, this property is optional; if you do not specify a value, the value of schema.history.internal.kafka.bootstrap.servers is used. For sink connectors, this property is required.

8.2.8. Job metadata enrichment

To enhance the utility of the lineage data that it emits, you can add metadata that provides tags and owners information for each job. Tags are custom, arbitrary metadata that you can attach to jobs to provide additional context to help classify the job. Owners indicate who is responsible for the job. Use the following formats to specify tags and owners metadata in the OpenLineage configuration properties:

Tags list format
Specify tags as a comma-separated list of key-value pairs, as shown in the following example:
openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high
Owners list format
Specify owners as a comma-separated list of name-role pairs, as shown in the following example:
openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner

8.2.9. Configure capture of output dataset lineage

Debezium can capture output dataset lineage to track the Kafka topics or output sink where Debezium sends CDC events. The method that you use to configure capture of the output dataset lineage depends on whether you run the integration in a Kafka Connect or Debezium Server environment.

To capture output dataset lineage in Kafka Connect, configure Debezium to use the OpenLineage single message transformation (SMT).

The SMT captures detailed schema information about change events that Debezium writes to Kafka topics. The transformation captures schema data that includes the following items:

  • Event structure (before, after, source, transaction metadata)
  • Field types and nested structures
  • Topic names and namespaces

Procedure

  • Add the following entries to your Debezium Kafka Connect configuration to enable the OpenLineage SMT:

    # Add OpenLineage transform
    transforms=openlineage
    transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage
    
    # Required: Configure schema history with Kafka bootstrap servers
    schema.history.internal.kafka.bootstrap.servers=your-kafka:9092

For Debezium Server deployments, output dataset lineage is automatically captured when OpenLineage integration is enabled.

Procedure

  • No additional configuration or transformation is required, because Debezium Server has full control over the output records.

8.2.10. Debezium OpenLineage complete configuration examples

The following examples show complete configurations for enabling OpenLineage integration in both Kafka Connect and Debezium Server.

The following example shows a complete configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Kafka Connect:

{
  "name": "inventory-connector-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "inventory",
    "snapshot.mode": "initial",
    "slot.name": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "CDC connector for inventory database",
    "openlineage.integration.job.tags": "env=production,team=data-platform,database=postgresql",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "transforms": "openlineage",
    "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
  }
}

The following example shows a complete application.properties configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Debezium Server with a Kafka sink:

# Sink configuration (Kafka)
debezium.sink.type=kafka
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.bootstrap.servers=kafka:9092

# Source connector configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory

# OpenLineage integration
debezium.source.openlineage.integration.enabled=true
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=cdc
debezium.source.openlineage.integration.job.owners=Mario=maintainer,John Doe=Data scientist

# Logging configuration (optional)
quarkus.log.console.json=false

The following example shows a complete configuration for enabling the MongoDB sink connector to integrate with OpenLineage in Kafka Connect:

{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://admin:admin@mongodb:27017",
    "topics": "inventory.inventory.products",
    "sink.database": "inventory2",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for MongoDB",
    "openlineage.integration.job.tags": "env=prod,team=cdc",
    "openlineage.integration.job.owners": "Mario=maintainer,John Doe=Data scientist",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}
Note

For sink connectors, the openlineage.integration.dataset.kafka.bootstrap.servers property is required to retrieve input dataset metadata from Kafka topics. Unlike source connectors, sink connectors do not have direct access to Kafka topic metadata through the Kafka Connect framework and must explicitly connect to retrieve schema information.

The following example shows a complete configuration for enabling the JDBC sink connector to integrate with OpenLineage in Kafka Connect:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "topics": "inventory.inventory.customers",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for JDBC",
    "openlineage.integration.job.tags": "env=prod,team=data-engineering",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}

8.2.11. Dataset namespace formatting

Dataset namespaces identify the location and system of origin for input and output datasets. The namespace format varies by database system and connector type (source or sink), following the OpenLineage dataset naming specification.

8.2.11.1. Input dataset namespaces

Input dataset namespaces identify the source database and follow a format specific to each database system. The following examples illustrate how Debezium formats input dataset namespaces for different database systems.

PostgreSQL input dataset (for source connectors)
  • Namespace: postgres://hostname:port
  • Name: schema.table
  • Schema: Column names and types from the source table
Kafka input dataset (for sink connectors)
  • Namespace: kafka://kafka-broker:9092
  • Name: inventory.inventory.products
  • Schema: CDC event structure from the source connector

The exact namespace format depends on your database system and follows the OpenLineage specification for dataset naming.

8.2.11.2. Output dataset namespaces for source connectors

Output dataset namespaces identify the Kafka topics where CDC events are written. The following example shows how Debezium formats the output dataset namespace for source connectors.

Kafka output dataset (for source connectors)
  • Namespace: kafka://bootstrap-server:port
  • Name: topic-prefix.schema.table
  • Schema: Complete CDC event structure including metadata fields

8.2.11.3. Output dataset namespaces for sink connectors

Output dataset namespaces identify the target databases where sink connectors write data. The following examples show how Debezium sink connectors format output dataset namespaces for different database systems.

MongoDB output dataset
  • Namespace: mongodb://mongodb-host:27017
  • Name: database.collection
  • Schema: Target collection schema
JDBC output dataset (PostgreSQL)
  • Namespace: postgres://postgres-host:5432
  • Name: schema.table
  • Schema: Target table schema

8.2.12. Troubleshooting the OpenLineage integration

8.2.12.1. Verifying the integration

To verify that the OpenLineage integration is working correctly, complete the following steps:

Procedure

  1. Check the connector logs for OpenLineage-related messages.
  2. If you configured HTTP transport, verify that events appear in your OpenLineage backend.
  3. For testing purposes, you can configure console transport to view events directly in the logs, as shown in the following example:

    transport:
      type: console

8.2.12.2. Common issues

Integration not working
  • Verify that openlineage.integration.enabled is set to true.
  • Check that the path to the OpenLineage configuration file that is specified in the connector configuration is correct, and that Debezium can access the target file.
  • Ensure that the YAML in the OpenLineage configuration file is valid.
  • Verify that all required JAR dependencies are present in the classpath.
Missing output datasets
  • Verify that you configured the connector to use the OpenLineage transformation.
  • Check that you set the property schema.history.internal.kafka.bootstrap.servers in the connector configuration.
Connection issues
  • Verify that you specified the correct server URL and authentication information in the OpenLineage client configuration.
  • Check the network connectivity between Debezium and the OpenLineage server.
Dependency issues
  • Ensure that all required JAR files are present and that their versions are compatible.
  • Check for classpath conflicts with existing dependencies.
Missing input datasets for sink connectors
  • Verify that the openlineage.integration.dataset.kafka.bootstrap.servers property is configured.
  • Verify that the connector has access to the Kafka bootstrap servers.
  • Verify that the Kafka topics specified in the topics configuration exist and that the connector has access to them.

8.2.12.3. Error events

When the connector fails, check for the following items in OpenLineage FAIL events:

  • Error messages
  • Stack traces
  • Connector configuration for debugging