3.6. イベントリスナーの作成
Java Hot Rod クライアントは、リスナーを登録して cache-entry レベルのイベントを受信することができます。キャッシュエントリーの作成、変更、および削除されたイベントがサポートされています。
クライアントリスナーの作成は、異なるアノテーションとイベントクラスが使用される点を除き、組み込みリスナーと非常に似ています。受信した各イベントを出力するクライアントリスナーの例を次に示します。
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener(converterFactoryName = "static-converter") public class EventPrintListener { @ClientCacheEntryCreated public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) { System.out.println(e); } @ClientCacheEntryModified public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) { System.out.println(e); } @ClientCacheEntryRemoved public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) { System.out.println(e); } }
ClientCacheEntryCreatedEvent
および ClientCacheEntryModifiedEvent
インスタンスは、影響を受けるキーとエントリーのバージョンに関する情報を提供します。このバージョンは、replaceWithVersion
、removeWithVersion
などのサーバー上で条件付き操作を呼び出すために使用できます。
ClientCacheEntryRemovedEvent
イベントは、削除操作が成功した場合にのみ送信されます。つまり、削除オペレーションが呼び出されても、エントリーが見つからないか、エントリーを削除する必要がない場合は、イベントが生成されません。エントリーが削除されていなくても、削除されたイベントに関心のあるユーザーは、このようなイベントを生成するイベントのカスタマイズロジックを開発できます。詳細は、クライアントイベントのカスタマイズ セクションを参照してください。
ClientCacheEntryCreatedEvent
、ClientCacheEntryModifiedEvent
、および ClientCacheEntryRemovedEvent
のすべてのイベントインスタンスも、トポロジーの変更が原因でこれを引き起こした書き込みコマンドを再度実行する場合に true を返す boolean isCommandRetried()
メソッドも提供します。これは、このイベントが重複したか、別のイベントが破棄されて置き換えられた記号になります (例: ClientCacheEntryModifiedEvent が ClientCacheEntryCreatedEvent に置き換えます)。
クライアントリスナーの実装を作成したら、サーバーに登録する必要があります。これを行うには、以下を実行します。
RemoteCache<?, ?> cache = ... cache.addClientListener(new EventPrintListener());
3.6.1. イベントリスナーの削除
クライアントイベントリスナーが必要ない場合は、以下を削除できます。
EventPrintListener listener = ... cache.removeClientListener(listener);
3.6.2. イベントのフィルタリング
クライアントにイベントが殺到するのを防ぐために、ユーザーはフィルタリング機能を提供して、特定のクライアントリスナーに対し、サーバーによって発生するイベントの数を制限できます。フィルターを有効にするには、フィルターインスタンスを生成するキャッシュイベントフィルターファクトリーを作成する必要があります。
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory; import org.infinispan.filter.NamedFactory; @NamedFactory(name = "static-filter") public static class StaticCacheEventFilterFactory implements CacheEventFilterFactory { @Override public StaticCacheEventFilter getFilter(Object[] params) { return new StaticCacheEventFilter(); } } // Serializable, Externalizable or marshallable with Infinispan Externalizers // needed when running in a cluster class StaticCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable { @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { if (key.equals(1)) // static key return true; return false; } }
上記で定義されたキャッシュイベントフィルターファクトリーインスタンスは、キーが 1
であるものを除くすべてのエントリーを静的にフィルターするインスタンスを作成します。
このキャッシュイベントフィルターファクトリーにリスナーを登録できるようにするには、ファクトリーに一意の名前を付ける必要があり、Hot Rod サーバーに名前とキャッシュイベントフィルターファクトリーインスタンスを接続する必要があります。
フィルターの実装を含む JAR ファイルを作成します。
キャッシュがカスタムのキー/値クラスを使用する場合は、コールバックを正しくマーシャリングされていないキー/値インスタンスで実行できるように、これらを JAR に含める必要があります。クライアントリスナーで
useRawData
が有効になっている場合、コールバックのキー/値インスタンスはバイナリー形式で提供されるため、これは必要ありません。-
JAR ファイル内で
META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory
ファイルを作成し、フィルタークラス実装の完全修飾クラス名を作成します。 -
JAR ファイルを Data Grid サーバーのインストールディレクトリーの
server/lib
ディレクトリーに追加します。 ファクトリー名を
@ClientListener
アノテーションに追加して、クライアントリスナーをこのキャッシュイベントフィルターファクトリーとリンクします。@ClientListener(filterFactoryName = "static-filter") public class EventPrintListener { ... }
リスナーをサーバーに登録します。
RemoteCache<?, ?> cache = ... cache.addClientListener(new EventPrintListener());
リスナーの登録時に提供されたパラメーターに基づいてフィルタリングする動的フィルターインスタンスを登録することもできます。フィルターは、フィルターファクトリーが受け取ったパラメーターを使用してこのオプションを有効にします。以下に例を示します。
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory; import org.infinispan.notifications.cachelistener.filter.CacheEventFilter; class DynamicCacheEventFilterFactory implements CacheEventFilterFactory { @Override public CacheEventFilter<Integer, String> getFilter(Object[] params) { return new DynamicCacheEventFilter(params); } } // Serializable, Externalizable or marshallable with Infinispan Externalizers // needed when running in a cluster class DynamicCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable { final Object[] params; DynamicCacheEventFilter(Object[] params) { this.params = params; } @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { if (key.equals(params[0])) // dynamic key return true; return false; } }
リスナーの登録時に、フィルタリングに必要な動的パラメーターが提供されます。
RemoteCache<?, ?> cache = ... cache.addClientListener(new EventPrintListener(), new Object[]{1}, null);
フィルターインスタンスは、クラスターにデプロイされるときにマーシャル可能である必要があります。これにより、リスナーの登録先の別のノードで生成された場合でも、イベントが生成される際にフィルターが適切に行われるようにします。それらをマーシャリング可能にするには、Serializable
、Externalizable
を拡張するか、カスタム Externalizer
を提供します。
3.6.3. 通知のスキップ
サーバーからイベント通知を取得しなくても、リモート API メソッドを呼び出してオペレーションを実行する際に、SKIP_LISTENER_NOTIFICATION
フラグを含めます。たとえば、値を作成または変更する際のリスナーの通知を防ぐには、フラグを以下のように設定します。
remoteCache.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).put(1, "one");
3.6.4. イベントのカスタマイズ
デフォルトで生成されるイベントには、イベントを関連させるのに十分な情報が含まれていますが、送信コストを削減するために情報を詰め込みすぎないようにしています。必要に応じて、イベントに含まれる情報は、値などの詳細情報を含むか、より少ない情報を含めるためにカスタム化できます。このカスタマイズは、CacheEventConverterFactory
によって生成された CacheEventConverter
インスタンスを使用して行われます。
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory; import org.infinispan.notifications.cachelistener.filter.CacheEventConverter; import org.infinispan.filter.NamedFactory; @NamedFactory(name = "static-converter") class StaticConverterFactory implements CacheEventConverterFactory { final CacheEventConverter<Integer, String, CustomEvent> staticConverter = new StaticCacheEventConverter(); public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) { return staticConverter; } } // Serializable, Externalizable or marshallable with Infinispan Externalizers // needed when running in a cluster class StaticCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable { public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return new CustomEvent(key, newValue); } } // Needs to be Serializable, Externalizable or marshallable with Infinispan Externalizers // regardless of cluster or local caches static class CustomEvent implements Serializable { final Integer key; final String value; CustomEvent(Integer key, String value) { this.key = key; this.value = value; } }
上記の例では、コンバーターは、イベント内の値とキーを含む新しいカスタムイベントを生成します。これにより、デフォルトのイベントと比べて大量のイベントペイロードが発生しますが、フィルターと組み合わせると、ネットワークの帯域幅コストを減らすことができます。
コンバーターのターゲットタイプは、Serializable
または Externalizable
のいずれかである必要があります。この特定のコンバーターの場合、デフォルトの Hot Rod クライアントマーシャラーではサポートされないため、デフォルトで Externalizer を提供しません。
カスタムイベントの処理には、以前に実証されたものに対して、若干異なるクライアントリスナーの実装が必要になります。より正確な状態に戻すには、ClientCacheEntryCustomEvent
インスタンスを処理する必要があります。
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class CustomEventPrintListener { @ClientCacheEntryCreated @ClientCacheEntryModified @ClientCacheEntryRemoved public void handleCustomEvent(ClientCacheEntryCustomEvent<CustomEvent> e) { System.out.println(e); } }
コールバックで受け取った ClientCacheEntryCustomEvent
は、getEventData
メソッドを介してカスタムイベントを公開し、getType
メソッドは、生成されたイベントがキャッシュエントリーの作成、変更、または削除の結果であったかどうかに関する情報を提供します。
フィルタリングと同様に、リスナーをこのコンバーターファクトリーに登録できるようにするには、ファクトリーに一意の名前を付ける必要があり、Hot Rod サーバーに名前とキャッシュイベントコンバーターファクトリーインスタンスを接続する必要があります。
コンバーターの実装を含む JAR ファイルを作成します。
キャッシュがカスタムのキー/値クラスを使用する場合は、コールバックを正しくマーシャリングされていないキー/値インスタンスで実行できるように、これらを JAR に含める必要があります。クライアントリスナーで
useRawData
が有効になっている場合、コールバックのキー/値インスタンスはバイナリー形式で提供されるため、これは必要ありません。-
JAR ファイル内で
META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory
ファイルを作成し、コンバータークラス実装の完全修飾クラス名を作成します。 -
JAR ファイルを Data Grid サーバーのインストールディレクトリーの
server/lib
ディレクトリーに追加します。 ファクトリー名を
@ClientListener
アノテーションに追加して、クライアントリスナーをこのコンバーターファクトリーとリンクします。@ClientListener(converterFactoryName = "static-converter") public class CustomEventPrintListener { ... }
リスナーをサーバーに登録します。
RemoteCache<?, ?> cache = ... cache.addClientListener(new CustomEventPrintListener());
リスナーの登録時に提供されたパラメーターを基に変換する動的コンバーターインスタンスもあります。コンバーターは、コンバーターファクトリーによって受け取ったパラメーターを使用してこのオプションを有効にします。以下に例を示します。
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory; import org.infinispan.notifications.cachelistener.filter.CacheEventConverter; @NamedFactory(name = "dynamic-converter") class DynamicCacheEventConverterFactory implements CacheEventConverterFactory { public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) { return new DynamicCacheEventConverter(params); } } // Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable { final Object[] params; DynamicCacheEventConverter(Object[] params) { this.params = params; } public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { // If the key matches a key given via parameter, only send the key information if (params[0].equals(key)) return new CustomEvent(key, null); return new CustomEvent(key, newValue); } }
変換を行うために必要な動的パラメーターは、リスナーが登録されたときに提供されます。
RemoteCache<?, ?> cache = ... cache.addClientListener(new EventPrintListener(), null, new Object[]{1});
コンバーターインスタンスは、クラスターにデプロイされるときにマーシャル可能である必要があります。これにより、リスナーの登録先の別のノードで生成された場合でも、イベントが生成される際に変換が適切に行われるようにします。それらをマーシャリング可能にするには、Serializable
、Externalizable
を拡張するか、カスタム Externalizer
を提供します。
3.6.5. フィルターとカスタムイベント
イベントのフィルタリングとカスタマイズの両方を実行する場合、org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter
を実装する方が簡単です。これにより、1 つのステップでフィルターとカスタマイズの両方を実行できます。便宜上、org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter
を直接実装するのではなく、org.infinispan.notifications.cachelistener.filter.AbstractCacheEventFilterConverter
を拡張することが推奨されます。以下に例を示します。
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory; import org.infinispan.notifications.cachelistener.filter.CacheEventConverter; @NamedFactory(name = "dynamic-filter-converter") class DynamicCacheEventFilterConverterFactory implements CacheEventFilterConverterFactory { public CacheEventFilterConverter<Integer, String, CustomEvent> getFilterConverter(final Object[] params) { return new DynamicCacheEventFilterConverter(params); } } // Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster // class DynamicCacheEventFilterConverter extends AbstractCacheEventFilterConverter<Integer, String, CustomEvent>, Serializable { final Object[] params; DynamicCacheEventFilterConverter(Object[] params) { this.params = params; } public CustomEvent filterAndConvert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { // If the key matches a key given via parameter, only send the key information if (params[0].equals(key)) return new CustomEvent(key, null); return new CustomEvent(key, newValue); } }
フィルターとコンバーターと同様に、この組み合わせたフィルター/変換ファクトリーでリスナーを登録するには、ファクトリーに @NamedFactory
アノテーション経由で一意の名前が付与される必要があります。また、Hot Rod サーバーは名前とキャッシュイベントコンバーターファクトリーインスタンスと共にプラグインする必要があります。
コンバーターの実装を含む JAR ファイルを作成します。
キャッシュがカスタムのキー/値クラスを使用する場合は、コールバックを正しくマーシャリングされていないキー/値インスタンスで実行できるように、これらを JAR に含める必要があります。クライアントリスナーで
useRawData
が有効になっている場合、コールバックのキー/値インスタンスはバイナリー形式で提供されるため、これは必要ありません。-
JAR ファイル内で
META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory
ファイルを作成し、コンバータークラス実装の完全修飾クラス名を作成します。 -
JAR ファイルを Data Grid サーバーのインストールディレクトリーの
server/lib
ディレクトリーに追加します。
クライアントの観点からすると、組み合わせたフィルターとコンバータークラスを使用できるようにするには、クライアントリスナーは同じフィルターファクトリーとコンバーターファクトリー名を定義する必要があります。以下に例を示します。
@ClientListener(filterFactoryName = "dynamic-filter-converter", converterFactoryName = "dynamic-filter-converter") public class CustomEventPrintListener { ... }
上記の例で必要な動的パラメーターは、リスナーが filter パラメーターまたは converter パラメーターのいずれかに登録されている場合に提供されます。フィルターパラメーターが空でない場合は、それらが使用されます。それ以外の場合は、コンバーターパラメーターは以下のようになります。
RemoteCache<?, ?> cache = ... cache.addClientListener(new CustomEventPrintListener(), new Object[]{1}, null);
3.6.6. イベントマーシャリング
Hot Rod サーバーは、異なる形式でデータを保存できますが、Java Hot Rod クライアントユーザーは、タイプ化されたオブジェクトで機能する CacheEventConverter
インスタンスまたは CacheEventFilter
インスタンスを開発できます。デフォルトでは、フィルターとコンバーターは、データを POJO (application/x-java-object) として使用しますが、filter/converter からメソッド format()
を上書きすることで、希望の形式を上書きすることができます。形式が null
を返す場合、フィルター/変換は保存時にデータを受け取ります。
Hot Rod Java クライアントは、異なる org.infinispan.commons.marshall.Marshaller
インスタンスを使用するように設定できます。これ CacheEventConverter
インスタンスや CacheEventFilter
インスタンスをデプロイしたり、コンテンツをマーシャリングしたりするのではなく、Java オブジェクトでフィルター/変換できるようにするには、サーバーはオブジェクトとマーシャラーで生成されるバイナリー形式の間で変換できるようにする必要があります。
Marshaller インスタンスのサーバー側のデプロイするには、CacheEventConverter
インスタンスまたは CacheEventFilter
インスタンスをデプロイするのに使用されるものと同様の方法に従います。
- コンバーターの実装を含む JAR ファイルを作成します。
-
JAR ファイル内で
META-INF/services/org.infinispan.commons.marshall.Marshaller
ファイルを作成し、フィルタークラス実装の完全修飾クラス名を作成します。 -
JAR ファイルを Data Grid サーバーのインストールディレクトリーの
server/lib
ディレクトリーに追加します。
Marshaller は個別の jar または CacheEventConverter
インスタンスおよび/または CacheEventFilter
インスタンスと同じ jar にデプロイできることに注意してください。
3.6.6.1. Protostream マーシャラーのデプロイ
キャッシュが Protobuf コンテンツを保存する場合は、Hot Rod クライアントで ProtoStream マーシャラーを使用する際に発生する場合は、サーバーで形式がすでにサポートされているため、カスタムマーシャラーをデプロイする必要はありません。Protobuf 形式から JSON や POJO などの最も一般的な形式に変換するトランスコーダーがあります。
これらのキャッシュでフィルター/変換を使用し、バイナリー Protobuf データではなく Java オブジェクトでフィルター/変換を使用すると、追加の ProtoStream マーシャラーを設定し、フィルタリング/変換の前にデータをアンマーシャリングできるようにする必要があります。これには、Data Grid サーバー設定の一部として、必要な SerializationContextInitializer(s)
を設定する必要があります。
詳細は、Cache Encoding and Marshalling を参照してください。
3.6.7. リスナーの状態の処理
クライアントリスナーアノテーションには、リスナーの追加時またはリスナーのフェイルオーバー時に状態がクライアントに送信されるかどうかを指定する任意の includeCurrentState
属性があります。
デフォルトでは、includeCurrentState
は false ですが、true に設定され、クライアントリスナーがデータが含まれるキャッシュに追加されると、サーバーはキャッシュの内容を繰り返し処理し、各エントリーのイベント (設定されている場合はカスタムイベント) を ClientCacheEntryCreated
としてクライアントに送信します。これにより、クライアントは既存のコンテンツに基づいて一部のローカルデータ構造をビルドできます。コンテンツが繰り返されると、キャッシュの更新を受け取るため、イベントは通常どおり受信されます。キャッシュがクラスター化されている場合、クラスター全体のコンテンツ全体が繰り返し処理されます。
3.6.8. リスナーの障害処理
Hot Rod クライアントがクライアントリスナーを登録すると、クラスターの単一ノードになります。そのノードに障害が発生すると、Java Hot Rod クライアントは透過的に検出し、別のノードに失敗したノードに登録されているすべてのリスナーをフェイルオーバーします。
このフェイルオーバー中に、クライアントはいくつかのイベントを見逃す可能性があります。これらのイベントが不足しないように、クライアントリスナーアノテーションには includeCurrentState
という任意のパラメーターが含まれます。これは、フェイルオーバーの実行時に、キャッシュの内容が繰り返し処理され、ClientCacheEntryCreated
イベント (設定されている場合はカスタムイベント) が生成されます。デフォルトでは、includeCurrentState
は false に設定されています。
コールバックを使用してフェイルオーバーイベントを処理します。
@ClientCacheFailover public void handleFailover(ClientCacheFailoverEvent e) { ... }
これは、クライアントがいくつかのデータをキャッシュしており、フェイルオーバーの結果、いくつかのイベントが見逃される可能性を考慮して、フェイルオーバーイベントを受信したときにローカルにキャッシュされたデータを消去することを決定し、フェイルオーバーイベントの後にキャッシュ全体のコンテンツに対するイベントを受信することを知っているような場合に非常に便利です。