第7章 Skupper を使用した Kafka へのアクセス
パブリッククラウドリソースを使用したプライベート Kafka クラスターからのデータの処理
この例は、Skupper を使用してクラウドプロバイダー、データセンター、エッジサイト間でサービスを接続できるさまざまな方法を紹介する 一連の例 の一部です。
概要
この例は、Skupper を使用して、パブリックインターネットに公開せずにリモートサイトの Kafka クラスターにアクセスする方法を示したシンプルな Kafka アプリケーションです。
これには 2 つのサービスが含まれます。
- プライベートデータセンターで実行されている "cluster1" という名前の Kafka クラスター。クラスターには "topic1" という名前のトピックがあります。
- パブリッククラウドで実行される Kafka クライアント。"topic1" に 10 個のメッセージを送信し、再度受信します。
この例では、Kafka クラスターをセットアップするために、Strimzi プロジェクトの Kubernetes Operator を使用します。Kafka クライアントは、Quarkus を使用して構築された Java アプリケーションです。
この例では、プライベートデータセンターとパブリッククラウドを表すために、"private" と "public" という 2 つの Kubernetes 名前空間を使用します。
手順
- この例のリポジトリーのクローンを作成する
- Skupper コマンドラインツールをインストールする
- 名前空間を設定する
- Kafka クラスターをデプロイする
- サイトを作成する
- サイトをリンクする
- Kafka クラスターを公開する
- この例のリポジトリーをクローンします。https://skupper.io/examples/index.html から適切な GitHub リポジトリーに移動し、リポジトリーをクローンします。
Skupper コマンドラインツールをインストールします。
この例では、Skupper コマンドラインツールを使用して Skupper をデプロイします。開発環境ごとに
skupper
コマンドを 1 回だけインストールする必要があります。CLI のインストールの詳細は、インストール を参照してください。設定されたシステムの場合は、次のコマンドを使用します。
sudo dnf install skupper-cli
名前空間を設定します。
Skupper は、通常は異なるクラスター上の複数の Kubernetes 名前空間で使用するように設計されています。
skupper
コマンドとkubectl
コマンドは、kubeconfig と現在のコンテキストを使用して、動作する名前空間を選択します。kubeconfig はホームディレクトリーのファイルに保存されます。
skupper
およびkubectl
コマンドは、KUBECONFIG
環境変数を使用してこれを検索します。1 つの kubeconfig は、ユーザーごとにアクティブコンテキストを 1 つだけサポートします。この演習では複数のコンテキストを一度に使用するため、個別の kubeconfig を作成する必要があります。
名前空間ごとに、新しいターミナルウィンドウを開きます。各ターミナルで、
KUBECONFIG
環境変数を別のパスに設定し、クラスターにログインします。次に、使用する名前空間を作成し、現在のコンテキストに名前空間を設定します。注記ログイン手順はプロバイダーによって異なります。以下のドキュメントを参照してください。
パブリック:
export KUBECONFIG=~/.kube/config-public # Enter your provider-specific login command kubectl create namespace public kubectl config set-context --current --namespace public
プライベート:
export KUBECONFIG=~/.kube/config-private # Enter your provider-specific login command kubectl create namespace private kubectl config set-context --current --namespace private
Kafka クラスターをデプロイします。
プライベートでは、リストされた YAML ファイルを使用して
kubectl create
コマンドとkubectl apply
コマンドを使用し、Operator をインストールしてクラスターとトピックをデプロイします。プライベート:
kubectl create -f server/strimzi.yaml kubectl apply -f server/cluster1.yaml kubectl wait --for condition=ready --timeout 900s kafka/cluster1
出力サンプル
$ kubectl create -f server/strimzi.yaml customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-topic-operator-delegation created customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io created deployment.apps/strimzi-cluster-operator created customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created clusterrole.rbac.authorization.k8s.io/strimzi-topic-operator created clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-client-delegation created clusterrole.rbac.authorization.k8s.io/strimzi-kafka-client created serviceaccount/strimzi-cluster-operator created clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created customresourcedefinition.apiextensions.k8s.io/kafkaconnects2is.kafka.strimzi.io created customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created configmap/strimzi-cluster-operator created $ kubectl apply -f server/cluster1.yaml kafka.kafka.strimzi.io/cluster1 created kafkatopic.kafka.strimzi.io/topic1 created $ kubectl wait --for condition=ready --timeout 900s kafka/cluster1 kafka.kafka.strimzi.io/cluster1 condition met
注記:
デフォルトでは、Kafka ブートストラップサーバーは、ドメイン名に Kubernetes 名前空間が含まれるブローカーアドレスを返します。この例のように、Kafka クライアントが Kafka クラスターとは異なる名前の名前空間で実行されている場合、クライアントは Kafka ブローカーを解決できなくなります。
Kafka ブローカーにアクセスできるようにするには、各ブローカーの
advertisedHost
プロパティーを、Kafka クライアントがリモートサイトで解決できるドメイン名に設定します。この例では、次のリスナー設定を使用してこれを行います。spec: kafka: listeners: - name: plain port: 9092 type: internal tls: false configuration: brokers: - broker: 0 advertisedHost: cluster1-kafka-0.cluster1-kafka-brokers
詳細は、Advertised addresses for brokers を参照してください。
サイトを作成します。
Skupper サイトは、アプリケーションのコンポーネントが実行される場所です。サイトは相互にリンクされ、アプリケーションのネットワークを形成します。Kubernetes では、サイトは名前空間に関連付けられます。
名前空間ごとに、
skupper init
を使用してサイトを作成します。これにより、Skupper ルーターとコントローラーがデプロイされます。次に、skupper status
を使用して結果を確認します。パブリック:
skupper init skupper status
出力サンプル
$ skupper init Waiting for LoadBalancer IP or hostname... Waiting for status... Skupper is now installed in namespace 'public'. Use 'skupper status' to get more information. $ skupper status Skupper is enabled for namespace "public". It is not connected to any other sites. It has no exposed services.
プライベート:
skupper init skupper status
出力サンプル
$ skupper init Waiting for LoadBalancer IP or hostname... Waiting for status... Skupper is now installed in namespace 'private'. Use 'skupper status' to get more information. $ skupper status Skupper is enabled for namespace "private". It is not connected to any other sites. It has no exposed services.
以下の手順を実行する場合は、いつでも
skupper status
を使用して進捗状況を確認できます。サイトをリンクします。
Skupper リンクは、2 つのサイト間の通信チャネルです。リンクは、アプリケーションの接続と要求のトランスポートとして機能します。
リンクを作成するには、
skupper token create
とskupper link create
の 2 つのskupper
コマンドを組み合わせて使用する必要があります。skupper token create
コマンドは、リンクを作成する権限を示す秘密トークンを生成します。トークンにはリンクの詳細も含まれます。次に、リモートサイトで、skupper link create
コマンドを実行すると、トークンを使用して、トークンを生成したサイトへのリンクを作成します。注記リンクトークンは実際にはシークレットです。トークンがある場合は、サイトにリンクできます。信頼できる人だけがアクセスできるようにしてください。
まず、パブリックサイトの
skupper token create
を使用してトークンを生成します。次に、プライベートサイトでskupper link create
を使用して、サイトをリンクします。パブリック:
skupper token create ~/secret.token
出力サンプル
$ skupper token create ~/secret.token Token written to ~/secret.token
プライベート:
skupper link create ~/secret.token
出力サンプル
$ skupper link create ~/secret.token Site configured to link to https://10.105.193.154:8081/ed9c37f6-d78a-11ec-a8c7-04421a4c5042 (name=link1) Check the status of the link using 'skupper link status'.
ターミナルセッションが異なるマシン上にある場合は、トークンを安全に転送するために
scp
または同様のツールを使用する必要がある場合があります。デフォルトでは、トークンは 1 回の使用後、または作成後 15 分後に期限切れになります。Kafka クラスターを公開します。
プライベートでは、
--headless
オプションを指定したskupper expose
を使用して、Kafka クラスターを Skupper ネットワーク上のヘッドレスサービスとして公開します。次に、パブリックで
kubectl get service
コマンドを使用して、しばらくするとcluster1-kafka-brokers
サービスが表示されることを確認します。プライベート:
skupper expose statefulset/cluster1-kafka --headless --port 9092
出力サンプル
$ skupper expose statefulset/cluster1-kafka --headless --port 9092 statefulset cluster1-kafka exposed as cluster1-kafka-brokers
パブリック:
kubectl get service/cluster1-kafka-brokers
出力サンプル
$ kubectl get service/cluster1-kafka-brokers NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE cluster1-kafka-brokers ClusterIP None <none> 9092/TCP 2s
クライアントを実行します。
kubectl run
コマンドを使用して、パブリクでクライアントプログラムを実行します。パブリック:
kubectl run client --attach --rm --restart Never --image quay.io/skupper/kafka-example-client --env BOOTSTRAPSERVERS=cluster1-kafka-brokers:9092
出力サンプル
$ kubectl run client --attach --rm --restart Never --image quay.io/skupper/kafka-example-client --env BOOTSTRAPSERVERS=cluster1-kafka-brokers:9092 [...] Received message 1 Received message 2 Received message 3 Received message 4 Received message 5 Received message 6 Received message 7 Received message 8 Received message 9 Received message 10 Result: OK [...]
クライアントコードを確認するには、このプロジェクトの クライアントディレクトリー を参照してください。