40.2. 实施端点接口


实施端点的替代方法

支持以下替代端点实现模式:

事件驱动的端点实现

如果您的自定义端点符合事件驱动的模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过扩展抽象类 org.apache.camel.impl.DefaultEndpoint 来实现,如 例 40.2 “实现 DefaultEndpoint” 所示。

例 40.2. 实现 DefaultEndpoint

import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;

public class CustomEndpoint extends DefaultEndpoint { 1

    public CustomEndpoint(String endpointUri, Component component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        return new CustomProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        return new CustomConsumer(this, processor);
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 5
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
通过扩展 DefaultEndpoint 类来实施事件驱动的自定义端点 CustomEndpoint
2
您必须至少有一个构造器使用端点 URI、endpointUri 和父组件引用、组件 作为参数。
3
实施 createProducer () 工厂方法来创建制作者端点。
4
实施 createConsumer () 工厂方法,以创建事件驱动的消费者实例。
5
通常,不需要 覆盖 createExchange () 方法。默认情况下,实现从 DefaultEndpoint 继承创建一个 DefaultExchange 对象,它可用于任何 Apache Camel 组件。如果您需要在 DefaultExchange 对象中初始化一些交换属性,但应该在此处覆盖 createExchange () 方法来添加交换属性设置。
重要

不要覆盖 createPollingConsumer () 方法。

DefaultEndpoint 类提供以下方法的默认实现,您可能会在编写自定义端点代码时很有用:

  • getEndpointUri () criu-wagonReturns 端点 URI。
  • getCamelContext () criu-wagonReturns 对 CamelContext 的引用。
  • getComponent () criu- iwlReturns 对父组件的引用。
  • Create PollingConsumer () criu- iwlCreates a polling consumer。创建的轮询消费者功能基于事件驱动的消费者。如果您覆盖事件驱动的消费者方法 createConsumer (),您可以获得轮询消费者实现。
  • createExchange (Exchange e) criu-criuConverts the given exchange 对象 e,到此端点所需的类型。此方法使用覆盖的 createExchange () 端点创建一个新端点。这样可确保该方法也适用于自定义交换类型。

调度的轮询端点实施

如果您的自定义端点符合调度的轮询模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过继承抽象类 org.apache.camel.impl.ScheduledPollEndpoint 来实现,如 例 40.3 “ScheduledPollEndpoint 实现” 所示。

例 40.3. ScheduledPollEndpoint 实现

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.ScheduledPollEndpoint;

public class CustomEndpoint extends ScheduledPollEndpoint {  1

    protected CustomEndpoint(String endpointUri, CustomComponent component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        Producer result = new CustomProducer(this);
        return result;
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        Consumer result = new CustomConsumer(this, processor);
        configureConsumer(result); 5
        return result;
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 6
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
通过扩展 ScheduledPollEndpoint 类,实施调度的轮询自定义端点 CustomEndpoint
2
您必须至少有一个构造器,它将端点 URI、endpointUri 和父组件引用、组件 引用、组件 作为参数。
3
实施 createProducer () 工厂方法,以创建制作者端点。
4
实施 createConsumer () 工厂方法,以创建调度的轮询消费者实例。
5
ScheduledPollEndpoint 基础类中定义的 configureConsumer () 方法负责将消费者查询选项注入消费者查询选项。请参阅 “消费者参数注入”一节
6
通常,不需要 覆盖 createExchange () 方法。默认情况下,实现从 DefaultEndpoint 继承创建一个 DefaultExchange 对象,它可用于任何 Apache Camel 组件。如果您需要在 DefaultExchange 对象中初始化一些交换属性,但应该在此处覆盖 createExchange () 方法来添加交换属性设置。
重要

不要覆盖 createPollingConsumer () 方法。

轮询端点实施

如果您的自定义端点符合轮询消费者模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过继承抽象类 org.apache.camel.impl.DefaultPollingEndpoint 来实现,如 例 40.4 “DefaultPollingEndpoint 实现” 所示。

例 40.4. DefaultPollingEndpoint 实现

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultPollingEndpoint;

public class CustomEndpoint extends DefaultPollingEndpoint {
    ...
    public PollingConsumer createPollingConsumer() throws Exception {
        PollingConsumer result = new CustomConsumer(this);
        configureConsumer(result);
        return result;
    }

    // Do NOT implement createConsumer(). It is already implemented in DefaultPollingEndpoint.
    ...
}

因为此 CustomEndpoint 类是一个轮询端点,所以您必须实现 createPollingConsumer () 方法,而不是 createConsumer () 方法。从 createPollingConsumer () 返回的消费者实例必须从 PollingConsumer 接口继承。有关如何实现轮询消费者的详情,请参考 “轮询消费者实施”一节

除了实现 createPollingConsumer () 方法外,实现 DefaultPollingEndpoint 的步骤与实现 ScheduledPollEndpoint 的步骤类似。详情请查看 例 40.3 “ScheduledPollEndpoint 实现”

实现 BrowsableEndpoint 接口

如果要公开当前端点中待处理的交换实例列表,您可以实现 org.apache.camel.spi.BrowsableEndpoint 接口,如 例 40.5 “BrowsableEndpoint Interface” 所示。如果端点执行某种类型的传入事件,则实施此接口是合理的。例如,Apache Camel SEDA 端点实现了 BrowsableEndpoint 接口,see 例 40.6 “SedaEndpoint 实现”

例 40.5. BrowsableEndpoint Interface

package org.apache.camel.spi;

import java.util.List;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;

public interface BrowsableEndpoint extends Endpoint {
    List<Exchange> getExchanges();
}

示例

例 40.6 “SedaEndpoint 实现” 显示 SedaEndpoint 的示例实施。SEDA 端点是 事件驱动的端点 示例。传入的事件存储在 FIFO 队列中( java.util.concurrent.BlockingQueue的实例),而 SEDA 使用者会启动一个线程来读取和处理事件。事件本身由 org.apache.camel.Exchange 对象表示。

例 40.6. SedaEndpoint 实现

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;

public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 1
    private BlockingQueue<Exchange> queue;

    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 2
        super(endpointUri, component);
        this.queue = queue;
    }

    public SedaEndpoint(String uri, SedaComponent component, Map parameters) { 3
        this(uri, component, component.createQueue(uri, parameters));
    }

    public Producer createProducer() throws Exception { 4
        return new CollectionProducer(this, getQueue());
    }

    public Consumer createConsumer(Processor processor) throws Exception { 5
        return new SedaConsumer(this, processor);
    }

    public BlockingQueue<Exchange> getQueue() { 6
        return queue;
    }

    public boolean isSingleton() { 7
        return true;
    }

    public List<Exchange> getExchanges() { 8
        return new ArrayList<Exchange> getQueue());
    }
}
1
SedaEndpoint 类遵循通过扩展 DefaultEndpoint 类来实现事件驱动的端点的模式。SedaEndpoint 类还实现了 BrowsableEndpoint 接口,它提供对队列中交换对象列表的访问。
2
根据事件驱动的消费者的常规模式,SedaEndpoint 定义了一个构造器,它采用端点参数、endpointUri 和组件引用参数,组件
3
提供了另一个构造器,它将队列创建委派给父组件实例。
4
createProducer () 工厂方法创建一个 CollectionProducer 实例,它是一个生成者实现,用于将事件添加到队列中。
5
createConsumer () factory 方法创建一个 SedaConsumer 实例,它负责从队列拉取事件并处理它们。
6
getQueue () 方法返回对队列的引用。
7
isSingleton () 方法返回 true,这表示应当为每个唯一的 URI 字符串创建一个端点实例。
8
getExchanges () 方法实现了来自 BrowsableEndpoint 的对应抽象方法。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.