
Chapter 8. Using Streams for Apache Kafka with Kafka Connect


Use Kafka Connect to stream data between Kafka and external systems. Kafka Connect provides a framework for moving large amounts of data while maintaining scalability and reliability. Kafka Connect is typically used to integrate Kafka with database, storage, and messaging systems that are external to your Kafka cluster.

Kafka Connect runs in standalone or distributed modes.

Standalone mode
In standalone mode, Kafka Connect runs on a single node. Standalone mode is intended for development and testing.
Distributed mode
In distributed mode, Kafka Connect runs across one or more worker nodes and the workloads are distributed among them. Distributed mode is intended for production.

Kafka Connect uses connector plugins that implement connectivity for different types of external systems. There are two types of connector plugins: sink and source. Sink connectors stream data from Kafka to external systems. Source connectors stream data from external systems into Kafka.

You can also use the Kafka Connect REST API to create, manage, and monitor connector instances.

Connector configuration specifies details such as the source or sink connectors and the Kafka topics to read from or write to. How you manage the configuration depends on whether you are running Kafka Connect in standalone or distributed mode.

  • In standalone mode, you can provide the connector configuration as JSON through the Kafka Connect REST API or you can use properties files to define the configuration.
  • In distributed mode, you can only provide the connector configuration as JSON through the Kafka Connect REST API.

Handling high volumes of messages

You can tune the configuration to handle high volumes of messages. For more information, see Handling high volumes of messages.

8.1. Using Kafka Connect in standalone mode

In Kafka Connect standalone mode, connectors run on the same node as the Kafka Connect worker process, which runs as a single process in a single JVM. This means that the worker process and connectors share the same resources, such as CPU, memory, and disk.

8.1.1. Configuring Kafka Connect in standalone mode

To configure Kafka Connect in standalone mode, edit the config/connect-standalone.properties configuration file. The following options are the most important.

bootstrap.servers
A list of Kafka broker addresses used as bootstrap connections to Kafka. For example, kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092.
key.converter
The class used to convert message keys to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
value.converter
The class used to convert message payloads to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
offset.storage.file.filename
Specifies the file in which the offset data is stored.

Connector plugins open client connections to the Kafka brokers using the bootstrap address. To configure these connections, use the standard Kafka producer and consumer configuration options, adding the producer. or consumer. prefix to each option name.
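Putting these options together, a minimal connect-standalone.properties sketch might look like the following (the broker addresses, offset file location, and the consumer override are illustrative):

```properties
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
# Standard consumer option, prefixed so it applies to the connector's internal clients
consumer.fetch.max.wait.ms=500
```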

8.1.2. Running Kafka Connect in standalone mode

Configure and run Kafka Connect in standalone mode.

Prerequisites

  • Streams for Apache Kafka is installed on each host, and the configuration files are available.
  • You have specified connector configuration in properties files.

    You can also use the Kafka Connect REST API to manage connectors.

Procedure

  1. Edit the /opt/kafka/config/connect-standalone.properties Kafka Connect configuration file and set bootstrap.servers to point to your Kafka brokers. For example:

    bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
  2. Start Kafka Connect with the configuration file and specify one or more connector configurations.

    su - kafka
    /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties
    [connector2.properties ...]
  3. Verify that Kafka Connect is running.

    jcmd | grep ConnectStandalone

8.2. Using Kafka Connect in distributed mode

In distributed mode, Kafka Connect runs as a cluster of worker processes, with each worker running on a separate node. Connectors can run on any worker in the cluster, allowing for greater scalability and fault tolerance. The connectors are managed by the workers, which coordinate with each other to distribute the work and ensure that each connector is running on a single node at any given time.

8.2.1. Configuring Kafka Connect in distributed mode

To configure Kafka Connect in distributed mode, edit the config/connect-distributed.properties configuration file. The following options are the most important.

bootstrap.servers
A list of Kafka broker addresses used as bootstrap connections to Kafka. For example, kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092.
key.converter
The class used to convert message keys to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
value.converter
The class used to convert message payloads to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
group.id
The name of the distributed Kafka Connect cluster. This must be unique and must not conflict with another consumer group ID. The default value is connect-cluster.
config.storage.topic
The Kafka topic used to store connector configurations. The default value is connect-configs.
offset.storage.topic
The Kafka topic used to store offsets. The default value is connect-offsets.
status.storage.topic
The Kafka topic used for worker node statuses. The default value is connect-status.

Streams for Apache Kafka includes an example configuration file for Kafka Connect in distributed mode – see config/connect-distributed.properties in the Streams for Apache Kafka installation directory.

Connector plugins open client connections to the Kafka brokers using the bootstrap address. To configure these connections, use the standard Kafka producer and consumer configuration options, adding the producer. or consumer. prefix to each option name.

8.2.2. Running Kafka Connect in distributed mode

Configure and run Kafka Connect in distributed mode.

Prerequisites

  • Streams for Apache Kafka is installed on each host, and the configuration files are available.

Running the cluster

  1. Edit the /opt/kafka/config/connect-distributed.properties Kafka Connect configuration file on all Kafka Connect worker nodes.

    • Set the bootstrap.servers option to point to your Kafka brokers.
    • Set the group.id option.
    • Set the config.storage.topic option.
    • Set the offset.storage.topic option.
    • Set the status.storage.topic option.

      For example:

      bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
      group.id=my-group-id
      config.storage.topic=my-group-id-configs
      offset.storage.topic=my-group-id-offsets
      status.storage.topic=my-group-id-status
  2. Start the Kafka Connect workers with the /opt/kafka/config/connect-distributed.properties configuration file on all Kafka Connect nodes.

    su - kafka
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  3. Verify that Kafka Connect is running.

    jcmd | grep ConnectDistributed
  4. Use the Kafka Connect REST API to manage connectors.

8.3. Managing connectors

The Kafka Connect REST API provides endpoints for creating, updating, and deleting connectors directly. You can also use the API to check the status of connectors or change logging levels. When you create a connector through the API, you provide the configuration details for the connector as part of the API call.

You can also add and manage connectors as plugins. Plugins are packaged as JAR files that contain the classes that implement the connectors through the Kafka Connect API. To have Kafka Connect run a connector plugin on startup, specify the plugin on the classpath or add it to a plugin path.

In addition to using the Kafka Connect REST API or plugins to manage connectors, you can add connector configuration using properties files when running Kafka Connect in standalone mode. To do this, specify the location of the properties file when starting the Kafka Connect worker process. The properties file contains the configuration details for the connector, including the connector class, source and destination topics, and any required authentication or serialization settings.

8.3.1. Limiting access to the Kafka Connect API

The Kafka Connect REST API can be accessed by anyone who has authenticated access and knows the endpoint URL, which includes the hostname/IP address and port number. It is crucial to restrict access to the Kafka Connect API only to trusted users to prevent unauthorized actions and potential security issues.

For improved security, we recommend configuring the following properties for the Kafka Connect API:

  • (Kafka 3.4 or later) org.apache.kafka.disallowed.login.modules to specifically exclude insecure login modules
  • connector.client.config.override.policy set to NONE to prevent connector configurations from overriding the Kafka Connect configuration and the consumers and producers it uses
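A sketch of applying these recommendations, under two assumptions: that the disallowed login modules list is supplied as a JVM system property through the KAFKA_OPTS environment variable before starting the worker, and that the module class name shown is illustrative:

```shell
# Exclude an insecure JAAS login module (Kafka 3.4 or later).
# The module class name here is illustrative.
export KAFKA_OPTS="-Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule"
echo "$KAFKA_OPTS"

# In the worker properties file (connect-standalone.properties or
# connect-distributed.properties), disallow client config overrides:
#   connector.client.config.override.policy=None
```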

8.3.2. Configuring connectors

Use the Kafka Connect REST API or properties files to create, manage, and monitor connector instances. You can use the REST API when using Kafka Connect in standalone or distributed mode. You can use properties files when using Kafka Connect in standalone mode.

8.3.2.1. Using the Kafka Connect REST API to manage connectors

When using the Kafka Connect REST API, you can create connectors dynamically by sending PUT or POST HTTP requests to the Kafka Connect REST API, specifying the connector configuration details in the request body.

Tip

A PUT request to the config endpoint creates the connector if it does not exist and updates its configuration if it does, so you can use the same request to start and to update a connector.
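As a sketch of the difference between the two request bodies (the connector name and settings here are illustrative), the following builds the payload for each style of request:

```python
import json

# Hypothetical settings for a FileStreamSink connector; adjust for your environment.
connector_config = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "my-topic-1,my-topic-2",
    "file": "/tmp/output-file.txt",
}

# PUT /connectors/<connector_name>/config takes the bare configuration object;
# the connector name comes from the URL.
put_body = json.dumps(connector_config)

# POST /connectors wraps the same settings in "name" and "config" fields.
post_body = json.dumps({"name": "my-connector", "config": connector_config})

print(put_body)
print(post_body)
```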

The REST interface listens on port 8083 by default and supports the following endpoints:

GET /connectors
Return a list of existing connectors.
POST /connectors
Create a connector. The request body has to be a JSON object with the connector configuration.
GET /connectors/<connector_name>
Get information about a specific connector.
GET /connectors/<connector_name>/config
Get configuration of a specific connector.
PUT /connectors/<connector_name>/config
Update the configuration of a specific connector.
GET /connectors/<connector_name>/status
Get the status of a specific connector.
GET /connectors/<connector_name>/tasks
Get a list of tasks for a specific connector.
GET /connectors/<connector_name>/tasks/<task_id>/status
Get the status of a task for a specific connector.
PUT /connectors/<connector_name>/pause
Pause the connector and all its tasks. The connector stops processing messages until it is resumed.
PUT /connectors/<connector_name>/stop
Stop the connector and all its tasks. The connector stops processing messages. Stopping is more suitable than pausing when the connector is to remain inactive for a longer period.
PUT /connectors/<connector_name>/resume
Resume a paused connector.
POST /connectors/<connector_name>/restart
Restart a connector in case it has failed.
POST /connectors/<connector_name>/tasks/<task_id>/restart
Restart a specific task.
DELETE /connectors/<connector_name>
Delete a connector.
GET /connectors/<connector_name>/topics
Get the topics for a specific connector.
PUT /connectors/<connector_name>/topics/reset
Empty the set of active topics for a specific connector.
GET /connectors/<connector_name>/offsets
Get the current offsets for a connector.
DELETE /connectors/<connector_name>/offsets
Reset the offsets for a connector, which must be in a stopped state.
PATCH /connectors/<connector_name>/offsets
Adjust the offsets (using an offsets property in the request body) for a connector, which must be in a stopped state.
GET /connector-plugins
Get a list of all supported connector plugins.
GET /connector-plugins/<connector_plugin_type>/config
Get the configuration for a connector plugin.
PUT /connector-plugins/<connector_plugin_type>/config/validate
Validate connector configuration.
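As an illustration of the PATCH request body, the following sketch follows the offsets format Kafka Connect uses for sink connectors; the topic, partition, and offset values are hypothetical, and source connectors define their own partition and offset fields:

```json
{
  "offsets": [
    {
      "partition": { "kafka_topic": "my-topic", "kafka_partition": 0 },
      "offset": { "kafka_offset": 100 }
    }
  ]
}
```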

8.3.2.2. Specifying connector configuration properties

To configure a Kafka Connect connector, you need to specify the configuration details for source or sink connectors. There are two ways to do this: through the Kafka Connect REST API, using JSON to provide the configuration, or by using properties files to define the configuration properties. The specific configuration options available for each type of connector may differ, but both methods provide a flexible way to specify the necessary settings.

The following options apply to all connectors:

name
The name of the connector, which must be unique within the current Kafka Connect instance.
connector.class
The class of the connector plug-in. For example, org.apache.kafka.connect.file.FileStreamSinkConnector.
tasks.max
The maximum number of tasks that the specified connector can use. Tasks enable the connector to perform work in parallel. The connector might create fewer tasks than specified.
key.converter
The class used to convert message keys to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example, org.apache.kafka.connect.json.JsonConverter.
value.converter
The class used to convert message payloads to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example, org.apache.kafka.connect.json.JsonConverter.

You must set at least one of the following options for sink connectors:

topics
A comma-separated list of topics used as input.
topics.regex
A Java regular expression of topics used as input.

For all other options, see the connector properties in the Apache Kafka documentation.
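For illustration, a properties file for a file sink connector might look like the following sketch (the connector name, topics, and file path are placeholders; compare the shipped config/connect-file-sink.properties, and note that the file option is specific to the FileStreamSink connector):

```properties
name=my-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=my-topic-1,my-topic-2
file=/tmp/output-file.txt
```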

Note

Streams for Apache Kafka includes the example connector configuration files config/connect-file-sink.properties and config/connect-file-source.properties in the Streams for Apache Kafka installation directory.

8.3.3. Creating connectors using the Kafka Connect API

Use the Kafka Connect REST API to create a connector to use with Kafka Connect.

Prerequisites

  • A Kafka Connect installation.

Procedure

  1. Prepare a JSON payload with the connector configuration. For example:

    {
      "name": "my-connector",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-topic-1,my-topic-2",
        "file": "/tmp/output-file.txt"
      }
    }
  2. Send a POST request to <KafkaConnectAddress>:8083/connectors to create the connector. The following example uses curl:

    curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
  3. Verify that the connector was deployed by sending a GET request to <KafkaConnectAddress>:8083/connectors. The following example uses curl:

    curl http://connect0.my-domain.com:8083/connectors

8.3.4. Deleting connectors using the Kafka Connect API

Use the Kafka Connect REST API to delete a connector from Kafka Connect.

Prerequisites

  • A Kafka Connect installation.

Procedure

  1. Verify that the connector exists by sending a GET request to <KafkaConnectAddress>:8083/connectors/<ConnectorName>. The following example uses curl:

    curl http://connect0.my-domain.com:8083/connectors/my-connector
  2. To delete the connector, send a DELETE request to <KafkaConnectAddress>:8083/connectors/<ConnectorName>. The following example uses curl:

    curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
  3. Verify that the connector was deleted by sending a GET request to <KafkaConnectAddress>:8083/connectors. The following example uses curl:

    curl http://connect0.my-domain.com:8083/connectors

8.3.5. Adding connector plugins

Kafka provides example connectors to use as a starting point for developing connectors. The following example connectors are included with Streams for Apache Kafka:

FileStreamSink
Reads data from Kafka topics and writes the data to a file.
FileStreamSource
Reads data from a file and sends the data to Kafka topics.

Both connectors are contained in the libs/connect-file-<kafka_version>.redhat-<build>.jar plugin.

To use the connector plugins in Kafka Connect, you can add them to the classpath or specify a plugin path in the Kafka Connect properties file and copy the plugins to the location.

Specifying the example connectors in the classpath

CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar /opt/kafka/bin/connect-distributed.sh

Setting a plugin path

plugin.path=/opt/kafka/connector-plugins,/opt/connectors

The plugin.path configuration option can contain a comma-separated list of paths.

You can add more connector plugins if needed. Kafka Connect searches for and runs connector plugins at startup.
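For example, a plugin directory might be prepared as follows; the paths and plugin name are hypothetical, and each plugin with its dependencies goes in its own subdirectory under a plugin.path entry:

```shell
# Hypothetical plugin.path entry; in production this might be /opt/kafka/connector-plugins
PLUGIN_PATH=/tmp/connector-plugins
mkdir -p "$PLUGIN_PATH/my-sink-connector"

# Stand-in for copying the real plugin JAR and its dependencies, for example:
#   cp my-sink-connector.jar "$PLUGIN_PATH/my-sink-connector/"
touch "$PLUGIN_PATH/my-sink-connector/my-sink-connector.jar"

# Each plugin sits in its own subdirectory
ls "$PLUGIN_PATH/my-sink-connector"
```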

Note

When running Kafka Connect in distributed mode, plugins must be made available on all worker nodes.


© 2024 Red Hat, Inc.