第7章 Skupper を使用した Kafka へのアクセス


パブリッククラウドリソースを使用したプライベート Kafka クラスターからのデータの処理

この例は、Skupper を使用してクラウドプロバイダー、データセンター、エッジサイト間でサービスを接続できるさまざまな方法を紹介する 一連の例 の一部です。

概要

この例は、Skupper を使用して、パブリックインターネットに公開せずにリモートサイトの Kafka クラスターにアクセスする方法を示したシンプルな Kafka アプリケーションです。

これには 2 つのサービスが含まれます。

  • プライベートデータセンターで実行されている "cluster1" という名前の Kafka クラスター。クラスターには "topic1" という名前のトピックがあります。
  • パブリッククラウドで実行される Kafka クライアント。"topic1" に 10 個のメッセージを送信し、再度受信します。

この例では、Kafka クラスターをセットアップするために、Strimzi プロジェクトの Kubernetes Operator を使用します。Kafka クライアントは、Quarkus を使用して構築された Java アプリケーションです。

この例では、プライベートデータセンターとパブリッククラウドを表すために、"private" と "public" という 2 つの Kubernetes 名前空間を使用します。

前提条件

手順

  • この例のリポジトリーのクローンを作成する
  • Skupper コマンドラインツールをインストールする
  • 名前空間を設定する
  • Kafka クラスターをデプロイする
  • サイトを作成する
  • サイトをリンクする
  • Kafka クラスターを公開する
  • クライアントを実行する

    1. この例のリポジトリーをクローンします。https://skupper.io/examples/index.html から適切な GitHub リポジトリーに移動し、リポジトリーをクローンします。
    2. Skupper コマンドラインツールをインストールします。

      この例では、Skupper コマンドラインツールを使用して Skupper をデプロイします。開発環境ごとに skupper コマンドを 1 回だけインストールする必要があります。

      CLI のインストールの詳細は、インストール を参照してください。設定されたシステムの場合は、次のコマンドを使用します。

      sudo dnf install skupper-cli
    3. 名前空間を設定します。

      Skupper は、通常は異なるクラスター上の複数の Kubernetes 名前空間で使用するように設計されています。skupper コマンドと kubectl コマンドは、kubeconfig と現在のコンテキストを使用して、動作する名前空間を選択します。

      kubeconfig はホームディレクトリーのファイルに保存されます。skupper および kubectl コマンドは、KUBECONFIG 環境変数を使用してこれを検索します。

      1 つの kubeconfig は、ユーザーごとにアクティブコンテキストを 1 つだけサポートします。この演習では複数のコンテキストを一度に使用するため、個別の kubeconfig を作成する必要があります。

      名前空間ごとに、新しいターミナルウィンドウを開きます。各ターミナルで、KUBECONFIG 環境変数を別のパスに設定し、クラスターにログインします。次に、使用する名前空間を作成し、現在のコンテキストに名前空間を設定します。

      注記

      ログイン手順はプロバイダーによって異なります。以下のドキュメントを参照してください。

      パブリック:

      export KUBECONFIG=~/.kube/config-public
      # Enter your provider-specific login command
      kubectl create namespace public
      kubectl config set-context --current --namespace public

      プライベート:

      export KUBECONFIG=~/.kube/config-private
      # Enter your provider-specific login command
      kubectl create namespace private
      kubectl config set-context --current --namespace private
    4. Kafka クラスターをデプロイします。

      プライベートでは、リストされた YAML ファイルを使用して kubectl create コマンドと kubectl apply コマンドを使用し、Operator をインストールしてクラスターとトピックをデプロイします。

      プライベート:

      kubectl create -f server/strimzi.yaml
      kubectl apply -f server/cluster1.yaml
      kubectl wait --for condition=ready --timeout 900s kafka/cluster1

      出力サンプル

      $ kubectl create -f server/strimzi.yaml
      customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
      rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
      clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
      rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-topic-operator-delegation created
      customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
      customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io created
      deployment.apps/strimzi-cluster-operator created
      customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
      clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
      clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
      clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
      rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
      clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
      clusterrole.rbac.authorization.k8s.io/strimzi-topic-operator created
      clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-client-delegation created
      clusterrole.rbac.authorization.k8s.io/strimzi-kafka-client created
      serviceaccount/strimzi-cluster-operator created
      clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
      customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
      customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
      customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
      customresourcedefinition.apiextensions.k8s.io/kafkaconnects2is.kafka.strimzi.io created
      customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
      customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created
      configmap/strimzi-cluster-operator created
      
      $ kubectl apply -f server/cluster1.yaml
      kafka.kafka.strimzi.io/cluster1 created
      kafkatopic.kafka.strimzi.io/topic1 created
      
      $ kubectl wait --for condition=ready --timeout 900s kafka/cluster1
      kafka.kafka.strimzi.io/cluster1 condition met

      注記:

      デフォルトでは、Kafka ブートストラップサーバーは、ドメイン名に Kubernetes 名前空間が含まれるブローカーアドレスを返します。この例のように、Kafka クライアントが Kafka クラスターとは異なる名前の名前空間で実行されている場合、クライアントは Kafka ブローカーを解決できなくなります。

      Kafka ブローカーにアクセスできるようにするには、各ブローカーの advertisedHost プロパティーを、Kafka クライアントがリモートサイトで解決できるドメイン名に設定します。この例では、次のリスナー設定を使用してこれを行います。

      spec:
        kafka:
          listeners:
            - name: plain
              port: 9092
              type: internal
              tls: false
              configuration:
                brokers:
                  - broker: 0
                    advertisedHost: cluster1-kafka-0.cluster1-kafka-brokers

      詳細は、Advertised addresses for brokers を参照してください。

    5. サイトを作成します。

      Skupper サイトは、アプリケーションのコンポーネントが実行される場所です。サイトは相互にリンクされ、アプリケーションのネットワークを形成します。Kubernetes では、サイトは名前空間に関連付けられます。

      名前空間ごとに、skupper init を使用してサイトを作成します。これにより、Skupper ルーターとコントローラーがデプロイされます。次に、skupper status を使用して結果を確認します。

      パブリック:

      skupper init
      skupper status

      出力サンプル

      $ skupper init
      Waiting for LoadBalancer IP or hostname...
      Waiting for status...
      Skupper is now installed in namespace 'public'.  Use 'skupper status' to get more information.
      
      $ skupper status
      Skupper is enabled for namespace "public". It is not connected to any other sites. It has no exposed services.

      プライベート:

      skupper init
      skupper status

      出力サンプル

      $ skupper init
      Waiting for LoadBalancer IP or hostname...
      Waiting for status...
      Skupper is now installed in namespace 'private'.  Use 'skupper status' to get more information.
      
      $ skupper status
      Skupper is enabled for namespace "private". It is not connected to any other sites. It has no exposed services.

      以下の手順を実行する場合は、いつでも skupper status を使用して進捗状況を確認できます。

    6. サイトをリンクします。

      Skupper リンクは、2 つのサイト間の通信チャネルです。リンクは、アプリケーションの接続と要求のトランスポートとして機能します。

      リンクを作成するには、skupper token createskupper link create の 2 つの skupper コマンドを組み合わせて使用する必要があります。

      skupper token create コマンドは、リンクを作成する権限を示す秘密トークンを生成します。トークンにはリンクの詳細も含まれます。次に、リモートサイトで、skupper link create コマンドを実行すると、トークンを使用して、トークンを生成したサイトへのリンクを作成します。

      注記

      リンクトークンは実際にはシークレットです。トークンがある場合は、サイトにリンクできます。信頼できる人だけがアクセスできるようにしてください。

      まず、パブリックサイトの skupper token create を使用してトークンを生成します。次に、プライベートサイトで skupper link create を使用して、サイトをリンクします。

      パブリック:

      skupper token create ~/secret.token

      出力サンプル

      $ skupper token create ~/secret.token
      Token written to ~/secret.token

      プライベート:

      skupper link create ~/secret.token

      出力サンプル

      $ skupper link create ~/secret.token
      Site configured to link to https://10.105.193.154:8081/ed9c37f6-d78a-11ec-a8c7-04421a4c5042 (name=link1)
      Check the status of the link using 'skupper link status'.

      ターミナルセッションが異なるマシン上にある場合は、トークンを安全に転送するために scp または同様のツールを使用する必要がある場合があります。デフォルトでは、トークンは 1 回の使用後、または作成後 15 分後に期限切れになります。

    7. Kafka クラスターを公開します。

      プライベートでは、--headless オプションを指定した skupper expose を使用して、Kafka クラスターを Skupper ネットワーク上のヘッドレスサービスとして公開します。

      次に、パブリックで kubectl get service コマンドを使用して、しばらくすると cluster1-kafka-brokers サービスが表示されることを確認します。

      プライベート:

      skupper expose statefulset/cluster1-kafka --headless --port 9092

      出力サンプル

      $ skupper expose statefulset/cluster1-kafka --headless --port 9092
      statefulset cluster1-kafka exposed as cluster1-kafka-brokers

      パブリック:

      kubectl get service/cluster1-kafka-brokers

      出力サンプル

      $ kubectl get service/cluster1-kafka-brokers
      NAME                     TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)    AGE
      cluster1-kafka-brokers   ClusterIP   None         <none>        9092/TCP   2s
    8. クライアントを実行します。

      kubectl run コマンドを使用して、パブリクでクライアントプログラムを実行します。

      パブリック:

      kubectl run client --attach --rm --restart Never --image quay.io/skupper/kafka-example-client --env BOOTSTRAPSERVERS=cluster1-kafka-brokers:9092

      出力サンプル

      $ kubectl run client --attach --rm --restart Never --image quay.io/skupper/kafka-example-client --env BOOTSTRAPSERVERS=cluster1-kafka-brokers:9092
      [...]
      Received message 1
      Received message 2
      Received message 3
      Received message 4
      Received message 5
      Received message 6
      Received message 7
      Received message 8
      Received message 9
      Received message 10
      Result: OK
      [...]

      クライアントコードを確認するには、このプロジェクトの クライアントディレクトリー を参照してください。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.