第5章 インストールアーティファクトを使用した AMQ Streams のデプロイ
OperatorHub で AMQ Streams Operator を使用して AMQ Streams をデプロイする代わりに、インストールアーティファクトを使用できます。AMQ Streams のデプロイメント環境の準備 が整ったら、以下を実行できます。
- Kafka クラスターの作成方法
要件に応じてその他の Kafka コンポーネントをデプロイする任意の手順。
これらの手順は、OpenShift クラスターが利用可能で稼働していることを想定しています。
AMQ Streams は AMQ Streams Strimzi 0.22.x をベースとしています。ここでは、OpenShift 4.6 以降に AMQ Streams をデプロイする方法を説明します。
本ガイドのコマンドを実行するには、クラスターユーザーに RBAC (ロールベースアクセス制御) および CRD を管理する権限を付与する必要があります。
5.1. Kafka クラスターの作成
Kafka クラスターを作成するには、Cluster Operator をデプロイして Kafka クラスターを管理し、Kafka クラスターをデプロイします。
Kafka
リソースを使用して Kafka クラスターをデプロイするときに、Topic Operator および User Operator を同時にデプロイできます。この代わりに、AMQ Streams ではない Kafka クラスターを使用している場合は、Topic Operator および User Operator をスタンドアロンコンポーネントとしてデプロイすることもできます。
Kafka クラスターを Topic Operator および User Operator とデプロイ
AMQ Streams によって管理される Kafka クラスターを Topic Operator および User Operator と使用する場合は、このデプロイメント手順を実行します。
- Cluster Operator をデプロイします。
Cluster Operator を使用して以下をデプロイします。
スタンドアロン Topic Operator および User Operator のデプロイ
AMQ Streams によって管理されない Kafka クラスターを Topic Operator および User Operator と使用する場合は、このデプロイメント手順を実行します。
5.1.1. Cluster Operator のデプロイ
Cluster Operator は、OpenShift クラスター内で Apache Kafka クラスターのデプロイおよび管理を行います。
本セクションの手順は以下を説明します。
以下を監視するよう Cluster Operator をデプロイする方法。
- 代替のデプロイメント
5.1.1.1. Cluster Operator デプロイメントの監視オプション
Cluster Operator の稼働中に、Kafka リソースの更新に対する監視が開始されます。
Cluster Operator をデプロイして、以下からの Kafka リソースの監視を選択できます。
- 単一の namespace (Cluster Operator が含まれる同じ namespace)
- 複数の namespace
- すべての namespace
AMQ Streams では、デプロイメントの処理を簡単にするため、YAML ファイルのサンプルが提供されます。
Cluster Operator では、以下のリソースの変更が監視されます。
-
Kafka クラスターの
Kafka
。 -
KafkaConnect
の Kafka Connect クラスター。 -
Source2Image がサポートされる Kafka Connect クラスターの
KafkaConnectS2I
。 -
Kafka Connect クラスターでコネクターを作成および管理するための
KafkaConnector
。 -
Kafka MirrorMaker インスタンスの
KafkaMirrorMaker
。 -
Kafka Bridge インスタンスの
KafkaBridge
。
OpenShift クラスターでこれらのリソースの 1 つが作成されると、Operator によってクラスターの詳細がリソースより取得されます。さらに、StatefulSet、Service、および ConfigMap などの必要な OpenShift リソースが作成され、リソースの新しいクラスターの作成が開始されます。
Kafka リソースが更新されるたびに、リソースのクラスターを構成する OpenShift リソースで該当する更新が Operator によって実行されます。
クラスターの望ましい状態がリソースのクラスターに反映されるようにするため、リソースへのパッチ適用後またはリソースの削除後にリソースが再作成されます。この操作は、サービスの中断を引き起こすローリングアップデートの原因となる可能性があります。
リソースが削除されると、Operator によってクラスターがアンデプロイされ、関連する OpenShift リソースがすべて削除されます。
5.1.1.2. 単一の namespace を監視対象とする Cluster Operator のデプロイメント
この手順では、OpenShift クラスターの単一の namespace で AMQ Streams リソースを監視するように Cluster Operator をデプロイする方法を説明します。
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator がインストールされる namespace を使用するように、AMQ Streams のインストールファイルを編集します。
たとえば、この手順では Cluster Operator は
my-cluster-operator-namespace
という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
Cluster Operator をデプロイします。
oc create -f install/cluster-operator -n my-cluster-operator-namespace
Cluster Operator が正常にデプロイされたことを確認します。
oc get deployments
5.1.1.3. 複数の namespace を監視対象とする Cluster Operator のデプロイメント
この手順では、OpenShift クラスターの複数の namespace 全体で AMQ Streams リソースを監視するように Cluster Operator をデプロイする方法を説明します。
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator がインストールされる namespace を使用するように、AMQ Streams のインストールファイルを編集します。
たとえば、この手順では Cluster Operator は
my-cluster-operator-namespace
という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
ファイルを編集し、Cluster Operator によって監視されるすべての namespace のリストをSTRIMZI_NAMESPACE
環境変数に追加します。たとえば、この手順では Cluster Operator は
watched-namespace-1
、watched-namespace-2
、およびwatched-namespace-3
という namespace を監視します。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: watched-namespace-1,watched-namespace-2,watched-namespace-3
リストした各 namespace に
RoleBindings
をインストールします。この例では、コマンドの
watched-namespace
を前述のステップでリストした namespace に置き換えます。watched-namespace-1
、watched-namespace-2
、およびwatched-namespace-3
に対してこれを行います。oc create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace oc create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace oc create -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n watched-namespace
Cluster Operator をデプロイします。
oc create -f install/cluster-operator -n my-cluster-operator-namespace
Cluster Operator が正常にデプロイされたことを確認します。
oc get deployments
5.1.1.4. すべての namespace を対象とする Cluster Operator のデプロイメント
この手順では、OpenShift クラスターのすべての namespace 全体で AMQ Streams リソースを監視するように Cluster Operator をデプロイする方法を説明します。
このモードで実行している場合、Cluster Operator によって、新規作成された namespace でクラスターが自動的に管理されます。
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator がインストールされる namespace を使用するように、AMQ Streams のインストールファイルを編集します。
たとえば、この手順では Cluster Operator は
my-cluster-operator-namespace
という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
ファイルを編集し、STRIMZI_NAMESPACE
環境変数の値を*
に設定します。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: # ... serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: "*" # ...
クラスター全体ですべての namespace にアクセスできる権限を Cluster Operator に付与する
ClusterRoleBindings
を作成します。oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-topic-operator-delegation --clusterrole=strimzi-topic-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator
my-cluster-operator-namespace
は、Cluster Operator をインストールする namespace に置き換えます。Cluster Operator を OpenShift クラスターにデプロイします。
oc create -f install/cluster-operator -n my-cluster-operator-namespace
Cluster Operator が正常にデプロイされたことを確認します。
oc get deployments
5.1.2. Kafka のデプロイ
Apache Kafka は、耐障害性のリアルタイムデータフィードを実現する、オープンソースの分散型 publish/subscribe メッセージングシステムです。
本セクションの手順は以下を説明します。
Cluster Operator を使用して以下をデプロイする方法
- 一時 または 永続 Kafka クラスター
Topic Operator および User Operator (
Kafka
カスタムリソースを設定してデプロイする)
Topic Operator および User Operator の代替のスタンドアロンデプロイメント手順
Kafka をインストールする場合、AMQ Streams によって ZooKeeper クラスターもインストールされ、Kafka と ZooKeeper との接続に必要な設定が追加されます。
5.1.2.1. Kafka クラスターのデプロイメント
この手順では、Cluster Operator を使用して Kafka クラスターを OpenShift にデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使って Kafka
リソースが作成されます。
AMQ Streams では、デプロイメントの YAML ファイルのサンプルは examples/kafka/
にあります。
kafka-persistent.yaml
- 3 つの Zookeeper ノードと 3 つの Kafka ノードを使用して永続クラスターをデプロイします。
kafka-jbod.yaml
- それぞれが複数の永続ボリューを使用する、3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、永続クラスターをデプロイします。
kafka-persistent-single.yaml
- 1 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、永続クラスターをデプロイします。
kafka-ephemeral.yaml
- 3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、一時クラスターをデプロイします。
kafka-ephemeral-single.yaml
- 3 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、一時クラスターをデプロイします。
この手順では、一時 および 永続 Kafka クラスターデプロイメントの例を使用します。
- 一時クラスター
-
通常、Kafka の一時クラスターは開発およびテスト環境での使用に適していますが、本番環境での使用には適していません。このデプロイメントでは、ブローカー情報 (ZooKeeper) と、トピックまたはパーティション (Kafka) を格納するための
emptyDir
ボリュームが使用されます。emptyDir
ボリュームを使用すると、その内容は厳密に Pod のライフサイクルと関連し、Pod がダウンすると削除されます。 - 永続クラスター
-
Kafka の永続クラスターでは、
PersistentVolumes
を使用して ZooKeeper および Kafka データを格納します。PersistentVolumeClaim
を使用してPersistentVolume
が取得され、PersistentVolume
の実際のタイプには依存しません。たとえば、YAML ファイルを変更しなくても Amazon AWS デプロイメントで Amazon EBS ボリュームを使用できます。PersistentVolumeClaim
でStorageClass
を使用し、自動ボリュームプロビジョニングをトリガーすることができます。
サンプル YAML ファイルは、サポートされる最新の Kafka バージョンを指定し、サポートされるログメッセージ形式バージョンの設定とブローカー間のプロトコルバージョンの設定を指定します。Kafka のアップグレード時に、これらのプロパティーの更新が必要になります。
サンプルクラスターの名前はデフォルトで my-cluster
になります。クラスター名はリソースの名前によって定義され、クラスターがデプロイされた後に変更できません。クラスターをデプロイする前にクラスター名を変更するには、関連する YAML ファイルにある Kafka
リソースの Kafka.metadata.name
プロパティーを編集します。
デフォルトのクラスター名および指定された Kafka バージョン
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: version: 2.7.0 #... config: #... log.message.format.version: 2.7 inter.broker.protocol.version: 2.7 # ...
Kafka
リソースの設定に関する詳細は、『AMQ Streams on OpenShift の使用』の「Kafka クラスターの設定」を参照してください。
手順
一時 または 永続 クラスターを作成およびデプロイします。
開発またはテストでは、一時クラスターの使用が適している可能性があります。永続クラスターはどのような状況でも使用することができます。
一時 クラスターを作成およびデプロイするには、以下を実行します。
oc apply -f examples/kafka/kafka-ephemeral.yaml
永続 クラスターを作成およびデプロイするには、以下を実行します。
oc apply -f examples/kafka/kafka-persistent.yaml
Kafka クラスターが正常にデプロイされたことを確認します。
oc get deployments
5.1.2.2. Cluster Operator を使用した Topic Operator のデプロイ
この手順では、Cluster Operator を使用して Topic Operator をデプロイする方法を説明します。
Kafka
リソースの entityOperator
プロパティーを設定し、topicOperator
が含まれるようにします。
AMQ Streams によって管理されない Kafka クラスターを Topic Operator と使用する場合は、Topic Operator をスタンドアロンコンポーネントとしてデプロイする必要があります。
entityOperator
および topicOperator
プロパティーの設定に関する詳細は、『AMQ Streams on OpenShift の使用』の「Entity Operator の設定」を参照してください。
手順
Kafka
リソースのentityOperator
プロパティーを編集し、topicOperator
が含まれるようにします。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} userOperator: {}
「
EntityTopicOperatorSpec
スキーマ参照」に記載されているプロパティーを使用して、Topic Operator のspec
を設定します。すべてのプロパティーにデフォルト値を使用する場合は、空のオブジェクト (
{}
) を使用します。リソースを作成または更新します。
oc apply
を使用します。oc apply -f <your-file>
5.1.2.3. Cluster Operator を使用した User Operator のデプロイ
この手順では、Cluster Operator を使用して User Operator をデプロイする方法を説明します。
Kafka
リソースの entityOperator
プロパティーを設定し、userOperator
が含まれるようにします。
AMQ Streams によって管理されない Kafka クラスターを User Operator と使用する場合は、User Operator をスタンドアロンコンポーネントとしてデプロイする必要があります。
entityOperator
および userOperator
プロパティーの設定に関する詳細は、『AMQ Streams on OpenShift の使用』の「Entity Operator の設定」を参照してください。
手順
Kafka
リソースのentityOperator
プロパティーを編集し、userOperator
が含まれるようにします。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} userOperator: {}
『AMQ Streams on OpenShift の使用』の「
EntityUserOperatorSpec
スキーマ参照」に記載されているプロパティーを使用して、User Operator のspec
を設定します。すべてのプロパティーにデフォルト値を使用する場合は、空のオブジェクト (
{}
) を使用します。リソースを作成または更新します。
oc apply -f <your-file>
5.1.3. AMQ Streams Operator の代替のスタンドアロンデプロイメントオプション
Cluster Operator を使用して Kafka クラスターをデプロイするときに、Topic Operator および User Operator をデプロイすることもできます。この代わりに、スタンドアロンデプロイメントを行うことができます。
スタンドアロンデプロイメントとは、Topic Operator および User Operator が AMQ Streams によって管理されない Kafka クラスターと操作できることを意味します。
5.1.3.1. スタンドアロン Topic Operator のデプロイ
この手順では、Topic Operator をスタンドアロンコンポーネントとしてデプロイする方法を説明します。
スタンドアロンデプロイメントには、環境変数の設定が必要で、Cluster Operator を使用した Topic Operator のデプロイよりも複雑です。しかし、Topic Operator は Cluster Operator によってデプロイされた Kafka クラスターに限らず、あらゆる Kafka クラスターと操作できるため、スタンドアロンデプロイメントの柔軟性は高くなります。
前提条件
- Topic Operator が接続する既存の Kafka クラスターが必要です。
手順
以下を設定して、
install/topic-operator/05-Deployment-strimzi-topic-operator.yaml
ファイルのDeployment.spec.template.spec.containers[0].env
プロパティーを編集します。-
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
。hostname:port
ペアのコンマ区切りリストで Kafka クラスターのブートストラップブローカーを指定します。 -
STRIMZI_ZOOKEEPER_CONNECT
。hostname:port
ペアのコンマ区切りリストで ZooKeeper ノードを指定します。これは、Kafka クラスターが使用する ZooKeeper クラスターと同じである必要があります。 -
STRIMZI_NAMESPACE
。Operator がKafkaTopic
リソースを監視する OpenShift namespace。 -
STRIMZI_RESOURCE_LABELS
。Operator によって管理されるKafkaTopic
リソースを識別するために使用されるラベルセレクター。 -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
。定期的な調整の間隔 (秒単位) を指定します。 -
STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS
。Kafka からトピックメタデータの取得を試行する回数を指定します。各試行の間隔は、指数バックオフとして定義されます。パーティションまたはレプリカの数によって、トピックの作成に時間がかかる可能性がある場合は、この値を増やすことを検討してください。デフォルトは6
です。 -
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
。ZooKeeper セッションのタイムアウト (秒単位)。例:10000
デフォルトは20000
(20 秒) です。 -
STRIMZI_TOPICS_PATH
。Topic Operator がそのメタデータを保存する Zookeeper ノードパス。デフォルトは/strimzi/topics
です。 -
STRIMZI_TLS_ENABLED
。Kafka ブローカーとの通信を暗号化するために、TLS サポートを有効にします。デフォルトはtrue
です。 -
STRIMZI_TRUSTSTORE_LOCATION
。TLS ベースの通信を有効にするための証明書が含まれるトラストストアへのパス。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_TRUSTSTORE_PASSWORD
。STRIMZI_TRUSTSTORE_LOCATION
で定義されるトラストストアにアクセスするためのパスワード。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_KEYSTORE_LOCATION
。TLS ベースの通信を有効にするための秘密鍵が含まれるキーストアへのパス。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_KEYSTORE_PASSWORD
。STRIMZI_KEYSTORE_LOCATION
で定義されるキーストアにアクセスするためのパスワード。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_LOG_LEVEL
。ロギングメッセージの出力レベル。設定可能な値:ERROR
、WARNING
、INFO
、DEBUG
、およびTRACE
デフォルトはINFO
です。 -
STRIMZI_JAVA_OPTS
(任意)。Topic Operator を実行する JVM に使用される Java オプション。例:-Xmx=512M -Xms=256M
-
STRIMZI_JAVA_SYSTEM_PROPERTIES
(任意)。Topic Operator に設定される-D
オプションをリストします。例:-Djavax.net.debug=verbose -DpropertyName=value
-
Topic Operator をデプロイします。
oc create -f install/topic-operator
Topic Operator が正常にデプロイされていることを確認します。
oc describe deployment strimzi-topic-operator
Replicas:
エントリーに1 available
が表示されれば、Topic Operator はデプロイされています。注記OpenShift への接続が低速な場合やイメージがこれまでダウンロードされたことがない場合は、デプロイメントに遅延が発生することがあります。
5.1.3.2. スタンドアロン User Operator のデプロイ
この手順では、User Operator をスタンドアロンコンポーネントとしてデプロイする方法を説明します。
スタンドアロンデプロイメントには、環境変数の設定が必要で、Cluster Operator を使用した User Operator のデプロイよりも複雑です。しかし、User Operator は Cluster Operator によってデプロイされた Kafka クラスターに限らず、あらゆる Kafka クラスターと操作できるため、スタンドアロンデプロイメントの柔軟性は高くなります。
前提条件
- User Operator が接続する既存の Kafka クラスターが必要です。
手順
以下を設定して、
install/user-operator/05-Deployment-strimzi-user-operator.yaml
ファイルのDeployment.spec.template.spec.containers[0].env
プロパティーを編集します。-
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
。hostname:port
ペアのコンマ区切りリストで Kafka ブローカーを指定します。 -
STRIMZI_ZOOKEEPER_CONNECT
。hostname:port
ペアのコンマ区切りリストで ZooKeeper ノードを指定します。これは、Kafka クラスターが使用する ZooKeeper クラスターと同じである必要があります。TLS 暗号化で ZooKeeper ノードに接続することはサポートされません。 -
STRIMZI_NAMESPACE
。Operator がKafkaUser
リソースを監視する OpenShift namespace。 -
STRIMZI_LABELS
。Operator によって管理されるKafkaUser
リソースを識別するために使用されるラベルセレクター。 -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
。定期的な調整の間隔 (秒単位) を指定します。 -
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
。ZooKeeper セッションのタイムアウト (秒単位)。例:10000
デフォルトは20000
(20 秒) です。 -
STRIMZI_CA_CERT_NAME
。TLS クライアント認証に対して新しいユーザー証明書を署名するための認証局の公開鍵が含まれる OpenShiftSecret
を示します。Secret
のca.crt
キーに、認証局の公開鍵が含まれている必要があります。 -
STRIMZI_CA_KEY_NAME
。TLS クライアント認証に対して新しいユーザー証明書を署名するための認証局の秘密鍵が含まれる OpenShiftSecret
を示します。Secret
のca.key
キーに、認証局の秘密鍵が含まれている必要があります。 -
STRIMZI_CLUSTER_CA_CERT_SECRET_NAME
。TLS ベースの通信を有効にするために Kafka ブローカーの証明書の署名に使用される認証局の秘密鍵が含まれる OpenShiftSecret
を示します。Secret
のca.crt
キーに、認証局の公開鍵が含まれている必要があります。この環境変数の設定は任意で、Kafka クラスターとの通信が TLS ベースである場合のみ設定する必要があります。 -
STRIMZI_EO_KEY_SECRET_NAME
。Kafka クラスターに対する TLS クライアント認証の秘密鍵と関連する証明書が含まれる OpenShiftSecret
を示します。Secret
のentity-operator.p12
キーに、秘密鍵と証明書が含まれるキーストアが含まれ、entity-operator.password
キーに関連するパスワードが含まれる必要があります。この環境変数の設定は任意で、Kafka クラスターとの通信が TLS ベースで、TLS のクライアント認証が必要な場合のみ設定する必要があります。 -
STRIMZI_CA_VALIDITY
。認証局の有効期限。デフォルトは365
です。 -
STRIMZI_CA_RENEWAL
。認証局の更新期限。 -
STRIMZI_LOG_LEVEL
。ロギングメッセージの出力レベル。設定可能な値:ERROR
、WARNING
、INFO
、DEBUG
、およびTRACE
デフォルトはINFO
です。 -
STRIMZI_GC_LOG_ENABLED
。ガベージコレクション (GC) ロギングを有効にします。デフォルトはtrue
です。デフォルトでは、古い証明書が期限切れになる前に証明書が更新される期間は30
日です。 -
STRIMZI_JAVA_OPTS
(任意)。User Operator を実行する JVM に使用される Java オプション。例:-Xmx=512M -Xms=256M
-
STRIMZI_JAVA_SYSTEM_PROPERTIES
(任意)。User Operator に設定される-D
オプションをリストします。例:-Djavax.net.debug=verbose -DpropertyName=value
-
User Operator をデプロイします。
oc create -f install/user-operator
User Operator が正常にデプロイされていることを確認します。
oc describe deployment strimzi-user-operator
Replicas:
エントリーに1 available
が表示されれば、User Operator はデプロイされています。注記OpenShift への接続が低速な場合やイメージがこれまでダウンロードされたことがない場合は、デプロイメントに遅延が発生することがあります。
5.2. Kafka Connect のデプロイ
Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。
AMQ Streams では、Kafka Connect は分散 (distributed) モードでデプロイされます。Kafka Connect はスタンドアロンモードでも動作しますが、AMQ Streams ではサポートされません。
Kafka Connect では、コネクター の概念を使用し、スケーラビリティーと信頼性を維持しながら Kafka クラスターで大量のデータを出し入れするためのフレームワークが提供されます。
Kafka Connect は通常、Kafka を外部データベース、ストレージシステム、およびメッセージングシステムと統合するために使用されます。
本セクションの手順では以下の方法を説明します。
-
KafkaConnect
リソースを使用した Kafka Connect のデプロイ - 複数の Kafka Connect インスタンスの実行
- 接続の確立に必要なコネクターが含まれる Kafka Connect の作成
- KafkaConnector リソースまたは Kafka Connect REST API を使用したコネクターの作成および管理
- KafkaConnector リソースを Kafka Connect にデプロイ
- KafkaConnector リソースにアノテーションを付けて Kafka コネクターを再起動
- KafkaConnector リソースにアノテーションを付けて Kafka コネクタータスクを再起動
コネクター という用語は、Kafka Connect クラスター内で実行されているコネクターインスタンスや、コネクタークラスと同じ意味で使用されます。本ガイドでは、本文の内容で意味が明確である場合に コネクター という用語を使用します。
5.2.1. Kafka Connect の OpenShift クラスターへのデプロイ
この手順では、Cluster Operator を使用して Kafka Connect クラスターを OpenShift クラスターにデプロイする方法を説明します。
Kafka Connect クラスターは Deployment
として実装されます。その Deployment
には、コネクターのワークロードを タスク として分布するノード (ワーカー とも呼ばれる) の設定可能な数が含まれるため、メッセージフローのスケーラビリティーや信頼性が高くなります。
デプロイメントでは、YAML ファイルの仕様を使って KafkaConnect
リソースが作成されます。
この手順では、AMQ Streams にある以下のサンプルファイルを使用します。
-
examples/connect/kafka-connect.yaml
KafkaConnect
リソース (または Source-to-Image (S2I) がサポートされる KafkaConnectS2I
リソース) の設定に関する情報は、『AMQ Streams on OpenShift の使用』の「Kafka クラスターの設定」を参照してください。
手順
Kafka Connect を OpenShift クラスターにデプロイします。3 つ以上のブローカーで構成される Kafka クラスターの場合は、
examples/connect/kafka-connect.yaml
ファイルを使用します。3 つ未満のブローカーで構成される Kafka クラスターの場合は、examples/connect/kafka-connect-single-node-kafka.yaml
ファイルを使用します。oc apply -f examples/connect/kafka-connect.yaml
Kafka Connect が正常にデプロイされたことを確認します。
oc get deployments
5.2.2. 複数インスタンスの Kafka Connect 設定
Kafka Connect のインスタンスを複数実行している場合は、以下の config
プロパティーのデフォルト設定を変更する必要があります。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: # ... config: group.id: connect-cluster 1 offset.storage.topic: connect-cluster-offsets 2 config.storage.topic: connect-cluster-configs 3 status.storage.topic: connect-cluster-status 4 # ... # ...
これら 3 つのトピックの値は、同じ group.id
を持つすべての Kafka Connect インスタンスで同じする必要があります。
デフォルト設定を変更しないと、同じ Kafka クラスターに接続する各 Kafka Connect インスタンスは同じ値でデプロイされます。その結果、事実上はすべてのインスタンスが結合されてクラスターで実行され、同じトピックが使用されます。
複数の Kafka Connect クラスターが同じトピックの使用を試みると、Kafka Connect は想定どおりに動作せず、エラーが生成されます。
複数の Kafka Connect インスタンスを実行する場合は、インスタンスごとにこれらのプロパティーの値を変更してください。
5.2.3. コネクタープラグインでの Kafka Connect の拡張
Kafka Connect の AMQ Streams コンテナーイメージには、ファイルベースのデータを Kafka クラスターで出し入れするために 2 つの組み込みコネクターが含まれています。
ファイルコネクター | 説明 |
---|---|
| ファイル (ソース) から Kafka クラスターにデータを転送します。 |
| Kafka クラスターからファイル (シンク) にデータを転送します。 |
ここの手順では、以下を行って、独自のコネクタークラスをコネクターイメージに追加する方法を説明します。
Kafka Connect REST API または KafkaConnector カスタムリソースを使用 して直接コネクターの設定を作成します。
5.2.3.1. AMQ Streams を使用した新しいコンテナーイメージの自動作成
この手順では、AMQ Streams が追加のコネクターで新しいコンテナーイメージを自動的にビルドするように Kafka Connect を設定する方法を説明します。コネクタープラグインは、KafkaConnect
カスタムリソースの .spec.build.plugins
プロパティーを使用して定義します。AMQ Streams はコネクタープラグインを自動的にダウンロードし、新しいコンテナーイメージに追加します。コンテナーは .spec.build.output
に指定されたコンテナーリポジトリーにプッシュされ、Kafka Connect デプロイメントで自動的に使用されます。
前提条件
- Cluster Operator がデプロイされている。
- コンテナーレジストリー。
イメージをプッシュ、保存、およびプルできる独自のコンテナーレジストリーを提供する必要があります。AMQ Streams は、プライベートコンテナーレジストリーだけでなく、Quay や Docker Hub などのパブリックレジストリーもサポートします。
手順
.spec.build.output
でコンテナーレジストリーを指定し、.spec.build.plugins
で追加のコネクターを指定して、KafkaConnect
カスタムリソースを設定します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... build: output: 2 type: docker image: my-registry.io/my-org/my-connect-cluster:latest pushSecret: my-registry-credentials plugins: 3 - name: debezium-postgres-connector artifacts: - type: tgz url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz sha512sum: 962a12151bdf9a5a30627eebac739955a4fd95a08d373b86bdcea2b4d0c27dd6e1edd5cb548045e115e33a9e69b1b2a352bee24df035a0447cb820077af00c03 - name: camel-telegram artifacts: - type: tgz url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.7.0/camel-telegram-kafka-connector-0.7.0-package.tar.gz sha512sum: a9b1ac63e3284bea7836d7d24d84208c49cdf5600070e6bd1535de654f6920b74ad950d51733e8020bf4187870699819f54ef5859c7846ee4081507f48873479 #...
リソースを作成または更新します。
$ oc apply -f KAFKA-CONNECT-CONFIG-FILE
- 新しいコンテナーイメージがビルドされ、Kafka Connect クラスターがデプロイされるまで待ちます。
- Kafka Connect REST API または KafkaConnector カスタムリソースを使用して、追加したコネクタープラグインを使用します。
その他のリソース
詳細は、『AMQ Streams on OpenShift の使用』を参照してください。
5.2.3.2. Kafka Connect ベースイメージからの Docker イメージの作成
この手順では、カスタムイメージを作成し、/opt/kafka/plugins
ディレクトリーに追加する方法を説明します。
Red Hat Ecosystem Catalog の Kafka コンテナーイメージを、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。
AMQ Stream バージョンの Kafka Connect は起動時に、/opt/kafka/plugins
ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。
手順
registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0
をベースイメージとして使用して、新しいDockerfile
を作成します。FROM registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
プラグインファイルの例
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
- コンテナーイメージをビルドします。
- カスタムイメージをコンテナーレジストリーにプッシュします。
新しいコンテナーイメージを示します。
以下のいずれかを行います。
KafkaConnect
カスタムリソースのKafkaConnect.spec.image
プロパティーを編集します。設定された場合、このプロパティーによって Cluster Operator の
STRIMZI_KAFKA_CONNECT_IMAGES
変数がオーバーライドされます。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... image: my-new-container-image 2 config: 3 #...
または
-
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
ファイルのSTRIMZI_KAFKA_CONNECT_IMAGES
変数を編集して新しいコンテナーイメージを示すようにした後、Cluster Operator を再インストールします。
その他のリソース
詳細は、『AMQ Streams on OpenShift の使用』を参照してください。
5.2.3.3. OpenShift ビルドおよび S2I (Source-to-Image) を使用したコンテナーイメージの作成
この手順では、OpenShift ビルド と S2I (Source-to-Image) フレームワークを使用して、新しいコンテナーイメージを作成する方法を説明します。
OpenShift ビルドは、S2I がサポートされるビルダーイメージとともに、ユーザー提供のソースコードおよびバイナリーを取得し、これらを使用して新しいコンテナーイメージを構築します。構築後、コンテナーイメージは OpenShfit のローカルコンテナーイメージリポジトリーに格納され、デプロイメントで使用可能になります。
S2I がサポートされる Kafka Connect ビルダーイメージは、registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0
イメージの一部として、Red Hat Ecosystem Catalog で提供されます。このS2I イメージは、バイナリー (プラグインおよびコネクターとともに) を取得し、/tmp/kafka-plugins/s2i
ディレクトリーに格納されます。このディレクトリーから、Kafka Connect デプロイメントとともに使用できる新しい Kafka Connect イメージを作成します。改良されたイメージの使用を開始すると、Kafka Connect は /tmp/kafka-plugins/s2i
ディレクトリーからサードパーティープラグインをロードします。
build
設定が KafkaConnect
リソースに導入されたため、AMQ Streams はデータコネクションに必要なコネクタープラグインでコンテナーイメージを自動的にビルドできるようになりました。そのため、S2I (Source-to-Image) 対応の Kafka Connect のサポートが非推奨になりました。この変更に備えるため、Kafka Connect S2I インスタンスを Kafka Connect インスタンスに移行できます。
手順
コマンドラインで
oc apply
コマンドを使用し、Kafka Connect の S2I クラスターを作成およびデプロイします。oc apply -f examples/connect/kafka-connect-s2i.yaml
Kafka Connect プラグインでディレクトリーを作成します。
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
oc start-build
コマンドで、準備したディレクトリーを使用してイメージの新しいビルドを開始します。oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
注記ビルドの名前は、デプロイされた Kafka Connect クラスターと同じになります。
- ビルドが完了したら、Kafka Connect のデプロイメントによって新しいイメージが自動的に使用されます。
5.2.4. コネクターの作成および管理
コネクタープラグインのコンテナーイメージを作成したら、Kafka Connect クラスターにコネクターインスタンスを作成する必要があります。その後、稼働中のコネクターインスタンスを設定、監視、および管理できます。
コネクターは特定の コネクタークラス のインスタンスで、メッセージに関して関連する外部システムとの通信方法を認識しています。コネクターは多くの外部システムで使用でき、独自のコネクターを作成することもできます。
ソース および シンク タイプのコネクターを作成できます。
- ソースコネクター
- ソースコネクターは、外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。
- シンクコネクター
- シンクコネクターは、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。
AMQ Streams では、コネクターの作成および管理に 2 つの API が提供されます。
- KafkaConnector リソース (KafkaConnectors と呼ばれます)
- Kafka Connect REST API
API を使用すると、以下を行うことができます。
- コネクターインスタンスのステータスの確認。
- 稼働中のコネクターの再設定。
- コネクターインスタンスのコネクタータスク数の増減。
- コネクターの再起動。
- 失敗したタスクを含むコネクタータスクの再起動。
- コネクターインスタンスの一時停止。
- 一時停止したコネクターインスタンスの再開。
- コネクターインスタンスの削除。
5.2.4.1. KafkaConnector リソース
KafkaConnectors を使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できるため、cURL などの HTTP クライアントが必要ありません。その他の Kafka リソースと同様に、コネクターの望ましい状態を OpenShift クラスターにデプロイされた KafkaConnector YAML ファイルに宣言し、コネクターインスタンスを作成します。KafkaConnector リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
該当する KafkaConnector リソースを更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。アノテーションは、コネクターインスタンスおよびコネクタータスクを手動で再起動するために使用されます。該当する KafkaConnector を削除して、コネクターを削除します。
下位バージョンの AMQ Streams との互換性を維持するため、KafkaConnectors はデフォルトで無効になっています。Kafka Connect クラスターのために有効にするには、KafkaConnect
リソースでアノテーションを使用する必要があります。手順は、『AMQ Streams on OpenShift の使用』の「Kafka Connect の設定」を参照してください。
KafkaConnectors が有効になると、Cluster Operator によって監視が開始されます。KafkaConnectors に定義された設定と一致するよう、稼働中のコネクターインスタンスの設定を更新します。
AMQ Streams には、examples/connect/source-connector.yaml
という名前のサンプル KafkaConnector
が含まれています。この例を使用して、「サンプル KafkaConnector リソースのデプロイ」 に記載されているように、FileStreamSourceConnector
および FileStreamSinkConnector
を作成および管理できます。
5.2.4.2. Kafka Connect REST API の可用性
Kafka Connect REST API は、<connect-cluster-name>-connect-api
サービスとして 8083 番ポートで使用できます。
KafkaConnectors が有効になっている場合、Kafka Connect REST API に直接手作業で追加された変更は Cluster Operator によって元に戻されます。
REST API でサポートされる操作は、Apache Kafka のドキュメント を参照してください。
5.2.5. サンプル KafkaConnector リソースのデプロイ
AMQ Streams の examples/connect/source-connector.yaml
にサンプル KafkaConnector
が含まれています。これにより、Kafka ライセンスファイル (サンプルファイルソース) の各行を 1 つの Kafka トピックに送信する基本的な FileStreamSourceConnector
インスタンスが作成されます。
この手順では、以下を作成する方法を説明します。
-
Kafka ライセンスファイル (ソース) からデータを読み取り、データをメッセージとして Kafka トピックに書き込む
FileStreamSourceConnector
。 -
Kafka トピックからメッセージを読み取り、メッセージを一時ファイル (シンク) に書き込む
FileStreamSinkConnector
。
実稼働環境で、「コネクタープラグインでの Kafka Connect の拡張」 の説明どおりに、必要な Kafka Connect コネクターが含まれるコンテナーイメージを準備します。
FileStreamSourceConnector
および FileStreamSinkConnector
はサンプルとして提供されます。ここで説明するように、コンテナーでこれらのコネクターを実行することは、実稼働のユースケースには適していません。
前提条件
- Kafka Connect デプロイメント。
- Kafka Connect デプロイメントで KafkaConnectors が有効になっている。
- Cluster Operator が稼働している必要があります。
手順
examples/connect/source-connector.yaml
ファイルを編集します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" 6 topic: my-topic 7 # ...
- 1
- コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
- 2
- コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
- 3
- コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
- 4
- コネクターが作成できる Kafka Connect
Tasks
の最大数。 - 5
- キーと値のペアとしてのコネクター設定。
- 6
- このサンプルソースコネクター設定では、
/opt/kafka/LICENSE
ファイルからデータが読み取られます。 - 7
- ソースデータのパブリッシュ先となる Kafka トピック。
OpenShift クラスターにソース
KafkaConnector
を作成します。oc apply -f examples/connect/source-connector.yaml
examples/connect/sink-connector.yaml
ファイルを作成します。touch examples/connect/sink-connector.yaml
以下の YAML を
sink-connector.yaml
ファイルに貼り付けます。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect spec: class: org.apache.kafka.connect.file.FileStreamSinkConnector 1 tasksMax: 2 config: 2 file: "/tmp/my-file" 3 topics: my-topic 4
OpenShift クラスターにシンク
KafkaConnector
を作成します。oc apply -f examples/connect/sink-connector.yaml
コネクターリソースが作成されたことを確認します。
oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name my-source-connector my-sink-connector
MY-CONNECT-CLUSTER を Kafka Connect クラスターに置き換えます。
コンテナーで、
kafka-console-consumer.sh
を実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
ソースおよびシンクコネクターの設定オプション
コネクター設定は、KafkaConnector リソースの spec.config
プロパティーで定義されます。
FileStreamSourceConnector
および FileStreamSinkConnector
クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。
名前 | タイプ | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。 |
| List | Null | データのパブリッシュ先となる Kafka トピック。 |
名前 | タイプ | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。 |
| List | Null | データの読み取り元となる 1 つ以上の Kafka トピック。 |
| 文字列 | Null | データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。 |
その他のリソース
5.2.6. Kafka コネクターの再起動の実行
この手順では、OpenShift アノテーションを使用して Kafka コネクターの再起動を手動でトリガーする方法を説明します。
前提条件
- Cluster Operator が稼働している必要があります。
手順
再起動する Kafka コネクターを制御する
KafkaConnector
カスタムリソースの名前を見つけます。oc get KafkaConnector
コネクターを再起動するには、OpenShift で
KafkaConnector
リソースにアノテーションを付けます。以下はoc annotate
を使用した例になります。oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart=true
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnector
カスタムリソースから削除されます。
その他のリソース
- 『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「コネクターの作成および管理」。
5.2.7. Kafka コネクタータスクの再起動の実行
この手順では、OpenShift アノテーションを使用して Kafka コネクタータスクの再起動を手動でトリガーする方法を説明します。
前提条件
- Cluster Operator が稼働している必要があります。
手順
再起動する Kafka コネクタータスクを制御する
KafkaConnector
カスタムリソースの名前を見つけます。oc get KafkaConnector
KafkaConnector
カスタムリソースから再起動するタスクの ID を検索します。タスク ID は 0 から始まる負の値ではない整数です。oc describe KafkaConnector KAFKACONNECTOR-NAME
コネクタータスクを再起動するには、OpenShift で
KafkaConnector
リソースにアノテーションを付けます。たとえば、oc annotate
を使用してタスク 0 を再起動します。oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart-task=0
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnector
カスタムリソースから削除されます。
その他のリソース
- 『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「コネクターの作成および管理」。
5.3. Kafka MirrorMaker のデプロイ
Cluster Operator によって、1 つ以上の Kafka MirrorMaker のレプリカがデプロイされ、Kafka クラスターの間でデータが複製されます。このプロセスはミラーリングと言われ、Kafka パーティションのレプリケーションの概念と混同しないようにします。MirrorMaker は、ソースクラスターからメッセージを消費し、これらのメッセージをターゲットクラスターにパブリッシュします。
5.3.1. Kafka MirrorMaker の OpenShift クラスターへのデプロイ
この手順では、Cluster Operator を使用して Kafka MirrorMaker クラスターを OpenShift クラスターにデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使って、デプロイされた MirrorMaker のバージョンに応じて KafkaMirrorMaker
または KafkaMirrorMaker2
リソースが作成されます。
この手順では、AMQ Streams にある以下のサンプルファイルを使用します。
-
examples/mirror-maker/kafka-mirror-maker.yaml
-
examples/mirror-maker/kafka-mirror-maker-2.yaml
KafkaMirrorMaker
または KafkaMirrorMaker2
リソースの設定に関する詳細は、『AMQ Streams on OpenShift の使用』の「Kafka MirrorMaker クラスターの設定」を参照してください。
手順
Kafka MirrorMaker を OpenShift クラスターにデプロイします。
MirrorMaker の場合
oc apply -f examples/mirror-maker/kafka-mirror-maker.yaml
MirrorMaker 2.0 の場合
oc apply -f examples/mirror-maker/kafka-mirror-maker-2.yaml
MirrorMaker が正常にデプロイされたことを確認します。
oc get deployments
5.4. Kafka Bridge のデプロイ
Cluster Operator によって、1 つ以上の Kafka Bridge のレプリカがデプロイされ、HTTP API 経由で Kafka クラスターとクライアントの間でデータが送信されます。
5.4.1. Kafka Bridge を OpenShift クラスターへデプロイ
この手順では、Cluster Operator を使用して Kafka Bridge クラスターを OpenShift クラスターにデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使って KafkaBridge
リソースが作成されます。
この手順では、AMQ Streams にある以下のサンプルファイルを使用します。
-
examples/bridge/kafka-bridge.yaml
KafkaBridge
リソースの設定に関する詳細は、『AMQ Streams on OpenShift の使用』の「Kafka Bridge クラスターの設定」を参照してください。
手順
Kafka Bridge を OpenShift クラスターにデプロイします。
oc apply -f examples/bridge/kafka-bridge.yaml
Kafka Bridge が正常にデプロイされたことを確認します。
oc get deployments