第22章 プロセスから Kafka メッセージを送受信するための KIE Server の設定
Kafka メッセージを送受信するプロセスを実行するには (カスタムタスクを使用する場合を除く)、KIE Server を使用する必要があります。Red Hat AMQ Streams と統合するように、この KIE Server を設定する必要があります。
手順
Red Hat AMQ Streams との統合を有効にするには、お使いの環境に応じて以下のシステムプロパティーを設定します。
-
Red Hat JBoss EAP で KIE Server を使用している場合は、KIE Server の
org.kie.kafka.server.ext.disabled
システムプロパティーをfalse
に設定します。 -
Spring Boot を使用している場合は、
kieserver.kafka.enabled
システムプロパティーをtrue
に設定します。
-
Red Hat JBoss EAP で KIE Server を使用している場合は、KIE Server の
-
Kafka ブローカーへの接続を設定するには、
org.kie.server.jbpm-kafka.ext.bootstrap.servers
システムプロパティーをブローカーのホストおよびポートに設定します。デフォルト値はlocalhost:9092
です。複数の host:port ペアのコンマ区切りリストを使用できます。 必要に応じて、以下のシステムプロパティーを設定して、Kafka メッセージの送受信を設定します。
-
org.kie.server.jbpm-kafka.ext.client.id
: 要求の実行時にブローカーに渡す ID 文字列。Red Hat AMQ Streams では、この文字列を使用してロギングを行います。 -
org.kie.server.jbpm-kafka.ext.topics.*
: メッセージ名からトピック名へのマッピング。たとえば、ExampleName
がメッセージの名前で、ExampleTopic
トピックでメッセージを送受信する場合は、org.kie.server.jbpm-kafka.ext.topics.ExampleName
システムプロパティーをExampleTopic
に設定します。このようなシステムプロパティーはいくつでも設定できます。メッセージ名がシステムプロパティーを使用してマップされていない場合、プロセスエンジンはこの名前をトピック名として使用します。
-
必要に応じて、以下のシステムプロパティーを設定して、Kafka メッセージの受信を設定します。
-
org.kie.server.jbpm-kafka.ext.allow.auto.create.topics
: 自動トピック作成を許可します。デフォルトでは有効です。 -
org.kie.server.jbpm-kafka.ext.group.id
: この Kafka メッセージコンシューマーが属するグループを識別する一意の文字列。デフォルト値はjbpm-consumer
です。
-
必要に応じて、以下のシステムプロパティーを設定して、Kafka メッセージの送信を設定します。
-
org.kie.server.jbpm-kafka.ext.acks
: 要求を完了としてマークする前に Kafka リーダーが受信する必要がある確認応答の数。デフォルト値は1
です。これは、リーダーがレコードをローカルログに書き込み、すべてのフォロワーからの完全な確認応答を待たずに、プロセスエンジンに応答することを意味します。 -
org.kie.server.jbpm-kafka.ext.max.block.ms
: パブリッシュメソッドがブロックするミリ秒数。この時間が経過すると、プロセスエンジンはビジネスプロセスの実行を再開できます。デフォルト値は2000
(2 秒) です。
-
22.1. カスタムメッセージ形式の設定
デフォルトでは、メッセージイベントを使用する場合に、プロセスエンジンは CloudEvents 仕様 バージョン 1.0 に準拠する形式でメッセージを送受信します。
オプションで、JSON のローデータ形式またはメッセージのカスタム形式を設定できます。カスタム形式を使用する場合は、クラスを実装して指定する必要があります。
前提条件
- プロジェクトでのメッセージ送受信にメッセージイベントを使用する。
手順
メッセージの送受信にカスタム形式を使用する場合は、カスタムクラスを実装して指定します。
クラスのソースコードを開発します。
-
メッセージの送信には、
KafkaEventWriter
インターフェイスを実装するクラスを開発します。 メッセージの受信には、
KafkaEventReader
インターフェイスを実装するクラスを開発します。インターフェイス定義は GitHub リポジトリー からダウンロードできます。
-
メッセージの送信には、
- ビジネスアプリケーションにクラスを指定します。手順は、24章Business Central でのビジネスアプリケーションへのカスタムクラス指定を参照してください。
以下の KIE Server システムプロパティーを設定して、カスタムライターまたはリーダーを設定します。
-
org.kie.server.jbpm-kafka.ext.eventWriterClass
: カスタムイベントのライタークラス。別の形式を使用してメッセージを送信するには、このプロパティーを設定します。カスタム形式を使用する場合は、プロパティーをカスタムイベントライタークラスの完全修飾名に設定します。JSON のローデータ形式を使用するにはプロパティーをorg.kie.server.services.jbpm.kafka.RawJsonEventWriter
に設定します。 -
org.kie.server.jbpm-kafka.ext.eventReaderClass
: カスタムイベントのリーダークラス。別の形式を使用してメッセージを受信するには、このプロパティーを設定します。カスタム形式を使用する場合は、プロパティーをカスタムイベントリーダークラスの完全修飾名に設定します。JSON のローデータ形式を使用するにはプロパティーをorg.kie.server.services.jbpm.kafka.RawJsonEventReader
に設定します。
-