第10章 通知/リスナー API


10.1. 通知/リスナー API

Red Hat JBoss Data Grid は、発生時にイベントの通知を行うリスナー API を提供します。クライアントは、関係する通知に対してリスナー API の登録を選択できます。 この API はアノテーション駆動型で、キャッシュレベルのイベントとキャッシュマネージャーレベルのイベント上で操作します。

10.2. リスナーの例

次の例は、新しいエントリーがキャッシュに追加されるたびに情報を出力する Red Hat JBoss Data Grid のリスナーを定義します。

リスナーの設定

@Listener
public class PrintWhenAdded {
  @CacheEntryCreated
  public void print(CacheEntryCreatedEvent event) {
    System.out.println("New entry " + event.getKey() + " created in the cache");
  }
}

10.3. リスナー通知

10.3.1. リスナー通知

キャッシュイベントが発生するたびに通知がリスナーに送信されます。リスナーは、@Listener アノテーションが付いた簡単な POJO です。Listenable は、実装がアタッチするリスナーを持つことを意味するインターフェースです。各リスナーは Listenable で定義されたメソッドを使用して登録されます。

キャッシュレベルまたはキャッシュマネージャーレベルの通知を受信するため、リスナーはキャッシュおよびキャッシュマネージャーの両方にアタッチすることが可能です。

10.3.2. キャッシュレベルの通知

Red Hat JBoss Data Grid では、キャッシュレベルのイベントはキャッシュごとに発生します。キャッシュレベルイベントの例には、関係するキャッシュで登録されたリスナーへの通知を引き起こすエントリーの追加、削除、および変更などが含まれます。

10.3.3. キャッシュマネージャーレベルの通知

Red Hat JBoss Data Grid のキャッシュマネージャーレベルで発生するイベントの例は次のとおりです。

  • キャッシュの開始および停止
  • クラスターに参加するノードまたはクラスターから離脱するノード

キャッシュマネージャーレベルのイベントはグローバルに位置し、クラスター全体で使用されますが、単一のキャッシュマネージャーによって作成されたキャッシュ内のイベントに制限されます。

最初の 2 つのイベントである CacheStarted および CacheStopped は大変似ています。以下の例は、開始または停止されたキャッシュの名前を出力します。

@CacheStarted
public void cacheStarted(CacheStartedEvent event){
    // Print the name of the Cache that started
    log.info("Cache Started: " + event.getCacheName());
}

@CacheStopped
public void cacheStopped(CacheStoppedEvent event){
    // Print the name of the Cache that stopped
    log.info("Cache Stopped: " + event.getCacheName());
}

ViewChangedEvent または MergeEvent を受信するとき、新旧メンバーのリストはイベントが生成されたノードから送信されることに注意してください。例として、以下のシナリオについて考えてみましょう。

  • 現在、JDG クラスターはノード A、B、および C で構成されます。
  • ノード D がクラスターに参加します。
  • ノード A、B、および C は、[A,B,C] を旧メンバーのリストとし、[A,B,C,D] を新メンバーのリストとする ViewChangedEvent を受信します。
  • ノード D は、[D] を旧メンバーのリストとし、[A,B,C,D] を新メンバーのリストとする ViewChangedEvent を受信します。

よって、ノードのクラスターへの参加またはクラスターからの離脱を判断するために積集合が使用されることがあります。以下のように getOldMembers()getNewMembers() を使用すると、クラスターに参加したノードや離脱したノードのセットを判断できます。

@ViewChanged
public void viewChanged(ViewChangedEvent event){
    HashSet<Address> oldMembers = new HashSet(event.getOldMembers());
    HashSet<Address> newMembers = new HashSet(event.getNewMembers());
    HashSet<Address> oldCopy = (HashSet<Address>)oldMembers.clone();

    // Remove all new nodes from the old view.
    // The resulting set indicates nodes that have left the cluster.
    oldCopy.removeAll(newMembers);
    if(oldCopy.size() > 0){
        for (Address oldAdd : oldCopy){
            log.info("Node left:" + oldAdd.toString());
        }
    }

    // Remove all old nodes from the new view.
    // The resulting set indicates nodes that have joined the cluster.
    newMembers.removeAll(oldMembers);
    if(newMembers.size() > 0){
        for(Address newAdd : newMembers){
            log.info("Node joined: " + newAdd.toString());
        }
    }
}

MergeEvent の実行中に同様の論理を使用して、クラスターの新しいメンバーセットを判断することもできます。

10.3.4. 同期および非同期通知

デフォルトでは、Red Hat JBoss Data Grid の通知はイベントが生成された同じスレッドで送信されます。そのため、スレッドの進行を妨げないようにリスナーを書く必要があります。

この代わりに、別のスレッドで通知を送信し、元のスレッドの操作を妨げないようにするために、リスナーを非同期としてアノテーション付けすることもできます。

以下を使用してリスナーにアノテーションを付けます。

@Listener (sync = false)
public class MyAsyncListener { .... }

XML 設定ファイルで asyncListenerExecutor 要素を使用し、非同期通知の送信に使用されるスレッドプールを調整します。

重要

CacheEntryExpiredEvent を処理する非クラスターの同期リスナーを使用する場合、非スラスター環境ではエクスパレーションリーパー (reaper) も同期であるため、リスナーが実行を妨害しないようにしてください。

10.4. キャッシュエントリーの変更

10.4.1. キャッシュエントリーの変更

キャッシュエントリーの作成後、プログラムを使用してキャッシュエントリーを変更できます。

10.4.2. キャッシュエントリーが変更されたリスナーの設定

キャッシュエントリーが変更されたリスナーイベントでは、getValue() メソッドの動作は、実際の操作の実行前または実行後にコールバックがトリガーされたかどうかに特定されます。たとえば、event.isPre() が true の場合、event.getValue() は変更の前に古い値を返します。event.isPre() が false の場合、event.getValue() は新しい値を返します。イベントによって新しいエントリーが作成および挿入される場合、古い値は null になります。isPre() の詳細は、Red Hat JBoss Data Grid の『API Documentation』に記載されている org.infinispan.notifications.cachelistener.event パッケージのリストを参照してください。

Listenable および FilteringListenable インターフェース (Cache オブジェクトによって実装される) によって公開されるメソッドを使用する場合のみ、プログラムを使用してリスナーを設定できます。

10.4.3. キャッシュエントリーが変更されたリスナーの例

以下の例は、キャッシュエントリーが変更されるたびに情報を出力する Red Hat JBoss Data Grid のリスナーを定義します。

変更されたリスナー

@Listener
public class PrintWhenModified {

  @CacheEntryModified
  public void print(CacheEntryModifiedEvent event) {
    System.out.println("Cache entry modified. Details = " + event);
  }

}

10.5. クラスター化リスナー

10.5.1. クラスター化リスナー

クラスター化リスナーは、リスナーを分散キャッシュ設定で使用できるようにします。分散キャッシュ環境では、イベントが発生したノードのローカルイベントのみが登録されたローカルリスナーに通知されます。クラスター化リスナーでは、イベントが発生したノードに関係なくクラスターで発生する書き込み通知を 1 つのリスナーが受信できるようにし、この問題に対応します。そのため、クラスター化リスナーのパフォーマンスは、イベントが発生するノードのイベント通知のみを提供する非クラスター化リスナーよりも遅くなります。

クラスター化リスナーを使用する場合、特定のキャッシュでエントリーの追加、更新、期限切れ、または削除が行われるとクライアントアプリケーションに通知が送られます。イベントはクラスター全体であるため、アプリケーションが存在するノードや接続するノードに関係なく、クライアントアプリケーションはイベントにアクセスできます。

イベントは、常にリスナーが登録されたノード上で発生し、キャッシュの更新が発生した場所は関係しません。

10.5.2. クラスター化リスナーの設定

以下のユースケースではリスナーは受け取ったイベントを保存します。

手順: クラスター化リスナーの設定

@Listener(clustered = true)
  protected static class ClusterListener {
     List<CacheEntryEvent> events = Collections.synchronizedList(new ArrayList<CacheEntryEvent>());

     @CacheEntryCreated
     @CacheEntryModified
     @CacheEntryExpired
     @CacheEntryRemoved
     public void onCacheEvent(CacheEntryEvent event) {
        log.debugf("Adding new cluster event %s", event);
        events.add(event);
     }
  }

  public void addClusterListener(Cache<?, ?> cache) {
     ClusterListener clusterListener = new ClusterListener();
     cache.addListener(clusterListener);
  }

  1. クラスター化リスナーを有効にするには、@Listener クラスに clustered=true を付けます。
  2. エントリーの追加、変更、期限切れ、または削除時にクライアントアプリケーションが通知を受けるようにするため、以下のメソッドにはアノテーションが付けられます。

    • @CacheEntryCreated
    • @CacheEntryModified
    • @CacheEntryExpired
    • @CacheEntryRemoved
  3. リスナーはキャッシュで登録され、オプションでフィルターまたはコンバーターを渡すことができます。

クラスター化リスナーを使用すると、非クラスターリスナーでは適用されない以下の制限が適用されます。

  • クラスターリスナーは作成されたエントリー、変更されたエントリー、期限切れのエントリー、または削除されたエントリーのみをリッスンできます。他のイベントはクラスター化リスナーによってリッスンされません。
  • ポストイベントのみがクラスター化リスナーに送信され、プレイベントは無視されます。

10.5.3. キャッシュリスナー API

addListener メソッドを使用すると、クラスター化されたリスナーを既存の @CacheListener API の上に追加できます。

キャッシュリスナー API

cache.addListener(Object listener, Filter filter, Converter converter);

public @interface Listener {
  boolean clustered() default false;
  boolean includeCurrentState() default false;
  boolean sync() default true;
}
interface CacheEventFilter<K,V> {
  public boolean accept(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType);
}
interface CacheEventConverter<K,V,C> {
  public C convert(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType);
}
Cache API

ローカルまたはクラスター化リスナーは cache.addListener メソッドで登録でき、以下のイベントの 1 つが発生するまでアクティブな状態になります。

  • cache.removeListener を呼び出してリスナーが明示的に登録解除される。
  • リスナーが登録されたノードがクラッシュする。
リスナーアノテーション

リスナーアノテーションは以下の 3 つの属性で強化されます。

  • clustered(): この属性は、アノテーションが付けられたリスナーがクラスター化されているかどうかを定義します。クラスター化されたリスナーは @CacheEntryRemoved@CacheEntryCreated@CacheEntryExpired、および @CacheEntryModified イベントのみの通知を受けることができます。この属性はデフォルトで false に指定されています。
  • includeCurrentState(): この属性はクラスター化リスナーのみに適用され、デフォルトで false に指定されています。true に設定すると、クラスター内で既存の状態全体が評価されます。登録されると、キャッシュの各エントリーの CacheCreatedEvent が即座にリスナーに送信されます。
  • sync() に関する詳細は、「同期および非同期の通知」を参照してください。
oldValue および oldMetadata
oldValue および oldMetadata の値は、CacheEventFilter および CacheEventConverter クラスの許可メソッド上の追加メソッドです。これらの値は、ローカルリスナーを含むすべてのリスナーに提供されます。これらの値の詳細は、JBoss Data Grid の API ドキュメント を参照してください。
EventType
EventType には、イベントのタイプ、再試行であったかどうか、およびプレまたはポストイベントであったかどうかが含まれます。

クラスター化リスナーを使用する場合、キャッシュが更新される順序は通知を受け取る順序に反映されます。

クラスター化リスナーは、イベントが 1 度だけ送信されることを保証しません。同じイベントが複数回送信されないようにするため、リスナー実装はべき等である必要があります。安定したクラスターや、includeCurrentState の結果として合成イベントが生成されるタイムスパン外部では、単一性が受け入れられることをインプリメンターは想定することができます。

10.5.4. クラスター化リスナーの例

次のユースケースは、ニューヨーク州のニューヨーク市宛の注文がいつ生成されるか知りたいリスナーを表しています。リスナーには、ニューヨークから出入りする注文をフィルターする Filter が必要です。さらに、注文全体は必要ではなく、配達される日付のみが必要であるため、Converter も必要になります。

ユースケース: ニューヨーク宛の注文のフィルターおよび変換

class CityStateFilter implements CacheEventFilter<String, Order> {
    private String state;
    private String city;

    public boolean accept(String orderId, Order oldOrder,
                          Metadata oldMetadata, Order newOrder,
                          Metadata newMetadata, EventType eventType) {
        switch (eventType.getType()) {
            // Only send update if the order is going to our city
            case CACHE_ENTRY_CREATED:
                return city.equals(newOrder.getCity()) &&
                        state.equals(newOrder.getState());
            // Only send update if our order has changed from our city to elsewhere or if is now going to our city
            case CACHE_ENTRY_MODIFIED:
                if (city.equals(oldOrder.getCity()) &&
                        state.equals(oldOrder.getState())) {
                    // If old city matches then we have to compare if new order is no longer going to our city
                    return !city.equals(newOrder.getCity()) ||
                            !state.equals(newOrder.getState());
                } else {
                    // If the old city doesn't match ours then only send update if new update does match ours
                    return city.equals(newOrder.getCity()) &&
                            state.equals(newOrder.getState());
                }
                // On remove we have to send update if our order was originally going to city
            case CACHE_ENTRY_REMOVED:
                return city.equals(oldOrder.getCity()) &&
                        state.equals(oldOrder.getState());
        }
        return false;
    }
}

class OrderDateConverter implements CacheEventConverter<String, Order, Date> {
    private String state;
    private String city;

    public Date convert(String orderId, Order oldValue,
                        Metadata oldMetadata, Order newValue,
                        Metadata newMetadata, EventType eventType) {
        // If remove we do not care about date - this tells listener to remove its data
        if (eventType.isRemove()) {
            return null;
        } else if (eventType.isModified()) {
            if (state.equals(newValue.getState()) &&
                    city.equals(newValue.getCity())) {
                // If it is a modification meaning the destination has changed to ours then we allow it
                return newValue.getDate();
            } else {
                // If destination is no longer our city it means it was changed from us so send null
                return null;
            }
        } else {
            // This was a create so we always send date
            return newValue.getDate();
        }
    }
}

10.5.5. 最適化されたキャッシュフィルターコンバーター

クラスター化リスナーの例」では、結果のフィルターおよび変換を一度に実行するために、最適化された CacheEventFilterConverter を使用できます。

CacheEventFilterConverter は、イベントのフィルターと変換を一度に実行できるようにする最適化です。これは、イベントフィルターとコンバーターが同じオブジェクトとして最も効率的に使用され、同じメソッドでフィルターと変換が行われる場合に使用できます。戻り値が null の場合は値がフィルターを通過しなかったことを意味するため、変換が null 値を返さない場合のみ使用できます。null 値を変換するには、CacheEventFilter および CacheEventConverter インターフェースを独立して使用します。

以下は、CacheEventFilterConverter を使用したニューヨーク宛の注文のユースケース例になります。

CacheEventFilterConverter

class OrderDateFilterConverter extends AbstractCacheEventFilterConverter<String, Order, Date> {
    private final String state;
    private final String city;

    public Date filterAndConvert(String orderId, Order oldValue,
                                 Metadata oldMetadata, Order newValue,
                                 Metadata newMetadata, EventType eventType) {
        // Remove if the date is not required - this tells listener to remove its data
        if (eventType.isRemove()) {
            return null;
        } else if (eventType.isModified()) {
            if (state.equals(newValue.getState()) &&
                city.equals(newValue.getCity())) {
                // If it is a modification meaning the destination has changed to ours then we allow it
                return newValue.getDate();
            } else {
                // If destination is no longer our city it means it was changed from us so send null
                return null;
            }
        } else {
            // This was a create so we always send date
            return newValue.getDate();
        }
    }
}

リスナーの登録時、フィルターとコンバーターの両方の引数として FilterConverter を提供します。

OrderDateFilterConverter filterConverter = new OrderDateFilterConverter("NY", "New York");
cache.addListener(listener, filterConveter, filterConverter);

10.6. リモートイベントリスナー (Hot Rod)

10.6.1. リモートイベントリスナー (Hot Rod)

イベントリスナーは、Red Hat JBoss Data Grid Hot Rod サーバーが CacheEntryCreatedCacheEntryModifiedCacheEntryExpiredCacheEntryRemoved などのイベントのリモートクライアントを通知できるようにします。接続されたクライアントが殺到しないようにするため、クライアントはこれらのイベントをリッスンするかどうかを選択することができます。クライアントがサーバーへの永続接続を維持することが前提となります。

リモートイベントのクライアントリスナーは、ライブラリーモードのクラスター化リスナーと同様に追加できます。以下は、受け取った各イベントを出力するリモートクライアントリスナーの例になります。

イベント出力リスナー

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class EventLogListener {

   @ClientCacheEntryCreated
   public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryModified
   public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryExpired
   public void handleExpiredEvent(ClientCacheEntryExpiredEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryRemoved
   public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) {
      System.out.println(e);
   }

}

  • ClientCacheEntryCreatedEvent および ClientCacheEntryModifiedEvent インスタンスは、キーおよびエントリーのバージョンに関する情報を提供します。このバージョンは、replaceWithVersionremoveWithVersion などのサーバー上の条件付き操作を呼び出すために使用されます。
  • 期限切れのエントリーで get() が呼び出された場合、またはエクスパレーションリーパーがエントリーの期限切れを検出した場合、ClientCacheEntryExpiredEvent イベントが送信されます。エントリーの期限が切れると、キャッシュがエントリーを null にし、サイズを適切に調整します。しかし、イベントは 2 つのシナリオでのみ生成されます。
  • ClientCacheEntryRemovedEvent イベントは、削除操作が成功した場合のみ送信されます。削除操作が呼び出された場合にエントリーが見つからなかったり、削除するエントリーがないと、イベントは生成されません。イベント削除が成功するかどうかに関わらずイベントの削除が必要な場合は、カスタマイズされたイベント論理を作成します。
  • すべてのクライアントキャッシュエントリーの作成、変更、および削除されたイベントは、トポロジーの変更により書き込みコマンドを再試行しなければならない場合に true を返す boolean isCommandRetried() メソッドを提供します。これは、イベントが複製されたか、別のイベントが破棄または置換された (作成されたイベントが変更されたイベントに置き換えられるなど) ことを意味します。
重要

想定されるワークロードで書き込みが読み取りよりも優先される場合、送信するイベントをフィルターし、クライアントやネットワークで問題となる可能性がある過剰なトラフィックが大量に生成されないようにします。

10.6.2. イベントリスナーの追加および削除

サーバーでのイベントリスナーの登録

以下の例は、サーバーでイベント出力リスナーを登録します。「イベント出力リスナー」を参照してください。

イベントリスナーの追加

RemoteCache<Integer, String> cache = rcm.getCache();
cache.addClientListener(new EventLogListener());

クライアントイベントリスナーの削除

クライアントイベントリスナーは以下のように削除できます。

EventLogListener listener = ...
cache.removeClientListener(listener);

10.6.3. リモートイベントクライアントリスナーの例

以下は、Hot Rod 経由でリモートキャッシュと対話するようにリモートクライアントリスナーを設定するために必要な手順になります。

リモートイベントリスナーの設定

  1. Red Hat カスタマーポータルから Red Hat JBoss Data Grid ディストリビューションをダウンロードします。

    最新の JBoss Data Grid ディストリビューションには、クライアントが通信する Hot Rod サーバーが含まれています。

  2. サーバーの起動

    JBoss Data Grid サーバーのルートから以下のコマンドを実行し、サーバーを起動します。

    $ ./bin/standalone.sh
  3. Hot Rod サーバーと対話するアプリケーションの作成

    1. Maven ユーザー

      以下の依存関係でアプリケーションを作成します。バージョンは 8.5.0.Final-redhat-9 以上に変更します。

      <properties>
        <infinispan.version>8.5.0.Final-redhat-9</infinispan.version>
      </properties>
      [...]
      <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-remote</artifactId>
        <version>${infinispan.version}</version>
      </dependency>
    2. Maven 以外のユーザーは、使用するビルドツールに合わせて調整するか、すべての JBoss Data Grid jar が含まれるディストリビューションをダウンロードします。
  4. クライアントアプリケーションの作成

    以下は、受け取ったイベントをすべてログに記録する簡単なリモートイベントリスナーを示しています。

    import org.infinispan.client.hotrod.annotation.*;
    import org.infinispan.client.hotrod.event.*;
    
    @ClientListener
    public class EventLogListener {
    
     @ClientCacheEntryCreated
     @ClientCacheEntryModified
     @ClientCacheEntryRemoved
     public void handleRemoteEvent(ClientEvent event) {
       System.out.println(event);
      }
    
    }
  5. リモートイベントリスナーを使用したリモートキャッシュに対する操作の実行

    以下は、リモートイベントリスナーを追加し、リモートキャッシュに対して一部の操作を実行する簡単なメイン java クラスの例になります。

    RemoteCacheManager rcm = new RemoteCacheManager();
    RemoteCache<Integer, String> cache = rcm.getCache();
    EventLogListener listener = new EventLogListener();
    try {
     cache.addClientListener(listener);
     cache.put(1, "one");
     cache.put(1, "new-one");
     cache.remove(1);
    } finally {
     cache.removeClientListener(listener);
    }

結果

実行すると、以下と似たコンソール出力が表示されます。

ClientCacheEntryCreatedEvent(key=1,dataVersion=1)
ClientCacheEntryModifiedEvent(key=1,dataVersion=2)
ClientCacheEntryRemovedEvent(key=1)

出力は、デフォルトでイベントにはキーと現在の値に関連するデータバージョンが含まれることを示しています。実際の値はパフォーマンス上の理由でクライアントへ返送されません。リモートイベントの受け取りはパフォーマンスに影響します。キャッシュサイズが大きくなるとより多くの操作が実行されるため、パフォーマンスへの影響も大きくなります。Hot Rod クライアントにイベントが大量に送られないようにするには、サーバー側でリモートイベントをフィルターするか、イベントコンテンツをカスタマイズします。

10.6.4. リモートイベントのフィルター

10.6.4.1. リモートイベントのフィルター

クライアントに大量のイベントが送信されないようにするため、Red Hat JBoss Data Grid の Hot Rod リモートイベントをフィルターすることができます。フィルターするには、クライアントに送信されるイベントやクライアント提供の情報でこれらのフィルターが動作する方法をフィルターするインスタンスを作成するキーバリューのフィルターファクトリーを提供します。

リモートクライアントへのイベント送信はパフォーマンスに影響します。リモートリスナーが登録されたクライアントの数が多くなると、パフォーマンスのコストも増加します。また、キャッシュに対して実行された変更の数が多くなると、パフォーマンスへの影響も大きくなります。

サーバー側で送信されるイベントをフィルターするとパフォーマンスのコストを削減できます。カスタムコードを使用して、一部のイベントがリモートクライアントへブロードキャストされないようにしてパフォーマンスを向上することができます。

キーや値の情報またはキャッシュエントリーメタデータの情報を基にしてフィルターを行うことができます。フィルターを有効にするには、フィルターインスタンスを作成するキャッシュイベントフィルターファクトリーを作成する必要があります。以下は、クライアントに送信されたイベントから キー「2」 をフィルターする簡単な実装になります。

KeyValueFilter

package sample;

import java.io.Serializable;
import org.infinispan.notifications.cachelistener.filter.*;
import org.infinispan.metadata.*;

@NamedFactory(name = "basic-filter-factory")
public class BasicKeyValueFilterFactory implements CacheEventFilterFactory {
  @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) {
    return new BasicKeyValueFilter();
  }

    static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable {
      @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
        return !"2".equals(key);
      }
    }
}

このキーバリューフィルターファクトリーでリスナーを登録するには、ファクトリーに一意な名前を付け、Hot Rod サーバーをその名前とキャッシュイベントフィルターファクトリーインスタンスでプラグする必要があります。

10.6.4.2. リモートイベントのカスタムフィルター

カスタムフィルターは、一部のイベント情報がリモートクライアントへブロードキャストされないようにしてパフォーマンスを向上できます。

カスタムフィルターで JBoss Data Grid サーバーをプラグするには、以下の手順を使用します。

カスタムフィルターの使用

  1. 内部にフィルター実装が含まれる JAR ファイルを作成します。org.infinispan.filter.NamedFactory アノテーションを使用して、各ファクトリーに名前を割り当てる必要があります。例は KeyValueFilterFactory を使用します。
  2. JAR ファイル内に META-INF/services/org.infinispan.notifications.cachelistener.filter. CacheEventFilterFactory ファイルを作成し、その内部にフィルタークラス実装の完全修飾クラス名を記述します。
  3. 以下のオプションの 1 つを実行し、 JBoss Data Grid サーバーに JAR ファイルをデプロイします。

    • オプション 1: デプロイメントスキャナーを用いた JAR のデプロイ

      • JAR$JDG_HOME/standalone/deployments/ ディレクトリーにコピーします。デプロイメントスキャナーはこのディレクトリーをアクティブに監視し、新たに配置されたファイルをデプロイします。
    • オプション 2: CLI を用いた JAR のデプロイ

      • CLI で目的のインスタンスに接続します。

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • 接続後、deploy コマンドを実行します。

        deploy /path/to/artifact.jar
    • オプション 3: JAR をカスタムモジュールとしてデプロイ

      • 以下のコマンドを実行して JDG サーバーに接続します。

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • カスタムフィルターが含まれる jar はサーバーのモジュールとして定義する必要があります。追加するには、以下のコマンドのモジュール名と .jar 名を置き換え、カスタムフィルターに追加の依存関係が必要な場合は追加します。

        module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
      • 別のウインドウで、$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml を編集して、新規追加したモジュールを依存関係として org.infinispan モジュールに追加します。このファイルに以下のエントリーを追加します。

        <dependencies>
          [...]
          <module name="$MODULE-NAME">
        </dependencies>
      • JDGを サーバーを再起動します。

サーバーがフィルターでプラグされたら、そのフィルターを使用するリモートクライアントリスナーを追加します。以下の例は、リモートイベントクライアントリスナーの例 (「リモートイベントクライアントリスナーの例」を参照) を拡張し、@ClientListener アノテーションをオーバーライドしてリスナーと使用するフィルターファクトリーを示します。

フィルターファクトリーのリスナーへの追加

@org.infinispan.client.hotrod.annotation.ClientListener(filterFactoryName = "basic-filter-factory")
public class BasicFilteredEventLogListener extends EventLogListener {}

リスナーは RemoteCacheAPI を使用して追加できるようになりました。以下の例はこれを実証し、一部の操作をリモートキャッシュに対して実行します。

サーバーでのリスナーの登録

import org.infinispan.client.hotrod.*;

RemoteCacheManager rcm = new RemoteCacheManager();
RemoteCache<Integer, String> cache = rcm.getCache();
BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener();
try {
  cache.addClientListener(listener);
  cache.putIfAbsent(1, "one");
  cache.replace(1, "new-one");
  cache.putIfAbsent(2, "two");
  cache.replace(2, "new-two");
  cache.putIfAbsent(3, "three");
  cache.replace(3, "new-three");
  cache.remove(1);
  cache.remove(2);
  cache.remove(3);
} finally {
  cache.removeClientListener(listener);
}

システム出力は、フィルターされたものを除くすべてのキーのイベントをクライアントが受け取ることを示しています。

結果

以下は、例の結果となるシステム出力を示しています。

ClientCacheEntryCreatedEvent(key=1,dataVersion=1)
ClientCacheEntryModifiedEvent(key=1,dataVersion=2)
ClientCacheEntryCreatedEvent(key=3,dataVersion=5)
ClientCacheEntryModifiedEvent(key=3,dataVersion=6)
ClientCacheEntryRemovedEvent(key=1)
ClientCacheEntryRemovedEvent(key=3)
重要

イベントがリスナーが登録されたノード以外で生成された場合でも、イベントが生成された場所でフィルターが実行されるようにするため、フィルターインスタンスはクラスターへのデプロイ時にマーシャル可能である必要があります。フィルターインスタンスをマーシャル可能にするには、フィルターインスタンスが Serializable や Externalizable を拡張するようにするか、カスタムエクスターナライザーを提供します。

10.6.4.3. 強化されたフィルターファクトリー

クライアントリスナーの追加時、パラメーターをフィルターファクトリーに提供して、クライアント側の情報を基に単一のフィルターファクトリーから挙動の異なるさまざまなフィルターを生成することができます。

以下の設定は、静的に提供されたキーでフィルターを行う代わりに、リスナーの追加時に提供されたキーを基に動的にフィルターを実行できるようにするためのフィルターファクトリーの強化方法を表しています。

強化されたフィルターファクトリーの設定

package sample;

import java.io.Serializable;
import org.infinispan.notifications.cachelistener.filter.*;
import org.infinispan.metadata.*;

@NamedFactory(name = "basic-filter-factory")
public class BasicKeyValueFilterFactory implements CacheEventFilterFactory {
  @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) {
    return new BasicKeyValueFilter(params);
}

  static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable {
    private final Object[] params;
    public BasicKeyValueFilter(Object[] params) { this.params = params; }
    @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
      return !params[0].equals(key);
    }
  }
}

このフィルターは「2」ではなく「3」でフィルターが実行されるようになりました。

強化されたフィルターファクトリーの実行

import org.infinispan.client.hotrod.*;

RemoteCacheManager rcm = new RemoteCacheManager();
RemoteCache<Integer, String> cache = rcm.getCache();
BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener();
try {
  cache.addClientListener(listener, new Object[]{3}, null); // <- Filter parameter passed
  cache.putIfAbsent(1, "one");
  cache.replace(1, "new-one");
  cache.putIfAbsent(2, "two");
  cache.replace(2, "new-two");
  cache.putIfAbsent(3, "three");
  cache.replace(3, "new-three");
  cache.remove(1);
  cache.remove(2);
  cache.remove(3);
} finally {
  cache.removeClientListener(listener);
}

結果

例の結果として、以下が出力されます。

ClientCacheEntryCreatedEvent(key=1,dataVersion=1)
ClientCacheEntryModifiedEvent(key=1,dataVersion=2)
ClientCacheEntryCreatedEvent(key=2,dataVersion=3)
ClientCacheEntryModifiedEvent(key=2,dataVersion=4)
ClientCacheEntryRemovedEvent(key=1)
ClientCacheEntryRemovedEvent(key=2)

リモートイベントをカスタマイズすると、クライアントに送信する情報量をさらに削減することができます。

10.6.5. リモートイベントのカスタマイズ

10.6.5.1. リモートイベントのカスタマイズ

Red Hat JBoss Data Grid では、Hot Rod リモートイベントをカスタマイズしてクライアントへ送信する必要がある情報が含まれるようにすることができます。デフォルトでは、クライアントが過負荷にならないようにし、情報送信のコストを削減するため、イベントにはキーやイベントタイプなどの基本的な情報のみが含まれます。

これらのイベントに含まれる情報をカスタマイズし、値などの追加情報が含まれるようにしたり、より少ない情報が含まれるようにすることができます。カスタマイズするには、CacheEventConverterFactory クラスを実装して作成される CacheEventConverter インスタンスを使用します。各ファクトリーには、@NamedFactory アノテーションを使用したこれに関連する名前が必要です。

Red Hat JBoss Data Grid サーバーをイベントコンバーターでプラグするには、以下の手順を使用します。

コンバーターの使用

  1. コンバーター実装が含まれる JAR ファイルを作成します。各ファクトリーには、org.infinispan.filter.NamedFactory アノテーションを使用したこれに関連する名前が必要です。
  2. JAR ファイル内に META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory ファイルを作成し、その内部にコンバータークラス実装の完全修飾クラス名を記述します。
  3. 以下のオプションの 1 つを実行し、Red Hat JBoss Data Grid サーバーに JAR ファイルをデプロイします。

    • オプション 1: デプロイメントスキャナーを用いた JAR のデプロイ

      • JAR$JDG_HOME/standalone/deployments/ ディレクトリーにコピーします。デプロイメントスキャナーはこのディレクトリーをアクティブに監視し、新たに配置されたファイルをデプロイします。
    • オプション 2: CLI を用いた JAR のデプロイ

      • CLI で目的のインスタンスに接続します。

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • 接続後、deploy コマンドを実行します。

        deploy /path/to/artifact.jar
    • オプション 3: JAR をカスタムモジュールとしてデプロイ

      • 以下のコマンドを実行して JDG サーバーに接続します。

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • カスタムコンバーターが含まれる jar はサーバーのモジュールとして定義する必要があります。追加するには、以下のコマンドのモジュール名と .jar 名を置き換え、カスタムコンバーターに追加の依存関係が必要な場合は追加します。

        module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
      • 別のウインドウで、$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml を編集して、新規追加したモジュールを依存関係として org.infinispan モジュールに追加します。このファイルに以下のエントリーを追加します。

        <dependencies>
          [...]
          <module name="$MODULE-NAME">
        </dependencies>
      • JDGを サーバーを再起動します。

コンバーターはクライアントが提供する情報にも対応でき、コンバーターインスタンスはリスナーの追加時に提供された情報を基にイベントをカスタマイズできます。API は、リスナーの追加時にコンバーターパラメーターが渡されるようにします。

10.6.5.2. コンバーターの追加

リスナーの追加時、リスナーと使用するコンバーターファクトリーの名前が提供されます。リスナーが追加されると、サーバーはファクトリーを検索し、サーバー側のイベントをカスタマイズするために getConverter メソッドを呼び出して org.infinispan.filter.Converter クラスインスタンスを取得します。

以下の例は、Integer (整数) と String (文字列) のキャッシュに対して値の情報が含まれるカスタムイベントをリモートクライアントに送信します。コンバーターは、値とイベントのキーが含まれる新しいカスタムイベントを生成します。カスタムイベントのイベントペイロードはデフォルトのイベントよりも大きくなりますが、フィルターを組み合わせると帯域幅のコストを削減できます。

カスタムイベントの送信

import org.infinispan.notifications.cachelistener.filter.*;

@NamedFactory(name = "value-added-converter-factory")
class ValueAddedConverterFactory implements CacheEventConverterFactory {
  // The following types correspond to the Key, Value, and the returned Event, respectively.
  public CacheEventConverter<Integer, String, ValueAddedEvent> getConverter(final Object[] params) {
    return new ValueAddedConverter();
  }

  static class ValueAddedConverter implements CacheEventConverter<Integer, String, ValueAddedEvent> {
    public ValueAddedEvent convert(Integer key, String oldValue,
                                   Metadata oldMetadata, String newValue,
                                   Metadata newMetadata, EventType eventType) {
      return new ValueAddedEvent(key, newValue);
    }
  }
}

// Must be Serializable or Externalizable.
class ValueAddedEvent implements Serializable {
    final Integer key;
    final String value;
    ValueAddedEvent(Integer key, String value) {
      this.key = key;
      this.value = value;
    }
}

10.6.5.3. ライトウェイトイベント

他のコンバーター実装は、キーやイベントタイプ情報が含まれないイベントを返送できます。そのため、イベントが大幅に軽量化されますが、イベントによって詳細情報は提供されません。

このコンバーターでサーバーをプラグするには、コンバーターファクトリーと関連するコンバータークラスを JAR ファイル内にデプロイし、以下のように META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory ファイル内のサービス定義が含まれるようにします。

sample.ValueAddedConverterFactor

この後、ファクトリー名を @ClientListener アノテーションに追加して、クライアントリスナーをコンバーターファクトリーとリンクする必要があります。

@ClientListener(converterFactoryName = "value-added-converter-factory")
public class CustomEventLogListener { ... }

10.6.5.4. 動的なコンバーターインスタンス

動的なコンバーターインスタンスは、リスナーの登録時に提供されたパラメーターを基にして変換を行います。コンバーターはコンバーターファクトリーによって受信されたパラメーターを使用してこのオプションを有効にします。例は以下のとおりです。

動的なコンバーター

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

class DynamicCacheEventConverterFactory implements CacheEventConverterFactory {
   // The following types correspond to the Key, Value, and the returned Event, respectively.
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return new DynamicCacheEventConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent convert(Integer key, String oldValue, Metadata metadata, String newValue, Metadata prevMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new ValueAddedEvent(key, null);

      return new ValueAddedEvent(key, newValue);
   }
}

変換の実行に必要な動的パラメーターはリスナーの登録時に提供されます。

RemoteCache<Integer, String> cache = rcm.getCache();
cache.addClientListener(new EventLogListener(), null, new Object[]{1});

10.6.5.5. カスタムイベントのリモートクライアントリスナーの追加

カスタムイベントのリスナーの実装は、デフォルトでないイベントが関係するため、他のリモートイベントの場合とは若干異なります。他のリモートクライアントリスナー実装と同じアノテーションが使用されますが、コールバックは ClientCacheEntryCustomEvent<T> のインスタンスを受信します (T はサーバーから送信するカスタムイベントのタイプになります)。例を以下に示します。

カスタムイベントリスナー実装

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener(converterFactoryName = "value-added-converter-factory")
public class CustomEventLogListener {

    @ClientCacheEntryCreated
    @ClientCacheEntryModified
    @ClientCacheEntryRemoved
    public void handleRemoteEvent(ClientCacheEntryCustomEvent<ValueAddedEvent> event)
    {
        System.out.println(event);
    }
}

リモートイベントリスナーを使用してリモートキャッシュに対して操作を実行するには、リモートイベントリスナーを追加してリモートキャッシュに対して操作を実行する簡単なメイン Java クラスを作成します。例を以下に示します。

リモートキャッシュに対する操作の実行

import org.infinispan.client.hotrod.*;

RemoteCacheManager rcm = new RemoteCacheManager();
RemoteCache<Integer, String> cache = rcm.getCache();
CustomEventLogListener listener = new CustomEventLogListener();
try {
  cache.addClientListener(listener);
  cache.put(1, "one");
  cache.put(1, "new-one");
  cache.remove(1);
} finally {
  cache.removeClientListener(listener);
}

結果

実行すると、以下と似たコンソール出力が表示されます。

ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='one'}, eventType=CLIENT_CACHE_ENTRY_CREATED)
ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='new-one'}, eventType=CLIENT_CACHE_ENTRY_MODIFIED)
ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='null'}, eventType=CLIENT_CACHE_ENTRY_REMOVED
重要

リスナーが登録されたノードではない別のノードでイベントが生成された場合でも、イベントが生成された場所で変換が発生するようにするには、コンバーターインスタンスがクラスターにデプロイされたときにマーシャル可能である必要があります。コンバーターインスタンスをマーシャル可能にするには、Serializable または Externalizable を拡張するようにするか、カスタムの Externalizer を提供します。タイプセーフな API に対して記述するサーバーとクライアントの両方を円滑にするため、クライアントとサーバーの両方がカスタムイベントタイプを認識し、マーシャルできる必要があります。

10.6.6. イベントマーシャリング

イベントをフィルターまたはカスタマイズする際、KeyValueFilter および Converter インスタンスはマーシャル可能でなければなりません。クライアントリスナーがクラスターにインストールされると、イベントの送信元でフィルターと変換が行われるようにするため、フィルターインスタンスとコンバーターインスタンスはクラスターの他のノードに送信されます。これにより効率がよくなります。これらのクラスをマーシャル可能にするには、これらのクラスが Serializable を拡張するようにするか、カスタム Externalizer を提供および登録します。

Marshaller インスタンスをサーバー側にデプロイするには、フィルターおよびカスタマイズされたイベントに使用されるメソッドに似たメソッドを使用します。

マーシャラーのデプロイ

  1. コンバーター実装が含まれる JAR ファイルを作成します。各ファクトリーには、org.infinispan.filter.NamedFactory アノテーションを使用したこれに関連する名前が必要です。
  2. JAR ファイル内に META-INF/services/org.infinispan.commons.marshall.Marshaller ファイルを作成し、その内部にマーシャラークラス実装の完全修飾クラス名を記述します。
  3. 以下のオプションの 1 つを実行し、Red Hat JBoss Data Grid に JAR ファイルをデプロイします。

    • オプション 1: デプロイメントスキャナーを用いた JAR のデプロイ

      • JAR$JDG_HOME/standalone/deployments/ ディレクトリーにコピーします。デプロイメントスキャナーはこのディレクトリーをアクティブに監視し、新たに配置されたファイルをデプロイします。
    • オプション 2: CLI を用いた JAR のデプロイ

      • CLI で目的のインスタンスに接続します。

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • 接続後、deploy コマンドを実行します。

        deploy /path/to/artifact.jar
    • オプション 3: JAR をカスタムモジュールとしてデプロイ

      • 以下のコマンドを実行して JDG サーバーに接続します。

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • カスタムマーシャラーが含まれる jar はサーバーのモジュールとして定義する必要があります。追加するには、以下のコマンドのモジュール名と .jar 名を置き換え、カスタムマーシャラーに追加の依存関係が必要な場合は追加します。

        module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
      • 別のウインドウで、$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml を編集して、新規追加したモジュールを依存関係として org.infinispan モジュールに追加します。このファイルに以下のエントリーを追加します。

        <dependencies>
          [...]
          <module name="$MODULE-NAME">
        </dependencies>
      • JDGを サーバーを再起動します。

マーシャラーは、個別の jar または CacheEventConverter や CacheEventFilter インスタンスと同じ jar にデプロイすることができます。

注記

1 つの Marshaller インスタンスのデプロイメントのみがサポートされます。複数の Marshaller インスタンスがデプロイされると、警告メッセージが表示され、使用される Marshaller が示されます。

10.6.7. リモートイベントクラスタリングおよびフェイルオーバー

クライアントがリモートリスナーを追加するとき、クラスターの単一のノードにインストールされます。このノードは、クラスター全体で発生する影響受ける操作すべてに対してイベントをクライアントに返送します。

クラスター化された環境では、リスナーが含まれるノードがダウンした場合、Hot Rod クライアント実装が透過的にクライアントリスナーの登録を別のノードにフェイルオーバーします。これにより、イベントの消費にギャップが生じることがありますが、以下の方法の 1 つを使用すると解決できます。

状態の配信

@ClientListener アノテーションにはオプションの includeCurrentState パラメーターがあります。このパラメーターを有効にすると、サーバーは既存のキャッシュエントリーすべての CacheEntryCreatedEvent イベントインスタンスをクライアントに送信します。この動作は、リスナーが登録されたノードがオフラインになったときに検出されるクライアントによって決定され、自動的にリスナーをクラスターの別のノードに登録します。includeCurrentState を有効にすると、Hot Rod クライアントが登録されたリスナーを透過的にフェイルオーバーする場合にクライアントの状態や計算を再算出できます。includeCurrentState パラメーターのパフォーマンスは、キャッシュの大きさに影響されるため、デフォルトでは無効になっています。

@ClientCacheFailover

受信状態に依存する代わりに、@ClientCacheFailover アノテーションを使用して、クライアントリスナー実装内で ClientCacheFailoverEvent パラメーターを受信するメソッドを定義できます。Hot Rod クライアントが登録されたノードでクライアントリスナーに障害が発生すると、Hot Rod クライアントによって障害が透過的に検出され、障害が発生したノードに登録されたすべてのリスナーは別のノードにフェイルオーバーされます。

フェイルオーバーの間にクライアントが一部のイベントを見逃すことがあります。イベントを見逃さないようにするには、includeCurrentState パラメーターを true に設定します。これを有効にすると、クライアントはデータを消去でき、 CacheEntryCreatedEvent インスタンスすべてを受信し、すべてのキーでこれらのイベントをキャッシュできます。この代わりに、コールバックハンドラーを追加して、Hot Rod クライアントがフェイルオーバーイベントを認識できるようにすることができます。このコールバックメソッドは、クライアントリスナーに影響するクラスタートポロジーの変更に対応する効率的な方法で、クライアントリスナーはフェイルオーバーでどのように動作するかを判断することができます。ニアキャッシュはこの方法を利用し、ClientCacheFailoverEvent の受信後にニアキャッシュは消去されます。

@ClientCacheFailover

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class EventLogListener {
// ...

    @ClientCacheFailover
    public void handleFailover(ClientCacheFailoverEvent e) {
      // Deal with client failover, e.g. clear a near cache.
    }
}

注記

ClientCacheFailoverEvent は、クライアントリスナーがインストールされたノードに障害が発生した場合のみ発生します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.