51.14. 使用基于 JDBC 的幂等存储库


在本节中,我们将使用基于 JDBC 的幂等存储库。

注意

抽象
是一个抽象类 org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository,您可以扩展为构建自定义 JDBC 幂等存储库。

首先,必须创建由幂等存储库使用的数据库表。我们使用以下模式:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100) )
Copy to Clipboard Toggle word wrap

我们添加了 createdAt 列:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP )
Copy to Clipboard Toggle word wrap
注意

SQL Server TIMESTAMP 类型是一个固定长度的二进制字符串类型。它不映射到 JDBC 时间类型: DATETIMETIMESTAMP

使用并发消费者时,在列 processorName 和 messageId 上创建唯一约束至关重要。由于此约束的语法与数据库的不同,我们不会显示在此处。

51.14.1. 自定义 JDBC idempotency 存储库

您有几个选项来根据您的需要调整 org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository

Expand
参数默认值描述

createTableIfNotExists

true

定义 Camel 是否应该尝试创建表(如果不存在)。

tableName

CAMEL_MESSAGEPROCESSED

使用自定义表名称而不是默认名称:CAMEL_MESSAGEPROCESSED。

tableExistsString

SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0

此查询用于找出表是否已存在。必须抛出异常,以指示表不存在。

createString

CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR (255), messageId VARCHAR (100), createdAt TIMESTAMP)

用于创建表的 语句。

queryString

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于识别存储库中是否已存在消息的查询(结果不等于 '0')。它取两个参数:第一个是处理器名称(字符串),第二个是消息 ID (字符串)。

insertString

INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)

用于将条目添加到表中的语句。它采用三个参数。第一个是处理器名称(字符串),第二个名称为消息 ID (String),当此条目添加到存储库中时,第三个是时间戳(java.sql.Timestamp)。

deleteString

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

用于从数据库中删除条目的声明。它取两个参数。第一个是处理器名称(字符串),第二个是消息 ID (字符串)。

选项 tableName 可用于使用默认 SQL 查询,但使用不同的表名称。但是,如果要自定义 SQL 查询,您可以单独配置每个查询。

51.14.2. Orphan Lock aware Jdbc IdempotentRepository

org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository 的一个限制是,它不会处理由 JVM 崩溃或非正常关闭导致的孤立锁定。如果您需要解决孤立锁定处理,则使用 org.apache.camel.processor.idempotent.jdbc.jdbc.JdbcOrphanLockAwareIdempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository,则可能会导致未处理的文件/信息。此存储库跟踪由应用程序实例保存的锁定。对于每个保留锁定,应用程序将向锁定存储库发送实时信号,从而使用当前的 Timestamp 更新 createdAt 列。当应用程序实例试图获取锁定时,如果存在三个可能性:

  • 锁定条目不存在,然后使用 JdbcMessageIdRepository 的基本实现来提供锁定。
  • 锁定已存在,createdAt < System.currentTimeMillis ()- lockMaxAgeMillis。在这种情况下,假设一个活跃的实例具有锁定,且锁定没有被提供给请求锁定的新实例
  • 锁定已存在,并且 createdAt > = System.currentTimeMillis ()- lockMaxAgeMillis。在这种情况下,假设没有活跃的实例具有锁定,并且该锁定提供给请求实例。背后的原因是,如果原始有锁定的实例仍在运行,它将使用其 keepAlive 机制更新 createdAt 上的 Timestamp

此软件仓库有两个额外的配置参数

Expand
参数描述

lockMaxAgeMillis

这指的是锁定被视为孤立的 i.e. 的持续时间。如果 currentTimestamp - createdAt >= lockMaxAgeMillis then lock is孤立。

lockKeepAliveIntervalMillis

保持 alive 更新的频率为 createdAt Timestamp 列。

51.14.3. Caching Jdbc IdempotentRepository

一些 SQL 实现不会在每次查询时快。JdbcMessageIdRepository 实施在 SQL 事务内单独执行其幂等检查。检查 mere 100 键可能需要几分钟的时间。JdbcCachedMessageIdRepository 假定为 上的内存缓存以整个密钥列表开头。然后首先检查此缓存,然后再通过原始实施。

与所有缓存实施一样,应该考虑陈旧数据和特定用途的注意事项。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat