7.6. ログの Kafka ブローカーへの転送
デフォルトの Elasticsearch ログストアに加えて、またはこの代わりに外部の Kafka ブローカーにログを転送できます。
外部 Kafka インスタンスへのログ転送を設定するには、そのインスタンスへの出力および出力を使用するパイプラインで ClusterLogForwarder
カスタムリソース (CR) を作成します。出力に特定の Kafka トピックを追加するか、デフォルトを使用できます。Kafka の出力は TCP(セキュアでない) または TLS(セキュアな TCP) 接続を使用できます。
手順
以下のように
ClusterLogForwarder
CR YAML ファイルを作成します。apiVersion: logging.openshift.io/v1 kind: ClusterLogForwarder metadata: name: instance 1 namespace: openshift-logging 2 spec: outputs: - name: app-logs 3 type: kafka 4 url: tls://kafka.example.devlab.com:9093/app-topic 5 secret: name: kafka-secret 6 - name: infra-logs type: kafka url: tcp://kafka.devlab2.example.com:9093/infra-topic 7 - name: audit-logs type: kafka url: tls://kafka.qelab.example.com:9093/audit-topic secret: name: kafka-secret-qe pipelines: - name: app-topic 8 inputRefs: 9 - application outputRefs: 10 - app-logs parse: json 11 labels: logType: "application" 12 - name: infra-topic 13 inputRefs: - infrastructure outputRefs: - infra-logs labels: logType: "infra" - name: audit-topic inputRefs: - audit outputRefs: - audit-logs - default 14 labels: logType: "audit"
- 1
ClusterLogForwarder
CR の名前はinstance
である必要があります。- 2
ClusterLogForwarder
CR の namespace はopenshift-logging
である必要があります。- 3
- 出力の名前を指定します。
- 4
kafka
タイプを指定します。- 5
- Kafka ブローカーの URL およびポートを有効な絶対 URL として指定し、オプションで特定のトピックで指定します。
tcp
(セキュアでない) プロトコルまたはtls
(セキュアな TCP) プロトコルを使用できます。CIDR アノテーションを使用するクラスター全体のプロキシーが有効になっている場合、出力は IP アドレスではなくサーバー名または FQDN である必要があります。 - 6
tls
接頭辞を使用する場合、TLS 通信のエンドポイントに必要なシークレットの名前を指定する必要があります。シークレットはopenshift-logging
プロジェクトに存在し、tls.crt、tls.key、および ca-bundle.crt のキーが含まれる必要があります。これらは、それぞれが表す証明書を参照します。- 7
- オプション: 非セキュアな出力を送信するには、URL の前に
tcp
の接頭辞を使用します。また、この出力のsecret
キーとそのname
を省略します。 - 8
- オプション: パイプラインの名前を指定します。
- 9
- パイプラインを使用して、転送する必要のあるログタイプを指定します (
application
infrastructure
、またはaudit
)。 - 10
- ログを転送するためにそのパイプラインで使用する出力を指定します。
- 11
- 必要に応じて、構造化された JSON ログエントリーを
structured
フィールドの JSON オブジェクトとして転送します。ログエントリーに有効な構造化された JSON が含まれる必要があります。そうでない場合は、OpenShift Logging は構造化
フィールドを削除し、代わりにログエントリーをデフォルトのインデックスapp-00000x
に送信します。 - 12
- オプション: 文字列。ログに追加する 1 つまたは複数のラベル。
- 13
- オプション: サポートされるタイプの他の外部ログアグリゲーターにログを転送するように複数の出力を設定します。
- オプション。パイプラインを説明する名前。
-
inputRefs
は、そのパイプラインを使用して転送するログタイプです (application
、infrastructure
、またはaudit
)。 -
outputRefs
は使用する出力の名前です。 - オプション: 文字列。ログに追加する 1 つまたは複数のラベル。
- 14
- オプション: ログを内部 Elasticsearch インスタンスに転送するために
default
を指定します。
オプション: 1 つの出力を複数の kafka ブローカーに転送するには、以下の例のように kafka ブローカーの配列を指定します。
... spec: outputs: - name: app-logs type: kafka secret: name: kafka-secret-dev kafka: 1 brokers: 2 - tls://kafka-broker1.example.com:9093/ - tls://kafka-broker2.example.com:9093/ topic: app-topic 3 ...
CR オブジェクトを作成します。
$ oc create -f <file-name>.yaml
Red Hat OpenShift Logging Operator は Fluentd Pod を再デプロイします。Pod が再デプロイされない場合、強制的に再デプロイするために Fluentd Pod を削除できます。
$ oc delete pod --selector logging-infra=fluentd