Chapter 8. Using AMQ Streams with Kafka Connect
Use Kafka Connect to stream data between Kafka and external systems. Kafka Connect provides a framework for moving large amounts of data while maintaining scalability and reliability. Kafka Connect is typically used to integrate Kafka with database, storage, and messaging systems that are external to your Kafka cluster.
Kafka Connect runs in standalone or distributed mode.
- Standalone mode: Kafka Connect runs on a single node. Standalone mode is intended for development and testing.
- Distributed mode: Kafka Connect runs across one or more worker nodes and the workloads are distributed among them. Distributed mode is intended for production.
Kafka Connect uses connector plugins that implement connectivity for different types of external systems. There are two types of connector plugins: sink and source. Sink connectors stream data from Kafka to external systems. Source connectors stream data from external systems into Kafka.
You can also use the Kafka Connect REST API to create, manage, and monitor connector instances.
Connector configuration specifies details such as the source or sink connectors and the Kafka topics to read from or write to. How you manage the configuration depends on whether you are running Kafka Connect in standalone or distributed mode.
- In standalone mode, you can provide the connector configuration as JSON through the Kafka Connect REST API or you can use properties files to define the configuration.
- In distributed mode, you can only provide the connector configuration as JSON through the Kafka Connect REST API.
Handling high volumes of messages
You can tune the configuration to handle high volumes of messages. For more information, see Chapter 11, Handling high volumes of messages.
8.1. Using Kafka Connect in standalone mode
In Kafka Connect standalone mode, connectors run on the same node as the Kafka Connect worker process, which runs as a single process in a single JVM. This means that the worker process and connectors share the same resources, such as CPU, memory, and disk.
8.1.1. Configuring Kafka Connect in standalone mode
To configure Kafka Connect in standalone mode, edit the config/connect-standalone.properties configuration file. The following options are the most important:
- bootstrap.servers: A list of Kafka broker addresses used as bootstrap connections to Kafka. For example, kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092.
- key.converter: The class used to convert message keys to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
- value.converter: The class used to convert message payloads to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
- offset.storage.file.filename: Specifies the file in which the offset data is stored.

Connector plugins open client connections to the Kafka brokers using the bootstrap address. To configure these connections, use the standard Kafka producer and consumer configuration options prefixed with "producer." or "consumer." respectively.
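Putting these options together, a minimal connect-standalone.properties might look like the following sketch. The broker addresses and offset file location are placeholders:

```properties
# Bootstrap connections to the Kafka cluster (placeholder addresses)
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092

# Converters for message keys and payloads
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# File in which standalone mode stores connector offsets (placeholder path)
offset.storage.file.filename=/tmp/connect.offsets
```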
8.1.2. Running Kafka Connect in standalone mode
Configure and run Kafka Connect in standalone mode.
Prerequisites
- AMQ Streams is installed and a Kafka cluster is running.
- You have specified connector configuration in properties files. (You can also use the Kafka Connect REST API to manage connectors.)

Procedure
1. Edit the /opt/kafka/config/connect-standalone.properties Kafka Connect configuration file and set bootstrap.servers to point to your Kafka brokers. For example:

   bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092

2. Start Kafka Connect with the configuration file and specify one or more connector configurations:

   su - kafka
   /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]

3. Verify that Kafka Connect is running:

   jcmd | grep ConnectStandalone
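As a sketch of what a connector properties file such as connector1.properties might contain, the following uses the FileStreamSource example connector shipped with Kafka; the connector name, file, and topic are placeholders:

```properties
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# File to read from and topic to produce to (placeholders)
file=/tmp/input-file.txt
topic=my-topic-1
```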
8.2. Using Kafka Connect in distributed mode
In distributed mode, Kafka Connect runs as a cluster of worker processes, with each worker running on a separate node. Connectors can run on any worker in the cluster, allowing for greater scalability and fault tolerance. The connectors are managed by the workers, which coordinate with each other to distribute the work and ensure that each connector is running on a single node at any given time.
8.2.1. Configuring Kafka Connect in distributed mode
To configure Kafka Connect in distributed mode, edit the config/connect-distributed.properties configuration file. The following options are the most important:
- bootstrap.servers: A list of Kafka broker addresses used as bootstrap connections to Kafka. For example, kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092.
- key.converter: The class used to convert message keys to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
- value.converter: The class used to convert message payloads to and from Kafka format. For example, org.apache.kafka.connect.json.JsonConverter.
- group.id: The name of the distributed Kafka Connect cluster. This must be unique and must not conflict with another consumer group ID. The default value is connect-cluster.
- config.storage.topic: The Kafka topic used to store connector configurations. The default value is connect-configs.
- offset.storage.topic: The Kafka topic used to store offsets. The default value is connect-offsets.
- status.storage.topic: The Kafka topic used for worker node statuses. The default value is connect-status.

AMQ Streams includes an example configuration file for Kafka Connect in distributed mode; see config/connect-distributed.properties in the AMQ Streams installation directory.

Connector plugins open client connections to the Kafka brokers using the bootstrap address. To configure these connections, use the standard Kafka producer and consumer configuration options prefixed with "producer." or "consumer." respectively.
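For reference, a minimal distributed worker configuration combining these options might look like the following sketch. The addresses are placeholders, and the replication factor settings are standard Kafka Connect worker options that make the internal topics fault tolerant in production:

```properties
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Internal topics shared by all workers in the group
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Replicate the internal topics across brokers for fault tolerance
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
```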
8.2.2. Running Kafka Connect in distributed mode
Configure and run Kafka Connect in distributed mode.
Prerequisites
- AMQ Streams is installed and a Kafka cluster is running.
Procedure
1. Edit the /opt/kafka/config/connect-distributed.properties Kafka Connect configuration file on all Kafka Connect worker nodes:
   - Set the bootstrap.servers option to point to your Kafka brokers.
   - Set the group.id option.
   - Set the config.storage.topic option.
   - Set the offset.storage.topic option.
   - Set the status.storage.topic option.

   For example:

   bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
   group.id=my-group-id
   config.storage.topic=my-group-id-configs
   offset.storage.topic=my-group-id-offsets
   status.storage.topic=my-group-id-status

2. Start the Kafka Connect workers with the /opt/kafka/config/connect-distributed.properties configuration file on all Kafka Connect nodes:

   su - kafka
   /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties

3. Verify that Kafka Connect is running:

   jcmd | grep ConnectDistributed

4. Use the Kafka Connect REST API to manage connectors.
8.3. Managing connectors
The Kafka Connect REST API provides endpoints for creating, updating, and deleting connectors directly. You can also use the API to check the status of connectors or change logging levels. When you create a connector through the API, you provide the configuration details for the connector as part of the API call.
You can also add and manage connectors as plugins. Plugins are packaged as JAR files that contain the classes that implement the connectors through the Kafka Connect API. To run a connector plugin, specify the plugin on the classpath or add it to a plugin path so that Kafka Connect loads it on startup.
In addition to using the Kafka Connect REST API or plugins to manage connectors, you can also add connector configuration using properties files when running Kafka Connect in standalone mode. To do this, specify the location of the properties files when starting the Kafka Connect worker process. Each properties file should contain the configuration details for a connector, including the connector class, source and destination topics, and any required authentication or serialization settings.
8.3.1. Limiting access to the Kafka Connect API
The Kafka Connect REST API can be accessed by anyone who has authenticated access and knows the endpoint URL, which includes the hostname/IP address and port number. It is crucial to restrict access to the Kafka Connect API only to trusted users to prevent unauthorized actions and potential security issues.
For improved security, we recommend configuring the following properties for the Kafka Connect API:
- (Kafka 3.4 or later) org.apache.kafka.disallowed.login.modules to specifically exclude insecure login modules
- connector.client.config.override.policy set to NONE to prevent connector configurations from overriding the Kafka Connect configuration and the consumers and producers it uses
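As a sketch, the override policy goes in the worker properties file, while the disallowed login modules setting is a Java system property (in Kafka 3.4 or later it disallows the JndiLoginModule by default), so it is typically passed through the JVM options. The values shown here are illustrative:

```properties
# In the Kafka Connect worker configuration file:
# prevent connector configs from overriding the worker's client settings
connector.client.config.override.policy=NONE
```

The login module exclusion can then be applied when starting the worker, for example via KAFKA_OPTS="-Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule".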
8.3.2. Configuring connectors
Use the Kafka Connect REST API or properties files to create, manage, and monitor connector instances. You can use the REST API when using Kafka Connect in standalone or distributed mode. You can use properties files when using Kafka Connect in standalone mode.
8.3.2.1. Using the Kafka Connect REST API to manage connectors
When using the Kafka Connect REST API, you can create connectors dynamically by sending POST or PUT HTTP requests to the Kafka Connect REST API, specifying the connector configuration details in the request body.
A PUT request to a connector's config endpoint serves for both starting and updating connectors: it creates the connector if it does not exist, and updates its configuration if it does.
The REST interface listens on port 8083 by default and supports the following endpoints:
- GET /connectors: Return a list of existing connectors.
- POST /connectors: Create a connector. The request body must be a JSON object with the connector configuration.
- GET /connectors/<connector_name>: Get information about a specific connector.
- GET /connectors/<connector_name>/config: Get the configuration of a specific connector.
- PUT /connectors/<connector_name>/config: Update the configuration of a specific connector.
- GET /connectors/<connector_name>/status: Get the status of a specific connector.
- GET /connectors/<connector_name>/tasks: Get a list of tasks for a specific connector.
- GET /connectors/<connector_name>/tasks/<task_id>/status: Get the status of a task for a specific connector.
- PUT /connectors/<connector_name>/pause: Pause the connector and all its tasks. The connector stops processing messages.
- PUT /connectors/<connector_name>/resume: Resume a paused connector.
- POST /connectors/<connector_name>/restart: Restart a connector in case it has failed.
- POST /connectors/<connector_name>/tasks/<task_id>/restart: Restart a specific task.
- DELETE /connectors/<connector_name>: Delete a connector.
- GET /connectors/<connector_name>/topics: Get the topics for a specific connector.
- PUT /connectors/<connector_name>/topics/reset: Empty the set of active topics for a specific connector.
- GET /connector-plugins: Get a list of all supported connector plugins.
- PUT /connector-plugins/<connector_type>/config/validate: Validate connector configuration.
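As an example of the PUT config endpoint, the request body contains just the configuration object; the connector name comes from the URL. A sketch with hypothetical connector class, topic, and file path:

```json
{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "tasks.max": "1",
  "topics": "my-topic-1",
  "file": "/tmp/output-file.txt"
}
```

Sent with, for example, curl -X PUT -H "Content-Type: application/json" --data @sink-config.json http://connect0.my-domain.com:8083/connectors/my-connector/config, this creates the connector if it does not exist and updates it otherwise.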
8.3.2.2. Specifying connector configuration properties
To configure a Kafka Connect connector, you need to specify the configuration details for source or sink connectors. There are two ways to do this: through the Kafka Connect REST API, using JSON to provide the configuration, or by using properties files to define the configuration properties. The specific configuration options available for each type of connector may differ, but both methods provide a flexible way to specify the necessary settings.
The following options apply to all connectors:
- name: The name of the connector, which must be unique within the current Kafka Connect instance.
- connector.class: The class of the connector plugin. For example, org.apache.kafka.connect.file.FileStreamSinkConnector.
- tasks.max: The maximum number of tasks that the specified connector can use. Tasks enable the connector to perform work in parallel. The connector might create fewer tasks than specified.
- key.converter: The class used to convert message keys to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example, org.apache.kafka.connect.json.JsonConverter.
- value.converter: The class used to convert message payloads to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example, org.apache.kafka.connect.json.JsonConverter.
You must set at least one of the following options for sink connectors:
- topics: A comma-separated list of topics used as input.
- topics.regex: A Java regular expression of topics used as input.
For all other options, see the connector properties in the Apache Kafka documentation.
AMQ Streams includes the example connector configuration files config/connect-file-sink.properties and config/connect-file-source.properties in the AMQ Streams installation directory.
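For comparison with the REST payloads, a properties-file version of a file sink connector might look like the following sketch, in the spirit of the shipped config/connect-file-sink.properties example; the name, topics, and path are placeholders:

```properties
name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
# Sink connectors require topics or topics.regex
topics=my-topic-1,my-topic-2
file=/tmp/output-file.txt
```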
8.3.3. Creating connectors using the Kafka Connect API
Use the Kafka Connect REST API to create a connector to use with Kafka Connect.
Prerequisites
- A Kafka Connect installation.
Procedure
1. Prepare a JSON payload with the connector configuration. For example:

   {
     "name": "my-connector",
     "config": {
       "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
       "tasks.max": "1",
       "topics": "my-topic-1,my-topic-2",
       "file": "/tmp/output-file.txt"
     }
   }

2. Send a POST request to <KafkaConnectAddress>:8083/connectors to create the connector. The following example uses curl:

   curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors

3. Verify that the connector was deployed by sending a GET request to <KafkaConnectAddress>:8083/connectors. The following example uses curl:

   curl http://connect0.my-domain.com:8083/connectors
8.3.4. Deleting connectors using the Kafka Connect API
Use the Kafka Connect REST API to delete a connector from Kafka Connect.
Prerequisites
- A Kafka Connect installation.
Procedure
1. Verify that the connector exists by sending a GET request to <KafkaConnectAddress>:8083/connectors/<ConnectorName>. The following example uses curl:

   curl http://connect0.my-domain.com:8083/connectors/my-connector

2. To delete the connector, send a DELETE request to <KafkaConnectAddress>:8083/connectors/<ConnectorName>. The following example uses curl:

   curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector

3. Verify that the connector was deleted by sending a GET request to <KafkaConnectAddress>:8083/connectors. The following example uses curl:

   curl http://connect0.my-domain.com:8083/connectors
8.3.5. Adding connector plugins
Kafka provides example connectors to use as a starting point for developing connectors. The following example connectors are included with AMQ Streams:
- FileStreamSink: Reads data from Kafka topics and writes the data to a file.
- FileStreamSource: Reads data from a file and sends the data to Kafka topics.
Both connectors are contained in the libs/connect-file-<kafka_version>.redhat-<build>.jar plugin.
To use the connector plugins in Kafka Connect, you can add them to the classpath, or specify a plugin path in the Kafka Connect properties file and copy the plugins to that location.
Specifying the example connectors in the classpath
CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar /opt/kafka/bin/connect-distributed.sh
Setting a plugin path
plugin.path=/opt/kafka/connector-plugins,/opt/connectors
The plugin.path
configuration option can contain a comma-separated list of paths.
You can add more connector plugins if needed. Kafka Connect searches for and runs connector plugins at startup.
When running Kafka Connect in distributed mode, plugins must be made available on all worker nodes.