第2章 AMQ Streams の評価


本章の手順では、AMQ Streams の機能を評価する簡単な方法を説明します。

AMQ Streams をインストールし、トピックからメッセージの送受信を開始するには、指定の順序で手順にしたがいます。

  • 必要な前提条件があることを確認します。
  • AMQ Streams をインストールします。
  • Kafka クラスターを作成します。
  • Kafka クラスターにセキュアにアクセスするための認証の有効にします。
  • Kafka クラスターにアクセスし、メッセージを送受信します。

前提条件を確認し、本章に指定されている順序でタスクを行うようにしてください。

2.1. 前提条件

  • AMQ Streams をデプロイする OpenShift Container Platform クラスター (3.11 以上) が稼働している必要があります。
  • AMQ Streams のダウンロードサイト にアクセスできる必要があります。

2.2. AMQ Streams のダウンロード

ZIP ファイルには、AMQ Streams のインストールに必要なリソースと、設定の例が含まれています。

手順

  1. サブスクリプションがアクティベートされ、システムが登録されていることを確認します。

    カスタマーポータルを使用して Red Hat サブスクリプションをアクティブ化し、システムを登録する方法は、付録A サブスクリプションの使用 を参照してください。

  2. AMQ Streams のダウンロードサイト から amq-streams-x.y.z-ocp-install-examples.zip ファイルをダウンロードします。
  3. ファイルを任意の場所で展開します。

    • Windows または Mac の場合: ZIP ファイルをダブルクリックして ZIP アーカイブの内容を展開します。
    • Red Hat Enterprise Linux の場合: ターゲットマシンでターミナルウィンドウを開き、ZIP ファイルがダウンロードされた場所に移動します。

    以下のコマンドを使用して、ZIP ファイルを展開します。

    unzip amq-streams-x.y.z-ocp-install-examples.zip

2.3. AMQ Streams のインストール

AMQ Streams をデプロイメントに必要な CRD (Custom Resource Definition) でインストールします。

このタスクでは、デプロイメント用にクラスターで namespace を作成します。namespace を使用して関数を分離することが推奨されます。

前提条件

  • インストールには、cluster-admin ロール (system:admin など) を持つユーザーが必要です。

手順

  1. クラスター管理者権限を持つアカウントを使用して OpenShift クラスターにログインします。

    例を以下に示します。

    oc login -u system:admin
  2. AMQ Streams Kafka Cluster Operator に新規の kafka (プロジェクト) namespace を作成します。

    oc new-project kafka
  3. AMQ Streams Kafka Cluster Operator をインストールする新しい kafka namespace を参照するインストールファイルを変更します。

    注記

    デフォルトでは、ファイルは myproject namespace で動作します。

    • Linux の場合は、以下を使用します。
    sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
    • Mac の場合は、以下を使用します。
    sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
  4. CRD を管理するために、CRD およびロールベースアクセス制御 (RBAC) リソースをデプロイします。

    oc project kafka
    oc apply -f install/cluster-operator/
  5. Kafka クラスターをデプロイする新しい my-kafka-project namespace を作成します。

    oc new-project my-kafka-project
  6. 管理者でないユーザー developermy-kafka-project へのアクセス権限を付与します。

    以下に例を示します。

    oc adm policy add-role-to-user admin developer -n my-kafka-project
  7. STRIMZI_NAMESPACE 環境変数の値を設定し、my-kafka-project namespace を監視する権限を Cluster Operator に付与します。

    oc set env deploy/strimzi-cluster-operator STRIMZI_NAMESPACE=kafka,my-kafka-project -n kafka
    oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-kafka-project
    oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n my-kafka-project
    oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-kafka-project

    このコマンドによって、Cluster Operator が Kafka クラスターにアクセスするための権限を付与するロールバインディングが作成されます。

  8. 新しいクラスターロール strimzi-admin を作成します。

    oc apply -f install/strimzi-admin
  9. ロールを管理者でないユーザー developer に追加します。

    oc adm policy add-cluster-role-to-user strimzi-admin developer

2.4. クラスターの作成

AMQ Streams がインストールされている状態で、Kafka クラスターを作成し、クラスター内でトピックを作成します。

クラスターの作成時に、AMQ Streams のインストール時にデプロイされた Cluster Operator によって新規の Kafka リソースが監視されます。

前提条件

  • Kafka クラスターでは、Cluster Operator がデプロイされている必要があります。
  • トピックでは、Kafka クラスターが稼働中である必要があります。

手順

  1. ユーザー developer として my-kafka-project namespace にログインします。

    以下に例を示します。

    oc login -u developer
    oc project my-kafka-project

    新規ユーザーが OpenShift Container Platform にログインした後に、そのユーザーのアカウントが作成されます。

  2. 3 つの Zookeeper ノードと 3 つのブローカーノードで、新しい my-cluster Kafka クラスターを作成します。

    • ephemeral ストレージを使用します。
    • route を使用するよう設定された外部リスナーを使用して、OpenShift クラスター外部の Kafka クラスターを公開します。

      cat << EOF | oc create -f -
      apiVersion: kafka.strimzi.io/v1beta1
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        kafka:
          replicas: 3
          listeners:
            - name: plain
              port: 9092
              type: internal
              tls: false
            - name: tls
              port: 9093
              type: internal
              tls: true
            - name: external
              port: 9094
              type: route 1
              tls: true
          storage:
            type: ephemeral
        zookeeper:
          replicas: 3
          storage:
            type: ephemeral
        entityOperator:
          topicOperator: {}
      EOF
  3. クラスターがデプロイされるまで待機します。

    oc wait my-kafka-project/my-cluster --for=condition=Ready --timeout=300s -n kafka
  4. クラスターの準備ができたら、パブリッシュするトピックを作成し、外部クライアントからサブスクライブします。

    my-cluster Kafka クラスターで 3 つのレプリカと 3 つのパーティションを使用して、以下の my-topic カスタムリソース定義を作成します。

    cat << EOF | oc create -f -
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: "my-cluster"
    spec:
      partitions: 3
      replicas: 3
    EOF

2.5. クラスターへのアクセス

route がクラスターへの外部アクセスに使用される場合、ブローカーとクライアント間で TLS (Transport Layer Security) 暗号化を有効にするためにクラスターの CA 証明書が必要になります。

前提条件

  • OpenShift クラスター内で Kafka クラスターが稼働中である必要があります。
  • Cluster Operator も稼働している必要があります。

手順

  1. ブートストラップ route のアドレスを見つけます。

    oc get routes my-cluster-kafka-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'

    このアドレスと Kafkaクライアントの 443 番ポートをブートストラップアドレスとして使用します。

  2. ブローカーの認証局の公開証明書を抽出します。

    oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
  3. 信頼できる証明書をトラストストアにインポートします。

    keytool -keystore client.truststore.jks -alias CARoot -import -file ca.crt

    メッセージの送受信を開始する準備が整いました。

2.6. トピックからのメッセージの送受信

クラスター外部のメッセージを my-topic から送受信すると、AMQ Streams インストールをテストできます。

ターミナルを使用して、ローカルマシンで Kafka プロデューサーおよびコンシューマーを実行します。

前提条件

手順

  1. AMQ Streams のダウンロードサイト から最新バージョンの AMQ Stremas アーカイブ (amq-streams-x.y.z-bin.zip) をダウンロードします。

    ファイルを任意の場所で展開します。

  2. ターミナルを開き、トピック my-topic と TLS の認証プロパティーを使用して Kafka コンソールプロデューサーを起動します。

    bin/kafka-console-producer.sh --broker-list ROUTE-ADDRESS:443 --producer-property security.protocol=SSL --producer-property ssl.truststore.password=password --producer-property ssl.truststore.location=./client.truststore.jks --topic my-topic
  3. プロデューサーが実行しているコンソールにメッセージを入力します。
  4. Enter を押してメッセージを送信します。
  5. 新しいターミナルタブまたはウィンドウを開き、Kafka コンソールコンシューマーを起動してメッセージを受信します。

    bin/kafka-console-consumer.sh --bootstrap-server ROUTE-ADDRESS:443 --consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=password --consumer-property ssl.truststore.location=./client.truststore.jks --topic my-topic --from-beginning
  6. コンシューマーコンソールに受信メッセージが表示されることを確認します。
  7. Crtl+C を押して、Kafka コンソールプロデューサーとコンシューマーを終了します。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.