40.2. 实施端点接口
实现端点的替代方法
支持以下替代端点实现模式:
事件驱动的端点实现
如果您的自定义端点符合事件驱动的模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过扩展抽象类来实现,则 org.apache.camel.impl.DefaultEndpoint
,如 例 40.2 “实施 DefaultEndpoint” 所示。
例 40.2. 实施 DefaultEndpoint
import java.util.Map; import java.util.concurrent.BlockingQueue; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; public class CustomEndpoint extends DefaultEndpoint { 1 public CustomEndpoint(String endpointUri, Component component) { 2 super(endpointUri, component); // Do any other initialization... } public Producer createProducer() throws Exception { 3 return new CustomProducer(this); } public Consumer createConsumer(Processor processor) throws Exception { 4 return new CustomConsumer(this, processor); } public boolean isSingleton() { return true; } // Implement the following methods, only if you need to set exchange properties. // public Exchange createExchange() { 5 return this.createExchange(getExchangePattern()); } public Exchange createExchange(ExchangePattern pattern) { Exchange result = new DefaultExchange(getCamelContext(), pattern); // Set exchange properties ... return result; } }
- 1
- 通过扩展
DefaultEndpoint
类来实施事件驱动的自定义端点 CustomEndpoint。 - 2
- 您必须至少有一个构造器使用端点 URI、
endpointUri
和父组件引用、组件
作为参数。 - 3
- 实施
createProducer()
factory 方法来创建制作者端点。 - 4
- 实施
createConsumer()
工厂方法来创建事件驱动的消费者实例。 - 5
- 通常,不需要 覆盖
createExchange()
方法。从DefaultEndpoint
继承的实现会默认创建一个DefaultExchange
对象,它可以用于任何 Apache Camel 组件。然而,如果您需要在DefaultExchange
对象中初始化一些交换属性,则最好覆盖此处的createExchange()
方法来添加 Exchange 属性设置。
不要 覆盖 createPollingConsumer()
方法。
DefaultEndpoint
类提供以下方法的默认实现,您可以在编写自定义端点代码时发现这些实施可能很有用:
-
getEndpointUri()
the endpoint URI。 -
getCamelContext()
进行对CamelContext
的引用。 -
getComponent()
3.10.0-- the return returns 引用父组件。 -
创建PollingConsumer()>
;_<-对称Creates 轮询消费者。创建的轮询消费者功能取决于事件驱动的消费者。如果覆盖事件驱动的消费者方法,请创建Consumer()
,您可以获得轮询消费者的实施。 -
CreateExchange(Exchange e)
the given exchange 对象 e to the given exchange 对象e
,指向该端点所需的类型。此方法使用覆盖的createExchange()
端点来创建新的端点。这可确保该方法也可用于自定义交换类型。
调度轮询端点实现
如果您的自定义端点符合调度的轮询模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过从抽象类继承来实现(请参阅 例 40.3 “ScheduledPollEndpoint 实现”,如 所示。
例 40.3. ScheduledPollEndpoint 实现
import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.impl.ScheduledPollEndpoint; public class CustomEndpoint extends ScheduledPollEndpoint { 1 protected CustomEndpoint(String endpointUri, CustomComponent component) { 2 super(endpointUri, component); // Do any other initialization... } public Producer createProducer() throws Exception { 3 Producer result = new CustomProducer(this); return result; } public Consumer createConsumer(Processor processor) throws Exception { 4 Consumer result = new CustomConsumer(this, processor); configureConsumer(result); 5 return result; } public boolean isSingleton() { return true; } // Implement the following methods, only if you need to set exchange properties. // public Exchange createExchange() { 6 return this.createExchange(getExchangePattern()); } public Exchange createExchange(ExchangePattern pattern) { Exchange result = new DefaultExchange(getCamelContext(), pattern); // Set exchange properties ... return result; } }
- 1
- 通过扩展
ScheduledPollEndpoint
类来实施调度的轮询自定义端点 CustomEndpoint。 - 2
- 您必须至少有一个构造器使用端点 URI、
endpointUri
和父组件引用、组件
作为参数。 - 3
- 实施
createProducer()
factory 方法来创建制作者端点。 - 4
- 实施
createConsumer()
工厂方法来创建调度的轮询消费者实例。 - 5
- 6
- 通常,不需要 覆盖
createExchange()
方法。从DefaultEndpoint
继承的实现会默认创建一个DefaultExchange
对象,它可以用于任何 Apache Camel 组件。然而,如果您需要在DefaultExchange
对象中初始化一些交换属性,则最好覆盖此处的createExchange()
方法来添加 Exchange 属性设置。
不要 覆盖 createPollingConsumer()
方法。
轮询端点实现
如果您的自定义端点遵循轮询消费者模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过从抽象类继承来实现的,则 org.apache.camel.impl.DefaultPollingEndpoint
,如 例 40.4 “DefaultPollingEndpoint Implementation” 所示。
例 40.4. DefaultPollingEndpoint Implementation
import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.impl.DefaultPollingEndpoint; public class CustomEndpoint extends DefaultPollingEndpoint { ... public PollingConsumer createPollingConsumer() throws Exception { PollingConsumer result = new CustomConsumer(this); configureConsumer(result); return result; } // Do NOT implement createConsumer(). It is already implemented in DefaultPollingEndpoint. ... }
由于此 CustomEndpoint 类是一个轮询端点,您必须实施 createPollingConsumer()
方法,而不是 createConsumer()
方法。从 createPollingConsumer()
返回的消费者实例必须从 PollingConsumer 接口继承。有关如何实施轮询消费者的详情,请参考 “轮询消费者实施”一节。
除了 createPollingConsumer()
方法的实施步骤外,实施 DefaultPollingEndpoint
的步骤与实施 ScheduledPollEndpoint
的步骤类似。详情请查看 例 40.3 “ScheduledPollEndpoint 实现”。
实施 BrowsableEndpoint 接口
如果要公开当前端点中待处理的交换实例列表,您可以实现 org.apache.camel.spi.BrowsableEndpoint 接口,如 例 40.5 “BrowsableEndpoint 接口” 所示。如果端点执行某种形式的传入事件,则需要实施这个接口。例如,Apache Camel SEDA 端点实施 BrowsableEndpoint 接口>_<-KUBECONFIGsee 例 40.6 “SedaEndpoint 实施”。
例 40.5. BrowsableEndpoint 接口
package org.apache.camel.spi; import java.util.List; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; public interface BrowsableEndpoint extends Endpoint { List<Exchange> getExchanges(); }
示例
例 40.6 “SedaEndpoint 实施” 显示 SedaEndpoint
的示例实现。SEDA 端点是一个由 事件驱动的端点的示例。传入的事件存储在 FIFO 队列中( java.util.concurrent.BlockingQueue
的实例)和 SEDA 使用者启动线程以读取和处理事件。事件本身由 org.apache.camel.Exchange
对象表示。
例 40.6. SedaEndpoint 实施
package org.apache.camel.component.seda; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.BrowsableEndpoint; public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 1 private BlockingQueue<Exchange> queue; public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 2 super(endpointUri, component); this.queue = queue; } public SedaEndpoint(String uri, SedaComponent component, Map parameters) { 3 this(uri, component, component.createQueue(uri, parameters)); } public Producer createProducer() throws Exception { 4 return new CollectionProducer(this, getQueue()); } public Consumer createConsumer(Processor processor) throws Exception { 5 return new SedaConsumer(this, processor); } public BlockingQueue<Exchange> getQueue() { 6 return queue; } public boolean isSingleton() { 7 return true; } public List<Exchange> getExchanges() { 8 return new ArrayList<Exchange> getQueue()); } }
- 1
SedaEndpoint
类遵循通过扩展DefaultEndpoint
类来实施事件驱动的端点的模式。SedaEndpoint
类还实施 BrowsableEndpoint 接口,它提供对队列中交换对象列表的访问。- 2
- 按照事件驱动的消费者的常用模式,
SedaEndpoint
定义了一个使用端点参数、端点Uri
以及组件引用参数 的构造器。 - 3
- 提供了另一个构造器,它将队列创建委派给父组件实例。
- 4
createProducer()
factory 方法创建CollectionProducer
的实例,这是向队列添加事件的生产者实现。- 5
createConsumer()
工厂方法创建SedaConsumer
的实例,它负责从队列拉取事件并进行处理。- 6
getQueue()
方法返回对队列的引用。- 7
isSingleton()
方法返回true
,表示应为每个唯一 URI 字符串创建一个端点实例。- 8
getExchanges()
方法从 BrowsableEndpoint 实现了对应的抽象方法。