137.15. 使用基于 JDBC 的幂等存储库


在本节中,我们将使用基于 JDBC 的幂等存储库。

注意

抽象
有一个抽象类 org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository,您可以扩展到构建自定义 JDBC 幂等存储库。

首先,我们必须创建将由幂等存储库使用的数据库表。我们使用以下模式:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100) )
Copy to Clipboard Toggle word wrap

我们添加了 createdAt 列:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP )
Copy to Clipboard Toggle word wrap
注意

SQL Server TIMESTAMP 类型是一个固定长度的二进制字符串类型。它没有映射到任何 JDBC 时间类型: DATETIMETIMESTAMP

在使用并发消费者时,必须在列 processorName 和 messageId 上创建唯一约束。由于此约束的语法与数据库不同,所以我们不会显示它。

137.15.1. 自定义 JDBC idempotency 存储库

您有几个选项可根据您的需要调整 org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository

Expand
参数默认值描述

createTableIfNotExists

true

定义 Camel 是否应该尝试创建表(如果不存在)。

tableName

CAMEL_MESSAGEPROCESSED

使用自定义表名称而不是默认名称:CAMEL_MESSAGEPROCESSED。

tableExistsString

SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0

此查询用于找出表是否已存在。必须抛出异常以指示表不存在。

createString

CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR (255), messageId VARCHAR (100), createdAt TIMESTAMP)

用于创建表的 语句。

queryString

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于找出消息是否已存在于存储库中的查询(结果不等于 '0')。它采用两个参数。第一个是处理器名称(字符串),第二个则是消息 ID (字符串)。

insertString

INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)

用于将该条目添加到表中的语句。它取三个参数。第一个是处理器名称(String),第二个是消息 ID (String),第三个则是当此条目添加到存储库时的时间戳(java.sql.Timestamp)。

deleteString

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于从数据库中删除条目的语句。它取两个参数。第一个是处理器名称(字符串),第二个则是消息 ID (字符串)。

选项 tableName 可用于使用默认的 SQL 查询,但使用不同的表名称。但是,如果要自定义 SQL 查询,您可以单独配置每个查询。

137.15.2. orphan Lock aware Jdbc IdempotentRepository

org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository 的限制之一是不处理由于 JVM 崩溃或非安全关闭导致的孤立锁定。如果实施与 camel-file、camel-ftp 等实现一起使用,则可能会导致未处理的文件/消息。如果您需要处理孤立锁定处理,则使用 org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository。此存储库跟踪由应用程序实例持有的锁定。对于每个锁定,应用程序将向锁定存储库发送保留信号,从而使用当前的 Timestamp 更新 createdAt 列。当应用程序实例尝试获取锁时,如果存在三个可能性:

  • 锁定条目不存在,然后使用 JdbcMessageIdRepository 的基本实现来提供锁定。
  • lock already exists 和 createdAt < System.currentTimeMillis ()- lockMaxAgeMillis.在这种情况下,假设活跃的实例具有锁定,并且没有向请求锁定的新实例提供锁定
  • lock already exists 和 createdAt > = System.currentTimeMillis ()- lockMaxAgeMillis。在这种情况下,假设没有活跃的实例具有锁定,并且锁定提供给请求实例。后面的原因是,如果锁定的原始实例仍在运行,它将使用其 keepAlive 机制在创建的At 上更新 Timestamp。

这个软件仓库有两个额外的配置参数

Expand
参数描述

lockMaxAgeMillis

这指的是锁定被视为孤立的持续时间(如 currentTimestamp - createdAt >= lockMaxAgeMillis),则锁定会孤立。

lockKeepAliveIntervalMillis

为 CreateAt Timestamp 列进行保留更新的频率。

137.15.3. Caching Jdbc IdempotentRepository

有些 SQL 实现不会基于每个查询速度。JdbcMessageIdRepository 实现在 SQL 事务中单独执行其幂等检查。检查 100 个键可能需要几分钟时间。JdbcCachedMessageIdRepository 会在启动时使用整个键列表加载内存缓存。然后,首先检查此缓存,然后再进入原始实施。

与所有缓存实施一样,应该考虑过时的数据和特定使用情况。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat