Chapter 8. Integrating external systems with Debezium
Debezium provides a set of optional integrations that can enrich, standardize, or distribute the change events that it emits, adapting them for consumption by external systems and platforms.
Integrations operate after the connector has produced an event, but before the event is delivered to target downstream systems. By enabling an integration, you can adapt Debezium change data to meet the conventions, protocols, or observability requirements of downstream platforms.
Integrations differ from single‑message transformations (SMTs) and post‑processors. SMTs modify Kafka messages as they pass through Kafka Connect, and post‑processors act on messages from within the Debezium context, before they are handed off to the messaging runtime. By contrast, integrations produce additional event formats or metadata that conform to the standards of a target downstream system. Integrations enhance Debezium events so that they can be consumed more easily by tools, platforms, and services that rely on common interfaces.
Each integration is optional and you can enable and configure it independently. Depending on your environment, you can activate one or more integrations to support standardized event formats, improve metadata visibility, or provide compatibility with external observability and governance tools. You can combine integrations with SMTs and post‑processors, to adapt change events to suit the needs of specific consumers across your data ecosystem.
Red Hat build of Debezium provides the following integrations:
- CloudEvents converter (Technology Preview)
- Enables a Debezium connector to emit change event records that conform to the CloudEvents specification. The converter modifies Debezium change events to comply with the CloudEvents specification format. The converted events provide greater interoperability with event‑driven platforms, serverless environments, and systems that are designed for CloudEvents‑compliant payloads.
- OpenLineage integration (Technology Preview)
- Emits OpenLineage‑compatible metadata that describes the flow of change events through Debezium to enable lineage‑aware platforms to automatically trace the movement of data and assess downstream impact as source data evolves.
8.1. Emitting Debezium change event records in CloudEvents format
CloudEvents is a specification for describing event data in a common way. Its aim is to provide interoperability across services, platforms and systems. Debezium enables you to configure a Db2, MongoDB, MySQL, Oracle, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.
Emitting change event records in CloudEvents format is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see Technology Preview Features Support Scope.
The CloudEvents specification defines:
- A set of standardized event attributes
- Rules for defining custom attributes
- Encoding rules for mapping event formats to serialized representations such as JSON or Apache Avro
- Protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP
To configure a Debezium connector to emit change event records that conform to the CloudEvents specification, Debezium provides the io.debezium.converters.CloudEventsConverter, which is a Kafka Connect message converter.
Currently, only structured mapping mode can be used. The CloudEvents change event envelope can be JSON or Avro, and you can use JSON or Avro as the data format for each envelope type.
8.1.1. Example Debezium change event records in CloudEvents format
The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the data format.
{
"id" : "name:test_server;lsn:29274832;txId:565",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.connector.postgresql.DataChangeEvent",
"time" : "2020-01-13T13:55:39.738Z",
"datacontenttype" : "application/json",
"iodebeziumop" : "r",
"iodebeziumversion" : "3.4.3.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578923739738",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumlsn" : "29274832",
"iodebeziumxmin" : null,
"iodebeziumtxid": "565",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : {
"before" : null,
"after" : {
"pk" : 1,
"name" : "Bob"
}
}
}
| Item | Description |
|---|---|
| 1 | Unique ID that the connector generates for the change event based on the change event’s content. |
| 2 | The source of the event, which is the logical name of the database as specified by the topic.prefix property in the connector configuration. |
| 3 | The CloudEvents specification version. |
| 4 | Connector type that generated the change event. The format of this field is io.debezium.connector.CONNECTOR_TYPE.DataChangeEvent. |
| 5 | Time of the change in the source database. |
| 6 | Describes the content type of the data attribute, which is JSON in this example. |
| 7 | An operation identifier. Possible values are r for read, c for create, u for update, or d for delete. |
| 8 | All source attributes that are known from Debezium change events are mapped to CloudEvents extension attributes by using the iodebezium prefix for the attribute name. |
| 9 | When enabled in the connector, each transaction attribute that is known from Debezium change events is mapped to a CloudEvents extension attribute by using the iodebeziumtx prefix for the attribute name. |
| 10 | The actual data change. Depending on the operation and the connector, the data might contain before, after, or patch fields. |
The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like. In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the data format.
{
"id" : "name:test_server;lsn:33227720;txId:578",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.connector.postgresql.DataChangeEvent",
"time" : "2020-01-13T14:04:18.597Z",
"datacontenttype" : "application/avro",
"dataschema" : "http://my-registry/schemas/ids/1",
"iodebeziumop" : "r",
"iodebeziumversion" : "3.4.3.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578924258597",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumlsn" : "33227720",
"iodebeziumxmin" : null,
"iodebeziumtxid": "578",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : "AAAAAAEAAgICAg=="
}
| Item | Description |
|---|---|
| 1 | Indicates that the data attribute contains Avro binary data. |
| 2 | URI of the schema to which the Avro data adheres. |
| 3 | The data attribute contains base64-encoded Avro binary data. |
It is also possible to use Avro for the envelope as well as the data attribute.
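For reference, a configuration that uses Avro for both the envelope and the data attribute might look like the following sketch. The registry URL is a placeholder; substitute the address of your own schema registry:

```json
{
  "value.converter": "io.debezium.converters.CloudEventsConverter",
  "value.converter.serializer.type": "avro",
  "value.converter.data.serializer.type": "avro",
  "value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
}
```

With this configuration, both the CloudEvents envelope and the data attribute are serialized as Avro, so consumers must resolve both schemas through the registry.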
8.1.2. Example of configuring Debezium CloudEvents converter
Configure io.debezium.converters.CloudEventsConverter in your Debezium connector configuration. The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:
- Use JSON as the envelope.
- Use the schema registry at http://my-registry/schemas/ids/1 to serialize the data attribute as binary Avro data.
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
| Item | Description |
|---|---|
| 1 | The avro.schema.registry.url option specifies the location of the schema registry that the converter uses to serialize the data attribute in Avro format. |
The CloudEvents converter converts Kafka record values. In the same connector configuration, you can specify key.converter if you want to operate on record keys. For example, you might specify StringConverter, LongConverter, JsonConverter, or AvroConverter.
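For example, to keep record keys as plain JSON while converting record values to CloudEvents, you might combine converters as in the following sketch (a minimal illustration, not a complete connector configuration):

```json
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "io.debezium.converters.CloudEventsConverter",
  "value.converter.serializer.type": "json"
}
```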
8.1.3. Configuration of sources of metadata and some CloudEvents fields
By default, the metadata.source property consists of five parts, as seen in the following example:
"value,id:generate,type:generate,traceparent:header,dataSchemaName:generate"
The first part specifies the source for retrieving a record’s metadata; the permitted values are value and header. The next parts specify how the converter populates values for the following CloudEvents fields and data schema name:
- id
- type
- traceparent
- dataSchemaName (the name under which the schema is registered in the Schema Registry)
The converter can use one of the following methods to populate each field:
- generate - The converter generates a value for the field.
- header - The converter obtains the value for the field from a message header.
The value of the traceparent CloudEvents field can be retrieved only from a header.
Obtaining record metadata
To construct a CloudEvent, the converter requires source, operation, and transaction metadata. Generally, the converter can retrieve the metadata from a record’s value. But in some cases, before the converter receives a record, the record might be processed in such a way that metadata is not present in its value, for example, after the record is processed by the Outbox Event Router SMT. To preserve the required metadata, you can use the following approach to pass the metadata in the record headers.
Procedure
- Implement a mechanism for recording the metadata in the record’s headers before the record reaches the converter, for example, by using the HeaderFrom SMT.
- Set the value of the converter’s metadata.source property to header.
The following example shows the configuration for a connector that uses the Outbox Event Router SMT, and the HeaderFrom SMT:
...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
To use the HeaderFrom transformation, it might be necessary to filter tombstone and heartbeat messages.
The header value of the metadata.source property is a global setting. As a result, even if you omit parts of a property’s value, such as the id and type sources, the converter generates header values for the omitted parts.
Obtaining CloudEvent metadata
By default, the CloudEvents converter automatically generates values for the id and type fields of a CloudEvent, and generates the schema name for its data field. The traceparent CloudEvents field is included in a message only if the opentelemetry.tracing.attributes.enable setting is true. You can customize the way that the converter populates these fields by changing the defaults and specifying the fields' values in the appropriate headers. For example:
"value.converter.metadata.source": "value,id:header,type:header,traceparent:header,dataSchemaName:header"
With the preceding configuration in effect, you could configure upstream functions to add id, type, traceparent, and dataSchemaName headers with the values that you want to pass to the CloudEvents converter.
If you want to provide a value only for the id header, use:
"value.converter.metadata.source": "value,id:header,type:generate,traceparent:header,dataSchemaName:generate"
To configure the converter to obtain id, type, traceparent, and dataSchemaName metadata from headers, use the following short syntax:
"value.converter.metadata.source": "header"
To enable the converter to retrieve the data schema name from a header, you must set schema.data.name.source.header.enable to true.
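Putting these settings together, the following sketch configures the converter to take all metadata, including the data schema name, from headers. It assumes that upstream SMTs populate the required headers, as described in the procedure above:

```json
{
  "value.converter": "io.debezium.converters.CloudEventsConverter",
  "value.converter.metadata.source": "header",
  "value.converter.schema.data.name.source.header.enable": "true"
}
```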
8.1.4. Debezium CloudEvents converter configuration options
When you configure a Debezium connector to use the CloudEvents converter, you can specify the following options:
| Option | Default | Description |
|---|---|---|
| serializer.type | json | The encoding type to use for the CloudEvents envelope structure. The value can be json or avro. |
| data.serializer.type | json | The encoding type to use for the data attribute. The value can be json or avro. |
| json. ... | N/A | Any configuration options to be passed through to the underlying converter when using JSON. The json. prefix is removed. |
| avro. ... | N/A | Any configuration options to be passed through to the underlying converter when using Avro. The avro. prefix is removed. |
| schema.name.adjustment.mode | none | Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The value can be none or avro. |
| schema.cloudevents.name | none | Specifies the CloudEvents schema name under which the schema is registered in a Schema Registry. The setting is ignored when schema.data.name.source.header.enable is set to true. |
| schema.data.name.source.header.enable | false | Specifies whether the converter can retrieve the schema name of the CloudEvents data attribute from a header. |
| opentelemetry.tracing.attributes.enable | false | Specifies whether the converter includes OpenTelemetry tracing attributes when it generates a cloud event. The value can be true or false. |
| extension.attributes.enable | true | Specifies whether the converter includes extension attributes when it generates a cloud event. The value can be true or false. |
| metadata.source | value,id:generate,type:generate,traceparent:header,dataSchemaName:generate | A comma-separated list that specifies the sources from which the converter retrieves metadata values (source, operation, transaction) for a record, and how it populates the id, type, and traceparent CloudEvents fields and the data schema name. For configuration examples, see Configuration of sources of metadata and some CloudEvents fields. |
8.2. Integrating OpenLineage with Debezium
Debezium provides built-in integration with OpenLineage to automatically track data lineage for Change Data Capture (CDC) operations. The OpenLineage integration provides you with comprehensive visibility into the data flow and transformations that you use in your data pipeline.
The Debezium integration with OpenLineage is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.
For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.
8.2.1. About Data Lineage and OpenLineage
Data lineage tracks the flow of data through various systems, transformations, and processes. This information provides you with visibility into where data originates, how it moves, and what dependencies exist in the data pipeline. Insight into data lineage is crucial for the following activities:
- Data governance and compliance
- Impact analysis when making changes
- Debugging data quality issues
- Understanding data dependencies
OpenLineage is an open standard for data lineage that provides a unified way to collect and track lineage metadata across multiple data systems. The specification defines a common model for describing datasets, jobs, and runs, simplifying the process of building comprehensive lineage graphs across heterogeneous data infrastructures.
For more information, see the OpenLineage website and documentation.
8.2.2. OpenLineage run events for connector lifecycle tracking
Debezium emits OpenLineage run events to track connector status throughout its lifecycle, from initialization through shutdown. These events provide visibility into connector health and enable real-time monitoring of data pipeline operations.
The connector emits run events after the following status changes:
- START
- Reports connector initialization.
- RUNNING
- Emitted periodically during normal streaming operations and while individual tables are processed. These periodic events ensure continuous lineage tracking for long-running streaming CDC operations.
- COMPLETE
- Reports that the connector shut down gracefully.
- FAIL
- Reports that the connector encountered an error.
8.2.3. Deployment types
The Debezium OpenLineage integration is available for the following deployment types:
- Kafka Connect
- You run Debezium connectors as Kafka Connect plugins.
- Debezium Server
- You run Debezium in standalone server mode.
The same OpenLineage event model and features are available across the deployment types, but the procedures for configuring the integration and installing dependencies differ.
8.2.4. How Debezium integrates with OpenLineage
To integrate with OpenLineage, Debezium maps events in its lifecycle to artifacts in the OpenLineage data model.
8.2.4.1. OpenLineage job mapping
The Debezium connector is mapped to an OpenLineage Job, which includes the following elements:
- Name
- A job inherits its name from the Debezium topic.prefix, combined with the task ID (for example, inventory.0).
- Namespace
- Inherited from openlineage.integration.job.namespace, if specified; otherwise defaults to the value of topic.prefix.
- The version of the Debezium connector that is running and generating lineage events.
- Complete connector configuration
- All connector configuration properties, enabling full reproducibility and debugging of the data pipeline.
- Job metadata
- Description, tags, and owners.
8.2.4.2. Dataset mapping for source connectors
You can configure the following dataset mappings:
- Input datasets
Represent the database tables from which Debezium is configured to capture changes. The OpenLineage integration automatically creates input datasets based on the connector configuration. The integration applies the following principles when it creates dataset mappings:
- Each table that the connector monitors becomes an input dataset.
- Each dataset captures schema information for the corresponding source table, including the name and data type of each column.
- DDL changes in the source table are reflected dynamically in the dataset schema.
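For example, an input dataset for a PostgreSQL table might carry a schema facet along the following lines. This is an illustrative shape based on the OpenLineage schema dataset facet; the namespace, table name, and column types are hypothetical, not verbatim Debezium output:

```json
{
  "namespace": "postgres://postgres:5432",
  "name": "inventory.products",
  "facets": {
    "schema": {
      "fields": [
        { "name": "id", "type": "int4" },
        { "name": "name", "type": "varchar" }
      ]
    }
  }
}
```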
- Output datasets
Represent the Kafka topics where CDC events are written. For Kafka Connect deployments, output datasets are created when you apply the OpenLineage single message transformation (SMT). For Debezium Server deployments, output datasets are captured automatically.
Output dataset mappings are created according to the following principles:
- Each Kafka topic that the connector produces becomes an output dataset.
- The output dataset captures the complete CDC event structure, including metadata fields.
- The name of the dataset is based on the connector’s topic prefix configuration.
8.2.4.3. Dataset mapping for sink connectors
For sink connectors, the data flow is reversed compared to source connectors.
- Input datasets
Represent the Kafka topics that the sink connector reads from. These topics typically contain CDC events from Debezium source connectors. The following principles apply when defining input datasets:
- Each Kafka topic that the sink connector consumes represents an input dataset.
- The input dataset specifies the Kafka topic schema and metadata.
- The namespace format follows kafka://bootstrap-server:port, where the bootstrap server is specified by the openlineage.integration.dataset.kafka.bootstrap.servers property.
- Output datasets
Represent the target databases or collections where the sink connector writes data.
The following principles apply when defining output dataset mappings:
- Each target database table or collection represents an output dataset.
- The output dataset specifies the schema information for the target database.
- The namespace format depends on the target database system. For more information, see Dataset namespace formatting.
8.2.4.4. Sink connector availability in Kafka Connect vs. Debezium Server
Different sink connectors are available for OpenLineage integration depending on your deployment platform.
In a Kafka Connect environment, you can configure the following Debezium sink connectors to use the OpenLineage integration:
- MongoDB sink connector
- Writes CDC events to MongoDB collections.
- JDBC sink connector
- Writes CDC events to relational database tables.
In a Debezium Server environment, you can use the OpenLineage integration only with the Kafka sink. The MongoDB sink and JDBC sink connectors are not available for use with Debezium Server.
8.2.5. Prepare to deploy the Debezium OpenLineage integration
8.2.5.1. Required Dependencies
The OpenLineage integration requires several JAR files that are bundled together in the debezium-openlineage-core-libs archive.
8.2.5.2. Deploy the Debezium OpenLineage integration on Kafka Connect
Before you can use Debezium with OpenLineage in Kafka Connect, you must obtain the required dependencies.
Procedure
- Download the OpenLineage core archive.
- Extract the contents of the archive into the Debezium plug-in directories in your Kafka Connect environment.
8.2.5.3. Deploy the Debezium OpenLineage integration on Debezium Server
Before you can use Debezium Server with OpenLineage, you must obtain the required dependencies.
Procedure
- Download the OpenLineage core archive.
- Extract the contents of the archive.
- Copy all JAR files to the /debezium/lib directory in your Debezium Server installation.
8.2.6. Configure the OpenLineage integration
To enable the integration, you must configure the Debezium connector and the OpenLineage client. The configuration approach differs between Kafka Connect and Debezium Server deployments.
8.2.6.1. Debezium configuration for enabling OpenLineage in a Kafka Connect environment
To enable Debezium to integrate with OpenLineage in Kafka Connect, add properties to your connector configuration, as shown in the following example:
# Enable OpenLineage integration
openlineage.integration.enabled=true
# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml
# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer
8.2.6.2. Debezium configuration for enabling OpenLineage in a Debezium Server environment
To enable Debezium Server to integrate with OpenLineage, add OpenLineage properties to the application.properties file, as shown in the following example. OpenLineage properties use the debezium.source. prefix.
# Enable OpenLineage integration
debezium.source.openlineage.integration.enabled=true
# Path to OpenLineage configuration file
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
# Job metadata (optional but recommended)
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=data-engineering
debezium.source.openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer
8.2.6.3. Configuring the OpenLineage client
Create an openlineage.yml file to configure the OpenLineage client. The openlineage.yml configuration file is used in both Kafka Connect and Debezium Server deployments. Use the following example as a guide:
transport:
type: http
url: http://your-openlineage-server:5000
endpoint: /api/v1/lineage
auth:
type: api_key
api_key: your-api-key
# Alternative: Console transport for testing
# transport:
# type: console
For details about OpenLineage client configuration options, refer to the OpenLineage client documentation.
8.2.7. Debezium OpenLineage configuration properties
The following table lists the OpenLineage configuration properties for both deployment types.
For Debezium Server, add the debezium.source. prefix to all property names (for example, debezium.source.openlineage.integration.enabled).
| Property (Kafka Connect) | Description | Required | Default |
|---|---|---|---|
| openlineage.integration.enabled | Enables and disables the OpenLineage integration. | Yes | false |
| openlineage.integration.config.file.path | Path to the OpenLineage YAML configuration file. | Yes | No default value |
| openlineage.integration.job.namespace | Namespace used for the job. | No | Value of topic.prefix |
| openlineage.integration.job.description | Human-readable job description. | No | No default value |
| openlineage.integration.job.tags | Comma-separated list of key-value tags. | No | No default value |
| openlineage.integration.job.owners | Comma-separated list of name-role ownership entries. | No | No default value |
| openlineage.integration.dataset.kafka.bootstrap.servers | Kafka bootstrap servers used to retrieve Kafka topic metadata. For source connectors, if you do not specify a value, the value of schema.history.internal.kafka.bootstrap.servers is used. For sink connectors, you must specify a value for this property. | Yes (for sink connectors) | Value of schema.history.internal.kafka.bootstrap.servers |
8.2.8. Job metadata enrichment
To enhance the utility of the lineage data that it emits, you can add metadata that provides tags and owners information for each job. Tags are custom, arbitrary metadata that you can attach to jobs to provide additional context to help classify the job. Owners indicate who is responsible for the job. Use the following formats to specify tags and owners metadata in the OpenLineage configuration properties:
- Tags list format
- Specify tags as a comma-separated list of key-value pairs, as shown in the following example:
openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high
- Owners list format
- Specify owners as a comma-separated list of name-role pairs, as shown in the following example:
openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner
8.2.9. Configure capture of output dataset lineage
Debezium can capture output dataset lineage to track the Kafka topics or output sink where Debezium sends CDC events. The method that you use to configure capture of the output dataset lineage depends on whether you run the integration in a Kafka Connect or Debezium Server environment.
8.2.9.1. Configure capture of output dataset lineage in a Kafka Connect environment
To capture output dataset lineage in Kafka Connect, configure Debezium to use the OpenLineage Single Message Transform (SMT).
The SMT captures detailed schema information about change events that Debezium writes to Kafka topics. The transformation captures schema data that includes the following items:
- Event structure (before, after, source, transaction metadata)
- Field types and nested structures
- Topic names and namespaces
Procedure
Add the following entries to your Debezium Kafka Connect configuration to enable the OpenLineage SMT:
# Add OpenLineage transform
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage
# Required: Configure schema history with Kafka bootstrap servers
schema.history.internal.kafka.bootstrap.servers=your-kafka:9092
8.2.9.2. Configure capture of output dataset lineage in a Debezium Server environment
For Debezium Server deployments, output dataset lineage is automatically captured when OpenLineage integration is enabled.
Procedure
- No additional configuration or transformation is required, because Debezium Server has full control over the output records.
8.2.10. Debezium OpenLineage complete configuration examples
The following examples show complete configurations for enabling OpenLineage integration in both Kafka Connect and Debezium Server.
8.2.10.1. Debezium OpenLineage integration complete Kafka Connect configuration example
The following example shows a complete configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Kafka Connect:
{
"name": "inventory-connector-postgres",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"topic.prefix": "inventory",
"snapshot.mode": "initial",
"slot.name": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"openlineage.integration.enabled": "true",
"openlineage.integration.config.file.path": "/kafka/openlineage.yml",
"openlineage.integration.job.description": "CDC connector for inventory database",
"openlineage.integration.job.tags": "env=production,team=data-platform,database=postgresql",
"openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
"transforms": "openlineage",
"transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
}
}
8.2.10.2. Debezium Server OpenLineage integration complete configuration example
The following example shows a complete application.properties configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Debezium Server with a Kafka sink:
# Sink configuration (Kafka)
debezium.sink.type=kafka
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.bootstrap.servers=kafka:9092
# Source connector configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory
# OpenLineage integration
debezium.source.openlineage.integration.enabled=true
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=cdc
debezium.source.openlineage.integration.job.owners=Mario=maintainer,John Doe=Data scientist
# Logging configuration (optional)
quarkus.log.console.json=false
8.2.10.3. Debezium OpenLineage integration complete MongoDB sink connector configuration example
The following example shows a complete configuration for enabling the MongoDB sink connector to integrate with OpenLineage in Kafka Connect:
{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://admin:admin@mongodb:27017",
    "topics": "inventory.inventory.products",
    "sink.database": "inventory2",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for MongoDB",
    "openlineage.integration.job.tags": "env=prod,team=cdc",
    "openlineage.integration.job.owners": "Mario=maintainer,John Doe=Data scientist",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}
For sink connectors, the openlineage.integration.dataset.kafka.bootstrap.servers property is required so that the integration can retrieve input dataset metadata from Kafka topics. Unlike source connectors, sink connectors do not have direct access to Kafka topic metadata through the Kafka Connect framework, and must connect to Kafka explicitly to retrieve schema information.
8.2.10.4. Debezium OpenLineage integration complete JDBC sink connector configuration example
The following example shows a complete configuration for enabling the JDBC sink connector to integrate with OpenLineage in Kafka Connect:
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "topics": "inventory.inventory.customers",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for JDBC",
    "openlineage.integration.job.tags": "env=prod,team=data-engineering",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}
8.2.11. Dataset namespace formatting
Dataset namespaces identify the location and system of origin for input and output datasets. The namespace format varies by database system and connector type (source or sink), following the OpenLineage dataset naming specification.
8.2.11.1. Input dataset namespaces
Input dataset namespaces identify the source database and follow a format specific to each database system. The following examples illustrate how Debezium formats input dataset namespaces for different database systems.
- PostgreSQL input dataset (for source connectors)
  - Namespace: postgres://hostname:port
  - Name: schema.table
  - Schema: Column names and types from the source table
- Kafka input dataset (for sink connectors)
  - Namespace: kafka://kafka-broker:9092
  - Name: inventory.inventory.products
  - Schema: CDC event structure from the source connector
The exact namespace format depends on your database system and follows the OpenLineage specification for dataset naming.
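As an illustration, an input dataset for a PostgreSQL source might appear as follows inside an emitted OpenLineage run event. The snippet is a sketch with placeholder host, table, and column names, not output captured from a running connector:

```json
{
  "inputs": [
    {
      "namespace": "postgres://postgres:5432",
      "name": "inventory.products",
      "facets": {
        "schema": {
          "fields": [
            { "name": "id", "type": "int4" },
            { "name": "name", "type": "varchar" }
          ]
        }
      }
    }
  ]
}
```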
8.2.11.2. Output dataset namespaces for source connectors
Output dataset namespaces identify the Kafka topics where CDC events are written. The following example shows how Debezium formats the output dataset namespace for source connectors.
- Kafka output dataset (for source connectors)
  - Namespace: kafka://bootstrap-server:port
  - Name: topic-prefix.schema.table
  - Schema: Complete CDC event structure including metadata fields
8.2.11.3. Output dataset namespaces for sink connectors
Output dataset namespaces identify the target databases where sink connectors write data. The following examples show how Debezium sink connectors format output dataset namespaces for different database systems.
- MongoDB output dataset
  - Namespace: mongodb://mongodb-host:27017
  - Name: database.collection
  - Schema: Target collection schema
- JDBC output dataset (PostgreSQL)
  - Namespace: postgres://postgres-host:5432
  - Name: schema.table
  - Schema: Target table schema
8.2.12. Monitoring and troubleshooting the Debezium OpenLineage integration
8.2.12.1. Verifying the integration
To verify that the OpenLineage integration is working correctly, complete the following steps:
Procedure
- Check the connector logs for OpenLineage-related messages.
- If you configured HTTP transport, verify that events appear in your OpenLineage backend.
For testing purposes, you can configure console transport to view events directly in the logs, as shown in the following example:
transport:
  type: console
8.2.12.2. Common issues
- Integration not working
  - Verify that openlineage.integration.enabled is set to true.
  - Check that the path to the OpenLineage configuration file that is specified in the connector configuration is correct, and that Debezium can access the target file.
  - Ensure that the YAML in the OpenLineage configuration file is valid.
  - Verify that all required JAR dependencies are present on the classpath.
- Missing output datasets
  - Verify that you configured the connector to use the OpenLineage transformation.
  - Check that you set the schema.history.internal.kafka.bootstrap.servers property in the connector configuration.
- Connection issues
  - Verify that you specified the correct server URL and authentication information in the OpenLineage client configuration.
  - Check the network connectivity between Debezium and the OpenLineage server.
- Dependency issues
  - Ensure that all required JAR files are present and that their versions are compatible.
  - Check for classpath conflicts with existing dependencies.
- Missing input datasets for sink connectors
  - Verify that the openlineage.integration.dataset.kafka.bootstrap.servers property is configured.
  - Verify that the connector has access to the Kafka bootstrap servers.
  - Verify that the Kafka topics specified in the topics configuration exist and that the connector has access to them.
8.2.12.3. Error events
When the connector fails, check for the following items in OpenLineage FAIL events:
- Error messages
- Stack traces
- Connector configuration for debugging
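A FAIL event carries this information in its run facets. The following sketch shows the general shape of such an event; the job name, error message, and truncated stack trace are placeholders for illustration, and the exact facet contents depend on your connector and failure:

```json
{
  "eventType": "FAIL",
  "job": {
    "namespace": "default",
    "name": "tutorial.0"
  },
  "run": {
    "facets": {
      "errorMessage": {
        "message": "Connection to database failed",
        "programmingLanguage": "JAVA",
        "stackTrace": "io.debezium.DebeziumException: ..."
      }
    }
  }
}
```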