第22章 プロセスから Kafka メッセージを送受信するための KIE Server の設定
イベントを使用して Kafka メッセージを送受信するプロセスを実行するには、KIE Server を使用する必要があります。Red Hat AMQ Streams と統合するために、KIE Server インスタンスを設定する必要があります。
前提条件
- KIE Server のインスタンスがインストールされています。
手順
Red Hat AMQ Streams との統合を有効にするには、お使いの環境に応じて以下のシステムプロパティーを設定します。
-
Red Hat JBoss EAP で KIE Server を使用している場合は、KIE Server システムプロパティー
org.kie.kafka.server.ext.disabledをfalseに設定します。 -
Spring Boot を使用している場合は、
kieserver.kafka.enabledシステムプロパティーをtrueに設定します。
-
Red Hat JBoss EAP で KIE Server を使用している場合は、KIE Server システムプロパティー
-
Kafka ブローカーへの接続を設定するには、
org.kie.server.jbpm-kafka.ext.bootstrap.serversシステムプロパティーをブローカーのホストおよびポートに設定します。デフォルト値はlocalhost:9092です。複数の host:port ペアのコンマ区切りリストを使用できます。 オプション: Kafka メッセージの送信と受信の両方に関連する、次のシステムプロパティーのいずれかを設定します。
Expand 表22.1 Kafka メッセージの送信と受信の両方に関連するオプションの KIE Server システムプロパティー プロパティー 説明 org.kie.server.jbpm-kafka.ext.client.idリクエスト時にブローカーに渡す ID 文字列。Red Hat AMQ Streams では、この文字列を使用してロギングを行います。
org.kie.server.jbpm-kafka.ext.topics.*メッセージ名とトピック名のマッピング。たとえば、
ExampleNameがメッセージの名前で、ExampleTopicトピックでメッセージを送受信する場合は、org.kie.server.jbpm-kafka.ext.topics.ExampleNameシステムプロパティーをExampleTopicに設定します。このようなシステムプロパティーはいくつでも設定できます。メッセージ名がシステムプロパティーを使用してマップされていない場合、プロセスエンジンはこの名前をトピック名として使用します。org.kie.server.jbpm-kafka.ext.property_nameorg.kie.server.jbpm-kafka.ext接頭辞を使用して、Red Hat AMQ Streams コンシューマーまたはプロデューサープロパティーを設定します。たとえば、buffer.memoryプロデューサープロパティーの値を設定するには、KIE Server システムプロパティーorg.kie.server.jbpm-kafka.ext.buffer.memoryを設定します。この設定は、この KIE Server のイベントを使用して Kafka メッセージを送受信するすべてのプロセスに適用されます。
Red Hat AMQ Streams コンシューマーおよびプロデューサープロパティーの一覧は、RHEL の AMQ Streams の使用 の付録 コンシューマー設定パラメーター および プロデューサー設定パラメーター を参照してください。
オプション: Kafka メッセージの受信に関連する、次のシステムプロパティーのいずれかを設定します。
Expand 表22.2 オプション: Kafka メッセージの受信に関連する、オプションの KIE Server システムプロパティー プロパティー 説明 デフォルト値 org.kie.server.jbpm-kafka.ext.allow.auto.create.topicsトピックの自動作成を許可する。
trueorg.kie.server.jbpm-kafka.ext.group.idこの Kafka メッセージコンシューマーが所属するグループを識別する一意の文字列。
jbpm-consumer.オプション: Kafka メッセージの送信に関連する、次のシステムプロパティーのいずれかを設定します。
Expand 表22.3 Kafka メッセージの送信に関連するオプションの KIE Server システムプロパティー プロパティー 説明 デフォルト値 org.kie.server.jbpm-kafka.ext.acks要求を完了としてマークする前に Kafka リーダーが受信する必要がある確認応答の数。
1.この値は、リーダーがそのローカルログにレコードを書き込み、その後、すべてのフォロワーからの完全な確認を待たずに、プロセスエンジンに応答することを意味します。org.kie.server.jbpm-kafka.ext.max.block.mspublish メソッドがブロックするミリ秒数。この時間が経過すると、プロセスエンジンはビジネスプロセスの実行を再開できます。
2000(2 秒)
22.1. カスタムメッセージ形式の設定 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、メッセージイベントを使用する場合に、プロセスエンジンは CloudEvents 仕様 バージョン 1.0 に準拠する形式でメッセージを送受信します。
オプションで、JSON のローデータ形式またはメッセージのカスタム形式を設定できます。カスタム形式を使用する場合は、クラスを実装して指定する必要があります。
前提条件
- プロジェクトでのメッセージ送受信にメッセージイベントを使用する。
手順
メッセージの送受信にカスタム形式を使用する場合は、カスタムクラスを実装して指定します。
クラスのソースコードを開発します。
-
メッセージの送信には、
KafkaEventWriterインターフェイスを実装するクラスを開発します。 メッセージの受信には、
KafkaEventReaderインターフェイスを実装するクラスを開発します。インターフェイス定義は GitHub repository からダウンロードできます。
-
メッセージの送信には、
- ビジネスアプリケーションにクラスを指定します。手順は、24章Business Central でのビジネスアプリケーションへのカスタムクラス指定を参照してください。
以下の KIE Server システムプロパティーを設定して、カスタムライターまたはリーダーを設定します。
Expand 表22.4 カスタムライターまたはリーダーを設定するための KIE Server のシステムプロパティー プロパティー 説明 org.kie.server.jbpm-kafka.ext.eventWriterClassカスタムイベントライタークラス。別の形式を使用してメッセージを送信するには、このプロパティーを設定します。カスタム形式を使用する場合は、プロパティーをカスタムイベントライタークラスの完全修飾名に設定します。JSON のローデータ形式を使用するにはプロパティーを
org.kie.server.services.jbpm.kafka.RawJsonEventWriterに設定します。org.kie.server.jbpm-kafka.ext.eventReaderClassカスタムイベントリーダークラス。別の形式を使用してメッセージを受信するには、このプロパティーを設定します。カスタム形式を使用する場合は、プロパティーをカスタムイベントリーダークラスの完全修飾名に設定します。JSON のローデータ形式を使用するにはプロパティーを
org.kie.server.services.jbpm.kafka.RawJsonEventReaderに設定します。