第2章 AMQ Streams の評価
本章の手順では、AMQ Streams の機能を評価する簡単な方法を説明します。
AMQ Streams をインストールし、トピックからメッセージの送受信を開始するには、指定の順序で手順にしたがいます。
- 必要な前提条件があることを確認します。
- AMQ Streams をインストールします。
- Kafka クラスターを作成します。
- Kafka クラスターにセキュアにアクセスするための認証の有効にします。
- Kafka クラスターにアクセスし、メッセージを送受信します。
前提条件を確認し、本章に指定されている順序でタスクを行うようにしてください。
2.1. 前提条件
- AMQ Streams をデプロイする OpenShift Container Platform クラスター (3.11 以上) が稼働している必要があります。
- AMQ Streams のダウンロードサイト にアクセスできる必要があります。
2.2. AMQ Streams のダウンロード
ZIP ファイルには、AMQ Streams のインストールに必要なリソースと、設定の例が含まれています。
手順
サブスクリプションがアクティベートされ、システムが登録されていることを確認します。
カスタマーポータルを使用して Red Hat サブスクリプションをアクティブ化し、システムを登録する方法は、付録A サブスクリプションの使用 を参照してください。
-
AMQ Streams のダウンロードサイト から
amq-streams-x.y.z-ocp-install-examples.zip
ファイルをダウンロードします。 ファイルを任意の場所で展開します。
- Windows または Mac の場合: ZIP ファイルをダブルクリックして ZIP アーカイブの内容を展開します。
- Red Hat Enterprise Linux の場合: ターゲットマシンでターミナルウィンドウを開き、ZIP ファイルがダウンロードされた場所に移動します。
以下のコマンドを使用して、ZIP ファイルを展開します。
unzip amq-streams-x.y.z-ocp-install-examples.zip
2.3. AMQ Streams のインストール
AMQ Streams をデプロイメントに必要な CRD (Custom Resource Definition) でインストールします。
このタスクでは、デプロイメント用にクラスターで namespace を作成します。namespace を使用して関数を分離することが推奨されます。
前提条件
-
インストールには、
cluster-admin
ロール (system:admin
など) を持つユーザーが必要です。
手順
クラスター管理者権限を持つアカウントを使用して OpenShift クラスターにログインします。
例を以下に示します。
oc login -u system:admin
AMQ Streams Kafka Cluster Operator に新規の
kafka
(プロジェクト) namespace を作成します。oc new-project kafka
AMQ Streams Kafka Cluster Operator をインストールする新しい
kafka
namespace を参照するインストールファイルを変更します。注記デフォルトでは、ファイルは
myproject
namespace で動作します。- Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
- Mac の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
CRD を管理するために、CRD およびロールベースアクセス制御 (RBAC) リソースをデプロイします。
oc project kafka oc apply -f install/cluster-operator/
Kafka クラスターをデプロイする新しい
my-kafka-project
namespace を作成します。oc new-project my-kafka-project
管理者でないユーザー
developer
にmy-kafka-project
へのアクセス権限を付与します。以下に例を示します。
oc adm policy add-role-to-user admin developer -n my-kafka-project
my-kafka-project
namespace を監視する権限を Cluster Operator に付与します。oc set env deploy/strimzi-cluster-operator STRIMZI_NAMESPACE=kafka,my-kafka-project -n kafka
oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-kafka-project
oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n my-kafka-project
oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-kafka-project
このコマンドによって、Cluster Operator が Kafka クラスターにアクセスするための権限を付与するロールバインディングが作成されます。
新しいクラスターロール
strimzi-admin
を作成します。oc apply -f install/strimzi-admin
ロールを管理者でないユーザー
developer
に追加します。oc adm policy add-cluster-role-to-user strimzi-admin developer
2.4. クラスターの作成
AMQ Streams がインストールされている状態で、Kafka クラスターを作成し、クラスター内でトピックを作成します。
クラスターの作成時に、AMQ Streams のインストール時にデプロイされた Cluster Operator によって新規の Kafka リソースが監視されます。
前提条件
- Kafka クラスターでは、Cluster Operator がデプロイされている必要があります。
- トピックでは、Kafka クラスターが稼働中である必要があります。
手順
ユーザー
developer
としてmy-kafka-project
namespace にログインします。以下に例を示します。
oc login -u developer oc project my-kafka-project
新規ユーザーが OpenShift Container Platform にログインした後に、そのユーザーのアカウントが作成されます。
3 つの Zookeeper ノードと 3 つのブローカーノードで、新しい
my-cluster
Kafka クラスターを作成します。-
ephemeral
ストレージを使用します。 route
を使用するよう設定された外部リスナーを使用して、OpenShift クラスター外部の Kafka クラスターを公開します。cat << EOF | oc create -f - apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: replicas: 3 listeners: plain: {} tls: {} external: type: route storage: type: ephemeral zookeeper: replicas: 3 storage: type: ephemeral entityOperator: topicOperator: {} EOF
-
クラスターがデプロイされるまで待機します。
oc wait my-kafka-project/my-cluster --for=condition=Ready --timeout=300s -n kafka
クラスターの準備ができたら、パブリッシュするトピックを作成し、外部クライアントからサブスクライブします。
my-cluster
Kafka クラスターで 3 つのレプリカと 3 つのパーティションを使用して、以下のmy-topic
カスタムリソース定義を作成します。cat << EOF | oc create -f - apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: "my-cluster" spec: partitions: 3 replicas: 3 EOF
2.5. クラスターへのアクセス
route
がクラスターへの外部アクセスに使用される場合、ブローカーとクライアント間で TLS (Transport Layer Security) 暗号化を有効にするためにクラスターの CA 証明書が必要になります。
前提条件
- OpenShift クラスター内で Kafka クラスターが稼働中である必要があります。
- Cluster Operator も稼働している必要があります。
手順
ブートストラップ
route
のアドレスを見つけます。oc get routes my-cluster-kafka-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'
このアドレスと Kafkaクライアントの 443 番ポートをブートストラップアドレスとして使用します。
ブローカーの認証局の公開証明書を抽出します。
oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
信頼できる証明書をトラストストアにインポートします。
keytool -keystore client.truststore.jks -alias CARoot -import -file ca.crt
メッセージの送受信を開始する準備が整いました。
2.6. トピックからのメッセージの送受信
クラスター外部のメッセージを my-topic
から送受信すると、AMQ Streams インストールをテストできます。
ターミナルを使用して、ローカルマシンで Kafka プロデューサーおよびコンシューマーを実行します。
前提条件
- AMQ Streams が OpenShift クラスターにインストールされていることを確認します。
- メッセージを送受信するには、Zookeeper および Kafka が稼働している必要があります。
- クラスターにアクセスするためにクラスター CA 証明書 が必要です。
- AMQ Streams のダウンロードサイト から最新バージョンの Red Hat AMQ Stremas アーカイブにアクセスできる必要があります。
手順
AMQ Streams のダウンロードサイト から最新バージョンの AMQ Stremas アーカイブ (
amq-streams-x.y.z-bin.zip
) をダウンロードします。ファイルを任意の場所で展開します。
ターミナルを開き、トピック
my-topic
と TLS の認証プロパティーを使用して Kafka コンソールプロデューサーを起動します。bin/kafka-console-producer.sh --broker-list <route-address>:443 --producer-property security.protocol=SSL --producer-property ssl.truststore.password=password --producer-property ssl.truststore.location=./client.truststore.jks --topic my-topic
- プロデューサーが実行しているコンソールにメッセージを入力します。
- Enter を押してメッセージを送信します。
新しいターミナルタブまたはウィンドウを開き、Kafka コンソールコンシューマーを起動してメッセージを受信します。
bin/kafka-console-consumer.sh --bootstrap-server <route-address>:443 --consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=password --consumer-property ssl.truststore.location=./client.truststore.jks --topic my-topic --from-beginning
- コンシューマーコンソールに受信メッセージが表示されることを確認します。
- Crtl+C を押して、Kafka コンソールプロデューサーとコンシューマーを終了します。