130.15. 使用基于 JDBC 的幂等存储库


在本节中,我们将使用基于 JDBC 的幂等存储库。

注意

抽象
有一个抽象类 org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository,您可以扩展来构建自定义 JDBC 幂等存储库。

首先,我们必须创建将由幂等存储库使用的数据库表。我们使用以下模式:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100) )
Copy to Clipboard Toggle word wrap

我们添加了 createdAt 列:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP )
Copy to Clipboard Toggle word wrap
注意

SQL Server TIMESTAMP 类型是一个固定长度的二进制字符串类型。它没有映射到任何 JDBC 时间类型: DATETIMETIMESTAMP

在使用并发消费者时,对列 processorName 和 messageId 创建唯一约束至关重要。由于此约束的语法与数据库与数据库不同,因此这里不会显示它。

130.15.1. 自定义 JDBC 幂等性存储库

您有几个选项来调优 org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository 以满足您的需要:

Expand
参数默认值描述

createTableIfNotExists

true

定义 Camel 是否是否应该尝试创建表(如果不存在)。

tableName

CAMEL_MESSAGEPROCESSED

使用自定义表名称而不是默认名称:CAMEL_MESSAGEPROCESSED。

tableExistsString

SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0

此查询用于找出表是否已存在。它必须抛出异常,以指示该表不存在。

createString

CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR (255), messageId VARCHAR (100), createdAt TIMESTAMP)

用于创建表的声明。

queryString

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于找出存储库中是否已存在消息的查询(结果不等于 '0')。它采用两个参数。第一个是处理器名称(String),第二个是消息 id (String)。

insertString

INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)

用于将条目添加到表中的语句。它采用三个参数。第一个是处理器名称(String),第二个是消息 id (String),第三个是此条目添加到存储库时的时间戳(java.sql.Timestamp)。

deleteString

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于从数据库中删除条目的声明。它取两个参数。第一个是处理器名称(String),第二个是消息 id (String)。

选项 tableName 可用于使用默认 SQL 查询,但使用不同的表名称。但是,如果要自定义 SQL 查询,您可以单独配置每个查询。

130.15.2. orphan Lock aware Jdbc IdempotentRepository

org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository 的一个限制是它不会处理由 JVM 崩溃或非安全关闭导致的孤立锁定。如果您需要处理孤立锁定,则使用 org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository。此存储库跟踪应用实例持有的锁定。对于持有的每个锁定,应用程序会将保留信号发送到锁定存储库,从而导致使用 current Timestamp 更新 createdAt 列。当应用程序实例尝试获取锁定时,如果存在 3 个可能:

  • 锁定条目不存在,然后使用 JdbcMessageIdRepository 的基本实现提供锁定。
  • lock already exists 和 createdAt < System.currentTimeMillis ()- lockMaxAgeMillis.在这种情况下,假设一个活跃的实例具有锁定,并且未向请求锁定的新实例提供锁定
  • lock already exists 和 createdAt > = System.currentTimeMillis ()- lockMaxAgeMillis.在这种情况下,假设没有活跃的实例,该实例没有锁定,并且为请求实例提供了锁定。背后的原因是,如果原始实例已锁定,如果仍然在运行,它将使用其 keepAlive 机制更新 Timestamp on createdAt

这个软件仓库有两个额外的配置参数

Expand
参数描述

lockMaxAgeMillis

这指的是锁定被视为孤立的持续时间,例如,如果 currentTimestamp - createdAt >= lockMaxAgeMillis,则锁定会被孤立。

lockKeepAliveIntervalMillis

为 createAt Timestamp 列进行保留 alive 更新的频率。

130.15.3. Caching Jdbc IdempotentRepository

有些 SQL 实现不会基于每个查询快速。JdbcMessageIdRepository 实现在 SQL 事务中单独执行其幂等检查。检查 100 个密钥可能需要几分钟。JdbcCachedMessageIdRepository preloads a in-memory 缓存以整个键列表开头。然后,在传递到原始实施之前首先检查此缓存。

与所有缓存实现一样,应该考虑陈旧数据和您的特定用途。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat