4.6. 创建事件 Listener


Java Hot Rod 客户端可以注册监听程序以接收缓存进入级别事件。支持创建、修改和删除的缓存条目。

创建客户端监听程序与嵌入式监听器非常相似,但使用了不同的注释和事件类。以下是打印接收的每个事件的客户端监听程序示例:

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener(converterFactoryName = "static-converter")
public class EventPrintListener {

   @ClientCacheEntryCreated
   public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryModified
   public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryRemoved
   public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) {
      System.out.println(e);
   }

}
Copy to Clipboard Toggle word wrap

ClientCacheEntryCreatedEventClientCacheEntryModifiedEvent 实例提供有关受影响密钥和条目版本的信息。此版本可用于调用服务器上的条件操作,如 replaceWithVersionremoveWithVersion

只有在删除操作成功时才会发送 ClientCacheEntryRemovedEvent 事件。换句话说,如果调用 remove 操作但未找到任何条目,则不会生成任何事件。对删除事件的用户(即使没有条目被删除),用户也可以开发事件自定义逻辑来生成这些事件。如需更多信息,请参阅自定义 客户端事件部分

所有 ClientCacheEntryCreatedEventClientCacheEntryModifiedEventClientCacheEntryRemovedEvent 事件实例也都 会提供一个布尔值为CommandRetried() 方法,如果导致这个使用了拓扑更改,则返回 true。这可能是因为此事件已被重复,或另一个事件已被替换(例如: ClientCacheEntryModifiedEvent 替换 ClientCacheEntryCreatedEvent)。

创建客户端侦听器实施后,需要将其注册到服务器。要做到这一点,请执行:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener());
Copy to Clipboard Toggle word wrap

4.6.1. 删除事件 Listener

当不再需要客户端事件监听程序时,可以删除它:

EventPrintListener listener = ...
cache.removeClientListener(listener);
Copy to Clipboard Toggle word wrap

4.6.2. 过滤事件

为了避免使用事件锁定客户端,用户可以提供过滤功能来限制服务器为特定客户端监听器触发的事件数量。要启用过滤,需要创建一个缓存事件过滤器工厂来生成过滤实例:

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-filter")
public static class StaticCacheEventFilterFactory implements CacheEventFilterFactory {

   @Override
   public StaticCacheEventFilter getFilter(Object[] params) {
      return new StaticCacheEventFilter();
   }
}


// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(1)) // static key
         return true;

      return false;
   }
}
Copy to Clipboard Toggle word wrap

以上定义的缓存事件过滤器工厂实例会创建过滤实例,这些实例静态过滤出所有条目,但其键为 1 除外。

要能够使用此缓存事件过滤器注册侦听器,需要为工厂提供唯一名称,Hot Rod 服务器需要插入名称以及缓存事件过滤器实例。

  1. 创建包含过滤器实施的 JAR 文件。

    如果缓存使用自定义键/值类,这些键/值类必须包含在 JAR 中,以便可以使用正确的未总结键和/或值实例来执行回调。如果客户端监听程序启用了 useRawData,则不需要它,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory 文件,并编写过滤器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。
  4. 通过将 factory 名称添加到 @ClientListener 注释,将客户端监听程序与此缓存事件过滤器连接:

    @ClientListener(filterFactoryName = "static-filter")
    public class EventPrintListener { ... }
    Copy to Clipboard Toggle word wrap
  5. 在服务器中注册监听程序:

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new EventPrintListener());
    Copy to Clipboard Toggle word wrap

您还可以根据执行监听器注册时提供的参数,注册过滤的动态过滤器实例。过滤器使用过滤器工厂接收的参数启用这个选项,例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;

class DynamicCacheEventFilterFactory implements CacheEventFilterFactory {
   @Override
   public CacheEventFilter<Integer, String> getFilter(Object[] params) {
      return new DynamicCacheEventFilter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class DynamicCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   final Object[] params;

   DynamicCacheEventFilter(Object[] params) {
      this.params = params;
   }

   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(params[0])) // dynamic key
         return true;

      return false;
   }
}
Copy to Clipboard Toggle word wrap

注册监听程序时,会提供执行过滤所需的动态参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), new Object[]{1}, null);
Copy to Clipboard Toggle word wrap
警告

当实例部署到集群中时,过滤实例必须可以被放入集群中,以便可以在事件生成的情况下进行过滤,即使即使是在注册侦听器的不同节点中也会生成事件。要使它们可以被扩展,请扩展 SerializableExternalizable,或为其提供自定义 外部化 器。

4.6.3. 跳过通知

在调用远程 API 方法以不从服务器获取事件通知的情况下,包含 SKIP_LISTENER_ Xmx 标志。例如,要在创建或修改值时防止监听程序通知,请按如下所示设置标记:

remoteCache.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).put(1, "one");
Copy to Clipboard Toggle word wrap

4.6.4. 自定义事件

默认生成的事件仅包含与事件相关的足够信息,但可以避免生成太多的信息,以减少发送它们的成本。(可选)事件中包含的信息可以自定义,使其包含更多信息,如值,或者包含更多的信息。此自定义通过 CacheEventConverter onnectionFactoryy 生成的 CacheEventConvertertal 实例完成:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-converter")
class StaticConverterFactory implements CacheEventConverterFactory {
   final CacheEventConverter<Integer, String, CustomEvent> staticConverter = new StaticCacheEventConverter();
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return staticConverter;
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
      return new CustomEvent(key, newValue);
   }
}

// Needs to be Serializable, Externalizable or marshallable with Infinispan Externalizers
// regardless of cluster or local caches
static class CustomEvent implements Serializable {
   final Integer key;
   final String value;
   CustomEvent(Integer key, String value) {
      this.key = key;
      this.value = value;
   }
}
Copy to Clipboard Toggle word wrap

在上例中,converter 生成一个新的自定义事件,其中包含值以及事件中的键。这会导致大量事件有效负载与默认事件进行比较,但如果与过滤结合使用,它可以降低其网络带宽成本。

警告

转换器的目标类型必须是 SerializableExternalizable。在这种情况下,提供 Externalizer 时,默认将无法使用,因为默认的 Hot Rod 客户端 marshaller 不支持它们。

处理自定义事件需要稍有不同的客户端监听程序实施。要更精确,需要处理 ClientCacheEntryCustomEvent 实例:

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class CustomEventPrintListener {

   @ClientCacheEntryCreated
   @ClientCacheEntryModified
   @ClientCacheEntryRemoved
   public void handleCustomEvent(ClientCacheEntryCustomEvent<CustomEvent> e) {
      System.out.println(e);
   }

}
Copy to Clipboard Toggle word wrap

ClientCacheEntryCustomEvent 通过 getEventData 方法接收自定义事件,而 getType 方法则提供有关生成了缓存条目创建、修改或删除事件的信息。

与过滤类似,为了能使用此转换器工厂注册侦听器,需要赋予一个唯一名称,并且 Hot Rod 服务器需要用名称插入名称和缓存事件转换程序实例。

  1. 创建一个 JAR 文件,其中包含转换器实施。

    如果缓存使用自定义键/值类,这些键/值类必须包含在 JAR 中,以便可以使用正确的未总结键和/或值实例来执行回调。如果客户端监听程序启用了 useRawData,则不需要它,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory 文件,并编写转换器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。
  4. 通过将 factory 名称添加到 @ClientListener 注释,将客户端监听程序与此转换器工厂连接:

    @ClientListener(converterFactoryName = "static-converter")
    public class CustomEventPrintListener { ... }
    Copy to Clipboard Toggle word wrap
  5. 在服务器中注册监听程序:

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new CustomEventPrintListener());
    Copy to Clipboard Toggle word wrap

也可以利用动态转换基于监听器提供的参数转换器实例。converters 使用转换器工厂接收的参数启用这个选项。例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-converter")
class DynamicCacheEventConverterFactory implements CacheEventConverterFactory {
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return new DynamicCacheEventConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}
Copy to Clipboard Toggle word wrap

注册监听器时,会提供进行转换所需的动态参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), null, new Object[]{1});
Copy to Clipboard Toggle word wrap
警告

当转换器实例在集群中部署时,必须合并,以便可以正确生成事件,即使事件在注册了监听程序的不同节点中也会发生转换。要使它们可以被扩展,请扩展 SerializableExternalizable,或为其提供自定义 外部化 器。

4.6.5. 过滤和自定义事件

如果要进行事件过滤和自定义,则更容易实现 org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter,它可让您在单步中进行过滤和自定义。为方便起见,建议直接扩展 org.infinispan.notifications.cachelistener.filter.AbstractCacheEventFilterConverter 而非实施 org.infinispan.notifications.cachelistener.CacheEventFilterConverter。例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-filter-converter")
class DynamicCacheEventFilterConverterFactory implements CacheEventFilterConverterFactory {
   public CacheEventFilterConverter<Integer, String, CustomEvent> getFilterConverter(final Object[] params) {
      return new DynamicCacheEventFilterConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
//
class DynamicCacheEventFilterConverter extends AbstractCacheEventFilterConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventFilterConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent filterAndConvert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}
Copy to Clipboard Toggle word wrap

与过滤器和转换器类似,为了能够使用此组合的过滤器/协调程序注册侦听器,该工厂必须通过 @NamedFactory 注释赋予一个唯一名称,并且 Hot Rod 服务器需要使用名称和缓存事件转换程序实例进行插入。

  1. 创建一个 JAR 文件,其中包含转换器实施。

    如果缓存使用自定义键/值类,这些键/值类必须包含在 JAR 中,以便可以使用正确的未总结键和/或值实例来执行回调。如果客户端监听程序启用了 useRawData,则不需要它,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter onnectionFactoryy 文件,并编写转换器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。

从客户端的角度来看,为了能够使用组合过滤器和转换器类,客户端监听程序必须定义相同的过滤器工厂和转换器名称,例如:

@ClientListener(filterFactoryName = "dynamic-filter-converter", converterFactoryName = "dynamic-filter-converter")
public class CustomEventPrintListener { ... }
Copy to Clipboard Toggle word wrap

当监听器通过过滤器或转换器参数注册时,会提供上例中的动态参数。如果 filter 参数不是空的,则使用那些参数,否则使用转换器参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new CustomEventPrintListener(), new Object[]{1}, null);
Copy to Clipboard Toggle word wrap

4.6.6. event Marshalling

热备份服务器可以以不同格式存储数据,但是在这方面,Java Hot Rod 客户端用户仍然可以开发在键入的对象上工作的 CacheEventConverterCacheEventFilter 实例。默认情况下,过滤器和转换器将使用数据作为 POJO(application/x-java-object),但可以通过覆盖 filter/converter 中的方法 format() 覆盖所需的格式。如果格式返回 null,则 filter/converter 将接收数据,因为它存储在其中。

hot Rod Java 客户端可以配置为使用不同的 org.infinispan.commons.marshall.Marshaller 实例。如果这样做并部署 CacheEventConverterCacheEventFilter 实例,若要通过 Java 对象提供过滤器/聚合内容,服务器需要能够转换对象和 marshaller 生成的二进制格式。

要部署 Marshaller 实例服务器端,请按照类似方法来部署 CacheEventConverterCacheEventFilter 实例:

  1. 创建一个 JAR 文件,其中包含转换器实施。
  2. 在 JAR 文件中创建 META-INF/services/org.infinispan.commons.marshall.Marshaller 文件,并编写 marshaller 类实施的完全限定域名。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。

请注意,Marshaller 可以部署到单独的 jar 中,也可以部署到与 CacheEventConverter 和/or CacheEventFilter 实例相同的 jar 中。

4.6.6.1. 部署 Protostream Marshallers

如果缓存存储了 Protobuf 内容,因为当在 Hot Rod 客户端中使用 ProtoStream marshaller 时,不需要部署自定义摘要,因为该格式已经被服务器支持:可以使用 Protobuf 格式将其转换为最常用的格式。

将过滤器/聚合与这些缓存搭配使用时,最好将过滤器/聚合与 Java 对象一起使用,而是对二进制过程的数据一起使用,需要配置额外的 ProtoStream marshallers,以便服务器在过滤/聚合前无法排解数据。要做到这一点,您必须配置所需的 SerializationContextInitializer,作为 Data Grid 服务器配置的一部分。

如需更多信息 ,请参阅缓存编码和 Marshalling

4.6.7. 侦听器状态处理

客户端监听程序注解有一个可选的 includeCurrentState 属性,用于指定在添加监听程序时或当监听器故障转移时,状态将发送到客户端。

默认情况下,includeCurrentState 为 false,但是如果设为 true,并且已在缓存中添加客户端监听程序,服务器会迭代缓存内容,并将每个条目的事件作为 ClientCacheEntryCreated (或者配置了自定义事件)。这使得客户端能够基于现有内容构建一些本地数据结构。迭代内容后,会正常收到事件,因为收到缓存更新。如果集群缓存,则会迭代整个集群范围的内容。

4.6.8. 监听器故障处理

当 Hot Rod 客户端注册客户端侦听器时,它能在集群中的单个节点中执行此操作。如果该节点失败,Java Hot Rod 客户端检测到,通过注册到另一节点的所有监听程序来透明地失败。

在这种故障期间,客户端故障转移可能会丢失一些事件。为了避免缺少这些事件,客户端监听程序注解包含名为 includeCurrentState 的可选参数,如果出现故障转移时,缓存内容可以迭代和 ClientCacheEntryCreated 事件(或者配置了自定义事件)。默认情况下,includeCurrentState 设置为 false。

使用回调处理故障切换事件:

@ClientCacheFailover
public void handleFailover(ClientCacheFailoverEvent e) {
  ...
}
Copy to Clipboard Toggle word wrap

当客户端缓存了一些数据的情况中,当客户端缓存了一些数据的情况中,在出现问题出现问题时,要考虑到一些事件可能会丢失,它可能会决定在收到失败的事件时清除所有本地缓存的数据,因为知识在事件过后,它将接收整个缓存的内容的事件。

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部