Chapter 12. Clustered Counters
Clustered counters are counters which are distributed and shared among all nodes in the Red Hat Data Grid cluster. Counters can have different consistency levels: strong and weak.
Although strong and weak consistent counters have separate interfaces, both support updating the value, returning the current value, and providing events when the value is updated. Details are provided below in this document to help you choose the one that best fits your use case.
12.1. Installation and Configuration
To start using the counters, add the dependency to your Maven pom.xml file:
pom.xml
<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-clustered-counter</artifactId>
    <version>${version.infinispan}</version>
</dependency>
Replace ${version.infinispan} with the appropriate version of Red Hat Data Grid.
Counters can be configured in the Red Hat Data Grid configuration file or on demand via the CounterManager interface, detailed later in this document. A counter configured in the Red Hat Data Grid configuration file is created at boot time, when the EmbeddedCacheManager is starting. These counters are started eagerly and are available on all the cluster’s nodes.
configuration.xml
<?xml version="1.0" encoding="UTF-8"?>
<infinispan>
    <cache-container ...>
        <!-- if needed to persist counter, global state needs to be configured -->
        <global-state>
            ...
        </global-state>
        <!-- your caches configuration goes here -->
        <counters xmlns="urn:infinispan:config:counters:9.2" num-owners="3" reliability="CONSISTENT">
            <strong-counter name="c1" initial-value="1" storage="PERSISTENT"/>
            <strong-counter name="c2" initial-value="2" storage="VOLATILE">
                <lower-bound value="0"/>
            </strong-counter>
            <strong-counter name="c3" initial-value="3" storage="PERSISTENT">
                <upper-bound value="5"/>
            </strong-counter>
            <strong-counter name="c4" initial-value="4" storage="VOLATILE">
                <lower-bound value="0"/>
                <upper-bound value="10"/>
            </strong-counter>
            <weak-counter name="c5" initial-value="5" storage="PERSISTENT" concurrency-level="1"/>
        </counters>
    </cache-container>
</infinispan>
or programmatically, in the GlobalConfigurationBuilder:
GlobalConfigurationBuilder globalConfigurationBuilder = ...;
CounterManagerConfigurationBuilder builder = globalConfigurationBuilder.addModule(CounterManagerConfigurationBuilder.class);
builder.numOwner(3).reliability(Reliability.CONSISTENT);
builder.addStrongCounter().name("c1").initialValue(1).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c2").initialValue(2).lowerBound(0).storage(Storage.VOLATILE);
builder.addStrongCounter().name("c3").initialValue(3).upperBound(5).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c4").initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE);
builder.addWeakCounter().name("c5").initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT);
Alternatively, counters can be configured on demand, at any time after the EmbeddedCacheManager is initialized.
CounterManager manager = ...;
manager.defineCounter("c1", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(1).storage(Storage.PERSISTENT).build());
manager.defineCounter("c2", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(2).lowerBound(0).storage(Storage.VOLATILE).build());
manager.defineCounter("c3", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(3).upperBound(5).storage(Storage.PERSISTENT).build());
manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE).build());
manager.defineCounter("c5", CounterConfiguration.builder(CounterType.WEAK).initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT).build());
CounterConfiguration is immutable and can be reused.
The method defineCounter() returns true if the counter is successfully configured or false otherwise. However, if the configuration is invalid, the method throws a CounterConfigurationException. To find out if a counter is already defined, use the method isDefined().
CounterManager manager = ...;
if (!manager.isDefined("someCounter")) {
    manager.defineCounter("someCounter", ...);
}
Per cluster attributes:
- num-owners: Sets the number of counter’s copies to keep cluster-wide. A smaller number will make update operations faster but will support a lower number of server crashes. It must be positive and its default value is 2.
- reliability: Sets the counter’s update behavior in a network partition. Default value is AVAILABLE and valid values are:
  - AVAILABLE: all partitions are able to read and update the counter’s value.
  - CONSISTENT: only the primary partition (majority of nodes) will be able to read and update the counter’s value. The remaining partitions can only read its value.
Per counter attributes:
- initial-value [common]: Sets the counter’s initial value. Default is 0 (zero).
- storage [common]: Sets the counter’s behavior when the cluster is shut down and restarted. Default value is VOLATILE and valid values are:
  - VOLATILE: the counter’s value is only available in memory. The value is lost when the cluster is shut down.
  - PERSISTENT: the counter’s value is stored in a private and local persistent store. The value is kept when the cluster is shut down and restored after a restart.
  On-demand and VOLATILE counters lose their value and configuration after a cluster shutdown. They must be defined again after the restart.
- lower-bound [strong]: Sets the strong consistent counter’s lower bound. Default value is Long.MIN_VALUE.
- upper-bound [strong]: Sets the strong consistent counter’s upper bound. Default value is Long.MAX_VALUE.
  If neither the lower-bound nor the upper-bound is configured, the strong counter is set as unbounded.
  The initial-value must be between lower-bound and upper-bound, inclusive.
- concurrency-level [weak]: Sets the number of concurrent updates. Its value must be positive and the default value is 16.
12.1.1. List counter names
To list all the defined counters, call CounterManager.getCounterNames(), which returns a collection of all counter names created cluster-wide.
12.2. The CounterManager interface
The CounterManager interface is the entry point to define, retrieve and remove a counter. It automatically listens for the creation of an EmbeddedCacheManager and registers one CounterManager instance per EmbeddedCacheManager. It starts the caches needed to store the counter state and configures the default counters.
To retrieve the CounterManager, invoke EmbeddedCounterManagerFactory.asCounterManager(EmbeddedCacheManager), as shown in the example below:
// create or obtain your EmbeddedCacheManager
EmbeddedCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager);
For a Hot Rod client, the CounterManager is registered in the RemoteCacheManager and can be retrieved as follows:
// create or obtain your RemoteCacheManager
RemoteCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);
The Hot Rod message format is described in Hot Rod Protocol 2.7.
12.2.1. Remove a counter via CounterManager
Use with caution.
There is a difference between removing a counter via the Strong/WeakCounter interfaces and via the CounterManager. CounterManager.remove(String) removes the counter value from the cluster and removes all the listeners registered on the local counter instance. In addition, the counter instance is no longer reusable and may return invalid results.
On the other hand, removal via the Strong/WeakCounter interfaces only removes the counter value. The instance can still be reused and the listeners still work.
The counter is re-created if it is accessed after a removal.
12.3. The Counter
A counter can be strongly consistent (StrongCounter) or weakly consistent (WeakCounter), and both are identified by a name. Each has its own interface, but they share some logic: both are asynchronous (a CompletableFuture is returned by each operation), provide an update event, and can be reset to their initial value.
If you don’t want to use the async API, you can obtain a synchronous counter via the sync() method. The API is the same but without the CompletableFuture return value.
The following methods are common to both interfaces:
String getName();
CompletableFuture<Long> getValue();
CompletableFuture<Void> reset();
<T extends CounterListener> Handle<T> addListener(T listener);
CounterConfiguration getConfiguration();
CompletableFuture<Void> remove();
SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
- getName() returns the counter name (identifier).
- getValue() returns the current counter’s value.
- reset() resets the counter’s value to its initial value.
- addListener() registers a listener to receive update events. See the Notifications and Events section for more details.
- getConfiguration() returns the configuration used by the counter.
- remove() removes the counter value from the cluster. The instance can still be used and the listeners are kept.
- sync() creates a synchronous counter.
The counter is re-created if it is accessed after a removal.
12.3.1. The StrongCounter interface: when consistency or bounds matter
The strong counter uses a single key stored in a Red Hat Data Grid cache to provide the consistency needed. All updates are performed under the key lock. Reads, on the other hand, do not acquire any locks and return the current value. This scheme also makes it possible to bound the counter value and to provide atomic operations such as compare-and-set/swap.
A StrongCounter can be retrieved from the CounterManager by using the getStrongCounter() method. As an example:
CounterManager counterManager = ...;
StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
Since every operation hits a single key, the StrongCounter has a higher contention rate.
The StrongCounter interface adds the following methods:
default CompletableFuture<Long> incrementAndGet() {
    return addAndGet(1L);
}

default CompletableFuture<Long> decrementAndGet() {
    return addAndGet(-1L);
}

CompletableFuture<Long> addAndGet(long delta);

CompletableFuture<Boolean> compareAndSet(long expect, long update);

CompletableFuture<Long> compareAndSwap(long expect, long update);
- incrementAndGet() increments the counter by one and returns the new value.
- decrementAndGet() decrements the counter by one and returns the new value.
- addAndGet() adds a delta to the counter’s value and returns the new value.
- compareAndSet() and compareAndSwap() atomically set the counter’s value if the current value is the expected.
An operation is considered completed when the CompletableFuture is completed.
The difference between compare-and-set and compare-and-swap is that the former returns true if the operation succeeds while the latter returns the previous value. The compare-and-swap is successful if the return value is the same as the expected value.
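The two return conventions can be tried out locally with a small JDK-only sketch. It is not Data Grid code: java.util.concurrent.atomic.AtomicLong is used as a stand-in, with its compareAndSet() playing the role of compare-and-set and its compareAndExchange() (Java 9 or later) playing the role of compare-and-swap. The class name CasVersusSwap and the doubling logic are made up for the illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasVersusSwap {

    // Retry loop built on compare-and-set: one read plus one update per attempt.
    static long doubleWithCompareAndSet(AtomicLong counter) {
        long oldValue;
        long newValue;
        do {
            oldValue = counter.get();
            newValue = oldValue * 2;
        } while (!counter.compareAndSet(oldValue, newValue));
        return newValue;
    }

    // Retry loop built on compare-and-swap: a failed attempt already returns
    // the value that was found, so no extra read is needed before retrying.
    static long doubleWithCompareAndSwap(AtomicLong counter) {
        long expected = counter.get();
        while (true) {
            long newValue = expected * 2;
            long witness = counter.compareAndExchange(expected, newValue);
            if (witness == expected) {
                return newValue; // the swap succeeded
            }
            expected = witness; // the swap failed, retry with the value we saw
        }
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(5);
        System.out.println(doubleWithCompareAndSet(counter));  // 10
        System.out.println(doubleWithCompareAndSwap(counter)); // 20
        // A failed swap returns the current value instead of false.
        System.out.println(counter.compareAndExchange(7, 1));  // 20
    }
}
```

The last call shows why compare-and-swap is judged by comparing the returned value with the expected one: the value 20 differs from the expected 7, so the update did not happen.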
12.3.1.1. Bounded StrongCounter
When bounded, all the update methods above fail with a CounterOutOfBoundsException when they reach the lower or upper bound. The exception has the following methods to check which bound has been reached:
public boolean isUpperBoundReached();
public boolean isLowerBoundReached();
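To make the idea of a bounded counter concrete, here is a minimal JDK-only sketch. BoundedCounterSketch and its nested OutOfBounds exception are hypothetical names that only mimic the two methods listed above. Rejecting the update and leaving the value unchanged is an assumption of this sketch, not a statement about how Data Grid handles a bound internally.

```java
import java.util.concurrent.atomic.AtomicLong;

class BoundedCounterSketch {

    // Hypothetical stand-in for CounterOutOfBoundsException.
    static class OutOfBounds extends RuntimeException {
        private final boolean upper;

        OutOfBounds(boolean upper) {
            super(upper ? "upper bound reached" : "lower bound reached");
            this.upper = upper;
        }

        boolean isUpperBoundReached() { return upper; }
        boolean isLowerBoundReached() { return !upper; }
    }

    private final long lowerBound;
    private final long upperBound;
    private final AtomicLong value;

    BoundedCounterSketch(long initialValue, long lowerBound, long upperBound) {
        // Same rule as the configuration: the initial value lies within the bounds, inclusive.
        if (initialValue < lowerBound || initialValue > upperBound) {
            throw new IllegalArgumentException("initial value must lie within the bounds, inclusive");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.value = new AtomicLong(initialValue);
    }

    long get() { return value.get(); }

    // Applies the delta atomically, or rejects it (assumption: value stays unchanged).
    long addAndGet(long delta) {
        while (true) {
            long current = value.get();
            long next;
            try {
                next = Math.addExact(current, delta);
            } catch (ArithmeticException overflow) {
                throw new OutOfBounds(delta > 0);
            }
            if (next > upperBound) throw new OutOfBounds(true);
            if (next < lowerBound) throw new OutOfBounds(false);
            if (value.compareAndSet(current, next)) return next;
        }
    }

    public static void main(String[] args) {
        BoundedCounterSketch counter = new BoundedCounterSketch(4, 0, 10);
        System.out.println(counter.addAndGet(6)); // 10
        try {
            counter.addAndGet(1);
        } catch (OutOfBounds e) {
            System.out.println("upper bound reached? " + e.isUpperBoundReached()); // true
        }
    }
}
```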
12.3.1.2. Use cases
The strong counter fits best in the following use cases:
- When the counter’s value is needed after each update (for example, a cluster-wide ID generator or sequences)
- When a bounded counter is needed (for example, a rate limiter)
12.3.1.3. Usage Examples
StrongCounter counter = counterManager.getStrongCounter("unbounded_counter");

// incrementing the counter
System.out.println("new value is " + counter.incrementAndGet().get());

// decrement the counter's value by 100 using the functional API
counter.addAndGet(-100).thenApply(v -> {
    System.out.println("new value is " + v);
    return null;
}).get();

// alternative, you can do some work while the counter is updated
CompletableFuture<Long> f = counter.addAndGet(10);
// ... do some work ...
System.out.println("new value is " + f.get());

// and then, check the current value
System.out.println("current value is " + counter.getValue().get());

// finally, reset to initial value
counter.reset().get();
System.out.println("current value is " + counter.getValue().get());

// or set to a new value if zero
System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1).get());
And below, there is another example using a bounded counter:
StrongCounter counter = counterManager.getStrongCounter("bounded_counter");

// incrementing the counter
try {
    System.out.println("new value is " + counter.addAndGet(100).get());
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof CounterOutOfBoundsException) {
        if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
            System.out.println("ops, upper bound reached.");
        } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
            System.out.println("ops, lower bound reached.");
        }
    }
}

// now using the functional API
counter.addAndGet(-100).handle((v, throwable) -> {
    if (throwable != null) {
        Throwable cause = throwable.getCause();
        if (cause instanceof CounterOutOfBoundsException) {
            if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
                System.out.println("ops, upper bound reached.");
            } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
                System.out.println("ops, lower bound reached.");
            }
        }
        return null;
    }
    System.out.println("new value is " + v);
    return null;
}).get();
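Both variants above dig the real failure out of a wrapper with getCause(). With a plain JDK CompletableFuture, whether a callback sees the original exception or a CompletionException around it depends on where the callback is attached, so a defensive unwrap helper can be convenient. The sketch below uses only JDK classes; UnwrapSketch and unwrap() are hypothetical names, and IllegalStateException stands in for CounterOutOfBoundsException. How Data Grid completes its futures is not stated here, so treat this as an illustration of CompletableFuture behaviour only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class UnwrapSketch {

    // Strips the wrapper exceptions that CompletableFuture may add.
    static Throwable unwrap(Throwable t) {
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        // IllegalStateException stands in for CounterOutOfBoundsException.
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("bound reached"));

        // Callback attached to the failed future itself: the exception arrives as is.
        String direct = failed
                .handle((v, t) -> t.getClass().getSimpleName())
                .join();

        // Callback attached to a dependent stage: the exception arrives wrapped.
        String dependent = failed
                .thenApply(v -> v + 1)
                .handle((v, t) -> t.getClass().getSimpleName())
                .join();

        System.out.println(direct);    // IllegalStateException
        System.out.println(dependent); // CompletionException

        // With unwrap(), both callbacks see the same root exception.
        String unwrapped = failed
                .thenApply(v -> v + 1)
                .handle((v, t) -> unwrap(t).getClass().getSimpleName())
                .join();
        System.out.println(unwrapped); // IllegalStateException
    }
}
```

Checking `unwrap(throwable) instanceof CounterOutOfBoundsException` instead of `throwable.getCause() instanceof ...` would work in both situations.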
Compare-and-set vs Compare-and-swap examples:
StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue, newValue;
do {
    oldValue = counter.getValue().get();
    newValue = someLogic(oldValue);
} while (!counter.compareAndSet(oldValue, newValue).get());
With compare-and-swap, each retry saves one counter invocation (counter.getValue()):
StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue = counter.getValue().get();
long currentValue, newValue;
do {
    currentValue = oldValue;
    newValue = someLogic(oldValue);
} while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);
12.3.2. The WeakCounter interface: when speed is needed
The WeakCounter stores the counter’s value in multiple keys in a Red Hat Data Grid cache. The number of keys created is configured by the concurrency-level attribute. Each key stores a partial state of the counter’s value and can be updated concurrently. Its main advantage over the StrongCounter is the lower contention in the cache. On the other hand, reading its value is more expensive and bounds are not allowed.
The reset operation should be handled with caution. It is not atomic and it produces intermediate values. These values may be seen by a read operation and by any registered listener.
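The trade-off described above (cheap updates, more expensive reads, non-atomic reset) is the usual behaviour of a striped counter. The following JDK-only sketch illustrates the general technique and is not Data Grid’s implementation: StripedCounterSketch is a hypothetical class, each array slot stands in for one key, and the choices of picking a random stripe and keeping the initial value in stripe 0 are assumptions made for the illustration.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {

    private final long initialValue;
    private final AtomicLongArray stripes; // one slot per "key"

    StripedCounterSketch(long initialValue, int concurrencyLevel) {
        if (concurrencyLevel <= 0) {
            throw new IllegalArgumentException("concurrency level must be positive");
        }
        this.initialValue = initialValue;
        this.stripes = new AtomicLongArray(concurrencyLevel);
        this.stripes.set(0, initialValue);
    }

    // Cheap: touches a single stripe, so concurrent writers rarely collide.
    void add(long delta) {
        int i = ThreadLocalRandom.current().nextInt(stripes.length());
        stripes.addAndGet(i, delta);
    }

    // More expensive: visits every stripe and sums the partial values.
    long getValue() {
        long sum = 0;
        for (int i = 0; i < stripes.length(); i++) {
            sum += stripes.get(i);
        }
        return sum;
    }

    // Not atomic: stripes are rewritten one at a time, so a concurrent reader
    // can observe a sum that mixes old and new stripes.
    void reset() {
        stripes.set(0, initialValue);
        for (int i = 1; i < stripes.length(); i++) {
            stripes.set(i, 0);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StripedCounterSketch counter = new StripedCounterSketch(5, 4);
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    counter.add(1);
                }
            });
            writers[t].start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        System.out.println(counter.getValue()); // 4005
        counter.reset();
        System.out.println(counter.getValue()); // 5
    }
}
```

A higher concurrency level spreads writers over more stripes, at the cost of a longer sum on every read.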
A WeakCounter can be retrieved from the CounterManager by using the getWeakCounter() method. As an example:
CounterManager counterManager = ...;
WeakCounter aCounter = counterManager.getWeakCounter("my-counter");
12.3.2.1. Weak Counter Interface
The WeakCounter adds the following methods:
default CompletableFuture<Void> increment() {
    return add(1L);
}

default CompletableFuture<Void> decrement() {
    return add(-1L);
}

CompletableFuture<Void> add(long delta);
They are similar to the StrongCounter’s methods but they don’t return the new value.
12.3.2.2. Use cases
The weak counter fits best in use cases where the result of the update operation is not needed or the counter’s value is not required too often. Collecting statistics is a good example of such a use case.
12.3.2.3. Examples
Below, there is an example of the weak counter usage.
WeakCounter counter = counterManager.getWeakCounter("my_counter");

// increment the counter and check its result
counter.increment().get();
System.out.println("current value is " + counter.getValue().get());

CompletableFuture<Void> f = counter.add(-100);
// do some work
f.get(); // wait until finished
System.out.println("current value is " + counter.getValue().get());

// using the functional API
counter.reset().whenComplete((aVoid, throwable) ->
    System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get();
System.out.println("current value is " + counter.getValue().get());
12.4. Notifications and Events
Both strong and weak counters support a listener to receive their update events. The listener must implement CounterListener and it can be registered with the following method:
<T extends CounterListener> Handle<T> addListener(T listener);
The CounterListener has the following interface:
public interface CounterListener {
    void onUpdate(CounterEvent entry);
}
The main purpose of the returned Handle object is to remove the CounterListener when it is no longer needed. It also gives access to the CounterListener instance that it is handling. It has the following interface:
public interface Handle<T extends CounterListener> {
    T getCounterListener();
    void remove();
}
Finally, the CounterEvent has the previous and current value and state. It has the following interface:
public interface CounterEvent {
    long getOldValue();
    State getOldState();
    long getNewValue();
    State getNewState();
}
The state is always State.VALID for unbounded strong counters and weak counters. State.LOWER_BOUND_REACHED and State.UPPER_BOUND_REACHED are only valid for bounded strong counters.
The weak counter reset() operation triggers multiple notifications with intermediate values.
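The register, notify and remove cycle can be exercised without a cluster using the JDK-only sketch below. The nested CounterEvent, CounterListener and Handle interfaces are simplified local copies of the ones shown in this section (the state getters are left out), while LocalCounter and DeltaRecorder are hypothetical helpers that exist only to drive the example. It illustrates the pattern and does not reproduce Data Grid internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerSketch {

    // Simplified local stand-ins for the interfaces shown above (state omitted).
    interface CounterEvent { long getOldValue(); long getNewValue(); }
    interface CounterListener { void onUpdate(CounterEvent event); }
    interface Handle<T extends CounterListener> { T getCounterListener(); void remove(); }

    // Toy in-memory counter, only here to drive the listener.
    static class LocalCounter {
        private final List<CounterListener> listeners = new CopyOnWriteArrayList<>();
        private long value;

        <T extends CounterListener> Handle<T> addListener(T listener) {
            listeners.add(listener);
            return new Handle<T>() {
                public T getCounterListener() { return listener; }
                public void remove() { listeners.remove(listener); }
            };
        }

        synchronized void add(long delta) {
            long oldValue = value;
            value += delta;
            long newValue = value;
            CounterEvent event = new CounterEvent() {
                public long getOldValue() { return oldValue; }
                public long getNewValue() { return newValue; }
            };
            for (CounterListener l : listeners) {
                l.onUpdate(event);
            }
        }
    }

    // Records the difference between new and old value of every event it receives.
    static class DeltaRecorder implements CounterListener {
        final List<Long> deltas = new ArrayList<>();

        public void onUpdate(CounterEvent event) {
            deltas.add(event.getNewValue() - event.getOldValue());
        }
    }

    public static void main(String[] args) {
        LocalCounter counter = new LocalCounter();
        Handle<DeltaRecorder> handle = counter.addListener(new DeltaRecorder());
        counter.add(5);
        counter.add(-2);
        handle.remove();  // stop receiving events
        counter.add(100); // not recorded
        System.out.println(handle.getCounterListener().deltas); // [5, -2]
    }
}
```

Keeping the Handle around is what makes the later remove() possible; the typed getCounterListener() gives access to the listener’s own state without a cast.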