在 Java 应用程序中嵌入了 Data Grid


Red Hat Data Grid 8.5

使用 Data Grid 创建嵌入式缓存

Red Hat Customer Content Services

摘要

将 Data Grid 添加到 Java 项目,并将嵌入式缓存用于您的应用程序。

Red Hat Data Grid

Data Grid 是一个高性能分布式内存数据存储。

无架构数据结构
将不同对象存储为键值对的灵活性。
基于网格的数据存储
旨在在集群中分发和复制数据。
弹性扩展
动态调整节点数量,以便在不中断服务的情况下满足需求。
数据互操作性
从不同端点在网格中存储、检索和查询数据。

Data Grid 文档

红帽客户门户网站中提供了 Data Grid 的文档。

Data Grid 下载

访问红帽客户门户上的 Data Grid 软件下载

注意

您必须有一个红帽帐户才能访问和下载数据中心软件。

使开源包含更多

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。详情请查看 CTO Chris Wright 的信息

第 1 章 将 Data Grid 添加到 Maven 存储库

Data Grid Java 发行版可从 Maven 获取。

您可以从客户门户网站下载 Data Grid Maven 存储库,或者从公共 Red Hat Enterprise Maven 存储库拉取 Data Grid 依赖项。

1.1. 下载 Maven 存储库

如果您不想使用公共 Red Hat Enterprise Maven 存储库,将 Data Grid Maven 存储库下载并安装到本地文件系统、Apache HTTP 服务器或 Maven 存储库管理器。

流程

  1. 登录到红帽客户门户。
  2. 导航到 Data Grid 的软件下载
  3. 下载 Red Hat Data Grid 8.5 Maven 存储库。
  4. 将存档的 Maven 存储库提取到本地文件系统。
  5. 打开 README.md 文件,并按照适当的安装说明进行操作。

1.2. 添加 Red Hat Maven 软件仓库

在您的 Maven 构建环境中包括红帽 GA 存储库,以获取 Data Grid 工件和依赖项。

流程

  • 将 Red Hat GA 存储库添加到 Maven 设置文件中,通常为 ~/.m2/settings.xml,或者直接在项目的 pom.xml 文件中。

    <repositories>
      <repository>
        <id>redhat-ga-repository</id>
        <name>Red Hat GA Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
      </repository>
    </repositories>
    <pluginRepositories>
      <pluginRepository>
        <id>redhat-ga-repository</id>
        <name>Red Hat GA Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
      </pluginRepository>
    </pluginRepositories>

1.3. 配置项目 POM

配置项目中的项目对象模型(POM)文件,以将 Data Grid 依赖项用于嵌入式缓存、Hot Rod 客户端和其他功能。

流程

  1. 打开您的项目 pom.xml 进行编辑。
  2. 使用正确的 Data Grid 版本定义 version.infinispan 属性。
  3. dependencyManagement 部分中包含 infinispan-bom

    Bill Of Materials (BOM)控制依赖项版本,从而避免了版本冲突,这意味着您不需要为添加到项目的每个 Data Grid 工件设置版本。

  4. 保存并关闭 pom.xml

以下示例显示了 Data Grid 版本和 BOM:

<properties>
  <version.infinispan>14.0.21.Final-redhat-00001</version.infinispan>
</properties>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-bom</artifactId>
      <version>${version.infinispan}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

后续步骤

根据需要,将 Data Grid 工件作为依赖项添加到 pom.xml 中。

第 2 章 创建嵌入式缓存

Data Grid 提供了一个 EmbeddedCacheManager API,可让您以编程方式控制缓存管理器和嵌入式缓存生命周期。

2.1. 在项目中添加 Data Grid

将 Data Grid 添加到项目,以便在您的应用程序中创建嵌入的缓存。

先决条件

  • 配置项目以从 Maven 存储库获取数据网格工件。

流程

  • infinispan-core 工件作为依赖项添加到 pom.xml 中,如下所示:
<dependencies>
  <dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-core</artifactId>
  </dependency>
</dependencies>

2.2. 创建和使用嵌入式缓存

Data Grid 提供了一个 GlobalConfigurationBuilder API,用于控制 Cache Manager 和用于配置缓存的 ConfigurationBuilder API。

先决条件

  • infinispan-core 工件添加为 pom.xml 中的依赖项。

流程

  1. 初始化 CacheManager

    注意

    在创建缓存前,您必须始终调用 cacheManager.start () 方法来初始化 CacheManager。默认构造器为您完成此操作,但构建器的超载版本不会为您这样做。

    缓存管理器也是重量的对象,Data Grid 建议实例化每个 JVM 只有一个实例。

  2. 使用 ConfigurationBuilder API 定义缓存配置。
  3. 使用 getCache ()createCache ()getOrCreateCache () 方法获取缓存。

    Data Grid 建议使用 getOrCreateCache () 方法,因为它在所有节点上创建一个缓存或返回现有的缓存。

  4. 如有必要,对缓存使用 PERMANENT 标志,以在重启后保留。
  5. 通过调用 cacheManager.stop () 方法以释放 JVM 资源并正常关闭任何缓存,以停止 CacheManager
// Set up a clustered Cache Manager.
GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
// Initialize the default Cache Manager.
DefaultCacheManager cacheManager = new DefaultCacheManager(global.build());
// Create a distributed cache with synchronous replication.
ConfigurationBuilder builder = new ConfigurationBuilder();
                     builder.clustering().cacheMode(CacheMode.DIST_SYNC);
// Obtain a volatile cache.
Cache<String, String> cache = cacheManager.administration().withFlags(CacheContainerAdmin.AdminFlag.VOLATILE).getOrCreateCache("myCache", builder.build());
// Stop the Cache Manager.
cacheManager.stop();

getCache () 方法

调用 getCache (String) 方法以获取缓存,如下所示:

Cache<String, String> myCache = manager.getCache("myCache");

前面的操作会创建一个名为 myCache 的缓存(如果尚未存在),并返回它。

使用 getCache () 方法仅在您调用方法的节点上创建缓存。换句话说,它会执行一个本地操作,它必须在集群中的每个节点上调用。通常,在多个节点间部署的应用程序会在初始化过程中获取缓存,以确保缓存都是 对称的,并在每个节点上存在。

createCache () 方法

调用 createCache () 方法,以在整个集群中动态创建缓存。

Cache<String, String> myCache = manager.administration().createCache("myCache", "myTemplate");

前面的操作还会在随后加入集群的任何节点上自动创建缓存。

默认情况下,您使用 createCache () 方法创建的缓存是临时的。如果整个集群关闭,则缓存重启后不会自动创建。

PERMANENT 标志

使用 PERMANENT 标志来确保缓存可以在重启后保留。

Cache<String, String> myCache = manager.administration().withFlags(AdminFlag.PERMANENT).createCache("myCache", "myTemplate");

要使 PERMANENT 标志生效,您必须启用全局状态并设置配置存储提供程序。

有关配置存储供应商的更多信息,请参阅 GlobalStateConfigurationBuilder#configurationStorage ()

2.3. Cache API

Data Grid 提供了一个 Cache 接口,它公开简单的方法来添加、检索和删除条目,包括 JDK 的 ConcurrentMap 接口公开的原子机制。根据所使用的缓存模式,调用这些方法会触发多个事情,甚至可能包括将条目复制到远程节点或从远程节点查找条目,或可能缓存存储。

对于简单使用,使用 Cache API 不应与使用 JDK Map API 不同,因此从基于映射到 Data Grid 的缓存的简单内存缓存迁移应该很简单。

Certain Map 方法的性能一致性

与 Data Grid 一起使用时,在 Map 中公开的某些方法具有一定的性能后果,如 size ()value ()keySet ()entrySet ()。有关 keySet、value 和 entrySet 的具体方法,请参阅其 Javadoc 了解更多详情。

试图全局执行这些操作会对性能有很大的影响,并成为可扩展性瓶颈。因此,这些方法应该仅用于信息或调试目的。

应注意,在 withFlags () 方法中使用某些标记可以缓解其中的一些问题,请检查每个方法的文档以了解更多详情。

Mortal 和 Immortal Data

除了仅存储条目外,Data Grid 的缓存 API 允许您向数据附加 mortality 信息。例如,只是使用 put (键,值) 会创建一个 immortal 条目,例如,一个存在于缓存中的条目(永久存在),直到它被删除(或逐出内存以防止耗尽内存)。但是,如果您使用 put (key, value, lifespan, timeunit) 将数据放入缓存中,这会创建一个 mortal 条目,即具有固定生命周期的条目,并在该生命周期后过期。

除了 Lifespan 外,Data Grid 还支持 maxIdle 作为额外的指标,以决定到期。可以使用任何 lifespans 或 maxIdles 的组合。

putForExternalRead 操作

Data Grid 的 Cache 类包含不同的"put"操作,称为 putForExternalRead。当 Data Grid 用作在其他位置保留数据的临时缓存时,此操作特别有用。在大量读取场景中,缓存中的竞争不应延迟实时事务,因为缓存应该只是优化,而不是以某种方式获得。

要达到此目的,putForExternalRead () 充当 put 调用,只有在缓存中不存在密钥时才运行,并在另一个线程尝试同时存储同一密钥时失败。在这个特殊情况下,缓存数据是优化系统的方法,而不需要缓存失败会影响到持续的事务,因此为什么以不同的方式处理失败。putForExternalRead () 被视为快速操作,因为无论它是否成功,它都不会等待任何锁定,因此会立即返回到调用者。

要了解如何使用此操作,让我们来看基本的示例。试想一下,个人实例的缓存,每个由 PersonId 的密钥,其数据源自于单独的数据存储中。以下代码显示了在此示例上下文中使用 putForExternalRead 的最常见模式:

// Id of the person to look up, provided by the application
PersonId id = ...;

// Get a reference to the cache where person instances will be stored
Cache<PersonId, Person> cache = ...;

// First, check whether the cache contains the person instance
// associated with with the given id
Person cachedPerson = cache.get(id);

if (cachedPerson == null) {
   // The person is not cached yet, so query the data store with the id
   Person person = dataStore.lookup(id);

   // Cache the person along with the id so that future requests can
   // retrieve it from memory rather than going to the data store
   cache.putForExternalRead(id, person);
} else {
   // The person was found in the cache, so return it to the application
   return cachedPerson;
}

请注意,putForExternalRead 不应用作使用源自应用程序执行的新 Person 实例更新缓存的机制(例如,来自修改人员地址的事务)。更新缓存的值时,请使用标准 放置 操作,否则可能会出现缓存损坏数据的可能性。

2.3.1. AdvancedCache API

除了简单缓存接口外,Data Grid 还提供 AdvancedCache 接口,并提供给扩展作者。AdvancedCache 提供了访问某些内部组件,并应用标志来改变某些缓存方法的默认行为。以下代码片段描述了如何获取 AdvancedCache:

AdvancedCache advancedCache = cache.getAdvancedCache();
2.3.1.1. 标记

标志应用到常规缓存方法,以更改某些方法的行为。有关所有可用标志及其效果的列表,请查看 标记 枚举。使用 AdvancedCache.withFlags () 应用标志。此 builder 方法可用于将任意数量的标记应用到缓存调用,例如:

advancedCache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING)
   .withFlags(Flag.FORCE_SYNCHRONOUS)
   .put("hello", "world");

2.3.2. Asynchronous API

除了同步的 API 方法(如 Cache.put ()Cache.remove () 等)之外,数据网格也有一个异步的非阻塞 API,您可以在其中实现相同的结果。

这些方法的命名方式与阻塞计数器类似,并附加"Async"。  例如,Cache.putAsync ()Cache.removeAsync () 等。  这些异步对应部分返回一个包含操作实际结果的 CompletableFuture

例如,在 cache 参数中,在 Cache<String, String>, Cache.put (String key, String value) 返回 String while Cache.putAsync (String key) returns Complet ableFuture<String&gt;。

2.3.2.1. 为什么使用这种 API?

非阻塞 API 非常强大,它们提供了所有同步通信保证 - 能够处理通信故障和例外 - 在调用完成后不需要阻止。  这可让您更好地利用系统中的并行性。  例如:

Set<CompletableFuture<?>> futures = new HashSet<>();
futures.add(cache.putAsync(key1, value1)); // does not block
futures.add(cache.putAsync(key2, value2)); // does not block
futures.add(cache.putAsync(key3, value3)); // does not block

// the remote calls for the 3 puts will effectively be executed
// in parallel, particularly useful if running in distributed mode
// and the 3 keys would typically be pushed to 3 different nodes
// in the cluster

// check that the puts completed successfully
for (CompletableFuture<?> f: futures) f.get();
2.3.2.2. 哪些进程实际发生在异步中?

Data Grid 中有 4 个事情,它们被视为典型写入操作的关键路径。这些是成本顺序:

  • 网络调用
  • Marshalling
  • 写入缓存存储(可选)
  • 锁定

使用 async 方法将取网络调用和划分出关键路径。  然而,出于各种技术原因,写入缓存存储和获取锁定,但仍然出现在调用者的线程中。

第 3 章 以编程方式配置用户角色和权限

在 Java 应用中使用嵌入式缓存时,以编程方式配置安全授权。

3.1. Data Grid 用户角色和权限

Data Grid 包括多个角色,为用户提供访问缓存和数据网格资源的权限。

角色权限描述

admin

ALL

具有所有权限的超级用户,包括控制缓存管理器生命周期。

deployer

ALL_READ、ALL_WRITE、LISTEN、EXEC、MONITOR、CREATE

除了 应用程序 权限外,还可创建和删除 Data Grid 资源。

application

ALL_READ, ALL_WRITE, LISTEN, EXEC, MONITOR

观察者 权限外,还具有对 Data Grid 资源的读写访问权限。也可以侦听事件并执行服务器任务和脚本。

observer

ALL_READ, MONITOR

除了监控权限外,还具有对 Data Grid 资源 的读取访问权限。

monitor

MONITOR

可以通过 JMX 和 指标端点 查看统计信息。

3.1.1. 权限

用户角色是具有不同访问级别的权限集。

表 3.1. 缓存管理器权限

权限

功能

描述

配置

defineConfiguration

定义新的缓存配置。

LISTEN

addListener

针对缓存管理器注册监听程序。

生命周期

stop

停止缓存管理器。

CREATE

createCache, removeCache

创建和删除容器资源,如缓存、计数器、模式和脚本。

MONITOR

getStats

允许访问 JMX 统计信息和 指标端点

ALL

-

包括所有缓存管理器权限。

表 3.2. 缓存权限

权限

功能

描述

READ

Get包含

从缓存检索条目。

放置 , put IfAbsent,replace,remove,eviction

写入、替换、删除、驱除缓存中的数据。

EXEC

distexec,stream

允许针对缓存执行代码。

LISTEN

addListener

针对缓存注册监听程序。

BULK_READ

keySet,values,entrySet,query

执行批量检索操作。

BULK_WRITE

清除,putAll

执行批量写入操作。

生命周期

启动,stop

启动和停止缓存。

ADMIN

getVersion,addInterceptor*, removeInterceptor,getInterceptorChain,getEvictionManager,getComponentRegistry,getDistributionManager,getAuthorizationManager,evict,getRpcManager,getCacheConfiguration,getCacheManager, getInvocationContextContainer,setAvailability,getDataContainer,getStats,getXAResource

允许访问底层组件和内部结构。

MONITOR

getStats

允许访问 JMX 统计信息和 指标端点

ALL

-

包括所有缓存权限。

ALL_READ

-

组合了 READ 和 BULK_READ 权限。

ALL_WRITE

-

组合 WRITE 和 BULK_WRITE 权限。

3.1.2. 角色和权限映射器

Data Grid 将用户实施为主体的集合。主体代表单独的用户身份,如用户名或用户所属的组。在内部,它们使用 javax.security.auth.Subject 类实现。

要启用授权,主体必须映射到角色名称,然后扩展到一组权限。

Data Grid 包括 PrincipalRoleMapper API,用于将安全主体与角色关联,以及用于将角色与特定权限关联的 RolePermissionMapper API。

Data Grid 提供以下角色和权限映射程序实现:

集群角色映射器
在集群 registry 中存储角色映射的主体。
集群权限映射器
在集群 registry 中存储权限映射的角色。允许您动态修改用户角色和权限。
身份角色映射器
使用主体名称作为角色名称。主体名称的类型或格式取决于源。例如,在 LDAP 目录中,主体名称可以是可辨识名称(DN)。
通用名称角色映射器
使用 Common Name (CN)作为角色名称。您可以将此角色映射程序与 LDAP 目录或包含可辨识名称(DN)的客户端证书一起使用;例如 cn=managers,ou=people,dc=example,dc=com 映射到 managers 角色。
注意

默认情况下,principal-to-role 映射仅应用于代表组的主体。通过将 authorization.group-only-mapping 配置属性设置为 false,可以将 Data Grid 配置为也为用户主体执行映射。

3.1.2.1. 在 Data Grid 中将用户映射到角色和权限

考虑从 LDAP 服务器检索的以下用户,作为 DN 的集合:

CN=myapplication,OU=applications,DC=mycompany
CN=dataprocessors,OU=groups,DC=mycompany
CN=finance,OU=groups,DC=mycompany

使用 通用名称角色映射器 时,用户将映射到以下角色:

dataprocessors
finance

Data Grid 具有以下角色定义:

dataprocessors: ALL_WRITE ALL_READ
finance: LISTEN

用户应具有以下权限:

ALL_WRITE ALL_READ LISTEN

3.1.3. 配置角色映射器

默认情况下,Data Grid 启用集群角色映射程序和集群权限映射程序。要将不同的实现用于角色映射,您必须配置角色映射程序。

流程

  1. 打开 Data Grid 配置以进行编辑。
  2. 在 Cache Manager 配置中,将角色映射器声明为安全授权的一部分。
  3. 保存对配置的更改。

使用嵌入式缓存,您可以使用 principalRoleMapper ()rolePermissionMapper () 方法以编程方式配置角色和权限映射器。

角色映射器配置

XML

<cache-container>
  <security>
    <authorization>
      <common-name-role-mapper />
    </authorization>
  </security>
</cache-container>

JSON

{
  "infinispan" : {
    "cache-container" : {
      "security" : {
        "authorization" : {
          "common-name-role-mapper": {}
        }
      }
    }
  }
}

YAML

infinispan:
  cacheContainer:
    security:
      authorization:
        commonNameRoleMapper: ~

3.2. 为嵌入式缓存启用和配置授权

在使用嵌入式缓存时,您可以使用 GlobalSecurityConfigurationBuilderConfigurationBuilder 类配置授权。

流程

  1. 构建 GlobalConfigurationBuilder 并使用 security ().authorization ().enable () 方法启用安全授权。
  2. 使用 principalRoleMapper () 方法指定角色映射程序。
  3. 如果需要,使用 role ()permission () 方法定义自定义角色和权限映射。

    GlobalConfigurationBuilder global = new GlobalConfigurationBuilder();
      global.security().authorization().enable()
              .principalRoleMapper(new ClusterRoleMapper())
              .role("myroleone").permission(AuthorizationPermission.ALL_WRITE)
              .role("myroletwo").permission(AuthorizationPermission.ALL_READ);
  4. ConfigurationBuilder 中为缓存启用授权。

    • 添加来自全局配置的所有角色。

      ConfigurationBuilder config = new ConfigurationBuilder();
      config.security().authorization().enable();
    • 明确为缓存定义角色,以便 Data Grid 拒绝对没有角色的用户的访问。

      ConfigurationBuilder config = new ConfigurationBuilder();
      config.security().authorization().enable().role("myroleone");

3.3. 在运行时添加授权角色

将安全授权与数据网格缓存一起使用时,动态将角色映射到权限。

先决条件

  • 为嵌入式缓存配置授权。
  • 具有 Data Grid 的 ADMIN 权限。

流程

  1. 获取 RolePermissionMapper 实例。
  2. 使用 addRole () 方法定义新角色。

    MutableRolePermissionMapper mapper = (MutableRolePermissionMapper) cacheManager.getCacheManagerConfiguration().security().authorization().rolePermissionMapper();
    mapper.addRole(Role.newRole("myroleone", true, AuthorizationPermission.ALL_WRITE, AuthorizationPermission.LISTEN));
    mapper.addRole(Role.newRole("myroletwo", true, AuthorizationPermission.READ, AuthorizationPermission.WRITE));

3.4. 使用安全缓存执行代码

当您为使用安全授权的嵌入式缓存构建 DefaultCacheManager 时,Cache Manager 会返回 SecureCache,它会在调用任何操作前检查安全上下文。SecureCache 还确保应用程序无法检索较低级别的不安全对象,如 DataContainer。因此,您必须使用具有适当级别权限的 Data Grid 用户执行代码。

先决条件

  • 为嵌入式缓存配置授权。

流程

  1. 如有必要,从 Data Grid 上下文检索当前主题:

    Security.getSubject();
  2. PrivilegedAction 中嵌套方法调用,以使用 Subject 执行它们。

    Security.doAs(mySubject, (PrivilegedAction<String>)() -> cache.put("key", "value"));

3.5. 配置访问控制列表(ACL)缓存

当您向用户授予或拒绝角色时,Data Grid 存储哪些用户可以在内部访问您的缓存的详细信息。此 ACL 缓存通过避免 Data Grid 的需求来计算每个请求执行读写操作的适当权限,从而提高了安全授权的性能。

注意

每当您为用户授予或拒绝角色时,Data Grid 会清除 ACL 缓存,以确保它正确应用用户权限。这意味着,每次授予或拒绝角色时,数据网格都必须为所有用户重新计算缓存权限。为了获得最佳性能,您不应该在生产环境中频繁或重复授予和拒绝角色。

流程

  1. 打开 Data Grid 配置以进行编辑。
  2. 使用 cache-size 属性指定 ACL 缓存的最大条目数。

    ACL 缓存中的条目具有缓存的卡性,即 * 用户。您应该将最大条目数设置为可保存所有缓存和用户信息的值。例如,默认大小为 1000,适用于最多 100 个缓存和 10 个用户的部署。

  3. 使用 cache-timeout 属性设置超时值(以毫秒为单位)。

    如果 Data Grid 无法访问该条目的超时时间内 ACL 缓存中的条目。当用户随后尝试缓存操作时,Data Grid 会重新计算其缓存权限,并将条目添加到 ACL 缓存中。

    重要

    cache-sizecache-timeout 属性指定 0 代表禁用 ACL 缓存。只有在禁用授权时,才应禁用 ACL 缓存。

  4. 保存对配置的更改。
ACL 缓存配置

XML

<infinispan>
  <cache-container name="acl-cache-configuration">
    <security cache-size="1000"
              cache-timeout="300000">
      <authorization/>
    </security>
  </cache-container>
</infinispan>

JSON

{
  "infinispan" : {
    "cache-container" : {
      "name" : "acl-cache-configuration",
      "security" : {
        "cache-size" : "1000",
        "cache-timeout" : "300000",
        "authorization" : {}
      }
    }
  }
}

YAML

infinispan:
  cacheContainer:
    name: "acl-cache-configuration"
    security:
      cache-size: "1000"
      cache-timeout: "300000"
      authorization: ~

3.5.1. 刷新 ACL 缓存

可以使用 GlobalSecurityManager MBean (可通过 JMX 访问)来清除 ACL 缓存。

第 4 章 启用并配置 Data Grid 统计和 JMX 监控

网格可以提供缓存管理器和缓存统计信息,以及导出 JMX MBeans。

4.1. 在嵌入式缓存中启用统计信息

配置 Data Grid 以导出缓存管理器和嵌入式缓存的统计信息。

流程

  1. 打开 Data Grid 配置以进行编辑。
  2. 添加 statistics="true" 属性或 .statistics (true) 方法。
  3. 保存并关闭您的 Data Grid 配置。
嵌入式缓存统计

XML

<infinispan>
  <cache-container statistics="true">
    <distributed-cache statistics="true"/>
    <replicated-cache statistics="true"/>
  </cache-container>
</infinispan>

GlobalConfigurationBuilder

GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder().cacheContainer().statistics(true);
DefaultCacheManager cacheManager = new DefaultCacheManager(global.build());

Configuration builder = new ConfigurationBuilder();
builder.statistics().enable();

4.2. 配置 Data Grid 指标

Data Grid 生成与任何监控系统兼容的指标。

  • 量表提供值,如写操作或 JVM 正常运行时间的平均纳秒数。
  • histograms 提供有关操作执行时间的详细信息,如读取、写入和删除时间。

默认情况下,当启用统计数据时,Data Grid 会生成量表,但您也可以将其配置为生成直方图。

注意

Data Grid 指标在 供应商 范围内提供。与 JVM 相关的指标 在基本 范围内提供。

先决条件

  • 您必须将 Micrometer Core 和 Micrometer Registry Prometheus JARs 添加到类路径中,以便为嵌入式缓存导出数据网格指标。

流程

  1. 打开 Data Grid 配置以进行编辑。
  2. metrics 元素或对象添加到缓存容器。
  3. 使用量表属性或字段启用或禁用 量表
  4. 使用 histograms 属性或字段启用或禁用直方图。
  5. 保存并关闭您的客户端配置。
指标配置

XML

<infinispan>
  <cache-container statistics="true">
    <metrics gauges="true"
             histograms="true" />
  </cache-container>
</infinispan>

JSON

{
  "infinispan" : {
    "cache-container" : {
      "statistics" : "true",
      "metrics" : {
        "gauges" : "true",
        "histograms" : "true"
      }
    }
  }
}

YAML

infinispan:
  cacheContainer:
    statistics: "true"
    metrics:
      gauges: "true"
      histograms: "true"

GlobalConfigurationBuilder

GlobalConfiguration globalConfig = new GlobalConfigurationBuilder()
  //Computes and collects statistics for the Cache Manager.
  .statistics().enable()
  //Exports collected statistics as gauge and histogram metrics.
  .metrics().gauges(true).histograms(true)
  .build();

其他资源

4.3. 注册 JMX MBeans

Data Grid 可以注册可以用来收集统计信息并执行管理操作的 JMX MBeans。您还必须启用统计信息,否则 Data Grid 为 JMX MBeans 中的所有统计属性提供 0 值。

重要

只有在 Data Grid 嵌入于应用程序中而不是远程 Data Grid 服务器时,使用 JMX Mbeans 来收集统计信息。

当您使用 JMX Mbeans 从远程 Data Grid 服务器收集统计信息时,从 JMX Mbeans 接收的数据可能与 REST 等其他 API 接收的数据不同。在这种情况下,从其他 API 接收的数据更为准确。

流程

  1. 打开 Data Grid 配置以进行编辑。
  2. jmx 元素或对象添加到缓存容器,并将 true 指定为 enabled 属性或字段的值。
  3. 添加 domain 属性或字段,并指定公开 JMX MBeans 的域(如果需要)。
  4. 保存并关闭您的客户端配置。
JMX 配置

XML

<infinispan>
  <cache-container statistics="true">
    <jmx enabled="true"
         domain="example.com"/>
  </cache-container>
</infinispan>

JSON

{
  "infinispan" : {
    "cache-container" : {
      "statistics" : "true",
      "jmx" : {
        "enabled" : "true",
        "domain" : "example.com"
      }
    }
  }
}

YAML

infinispan:
  cacheContainer:
    statistics: "true"
    jmx:
      enabled: "true"
      domain: "example.com"

GlobalConfigurationBuilder

GlobalConfiguration global = GlobalConfigurationBuilder.defaultClusteredBuilder()
   .jmx().enable()
   .domain("org.mydomain");

4.3.1. 启用 JMX 远程端口

提供唯一的远程 JMX 端口,以通过 JMXServiceURL 格式的连接公开数据网格 MBean。

您可以使用以下方法之一启用远程 JMX 端口:

  • 启用需要向其中一个 Data Grid 服务器安全域进行身份验证的远程 JMX 端口。
  • 使用标准 Java 管理配置选项手动启用远程 JMX 端口。

先决条件

  • 对于带有身份验证的远程 JMX,请使用默认安全域定义 JMX 特定的用户角色。用户必须具有具有读/写访问权限的 controlRole,或者具有只读访问权限的 monitorRole 才能访问任何 JMX 资源。Data Grid 会自动将全局 ADMINMONITOR 权限映射到 JMX controlRolemonitorRole 角色。

流程

使用以下方法之一启动启用了远程 JMX 端口的 Data Grid 服务器:

  • 通过端口 9999 启用远程 JMX。

    bin/server.sh --jmx 9999
    警告

    在生产环境中不适用于禁用 SSL 的远程 JMX。

  • 在启动时将以下系统属性传递给 Data Grid 服务器。

    bin/server.sh -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
    警告

    启用没有身份验证或 SSL 的远程 JMX 不安全,在任何环境中都不推荐使用。禁用身份验证和 SSL 允许未授权用户连接到服务器并访问托管的数据。

其他资源

4.3.2. Data Grid MBeans

Data Grid 公开了代表可管理资源的 JMX MBeans。

org.infinispan:type=Cache
可用于缓存实例的属性和操作。
org.infinispan:type=CacheManager
可用于缓存管理器的属性和操作,包括数据网格缓存和集群健康统计信息。

有关可用 JMX MBeans 的完整列表以及描述和可用的操作和属性,请参阅 Data Grid JMX 组件 文档。

其他资源

4.3.3. 在自定义 MBean 服务器中注册 MBeans

Data Grid 包含一个 MBeanServerLookup 接口,可用于在自定义 MBeanServer 实例中注册 MBeans。

先决条件

  • 创建 MBeanServerLookup 的实现,以便 getMBeanServer () 方法返回自定义 MBeanServer 实例。
  • 配置数据网格以注册 JMX MBeans。

流程

  1. 打开 Data Grid 配置以进行编辑。
  2. mbean-server-lookup 属性或字段添加到 Cache Manager 的 JMX 配置。
  3. 指定 MBeanServerLookup 实施的完全限定名称(FQN)。
  4. 保存并关闭您的客户端配置。
JMX MBean 服务器查找配置

XML

<infinispan>
  <cache-container statistics="true">
    <jmx enabled="true"
         domain="example.com"
         mbean-server-lookup="com.example.MyMBeanServerLookup"/>
  </cache-container>
</infinispan>

JSON

{
  "infinispan" : {
    "cache-container" : {
      "statistics" : "true",
      "jmx" : {
        "enabled" : "true",
        "domain" : "example.com",
        "mbean-server-lookup" : "com.example.MyMBeanServerLookup"
      }
    }
  }
}

YAML

infinispan:
  cacheContainer:
    statistics: "true"
    jmx:
      enabled: "true"
      domain: "example.com"
      mbeanServerLookup: "com.example.MyMBeanServerLookup"

GlobalConfigurationBuilder

GlobalConfiguration global = GlobalConfigurationBuilder.defaultClusteredBuilder()
   .jmx().enable()
   .domain("org.mydomain")
   .mBeanServerLookup(new com.acme.MyMBeanServerLookup());

4.4. 在状态传输操作过程中导出指标

您可以为跨节点分布集群缓存导出时间指标。

当集群缓存拓扑更改时,状态传输操作发生,如加入或离开集群的节点。在状态传输操作期间,Data Grid 会导出每个缓存中的指标,以便您可以确定缓存的状态。状态传输将属性作为属性公开,以便 Data Grid 可以从每个缓存中导出指标。

注意

您不能在无效模式下执行状态传输操作。

Data Grid 生成与 REST API 和 JMX API 兼容的时间指标。

先决条件

  • 配置数据网格指标。
  • 为您的缓存类型启用指标,如嵌入式缓存或远程缓存。
  • 通过更改集群缓存拓扑来启动状态传输操作。

流程

  • 选择以下任一方法:

    • 配置 Data Grid 以使用 REST API 来收集指标。
    • 配置 Data Grid 以使用 JMX API 来收集指标。

4.5. 监控跨站点复制的状态

监控备份位置的站点状态,以检测站点之间的通信中断。当远程站点状态变为 离线时,Data Grid 会停止将数据复制到备份位置。您的数据不同步,您必须在使集群重新上线前修复不一致的问题。

对于早期问题检测,需要监控跨站点事件。使用以下监控策略之一:

使用 REST API 监控跨站点复制

使用 REST 端点监控所有缓存的跨站点复制状态。您可以实施自定义脚本来轮询 REST 端点,或者使用以下示例。

先决条件

  • 启用跨站点复制。

流程

  1. 实施脚本以轮询 REST 端点。

    以下示例演示了如何使用 Python 脚本每 5 秒轮询站点状态。

#!/usr/bin/python3
import time
import requests
from requests.auth import HTTPDigestAuth


class InfinispanConnection:

    def __init__(self, server: str = 'http://localhost:11222', cache_manager: str = 'default',
                 auth: tuple = ('admin', 'change_me')) -> None:
        super().__init__()
        self.__url = f'{server}/rest/v2/container/x-site/backups/'
        self.__auth = auth
        self.__headers = {
            'accept': 'application/json'
        }

    def get_sites_status(self):
        try:
            rsp = requests.get(self.__url, headers=self.__headers, auth=HTTPDigestAuth(self.__auth[0], self.__auth[1]))
            if rsp.status_code != 200:
                return None
            return rsp.json()
        except:
            return None


# Specify credentials for Data Grid user with permission to access the REST endpoint
USERNAME = 'admin'
PASSWORD = 'change_me'
# Set an interval between cross-site status checks
POLL_INTERVAL_SEC = 5
# Provide a list of servers
SERVERS = [
    InfinispanConnection('http://127.0.0.1:11222', auth=(USERNAME, PASSWORD)),
    InfinispanConnection('http://127.0.0.1:12222', auth=(USERNAME, PASSWORD))
]
#Specify the names of remote sites
REMOTE_SITES = [
    'nyc'
]
#Provide a list of caches to monitor
CACHES = [
    'work',
    'sessions'
]


def on_event(site: str, cache: str, old_status: str, new_status: str):
    # TODO implement your handling code here
    print(f'site={site} cache={cache} Status changed {old_status} -> {new_status}')


def __handle_mixed_state(state: dict, site: str, site_status: dict):
    if site not in state:
        state[site] = {c: 'online' if c in site_status['online'] else 'offline' for c in CACHES}
        return

    for cache in CACHES:
        __update_cache_state(state, site, cache, 'online' if cache in site_status['online'] else 'offline')


def __handle_online_or_offline_state(state: dict, site: str, new_status: str):
    if site not in state:
        state[site] = {c: new_status for c in CACHES}
        return

    for cache in CACHES:
        __update_cache_state(state, site, cache, new_status)


def __update_cache_state(state: dict, site: str, cache: str, new_status: str):
    old_status = state[site].get(cache)
    if old_status != new_status:
        on_event(site, cache, old_status, new_status)
        state[site][cache] = new_status


def update_state(state: dict):
    rsp = None
    for conn in SERVERS:
        rsp = conn.get_sites_status()
        if rsp:
            break
    if rsp is None:
        print('Unable to fetch site status from any server')
        return

    for site in REMOTE_SITES:
        site_status = rsp.get(site, {})
        new_status = site_status.get('status')
        if new_status == 'mixed':
            __handle_mixed_state(state, site, site_status)
        else:
            __handle_online_or_offline_state(state, site, new_status)


if __name__ == '__main__':
    _state = {}
    while True:
        update_state(_state)
        time.sleep(POLL_INTERVAL_SEC)

当站点状态从 Online 变为 offline 或 vice-versa 时,调用 on_event 的功能。

如果要使用这个脚本,您必须指定以下变量:

  • USERNAMEPASSWORD :Data Grid 用户的用户名和密码,有权访问 REST 端点。
  • POLL_INTERVAL_SEC :轮询之间的秒数。
  • SERVERS :此站点的数据网格服务器列表。该脚本只需要一个有效的响应,但提供了列表以允许故障切换。
  • REMOTE_SITES: 要监控这些服务器上的远程站点列表。
  • CACHES :要监控的缓存名称列表。
使用 Prometheus 指标监控跨站点复制

Prometheus 和其他监控系统可让您配置警报来检测站点状态何时更改为 离线

提示

监控跨站点延迟指标可帮助您发现潜在的问题。

先决条件

  • 启用跨站点复制。

流程

  1. 配置数据网格指标。
  2. 使用 Prometheus 指标格式配置警报规则。

    • 对于站点状态,在线 使用 10 代表 离线
    • 对于 expr filed,请使用以下格式:
      vendor_cache_manager_default_cache_<cache name>_x_site_admin_<site name>_status

      在以下示例中,当 NYC 站点为名为 work 或 session 的缓存 离线时,Prometheus 会发出警告

      groups:
      - name: Cross Site Rules
        rules:
        - alert: Cache Work and Site NYC
          expr: vendor_cache_manager_default_cache_work_x_site_admin_nyc_status == 0
        - alert: Cache Sessions and Site NYC
          expr: vendor_cache_manager_default_cache_sessions_x_site_admin_nyc_status == 0

      下图显示了 NYC 站点 离线 进行缓存 工作 的警报。

      图 4.1. Prometheus Alert

      Prometheus xsite 警报

第 5 章 设置 Data Grid 集群传输

Data Grid 需要传输层,以便节点可以自动加入和离开集群。传输层还可让数据在网络间复制或分发数据,并执行操作,如重新平衡和状态传输。

5.1. 默认 JGroups 堆栈

Data Grid 在 infinispan-core-14.0.21.Final -redhat-00001.jar 文件的 default-configs 目录中提供默认的 JGroups 堆栈文件 default-jgroups114.xml

文件名堆栈名称描述

default-jgroups-udp.xml

udp

使用 UDP 进行传输和 UDP 多播进行发现。适用于较大的集群(超过 100 个节点),或者使用复制缓存或无效模式。最小化打开的插槽数量。

default-jgroups-tcp.xml

tcp

使用 TCP 传输和 MPING 协议进行发现,它使用 UDP 多播。只有在使用分布式缓存 时,只适用于 较小的集群(在 100 个节点下),因为 TCP 比 UDP 作为点对点协议更高效。

default-jgroups-kubernetes.xml

kubernetes

使用 TCP 进行传输,DNS_PING 进行发现。适用于始终提供 UDP 多播的 Kubernetes 和 Red Hat OpenShift 节点。

default-jgroups-ec2.xml

ec2

使用 TCP 进行传输和 aws.S3_PING 进行发现。适用于 UDP 多播不可用的 Amazon EC2 节点。需要额外的依赖项。

default-jgroups-google.xml

google

使用 TCP 进行传输,GOOGLE_PING2 进行发现。适用于没有 UDP 多播的 Google Cloud Platform 节点。需要额外的依赖项。

default-jgroups-azure.xml

azure

使用 TCP 进行传输,AZURE_PING 进行发现。适用于没有 UDP 多播的 Microsoft Azure 节点。需要额外的依赖项。

default-jgroups-tunnel.xml

tunnel

使用 TUNNEL 进行传输。适用于 Data Grid 位于防火墙后面且无法在 Data Grid 节点之间直接连接的环境。它需要一个外部访问的服务(Gossip Router)来重定向流量。它要求以 host1[port],host2[port] 格式设置 jgroups.tunnel.hosts 属性,使用 Gossip Router (s)主机和端口设置…​

其他资源

5.2. 集群发现协议

Data Grid 支持不同的协议,允许节点在网络上自动找到并形成集群。

Data Grid 可以使用两种发现机制:

  • 在大多数网络上工作且不依赖于外部服务的通用发现协议。
  • 依赖于外部服务的发现协议来存储和检索 Data Grid 集群的拓扑信息。
    例如,DNS_PING 协议通过 DNS 服务器记录执行发现。
注意

在托管平台上运行 Data Grid 需要使用针对各个云提供商实施的网络限制的发现机制。

5.2.1. PING

PING 或 UDPPING 是一种通用 JGroups 发现机制,它使用 UDP 协议的动态多播。

加入后,节点将 PING 请求发送到 IP 多播地址,以发现已在 Data Grid 集群中的其他节点。每个节点通过包含协调器节点地址的数据包以及自己的地址来响应 PING 请求。C=coordinator 的地址和 A=own 地址。如果没有节点响应 PING 请求,则加入节点会成为新集群中的协调节点。

PING 配置示例

<PING num_discovery_runs="3"/>

其他资源

5.2.2. TCPPING

TCPPING 是一种通用 JGroups 发现机制,它对群集成员使用静态地址列表。

使用 TCPPING 时,您可以手动将 Data Grid 集群中每个节点的 IP 地址或主机名指定为 JGroups 堆栈的一部分,而不是让节点动态发现其他节点。

TCPPING 配置示例

<TCP bind_port="7800" />
<TCPPING timeout="3000"
         initial_hosts="${jgroups.tcpping.initial_hosts:hostname1[port1],hostname2[port2]}"
         port_range="0"
         num_initial_members="3"/>

其他资源

5.2.3. MPING

MPING 使用 IP 多播来发现 Data Grid 集群的初始成员资格。

您可以使用 MPING 将 TCPPING 发现替换为 TCP 堆栈,并使用 multicasing 进行发现,而不使用初始主机的静态列表。但是,您还可以将 MPING 与 UDP 堆栈一起使用。

MPING 配置示例

<MPING mcast_addr="${jgroups.mcast_addr:239.6.7.8}"
       mcast_port="${jgroups.mcast_port:46655}"
       num_discovery_runs="3"
       ip_ttl="${jgroups.udp.ip_ttl:2}"/>

其他资源

5.2.4. TCPGOSSIP

gossip 路由器在网络上提供一个中央位置,用于您的数据网格集群可以检索其他节点的地址。

您可以将 Gossip 路由器的地址(IP:PORT)注入 Data Grid 节点,如下所示:

  1. 将 address 作为系统属性传递给 JVM;例如,-DGossipRouterAddress="10.10.2.4[12001]"
  2. 在 JGroups 配置文件中引用该系统属性。

gossip 路由器配置示例

<TCP bind_port="7800" />
<TCPGOSSIP timeout="3000"
           initial_hosts="${GossipRouterAddress}"
           num_initial_members="3" />

其他资源

5.2.5. JDBC_PING2

JDBC_PING2 使用共享数据库存储关于 Data Grid 集群的信息。此协议支持任何可以使用 JDBC 连接的数据库。

节点将其 IP 地址写入共享数据库,以便加入节点可以在网络上找到 Data Grid 集群。当节点离开 Data Grid 集群时,它们会从共享数据库中删除其 IP 地址。

JDBC_PING2 配置示例

<JDBC_PING connection_url="jdbc:mysql://localhost:3306/database_name"
           connection_username="user"
           connection_password="password"
           connection_driver="com.mysql.jdbc.Driver"/>

重要

将适当的 JDBC 驱动程序添加到 classpath 中,以便 Data Grid 可以使用 JDBC_PING2。

5.2.5.1. 将服务器数据源用于 JDBC_PING2 发现

将受管数据源添加到 Data Grid 服务器,并使用它来为集群传输 JDBC_PING2 发现协议提供数据库连接。

先决条件

  • 安装 Data Grid Server 集群。

流程

  1. 将 JDBC 驱动程序 JAR 部署到 Data Grid Server server/lib 目录中
  2. 为您的数据库创建数据源。

    <server xmlns="urn:infinispan:server:15.0">
      <data-sources>
         <!-- Defines a unique name for the datasource and JNDI name that you
              reference in JDBC cache store configuration.
              Enables statistics for the datasource, if required. -->
         <data-source name="ds"
                      jndi-name="jdbc/postgres"
                      statistics="true">
            <!-- Specifies the JDBC driver that creates connections. -->
            <connection-factory driver="org.postgresql.Driver"
                                url="jdbc:postgresql://localhost:5432/postgres"
                                username="postgres"
                                password="changeme">
               <!-- Sets optional JDBC driver-specific connection properties. -->
               <connection-property name="name">value</connection-property>
            </connection-factory>
            <!-- Defines connection pool tuning properties. -->
            <connection-pool initial-size="1"
                             max-size="10"
                             min-size="3"
                             background-validation="1000"
                             idle-removal="1"
                             blocking-timeout="1000"
                             leak-detection="10000"/>
         </data-source>
      </data-sources>
    </server>
  3. 创建 JGroups 堆栈,它将使用 JDBC_PING2 协议进行发现。
  4. 通过使用 server:data-source 属性指定数据源的名称,将集群传输配置为使用数据源。

    <infinispan>
        <jgroups>
            <stack name="jdbc" extends="tcp">
                <JDBC_PING stack.combine="REPLACE" stack.position="MPING" />
            </stack>
        </jgroups>
        <cache-container>
            <transport stack="jdbc" server:data-source="ds" />
        </cache-container>
    </infinispan>

其他资源

5.2.6. DNS_PING

JGroups DNS_PING 查询 DNS 服务器,以便在 Kubernetes 环境中发现数据网格群集成员,如 OKD 和 Red Hat OpenShift。

DNS_PING 配置示例

<dns.DNS_PING dns_query="myservice.myproject.svc.cluster.local" />

其他资源

5.2.7. 云发现协议

Data Grid 包括默认的 JGroups 堆栈,它使用特定于云提供商的发现协议实现。

发现协议默认堆栈文件工件Version

aws.S3_PING

default-jgroups-ec2.xml

org.jgroups.aws:jgroups-aws

3.0.0.Final

OOGLE_PING2

default-jgroups-google.xml

org.jgroups.google:jgroups-google

2.0.0.Final

azure.AZURE_PING

default-jgroups-azure.xml

org.jgroups.azure:jgroups-azure

2.0.2.final

为云发现协议提供依赖项

要使用 aws.S3_PINGGOOGLE_PING2azure.AZURE_PING 云发现协议,您需要向 Data Grid 提供依赖的库。

流程

  • 将工件依赖项添加到项目 pom.xml 中。

然后,您可以将云发现协议配置为 JGroups 堆栈文件或系统属性的一部分。

5.3. 使用默认 JGroups 堆栈

Data Grid 使用 JGroups 协议堆栈,以便节点可以在专用集群通道上发送其他消息。

Data Grid 为 UDPTCP 协议提供预配置的 JGroups 堆栈。您可以使用这些默认堆栈作为构建自定义集群传输配置的起点,该配置根据您的网络要求进行了优化。

流程

执行以下操作之一使用默认 JGroups 堆栈之一:

  • 使用 infinispan.xml 文件中的 stack 属性。

    <infinispan>
      <cache-container default-cache="replicatedCache">
        <!-- Use the default UDP stack for cluster transport. -->
        <transport cluster="${infinispan.cluster.name}"
                   stack="udp"
                   node-name="${infinispan.node.name:}"/>
      </cache-container>
    </infinispan>
  • 使用 addProperty () 方法设置 JGroups 堆栈文件:

    GlobalConfiguration globalConfig = new GlobalConfigurationBuilder().transport()
            .defaultTransport()
            .clusterName("qa-cluster")
            //Uses the default-jgroups-udp.xml stack for cluster transport.
            .addProperty("configurationFile", "default-jgroups-udp.xml")
            .build();

验证

Data Grid 记录以下信息以指示它使用的堆栈:

[org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack udp

其他资源

5.4. 自定义 JGroups 堆栈

调整和调优属性,以创建适合您的网络要求的集群传输配置。

Data Grid 提供可让您扩展默认 JGroups 堆栈以方便配置的属性。您可以在合并、删除和替换其他属性时从默认堆栈继承属性。

流程

  1. infinispan.xml 文件中创建一个新的 JGroups 堆栈声明。
  2. 添加 extends 属性,并指定 JGroups 堆栈来继承属性。
  3. 使用 stack.combine 属性修改继承堆栈中配置的协议的属性。
  4. 使用 stack.position 属性定义自定义堆栈的位置。
  5. 指定堆栈名称,作为 传输 配置中 stack 属性的值。

    例如,您可以使用 Gossip 路由器和使用默认 TCP 堆栈的对称加密来评估,如下所示:

    <infinispan>
      <jgroups>
        <!-- Creates a custom JGroups stack named "my-stack". -->
        <!-- Inherits properties from the default TCP stack. -->
        <stack name="my-stack" extends="tcp">
          <!-- Uses TCPGOSSIP as the discovery mechanism instead of MPING -->
          <TCPGOSSIP initial_hosts="${jgroups.tunnel.gossip_router_hosts:localhost[12001]}"
                 stack.combine="REPLACE"
                 stack.position="MPING" />
          <!-- Removes the FD_SOCK2 protocol from the stack. -->
          <FD_SOCK2 stack.combine="REMOVE"/>
          <!-- Modifies the timeout value for the VERIFY_SUSPECT2 protocol. -->
          <VERIFY_SUSPECT2 timeout="2000"/>
          <!-- Adds SYM_ENCRYPT to the stack after VERIFY_SUSPECT2. -->
          <SYM_ENCRYPT sym_algorithm="AES"
                       keystore_name="mykeystore.p12"
                       keystore_type="PKCS12"
                       store_password="changeit"
                       key_password="changeit"
                       alias="myKey"
                       stack.combine="INSERT_AFTER"
                       stack.position="VERIFY_SUSPECT2" />
        </stack>
      </jgroups>
      <cache-container name="default" statistics="true">
        <!-- Uses "my-stack" for cluster transport. -->
        <transport cluster="${infinispan.cluster.name}"
                   stack="my-stack"
                   node-name="${infinispan.node.name:}"/>
      </cache-container>
    </infinispan>
  6. 检查 Data Grid 日志,以确保它使用堆栈。

    [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack my-stack

参考

5.4.1. 继承属性

当您扩展 JGroups 堆栈时,继承属性允许您调整您要扩展的堆栈中的协议和属性。

  • stack.position 指定要修改的协议。
  • stack.combine 使用以下值来扩展 JGroups 堆栈:

    value描述

    组合

    覆盖协议属性。

    REPLACE

    替换协议。

    INSERT_AFTER

    在另一个协议后将协议添加到堆栈中。不会影响指定为插入点的协议。

    JGroups 堆栈中的协议会根据堆栈中的位置相互影响。例如,您应该在 SYM_ENCRYPTASYM_ENCRYPT 协议后放置 NAKACK2 等协议,以便保护 NAKACK2

    INSERT_BEFORE

    在另一个协议前,将协议插入到堆栈中。影响您指定为插入点的协议。

    删除

    从堆栈中删除协议。

5.5. 使用 JGroups 系统属性

在启动时将系统属性传递给 Data Grid,以调优群集传输。

流程

  • 根据需要,使用 -D<property-name>=<property-value > 参数设置 JGroups 系统属性。

例如,设置自定义绑定端口和 IP 地址,如下所示:

java -cp ... -Djgroups.bind.port=1234 -Djgroups.bind.address=192.0.2.0
注意

当您将 Data Grid 集群嵌入到集群红帽 JBoss EAP 应用程序中时,JGroups 系统属性可以相互冲突或覆盖。

例如,您没有为 Data Grid 集群或 Red Hat JBoss EAP 应用程序设置唯一的绑定地址。在这种情况下,Data Grid 和 Red Hat JBoss EAP 应用程序都使用 JGroups 默认属性,并尝试使用相同的绑定地址组成集群。

5.5.1. 集群传输属性

使用以下属性自定义 JGroups 集群传输。

系统属性描述默认值必填/选填

jgroups.bind.address

集群传输的绑定地址。

SITE_LOCAL

选填

jgroups.bind.port

套接字的绑定端口。

7800

选填

jgroups.mcast_addr

用于多播的 IP 地址,包括发现和集群间通信。IP 地址必须是适合 IP 多播的有效"类 D"地址。

239.6.7.8

选填

jgroups.mcast_port

多播套接字的端口。

46655

选填

jgroups.ip_ttl

IP 多播数据包的时间到时间(TTL)。该值定义数据包在丢弃前可以进行的网络跃点数。

2

选填

jgroups.thread_pool.min_threads

线程池的最小线程数量。

0

选填

jgroups.thread_pool.max_threads

线程池的最大线程数。

200

选填

jgroups.join_timeout

等待加入请求成功的最大毫秒数。

2000

选填

jgroups.thread_dumps_threshold

在记录线程转储前,线程池需要满的次数。

10000

选填

jgroups.fd.port-offset

来自 FD (检测协议)套接字的 jgroups.bind.port 端口的偏移。

50000 (port 57800 )

选填

jgroups.frag_size

消息中的最大字节数。大于碎片的消息。

60000

选填

jgroups.diag.enabled

启用 JGroups 诊断探测。

false

选填

5.5.2. 云发现协议的系统属性

使用以下属性为托管平台配置 JGroups 发现协议:

5.5.2.1. Amazon EC2

用于配置 aws.S3_PING 的系统属性。

系统属性描述默认值必填/选填

jgroups.s3.region_name

Amazon S3 区域的名称。

没有默认值。

选填

jgroups.s3.bucket_name

Amazon S3 存储桶的名称。名称必须存在,并且必须是唯一的。

没有默认值。

选填

5.5.2.2. Google Cloud Platform

用于配置 GOOGLE_PING2 的系统属性。

系统属性描述默认值必填/选填

jgroups.google.bucket_name

Google Compute Engine 存储桶的名称。名称必须存在,并且必须是唯一的。

没有默认值。

必填

5.5.2.3. Azure

azure.AZURE_PING' 的系统属性。

系统属性描述默认值必填/选填

jboss.jgroups.azure_ping.storage_account_name

Azure 存储帐户的名称。名称必须存在,并且必须是唯一的。

没有默认值。

必填

jboss.jgroups.azure_ping.storage_access_key

Azure 存储访问密钥的名称。

没有默认值。

必填

jboss.jgroups.azure_ping.container

存储 ping 信息的容器的有效 DNS 名称。

没有默认值。

必填

5.5.2.4. OpenShift

DNS_PING 的系统属性。

系统属性描述默认值必填/选填

jgroups.dns.query

设置返回群集成员的 DNS 记录。

没有默认值。

必填

jgroups.dns.record

设置 DNS 记录类型。

A

选填

5.6. 使用 inline JGroups 堆栈

您可以将完整的 JGroups 堆栈定义插入到 infinispan.xml 文件中。

流程

  • infinispan.xml 文件中嵌入自定义 JGroups 堆栈声明。

    <infinispan>
      <!-- Contains one or more JGroups stack definitions. -->
      <jgroups>
        <!-- Defines a custom JGroups stack named "prod". -->
        <stack name="prod">
          <TCP bind_port="7800" port_range="30" recv_buf_size="20000000" send_buf_size="640000"/>
          <RED/>
          <MPING break_on_coord_rsp="true"
                 mcast_addr="${jgroups.mping.mcast_addr:239.2.4.6}"
                 mcast_port="${jgroups.mping.mcast_port:43366}"
                 num_discovery_runs="3"
                 ip_ttl="${jgroups.udp.ip_ttl:2}"/>
          <MERGE3 />
          <FD_SOCK2 />
          <FD_ALL3 timeout="3000" interval="1000" timeout_check_interval="1000" />
          <VERIFY_SUSPECT2 timeout="1000" />
          <pbcast.NAKACK2 use_mcast_xmit="false" xmit_interval="200" xmit_table_num_rows="50"
                          xmit_table_msgs_per_row="1024" xmit_table_max_compaction_time="30000" />
          <UNICAST3 conn_close_timeout="5000" xmit_interval="200" xmit_table_num_rows="50"
                    xmit_table_msgs_per_row="1024" xmit_table_max_compaction_time="30000" />
          <pbcast.STABLE desired_avg_gossip="2000" max_bytes="1M" />
          <pbcast.GMS print_local_addr="false" join_timeout="${jgroups.join_timeout:2000}" />
          <UFC max_credits="4m" min_threshold="0.40" />
          <MFC max_credits="4m" min_threshold="0.40" />
          <FRAG4 />
        </stack>
      </jgroups>
      <cache-container default-cache="replicatedCache">
        <!-- Uses "prod" for cluster transport. -->
        <transport cluster="${infinispan.cluster.name}"
               stack="prod"
               node-name="${infinispan.node.name:}"/>
      </cache-container>
    </infinispan>

5.7. 使用外部 JGroups 堆栈

引用在 infinispan.xml 文件中定义自定义 JGroups 堆栈的外部文件。

流程

  1. 将自定义 JGroups 堆栈文件放在应用类路径上。

    或者,您可以在声明外部堆栈文件时指定绝对路径。

  2. 使用 stack-file 元素引用外部堆栈文件。

    <infinispan>
      <jgroups>
         <!-- Creates a "prod-tcp" stack that references an external file. -->
         <stack-file name="prod-tcp" path="prod-jgroups-tcp.xml"/>
      </jgroups>
      <cache-container default-cache="replicatedCache">
        <!-- Use the "prod-tcp" stack for cluster transport. -->
        <transport stack="prod-tcp" />
        <replicated-cache name="replicatedCache"/>
      </cache-container>
      <!-- Cache configuration goes here. -->
    </infinispan>

您还可以使用 TransportConfigurationBuilder 类中的 addProperty () 方法来指定自定义 JGroups 堆栈文件,如下所示:

GlobalConfiguration globalConfig = new GlobalConfigurationBuilder().transport()
        .defaultTransport()
        .clusterName("prod-cluster")
        //Uses a custom JGroups stack for cluster transport.
        .addProperty("configurationFile", "my-jgroups-udp.xml")
        .build();

在本例中,my-jgroups-udp.xml 使用自定义属性引用 UDP 堆栈,如下所示:

自定义 UDP 堆栈示例

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.2.xsd">
    <UDP bind_addr="${jgroups.bind_addr:127.0.0.1}"
         mcast_addr="${jgroups.udp.mcast_addr:239.0.2.0}"
         mcast_port="${jgroups.udp.mcast_port:46655}"
         tos="8"
         ucast_recv_buf_size="20000000"
         ucast_send_buf_size="640000"
         mcast_recv_buf_size="25000000"
         mcast_send_buf_size="640000"
         bundler.max_size="64000"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         diag.enabled="false"
         thread_naming_pattern="pl"
         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="30"
         thread_pool.keep_alive_time="5000" />
    <!-- Other JGroups stack configuration goes here. -->
</config>

5.8. 使用自定义 JChannels

构建自定义 JGroups JChannels,如下例所示:

GlobalConfigurationBuilder global = new GlobalConfigurationBuilder();
JChannel jchannel = new JChannel();
// Configure the jchannel as needed.
JGroupsTransport transport = new JGroupsTransport(jchannel);
global.transport().transport(transport);
new DefaultCacheManager(global.build());
注意

网格无法使用已经连接的自定义 JChannels。

其他资源

5.9. 加密集群传输

保护集群传输,以便节点与加密消息通信。您还可以配置 Data Grid 集群来执行证书身份验证,以便只有具有有效身份的节点才能加入。

5.9.1. JGroups 加密协议

要保护集群流量,您可以配置 Data Grid 节点,以使用 secret 密钥加密 JGroups 消息有效负载。

Data Grid 节点可以从以下方法获取 secret 密钥:

  • 协调器节点(symmetric 加密)。
  • 共享密钥存储(symmetric encryption)。

从协调器节点检索 secret 密钥

您可以通过在 Data Grid 配置中的 JGroups 堆栈中添加 ASYM_ENCRYPT 协议来配置非对称加密。这允许 Data Grid 集群生成和分发 secret 密钥。

重要

在使用非对称加密时,您还应提供密钥存储,以便节点能够执行证书身份验证并安全地交换机密密钥。这可保护集群不受中间人(MitM)攻击的影响。

非对称加密保护集群流量,如下所示:

  1. Data Grid 集群中的第一个节点(coordinator 节点)会生成一个 secret 密钥。
  2. 加入节点使用协调器执行证书身份验证,以互相验证身份。
  3. 加入的节点从 coordinator 节点请求 secret 密钥。该请求包含加入节点的公钥。
  4. 协调器节点使用公钥加密 secret 密钥,并将其返回到加入节点。
  5. 加入节点解密并安装 secret 密钥。
  6. 节点加入集群,使用 secret 密钥加密并解密信息。

从共享密钥存储检索 secret 密钥

您可以通过在 Data Grid 配置中的 JGroups 堆栈中添加 SYM_ENCRYPT 协议来配置对称加密。这允许 Data Grid 集群从您提供的密钥存储获取 secret 密钥。

  1. 节点在启动时从 Data Grid 类路径上的密钥存储安装 secret 密钥。
  2. 节点加入集群,使用 secret 密钥加密和解密消息。

非对称和对称加密的比较

带有证书身份验证的 ASYM_ENCRYPT 提供了额外的加密层,与 SYM_ENCRYPT 相比。您提供密钥存储来为 secret 密钥加密对协调节点的请求。Data Grid 会自动生成该 secret 密钥并处理集群流量,同时允许您指定何时生成 secret 密钥。例如,您可以配置集群以在节点离开时生成新的 secret 密钥。这样可确保节点无法绕过证书身份验证,并使用旧密钥加入。

另一方面,SYM_ENCRYPTASYM_ENCRYPT 快,因为节点不需要与集群协调器交换密钥。SYM_ENCRYPT 是没有配置在集群成员资格更改时自动生成新 secret 密钥的潜在缺陷。用户负责生成和分发节点用于加密集群流量的 secret 密钥。

5.9.2. 使用非对称加密保护集群传输

配置 Data Grid 集群,以生成和分发加密 JGroups 消息的 secret 密钥。

流程

  1. 使用证书链创建密钥存储,使 Data Grid 能够验证节点身份。
  2. 将密钥存储放在集群中每个节点的 classpath 上。

    对于 Data Grid Server,您要将密钥存储放在 $RHDG_HOME 目录中。

  3. SSL_KEY_EXCHANGEASYM_ENCRYPT 协议添加到 Data Grid 配置中的 JGroups 堆栈,如下例所示:

    <infinispan>
      <jgroups>
        <!-- Creates a secure JGroups stack named "encrypt-tcp" that extends the default TCP stack. -->
        <stack name="encrypt-tcp" extends="tcp">
          <!-- Adds a keystore that nodes use to perform certificate authentication. -->
          <!-- Uses the stack.combine and stack.position attributes to insert SSL_KEY_EXCHANGE into the default TCP stack after VERIFY_SUSPECT2. -->
          <SSL_KEY_EXCHANGE keystore_name="mykeystore.jks"
                            keystore_password="changeit"
                            stack.combine="INSERT_AFTER"
                            stack.position="VERIFY_SUSPECT2"/>
          <!-- Configures ASYM_ENCRYPT -->
          <!-- Uses the stack.combine and stack.position attributes to insert ASYM_ENCRYPT into the default TCP stack before pbcast.NAKACK2. -->
          <!-- The use_external_key_exchange = "true" attribute configures nodes to use the `SSL_KEY_EXCHANGE` protocol for certificate authentication. -->
          <ASYM_ENCRYPT asym_keylength="2048"
                        asym_algorithm="RSA"
                        change_key_on_coord_leave = "false"
                        change_key_on_leave = "false"
                        use_external_key_exchange = "true"
                        stack.combine="INSERT_BEFORE"
                        stack.position="pbcast.NAKACK2"/>
        </stack>
      </jgroups>
      <cache-container name="default" statistics="true">
        <!-- Configures the cluster to use the JGroups stack. -->
        <transport cluster="${infinispan.cluster.name}"
                   stack="encrypt-tcp"
                   node-name="${infinispan.node.name:}"/>
      </cache-container>
    </infinispan>

验证

当您启动 Data Grid 集群时,以下日志消息表示集群使用 secure JGroups 堆栈:

[org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack <encrypted_stack_name>

只有在它们使用 ASYM_ENCRYPT 且可以从协调器节点获取 secret 密钥时,数据网格节点才能加入集群。否则,以下信息会写入 Data Grid 日志:

[org.jgroups.protocols.ASYM_ENCRYPT] <hostname>: received message without encrypt header from <hostname>; dropping it

5.9.3. 使用对称加密保护集群传输

配置 Data Grid 集群,以使用您提供的密钥存储中的 secret 密钥加密 JGroups 消息。

流程

  1. 创建包含 secret 密钥的密钥存储。
  2. 将密钥存储放在集群中每个节点的 classpath 上。

    对于 Data Grid Server,您要将密钥存储放在 $RHDG_HOME 目录中。

  3. SYM_ENCRYPT 协议添加到 Data Grid 配置中的 JGroups 堆栈。
<infinispan>
  <jgroups>
    <!-- Creates a secure JGroups stack named "encrypt-tcp" that extends the default TCP stack. -->
    <stack name="encrypt-tcp" extends="tcp">
      <!-- Adds a keystore from which nodes obtain secret keys. -->
      <!-- Uses the stack.combine and stack.position attributes to insert SYM_ENCRYPT into the default TCP stack after VERIFY_SUSPECT2. -->
      <SYM_ENCRYPT keystore_name="myKeystore.p12"
                   keystore_type="PKCS12"
                   store_password="changeit"
                   key_password="changeit"
                   alias="myKey"
                   stack.combine="INSERT_AFTER"
                   stack.position="VERIFY_SUSPECT2"/>
    </stack>
  </jgroups>
  <cache-container name="default" statistics="true">
    <!-- Configures the cluster to use the JGroups stack. -->
    <transport cluster="${infinispan.cluster.name}"
               stack="encrypt-tcp"
               node-name="${infinispan.node.name:}"/>
  </cache-container>
</infinispan>

验证

当您启动 Data Grid 集群时,以下日志消息表示集群使用 secure JGroups 堆栈:

[org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack <encrypted_stack_name>

只有在使用 SYM_ENCRYPT 且可以从共享密钥存储获取 secret 密钥时,数据网格节点才能加入集群。否则,以下信息会写入 Data Grid 日志:

[org.jgroups.protocols.SYM_ENCRYPT] <hostname>: received message without encrypt header from <hostname>; dropping it

5.10. 集群流量的 TCP 和 UDP 端口

Data Grid 对集群传输信息使用以下端口:

默认端口协议描述

7800

TCP/UDP

JGroups 集群绑定端口

46655

UDP

JGroups 多播

跨站点复制

Data Grid 为 JGroups RELAY2 协议使用以下端口:

7900
对于在 OpenShift 上运行的 Data Grid 集群。
7800
如果将 UDP 用于节点间的流量,使用 TCP 作为集群之间的流量。
7801
如果将 TCP 用于节点和 TCP 间的流量,用于集群之间的流量。

第 6 章 集群锁定

集群锁定是数据网格集群中跨节点分布和共享的数据结构。集群锁定允许您运行在节点间同步的代码。

6.1. 锁定 API

Data Grid 提供了一个 ClusteredLock API,可让您在嵌入式模式中使用 Data Grid 时同时在集群中执行代码。

API 由以下内容组成:

  • ClusteredLock 会公开方法来实现集群锁定。
  • ClusteredLockManager 会公开方法来定义、配置、检索和删除集群锁定。
  • EmbeddedClusteredLockManagerFactory 初始化 ClusteredLockManager 实现。

所有权

Data Grid 支持 NODE 所有权,以便集群中的所有节点都可以使用锁定。

Reentrancy

数据网格集群锁定不是潜在的,因此集群中的任何节点都可以获取锁定,但只有创建锁定的节点才能释放它。

如果为同一所有者发送两个连续锁定调用,则第一个调用会获取锁定(如果可用),第二个调用会被阻止。

6.2. 使用集群锁定

了解如何在应用程序中嵌入的 Data Grid 中使用集群锁定。

先决条件

  • infinispan-clustered-lock 依赖项添加到 pom.xml 中:
<dependency>
   <groupId>org.infinispan</groupId>
   <artifactId>infinispan-clustered-lock</artifactId>
</dependency>

流程

  1. 从缓存管理器初始化 ClusteredLockManager 接口。此接口是定义、检索和删除集群锁定的入口点。
  2. 为每个集群锁定指定唯一名称。
  3. 使用 lock.tryLock (1, TimeUnit.SECONDS) 方法获取锁定。
// Set up a clustered Cache Manager.
GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();

// Configure the cache mode, in this case it is distributed and synchronous.
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering().cacheMode(CacheMode.DIST_SYNC);

// Initialize a new default Cache Manager.
DefaultCacheManager cm = new DefaultCacheManager(global.build(), builder.build());

// Initialize a Clustered Lock Manager.
ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm);

// Define a clustered lock named 'lock'.
clm1.defineLock("lock");

// Get a lock from each node in the cluster.
ClusteredLock lock = clm1.get("lock");

AtomicInteger counter = new AtomicInteger(0);

// Acquire the lock as follows.
// Each 'lock.tryLock(1, TimeUnit.SECONDS)' method attempts to acquire the lock.
// If the lock is not available, the method waits for the timeout period to elapse. When the lock is acquired, other calls to acquire the lock are blocked until the lock is released.
CompletableFuture<Boolean> call1 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 1");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 1");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture<Boolean> call2 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 2");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 2");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture<Boolean> call3 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 3");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 3");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture.allOf(call1, call2, call3).whenComplete((r, ex) -> {
    // Print the value of the counter.
    System.out.println("Value of the counter is " + counter.get());

    // Stop the Cache Manager.
    cm.stop();
});

6.3. 为锁定配置内部缓存

集群锁定管理器包含一个存储锁定状态的内部缓存。您可以以声明性方式或以编程方式配置内部缓存。

流程

  1. 定义集群中存储集群锁定状态的节点数量。默认值为 -1,它将值复制到所有节点。
  2. 为缓存可靠性指定以下值之一,它控制集群锁定在集群分割到分区或多个节点离开时的行为方式:

    • AVAILABLE: 任何分区中的节点都可以同时在锁定时操作。
    • CONSISTENT :只有属于大多数分区的节点才能在锁定上运行。这是默认值。
    • 编程配置

      import org.infinispan.lock.configuration.ClusteredLockManagerConfiguration;
      import org.infinispan.lock.configuration.ClusteredLockManagerConfigurationBuilder;
      import org.infinispan.lock.configuration.Reliability;
      ...
      
      GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
      
      final ClusteredLockManagerConfiguration config = global.addModule(ClusteredLockManagerConfigurationBuilder.class).numOwner(2).reliability(Reliability.AVAILABLE).create();
      
      DefaultCacheManager cm = new DefaultCacheManager(global.build());
      
      ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm);
      
      clm1.defineLock("lock");
    • 声明性配置

      <infinispan
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="urn:infinispan:config:15.0 https://infinispan.org/schemas/infinispan-config-15.0.xsd"
              xmlns="urn:infinispan:config:15.0">
      
          <cache-container default-cache="default">
              <transport/>
              <local-cache name="default">
                  <locking concurrency-level="100" acquire-timeout="1000"/>
              </local-cache>
              <clustered-locks xmlns="urn:infinispan:config:clustered-locks:15.0"
                               num-owners = "3"
                               reliability="AVAILABLE">
                  <clustered-lock name="lock1" />
                  <clustered-lock name="lock2" />
              </clustered-locks>
          </cache-container>
          <!-- Cache configuration goes here. -->
      </infinispan>

第 7 章 在网格中执行代码

缓存的主要优点是能够快速查找其键,甚至跨机器查找值。实际上,这单独使用可能是许多用户使用 Data Grid 的原因。但是,Data Grid 可以提供许多无法立即明显的好处。由于 Data Grid 通常在机器集群中使用,因此我们还提供相应的功能,可帮助利用整个集群来执行用户所需的工作负载。

7.1. Cluster Executor

由于您有一组计算机,因此最好利用其组合计算能力在所有这些计算机上执行代码。Cache Manager 附带一个 nice 工具,可让您在集群中执行任意代码。请注意,这个功能不需要使用缓存。此 Cluster Executor 可通过调用 EmbeddedCacheManager 上的 executor ()来检索。这个 executor 在集群和非集群配置中可以被检索。

注意

ClusterExecutor 专为执行代码不依赖于缓存中数据的代码而设计,而是作为帮助用户在集群中轻松执行代码的方法。

此管理器专门围绕 Java 流 API 构建,因此所有方法都使用功能接口作为参数。因为这些参数将发送到其他节点,因此它们需要按顺序排序。我们甚至使用 nice trick 来确保我们的 lambda 可立即串行。这是通过参数实现 Serializable 和 real 参数类型(如 Runnable 或 Function)。在确定要调用的方法时,JRE 将选择最具体的类,因此在这样,您的 lambdas 始终可以被序列化。也可以使用外部工具来进一步减小消息大小。

默认情况下,管理器将向集群中的所有节点提交给定命令,包括从中提交它的节点。您可以使用 filterTargets 方法控制在其上执行任务的节点,如 部分中所述。

7.1.1. 过滤执行节点

可以限制命令要运行的节点。例如,您可能只想在同一机架的计算机上运行计算。或者,您可能想要在本地站点中一次执行操作,并在不同的站点上再次执行操作。集群 executor 可以限制在相同或不同机器、机架或站点级别向发送请求的节点。

SameRack.java

EmbeddedCacheManager manager = ...;
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_RACK).submit(...)

要使用此拓扑基本过滤,您必须通过服务器提示启用拓扑感知一致的哈希。

您还可以根据节点的地址使用 predicate 进行过滤。这也可以选择与之前代码片段中的基于拓扑过滤结合使用。

我们还允许使用任何方法选择目标节点,该方法使用 Predicate 来过滤哪些节点可以被视为执行。请注意,这也可与 Topology 过滤结合使用,以便更精细地控制您在集群中执行代码的位置。

Predicate.java

EmbeddedCacheManager manager = ...;
// Just filter
manager.executor().filterTargets(a -> a.equals(..)).submit(...)
// Filter only those in the desired topology
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_SITE, a -> a.equals(..)).submit(...)

7.1.2. Timeout(超时)

集群可执行文件允许为每个调用设置超时。默认为在传输配置中配置的分布式同步超时。这个超时可在集群和非集群缓存管理器中正常工作。当超时过期时,executor 可能会或可能无法中断执行任务的线程。但是,当超时发生任何 Consumerfuture 时,将完成传递 TimeoutException。这个值可以通过激活 超时 方法并提供所需持续时间来覆盖。

7.1.3. 单一节点提交

Cluster Executor 也可以以单一节点提交模式运行,而不是向所有节点提交命令,而不必选择通常会收到该命令的其中一个节点,并将其提交到只有一个节点。每个提交都可能会使用不同的节点来执行任务。这对将 ClusterExecutor 用作 java.util.concurrent.Executor 非常有用,您可能会注意到 ClusterExecutor 已实现该 ClusterExecutor。

SingleNode.java

EmbeddedCacheManager manager = ...;
manager.executor().singleNodeSubmission().submit(...)

7.1.3.1. 故障切换

在单一节点提交中运行时,可能需要允许 Cluster Executor 处理给定命令期间发生异常的情况,再次重试命令。当发生这种情况时,Cluster Executor 将再次选择一个一个节点,以重新将命令重新提交到所需的故障转移尝试次数。请注意,所选节点可以是通过拓扑或 predicate 检查的任何节点。通过调用覆盖的 singleNodeSubmission 方法来启用故障转移。给定的命令将再次提交到一个节点,直到命令完成且无例外,或者总提交数等于提供的故障切换计数。

7.1.4. 示例:PI Approximation

本例演示了如何使用 ClusterExecutor 估算 PI 的值。

Pi approximation 大大受益于通过 Cluster Executor 的并行分布式执行。回想一下,块的区域是 Sa = 4r2,圆圈的区域是 Ca=pi*r2。将 r2 从第二个 equation 替换成第一个,它会关闭 pi = 4 * Ca/Sa。现在,我们可以将大量 darts 成方块的图像;如果我们将这个大体中的 darts sto 分值放在圆圈中,那么我们将大约是 Ca/Sa 值。由于我们知道,pi = 4 * Ca/Sa 我们可以轻松获得 pi 的大约价值。更糟糕的是,我们实现了更好的投放。在以下示例中,我们找到了 1 亿 darts,而不是"缩小"它们串行地并行化整个数据网格集群中的 dart shooting 的工作。请注意,这在 1 的集群中可以正常工作,但会较慢。

public class PiAppx {

   public static void main (String [] arg){
      EmbeddedCacheManager cacheManager = ..
      boolean isCluster = ..

      int numPoints = 1_000_000_000;
      int numServers = isCluster ? cacheManager.getMembers().size() : 1;
      int numberPerWorker = numPoints / numServers;

      ClusterExecutor clusterExecutor = cacheManager.executor();
      long start = System.currentTimeMillis();
      // We receive results concurrently - need to handle that
      AtomicLong countCircle = new AtomicLong();
      CompletableFuture<Void> fut = clusterExecutor.submitConsumer(m -> {
         int insideCircleCount = 0;
         for (int i = 0; i < numberPerWorker; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }, (address, count, throwable) -> {
         if (throwable != null) {
            throwable.printStackTrace();
            System.out.println("Address: " + address + " encountered an error: " + throwable);
         } else {
            countCircle.getAndAdd(count);
         }
      });
      fut.whenComplete((v, t) -> {
         // This is invoked after all nodes have responded with a value or exception
         if (t != null) {
            t.printStackTrace();
            System.out.println("Exception encountered while waiting:" + t);
         } else {
            double appxPi = 4.0 * countCircle.get() / numPoints;

            System.out.println("Distributed PI appx is " + appxPi +
                  " using " + numServers + " node(s), completed in " + (System.currentTimeMillis() - start) + " ms");
         }
      });

      // May have to sleep here to keep alive if no user threads left
   }

   private static boolean insideCircle(double x, double y) {
      return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
            <= Math.pow(0.5, 2);
   }
}

第 8 章 使用 Streams API 执行代码

使用 Streams API 有效地处理存储在 Data Grid 缓存中的数据。

第 9 章 流

您可能需要处理一个子集或缓存中的所有数据来生成结果。这可能会造成 Map Reduce 的想法。Data Grid 允许用户执行类似内容,但利用标准的 JRE API 来执行此操作。Java 8 引入了一个流( Stream )的概念,它允许对集合进行功能风格的操作,而不是自行迭代数据。流操作可以以类似 MapReduce 的方式实施。流,就像 MapReduce 一样,您可以对整个缓存执行处理,这可能是一个非常大的数据集,但采用效率。

注意

在处理缓存中存在的数据时,流是首选的方法,因为流会自动调整集群拓扑更改。

另外,由于我们可以控制条目在我们如何迭代,因此如果您希望它同时在集群中执行所有操作,则可以更有效地在缓存中执行操作。

通过调用 stream 或 parallelStream 方法,从 entrySetkeySetvalues 集合 从缓存 返回的流检索。

9.1. 常见流操作

本节重点介绍了您正在使用的底层缓存类型的各种选项。

9.2. 密钥过滤

可以过滤流,使其仅在给定的密钥子集上运行。这可以通过在 CacheStream 上调用 filterKeys 方法来完成。这应该总是通过 Predicate 过滤器使用,如果 predicate 保存所有键,则更快。

如果您熟悉 AdvancedCache 接口,您可能会知道为什么在这个 keyFilter 上使用 getAll。如果您需要条目为 并在本地节点的内存中需要它们,则有一些小的好处(大较小的有效负载)来使用 getAll。但是,如果您需要在这些元素上处理流,因为您将得到分布式和线程并行处理。

9.3. 基于片段的过滤

注意

这是一个高级功能,应该只用于深入了解数据网格网段和哈希技术。如果您需要将数据分段到单独的调用中,这些片段过滤很有用。当与其他工具(如 Apache Spark )集成时,这非常有用。

这个选项只支持复制和分布式缓存。这允许用户在 KeyPartitioner 决定时对数据的子集进行操作。可以通过在 CacheStream 上调用 filterKeySegments 方法来过滤网段。这在键过滤器后应用,但在执行任何中间操作前应用。

9.4. local/Invalidation

与本地或无效缓存一起使用的流可以像在常规集合上使用流一样使用。如果需要,数据网格处理所有翻译,并与更有趣的选项(例如 storeAsBinary 和 cache loader)一起工作。仅执行流操作的节点本地数据,例如无效只使用本地条目。

9.5. Example

以下代码使用一个缓存,并返回一个带有字符串 "JBoss" 的所有缓存条目的映射

Map<Object, String> jbossValues =
cache.entrySet().stream()
     .filter(e -> e.getValue().contains("JBoss"))
     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

9.6. distribution/Replication/Scattered

这是流进入其范围的地方。执行流操作时,它会将各种中间和终端操作发送到具有相关数据的每个节点。这允许处理拥有数据的节点上的中间值,并且仅将最终结果发送回原始节点,从而提高性能。

9.6.1. 重新哈希 Aware

在内部,数据被分段,每个节点只对它拥有的数据执行操作。这允许平均处理数据,假设片段足够精细,以为每个节点上提供相同数量的数据。

使用分布式缓存时,当新节点加入或离开时,可以在节点间重新处理数据。分布式流处理会自动重新处理数据,因此您不必在节点离开或加入集群时考虑监控。Reshuffled 条目可能会第二次处理,我们将在密钥级别或网段级别(取决于终端操作)跟踪已处理的条目,以限制重复处理的数量。

最好禁用重新哈希对流的了解。只有在您的请求只能处理重新哈希时,才应考虑这一点。这可以通过调用 CacheStream.disableRehashAware () 来实现,当重新哈希无法完全忽略时,大多数操作的性能都会获得相应的性能。唯一的例外是 iterator 和 per per (使用较少的内存),因为它们不必跟踪已处理的密钥。

警告

请重新考虑禁用重新哈希感知,除非您真正知道自己正在执行的操作。

9.6.2. serialization

由于操作会互相发送到其他节点,因此它们必须可以被 Data Grid marshalling 处理。这允许将操作发送到其他节点。

最简单的方法是使用 CacheStream 实例,并使用 lambda,就像您正常一样。Data Grid 会覆盖所有各种流中间和终端方法,以获取参数的 Serializable 版本(如 SerializableFunction、SerializablePredicate…​)您可以在 CacheStream 中找到这些方法。这依赖于 spec 来选择 此处定义的 最具体方法。

在前面的示例中,我们使用 Collector 将所有结果收集到 Map 中。不幸的是 ,Collector s 类不会生成 Serializable 实例。因此,如果您需要使用它们,可以通过两种方式来实现:

一个选项是使用 CacheCollectors 类,该类允许提供 Supplier<Collector& gt;。然后,这个实例可以使用 Collector 提供没有序列化的 Collector。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

或者,您可以避免使用 CacheCollectors,而是使用采用 Supplier<Collector > 的超载 收集 方法。这些过载 收集 方法只能通过 CacheStream 接口获得。

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

但是,如果您无法使用 CacheCacheStream 接口,则无法使用 Serializable 参数,而是必须通过将 lambda 发送到多个接口来手动调用 lambdas。这不是一个非常好的,但会获得作业完成。

Map<Object, String> jbossValues = map.entrySet().stream()
              .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

推荐的和最高性能的方法是使用 AdvancedExternalizer,因为这提供了最小的有效负载。不幸的是,您不能将 lamdbas 用作高级外部工具,需要预先定义该类。

您可以使用高级外部工具,如下所示:

   Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

   class ContainsFilter implements Predicate<Map.Entry<Object, String>> {
      private final String target;

      ContainsFilter(String target) {
         this.target = target;
      }

      @Override
      public boolean test(Map.Entry<Object, String> e) {
         return e.getValue().contains(target);
      }
   }

   class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> {

      @Override
      public Set<Class<? extends ContainsFilter>> getTypeClasses() {
         return Util.asSet(ContainsFilter.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException {
         output.writeUTF(object.target);
      }

      @Override
      public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return new ContainsFilter(input.readUTF());
      }
   }

您还可以为收集器供应商使用高级外部工具来进一步缩小有效负载大小。

Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

 class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> {
      static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier();

      private ToMapCollectorSupplier() { }

      @Override
      public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() {
         return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue);
      }
   }

   class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> {

      @Override
      public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() {
         return Util.asSet(ToMapCollectorSupplier.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException {
      }

      @Override
      public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return ToMapCollectorSupplier.INSTANCE;
      }
   }

9.7. 并行复杂度

默认情况下,分布式流会尽量并行化。最终用户可以控制这一点,实际上它们始终需要控制其中一个选项。这些流的并行性有两种。

当从 缓存集合创建流时,最终用户可以在调用流或 parallelStream 方法之间进行选择。https://docs.oracle.com/javase/8/docs/api/java/util/Collection.html#stream--根据是否选择了并行流,将在本地为每个节点启用多个线程。请注意,一些操作(如重新哈希)和每个操作都会在本地使用顺序流。这在某些情况下可以增强,以允许本地并行流。

在使用本地并行时,用户应该小心,因为它需要大量条目或操作,这些条目或操作需要可以更快地计算。另请注意,如果用户使用具有 的并行流,并且每个操作不应阻断,这会在常见池中执行,这通常为计算操作保留。

当有多个节点时,远程请求可能需要控制同时处理远程请求是否同时处理。默认情况下,除 iterator 执行并发请求外的所有终端操作。迭代器,用于降低本地节点上的总体内存压力的方法,仅执行实际上性能稍好更高的顺序请求。

如果用户希望在 CacheStream 上调用 sequentialDistributionparallelDistribution 方法来更改此默认值。

9.8. 任务超时

可以为操作请求设置超时值。这个超时仅用于远程请求超时,它基于每个请求。前者表示本地执行不会超时,后者意味着,如果您有一个故障转移场景,如后续请求上方描述,则每个请求都有新的超时。如果没有指定超时,它将使用复制超时作为默认超时时间。您可以通过执行以下操作在任务中设置超时:

CacheStream<Map.Entry<Object, String>> stream = cache.entrySet().stream();
stream.timeout(1, TimeUnit.MINUTES);

有关此问题的更多信息,请检查 java doc in timeout javadoc。

9.9. 注入

都有一个称为 的终端操作,允许对数据运行某种副作用的操作。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEach-java.util.function.Consumer-在这种情况下,可能需要获得支持此流的缓存的引用。如果您的 Consumer 实现 CacheAware 接口,则在来自 Consumer 接口的 accept 方法之前调用 injectCache 方法。

9.10. 分布式流执行

分布式流执行方式类似于映射减少。除了本例中,我们会向不同的节点发送零到多个中间操作(映射、过滤等)和单个终端操作。操作基本上会减少到以下几项:

  1. 所需的片段由哪个节点分组,这是给定网段的主要所有者
  2. 生成请求以发送到包含中间和终端操作的每个远程节点,包括它应该处理的片段

    1. 如有必要,终端操作将在本地执行
    2. 每个远程节点都将接收此请求并运行操作,然后发送响应回
  3. 然后,本地节点将收集本地响应和远程响应,同时执行操作本身所需的任何减少。
  4. 最终减少了响应,然后返回给用户

在大多数情况下,所有操作都是完全分发的,因为操作是在每个远程节点上完全应用,通常只有最后一个操作或相关操作才可以重新应用,以减少多个节点的结果。务必要注意,中间值实际上不需要被序列化,它是发送的最后一个值,这是所需部分(突出显示各种操作的异常)。

终端 operator 分布的结果 会减少以下段落描述了各种终端操作器的分布式缩减如何工作。其中一些是特殊的,可能需要中间值才能被序列化,而不是最终结果。

allMatch noneMatch anyMatch
allMatch 操作在每个节点上运行,然后所有结果在本地逻辑上进行,以获取适当的值。noneMatchanyMatch 操作使用逻辑或替代。这些方法也有早期终止支持,在已知的最终结果后停止远程和本地操作。
collect
collect 方法很有趣,它可以执行一些额外的步骤。远程节点以正常方式执行所有内容,但它不会在结果上执行最终的 结束 程序,而是发回完全组合的结果。然后,本地线程 会将 远程和本地结果合并为一个值,然后最后完成。这里要记住的键是最终的值不一定是可序列化的,而是来自供应商和 组合 方法的值。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html#supplier--
数�
count 方法只从每个节点中添加数字。
findAny findFirst
findAny 操作只返回它们找到的第一个值,无论是来自远程节点还是本地。请注意,当发现一个值后,它就不会处理其他值。请注意 findFirst 方法是特殊的,因为它需要排序的中间操作,这在 例外 部分中详细介绍。
最大分钟
maxmin 方法在每个节点上找到对应的 min 或 max 值,然后在本地执行最终缩减,以确保返回所有节点中的 min 或 max。
reduce
各种减少方法 123 将按顺序化结果,与累计者可以尽可能多。然后,如果提供,它将在本地整合本地和远程结果。请注意,这意味着来自 combiner 的值不必是 Serializable。

9.11. 基于密钥的重新哈希了解操作符

迭代器分割 器和每个 终端运算符都与重新哈希感知器不同的是,必须跟踪每个网段处理了哪些键,而不是仅段段。这是为了保证一次(迭代器和分割器)或至少一次行为(对于每个),即使在集群成员资格发生变化下也是如此。

在远程节点上调用时,迭代 器和 分割 器运算符将返回条目的回滚批处理,其中下一个批处理仅在最后一个完全被完全使用后发回。完成此批处理是为了限制给定时间在内存中有多少条目。用户节点将保存在其处理哪些密钥以及给定片段完成时,将从内存中释放这些密钥。这就是为什么在迭代器方法中使用顺序处理的原因,因此仅在内存中保存片段密钥的子集,而不是来自所有节点。

per () 方法也会返回批处理,但在处理至少批处理后,它会返回批处理密钥。这样,原始节点可以知道已经处理了哪些密钥,以减少再次处理同一条目的几率。不幸的是,这意味着当节点意外停机时,至少有一次行为。在这种情况下,节点可能已经处理批处理,而还没有完成其中一个和这些条目,但还没有在完成的批处理中会在重新哈希失败时再次运行。请注意,在收到所有响应前,添加节点不会导致这个问题,因为重新哈希故障转移不会发生。

这些操作批处理的大小都由相同的值控制,这可以通过调用 CacheStream 上的 distributedBatchSize 方法进行配置。这个值将默认为在状态传输中配置的 chunkSize。不幸的是,这个值会权衡内存用量与性能,至少一次,您的鼠标可能会有所不同。

迭代器 与复制和分布式缓存一起使用

当节点是分布式流的所有请求片段的主要或备份所有者时,Data Grid 在本地执行 迭代分割 器终端操作,这会优化性能,因为远程迭代更为资源密集型。

此优化适用于复制和分布式缓存。但是,在使用 共享 并启用了 write-behind 的缓存存储时,Data Grid 会远程执行迭代。在这种情况下,远程执行迭代可确保一致性。

9.12. 中间操作例外

有些带有特殊例外的中间操作 会跳过peek、排序 1不同的https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#sorted--所有这些方法在流处理中都会发生某种形式,以保证正确性,如下所示。请注意,这意味着这些操作可能会导致严重的性能降级。

skip
Anrtificial iterator 被计划到中间跳过操作。然后,结果会在本地生成,以便可以跳过适当数量的元素。
排序
警告:此操作需要本地节点上的内存所有条目。Anrtificial iterator 被计划到中间排序操作。所有结果都在本地排序。可能的计划中可能有一个分布式排序来返回元素批处理,但这尚未实施。
不同的
警告:此操作需要本地节点上的所有或几乎所有条目。每个远程节点上都执行不同的情况,然后对它产生不同的迭代器返回这些不同的值。最后,所有这些结果都有不同的操作。

其余的中间操作会按预期完全分发。

9.13. 例子

字数

单词 count 是典型的,如果过度使用,如映射/缩减范例。假设我们在 Data Grid 节点上存储了 key → 句子的映射。Key 是一个字符串,每个句子都是一个 String,我们必须计算所有句子中所有可用的词语。此类分布式任务的实现可定义如下:

public class WordCountExample {

   /**
    * In this example replace c1 and c2 with
    * real Cache references
    *
    * @param args
    */
   public static void main(String[] args) {
      Cache<String, String> c1 = ...;
      Cache<String, String> c2 = ...;

      c1.put("1", "Hello world here I am");
      c2.put("2", "Infinispan rules the world");
      c1.put("3", "JUDCon is in Boston");
      c2.put("4", "JBoss World is in Boston as well");
      c1.put("12","JBoss Application Server");
      c2.put("15", "Hello world");
      c1.put("14", "Infinispan community");
      c2.put("15", "Hello world");

      c1.put("111", "Infinispan open source");
      c2.put("112", "Boston is close to Toronto");
      c1.put("113", "Toronto is a capital of Ontario");
      c2.put("114", "JUDCon is cool");
      c1.put("211", "JBoss World is awesome");
      c2.put("212", "JBoss rules");
      c1.put("213", "JBoss division of RedHat ");
      c2.put("214", "RedHat community");

      Map<String, Long> wordCountMap = c1.entrySet().parallelStream()
         .map(e -> e.getValue().split("\\s"))
         .flatMap(Arrays::stream)
         .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
   }
}

在这种情况下,可以简单地执行上例中的单词 count。

但是,如果我们想要在示例中找到最频繁的字词,该怎么办?如果您考虑第二个情况,您将意识到您需要首先计算所有词语,并首先可用。因此,我们实际上有一些选项。

我们可以在收集器上使用一个 finisher,它会在收集所有结果后在用户线程上调用。之前示例中删除了一些冗余行。

public class WordCountExample {
   public static void main(String[] args) {
      // Lines removed

      String mostFrequentWord = c1.entrySet().parallelStream()
         .map(e -> e.getValue().split("\\s"))
         .flatMap(Arrays::stream)
         .collect(() -> Collectors.collectingAndThen(
            Collectors.groupingBy(Function.identity(), Collectors.counting()),
               wordCountMap -> {
                  String mostFrequent = null;
                  long maxCount = 0;
                     for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                        int count = e.getValue().intValue();
                        if (count > maxCount) {
                           maxCount = count;
                           mostFrequent = e.getKey();
                        }
                     }
                     return mostFrequent;
               }));

}

不幸的是,最后一步只会在单一线程中运行,如果我们有很多词语可能非常慢。可以通过另一种方式与 Streams 并行化。

在处理后,我们提到了本地节点,因此我们实际上可以在映射结果上使用流。因此,我们可以在结果中使用并行流。

public class WordFrequencyExample {
   public static void main(String[] args) {
      // Lines removed

      Map<String, Long> wordCount = c1.entrySet().parallelStream()
              .map(e -> e.getValue().split("\\s"))
              .flatMap(Arrays::stream)
              .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
      Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream().reduce(
              (e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);

这样,在计算最频繁的元素时,您仍然可以在本地使用所有内核。

删除特定条目

分布式流也可以用作修改它所在数据的方法。例如,您可能想要删除包含特定词语的缓存中的所有条目。

public class RemoveBadWords {
   public static void main(String[] args) {
      // Lines removed
      String word = ..

      c1.entrySet().parallelStream()
         .filter(e -> e.getValue().contains(word))
         .forEach((c, e) -> c.remove(e.getKey()));

如果我们仔细注意了要序列化的内容,并且没有什么,我们注意到,只有词语以及操作才会按顺序化到其他 nods,因为它由 lambda 捕获。但是,实际保存部分是缓存操作在主所有者上执行,从而减少从缓存中删除这些值所需的网络流量数量。lambda 不会捕获缓存,因为我们在每个节点上调用时提供一个特殊的 BiConsumer 方法覆盖,这会将缓存传递给 BiConsumer

以这种方式考虑使用 for each 命令的一个问题是底层流没有获得锁定。缓存删除操作仍将自然而获得锁定,但该值可能会从流看到的内容改变。这意味着,在流读取后条目可能会改变,但删除会实际删除它。

我们特别添加了名为 LockedStream 的新变体。

其他示例的 plenty

Streams API 是一个 JRE 工具,有许多示例使用它。只需记住您的操作需要以某种方式 Serializ。

第 10 章 在 Red Hat JBoss EAP 应用程序中使用 Data Grid

红帽 JBoss EAP 包括可用于红帽 JBoss EAP 应用程序的 Data Grid 模块。您可以通过两种方式执行此操作:

  • 在红帽 JBoss EAP 应用中包含 Data Grid 库。

    当您在应用程序中包含 Data Grid 库时,缓存对应用程序是本地的,不能被其他应用程序使用。另外,缓存配置位于应用程序中。

  • 使用 Red Hat JBoss EAP 提供的 Data Grid 库。

    使用 Red Hat JBoss EAP 提供的 Data Grid 库具有以下优点:

    • 缓存在应用程序间共享。
    • 缓存配置是 Red Hat JBoss EAP 独立或域 XML 文件的一部分。
    • 应用程序不包括 Data Grid 库,而是从 MANIFEST 或 jboss-structure.xml 配置文件中引用所需的模块。

以下流程描述了使用红帽 JBoss EAP 提供的 Data Grid 库。

10.1. 配置应用程序以使用 Data Grid 模块

要在应用程序中使用 Red Hat JBoss EAP 提供的 Data Grid 库,请在应用程序的 pom.xml 文件中添加 Data Grid 依赖项。

流程

  1. 导入 Data Grid 依赖项管理,以控制运行时 Maven 依赖项的版本。

    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-bom</artifactId>
      <version>${version.infinispan.bom}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>

    您必须在 pom .xml 文件的 '<properties> 部分定义 ${version.infinispan.bom}' 的值。

  2. 声明所需的 Data Grid 依赖项,如 提供的

    pom.xml

    <dependencies>
      <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-core</artifactId>
        <scope>provided</scope>
      </dependency>
    </dependencies>

10.2. 在 Red Hat JBoss EAP 中配置数据网格缓存

在红帽 JBoss EAP 中创建数据网格缓存。

先决条件

  • Red Hat JBoss EAP 正在运行

流程

  1. 连接到红帽 JBoss EAP 管理 CLI。

    $ jboss-cli.sh --connect
  2. 创建缓存容器。

    /subsystem=infinispan/cache-container=exampleCacheContainer:add(statistics-enabled=true)

    这会创建一个名为 exampleCacheContainer 的缓存容器,并启用了统计信息。

  3. 向缓存容器添加缓存。

    /subsystem=infinispan/cache-container=exampleCacheContainer/local-cache=exampleCache:add(statistics-enabled=true)

    这会在 exampleCacheContainer 缓存容器中创建一个名为 exampleCache 的本地缓存,并启用了统计信息。

10.3. 在 Red Hat JBoss EAP 应用程序中使用 Data Grid 缓存

您可以通过资源查找来访问应用程序中的 Data Grid 缓存。

先决条件

  • Red Hat JBoss EAP 正在运行。
  • 您已在 Red Hat JBoss EAP 中创建 Data Grid cahches。

流程

  1. 您可以在应用程序中查找 Data Grid 缓存,如下所示:

    @Resource(lookup = "java:jboss/infinispan/cache/exampleCacheContainer/exampleCache")
    private Cache<String, String> ispnCache;

    这将定义一个名为 ispn Cache 的缓存。

  2. 您可以放置、获取和删除缓存中的条目,如下所示:

    获取键的值

    String value = ispnCache.get(key);

    这会检索缓存中键的值。如果没有找到密钥,则返回 null

    将值放在键中

    String oldValue = ispnCache.put(key,value);

    这将定义一个新密钥(如果不存在)并关联传递的值。如果密钥已存在,则替换原始值。

    删除密钥

    String value = ispnCache.remove(key);

    这会从缓存中删除密钥。

第 11 章 使用 CDI 扩展

Data Grid 提供了一个与 CDI (Contexts 和 Dependency Injection)编程模型集成的扩展,并允许您:

  • 配置缓存并将其注入 CDI Beans 和 Java EE 组件。
  • 配置缓存管理器。
  • 接收缓存和缓存管理器级别事件。

11.1. CDI 依赖项

使用以下依赖项之一更新 pom.xml,以在项目中包含 Data Grid CDI 扩展:

嵌入式(Library)模式

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-cdi-embedded</artifactId>
</dependency>

服务器模式

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-cdi-remote</artifactId>
</dependency>

11.2. 注入嵌入式缓存

设置 CDI Bean 以注入嵌入式缓存。

流程

  1. 创建缓存限定符注解。

    ...
    import jakarta.inject.Qualifier;
    
    @Qualifier
    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface GreetingCache { 1
    }
    1
    创建一个 @GreetingCache 限定符。
  2. 添加定义缓存配置的制作者方法。

    ...
    import org.infinispan.configuration.cache.Configuration;
    import org.infinispan.configuration.cache.ConfigurationBuilder;
    import org.infinispan.cdi.ConfigureCache;
    import jakarta.transaction.inject.Produces;
    
    public class Config {
    
        @ConfigureCache("mygreetingcache") 1
        @GreetingCache 2
        @Produces
        public Configuration greetingCacheConfiguration() {
            return new ConfigurationBuilder()
                        .memory()
                            .size(1000)
                        .build();
        }
    }
    1
    要注入的缓存的名称。
    2
    添加缓存限定符。
  3. 如果需要,添加创建集群缓存管理器的制作者方法

    ...
    package org.infinispan.configuration.global.GlobalConfigurationBuilder;
    
    public class Config {
    
        @GreetingCache 1
        @Produces
        @ApplicationScoped 2
        public EmbeddedCacheManager defaultClusteredCacheManager() { 3
          return new DefaultCacheManager(
            new GlobalConfigurationBuilder().transport().defaultTransport().build();
       }
    }
    1
    添加缓存限定符。
    2
    为应用创建 bean 一次。创建缓存管理器的制作者应始终包含 @ApplicationScoped 注释,以避免创建多个缓存管理器。
    3
    创建新的 DefaultCacheManager 实例,该实例绑定到 @GreetingCache qualifier。
    注意

    缓存管理器具有重度的权重对象。在应用程序中运行多个缓存管理器可能会降低性能。在注入多个缓存时,可以将每个缓存的限定符添加到 Cache Manager producer 方法中,或者不添加任何限定符。

  4. @GreetingCache 限定符添加到您的缓存注入点。

    ...
    import jakarta.inject.Inject;
    
    public class GreetingService {
    
        @Inject @GreetingCache
        private Cache<String, String> cache;
    
        public String greet(String user) {
            String cachedValue = cache.get(user);
            if (cachedValue == null) {
                cachedValue = "Hello " + user;
                cache.put(user, cachedValue);
            }
            return cachedValue;
        }
    }

11.3. 注入远程缓存

设置 CDI Bean 以注入远程缓存。

流程

  1. 创建缓存限定符注解。

    @Remote("mygreetingcache") 1
    @Qualifier
    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RemoteGreetingCache { 2
    }
    1
    要注入的缓存的名称。
    2
    创建 @RemoteGreetingCache 限定符。
  2. @RemoteGreetingCache 限定符添加到您的缓存注入点。

    public class GreetingService {
    
        @Inject @RemoteGreetingCache
        private RemoteCache<String, String> cache;
    
        public String greet(String user) {
            String cachedValue = cache.get(user);
            if (cachedValue == null) {
                cachedValue = "Hello " + user;
                cache.put(user, cachedValue);
            }
            return cachedValue;
        }
    }

注入远程缓存的提示

  • 您可以在不使用限定符的情况下注入远程缓存。

       ...
       @Inject
       @Remote("greetingCache")
       private RemoteCache<String, String> cache;
  • 如果您有多个 Data Grid 集群,您可以为每个集群创建单独的远程缓存管理器制作者。

    ...
    import jakarta.transaction.context.ApplicationScoped;
    
    public class Config {
    
        @RemoteGreetingCache
        @Produces
        @ApplicationScoped 1
        public ConfigurationBuilder builder = new ConfigurationBuilder(); 2
            builder.addServer().host("localhost").port(11222);
            return new RemoteCacheManager(builder.build());
        }
    }
    1
    为应用创建 bean 一次。创建缓存管理器的制作者应始终包含 @ApplicationScoped 注释,以避免创建多个缓存管理器,它们是重度的 weight 对象。
    2
    创建新的 RemoteCacheManager 实例,该实例绑定到 @RemoteGreetingCache qualifier。

11.4. 接收缓存和缓存管理器事件

您可以使用 CDI 事件接收缓存和缓存管理器级别事件。

  • 使用 @Observes 注释,如下例所示:
import jakarta.transaction.event.Observes;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStartedEvent;
import org.infinispan.notifications.cachelistener.event.*;

public class GreetingService {

    // Cache level events
    private void entryRemovedFromCache(@Observes CacheEntryCreatedEvent event) {
        ...
    }

    // Cache manager level events
    private void cacheStarted(@Observes CacheStartedEvent event) {
        ...
    }
}

第 12 章 Multimap 缓存

MutimapCache 是一种 Data Grid Cache 类型,用于将键映射到每个键可以包含多个值的值。

12.1. Multimap Cache

MutimapCache 是一种 Data Grid Cache 类型,用于将键映射到每个键可以包含多个值的值。

12.1.1. 安装和配置

pom.xml

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-multimap</artifactId>
</dependency>

12.1.2. MultimapCache API

MultimapCache API 会公开几种与多映射缓存交互的方法。在大多数情况下,这些方法是非阻塞的;如需更多信息,请参阅 限制

public interface MultimapCache<K, V> {

   CompletableFuture<Optional<CacheEntry<K, Collection<V>>>> getEntry(K key);

   CompletableFuture<Void> remove(SerializablePredicate<? super V> p);

   CompletableFuture<Void> put(K key, V value);

   CompletableFuture<Collection<V>> get(K key);

   CompletableFuture<Boolean> remove(K key);

   CompletableFuture<Boolean> remove(K key, V value);

   CompletableFuture<Void> remove(Predicate<? super V> p);

   CompletableFuture<Boolean> containsKey(K key);

   CompletableFuture<Boolean> containsValue(V value);

   CompletableFuture<Boolean> containsEntry(K key, V value);

   CompletableFuture<Long> size();

   boolean supportsDuplicates();

}

CompletableFuture<Void> put (K key, V value)

将键值对放在 multimap 缓存中。

MultimapCache<String, String> multimapCache = ...;

multimapCache.put("girlNames", "marie")
             .thenCompose(r1 -> multimapCache.put("girlNames", "oihana"))
             .thenCompose(r3 -> multimapCache.get("girlNames"))
             .thenAccept(names -> {
                          if(names.contains("marie"))
                              System.out.println("Marie is a girl name");

                           if(names.contains("oihana"))
                              System.out.println("Oihana is a girl name");
                        });

这个代码的输出如下:

Marie is a girl name
Oihana is a girl name

CompletableFuture<Collection<V>> get (K key)

异步返回与这个多映射缓存中键关联的值(若有)的视图集合。对检索到的集合的任何更改都不会更改此多映射缓存中的值。当此方法返回空集合时,这意味着未找到该密钥。

CompletableFuture<Boolean> remove (K key)

异步的异步从多映射缓存中删除与密钥关联的条目(如果存在)。

CompletableFuture<Boolean> remove (K key, V value)

异步的异步,它会从多映射缓存中删除键值对(如果存在)。

CompletableFuture<Void> remove (Predicate<? super V> p)

异步方法.删除与给定 predicate 匹配的每个值。

CompletableFuture<Boolean> containsKey (K key)

如果这个多map包含密钥,则异步返回 true。

CompletableFuture<Boolean> containsValue (V value)

如果此多map包含至少一个键中的值,则异步返回 true。

CompletableFuture<Boolean> containsEntry(K key, V value)

如果此多map至少包含一个键值对和值,则异步返回 true。

CompletableFuture<Long> size ()

在多映射缓存中返回键值对数的异步。它不会返回不同的密钥数。

boolean supportsDuplicates()

如果多映射缓存支持重复,则异步返回 true。这意味着 multimap 的内容可以是 'a' → ['1', '1', '2']。现在,这个方法总是返回 false,因为还没有支持重复。给定值的存在由 'equals' 和 'hashcode' 方法的合同决定。

12.1.3. 创建多映射缓存

目前,MultimapCache 配置为常规缓存。这可以通过代码或 XML 配置来完成。了解如何在配置 数据网格缓存中配置常规缓存

12.1.3.1. 嵌入式模式
// create or obtain your EmbeddedCacheManager
EmbeddedCacheManager cm = ... ;

// create or obtain a MultimapCacheManager passing the EmbeddedCacheManager
MultimapCacheManager multimapCacheManager = EmbeddedMultimapCacheManagerFactory.from(cm);

// define the configuration for the multimap cache
multimapCacheManager.defineConfiguration(multimapCacheName, c.build());

// get the multimap cache
multimapCache = multimapCacheManager.get(multimapCacheName);

12.1.4. 限制

在几乎每个情况下,Multimap Cache 将作为常规缓存的行为,但当前版本中存在一些限制,如下所示:

12.1.4.1. 支持重复

可将多映射配置为存储单个键的重复值。重复是由值的 等号 方法决定的。每当调用 put 方法时,如果将 multimap 配置为支持重复,则键值对将添加到集合中。在 multimap 上调用 remove 将删除所有重复(如果存在)。

12.1.4.2. 驱除

现在,驱除会按键而不是每个键对工作。这意味着,每当键被驱除时,与键关联的所有值也会被驱除。

12.1.4.3. Transactions

通过 auto-commit 支持隐式事务,所有方法都不是阻塞的。在大多数情况下,显式事务都可以正常工作。将阻止 大小为包含Entryremove (Predicate<? super V> p)的方法

法律通告

Copyright © 2024 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.