41.2. Consumer インターフェースの実装


コンシューマーを実装する代替方法

コンシューマーは以下のいずれかの方法で実装できます。

イベント駆動型のコンシューマーの実装

イベント駆動型のコンシューマーでは、処理は外部イベントによって明示的に実行されます。イベントは、リスナーインターフェースが特定のイベントソースに固有である event-listener インターフェースを介して受信されます。

例41.4「JMXConsumer 実装」 は、Apache Camel JMX コンポーネント実装から取得した JMXConsumer クラスの実装を示しています。JMXConsumerクラスはイベント駆動型のコンシューマーの例で、org.apache.camel.impl.DefaultConsumer クラスから継承されることで実装されます。この JMXConsumer 例では、イベントは NotificationListener.handleNotification() メソッドの呼び出しによって表されます。これは JMX イベントを受信する標準的な方法です。これらの JMX イベントを受信するには、例41.4「JMXConsumer 実装」 にあるように NotificationListener インターフェースを実装し、handleNotification() メソッドを上書きする必要があります。

例41.4 JMXConsumer 実装

package org.apache.camel.component.jmx;

import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 1

    JMXEndpoint jmxEndpoint;

    public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.jmxEndpoint = endpoint;
    }

    public void handleNotification(Notification notification, Object handback) { 3
        try {
            getProcessor().process(jmxEndpoint.createExchange(notification)); 4
        } catch (Throwable e) {
            handleException(e); 5
        }
    }
}
1
JMXConsumer パターンは、DefaultConsumer クラスを拡張してイベント駆動型のコンシューマーの通常のパターンに従います。また、このコンシューマーは (JMX 通知で表される) JMX からのイベントを受信するように設計されているため、NotificationListener インターフェースを実装する必要があります。
2
親エンドポイントへの参照 endpoint とチェーン内の次のプロセッサーへの参照 processor を引数に取るコンストラクターを少なくとも 1 つ実装する必要があります。
3
JMX 通知が到着すると (NotificationListener で定義された)、handleNotification() メソッドは JMX によって自動的に呼び出されます。このメソッドの本文には、コンシューマーのイベント処理を実行するコードが含まれている必要があります。handleNotification() 呼び出しは JMX レイヤーから発生するため、コンシューマーのスレッドモデルは JMXConsumer クラスではなく、JMX レイヤーによって暗黙的に制御されます。
4
このコード行は、2 つの手順を組み合わせたものです。まず、JMX 通知オブジェクトは、Apache Camel でのイベントの汎用表現であるエクスチェンジオブジェクトに変換されます。次に、新たに作成されたエクスチェンジオブジェクトはルートの次のプロセッサーに渡されます (同期的に呼び出します) 。
5
handleException() メソッドは、DefaultConsumer ベースクラスによって実装されます。デフォルトでは、org.apache.camel.impl.LoggingExceptionHandler クラスを使用して例外を処理します。
注記

handleNotification() メソッドは JMX の例に固有です。独自のイベント駆動型のコンシューマーを実装する場合、カスタムコンシューマーに実装する同様のイベントリスナーメソッドを特定する必要があります。

スケジュールされたポーリングコンシューマーの実装

スケジュールされたポーリングコンシューマーでは、ポーリングイベントはタイマークラス java.util.concurrent.ScheduledExecutorService によって自動的に生成されます。生成されたポーリングイベントを受信するには、 ScheduledPollConsumer.poll() メソッドを実装する必要があります (「コンシューマーパターンおよびスレッド」 を参照)。

例41.5「ScheduledPollConsumer 実装」 では、スケジュールされたポーリングパターンに従ってコンシューマーを実装する方法を示します。これは、ScheduledPollConsumer クラスを拡張することで実装されます。

例41.5 ScheduledPollConsumer 実装

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;

import org.apache.camel.impl.ScheduledPollConsumer;

public class pass:quotes[CustomConsumer] extends ScheduledPollConsumer { 1
    private final pass:quotes[CustomEndpoint] endpoint;

    public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception { 3
        Exchange exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor.
        getProcessor().process(exchange); 4
    }

    @Override
    protected void doStart() throws Exception { 5
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception { 6
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}
1
org.apache.camel.impl.ScheduledPollConsumer クラスを拡張して、スケジュールされたポーリングコンシューマークラス CustomConsumer を実装します。
2
親エンドポイントへの参照 endpoint とチェーン内の次のプロセッサーへの参照 processor を引数に取るコンストラクターを少なくとも 1 つ実装する必要があります。
3
スケジュールされたポーリングイベントを受信するには、poll() メソッドを上書きします。これは、受信イベントを取得して処理するコードを配置する場所です (エクスチェンジオブジェクトで表されます)。
4
この例では、イベントは同期的に処理されます。イベントを非同期的に処理する場合は、getAsyncProcessor() を代わりに呼び出して非同期プロセッサーへの参照を使用する必要があります。イベントを非同期的に処理する方法は、「非同期処理」 を参照してください。
5
(オプション) コンシューマーの開始時にコードを実行する場合は、以下のように doStart() メソッドを上書きします。
6
(オプション) コンシューマーの停止時にコードを実行する場合は、以下のように doStop() メソッドを上書きします。

ポーリングコンシューマーの実装

例41.6「PollingConsumerSupport 実装」 では、ポーリングパターンに従ってコンシューマーを実装する方法を説明します。これは、PollingConsumerSupport クラスを拡張することで実装されます。

例41.6 PollingConsumerSupport 実装

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class pass:quotes[CustomConsumer] extends PollingConsumerSupport { 1
    private final pass:quotes[CustomEndpoint] endpoint;

    public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint) { 2
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() { 3
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() { 4
        // Blocking poll ...
    }

    public Exchange receive(long timeout) { 5
        // Poll with timeout ...
    }

    protected void doStart() throws Exception { 6
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}
1
org.apache.camel.impl.PollingConsumerSupport クラスを拡張して、ポーリングコンシューマークラス CustomConsumer を実装します。
2
親エンドポイントへの参照 endpoint を引数に取るコンストラクターを少なくとも 1 つ実装する必要があります。ポーリングコンシューマーはプロセッサーインスタンスへの参照を必要としません。
3
receiveNoWait() メソッドは、イベント(エクスチェンジオブジェクト)を取得するための非ブロッキングアルゴリズムを実装する必要があります。利用できるイベントがない場合は、null が返されます。
4
この receive() メソッドでは、イベントを取得するためのブロッキングのアルゴリズムを実装する必要があります。このメソッドは、イベントが利用できない状態が続く場合に無期限にブロックできます。
5
receive(long timeout) メソッドは、指定したタイムアウト(通常はミリ秒単位で指定)までブロックできるアルゴリズムを実装します。
6
コンシューマーの起動またはシャットダウン中に実行するコードを挿入する場合は、doStart() メソッドと doStop() メソッドをそれぞれ実装します。

カスタムスレッドの実装

標準のコンシューマーパターンがコンシューマーの実装に適さない場合には、Consumer インターフェースを直接実装してスレッドコードを作成することができます。ただし、スレッドコードを作成する場合は、「スレッドモデル」 で説明されているように、標準の Apache Camel スレッドモデルに準拠することが重要です。

たとえば、camel-core にある SEDA コンポーネントは Apache Camel スレッドモデルに一貫性のある独自のコンシューマースレッドを実装しています。例41.7「カスタムスレッド実装」 クラスがスレッドを実装する方法の概要を SedaConsumer に示しています。

例41.7 カスタムスレッド実装

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Consumer for the SEDA component.
 *
 * @version $Revision: 922485 $
 */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { 1
    private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);

    private SedaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;
    ...
    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }
    ...

    public void run() { 2
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        // Poll the queue and process exchanges
        ...
    }

    ...
    protected void doStart() throws Exception { 3
        int poolSize = endpoint.getConcurrentConsumers();
        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
            .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 4
        for (int i = 0; i < poolSize; i++) { 5
            executor.execute(this);
        }
        endpoint.onStarted(this);
    }

    protected void doStop() throws Exception { 6
        endpoint.onStopped(this);
        // must shutdown executor on stop to avoid overhead of having them running
        endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); 7

        if (multicast != null) {
            ServiceHelper.stopServices(multicast);
        }
    }
    ...
    //----------
    // Implementation of ShutdownAware interface

    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        // deny stopping on shutdown as we want seda consumers to run in case some other queues
        // depend on this consumer to run, so it can complete its exchanges
        return true;
    }

    public int getPendingExchangesSize() {
        // number of pending messages on the queue
        return endpoint.getQueue().size();
    }

}
1
SedaConsumer クラスは、org.apache.camel.impl.ServiceSupport クラスを拡張し、ConsumerRunnable、および ShutdownAware インターフェースを実装することで実装されます。
2
Runnable.run() メソッドを実装し、スレッドで実行中にコンシューマーの動作を定義します。この場合、コンシューマーはループで実行され、新しいエクスチェンジのためにキューをポーリングし、キューの後半でエクスチェンジを処理します。
3
doStart() メソッドは ServiceSupport から継承されます。このメソッドは、コンシューマーの起動時に行う動作を定義するために上書きされます。
4
スレッドを直接作成するのではなく、CamelContext と共に登録された ExecutorServiceStrategy オブジェクトを使用してスレッドプールを作成する必要があり ます。これは、Apache Camel がスレッドの集中管理を実装し、正常なシャットダウンなどの機能をサポートすることができるため、重要になります。詳細は、「スレッドモデル」 を参照してください。
5
ExecutorService.execute() メソッド poolSize を呼び出し、スレッドを開始します。
6
doStop() メソッドは ServiceSupport から継承されます。このメソッドは、シャットダウン時にコンシューマーの動作を定義するために上書きされます。
7
executor インスタンスが表すスレッドプールをシャットダウンします。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.