Chapter 2. Deploying Camel Kafka Connector with AMQ Streams
This chapter explains how to install Camel Kafka Connector into AMQ Streams on OpenShift and how to get started with example connectors.
2.1. Configuring authentication with registry.redhat.io
You must configure authentication with the registry.redhat.io container registry before you can use AMQ Streams and Kafka Connect Source-2-Image (S2I) to deploy Camel Kafka Connector on OpenShift.
Prerequisites
- You must have cluster administrator access to an OpenShift Container Platform cluster.
- You must have the OpenShift oc client tool installed. For more details, see the OpenShift CLI documentation.
Procedure
Log into your OpenShift cluster as administrator, for example:
$ oc login --user system:admin --token=my-token --server=https://my-cluster.example.com:6443
Open the project in which you wish to deploy Camel Kafka Connector, for example:
$ oc project myproject
Create a docker-registry secret using your Red Hat Customer Portal account, and replace PULL_SECRET_NAME with the name of the secret that you wish to create:
$ oc create secret docker-registry PULL_SECRET_NAME \
  --docker-server=registry.redhat.io \
  --docker-username=CUSTOMER_PORTAL_USERNAME \
  --docker-password=CUSTOMER_PORTAL_PASSWORD \
  --docker-email=EMAIL_ADDRESS
Important: You must create the docker-registry secret in every OpenShift project namespace that will include the image streams and use registry.redhat.io.
Link the secret to your service account to use the secret for pulling images. The following example uses the default service account:
$ oc secrets link default PULL_SECRET_NAME --for=pull
The name of the service account that you link must match the name of the service account that the Pod uses.
Link the secret to the builder service account in the namespace in which you plan to use Kafka Connect S2I:
$ oc secrets link builder PULL_SECRET_NAME
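Optionally, you can confirm that the secret is linked by describing the service account with the oc client. The output below is abbreviated and illustrative; your generated secret names will differ:
$ oc describe serviceaccount builder
Name:                builder
Namespace:           myproject
Image pull secrets:  builder-dockercfg-xxxxx
                     PULL_SECRET_NAME
...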
Note: If you do not want to use your Red Hat account username and password to create the pull secret, you should create an authentication token by using a registry service account.
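For example, assuming that you have already created a registry service account in the Red Hat Customer Portal, you can supply its generated username and token in place of your personal credentials. The username format and the token value shown here are placeholders:
$ oc create secret docker-registry PULL_SECRET_NAME \
  --docker-server=registry.redhat.io \
  --docker-username='12345678|my-registry-service-account' \
  --docker-password=REGISTRY_SERVICE_ACCOUNT_TOKEN \
  --docker-email=EMAIL_ADDRESS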
Additional resources
2.2. Installing AMQ Streams and Kafka Connect S2I on OpenShift
AMQ Streams and Kafka Connect with Source-2-Image (S2I) are required to install Camel Kafka Connector. If you do not already have AMQ Streams installed, you can install the AMQ Streams Operator on your OpenShift cluster from the OperatorHub. The OperatorHub is available from the OpenShift Container Platform web console and provides an interface for cluster administrators to discover and install Operators. For more details, see the OpenShift documentation.
Prerequisites
- You must have cluster administrator access to an OpenShift Container Platform cluster.
- You must have authenticated with registry.redhat.io using the steps in Section 2.1, “Configuring authentication with registry.redhat.io”.
- See Using AMQ Streams on OpenShift for detailed information on installing AMQ Streams and Kafka Connect S2I. This section shows a simple default example of installing using the OpenShift OperatorHub.
Procedure
- In the OpenShift Container Platform web console, log in using an account with cluster administrator privileges.
- Select your project from the Project drop-down in the toolbar, for example, myproject. This must be the project in which you have authenticated with registry.redhat.io.
- In the left navigation menu, click Operators > OperatorHub.
- In the Filter by keyword text box, enter AMQ to find the Red Hat Integration - AMQ Streams Operator.
- Read the information about the Operator, and click Install. This displays the Create Operator Subscription page.
Select your subscription settings:
- Installation Mode > A specific namespace on the cluster > myproject
- Update Channel > stable
- Approval Strategy > Automatic
Note: These settings depend on the specific requirements of your environment. For more details, see OpenShift documentation on Adding Operators to a cluster.
- Click Subscribe. This displays the Operators > Installed Operators page.
- Wait a few moments until the Status for the AMQ Streams Operator displays Succeeded and the subscription is Up to Date.
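If you prefer the command line, you can also check the Operator status with the oc client. The ClusterServiceVersion name and version shown here are placeholders; the PHASE column should report Succeeded:
$ oc get csv -n myproject
NAME                DISPLAY                             VERSION   PHASE
amqstreams.v1.x.x   Red Hat Integration - AMQ Streams   1.x.x     Succeeded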
Create a new Kafka broker cluster:
- Under Red Hat Integration - AMQ Streams > Provided APIs > Kafka, click Create Instance to create a new Kafka broker cluster.
- Edit the custom resource definition as appropriate, and click Create.
Important: The default example creates a Kafka cluster with 3 Zookeeper nodes and 3 Kafka nodes with ephemeral storage. This temporary storage is suitable for development and testing only, and not for a production environment. For more details, see Using AMQ Streams on OpenShift.
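For reference, a minimal Kafka custom resource along the lines of this default example looks similar to the following sketch. The API version and exact field names depend on your AMQ Streams version, so treat this as an illustration rather than a definitive definition:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}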
Create a new Kafka Connect S2I cluster:
- Under Red Hat Integration - AMQ Streams > Provided APIs > Kafka Connect S2I, click Create Instance to create a new Kafka Connect cluster with OpenShift Source-2-Image support.
- Edit the custom resource definition as appropriate, and click Create. For more details on using Kafka Connect with S2I, see Using AMQ Streams on OpenShift.
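A minimal KafkaConnectS2I custom resource might look like the following sketch, assuming a Kafka cluster named my-cluster created in the previous step. As before, the API version and field names depend on your AMQ Streams version:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
  name: my-connect-cluster
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status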
- Select Workloads > Pods to verify that the deployed resources are running on OpenShift.
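Alternatively, you can list the pods with the oc client. The pod names, counts, and ages below are illustrative only:
$ oc get pods
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-xxxxxxxxxx-xxxxx   3/3     Running   0          2m
my-cluster-kafka-0                            2/2     Running   0          3m
my-cluster-zookeeper-0                        2/2     Running   0          4m
my-connect-cluster-connect-1-xxxxx            1/1     Running   0          1m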
Additional resources
2.3. Deploying Camel Kafka Connector in AMQ Streams on OpenShift
This section explains how to use Kafka Connect Source-2-Image (S2I) to add your Camel Kafka connectors to an existing Docker-based Kafka Connect image and build a new image. This section also shows how to create an instance of a Camel Kafka connector plug-in using an example AWS S3 Camel Kafka connector.
Prerequisites
- You must have cluster administrator access to an OpenShift Container Platform cluster.
- You must have the OpenShift oc client tool installed. For more details, see the OpenShift CLI documentation.
- You must have installed AMQ Streams and Kafka Connect with S2I support on your OpenShift cluster. For more details, see Section 2.2, “Installing AMQ Streams and Kafka Connect S2I on OpenShift”.
- You must have downloaded the connectors available for Camel Kafka Connector from Software Downloads > Red Hat Integration - Camel K.
Procedure
Log into your OpenShift cluster as administrator, for example:
$ oc login --user system:admin --token=my-token --server=https://my-cluster.example.com:6443
Change to the project in which Kafka Connect S2I is installed:
$ oc project myproject
Add your downloaded connectors to the existing Kafka Connect Docker image build, and then wait for the new image build to finish and be configured with the new connectors. For example:
$ oc start-build my-connect-cluster-connect --from-dir=./camel-kafka-connector/connectors/ --follow
Uploading directory "camel-kafka-connector/connectors" as binary input for the build ...
...
Uploading finished
build.build.openshift.io/my-connect-cluster-connect-2 started
Receiving source from STDIN as archive ...
Caching blobs under "/var/cache/blobs".
Getting image source signatures
Copying blob sha256:5ed7b62ff462957d0ee8956db7a787d8e17c1bdee7a78c57c917298019f77ea2
...
Writing manifest to image destination
Storing signatures
Generating dockerfile with builder image image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4
STEP 1: FROM image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4
STEP 2: LABEL "io.openshift.build.image"="image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4" "io.openshift.build.source-location"="/tmp/build/inputs"
STEP 3: ENV OPENSHIFT_BUILD_NAME="my-connect-cluster-connect-2" OPENSHIFT_BUILD_NAMESPACE="myproject"
STEP 4: USER root
STEP 5: COPY upload/src /tmp/src
STEP 6: RUN chown -R 1001:0 /tmp/src
STEP 7: USER 1001
STEP 8: RUN /opt/kafka/s2i/assemble
Assembling plugins into custom plugin directory /tmp/kafka-plugins
Moving plugins to /tmp/kafka-plugins
STEP 9: CMD /opt/kafka/s2i/run
STEP 10: COMMIT temp.builder.openshift.io/myproject/my-connect-cluster-connect-2:d0873588
Getting image source signatures
Copying blob sha256:edf3aa290fb3c255a84fe836109093fbfeef65c08544f655fad8d6afb53868ba
...
Writing manifest to image destination
Storing signatures
0d392e3df3edc0801f0b7091ba99e2a666008531ccb5271cd0d4b54901dac0b9
0d392e3df3edc0801f0b7091ba99e2a666008531ccb5271cd0d4b54901dac0b9
Pushing image image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect:latest ...
Getting image source signatures
Copying blob sha256:06ea991a3b933c49058585f82006648f8702a33b5de8725e2fe85724f18a2ff4
...
Writing manifest to image destination
Storing signatures
Successfully pushed image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect@sha256:9db57d33df6d0494ea6ee6e4696fcaf79eb81aabeb0bbc180dec5324d33e7eda
Push successful
Check that the Camel Kafka connectors are available in your Kafka Connect cluster as follows:
$ oc exec -i -c kafka my-cluster-kafka-0 -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
You should see something like the following output:
[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.0.1-SNAPSHOT"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.0.1-SNAPSHOT"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.3.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.3.0"}]
Use the following annotation to enable instantiating Camel Kafka connectors using a specific custom resource:
$ oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
Important: When the use-connector-resources option is enabled, do not use the Kafka Connect API server. The Kafka Connect Operator will revert any changes that you make.
Create the connector instance by creating a specific custom resource that includes your connector configuration. The following example shows the configuration for an AWS S3 connector plug-in:
$ oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
    topics: s3-topic
    camel.source.path.bucketNameOrArn: camel-connector-test
    camel.source.endpoint.autocloseBody: false
    camel.source.maxPollDuration: 10000
    camel.component.aws-s3.accessKey: xxx
    camel.component.aws-s3.secretKey: xxx
    camel.component.aws-s3.region: xxx
EOF
kafkaconnector.kafka.strimzi.io/s3-source-connector created
Check the status of your connector using the following command, for example:
$ oc get kctr --selector strimzi.io/cluster=my-connect-cluster -o yaml
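The output includes a status section for each connector. The exact fields depend on your AMQ Streams version, but a healthy connector typically reports a Ready condition, as in this abbreviated, illustrative excerpt:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-source-connector
  ...
status:
  conditions:
  - type: Ready
    status: "True"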
You can also run the Kafka console consumer to see the messages received from the topic:
$ oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic s3-topic --from-beginning
2.4. Deploying Camel Kafka Connector examples
This section describes how to deploy the following Camel Kafka Connector examples:
- AWS S3 source to JMS sink connectors
- CQL sink connector
- Debezium PostgreSQL connector (community example)
Prerequisites
- See the What is needed section in the readme for each example linked in the Procedure section.
Procedure
Perform the steps described in the GitHub readme for each demonstration example:
Additional resources