第 11 章 使用集群计数器
Data Grid 提供记录对象的计数,并在集群中的所有节点之间分布的计数器。
11.1. 集群计数器
集群计数器 是在 Data Grid 集群中所有节点分布和共享的计数器。计数器可以具有不同的一致性级别:强和弱。
虽然强/弱一致性计数器具有单独的接口,但支持更新其值,但在更新其值时返回当前值,并提供事件。在本文档中提供了详细信息,以帮助您选择最适合您的用例。
11.1.1. 安装和配置
要开始使用计数器,您需要在 Maven pom.xml
文件中添加依赖项:
pom.xml
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-clustered-counter</artifactId> </dependency>
计数器可以通过本文档中详述的 CounterManager
接口配置 Data Grid 配置文件或按需。当 EmbeddedCacheManager
启动时,在启动时创建在 Data Grid 配置文件中配置的计数器。这些计数器可立即启动,它们在所有集群节点中可用。
configuration.xml
<infinispan> <cache-container ...> <!-- To persist counters, you need to configure the global state. --> <global-state> <!-- Global state configuration goes here. --> </global-state> <!-- Cache configuration goes here. --> <counters xmlns="urn:infinispan:config:counters:15.0" num-owners="3" reliability="CONSISTENT"> <strong-counter name="c1" initial-value="1" storage="PERSISTENT"/> <strong-counter name="c2" initial-value="2" storage="VOLATILE" lower-bound="0"/> <strong-counter name="c3" initial-value="3" storage="PERSISTENT" upper-bound="5"/> <strong-counter name="c4" initial-value="4" storage="VOLATILE" lower-bound="0" upper-bound="10"/> <strong-counter name="c5" initial-value="0" upper-bound="100" lifespan="60000"/> <weak-counter name="c6" initial-value="5" storage="PERSISTENT" concurrency-level="1"/> </counters> </cache-container> </infinispan>
或以编程方式在 GlobalConfigurationBuilder
中:
GlobalConfigurationBuilder globalConfigurationBuilder = ...; CounterManagerConfigurationBuilder builder = globalConfigurationBuilder.addModule(CounterManagerConfigurationBuilder.class); builder.numOwner(3).reliability(Reliability.CONSISTENT); builder.addStrongCounter().name("c1").initialValue(1).storage(Storage.PERSISTENT); builder.addStrongCounter().name("c2").initialValue(2).lowerBound(0).storage(Storage.VOLATILE); builder.addStrongCounter().name("c3").initialValue(3).upperBound(5).storage(Storage.PERSISTENT); builder.addStrongCounter().name("c4").initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE); builder.addStrongCounter().name("c5").initialValue(0).upperBound(100).lifespan(60000); builder.addWeakCounter().name("c6").initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT);
另一方面,可以在初始化 EmbeddedCacheManager
后随时配置计数器。
CounterManager manager = ...; manager.defineCounter("c1", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(1).storage(Storage.PERSISTENT).build()); manager.defineCounter("c2", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(2).lowerBound(0).storage(Storage.VOLATILE).build()); manager.defineCounter("c3", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(3).upperBound(5).storage(Storage.PERSISTENT).build()); manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE).build()); manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(0).upperBound(100).lifespan(60000).build()); manager.defineCounter("c6", CounterConfiguration.builder(CounterType.WEAK).initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT).build());
CounterConfiguration
是不可变的,可以被重复使用。
如果计数器配置成功或为 false
,则方法 defineCounter ()
将返回 true
。但是,如果配置无效,则方法会抛出 CounterConfigurationException
。要查找是否已定义计数器,请使用 method isDefined ()
。
CounterManager manager = ... if (!manager.isDefined("someCounter")) { manager.define("someCounter", ...); }
其他资源
11.1.1.1. 列出计数器名称
要列出定义的所有计数器,method CounterManager.getCounterNames ()
返回在集群范围创建的所有计数器名称的集合。
11.1.2. CounterManager
接口
CounterManager
接口是定义、检索和删除计数器的入口点。
嵌入式部署
CounterManager
会自动侦听 EmbeddedCacheManager
创建,然后为每个 EmbeddedCacheManager
执行实例的注册。它启动存储计数器状态所需的缓存并配置默认计数器。
检索 CounterManager
非常简单,就像调用 EmbeddedCounterManagerFactory.asCounterManager (EmbeddedCacheManager)
,如下例所示:
// create or obtain your EmbeddedCacheManager EmbeddedCacheManager manager = ...; // retrieve the CounterManager CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager);
服务器部署
对于 Hot Rod 客户端,CounterManager
在 RemoteCacheManager 中注册,并可以检索,如下所示:
// create or obtain your RemoteCacheManager RemoteCacheManager manager = ...; // retrieve the CounterManager CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);
11.1.2.1. 通过 CounterManager 删除计数器
通过 Strong/WeakCounter
接口和 CounterManager
删除计数器之间有一个区别。CounterManager.remove (String)
从集群中删除计数器值,并删除本地计数器实例中计数器中注册的所有监听程序。另外,计数器实例不再重复使用,它可能会返回无效的结果。
在另一端,Strong/WeakCounter
删除仅移除计数器值。实例仍然可以重复使用,监听器仍然可以正常工作。
如果在移除后访问计数器,则会重新创建计数器。
11.1.3. Counter
计数器可以很强大(StrongCounter
)或弱一致性(WeakCounter
),它们都由一个名称来标识。它们有一个特定的接口,但它们共享一些逻辑,即它们都是异步的(每个操作返回 CompletableFuture
),提供更新事件,并可重置为其初始值。
如果您不想使用 async API,可以通过 sync ()
方法返回同步计数器。API 相同,但没有 CompletableFuture
返回值。
两种方法对这两个接口是通用的:
String getName(); CompletableFuture<Long> getValue(); CompletableFuture<Void> reset(); <T extends CounterListener> Handle<T> addListener(T listener); CounterConfiguration getConfiguration(); CompletableFuture<Void> remove(); SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
-
getName ()
返回计数器名称(identifier)。 -
getValue ()
返回当前计数器的值。 -
reset ()
允许将计数器的值重置为其初始值。 -
addListener ()
注册监听程序以接收更新事件。有关它的更多详细信息,请在 Notification 和 Events 部分中。 -
getConfiguration ()
返回计数器使用的配置。 -
remove ()
从集群中移除计数器值。仍可使用实例,并保留监听器。 -
sync ()
创建一个同步计数器。
如果在移除后访问计数器,则会重新创建计数器。
11.1.3.1. StrongCounter
接口:当一致性或绑定很重要时。
强大的计数器提供使用存储在 Data Grid 缓存中的单个密钥来提供所需的一致性。所有更新都在密钥锁定下执行,以更新其值。另一方面,读取不会获取任何锁定,并读取当前的值。此外,使用此方案时,它允许绑定计数器值并提供原子操作,如 compare-and-set/swap。
可以使用 get
方法从 StrongCounter
()CounterManager
检索 StrongCounter。例如:
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
由于每个操作都将达到单个键,因此 StrongCounter
具有较高的竞争率。
StrongCounter
接口添加以下方法:
default CompletableFuture<Long> incrementAndGet() { return addAndGet(1L); } default CompletableFuture<Long> decrementAndGet() { return addAndGet(-1L); } CompletableFuture<Long> addAndGet(long delta); CompletableFuture<Boolean> compareAndSet(long expect, long update); CompletableFuture<Long> compareAndSwap(long expect, long update);
-
incrementAndGet ()
逐一递增计数器并返回新值。 -
decrementAndGet ()
将计数器减少一次,并返回新值。 -
addAndGet ()
将 delta 添加到计数器的值中,并返回新值。 -
compareAndSet ()
和compareAndSwap ()
以原子方式设置计数器的值(如果当前值是预期的)。
当完成 CompletableFuture
时,操作被视为已完成。
compare-and-set 和 compare-and-swap 之间的区别在于,如果操作成功,则前者会返回 true,稍后返回上一个值。如果返回值与预期相同,则 compare-and-swap 可以成功。
11.1.3.1.1. 已绑定的 StrongCounter
绑定后,当上述所有更新方法达到较低或上限时,所有更新方法都会抛出 CounterOutOfBoundsException
。例外有以下方法检查到达哪个侧绑定:
public boolean isUpperBoundReached(); public boolean isLowerBoundReached();
11.1.3.1.2. 使用案例
在以下用例中,强计数器更适合:
- 在每次更新后需要计数器的值(例如,集群范围 ID 生成器或序列)
- 当需要有界计数器时(例如,速率限制器)
11.1.3.1.3. 使用示例
StrongCounter counter = counterManager.getStrongCounter("unbounded_counter"); // incrementing the counter System.out.println("new value is " + counter.incrementAndGet().get()); // decrement the counter's value by 100 using the functional API counter.addAndGet(-100).thenApply(v -> { System.out.println("new value is " + v); return null; }).get(); // alternative, you can do some work while the counter is updated CompletableFuture<Long> f = counter.addAndGet(10); // ... do some work ... System.out.println("new value is " + f.get()); // and then, check the current value System.out.println("current value is " + counter.getValue().get()); // finally, reset to initial value counter.reset().get(); System.out.println("current value is " + counter.getValue().get()); // or set to a new value if zero System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1));
以下是使用绑定计数器的另一个示例:
StrongCounter counter = counterManager.getStrongCounter("bounded_counter"); // incrementing the counter try { System.out.println("new value is " + counter.addAndGet(100).get()); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } } // now using the functional API counter.addAndGet(-100).handle((v, throwable) -> { if (throwable != null) { Throwable cause = throwable.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } return null; } System.out.println("new value is " + v); return null; }).get();
compare-and-set 与 Compare-and-swap 示例:
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue, newValue; do { oldValue = counter.getValue().get(); newValue = someLogic(oldValue); } while (!counter.compareAndSet(oldValue, newValue).get());
使用 compare-and-swap,它会保存一个调用计数器调用(counter.getValue ()
)
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue = counter.getValue().get(); long currentValue, newValue; do { currentValue = oldValue; newValue = someLogic(oldValue); } while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);
要将强计数器用作速率限制器,请配置 上限
和 lifespan
参数,如下所示:
// 5 request per minute CounterConfiguration configuration = CounterConfiguration.builder(CounterType.BOUNDED_STRONG) .upperBound(5) .lifespan(60000) .build(); counterManager.defineCounter("rate_limiter", configuration); StrongCounter counter = counterManager.getStrongCounter("rate_limiter"); // on each operation, invoke try { counter.incrementAndGet().get(); // continue with operation } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { if (e.getCause() instanceof CounterOutOfBoundsException) { // maximum rate. discard operation return; } else { // unexpected error, handling property } }
lifespan
参数是一个实验性功能,可能会在以后的版本中删除。
11.1.3.2. WeakCounter
接口:何时需要速度
WeakCounter
将计数器的值存储在 Data Grid 缓存中的多个键中。创建的密钥数量由 concurrency-level
属性配置。每个键存储计数器值的部分状态,并可同时更新。与 StrongCounter
相比,它的主要优点是缓存中较低争用。另一方面,其值的读取更为昂贵,不允许绑定。
reset 操作应谨慎处理。它不是 原子的,它会生成中间值。这些值可由读取操作以及注册的任何监听程序看到。
可以使用 get
方法从 WeakCounter
()CounterManager
检索 WeakCounter。例如:
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getWeakCounter("my-counter);
11.1.3.2.1. 弱计数接口
WeakCounter
添加以下方法:
default CompletableFuture<Void> increment() { return add(1L); } default CompletableFuture<Void> decrement() { return add(-1L); } CompletableFuture<Void> add(long delta);
它们与 'StrongCounter's 类似,但它们不会返回新值。
11.1.3.2.2. 使用案例
当不需要更新操作,或者不需要计数器的值时,弱计数器最适合的情况。收集统计信息是此类用例的良好示例。
11.1.3.2.3. 例子
以下是弱计数器用法的示例。
WeakCounter counter = counterManager.getWeakCounter("my_counter"); // increment the counter and check its result counter.increment().get(); System.out.println("current value is " + counter.getValue()); CompletableFuture<Void> f = counter.add(-100); //do some work f.get(); //wait until finished System.out.println("current value is " + counter.getValue().get()); //using the functional API counter.reset().whenComplete((aVoid, throwable) -> System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get(); System.out.println("current value is " + counter.getValue().get());
11.1.4. 通知和事件
强和弱计数器都支持监听器接收其更新事件。侦听器必须实现 CounterListener
,并可通过以下方法注册:
<T extends CounterListener> Handle<T> addListener(T listener);
CounterListener
有以下接口:
public interface CounterListener { void onUpdate(CounterEvent entry); }
返回的 Handle
对象具有在不再需要时删除 CounterListener
的主要目标。另外,它还允许访问它处理的 CounterListener
实例。它有以下接口:
public interface Handle<T extends CounterListener> { T getCounterListener(); void remove(); }
最后,CounterEvent
具有之前和当前的值和状态。它有以下接口:
public interface CounterEvent { long getOldValue(); State getOldState(); long getNewValue(); State getNewState(); }
对于未绑定的强大计数器和弱计数器,状态为 State.VALID
。state.LOWER_BOUND_REACHED
和 State.UPPER_BOUND_REACHED
仅对有界强计数器有效。
弱计数器 reset ()
操作将触发带有中间值的多个通知。