257.4. Camel からデータを取得
Camel ルートからデータフローにサブスクライブするには、以下のスニペットのように、エクスチェンジを名前付きストリームにリダイレクトする必要があります。
from("timer:clock") .setBody().header(Exchange.TIMER_COUNTER) .to("reactive-streams:numbers");
from("timer:clock")
.setBody().header(Exchange.TIMER_COUNTER)
.to("reactive-streams:numbers");
ルートは XML DSL を使用して作成することもできます。
この例では、無制限の数字のストリームが、名前番号に関連付けられます。このストリームは、
CamelReactiveStreams
ユーティリティークラスを使用してアクセスできます。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Getting a stream of exchanges Publisher<Exchange> exchanges = camel.fromStream("numbers"); // Getting a stream of Integers (using Camel standard conversion system) Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
// Getting a stream of exchanges
Publisher<Exchange> exchanges = camel.fromStream("numbers");
// Getting a stream of Integers (using Camel standard conversion system)
Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);
このストリームは、リアクティブストリームと互換性のあるライブラリーと簡単に使用できます。以下は、RxJava 2 で使用する方法の例です(ただし、リアクティブフレームワークを使用してイベントを処理することもできます)。
Flowable.fromPublisher(integers) .doOnNext(System.out::println) .subscribe();
Flowable.fromPublisher(integers)
.doOnNext(System.out::println)
.subscribe();
この例では、Camel によって生成されたすべての数字を System.out
に出力します。
257.4.1. direct API を使用した Camel からデータを取得
(Camel DSL を全く使用せずに)リアクティブフレームワークの機能構成を使用して処理フロー全体を定義することを希望する場合は、Camel URI を使用してストリームを定義することもできます。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Get a stream from all the files in a directory Publisher<String> files = camel.from("file:folder", String.class); // Use the stream in RxJava2 Flowable.fromPublisher(files) .doOnNext(System.out::println) .subscribe();
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
// Get a stream from all the files in a directory
Publisher<String> files = camel.from("file:folder", String.class);
// Use the stream in RxJava2
Flowable.fromPublisher(files)
.doOnNext(System.out::println)
.subscribe();