Search

Chapter 2. Deploying Camel Kafka Connector with AMQ Streams

download PDF

This chapter explains how to install Camel Kafka Connector into AMQ Streams on OpenShift and how to get started with example connectors.

2.1. Configuring authentication with registry.redhat.io

You must configure authentication with the registry.redhat.io container registry before you can use AMQ Streams and Kafka Connect Soure-2-Image (S2I) to deploy Camel Kafka Connector on OpenShift.

Prerequisites

  • You must have cluster administrator access to an OpenShift Container Platform cluster.
  • You must have the OpenShift oc client tool installed. For more details, see the OpenShift CLI documentation.

Procedure

  1. Log into your OpenShift cluster as administrator, for example:

    $ oc login --user system:admin --token=my-token --server=https://my-cluster.example.com:6443
  2. Open the project in which you wish to deploy Camel Kafka Connector, for example:

    $ oc project myproject
  3. Create a docker-registry secret using your Red Hat Customer Portal account, and replace PULL_SECRET_NAME with the name of the secret that you wish to create:

    $ oc create secret docker-registry PULL_SECRET_NAME \
      --docker-server=registry.redhat.io \
      --docker-username=CUSTOMER_PORTAL_USERNAME \
      --docker-password=CUSTOMER_PORTAL_PASSWORD \
      --docker-email=EMAIL_ADDRESS
    Important

    You must create the docker-registry secret in every OpenShift project namespace that will include the image streams and use registry.redhat.io.

  4. Link the secret to your service account to use the secret for pulling images. The following example uses the default service account:

    $ oc secrets link default PULL_SECRET_NAME --for=pull

    The service account name must match the name that the service account Pod uses.

  5. Link the secret to the builder service account in the namespace in which you plan to use Kafka Connect S2I:

    $ oc secrets link builder PULL_SECRET_NAME
    Note

    If you do not want to use your Red Hat account username and password to create the pull secret, you should create an authentication token by using a registry service account.

2.2. Installing AMQ Streams and Kafka Connect S2I on OpenShift

AMQ Streams and Kafka Connect with Source-2-Image (S2I) are required to install Camel Kafka Connector. If you do not already have AMQ Streams installed, you can install the AMQ Streams Operator on your OpenShift cluster from the OperatorHub. The OperatorHub is available from the OpenShift Container Platform web console and provides an interface for cluster administrators to discover and install Operators. For more details, see the OpenShift documentation.

Prerequisites

Procedure

  1. In the OpenShift Container Platform web console, log in using an account with cluster administrator privileges.
  2. Select your project from the Project drop-down in the toolbar, for example, myproject. This must be the project in which you have authenticated with registry.redhat.io.
  3. In the left navigation menu, click Operators > OperatorHub.
  4. In the Filter by keyword text box, enter AMQ to find the Red Hat Integration - AMQ Streams Operator.
  5. Read the information about the Operator, and click Install. This displays the Create Operator Subscription page.
  6. Select your subscription settings:

    • Installation Mode > A specific namespace on the cluster > myproject
    • Update Channel > stable
    • Approval Strategy > Automatic

      Note

      These settings depend on the specific requirements of your environment. For more details, see OpenShift documentation on Adding Operators to a cluster.

  7. Click Subscribe. This displays the Operators > Installed Operators page.
  8. Wait a few moments until the Status for the AMQ Streams Operator displays Succeeded and the subscription is Up to Date.
  9. Create a new Kafka broker cluster:

    1. Under Red Hat Integration - AMQ Streams > Provided APIs > Kafka, click Create Instance to create a new Kafka broker cluster.
    2. Edit the custom resource definition as appropriate, and click Create.

      Important

      The default example creates a Kafka cluster with 3 Zookeeper nodes and 3 Kafka nodes with ephemeral storage. This temporary storage is suitable for development and testing only, and not for a production environment. For more details, see Using AMQ Streams on OpenShift.

  10. Create a new Kafka Connect S2I cluster:

    1. Under Red Hat Integration - AMQ Streams > Provided APIs > Kafka Connect S2I, click Create Instance to create a new Kafka Connect cluster with OpenShift Source-2-Image support.
    2. Edit the custom resource definition as appropriate, and click Create. For more details on using Kafka Connect with S2I, see Using AMQ Streams on OpenShift.
  11. Select Workloads > Pods to verify that the deployed resources are running on OpenShift.

2.3. Deploying Camel Kafka Connector in AMQ Streams on OpenShift

This section explains how to use Kafka Connect Source-2-Image (S2I) to add your Camel Kafka connectors to your existing Docker-based Kafka Connect image to build a new image. This section also shows how to create an instance of a Camel Kafka connector plug-in using an example AWS S3 Camel Kafka connector.

Prerequisites

Procedure

  1. Log into your OpenShift cluster as administrator, for example:

    $ oc login --user system:admin --token=my-token --server=https://my-cluster.example.com:6443
  2. Change to the project in which Kafka Connect S2I is installed:

    $ oc project myproject
  3. Add your downloaded connectors to the existing Kafka Connect Docker image build, and then wait for the new image build to finish and be configured with the new connectors. For example:

    $ oc start-build my-connect-cluster-connect --from-dir=./camel-kafka-connector/connectors/ --follow
    Uploading directory "camel-kafka-connector/connectors" as binary input for the build ...
    ...
    Uploading finished
    build.build.openshift.io/my-connect-cluster-connect-2 started
    Receiving source from STDIN as archive ...
    Caching blobs under "/var/cache/blobs".
    Getting image source signatures
    Copying blob sha256:5ed7b62ff462957d0ee8956db7a787d8e17c1bdee7a78c57c917298019f77ea2
    ...
    Writing manifest to image destination
    Storing signatures
    Generating dockerfile with builder image image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4
    STEP 1: FROM image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4
    STEP 2: LABEL "io.openshift.build.image"="image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect-source@sha256:12d5ed92510941f1569faa449665e9fc6ea544e67b7ae189ec6b8df434e121f4"       "io.openshift.build.source-location"="/tmp/build/inputs"
    STEP 3: ENV OPENSHIFT_BUILD_NAME="my-connect-cluster-connect-2"     OPENSHIFT_BUILD_NAMESPACE="myproject"
    STEP 4: USER root
    STEP 5: COPY upload/src /tmp/src
    STEP 6: RUN chown -R 1001:0 /tmp/src
    STEP 7: USER 1001
    STEP 8: RUN /opt/kafka/s2i/assemble
    Assembling plugins into custom plugin directory /tmp/kafka-plugins
    Moving plugins to /tmp/kafka-plugins
    STEP 9: CMD /opt/kafka/s2i/run
    STEP 10: COMMIT temp.builder.openshift.io/myproject/my-connect-cluster-connect-2:d0873588
    Getting image source signatures
    Copying blob sha256:edf3aa290fb3c255a84fe836109093fbfeef65c08544f655fad8d6afb53868ba
    ...
    Writing manifest to image destination
    Storing signatures
    0d392e3df3edc0801f0b7091ba99e2a666008531ccb5271cd0d4b54901dac0b9
    0d392e3df3edc0801f0b7091ba99e2a666008531ccb5271cd0d4b54901dac0b9
    
    Pushing image image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect:latest ...
    Getting image source signatures
    Copying blob sha256:06ea991a3b933c49058585f82006648f8702a33b5de8725e2fe85724f18a2ff4
    ...
    Writing manifest to image destination
    Storing signatures
    Successfully pushed image-registry.openshift-image-registry.svc:5000/myproject/my-connect-cluster-connect@sha256:9db57d33df6d0494ea6ee6e4696fcaf79eb81aabeb0bbc180dec5324d33e7eda
    Push successful
  4. Check that the Camel Kafka connectors are available in your Kafka Connect cluster as follows:

    $ oc exec -i -c kafka my-cluster-kafka-0 -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins

    You should see something like the following output:

    [{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.0.1-SNAPSHOT"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.0.1-SNAPSHOT"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.3.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.3.0"}]
  5. Use the following annotation to enable instantiating Camel Kafka connectors using a specific custom resource:

    $ oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
    Important

    When the use-connector-resources option is enabled, do not use the Kafka Connect API server. The Kafka Connect Operator will revert any changes that you make.

  6. Create the connector instance by creating a specific custom resource that includes your connector configuration. The following example shows the configuration for an AWS S3 connector plug-in:

    $ oc apply -f - << EOF
    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: s3-source-connector
      namespace: myproject
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
      tasksMax: 1
      config:
        key.converter: org.apache.kafka.connect.storage.StringConverter
        value.converter: org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
        topics: s3-topic
        camel.source.path.bucketNameOrArn: camel-connector-test
        camel.source.endpoint.autocloseBody: false
        camel.source.maxPollDuration: 10000
        camel.component.aws-s3.accessKey: xxx
        camel.component.aws-s3.secretKey: xxx
        camel.component.aws-s3.region: xxx
    EOF
    
    kafkaconnector.kafka.strimzi.io/s3-source-connector created
  7. Check the status of your connector using the following command, for example:

    $ oc get kctr --selector strimzi.io/cluster=my-connect-cluster -o yaml
  8. You can also run the Kafka console consumer to see the messages received from the topic:

    $ oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic s3-topic --from-beginning

2.4. Deploying Camel Kafka Connector examples

This section describes how to deploy the following Camel Kafka Connector examples:

  • AWS S3 source to JMS sink connectors
  • CQL sink connector
  • Debezium PostgreSQL connector (community example)

Prerequisites

  • See the What is needed section in the readmes shown in the Procedure section.

Procedure

Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.