10.4. Kafka トピックの設定


KafkaTopic リソースのプロパティーを使用して、Kafka トピックを設定します。KafkaTopic 内のトピック設定に加えられた変更は、Kafka に伝播されます。

oc apply を使用すると、トピックを作成または編集できます。oc delete を使用すると、既存のトピックを削除できます。

以下に例を示します。

  • oc apply -f <topic_config_file>
  • oc delete KafkaTopic <topic_name>

トピックを削除できるようにするには、Kafka リソースの spec.kafka.configdelete.topic.enabletrue (デフォルト) に設定する必要があります。

この手順では、10 個のパーティションと 2 つのレプリカがあるトピックを作成する方法を説明します。

注記

手順は、トピック管理の一方向モードと双方向モードで同じです。

作業を開始する前に

KafkaTopic リソースでは、次の変更は許可されません。

  • spec.topicName で定義されたトピックの名前を変更します。spec.topicNamestatus.topicName の不一致が検出されます。
  • spec.partitions を使用してパーティションの数を減らす (Kafka ではサポートされていません)。
  • spec.replicas で指定されたレプリカの数を変更します。
警告

キーを持つトピックの spec.partitions を増やすと、レコードのパーティション分割が変更され、特にトピックでセマンティックパーティション分割が使用されている場合に問題が発生する可能性があります。

前提条件

  • mTLS 認証と TLS 暗号化を使用する Kafka ブローカーリスナーで設定された実行中の Kafka クラスター。
  • 実行中の Topic Operator (通常は、Entity Operator とともにデプロイされます)。
  • トピックを削除する場合は、 Kafka リソースの spec.kafka.configdelete.topic.enable=true (デフォルト) である必要があります。

手順

  1. KafkaTopic リソースを設定します。

    Kafka トピックの設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic-1
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2

    ヒント

    トピックを変更する場合、oc get kafkatopic my-topic-1 -o yaml を使用してリソースの現在のバージョンを取得できます。

  2. OpenShift で KafkaTopic リソースを作成します。

    oc apply -f <topic_config_file>
  3. トピックの準備完了ステータスが True に変わるまで待ちます。

    oc get kafkatopics -o wide -w -n <namespace>

    Kafka トピックのステータス

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-1   my-cluster  10          3                  True
    my-topic-2   my-cluster  10          3
    my-topic-3   my-cluster  10          3                  True

    READY 出力が True を示す場合、トピックの作成は成功です。

  4. READY 列が空白のままの場合は、リソース YAML または Topic Operator ログからステータスの詳細を取得してください。

    ステータスメッセージには、現在のステータスの理由の詳細が示されます。

    oc get kafkatopics my-topic-2 -o yaml

    NotReady ステータスのトピックの詳細

    # ...
    status:
      conditions:
      - lastTransitionTime: "2022-06-13T10:14:43.351550Z"
        message: Number of partitions cannot be decreased
        reason: PartitionDecreaseException
        status: "True"
        type: NotReady

    この例では、トピックの準備ができていない理由は、KafkaTopic 設定で元のパーティション数が減ったためです。Kafka はこれをサポートしていません。

    トピック設定をリセットした後、ステータスはトピックの準備ができていることを示します。

    oc get kafkatopics my-topic-2 -o wide -w -n <namespace>

    トピックのステータス更新

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-2   my-cluster  10          3                  True

    詳細のフェッチではメッセージが表示されない

    oc get kafkatopics my-topic-2 -o yaml

    READY ステータスのトピックの詳細

    # ...
    status:
      conditions:
      - lastTransitionTime: '2022-06-13T10:15:03.761084Z'
        status: 'True'
        type: Ready

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.