4.5. 持续查询
持续查询允许应用程序注册监听程序,该监听程序将接收当前与查询过滤器匹配的条目,并将持续通知对查询的数据集的任何更改。这包括传入匹配项,适用于已加入集合的值、更新匹配项、修改和继续匹配的匹配值,以及离开该设置的值传出匹配。通过使用连续查询,应用程序会收到稳定事件流,而不是重复执行相同的查询来发现更改,从而更有效地使用资源。例如,以下所有用例都可以使用持续查询:
-
返回在 18 到 25 之间带有年龄的所有人员(假设 Person 实体具有
age
属性,并由用户应用程序更新)。 - 返回超过 $2000 的所有事务。
- 返回 F1 竞争条件的 lap 速度小于 1:45.00s (假设缓存包含 Lap 条目且在竞争过程中输入的 laps )的所有时间。
4.5.1. 持续查询执行
持续查询使用监听程序,该监听程序在以下情况时获得通知:
-
条目开始与指定的查询匹配,由
Join
事件表示。 -
一个匹配的条目已更新,并持续匹配由
Update
vent 组成的查询。 -
条目停止与由
Leave
事件表示的查询匹配。
当客户端注册持续查询监听程序时,它会立即开始接收当前与查询匹配的结果,如上面所述的 Join
事件。另外,当其他条目开始与查询匹配时,它会接收后续的通知,作为 Leave
事件,因为任何通常会生成创建、修改、删除或过期事件的缓存操作。如果条目与操作前后查询过滤器匹配,更新的缓存条目将生成
Update
事件。总而言之,用于确定监听器是否收到 Join
、Update
或 Leave
事件的逻辑:
- 如果旧值和新值的查询评估 false,则会隐藏该事件。
-
如果对旧值的查询评估 false,且在新值上评估为 true,则会发送
Join
事件。 -
如果旧值和新值的查询评估为 true,则发送
Update
事件。 -
如果对旧值的查询评估为 true,且在新值上评估 false,则会发送
Leave
事件。 -
如果对旧值的查询评估为 true,且条目被删除或过期,则会发送
Leave
事件。
持续查询可以使用所有查询功能,但分组、聚合和排序操作除外。
4.5.2. 创建持续查询
要创建持续查询,请执行以下操作:
- 创建 Query 对象。
通过调用适当的方法,获取 cache 的 continuousQuery (
org.infinispan.query.api.continuous.ContinuousQuery
对象:-
org.infinispan.client.hotrod.Search.getContinuousQuery (RemoteCache<K, V> cache)
用于远程模式 -
org.infinispan.query.Search.getContinuousQuery (Cache<K, V> cache)
用于嵌入式模式
-
-
注册查询和持续查询监听程序(
org.infinispan.query.api.continuous.ContinuousQueryListener
),如下所示:
continuousQuery.addContinuousQueryListener(query, listener);
以下示例演示了在嵌入式模式下简单的持续查询用例:
注册连续查询
import org.infinispan.query.api.continuous.ContinuousQuery; import org.infinispan.query.api.continuous.ContinuousQueryListener; import org.infinispan.query.Search; import org.infinispan.query.dsl.QueryFactory; import org.infinispan.query.dsl.Query; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; [...] // We have a cache of Persons Cache<Integer, Person> cache = ... // We begin by creating a ContinuousQuery instance on the cache ContinuousQuery<Integer, Person> continuousQuery = Search.getContinuousQuery(cache); // Define our query. In this case we will be looking for any Person instances under 21 years of age. QueryFactory queryFactory = Search.getQueryFactory(cache); Query query = queryFactory.create("FROM Person p WHERE p.age < 21"); final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>(); // Define the ContinuousQueryListener ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() { @Override public void resultJoining(Integer key, Person value) { matches.put(key, value); } @Override public void resultUpdated(Integer key, Person value) { // we do not process this event } @Override public void resultLeaving(Integer key) { matches.remove(key); } }; // Add the listener and the query continuousQuery.addContinuousQueryListener(query, listener); [...] // Remove the listener to stop receiving notifications continuousQuery.removeContinuousQueryListener(listener);
因为具有小于 21 的 Person 实例被添加到监听器将接收它们的缓存中,并放入 匹配项
映射中,以及这些条目从缓存中删除或其年龄被修改为大于或等于 21,它们将被从 匹配项
中删除。
4.5.3. 删除持续查询
要停止查询进一步执行,请只删除监听程序:
continuousQuery.removeContinuousQueryListener(listener);
4.5.4. 持续查询性能
持续查询旨在为应用程序提供持续的更新流,可能会导致为广泛查询生成大量事件。为每个事件创建一个新的临时内存分配。如果查询没有被仔细设计,则此行为可能会导致内存压力,并可能导致 OutOfMemoryErrors
(特别是在远程模式中)。为了防止这些问题,强烈建议每个查询捕获在匹配条目数量和每个匹配项时所需的最小信息(项目可以用来捕获有趣属性),并且每个 continuous QueryListener
旨在快速处理所有接收的事件,以避免执行从它监听的缓存中生成新匹配事件的操作。