Chapter 10. The Notification/Listener API
10.1. The Notification/Listener API
Red Hat JBoss Data Grid provides a listener API
that provides notifications for events as they occur. Clients can choose to register with the listener API
for relevant notifications. This annotation-driven API
operates on cache-level events and cache manager-level events.
10.2. Listener Example
The following example defines a listener in Red Hat JBoss Data Grid that prints some information each time a new entry is added to the cache:
Configuring a Listener
@Listener public class PrintWhenAdded { @CacheEntryCreated public void print(CacheEntryCreatedEvent event) { System.out.println("New entry " + event.getKey() + " created in the cache"); } }
10.3. Listener Notifications
10.3.1. Listener Notifications
Each cache event triggers a notification that is dispatched to listeners. A listener is a simple POJO
annotated with @Listener
. A Listenable is an interface that denotes that the implementation can have listeners attached to it. Each listener is registered using methods defined in the Listenable.
A listener can be attached to both the cache and Cache Manager to allow them to receive cache-level or cache manager-level notifications.
10.3.2. About Cache-level Notifications
In Red Hat JBoss Data Grid, cache-level events occur on a per-cache basis. Examples of cache-level events include the addition, removal and modification of entries, which trigger notifications to listeners registered on the relevant cache.
10.3.3. Cache Manager-level Notifications
Examples of events that occur in Red Hat JBoss Data Grid at the cache manager-level are:
- The starting and stopping of caches
- Nodes joining or leaving a cluster;
Cache manager-level events are located globally and used cluster-wide, but are restricted to events within caches created by a single cache manager.
The first two types of events, CacheStarted
and CacheStopped
are highly similar, and the following example demonstrates printing out the name of the cache that has started or stopped:
@CacheStarted public void cacheStarted(CacheStartedEvent event){ // Print the name of the Cache that started log.info("Cache Started: " + event.getCacheName()); } @CacheStopped public void cacheStopped(CacheStoppedEvent event){ // Print the name of the Cache that stopped log.info("Cache Stopped: " + event.getCacheName()); }
When receiving a ViewChangedEvent
or MergeEvent
note that the list of old and new members is from the node that generated the event. For instance, consider the following scenario:
- A JDG Cluster currently consists of nodes A, B, and C.
- Node D joins the cluster.
-
Nodes A, B, and C will receive a
ViewChangedEvent
with [A,B,C] as the list of old members, and [A,B,C,D] as the list of new members. -
Node D will receive a
ViewChangedEvent
with [D] as the list of old members, and [A,B,C,D] as the list of new members.
Therefore, a set intersection may be used to determine if a node has recently joined or left a cluster. By using getOldMembers()
in conjunction with getNewMembers()
, we may determine the set of nodes that have joined or left the cluster, as seen below:
@ViewChanged public void viewChanged(ViewChangedEvent event){ HashSet<Address> oldMembers = new HashSet(event.getOldMembers()); HashSet<Address> newMembers = new HashSet(event.getNewMembers()); HashSet<Address> oldCopy = (HashSet<Address>)oldMembers.clone(); // Remove all new nodes from the old view. // The resulting set indicates nodes that have left the cluster. oldCopy.removeAll(newMembers); if(oldCopy.size() > 0){ for (Address oldAdd : oldCopy){ log.info("Node left:" + oldAdd.toString()); } } // Remove all old nodes from the new view. // The resulting set indicates nodes that have joined the cluster. newMembers.removeAll(oldMembers); if(newMembers.size() > 0){ for(Address newAdd : newMembers){ log.info("Node joined: " + newAdd.toString()); } } }
Similar logic may be used during a MergeEvent
to determine the new set of members in the cluster.
10.3.4. About Synchronous and Asynchronous Notifications
By default, notifications in Red Hat JBoss Data Grid are dispatched in the same thread that generates the event. Therefore the listener must be written in a way that does not block or prevent the thread’s progression.
Alternatively, the listener can be annotated as asynchronous, which dispatches notifications in a separate thread and prevents blocking the operations of the original thread.
Annotate listeners using the following:
@Listener (sync = false) public class MyAsyncListener { .... }
Use the asyncListenerExecutor
element in the XML configuration file to tune the thread pool that is used to dispatch asynchronous notifications.
When using a synchronous, non-clustered listener that handles the CacheEntryExpiredEvent
ensure that this listener does not block execution, as the expiration reaper is also synchronous in a non-clustered environment.
10.4. Modifying Cache Entries
10.4.1. Modifying Cache Entries
After the cache entry has been created, the cache entry can be modified programmatically.
10.4.2. Cache Entry Modified Listener Configuration
In a cache entry modified listener event, The getValue()
method’s behavior is specific to whether the callback is triggered before or after the actual operation has been performed. For example, if event.isPre() is true, then event.getValue() would return the old value, prior to modification. If event.isPre() is false, then event.getValue() would return new value. If the event is creating and inserting a new entry, the old value would be null. For more information about isPre()
, see the Red Hat JBoss Data Grid API Documentation's listing for the org.infinispan.notifications.cachelistener.event
package.
Listeners can only be configured programmatically by using the methods exposed by the Listenable
and FilteringListenable
interfaces (which the Cache object implements).
10.4.3. Cache Entry Modified Listener Example
The following example defines a listener in Red Hat JBoss Data Grid that prints some information each time a cache entry is modified:
Modified Listener
@Listener public class PrintWhenModified { @CacheEntryModified public void print(CacheEntryModifiedEvent event) { System.out.println("Cache entry modified. Details = " + event); } }
10.5. Clustered Listeners
10.5.1. Clustered Listeners
Clustered listeners allow listeners to be used in a distributed cache configuration. In a distributed cache environment, registered local listeners are only notified of events that are local to the node where the event has occurred. Clustered listeners resolve this issue by allowing a single listener to receive any write notification that occurs in the cluster, regardless of where the event occurred. As a result, clustered listeners perform slower than non-clustered listeners, which only provide event notifications for the node on which the event occurs.
When using clustered listeners, client applications are notified when an entry is added, updated, expired, or deleted in a particular cache. The event is cluster-wide so that client applications can access the event regardless of the node on which the application resides or connects with.
The event will always be triggered on the node where the listener was registered, while disregarding where the cache update originated.
10.5.2. Configuring Clustered Listeners
In the following use case, listener stores events as it receives them.
Procedure: Clustered Listener Configuration
@Listener(clustered = true) protected static class ClusterListener { List<CacheEntryEvent> events = Collections.synchronizedList(new ArrayList<CacheEntryEvent>()); @CacheEntryCreated @CacheEntryModified @CacheEntryExpired @CacheEntryRemoved public void onCacheEvent(CacheEntryEvent event) { log.debugf("Adding new cluster event %s", event); events.add(event); } } public void addClusterListener(Cache<?, ?> cache) { ClusterListener clusterListener = new ClusterListener(); cache.addListener(clusterListener); }
-
Clustered listeners are enabled by annotating the
@Listener
class withclustered=true
. The following methods are annotated to allow client applications to be notified when entries are added, modified, expired, or removed.
-
@CacheEntryCreated
-
@CacheEntryModified
-
@CacheEntryExpired
-
@CacheEntryRemoved
-
- The listener is registered with a cache, with the option of passing on a filter or converter.
The following limitations occur when using clustered listeners, that do not apply to non-clustered listeners:
- A cluster listener can only listen to entries that are created, modified, expired, or removed. No other events are listened to by a clustered listener.
- Only post events are sent to a clustered listener, pre events are ignored.
10.5.3. The Cache Listener API
Clustered listeners can be added on top of the existing @CacheListener API
via the addListener
method.
The Cache Listener API
cache.addListener(Object listener, Filter filter, Converter converter);
public @interface Listener { boolean clustered() default false; boolean includeCurrentState() default false; boolean sync() default true; }
interface CacheEventFilter<K,V> { public boolean accept(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType); }
interface CacheEventConverter<K,V,C> { public C convert(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType); }
- The Cache API
The local or clustered listener can be registered with the
cache.addListener
method, and is active until one of the following events occur.-
The listener is explicitly unregistered by invoking
cache.removeListener
. - The node on which the listener was registered crashes.
-
The listener is explicitly unregistered by invoking
- Listener Annotation
The listener annotation is enhanced with three attributes:
-
clustered()
:This attribute defines whether the annotated listener is clustered or not. Note that clustered listeners can only be notified for@CacheEntryRemoved
,@CacheEntryCreated
,@CacheEntryExpired
, and@CacheEntryModified
events. This attribute is false by default. -
includeCurrentState()
: This attribute applies to clustered listeners only, and is false by default. When set totrue
, the entire existing state within the cluster is evaluated. When being registered, a listener will immediately be sent aCacheCreatedEvent
for every entry in the cache. -
Refer to About Synchronous and Asynchronous Notifications for information regarding
sync()
.
-
oldValue
andoldMetadata
-
The
oldValue
andoldMetadata
values are extra methods on the accept method ofCacheEventFilter
andCacheEventConverter
classes. They values are provided to any listener, including local listeners. For more information about these values, see the JBoss Data Grid API Documentation . EventType
-
The
EventType
includes the type of event, whether it was a retry, and if it was a pre or post event.
When using clustered listeners, the order in which the cache is updated is reflected in the sequence of notifications received.
The clustered listener does not guarantee that an event is sent only once. The listener implementation must be idempotent in order to prevent situations where the same event is sent more than once. Implementors can expect singularity to be honored for stable clusters and outside of the time span in which synthetic events are generated as a result of includeCurrentState
.
10.5.4. Clustered Listener Example
The following use case demonstrates a listener that wants to know when orders are generated that have a destination of New York, NY. The listener requires a Filter that filters all orders that come in and out of New York. The listener also requires a Converter as it does not require the entire order, only the date it is to be delivered.
Use Case: Filtering and Converting the New York orders
class CityStateFilter implements CacheEventFilter<String, Order> { private String state; private String city; public boolean accept(String orderId, Order oldOrder, Metadata oldMetadata, Order newOrder, Metadata newMetadata, EventType eventType) { switch (eventType.getType()) { // Only send update if the order is going to our city case CACHE_ENTRY_CREATED: return city.equals(newOrder.getCity()) && state.equals(newOrder.getState()); // Only send update if our order has changed from our city to elsewhere or if is now going to our city case CACHE_ENTRY_MODIFIED: if (city.equals(oldOrder.getCity()) && state.equals(oldOrder.getState())) { // If old city matches then we have to compare if new order is no longer going to our city return !city.equals(newOrder.getCity()) || !state.equals(newOrder.getState()); } else { // If the old city doesn't match ours then only send update if new update does match ours return city.equals(newOrder.getCity()) && state.equals(newOrder.getState()); } // On remove we have to send update if our order was originally going to city case CACHE_ENTRY_REMOVED: return city.equals(oldOrder.getCity()) && state.equals(oldOrder.getState()); } return false; } } class OrderDateConverter implements CacheEventConverter<String, Order, Date> { private String state; private String city; public Date convert(String orderId, Order oldValue, Metadata oldMetadata, Order newValue, Metadata newMetadata, EventType eventType) { // If remove we do not care about date - this tells listener to remove its data if (eventType.isRemove()) { return null; } else if (eventType.isModified()) { if (state.equals(newValue.getState()) && city.equals(newValue.getCity())) { // If it is a modification meaning the destination has changed to ours then we allow it return newValue.getDate(); } else { // If destination is no longer our city it means it was changed from us so send null return null; } } else { // This was a create so we always send date return newValue.getDate(); } } }
10.5.5. Optimized Cache Filter Converter
The example provided in Clustered Listener Example could use the optimized CacheEventFilterConverter
, in order to perform the filtering and converting of results into one step.
The CacheEventFilterConverter
is an optimization that allows the event filter and conversion to be performed in one step. This can be used when an event filter and converter are most efficiently used as the same object, composing the filtering and conversion in the same method. This can only be used in situations where your conversion will not return a null value, as a returned value of null indicates that the value did not pass the filter. To convert a null value, use the CacheEventFilter
and the CacheEventConverter
interfaces independently.
The following is an example of the New York orders use case using the CacheEventFilterConverter
:
CacheEventFilterConverter
class OrderDateFilterConverter extends AbstractCacheEventFilterConverter<String, Order, Date> { private final String state; private final String city; public Date filterAndConvert(String orderId, Order oldValue, Metadata oldMetadata, Order newValue, Metadata newMetadata, EventType eventType) { // Remove if the date is not required - this tells listener to remove its data if (eventType.isRemove()) { return null; } else if (eventType.isModified()) { if (state.equals(newValue.getState()) && city.equals(newValue.getCity())) { // If it is a modification meaning the destination has changed to ours then we allow it return newValue.getDate(); } else { // If destination is no longer our city it means it was changed from us so send null return null; } } else { // This was a create so we always send date return newValue.getDate(); } } }
When registering the listener, provide the FilterConverter
as both arguments to the filter and converter:
OrderDateFilterConverter filterConverter = new OrderDateFilterConverter("NY", "New York"); cache.addListener(listener, filterConveter, filterConverter);
10.6. Remote Event Listeners (Hot Rod)
10.6.1. Remote Event Listeners (Hot Rod)
Event listeners allow Red Hat JBoss Data Grid Hot Rod servers to be able to notify remote clients of events such as CacheEntryCreated
, CacheEntryModified
, CacheEntryExpired
and CacheEntryRemoved
. Clients can choose whether or not to listen to these events to avoid flooding connected clients. This assumes that clients maintain persistent connections to the servers.
Client listeners for remote events can be added similarly to clustered listeners in library mode. The following example demonstrates a remote client listener that prints out each event it receives.
Event Print Listener
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { @ClientCacheEntryCreated public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) { System.out.println(e); } @ClientCacheEntryModified public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) { System.out.println(e); } @ClientCacheEntryExpired public void handleExpiredEvent(ClientCacheEntryExpiredEvent e) { System.out.println(e); } @ClientCacheEntryRemoved public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) { System.out.println(e); } }
-
ClientCacheEntryCreatedEvent
andClientCacheEntryModifiedEvent
instances provide information on the key and version of the entry. This version can be used to invoke conditional operations on the server, such areplaceWithVersion
orremoveWithVersion
. -
ClientCacheEntryExpiredEvent
events are sent when either aget()
is called on an expired entry, or when the expiration reaper detects that an entry has expired. Once the entry has expired the cache will nullify the entry, and adjust its size appropriately; however, the event will only be generated in the two scenarios listed. -
ClientCacheEntryRemovedEvent
events are only sent when the remove operation succeeds. If a remove operation is invoked and no entry is found or there are no entries to remove, no event is generated. If users require remove events regardless of whether or not they are successful, a customized event logic can be created. -
All client cache entry created, modified, and removed events provide a
boolean isCommandRetried()
method that will returntrue
if the write command that caused it has to be retried due to a topology change. This indicates that the event has been duplicated or that another event was dropped and replaced, such as where a Modified event replaced a Created event.
If the expected workload favors writes over reads it will be necessary to filter the events sent to prevent a large amount of excessive traffic being generated which may cause issues on either the client or the network. For more details on filtering events refer to .
10.6.2. Adding and Removing Event Listeners
Registering an Event Listener with the Server
The following example registers the Event Print Listener with the server. See Event Print Listener .
Adding an Event Listener
RemoteCache<Integer, String> cache = rcm.getCache(); cache.addClientListener(new EventLogListener());
Removing a Client Event Listener
A client event listener can be removed as follows
EventLogListener listener = ... cache.removeClientListener(listener);
10.6.3. Remote Event Client Listener Example
The following procedure demonstrates the steps required to configure a remote client listener to interact with the remote cache via Hot Rod.
Configuring Remote Event Listeners
Download the Red Hat JBoss Data Grid distribution from the Red Hat Customer Portal
The latest JBoss Data Grid distribution includes the Hot Rod server with which the client will communicate.
Start the server
Start the JBoss Data Grid server by using the following command from the root of the server.
$ ./bin/standalone.sh
Write the application to interact with the Hot Rod server
Maven Users
Create an application with the following dependency and change the version to
8.5.3.Final-redhat-00002
or later:<properties> <infinispan.version>8.5.3.Final-redhat-00002</infinispan.version> </properties> [...] <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-remote</artifactId> <version>${infinispan.version}</version> </dependency>
- Non-Maven users, adjust according to your chosen build tool or download the distribution containing all JBoss Data Grid jars.
Write the client application
The following demonstrates a simple remote event listener that logs all events received.
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { @ClientCacheEntryCreated @ClientCacheEntryModified @ClientCacheEntryRemoved public void handleRemoteEvent(ClientEvent event) { System.out.println(event); } }
Use the remote event listener to execute operations against the remote cache
The following example demonstrates a simple main java class, which adds the remote event listener and executes some operations against the remote cache.
RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); EventLogListener listener = new EventLogListener(); try { cache.addClientListener(listener); cache.put(1, "one"); cache.put(1, "new-one"); cache.remove(1); } finally { cache.removeClientListener(listener); }
Result
Once executed, the console output should appear similar to the following:
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryRemovedEvent(key=1)
The output indicates that by default, events come with the key and the internal data version associated with current value. The actual value is not sent back to the client for performance reasons. Receiving remote events has a performance impact, which is increased with cache size, as more operations are executed. To avoid inundating Hot Rod clients, filter remote events on the server side, or customize the event contents.
10.6.4. Filtering Remote Events
10.6.4.1. Filtering Remote Events
To prevent clients being inundated with events, Red Hat JBoss Data Grid Hot Rod remote events can be filtered by providing key/value filter factories that create instances that filter which events are sent to clients, and how these filters can act on client provided information.
Sending events to remote clients has a performance cost, which increases with the number of clients with registered remote listeners. The performance impact also increases with the number of modifications that are executed against the cache.
The performance cost can be reduced by filtering the events being sent on the server side. Custom code can be used to exclude certain events from being broadcast to the remote clients to improve performance.
Filtering can be based on either key or value information, or based on cache entry metadata. To enable filtering, a cache event filter factory that produces filter instances must be created. The following is a sample implementation that filters key “2” out of the events sent to clients.
KeyValueFilter
package sample; import java.io.Serializable; import org.infinispan.notifications.cachelistener.filter.*; import org.infinispan.metadata.*; @NamedFactory(name = "basic-filter-factory") public class BasicKeyValueFilterFactory implements CacheEventFilterFactory { @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) { return new BasicKeyValueFilter(); } static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable { @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return !"2".equals(key); } } }
In order to register a listener with this key value filter factory, the factory must be given a unique name, and the Hot Rod server must be plugged with the name and the cache event filter factory instance.
10.6.4.2. Custom Filters for Remote Events
Custom filters can improve performance by excluding certain event information from being broadcast to the remote clients.
To plug the JBoss Data Grid Server with a custom filter use the following procedure:
Using a Custom Filter
-
Create a JAR file with the filter implementation within it. Each factory must have a name assigned to it via the
org.infinispan.filter.NamedFactory
annotation. The example uses aKeyValueFilterFactory
. - Create a META-INF/services/org.infinispan.notifications.cachelistener.filter. CacheEventFilterFactory file within the JAR file, and within it write the fully qualified class name of the filter class implementation.
Deploy the JAR file in the JBoss Data Grid Server by performing any of the following options:
Option 1: Deploy the JAR through the deployment scanner
-
Copy the JAR to the
$JDG_HOME/standalone/deployments/
directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
-
Copy the JAR to the
Option 2: Deploy the JAR through the CLI
Connect to the desired instance with the CLI:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
Once connected execute the
deploy
command:deploy /path/to/artifact.jar
Option 3: Deploy the JAR as a custom module
Connect to the JDG server by running the below command:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
The jar containing the Custom Filter must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Filter:
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
In a different window add the newly added module as a dependency to the
org.infinispan
module by editing$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
. In this file add the following entry:<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- Restart the JDG server.
Once the server is plugged with the filter, add a remote client listener that will use the filter. The following example extends the EventLogListener implementation provided in Remote Event Client Listener Example (See Remote Event Client Listener Example), and overrides the @ClientListener
annotation to indicate the filter factory to use with the listener.
Add Filter Factory to the Listener
@org.infinispan.client.hotrod.annotation.ClientListener(filterFactoryName = "basic-filter-factory") public class BasicFilteredEventLogListener extends EventLogListener {}
The listener can now be added via the RemoteCacheAPI. The following example demonstrates this, and executes some operations against the remote cache.
Register the Listener with the Server
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener(); try { cache.addClientListener(listener); cache.putIfAbsent(1, "one"); cache.replace(1, "new-one"); cache.putIfAbsent(2, "two"); cache.replace(2, "new-two"); cache.putIfAbsent(3, "three"); cache.replace(3, "new-three"); cache.remove(1); cache.remove(2); cache.remove(3); } finally { cache.removeClientListener(listener); }
The system output shows that the client receives events for all keys except those that have been filtered.
Result
The following demonstrates the resulting system output from the provided example.
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryCreatedEvent(key=3,dataVersion=5) ClientCacheEntryModifiedEvent(key=3,dataVersion=6) ClientCacheEntryRemovedEvent(key=1) ClientCacheEntryRemovedEvent(key=3)
Filter instances must be marshallable when they are deployed in a cluster in order for filtering to occur where the event is generated, even if the event is generated in a different node to where the listener is registered. To make them marshallable, either make them extend Serializable, Externalizable, or provide a custom Externalizer.
10.6.4.3. Enhanced Filter Factories
When adding client listeners, users can provide parameters to the filter factory in order to generate different filter instances with different behaviors from a single filter factory based on client-side information.
The following configuration demonstrates how to enhance the filter factory so that it can filter dynamically based on the key provided when adding the listener, rather than filtering on a statically given key.
Configuring an Enhanced Filter Factory
package sample; import java.io.Serializable; import org.infinispan.notifications.cachelistener.filter.*; import org.infinispan.metadata.*; @NamedFactory(name = "basic-filter-factory") public class BasicKeyValueFilterFactory implements CacheEventFilterFactory { @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) { return new BasicKeyValueFilter(params); } static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable { private final Object[] params; public BasicKeyValueFilter(Object[] params) { this.params = params; } @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return !params[0].equals(key); } } }
The filter can now filter by “3” instead of “2”:
Running an Enhanced Filter Factory
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener(); try { cache.addClientListener(listener, new Object[]{3}, null); // <- Filter parameter passed cache.putIfAbsent(1, "one"); cache.replace(1, "new-one"); cache.putIfAbsent(2, "two"); cache.replace(2, "new-two"); cache.putIfAbsent(3, "three"); cache.replace(3, "new-three"); cache.remove(1); cache.remove(2); cache.remove(3); } finally { cache.removeClientListener(listener); }
Result
The provided example results in the following output:
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryCreatedEvent(key=2,dataVersion=3) ClientCacheEntryModifiedEvent(key=2,dataVersion=4) ClientCacheEntryRemovedEvent(key=1) ClientCacheEntryRemovedEvent(key=2)
The amount of information sent to clients can be further reduced or increased by customizing remote events.
10.6.5. Customizing Remote Events
10.6.5.1. Customizing Remote Events
In Red Hat JBoss Data Grid, Hot Rod remote events can be customized to contain the information required to be sent to a client. By default, events contain only a basic set of information, such as a key and type of event, in order to avoid overloading the client, and to reduce the cost of sending them.
The information included in these events can be customized to contain more information, such as values, or contain even less information. Customization is done via CacheEventConverter
instances, which are created by implementing a CacheEventConverterFactory
class. Each factory must have a name associated to it via the @NamedFactory
annotation.
To plug the Red Hat JBoss Data Grid Server with an event converter use the following procedure:
Using a Converter
-
Create a JAR file with the converter implementation within it. Each factory must have a name assigned to it via the
org.infinispan.filter.NamedFactory
annotation. - Create a META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory file within the JAR file and within it, write the fully qualified class name of the converter class implementation.
Deploy the JAR file in the Red Hat JBoss Data Grid Server by performing any of the following options:
Option 1: Deploy the JAR through the deployment scanner
-
Copy the JAR to the
$JDG_HOME/standalone/deployments/
directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
-
Copy the JAR to the
Option 2: Deploy the JAR through the CLI
Connect to the desired instance with the CLI:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
Once connected execute the
deploy
command:deploy /path/to/artifact.jar
Option 3: Deploy the JAR as a custom module
Connect to the JDG server by running the below command:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
The jar containing the Custom Converter must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Converter:
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
In a different window add the newly added module as a dependency to the
org.infinispan
module by editing$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
. In this file add the following entry:<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- Restart the JDG server.
Converters can also act on client provided information, allowing converter instances to customize events based on the information provided when the listener was added. The API allows converter parameters to be passed in when the listener is added.
10.6.5.2. Adding a Converter
When a listener is added, the name of a converter factory can be provided to use with the listener. When the listener is added, the server looks up the factory and invokes the getConverter
method to get a org.infinispan.filter.Converter
class instance to customize events server side.
The following example demonstrates sending custom events containing value information to remote clients for a cache of Integers and Strings. The converter generates a new custom event, which includes the value as well as the key in the event. The custom event has a bigger event payload compared with default events, however if combined with filtering, it can reduce bandwidth cost.
Sending Custom Events
import org.infinispan.notifications.cachelistener.filter.*; @NamedFactory(name = "value-added-converter-factory") class ValueAddedConverterFactory implements CacheEventConverterFactory { // The following types correspond to the Key, Value, and the returned Event, respectively. public CacheEventConverter<Integer, String, ValueAddedEvent> getConverter(final Object[] params) { return new ValueAddedConverter(); } static class ValueAddedConverter implements CacheEventConverter<Integer, String, ValueAddedEvent> { public ValueAddedEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return new ValueAddedEvent(key, newValue); } } } // Must be Serializable or Externalizable. class ValueAddedEvent implements Serializable { final Integer key; final String value; ValueAddedEvent(Integer key, String value) { this.key = key; this.value = value; } }
10.6.5.3. Lightweight Events
Other converter implementations are able to send back events that contain no key or event type information, resulting in extremely lightweight events at the expense of having rich information provided by the event.
In order to plug the server with this converter, deploy the converter factory and associated converter class within a JAR file including a service definition inside the META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory file as follows:
sample.ValueAddedConverterFactor
The client listener must then be linked with the converter factory by adding the factory name to the @ClientListener
annotation.
@ClientListener(converterFactoryName = "value-added-converter-factory") public class CustomEventLogListener { ... }
10.6.5.4. Dynamic Converter Instances
Dynamic converter instances convert based on parameters provided when the listener is registered. Converters use the parameters received by the converter factories to enable this option. For example:
Dynamic Converter
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory; import org.infinispan.notifications.cachelistener.filter.CacheEventConverter; class DynamicCacheEventConverterFactory implements CacheEventConverterFactory { // The following types correspond to the Key, Value, and the returned Event, respectively. public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) { return new DynamicCacheEventConverter(params); } } // Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable { final Object[] params; DynamicCacheEventConverter(Object[] params) { this.params = params; } public CustomEvent convert(Integer key, String oldValue, Metadata metadata, String newValue, Metadata prevMetadata, EventType eventType) { // If the key matches a key given via parameter, only send the key information if (params[0].equals(key)) return new ValueAddedEvent(key, null); return new ValueAddedEvent(key, newValue); } }
The dynamic parameters required to do the conversion are provided when the listener is registered:
RemoteCache<Integer, String> cache = rcm.getCache(); cache.addClientListener(new EventLogListener(), null, new Object[]{1});
10.6.5.5. Adding a Remote Client Listener for Custom Events
Implementing a listener for custom events is slightly different to other remote events, as they involve non-default events. The same annotations are used as in other remote client listener implementations, but the callbacks receive instances of ClientCacheEntryCustomEvent<T>
, where T
is the type of custom event we are sending from the server. For example:
Custom Event Listener Implementation
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener(converterFactoryName = "value-added-converter-factory") public class CustomEventLogListener { @ClientCacheEntryCreated @ClientCacheEntryModified @ClientCacheEntryRemoved public void handleRemoteEvent(ClientCacheEntryCustomEvent<ValueAddedEvent> event) { System.out.println(event); } }
To use the remote event listener to execute operations against the remote cache, write a simple main Java class, which adds the remote event listener and executes some operations against the remote cache. For example:
Execute Operations against the Remote Cache
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); CustomEventLogListener listener = new CustomEventLogListener(); try { cache.addClientListener(listener); cache.put(1, "one"); cache.put(1, "new-one"); cache.remove(1); } finally { cache.removeClientListener(listener); }
Result
Once executed, the console output should appear similar to the following:
ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='one'}, eventType=CLIENT_CACHE_ENTRY_CREATED) ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='new-one'}, eventType=CLIENT_CACHE_ENTRY_MODIFIED) ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='null'}, eventType=CLIENT_CACHE_ENTRY_REMOVED
Converter instances must be marshallable when they are deployed in a cluster in order for conversion to occur where the event is generated, even if the event is generated in a different node to where the listener is registered. To make them marshallable, either make them extend Serializable, Externalizable, or provide a custom Externalizer for them. Both client and server need to be aware of any custom event type and be able to marshall it in order to facilitate both server and client writing against type safe APIs. On the client side, this is done by an optional marshaller configurable via the RemoteCacheManager. On the server side, this is done by a marshaller added to the Hot Rod server configuration.
10.6.6. Event Marshalling
When filtering or customizing events, the KeyValueFilter
and Converter
instances must be marshallable. As the client listener is installed in a cluster, the filter and/or converter instances are sent to other nodes in the cluster in order for filtering and conversion to occur where the event originates, improving efficiency. These classes can be made marshallable by having them extend Serializable or by providing and registering a custom Externalizer.
To deploy a Marshaller instance server-side, use a similar method to that used for filtering and customized events.
Deploying a Marshaller
-
Create a JAR file with the converter implementation within it. Each factory must have a name assigned to it via the
org.infinispan.filter.NamedFactory
annotation. - Create a META-INF/services/org.infinispan.commons.marshall.Marshaller file within the JAR file and within it, write the fully qualified class name of the marshaller class implementation
Deploy the JAR file in the Red Hat JBoss Data Grid by performing any of the following options:
Option 1: Deploy the JAR through the deployment scanner
-
Copy the JAR to the
$JDG_HOME/standalone/deployments/
directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
-
Copy the JAR to the
Option 2: Deploy the JAR through the CLI
Connect to the desired instance with the CLI:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
Once connected execute the
deploy
command:deploy /path/to/artifact.jar
Option 3: Deploy the JAR as a custom module
Connect to the JDG server by running the below command:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
The jar containing the Custom Marshaller must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Marshaller:
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
In a different window add the newly added module as a dependency to the
org.infinispan
module by editing$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
. In this file add the following entry:<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- Restart the JDG server.
The Marshaller can be deployed either in a separate jar, or in the same jar as the CacheEventConverter, and/or CacheEventFilter instances.
Only the deployment of a single Marshaller instance is supported. If multiple marshaller instances are deployed, warning messages will be displayed as a reminder indicating which marshaller instance will be used.
10.6.7. Remote Event Clustering and Failover
When a client adds a remote listener, it is installed in a single node in the cluster, which is in charge of sending events back to the client for all affected operations that occur cluster-wide.
In a clustered environment, when the node containing the listener goes down, the Hot Rod client implementation transparently fails over the client listener registration to a different node. This may result in a gap in event consumption, which can be solved using one of the following solutions.
State Delivery
The @ClientListener
annotation has an optional includeCurrentState
parameter, which when enabled, has the server send CacheEntryCreatedEvent
event instances for all existing cache entries to the client. As this behavior is driven by the client it detects when the node where the listener is registered goes offline and automatically registers the listener on another node in the cluster. By enabling includeCurrentState
clients may recompute their state or computation in the event the Hot Rod client transparently fails over registered listeners. The performance of the includeCurrentState
parameter is impacted by the cache size, and therefore it is disabled by default.
@ClientCacheFailover
Rather than relying on receiving state, users can define a method with the @ClientCacheFailover
annotation, which receives ClientCacheFailoverEvent
parameter inside the client listener implementation. If the node where a Hot Rod client has registered a client listener fails, the Hot Rod client detects it transparently, and fails over all listeners registered in the node that failed to another node.
During this failover, the client may miss some events. To avoid this, the includeCurrentState
parameter can be set to true. With this enabled a client is able to clear its data, receive all of the CacheEntryCreatedEvent
instances, and cache these events with all keys. Alternatively, Hot Rod clients can be made aware of failover events by adding a callback handler. This callback method is an efficient solution to handling cluster topology changes affecting client listeners, and allows the client listener to determine how to behave on a failover. Near Caching takes this approach and clears the near cache upon receiving a ClientCacheFailoverEvent
.
@ClientCacheFailover
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { // ... @ClientCacheFailover public void handleFailover(ClientCacheFailoverEvent e) { // Deal with client failover, e.g. clear a near cache. } }
The ClientCacheFailoverEvent
is only thrown when the node that has the client listener installed fails.