第11章 クラスター化されたカウンターの使用
Data Grid は、オブジェクトの数を記録するカウンターを提供し、クラスター内のすべてのノードに分散されます。
11.1. クラスター化カウンター
クラスター化されたカウンター は、Data Grid クラスターのすべてのノードで分散され、共有されるカウンターです。カウンターは異なる整合性レベル (strong および weak) を持つことができます。
strong/weak と一貫性のあるカウンターには個別のインターフェイスがありますが、どちらもその値の更新をサポートし、現在の値を返し、その値が更新されたときにイベントを提供します。このドキュメントでは、ユースケースに最適なものを選択する上で役立つ詳細を以下に示します。
11.1.1. インストールおよび設定
カウンターの使用を開始するには、Maven の pom.xml
ファイルに依存関係を追加する必要があります。
pom.xml
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-clustered-counter</artifactId> </dependency>
このカウンターは、本書で後述する CounterManager
インターフェイスを介して、Data Grid 設定ファイルまたはオンデマンドを設定できます。EmbeddedCacheManager
の起動時に、起動時に Data Grid 設定ファイルに設定されたカウンターが作成します。これらのカウンターは Eagerly で開始され、すべてのクラスターのノードで利用できます。
configuration.xml
<infinispan> <cache-container ...> <!-- To persist counters, you need to configure the global state. --> <global-state> <!-- Global state configuration goes here. --> </global-state> <!-- Cache configuration goes here. --> <counters xmlns="urn:infinispan:config:counters:14.0" num-owners="3" reliability="CONSISTENT"> <strong-counter name="c1" initial-value="1" storage="PERSISTENT"/> <strong-counter name="c2" initial-value="2" storage="VOLATILE" lower-bound="0"/> <strong-counter name="c3" initial-value="3" storage="PERSISTENT" upper-bound="5"/> <strong-counter name="c4" initial-value="4" storage="VOLATILE" lower-bound="0" upper-bound="10"/> <strong-counter name="c5" initial-value="0" upper-bound="100" lifespan="60000"/> <weak-counter name="c6" initial-value="5" storage="PERSISTENT" concurrency-level="1"/> </counters> </cache-container> </infinispan>
または、プログラムを使用して GlobalConfigurationBuilder
で以下を行います。
GlobalConfigurationBuilder globalConfigurationBuilder = ...; CounterManagerConfigurationBuilder builder = globalConfigurationBuilder.addModule(CounterManagerConfigurationBuilder.class); builder.numOwner(3).reliability(Reliability.CONSISTENT); builder.addStrongCounter().name("c1").initialValue(1).storage(Storage.PERSISTENT); builder.addStrongCounter().name("c2").initialValue(2).lowerBound(0).storage(Storage.VOLATILE); builder.addStrongCounter().name("c3").initialValue(3).upperBound(5).storage(Storage.PERSISTENT); builder.addStrongCounter().name("c4").initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE); builder.addStrongCounter().name("c5").initialValue(0).upperBound(100).lifespan(60000); builder.addWeakCounter().name("c6").initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT);
一方、このカウンターは、EmbeddedCacheManager
を初期化した後にいつでも設定することができます。
CounterManager manager = ...; manager.defineCounter("c1", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(1).storage(Storage.PERSISTENT).build()); manager.defineCounter("c2", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(2).lowerBound(0).storage(Storage.VOLATILE).build()); manager.defineCounter("c3", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(3).upperBound(5).storage(Storage.PERSISTENT).build()); manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE).build()); manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(0).upperBound(100).lifespan(60000).build()); manager.defineCounter("c6", CounterConfiguration.builder(CounterType.WEAK).initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT).build());
CounterConfiguration
は変更できず、再利用できます。
カウンターが正常に設定されていると、defineCounter()
メソッドは true
を返します。そうでない場合は、true
を返します。ただし、設定が無効な場合は、メソッドによって CounterConfigurationException
が発生します。カウンターがすでに定義されているかを調べるには、isDefined()
メソッドを使用します。
CounterManager manager = ... if (!manager.isDefined("someCounter")) { manager.define("someCounter", ...); }
関連情報
11.1.1.1. カウンター名のリスト表示
定義されたすべてのカウンターをリスト表示するには、CounterManager.getCounterNames()
メソッドは、クラスター全体で作成されたすべてのカウンター名のコレクションを返します。
11.1.2. CounterManager
インターフェイス
CounterManager
インターフェイスは、カウンターを定義、取得、および削除するエントリーポイントです。
埋め込みデプロイメント
CounterManager
は EmbeddedCacheManager
の作成を自動的にリッスンし、EmbeddedCacheManager
ごとのインスタンスの登録を続行します。カウンター状態を保存し、デフォルトのカウンターの設定に必要なキャッシュを開始します。
CounterManager
の取得は、以下の例のように EmbeddedCounterManagerFactory.asCounterManager(EmbeddedCacheManager)
を呼び出すだけです。
// create or obtain your EmbeddedCacheManager EmbeddedCacheManager manager = ...; // retrieve the CounterManager CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager);
サーバーデプロイメント
Hot Rod クライアントの場合、CounterManager
は RemoteCacheManager に登録されており、以下のように取得できます。
// create or obtain your RemoteCacheManager RemoteCacheManager manager = ...; // retrieve the CounterManager CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);
11.1.2.1. CounterManager を介したカウンターの削除
Strong/WeakCounter
と CounterManager
でカウンターを削除するのに違いがあります。CounterManager.remove(String)
は、クラスターからカウンター値を削除し、ローカルカウンターインスタンスのカウンターに登録されているすべてのリスナーを削除します。さらに、カウンターインスタンスは再利用可能ではなくなり、無効な結果が返される可能性があります。
一方で、Strong/WeakCounter
を削除するとカウンター値のみが削除されます。インスタンスは引き続き再利用でき、リスナーは引き続き動作します。
削除後にアクセスされると、カウンターは再作成されます。
11.1.3. カウンター
カウンターは、strong (StrongCounter
) または weak(WeakCounter
) になり、いずれも名前で識別されます。各インターフェイスには特定のインターフェイスがありますが、ロジック (つまり各操作により CompletableFuture
が返される) を共有しているため、更新イベントが返され、初期値にリセットできます。
非同期 API を使用しない場合は、sync()
メソッドを介して同期カウンターを返すことができます。API は同じですが、CompletableFuture
の戻り値はありません。
以下のメソッドは、両方のインターフェイスに共通しています。
String getName(); CompletableFuture<Long> getValue(); CompletableFuture<Void> reset(); <T extends CounterListener> Handle<T> addListener(T listener); CounterConfiguration getConfiguration(); CompletableFuture<Void> remove(); SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
-
getName()
はカウンター名 (identifier) を返します。 -
getValue()
は現在のカウンターの値を返します。 -
reset()
により、カウンターの値を初期値にリセットできます。 -
reset()
はリスナーを登録し、更新イベントを受信します。詳細については、通知およびイベント セクションをご覧ください。 -
getConfiguration()
はカウンターによって使用される設定を返します。 -
remove()
はクラスターからカウンター値を削除します。インスタンスは引き続き使用でき、リスナーが保持されます。 -
sync()
は同期カウンターを作成します。
削除後にアクセスされると、カウンターは再作成されます。
11.1.3.1. StrongCounter
インターフェイス: 一貫性または境界が明確になります。
strong カウンターは、Data Grid キャッシュに保存されている単一のキーを使用して、必要な一貫性を提供します。すべての更新は、その値を更新するためにキーロックの下で実行されます。一方、読み取りはロックを取得し、現在の値を読み取ります。さらに、このスキームではカウンター値をバインドでき、比較および設定/スワップなどのアトミック操作を提供できます。
StrongCounter
は、getStrongCounter()
メソッドを使用して CounterManager
から取得することができます。たとえば、以下のようになります。
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
すべての操作は単一のキーに到達するため、StrongCounter
は競合レートが高くなります。
StrongCounter
インターフェイスでは、以下のメソッドを追加します。
default CompletableFuture<Long> incrementAndGet() { return addAndGet(1L); } default CompletableFuture<Long> decrementAndGet() { return addAndGet(-1L); } CompletableFuture<Long> addAndGet(long delta); CompletableFuture<Boolean> compareAndSet(long expect, long update); CompletableFuture<Long> compareAndSwap(long expect, long update);
-
incrementAndGet()
はカウンターを 1 つずつ増分し、新しい値を返します。 -
decrementAndGet()
は、1 つずつカウンターをデクリメントし、新しい値を返します。 -
addAndGet()
は、delta をカウンターの値に追加し、新しい値を返します。 -
compareAndSet()
およびcompareAndSwap()
は、現在の値が想定される場合にカウンターの値を設定します。
CompletableFuture
が完了すると、操作は完了とみなされます。
compare-and-set と compare-and-swap の相違点は、操作に成功した場合に、compare-and-set は true を返しますが、compare-and-swap は前の値をか返すことです。戻り値が期待値と同じ場合は、compare-and-swap が正常になります。
11.1.3.1.1. バインドされた StrongCounter
バインドされている場合、上記の更新メソッドはすべて、下限または上限に達すると CounterOutOfBoundsException
を出力します。例外には、どちら側にバインドが到達したかを確認するための次のメソッドがあります。
public boolean isUpperBoundReached(); public boolean isLowerBoundReached();
11.1.3.1.2. ユースケース
強力なカウンターは、次の使用例に適しています。
- 各更新後にカウンターの値が必要な場合 (例: クラスター単位の ID ジェネレーターまたはシーケンス)
- バインドされたカウンターが必要な場合は (例: レートリミッター)
11.1.3.1.3. 使用例
StrongCounter counter = counterManager.getStrongCounter("unbounded_counter"); // incrementing the counter System.out.println("new value is " + counter.incrementAndGet().get()); // decrement the counter's value by 100 using the functional API counter.addAndGet(-100).thenApply(v -> { System.out.println("new value is " + v); return null; }).get(); // alternative, you can do some work while the counter is updated CompletableFuture<Long> f = counter.addAndGet(10); // ... do some work ... System.out.println("new value is " + f.get()); // and then, check the current value System.out.println("current value is " + counter.getValue().get()); // finally, reset to initial value counter.reset().get(); System.out.println("current value is " + counter.getValue().get()); // or set to a new value if zero System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1));
以下に、バインドされたカウンターを使用する別の例を示します。
StrongCounter counter = counterManager.getStrongCounter("bounded_counter"); // incrementing the counter try { System.out.println("new value is " + counter.addAndGet(100).get()); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } } // now using the functional API counter.addAndGet(-100).handle((v, throwable) -> { if (throwable != null) { Throwable cause = throwable.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } return null; } System.out.println("new value is " + v); return null; }).get();
Compare-and-set と Compare-and-swap の比較例:
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue, newValue; do { oldValue = counter.getValue().get(); newValue = someLogic(oldValue); } while (!counter.compareAndSet(oldValue, newValue).get());
compare-and-swap では、呼び出しカウンターの呼び出し (counter.getValue()
) が 1 つ保存されます。
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue = counter.getValue().get(); long currentValue, newValue; do { currentValue = oldValue; newValue = someLogic(oldValue); } while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);
stong カウンターをレートリミッターとして使用するには、以下のように upper-bound
パラメーターおよび lifespan
パラメーターを設定します。
// 5 request per minute CounterConfiguration configuration = CounterConfiguration.builder(CounterType.BOUNDED_STRONG) .upperBound(5) .lifespan(60000) .build(); counterManager.defineCounter("rate_limiter", configuration); StrongCounter counter = counterManager.getStrongCounter("rate_limiter"); // on each operation, invoke try { counter.incrementAndGet().get(); // continue with operation } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { if (e.getCause() instanceof CounterOutOfBoundsException) { // maximum rate. discard operation return; } else { // unexpected error, handling property } }
lifespan
パラメーターは実験的な機能で、今後のバージョンで削除される可能性があります。
11.1.3.2. WeakCounter
インターフェイス: 速度が必要な場合
WeakCounter
は、カウンターの値を Data Grid キャッシュの複数のキーに保存します作成されたキーの数は concurrency-level
属性によって設定されます。各キーはカウンターの値の一部の状態を保存し、同時に更新できます。StrongCounter
よりも優れた点は、キャッシュの競合率が低いことです。一方、値の読み取りはよりコストが高く、バインドは許可されません。
リセット操作は注意して行う必要があります。これは アトミックではなく、中間値を生成します。これらの値は、読み取り操作および登録されたリスナーによって確認できます。
WeakCounter
は、getWeakCounter()
メソッドを使用して CounterManager
から取得できます。たとえば、以下のようになります。
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getWeakCounter("my-counter);
11.1.3.2.1. weak カウンターインターフェイス
WeakCounter
は、以下のメソッドを追加します。
default CompletableFuture<Void> increment() { return add(1L); } default CompletableFuture<Void> decrement() { return add(-1L); } CompletableFuture<Void> add(long delta);
これらは `StrongCounter のメソッドと似ていますが、新しい値は返されません。
11.1.3.2.2. ユースケース
weak カウンターは、更新操作の結果が必要ない場合やカウンターの値があまり必要でないユースケースに最適です。統計の収集は、このようなユースケースの良い例になります。
11.1.3.2.3. 例
以下では、弱いカウンターの使用例を示します。
WeakCounter counter = counterManager.getWeakCounter("my_counter"); // increment the counter and check its result counter.increment().get(); System.out.println("current value is " + counter.getValue()); CompletableFuture<Void> f = counter.add(-100); //do some work f.get(); //wait until finished System.out.println("current value is " + counter.getValue().get()); //using the functional API counter.reset().whenComplete((aVoid, throwable) -> System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get(); System.out.println("current value is " + counter.getValue().get());
11.1.4. 通知およびイベント
strong カウンターと weak カウンターの両方が、更新イベントを受信するためにリスナーをサポートします。リスナーは CounterListener
を実装する必要があり、これを以下の方法で登録できます。
<T extends CounterListener> Handle<T> addListener(T listener);
CounterListener
には以下のインターフェイスがあります。
public interface CounterListener { void onUpdate(CounterEvent entry); }
返される Handle
オブジェクトには、CounterListener
が必要なくなったときに削除するという主な目的があります。また、処理している CounterListener
インスタンスにアクセスできます。これには、以下のインターフェイスがあります。
public interface Handle<T extends CounterListener> { T getCounterListener(); void remove(); }
最後に、CounterEvent
には、以前と現在の値と状態があります。これには、以下のインターフェイスがあります。
public interface CounterEvent { long getOldValue(); State getOldState(); long getNewValue(); State getNewState(); }
状態は、非有界である strong カウンターおよび weak カウンターでは常に State.VALID
になります。State.LOWER_BOUND_REACHED
および State.UPPER_BOUND_REACHED
は有界である strong カウンターのみに有効です。
weak カウンター reset()
操作は、中間値で複数の通知をトリガーします。