5.5. Continuous Queries


Continuous Querying allows an application to receive the entries that currently match a query, and be continuously notified of any changes to the queried data set. This includes both incoming matches, for values that have joined the set, and outgoing matches, for values that have left the set, that resulted from further cache operations. By using a Continuous Query the application receives a steady stream of events instead of repeatedly executing the same query to look for changes, resulting in a more efficient use of resources.
For instance, all of the following use cases could utilize Continuous Queries:
  1. Return all persons with an age between 18 and 25 (assuming the Person entity has an age property and is updated by the user application).
  2. Return all transactions higher than $2000.
  3. Return all laps where the lap time of an F1 racer was less than 1:45.00s (assuming the cache contains Lap entries and that laps are entered live during the race).

5.5.1. Continuous Query Evaluation

A Continuous Query uses a listener that receives a notification when:
  • An entry starts matching the specified query, represented by a Join event.
  • An entry stops matching the specified query, represented by a Leave event.
When a client registers a Continuous Query Listener it immediately begins to receive the results currently matching the query, received as Join events as described above. In addition, it will receive subsequent notifications when other entries begin matching the query, as Join events, or stop matching the query, as Leave events, as a consequence of any cache operations that would normally generate creation, modification, removal, or expiration events.
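The registration behavior described above can be pictured with a small stand-alone sketch. This is not Infinispan code: the class InitialReplay, its method replayCurrentMatches, and the use of a plain Map and Predicate in place of a cache and a query are all assumptions made for illustration. It only shows that the entries already matching the query are handed to the listener as Join events at registration time.

```java
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical helper, not part of the Infinispan API.
final class InitialReplay {
    private InitialReplay() {}

    /**
     * Delivers every entry that currently matches the query to onJoin,
     * mimicking the Join events a listener receives when it is registered.
     * Returns the number of Join events delivered.
     */
    static <K, V> int replayCurrentMatches(Map<K, V> entries,
                                           Predicate<V> query,
                                           BiConsumer<K, V> onJoin) {
        int delivered = 0;
        for (Map.Entry<K, V> entry : entries.entrySet()) {
            if (query.test(entry.getValue())) {
                onJoin.accept(entry.getKey(), entry.getValue());
                delivered++;
            }
        }
        return delivered;
    }
}
```

Entries that do not match at registration time produce no event; they only become visible to the listener if a later cache operation makes them match.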
To determine if the listener receives a Join or Leave event the following logic is used:
  1. If the query on both the old and new values evaluates false, then the event is suppressed.
  2. If the query on both the old and new values evaluates true, then the event is suppressed.
  3. If the query on the old value evaluates false and the query on the new value evaluates true, then a Join event is sent.
  4. If the query on the old value evaluates true and the query on the new value evaluates false, then a Leave event is sent.
  5. If the query on the old value evaluates true and the entry is removed, then a Leave event is sent.
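The five rules above can be condensed into a single comparison of "matched before" against "matches now". The following is a minimal sketch under stated assumptions, not the product's internal implementation: MatchEvent and MatchEvaluator are hypothetical names, the query is modeled as a java.util.function.Predicate, a null old value is assumed to stand for a newly created entry, and a null new value for a removed or expired entry.

```java
import java.util.function.Predicate;

// Hypothetical outcome type, not part of the Infinispan API.
enum MatchEvent { JOIN, LEAVE, NONE }

// Hypothetical helper that applies the Join/Leave rules to one cache operation.
final class MatchEvaluator {
    private MatchEvaluator() {}

    /**
     * Assumption: oldValue == null models a created entry and
     * newValue == null models a removed or expired entry.
     */
    static <V> MatchEvent evaluate(Predicate<V> query, V oldValue, V newValue) {
        boolean matchedBefore = oldValue != null && query.test(oldValue);
        boolean matchesNow = newValue != null && query.test(newValue);
        if (matchedBefore == matchesNow) {
            // Rules 1 and 2: membership in the result set did not change.
            return MatchEvent.NONE;
        }
        // Rule 3 yields JOIN; rules 4 and 5 yield LEAVE.
        return matchesNow ? MatchEvent.JOIN : MatchEvent.LEAVE;
    }
}
```

With a query such as age < 21, an update from 30 to 18 would produce a Join, an update from 18 to 30 or a removal of the entry would produce a Leave, and an update from 18 to 20 would be suppressed.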

Note

Continuous Queries cannot use grouping, aggregation, or sorting operations.
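Because ordering cannot be expressed in the query itself, an application that needs ordered results could apply the ordering on its own side to the matches it has collected from Join and Leave events. The sketch below is one possible approach, not a documented feature: ClientSideOrdering and keysSortedByValue are hypothetical names, and the map stands in for a listener-maintained collection of matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Infinispan API.
final class ClientSideOrdering {
    private ClientSideOrdering() {}

    /** Returns the keys of the current matches, ordered by their values. */
    static <K, V extends Comparable<V>> List<K> keysSortedByValue(Map<K, V> matches) {
        // Copy first so that sorting works on a stable snapshot of the matches.
        List<Map.Entry<K, V>> snapshot = new ArrayList<>(matches.entrySet());
        snapshot.sort(Map.Entry.comparingByValue());
        List<K> keys = new ArrayList<>(snapshot.size());
        for (Map.Entry<K, V> entry : snapshot) {
            keys.add(entry.getKey());
        }
        return keys;
    }
}
```

The result reflects the matches only at the moment the snapshot was taken, so it has to be recomputed as further events arrive.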

5.5.2. Using Continuous Queries in Library Mode

Adding Continuous Queries

To create a Continuous Query, define the query in the same way as for other querying methods, then create an org.infinispan.query.continuous.ContinuousQuery for the cache and implement an org.infinispan.query.continuous.ContinuousQueryListener.

Once the ContinuousQueryListener has been defined it may be added by using the addContinuousQueryListener method of ContinuousQuery:
continuousQuery.addContinuousQueryListener(query, listener)
The following example demonstrates a simple method of implementing and adding a ContinuousQuery in Library mode:

Example 5.29. Defining and Adding a Continuous Query

import org.infinispan.query.continuous.ContinuousQuery;
import org.infinispan.query.continuous.ContinuousQueryListener;
import org.infinispan.query.Search;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.Query;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

[...]

// To begin we create a ContinuousQuery instance on the cache
ContinuousQuery<Integer, Person> continuousQuery = new ContinuousQuery<Integer, Person>(cache);

// Define our query. In this case we will be looking for any
// Person instances under 21 years of age.
QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.from(Person.class)
    .having("age").lt(21)
    .toBuilder().build();

final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>();

// Define the ContinuousQueryListener
ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() {
    @Override
    public void resultJoining(Integer key, Person value) {
        matches.put(key, value);
    }

    @Override
    public void resultLeaving(Integer key) {
        matches.remove(key);
    }
};

// Add the listener and generated query
continuousQuery.addContinuousQueryListener(query, listener);

[...]

// Remove the listener to stop receiving notifications
continuousQuery.removeContinuousQueryListener(listener);
As Person instances with an age less than 21 are added to the cache they are placed into matches, and when these entries are removed from the cache, or updated so that they no longer match the query, they are also removed from matches.
Removing Continuous Queries

To stop the query from further execution remove the listener:

continuousQuery.removeContinuousQueryListener(listener);

5.5.3. Using Continuous Queries in Remote Client-Server Mode

Adding Continuous Queries

To create a Continuous Query, define the query in the same way as for other querying methods; however, ensure that the query and an org.infinispan.client.hotrod.event.ContinuousQueryListener are registered with ClientEvents.

Once the ContinuousQueryListener has been defined it may be added by using the static addContinuousQueryListener method of org.infinispan.client.hotrod.event.ClientEvents:
ClientEvents.addContinuousQueryListener(cache, listener, query)
The following example demonstrates a simple method of implementing and adding a Continuous Query in Remote Client-Server mode:

Example 5.30. Defining and Adding a Continuous Query

import org.infinispan.client.hotrod.event.ClientEvents;
import org.infinispan.client.hotrod.event.ContinuousQueryListener;
import org.infinispan.client.hotrod.Search;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.Query;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

[...]

// Define our query. In this case we will be looking for any
// Person instances under 21 years of age.
QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.from(Person.class)
    .having("age").lt(21)
    .toBuilder().build();

final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>();

// Define the ContinuousQueryListener
ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() {
    @Override
    public void resultJoining(Integer key, Person value) {
        matches.put(key, value);
    }

    @Override
    public void resultLeaving(Integer key) {
        matches.remove(key);
    }
};

// Add the listener and generated query
Object clientListener = ClientEvents.addContinuousQueryListener(cache, listener, query);

[...]

// Remove the listener to stop receiving notifications
cache.removeClientListener(clientListener);
As Person instances with an age less than 21 are added to the cache they are placed into matches, and when these entries are removed from the cache, or updated so that they no longer match the query, they are also removed from matches.
Removing Continuous Queries

To stop the query from further execution remove the listener from the cache:

cache.removeClientListener(clientListener);

5.5.4. Performance Considerations with Continuous Queries

Continuous Queries are designed to keep the application that registers them constantly updated, which can generate a large number of events for particularly broad queries. In addition, a new memory allocation is made for each event. This behavior may result in memory pressure, including potential OutOfMemory errors, if queries are not carefully designed.
To prevent these issues it is strongly recommended to ensure that each query captures only the information needed, and that each ContinuousQueryListener is designed to quickly process all received events.
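One way to keep a listener fast, in line with the recommendation above, is to have its callbacks do nothing except place the event on a bounded queue, leaving slower processing to another thread. The following is a sketch under stated assumptions rather than a recommended implementation: MatchChange and QueueingListener are hypothetical classes, QueueingListener only mirrors the resultJoining and resultLeaving method names from the earlier examples instead of implementing the real ContinuousQueryListener interface, and dropping events when the queue is full is just one possible overflow policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical event record, not part of the Infinispan API.
final class MatchChange<K, V> {
    final K key;
    final V value;      // null for a Leave event
    final boolean join; // true for Join, false for Leave

    MatchChange(K key, V value, boolean join) {
        this.key = key;
        this.value = value;
        this.join = join;
    }
}

// Hypothetical listener-shaped class whose callbacks only enqueue the event.
final class QueueingListener<K, V> {
    private final BlockingQueue<MatchChange<K, V>> queue;
    private final AtomicLong dropped = new AtomicLong();

    QueueingListener(int capacity) {
        // The fixed capacity bounds the memory held by pending events.
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    public void resultJoining(K key, V value) {
        enqueue(new MatchChange<>(key, value, true));
    }

    public void resultLeaving(K key) {
        enqueue(new MatchChange<>(key, null, false));
    }

    private void enqueue(MatchChange<K, V> change) {
        // offer() never blocks; if the queue is full the event is counted and dropped.
        if (!queue.offer(change)) {
            dropped.incrementAndGet();
        }
    }

    /** Returns the next pending event, or null if none; intended for a worker thread. */
    MatchChange<K, V> poll() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

Dropping events leaves the application's view of the matches out of date, so an application using this pattern would need its own recovery policy. One option, given that a newly registered listener receives the current matches as Join events, is to clear the local state and register the listener again when droppedCount() is greater than zero.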