此内容没有您所选择的语言版本。

Chapter 10. Clustered Counters


Clustered counters are counters which are distributed and shared among all nodes in the Data Grid cluster. Counters can have different consistency levels: strong and weak.

Although a strong/weak consistent counter has separate interfaces, both support updating its value, return the current value and they provide events when its value is updated. Details are provided below in this document to help you choose which one fits best your uses-case.

10.1. Installation and Configuration

In order to start using the counters, you needs to add the dependency in your Maven pom.xml file:

pom.xml

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-clustered-counter</artifactId>
</dependency>

The counters can be configured Data Grid configuration file or on-demand via the CounterManager interface detailed later in this document. A counters configured in Data Grid configuration file is created at boot time when the EmbeddedCacheManager is starting. Theses counters are started eagerly and they are available in all the cluster’s nodes.

configuration.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan>
    <cache-container ...>
        <!-- if needed to persist counter, global state needs to be configured -->
        <global-state>
            ...
        </global-state>
        <!-- your caches configuration goes here -->
         <counters xmlns="urn:infinispan:config:counters:10.1" num-owners="3" reliability="CONSISTENT">
             <strong-counter name="c1" initial-value="1" storage="PERSISTENT"/>
             <strong-counter name="c2" initial-value="2" storage="VOLATILE">
                 <lower-bound value="0"/>
             </strong-counter>
             <strong-counter name="c3" initial-value="3" storage="PERSISTENT">
                 <upper-bound value="5"/>
             </strong-counter>
             <strong-counter name="c4" initial-value="4" storage="VOLATILE">
                 <lower-bound value="0"/>
                 <upper-bound value="10"/>
             </strong-counter>
             <weak-counter name="c5" initial-value="5" storage="PERSISTENT" concurrency-level="1"/>
         </counters>
    </cache-container>
</infinispan>

or programmatically, in the GlobalConfigurationBuilder:

GlobalConfigurationBuilder globalConfigurationBuilder = ...;
CounterManagerConfigurationBuilder builder = globalConfigurationBuilder.addModule(CounterManagerConfigurationBuilder.class);
builder.numOwner(3).reliability(Reliability.CONSISTENT);
builder.addStrongCounter().name("c1").initialValue(1).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c2").initialValue(2).lowerBound(0).storage(Storage.VOLATILE);
builder.addStrongCounter().name("c3").initialValue(3).upperBound(5).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c4").initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE);
builder.addWeakCounter().name("c5").initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT);

On other hand, the counters can be configured on-demand, at any time after the EmbeddedCacheManager is initialized.

CounterManager manager = ...;
manager.defineCounter("c1", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(1).storage(Storage.PERSISTENT).build());
manager.defineCounter("c2", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(2).lowerBound(0).storage(Storage.VOLATILE).build());
manager.defineCounter("c3", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(3).upperBound(5).storage(Storage.PERSISTENT).build());
manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE).build());
manager.defineCounter("c2", CounterConfiguration.builder(CounterType.WEAK).initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT).build());
Note

CounterConfiguration is immutable and can be reused.

The method defineCounter() will return true if the counter is successful configured or false otherwise. However, if the configuration is invalid, the method will throw a CounterConfigurationException. To find out if a counter is already defined, use the method isDefined().

CounterManager manager = ...
if (!manager.isDefined("someCounter")) {
    manager.define("someCounter", ...);
}

Per cluster attributes:

  • num-owners: Sets the number of counter’s copies to keep cluster-wide. A smaller number will make update operations faster but will support a lower number of server crashes. It must be positive and its default value is 2.
  • reliability: Sets the counter’s update behavior in a network partition. Default value is AVAILABLE and valid values are:

    • AVAILABLE: all partitions are able to read and update the counter’s value.
    • CONSISTENT: only the primary partition (majority of nodes) will be able to read and update the counter’s value. The remaining partitions can only read its value.

Per counter attributes:

  • initial-value [common]: Sets the counter’s initial value. Default is 0 (zero).
  • storage [common]: Sets the counter’s behavior when the cluster is shutdown and restarted. Default value is VOLATILE and valid values are:

    • VOLATILE: the counter’s value is only available in memory. The value will be lost when a cluster is shutdown.
    • PERSISTENT: the counter’s value is stored in a private and local persistent store. The value is kept when the cluster is shutdown and restored after a restart.
Note

On-demand and VOLATILE counters will lose its value and configuration after a cluster shutdown. They must be defined again after the restart.

  • lower-bound [strong]: Sets the strong consistent counter’s lower bound. Default value is Long.MIN_VALUE.
  • upper-bound [strong]: Sets the strong consistent counter’s upper bound. Default value is Long.MAX_VALUE.
Note

If neither the lower-bound or upper-bound are configured, the strong counter is set as unbounded.

Warning

The initial-value must be between lower-bound and upper-bound inclusive.

  • concurrency-level [weak]: Sets the number of concurrent updates. Its value must be positive and the default value is 16.

10.1.1. List counter names

To list all the counters defined, the method CounterManager.getCounterNames() returns a collection of all counter names created cluster-wide.

10.2. The CounterManager interface.

The CounterManager interface is the entry point to define, retrieve and remove a counter. It automatically listen to the creation of EmbeddedCacheManager and proceeds with the registration of an instance of it per EmbeddedCacheManager. It starts the caches needed to store the counter state and configures the default counters.

Retrieving the CounterManager is as simple as invoke the EmbeddedCounterManagerFactory.asCounterManager(EmbeddedCacheManager) as shown in the example below:

// create or obtain your EmbeddedCacheManager
EmbeddedCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager);

For Hot Rod client, the CounterManager is registered in the RemoteCacheManager and it can be retrieved like:

// create or obtain your RemoteCacheManager
RemoteCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);

10.2.1. Remove a counter via CounterManager

Warning

use with caution.

There is a difference between remove a counter via the Strong/WeakCounter interfaces and the CounterManager. The CounterManager.remove(String) removes the counter value from the cluster and removes all the listeners registered in the counter in the local counter instance. In addition, the counter instance is no longer reusable and it may return an invalid results.

On the other side, the Strong/WeakCounter removal only removes the counter value. The instance can still be reused and the listeners still works.

Note

The counter is re-created if it is accessed after a removal.

10.3. The Counter

A counter can be strong (StrongCounter) or weakly consistent (WeakCounter) and both is identified by a name. They have a specific interface but they share some logic, namely, both of them are asynchronous ( a CompletableFuture is returned by each operation), provide an update event and can be reset to its initial value.

If you don’t want to use the async API, it is possible to return a synchronous counter via sync() method. The API is the same but without the CompletableFuture return value.

The following methods are common to both interfaces:

String getName();
CompletableFuture<Long> getValue();
CompletableFuture<Void> reset();
<T extends CounterListener> Handle<T> addListener(T listener);
CounterConfiguration getConfiguration();
CompletableFuture<Void> remove();
SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
  • getName() returns the counter name (identifier).
  • getValue() returns the current counter’s value.
  • reset() allows to reset the counter’s value to its initial value.
  • addListener() register a listener to receive update events. More details about it in the Notification and Events section.
  • getConfiguration() returns the configuration used by the counter.
  • remove() removes the counter value from the cluster. The instance can still be used and the listeners are kept.
  • sync() creates a synchronous counter.
Note

The counter is re-created if it is accessed after a removal.

10.3.1. The StrongCounter interface: when the consistency or bounds matters.

The strong counter provides uses a single key stored in Data Grid cache to provide the consistency needed. All the updates are performed under the key lock to updates its values. On other hand, the reads don’t acquire any locks and reads the current value. Also, with this scheme, it allows to bound the counter value and provide atomic operations like compare-and-set/swap.

A StrongCounter can be retrieved from the CounterManager by using the getStrongCounter() method. As an example:

CounterManager counterManager = ...
StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
Warning

Since every operation will hit a single key, the StrongCounter has a higher contention rate.

The StrongCounter interface adds the following method:

default CompletableFuture<Long> incrementAndGet() {
   return addAndGet(1L);
}

default CompletableFuture<Long> decrementAndGet() {
   return addAndGet(-1L);
}

CompletableFuture<Long> addAndGet(long delta);

CompletableFuture<Boolean> compareAndSet(long expect, long update);

CompletableFuture<Long> compareAndSwap(long expect, long update);
  • incrementAndGet() increments the counter by one and returns the new value.
  • decrementAndGet() decrements the counter by one and returns the new value.
  • addAndGet() adds a delta to the counter’s value and returns the new value.
  • compareAndSet() and compareAndSwap() atomically set the counter’s value if the current value is the expected.
Note

A operation is considered completed when the CompletableFuture is completed.

Note

The difference between compare-and-set and compare-and-swap is that the former returns true if the operation succeeds while the later returns the previous value. The compare-and-swap is successful if the return value is the same as the expected.

10.3.1.1. Bounded StrongCounter

When bounded, all the update method above will throw a CounterOutOfBoundsException when they reached the lower or upper bound. The exception has the following methods to check which side bound has been reached:

public boolean isUpperBoundReached();
public boolean isLowerBoundReached();

10.3.1.2. Uses cases

The strong counter fits better in the following uses cases:

  • When counter’s value is needed after each update (example, cluster-wise ids generator or sequences)
  • When a bounded counter is needed (example, rate limiter)

10.3.1.3. Usage Examples

StrongCounter counter = counterManager.getStrongCounter("unbounded_counter");

// incrementing the counter
System.out.println("new value is " + counter.incrementAndGet().get());

// decrement the counter's value by 100 using the functional API
counter.addAndGet(-100).thenApply(v -> {
   System.out.println("new value is " + v);
   return null;
}).get();

// alternative, you can do some work while the counter is updated
CompletableFuture<Long> f = counter.addAndGet(10);
// ... do some work ...
System.out.println("new value is " + f.get());

// and then, check the current value
System.out.println("current value is " + counter.getValue().get());

// finally, reset to initial value
counter.reset().get();
System.out.println("current value is " + counter.getValue().get());

// or set to a new value if zero
System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1));

And below, there is another example using a bounded counter:

StrongCounter counter = counterManager.getStrongCounter("bounded_counter");

// incrementing the counter
try {
    System.out.println("new value is " + counter.addAndGet(100).get());
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof CounterOutOfBoundsException) {
       if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
          System.out.println("ops, upper bound reached.");
       } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
          System.out.println("ops, lower bound reached.");
       }
    }
}

// now using the functional API
counter.addAndGet(-100).handle((v, throwable) -> {
   if (throwable != null) {
      Throwable cause = throwable.getCause();
      if (cause instanceof CounterOutOfBoundsException) {
         if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
            System.out.println("ops, upper bound reached.");
         } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
            System.out.println("ops, lower bound reached.");
         }
      }
      return null;
   }
   System.out.println("new value is " + v);
   return null;
}).get();

Compare-and-set vs Compare-and-swap examples:

StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue, newValue;
do {
   oldValue = counter.getValue().get();
   newValue = someLogic(oldValue);
} while (!counter.compareAndSet(oldValue, newValue).get());

With compare-and-swap, it saves one invocation counter invocation (counter.getValue())

StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue = counter.getValue().get();
long currentValue, newValue;
do {
   currentValue = oldValue;
   newValue = someLogic(oldValue);
} while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);

10.3.2. The WeakCounter interface: when speed is needed

The WeakCounter stores the counter’s value in multiple keys in Data Grid cache. The number of keys created is configured by the concurrency-level attribute. Each key stores a partial state of the counter’s value and it can be updated concurrently. It main advantage over the StrongCounter is the lower contention in the cache. On other hand, the read of its value is more expensive and bounds are not allowed.

Warning

The reset operation should be handled with caution. It is not atomic and it produces intermediates values. These value may be seen by a read operation and by any listener registered.

A WeakCounter can be retrieved from the CounterManager by using the getWeakCounter() method. As an example:

CounterManager counterManager = ...
StrongCounter aCounter = counterManager.getWeakCounter("my-counter);

10.3.2.1. Weak Counter Interface

The WeakCounter adds the following methods:

default CompletableFuture<Void> increment() {
   return add(1L);
}

default CompletableFuture<Void> decrement() {
   return add(-1L);
}

CompletableFuture<Void> add(long delta);

They are similar to the `StrongCounter’s methods but they don’t return the new value.

10.3.2.2. Uses cases

The weak counter fits best in uses cases where the result of the update operation is not needed or the counter’s value is not required too often. Collecting statistics is a good example of such an use case.

10.3.2.3. Examples

Below, there is an example of the weak counter usage.

WeakCounter counter = counterManager.getWeakCounter("my_counter");

// increment the counter and check its result
counter.increment().get();
System.out.println("current value is " + counter.getValue());

CompletableFuture<Void> f = counter.add(-100);
//do some work
f.get(); //wait until finished
System.out.println("current value is " + counter.getValue().get());

//using the functional API
counter.reset().whenComplete((aVoid, throwable) -> System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get();
System.out.println("current value is " + counter.getValue().get());

10.4. Notifications and Events

Both strong and weak counter supports a listener to receive its updates events. The listener must implement CounterListener and it can be registered by the following method:

<T extends CounterListener> Handle<T> addListener(T listener);

The CounterListener has the following interface:

public interface CounterListener {
   void onUpdate(CounterEvent entry);
}

The Handle object returned has the main goal to remove the CounterListener when it is not longer needed. Also, it allows to have access to the CounterListener instance that is it handling. It has the following interface:

public interface Handle<T extends CounterListener> {
   T getCounterListener();
   void remove();
}

Finally, the CounterEvent has the previous and current value and state. It has the following interface:

public interface CounterEvent {
   long getOldValue();
   State getOldState();
   long getNewValue();
   State getNewState();
}
Note

The state is always State.VALID for unbounded strong counter and weak counter. State.LOWER_BOUND_REACHED and State.UPPER_BOUND_REACHED are only valid for bounded strong counters.

Warning

The weak counter reset() operation will trigger multiple notification with intermediate values.

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.