Chapter 5. Creating continuous queries
Applications can register listeners to receive continual updates about cache entries that match query filters.
5.1. Continuous queries
Continuous queries provide applications with real-time notifications about data in Data Grid caches that are filtered by queries. When entries match the query, Data Grid sends the updated data to any listeners. Applications receive a stream of events and do not have to execute the query repeatedly.
Continuous queries can notify applications about incoming matches, for values that have joined the set; updated matches, for matching values that were modified and continue to match; and outgoing matches, for values that have left the set.
For example, continuous queries can notify applications about all:
- Persons with an age between 18 and 25, assuming the Person entity has an age property and is updated by the user application.
- Transactions for dollar amounts larger than $2000.
- Times where the lap speed of F1 racers was less than 1:45.00 seconds, assuming the cache contains Lap entries and that laps are entered during the race.
Continuous queries can use all query capabilities except for grouping, aggregation, and sorting operations.
How continuous queries work
Continuous queries notify client listeners with the following events:
Join
- A cache entry matches the query.
Update
- A cache entry that matches the query is updated and still matches the query.
Leave
- A cache entry no longer matches the query.
When a client registers a continuous query listener, it immediately receives Join events for any entries that match the query. Client listeners receive subsequent events each time a cache operation modifies entries that match the query.
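The initial burst of Join events at registration time can be pictured with a small sketch. It uses a plain Java map as a stand-in for a cache and a Predicate as a stand-in for the query; the class and method names are hypothetical and the sketch does not call the Data Grid API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a cache; not the Data Grid API.
final class InitialJoinSketch {

    /** Returns the keys that would be reported as Join events at registration time. */
    static <K, V> List<K> initialJoins(Map<K, V> cache, Predicate<V> query) {
        List<K> joins = new ArrayList<>();
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            // Every entry that already matches the query produces one Join event.
            if (query.test(entry.getValue())) {
                joins.add(entry.getKey());
            }
        }
        return joins;
    }

    public static void main(String[] args) {
        Map<String, Integer> ages = new LinkedHashMap<>();
        ages.put("alice", 19);
        ages.put("bob", 40);
        ages.put("carol", 20);
        // Query stand-in: age < 21.
        System.out.println(initialJoins(ages, age -> age < 21)); // prints [alice, carol]
    }
}
```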
Data Grid determines when to send Join, Update, or Leave events to client listeners as follows:
- If the query on both the old and new values does not match, Data Grid does not send an event.
- If the query on the old value does not match but the new value does, Data Grid sends a Join event.
- If the query on both the old and new values matches, Data Grid sends an Update event.
- If the query on the old value matches but the new value does not, Data Grid sends a Leave event.
- If the query on the old value matches and the entry is then deleted or it expires, Data Grid sends a Leave event.
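The rules above amount to a small decision table over two booleans. The following is a minimal sketch of that table in plain Java, not Data Grid's actual implementation; the class, enum, and method names are hypothetical. A deleted or expired entry is modeled as a new value that does not match.

```java
import java.util.Optional;

// Hypothetical helper that mirrors the documented rules; not part of the Data Grid API.
final class ContinuousQueryEvents {

    enum Event { JOIN, UPDATE, LEAVE }

    /**
     * Decides which event, if any, a listener receives.
     *
     * @param oldMatches the previous value existed and matched the query
     * @param newMatches the new value exists and matches the query
     *                   (false when the entry was deleted or expired)
     */
    static Optional<Event> decide(boolean oldMatches, boolean newMatches) {
        if (!oldMatches && !newMatches) {
            return Optional.empty();          // neither value matches: no event
        }
        if (!oldMatches) {
            return Optional.of(Event.JOIN);   // only the new value matches
        }
        if (newMatches) {
            return Optional.of(Event.UPDATE); // both values match
        }
        return Optional.of(Event.LEAVE);      // only the old value matched
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false)); // prints Optional.empty
        System.out.println(decide(false, true));  // prints Optional[JOIN]
        System.out.println(decide(true, true));   // prints Optional[UPDATE]
        System.out.println(decide(true, false));  // prints Optional[LEAVE]
    }
}
```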
5.1.1. Continuous queries and Data Grid performance
Continuous queries provide a constant stream of updates to applications, which can generate a significant number of events. Data Grid temporarily allocates memory for each event it generates, which can result in memory pressure and potentially lead to OutOfMemoryError exceptions, especially for remote caches. For this reason, you should carefully design your continuous queries to avoid any performance impact.
Data Grid strongly recommends that you limit the scope of your continuous queries to the smallest amount of information that you need. To achieve this, you can use projections and predicates. For example, the following statement provides results about only a subset of fields that match the criteria rather than the entire entry:
SELECT field1, field2 FROM Entity WHERE x AND y
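To illustrate why a projection keeps events small, the following plain Java sketch reduces an entity to the two selected fields and drops everything else. The Entity class, its largePayload field, and the project method are hypothetical and exist only for this illustration; the sketch does not call Data Grid.

```java
import java.util.Arrays;

// Hypothetical illustration of a projection; not part of the Data Grid API.
final class ProjectionSketch {

    /** Hypothetical entity with a large field that listeners do not need. */
    static final class Entity {
        final String field1;
        final int field2;
        final byte[] largePayload;

        Entity(String field1, int field2, byte[] largePayload) {
            this.field1 = field1;
            this.field2 = field2;
            this.largePayload = largePayload;
        }
    }

    /** Mirrors "SELECT field1, field2": keeps only the two selected fields. */
    static Object[] project(Entity entity) {
        return new Object[] { entity.field1, entity.field2 };
    }

    public static void main(String[] args) {
        Entity entity = new Entity("a", 7, new byte[1024 * 1024]);
        // The projected row carries two small values instead of the 1 MiB payload.
        System.out.println(Arrays.toString(project(entity))); // prints [a, 7]
    }
}
```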
It is also important to ensure that each ContinuousQueryListener you create can quickly process all received events without blocking threads. To achieve this, you should avoid any cache operations that generate events unnecessarily.
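One way to keep listener callbacks fast is to hand each event to a worker thread and return immediately. The sketch below shows that pattern with a single-threaded executor, which also preserves event order. The Listener interface is a hypothetical stand-in for the ContinuousQueryListener callbacks, and OffloadingListener is an assumed name; this is one possible design, not a Data Grid recommendation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class NonBlockingListenerSketch {

    /** Hypothetical stand-in for the ContinuousQueryListener callbacks. */
    interface Listener<K, V> {
        void resultJoining(K key, V value);
        void resultLeaving(K key);
    }

    /** Callbacks only enqueue work; the processing runs on a separate worker thread. */
    static final class OffloadingListener<K, V> implements Listener<K, V> {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();
        final List<String> processed = new CopyOnWriteArrayList<>();

        @Override
        public void resultJoining(K key, V value) {
            worker.execute(() -> processed.add("join:" + key));
        }

        @Override
        public void resultLeaving(K key) {
            worker.execute(() -> processed.add("leave:" + key));
        }

        /** Stops accepting events and waits for queued work to finish. */
        boolean shutdown(long timeoutMillis) throws InterruptedException {
            worker.shutdown();
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        OffloadingListener<Integer, String> listener = new OffloadingListener<>();
        listener.resultJoining(1, "a");
        listener.resultLeaving(1);
        listener.shutdown(1000);
        System.out.println(listener.processed); // prints [join:1, leave:1]
    }
}
```

An unbounded executor queue can itself grow under a heavy event stream, so a production listener would likely need a bounded queue or some form of back pressure.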
5.2. Creating continuous queries
You can create continuous queries for remote and embedded caches.
Procedure
- Create a Query object.
- Obtain the ContinuousQuery object of your cache by calling the appropriate method:
  - Remote caches: org.infinispan.client.hotrod.Search.getContinuousQuery(RemoteCache<K, V> cache)
  - Embedded caches: org.infinispan.query.Search.getContinuousQuery(Cache<K, V> cache)
- Register the query and a ContinuousQueryListener object as follows:
  continuousQuery.addContinuousQueryListener(query, listener);
- When you no longer need the continuous query, remove the listener as follows:
  continuousQuery.removeContinuousQueryListener(listener);
Continuous query example
The following code example demonstrates a simple continuous query with an embedded cache.
In this example, the listener receives notifications when any Person instances under the age of 21 are added to the cache. Those Person instances are also added to the "matches" map. When the entries are removed from the cache or their age becomes greater than or equal to 21, they are removed from the "matches" map.
Registering a Continuous Query
import org.infinispan.Cache;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.Search;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.Query;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

[...]

// We have a cache of Person objects.
Cache<Integer, Person> cache = ...

// Create a ContinuousQuery instance on the cache.
ContinuousQuery<Integer, Person> continuousQuery = Search.getContinuousQuery(cache);

// Define a query.
// In this example, we search for Person instances under 21 years of age.
QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.create("FROM Person p WHERE p.age < 21");

final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>();

// Define the ContinuousQueryListener.
ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() {
    @Override
    public void resultJoining(Integer key, Person value) {
        // The entry started matching the query.
        matches.put(key, value);
    }

    @Override
    public void resultUpdated(Integer key, Person value) {
        // We do not process this event.
    }

    @Override
    public void resultLeaving(Integer key) {
        // The entry stopped matching, was removed, or expired.
        matches.remove(key);
    }
};

// Add the listener and the query.
continuousQuery.addContinuousQueryListener(query, listener);

[...]

// Remove the listener to stop receiving notifications.
continuousQuery.removeContinuousQueryListener(listener);