Chapter 2. Deploying Camel Kafka Connector with AMQ Streams on OpenShift
This chapter explains how to install Camel Kafka Connector into AMQ Streams on OpenShift and how to get started with example connectors.
2.1. Configuring authentication with registry.redhat.io
You must configure authentication with the registry.redhat.io container registry before you can use AMQ Streams and Kafka Connect Source-2-Image (S2I) to deploy Camel Kafka Connector on OpenShift.
Prerequisites
- You must have cluster administrator access to an OpenShift Container Platform cluster.
- You must have the OpenShift oc client tool installed. For more details, see the OpenShift CLI documentation.
Procedure
Log into your OpenShift cluster as administrator, for example:
$ oc login --user system:admin --token=my-token --server=https://my-cluster.example.com:6443
Open the project in which you want to deploy Camel Kafka Connector, for example:
$ oc project myproject
Create a docker-registry secret using your Red Hat Customer Portal account, and replace PULL_SECRET_NAME with the name of the secret that you want to create:
$ oc create secret docker-registry PULL_SECRET_NAME \
    --docker-server=registry.redhat.io \
    --docker-username=CUSTOMER_PORTAL_USERNAME \
    --docker-password=CUSTOMER_PORTAL_PASSWORD \
    --docker-email=EMAIL_ADDRESS
You should see the following output:
secret/PULL_SECRET_NAME created
Important: You must create this pull secret in every OpenShift project namespace that will include the image streams and use registry.redhat.io.
Link the secret to your service account to use the secret for pulling images. The following example uses the default service account:
$ oc secrets link default PULL_SECRET_NAME --for=pull
The service account name must match the name of the service account that the Pod uses.
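If you are unsure which service account a Pod uses, you can inspect the Pod spec. A minimal sketch, where my-pod is a placeholder Pod name:
$ oc get pod my-pod -o jsonpath='{.spec.serviceAccountName}'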
Link the secret to the builder service account in the namespace in which you plan to use Kafka Connect S2I:
$ oc secrets link builder PULL_SECRET_NAME
Note: If you do not want to use your Red Hat account username and password to create the pull secret, create an authentication token by using a registry service account.
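For example, after you create a registry service account in the Red Hat Customer Portal, you can create the pull secret with the generated credentials instead of your personal ones. A hedged sketch, where 12345678|my-service-account and TOKEN are placeholders for the username and token that the registry service account page provides:
$ oc create secret docker-registry PULL_SECRET_NAME \
    --docker-server=registry.redhat.io \
    --docker-username='12345678|my-service-account' \
    --docker-password=TOKEN \
    --docker-email=EMAIL_ADDRESS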
2.2. Installing AMQ Streams and Kafka Connect S2I on OpenShift
AMQ Streams and Kafka Connect with Source-2-Image (S2I) are required to install Camel Kafka Connector. If you do not already have AMQ Streams installed, you can install the AMQ Streams Operator on your OpenShift cluster from the OperatorHub. The OperatorHub is available from the OpenShift Container Platform web console and provides an interface for cluster administrators to discover and install Operators. For more details, see the OpenShift documentation.
Prerequisites
- You must have cluster administrator access to an OpenShift Container Platform cluster.
- You must have authenticated with registry.redhat.io using the steps in Section 2.1, “Configuring authentication with registry.redhat.io”.
- See Using AMQ Streams on OpenShift for detailed information on installing AMQ Streams and Kafka Connect S2I. This section shows a simple default example of installing using the OpenShift OperatorHub.
Procedure
- In the OpenShift Container Platform web console, log in using an account with cluster administrator privileges.
- Select your project from the Project drop-down in the toolbar, for example, myproject. This must be the project in which you have authenticated with registry.redhat.io.
- In the left navigation menu, click Operators > OperatorHub.
- In the Filter by keyword text box, enter AMQ to find the Red Hat Integration - AMQ Streams Operator.
- Read the information about the Operator, and click Install to display the Operator subscription page.
Select your subscription settings, for example:
- Update Channel > stable
- Installation Mode > A specific namespace on the cluster > myproject
- Approval Strategy > Automatic
Note: These settings depend on the specific requirements of your environment. For more details, see the OpenShift documentation on Adding Operators to a cluster.
- Click Install, and wait a few moments until the Operator is ready for use.
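To confirm from the command line that the Operator is ready, you can check its ClusterServiceVersion. A minimal sketch (the exact CSV name depends on the installed Operator version); the installation has finished when the PHASE column reports Succeeded:
$ oc get csv -n myproject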
Create a new Kafka broker cluster:
- Under Red Hat Integration - AMQ Streams > Provided APIs > Kafka, click Create Instance to create a new Kafka broker cluster.
- Edit the custom resource as appropriate, and click Create. A minimal sketch of this resource is shown after this step.
Important: The default example creates a Kafka cluster with 3 ZooKeeper nodes and 3 Kafka nodes with ephemeral storage. This temporary storage is suitable for development and testing only, and not for a production environment. For more details, see Using AMQ Streams on OpenShift.
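For reference, a minimal sketch of the kind of Kafka custom resource that the default example creates. Field values are illustrative, and the exact apiVersion and defaults depend on your AMQ Streams version:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}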
Create a new Kafka Connect S2I cluster:
- Under Red Hat Integration - AMQ Streams > Provided APIs > Kafka Connect S2I, click Create Instance to create a new Kafka Connect cluster with OpenShift Source-2-Image support.
- Edit the custom resource as appropriate, and click Create. A minimal sketch of this resource is shown after this step. For more details on using Kafka Connect with S2I, see Using AMQ Streams on OpenShift.
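For reference, a minimal sketch of a KafkaConnectS2I custom resource. This assumes the my-cluster Kafka cluster created in the previous step; the bootstrap address and TLS secret name follow the operator's my-cluster-* naming convention and may need adjusting for your environment:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
  name: my-connect-cluster
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt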
- Select Workloads > Pods to verify that the deployed resources are running on OpenShift.
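You can also verify from the command line. A hedged sketch; with the default naming you should see Running Pods such as my-cluster-zookeeper-0, my-cluster-kafka-0, my-cluster-entity-operator-*, and my-connect-cluster-connect-*:
$ oc get pods -n myproject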
2.3. Deploying Camel Kafka Connector using Kafka Connect S2I on OpenShift
This section explains how to use Kafka Connect Source-2-Image (S2I) with AMQ Streams to add your Camel Kafka connectors to your existing Docker-based Kafka Connect image and to build a new image. This section also shows how to create an instance of a Camel Kafka connector plug-in using an example AWS2 S3 Camel Kafka connector.
Prerequisites
- You must have cluster administrator access to an OpenShift Container Platform cluster.
- You must have installed AMQ Streams and Kafka Connect with S2I support on your OpenShift cluster. For more details, see Section 2.2, “Installing AMQ Streams and Kafka Connect S2I on OpenShift”.
- You must have downloaded Camel Kafka Connector from Software Downloads > Red Hat Integration.
- You must have access to an Amazon S3 bucket.
Procedure
Log into your OpenShift cluster as administrator, for example:
$ oc login --user system:admin --token=my-token --server=https://my-cluster.example.com:6443
Change to the project in which Kafka Connect S2I is installed:
$ oc project myproject
Add your downloaded Camel Kafka connectors to the existing Kafka Connect Docker image build, and wait for the new image build to be configured with the new connectors. For example:
$ oc start-build my-connect-cluster-connect --from-dir=./camel-kafka-connector/connectors/ --follow
Uploading directory "camel-kafka-connector/connectors" as binary input for the build ...
...
Uploading finished
build.build.openshift.io/my-connect-cluster-connect-2 started
Receiving source from STDIN as archive ...
Caching blobs under "/var/cache/blobs".
Getting image source signatures
...
Writing manifest to image destination
Storing signatures
Generating dockerfile with builder image image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4
STEP 1: FROM image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4
STEP 2: LABEL "io.openshift.build.image"="image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4" "io.openshift.build.source-location"="/tmp/build/inputs"
STEP 3: ENV OPENSHIFT_BUILD_NAME="my-connect-cluster-connect-2" OPENSHIFT_BUILD_NAMESPACE="myproject"
STEP 4: USER root
STEP 5: COPY upload/src /tmp/src
STEP 6: RUN chown -R 1001:0 /tmp/src
STEP 7: USER 1001
STEP 8: RUN /opt/kafka/s2i/assemble
Assembling plugins into custom plugin directory /tmp/kafka-plugins
Moving plugins to /tmp/kafka-plugins
STEP 9: CMD /opt/kafka/s2i/run
STEP 10: COMMIT temp.builder.openshift.io/myproject/my-connect-cluster-connect-2:d0873588
Getting image source signatures
...
Writing manifest to image destination
Storing signatures
...
Pushing image image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect:latest ...
Getting image source signatures
...
Writing manifest to image destination
Storing signatures
Successfully pushed image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect@sha256:9db57d33df6d0494ea6ee6e4696fcaf79eb81aabeb0bbc180dec5324d33e7eda
Push successful
Check that Camel Kafka Connector is available in your Kafka Connect cluster as follows:
$ oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
You should see something like the following output:
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0.redhat-00003"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0.redhat-00003"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
Use the following annotation to enable instantiating Camel Kafka connectors using a specific custom resource:
$ oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
kafkaconnects2i.kafka.strimzi.io/my-connect-cluster annotated
Important: When the use-connector-resources option is enabled, do not use the Kafka Connect API server. The Kafka Connect Operator will revert any changes that you make.
Create the connector instance by creating a specific custom resource that includes your connector configuration. The following example shows the configuration for an AWS2 S3 connector plug-in:
$ oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: s3-topic
    camel.source.path.bucketNameOrArn: camel-kafka-connector
    camel.source.maxPollDuration: 10000
    camel.component.aws2-s3.accessKey: xxxx
    camel.component.aws2-s3.secretKey: yyyy
    camel.component.aws2-s3.region: region
EOF
kafkaconnector.kafka.strimzi.io/s3-source-connector created
Check the status of your connector using the following example command:
$ oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/s3-source-connector/status
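You should see output similar to the following. This is a hedged sketch of the standard Kafka Connect REST API status response; the worker_id value is a placeholder and will differ in your cluster:
{"name":"s3-source-connector","connector":{"state":"RUNNING","worker_id":"172.17.0.8:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.8:8083"}],"type":"source"}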
Connect to your AWS Console, and upload a file to the camel-kafka-connector AWS S3 bucket to activate the Camel Kafka route. Alternatively, you can upload the file from the command line, as sketched below.
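A minimal sketch using the AWS CLI, assuming it is installed and configured with credentials that can write to the bucket; my-test-file.txt is a placeholder file name:
$ aws s3 cp my-test-file.txt s3://camel-kafka-connector/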
You can then run the Kafka console consumer to see the messages received from the topic as follows:
$ oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic s3-topic --from-beginning
CONTENTS_OF_FILE
CONTENTS_OF_FILE
...