搜索

66.14. 使用基于 JDBC 的幂等存储库

download PDF

在本节中,我们将使用基于 JDBC 的幂等存储库。

注意

抽象
有一个抽象类 org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository,您可以扩展来构建自定义 JDBC 幂等存储库。

首先,我们需要创建由幂等存储库使用的数据库表。我们使用以下模式:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100) )

我们添加了 createdAt 列:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP )
注意

SQL Server TIMESTAMP 类型是一个固定长度二进制字符串类型。它不映射到任何 JDBC 时间类型: DATETIMETIMESTAMP

在使用并发消费者时,对列 processorName 和 messageId 创建唯一约束至关重要。由于此约束的语法与数据库与数据库不同,因此我们不会在此处显示它。

66.14.1. 自定义 JDBC idempotency 存储库

您有几个选项来根据您的需要调整 org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository

参数默认值描述

createTableIfNotExists

true

定义 Camel 是否是否应该尝试创建表(如果不存在)。

tableName

CAMEL_MESSAGEPROCESSED

使用自定义表名称而不是默认名称:CAMEL_MESSAGEPROCESSED。

tableExistsString

SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0

此查询用于找出表是否已存在。它必须抛出异常,以指示表不存在。

createString

CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR (ASP, messageId VARCHAR (100), createdAt TIMESTAMP)

用于创建表的声明。

queryString

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于找出存储库中是否已存在的查询(结果不等于 '0')。它采用两个参数。第一个是处理器名称(字符串),第二个是消息 ID (字符串)。

insertString

INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)

用于在表中添加条目的声明。它采用三个参数。第一个是处理器名称(字符串),第二个是消息 ID (字符串),第三个是此条目添加到存储库中的时间戳(java.sql.Timestamp)。

deleteString

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于从数据库中删除条目的声明。它采用两个参数。第一个是处理器名称(字符串),第二个是消息 ID (字符串)。

选项 tableName 可用于使用默认的 SQL 查询,但具有不同表名称。但是,如果要自定义 SQL 查询,您可以单独配置它们。

66.14.2. Orp Lock aware Jdbc IdempotentRepository

org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository 的一个限制是它不处理 JVM 崩溃或非安全关闭的孤立锁定。如果您需要解决孤立锁处理,则这可能会导致未处理的文件/消息。这对 camel-file、camel-ftp 等实现。如果您需要解决孤立锁定处理,则使用 org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository。此存储库跟踪由应用程序实例保存的锁定。对于保存的每个锁定,应用程序将向锁定存储库发送实时信号,从而使用当前的 Timestamp 更新 createdAt 列。当应用程序实例尝试获取锁定时,如果存在三个可能:

  • 锁定条目不存在,则使用 JdbcMessageIdRepository 的基本实现提供锁。
  • lock already exists and the createdAt < System.currentTimeMillis() - lockMaxAgeMillis.在这种情况下,假设活跃的实例有锁定,并且没有为请求锁定的新实例提供锁定
  • lock already exists and the createdAt > = System.currentTimeMillis() - lockMaxAgeMillis.在这种情况下,假设没有活动实例,没有锁定,并且为请求的实例提供锁定。原因是,如果原始实例有锁定(如果仍然在运行),它将使用 keepAlive 机制在 createdAt 上更新 Timestamp

此存储库有两个额外的配置参数

参数描述

lockMaxAgeMillis

这指的是锁定被视为孤立的持续时间,如 currentTimestamp - createdAt >= lockMaxAgeMillis,则锁定会被孤立。

lockKeepAliveIntervalMillis

保持实时更新的频率到创建At Timestamp 列。

66.14.3. Caching Jdbc IdempotentRepository

某些 SQL 实现不会因每个查询而快速进行。JdbcMessageIdRepository 实施在 SQL 事务中单独执行其幂等性检查。检查 100 个密钥可能需要几分钟。JdbcCachedMessageIdRepository 使用整个键列表在启动时预加载内存缓存。然后,在传递给原始实现前,首先检查此缓存。

与所有缓存实现一样,应该考虑与过时的数据和特定用途相关的注意事项。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.