273.6. 将数据发送到 Camel
当外部库需要将事件推送到 Camel 路由时,必须将 Reactive Streams 端点设置为消费者。
from("reactive-streams:elements") .to("log:INFO");
from("reactive-streams:elements")
.to("log:INFO");
对 元素
流的句柄可以从 CamelReactiveStreams
实用程序类获取。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); Subscriber<String> elements = camel.streamSubscriber("elements", String.class);
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
Subscriber<String> elements = camel.streamSubscriber("elements", String.class);
订阅者可用于将事件推送到来自 元素
流的 Camel 路由。
以下是如何将它与 RxJava 2 搭配使用的示例(尽管可以使用任何被动框架发布事件)。
Flowable.interval(1, TimeUnit.SECONDS) .map(i -> "Item " + i) .subscribe(elements);
Flowable.interval(1, TimeUnit.SECONDS)
.map(i -> "Item " + i)
.subscribe(elements);
字符串项在示例中由 RxJava 每秒生成,它们被推送到上述定义的 Camel 路由中。
273.6.1. 使用直接 API 将数据发送到 Camel 复制链接链接已复制到粘贴板!
复制链接链接已复制到粘贴板!
另外,在这种情况下,可以使用直接 API 从端点 URI 获取 Camel 订阅者。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Send two strings to the "seda:queue" endpoint Flowable.just("hello", "world") .subscribe(camel.subscriber("seda:queue", String.class));
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
// Send two strings to the "seda:queue" endpoint
Flowable.just("hello", "world")
.subscribe(camel.subscriber("seda:queue", String.class));