Data Grid 开发人员指南


Red Hat Data Grid 8.1

自定义、配置和扩展数据网格

Red Hat Customer Content Services

摘要

了解 Data Grid API,如何对应用程序进行编码以使用网格功能以及如何自定义数据网格。

Red Hat Data Grid

Data Grid 是一个高性能分布式内存数据存储。

无架构数据结构
将不同对象存储为键值对的灵活性。
基于网格的数据存储
旨在在集群中分发和复制数据。
弹性扩展
动态调整节点数量,以便在不中断服务的情况下满足需求。
数据互操作性
从不同端点在网格中存储、检索和查询数据。

Data Grid 文档

红帽客户门户网站中提供了 Data Grid 的文档。

Data Grid 下载

访问红帽客户门户上的 Data Grid 软件下载

注意

您必须有一个红帽帐户才能访问和下载数据中心软件。

使开源包含更多

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息

第 1 章 配置 Data Grid Maven 存储库

Data Grid Java 发行版可从 Maven 获取。

您可以从客户门户网站下载 Data Grid Maven 存储库,或者从公共 Red Hat Enterprise Maven 存储库拉取 Data Grid 依赖项。

1.1. 下载 Data Grid Maven 存储库

如果您不想使用公共 Red Hat Enterprise Maven 存储库,将 Data Grid Maven 存储库下载并安装到本地文件系统、Apache HTTP 服务器或 Maven 存储库管理器。

流程

  1. 登录到红帽客户门户。
  2. 导航到 Data Grid 的软件下载
  3. 下载 Red Hat Data Grid 8.1 Maven 存储库。
  4. 将存档的 Maven 存储库提取到本地文件系统。
  5. 打开 README.md 文件,并按照适当的安装说明进行操作。

1.2. 添加 Red Hat Maven 存储库

在您的 Maven 构建环境中包括红帽 GA 存储库,以获取 Data Grid 工件和依赖项。

流程

  • 将 Red Hat GA 存储库添加到 Maven 设置文件中,通常为 ~/.m2/settings.xml,或者直接在项目的 pom.xml 文件中。

    <repositories>
      <repository>
        <id>redhat-ga-repository</id>
        <name>Red Hat GA Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
      </repository>
    </repositories>
    <pluginRepositories>
      <pluginRepository>
        <id>redhat-ga-repository</id>
        <name>Red Hat GA Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
      </pluginRepository>
    </pluginRepositories>
    Copy to Clipboard Toggle word wrap

1.3. 配置数据网格 POM

Maven 使用名为 Project Object Model (POM)文件的配置文件来定义项目并管理构建。POM 文件采用 XML 格式,描述生成的项目打包和输出的模块和组件依赖项、构建顺序和目标。

流程

  1. 打开您的项目 pom.xml 进行编辑。
  2. 使用正确的 Data Grid 版本定义 version.infinispan 属性。
  3. dependencyManagement 部分中包含 infinispan-bom

    Bill Of Materials (BOM)控制依赖项版本,从而避免了版本冲突,这意味着您不需要为添加到项目的每个 Data Grid 工件设置版本。

  4. 保存并关闭 pom.xml

以下示例显示了 Data Grid 版本和 BOM:

<properties>
  <version.infinispan>11.0.9.Final-redhat-00001</version.infinispan>
</properties>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-bom</artifactId>
      <version>${version.infinispan}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Copy to Clipboard Toggle word wrap

后续步骤

根据需要,将 Data Grid 工件作为依赖项添加到 pom.xml 中。

第 2 章 缓存管理器

Data Grid 的主要入口点是可让您访问的 CacheManager 接口:

  • 配置并获取缓存。
  • 管理和监控集群数据网格节点。
  • 在集群中执行代码。

如果您在应用程序中嵌入了 Data Grid,则使用 EmbeddedCacheManager。如果您将 Data Grid 作为远程服务器运行,请使用 RemoteCacheManager

缓存管理器是重量的对象,在大多数情形中,您应该在每个 JVM 中仅实例化一个 CacheManager 实例。

EmbeddedCacheManager manager = new DefaultCacheManager(); 
1
Copy to Clipboard Toggle word wrap
1
启动没有缓存的本地非集群缓存管理器。

缓存管理器的生命周期以及默认构造器也调用 start () 方法。构造器的超载版本可用,但它们不会启动 CacheManager。但是,您必须在创建缓存前始终启动 CacheManager

同样,当您不再需要运行 CacheManager 时,您必须调用 stop (),以便其释放资源。这也可确保 Cache Manager 安全地停止其控制的任何缓存。

2.1. 获取缓存

配置 CacheManager 后,您可以获取和控制缓存。

调用 getCache (String) 方法以获取缓存,如下所示:

Cache<String, String> myCache = manager.getCache("myCache");
Copy to Clipboard Toggle word wrap

前面的操作会创建一个名为 myCache 的缓存(如果尚未存在),并返回它。

使用 getCache () 方法仅在您调用方法的节点上创建缓存。换句话说,它会执行一个本地操作,它必须在集群中的每个节点上调用。通常,在多个节点间部署的应用程序会在初始化过程中获取缓存,以确保缓存都是 对称的,并在每个节点上存在。

调用 createCache () 方法,以在整个集群中动态创建缓存,如下所示:

Cache<String, String> myCache = manager.administration().createCache("myCache", "myTemplate");
Copy to Clipboard Toggle word wrap

前面的操作还会在随后加入集群的任何节点上自动创建缓存。

默认情况下,您使用 createCache () 方法创建的缓存是临时的。如果整个集群关闭,则缓存重启后不会自动创建。

使用 PERMANENT 标志来确保缓存可以在重启后保留,如下所示:

Cache<String, String> myCache = manager.administration().withFlags(AdminFlag.PERMANENT).createCache("myCache", "myTemplate");
Copy to Clipboard Toggle word wrap

要使 PERMANENT 标志生效,您必须启用全局状态并设置配置存储提供程序。

有关配置存储供应商的更多信息,请参阅 GlobalStateConfigurationBuilder#configurationStorage ()

2.2. 集群信息

EmbeddedCacheManager 有很多方法来提供信息,与集群运行方式相同。只有在集群环境中使用时(即配置了传输时),以下方法才有意义。

2.3. 成员信息

当使用集群时,务必要查找集群中成员资格的信息,包括谁是集群的所有者。

getMembers()

getMembers ()方法返回当前集群中的所有节点。

getCoordinator()

getCoordinator ()方法将告诉您哪个成员是集群的协调者。对于大多数目的,您不应该关心该协调者是谁。您可以直接使用 isCoordinator () 方法来查看本地节点是否也是协调器。

第 3 章 Data Grid Cache Interface

Data Grid 提供了一个 Cache 接口,它公开简单的方法来添加、检索和删除条目,包括 JDK 的 ConcurrentMap 接口公开的原子机制。根据所使用的缓存模式,调用这些方法会触发多个事情,甚至可能包括将条目复制到远程节点或从远程节点查找条目,或可能缓存存储。

3.1. Cache API

对于简单使用,使用 Cache API 不应与使用 JDK Map API 不同,因此从基于映射到 Data Grid 的缓存的简单内存缓存迁移应该很简单。

3.1.1. Certain Map 方法的性能一致性

与 Data Grid 一起使用时,在 Map 中公开的某些方法具有一定的性能后果,如 size ()value ()keySet ()entrySet ()。有关 keySet、value 和 entrySet 的具体方法,请参阅其 Javadoc 了解更多详情。

试图全局执行这些操作会对性能有很大的影响,并成为可扩展性瓶颈。因此,这些方法应该仅用于信息或调试目的。

应注意,在 withFlags () 方法中使用某些标记可以缓解其中的一些问题,请检查每个方法的文档以了解更多详情。

3.1.2. Mortal 和 Immortal Data

除了仅存储条目外,Data Grid 的缓存 API 允许您向数据附加 mortality 信息。例如,只是使用 put (键,值) 会创建一个 immortal 条目,例如,一个存在于缓存中的条目(永久存在),直到它被删除(或逐出内存以防止耗尽内存)。但是,如果您使用 put (key, value, lifespan, timeunit) 将数据放入缓存中,这会创建一个 mortal 条目,即具有固定生命周期的条目,并在该生命周期后过期。

除了 Lifespan 外,Data Grid 还支持 maxIdle 作为额外的指标,以决定到期。可以使用任何 lifespans 或 maxIdles 的组合。

3.1.3. putForExternalRead 操作

Data Grid 的 Cache 类包含不同的"put"操作,称为 putForExternalRead。当 Data Grid 用作在其他位置保留数据的临时缓存时,此操作特别有用。在大量读取场景中,缓存中的竞争不应延迟实时事务,因为缓存应该只是优化,而不是以某种方式获得。

要达到此目的,putForExternalRead () 充当 put 调用,只有在缓存中不存在密钥时才运行,并在另一个线程尝试同时存储同一密钥时失败。在这个特殊情况下,缓存数据是优化系统的方法,而不需要缓存失败会影响到持续的事务,因此为什么以不同的方式处理失败。putForExternalRead () 被视为快速操作,因为无论它是否成功,它都不会等待任何锁定,因此会立即返回到调用者。

要了解如何使用此操作,让我们来看基本的示例。试想一下,个人实例的缓存,每个由 PersonId 的密钥,其数据源自于单独的数据存储中。以下代码显示了在此示例上下文中使用 putForExternalRead 的最常见模式:

// Id of the person to look up, provided by the application
PersonId id = ...;

// Get a reference to the cache where person instances will be stored
Cache<PersonId, Person> cache = ...;

// First, check whether the cache contains the person instance
// associated with with the given id
Person cachedPerson = cache.get(id);

if (cachedPerson == null) {
   // The person is not cached yet, so query the data store with the id
   Person person = dataStore.lookup(id);

   // Cache the person along with the id so that future requests can
   // retrieve it from memory rather than going to the data store
   cache.putForExternalRead(id, person);
} else {
   // The person was found in the cache, so return it to the application
   return cachedPerson;
}
Copy to Clipboard Toggle word wrap

请注意,putForExternalRead 不应用作使用源自应用程序执行的新 Person 实例更新缓存的机制(例如,来自修改人员地址的事务)。更新缓存的值时,请使用标准 放置 操作,否则可能会出现缓存损坏数据的可能性。

3.2. AdvancedCache API

除了简单缓存接口外,Data Grid 还提供 AdvancedCache 接口,并提供给扩展作者。AdvancedCache 提供了访问某些内部组件,并应用标志来改变某些缓存方法的默认行为。以下代码片段描述了如何获取 AdvancedCache:

AdvancedCache advancedCache = cache.getAdvancedCache();
Copy to Clipboard Toggle word wrap

3.2.1. 标记

标志应用到常规缓存方法,以更改某些方法的行为。有关所有可用标志及其效果的列表,请查看 标记 枚举。使用 AdvancedCache.withFlags () 应用标志。此 builder 方法可用于将任意数量的标记应用到缓存调用,例如:

advancedCache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING)
   .withFlags(Flag.FORCE_SYNCHRONOUS)
   .put("hello", "world");
Copy to Clipboard Toggle word wrap

3.3. 监听器和通知

Data Grid 提供了一个监听器 API,客户端可以在事件发生时注册并获取通知。此注解驱动的 API 适用于 2 个不同的级别:缓存级别事件和缓存管理器级别事件。

事件会触发分配给监听程序的通知。监听器是以 @Listener 标注的简单 POJO,并使用 Listenable 接口中定义的方法注册。

注意

Cache 和 CacheManager 都实现了 Listenable,这意味着您可以将监听程序附加到缓存或缓存管理器管理器,以接收缓存级别或缓存管理器级通知。

例如,以下类定义了一个监听程序,在每次向缓存中添加新条目时打印一些信息,以非阻塞方式:

@Listener
public class PrintWhenAdded {
  Queue<CacheEntryCreatedEvent> events = new ConcurrentLinkedQueue<>();

  @CacheEntryCreated
  public CompletionStage<Void> print(CacheEntryCreatedEvent event) {
    events.add(event);
    return null;
  }

}
Copy to Clipboard Toggle word wrap

有关更全面的示例,请参阅 @Listener 的 Java 文档

3.3.1. 缓存级别通知

缓存级别的事件根据每个缓存发生,默认仅在事件发生的节点上引发。请注意,在分布式缓存中,这些事件只会在受影响数据的所有者上引发。缓存级别事件的示例是添加、删除、修改等条目。这些事件触发了注册到特定缓存的监听程序的通知。

有关所有缓存级别通知的完整列表 ,请查看 org.infinispan.notifications.cachelistener.annotation 软件包上的 Java 文档,以及相应的方法级注解。

注意

请参阅 org.infinispan.notifications.cachelistener.annotation 软件包上的 Java 文档,以了解 Data Grid 中可用的缓存级别通知列表。

3.3.1.1. 集群 Listeners

当需要侦听单个节点上的缓存事件时,应使用集群监听程序。

为此,需要的所有操作都设置为将监听程序作为集群注解。

@Listener (clustered = true)
public class MyClusterListener { .... }
Copy to Clipboard Toggle word wrap

从非集群监听程序中集群监听程序有一些限制。

  1. 集群侦听器只能侦听 @CacheEntryModified@CacheEntryCreated@CacheEntryRemoved@CacheEntryExpired 事件。请注意,这意味着不会侦听此侦听器的任何其他类型的事件。
  2. 只有 post 事件发送到集群监听程序,则忽略 pre 事件。
3.3.1.2. 事件过滤和转换

安装监听器的节点上的所有适用事件都将提高到侦听器。可以使用 KeyFilter (仅允许对键进行过滤)或 CacheEventFilter (用于过滤键、旧值、旧元数据、新值、新值、新元数据、新值、新元数据、新值、新元数据)来动态过滤事件,无论是事件(ie. isPre)和命令类型)。

此处的示例显示了一个简单 KeyFilter,它将只允许当 只有 我的密钥的条目修改事件时引发事件。

public class SpecificKeyFilter implements KeyFilter<String> {
    private final String keyToAccept;

    public SpecificKeyFilter(String keyToAccept) {
      if (keyToAccept == null) {
        throw new NullPointerException();
      }
      this.keyToAccept = keyToAccept;
    }

    public boolean accept(String key) {
      return keyToAccept.equals(key);
    }
}

...
cache.addListener(listener, new SpecificKeyFilter("Only Me"));
...
Copy to Clipboard Toggle word wrap

当您要以更有效的方式限制接收的事件时,这非常有用。

还有一个 CacheEventConverter,可以提供它,允许在增加事件前将值转换为另一个值。这可能适合模块化任何进行值转换的代码。

注意

与 Cluster Listener 一起使用时,上述过滤器和转换器特别有用。这是因为,过滤和转换是在事件源自的节点上进行的,而不是在事件被侦听的节点上。这可提供不要在集群(filter)之间复制事件甚至减少有效负载(转换器)的好处。

3.3.1.3. 初始状态事件

安装侦听器时,它只会在事件被完全安装后通知。

在首次注册监听器时,可能需要获取缓存内容的当前状态,方法是对缓存中的每一元素生成类型为 @CacheEntryCreated 的事件。在此初始阶段生成的任何事件都会被排队,直到引发适当的事件为止。

注意

这目前仅适用于集群监听程序。ISPN-4608 涵盖了为非集群监听程序添加它。

3.3.1.4. 重复事件

非事务缓存中可以接收重复的事件。当尝试执行写入操作(如放置)时,密钥的主所有者会停机。

在内部网格中,通过自动将其发送到给定密钥的新主所有者来改变放置操作,但如果写入首次复制到备份,就不会保证。因此,在一个操作中可以发送以下写入事件的 1 个(CacheEntryCreatedEvent,CacheEntryModifiedEvent & CacheEntryRemovedEvent)。

如果生成了多个事件,则数据网格将标记重试命令生成的事件,以帮助用户了解这种情况的时间,而无需留意查看更改。

@Listener
public class MyRetryListener {
  @CacheEntryModified
  public void entryModified(CacheEntryModifiedEvent event) {
    if (event.isCommandRetried()) {
      // Do something
    }
  }
}
Copy to Clipboard Toggle word wrap

另外,在使用 CacheEventFilterCacheEventConverter 时,EventType 包含一个方法 isRetry,以告知事件是因为重试而生成的。

3.3.2. 缓存管理器级别通知

缓存管理器级别事件发生在缓存管理器中。这太全局且集群范围的,但涉及影响单个缓存管理器创建的所有缓存的事件。缓存管理器级别事件示例是节点加入或离开集群,或者缓存启动或停止。

有关所有缓存管理器级别 通知的完整列表,请参见 org.infinispan.notifications.cachemanagerlistener.annotation 软件包,以及它们对应的方法级注解。

3.3.3. 事件同步

默认情况下,所有 async 通知都会在通知线程池中分配。同步通知将延迟操作持续,直到监听器方法完成或 CompletionStage 完成(前者导致线程阻止)。或者,您可以将监听程序标注为 异步,在这种情况下,操作将立即继续,而通知会在通知线程池上异步完成。要做到这一点,只需注解监听程序,例如:

异步 Listener

@Listener (sync = false)
public class MyAsyncListener {
   @CacheEntryCreated
   void listen(CacheEntryCreatedEvent event) { }
}
Copy to Clipboard Toggle word wrap

阻塞 Synchronous Listener

@Listener
public class MySyncListener {
   @CacheEntryCreated
   void listen(CacheEntryCreatedEvent event) { }
}
Copy to Clipboard Toggle word wrap

非引用 Listener

@Listener
public class MyNonBlockingListener {
   @CacheEntryCreated
   CompletionStage<Void> listen(CacheEntryCreatedEvent event) { }
}
Copy to Clipboard Toggle word wrap
3.3.3.1. 异步线程池

要调整用于分配此类异步通知的线程池,请使用配置文件中的 &lt ;listener-executor /> XML 元素。

3.4. Asynchronous API

除了同步的 API 方法(如 Cache.put ()Cache.remove () 等)之外,数据网格也有一个异步的非阻塞 API,您可以在其中实现相同的结果。

这些方法的命名方式与阻塞计数器类似,并附加"Async"。  例如,Cache.putAsync ()Cache.removeAsync () 等。  这些异步对应部分返回一个包含操作实际结果的 CompletableFuture

例如,在 cache 参数中,在 Cache<String, String>, Cache.put (String key, String value) 返回 String while Cache.putAsync (String key) returns Complet ableFuture<String&gt;。

3.4.1. 为什么使用这种 API?

非阻塞 API 非常强大,它们提供了所有同步通信保证 - 能够处理通信故障和例外 - 在调用完成后不需要阻止。  这可让您更好地利用系统中的并行性。  例如:

Set<CompletableFuture<?>> futures = new HashSet<>();
futures.add(cache.putAsync(key1, value1)); // does not block
futures.add(cache.putAsync(key2, value2)); // does not block
futures.add(cache.putAsync(key3, value3)); // does not block

// the remote calls for the 3 puts will effectively be executed
// in parallel, particularly useful if running in distributed mode
// and the 3 keys would typically be pushed to 3 different nodes
// in the cluster

// check that the puts completed successfully
for (CompletableFuture<?> f: futures) f.get();
Copy to Clipboard Toggle word wrap

3.4.2. 哪些进程实际发生在异步中?

Data Grid 中有 4 个事情,它们被视为典型写入操作的关键路径。这些是成本顺序:

  • 网络调用
  • Marshalling
  • 写入缓存存储(可选)
  • 锁定

使用 async 方法将取网络调用和划分出关键路径。  然而,出于各种技术原因,写入缓存存储和获取锁定,但仍然出现在调用者的线程中。

第 4 章 配置缓存编码

Data Grid 以特定格式保存您的数据,该格式可在读取和写入缓存时进行转换。通过为键和值指定 MediaType 来配置存储格式,它描述了数据的格式。

网格也可以转换不同存储格式之间的数据,以处理不同客户端协议和使用自定义代码处理数据之间的互操作性。

4.1. 缓存编码和客户端互操作性

您用于数据的编码会影响客户端互操作性和功能,如数据网格搜索。

Expand
表 4.1. protobuf 格式
以 Protobuf 格式存储数据,将其与…​ 一起使用

Data Grid 控制台

REST 客户端

Java Hot Rod 客户端

非 Java Hot Rod 客户端

Data Grid Search

自定义 Java 对象

Expand
表 4.2. 基于文本的格式
以基于文本的格式存储数据,将其与…​ 一起使用

Data Grid 控制台

REST 客户端

Java Hot Rod 客户端

非 Java Hot Rod 客户端

Data Grid Search

自定义 Java 对象

Expand
表 4.3. Marshalled Java 对象
Marshalled Java 对象与…​ 兼容

Data Grid 控制台

REST 客户端

Java Hot Rod 客户端

非 Java Hot Rod 客户端

Data Grid Search

Expand
表 4.4. Unmarshalled Java Objects
不建议使用普通旧 Java 对象(POJO),但与…​ 兼容

Data Grid 控制台

REST 客户端

Java Hot Rod 客户端

非 Java Hot Rod 客户端

Data Grid Search

是。但是,您必须注解实体以使用 POJO 搜索,并将您的类提供给 Data Grid 服务器。

自定义 Java 对象

4.1.1. 为 Memcached 客户端配置缓存编码

默认情况下,Data Grid 服务器禁用 Memcached 端点。如果启用 Memcached 端点,您应该使用 Memcached 客户端的适当编码配置缓存。

重要

Memcached 端点不支持身份验证。为了安全起见,您应该将专用缓存用于 Memcached 客户端。您不应该使用 REST 或 Hot Rod 客户端在与 Memcached 客户端相同的数据上交互。

流程

  1. 配置缓存编码,以将 text/plain 用于密钥。
  2. 为值指定任何合适的 MediaType,而不是 application/x-java- 对象

    Memcached 客户端只能以 text/plain 的形式处理密钥。值可以是 Data Grid 存储为 byte[] 的任何 MediaType,可以是 Protobuf、marshalled Java 对象或基于文本的格式。

    <encoding>
      <key media-type="text/plain"/>
      <value media-type="application/x-protostream"/>
    </encoding>
    Copy to Clipboard Toggle word wrap
提示

Memcached 端点包含一个 client-encoding 属性,用于转换值的编码。

例如,如前面的配置示例所示,您可以将值编码为 Protobuf。如果您希望 Memcached 客户端以 JSON 读写值,您可以使用以下配置:

<memcached-connector cache="memcachedCache" client-encoding="application/json">
Copy to Clipboard Toggle word wrap

4.2. 为 Data Grid Caches 配置编码

定义 MediaType,Data Grid 在写入和读取缓存时用于对数据进行编码。

提示

当您定义 MediaType 时,您可以将数据格式指定为 Data Grid。

如果要使用 Data Grid Console、Hot Rod 客户端和 REST 客户端,请指定 application/x-protostream MediaType,以便 Data Grid 以 Protobuf 格式对数据进行编码。

流程

  • 为 Data Grid 缓存配置中的键和值指定 MediaType。

    • 声明:设置 encoding 属性。
    • 以编程方式:使用 encoding () 方法。

声明性示例

  • 对键和值使用相同的编码:
<local-cache>
  <encoding media-type="application/x-protostream"/>
</local-cache>
Copy to Clipboard Toggle word wrap
  • 对键和值使用不同的编码:
<cache>
   <encoding>
      <key media-type="application/x-java-object"/>
      <value media-type="application/xml; charset=UTF-8"/>
   </encoding>
</cache>
Copy to Clipboard Toggle word wrap

编程示例

  • 对键和值使用相同的编码:
ConfigurationBuilder cfg = new ConfigurationBuilder();

cfg
  .encoding()
    .mediaType("application/x-protostream")
  .build());
Copy to Clipboard Toggle word wrap
  • 对键和值使用不同的编码:
ConfigurationBuilder cfg = new ConfigurationBuilder();

cfg.encoding().key().mediaType("text/plain");
cfg.encoding().value().mediaType("application/json");
Copy to Clipboard Toggle word wrap

4.3. 以 Protobuf 格式存储数据

将数据存储在缓存中作为 Protobuf 编码的条目提供了一个独立于平台的配置,可让您从任何客户端执行缓存操作。

注意

当您为 Data Grid Search 配置索引时,Data Grid 会自动使用 application/x-protostream 介质类型存储键和值。

流程

  1. application/x-protostream 指定为键和值的 MediaType,如下所示:

    <distributed-cache name="mycache">
       <encoding>
          <key media-type="application/x-protostream"/>
          <value media-type="application/x-protostream"/>
       </encoding>
    </distributed-cache>
    Copy to Clipboard Toggle word wrap
  2. 配置您的客户端。

热 Rod 客户端必须注册协议缓冲区模式定义,以描述实体和客户端 marshallers。

数据网格在 application/x-protostreamapplication/json 之间转换,因此 REST 客户端只需要发送以下标头来读取和写入 JSON 格式的数据:

  • accept:用于读取操作的 application/json
  • content-Type: 用于写入操作的 application/json

4.4. 以基于文本的格式存储数据

配置 Data Grid 以基于文本的格式存储数据,如 text/ plainapplication/jsonapplication/xml

流程

  1. 将基于文本的存储格式指定为键和值的 MediaType。
  2. (可选)指定字符集,如 UTF-8

    以下示例将 Data Grid 配置为存储带有 text/plain 的条目;charset=UTF-8 格式:

    <cache>
       <encoding>
          <key media-type="text/plain; charset=UTF-8"/>
          <value media-type="text/plain; charset=UTF-8"/>
       </encoding>
    </cache>
    Copy to Clipboard Toggle word wrap
  3. 配置您的客户端。

热 Rod 客户端可以使用 org.infinispan.commons.marshall.StringMarshaller 处理纯文本、JSON、XML 或任何其他基于文本的格式。

您还可以将基于文本的格式与 ProtoStream marshaller 搭配使用。ProtoStream 可以原生处理 Stringbyte[] 类型,而无需创建 Serialization 上下文并注册 Protobuf 模式(.proto 文件)。

REST 客户端必须使用请求发送正确的标头:

  • accept: text/plain; charset=UTF-8 用于读取操作。
  • Content-Type: text/plain; charset=UTF-8 用于写入操作。

4.5. 存储 Marshalled Java 对象

Java Hot Rod 客户端可以处理代表实体的 Java 对象,并执行 marshalling 来序列化和反序列化对象到 byte[] 数组。C++、C# 和 Javascript Hot Rod 客户端也可以使用相应的语言处理对象。

如果您在缓存中存储条目为 marshalled Java 对象,您应该使用 marshalled 存储的 MediaType 配置缓存。

流程

  1. 指定与您的 marshaller 实现匹配的 MediaType。

    • ProtoStream marshaller :将 MediaType 配置为 application/x-protostream
    • JBoss marshalling: 将 MediaType 配置为 application/x-jboss-marshalling
    • Java serialization:将 MediaType 配置为 application/x-java-serialized-object
  2. 配置您的客户端。

由于 REST 客户端最适合处理文本格式,因此您应该对键使用 java.lang.String 等原语。否则,REST 客户端必须使用受支持的二进制编码以 bytes[] 的形式处理密钥。

REST 客户端可以读取 XML 或 JSON 格式的缓存条目的值。

相等注意事项

当以二进制格式存储数据时,Data Grid 将 WrappedBytes 接口用于键和值。这个打包程序类以透明的方式按照需求处理序列化和解序列化,内部可能对对象本身的引用,或者对象的序列化字节数组表示。这对相等的行为有影响,如果您对键实施 equals () 方法,则必须注意这一点。

wrapper 类的 equals () 方法可以比较二进制表示(字节数组)或委托给嵌套对象实例的 equals () 方法,具体取决于在比较时两个实例是否被序列化或反序列化形式。如果比较的一个实例采用一种形式,另一个形式采用另一种形式,则一个实例被序列化或反序列化。

4.6. 存储 Unmarshalled Java Objects

您可以将数据存储为 deserialized Plain Old Java Objects (POJO),而不是以二进制格式存储数据。

不建议存储 POJO 而不是二进制格式,因为它需要在客户端读取操作中序列化数据,并在写入操作中序列化数据。要处理与自定义代码的客户端互操作性,您应该按需转换数据。

流程

  1. application/x-java-object 指定为键和值的 MediaType,如下所示:

    <distributed-cache name="my-cache">
       <encoding>
          <key media-type="application/x-java-object"/>
          <value media-type="application/x-java-object"/>
       </encoding>
    </distributed-cache>
    Copy to Clipboard Toggle word wrap
  2. 将所有自定义对象的类文件放在 Data Grid server classpath 上。

    添加 JAR 文件,其中包含 server/lib 目录中用于 marshaller 实现的自定义类和/或服务提供商。

    ├── server
    │   ├── lib
    │   │   ├── UserObjects.jar
    │       └── README.txt
    Copy to Clipboard Toggle word wrap
  3. 配置您的客户端。

Hot Rod 客户端不需要更改。唯一的要求是客户端中使用的 marshaller 位于 server/lib 目录中,以便 Data Grid 可以序列化对象。

注意

服务器上已提供了 ProtoStream 和 Java Serialization marshallers。

REST 客户端必须使用 JSON 或 XML,以便数据网格可以转换为 Java 对象或从 Java 对象转换。

4.7. 数据编码

编码是在存储数据前由数据进行的数据转换操作,以及从存储中读取数据时进行的数据转换操作。

4.7.1. 概述

编码允许在 API 调用(map、监听器、流等)期间处理某些数据格式,而有效存储的格式会有所不同。

数据转换由 org.infinispan.commons.dataconversion.Encoder 的实例处理:

public interface Encoder {

   /**
    * Convert data in the read/write format to the storage format.
    *
    * @param content data to be converted, never null.
    * @return Object in the storage format.
    */
   Object toStorage(Object content);

   /**
    * Convert from storage format to the read/write format.
    *
    * @param content data as stored in the cache, never null.
    * @return data in the read/write format
    */
   Object fromStorage(Object content);

   /**
     * Returns the {@link MediaType} produced by this encoder or null if the storage format is not known.
     */
   MediaType getStorageFormat();
}
Copy to Clipboard Toggle word wrap

4.7.2. 默认编码器

根据缓存配置,Data Grid 会自动选择 Encoder。下表显示了哪些内部 Encoder 用于以下配置:

Expand
模式配置encoder描述

embedded/Server

Default(默认)

IdentityEncoder

passthrough encoder,没有进行转换

embedded

StorageType.OFF_HEAP

GlobalMarshallerEncoder

使用 Data Grid internal marshaller 转换为 byte[].可以委托到缓存管理器中配置的 marshaller。

embedded

StorageType.BINARY

BinaryEncoder

使用 Data Grid internal marshaller 转换为 byte[],但 primitives 和 String 除外。

服务器

StorageType.OFF_HEAP

IdentityEncoder

远程客户端接收的字节[]

4.7.3. 以编程方式覆盖

可以通过调用 AdvancedCache 中的 .withEncoding () 方法变体来覆盖用于键和值的编程编码。

示例,请考虑以下缓存配置为 OFF_HEAP:

// Read and write POJO, storage will be byte[] since for
// OFF_HEAP the GlobalMarshallerEncoder is used internally:
cache.put(1, new Pojo())
Pojo value = cache.get(1)

// Get the content in its stored format by overriding
// the internal encoder with a no-op encoder (IdentityEncoder)
Cache<?,?> rawContent = cache.getAdvancedCache().withEncoding(IdentityEncoder.class);
byte[] marshalled = (byte[]) rawContent.get(1);
Copy to Clipboard Toggle word wrap

如果缓存中的任何操作都不需要解码,如计算条目数,或者计算 OFF_HEAP 缓存的 byte[] 的大小,则覆盖非常有用。

4.7.4. 定义自定义 Encoders

自定义编码器可以在 EncoderRegistry 中注册。

Important

在启动缓存前,请确保在集群的每个节点中完成注册。

考虑使用 gzip 压缩/解压缩的自定义编码器:

public class GzipEncoder implements Encoder {

   @Override
   public Object toStorage(Object content) {
      assert content instanceof String;
      return compress(content.toString());
   }

   @Override
   public Object fromStorage(Object content) {
      assert content instanceof byte[];
      return decompress((byte[]) content);
   }

   private byte[] compress(String str) {
      try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
           GZIPOutputStream gis = new GZIPOutputStream(baos)) {
         gis.write(str.getBytes("UTF-8"));
         gis.close();
         return baos.toByteArray();
      } catch (IOException e) {
         throw new RuntimeException("Unabled to compress", e);
      }
   }

   private String decompress(byte[] compressed) {
      try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed));
           BufferedReader bf = new BufferedReader(new InputStreamReader(gis, "UTF-8"))) {
         StringBuilder result = new StringBuilder();
         String line;
         while ((line = bf.readLine()) != null) {
            result.append(line);
         }
         return result.toString();
      } catch (IOException e) {
         throw new RuntimeException("Unable to decompress", e);
      }
   }

   @Override
   public MediaType getStorageFormat() {
      return MediaType.parse("application/gzip");
   }

   @Override
   public boolean isStorageFormatFilterable() {
      return false;
   }

   @Override
   public short id() {
      return 10000;
   }
}
Copy to Clipboard Toggle word wrap

它可以通过以下方法注册:

GlobalComponentRegistry registry = cacheManager.getGlobalComponentRegistry();
EncoderRegistry encoderRegistry = registry.getComponent(EncoderRegistry.class);
encoderRegistry.registerEncoder(new GzipEncoder());
Copy to Clipboard Toggle word wrap

然后用于从缓存中写入和读取数据:

AdvancedCache<String, String> cache = ...

// Decorate cache with the newly registered encoder, without encoding keys (IdentityEncoder)
// but compressing values
AdvancedCache<String, String> compressingCache = (AdvancedCache<String, String>) cache.withEncoding(IdentityEncoder.class, GzipEncoder.class);

// All values will be stored compressed...
compressingCache.put("297931749", "0412c789a37f5086f743255cfa693dd5");

// ... but API calls deals with String
String stringValue = compressingCache.get("297931749");

// Bypassing the value encoder to obtain the value as it is stored
Object value = compressingCache.withEncoding(IdentityEncoder.class).get("297931749");

// value is a byte[] which is the compressed value
Copy to Clipboard Toggle word wrap

4.8. Transcoders 和 Data Conversion

Data Grid 使用 org.infinispan.commons.dataconversion.Transcoder 在 MediaType 格式之间转换数据。

public interface Transcoder {

   /**
    * Transcodes content between two different {@link MediaType}.
    *
    * @param content         Content to transcode.
    * @param contentType     The {@link MediaType} of the content.
    * @param destinationType The target {@link MediaType} to convert.
    * @return the transcoded content.
    */
   Object transcode(Object content, MediaType contentType, MediaType destinationType);

   /**
    * @return all the {@link MediaType} handled by this Transcoder.
    */
   Set<MediaType> getSupportedMediaTypes();
}
Copy to Clipboard Toggle word wrap

4.8.1. 根据需要转换数据

您可以在 Data Grid 上部署并运行自定义代码,如任务、监听器和合并策略。Data Grid 上的自定义代码可以直接访问数据,但还必须与通过不同端点访问相同数据的客户端进行互操作。例如,您可以创建处理自定义对象的任务,而 Hot Rod 客户端以二进制格式读取和写入数据。

在这种情况下,您可以将 application/x-protostream 配置为缓存编码以二进制格式存储数据,然后将自定义代码配置为使用不同的 MediaType 执行缓存操作。

例如:

DefaultCacheManager cacheManager = new DefaultCacheManager();

// The cache will store POJO for keys and values
ConfigurationBuilder cfg = new ConfigurationBuilder();
cfg.encoding().key().mediaType("application/x-java-object");
cfg.encoding().value().mediaType("application/x-java-object");

cacheManager.defineConfiguration("mycache", cfg.build());

Cache<Integer, Person> cache = cacheManager.getCache("mycache");

cache.put(1, new Person("John","Doe"));

// Wraps cache using 'application/x-java-object' for keys but JSON for values
Cache<Integer, byte[]> jsonValuesCache = (Cache<Integer, byte[]>) cache.getAdvancedCache().withMediaType("application/x-java-object", "application/json");

byte[] json = jsonValuesCache.get(1);
Copy to Clipboard Toggle word wrap

将以 JSON 格式返回值:

{
   "_type":"org.infinispan.sample.Person",
   "name":"John",
   "surname":"Doe"
}
Copy to Clipboard Toggle word wrap

4.8.2. 以嵌入式 Deloyments 安装 Transcoders

默认情况下,Data Grid Server 包括 transcoders。但是,当以库的形式运行 Data Grid 时,您必须将以下内容添加到项目中:

org.infinispan:infinispan-server-core
Copy to Clipboard Toggle word wrap

4.8.3. Transcoders 和 Encoders

通常,缓存操作中没有或只有一个数据转换:

  • 在嵌入式或服务器模式中使用 的缓存上,默认没有转换;
  • 对没有配置 MediaType 的嵌入式缓存进行基于编码的转换,但使用 OFF_HEAP 或 BINARY;
  • 对于在多个 REST 和 Hot Rod 客户端以不同格式发送和接收数据的服务器模式中使用的基于 Transcoder 的转换。这些缓存将配置 MediaType 描述存储。

但是,可以在高级用例中同时使用 encoders 和 transcoders。

例如,一个存储 marshalled 对象(带有 jboss marshaller)内容的缓存,但出于安全原因应该添加透明加密层以避免将"plain"数据存储在外部存储中。客户端应该能以多种格式读取和写入数据。

这可以通过使用描述存储的 MediaType 配置缓存来实现,而不考虑编码层:

ConfigurationBuilder cfg = new ConfigurationBuilder();
cfg.encoding().key().mediaType("application/x-jboss-marshalling");
cfg.encoding().key().mediaType("application/x-jboss-marshalling");
Copy to Clipboard Toggle word wrap

可以通过使用特殊 Encoder 对缓存进行加密/解密并存储/检索来添加透明加密,例如:

class Scrambler implements Encoder {

   public Object toStorage(Object content) {
   // Encrypt data
   }

   public Object fromStorage(Object content) {
   // Decrypt data
   }

   @Override
   public boolean isStorageFormatFilterable() {

   }

   public MediaType getStorageFormat() {
   return new MediaType("application", "scrambled");
   }

   @Override
   public short id() {
   //return id
   }
}
Copy to Clipboard Toggle word wrap

为确保写入缓存的所有数据都会加密,需要使用上面的 Encoder 分离缓存并执行此解码缓存中的所有缓存操作:

Cache<?,?> secureStorageCache = cache.getAdvancedCache().withEncoding(Scrambler.class).put(k,v);
Copy to Clipboard Toggle word wrap

通过减少使用所需 MediaType 的缓存,可以添加以多种格式读取数据的功能:

// Obtain a stream of values in XML format from the secure cache
secureStorageCache.getAdvancedCache().withMediaType("application/xml","application/xml").values().stream();
Copy to Clipboard Toggle word wrap

在内部,数据网格首先应用 存储操作中的 encoder 获取 条目,该条目将采用 "application/x-jboss-marshalling" 格式,然后使用适当的 Transcoder 将连续转换应用到"application/xml"。

第 5 章 将 Data Grid 配置为 Marshall Java 对象

Marshalling 将 Java 对象转换为二进制格式,以便可以通过线路或存储到磁盘传输它们。反向进程 unmarshalling 将数据从二进制格式转换为 Java 对象。

Data Grid 执行 marshalling 和 unmarshalling to:

  • 将数据发送到集群中的其他 Data Grid 节点。
  • 将数据存储在持久缓存存储中。
  • 以二进制格式存储数据以提供反序列化功能。

5.1. 支持的类型

Data Grid 使用 ProtoStream API 将 Java 对象编码并解码为协议缓冲区(Protobuf);语言中立且向后兼容的格式。

ProtoStream 可以处理键和值的以下类型,以及在原语类型的情况下未附带的等效类型:

  • byte[]
  • byte
  • 字符串
  • 整数
  • Long
  • å�Œ
  • 浮点值
  • 布尔值
  • short
  • 字符
  • java.util.Date
  • java.time.Instant

5.2. 使用 ProtoStream 的 Marshalling User Types

用户类型为 Data Grid 不支持的 Java 对象。要进行 marshall 用户类型,您可以实施 SerializationContextInitializer 接口来描述 Java 对象,以便 ProtoStream 库可以对其进行编码,而 Data Grid 可以传输和存储它们。

5.2.1. 生成 Serialization Context Initializers

ProtoStream SerializationContext 包含自定义 Java 对象的 Protobuf 类型定义,从 Protobuf 模式加载,以及这些对象的附带的 marshallers。

Data Grid 提供了一个 protostream-processor 工件,可在编译时处理您的类中的 Java 注解。处理器生成 Protobuf 模式、marshallers 和 SerializationContext 接口的具体实施,您可以使用它来初始化 ProtoStream SerializationContext

注意

默认情况下,实施名称是注解的类名称,带有 "Impl" 后缀。

流程

  1. protostream-processor 依赖项添加到 pom.xml 中。

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.infinispan</groupId>
          <artifactId>infinispan-bom</artifactId>
          <version>${version.infinispan}</version>
          <type>pom</type>
        </dependency>
      </dependencies>
    </dependencyManagement>
    
    <dependencies>
      <dependency>
        <groupId>org.infinispan.protostream</groupId>
        <artifactId>protostream-processor</artifactId>
        <!--
          This dependency should be declared in the "provided" scope or made "optional"
          because it is a compile-only dependency and is not required at runtime.
          Transitive propagation of this dependency should be also be avoided.
        -->
        <scope>provided</scope>
      </dependency>
    </dependencies>
    Copy to Clipboard Toggle word wrap
  2. 使用 @ProtoField@ProtoFactory 注解您要 marshall 的 Java 对象。

    Author.java

    import org.infinispan.protostream.annotations.ProtoFactory;
    import org.infinispan.protostream.annotations.ProtoField;
    
    public class Author {
       @ProtoField(number = 1)
       final String name;
    
       @ProtoField(number = 2)
       final String surname;
    
       @ProtoFactory
       Author(String name, String surname) {
          this.name = name;
          this.surname = surname;
       }
       // public Getter methods omitted for brevity
    }
    Copy to Clipboard Toggle word wrap

    Book.java

    import org.infinispan.protostream.annotations.ProtoFactory;
    import org.infinispan.protostream.annotations.ProtoField;
    ...
    
    public class Book {
       @ProtoField(number = 1)
       final String title;
    
       @ProtoField(number = 2)
       final String description;
    
       @ProtoField(number = 3, defaultValue = "0")
       final int publicationYear;
    
       @ProtoField(number = 4, collectionImplementation = ArrayList.class)
       final List<Author> authors;
    
       @ProtoFactory
       Book(String title, String description, int publicationYear, List<Author> authors) {
          this.title = title;
          this.description = description;
          this.publicationYear = publicationYear;
          this.authors = authors;
       }
       // public Getter methods omitted for brevity
    }
    Copy to Clipboard Toggle word wrap

  3. 定义扩展 SerializationContextInitializer 的接口,并使用 @AutoProtoSchemaBuilder 标注。

    @AutoProtoSchemaBuilder(
          includeClasses = {
                Book.class,
                Author.class,
          },
          schemaFileName = "library.proto", 
    1
    
          schemaFilePath = "proto/", 
    2
    
          schemaPackageName = "book_sample")
    interface LibraryInitializer extends SerializationContextInitializer {
    }
    Copy to Clipboard Toggle word wrap
    1
    将生成的 .proto 模式文件命名为 .proto。
    2
    在生成 schema 文件的 target/classes 下设置路径。

后续步骤

SerializationContextInitializer 实现添加到 Data Grid 配置中以注册它。

请参阅 Registering Serialization Context Initializers

5.2.2. 手动实施 Serialization Context Initializers

在某些情况下,您可能需要手动定义 Protobuf 模式,并实现 ProtoStream marshallers。例如,如果无法修改 Java 对象类来添加注解。

流程

  1. 创建一个 Protobuf 模式 .proto 文件,为 marshall 提供 Java 对象的结构化表示。

    package book_sample;
    
    message Book {
        optional string title = 1;
        optional string description = 2;
        optional int32 publicationYear = 3; // no native Date type available in Protobuf
    
        repeated Author authors = 4;
    }
    
    message Author {
        optional string name = 1;
        optional string surname = 2;
    }
    Copy to Clipboard Toggle word wrap

    前面的 .library.proto 文件定义了名为 Book 的实体(Protobuf 消息类型),该实体包含在 book_sample 软件包中。本书 声明了原语类型的多个字段以及名为 author 的数组(Protobuf 可重复字段),即 Author 消息类型。

    • 您可以嵌套消息,但生成的结构严格是一个树,而不是图形。
    • 类型继承是不可能的。
    • 不支持集合,但您可以使用重复字段模拟数组。
  2. 使用 org.infinispan.protostream.MessageMarshaller 接口为您的类实施 marshallers。

    注意

    MessageMarshaller 接口现已弃用。

    下一个版本的 Data Grid 提供了一种替代的实现,可让您创建一个 adaptor 类,该类将 @ProtoAdaptor 注释用于任何外部的第三方 Java 对象类。

    BookMarshaller.java

    import org.infinispan.protostream.MessageMarshaller;
    
    public class BookMarshaller implements MessageMarshaller<Book> {
    
       @Override
       public String getTypeName() {
          return "book_sample.Book";
       }
    
       @Override
       public Class<? extends Book> getJavaClass() {
          return Book.class;
       }
    
       @Override
       public void writeTo(MessageMarshaller.ProtoStreamWriter writer, Book book) throws IOException {
          writer.writeString("title", book.getTitle());
          writer.writeString("description", book.getDescription());
          writer.writeInt("publicationYear", book.getPublicationYear());
          writer.writeCollection("authors", book.getAuthors(), Author.class);
       }
    
       @Override
       public Book readFrom(MessageMarshaller.ProtoStreamReader reader) throws IOException {
          String title = reader.readString("title");
          String description = reader.readString("description");
          int publicationYear = reader.readInt("publicationYear");
          List<Author> authors = reader.readCollection("authors", new ArrayList<>(), Author.class);
          return new Book(title, description, publicationYear, authors);
       }
    }
    Copy to Clipboard Toggle word wrap

    AuthorMarshaller.java

    import org.infinispan.protostream.MessageMarshaller;
    
    public class AuthorMarshaller implements MessageMarshaller<Author> {
    
       @Override
       public String getTypeName() {
          return "book_sample.Author";
       }
    
       @Override
       public Class<? extends Author> getJavaClass() {
          return Author.class;
       }
    
       @Override
       public void writeTo(MessageMarshaller.ProtoStreamWriter writer, Author author) throws IOException {
          writer.writeString("name", author.getName());
          writer.writeString("surname", author.getSurname());
       }
    
       @Override
       public Author readFrom(MessageMarshaller.ProtoStreamReader reader) throws IOException {
          String name = reader.readString("name");
          String surname = reader.readString("surname");
          return new Author(name, surname);
       }
    }
    Copy to Clipboard Toggle word wrap

  3. 创建一个 SerializationContextInitializer 实现,它使用 SerializationContext 注册 .proto 模式和 ProtoStream marshaller 实现。

    ManualSerializationContextInitializer.java

    import org.infinispan.protostream.FileDescriptorSource;
    import org.infinispan.protostream.SerializationContext;
    import org.infinispan.protostream.SerializationContextInitializer;
    ...
    
    public class ManualSerializationContextInitializer implements SerializationContextInitializer {
       @Override
       public String getProtoFileName() {
          return "library.proto";
       }
    
       @Override
       public String getProtoFile() throws UncheckedIOException {
          // Assumes that the file is located in a Jar's resources, we must provide the path to the library.proto file
          return FileDescriptorSource.getResourceAsString(getClass(), "/" + getProtoFileName());
       }
    
       @Override
       public void registerSchema(SerializationContext serCtx) {
          serCtx.registerProtoFiles(FileDescriptorSource.fromString(getProtoFileName(), getProtoFile()));
       }
    
       @Override
       public void registerMarshallers(SerializationContext serCtx) {
          serCtx.registerMarshaller(new AuthorMarshaller());
          serCtx.registerMarshaller(new BookMarshaller());
       }
    }
    Copy to Clipboard Toggle word wrap

后续步骤

SerializationContextInitializer 实现添加到 Data Grid 配置中以注册它。

请参阅 Registering Serialization Context Initializers

5.2.3. 注册 Serialization Context Initializers

在 Data Grid 配置中声明 SerializationContextInitializer 实现来注册它们。

流程

  • 手动以编程方式或声明性注册 SerializationContextInitializer 实现,如下例所示:

编程配置

GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
builder.serialization()
       .addContextInitializers(new LibraryInitializerImpl(), new SCIImpl());
Copy to Clipboard Toggle word wrap

声明性配置

<serialization>
    <context-initializer class="org.infinispan.example.LibraryInitializerImpl"/>
    <context-initializer class="org.infinispan.example.another.SCIImpl"/>
</serialization>
Copy to Clipboard Toggle word wrap

5.3. 配置替代 Marshaller 实施

Data Grid 提供 Marshaller 实现,您可以使用它而不是 ProtoStream。您还可以将 Data Grid 配置为使用自定义 marshaller 实现。

5.3.1. 使用 JBoss Marshalling

JBoss Marshalling 是一个基于序列化的 marshalling 库,是之前的 Data Grid 版本中的默认 marshaller。

注意
  • 您不应该在 Data Grid 中使用基于序列化的 marshalling。反之,您应该使用 Protostream,它是一个高性能的二进制有线格式,以确保向后兼容。
  • JBoss Marshalling 和 AdvancedExternalizer 接口已弃用,并将在以后的发行版本中删除。但是,除非您使用 JBoss Marshalling,否则数据网格会忽略 高级外部工具 实现。

流程

  1. infinispan-jboss-marshalling 依赖项添加到您的 classpath。
  2. 将 Data Grid 配置为使用 GenericJBossMarshaller
  3. 将您的 Java 类添加到反序列化白名单中。

    • 以编程方式:

      GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
      builder.serialization()
             .marshaller(new GenericJBossMarshaller())
             .whiteList()
             .addRegexps("org.infinispan.example.", "org.infinispan.concrete.SomeClass");
      Copy to Clipboard Toggle word wrap
    • 声明:

      <serialization marshaller="org.infinispan.jboss.marshalling.commons.GenericJBossMarshaller">
        <white-list>
            <class>org.infinispan.concrete.SomeClass</class>
            <regex>org.infinispan.example.*</regex>
        <white-list>
      </serialization>
      Copy to Clipboard Toggle word wrap

5.3.2. 使用 Java Serialization

您可以将 Java 序列化与 Data Grid 搭配使用,以汇总您的对象,但只有 Java 对象实现 Java Serializable 接口。

流程

  1. 将 Data Grid 配置为使用 JavaSerializationMarshaller 作为 marshaller。
  2. 将您的 Java 类添加到反序列化白名单中。

    • 以编程方式:

      GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
      builder.serialization()
             .marshaller(new JavaSerializationMarshaller())
             .whiteList()
             .addRegexps("org.infinispan.example.", "org.infinispan.concrete.SomeClass");
      Copy to Clipboard Toggle word wrap
    • 声明:

      <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
          <white-list>
              <class>org.infinispan.concrete.SomeClass</class>
              <regex>org.infinispan.example.*</regex>
          </white-list>
      </serialization>
      Copy to Clipboard Toggle word wrap

5.3.3. 使用 Kryo Marshaller

Data Grid 提供了一个使用 Kryo 库的 marshalling 实现。

Data Grid Servers 的先决条件

要将 Kryo marshalling 与 Data Grid 服务器搭配使用,请添加 JAR,其中包含 Kryo marshalling 实现的运行时类文件,如下所示:

  1. 从 Data Grid Maven 存储库复制 infinispan-marshaller-kryo-bundle.jar
  2. 将 JAR 文件添加到 Data Grid 服务器安装目录中的 server/lib 目录中。

Data Grid Library Mode 的先决条件

要将 Kryo marshalling 与 Data Grid 搭配使用,作为应用程序中的嵌入式库,请执行以下操作:

  1. infinispan-marshaller-kryo 依赖项添加到您的 pom.xml

    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-marshaller-kryo</artifactId>
      <version>${version.infinispan}</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. org.infinispan.marshaller.kryo.KryoMarshaller 类指定为 marshaller。

    GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
    builder.serialization()
           .marshaller(new org.infinispan.marshaller.kryo.KryoMarshaller());
    Copy to Clipboard Toggle word wrap

流程

  1. SerializerRegistryService.java 接口实施服务提供商。
  2. 将所有序列化器注册放在 寄存器(Kryo) 方法中;其中 serializers 使用 Kryo API 向提供的 Kryo 对象注册,例如:

    kryo.register(ExampleObject.class, new ExampleObjectSerializer())
    Copy to Clipboard Toggle word wrap
  3. 指定在部署 JAR 文件中实施类的完整路径:

    META-INF/services/org/infinispan/marshaller/kryo/SerializerRegistryService
    Copy to Clipboard Toggle word wrap

5.3.4. 使用 Protostuff Marshaller

Data Grid 提供了一个 marshalling 实现,它使用 Protostuff 库。

Data Grid Servers 的先决条件

要将 Protostuff marshalling 与 Data Grid 服务器一起使用,请添加 JAR,其中包含 Protostuff marshalling 实现的运行时类文件,如下所示:

  1. 从 Data Grid Maven 存储库复制 infinispan-marshaller-protostuff-bundle.jar
  2. 将 JAR 文件添加到 Data Grid 服务器安装目录中的 server/lib 目录中。

Data Grid Library Mode 的先决条件

要将 Protostuff marshalling 与 Data Grid 一起用作应用程序中嵌入的库,请执行以下操作:

  1. infinispan-marshaller-protostuff 依赖项添加到 pom.xml

    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-marshaller-protostuff</artifactId>
      <version>${version.infinispan}</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. org.infinispan.marshaller.protostuff.ProtostuffMarshaller 类指定为 marshaller。

    GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
    builder.serialization()
           .marshaller(new org.infinispan.marshaller.protostuff.ProtostuffMarshaller());
    Copy to Clipboard Toggle word wrap

流程

执行以下操作之一为对象 marshalling 注册自定义 Protostuff 模式:

  • 调用 register () 方法。

    RuntimeSchema.register(ExampleObject.class, new ExampleObjectSchema());
    Copy to Clipboard Toggle word wrap
  • SerializerRegistryService.java 接口实施服务供应商,该接口将所有模式注册放置在 register () 方法中。

    然后,您应该指定在部署 JAR 文件中实施类的完整路径:

    META-INF/services/org/infinispan/marshaller/protostuff/SchemaRegistryService
    Copy to Clipboard Toggle word wrap

5.3.5. 使用自定义 Marshallers

Data Grid 提供了一个 Marshaller 接口,您可以为自定义 marshallers 实施。

流程

  1. 实施 Marshaller 接口。
  2. 将 Data Grid 配置为使用您的 marshaller。
  3. 将您的 Java 类添加到反序列化白名单中。

    • 以编程方式:

      GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
      builder.serialization()
            .marshaller(new org.infinispan.example.marshall.CustomMarshaller())
            .whiteList().addRegexp("org.infinispan.example.*");
      Copy to Clipboard Toggle word wrap
    • 声明:

      <serialization marshaller="org.infinispan.example.marshall.CustomMarshaller">
          <white-list>
              <class>org.infinispan.concrete.SomeClass</class>
              <regex>org.infinispan.example.*</regex>
          </white-list>
      </serialization>
      Copy to Clipboard Toggle word wrap
提示

自定义 marshaller 实现可以通过 initialize () 方法访问配置的白板列表,该方法在启动时调用。

出于安全原因,数据网格不允许对 arbritrary Java 类进行反序列化处理,这适用于 JSON、XML 和 marshalled byte[] 内容。

您必须将 Java 类添加到反序列化白名单中,可以使用系统属性或在 Data Grid 配置中指定它们。

系统属性

// Specify a comma-separated list of fully qualified class names
-Dinfinispan.deserialization.whitelist.classes=java.time.Instant,com.myclass.Entity

// Specify a regular expression to match classes
-Dinfinispan.deserialization.whitelist.regexps=.*
Copy to Clipboard Toggle word wrap

声明

<cache-container>
   <serialization version="1.0" marshaller="org.infinispan.marshall.TestObjectStreamMarshaller">
      <white-list>
         <class>org.infinispan.test.data.Person</class>
         <regex>org.infinispan.test.data.*</regex>
       </white-list>
   </serialization>
</cache-container>
Copy to Clipboard Toggle word wrap

注意

您添加到反序列化白名单中的 Java 类适用于 Data Grid CacheContainer,并可以被 CacheContainer 控制的所有缓存反序列化。

第 6 章 集群锁定

集群锁定是数据网格集群中跨节点分布和共享的数据结构。集群锁定允许您运行在节点间同步的代码。

6.1. 锁定 API

Data Grid 提供了一个 ClusteredLock API,可让您在嵌入式模式中使用 Data Grid 时同时在集群中执行代码。

API 由以下内容组成:

  • ClusteredLock 会公开方法来实现集群锁定。
  • ClusteredLockManager 会公开方法来定义、配置、检索和删除集群锁定。
  • EmbeddedClusteredLockManagerFactory 初始化 ClusteredLockManager 实现。

所有权

Data Grid 支持 NODE 所有权,以便集群中的所有节点都可以使用锁定。

Reentrancy

数据网格集群锁定不是潜在的,因此集群中的任何节点都可以获取锁定,但只有创建锁定的节点才能释放它。

如果为同一所有者发送两个连续锁定调用,则第一个调用会获取锁定(如果可用),第二个调用会被阻止。

6.2. 使用集群锁定

了解如何在应用程序中嵌入的 Data Grid 中使用集群锁定。

先决条件

  • infinispan-clustered-lock 依赖项添加到 pom.xml 中:
<dependency>
   <groupId>org.infinispan</groupId>
   <artifactId>infinispan-clustered-lock</artifactId>
</dependency>
Copy to Clipboard Toggle word wrap

流程

  1. 从缓存管理器初始化 ClusteredLockManager 接口。此接口是定义、检索和删除集群锁定的入口点。
  2. 为每个集群锁定指定唯一名称。
  3. 使用 lock.tryLock (1, TimeUnit.SECONDS) 方法获取锁定。
// Set up a clustered Cache Manager.
GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();

// Configure the cache mode, in this case it is distributed and synchronous.
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering().cacheMode(CacheMode.DIST_SYNC);

// Initialize a new default Cache Manager.
DefaultCacheManager cm = new DefaultCacheManager(global.build(), builder.build());

// Initialize a Clustered Lock Manager.
ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm);

// Define a clustered lock named 'lock'.
clm1.defineLock("lock");

// Get a lock from each node in the cluster.
ClusteredLock lock = clm1.get("lock");

AtomicInteger counter = new AtomicInteger(0);

// Acquire the lock as follows.
// Each 'lock.tryLock(1, TimeUnit.SECONDS)' method attempts to acquire the lock.
// If the lock is not available, the method waits for the timeout period to elapse. When the lock is acquired, other calls to acquire the lock are blocked until the lock is released.
CompletableFuture<Boolean> call1 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 1");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 1");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture<Boolean> call2 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 2");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 2");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture<Boolean> call3 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 3");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 3");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture.allOf(call1, call2, call3).whenComplete((r, ex) -> {
    // Print the value of the counter.
    System.out.println("Value of the counter is " + counter.get());

    // Stop the Cache Manager.
    cm.stop();
});
Copy to Clipboard Toggle word wrap

6.3. 为锁定配置内部缓存

集群锁定管理器包含一个存储锁定状态的内部缓存。您可以以声明性方式或以编程方式配置内部缓存。

流程

  1. 定义集群中存储集群锁定状态的节点数量。默认值为 -1,它将值复制到所有节点。
  2. 为缓存可靠性指定以下值之一,它控制集群锁定在集群分割到分区或多个节点离开时的行为方式:

    • AVAILABLE: 任何分区中的节点都可以同时在锁定时操作。
    • CONSISTENT :只有属于大多数分区的节点才能在锁定上运行。这是默认值。
    • 编程配置

      import org.infinispan.lock.configuration.ClusteredLockManagerConfiguration;
      import org.infinispan.lock.configuration.ClusteredLockManagerConfigurationBuilder;
      import org.infinispan.lock.configuration.Reliability;
      ...
      
      GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
      
      final ClusteredLockManagerConfiguration config = global.addModule(ClusteredLockManagerConfigurationBuilder.class).numOwner(2).reliability(Reliability.AVAILABLE).create();
      
      DefaultCacheManager cm = new DefaultCacheManager(global.build());
      
      ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm);
      
      clm1.defineLock("lock");
      Copy to Clipboard Toggle word wrap
    • 声明性配置

      <?xml version="1.0" encoding="UTF-8"?>
      <infinispan
              xmlns="urn:infinispan:config:11.0">
          ...
          <cache-container default-cache="default">
              <transport/>
              <local-cache name="default">
                  <locking concurrency-level="100" acquire-timeout="1000"/>
              </local-cache>
      
              <clustered-locks xmlns="urn:infinispan:config:clustered-locks:11.0"
                               num-owners = "3"
                               reliability="AVAILABLE">
                  <clustered-lock name="lock1" />
                  <clustered-lock name="lock2" />
              </clustered-locks>
          </cache-container>
          ...
      </infinispan>
      Copy to Clipboard Toggle word wrap

第 7 章 集群的计数

集群计数器 是数据网格集群中的所有节点之间分布和共享的计数器。计数器可以有不同的一致性级别:强度和弱点。

虽然强/弱一致的计数器都有单独的接口,但支持更新其值,并在更新其值时返回事件。本文档中提供了详细信息,以帮助您选择最适合您的用例。

7.1. 安装和配置

要开始使用计数器,您需要在 Maven pom.xml 文件中添加依赖项:

pom.xml

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-clustered-counter</artifactId>
</dependency>
Copy to Clipboard Toggle word wrap

计数器可以通过本文档后面详述的 CounterManager 接口配置数据源配置文件或按需配置。当 EmbeddedCacheManager 启动时,会在启动时在 Data Grid 配置文件中配置计数器。这些计数器会立即启动,它们在所有集群的节点中都可用。

configuration.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan>
    <cache-container ...>
        <!-- if needed to persist counter, global state needs to be configured -->
        <global-state>
            ...
        </global-state>
        <!-- your caches configuration goes here -->
         <counters xmlns="urn:infinispan:config:counters:11.0" num-owners="3" reliability="CONSISTENT">
             <strong-counter name="c1" initial-value="1" storage="PERSISTENT"/>
             <strong-counter name="c2" initial-value="2" storage="VOLATILE">
                 <lower-bound value="0"/>
             </strong-counter>
             <strong-counter name="c3" initial-value="3" storage="PERSISTENT">
                 <upper-bound value="5"/>
             </strong-counter>
             <strong-counter name="c4" initial-value="4" storage="VOLATILE">
                 <lower-bound value="0"/>
                 <upper-bound value="10"/>
             </strong-counter>
             <weak-counter name="c5" initial-value="5" storage="PERSISTENT" concurrency-level="1"/>
         </counters>
    </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

或以编程方式在 GlobalConfigurationBuilder 中:

GlobalConfigurationBuilder globalConfigurationBuilder = ...;
CounterManagerConfigurationBuilder builder = globalConfigurationBuilder.addModule(CounterManagerConfigurationBuilder.class);
builder.numOwner(3).reliability(Reliability.CONSISTENT);
builder.addStrongCounter().name("c1").initialValue(1).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c2").initialValue(2).lowerBound(0).storage(Storage.VOLATILE);
builder.addStrongCounter().name("c3").initialValue(3).upperBound(5).storage(Storage.PERSISTENT);
builder.addStrongCounter().name("c4").initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE);
builder.addWeakCounter().name("c5").initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT);
Copy to Clipboard Toggle word wrap

另一方面,可以在 EmbeddedCacheManager 初始化后随时按需配置计数器。

CounterManager manager = ...;
manager.defineCounter("c1", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(1).storage(Storage.PERSISTENT).build());
manager.defineCounter("c2", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(2).lowerBound(0).storage(Storage.VOLATILE).build());
manager.defineCounter("c3", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(3).upperBound(5).storage(Storage.PERSISTENT).build());
manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE).build());
manager.defineCounter("c2", CounterConfiguration.builder(CounterType.WEAK).initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT).build());
Copy to Clipboard Toggle word wrap
注意

CounterConfiguration 是不可变的,可以重复使用。

如果计数器成功配置或 false,则方法 defineCounter () 将返回 true。但是,如果配置无效,方法将抛出 CounterConfigurationException。要查找计数器是否已定义,请使用方法 defined ()

CounterManager manager = ...
if (!manager.isDefined("someCounter")) {
    manager.define("someCounter", ...);
}
Copy to Clipboard Toggle word wrap

针对集群属性:

  • num-owners :设置计数器的副本数,以保持集群范围。较小的数量会加快更新操作,但将支持较少的服务器崩溃。它必须是正数,其默认值为 2
  • 可靠性 :在网络分区中设置计数器的更新行为。默认值为 AVAILABLE,有效值为:

    • AVAILABLE: 所有分区都可以读取和更新计数器的值。
    • CONSISTENT :只有主分区(节点的主分区)将能够读取和更新计数器的值。剩余的分区只能读取其值。

每个计数器属性:

  • initial-value [common] :设置计数器的初始值。默认值为 0 ( 零)。
  • Storage [common] :设置集群关闭时和重启时计数器的行为。默认值为 VOLATILE,有效值为:

    • VOLATILE :计数器的值仅在内存中可用。当集群关闭时,该值将会丢失。
    • PERSISTENT: 计数器的值存储在私有和本地持久性存储中。当集群重启后关闭并恢复时,该值会被保留。
注意

在群集关闭后,按需和 VOLATILE 计数器将会丢失其值和配置。重启后必须再次定义它们。

  • lower-bound [strong]:设置强度一致的计数器的下限。默认值为 Long.MIN_VALUE
  • upper-bound [strong]:设置强度一致的计数器的上限。默认值为 Long.MAX_VALUE
注意

如果没有配置 下限上限,则强大的计数器被设置为 unbounded。

警告

initial-value 必须介于 lower-boundupper-bound inclusive内。

  • concurrency-level [weak] :设置并发更新的数量。其 值必须是正数,默认值为 16

7.1.1. 列出计数器名称

要列出定义的所有计数器,method CounterManager.getCounterNames () 返回集群范围的所有计数器名称的集合。

7.2. CounterManager 接口

CounterManager 接口是定义、检索和删除计数器的入口点。

嵌入式部署

CounterManager 会自动侦听创建 EmbeddedCacheManager,并执行每个 EmbeddedCacheManager 实例的实例注册。它启动存储计数器状态所需的缓存并配置默认计数器。

检索 CounterManager 非常简单,如调用 EmbeddedCounterManagerFactory.asCounterManager (EmbeddedCacheManager),如下例所示:

// create or obtain your EmbeddedCacheManager
EmbeddedCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager);
Copy to Clipboard Toggle word wrap

服务器部署

对于 Hot Rod 客户端,CounterManager 在 RemoteCacheManager 中注册,如下所示:

// create or obtain your RemoteCacheManager
RemoteCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);
Copy to Clipboard Toggle word wrap

7.2.1. 通过 CounterManager 删除计数器

通过 Strong/WeakCounter 接口和 CounterManager 删除计数器之间存在区别。CounterManager.remove (String) 从集群中移除计数器值,并删除本地计数器实例中注册的所有监听程序。另外,计数器实例不再可以被重复使用,它可能会返回无效的结果。

在另一端,Strong/WeakCounter 删除只会删除计数器值。实例仍然可以重复使用,侦听器仍可以正常工作。

注意

如果在删除后访问计数器,则会重新创建该计数器。

7.3. Counter

计数器可能非常强大(StrongCounter)或弱一致(WeakCounter),它们都由一个名称来标识。它们有一个特定的接口,但它们共享一些逻辑,即它们都是异步的(每个操作会返回一个 CompletableFuture ),提供一个 update 事件,并可以重置为初始值。

如果您不想使用 async API,可以通过 sync () 方法返回同步计数器。API 相同,但没有 CompletableFuture 返回值。

以下方法对两个接口都很常见:

String getName();
CompletableFuture<Long> getValue();
CompletableFuture<Void> reset();
<T extends CounterListener> Handle<T> addListener(T listener);
CounterConfiguration getConfiguration();
CompletableFuture<Void> remove();
SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
Copy to Clipboard Toggle word wrap
  • getName () 返回计数器名称(identifier)。
  • getValue () 返回当前计数器的值。
  • reset () 允许将计数器的值重置为其初始值。
  • 添加Listener () 注册监听程序以接收更新事件。在 Notification 和 Events 部分中有关它的更多详细信息。
  • getConfiguration () 返回计数器所使用的配置。
  • remove () 从集群中移除计数器值。仍可使用实例,并保留监听器。
  • sync () 创建一个同步计数器。
注意

如果在删除后访问计数器,则会重新创建该计数器。

强大的计数器使用存储在 Data Grid 缓存中的单个密钥来提供所需的一致性。所有更新都在键锁定下执行,以更新其值。另一方面,读取不会获取任何锁定并读取当前值。另外,在这个方案中,它允许绑定计数器值,并提供 compare-and-set/swap 等原子操作。

StrongCounter 可以通过使用 getStrongCounter () 方法从 CounterManager 中检索。例如:

CounterManager counterManager = ...
StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
Copy to Clipboard Toggle word wrap
警告

由于每个操作都会命中一个密钥,因此 StrongCounter 具有更高的争用率。

StrongCounter 接口添加了以下方法:

default CompletableFuture<Long> incrementAndGet() {
   return addAndGet(1L);
}

default CompletableFuture<Long> decrementAndGet() {
   return addAndGet(-1L);
}

CompletableFuture<Long> addAndGet(long delta);

CompletableFuture<Boolean> compareAndSet(long expect, long update);

CompletableFuture<Long> compareAndSwap(long expect, long update);
Copy to Clipboard Toggle word wrap
  • incrementAndGet () 将计数器递增,并返回新值。
  • 将计数器缩减为 decrementAndGet (),并返回新值。
  • addAndGet () 将 delta 添加到计数器的值中,并返回新值。
  • 比较AndSet ()比较AndSwap (),如果当前值是预期的值,则以原子方式设置计数器的值。
注意

完成 CompletableFuture 时,操作被视为已完成。

注意

compare-and-set 和 compare-and-swap 之间的区别在于,如果操作成功,则前者会返回 true,同时稍后返回前面的值。如果返回值与预期相同,则 compare-and-swap 可以成功。

7.3.1.1. bounded StrongCounter

绑定后,上述所有更新方法将在达到较低或上限的 CounterOutOfBoundsException 时抛出一个 CounterOutOfBoundsException。例外方法可以检查已达到哪个侧绑定:

public boolean isUpperBoundReached();
public boolean isLowerBoundReached();
Copy to Clipboard Toggle word wrap
7.3.1.2. 使用案例

强大的计数器在以下用例中更适合:

  • 当每次更新后都需要计数器的值(例如,cluster-wise ids 生成器或序列)
  • 当需要绑定的计数器(例如,速率限制器)
7.3.1.3. 使用示例
StrongCounter counter = counterManager.getStrongCounter("unbounded_counter");

// incrementing the counter
System.out.println("new value is " + counter.incrementAndGet().get());

// decrement the counter's value by 100 using the functional API
counter.addAndGet(-100).thenApply(v -> {
   System.out.println("new value is " + v);
   return null;
}).get();

// alternative, you can do some work while the counter is updated
CompletableFuture<Long> f = counter.addAndGet(10);
// ... do some work ...
System.out.println("new value is " + f.get());

// and then, check the current value
System.out.println("current value is " + counter.getValue().get());

// finally, reset to initial value
counter.reset().get();
System.out.println("current value is " + counter.getValue().get());

// or set to a new value if zero
System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1));
Copy to Clipboard Toggle word wrap

另外,还有另一个使用绑定计数器的示例:

StrongCounter counter = counterManager.getStrongCounter("bounded_counter");

// incrementing the counter
try {
    System.out.println("new value is " + counter.addAndGet(100).get());
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof CounterOutOfBoundsException) {
       if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
          System.out.println("ops, upper bound reached.");
       } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
          System.out.println("ops, lower bound reached.");
       }
    }
}

// now using the functional API
counter.addAndGet(-100).handle((v, throwable) -> {
   if (throwable != null) {
      Throwable cause = throwable.getCause();
      if (cause instanceof CounterOutOfBoundsException) {
         if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) {
            System.out.println("ops, upper bound reached.");
         } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) {
            System.out.println("ops, lower bound reached.");
         }
      }
      return null;
   }
   System.out.println("new value is " + v);
   return null;
}).get();
Copy to Clipboard Toggle word wrap

compare-and-set vs Compare-and-swap 示例:

StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue, newValue;
do {
   oldValue = counter.getValue().get();
   newValue = someLogic(oldValue);
} while (!counter.compareAndSet(oldValue, newValue).get());
Copy to Clipboard Toggle word wrap

使用 compare-and-swap,它会保存一个调用计数器调用(counter.getValue())

StrongCounter counter = counterManager.getStrongCounter("my-counter");
long oldValue = counter.getValue().get();
long currentValue, newValue;
do {
   currentValue = oldValue;
   newValue = someLogic(oldValue);
} while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);
Copy to Clipboard Toggle word wrap

7.3.2. WeakCounter 接口:需要速度时

WeakCounter 将计数器的值存储在 Data Grid 缓存中的多个键中。创建的键数量由 concurrency-level 属性配置。每个键存储计数器值的部分状态,并可同时更新。相对于 StrongCounter 的主要优点是缓存中的竞争较低。另一方面,读取其值更为昂贵,不允许绑定。

警告

应谨慎处理重置操作。它不是 原子的,它会生成中间值。读取操作以及注册的任何监听器都可以看到这些值。

可以使用 get WeakCounter () 方法从 CounterManager 中检索 WeakCounter。例如:

CounterManager counterManager = ...
StrongCounter aCounter = counterManager.getWeakCounter("my-counter);
Copy to Clipboard Toggle word wrap
7.3.2.1. 弱计数器接口

WeakCounter 添加以下方法:

default CompletableFuture<Void> increment() {
   return add(1L);
}

default CompletableFuture<Void> decrement() {
   return add(-1L);
}

CompletableFuture<Void> add(long delta);
Copy to Clipboard Toggle word wrap

它们与 'StrongCounter 的方法类似,但它们不会返回新值。

7.3.2.2. 使用案例

当不需要更新操作的结果或不需要计数器的值时,弱计数器最适合于用例中。收集统计数据是此类用例的良好示例。

7.3.2.3. 例子

以下是弱计数器用法的示例。

WeakCounter counter = counterManager.getWeakCounter("my_counter");

// increment the counter and check its result
counter.increment().get();
System.out.println("current value is " + counter.getValue());

CompletableFuture<Void> f = counter.add(-100);
//do some work
f.get(); //wait until finished
System.out.println("current value is " + counter.getValue().get());

//using the functional API
counter.reset().whenComplete((aVoid, throwable) -> System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get();
System.out.println("current value is " + counter.getValue().get());
Copy to Clipboard Toggle word wrap

7.4. 通知和事件

强和弱计数器都支持侦听器接收其更新事件。侦听器必须实施 CounterListener,并可以使用以下方法注册:

<T extends CounterListener> Handle<T> addListener(T listener);
Copy to Clipboard Toggle word wrap

CounterListener 有以下接口:

public interface CounterListener {
   void onUpdate(CounterEvent entry);
}
Copy to Clipboard Toggle word wrap

返回的 Handle 对象具有在不再需要时删除 CounterListener 的主要目标。另外,它还允许访问它处理的 CounterListener 实例。它有以下接口:

public interface Handle<T extends CounterListener> {
   T getCounterListener();
   void remove();
}
Copy to Clipboard Toggle word wrap

最后,CounterEvent 具有上一个和当前的值和状态。它有以下接口:

public interface CounterEvent {
   long getOldValue();
   State getOldState();
   long getNewValue();
   State getNewState();
}
Copy to Clipboard Toggle word wrap
注意

对于未绑定的强计数器和弱计数器,状态始终是 State.VALIDstate.LOWER_BOUND_REACHEDState.UPPER_BOUND_REACHED 仅对有界强计数器有效。

警告

弱计数器 reset () 操作将触发带有中间值的多个通知。

第 8 章 locking 和 Concurrency

Data Grid 利用多版本的并发控制(MVCC)- 常见用于相关数据库和其他数据存储的并发方案。MVCC 比粗粒度 Java 同步提供了很多优势,甚至 JDK Locks 用于访问共享数据,包括:

  • 允许并发读取器和写入器
  • 读取器和写入者不阻止另一个读者
  • 可以检测和处理写偏移
  • 内部锁定可以被条带化

8.1. 锁定实施详情

网格的 MVCC 实施利用最小锁定和同步,大量采用无锁定技术(如 compare-and-swap 和 lock-free 数据结构),这有助于对多 CPU 和多核心环境进行优化。

特别是,Data Grid 的 MVCC 实现被大量优化用于读取器。读取器线程不会获取条目的显式锁定,而是直接读取问题中的条目。

另一方面,写入者需要获取写锁定。这样可确保每个条目只有一个并发写入器,从而导致并发写入器排队以更改条目。

要允许并发读取,writers 通过在 MVCCEntry 中嵌套条目来制作他们要修改的条目的副本。此副本将并发读取器与看到部分修改的状态隔离。写入完成后,MVCCEntry.commit () 将清除对数据容器的更改,后续读者将看到写入的更改。

8.1.1. 它如何在集群缓存中工作?

在集群缓存中,每个密钥都有一个节点负责锁定密钥。此节点称为主要所有者。

8.1.1.1. 非事务缓存
  1. 写入操作发送到密钥的主所有者。
  2. 主所有者尝试锁定密钥。

    1. 如果成功,它会将操作转发到其他所有者;
    2. 否则,会抛出异常。
注意

如果操作是条件的,且在主所有者上失败,则不会转发到其他所有者。

注意

如果在主所有者中本地执行操作,则将跳过第一步。

8.1.2. 事务缓存

事务缓存支持最佳锁定模式和 pessimistic 锁定模式。如需更多信息,请参阅 Transaction Locking。

8.1.3. 隔离级别

隔离级别会影响与其他事务同时运行时可以读取哪些事务。如需更多信息,请参阅隔离级别。

8.1.4. LockManager

LockManager 是一个组件,负责锁定写入条目。LockManager 使用 LockContainer 来定位/保留/创建锁定。LockContainers 有两个广泛的 flavours,支持锁定条带,支持每个条目一个锁定。

8.1.5. 锁定条带

锁定条带需要对整个缓存使用固定大小、共享锁定集合,并根据条目的键的哈希代码分配给条目。与 JDK 的 ConcurrentHashMap 分配锁定的方式类似,这允许在交换中具有高度可扩展、固定的锁定机制,以便有可能被同一锁阻止相关的条目。

另一种方法是禁用锁定条带 - 这意味着为每个条目 创建新的 锁定。这种方法 可能会 给您带来更高的并发吞吐量,但会牺牲额外的内存用量、垃圾收集混乱等。

默认锁定条带设置

默认禁用锁定条带,因为在不同键的锁定最终在同一锁定条带中时可能会出现的死锁。

可以使用 < locking /> 配置元素的 concurrencyLevel 属性调整锁定条带使用的共享锁定集合的大小。

配置示例:

<locking striping="false|true"/>
Copy to Clipboard Toggle word wrap

或者

new ConfigurationBuilder().locking().useLockStriping(false|true);
Copy to Clipboard Toggle word wrap

8.1.6. 并发级别

除了确定条带锁定容器的大小外,此并发级别还用于调优任何基于 JDK ConcurrentHashMap 的集合,如 DataContainer内部的集合。如需了解并发级别的详细讨论,请参阅 JDK ConcurrentHashMap Javadocs,因为此参数在 Data Grid 中以完全相同的方式使用。

配置示例:

<locking concurrency-level="32"/>
Copy to Clipboard Toggle word wrap

或者

new ConfigurationBuilder().locking().concurrencyLevel(32);
Copy to Clipboard Toggle word wrap

8.1.7. 锁定超时

锁定超时指定内容锁定的时间(以毫秒为单位)。

配置示例:

<locking acquire-timeout="10000"/>
Copy to Clipboard Toggle word wrap

或者

new ConfigurationBuilder().locking().lockAcquisitionTimeout(10000);
//alternatively
new ConfigurationBuilder().locking().lockAcquisitionTimeout(10, TimeUnit.SECONDS);
Copy to Clipboard Toggle word wrap

8.1.8. 一致性

单个所有者被锁定(而不是所有所有者被锁定)不会破坏以下一致性保证:如果密钥 K 被哈希到节点 {A、B} 和事务 TX1 会为 K 获取锁定,让我们表示 A。如果另一个事务( TX2 )在 B (或任何其他节点)上启动,并且 TX2 尝试锁定 K,那么它将因为锁定已经由 TX1 持有而失败。这样做的原因是,关键 K 的锁始终是,确定性,在集群的同一节点上获取,无论事务源自哪里。

8.2. 数据版本控制

Data Grid 支持两种形式的数据版本控制:简单和外部。简单的版本控制用于写入偏移检查的事务缓存。

外部版本控制用于在数据网格内封装数据源,例如将 Data Grid 与 Hibernate 搭配使用时,后者又直接从数据库获取其数据版本信息。

在此方案中,需要传递版本的一种机制,以及 putForExternalRead ()和 putFor ExternalRead () 的超载版本将在 AdvancedCache 中提供,以采用外部数据版本。然后,它存储在 InvocationContext 上,并在提交时应用到该条目。

注意

写入偏移检查无法执行,在出现外部数据版本控制时不会执行。

第 9 章 使用 Data Grid CDI 扩展

Data Grid 提供了一个与 CDI (Contexts 和 Dependency Injection)编程模型集成的扩展,并允许您:

  • 配置缓存并将其注入 CDI Beans 和 Java EE 组件。
  • 配置缓存管理器。
  • 接收缓存和缓存管理器级别事件。
  • 使用 JCache 注解控制数据存储和检索。

9.1. CDI 依赖项

使用以下依赖项之一更新 pom.xml,以在项目中包含 Data Grid CDI 扩展:

嵌入式(Library)模式

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-cdi-embedded</artifactId>
</dependency>
Copy to Clipboard Toggle word wrap

服务器模式

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-cdi-remote</artifactId>
</dependency>
Copy to Clipboard Toggle word wrap

9.2. 注入嵌入式缓存

设置 CDI Bean 以注入嵌入式缓存。

流程

  1. 创建缓存限定符注解。

    ...
    import javax.inject.Qualifier;
    
    @Qualifier
    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface GreetingCache { 
    1
    
    }
    Copy to Clipboard Toggle word wrap
    1
    创建一个 @GreetingCache 限定符。
  2. 添加定义缓存配置的制作者方法。

    ...
    import org.infinispan.configuration.cache.Configuration;
    import org.infinispan.configuration.cache.ConfigurationBuilder;
    import org.infinispan.cdi.ConfigureCache;
    import javax.enterprise.inject.Produces;
    
    public class Config {
    
        @ConfigureCache("mygreetingcache") 
    1
    
        @GreetingCache 
    2
    
        @Produces
        public Configuration greetingCacheConfiguration() {
            return new ConfigurationBuilder()
                        .memory()
                            .size(1000)
                        .build();
        }
    }
    Copy to Clipboard Toggle word wrap
    1
    要注入的缓存的名称。
    2
    添加缓存限定符。
  3. 如果需要,添加创建集群缓存管理器的制作者方法

    ...
    package org.infinispan.configuration.global.GlobalConfigurationBuilder;
    
    public class Config {
    
        @GreetingCache 
    1
    
        @Produces
        @ApplicationScoped 
    2
    
        public EmbeddedCacheManager defaultClusteredCacheManager() { 
    3
    
          return new DefaultCacheManager(
            new GlobalConfigurationBuilder().transport().defaultTransport().build();
       }
    }
    Copy to Clipboard Toggle word wrap
    1
    添加缓存限定符。
    2
    为应用创建 bean 一次。创建缓存管理器的制作者应始终包含 @ApplicationScoped 注释,以避免创建多个缓存管理器。
    3
    创建新的 DefaultCacheManager 实例,该实例绑定到 @GreetingCache qualifier。
    注意

    缓存管理器具有重度的权重对象。在应用程序中运行多个缓存管理器可能会降低性能。在注入多个缓存时,可以将每个缓存的限定符添加到缓存管理器制作者方法中,或者不添加任何限定符。

  4. @GreetingCache 限定符添加到您的缓存注入点。

    ...
    import javax.inject.Inject;
    
    public class GreetingService {
    
        @Inject @GreetingCache
        private Cache<String, String> cache;
    
        public String greet(String user) {
            String cachedValue = cache.get(user);
            if (cachedValue == null) {
                cachedValue = "Hello " + user;
                cache.put(user, cachedValue);
            }
            return cachedValue;
        }
    }
    Copy to Clipboard Toggle word wrap

9.3. 注入远程缓存

设置 CDI Bean 以注入远程缓存。

流程

  1. 创建缓存限定符注解。

    @Remote("mygreetingcache") 
    1
    
    @Qualifier
    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RemoteGreetingCache { 
    2
    
    }
    Copy to Clipboard Toggle word wrap
    1
    要注入的缓存的名称。
    2
    创建 @RemoteGreetingCache 限定符。
  2. @RemoteGreetingCache 限定符添加到您的缓存注入点。

    public class GreetingService {
    
        @Inject @RemoteGreetingCache
        private RemoteCache<String, String> cache;
    
        public String greet(String user) {
            String cachedValue = cache.get(user);
            if (cachedValue == null) {
                cachedValue = "Hello " + user;
                cache.put(user, cachedValue);
            }
            return cachedValue;
        }
    }
    Copy to Clipboard Toggle word wrap

注入远程缓存的提示

  • 您可以在不使用限定符的情况下注入远程缓存。

       ...
       @Inject
       @Remote("greetingCache")
       private RemoteCache<String, String> cache;
    Copy to Clipboard Toggle word wrap
  • 如果您有多个 Data Grid 集群,您可以为每个集群创建单独的远程缓存管理器制作者。

    ...
    import javax.enterprise.context.ApplicationScoped;
    
    public class Config {
    
        @RemoteGreetingCache
        @Produces
        @ApplicationScoped 
    1
    
        public ConfigurationBuilder builder = new ConfigurationBuilder(); 
    2
    
            builder.addServer().host("localhost").port(11222);
            return new RemoteCacheManager(builder.build());
        }
    }
    Copy to Clipboard Toggle word wrap
    1
    为应用创建 bean 一次。创建缓存管理器的制作者应始终包含 @ApplicationScoped 注释,以避免创建多个缓存管理器,它们是重量对象。
    2
    创建新的 RemoteCacheManager 实例,该实例绑定到 @RemoteGreetingCache qualifier。

9.4. JCACHE 缓存注解

当 JCache 工件位于类路径上时,您可以在 CDI 受管 Bean 中使用以下 JCache 缓存注解:

@CacheResult
缓存方法调用的结果。
@CachePut
缓存方法参数。
@CacheRemoveEntry
从缓存中删除条目。
@CacheRemoveAll
从缓存中删除所有条目。
重要

目标类型 : 您只能在方法上使用这些 JCache 缓存注解。

要使用 JCache 缓存注解,请在应用程序的 beans.xml 文件中声明拦截器。

受管环境(应用服务器)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
   version="1.2" bean-discovery-mode="annotated">

  <interceptors>
    <class>org.infinispan.jcache.annotation.InjectedCacheResultInterceptor</class>
    <class>org.infinispan.jcache.annotation.InjectedCachePutInterceptor</class>
    <class>org.infinispan.jcache.annotation.InjectedCacheRemoveEntryInterceptor</class>
    <class>org.infinispan.jcache.annotation.InjectedCacheRemoveAllInterceptor</class>
  </interceptors>
</beans>
Copy to Clipboard Toggle word wrap

非托管环境(独立)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
   version="1.2" bean-discovery-mode="annotated">

  <interceptors>
    <class>org.infinispan.jcache.annotation.CacheResultInterceptor</class>
    <class>org.infinispan.jcache.annotation.CachePutInterceptor</class>
    <class>org.infinispan.jcache.annotation.CacheRemoveEntryInterceptor</class>
    <class>org.infinispan.jcache.annotation.CacheRemoveAllInterceptor</class>
  </interceptors>
</beans>
Copy to Clipboard Toggle word wrap

JCACHE 缓存注解示例

以下示例显示了 @CacheResult 注释如何缓存 GreetingService.greet () 方法的结果:

import javax.cache.interceptor.CacheResult;

public class GreetingService {

    @CacheResult
    public String greet(String user) {
        return "Hello" + user;
    }
}
Copy to Clipboard Toggle word wrap

使用 JCache 注解时,默认缓存使用注释方法的完全限定名称及其参数类型,例如:
org.infinispan.example.GreetingService.greet (java.lang.String)

要使用默认缓存,请使用 cacheName 属性来指定缓存名称,如下例所示:

@CacheResult(cacheName = "greeting-cache")
Copy to Clipboard Toggle word wrap

9.5. 接收缓存和缓存管理器事件

您可以使用 CDI 事件接收缓存和缓存管理器级别事件。

  • 使用 @Observes 注释,如下例所示:
import javax.enterprise.event.Observes;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStartedEvent;
import org.infinispan.notifications.cachelistener.event.*;

public class GreetingService {

    // Cache level events
    private void entryRemovedFromCache(@Observes CacheEntryCreatedEvent event) {
        ...
    }

    // Cache manager level events
    private void cacheStarted(@Observes CacheStartedEvent event) {
        ...
    }
}
Copy to Clipboard Toggle word wrap

第 10 章 Data Grid Transactions

Data Grid 可以配置为使用和参与 JTA 兼容事务。

或者,如果事务支持被禁用,则等同于在 JDBC 调用中使用 autocommit,其中每次更改后可能会复制修改(如果启用了复制)。

在每个缓存操作 Data Grid 中执行以下操作:

  1. 检索与线程关联的当前 事务
  2. 如果还没有完成,请将 XAResource 注册到事务管理器,以便在事务提交或回滚时获得通知。

为此,必须提供对环境的 TransactionManager 的引用缓存。这通常是通过使用 TransactionManagerLookup 接口实现的类名称配置缓存来实现的。当缓存启动时,它将创建此类实例并调用其 getTransactionManager () 方法,这将返回对 TransactionManager 的引用。

Data Grid 附带几个事务管理器查找类:

事务管理器查找实现

  • EmbeddedTransactionManagerLookup :这提供了基本的事务管理器,它仅在没有其他实施时用于嵌入式模式。这种实现存在一些严重限制,用于进行并发事务和恢复。
  • JBossStandaloneJTAManagerLookup: 如果您在独立环境中运行 Data Grid,或者在 JBoss AS 7 及更早版本中,以及 WildFly 8、9 和 10,这是您的事务管理器的默认选择。它是一个基于 JBoss 交易 的全面事务管理器,它解决了 嵌入式TransactionManager 的所有缺陷。
  • WildflyTransactionManagerLookup :如果您在 WildFly 11 或更高版本中运行 Data Grid,则这应该是您的事务管理器的默认选择。
  • GenericTransactionManagerLookup :这是一个查找类,可在最流行的 Java EE 应用服务器中找到事务管理器。如果没有找到事务管理器,则默认为 EmbeddedTransactionManager

警告: DummyTransactionManagerLookup 在 9.0 中弃用,它将在以后的版本中删除。使用 EmbeddedTransactionManagerLookup 替代。

初始化后,TransactionManager 也可以从 缓存 本身获取:

//the cache must have a transactionManagerLookupClass defined
Cache cache = cacheManager.getCache();

//equivalent with calling TransactionManagerLookup.getTransactionManager();
TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
Copy to Clipboard Toggle word wrap

10.1. 配置事务

事务在缓存级别配置。以下是影响事务配置以及每个配置属性的小描述。

<locking
   isolation="READ_COMMITTED"/>
<transaction
   locking="OPTIMISTIC"
   auto-commit="true"
   complete-timeout="60000"
   mode="NONE"
   notifications="true"
   reaper-interval="30000"
   recovery-cache="__recoveryInfoCacheName__"
   stop-timeout="30000"
   transaction-manager-lookup="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"/>
Copy to Clipboard Toggle word wrap

或以编程方式:

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.locking()
    .isolationLevel(IsolationLevel.READ_COMMITTED);
builder.transaction()
    .lockingMode(LockingMode.OPTIMISTIC)
    .autoCommit(true)
    .completedTxTimeout(60000)
    .transactionMode(TransactionMode.NON_TRANSACTIONAL)
    .useSynchronization(false)
    .notifications(true)
    .reaperWakeUpInterval(30000)
    .cacheStopTimeout(30000)
    .transactionManagerLookup(new GenericTransactionManagerLookup())
    .recovery()
    .enabled(false)
    .recoveryInfoCacheName("__recoveryInfoCacheName__");
Copy to Clipboard Toggle word wrap
  • isolation - 配置隔离级别。如需更多详细信息,请参阅 隔离级别。默认为 REPEATABLE_READ
  • locking - 配置缓存是否使用最佳功能或 pessimistic locking。检查 Transaction Locking 部分以了解更多详细信息。默认为 OPTIMISTIC
  • auto-commit - 如果启用,用户不需要为单个操作手动启动事务。事务会自动启动并提交。默认为 true
  • complete-timeout - 以毫秒为单位保存关于完成事务的信息。默认值为 60000
  • Mode - 配置缓存是否事务。默认值为 NONE。可用的选项有:

  • 通知 - 在缓存监听程序中启用/禁用触发事务事件。默认为 true
  • Reaper- interval - 清理事务完成信息的线程以 millisecond 为单位的时间间隔。默认值为 30000
  • recovery-cache - 配置缓存名称以存储恢复信息。有关 恢复的详情,请查看部分事务 恢复。默认为 recoveryInfoCacheName
  • stop-timeout - 缓存停止后等待持续事务的时间。默认值为 30000
  • transaction-manager-lookup - 配置类的完全限定类名称,该类查找对 javax.transaction.TransactionManager 的引用。默认为 org.infinispan.transaction.lookup.GenericTransactionManagerLookup

有关如何在 Data Grid 中实施 Two-Phase-Commit (2PC)以及如何获取锁定的更多详细信息,请参见以下部分。有关配置设置的更多详细信息,请参阅配置 参考

10.2. 隔离级别

Data Grid 提供两种隔离级别 - READ_COMMITTEDREPEATABLE_READ

这些隔离级别决定了读取器何时看到并发写入,并在内部使用 MVCCEntry 的不同子类实现,其在状态重新提交到数据容器的方式上具有不同的行为。

下面是一个更详细的示例,它应该帮助了解数据网格上下文中 READ_COMMITTEDREPEATABLE_READ 之间的区别。使用 READ_COMMITTED 时,如果同一键的两个连续读取调用之间,密钥已被另一个事务更新,第二个读取可能会返回新的更新值:

Thread1: tx1.begin()
Thread1: cache.get(k) // returns v
Thread2:                                       tx2.begin()
Thread2:                                       cache.get(k) // returns v
Thread2:                                       cache.put(k, v2)
Thread2:                                       tx2.commit()
Thread1: cache.get(k) // returns v2!
Thread1: tx1.commit()
Copy to Clipboard Toggle word wrap

使用 REPEATABLE_READ 时,最终的 get 仍会返回 v。因此,如果您要在事务中多次检索同一密钥,您应该使用 REPEATABLE_READ

但是,即使对于 REPEATABLE_READ,也不会获得 read-locks,因此可能会发生这种现象:

cache.get("A") // returns 1
cache.get("B") // returns 1

Thread1: tx1.begin()
Thread1: cache.put("A", 2)
Thread1: cache.put("B", 2)
Thread2:                                       tx2.begin()
Thread2:                                       cache.get("A") // returns 1
Thread1: tx1.commit()
Thread2:                                       cache.get("B") // returns 2
Thread2:                                       tx2.commit()
Copy to Clipboard Toggle word wrap

10.3. 事务锁定

10.3.1. pessimistic 事务缓存

从锁定收购角度来看,在编写密钥时,基于密钥的 pess 事务会获得锁定。

  1. 将锁定请求发送到主所有者(可以是显式锁定请求或操作)
  2. 主要所有者会尝试获取锁定:

    1. 如果成功,它会发回一个正回复;
    2. 否则,会发送负回复,事务会被回滚。

例如:

transactionManager.begin();
cache.put(k1,v1); //k1 is locked.
cache.remove(k2); //k2 is locked when this returns
transactionManager.commit();
Copy to Clipboard Toggle word wrap

cache.put (k1,v1) 返回时,k1 被锁定,集群中的任何位置都没有运行其他事务。仍可读取 k1。当事务完成后(提交或回滚)时,k1 上的锁定会被释放。

注意

对于条件操作,验证在 originator 中执行。

10.3.2. 最佳事务缓存

使用最佳交易锁定会在事务准备时获取,并且仅持有事务提交(或回滚)的时间。这与 5.0 默认锁定模型不同,在写入时获取本地锁定,并在准备期间获取集群锁定。

  1. 准备发送给所有所有者。
  2. 主要所有者试图获取所需的锁定:

    1. 如果锁定成功,它将执行写入偏移检查。
    2. 如果写入偏移检查成功(或被禁用),请发送正回复。
    3. 否则,会发送负回复,并回滚事务。

例如:

transactionManager.begin();
cache.put(k1,v1);
cache.remove(k2);
transactionManager.commit(); //at prepare time, K1 and K2 is locked until committed/rolled back.
Copy to Clipboard Toggle word wrap
注意

对于条件命令,在原始卷中仍然会出现验证。

从用例的角度来看,当多个事务同时运行的多个事务之间没有大量竞争时,应使用最佳事务。这是因为,如果数据在读取的时间和提交的时间(启用了写偏移检查)之间有所变化,则最佳事务回滚。

另一方面,当键和事务回滚非常竞争时,pessimistic 事务可能更适合。pessimistic 事务的性质更加昂贵:每个写入操作都可能涉及对锁定收购的 RPC。

10.4. 写写 Skews

当两个事务独立并同时读取和写入同一密钥时,会发生写偏移。写偏移的结果是,两个事务都成功向同一键提交更新,但使用不同的值。

Data Grid 会自动执行写偏移检查,以确保在最佳事务中对 REPEATABLE_READ 隔离级别的数据一致性。这样,Data Grid 可以检测和回滚其中一个事务。

当以 LOCAL 模式运行时,写入偏移检查依赖于 Java 对象引用来比较差异,它提供了可靠的技术来检查写偏移。

10.4.1. 在 pessimitic 事务中强制强制写入锁定

为了避免带有 pessimistic 事务的写偏移,使用 Flag.FORCE_WRITE_LOCK 锁定密钥。

注意
  • 在非事务缓存中,Flag.FORCE_WRITE_LOCK 无法正常工作。get () 调用读取键值,但不会远程获取锁定。
  • 您应该使用 Flag.FORCE_WRITE_LOCK,并在稍后在同一事务中更新实体的事务。

将以下代码片段与 Flag.FORCE_WRITE_LOCK 示例进行比较:

// begin the transaction
if (!cache.getAdvancedCache().lock(key)) {
   // abort the transaction because the key was not locked
} else {
   cache.get(key);
   cache.put(key, value);
   // commit the transaction
}
Copy to Clipboard Toggle word wrap
// begin the transaction
try {
   // throws an exception if the key is not locked.
   cache.getAdvancedCache().withFlags(Flag.FORCE_WRITE_LOCK).get(key);
   cache.put(key, value);
} catch (CacheException e) {
   // mark the transaction rollback-only
}
// commit or rollback the transaction
Copy to Clipboard Toggle word wrap

10.5. 处理异常

如果在 JTA 事务范围内丢弃了 CacheException (或其子类),则事务会自动标记为回滚。

10.6. Enlisting Synchronizations

默认情况下,Data Grid 通过 XAResource 将自身注册为第一类参与者。有些情况下,Data Grid 不需要在交易中参与,但只有其生命周期通知(准备完成):例如,Data Grid 被用作 Hibernate 中的第二级缓存。

网格允许通过同步进行 事务处理。若要启用它,只需使用 NON_XA 事务模式。

同步具有允许 TransactionManager 使用 1PC 优化 2PC 的优点,其中只有一个其他资源被使用该事务(最后一个资源提交优化)列出。例如Hibernate 第二级缓存:如果 Data Grid 将自身注册到 TransactionManager 作为 XAResource,则 TransactionManager 会在提交时看到两个 XAResource (缓存和数据库),且不会进行这个优化。需要在两个资源间协调,它需要将 tx 日志写入磁盘。另一方面,将 Data Grid 注册为 同步 使 TransactionManager 跳过将日志写入磁盘(性能改进)。

10.7. 批处理

批处理允许原子性以及事务的一些特征,但不包括完整的 JTA 或 XA 功能。批处理通常比一整套交易更轻、更便宜。

提示

通常而言,每当交易中唯一参与者是数据网格集群时,均应使用批处理 API。另一方面,每当事务涉及多个系统时,都应使用 JTA 事务(涉及 TransactionManager)。例如,考虑了交易的"Hello world!":将资金从一个银行帐户转移到另一个银行帐户。如果两个帐户都存储在 Data Grid 中,则可以使用批处理。如果一个帐户位于数据库中,另一个是 Data Grid,则需要分布式事务。

注意

您不必 定义事务管理器来使用批处理。

10.7.1. API

将缓存配置为使用批处理后,您可以通过调用 Cache 上的 startBatch ()endBatch () 来使用它。例如,

Cache cache = cacheManager.getCache();
// not using a batch
cache.put("key", "value"); // will replicate immediately

// using a batch
cache.startBatch();
cache.put("k1", "value");
cache.put("k2", "value");
cache.put("k2", "value");
cache.endBatch(true); // This will now replicate the modifications since the batch was started.

// a new batch
cache.startBatch();
cache.put("k1", "value");
cache.put("k2", "value");
cache.put("k3", "value");
cache.endBatch(false); // This will "discard" changes made in the batch
Copy to Clipboard Toggle word wrap

10.7.2. 批处理和 JTA

在后台后,批处理功能启动 JTA 事务,并且该范围内的所有调用都与其关联。为此,它使用非常简单(如没有恢复)内部 TransactionManager 实现。使用批处理时,您将获得:

  1. 在批处理完成前,您要在调用期间获取锁定
  2. 更改都会在批处理过程中在批处理中复制。减少批处理中每个更新的复制聊天。
  3. 如果使用同步复制或无效,则复制/无效会导致批处理回滚。
  4. 所有事务相关的配置也适用于批处理。

10.8. 事务恢复

恢复是 XA 事务的一个功能,它处理资源的最终性,甚至是事务管理器失败,并从这种情况进行相应的恢复。

10.8.1. 何时使用恢复

考虑一个分布式交易,其中资金从存储在外部数据库中的帐户转移到 Data Grid 中存储的帐户。当调用 TransactionManager.commit () 时,两个资源都会成功准备(1st 阶段)。在提交(2nd)阶段,数据库在从事务管理器接收提交请求前成功应用更改 whilst Data Grid 失败。此时系统处于不一致的状态:从外部数据库中的帐户获得资金,但还没有在 Data Grid (因为锁定仅在双阶段提交协议的 2nd 阶段发布)。恢复处理这种情况,以确保数据库和数据网格都处于一致状态。

10.8.2. 它如何工作

恢复由事务管理器协调。事务管理器与 Data Grid 合作,以确定需要人工干预并告知系统管理员(通过电子邮件、日志警报等)的异常事务列表。这个过程是特定于事务管理器的,但通常需要在事务管理器中进行一些配置。  

知道 in-doubt 事务 ID,系统管理员现在可以连接到 Data Grid 集群,并重播事务提交或强制回滚。Data Grid 提供 JMX 工具 - 这在 Transaction recovery and reconciliation 部分中进行了广泛阐述。

10.8.3. 配置恢复   

Data Grid 中不默认启用恢复。如果禁用,TransactionManager 将无法与 Data Grid 一起使用,以确定 in-doubt 事务。Transaction configuration 部分演示了如何启用它。

注意: recovery-cache 属性不是强制的,它被配置为每个缓存。

注意

若要恢复工作,必须将 mode 设置为 FULL_XA,因为需要全线 XA 事务。

10.8.3.1. 启用 JMX 支持

为了可以使用 JMX 管理恢复支持,必须明确启用。

10.8.4. 恢复缓存

为了跟踪新的交易并能够回复它们,数据网格会缓存所有事务,供未来使用。这个状态仅为 in-doubt 事务保存,在提交/滚动阶段完成后,会删除成功完成事务。

这种不疑的交易数据保存在本地缓存中:这样,在它太大时,可以通过缓存加载程序将这些信息交换到磁盘。此缓存可以通过 recovery-cache 配置属性指定。如果没有指定 Data Grid,则会为您配置本地缓存。

(尽管强制)可以在启用了恢复的所有 Data Grid 缓存之间共享相同的恢复缓存。如果覆盖默认恢复缓存,则指定的恢复缓存必须使用 TransactionManagerLookup,它返回一个与缓存本身所使用的不同的事务管理器。

10.8.5. 与事务管理器集成

虽然这是特定于事务管理器的,但通常需要一个事务管理器对 XAResource 实现的引用,才能在其上调用 XAResource.recover ()。要获得对 Data Grid XAResource 的引用,可以使用以下 API:

XAResource xar = cache.getAdvancedCache().getXAResource();
Copy to Clipboard Toggle word wrap

常见做法是在与运行事务的不同进程中运行恢复。

10.8.6. 协调

事务管理器将以专有方式通知系统管理员不疑事务。在此阶段,系统管理员知道事务的 XID (字节阵列)。

普通的恢复流程是:

  • STEP 1: 系统管理员通过 JMX 连接到数据网格服务器,并列出了重大交易。下图显示了 JConsole 连接到具有重大事务的 Data Grid 节点。

图 10.1. show in-doubt 事务

此时会显示每个 in-doubt 事务的状态(在本示例中" PREPARED ")。status 字段中可能存在多个元素,例如:在某些节点上提交但不是所有节点上的事务时,"PREPARED"和"COMMITTED"。  

  • STEP 2: 系统管理员视觉地将从事务管理器接收的 XID 映射到 Data Grid 内部 ID,以数字表示。此步骤是必需的,因为 XID (一个字节阵列)无法方便地传递给 JMX 工具(如 JConsole),然后在 Data Grid 的侧重新集合。
  • STEP 3 :系统管理员根据内部 ID 强制事务的提交/滚动通过相应的 jmx 操作。以下镜像是通过根据其内部 ID 强制提交事务来获取的。

图 10.2. 强制提交

提示

以上描述的所有 JMX 操作都可以在任何节点上执行,无论事务来自哪里。

10.8.6.1. 根据 XID 强制提交/滚动

也提供基于 XID 的 JMX 操作,以强制进行内部事务的提交/滚动:这些方法接收 XID 阵列,而不是与事务关联的数字(如第 2 步所述)。例如,如果想要为某些不疑事务设置自动完成作业,则这非常有用。这个过程插入事务管理器的恢复中,并可以访问事务管理器的 XID 对象。

10.8.7. 想要了解更多信息?

恢复设计文档 更详细地描述了事务恢复实现内部。

第 11 章 索引和搜索

Data Grid 提供了一个搜索 API,可让您索引和搜索存储为 Java POJO 或作为 协议 缓冲的对象编码的对象。

11.1. 概述

搜索可以在库和 客户端/服务器模式中 (用于 Java、C#、Node.js 和其他客户端),而 Data Grid 可使用 Apache Lucene 来索引数据,提供有效的 全文本 搜索引擎,以覆盖广泛的数据检索用例。#query_library

索引配置依赖于架构定义,并且 Data Grid 可以在库模式中使用注解的 Java 类,而 protobuf 模式用于远程客户端。通过对 protobuf 进行标准化,数据网格允许在 Java 和非 Java 客户端之间进行完全互操作性。

Data Grid 具有自己的查询语言,称为 Ickle,这是基于字符串的查询语言,并增加了对全文本搜索的支持。Ickle 支持搜索索引化数据、部分索引数据或非索引数据。

最后,Data Grid 支持 连续查询 (以反向查询的方式处理)其他 API:不创建、执行查询并获取结果,它允许客户端注册查询,以在集群更改的数据匹配时持续评估查询,每当更改的数据与查询匹配时都会生成通知。

11.2. 索引条目值

在 Data Grid 缓存中索引条目值可显著提高搜索性能,并允许您执行全文本查询。但是,索引可能会降级 Data Grid 集群的写入吞吐量。因此,您应该计划使用策略来优化查询性能,具体取决于缓存模式和您的用例。有关 查询性能的更多信息

11.2.1. 配置

要通过 XML 启用索引,您需要在缓存配置中添加 & lt;indexing > 元素,指定索引的实体并选择性地传递附加属性。

注意

存在 & lt;indexing > 元素,省略 enabled 属性将自动启用索引,即使 enabled 属性的默认值在 XSD 模式中被定义为 "false "。在编程配置中,必须使用 enabled ()

声明性

<infinispan>
   <cache-container default-cache="default">
      <replicated-cache name="default">
         <indexing>
            <indexed-entities>
               <indexed-entity>com.acme.Book</indexed-entity>
            </indexed-entities>
            <property name="property.name">some value</property>
         </indexing>
      </replicated-cache>
   </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

以编程方式

import org.infinispan.configuration.cache.*;

ConfigurationBuilder cacheCfg = ...
cacheCfg.indexing().enable()
            .addIndexedEntity(Book.class)
      .addProperty("property name", "propery value")
Copy to Clipboard Toggle word wrap

11.2.2. 指定索引实体

建议声明索引类型,因为它们将在下一个 Data Grid 版本中是必需的。

声明性

<infinispan>
   <cache-container default-cache="default">
      <replicated-cache name="default">
         <indexing>
            <indexed-entities>
                <indexed-entity>com.acme.query.test.Car</indexed-entity>
                <indexed-entity>com.acme.query.test.Truck</indexed-entity>
            </indexed-entities>
         </indexing>
      </replicated-cache>
   </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

以编程方式

 cacheCfg.indexing()
       .addIndexedEntity(Car.class)
       .addIndexedEntity(Truck.class)
Copy to Clipboard Toggle word wrap

当缓存存储 protobuf 时,索引类型应该是 protobuf 模式中声明的消息。例如,对于以下模式:

package book_sample;

message Book {
    optional string title = 1;
    optional string description = 2;
    optional int32 publicationYear = 3; // no native Date type available in Protobuf

    repeated Author authors = 4;
}

message Author {
    optional string name = 1;
    optional string surname = 2;
}
Copy to Clipboard Toggle word wrap

配置应该是:

<infinispan>
  <cache-container default-cache="default">
    <replicated-cache name="books">
      <indexing>
        <indexed-entities>
          <indexed-entity>book_sample.Book</indexed-entity>
        </indexed-entities>
      </indexing>
    </replicated-cache>
  </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

11.2.3. 索引存储

Data Grid 可以在文件系统中或内存中存储索引(local-heap)。文件系统是推荐的和默认配置,并且内存索引只应用于不需要在重启后进行中型索引的中型索引。

文件系统索引配置:

<replicated-cache name="myCache">
   <indexing>
      <indexed-entities>
         <indexed-entity>com.acme.Book</indexed-entity>
      </indexed-entities>
      <!-- Optional: this is the default setting -->
      <property name="default.directory_provider">filesystem</property>
      <!-- Optional: define base folder for indexes -->
      <property name="default.indexBase">${java.io.tmpdir}/baseDir</property>
   </indexing>
</replicated-cache>
Copy to Clipboard Toggle word wrap

配置内存索引:

<replicated-cache name="myCache">
   <indexing>
      <indexed-entities>
         <indexed-entity>com.acme.Book</indexed-entity>
      </indexed-entities>
      <property name="default.directory_provider">local-heap</property>
   </indexing>
</replicated-cache>
Copy to Clipboard Toggle word wrap

11.2.4. 索引管理器

Data Grid 使用内部一个称为"Index Manager"的组件来控制如何将新数据应用到索引以及数据对搜索可见的时间。

当数据写入到缓存中时,默认索引管理器 基于目录的默认 写入索引。缺点是,在编写繁重的情况下,它可以显著减慢缓存写入速度,因为它需要在索引上执行名为"flushes"的恒定的昂贵的操作。

near-real-time 索引管理器与默认索引管理器类似,但利用 Lucene 的 Near-Real-Time 功能。它具有更好的写入性能,因为它经常将索引刷新到底层存储。缺陷是,在非清理关闭时,未清空的索引更改可能会丢失。可与 local-heapfilesystem 一起使用。

使用 local-heap 的示例:

<replicated-cache name="default">
    <indexing>
        <property name="default.indexmanager">near-real-time</property>
        <property name="default.directory_provider">local-heap</property>
    </indexing>
</replicated-cache>
Copy to Clipboard Toggle word wrap

使用 文件系统的示例

<replicated-cache name="default">
    <indexing>
        <property name="default.indexmanager">near-real-time</property>
    </indexing>
</replicated-cache>
Copy to Clipboard Toggle word wrap

11.2.5. 重建索引

从缓存中存储的数据重建索引。如果您更改了索引类型或分析器定义等内容,则需要重建索引。同样,如果出于某种原因删除了索引,您可能需要重建索引。请注意,可能需要过些时间,因为需要重新处理网格中的所有数据。

Indexer indexer = Search.getIndexer(cache);
CompletionStage<Void> future = index.run();
Copy to Clipboard Toggle word wrap

11.3. 搜索

使用 Ickle 查询语言在 Library 和 Remote Client-Server 模式中创建关系和全文本查询。

要使用 API,首先获取 QueryFactory 到缓存,然后调用 .create () 方法,传递字符串以便在查询中使用。每个 QueryFactory 实例都绑定到与 Search 相同的 缓存 实例,但它是一个无状态和 thread-safe 对象,可用于并行创建多个查询。

例如:

// Remote Query, using protobuf
QueryFactory qf = org.infinispan.client.hotrod.Search.getQueryFactory(remoteCache);
Query q = qf.create("from sample_bank_account.Transaction where amount > 20");

// Embedded Query using Java Objects
QueryFactory qf = org.infinispan.query.Search.getQueryFactory(cache);
Query q = qf.create("from com.acme.Book where price > 20");

// Execute the query
QueryResult<Book> queryResult = q.execute();
Copy to Clipboard Toggle word wrap
注意

查询将始终以单个实体类型为目标,并在单个缓存的内容上评估。不支持对多个缓存运行查询,或创建以多个实体类型(连接)为目标的查询。

执行查询并获取结果非常简单,就像调用 Query 对象的 run () 方法一样简单。执行之后,在同一实例上调用 run () 将重新执行查询。

11.3.1. 分页

您可以使用 Query.maxResults (int maxResults) 限制返回的结果数量。这可与 Query.startOffset (long startOffset) 一起使用,来实现结果集的分页。

// sorted by year and match all books that have "clustering" in their title
// and return the third page of 10 results
Query<Book> query = queryFactory.create("FROM com.acme.Book WHERE title like '%clustering%' ORDER BY year").startOffset(20).maxResults(10)
Copy to Clipboard Toggle word wrap

11.3.2. Hits 的数量

QueryResult 对象具有 .hitCount () 方法,可以返回查询的结果总数,而不考虑任何分页参数。因为性能的原因,点击数仅适用于索引的查询。

11.3.3. 迭代

Query 对象具有 .iterator () 方法,可以完全获得结果。它返回一个在使用后必须关闭的 CloseableIterator 实例。

注意

远程查询的迭代支持当前有限,因为它在迭代前首先获取客户端的所有条目。

11.3.4. 使用 Named Query 参数

可以不必为每个执行构建一个新的 Query 对象,而是在查询中包含命名参数,这些参数可以在执行前使用实际值替换。这允许定义一次查询,并可以有效地执行多次。参数只能在操作器的右侧使用,并通过提供由 org.infinispan.query.dsl.Expression.param (String paramName) 方法生成的对象来创建查询时定义。定义参数后,可以通过调用 Query.setParameter (parameterName, value)Query.setParameters (parameterMap) 来设置参数,如以下示例所示。

QueryFactory queryFactory = Search.getQueryFactory(cache);
// Defining a query to search for various authors and publication years
Query<Book> query = queryFactory.create("SELECT title FROM com.acme.Book WHERE author = :authorName AND publicationYear = :publicationYear").build();

// Set actual parameter values
query.setParameter("authorName", "Doe");
query.setParameter("publicationYear", 2010);

// Execute the query
List<Book> found = query.list();
Copy to Clipboard Toggle word wrap

或者,您可以提供一个实际参数值映射以一次性设置多个参数: ⁠

一次设置多个命名参数

Map<String, Object> parameterMap = new HashMap<>();
parameterMap.put("authorName", "Doe");
parameterMap.put("publicationYear", 2010);

query.setParameters(parameterMap);
Copy to Clipboard Toggle word wrap

注意

在首次使用参数执行查询时,执行查询解析、验证和执行规划工作的主要部分。与使用恒定值而不是查询参数类似的查询相比,后续执行期间不会重复这一工作,从而提高性能。

11.3.5. Ickle Query Language Parser Syntax

Ickle 查询语言是 JPQL 查询语言的小子集,具有一些全文本扩展。

解析器语法有一些值得注意的规则:

  • 空格并不重要。
  • 字段名称不支持通配符。
  • 必须始终指定字段名称或路径,因为没有默认字段。
  • &&|| 在全文本和 JPA predicates 中都接受 ANDOR
  • ! 可以被使用,而不是
  • 缺少布尔值运算符解释为 OR
  • 字符串术语必须用单引号或双引号括起。
  • Fuzziness 和 boosting 没有被任意顺序接受;fuzziness 始终是首先接受的。
  • != 被接受,而不是 <& gt;
  • boosting 无法应用到 >, &gt;=, & lt;, HBAC operators。范围可用于实现相同的结果。
11.3.5.1. 过滤 Operator

Ickle 支持许多可用于索引和非索引字段的过滤运算符。

Expand
Operator描述Example

in

检查左侧运算对象是否等于所给值集合中的一个元素。

FROM Book WHERE isbn IN ('ZZ', 'X1234')

like

检查左侧参数(预期为 String)是否匹配 JPA 规则之后的通配符模式。

FROM Book WHERE 标题 LIKE '%Java%'

=

检查 left 参数是否与给定值完全匹配

FROM Book WHERE name = 'Programming Java'

!=

检查 left 参数与给定值不同

FROM Book WHERE 语言 != ' English'

>

检查 left 参数是否大于给定值。

FROM Book WHERE price > 20

>=

检查 left 参数是否大于或等于给定值。

FROM Book WHERE price >= 20

<

检查 left 参数是否小于给定值。

FROM Book WHERE year < 2012

检查 left 参数是否小于或等于给定值。

FROM Book WHERE price 了 50

between

检查 left 参数是否在给定的范围限值之间。

FROM Book WHERE 价格 BETWEEN 50 AND 100

11.3.5.2. 布尔值条件

以下示例中演示了多个属性条件和逻辑组合()和 disjunction ()运算符,以创建更复杂的条件。布尔值运算符的已知运算符优先级规则适用于此处,因此操作器的顺序无关。这里 运算符的优先级比 高,即使先调用

# match all books that have "Data Grid" in their title
# or have an author named "Manik" and their description contains "clustering"

FROM com.acme.Book WHERE title LIKE '%Data Grid%' OR author.name = 'Manik' AND description like '%clustering%'
Copy to Clipboard Toggle word wrap

布尔值负值在逻辑运算符之间具有最高优先级,并且只适用于下一个简单的属性条件。

# match all books that do not have "Data Grid" in their title and are authored by "Manik"
FROM com.acme.Book WHERE title != 'Data Grid' AND author.name = 'Manik'
Copy to Clipboard Toggle word wrap
11.3.5.3. 嵌套条件

通过括号更改逻辑运算符的优先级:

# match all books that have an author named "Manik" and their title contains
# "Data Grid" or their description contains "clustering"
FROM com.acme.Book WHERE author.name = 'Manik' AND ( title like '%Data Grid%' OR description like '% clustering%')
Copy to Clipboard Toggle word wrap
11.3.5.4. 选择属性

在某些用例中,如果应用实际使用了一小部分属性,则返回整个域对象是过量的,特别是在域实体有嵌入式实体时。查询语言允许您指定属性(或属性路径)的子集来返回 - 投射。如果使用投射,则 QueryResult.list () 不会返回整个域实体,而是返回 Object[] 列表,则数组中的每个插槽都与投射属性对应。

# match all books that have "Data Grid" in their title or description
# and return only their title and publication year
SELECT title, publicationYear FROM com.acme.Book WHERE title like '%Data Grid%' OR description like '%Data Grid%'
Copy to Clipboard Toggle word wrap
11.3.5.5. 排序

使用 ORDER BY 子句,根据一个或多个属性或属性路径对结果进行排序。如果指定了多个排序条件,则顺序将指定其优先级。

# match all books that have "Data Grid" in their title or description
# and return them sorted by the publication year and title
FROM com.acme.Book WHERE title like '%Data Grid%' ORDER BY publicationYear DESC, title ASC
Copy to Clipboard Toggle word wrap
11.3.5.6. 分组和聚合

Data Grid 能够根据一组分组字段并构造来自每个组的结果聚合来对查询结果进行分组,方法是将聚合应用到每个组中的值集合。分组和聚合只能应用到投射查询(在 SELECT 子句中带有一个或多个字段)。

支持的聚合有: avg、sum、count、max、min。

组分组字段通过 GROUP BY 子句指定,并且用于定义分组字段的顺序无关。投射中选择的所有字段都必须分组字段,否则必须使用下面描述的分组功能之一来聚合它们。项目字段可以聚合,并同时用于分组。选择仅分组字段但没有聚合字段的查询是法律的。附录示例:作者对手册进行分组,并计算它们。

SELECT author, COUNT(title) FROM com.acme.Book WHERE title LIKE '%engine%' GROUP BY author
Copy to Clipboard Toggle word wrap
注意

一个投射查询,所有选择的字段都应用了聚合功能,且无法用于分组的字段。在这种情况下,聚合将全局计算,就像有一个全局组一样。

11.3.5.7. 聚合

以下聚合功能可应用到字段:

  • avg () - 计算一组数字的平均数量。接受的值是原始数字和 java.lang.Number 的实例。结果以 java.lang.Double 表示。如果没有非 null 值,则结果为 null
  • count () - 计算非null 行的数量并返回 java.lang.Long。如果没有非 null 值,则 结果为 0。
  • max () - 返回找到的最大值。接受的值必须是 java.lang.Comparable 的实例。如果没有非 null 值,则结果为 null
  • min () - 返回找到的最小值。接受的值必须是 java.lang.Comparable 的实例。如果没有非 null 值,则结果为 null
  • sum () - 计算一组数字的总和。如果没有非 null 值,则结果为 null。下表根据指定字段显示返回类型。
Expand
表 11.1. 表和返回类型
字段类型返回类型

不可或缺(除 BigInteger)

Long

float 或 Double

�

BigInteger

BigInteger

BigDecimal

BigDecimal

11.3.5.8. 使用分组和聚合评估查询

聚合查询可以包含过滤条件,如常规查询。可以在两个阶段执行过滤:在分组操作之前和之后执行。在执行分组操作之前,定义的所有过滤器条件都将应用到缓存条目(而不是最终投射)。这些过滤器条件可以引用查询的实体类型的任何字段,旨在限制要作为分组阶段输入的数据集。调用 groupBy () 方法后定义的所有过滤器条件将应用到投射结果和分组操作。这些过滤器条件可以引用任何 groupBy () 字段或聚合的字段。允许引用在 select 子句中指定的聚合字段,但禁止引用非aggregated 和 non-grouping 字段。在此阶段过滤将根据其属性减少组量。排序也可以指定,类似于常见的查询。排序操作在分组操作后执行,并可引用任何 groupBy () 字段或聚合字段。

11.4. 嵌入式搜索

当 Data Grid 用作库时,可以使用嵌入式搜索。不需要 protobuf 映射,并在 Java 对象之上进行索引和搜索。

11.4.1. 快速示例

我们将将 Book 实例存储在名为"books"的数据网格缓存中。将对本书实例进行索引,因此我们为缓存启用索引:

Data Grid 配置:

infinispan.xml

<infinispan>
    <cache-container>
        <transport cluster="infinispan-cluster"/>
        <distributed-cache name="books">
            <indexing>
                <indexed-entities>
                    <indexed-entity>com.acme.Book</indexed-entity>
                </indexed-entities>
            </indexing>
        </distributed-cache>
    </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

获取缓存:

import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;

EmbeddedCacheManager manager = new DefaultCacheManager("infinispan.xml");
Cache<String, Book> cache = manager.getCache("books");
Copy to Clipboard Toggle word wrap

每个 Book 将按照以下示例定义;我们必须选择索引哪些属性;对于每个属性,我们可以选择使用 Hibernate Search 项目中定义的注释选择高级索引选项。

Book.java

import org.hibernate.search.annotations.*;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

//Values you want to index need to be annotated with @Indexed, then you pick which fields and how they are to be indexed:
@Indexed
public class Book {
   @Field String title;
   @Field String description;
   @Field @DateBridge(resolution=Resolution.YEAR) Date publicationYear;
   @IndexedEmbedded Set<Author> authors = new HashSet<Author>();
}
Copy to Clipboard Toggle word wrap

Author.java

public class Author {
   @Field String name;
   @Field String surname;
   // hashCode() and equals() omitted
}
Copy to Clipboard Toggle word wrap

假设我们在我们的 Data Grid Cache 中存储了多个 Book 实例,我们可以根据以下示例所示搜索任何匹配字段。

QueryExample.java

// get the query factory from the cache:
QueryFactory queryFactory = org.infinispan.query.Search.getQueryFactory(cache);

// create an Ickle query that will do a full-text search (operator ':') on fields 'title' and 'authors.name'
Query<Book> fullTextQuery = queryFactory.create("FROM com.acme.Book WHERE title:'infinispan' AND authors.name:'sanne'")

// The ('=') operator is not a full-text operator, thus can be used in both indexed and non-indexed caches
Query<Book> exactMatchQuery = queryFactory.create("FROM com.acme.Book WHERE title = 'Programming Infinispan' AND authors.name = 'Sanne Grinnovero'")

// Full-text and non-full text operators can be part of the same query
Query q = queryFactory.create("FROM com.query.Book b where b.author.name = 'Stephen' and b.description : (+'dark' -'tower')");

// get the results
List<Book> found=query.execute().list();
Copy to Clipboard Toggle word wrap

除了 list () 外,您还可以选择获取 迭代器 () 或使用分页。

11.4.2. 映射实体

网格依赖于 Hibernate Search 的丰富 API,以定义在实体级别索引的精细配置。此配置包括注解哪些字段,这些分析器应使用,如何映射嵌套对象等。Hibernate Search 手册中 提供了详细的文档。

11.4.2.1. @DocumentId

与 Hibernate Search 不同,使用 @DocumentId 将字段标记为标识符不适用于 Data Grid 值;在 Data Grid 中,所有 @Indexed 对象的标识符是用于存储值的密钥。您仍然可以使用 @Transformable、自定义类型和自定义 FieldBridge 实施的组合来自定义键的索引方式。

11.4.2.2. @Transformable 键

每个值的密钥也需要索引,必须在 String 中转换密钥实例。Data Grid 包括一些默认的转换例程来编码常见的原语,但要使用一个自定义密钥,您必须提供一个 org.infinispan.query.Transformer 的实现。

通过注解注册密钥转换程序

您可以使用 org.infinispan.query.Transformable 标注密钥类,您的自定义转换器实施将自动获取:

@Transformable(transformer = CustomTransformer.class)
public class CustomKey {
   ...
}

public class CustomTransformer implements Transformer {
   @Override
   public Object fromString(String s) {
      ...
      return new CustomKey(...);
   }

   @Override
   public String toString(Object customType) {
      CustomKey ck = (CustomKey) customType;
      return ...
   }
}
Copy to Clipboard Toggle word wrap

通过缓存索引配置注册密钥转换程序

在嵌入式和服务器配置中使用 key-transformers xml 元素:

<replicated-cache name="test">
    <indexing auto-config="true">
        <key-transformers>
            <key-transformer key="com.mycompany.CustomKey" transformer="com.mycompany.CustomTransformer"/>
        </key-transformers>
    </indexing>
</replicated-cache>
Copy to Clipboard Toggle word wrap

或者,使用 Java 配置 API (embedded 模式):

   ConfigurationBuilder builder = ...
   builder.indexing().enable()
         .addKeyTransformer(CustomKey.class, CustomTransformer.class);
Copy to Clipboard Toggle word wrap
11.4.2.3. 程序映射

除了使用注解将实体映射到索引外,还可以以编程方式进行配置。

在以下示例中,我们映射了一个要存储在网格中的对象 Author,并可在两个属性上搜索,但没有注解类。

import org.apache.lucene.search.Query;
import org.hibernate.search.cfg.Environment;
import org.hibernate.search.cfg.SearchMapping;
import org.hibernate.search.query.dsl.QueryBuilder;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.Index;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.query.CacheQuery;
import org.infinispan.query.Search;
import org.infinispan.query.SearchManager;

import java.io.IOException;
import java.lang.annotation.ElementType;
import java.util.Properties;

SearchMapping mapping = new SearchMapping();
mapping.entity(Author.class).indexed()
       .property("name", ElementType.METHOD).field()
       .property("surname", ElementType.METHOD).field();

Properties properties = new Properties();
properties.put(Environment.MODEL_MAPPING, mapping);
properties.put("hibernate.search.[other options]", "[...]");

Configuration infinispanConfiguration = new ConfigurationBuilder()
        .indexing().index(Index.NONE)
        .withProperties(properties)
        .build();

DefaultCacheManager cacheManager = new DefaultCacheManager(infinispanConfiguration);

Cache<Long, Author> cache = cacheManager.getCache();
SearchManager sm = Search.getSearchManager(cache);

Author author = new Author(1, "Manik", "Surtani");
cache.put(author.getId(), author);

QueryBuilder qb = sm.buildQueryBuilderForClass(Author.class).get();
Query q = qb.keyword().onField("name").matching("Manik").createQuery();
CacheQuery cq = sm.getQuery(q, Author.class);
assert cq.getResultSize() == 1;
Copy to Clipboard Toggle word wrap

11.5. 远程搜索

远程搜索与嵌入的显著区别非常相似,数据必须使用 Google 协议缓冲 作为无线和存储的编码。另外,需要编写(或从 Java 类生成)一个 protobuf 模式,用于定义数据结构和索引元素,而不是依赖 Hibernate 搜索注解。

使用 protobuf 允许远程查询只能用于 Java,而是用于 REST、C# 和 Node.js 客户端。

11.5.1. 远程查询示例

我们将重新访问来自嵌入式查询的 Book Sample,但这一次使用 Java Hot Rod 客户端和 Infinispan 服务器。名为 Book s 的对象将存储在名为"books"的 Infinispan 缓存中。将对本书实例进行索引,因此我们为缓存启用索引:

infinispan.xml

<infinispan>
  <cache-container default-cache="default">
    <replicated-cache name="books">
      <indexing>
        <indexed-entities>
          <indexed-entity>book_sample.Book</indexed-entity>
        </indexed-entities>
      </indexing>
    </replicated-cache>
  </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

另外,如果索引缓存没有索引,我们将 &lt ;encoding& gt; 配置为 application/x-protostream,以确保存储可以查询:

infinispan.xml

<infinispan>
  <cache-container default-cache="default">
    <replicated-cache name="books">
      <encoding media-type="application/x-protostream"/>
    </replicated-cache>
  </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

每个 Book 将按照以下示例定义:我们使用 @Protofield 注释来识别协议缓冲区消息字段以及字段上的 @ProtoDoc 注释来配置索引属性:

Book.java

import org.infinispan.protostream.annotations.ProtoDoc;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

@ProtoDoc("@Indexed")
public class Book {

   @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.YES, store = Store.NO)")
   @ProtoField(number = 1)
   final String title;

   @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.YES, store = Store.NO)")
   @ProtoField(number = 2)
   final String description;

   @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.YES, store = Store.NO)")
   @ProtoField(number = 3, defaultValue = "0")
   final int publicationYear;


   @ProtoFactory
   Book(String title, String description, int publicationYear) {
      this.title = title;
      this.description = description;
      this.publicationYear = publicationYear;
   }
   // public Getter methods omitted for brevity
}
Copy to Clipboard Toggle word wrap

上面的注释将在编译读取、写入和查询 Book 实例所需的工件期间生成。要启用此生成,请在新创建的带有空构造或接口的类中使用 @AutoProtoSchemaBuilder 注释:

RemoteQueryInitializer.java

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
      includeClasses = {
            Book.class
      },
      schemaFileName = "book.proto",
      schemaFilePath = "proto/",
      schemaPackageName = "book_sample")
public interface RemoteQueryInitializer extends SerializationContextInitializer {
}
Copy to Clipboard Toggle word wrap

在编译后,将在配置的 schemaFilePath 中创建文件 book.proto 文件,以及注释接口的实施 RemoteQueryInitializerImpl.java。这种 concrete 类可以直接在 Hot Rod 客户端代码中使用,以初始化序列化上下文。

将所有设置放在一起:

RemoteQuery.java

package org.infinispan;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.Search;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;

public class RemoteQuery {

   public static void main(String[] args) throws Exception {
      ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
      // RemoteQueryInitializerImpl is generated
      clientBuilder.addServer().host("127.0.0.1").port(11222)
            .security().authentication().username("user").password("user")
            .addContextInitializers(new RemoteQueryInitializerImpl());

      RemoteCacheManager remoteCacheManager = new RemoteCacheManager(clientBuilder.build());

      // Grab the generated protobuf schema and registers in the server.
      Path proto = Paths.get(RemoteQuery.class.getClassLoader()
            .getResource("proto/book.proto").toURI());
      String protoBufCacheName = ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME;
      remoteCacheManager.getCache(protoBufCacheName).put("book.proto", Files.readString(proto));

      // Obtain the 'books' remote cache
      RemoteCache<Object, Object> remoteCache = remoteCacheManager.getCache("books");

      // Add some Books
      Book book1 = new Book("Infinispan in Action", "Learn Infinispan with using it", 2015);
      Book book2 = new Book("Cloud-Native Applications with Java and Quarkus", "Build robust and reliable cloud applications", 2019);

      remoteCache.put(1, book1);
      remoteCache.put(2, book2);

      // Execute a full-text query
      QueryFactory queryFactory = Search.getQueryFactory(remoteCache);
      Query<Book> query = queryFactory.create("FROM book_sample.Book WHERE title:'java'");

      List<Book> list = query.execute().list(); // Voila! We have our book back from the cache!
   }
}
Copy to Clipboard Toggle word wrap

11.5.2. Protobuf 编码条目的索引

Remote Query 示例所示,查询 protobuf 实体需要的步骤是为客户端和服务器提供有关实体(.proto 文件)的相关元数据。

描述符存储在名为 ___protobuf_metadata 的服务器中的专用缓存中。此缓存中的键和值都是普通字符串。因此,注册新模式就像使用模式的名称作为键,且 schema 文件本身用作值时对这个缓存执行 put () 操作非常简单。

或者,您可以使用 CLI (通过 cache-containerPROFILE:register-proto-schemas () 操作)、管理控制台、REST 端点 /rest/v2/schemas 或通过 JMX 的 ProtobufMetadataManager MBean。请注意,当启用安全性时,通过远程协议访问模式缓存要求用户属于 '___schema_manager' 角色。

注意

即使为 Protobuf 编码条目的缓存没有字段启用索引,除非您在 protobuf 模式文档注解 (@ProtoDoc) 中使用 @Indexed@Field 来指定需要索引的哪些字段。

11.5.3. 分析

分析是将输入数据转换为您可以索引和查询的一个或多个术语的进程。在 嵌入式 Query 映射中通过 Hibernate Search 注解 完成,但支持基于 Lucene 的分析器集,在 client-server 模式中,分析器定义以平台中立的方式声明。

11.5.3.1. 默认分析器

Data Grid 为远程查询提供一组默认分析程序,如下所示:

Expand
定义描述

standard

将文本字段拆分为令牌,将空格和标点分隔为分隔符。

simple

通过取消限制非字母的令牌化输入流,然后将所有字母转换为小写字符。空格和非字母将被丢弃。

空格

分割空格上的文本流,并将非空格字符序列返回为令牌。

关键字

将整个文本字段视为单一令牌。

stemmer

使用 Snowball Porter 过滤器窃取英语词.

ngram

默认情况下,生成大小为 3 分的 ngram 令牌。

filename

将文本字段分成比 标准 分析器更大的令牌,将空格视为分隔符,并将所有字母转换为小写字符。

这些分析器定义基于 Apache Lucene,并提供了"as-is"。有关令牌工具、过滤器和 CharFilters 的更多信息,请参阅适当的 Lucene 文档。

11.5.3.2. 使用分析器定义

要使用分析器定义,请在 .proto schema 文件中按名称引用它们。

  1. 包含 Analyze.YES 属性,以指示分析了属性。
  2. 使用 @Analyzer 注释指定分析器定义。

以下示例显示了引用的分析器定义:

/* @Indexed */
message TestEntity {

    /* @Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = "keyword")) */
    optional string id = 1;

    /* @Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = "simple")) */
    optional string name = 2;
}
Copy to Clipboard Toggle word wrap

如果使用 @ProtoField 注解的 Java 类,则声明类似如下:

@ProtoDoc("@Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = \"keyword\"))")
@ProtoField(number = 1)
final String id;

@ProtoDoc("@Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = \"simple\"))")
@ProtoField(number = 2)
final String description;
Copy to Clipboard Toggle word wrap
11.5.3.3. 创建自定义分析器定义

如果您需要自定义分析器定义,请执行以下操作:

  1. 创建打包在 JAR 文件中的 ProgrammaticSearchMappingProvider 接口的实现。
  2. JARMETA-INF/services/ 目录中提供名为 org.infinispan.query.spi.ProgrammaticSearchMappingProvider 的文件。此文件应包含您的实施的完全限定类名称。
  3. JAR 复制到数据网格安装的 lib/ 目录中。

    重要

    在启动期间,您的 jar 必须可供 Data Grid 服务器使用。如果服务器已在运行,则无法添加它。

    以下是 ProgrammaticSearchMappingProvider 接口的示例实现:

    import org.apache.lucene.analysis.core.LowerCaseFilterFactory;
    import org.apache.lucene.analysis.core.StopFilterFactory;
    import org.apache.lucene.analysis.standard.StandardFilterFactory;
    import org.apache.lucene.analysis.standard.StandardTokenizerFactory;
    import org.hibernate.search.cfg.SearchMapping;
    import org.infinispan.Cache;
    import org.infinispan.query.spi.ProgrammaticSearchMappingProvider;
    
    public final class MyAnalyzerProvider implements ProgrammaticSearchMappingProvider {
    
       @Override
       public void defineMappings(Cache cache, SearchMapping searchMapping) {
          searchMapping
                .analyzerDef("standard-with-stop", StandardTokenizerFactory.class)
                   .filter(StandardFilterFactory.class)
                   .filter(LowerCaseFilterFactory.class)
                   .filter(StopFilterFactory.class);
       }
    }
    Copy to Clipboard Toggle word wrap

11.6. 持续查询

持续查询允许应用程序注册监听程序,该监听程序将接收当前与查询过滤器匹配的条目,并将持续通知对查询的数据集的任何更改。这包括传入匹配项,适用于已加入集合的值、更新匹配项、修改和继续匹配的匹配值,以及离开该设置的值传出匹配。通过使用连续查询,应用程序会收到稳定事件流,而不是重复执行相同的查询来发现更改,从而更有效地使用资源。例如,以下所有用例都可以使用持续查询:

  • 返回在 18 到 25 之间带有年龄的所有人员(假设 Person 实体具有 age 属性,并由用户应用程序更新)。
  • 返回超过 $2000 的所有事务。
  • 返回 F1 竞争条件的 lap 速度小于 1:45.00s (假设缓存包含 Lap 条目且在竞争过程中输入的 laps )的所有时间。

11.6.1. 持续查询执行

持续查询使用监听程序,该监听程序在以下情况时获得通知:

  • 条目开始与指定的查询匹配,由 Join 事件表示。
  • 一个匹配的条目已更新,并持续匹配由 Update vent 组成的查询。
  • 条目停止与由 Leave 事件表示的查询匹配。

当客户端注册持续查询监听程序时,它会立即开始接收当前与查询匹配的结果,如上面所述的 Join 事件。另外,当其他条目开始与查询匹配时,它会接收后续的通知,作为 Leave 事件,因为任何通常会生成创建、修改、删除或过期事件的缓存操作。如果条目与操作前后查询过滤器匹配,更新的缓存条目将生成 Update 事件。总而言之,用于确定监听器是否收到 JoinUpdateLeave 事件的逻辑:

  1. 如果旧值和新值的查询评估 false,则会隐藏该事件。
  2. 如果对旧值的查询评估 false,且在新值上评估为 true,则会发送 Join 事件。
  3. 如果旧值和新值的查询评估为 true,则发送 Update 事件。
  4. 如果对旧值的查询评估为 true,且在新值上评估 false,则会发送 Leave 事件。
  5. 如果对旧值的查询评估为 true,且条目被删除或过期,则会发送 Leave 事件。
注意

持续查询可以使用所有查询功能,但分组、聚合和排序操作除外。

11.6.2. 运行持续查询

要创建持续查询,请执行以下操作:

  1. 创建 Query 对象。请参阅 搜索部分
  2. 通过调用适当的方法,获取 cache 的 continuousQuery (org.infinispan.query.api.continuous.ContinuousQuery 对象:

    • org.infinispan.client.hotrod.Search.getContinuousQuery (RemoteCache<K, V> cache) 用于远程模式
    • org.infinispan.query.Search.getContinuousQuery (Cache<K, V> cache) 用于嵌入式模式
  3. 注册查询和持续查询监听程序(org.infinispan.query.api.continuous.ContinuousQueryListener),如下所示:
continuousQuery.addContinuousQueryListener(query, listener);
Copy to Clipboard Toggle word wrap

以下示例演示了在嵌入式模式下简单的持续查询用例: ⁠

注册连续查询

import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.Search;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.Query;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

[...]

// We have a cache of Persons
Cache<Integer, Person> cache = ...

// We begin by creating a ContinuousQuery instance on the cache
ContinuousQuery<Integer, Person> continuousQuery = Search.getContinuousQuery(cache);

// Define our query. In this case we will be looking for any Person instances under 21 years of age.
QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.create("FROM Person p WHERE p.age < 21");

final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>();

// Define the ContinuousQueryListener
ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() {
    @Override
    public void resultJoining(Integer key, Person value) {
        matches.put(key, value);
    }

    @Override
    public void resultUpdated(Integer key, Person value) {
        // we do not process this event
    }

    @Override
    public void resultLeaving(Integer key) {
        matches.remove(key);
    }
};

// Add the listener and the query
continuousQuery.addContinuousQueryListener(query, listener);

[...]

// Remove the listener to stop receiving notifications
continuousQuery.removeContinuousQueryListener(listener);
Copy to Clipboard Toggle word wrap

因为具有小于 21 的 Person 实例被添加到监听器将接收它们的缓存中,并放入 匹配项 映射中,以及这些条目从缓存中删除或其年龄被修改为大于或等于 21,它们将被从 匹配项 中删除。

11.6.3. 删除持续查询

要停止查询进一步执行,请只删除监听程序:

continuousQuery.removeContinuousQueryListener(listener);
Copy to Clipboard Toggle word wrap

11.6.4. 有关持续查询性能的备注

持续查询旨在为应用程序提供持续的更新流,可能会导致为广泛查询生成大量事件。为每个事件创建一个新的临时内存分配。如果查询没有被仔细设计,则此行为可能会导致内存压力,并可能导致 OutOfMemoryErrors (特别是在远程模式中)。为了防止这些问题,强烈建议每个查询捕获在匹配条目数量和每个匹配项时所需的最小信息(项目可以用来捕获有趣属性),并且每个 continuous QueryListener 旨在快速处理所有接收的事件,以避免执行从它监听的缓存中生成新匹配事件的操作。

11.7. Statistics

查询统计信息 可以从 SearchManager 获取,如以下代码片段中所示。

SearchManager searchManager = Search.getSearchManager(cache);
org.hibernate.search.stat.Statistics statistics = searchManager.getStatistics();
Copy to Clipboard Toggle word wrap
提示

此数据也可以通过 Hibernate Search StatisticsInfoMBean 在名称 org.infinispan:type=Query,manager="{name-of-cache-manager}",cache="{name-of-cache}",cache="{name-of-cache}",component=Statistics 下注册。请注意,这个 MBean 始终由 Data Grid 注册,但只有在缓存级别启用统计集合时才会收集统计信息。

警告

Hibernate Search 具有自己的配置属性 hibernate.search.jmx_enabledhibernate.search.generate_statistics,如 此处 所述。将它们与 Data Grid Query 一起使用会被禁止,因为它只会导致重复的 MBeans 和无法预计的结果。

11.8. 性能调优

11.8.1. 以 SYNC 模式进行批处理写入

默认情况下,索引管理器 以同步模式工作,这意味着当数据被写入 Data Grid 时,它将同步执行索引操作。这个同步保证索引始终与数据一致(因此在搜索中可见),但可能会减慢写操作的速度,因为它也会执行对索引的提交。提交是 Lucene 中的非常昂贵的操作,因此,来自不同节点的多个写入可以自动批量到一个提交中,以减少影响。

因此,当将数据加载到启用了索引的 Data Grid 时,请尝试使用多个线程来利用这个批处理。

如果使用多个线程不会导致所需的性能,替代方案是加载带有临时禁用的索引的数据,并在 之后运行重新索引操作。这可以通过使用 SKIP_INDEXING 标志写入数据:

cache.getAdvancedCache().withFlags(Flag.SKIP_INDEXING).put("key","value");
Copy to Clipboard Toggle word wrap

11.8.2. 使用 async 模式编写

如果数据写入和数据在查询中可见时可以接受小的延迟,则索引管理器可以配置为以 async 模式 工作。async 模式提供更好的写入性能,因为在这个模式中以可配置的间隔进行。

配置:

<distributed-cache name="default">
    <indexing>
        <!-- Index data in async mode -->
        <property name="default.worker.execution">async</property>
        <!-- Optional: configure the commit interval, default is 1000ms -->
        <property name="default.index_flush_interval">500</property>
    </indexing>
</distributed-cache>
Copy to Clipboard Toggle word wrap

11.8.3. 索引读取器 async 策略

Lucene 在内部与索引的快照一起工作:一旦打开 IndexReader ,它只会看到索引更改直到打开了点;在索引读取程序刷新前,进一步的索引更改将无法看到。默认情况下,Data Grid 中使用的索引管理器将在每次查询前检查索引读取器的最新性,并在需要时刷新它们。

通过将 reader.strategy 配置设置为 async,将这个策略重新排序此策略来放宽此新性检查到预先配置的间隔:

<distributed-cache name="default">
    <indexing>
        <property name="default.reader.strategy">async</property>
        <!-- refresh reader every 1s, default is 5s -->
        <property name="default.reader.async_refresh_period_ms">1000</property>
    </indexing>
</distributed-cache>
Copy to Clipboard Toggle word wrap

11.8.4. Lucene 选项

可以在 Lucene 中直接应用调优选项。如需了解更多详细信息,请参阅 Hibernate 搜索手册

第 12 章 在网格中执行代码

缓存的主要优点是能够快速查找其键,甚至跨机器查找值。实际上,这单独使用可能是许多用户使用 Data Grid 的原因。但是,Data Grid 可以提供许多无法立即明显的好处。由于 Data Grid 通常在机器集群中使用,因此我们还提供相应的功能,可帮助利用整个集群来执行用户所需的工作负载。

注意

本节仅介绍使用嵌入式缓存在网格中执行代码,如果您正在使用远程缓存,您应该回顾在远程网格中执行代码的详细信息。

12.1. Cluster Executor

由于您有一组计算机,因此最好利用其组合计算能力在所有这些计算机上执行代码。缓存管理器附带了一个 nice 工具,可让您在集群中执行任意代码。请注意,这个功能不需要使用缓存。此 Cluster Executor 可通过调用 EmbeddedCacheManager 上的 executor ()来检索。这个 executor 在集群和非集群配置中可以被检索。

注意

ClusterExecutor 专为执行代码不依赖于缓存中数据的代码而设计,而是作为帮助用户在集群中轻松执行代码的方法。

此管理器专门使用 Java 8 构建,此类功能有功能 API,因此所有方法都使用功能 inteface 作为参数。因为这些参数将发送到其他节点,因此它们需要按顺序排序。我们甚至使用 nice trick 来确保我们的 lambda 可立即串行。这是通过参数实现 Serializable 和 real 参数类型(如 Runnable 或 Function)。在确定要调用的方法时,JRE 将选择最具体的类,因此在这样,您的 lambdas 始终可以被序列化。也可以使用外部工具来进一步减小消息大小。

默认情况下,管理器将向集群中的所有节点提交给定命令,包括从中提交它的节点。您可以使用 filterTargets 方法控制在其上执行任务的节点,如 部分中所述。

12.1.1. 过滤执行节点

可以限制命令要运行的节点。例如,您可能只想在同一机架的计算机上运行计算。或者,您可能想要在本地站点中一次执行操作,并在不同的站点上再次执行操作。集群 executor 可以限制在相同或不同机器、机架或站点级别向发送请求的节点。

SameRack.java

EmbeddedCacheManager manager = ...;
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_RACK).submit(...)
Copy to Clipboard Toggle word wrap

要使用此拓扑基本过滤,您必须通过服务器提示启用拓扑感知一致的哈希。

您还可以根据节点的地址使用 predicate 进行过滤。这也可以选择与之前代码片段中的基于拓扑过滤结合使用。

我们还允许使用任何方法选择目标节点,该方法使用 Predicate 来过滤哪些节点可以被视为执行。请注意,这也可与 Topology 过滤结合使用,以便更精细地控制您在集群中执行代码的位置。

Predicate.java

EmbeddedCacheManager manager = ...;
// Just filter
manager.executor().filterTargets(a -> a.equals(..)).submit(...)
// Filter only those in the desired topology
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_SITE, a -> a.equals(..)).submit(...)
Copy to Clipboard Toggle word wrap

12.1.2. Timeout(超时)

集群可执行文件允许为每个调用设置超时。默认为在传输配置中配置的分布式同步超时。这个超时可在集群和非集群缓存管理器中正常工作。当超时过期时,executor 可能会或可能无法中断执行任务的线程。但是,当超时发生任何 Consumerfuture 时,将完成传递 TimeoutException。这个值可以通过激活 超时 方法并提供所需持续时间来覆盖。

12.1.3. 单一节点提交

Cluster Executor 也可以以单一节点提交模式运行,而不是向所有节点提交命令,而不必选择通常会收到该命令的其中一个节点,并将其提交到只有一个节点。每个提交都可能会使用不同的节点来执行任务。这对将 ClusterExecutor 用作 java.util.concurrent.Executor 非常有用,您可能会注意到 ClusterExecutor 已实现该 ClusterExecutor。

SingleNode.java

EmbeddedCacheManager manager = ...;
manager.executor().singleNodeSubmission().submit(...)
Copy to Clipboard Toggle word wrap

12.1.3.1. 故障切换

在单一节点提交中运行时,可能需要允许 Cluster Executor 处理给定命令期间发生异常的情况,再次重试命令。当发生这种情况时,Cluster Executor 将再次选择一个一个节点,以重新将命令重新提交到所需的故障转移尝试次数。请注意,所选节点可以是通过拓扑或 predicate 检查的任何节点。通过调用覆盖的 singleNodeSubmission 方法来启用故障转移。给定的命令将再次提交到一个节点,直到命令完成且无例外,或者总提交数等于提供的故障切换计数。

12.1.4. 示例:PI Approximation

本例演示了如何使用 ClusterExecutor 估算 PI 的值。

Pi approximation 大大受益于通过 Cluster Executor 的并行分布式执行。回想一下,块的区域是 Sa = 4r2,圆圈的区域是 Ca=pi*r2。将 r2 从第二个 equation 替换成第一个,它会关闭 pi = 4 * Ca/Sa。现在,我们可以将大量 darts 成方块的图像;如果我们将这个大体中的 darts sto 分值放在圆圈中,那么我们将大约是 Ca/Sa 值。由于我们知道,pi = 4 * Ca/Sa 我们可以轻松获得 pi 的大约价值。更糟糕的是,我们实现了更好的投放。在以下示例中,我们找到了 1 亿 darts,而不是"缩小"它们串行地并行化整个数据网格集群中的 dart shooting 的工作。请注意,这在 1 的集群中可以正常工作,但会较慢。

public class PiAppx {

   public static void main (String [] arg){
      EmbeddedCacheManager cacheManager = ..
      boolean isCluster = ..

      int numPoints = 1_000_000_000;
      int numServers = isCluster ? cacheManager.getMembers().size() : 1;
      int numberPerWorker = numPoints / numServers;

      ClusterExecutor clusterExecutor = cacheManager.executor();
      long start = System.currentTimeMillis();
      // We receive results concurrently - need to handle that
      AtomicLong countCircle = new AtomicLong();
      CompletableFuture<Void> fut = clusterExecutor.submitConsumer(m -> {
         int insideCircleCount = 0;
         for (int i = 0; i < numberPerWorker; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }, (address, count, throwable) -> {
         if (throwable != null) {
            throwable.printStackTrace();
            System.out.println("Address: " + address + " encountered an error: " + throwable);
         } else {
            countCircle.getAndAdd(count);
         }
      });
      fut.whenComplete((v, t) -> {
         // This is invoked after all nodes have responded with a value or exception
         if (t != null) {
            t.printStackTrace();
            System.out.println("Exception encountered while waiting:" + t);
         } else {
            double appxPi = 4.0 * countCircle.get() / numPoints;

            System.out.println("Distributed PI appx is " + appxPi +
                  " using " + numServers + " node(s), completed in " + (System.currentTimeMillis() - start) + " ms");
         }
      });

      // May have to sleep here to keep alive if no user threads left
   }

   private static boolean insideCircle(double x, double y) {
      return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
            <= Math.pow(0.5, 2);
   }
}
Copy to Clipboard Toggle word wrap

第 13 章 流

您可能需要处理一个子集或缓存中的所有数据来生成结果。这可能会造成 Map Reduce 的想法。Data Grid 允许用户执行类似内容,但利用标准的 JRE API 来执行此操作。Java 8 引入了一个流( Stream )的概念,它允许对集合进行功能风格的操作,而不是自行迭代数据。流操作可以以类似 MapReduce 的方式实施。流,就像 MapReduce 一样,您可以对整个缓存执行处理,这可能是一个非常大的数据集,但采用效率。

注意

在处理缓存中存在的数据时,流是首选的方法,因为流会自动调整集群拓扑更改。

另外,由于我们可以控制条目在我们如何迭代,因此如果您希望它同时在集群中执行所有操作,则可以更有效地在缓存中执行操作。

通过调用 stream 或 parallelStream 方法,从 entrySetkeySetvalues 集合 从缓存 返回的流检索。

13.1. 常见流操作

本节重点介绍了您正在使用的底层缓存类型的各种选项。

13.2. 密钥过滤

可以过滤流,使其仅在给定的密钥子集上运行。这可以通过在 CacheStream 上调用 filterKeys 方法来完成。这应该总是通过 Predicate 过滤器使用,如果 predicate 保存所有键,则更快。

如果您熟悉 AdvancedCache 接口,您可能会知道为什么在这个 keyFilter 上使用 getAll。如果您需要条目为 并在本地节点的内存中需要它们,则有一些小的好处(大较小的有效负载)来使用 getAll。但是,如果您需要在这些元素上处理流,因为您将得到分布式和线程并行处理。

13.3. 基于片段的过滤

注意

这是一个高级功能,应该只用于深入了解数据网格网段和哈希技术。如果您需要将数据分段到单独的调用中,这些片段过滤很有用。当与其他工具(如 Apache Spark )集成时,这非常有用。

这个选项只支持复制和分布式缓存。这允许用户在 KeyPartitioner 决定时对数据的子集进行操作。可以通过在 CacheStream 上调用 filterKeySegments 方法来过滤网段。这在键过滤器后应用,但在执行任何中间操作前应用。

13.4. local/Invalidation

与本地或无效缓存一起使用的流可以像在常规集合上使用流一样使用。如果需要,数据网格处理所有翻译,并与更有趣的选项(例如 storeAsBinary 和 cache loader)一起工作。仅执行流操作的节点本地数据,例如无效只使用本地条目。

13.5. Example

以下代码使用一个缓存,并返回一个带有字符串 "JBoss" 的所有缓存条目的映射

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("JBoss"))
              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Copy to Clipboard Toggle word wrap

13.6. distribution/Replication/Scattered

这是流进入其范围的地方。执行流操作时,它会将各种中间和终端操作发送到具有相关数据的每个节点。这允许处理拥有数据的节点上的中间值,并且仅将最终结果发送回原始节点,从而提高性能。

13.6.1. 重新哈希 Aware

在内部,数据被分段,每个节点只对它拥有的数据执行操作。这允许平均处理数据,假设片段足够精细,以为每个节点上提供相同数量的数据。

使用分布式缓存时,当新节点加入或离开时,可以在节点间重新处理数据。分布式流处理会自动重新处理数据,因此您不必在节点离开或加入集群时考虑监控。Reshuffled 条目可能会第二次处理,我们将在密钥级别或网段级别(取决于终端操作)跟踪已处理的条目,以限制重复处理的数量。

最好禁用重新哈希对流的了解。只有在您的请求只能处理重新哈希时,才应考虑这一点。这可以通过调用 CacheStream.disableRehashAware () 来实现,当重新哈希无法完全忽略时,大多数操作的性能都会获得相应的性能。唯一的例外是 iterator 和 per per (使用较少的内存),因为它们不必跟踪已处理的密钥。

警告

请重新考虑禁用重新哈希感知,除非您真正知道自己正在执行的操作。

13.6.2. serialization

由于操作会互相发送到其他节点,因此它们必须可以被 Data Grid marshalling 处理。这允许将操作发送到其他节点。

最简单的方法是使用 CacheStream 实例,并使用 lambda,就像您正常一样。Data Grid 会覆盖所有各种流中间和终端方法,以获取参数的 Serializable 版本(如 SerializableFunction、SerializablePredicate…​)您可以在 CacheStream 中找到这些方法。这依赖于 spec 来选择 此处定义的 最具体方法。

在前面的示例中,我们使用 Collector 将所有结果收集到 Map 中。不幸的是 ,Collector s 类不会生成 Serializable 实例。因此,如果您需要使用它们,可以通过两种方式来实现:

一个选项是使用 CacheCollectors 类,该类允许提供 Supplier<Collector& gt;。然后,这个实例可以使用 Collector 提供没有序列化的 Collector。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
Copy to Clipboard Toggle word wrap

或者,您可以避免使用 CacheCollectors,而是使用采用 Supplier<Collector > 的超载 收集 方法。这些过载 收集 方法只能通过 CacheStream 接口获得。

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Copy to Clipboard Toggle word wrap

但是,如果您无法使用 CacheCacheStream 接口,则无法使用 Serializable 参数,而是必须通过将 lambda 发送到多个接口来手动调用 lambdas。这不是一个非常好的,但会获得作业完成。

Map<Object, String> jbossValues = map.entrySet().stream()
              .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
Copy to Clipboard Toggle word wrap

推荐的和最高性能的方法是使用 AdvancedExternalizer,因为这提供了最小的有效负载。不幸的是,您不能将 lamdbas 用作高级外部工具,需要预先定义该类。

您可以使用高级外部工具,如下所示:

   Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

   class ContainsFilter implements Predicate<Map.Entry<Object, String>> {
      private final String target;

      ContainsFilter(String target) {
         this.target = target;
      }

      @Override
      public boolean test(Map.Entry<Object, String> e) {
         return e.getValue().contains(target);
      }
   }

   class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> {

      @Override
      public Set<Class<? extends ContainsFilter>> getTypeClasses() {
         return Util.asSet(ContainsFilter.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException {
         output.writeUTF(object.target);
      }

      @Override
      public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return new ContainsFilter(input.readUTF());
      }
   }
Copy to Clipboard Toggle word wrap

您还可以为收集器供应商使用高级外部工具来进一步缩小有效负载大小。

Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

 class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> {
      static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier();

      private ToMapCollectorSupplier() { }

      @Override
      public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() {
         return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue);
      }
   }

   class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> {

      @Override
      public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() {
         return Util.asSet(ToMapCollectorSupplier.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException {
      }

      @Override
      public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return ToMapCollectorSupplier.INSTANCE;
      }
   }
Copy to Clipboard Toggle word wrap

13.7. 并行复杂度

默认情况下,分布式流会尽量并行化。最终用户可以控制这一点,实际上它们始终需要控制其中一个选项。这些流的并行性有两种。

当从 缓存集合创建流时,最终用户可以在调用流或 parallelStream 方法之间进行选择。https://docs.oracle.com/javase/8/docs/api/java/util/Collection.html#stream--根据是否选择了并行流,将在本地为每个节点启用多个线程。请注意,一些操作(如重新哈希)和每个操作都会在本地使用顺序流。这在某些情况下可以增强,以允许本地并行流。

在使用本地并行时,用户应该小心,因为它需要大量条目或操作,这些条目或操作需要可以更快地计算。另请注意,如果用户使用具有 的并行流,并且每个操作不应阻断,这会在常见池中执行,这通常为计算操作保留。

当有多个节点时,远程请求可能需要控制同时处理远程请求是否同时处理。默认情况下,除 iterator 执行并发请求外的所有终端操作。迭代器,用于降低本地节点上的总体内存压力的方法,仅执行实际上性能稍好更高的顺序请求。

如果用户希望在 CacheStream 上调用 sequentialDistributionparallelDistribution 方法来更改此默认值。

13.8. 任务超时

可以为操作请求设置超时值。这个超时仅用于远程请求超时,它基于每个请求。前者表示本地执行不会超时,后者意味着,如果您有一个故障转移场景,如后续请求上方描述,则每个请求都有新的超时。如果没有指定超时,它将使用复制超时作为默认超时时间。您可以通过执行以下操作在任务中设置超时:

CacheStream<Map.Entry<Object, String>> stream = cache.entrySet().stream();
stream.timeout(1, TimeUnit.MINUTES);
Copy to Clipboard Toggle word wrap

有关此问题的更多信息,请检查 java doc in timeout javadoc。

13.9. 注入

都有一个称为 的终端操作,允许对数据运行某种副作用的操作。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEach-java.util.function.Consumer-在这种情况下,可能需要获得支持此流的缓存的引用。如果您的 Consumer 实现 CacheAware 接口,则在来自 Consumer 接口的 accept 方法之前调用 injectCache 方法。

13.10. 分布式流执行

分布式流执行以非常熟悉的方式进行映射。除了本例中,我们会向不同的节点发送零到多个中间操作(映射、过滤等)和单个终端操作。操作基本上会减少到以下几项:

  1. 所需的片段由哪个节点分组,这是给定网段的主要所有者
  2. 生成请求以发送到包含中间和终端操作的每个远程节点,包括它应该处理的片段

    1. 如有必要,终端操作将在本地执行
    2. 每个远程节点都将接收此请求并运行操作,然后发送响应回
  3. 然后,本地节点将收集本地响应和远程响应,同时执行操作本身所需的任何减少。
  4. 最终减少了响应,然后返回给用户

在大多数情况下,所有操作都是完全分发的,因为操作是在每个远程节点上完全应用,通常只有最后一个操作或相关操作才可以重新应用,以减少多个节点的结果。务必要注意,中间值实际上不需要被序列化,它是发送的最后一个值,这是所需部分(突出显示各种操作的异常)。

终端 operator 分布的结果 会减少以下段落描述了各种终端操作器的分布式缩减如何工作。其中一些是特殊的,可能需要中间值才能被序列化,而不是最终结果。

allMatch noneMatch anyMatch
allMatch 操作在每个节点上运行,然后所有结果在本地逻辑上进行,以获取适当的值。noneMatchanyMatch 操作使用逻辑或替代。这些方法也有早期终止支持,在已知的最终结果后停止远程和本地操作。
collect
collect 方法很有趣,它可以执行一些额外的步骤。远程节点以正常方式执行所有内容,但它不会在结果上执行最终的 结束 程序,而是发回完全组合的结果。然后,本地线程 会将 远程和本地结果合并为一个值,然后最后完成。这里要记住的键是最终的值不一定是可序列化的,而是来自供应商和 组合 方法的值。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html#supplier--
数�
count 方法只从每个节点中添加数字。
findAny findFirst
findAny 操作只返回它们找到的第一个值,无论是来自远程节点还是本地。请注意,当发现一个值后,它就不会处理其他值。请注意 findFirst 方法是特殊的,因为它需要排序的中间操作,这在 例外 部分中详细介绍。
最大分钟
maxmin 方法在每个节点上找到对应的 min 或 max 值,然后在本地执行最终缩减,以确保返回所有节点中的 min 或 max。
reduce
各种减少方法 123 将按顺序化结果,与累计者可以尽可能多。然后,如果提供,它将在本地整合本地和远程结果。请注意,这意味着来自 combiner 的值不必是 Serializable。

13.11. 基于密钥的重新哈希了解操作符

迭代器分割 器和每个 终端运算符都与重新哈希感知器不同的是,必须跟踪每个网段处理了哪些键,而不是仅段段。这是为了保证一次(迭代器和分割器)或至少一次行为(对于每个),即使在集群成员资格发生变化下也是如此。

在远程节点上调用时,迭代 器和 分割 器运算符将返回条目的回滚批处理,其中下一个批处理仅在最后一个完全被完全使用后发回。完成此批处理是为了限制给定时间在内存中有多少条目。用户节点将保存在其处理哪些密钥以及给定片段完成时,将从内存中释放这些密钥。这就是为什么在迭代器方法中使用顺序处理的原因,因此仅在内存中保存片段密钥的子集,而不是来自所有节点。

per () 方法也会返回批处理,但在处理至少批处理后,它会返回批处理密钥。这样,原始节点可以知道已经处理了哪些密钥,以减少再次处理同一条目的几率。不幸的是,这意味着当节点意外停机时,至少有一次行为。在这种情况下,节点可能已经处理批处理,而还没有完成其中一个和这些条目,但还没有在完成的批处理中会在重新哈希失败时再次运行。请注意,在收到所有响应前,添加节点不会导致这个问题,因为重新哈希故障转移不会发生。

这些操作批处理的大小都由相同的值控制,这可以通过调用 CacheStream 上的 distributedBatchSize 方法进行配置。这个值将默认为在状态传输中配置的 chunkSize。不幸的是,这个值会权衡内存用量与性能,至少一次,您的鼠标可能会有所不同。

迭代器 与复制和分布式缓存一起使用

当节点是分布式流的所有请求片段的主要或备份所有者时,Data Grid 在本地执行 迭代分割 器终端操作,这会优化性能,因为远程迭代更为资源密集型。

此优化适用于复制和分布式缓存。但是,在使用 共享 并启用了 write-behind 的缓存存储时,Data Grid 会远程执行迭代。在这种情况下,远程执行迭代可确保一致性。

13.12. 中间操作例外

有些带有特殊例外的中间操作 会跳过peek、排序 1不同的https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#sorted--所有这些方法在流处理中都会发生某种形式,以保证正确性,如下所示。请注意,这意味着这些操作可能会导致严重的性能降级。

skip
Anrtificial iterator 被计划到中间跳过操作。然后,结果会在本地生成,以便可以跳过适当数量的元素。
排序
警告:此操作需要本地节点上的内存所有条目。Anrtificial iterator 被计划到中间排序操作。所有结果都在本地排序。可能的计划中可能有一个分布式排序来返回元素批处理,但这尚未实施。
不同的
警告:此操作需要本地节点上的所有或几乎所有条目。每个远程节点上都执行不同的情况,然后对它产生不同的迭代器返回这些不同的值。最后,所有这些结果都有不同的操作。

其余的中间操作会按预期完全分发。

13.13. 例子

字数

单词 count 是典型的,如果过度使用,如映射/缩减范例。假设我们在 Data Grid 节点上存储了 key → 句子的映射。Key 是一个字符串,每个句子都是一个 String,我们必须计算所有句子中所有可用的词语。此类分布式任务的实现可定义如下:

public class WordCountExample {

   /**
    * In this example replace c1 and c2 with
    * real Cache references
    *
    * @param args
    */
   public static void main(String[] args) {
      Cache<String, String> c1 = ...;
      Cache<String, String> c2 = ...;

      c1.put("1", "Hello world here I am");
      c2.put("2", "Infinispan rules the world");
      c1.put("3", "JUDCon is in Boston");
      c2.put("4", "JBoss World is in Boston as well");
      c1.put("12","JBoss Application Server");
      c2.put("15", "Hello world");
      c1.put("14", "Infinispan community");
      c2.put("15", "Hello world");

      c1.put("111", "Infinispan open source");
      c2.put("112", "Boston is close to Toronto");
      c1.put("113", "Toronto is a capital of Ontario");
      c2.put("114", "JUDCon is cool");
      c1.put("211", "JBoss World is awesome");
      c2.put("212", "JBoss rules");
      c1.put("213", "JBoss division of RedHat ");
      c2.put("214", "RedHat community");

      Map<String, Long> wordCountMap = c1.entrySet().parallelStream()
         .map(e -> e.getValue().split("\\s"))
         .flatMap(Arrays::stream)
         .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
   }
}
Copy to Clipboard Toggle word wrap

在这种情况下,可以简单地执行上例中的单词 count。

但是,如果我们想要在示例中找到最频繁的字词,该怎么办?如果您考虑第二个情况,您将意识到您需要首先计算所有词语,并首先可用。因此,我们实际上有一些选项。

我们可以在收集器上使用一个 finisher,它会在收集所有结果后在用户线程上调用。之前示例中删除了一些冗余行。

public class WordCountExample {
   public static void main(String[] args) {
      // Lines removed

      String mostFrequentWord = c1.entrySet().parallelStream()
         .map(e -> e.getValue().split("\\s"))
         .flatMap(Arrays::stream)
         .collect(() -> Collectors.collectingAndThen(
            Collectors.groupingBy(Function.identity(), Collectors.counting()),
               wordCountMap -> {
                  String mostFrequent = null;
                  long maxCount = 0;
                     for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                        int count = e.getValue().intValue();
                        if (count > maxCount) {
                           maxCount = count;
                           mostFrequent = e.getKey();
                        }
                     }
                     return mostFrequent;
               }));

}
Copy to Clipboard Toggle word wrap

不幸的是,最后一步只会在单一线程中运行,如果我们有很多词语可能非常慢。可以通过另一种方式与 Streams 并行化。

在处理后,我们提到了本地节点,因此我们实际上可以在映射结果上使用流。因此,我们可以在结果中使用并行流。

public class WordFrequencyExample {
   public static void main(String[] args) {
      // Lines removed

      Map<String, Long> wordCount = c1.entrySet().parallelStream()
              .map(e -> e.getValue().split("\\s"))
              .flatMap(Arrays::stream)
              .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
      Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream().reduce(
              (e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);
Copy to Clipboard Toggle word wrap

这样,在计算最频繁的元素时,您仍然可以在本地使用所有内核。

删除特定条目

分布式流也可以用作修改它所在数据的方法。例如,您可能想要删除包含特定词语的缓存中的所有条目。

public class RemoveBadWords {
   public static void main(String[] args) {
      // Lines removed
      String word = ..

      c1.entrySet().parallelStream()
         .filter(e -> e.getValue().contains(word))
         .forEach((c, e) -> c.remove(e.getKey()));
Copy to Clipboard Toggle word wrap

如果我们仔细注意了要序列化的内容,并且没有什么,我们注意到,只有词语以及操作才会按顺序化到其他 nods,因为它由 lambda 捕获。但是,实际保存部分是缓存操作在主所有者上执行,从而减少从缓存中删除这些值所需的网络流量数量。lambda 不会捕获缓存,因为我们在每个节点上调用时提供一个特殊的 BiConsumer 方法覆盖,这会将缓存传递给 BiConsumer

以这种方式考虑使用 for each 命令的一个问题是底层流没有获得锁定。缓存删除操作仍将自然而获得锁定,但该值可能会从流看到的内容改变。这意味着,在流读取后条目可能会改变,但删除会实际删除它。

我们特别添加了名为 LockedStream 的新变体。

其他示例的 plenty

Streams API 是一个 JRE 工具,有许多示例使用它。只需记住您的操作需要以某种方式 Serializ。

第 14 章 JCache (JSR-107) API

Data Grid 提供 JCache 1.0 API 的实施( JSR-107 )。JCACHE 指定用于在内存中缓存临时 Java 对象的标准 Java API。缓存 java 对象有助于防止使用代价昂贵的数据(例如,DB 或 Web 服务)或难以计算的数据造成瓶颈。在内存中缓存这些类型的对象有助于加快应用性能,方法是直接从内存检索数据,而不是执行昂贵的往返或重新计算。本文档论述了如何在规范的 Data Grid 实现中使用 JCache,并解释了 API 的关键方面。

14.1. 创建嵌入式缓存

先决条件

  1. 确保您的 cache-api 位于您的 classpath 上。
  2. 将以下依赖项添加到 pom.xml 中:

    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-jcache</artifactId>
    </dependency>
    Copy to Clipboard Toggle word wrap

流程

  • 创建使用默认 JCache API 配置的嵌入式缓存,如下所示:
import javax.cache.*;
import javax.cache.configuration.*;

// Retrieve the system wide cache manager
CacheManager cacheManager = Caching.getCachingProvider().getCacheManager();
// Define a named cache with default JCache configuration
Cache<String, String> cache = cacheManager.createCache("namedCache",
      new MutableConfiguration<String, String>());
Copy to Clipboard Toggle word wrap

14.1.1. 配置嵌入式缓存

  • 将自定义 Data Grid 配置的 URI 传递给 caching Provider.getCacheManager (URI) 调用,如下所示:
import java.net.URI;
import javax.cache.*;
import javax.cache.configuration.*;

// Load configuration from an absolute filesystem path
URI uri = URI.create("file:///path/to/infinispan.xml");
// Load configuration from a classpath resource
// URI uri = this.getClass().getClassLoader().getResource("infinispan.xml").toURI();

// Create a cache manager using the above configuration
CacheManager cacheManager = Caching.getCachingProvider().getCacheManager(uri, this.getClass().getClassLoader(), null);
Copy to Clipboard Toggle word wrap
警告

默认情况下,JCache API 指定数据应存储为 storeByValue,因此,操作之外的对象状态不会对缓存中存储的对象产生影响。目前,数据网格使用 serialization/marshalling 来实现此目的,从而使副本存储在缓存中,并遵循 spec。因此,如果将默认 JCache 配置与 Data Grid 一起使用,则存储的数据必须可以被处理。

另外,也可以通过参考(如 Data Grid 或 JDK Collections 工作)将 JCache 配置为存储数据。要做到这一点,只需调用:

Cache<String, String> cache = cacheManager.createCache("namedCache",
      new MutableConfiguration<String, String>().setStoreByValue(false));
Copy to Clipboard Toggle word wrap

14.2. 创建远程缓存

先决条件

  1. 确保您的 cache-api 位于您的 classpath 上。
  2. 将以下依赖项添加到 pom.xml 中:

    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-jcache-remote</artifactId>
    </dependency>
    Copy to Clipboard Toggle word wrap

流程

  • 在远程 Data Grid 服务器上创建缓存,并使用默认的 JCache API 配置,如下所示:
import javax.cache.*;
import javax.cache.configuration.*;

// Retrieve the system wide cache manager via org.infinispan.jcache.remote.JCachingProvider
CacheManager cacheManager = Caching.getCachingProvider("org.infinispan.jcache.remote.JCachingProvider").getCacheManager();
// Define a named cache with default JCache configuration
Cache<String, String> cache = cacheManager.createCache("remoteNamedCache",
      new MutableConfiguration<String, String>());
Copy to Clipboard Toggle word wrap

14.2.1. 配置远程缓存

热 Rod 配置文件包括 infinispan.client.hotrod.cache mdadm 属性,可用于自定义远程缓存。

  • hotrod-client.properties 文件的 URI 传递给 caching Provider.getCacheManager (URI) 调用,如下所示:
import javax.cache.*;
import javax.cache.configuration.*;

// Load configuration from an absolute filesystem path
URI uri = URI.create("file:///path/to/hotrod-client.properties");
// Load configuration from a classpath resource
// URI uri = this.getClass().getClassLoader().getResource("hotrod-client.properties").toURI();

// Retrieve the system wide cache manager via org.infinispan.jcache.remote.JCachingProvider
CacheManager cacheManager = Caching.getCachingProvider("org.infinispan.jcache.remote.JCachingProvider")
      .getCacheManager(uri, this.getClass().getClassLoader(), null);
Copy to Clipboard Toggle word wrap

14.3. 存储和检索数据

虽然 JCache API 不扩展 java.util.Map 而不是 java.util.concurrent.ConcurrentMap,但它提供了一个键/值 API 来存储和检索数据:

import javax.cache.*;
import javax.cache.configuration.*;

CacheManager cacheManager = Caching.getCachingProvider().getCacheManager();
Cache<String, String> cache = cacheManager.createCache("namedCache",
      new MutableConfiguration<String, String>());
cache.put("hello", "world"); // Notice that javax.cache.Cache.put(K) returns void!
String value = cache.get("hello"); // Returns "world"
Copy to Clipboard Toggle word wrap

与标准的 java.util.Map 不同,javax.cache.Cache 附带两个基本的放置方法,称为 put 和 getAndPut。前者返回 void,后者返回与键关联的以前的值。因此,JCache 中的 java.util.Map.put (K) 等效于 javax.cache.Cache.getAndPut (K)

提示

虽然 JCache API 仅涵盖独立缓存,但可以使用持久性存储插入它,并考虑了集群或分发。javax.cache.Cache 提供两个放置方法的原因是,标准 java.util.Map 放置调用强制实施器来计算之前的值。当使用持久性存储或缓存被分发时,返回前面的值可能是一个昂贵的操作,用户通常会调用标准 java.util.Map.put (K),而无需使用返回值。因此,JCache 用户需要考虑返回值是否与它们相关,在这种情况下,需要调用 javax.cache.Cache.getAndPut (K),否则他们可以调用 java.util.Map.put (K, V),以避免返回之前值的潜在昂贵的操作。

以下是由 java.util.concurrent.ConcurrentMapjavax.cache.Cache API 提供的数据操作 API 的简要比较。

Expand
操作java.util.concurrent.ConcurrentMap<K, V>javax.cache.Cache<K, V>

存储且没有返回

N/A 

void put (K key)

存储并返回以前的值

v put (K key)

v getAndPut (K key)

如果不存在,则存储

V putIfAbsent(K key, V value)

boolean putIfAbsent(K key, V value)

retrieve

v get (Object key)

v get (K key)

如果存在,删除

v remove (Object key)

布尔值删除(K 密钥)

删除并返回前面的值

v remove (Object key)

V getAndRemove(K key)

删除条件

boolean remove (Object key, Object value)

布尔值 remove (K key, V oldValue)

如果存在,替换

v replace (K key, V value)

boolean replace (K key, V value)

替换并返回前面的值

v replace (K key, V value)

V getAndReplace(K key, V value)

替换条件

boolean replace (K key, V oldValue, V newValue)

boolean replace (K key, V oldValue, V newValue)

比较两个 API,很明显地发现,在可能的情况下,JCache 避免返回之前的值,以避免操作执行昂贵的网络或 IO 操作。这是 JCache API 设计中的原则。实际上,在 java.util.concurrent.ConcurrentMap 中存在一组操作,但没有存在于 javax.cache.Cache 中,因为它们可能难以在分布式缓存中计算。唯一例外是迭代缓存的内容:

Expand
操作java.util.concurrent.ConcurrentMap<K, V>javax.cache.Cache<K, V>

计算缓存大小

int size ()

 N/A

返回缓存中的所有密钥

Set<K> keySet()

 N/A

返回缓存中的所有值

collection<V> values ()

 N/A

返回缓存中的所有条目

Set<Map.Entry<K, V>> entrySet()

 N/A

迭代缓存

在 keySet、value 或 entrySet 上使用 iterator () 方法

iterator<Cache.Entry<K, V>> iterator ()

14.5. 集群 JCache 实例

数据网格 JCache 实现超出了规格,以便使用标准 API 的集群缓存的可能性。根据配置为复制缓存的数据网格配置文件,如下所示:

infinispan.xml

<infinispan>
   <cache-container default-cache="namedCache">
      <transport cluster="jcache-cluster" />
      <replicated-cache name="namedCache" />
   </cache-container>
</infinispan>
Copy to Clipboard Toggle word wrap

您可以使用此代码创建缓存集群:

import javax.cache.*;
import java.net.URI;

// For multiple cache managers to be constructed with the standard JCache API
// and live in the same JVM, either their names, or their classloaders, must
// be different.
// This example shows how to force their classloaders to be different.
// An alternative method would have been to duplicate the XML file and give
// it a different name, but this results in unnecessary file duplication.
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
CacheManager cacheManager1 = Caching.getCachingProvider().getCacheManager(
      URI.create("infinispan-jcache-cluster.xml"), new TestClassLoader(tccl));
CacheManager cacheManager2 = Caching.getCachingProvider().getCacheManager(
      URI.create("infinispan-jcache-cluster.xml"), new TestClassLoader(tccl));

Cache<String, String> cache1 = cacheManager1.getCache("namedCache");
Cache<String, String> cache2 = cacheManager2.getCache("namedCache");

cache1.put("hello", "world");
String value = cache2.get("hello"); // Returns "world" if clustering is working

// --

public static class TestClassLoader extends ClassLoader {
  public TestClassLoader(ClassLoader parent) {
     super(parent);
  }
}
Copy to Clipboard Toggle word wrap

第 15 章 Multimap Cache

MutimapCache 是一种 Data Grid Cache 类型,用于将键映射到每个键可以包含多个值的值。

15.1. 安装和配置

pom.xml

<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-multimap</artifactId>
</dependency>
Copy to Clipboard Toggle word wrap

15.2. MultimapCache API

MultimapCache API 会公开几种与多映射缓存交互的方法。在大多数情况下,这些方法是非阻塞的;如需更多信息,请参阅 限制

public interface MultimapCache<K, V> {

   CompletableFuture<Optional<CacheEntry<K, Collection<V>>>> getEntry(K key);

   CompletableFuture<Void> remove(SerializablePredicate<? super V> p);

   CompletableFuture<Void> put(K key, V value);

   CompletableFuture<Collection<V>> get(K key);

   CompletableFuture<Boolean> remove(K key);

   CompletableFuture<Boolean> remove(K key, V value);

   CompletableFuture<Void> remove(Predicate<? super V> p);

   CompletableFuture<Boolean> containsKey(K key);

   CompletableFuture<Boolean> containsValue(V value);

   CompletableFuture<Boolean> containsEntry(K key, V value);

   CompletableFuture<Long> size();

   boolean supportsDuplicates();

}
Copy to Clipboard Toggle word wrap

CompletableFuture<Void> put (K key, V value)

将键值对放在 multimap 缓存中。

MultimapCache<String, String> multimapCache = ...;

multimapCache.put("girlNames", "marie")
             .thenCompose(r1 -> multimapCache.put("girlNames", "oihana"))
             .thenCompose(r3 -> multimapCache.get("girlNames"))
             .thenAccept(names -> {
                          if(names.contains("marie"))
                              System.out.println("Marie is a girl name");

                           if(names.contains("oihana"))
                              System.out.println("Oihana is a girl name");
                        });
Copy to Clipboard Toggle word wrap

这个代码的输出如下:

Marie is a girl name
Oihana is a girl name
Copy to Clipboard Toggle word wrap

CompletableFuture<Collection<V>> get (K key)

异步返回与这个多映射缓存中键关联的值(若有)的视图集合。对检索到的集合的任何更改都不会更改此多映射缓存中的值。当此方法返回空集合时,这意味着未找到该密钥。

CompletableFuture<Boolean> remove (K key)

异步的异步从多映射缓存中删除与密钥关联的条目(如果存在)。

CompletableFuture<Boolean> remove (K key, V value)

异步的异步,它会从多映射缓存中删除键值对(如果存在)。

CompletableFuture<Void> remove (Predicate<? super V> p)

异步方法.删除与给定 predicate 匹配的每个值。

CompletableFuture<Boolean> containsKey (K key)

如果这个多map包含密钥,则异步返回 true。

CompletableFuture<Boolean> containsValue (V value)

如果此多map包含至少一个键中的值,则异步返回 true。

CompletableFuture<Boolean> containsEntry(K key, V value)

如果此多map至少包含一个键值对和值,则异步返回 true。

CompletableFuture<Long> size ()

在多映射缓存中返回键值对数的异步。它不会返回不同的密钥数。

boolean supportsDuplicates()

如果多映射缓存支持重复,则异步返回 true。这意味着 multimap 的内容可以是 'a' → ['1', '1', '2']。现在,这个方法总是返回 false,因为还没有支持重复。给定值的存在由 'equals' 和 'hashcode' 方法的合同决定。

15.3. 创建多映射缓存

目前,MultimapCache 配置为常规缓存。这可以通过代码或 XML 配置来完成。请参阅如何在部分链接中配置常规缓存 [配置 cache]。

15.3.1. 嵌入式模式

// create or obtain your EmbeddedCacheManager
EmbeddedCacheManager cm = ... ;

// create or obtain a MultimapCacheManager passing the EmbeddedCacheManager
MultimapCacheManager multimapCacheManager = EmbeddedMultimapCacheManagerFactory.from(cm);

// define the configuration for the multimap cache
multimapCacheManager.defineConfiguration(multimapCacheName, c.build());

// get the multimap cache
multimapCache = multimapCacheManager.get(multimapCacheName);
Copy to Clipboard Toggle word wrap

15.4. 限制

在几乎每个情况下,Multimap Cache 将作为常规缓存的行为,但当前版本中存在一些限制,如下所示:

15.4.1. 支持重复

尚不支持重复项。这意味着 multimap 不包含任何重复的键值对。每当调用放置方法时,如果键-值对已经存在,则不会添加此键值 par。用于检查 Multimap 中是否已存在键值对的方法是 等号散列码

15.4.2. 驱除

现在,驱除会按键而不是每个键对工作。这意味着,每当键被驱除时,与键关联的所有值也会被驱除。

15.4.3. Transactions

通过 auto-commit 支持隐式事务,所有方法都不是阻塞的。在大多数情况下,显式事务都可以正常工作。将阻止 大小为包含Entryremove (Predicate<? super V> p)的方法

第 16 章 用于 Red Hat JBoss EAP 的 Data Grid 模块

要在部署到 Red Hat JBoss EAP 的应用中使用 Data Grid,您应该安装 Data Grid 模块:

  • 允许您在 WAR 或 EAR 文件中打包 Data Grid JAR 文件的情况下部署应用程序。
  • 允许您使用独立于 Red Hat JBoss EAP 捆绑的 Data Grid 版本。
重要

Data Grid 模块已弃用,并计划删除。这些模块在 Red Hat JBoss EAP 直接管理 infinispan 子系统之前提供临时解决方案。

16.1. 安装 Data Grid 模块

为红帽 JBoss EAP 下载并安装 Data Grid 模块。

先决条件

  1. JDK 8 或更高版本。
  2. 现有红帽 JBoss EAP 安装。

流程

  1. 登录到红帽客户门户。
  2. Data Grid 软件下载 模块的 ZIP 存档。
  3. 提取 ZIP 存档并将 模块 内容复制到 Red Hat JBoss EAP 安装的模块目录中,以便获得生成的结构:

    $EAP_HOME/modules/system/add-ons/rhdg/org/infinispan/rhdg-8.1

16.2. 配置应用程序以使用 Data Grid 模块

为 Red Hat JBoss EAP 安装 Data Grid 模块后,将您的应用程序配置为使用 Data Grid 功能。

流程

  1. 在项目 pom.xml 文件中,将所需的 Data Grid 依赖项标记为 提供
  2. 配置您的工件存档器,以生成适当的 MANIFEST.MF 文件。

pom.xml

<dependencies>
  <dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-core</artifactId>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-cachestore-jdbc</artifactId>
    <scope>provided</scope>
  </dependency>
</dependencies>
<build>
  <plugins>
     <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-war-plugin</artifactId>
       <configuration>
         <archive>
           <manifestEntries>
             <Dependencies>org.infinispan:rhdg-8.1 services</Dependencies>
           </manifestEntries>
         </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
Copy to Clipboard Toggle word wrap

Data Grid 功能被打包为一个模块 org.infinispan,您可以将其作为条目添加到应用程序清单中,如下所示:

MANIFEST.MF

Manifest-Version: 1.0
Dependencies: org.infinispan:rhdg-8.1 services
Copy to Clipboard Toggle word wrap

通过 org.apache.catalina.Manager 接口将 HTTP 会话数据从 JBoss Web 服务器部署外部化到 Data Grid Server 集群来实现高可用性。

17.1. 安装 Tomcat 会话客户端

安装 Tomcat 会话客户端,将来自 Red Hat JBoss Web Server 应用程序的 HTTP 会话外部化到 Red Hat Data Grid。

流程

  1. Data Grid Software Downloads 下载 redhat-datagrid-8.1.1-tomcat<$version>-session-client.zip 归档。
  2. 将存档提取到您的文件系统中。
  3. 将所提取存档中的 lib/ 目录的内容复制到 $CATALINA_HOME/lib

17.2. 配置会话管理器

为会话管理器配置 HotRodManager 类,以定义 Tomcat 会话客户端如何连接到 Red Hat Data Grid Server,并在远程缓存中存储数据。

先决条件

  • 安装 Tomcat 会话客户端。
  • 至少安装一个 Data Grid 服务器实例。
  • 在 Data Grid Server 上创建缓存,以用作存储 HTTP 会话数据的模板。

流程

  1. 打开 $CATALINA_HOME/conf/context.xml/WEB-INF/context.xml 进行编辑。
  2. org.wildfly.clustering.tomcat.hotrod.HotRodManager 指定为 className 属性的值。
  3. 使用 configurationName 属性指定用作模板的缓存名称。
  4. 根据需要定义 HotRodManager 类的任何其他配置属性。
  5. 设置 Hot Rod 客户端配置属性,但不设置 infinispan.client.hotrod. 前缀。

    1. 使用 server_list 属性指定 Data Grid Server 节点列表。
    2. 使用 auth_usernameauth_password 属性指定 Data Grid 凭证。
  6. 根据需要,为 Tomcat 会话管理器指定通用属性。
  7. 保存并关闭 context.xml

配置示例

<Manager className="org.wildfly.clustering.tomcat.hotrod.HotRodManager"
         configurationName="mycache"
         persistenceStrategy="FINE"
         maxActiveSessions="100"
         server_list="192.0.2.0:11222;192.0.2.0:11223;192.0.2.0:11224"
         protocol_version="2.9"
         auth_username="admin"
         auth_password="changeme"
         auth_realm="default"
         sasl_mechanism="DIGEST-MD5"
         auth_server_name="infinispan"/>
Copy to Clipboard Toggle word wrap

验证

要验证 Tomcat 会话客户端是否在远程缓存中存储数据,请执行以下操作:

  1. 在任何浏览器中打开 Data Grid 控制台。

    默认情况下,控制台位于 http://127.0.0.1:11222/console/

  2. 检查 Tomcat 会话客户端是否已为每个部署的应用程序创建了缓存。

17.2.1. 热 Rod 管理器配置属性

下表列出了 HotRodManager 类的配置属性:

Expand
属性描述

className

指定 org.wildfly.clustering.tomcat.hotrod.HotRodManager 作为会话管理器。

configurationName

指定 Data Grid 服务器上的远程缓存,用作存储 HTTP 会话数据的模板。

persistenceStrategy

定义会话如何映射到缓存中的条目。

COARSE 将会话的所有属性存储在单个缓存条目中。这是默认值。

FINE 将会话属性存储在单独的缓存条目中。

maxActiveSessions

定义要在缓存中存储的最大会话数。默认为没有最大值(无限)。

第 18 章 自定义拦截器

重要

自定义拦截器在 Data Grid 中已弃用,并将在以后的发行版本中删除。

自定义拦截器是通过影响或响应缓存的任何修改来扩展数据网格的方法。此类修改示例包括:添加/删除/更新或事务被提交。

18.1. 以声明性方式添加自定义拦截器

可以基于每个命名的缓存添加自定义拦截器。这是因为每个命名的缓存都有自己的拦截器堆栈。以下 xml 片段描述了添加自定义拦截器的方法。

<local-cache name="cacheWithCustomInterceptors">
      <!--
      Define custom interceptors.  All custom interceptors need to extend org.jboss.cache.interceptors.base.CommandInterceptor
      -->
      <custom-interceptors>
         <interceptor position="FIRST" class="com.mycompany.CustomInterceptor1">
               <property name="attributeOne">value1</property>
               <property name="attributeTwo">value2</property>
         </interceptor>
         <interceptor position="LAST" class="com.mycompany.CustomInterceptor2"/>
         <interceptor index="3" class="com.mycompany.CustomInterceptor1"/>
         <interceptor before="org.infinispanpan.interceptors.CallInterceptor" class="com.mycompany.CustomInterceptor2"/>
         <interceptor after="org.infinispanpan.interceptors.CallInterceptor" class="com.mycompany.CustomInterceptor1"/>
      </custom-interceptors>
</local-cache>
Copy to Clipboard Toggle word wrap

18.2. 以编程方式添加自定义拦截器

为此,需要获取对 AdvancedCache 的引用。这可按以下方式完成:

CacheManager cm = getCacheManager();//magic
Cache aCache = cm.getCache("aName");
AdvancedCache advCache = aCache.getAdvancedCache();
Copy to Clipboard Toggle word wrap

然后,应该使用 addInterceptor () 方法来添加实际的拦截器。有关更多文档,请参阅 AdvancedCache javadoc。

18.3. 自定义拦截器设计

在编写自定义拦截器时,您需要通过以下规则进行 abide:

  • 自定义拦截器必须声明一个公共的空构造器才能启用构造。
  • 自定义拦截器将具有通过 XML 配置中使用的属性标签定义的任何属性的 setters。

法律通告

Copyright © 2023 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat