51.14. 使用基于 JDBC 的幂等存储库
在本节中,我们将使用基于 JDBC 的幂等存储库。
抽象 类
是一个抽象类 org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository
,您可以扩展为构建自定义 JDBC 幂等存储库。
首先,必须创建由幂等存储库使用的数据库表。我们使用以下模式:
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100) )
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100) )
我们添加了 createdAt 列:
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP )
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP )
SQL Server TIMESTAMP 类型是一个固定长度的二进制字符串类型。它不映射到 JDBC 时间类型: DATE、TIME 或 TIMESTAMP。
使用并发消费者时,在列 processorName 和 messageId 上创建唯一约束至关重要。由于此约束的语法与数据库的不同,我们不会显示在此处。
51.14.1. 自定义 JDBC idempotency 存储库 复制链接链接已复制到粘贴板!
您有几个选项来根据您的需要调整 org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository
:
参数 | 默认值 | 描述 |
---|---|---|
createTableIfNotExists | true | 定义 Camel 是否应该尝试创建表(如果不存在)。 |
tableName | CAMEL_MESSAGEPROCESSED | 使用自定义表名称而不是默认名称:CAMEL_MESSAGEPROCESSED。 |
tableExistsString | SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0 | 此查询用于找出表是否已存在。必须抛出异常,以指示表不存在。 |
createString | CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR (255), messageId VARCHAR (100), createdAt TIMESTAMP) | 用于创建表的 语句。 |
queryString | SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ? |
用于识别存储库中是否已存在消息的查询(结果不等于 '0')。它取两个参数:第一个是处理器名称( |
insertString | INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?) |
用于将条目添加到表中的语句。它采用三个参数。第一个是处理器名称( |
deleteString | DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ? |
用于从数据库中删除条目的声明。它取两个参数。第一个是处理器名称( |
选项 tableName
可用于使用默认 SQL 查询,但使用不同的表名称。但是,如果要自定义 SQL 查询,您可以单独配置每个查询。
51.14.2. Orphan Lock aware Jdbc IdempotentRepository 复制链接链接已复制到粘贴板!
org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository
的一个限制是,它不会处理由 JVM 崩溃或非正常关闭导致的孤立锁定。如果您需要解决孤立锁定处理,则使用 org.apache.camel.processor.idempotent.jdbc.jdbc.JdbcOrphanLockAwareIdempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository
,则可能会导致未处理的文件/信息。此存储库跟踪由应用程序实例保存的锁定。对于每个保留锁定,应用程序将向锁定存储库发送实时信号,从而使用当前的 Timestamp 更新 createdAt 列。当应用程序实例试图获取锁定时,如果存在三个可能性:
-
锁定条目不存在,然后使用
JdbcMessageIdRepository
的基本实现来提供锁定。 - 锁定已存在,createdAt < System.currentTimeMillis ()- lockMaxAgeMillis。在这种情况下,假设一个活跃的实例具有锁定,且锁定没有被提供给请求锁定的新实例
- 锁定已存在,并且 createdAt > = System.currentTimeMillis ()- lockMaxAgeMillis。在这种情况下,假设没有活跃的实例具有锁定,并且该锁定提供给请求实例。背后的原因是,如果原始有锁定的实例仍在运行,它将使用其 keepAlive 机制更新 createdAt 上的 Timestamp
此软件仓库有两个额外的配置参数
参数 | 描述 |
---|---|
lockMaxAgeMillis | 这指的是锁定被视为孤立的 i.e. 的持续时间。如果 currentTimestamp - createdAt >= lockMaxAgeMillis then lock is孤立。 |
lockKeepAliveIntervalMillis | 保持 alive 更新的频率为 createdAt Timestamp 列。 |
51.14.3. Caching Jdbc IdempotentRepository 复制链接链接已复制到粘贴板!
一些 SQL 实现不会在每次查询时快。JdbcMessageIdRepository
实施在 SQL 事务内单独执行其幂等检查。检查 mere 100 键可能需要几分钟的时间。JdbcCachedMessageIdRepository
假定为 上的内存缓存以整个密钥列表开头。然后首先检查此缓存,然后再通过原始实施。
与所有缓存实施一样,应该考虑陈旧数据和特定用途的注意事项。