273.6. 将数据发送到 Camel
当外部库需要将事件推送到 Camel 路由时,必须将 Reactive Streams 端点设置为消费者。
from("reactive-streams:elements")
.to("log:INFO");
from("reactive-streams:elements")
.to("log:INFO");
元素 流的句柄可以从 CamelReactiveStreams 实用程序类获得。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
Subscriber<String> elements = camel.streamSubscriber("elements", String.class);
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
Subscriber<String> elements = camel.streamSubscriber("elements", String.class);
订阅者可用于将事件推送到从 元素 流消耗的 Camel 路由。
以下是如何将它与 RxJava 2 搭配使用的示例(尽管任何被动框架都可用于发布事件)。
Flowable.interval(1, TimeUnit.SECONDS)
.map(i -> "Item " + i)
.subscribe(elements);
Flowable.interval(1, TimeUnit.SECONDS)
.map(i -> "Item " + i)
.subscribe(elements);
字符串项由示例中的 RxJava 每秒生成,它们被推送到上面定义的 Camel 路由中。
273.6.1. 使用直接 API 将数据发送到 Camel 复制链接链接已复制到粘贴板!
复制链接链接已复制到粘贴板!
另外,在本例中,直接 API 可用于从端点 URI 获取 Camel 订阅者。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
// Send two strings to the "seda:queue" endpoint
Flowable.just("hello", "world")
.subscribe(camel.subscriber("seda:queue", String.class));
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
// Send two strings to the "seda:queue" endpoint
Flowable.just("hello", "world")
.subscribe(camel.subscriber("seda:queue", String.class));