40.2. 实施端点接口
实施端点的替代方法 复制链接链接已复制到粘贴板!
支持以下替代端点实现模式:
事件驱动的端点实现 复制链接链接已复制到粘贴板!
如果您的自定义端点符合事件驱动的模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过扩展抽象类 org.apache.camel.impl.DefaultEndpoint 来实现,如 例 40.2 “实现 DefaultEndpoint” 所示。
例 40.2. 实现 DefaultEndpoint
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
public class CustomEndpoint extends DefaultEndpoint {
public CustomEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
// Do any other initialization...
}
public Producer createProducer() throws Exception {
return new CustomProducer(this);
}
public Consumer createConsumer(Processor processor) throws Exception {
return new CustomConsumer(this, processor);
}
public boolean isSingleton() {
return true;
}
// Implement the following methods, only if you need to set exchange properties.
//
public Exchange createExchange() {
return this.createExchange(getExchangePattern());
}
public Exchange createExchange(ExchangePattern pattern) {
Exchange result = new DefaultExchange(getCamelContext(), pattern);
// Set exchange properties
...
return result;
}
}
- 1
- 通过扩展
DefaultEndpoint类来实施事件驱动的自定义端点 CustomEndpoint。 - 2
- 您必须至少有一个构造器使用端点 URI、
endpointUri和父组件引用、组件作为参数。 - 3
- 实施
createProducer ()工厂方法来创建制作者端点。 - 4
- 实施
createConsumer ()工厂方法,以创建事件驱动的消费者实例。 - 5
- 通常,不需要 覆盖
createExchange ()方法。默认情况下,实现从DefaultEndpoint继承创建一个DefaultExchange对象,它可用于任何 Apache Camel 组件。如果您需要在DefaultExchange对象中初始化一些交换属性,但应该在此处覆盖createExchange ()方法来添加交换属性设置。
不要覆盖 createPollingConsumer () 方法。
DefaultEndpoint 类提供以下方法的默认实现,您可能会在编写自定义端点代码时很有用:
-
getEndpointUri ()criu-wagonReturns 端点 URI。 -
getCamelContext ()criu-wagonReturns 对CamelContext的引用。 -
getComponent ()criu- iwlReturns 对父组件的引用。 -
Create
PollingConsumer ()criu- iwlCreates a polling consumer。创建的轮询消费者功能基于事件驱动的消费者。如果您覆盖事件驱动的消费者方法createConsumer (),则获得轮询消费者实施。 -
createExchange (Exchange e)criu-criuConverts the given exchange 对象e,到此端点所需的类型。此方法使用覆盖的createExchange ()端点创建一个新端点。这样可确保该方法也适用于自定义交换类型。
调度的轮询端点实施 复制链接链接已复制到粘贴板!
如果您的自定义端点符合调度的轮询模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过继承抽象类 org.apache.camel.impl.ScheduledPollEndpoint 来实现,如 例 40.3 “ScheduledPollEndpoint 实现” 所示。
例 40.3. ScheduledPollEndpoint 实现
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.ScheduledPollEndpoint;
public class CustomEndpoint extends ScheduledPollEndpoint {
protected CustomEndpoint(String endpointUri, CustomComponent component) {
super(endpointUri, component);
// Do any other initialization...
}
public Producer createProducer() throws Exception {
Producer result = new CustomProducer(this);
return result;
}
public Consumer createConsumer(Processor processor) throws Exception {
Consumer result = new CustomConsumer(this, processor);
configureConsumer(result);
return result;
}
public boolean isSingleton() {
return true;
}
// Implement the following methods, only if you need to set exchange properties.
//
public Exchange createExchange() {
return this.createExchange(getExchangePattern());
}
public Exchange createExchange(ExchangePattern pattern) {
Exchange result = new DefaultExchange(getCamelContext(), pattern);
// Set exchange properties
...
return result;
}
}
- 1
- 通过扩展
ScheduledPollEndpoint类,实施调度的轮询自定义端点 CustomEndpoint。 - 2
- 您必须至少有一个构造器,它将端点 URI、
endpointUri和父组件引用、组件 引用、组件作为参数。 - 3
- 实施
createProducer ()工厂方法,以创建制作者端点。 - 4
- 实施
createConsumer ()工厂方法,以创建调度的轮询消费者实例。 - 5
- 6
- 通常,不需要 覆盖
createExchange ()方法。默认情况下,实现从DefaultEndpoint继承创建一个DefaultExchange对象,它可用于任何 Apache Camel 组件。如果您需要在DefaultExchange对象中初始化一些交换属性,但应该在此处覆盖createExchange ()方法来添加交换属性设置。
不要覆盖 createPollingConsumer () 方法。
轮询端点实施 复制链接链接已复制到粘贴板!
如果您的自定义端点符合轮询消费者模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过继承抽象类 org.apache.camel.impl.DefaultPollingEndpoint 来实现,如 例 40.4 “DefaultPollingEndpoint 实现” 所示。
例 40.4. DefaultPollingEndpoint 实现
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultPollingEndpoint;
public class CustomEndpoint extends DefaultPollingEndpoint {
...
public PollingConsumer createPollingConsumer() throws Exception {
PollingConsumer result = new CustomConsumer(this);
configureConsumer(result);
return result;
}
// Do NOT implement createConsumer(). It is already implemented in DefaultPollingEndpoint.
...
}
因为此 CustomEndpoint 类是一个轮询端点,所以您必须实现 createPollingConsumer () 方法,而不是 createConsumer () 方法。从 createPollingConsumer () 返回的消费者实例必须从 PollingConsumer 接口继承。有关如何实现轮询消费者的详情,请参考 “轮询消费者实施”一节。
除了实现 createPollingConsumer () 方法外,实现 DefaultPollingEndpoint 的步骤与实现 ScheduledPollEndpoint 的步骤类似。详情请查看 例 40.3 “ScheduledPollEndpoint 实现”。
实现 BrowsableEndpoint 接口 复制链接链接已复制到粘贴板!
如果要公开当前端点中待处理的交换实例列表,您可以实现 org.apache.camel.spi.BrowsableEndpoint 接口,如 例 40.5 “BrowsableEndpoint Interface” 所示。如果端点执行某种类型的传入事件,则实施此接口是合理的。例如,Apache Camel SEDA 端点实现了 BrowsableEndpoint 接口,see 例 40.6 “SedaEndpoint 实现”。
例 40.5. BrowsableEndpoint Interface
package org.apache.camel.spi;
import java.util.List;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
public interface BrowsableEndpoint extends Endpoint {
List<Exchange> getExchanges();
}
示例 复制链接链接已复制到粘贴板!
例 40.6 “SedaEndpoint 实现” 显示 SedaEndpoint 的示例实施。SEDA 端点是 事件驱动的端点 示例。传入的事件存储在 FIFO 队列中( java.util.concurrent.BlockingQueue的实例),而 SEDA 使用者会启动一个线程来读取和处理事件。事件本身由 org.apache.camel.Exchange 对象表示。
例 40.6. SedaEndpoint 实现
package org.apache.camel.component.seda;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;
public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
private BlockingQueue<Exchange> queue;
public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
super(endpointUri, component);
this.queue = queue;
}
public SedaEndpoint(String uri, SedaComponent component, Map parameters) {
this(uri, component, component.createQueue(uri, parameters));
}
public Producer createProducer() throws Exception {
return new CollectionProducer(this, getQueue());
}
public Consumer createConsumer(Processor processor) throws Exception {
return new SedaConsumer(this, processor);
}
public BlockingQueue<Exchange> getQueue() {
return queue;
}
public boolean isSingleton() {
return true;
}
public List<Exchange> getExchanges() {
return new ArrayList<Exchange> getQueue());
}
}
- 1
SedaEndpoint类遵循通过扩展DefaultEndpoint类来实现事件驱动的端点的模式。SedaEndpoint类还实现了 BrowsableEndpoint 接口,它提供对队列中交换对象列表的访问。- 2
- 根据事件驱动的消费者的常规模式,
SedaEndpoint定义了一个构造器,它采用端点参数、endpointUri和组件引用参数,组件。 - 3
- 提供了另一个构造器,它将队列创建委派给父组件实例。
- 4
createProducer ()工厂方法创建一个CollectionProducer实例,它是一个生成者实现,用于将事件添加到队列中。- 5
createConsumer ()factory 方法创建一个SedaConsumer实例,它负责从队列拉取事件并处理它们。- 6
getQueue ()方法返回对队列的引用。- 7
isSingleton ()方法返回true,这表示应当为每个唯一的 URI 字符串创建一个端点实例。- 8
getExchanges ()方法实现了来自 BrowsableEndpoint 的对应抽象方法。