3.6. イベントリスナーの作成


Java Hot Rod クライアントは、リスナーを登録して cache-entry レベルのイベントを受信することができます。キャッシュエントリーの作成、変更、および削除されたイベントがサポートされています。

クライアントリスナーの作成は、異なるアノテーションとイベントクラスが使用される点を除き、組み込みリスナーと非常に似ています。受信した各イベントを出力するクライアントリスナーの例を次に示します。

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener(converterFactoryName = "static-converter")
public class EventPrintListener {

   @ClientCacheEntryCreated
   public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryModified
   public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryRemoved
   public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) {
      System.out.println(e);
   }

}

ClientCacheEntryCreatedEvent および ClientCacheEntryModifiedEvent インスタンスは、影響を受けるキーとエントリーのバージョンに関する情報を提供します。このバージョンは、replaceWithVersionremoveWithVersion などのサーバー上で条件付き操作を呼び出すために使用できます。

ClientCacheEntryRemovedEvent イベントは、削除操作が成功した場合にのみ送信されます。つまり、削除オペレーションが呼び出されても、エントリーが見つからないか、エントリーを削除する必要がない場合は、イベントが生成されません。エントリーが削除されていなくても、削除されたイベントに関心のあるユーザーは、このようなイベントを生成するイベントのカスタマイズロジックを開発できます。詳細は、クライアントイベントのカスタマイズ セクションを参照してください。

ClientCacheEntryCreatedEventClientCacheEntryModifiedEvent、および ClientCacheEntryRemovedEvent のすべてのイベントインスタンスも、トポロジーの変更が原因でこれを引き起こした書き込みコマンドを再度実行する場合に true を返す boolean isCommandRetried() メソッドも提供します。これは、このイベントが重複したか、別のイベントが破棄されて置き換えられた記号になります (例: ClientCacheEntryModifiedEvent が ClientCacheEntryCreatedEvent に置き換えます)。

クライアントリスナーの実装を作成したら、サーバーに登録する必要があります。これを行うには、以下を実行します。

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener());

3.6.1. イベントリスナーの削除

クライアントイベントリスナーが必要ない場合は、以下を削除できます。

EventPrintListener listener = ...
cache.removeClientListener(listener);

3.6.2. イベントのフィルタリング

クライアントにイベントが殺到するのを防ぐために、ユーザーはフィルタリング機能を提供して、特定のクライアントリスナーに対し、サーバーによって発生するイベントの数を制限できます。フィルターを有効にするには、フィルターインスタンスを生成するキャッシュイベントフィルターファクトリーを作成する必要があります。

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-filter")
public static class StaticCacheEventFilterFactory implements CacheEventFilterFactory {

   @Override
   public StaticCacheEventFilter getFilter(Object[] params) {
      return new StaticCacheEventFilter();
   }
}


// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(1)) // static key
         return true;

      return false;
   }
}

上記で定義されたキャッシュイベントフィルターファクトリーインスタンスは、キーが 1 であるものを除くすべてのエントリーを静的にフィルターするインスタンスを作成します。

このキャッシュイベントフィルターファクトリーにリスナーを登録できるようにするには、ファクトリーに一意の名前を付ける必要があり、Hot Rod サーバーに名前とキャッシュイベントフィルターファクトリーインスタンスを接続する必要があります。

  1. フィルターの実装を含む JAR ファイルを作成します。

    キャッシュがカスタムのキー/値クラスを使用する場合は、コールバックを正しくマーシャリングされていないキー/値インスタンスで実行できるように、これらを JAR に含める必要があります。クライアントリスナーで useRawData が有効になっている場合、コールバックのキー/値インスタンスはバイナリー形式で提供されるため、これは必要ありません。

  2. JAR ファイル内で META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory ファイルを作成し、フィルタークラス実装の完全修飾クラス名を作成します。
  3. JAR ファイルを Data Grid サーバーのインストールディレクトリーの server/lib ディレクトリーに追加します。
  4. ファクトリー名を @ClientListener アノテーションに追加して、クライアントリスナーをこのキャッシュイベントフィルターファクトリーとリンクします。

    @ClientListener(filterFactoryName = "static-filter")
    public class EventPrintListener { ... }
  5. リスナーをサーバーに登録します。

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new EventPrintListener());

リスナーの登録時に提供されたパラメーターに基づいてフィルタリングする動的フィルターインスタンスを登録することもできます。フィルターは、フィルターファクトリーが受け取ったパラメーターを使用してこのオプションを有効にします。以下に例を示します。

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;

class DynamicCacheEventFilterFactory implements CacheEventFilterFactory {
   @Override
   public CacheEventFilter<Integer, String> getFilter(Object[] params) {
      return new DynamicCacheEventFilter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class DynamicCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   final Object[] params;

   DynamicCacheEventFilter(Object[] params) {
      this.params = params;
   }

   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(params[0])) // dynamic key
         return true;

      return false;
   }
}

リスナーの登録時に、フィルタリングに必要な動的パラメーターが提供されます。

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), new Object[]{1}, null);
警告

フィルターインスタンスは、クラスターにデプロイされるときにマーシャル可能である必要があります。これにより、リスナーの登録先の別のノードで生成された場合でも、イベントが生成される際にフィルターが適切に行われるようにします。それらをマーシャリング可能にするには、SerializableExternalizable を拡張するか、カスタム Externalizer を提供します。

3.6.3. 通知のスキップ

サーバーからイベント通知を取得しなくても、リモート API メソッドを呼び出してオペレーションを実行する際に、SKIP_LISTENER_NOTIFICATION フラグを含めます。たとえば、値を作成または変更する際のリスナーの通知を防ぐには、フラグを以下のように設定します。

remoteCache.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).put(1, "one");

3.6.4. イベントのカスタマイズ

デフォルトで生成されるイベントには、イベントを関連させるのに十分な情報が含まれていますが、送信コストを削減するために情報を詰め込みすぎないようにしています。必要に応じて、イベントに含まれる情報は、値などの詳細情報を含むか、より少ない情報を含めるためにカスタム化できます。このカスタマイズは、CacheEventConverterFactory によって生成された CacheEventConverter インスタンスを使用して行われます。

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-converter")
class StaticConverterFactory implements CacheEventConverterFactory {
   final CacheEventConverter<Integer, String, CustomEvent> staticConverter = new StaticCacheEventConverter();
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return staticConverter;
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
      return new CustomEvent(key, newValue);
   }
}

// Needs to be Serializable, Externalizable or marshallable with Infinispan Externalizers
// regardless of cluster or local caches
static class CustomEvent implements Serializable {
   final Integer key;
   final String value;
   CustomEvent(Integer key, String value) {
      this.key = key;
      this.value = value;
   }
}

上記の例では、コンバーターは、イベント内の値とキーを含む新しいカスタムイベントを生成します。これにより、デフォルトのイベントと比べて大量のイベントペイロードが発生しますが、フィルターと組み合わせると、ネットワークの帯域幅コストを減らすことができます。

警告

コンバーターのターゲットタイプは、Serializable または Externalizable のいずれかである必要があります。この特定のコンバーターの場合、デフォルトの Hot Rod クライアントマーシャラーではサポートされないため、デフォルトで Externalizer を提供しません。

カスタムイベントの処理には、以前に実証されたものに対して、若干異なるクライアントリスナーの実装が必要になります。より正確な状態に戻すには、ClientCacheEntryCustomEvent インスタンスを処理する必要があります。

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class CustomEventPrintListener {

   @ClientCacheEntryCreated
   @ClientCacheEntryModified
   @ClientCacheEntryRemoved
   public void handleCustomEvent(ClientCacheEntryCustomEvent<CustomEvent> e) {
      System.out.println(e);
   }

}

コールバックで受け取った ClientCacheEntryCustomEvent は、getEventData メソッドを介してカスタムイベントを公開し、getType メソッドは、生成されたイベントがキャッシュエントリーの作成、変更、または削除の結果であったかどうかに関する情報を提供します。

フィルタリングと同様に、リスナーをこのコンバーターファクトリーに登録できるようにするには、ファクトリーに一意の名前を付ける必要があり、Hot Rod サーバーに名前とキャッシュイベントコンバーターファクトリーインスタンスを接続する必要があります。

  1. コンバーターの実装を含む JAR ファイルを作成します。

    キャッシュがカスタムのキー/値クラスを使用する場合は、コールバックを正しくマーシャリングされていないキー/値インスタンスで実行できるように、これらを JAR に含める必要があります。クライアントリスナーで useRawData が有効になっている場合、コールバックのキー/値インスタンスはバイナリー形式で提供されるため、これは必要ありません。

  2. JAR ファイル内で META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory ファイルを作成し、コンバータークラス実装の完全修飾クラス名を作成します。
  3. JAR ファイルを Data Grid サーバーのインストールディレクトリーの server/lib ディレクトリーに追加します。
  4. ファクトリー名を @ClientListener アノテーションに追加して、クライアントリスナーをこのコンバーターファクトリーとリンクします。

    @ClientListener(converterFactoryName = "static-converter")
    public class CustomEventPrintListener { ... }
  5. リスナーをサーバーに登録します。

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new CustomEventPrintListener());

リスナーの登録時に提供されたパラメーターを基に変換する動的コンバーターインスタンスもあります。コンバーターは、コンバーターファクトリーによって受け取ったパラメーターを使用してこのオプションを有効にします。以下に例を示します。

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-converter")
class DynamicCacheEventConverterFactory implements CacheEventConverterFactory {
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return new DynamicCacheEventConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}

変換を行うために必要な動的パラメーターは、リスナーが登録されたときに提供されます。

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), null, new Object[]{1});
警告

コンバーターインスタンスは、クラスターにデプロイされるときにマーシャル可能である必要があります。これにより、リスナーの登録先の別のノードで生成された場合でも、イベントが生成される際に変換が適切に行われるようにします。それらをマーシャリング可能にするには、SerializableExternalizable を拡張するか、カスタム Externalizer を提供します。

3.6.5. フィルターとカスタムイベント

イベントのフィルタリングとカスタマイズの両方を実行する場合、org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter を実装する方が簡単です。これにより、1 つのステップでフィルターとカスタマイズの両方を実行できます。便宜上、org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter を直接実装するのではなく、org.infinispan.notifications.cachelistener.filter.AbstractCacheEventFilterConverter を拡張することが推奨されます。以下に例を示します。

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-filter-converter")
class DynamicCacheEventFilterConverterFactory implements CacheEventFilterConverterFactory {
   public CacheEventFilterConverter<Integer, String, CustomEvent> getFilterConverter(final Object[] params) {
      return new DynamicCacheEventFilterConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
//
class DynamicCacheEventFilterConverter extends AbstractCacheEventFilterConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventFilterConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent filterAndConvert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}

フィルターとコンバーターと同様に、この組み合わせたフィルター/変換ファクトリーでリスナーを登録するには、ファクトリーに @NamedFactory アノテーション経由で一意の名前が付与される必要があります。また、Hot Rod サーバーは名前とキャッシュイベントコンバーターファクトリーインスタンスと共にプラグインする必要があります。

  1. コンバーターの実装を含む JAR ファイルを作成します。

    キャッシュがカスタムのキー/値クラスを使用する場合は、コールバックを正しくマーシャリングされていないキー/値インスタンスで実行できるように、これらを JAR に含める必要があります。クライアントリスナーで useRawData が有効になっている場合、コールバックのキー/値インスタンスはバイナリー形式で提供されるため、これは必要ありません。

  2. JAR ファイル内で META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory ファイルを作成し、コンバータークラス実装の完全修飾クラス名を作成します。
  3. JAR ファイルを Data Grid サーバーのインストールディレクトリーの server/lib ディレクトリーに追加します。

クライアントの観点からすると、組み合わせたフィルターとコンバータークラスを使用できるようにするには、クライアントリスナーは同じフィルターファクトリーとコンバーターファクトリー名を定義する必要があります。以下に例を示します。

@ClientListener(filterFactoryName = "dynamic-filter-converter", converterFactoryName = "dynamic-filter-converter")
public class CustomEventPrintListener { ... }

上記の例で必要な動的パラメーターは、リスナーが filter パラメーターまたは converter パラメーターのいずれかに登録されている場合に提供されます。フィルターパラメーターが空でない場合は、それらが使用されます。それ以外の場合は、コンバーターパラメーターは以下のようになります。

RemoteCache<?, ?> cache = ...
cache.addClientListener(new CustomEventPrintListener(), new Object[]{1}, null);

3.6.6. イベントマーシャリング

Hot Rod サーバーは、異なる形式でデータを保存できますが、Java Hot Rod クライアントユーザーは、タイプ化されたオブジェクトで機能する CacheEventConverter インスタンスまたは CacheEventFilter インスタンスを開発できます。デフォルトでは、フィルターとコンバーターは、データを POJO (application/x-java-object) として使用しますが、filter/converter からメソッド format() を上書きすることで、希望の形式を上書きすることができます。形式が null を返す場合、フィルター/変換は保存時にデータを受け取ります。

Hot Rod Java クライアントは、異なる org.infinispan.commons.marshall.Marshaller インスタンスを使用するように設定できます。これ CacheEventConverter インスタンスや CacheEventFilter インスタンスをデプロイしたり、コンテンツをマーシャリングしたりするのではなく、Java オブジェクトでフィルター/変換できるようにするには、サーバーはオブジェクトとマーシャラーで生成されるバイナリー形式の間で変換できるようにする必要があります。

Marshaller インスタンスのサーバー側のデプロイするには、CacheEventConverter インスタンスまたは CacheEventFilter インスタンスをデプロイするのに使用されるものと同様の方法に従います。

  1. コンバーターの実装を含む JAR ファイルを作成します。
  2. JAR ファイル内で META-INF/services/org.infinispan.commons.marshall.Marshaller ファイルを作成し、フィルタークラス実装の完全修飾クラス名を作成します。
  3. JAR ファイルを Data Grid サーバーのインストールディレクトリーの server/lib ディレクトリーに追加します。

Marshaller は個別の jar または CacheEventConverter インスタンスおよび/または CacheEventFilter インスタンスと同じ jar にデプロイできることに注意してください。

3.6.6.1. Protostream マーシャラーのデプロイ

キャッシュが Protobuf コンテンツを保存する場合は、Hot Rod クライアントで ProtoStream マーシャラーを使用する際に発生する場合は、サーバーで形式がすでにサポートされているため、カスタムマーシャラーをデプロイする必要はありません。Protobuf 形式から JSON や POJO などの最も一般的な形式に変換するトランスコーダーがあります。

これらのキャッシュでフィルター/変換を使用し、バイナリー Protobuf データではなく Java オブジェクトでフィルター/変換を使用すると、追加の ProtoStream マーシャラーを設定し、フィルタリング/変換の前にデータをアンマーシャリングできるようにする必要があります。これには、Data Grid サーバー設定の一部として、必要な SerializationContextInitializer(s) を設定する必要があります。

詳細は、Cache Encoding and Marshalling を参照してください。

3.6.7. リスナーの状態の処理

クライアントリスナーアノテーションには、リスナーの追加時またはリスナーのフェイルオーバー時に状態がクライアントに送信されるかどうかを指定する任意の includeCurrentState 属性があります。

デフォルトでは、includeCurrentState は false ですが、true に設定され、クライアントリスナーがデータが含まれるキャッシュに追加されると、サーバーはキャッシュの内容を繰り返し処理し、各エントリーのイベント (設定されている場合はカスタムイベント) を ClientCacheEntryCreated としてクライアントに送信します。これにより、クライアントは既存のコンテンツに基づいて一部のローカルデータ構造をビルドできます。コンテンツが繰り返されると、キャッシュの更新を受け取るため、イベントは通常どおり受信されます。キャッシュがクラスター化されている場合、クラスター全体のコンテンツ全体が繰り返し処理されます。

3.6.8. リスナーの障害処理

Hot Rod クライアントがクライアントリスナーを登録すると、クラスターの単一ノードになります。そのノードに障害が発生すると、Java Hot Rod クライアントは透過的に検出し、別のノードに失敗したノードに登録されているすべてのリスナーをフェイルオーバーします。

このフェイルオーバー中に、クライアントはいくつかのイベントを見逃す可能性があります。これらのイベントが不足しないように、クライアントリスナーアノテーションには includeCurrentState という任意のパラメーターが含まれます。これは、フェイルオーバーの実行時に、キャッシュの内容が繰り返し処理され、ClientCacheEntryCreated イベント (設定されている場合はカスタムイベント) が生成されます。デフォルトでは、includeCurrentState は false に設定されています。

コールバックを使用してフェイルオーバーイベントを処理します。

@ClientCacheFailover
public void handleFailover(ClientCacheFailoverEvent e) {
  ...
}

これは、クライアントがいくつかのデータをキャッシュしており、フェイルオーバーの結果、いくつかのイベントが見逃される可能性を考慮して、フェイルオーバーイベントを受信したときにローカルにキャッシュされたデータを消去することを決定し、フェイルオーバーイベントの後にキャッシュ全体のコンテンツに対するイベントを受信することを知っているような場合に非常に便利です。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.