第10章 通知/リスナー API
10.1. 通知/リスナー API
Red Hat JBoss Data Grid は、発生時にイベントの通知を行うリスナー API
を提供します。クライアントは、関係する通知に対してリスナー API
の登録を選択できます。 この API
はアノテーション駆動型で、キャッシュレベルのイベントとキャッシュマネージャーレベルのイベント上で操作します。
10.2. リスナーの例
次の例は、新しいエントリーがキャッシュに追加されるたびに情報を出力する Red Hat JBoss Data Grid のリスナーを定義します。
リスナーの設定
@Listener public class PrintWhenAdded { @CacheEntryCreated public void print(CacheEntryCreatedEvent event) { System.out.println("New entry " + event.getKey() + " created in the cache"); } }
10.3. リスナー通知
10.3.1. リスナー通知
キャッシュイベントが発生するたびに通知がリスナーに送信されます。リスナーは、@Listener
アノテーションが付いた簡単な POJO
です。Listenable は、実装がアタッチするリスナーを持つことを意味するインターフェースです。各リスナーは Listenable で定義されたメソッドを使用して登録されます。
キャッシュレベルまたはキャッシュマネージャーレベルの通知を受信するため、リスナーはキャッシュおよびキャッシュマネージャーの両方にアタッチすることが可能です。
10.3.2. キャッシュレベルの通知
Red Hat JBoss Data Grid では、キャッシュレベルのイベントはキャッシュごとに発生します。キャッシュレベルイベントの例には、関係するキャッシュで登録されたリスナーへの通知を引き起こすエントリーの追加、削除、および変更などが含まれます。
10.3.3. キャッシュマネージャーレベルの通知
Red Hat JBoss Data Grid のキャッシュマネージャーレベルで発生するイベントの例は次のとおりです。
- キャッシュの開始および停止
- クラスターに参加するノードまたはクラスターから離脱するノード
キャッシュマネージャーレベルのイベントはグローバルに位置し、クラスター全体で使用されますが、単一のキャッシュマネージャーによって作成されたキャッシュ内のイベントに制限されます。
最初の 2 つのイベントである CacheStarted
および CacheStopped
は大変似ています。以下の例は、開始または停止されたキャッシュの名前を出力します。
@CacheStarted public void cacheStarted(CacheStartedEvent event){ // Print the name of the Cache that started log.info("Cache Started: " + event.getCacheName()); } @CacheStopped public void cacheStopped(CacheStoppedEvent event){ // Print the name of the Cache that stopped log.info("Cache Stopped: " + event.getCacheName()); }
ViewChangedEvent
または MergeEvent
を受信するとき、新旧メンバーのリストはイベントが生成されたノードから送信されることに注意してください。例として、以下のシナリオについて考えてみましょう。
- 現在、JDG クラスターはノード A、B、および C で構成されます。
- ノード D がクラスターに参加します。
-
ノード A、B、および C は、[A,B,C] を旧メンバーのリストとし、[A,B,C,D] を新メンバーのリストとする
ViewChangedEvent
を受信します。 -
ノード D は、[D] を旧メンバーのリストとし、[A,B,C,D] を新メンバーのリストとする
ViewChangedEvent
を受信します。
よって、ノードのクラスターへの参加またはクラスターからの離脱を判断するために積集合が使用されることがあります。以下のように getOldMembers()
と getNewMembers()
を使用すると、クラスターに参加したノードや離脱したノードのセットを判断できます。
@ViewChanged public void viewChanged(ViewChangedEvent event){ HashSet<Address> oldMembers = new HashSet(event.getOldMembers()); HashSet<Address> newMembers = new HashSet(event.getNewMembers()); HashSet<Address> oldCopy = (HashSet<Address>)oldMembers.clone(); // Remove all new nodes from the old view. // The resulting set indicates nodes that have left the cluster. oldCopy.removeAll(newMembers); if(oldCopy.size() > 0){ for (Address oldAdd : oldCopy){ log.info("Node left:" + oldAdd.toString()); } } // Remove all old nodes from the new view. // The resulting set indicates nodes that have joined the cluster. newMembers.removeAll(oldMembers); if(newMembers.size() > 0){ for(Address newAdd : newMembers){ log.info("Node joined: " + newAdd.toString()); } } }
MergeEvent
の実行中に同様の論理を使用して、クラスターの新しいメンバーセットを判断することもできます。
10.3.4. 同期および非同期通知
デフォルトでは、Red Hat JBoss Data Grid の通知はイベントが生成された同じスレッドで送信されます。そのため、スレッドの進行を妨げないようにリスナーを書く必要があります。
この代わりに、別のスレッドで通知を送信し、元のスレッドの操作を妨げないようにするために、リスナーを非同期としてアノテーション付けすることもできます。
以下を使用してリスナーにアノテーションを付けます。
@Listener (sync = false) public class MyAsyncListener { .... }
XML 設定ファイルで asyncListenerExecutor
要素を使用し、非同期通知の送信に使用されるスレッドプールを調整します。
CacheEntryExpiredEvent
を処理する非クラスターの同期リスナーを使用する場合、非スラスター環境ではエクスパレーションリーパー (reaper) も同期であるため、リスナーが実行を妨害しないようにしてください。
10.4. キャッシュエントリーの変更
10.4.1. キャッシュエントリーの変更
キャッシュエントリーの作成後、プログラムを使用してキャッシュエントリーを変更できます。
10.4.2. キャッシュエントリーが変更されたリスナーの設定
キャッシュエントリーが変更されたリスナーイベントでは、getValue()
メソッドの動作は、実際の操作の実行前または実行後にコールバックがトリガーされたかどうかに特定されます。たとえば、event.isPre() が true の場合、event.getValue() は変更の前に古い値を返します。event.isPre() が false の場合、event.getValue() は新しい値を返します。イベントによって新しいエントリーが作成および挿入される場合、古い値は null になります。isPre()
の詳細は、Red Hat JBoss Data Grid の『API Documentation』に記載されている org.infinispan.notifications.cachelistener.event
パッケージのリストを参照してください。
Listenable
および FilteringListenable
インターフェース (Cache オブジェクトによって実装される) によって公開されるメソッドを使用する場合のみ、プログラムを使用してリスナーを設定できます。
10.4.3. キャッシュエントリーが変更されたリスナーの例
以下の例は、キャッシュエントリーが変更されるたびに情報を出力する Red Hat JBoss Data Grid のリスナーを定義します。
変更されたリスナー
@Listener public class PrintWhenModified { @CacheEntryModified public void print(CacheEntryModifiedEvent event) { System.out.println("Cache entry modified. Details = " + event); } }
10.5. クラスター化リスナー
10.5.1. クラスター化リスナー
クラスター化リスナーは、リスナーを分散キャッシュ設定で使用できるようにします。分散キャッシュ環境では、イベントが発生したノードのローカルイベントのみが登録されたローカルリスナーに通知されます。クラスター化リスナーでは、イベントが発生したノードに関係なくクラスターで発生する書き込み通知を 1 つのリスナーが受信できるようにし、この問題に対応します。そのため、クラスター化リスナーのパフォーマンスは、イベントが発生するノードのイベント通知のみを提供する非クラスター化リスナーよりも遅くなります。
クラスター化リスナーを使用する場合、特定のキャッシュでエントリーの追加、更新、期限切れ、または削除が行われるとクライアントアプリケーションに通知が送られます。イベントはクラスター全体であるため、アプリケーションが存在するノードや接続するノードに関係なく、クライアントアプリケーションはイベントにアクセスできます。
イベントは、常にリスナーが登録されたノード上で発生し、キャッシュの更新が発生した場所は関係しません。
10.5.2. クラスター化リスナーの設定
以下のユースケースではリスナーは受け取ったイベントを保存します。
手順: クラスター化リスナーの設定
@Listener(clustered = true) protected static class ClusterListener { List<CacheEntryEvent> events = Collections.synchronizedList(new ArrayList<CacheEntryEvent>()); @CacheEntryCreated @CacheEntryModified @CacheEntryExpired @CacheEntryRemoved public void onCacheEvent(CacheEntryEvent event) { log.debugf("Adding new cluster event %s", event); events.add(event); } } public void addClusterListener(Cache<?, ?> cache) { ClusterListener clusterListener = new ClusterListener(); cache.addListener(clusterListener); }
-
クラスター化リスナーを有効にするには、
@Listener
クラスにclustered=true
を付けます。 エントリーの追加、変更、期限切れ、または削除時にクライアントアプリケーションが通知を受けるようにするため、以下のメソッドにはアノテーションが付けられます。
-
@CacheEntryCreated
-
@CacheEntryModified
-
@CacheEntryExpired
-
@CacheEntryRemoved
-
- リスナーはキャッシュで登録され、オプションでフィルターまたはコンバーターを渡すことができます。
クラスター化リスナーを使用すると、非クラスターリスナーでは適用されない以下の制限が適用されます。
- クラスターリスナーは作成されたエントリー、変更されたエントリー、期限切れのエントリー、または削除されたエントリーのみをリッスンできます。他のイベントはクラスター化リスナーによってリッスンされません。
- ポストイベントのみがクラスター化リスナーに送信され、プレイベントは無視されます。
10.5.3. キャッシュリスナー API
addListener
メソッドを使用すると、クラスター化されたリスナーを既存の @CacheListener API
の上に追加できます。
キャッシュリスナー API
cache.addListener(Object listener, Filter filter, Converter converter);
public @interface Listener { boolean clustered() default false; boolean includeCurrentState() default false; boolean sync() default true; }
interface CacheEventFilter<K,V> { public boolean accept(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType); }
interface CacheEventConverter<K,V,C> { public C convert(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType); }
- Cache API
ローカルまたはクラスター化リスナーは
cache.addListener
メソッドで登録でき、以下のイベントの 1 つが発生するまでアクティブな状態になります。-
cache.removeListener
を呼び出してリスナーが明示的に登録解除される。 - リスナーが登録されたノードがクラッシュする。
-
- リスナーアノテーション
リスナーアノテーションは以下の 3 つの属性で強化されます。
-
clustered()
: この属性は、アノテーションが付けられたリスナーがクラスター化されているかどうかを定義します。クラスター化されたリスナーは@CacheEntryRemoved
、@CacheEntryCreated
、@CacheEntryExpired
、および@CacheEntryModified
イベントのみの通知を受けることができます。この属性はデフォルトで false に指定されています。 -
includeCurrentState()
: この属性はクラスター化リスナーのみに適用され、デフォルトで false に指定されています。true
に設定すると、クラスター内で既存の状態全体が評価されます。登録されると、キャッシュの各エントリーのCacheCreatedEvent
が即座にリスナーに送信されます。 -
sync()
に関する詳細は、「同期および非同期の通知」を参照してください。
-
oldValue
およびoldMetadata
-
oldValue
およびoldMetadata
の値は、CacheEventFilter
およびCacheEventConverter
クラスの許可メソッド上の追加メソッドです。これらの値は、ローカルリスナーを含むすべてのリスナーに提供されます。これらの値の詳細は、JBoss Data Grid の API ドキュメント を参照してください。 EventType
-
EventType
には、イベントのタイプ、再試行であったかどうか、およびプレまたはポストイベントであったかどうかが含まれます。
クラスター化リスナーを使用する場合、キャッシュが更新される順序は通知を受け取る順序に反映されます。
クラスター化リスナーは、イベントが 1 度だけ送信されることを保証しません。同じイベントが複数回送信されないようにするため、リスナー実装はべき等である必要があります。安定したクラスターや、includeCurrentState
の結果として合成イベントが生成されるタイムスパン外部では、単一性が受け入れられることをインプリメンターは想定することができます。
10.5.4. クラスター化リスナーの例
次のユースケースは、ニューヨーク州のニューヨーク市宛の注文がいつ生成されるか知りたいリスナーを表しています。リスナーには、ニューヨークから出入りする注文をフィルターする Filter が必要です。さらに、注文全体は必要ではなく、配達される日付のみが必要であるため、Converter も必要になります。
ユースケース: ニューヨーク宛の注文のフィルターおよび変換
class CityStateFilter implements CacheEventFilter<String, Order> { private String state; private String city; public boolean accept(String orderId, Order oldOrder, Metadata oldMetadata, Order newOrder, Metadata newMetadata, EventType eventType) { switch (eventType.getType()) { // Only send update if the order is going to our city case CACHE_ENTRY_CREATED: return city.equals(newOrder.getCity()) && state.equals(newOrder.getState()); // Only send update if our order has changed from our city to elsewhere or if is now going to our city case CACHE_ENTRY_MODIFIED: if (city.equals(oldOrder.getCity()) && state.equals(oldOrder.getState())) { // If old city matches then we have to compare if new order is no longer going to our city return !city.equals(newOrder.getCity()) || !state.equals(newOrder.getState()); } else { // If the old city doesn't match ours then only send update if new update does match ours return city.equals(newOrder.getCity()) && state.equals(newOrder.getState()); } // On remove we have to send update if our order was originally going to city case CACHE_ENTRY_REMOVED: return city.equals(oldOrder.getCity()) && state.equals(oldOrder.getState()); } return false; } } class OrderDateConverter implements CacheEventConverter<String, Order, Date> { private String state; private String city; public Date convert(String orderId, Order oldValue, Metadata oldMetadata, Order newValue, Metadata newMetadata, EventType eventType) { // If remove we do not care about date - this tells listener to remove its data if (eventType.isRemove()) { return null; } else if (eventType.isModified()) { if (state.equals(newValue.getState()) && city.equals(newValue.getCity())) { // If it is a modification meaning the destination has changed to ours then we allow it return newValue.getDate(); } else { // If destination is no longer our city it means it was changed from us so send null return null; } } else { // This was a create so we always send date return newValue.getDate(); } } }
10.5.5. 最適化されたキャッシュフィルターコンバーター
「クラスター化リスナーの例」では、結果のフィルターおよび変換を一度に実行するために、最適化された CacheEventFilterConverter
を使用できます。
CacheEventFilterConverter
は、イベントのフィルターと変換を一度に実行できるようにする最適化です。これは、イベントフィルターとコンバーターが同じオブジェクトとして最も効率的に使用され、同じメソッドでフィルターと変換が行われる場合に使用できます。戻り値が null の場合は値がフィルターを通過しなかったことを意味するため、変換が null 値を返さない場合のみ使用できます。null 値を変換するには、CacheEventFilter
および CacheEventConverter
インターフェースを独立して使用します。
以下は、CacheEventFilterConverter
を使用したニューヨーク宛の注文のユースケース例になります。
CacheEventFilterConverter
class OrderDateFilterConverter extends AbstractCacheEventFilterConverter<String, Order, Date> { private final String state; private final String city; public Date filterAndConvert(String orderId, Order oldValue, Metadata oldMetadata, Order newValue, Metadata newMetadata, EventType eventType) { // Remove if the date is not required - this tells listener to remove its data if (eventType.isRemove()) { return null; } else if (eventType.isModified()) { if (state.equals(newValue.getState()) && city.equals(newValue.getCity())) { // If it is a modification meaning the destination has changed to ours then we allow it return newValue.getDate(); } else { // If destination is no longer our city it means it was changed from us so send null return null; } } else { // This was a create so we always send date return newValue.getDate(); } } }
リスナーの登録時、フィルターとコンバーターの両方の引数として FilterConverter
を提供します。
OrderDateFilterConverter filterConverter = new OrderDateFilterConverter("NY", "New York"); cache.addListener(listener, filterConveter, filterConverter);
10.6. リモートイベントリスナー (Hot Rod)
10.6.1. リモートイベントリスナー (Hot Rod)
イベントリスナーは、Red Hat JBoss Data Grid Hot Rod サーバーが CacheEntryCreated
、CacheEntryModified
、CacheEntryExpired
、CacheEntryRemoved
などのイベントのリモートクライアントを通知できるようにします。接続されたクライアントが殺到しないようにするため、クライアントはこれらのイベントをリッスンするかどうかを選択することができます。クライアントがサーバーへの永続接続を維持することが前提となります。
リモートイベントのクライアントリスナーは、ライブラリーモードのクラスター化リスナーと同様に追加できます。以下は、受け取った各イベントを出力するリモートクライアントリスナーの例になります。
イベント出力リスナー
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { @ClientCacheEntryCreated public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) { System.out.println(e); } @ClientCacheEntryModified public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) { System.out.println(e); } @ClientCacheEntryExpired public void handleExpiredEvent(ClientCacheEntryExpiredEvent e) { System.out.println(e); } @ClientCacheEntryRemoved public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) { System.out.println(e); } }
-
ClientCacheEntryCreatedEvent
およびClientCacheEntryModifiedEvent
インスタンスは、キーおよびエントリーのバージョンに関する情報を提供します。このバージョンは、replaceWithVersion
やremoveWithVersion
などのサーバー上の条件付き操作を呼び出すために使用されます。 -
期限切れのエントリーで
get()
が呼び出された場合、またはエクスパレーションリーパーがエントリーの期限切れを検出した場合、ClientCacheEntryExpiredEvent
イベントが送信されます。エントリーの期限が切れると、キャッシュがエントリーを null にし、サイズを適切に調整します。しかし、イベントは 2 つのシナリオでのみ生成されます。 -
ClientCacheEntryRemovedEvent
イベントは、削除操作が成功した場合のみ送信されます。削除操作が呼び出された場合にエントリーが見つからなかったり、削除するエントリーがないと、イベントは生成されません。イベント削除が成功するかどうかに関わらずイベントの削除が必要な場合は、カスタマイズされたイベント論理を作成します。 -
すべてのクライアントキャッシュエントリーの作成、変更、および削除されたイベントは、トポロジーの変更により書き込みコマンドを再試行しなければならない場合に
true
を返すboolean isCommandRetried()
メソッドを提供します。これは、イベントが複製されたか、別のイベントが破棄または置換された (作成されたイベントが変更されたイベントに置き換えられるなど) ことを意味します。
想定されるワークロードで書き込みが読み取りよりも優先される場合、送信するイベントをフィルターし、クライアントやネットワークで問題となる可能性がある過剰なトラフィックが大量に生成されないようにします。
10.6.2. イベントリスナーの追加および削除
サーバーでのイベントリスナーの登録
以下の例は、サーバーでイベント出力リスナーを登録します。「イベント出力リスナー」を参照してください。
イベントリスナーの追加
RemoteCache<Integer, String> cache = rcm.getCache(); cache.addClientListener(new EventLogListener());
クライアントイベントリスナーの削除
クライアントイベントリスナーは以下のように削除できます。
EventLogListener listener = ... cache.removeClientListener(listener);
10.6.3. リモートイベントクライアントリスナーの例
以下は、Hot Rod 経由でリモートキャッシュと対話するようにリモートクライアントリスナーを設定するために必要な手順になります。
リモートイベントリスナーの設定
Red Hat カスタマーポータルから Red Hat JBoss Data Grid ディストリビューションをダウンロードします。
最新の JBoss Data Grid ディストリビューションには、クライアントが通信する Hot Rod サーバーが含まれています。
サーバーの起動
JBoss Data Grid サーバーのルートから以下のコマンドを実行し、サーバーを起動します。
$ ./bin/standalone.sh
Hot Rod サーバーと対話するアプリケーションの作成
Maven ユーザー
以下の依存関係でアプリケーションを作成します。バージョンは
8.5.0.Final-redhat-9
以上に変更します。<properties> <infinispan.version>8.5.0.Final-redhat-9</infinispan.version> </properties> [...] <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-remote</artifactId> <version>${infinispan.version}</version> </dependency>
- Maven 以外のユーザーは、使用するビルドツールに合わせて調整するか、すべての JBoss Data Grid jar が含まれるディストリビューションをダウンロードします。
クライアントアプリケーションの作成
以下は、受け取ったイベントをすべてログに記録する簡単なリモートイベントリスナーを示しています。
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { @ClientCacheEntryCreated @ClientCacheEntryModified @ClientCacheEntryRemoved public void handleRemoteEvent(ClientEvent event) { System.out.println(event); } }
リモートイベントリスナーを使用したリモートキャッシュに対する操作の実行
以下は、リモートイベントリスナーを追加し、リモートキャッシュに対して一部の操作を実行する簡単なメイン java クラスの例になります。
RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); EventLogListener listener = new EventLogListener(); try { cache.addClientListener(listener); cache.put(1, "one"); cache.put(1, "new-one"); cache.remove(1); } finally { cache.removeClientListener(listener); }
結果
実行すると、以下と似たコンソール出力が表示されます。
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryRemovedEvent(key=1)
出力は、デフォルトでイベントにはキーと現在の値に関連するデータバージョンが含まれることを示しています。実際の値はパフォーマンス上の理由でクライアントへ返送されません。リモートイベントの受け取りはパフォーマンスに影響します。キャッシュサイズが大きくなるとより多くの操作が実行されるため、パフォーマンスへの影響も大きくなります。Hot Rod クライアントにイベントが大量に送られないようにするには、サーバー側でリモートイベントをフィルターするか、イベントコンテンツをカスタマイズします。
10.6.4. リモートイベントのフィルター
10.6.4.1. リモートイベントのフィルター
クライアントに大量のイベントが送信されないようにするため、Red Hat JBoss Data Grid の Hot Rod リモートイベントをフィルターすることができます。フィルターするには、クライアントに送信されるイベントやクライアント提供の情報でこれらのフィルターが動作する方法をフィルターするインスタンスを作成するキーバリューのフィルターファクトリーを提供します。
リモートクライアントへのイベント送信はパフォーマンスに影響します。リモートリスナーが登録されたクライアントの数が多くなると、パフォーマンスのコストも増加します。また、キャッシュに対して実行された変更の数が多くなると、パフォーマンスへの影響も大きくなります。
サーバー側で送信されるイベントをフィルターするとパフォーマンスのコストを削減できます。カスタムコードを使用して、一部のイベントがリモートクライアントへブロードキャストされないようにしてパフォーマンスを向上することができます。
キーや値の情報またはキャッシュエントリーメタデータの情報を基にしてフィルターを行うことができます。フィルターを有効にするには、フィルターインスタンスを作成するキャッシュイベントフィルターファクトリーを作成する必要があります。以下は、クライアントに送信されたイベントから キー「2」 をフィルターする簡単な実装になります。
KeyValueFilter
package sample; import java.io.Serializable; import org.infinispan.notifications.cachelistener.filter.*; import org.infinispan.metadata.*; @NamedFactory(name = "basic-filter-factory") public class BasicKeyValueFilterFactory implements CacheEventFilterFactory { @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) { return new BasicKeyValueFilter(); } static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable { @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return !"2".equals(key); } } }
このキーバリューフィルターファクトリーでリスナーを登録するには、ファクトリーに一意な名前を付け、Hot Rod サーバーをその名前とキャッシュイベントフィルターファクトリーインスタンスでプラグする必要があります。
10.6.4.2. リモートイベントのカスタムフィルター
カスタムフィルターは、一部のイベント情報がリモートクライアントへブロードキャストされないようにしてパフォーマンスを向上できます。
カスタムフィルターで JBoss Data Grid サーバーをプラグするには、以下の手順を使用します。
カスタムフィルターの使用
-
内部にフィルター実装が含まれる JAR ファイルを作成します。
org.infinispan.filter.NamedFactory
アノテーションを使用して、各ファクトリーに名前を割り当てる必要があります。例はKeyValueFilterFactory
を使用します。 - JAR ファイル内に META-INF/services/org.infinispan.notifications.cachelistener.filter. CacheEventFilterFactory ファイルを作成し、その内部にフィルタークラス実装の完全修飾クラス名を記述します。
以下のオプションの 1 つを実行し、 JBoss Data Grid サーバーに JAR ファイルをデプロイします。
オプション 1: デプロイメントスキャナーを用いた JAR のデプロイ
-
JAR を
$JDG_HOME/standalone/deployments/
ディレクトリーにコピーします。デプロイメントスキャナーはこのディレクトリーをアクティブに監視し、新たに配置されたファイルをデプロイします。
-
JAR を
オプション 2: CLI を用いた JAR のデプロイ
CLI で目的のインスタンスに接続します。
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
接続後、
deploy
コマンドを実行します。deploy /path/to/artifact.jar
オプション 3: JAR をカスタムモジュールとしてデプロイ
以下のコマンドを実行して JDG サーバーに接続します。
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
カスタムフィルターが含まれる jar はサーバーのモジュールとして定義する必要があります。追加するには、以下のコマンドのモジュール名と .jar 名を置き換え、カスタムフィルターに追加の依存関係が必要な場合は追加します。
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
別のウインドウで、
$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
を編集して、新規追加したモジュールを依存関係としてorg.infinispan
モジュールに追加します。このファイルに以下のエントリーを追加します。<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- JDGを サーバーを再起動します。
サーバーがフィルターでプラグされたら、そのフィルターを使用するリモートクライアントリスナーを追加します。以下の例は、リモートイベントクライアントリスナーの例 (「リモートイベントクライアントリスナーの例」を参照) を拡張し、@ClientListener
アノテーションをオーバーライドしてリスナーと使用するフィルターファクトリーを示します。
フィルターファクトリーのリスナーへの追加
@org.infinispan.client.hotrod.annotation.ClientListener(filterFactoryName = "basic-filter-factory") public class BasicFilteredEventLogListener extends EventLogListener {}
リスナーは RemoteCacheAPI を使用して追加できるようになりました。以下の例はこれを実証し、一部の操作をリモートキャッシュに対して実行します。
サーバーでのリスナーの登録
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener(); try { cache.addClientListener(listener); cache.putIfAbsent(1, "one"); cache.replace(1, "new-one"); cache.putIfAbsent(2, "two"); cache.replace(2, "new-two"); cache.putIfAbsent(3, "three"); cache.replace(3, "new-three"); cache.remove(1); cache.remove(2); cache.remove(3); } finally { cache.removeClientListener(listener); }
システム出力は、フィルターされたものを除くすべてのキーのイベントをクライアントが受け取ることを示しています。
結果
以下は、例の結果となるシステム出力を示しています。
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryCreatedEvent(key=3,dataVersion=5) ClientCacheEntryModifiedEvent(key=3,dataVersion=6) ClientCacheEntryRemovedEvent(key=1) ClientCacheEntryRemovedEvent(key=3)
イベントがリスナーが登録されたノード以外で生成された場合でも、イベントが生成された場所でフィルターが実行されるようにするため、フィルターインスタンスはクラスターへのデプロイ時にマーシャル可能である必要があります。フィルターインスタンスをマーシャル可能にするには、フィルターインスタンスが Serializable や Externalizable を拡張するようにするか、カスタムエクスターナライザーを提供します。
10.6.4.3. 強化されたフィルターファクトリー
クライアントリスナーの追加時、パラメーターをフィルターファクトリーに提供して、クライアント側の情報を基に単一のフィルターファクトリーから挙動の異なるさまざまなフィルターを生成することができます。
以下の設定は、静的に提供されたキーでフィルターを行う代わりに、リスナーの追加時に提供されたキーを基に動的にフィルターを実行できるようにするためのフィルターファクトリーの強化方法を表しています。
強化されたフィルターファクトリーの設定
package sample; import java.io.Serializable; import org.infinispan.notifications.cachelistener.filter.*; import org.infinispan.metadata.*; @NamedFactory(name = "basic-filter-factory") public class BasicKeyValueFilterFactory implements CacheEventFilterFactory { @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) { return new BasicKeyValueFilter(params); } static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable { private final Object[] params; public BasicKeyValueFilter(Object[] params) { this.params = params; } @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return !params[0].equals(key); } } }
このフィルターは「2」ではなく「3」でフィルターが実行されるようになりました。
強化されたフィルターファクトリーの実行
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener(); try { cache.addClientListener(listener, new Object[]{3}, null); // <- Filter parameter passed cache.putIfAbsent(1, "one"); cache.replace(1, "new-one"); cache.putIfAbsent(2, "two"); cache.replace(2, "new-two"); cache.putIfAbsent(3, "three"); cache.replace(3, "new-three"); cache.remove(1); cache.remove(2); cache.remove(3); } finally { cache.removeClientListener(listener); }
結果
例の結果として、以下が出力されます。
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryCreatedEvent(key=2,dataVersion=3) ClientCacheEntryModifiedEvent(key=2,dataVersion=4) ClientCacheEntryRemovedEvent(key=1) ClientCacheEntryRemovedEvent(key=2)
リモートイベントをカスタマイズすると、クライアントに送信する情報量をさらに削減することができます。
10.6.5. リモートイベントのカスタマイズ
10.6.5.1. リモートイベントのカスタマイズ
Red Hat JBoss Data Grid では、Hot Rod リモートイベントをカスタマイズしてクライアントへ送信する必要がある情報が含まれるようにすることができます。デフォルトでは、クライアントが過負荷にならないようにし、情報送信のコストを削減するため、イベントにはキーやイベントタイプなどの基本的な情報のみが含まれます。
これらのイベントに含まれる情報をカスタマイズし、値などの追加情報が含まれるようにしたり、より少ない情報が含まれるようにすることができます。カスタマイズするには、CacheEventConverterFactory
クラスを実装して作成される CacheEventConverter
インスタンスを使用します。各ファクトリーには、@NamedFactory
アノテーションを使用したこれに関連する名前が必要です。
Red Hat JBoss Data Grid サーバーをイベントコンバーターでプラグするには、以下の手順を使用します。
コンバーターの使用
-
コンバーター実装が含まれる JAR ファイルを作成します。各ファクトリーには、
org.infinispan.filter.NamedFactory
アノテーションを使用したこれに関連する名前が必要です。 - JAR ファイル内に META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory ファイルを作成し、その内部にコンバータークラス実装の完全修飾クラス名を記述します。
以下のオプションの 1 つを実行し、Red Hat JBoss Data Grid サーバーに JAR ファイルをデプロイします。
オプション 1: デプロイメントスキャナーを用いた JAR のデプロイ
-
JAR を
$JDG_HOME/standalone/deployments/
ディレクトリーにコピーします。デプロイメントスキャナーはこのディレクトリーをアクティブに監視し、新たに配置されたファイルをデプロイします。
-
JAR を
オプション 2: CLI を用いた JAR のデプロイ
CLI で目的のインスタンスに接続します。
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
接続後、
deploy
コマンドを実行します。deploy /path/to/artifact.jar
オプション 3: JAR をカスタムモジュールとしてデプロイ
以下のコマンドを実行して JDG サーバーに接続します。
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
カスタムコンバーターが含まれる jar はサーバーのモジュールとして定義する必要があります。追加するには、以下のコマンドのモジュール名と .jar 名を置き換え、カスタムコンバーターに追加の依存関係が必要な場合は追加します。
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
別のウインドウで、
$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
を編集して、新規追加したモジュールを依存関係としてorg.infinispan
モジュールに追加します。このファイルに以下のエントリーを追加します。<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- JDGを サーバーを再起動します。
コンバーターはクライアントが提供する情報にも対応でき、コンバーターインスタンスはリスナーの追加時に提供された情報を基にイベントをカスタマイズできます。API は、リスナーの追加時にコンバーターパラメーターが渡されるようにします。
10.6.5.2. コンバーターの追加
リスナーの追加時、リスナーと使用するコンバーターファクトリーの名前が提供されます。リスナーが追加されると、サーバーはファクトリーを検索し、サーバー側のイベントをカスタマイズするために getConverter
メソッドを呼び出して org.infinispan.filter.Converter
クラスインスタンスを取得します。
以下の例は、Integer (整数) と String (文字列) のキャッシュに対して値の情報が含まれるカスタムイベントをリモートクライアントに送信します。コンバーターは、値とイベントのキーが含まれる新しいカスタムイベントを生成します。カスタムイベントのイベントペイロードはデフォルトのイベントよりも大きくなりますが、フィルターを組み合わせると帯域幅のコストを削減できます。
カスタムイベントの送信
import org.infinispan.notifications.cachelistener.filter.*; @NamedFactory(name = "value-added-converter-factory") class ValueAddedConverterFactory implements CacheEventConverterFactory { // The following types correspond to the Key, Value, and the returned Event, respectively. public CacheEventConverter<Integer, String, ValueAddedEvent> getConverter(final Object[] params) { return new ValueAddedConverter(); } static class ValueAddedConverter implements CacheEventConverter<Integer, String, ValueAddedEvent> { public ValueAddedEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return new ValueAddedEvent(key, newValue); } } } // Must be Serializable or Externalizable. class ValueAddedEvent implements Serializable { final Integer key; final String value; ValueAddedEvent(Integer key, String value) { this.key = key; this.value = value; } }
10.6.5.3. ライトウェイトイベント
他のコンバーター実装は、キーやイベントタイプ情報が含まれないイベントを返送できます。そのため、イベントが大幅に軽量化されますが、イベントによって詳細情報は提供されません。
このコンバーターでサーバーをプラグするには、コンバーターファクトリーと関連するコンバータークラスを JAR ファイル内にデプロイし、以下のように META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory ファイル内のサービス定義が含まれるようにします。
sample.ValueAddedConverterFactor
この後、ファクトリー名を @ClientListener
アノテーションに追加して、クライアントリスナーをコンバーターファクトリーとリンクする必要があります。
@ClientListener(converterFactoryName = "value-added-converter-factory") public class CustomEventLogListener { ... }
10.6.5.4. 動的なコンバーターインスタンス
動的なコンバーターインスタンスは、リスナーの登録時に提供されたパラメーターを基にして変換を行います。コンバーターはコンバーターファクトリーによって受信されたパラメーターを使用してこのオプションを有効にします。例は以下のとおりです。
動的なコンバーター
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory; import org.infinispan.notifications.cachelistener.filter.CacheEventConverter; class DynamicCacheEventConverterFactory implements CacheEventConverterFactory { // The following types correspond to the Key, Value, and the returned Event, respectively. public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) { return new DynamicCacheEventConverter(params); } } // Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable { final Object[] params; DynamicCacheEventConverter(Object[] params) { this.params = params; } public CustomEvent convert(Integer key, String oldValue, Metadata metadata, String newValue, Metadata prevMetadata, EventType eventType) { // If the key matches a key given via parameter, only send the key information if (params[0].equals(key)) return new ValueAddedEvent(key, null); return new ValueAddedEvent(key, newValue); } }
変換の実行に必要な動的パラメーターはリスナーの登録時に提供されます。
RemoteCache<Integer, String> cache = rcm.getCache(); cache.addClientListener(new EventLogListener(), null, new Object[]{1});
10.6.5.5. カスタムイベントのリモートクライアントリスナーの追加
カスタムイベントのリスナーの実装は、デフォルトでないイベントが関係するため、他のリモートイベントの場合とは若干異なります。他のリモートクライアントリスナー実装と同じアノテーションが使用されますが、コールバックは ClientCacheEntryCustomEvent<T>
のインスタンスを受信します (T
はサーバーから送信するカスタムイベントのタイプになります)。例を以下に示します。
カスタムイベントリスナー実装
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener(converterFactoryName = "value-added-converter-factory") public class CustomEventLogListener { @ClientCacheEntryCreated @ClientCacheEntryModified @ClientCacheEntryRemoved public void handleRemoteEvent(ClientCacheEntryCustomEvent<ValueAddedEvent> event) { System.out.println(event); } }
リモートイベントリスナーを使用してリモートキャッシュに対して操作を実行するには、リモートイベントリスナーを追加してリモートキャッシュに対して操作を実行する簡単なメイン Java クラスを作成します。例を以下に示します。
リモートキャッシュに対する操作の実行
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); CustomEventLogListener listener = new CustomEventLogListener(); try { cache.addClientListener(listener); cache.put(1, "one"); cache.put(1, "new-one"); cache.remove(1); } finally { cache.removeClientListener(listener); }
結果
実行すると、以下と似たコンソール出力が表示されます。
ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='one'}, eventType=CLIENT_CACHE_ENTRY_CREATED) ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='new-one'}, eventType=CLIENT_CACHE_ENTRY_MODIFIED) ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='null'}, eventType=CLIENT_CACHE_ENTRY_REMOVED
リスナーが登録されたノードではない別のノードでイベントが生成された場合でも、イベントが生成された場所で変換が発生するようにするには、コンバーターインスタンスがクラスターにデプロイされたときにマーシャル可能である必要があります。コンバーターインスタンスをマーシャル可能にするには、Serializable または Externalizable を拡張するようにするか、カスタムの Externalizer を提供します。タイプセーフな API に対して記述するサーバーとクライアントの両方を円滑にするため、クライアントとサーバーの両方がカスタムイベントタイプを認識し、マーシャルできる必要があります。
10.6.6. イベントマーシャリング
イベントをフィルターまたはカスタマイズする際、KeyValueFilter
および Converter
インスタンスはマーシャル可能でなければなりません。クライアントリスナーがクラスターにインストールされると、イベントの送信元でフィルターと変換が行われるようにするため、フィルターインスタンスとコンバーターインスタンスはクラスターの他のノードに送信されます。これにより効率がよくなります。これらのクラスをマーシャル可能にするには、これらのクラスが Serializable を拡張するようにするか、カスタム Externalizer を提供および登録します。
Marshaller インスタンスをサーバー側にデプロイするには、フィルターおよびカスタマイズされたイベントに使用されるメソッドに似たメソッドを使用します。
マーシャラーのデプロイ
-
コンバーター実装が含まれる JAR ファイルを作成します。各ファクトリーには、
org.infinispan.filter.NamedFactory
アノテーションを使用したこれに関連する名前が必要です。 - JAR ファイル内に META-INF/services/org.infinispan.commons.marshall.Marshaller ファイルを作成し、その内部にマーシャラークラス実装の完全修飾クラス名を記述します。
以下のオプションの 1 つを実行し、Red Hat JBoss Data Grid に JAR ファイルをデプロイします。
オプション 1: デプロイメントスキャナーを用いた JAR のデプロイ
-
JAR を
$JDG_HOME/standalone/deployments/
ディレクトリーにコピーします。デプロイメントスキャナーはこのディレクトリーをアクティブに監視し、新たに配置されたファイルをデプロイします。
-
JAR を
オプション 2: CLI を用いた JAR のデプロイ
CLI で目的のインスタンスに接続します。
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
接続後、
deploy
コマンドを実行します。deploy /path/to/artifact.jar
オプション 3: JAR をカスタムモジュールとしてデプロイ
以下のコマンドを実行して JDG サーバーに接続します。
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
カスタムマーシャラーが含まれる jar はサーバーのモジュールとして定義する必要があります。追加するには、以下のコマンドのモジュール名と .jar 名を置き換え、カスタムマーシャラーに追加の依存関係が必要な場合は追加します。
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
別のウインドウで、
$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
を編集して、新規追加したモジュールを依存関係としてorg.infinispan
モジュールに追加します。このファイルに以下のエントリーを追加します。<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- JDGを サーバーを再起動します。
マーシャラーは、個別の jar または CacheEventConverter や CacheEventFilter インスタンスと同じ jar にデプロイすることができます。
1 つの Marshaller インスタンスのデプロイメントのみがサポートされます。複数の Marshaller インスタンスがデプロイされると、警告メッセージが表示され、使用される Marshaller が示されます。
10.6.7. リモートイベントクラスタリングおよびフェイルオーバー
クライアントがリモートリスナーを追加するとき、クラスターの単一のノードにインストールされます。このノードは、クラスター全体で発生する影響受ける操作すべてに対してイベントをクライアントに返送します。
クラスター化された環境では、リスナーが含まれるノードがダウンした場合、Hot Rod クライアント実装が透過的にクライアントリスナーの登録を別のノードにフェイルオーバーします。これにより、イベントの消費にギャップが生じることがありますが、以下の方法の 1 つを使用すると解決できます。
状態の配信
@ClientListener
アノテーションにはオプションの includeCurrentState
パラメーターがあります。このパラメーターを有効にすると、サーバーは既存のキャッシュエントリーすべての CacheEntryCreatedEvent
イベントインスタンスをクライアントに送信します。この動作は、リスナーが登録されたノードがオフラインになったときに検出されるクライアントによって決定され、自動的にリスナーをクラスターの別のノードに登録します。includeCurrentState
を有効にすると、Hot Rod クライアントが登録されたリスナーを透過的にフェイルオーバーする場合にクライアントの状態や計算を再算出できます。includeCurrentState
パラメーターのパフォーマンスは、キャッシュの大きさに影響されるため、デフォルトでは無効になっています。
@ClientCacheFailover
受信状態に依存する代わりに、@ClientCacheFailover
アノテーションを使用して、クライアントリスナー実装内で ClientCacheFailoverEvent
パラメーターを受信するメソッドを定義できます。Hot Rod クライアントが登録されたノードでクライアントリスナーに障害が発生すると、Hot Rod クライアントによって障害が透過的に検出され、障害が発生したノードに登録されたすべてのリスナーは別のノードにフェイルオーバーされます。
フェイルオーバーの間にクライアントが一部のイベントを見逃すことがあります。イベントを見逃さないようにするには、includeCurrentState
パラメーターを true に設定します。これを有効にすると、クライアントはデータを消去でき、 CacheEntryCreatedEvent
インスタンスすべてを受信し、すべてのキーでこれらのイベントをキャッシュできます。この代わりに、コールバックハンドラーを追加して、Hot Rod クライアントがフェイルオーバーイベントを認識できるようにすることができます。このコールバックメソッドは、クライアントリスナーに影響するクラスタートポロジーの変更に対応する効率的な方法で、クライアントリスナーはフェイルオーバーでどのように動作するかを判断することができます。ニアキャッシュはこの方法を利用し、ClientCacheFailoverEvent
の受信後にニアキャッシュは消去されます。
@ClientCacheFailover
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { // ... @ClientCacheFailover public void handleFailover(ClientCacheFailoverEvent e) { // Deal with client failover, e.g. clear a near cache. } }
ClientCacheFailoverEvent
は、クライアントリスナーがインストールされたノードに障害が発生した場合のみ発生します。