257.7. Camel データをリアクティブフレームワークに処理する
リアクティブストリーム パブリッシャー は一方向のデータエクスチェンジを可能にしますが、Camel ルートはインアウトエクスチェンジパターンを使用することがよくあります (たとえば、REST エンドポイントを定義し、一般に、各リクエストに対してレスポンスが必要な場合)。
このような状況では、ユーザーはリアクティブ処理ステップをフローに追加して、Camel ルートを強化したり、リアクティブフレームワークを使用して変換全体を定義したりできます。
たとえば、次のルートがあるとします。
from("timer:clock") .setBody().header(Exchange.TIMER_COUNTER) .to("direct:reactive") .log("Continue with Camel route... n=${body}");
from("timer:clock")
.setBody().header(Exchange.TIMER_COUNTER)
.to("direct:reactive")
.log("Continue with Camel route... n=${body}");
リアクティブ処理ステップは、direct:reactive エンドポイントに関連付けることができます。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); camel.process("direct:reactive", Integer.class, items -> Flowable.fromPublisher(items) // RxJava2 .map(n -> -n)); // make every number negative
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
camel.process("direct:reactive", Integer.class, items ->
Flowable.fromPublisher(items) // RxJava2
.map(n -> -n)); // make every number negative
Camel ルートを流れるデータは、外部のリアクティブフレームワークによって処理され、Camel 内で処理フローが続行されます。
このメカニズムは、完全にリアクティブな方法で In-Out エクスチェンジを定義するためにも使用できます。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // requires a rest-capable Camel component camel.process("rest:get:orders", exchange -> Flowable.fromPublisher(exchange) .flatMap(ex -> allOrders())); // retrieve orders asynchronously
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
// requires a rest-capable Camel component
camel.process("rest:get:orders", exchange ->
Flowable.fromPublisher(exchange)
.flatMap(ex -> allOrders())); // retrieve orders asynchronously
詳細については、Camel の例 (camel-example-reactive-streams) を参照してください。