3.6. 创建事件 Listeners


Java Hot Rod 客户端可以注册监听程序来接收 cache-entry 级别事件。支持创建、修改和删除的事件的缓存条目。

创建客户端监听程序与嵌入式监听程序非常相似,但使用不同的注解和事件类。以下是打印收到的每个事件的客户端监听程序示例:

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener(converterFactoryName = "static-converter")
public class EventPrintListener {

   @ClientCacheEntryCreated
   public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryModified
   public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryRemoved
   public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) {
      System.out.println(e);
   }

}
Copy to Clipboard Toggle word wrap

ClientCacheEntryCreatedEventClientCacheEntryModifiedEvent 实例提供有关受影响密钥的信息,以及条目的版本。此版本可用于在服务器上调用条件操作,如 replaceWithVersionremoveWithVersion

只有 remove 操作成功时才会发送 ClientCacheEntryRemovedEvent 事件。换句话说,如果调用删除操作但没有找到条目,或者不应删除任何条目,则不会生成事件。有兴趣删除的事件的用户(即使没有删除条目)可以开发事件自定义逻辑来生成此类事件。如需更多信息,请参阅自定义 客户端事件部分

所有 ClientCacheEntryCreatedEvent,ClientCacheEntryModifiedEventClientCacheEntryRemovedEvent 事件实例也会提供一个 布尔值 isCommandRetried () 方法,如果因为拓扑更改而需要再次重试的写命令返回 true。这可能是此事件已被重复或另一个事件已被丢弃并替换(例如: ClientCacheEntryModifiedEvent 替换 ClientCacheEntryCreatedEvent)的符号。

创建了客户端侦听器实施后,需要向服务器注册。要做到这一点,请执行:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener());
Copy to Clipboard Toggle word wrap

3.6.1. 删除事件 Listener

当不需要客户端事件监听程序时,可以删除它:

EventPrintListener listener = ...
cache.removeClientListener(listener);
Copy to Clipboard Toggle word wrap

3.6.2. 过滤事件

为了避免用事件取消客户端,用户可以提供过滤功能来限制服务器为特定客户端侦听器触发的事件数量。要启用过滤,需要创建一个缓存事件过滤器工厂来生成过滤器实例:

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-filter")
public static class StaticCacheEventFilterFactory implements CacheEventFilterFactory {

   @Override
   public StaticCacheEventFilter getFilter(Object[] params) {
      return new StaticCacheEventFilter();
   }
}


// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(1)) // static key
         return true;

      return false;
   }
}
Copy to Clipboard Toggle word wrap

上面定义的缓存事件过滤器工厂实例会创建过滤器实例,它们静态过滤掉除其键为 1 的所有条目。

为了能够使用此缓存事件过滤器工厂注册侦听器,必须赋予一个唯一的名称,并且 Hot Rod 服务器需要插入名称和缓存事件过滤器工厂实例。

  1. 创建包含过滤器实现的 JAR 文件。

    如果缓存使用自定义键/值类,则必须将它们包含在 JAR 中,以便可以使用正确的 unmarshalled 键和/或值实例来执行回调。如果客户端侦听器启用了 useRawData,则不需要此功能,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建一个 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory 文件,并在其中编写过滤器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。
  4. 通过将工厂名称添加到 @ClientListener 注释,将客户端监听程序链接到此缓存事件过滤器工厂:

    @ClientListener(filterFactoryName = "static-filter")
    public class EventPrintListener { ... }
    Copy to Clipboard Toggle word wrap
  5. 使用服务器注册监听程序:

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new EventPrintListener());
    Copy to Clipboard Toggle word wrap

您还可以根据在监听器注册时提供的参数注册动态过滤器实例。过滤器使用过滤器工厂接收的参数启用这个选项,例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;

class DynamicCacheEventFilterFactory implements CacheEventFilterFactory {
   @Override
   public CacheEventFilter<Integer, String> getFilter(Object[] params) {
      return new DynamicCacheEventFilter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class DynamicCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   final Object[] params;

   DynamicCacheEventFilter(Object[] params) {
      this.params = params;
   }

   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(params[0])) // dynamic key
         return true;

      return false;
   }
}
Copy to Clipboard Toggle word wrap

在注册监听器时,提供了执行过滤所需的动态参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), new Object[]{1}, null);
Copy to Clipboard Toggle word wrap
警告

当它们部署到集群中时,过滤实例必须可以被处理,以便过滤可以在生成事件的位置发生,即使即使即使在被注册了监听器的不同节点中也是如此。为了使其可以被编译,可以使它们扩展 SerializableExternalizable 或为它们提供自定义外部工具。

3.6.3. 跳过通知

在调用远程 API 方法来执行操作时,包括 SKIP_LISTENER_NOTIFICATION 标志,而无需从服务器获取事件通知。例如,要在创建或修改值时防止监听程序通知,请设置标志,如下所示:

remoteCache.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).put(1, "one");
Copy to Clipboard Toggle word wrap

3.6.4. 自定义事件

默认情况下生成的事件仅包含足够的信息,以便使事件相关,但可以避免产生太多的信息,以降低发送它们的成本。(可选)事件中提供的信息可以自定义,使其包含更多信息,如值,或者包含较少的信息。此自定义通过 CacheEventConverter Factory 生成的 CacheEventConverterFactory 实例进行:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-converter")
class StaticConverterFactory implements CacheEventConverterFactory {
   final CacheEventConverter<Integer, String, CustomEvent> staticConverter = new StaticCacheEventConverter();
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return staticConverter;
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
      return new CustomEvent(key, newValue);
   }
}

// Needs to be Serializable, Externalizable or marshallable with Infinispan Externalizers
// regardless of cluster or local caches
static class CustomEvent implements Serializable {
   final Integer key;
   final String value;
   CustomEvent(Integer key, String value) {
      this.key = key;
      this.value = value;
   }
}
Copy to Clipboard Toggle word wrap

在上例中,转换器生成新的自定义事件,该事件包括值以及事件中的键。与默认事件相比,这会导致更大的事件有效负载,但如果与过滤结合使用,则可能会降低其网络带宽成本。

警告

转换程序的目标类型必须是 SerializableExternalizable。在这个特殊情况下,提供外部程序的转换器默认不起作用,因为默认的 Hot Rod 客户端 marshaller 不支持它们。

处理自定义事件需要略有不同的客户端监听程序实现。要更精确地处理 ClientCacheEntryCustomEvent 实例:

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class CustomEventPrintListener {

   @ClientCacheEntryCreated
   @ClientCacheEntryModified
   @ClientCacheEntryRemoved
   public void handleCustomEvent(ClientCacheEntryCustomEvent<CustomEvent> e) {
      System.out.println(e);
   }

}
Copy to Clipboard Toggle word wrap

回调中收到的 ClientCacheEntryCustomEvent 通过 getEventData 方法公开自定义事件,getType 方法提供了有关生成的事件的信息,这是缓存条目创建、修改或删除的结果。

与过滤类似,若要使用此转换器工厂注册监听程序,必须授予唯一的名称,并且 Hot Rod 服务器需要插入名称和缓存事件转换器工厂实例。

  1. 创建一个 JAR 文件,其中带有转换器实现。

    如果缓存使用自定义键/值类,则必须将它们包含在 JAR 中,以便可以使用正确的 unmarshalled 键和/或值实例来执行回调。如果客户端侦听器启用了 useRawData,则不需要此功能,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建一个 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory 文件,并编写转换器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。
  4. 通过将工厂名称添加到 @ClientListener 注释,将客户端监听程序与这个转换器工厂连接:

    @ClientListener(converterFactoryName = "static-converter")
    public class CustomEventPrintListener { ... }
    Copy to Clipboard Toggle word wrap
  5. 使用服务器注册监听程序:

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new CustomEventPrintListener());
    Copy to Clipboard Toggle word wrap

也可以根据在侦听器注册时提供的参数转换的动态转换器实例。转换器使用转换器接收的参数启用此选项。例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-converter")
class DynamicCacheEventConverterFactory implements CacheEventConverterFactory {
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return new DynamicCacheEventConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}
Copy to Clipboard Toggle word wrap

在注册监听器时,提供了进行转换所需的动态参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), null, new Object[]{1});
Copy to Clipboard Toggle word wrap
警告

当集群部署到集群中时,转换器实例必须可以被处理,因此即使事件被注册了监听程序的不同节点中也会发生转换。为了使其可以被编译,可以使它们扩展 SerializableExternalizable 或为它们提供自定义外部工具。

3.6.5. 过滤和自定义事件

如果要同时进行事件过滤和自定义,可以更轻松地实施 org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter,它允许过滤和自定义在一个步骤中进行。为方便起见,建议直接扩展 org.infinispan.notifications.cachelistener.filter.AbstractCacheEventFilterConverter,而不是直接实施 org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter。例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-filter-converter")
class DynamicCacheEventFilterConverterFactory implements CacheEventFilterConverterFactory {
   public CacheEventFilterConverter<Integer, String, CustomEvent> getFilterConverter(final Object[] params) {
      return new DynamicCacheEventFilterConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
//
class DynamicCacheEventFilterConverter extends AbstractCacheEventFilterConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventFilterConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent filterAndConvert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}
Copy to Clipboard Toggle word wrap

与过滤器和转换器类似,要能够使用组合的 filter/converter 工厂注册监听程序,工厂必须通过 @NamedFactory 注解指定唯一名称,并且 Hot Rod 服务器需要与名称和缓存事件转换器工厂实例插入。

  1. 创建一个 JAR 文件,其中带有转换器实现。

    如果缓存使用自定义键/值类,则必须将它们包含在 JAR 中,以便可以使用正确的 unmarshalled 键和/或值实例来执行回调。如果客户端侦听器启用了 useRawData,则不需要此功能,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建一个 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory 文件,并编写转换器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。

从客户端的角度来看,要使用组合过滤器和转换器类,客户端监听程序必须定义相同的过滤器工厂和转换器工厂名称,例如:

@ClientListener(filterFactoryName = "dynamic-filter-converter", converterFactoryName = "dynamic-filter-converter")
public class CustomEventPrintListener { ... }
Copy to Clipboard Toggle word wrap

当监听器通过 filter 或 converter 参数注册时,会提供上例中的动态参数。如果过滤器参数是非空的,则会使用这些参数,否则会使用转换器参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new CustomEventPrintListener(), new Object[]{1}, null);
Copy to Clipboard Toggle word wrap

3.6.6. event Marshalling

热 Rod 服务器可以以不同的格式存储数据,但是尽管如此,Java Hot Rod 客户端用户仍然可以开发在键入的对象上运行的 CacheEventConverterCacheEventFilter 实例。默认情况下,过滤器和转换器将数据用作 POJO (application/x-java-object),但可以通过覆盖 filter/converter 中的方法 格式() 来覆盖所需的格式。如果格式返回 null,则过滤器/转换器将接收存储的数据。

热 Rod Java 客户端可以配置为使用不同的 org.infinispan.commons.marshall.Marshaller 实例。如果这样做和部署 CacheEventConverterCacheEventFilter 实例,则可以使用 Java 对象而不是 marshaller 显示过滤器/转换,服务器需要能够在对象和 marshaller 生成的二进制格式之间进行转换。

要部署 Marshaller 实例服务器端,请按照类似的方法部署 CacheEventConverterCacheEventFilter 实例:

  1. 创建一个 JAR 文件,其中带有转换器实现。
  2. 在 JAR 文件中创建 META-INF/services/org.infinispan.commons.marshall.Marshaller 文件,写入 marshaller 类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。

请注意,Marshaller 可以部署到单独的 jar 中,或者在与 CacheEventConverter 和/或 CacheEventFilter 实例相同的 jar 中进行部署。

3.6.6.1. 部署 Protostream Marshallers

如果缓存存储 Protobuf 内容,就像在 Hot Rod 客户端中使用 ProtoStream marshaller 时发生,则不需要部署自定义 marshaller,因为服务器已经支持格式:有 Protobuf 格式到最常见的格式,如 JSON 和 POJO。

在将过滤器/转换器与这些缓存搭配使用时,需要使用带有 Java 对象的 filter/converters 而不是二进制 Protobuf 数据时,需要配置额外的 ProtoStream marshallers,以便服务器可以在过滤/转换前处理数据。要做到这一点,您必须将所需的 SerializationContextInitializer (s) 配置为 Data Grid 服务器配置的一部分。

如需更多信息 ,请参阅缓存编码和 Marshalling

3.6.7. 侦听器状态处理

客户端侦听器注解具有可选的 includeCurrentState 属性,用于指定在添加监听程序时是否将状态发送到客户端,或者是监听器故障转移时。

默认情况下,includeCurrentState 为 false,但如果设置为 true,并且客户端监听程序添加到已包含数据的缓存中,服务器会迭代缓存内容,并将每个条目的事件作为 ClientCacheEntryCreated (如果配置)发送一个 ClientCacheEntryCreated (如果配置了自定义事件)。这允许客户端基于现有内容构建一些本地数据结构。迭代内容后,事件会正常接收,因为接收缓存更新。如果缓存被集群,则整个集群范围的内容都会迭代。

3.6.8. 侦听器故障处理

当 Hot Rod 客户端注册客户端监听程序时,它会在集群的单个节点中执行此操作。如果该节点失败,Java Hot Rod 客户端会检测到透明且在节点中注册的所有监听器失败。

在这种故障切换过程中,客户端可能会错过一些事件。为了避免缺少这些事件,client 侦听器注解包含一个名为 includeCurrentState 的可选参数,如果设为 true,则缓存内容可以迭代,并且生成 ClientCacheEntryCreated 事件(如果配置了自定义事件)。默认情况下,includeCurrentState 设置为 false。

使用回调来处理故障转移事件:

@ClientCacheFailover
public void handleFailover(ClientCacheFailoverEvent e) {
  ...
}
Copy to Clipboard Toggle word wrap

当客户端缓存了一些数据的用例中,这非常有用,因此,考虑到一些事件可能会丢失,它决定在收到事件失败时清除任何本地缓存的数据,了解事件故障转移后,它将收到整个缓存的内容的事件。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat