5.3. Counter
计数器可以是强度(StrongCounter
)或弱点一致(WeakCounter
),且两者都由名称标识。它们有一个特定的接口,但它们共享一些逻辑,即异步接口(每个操作返回了 CompletableFuture
),提供一个更新事件,并可重置为其初始值。
如果您不想使用 async API,可以通过 sync ()
方法返回同步计数器。API 相同,但没有 CompletableFuture
返回值。
两种方法对两个接口都很常见:
String getName(); CompletableFuture<Long> getValue(); CompletableFuture<Void> reset(); <T extends CounterListener> Handle<T> addListener(T listener); CounterConfiguration getConfiguration(); CompletableFuture<Void> remove(); SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
-
getName ()
返回计数器名称(identifier)。 -
getValue ()
返回当前计数器的值。 -
reset ()
允许将计数器的值重置为其初始值。 -
添加Listener ()
注册监听程序以接收更新事件。有关它的更多详细信息,请参阅 通知和事件 部分。 -
getConfiguration ()
返回计数器使用的配置。 -
remove ()
从集群中移除计数器值。实例仍然可以被使用,并保留监听程序。 -
sync ()
创建一个同步计数器。
如果在移除后访问计数器,则会重新创建计数器。
5.3.1. StrongCounter
接口:当一致性或边界何时重要。
强大的计数器使用存储在 Data Grid 缓存中的单个密钥来提供所需的一致性。所有更新都在键锁定下执行,以更新其值。另一方面,读取不会获取任何锁定并读取当前值。另外,使用此方案,它允许绑定计数器值,并提供类似 compare-and-set/swap 等原子操作。
可以使用 getStrongCounter ()
方法从 CounterManager
检索 StrongCounter
()。例如:
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
由于每个操作都达到单个键,因此 StrongCounter
具有更高的争用率。
StrongCounter
接口添加了以下方法:
default CompletableFuture<Long> incrementAndGet() { return addAndGet(1L); } default CompletableFuture<Long> decrementAndGet() { return addAndGet(-1L); } CompletableFuture<Long> addAndGet(long delta); CompletableFuture<Boolean> compareAndSet(long expect, long update); CompletableFuture<Long> compareAndSwap(long expect, long update);
-
incrementAndGet
() 会递增计数器并返回新值。 -
decrementAndGet ()
将计数器减一,并返回新值。 -
addAndGet ()
在计数器的值中添加 delta 并返回新值。 -
如果当前值是预期的,比较AndSet ()
和compareAndSwap ()
会原子地设置计数器的值。
当完成 CompletableFuture
时,操作被视为已完成。
compare-and-set 和 compare-and-swap 之间的区别在于,如果操作成功,则前者返回 true,而后续返回前面的值。如果返回值与预期相同,则 compare-and-swap 可以成功。
5.3.1.1. Bounded StrongCounter
绑定后,上面的所有更新方法都会抛出 CounterOutOfBoundsException
,当它们达到下限或上限时。这个例外有以下方法来检查已达到哪个侧绑定:
public boolean isUpperBoundReached(); public boolean isLowerBoundReached();
5.3.1.2. 用例
在以下用例中,强计数器更适合:
- 每次更新后需要计数器的值(例如,集群 ID 生成器或序列)
- 需要绑定计数器时(例如,速率限制)
5.3.1.3. 使用示例
StrongCounter counter = counterManager.getStrongCounter("unbounded_counter"); // incrementing the counter System.out.println("new value is " + counter.incrementAndGet().get()); // decrement the counter's value by 100 using the functional API counter.addAndGet(-100).thenApply(v -> { System.out.println("new value is " + v); return null; }).get(); // alternative, you can do some work while the counter is updated CompletableFuture<Long> f = counter.addAndGet(10); // ... do some work ... System.out.println("new value is " + f.get()); // and then, check the current value System.out.println("current value is " + counter.getValue().get()); // finally, reset to initial value counter.reset().get(); System.out.println("current value is " + counter.getValue().get()); // or set to a new value if zero System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1));
下面有一个使用绑定计数器的另一个示例:
StrongCounter counter = counterManager.getStrongCounter("bounded_counter"); // incrementing the counter try { System.out.println("new value is " + counter.addAndGet(100).get()); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } } // now using the functional API counter.addAndGet(-100).handle((v, throwable) -> { if (throwable != null) { Throwable cause = throwable.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } return null; } System.out.println("new value is " + v); return null; }).get();
compare-and-set 与 Compare-and-swap 示例:
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue, newValue; do { oldValue = counter.getValue().get(); newValue = someLogic(oldValue); } while (!counter.compareAndSet(oldValue, newValue).get());
使用 compare-and-swap 时,它会保存一个调用计数器调用(counter.getValue ()
)
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue = counter.getValue().get(); long currentValue, newValue; do { currentValue = oldValue; newValue = someLogic(oldValue); } while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);
要将强计数器用作速率限制,请配置 上限和 lifetime
span 参数,如下所示:
// 5 request per minute CounterConfiguration configuration = CounterConfiguration.builder(CounterType.BOUNDED_STRONG) .upperBound(5) .lifespan(60000) .build(); counterManager.defineCounter("rate_limiter", configuration); StrongCounter counter = counterManager.getStrongCounter("rate_limiter"); // on each operation, invoke try { counter.incrementAndGet().get(); // continue with operation } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { if (e.getCause() instanceof CounterOutOfBoundsException) { // maximum rate. discard operation return; } else { // unexpected error, handling property } }
lifespan
参数是一个实验性功能,可能在以后的版本中删除。
5.3.2. WeakCounter
接口:需要速度时
WeakCounter
将计数器的值存储在 Data Grid 缓存中的多个键中。创建的键数量由 concurrency-level
属性配置。每个密钥存储计数器值的部分状态,并可同时更新。它比 StrongCounter
的主要优点是缓存中的竞争。另一方面,其值的读取更为昂贵,不允许绑定。
reset 操作应谨慎处理。它不是 原子的,它会生成中间值。这些值可以被读取操作以及注册的任何监听程序查看。
可以使用 get
方法从 WeakCounter
()CounterManager
检索 WeakCounter。例如:
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getWeakCounter("my-counter);
5.3.2.1. 弱计数器接口
WeakCounter
添加以下方法:
default CompletableFuture<Void> increment() { return add(1L); } default CompletableFuture<Void> decrement() { return add(-1L); } CompletableFuture<Void> add(long delta);
它们与"StrongCounter 的方法类似,但它们不会返回新值。
5.3.2.2. 用例
弱计数器最适合用例,因为不需要更新操作的结果,或者不需要计数器的值。收集统计数据是这样的用例的良好示例。
5.3.2.3. 例子
下面有一个弱计数器用法的示例。
WeakCounter counter = counterManager.getWeakCounter("my_counter"); // increment the counter and check its result counter.increment().get(); System.out.println("current value is " + counter.getValue()); CompletableFuture<Void> f = counter.add(-100); //do some work f.get(); //wait until finished System.out.println("current value is " + counter.getValue().get()); //using the functional API counter.reset().whenComplete((aVoid, throwable) -> System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get(); System.out.println("current value is " + counter.getValue().get());