Chapter 8. Kafka Connect
Kafka Connect is a tool for streaming data between Apache Kafka and external systems. It provides a framework for moving large amounts of data while maintaining scalability and reliability. Kafka Connect is typically used to integrate Kafka with database, storage, and messaging systems that are external to your Kafka cluster.
Kafka Connect uses connector plug-ins that implement connectivity for different types of external systems. There are two types of connector plug-ins: sink and source. Sink connectors stream data from Kafka to external systems. Source connectors stream data from external systems into Kafka.
Kafka Connect can run in standalone or distributed modes.
- Standalone mode
- In standalone mode, Kafka Connect runs on a single node with user-defined configuration read from a properties file.
- Distributed mode
- In distributed mode, Kafka Connect runs across one or more worker nodes and the workloads are distributed among them. You manage connectors and their configuration using an HTTP REST interface.
8.1. Kafka Connect in standalone mode
In standalone mode, Kafka Connect runs as a single process, on a single node. You manage the configuration of standalone mode using properties files.
8.1.1. Configuring Kafka Connect in standalone mode
To configure Kafka Connect in standalone mode, edit the config/connect-standalone.properties
configuration file. The following options are the most important.
bootstrap.servers
-
A list of Kafka broker addresses used as bootstrap connections to Kafka. For example,
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
. key.converter
-
The class used to convert message keys to and from Kafka format. For example,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
The class used to convert message payloads to and from Kafka format. For example,
org.apache.kafka.connect.json.JsonConverter
. offset.storage.file.filename
- Specifies the file in which the offset data is stored.
An example configuration file is provided in the installation directory at config/connect-standalone.properties
. For a complete list of all supported Kafka Connect configuration options, see [kafka-connect-configuration-parameters-str].
Connector plug-ins open client connections to the Kafka brokers using the bootstrap address. To configure these connections, use the standard Kafka producer and consumer configuration options prefixed by producer.
or consumer.
.
For more information on configuring Kafka producers and consumers, see:
8.1.2. Configuring connectors in Kafka Connect in standalone mode
You can configure connector plug-ins for Kafka Connect in standalone mode using properties files. Most configuration options are specific to each connector. The following options apply to all connectors:
name
- The name of the connector, which must be unique within the current Kafka Connect instance.
connector.class
-
The class of the connector plug-in. For example,
org.apache.kafka.connect.file.FileStreamSinkConnector
. tasks.max
- The maximum number of tasks that the specified connector can use. Tasks enable the connector to perform work in parallel. The connector might create fewer tasks than specified.
key.converter
-
The class used to convert message keys to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
The class used to convert message payloads to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example,
org.apache.kafka.connect.json.JsonConverter
.
Additionally, you must set at least one of the following options for sink connectors:
topics
- A comma-separated list of topics used as input.
topics.regex
- A Java regular expression of topics used as input.
For all other options, see the documentation for the available connectors.
AMQ Streams includes example connector configuration files – see config/connect-file-sink.properties
and config/connect-file-source.properties
in the AMQ Streams installation directory.
8.1.3. Running Kafka Connect in standalone mode
This procedure describes how to configure and run Kafka Connect in standalone mode.
Prerequisites
- An installed and running AMQ Streams} cluster.
Procedure
Edit the
/opt/kafka/config/connect-standalone.properties
Kafka Connect configuration file and setbootstrap.server
to point to your Kafka brokers. For example:bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
Start Kafka Connect with the configuration file and specify one or more connector configurations.
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
Verify that Kafka Connect is running.
jcmd | grep ConnectStandalone
Additional resources
- For more information on installing AMQ Streams, see Section 2.3, “Installing AMQ Streams”.
- For more information on configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For a complete list of supported Kafka Connect configuration options, see Appendix F, Kafka Connect configuration parameters.
8.2. Kafka Connect in distributed mode
In distributed mode, Kafka Connect runs across one or more worker nodes and the workloads are distributed among them. You manage connector plug-ins and their configuration using the HTTP REST interface.
8.2.1. Configuring Kafka Connect in distributed mode
To configure Kafka Connect in distributed mode, edit the config/connect-distributed.properties
configuration file. The following options are the most important.
bootstrap.servers
-
A list of Kafka broker addresses used as bootstrap connections to Kafka. For example,
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
. key.converter
-
The class used to convert message keys to and from Kafka format. For example,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
The class used to convert message payloads to and from Kafka format. For example,
org.apache.kafka.connect.json.JsonConverter
. group.id
-
The name of the distributed Kafka Connect cluster. This must be unique and must not conflict with another consumer group ID. The default value is
connect-cluster
. config.storage.topic
-
The Kafka topic used to store connector configurations. The default value is
connect-configs
. offset.storage.topic
-
The Kafka topic used to store offsets. The default value is
connect-offset
. status.storage.topic
-
The Kafka topic used for worker node statuses. The default value is
connect-status
.
AMQ Streams includes an example configuration file for Kafka Connect in distributed mode – see config/connect-distributed.properties
in the AMQ Streams installation directory.
For a complete list of all supported Kafka Connect configuration options, see Appendix F, Kafka Connect configuration parameters.
Connector plug-ins open client connections to the Kafka brokers using the bootstrap address. To configure these connections, use the standard Kafka producer and consumer configuration options prefixed by producer.
or consumer.
.
For more information on configuring Kafka producers and consumers, see:
8.2.2. Configuring connectors in distributed Kafka Connect
HTTP REST Interface
Connectors for distributed Kafka Connect are configured using HTTP REST interface. The REST interface listens on port 8083 by default. It supports following endpoints:
GET /connectors
- Return a list of existing connectors.
POST /connectors
- Create a connector. The request body has to be a JSON object with the connector configuration.
GET /connectors/<name>
- Get information about a specific connector.
GET /connectors/<name>/config
- Get configuration of a specific connector.
PUT /connectors/<name>/config
- Update the configuration of a specific connector.
GET /connectors/<name>/status
- Get the status of a specific connector.
PUT /connectors/<name>/pause
- Pause the connector and all its tasks. The connector will stop processing any messages.
PUT /connectors/<name>/resume
- Resume a paused connector.
POST /connectors/<name>/restart
- Restart a connector in case it has failed.
DELETE /connectors/<name>
- Delete a connector.
GET /connector-plugins
- Get a list of all supported connector plugins.
Connector configuration
Most configuration options are connector specific and included in the documentation for the connectors. The following fields are common for all connectors.
name
- Name of the connector. Must be unique within a given Kafka Connect instance.
connector.class
-
Class of the connector plugin. For example
org.apache.kafka.connect.file.FileStreamSinkConnector
. tasks.max
- The maximum number of tasks used by this connector. Tasks are used by the connector to parallelise its work. Connetors may create fewer tasks than specified.
key.converter
-
Class used to convert message keys to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
Class used to convert message payloads to and from Kafka format. This overrides the default value set by the Kafka Connect configuration. For example,
org.apache.kafka.connect.json.JsonConverter
.
Additionally, one of the following options must be set for sink connectors:
topics
- A comma-separated list of topics used as input.
topics.regex
- A Java regular expression of topics used as input.
For all other options, see the documentation for the specific connector.
AMQ Streams includes example connector configuration files. They can be found in config/connect-file-sink.properties
and config/connect-file-source.properties
in the AMQ Streams installation directory.
8.2.3. Running distributed Kafka Connect
This procedure describes how to configure and run Kafka Connect in distributed mode.
Prerequisites
- An installed and running AMQ Streams cluster.
Running the cluster
Edit the
/opt/kafka/config/connect-distributed.properties
Kafka Connect configuration file on all Kafka Connect worker nodes.-
Set the
bootstrap.server
option to point to your Kafka brokers. -
Set the
group.id
option. -
Set the
config.storage.topic
option. -
Set the
offset.storage.topic
option. Set the
status.storage.topic
option.For example:
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
-
Set the
Start the Kafka Connect workers with the
/opt/kafka/config/connect-distributed.properties
configuration file on all Kafka Connect nodes.su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Verify that Kafka Connect is running.
jcmd | grep ConnectDistributed
Additional resources
- For more information about installing AMQ Streams, see Section 2.3, “Installing AMQ Streams”.
- For more information about configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For a complete list of supported Kafka Connect configuration options, see Appendix F, Kafka Connect configuration parameters.
8.2.4. Creating connectors
This procedure describes how to use the Kafka Connect REST API to create a connector plug-in for use with Kafka Connect in distributed mode.
Prerequisites
- A Kafka Connect installation running in distributed mode.
Procedure
Prepare a JSON payload with the connector configuration. For example:
{ "name": "my-connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "my-topic-1,my-topic-2", "file": "/tmp/output-file.txt" } }
Send a POST request to
<KafkaConnectAddress>:8083/connectors
to create the connector. The following example usescurl
:curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
Verify that the connector was deployed by sending a GET request to
<KafkaConnectAddress>:8083/connectors
. The following example usescurl
:curl http://connect0.my-domain.com:8083/connectors
8.2.5. Deleting connectors
This procedure describes how to use the Kafka Connect REST API to delete a connector plug-in from Kafka Connect in distributed mode.
Prerequisites
- A Kafka Connect installation running in distributed mode.
Deleting connectors
Verify that the connector exists by sending a
GET
request to<KafkaConnectAddress>:8083/connectors/<ConnectorName>
. The following example usescurl
:curl http://connect0.my-domain.com:8083/connectors
To delete the connector, send a
DELETE
request to<KafkaConnectAddress>:8083/connectors
. The following example usescurl
:curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
Verify that the connector was deleted by sending a GET request to
<KafkaConnectAddress>:8083/connectors
. The following example usescurl
:curl http://connect0.my-domain.com:8083/connectors
8.3. Connector plug-ins
The following connector plug-ins are included with AMQ Streams.
FileStreamSink
Reads data from Kafka topics and writes the data to a file.
FileStreamSource
Reads data from a file and sends the data to Kafka topics.
You can add more connector plug-ins if needed. Kafka Connect searches for and runs additional connector plug-ins at startup. To define the path that kafka Connect searches for plug-ins, set the plugin.path configuration
option:
plugin.path=/opt/kafka/connector-plugins,/opt/connectors
The plugin.path
configuration option can contain a comma-separated list of paths.
When running Kafka Connect in distributed mode, plug-ins must be made available on all worker nodes.
8.4. Adding connector plugins
This procedure shows you how to add additional connector plug-ins.
Prerequisites
- An installed and running AMQ Streams cluster.
Procedure
Create the
/opt/kafka/connector-plugins
directory.su - kafka mkdir /opt/kafka/connector-plugins
Edit the
/opt/kafka/config/connect-standalone.properties
or/opt/kafka/config/connect-distributed.properties
Kafka Connect configuration file, and set theplugin.path
option to/opt/kafka/connector-plugins
. For example:plugin.path=/opt/kafka/connector-plugins
-
Copy your connector plug-ins to
/opt/kafka/connector-plugins
. - Start or restart the Kafka Connect workers.
Additional resources
- For more information on installing AMQ Streams, see Section 2.3, “Installing AMQ Streams”.
- For more information on configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For more information on running Kafka Connect in standalone mode, see Section 8.1.3, “Running Kafka Connect in standalone mode”.
- For more information on running Kafka Connect in distributed mode, see Section 8.2.3, “Running distributed Kafka Connect”.
- For a complete list of supported Kafka Connect configuration options, see Appendix F, Kafka Connect configuration parameters.