このコンテンツは選択した言語では利用できません。
15.4. Continuous Queries
Continuous Querying allows an application to receive the entries that currently match a query, and be continuously notified of any changes to the queried data set. This includes both incoming matches, for values that have joined the set, and outgoing matches, for values that have left the set, that resulted from further cache operations. By using a Continuous Query the application receives a steady stream of events instead of repeatedly executing the same query to look for changes, resulting in a more efficient use of resources.
For instance, all of the following use cases could utilize Continuous Queries:
- Return all persons with an age between 18 and 25 (assuming the
Person
entity has an age property and is updated by the user application). - Return all transactions higher than $2000.
- Return all times where the lap speed of F1 racers were less than 1:45.00s (assuming the cache contains
Lap
entries and that laps are entered live during the race).
15.4.1. Continuous Query Evaluation
A Continuous Query uses a listener that receives a notification when:
- An entry starts matching the specified query, represented by a
Join
event. - An entry stops matching the specified query, represented by a
Leave
event.
When a client registers a Continuous Query Listener it immediately begins to receive the results currently matching the query, received as
Join
events as described above. In addition, it will receive subsequent notifications when other entries begin matching the query, as Join
events, or stop matching the query, as Leave
events, as a consequence of any cache operations that would normally generate creation, modification, removal, or expiration events.
To determine if the listener receives a
Join
or Leave
event the following logic is used:
- If the query on both the old and new values evaluate false, then the event is suppressed.
- If the query on both the old and new values evaluate true, then the event is suppressed.
- If the query on the old value evaluates false and the query on the new value evaluates true, then a
Join
event is sent. - If the query on the old value evaluates true and the query on the new value evaluates false, then a
Leave
event is sent. - If the query on the old value evaluates true and the entry is removed, then a
Leave
event is sent.
Note
Continuous Queries cannot use grouping, aggregation, or sorting operations.
15.4.2. Using Continuous Queries
The following instructions apply to both Library and Remote Client-Server modes.
Adding Continuous Queries
To create a Continuous Query the Query
object will be created similar to other querying methods; however, ensure that the Query
is registered with a org.infinispan.query.api.continuous.ContinuousQuery
and a org.infinispan.query.api.continuous.ContinuousQueryListener
is in use.
The
ContinuousQuery
object associated to a cache can be obtained by calling the static method org.infinispan.client.hotrod.Search.getContinuousQuery(RemoteCache<K, V> cache)
if running in Client-Server mode or org.infinispan.query.Search.getContinuousQuery(Cache<K, V> cache)
when running in Library mode.
Once the
ContinuousQueryListener
has been defined it may be added by using the addContinuousQueryListener
method of ContinuousQuery
:
continuousQuery.addContinuousQueryListener(query, listener)
The following example demonstrates a simple method of implementing and adding a Continuous Query in Library mode:
Example 15.28. Defining and Adding a Continuous Query
import org.infinispan.query.api.continuous.ContinuousQuery; import org.infinispan.query.api.continuous.ContinuousQueryListener; import org.infinispan.query.Search; import org.infinispan.query.dsl.QueryFactory; import org.infinispan.query.dsl.Query; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; [...] // To begin we create a ContinuousQuery instance on the cache ContinuousQuery<Integer, Person> continuousQuery = Search.getContinuousQuery(cache); // Define our query. In this case we will be looking for any // Person instances under 21 years of age. QueryFactory queryFactory = Search.getQueryFactory(cache); Query query = queryFactory.from(Person.class) .having("age").lt(21) .toBuilder().build(); final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>(); // Define the ContinuousQueryListener ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() { @Override public void resultJoining(Integer key, Person value) { matches.put(key, value); } @Override public void resultLeaving(Integer key) { matches.remove(key); } }; // Add the listener and generated query continuousQuery.addContinuousQueryListener(query, listener); [...] // Remove the listener to stop receiving notifications continuousQuery.removeContinuousQueryListener(listener);
As
Person
instances are added to the cache that contain an Age less than 21 they will be placed into matches
, and when these entries are removed from the cache they will be also be removed from matches
.
Removing Continuous Queries
To stop the query from further execution remove the listener:
continuousQuery.removeContinuousQueryListener(listener);
15.4.3. Performance Considerations with Continuous Queries
Continuous Queries are designed to constantly keep any applications updated where it is implemented, potentially resulting in a large number of events generated for particularly broad queries. In addition, a new memory allocation is made for each event. This behavior may result in memory pressure, including potential OutOfMemory errors, if queries are not carefully designed.
To prevent these issues it is strongly recommended to ensure that each query captures only the information needed, and that each
ContinuousQueryListener
is designed to quickly process all received events.