第 11 章 使用集群计数器


Data Grid 提供记录对象的计数,并在集群中的所有节点之间分布的计数器。

11.1. 集群计数器

集群计数器 是在 Data Grid 集群中所有节点分布和共享的计数器。计数器可以具有不同的一致性级别:强和弱。

虽然强/弱一致性计数器具有单独的接口,但支持更新其值,但在更新其值时返回当前值,并提供事件。在本文档中提供了详细信息,以帮助您选择最适合您的用例。

11.1.1. 安装和配置

要开始使用计数器,您需要在 Maven pom.xml 文件中添加依赖项:

pom.xml

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-clustered-counter</artifactId>
</dependency>

计数器可以通过本文档中详述的 CounterManager 接口配置 Data Grid 配置文件或按需。当 EmbeddedCacheManager 启动时,在启动时创建在 Data Grid 配置文件中配置的计数器。这些计数器可立即启动,它们在所有集群节点中可用。

configuration.xml

<infinispan>
    <cache-container ...>
        <!-- To persist counters, you need to configure the global state. -->
        <global-state>
        <!-- Global state configuration goes here. -->
        </global-state>
        <!-- Cache configuration goes here. -->
         <counters xmlns="urn:infinispan:config:counters:15.0" num-owners="3" reliability="CONSISTENT">
             <strong-counter name="c1" initial-value="1" storage="PERSISTENT"/>
             <strong-counter name="c2" initial-value="2" storage="VOLATILE" lower-bound="0"/>
             <strong-counter name="c3" initial-value="3" storage="PERSISTENT" upper-bound="5"/>
             <strong-counter name="c4" initial-value="4" storage="VOLATILE" lower-bound="0" upper-bound="10"/>
             <strong-counter name="c5" initial-value="0" upper-bound="100" lifespan="60000"/>
             <weak-counter name="c6" initial-value="5" storage="PERSISTENT" concurrency-level="1"/>
         </counters>
    </cache-container>
</infinispan>

或以编程方式在 GlobalConfigurationBuilder 中:

GlobalConfigurationBuilder globalConfigurationBuilder = ...;
CounterManagerConfigurationBuilder builder = globalConfigurationBuilder.addModule(CounterManagerConfigurationBuilder.class);
builder.numOwner(3).reliability(Reliability.CONSISTENT);
builder.addStrongCounter().name("c1").initialValue(1).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c2").initialValue(2).lowerBound(0).storage(Storage.VOLATILE);
builder.addStrongCounter().name("c3").initialValue(3).upperBound(5).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c4").initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE);
builder.addStrongCounter().name("c5").initialValue(0).upperBound(100).lifespan(60000);
builder.addWeakCounter().name("c6").initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT);

另一方面,可以在初始化 EmbeddedCacheManager 后随时配置计数器。

CounterManager manager = ...;
manager.defineCounter("c1", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(1).storage(Storage.PERSISTENT).build());
manager.defineCounter("c2", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(2).lowerBound(0).storage(Storage.VOLATILE).build());
manager.defineCounter("c3", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(3).upperBound(5).storage(Storage.PERSISTENT).build());
manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE).build());
manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(0).upperBound(100).lifespan(60000).build());
manager.defineCounter("c6", CounterConfiguration.builder(CounterType.WEAK).initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT).build());
注意

CounterConfiguration 是不可变的,可以被重复使用。

如果计数器配置成功或为 false,则方法 defineCounter () 将返回 true。但是,如果配置无效,则方法会抛出 CounterConfigurationException。要查找是否已定义计数器,请使用 method isDefined ()

CounterManager manager = ...
if (!manager.isDefined("someCounter")) {
    manager.define("someCounter", ...);
}

11.1.1.1. 列出计数器名称

要列出定义的所有计数器,method CounterManager.getCounterNames () 返回在集群范围创建的所有计数器名称的集合。

11.1.2. CounterManager 接口

CounterManager 接口是定义、检索和删除计数器的入口点。

嵌入式部署

CounterManager 会自动侦听 EmbeddedCacheManager 创建,然后为每个 EmbeddedCacheManager 执行实例的注册。它启动存储计数器状态所需的缓存并配置默认计数器。

检索 CounterManager 非常简单,就像调用 EmbeddedCounterManagerFactory.asCounterManager (EmbeddedCacheManager),如下例所示:

// create or obtain your EmbeddedCacheManager
EmbeddedCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager);

服务器部署

对于 Hot Rod 客户端,CounterManager 在 RemoteCacheManager 中注册,并可以检索,如下所示:

// create or obtain your RemoteCacheManager
RemoteCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);

11.1.2.1. 通过 CounterManager 删除计数器

通过 Strong/WeakCounter 接口和 CounterManager 删除计数器之间有一个区别。CounterManager.remove (String) 从集群中删除计数器值,并删除本地计数器实例中计数器中注册的所有监听程序。另外,计数器实例不再重复使用,它可能会返回无效的结果。

在另一端,Strong/WeakCounter 删除仅移除计数器值。实例仍然可以重复使用,监听器仍然可以正常工作。

注意

如果在移除后访问计数器,则会重新创建计数器。

11.1.3. Counter

计数器可以很强大(StrongCounter)或弱一致性(WeakCounter),它们都由一个名称来标识。它们有一个特定的接口,但它们共享一些逻辑,即它们都是异步的(每个操作返回 CompletableFuture ),提供更新事件,并可重置为其初始值。

如果您不想使用 async API,可以通过 sync () 方法返回同步计数器。API 相同,但没有 CompletableFuture 返回值。

两种方法对这两个接口是通用的:

String getName();
CompletableFuture<Long> getValue();
CompletableFuture<Void> reset();
<T extends CounterListener> Handle<T> addListener(T listener);
CounterConfiguration getConfiguration();
CompletableFuture<Void> remove();
SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
  • getName () 返回计数器名称(identifier)。
  • getValue () 返回当前计数器的值。
  • reset () 允许将计数器的值重置为其初始值。
  • addListener () 注册监听程序以接收更新事件。有关它的更多详细信息,请在 Notification 和 Events 部分中。
  • getConfiguration () 返回计数器使用的配置。
  • remove () 从集群中移除计数器值。仍可使用实例,并保留监听器。
  • sync () 创建一个同步计数器。
注意

如果在移除后访问计数器,则会重新创建计数器。

11.1.3.1. StrongCounter 接口:当一致性或绑定很重要时。

强大的计数器提供使用存储在 Data Grid 缓存中的单个密钥来提供所需的一致性。所有更新都在密钥锁定下执行,以更新其值。另一方面,读取不会获取任何锁定,并读取当前的值。此外,使用此方案时,它允许绑定计数器值并提供原子操作,如 compare-and-set/swap。

可以使用 get StrongCounter () 方法从 CounterManager 检索 StrongCounter。例如:

CounterManager counterManager = ...
StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
警告

由于每个操作都将达到单个键,因此 StrongCounter 具有较高的竞争率。

StrongCounter 接口添加以下方法:

default CompletableFuture<Long> incrementAndGet() {
   return addAndGet(1L);
}

default CompletableFuture<Long> decrementAndGet() {
   return addAndGet(-1L);
}

CompletableFuture<Long> addAndGet(long delta);

CompletableFuture<Boolean> compareAndSet(long expect, long update);

CompletableFuture<Long> compareAndSwap(long expect, long update);
  • incrementAndGet () 逐一递增计数器并返回新值。
  • decrementAndGet () 将计数器减少一次,并返回新值。
  • addAndGet () 将 delta 添加到计数器的值中,并返回新值。
  • compareAndSet ()compareAndSwap () 以原子方式设置计数器的值(如果当前值是预期的)。
注意

当完成 CompletableFuture 时,操作被视为已完成。

注意

compare-and-set 和 compare-and-swap 之间的区别在于,如果操作成功,则前者会返回 true,稍后返回上一个值。如果返回值与预期相同,则 compare-and-swap 可以成功。

11.1.3.1.1. 已绑定的 StrongCounter

绑定后,当上述所有更新方法达到较低或上限时,所有更新方法都会抛出 CounterOutOfBoundsException。例外有以下方法检查到达哪个侧绑定:

public boolean isUpperBoundReached();
public boolean isLowerBoundReached();
11.1.3.1.2. 使用案例

在以下用例中,强计数器更适合:

  • 在每次更新后需要计数器的值(例如,集群范围 ID 生成器或序列)
  • 当需要有界计数器时(例如,速率限制器)
11.1.3.1.3. 使用示例
StrongCounter counter = counterManager.getStrongCounter("unbounded_counter");

// incrementing the counter
System.out.println("new value is " + counter.incrementAndGet().get());

// decrement the counter's value by 100 using the functional API
counter.addAndGet(-100).thenApply(v -> {
   System.out.println("new value is " + v);
   return null;
}).get();

// alternative, you can do some work while the counter is updated
CompletableFuture<Long> f = counter.addAndGet(10);
// ... do some work ...
System.out.println("new value is " + f.get());

// and then, check the current value
System.out.println("current value is " + counter.getValue().get());

// finally, reset to initial value
counter.reset().get();
System.out.println("current value is " + counter.getValue().get());

// or set to a new value if zero
System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1));

以下是使用绑定计数器的另一个示例:

StrongCounter counter = counterManager.getStrongCounter("bounded_counter");

// incrementing the counter
try {
    System.out.println("new value is " + counter.addAndGet(100).get());
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof CounterOutOfBoundsException) {
       if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
          System.out.println("ops, upper bound reached.");
       } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
          System.out.println("ops, lower bound reached.");
       }
    }
}

// now using the functional API
counter.addAndGet(-100).handle((v, throwable) -> {
   if (throwable != null) {
      Throwable cause = throwable.getCause();
      if (cause instanceof CounterOutOfBoundsException) {
         if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
            System.out.println("ops, upper bound reached.");
         } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
            System.out.println("ops, lower bound reached.");
         }
      }
      return null;
   }
   System.out.println("new value is " + v);
   return null;
}).get();

compare-and-set 与 Compare-and-swap 示例:

StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue, newValue;
do {
   oldValue = counter.getValue().get();
   newValue = someLogic(oldValue);
} while (!counter.compareAndSet(oldValue, newValue).get());

使用 compare-and-swap,它会保存一个调用计数器调用(counter.getValue ()

StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue = counter.getValue().get();
long currentValue, newValue;
do {
   currentValue = oldValue;
   newValue = someLogic(oldValue);
} while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);

要将强计数器用作速率限制器,请配置 上限lifespan 参数,如下所示:

// 5 request per minute
CounterConfiguration configuration = CounterConfiguration.builder(CounterType.BOUNDED_STRONG)
      .upperBound(5)
      .lifespan(60000)
      .build();
counterManager.defineCounter("rate_limiter", configuration);
StrongCounter counter = counterManager.getStrongCounter("rate_limiter");

// on each operation, invoke
try {
   counter.incrementAndGet().get();
   // continue with operation
} catch (InterruptedException e) {
   Thread.currentThread().interrupt();
} catch (ExecutionException e) {
   if (e.getCause() instanceof CounterOutOfBoundsException) {
      // maximum rate. discard operation
      return;
   } else {
      // unexpected error, handling property
   }
}
注意

lifespan 参数是一个实验性功能,可能会在以后的版本中删除。

11.1.3.2. WeakCounter 接口:何时需要速度

WeakCounter 将计数器的值存储在 Data Grid 缓存中的多个键中。创建的密钥数量由 concurrency-level 属性配置。每个键存储计数器值的部分状态,并可同时更新。与 StrongCounter 相比,它的主要优点是缓存中较低争用。另一方面,其值的读取更为昂贵,不允许绑定。

警告

reset 操作应谨慎处理。它不是 原子的,它会生成中间值。这些值可由读取操作以及注册的任何监听程序看到。

可以使用 get WeakCounter () 方法从 CounterManager 检索 WeakCounter。例如:

CounterManager counterManager = ...
StrongCounter aCounter = counterManager.getWeakCounter("my-counter);
11.1.3.2.1. 弱计数接口

WeakCounter 添加以下方法:

default CompletableFuture<Void> increment() {
   return add(1L);
}

default CompletableFuture<Void> decrement() {
   return add(-1L);
}

CompletableFuture<Void> add(long delta);

它们与 'StrongCounter's 类似,但它们不会返回新值。

11.1.3.2.2. 使用案例

当不需要更新操作,或者不需要计数器的值时,弱计数器最适合的情况。收集统计信息是此类用例的良好示例。

11.1.3.2.3. 例子

以下是弱计数器用法的示例。

WeakCounter counter = counterManager.getWeakCounter("my_counter");

// increment the counter and check its result
counter.increment().get();
System.out.println("current value is " + counter.getValue());

CompletableFuture<Void> f = counter.add(-100);
//do some work
f.get(); //wait until finished
System.out.println("current value is " + counter.getValue().get());

//using the functional API
counter.reset().whenComplete((aVoid, throwable) -> System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get();
System.out.println("current value is " + counter.getValue().get());

11.1.4. 通知和事件

强和弱计数器都支持监听器接收其更新事件。侦听器必须实现 CounterListener,并可通过以下方法注册:

<T extends CounterListener> Handle<T> addListener(T listener);

CounterListener 有以下接口:

public interface CounterListener {
   void onUpdate(CounterEvent entry);
}

返回的 Handle 对象具有在不再需要时删除 CounterListener 的主要目标。另外,它还允许访问它处理的 CounterListener 实例。它有以下接口:

public interface Handle<T extends CounterListener> {
   T getCounterListener();
   void remove();
}

最后,CounterEvent 具有之前和当前的值和状态。它有以下接口:

public interface CounterEvent {
   long getOldValue();
   State getOldState();
   long getNewValue();
   State getNewState();
}
注意

对于未绑定的强大计数器和弱计数器,状态为 State.VALIDstate.LOWER_BOUND_REACHEDState.UPPER_BOUND_REACHED 仅对有界强计数器有效。

警告

弱计数器 reset () 操作将触发带有中间值的多个通知。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.