120.17. 将正文和标头存储为文本
您可以将 JdbcAggregationRepository 配置为存储消息正文,并在单独的列中选择(ed)标头作为 String。例如,要存储正文,以下两个标头 companyName 和 accountName 使用以下 SQL:
CREATE TABLE aggregationRepo3 (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
version BIGINT NOT NULL,
body varchar(1000),
companyName varchar(1000),
accountName varchar(1000),
constraint aggregationRepo3_pk PRIMARY KEY (id)
);
CREATE TABLE aggregationRepo3_completed (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
version BIGINT NOT NULL,
body varchar(1000),
companyName varchar(1000),
accountName varchar(1000),
constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
);
然后,配置存储库以启用此行为,如下所示:
<bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo3"/>
<property name="transactionManager" ref="txManager3"/>
<property name="dataSource" ref="dataSource3"/>
<!-- configure to store the message body and following headers as text in the repo -->
<property name="storeBodyAsText" value="true"/>
<property name="headersToStoreAsText">
<list>
<value>companyName</value>
<value>accountName</value>
</list>
</property>
</bean>
120.17.1. codec (Serialization) 复制链接链接已复制到粘贴板!
由于它们可以包含任何类型的有效负载,因此交换的设计不适合。它转换为存储在数据库 BLOB 字段中的字节数组。所有这些转换都由 JdbcCodec 类处理。代码的一个详情需要注意: ClassLoadingAwareObjectInputStream。
ClassLoadingAwareObjectInputStream 已从 Apache ActiveMQ 项目中重复使用。它打包了一个 ObjectInputStream,并将其与 ContextClassLoader 而不是 currentThread 一起使用。好处是能够加载由其他捆绑包公开的类。这允许交换正文和标头具有自定义类型对象引用。
120.17.2. Transactions 复制链接链接已复制到粘贴板!
需要 Spring PlatformTransactionManager 来编配事务。
120.17.2.1. Service (Start/Stop) 复制链接链接已复制到粘贴板!
start 方法验证数据库的连接并存在所需的表。如果出现错误,它将在启动过程中失败。
120.17.3. 聚合器配置 复制链接链接已复制到粘贴板!
根据目标环境,聚合器可能需要一些配置。如您已经知道,每个聚合器应具有自己的存储库(在数据库中创建对应的表对)和数据源。如果默认的 lobHandler 没有根据您的数据库系统进行调整,则可以将其与 lobHandler 属性注入。
以下是 Oracle 的声明:
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
<property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
</bean>
<bean id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/>
<property name="repositoryName" value="aggregation"/>
<property name="dataSource" ref="dataSource"/>
<!-- Only with Oracle, else use default -->
<property name="lobHandler" ref="lobHandler"/>
</bean>
120.17.4. Optimistic locking 复制链接链接已复制到粘贴板!
您可以在集群环境中开启 optimisticLocking 并使用基于 JDBC 的聚合存储库,其中多个 Camel 应用程序为聚合存储库共享同一数据库。如果存在竞争条件,则 JDBC 驱动程序将抛出特定于供应商的异常,Jdb cAggregationRepository 可以响应。要了解将 JDBC 驱动程序中的异常视为 optimistick 锁定错误,我们需要一个映射器才能执行此操作。因此,有一个 org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper 允许您实现自定义逻辑(如果需要)。有一个默认的 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper,它可以正常工作:
以下检查已完成:
-
如果导致异常是
SQLException,则如果以 23 开始,则会检查 SQLState。 -
如果原因异常是
DataIntegrityViolationException - 如果原因异常类名称的名称中包含 "ConstraintViolation"。
- 如果配置了任何类名称,则对 FQN 类名称的可选检查都匹配。
您还可以添加 FQN 类名称,如果任何原因异常(或任何嵌套)等于任何 FQN 类名称,则其 optimistick 锁定错误。
例如,我们从 JDBC 供应商定义了 2 个额外的 FQN 类名称。
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/>
<property name="repositoryName" value="aggregation"/>
<property name="dataSource" ref="dataSource"/>
<property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
</bean>
<!-- use the default mapper with extraFQN class names from our JDBC driver -->
<bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
<property name="classNames">
<util:set>
<value>com.foo.sql.MyViolationExceptoion</value>
<value>com.foo.sql.MyOtherViolationExceptoion</value>
</util:set>
</property>
</bean>
120.17.5. 传播行为 复制链接链接已复制到粘贴板!
JdbcAggregationRepository 使用 Spring-TX 的两个不同 事务模板。一个是只读的,一个用于读写操作。
但是,当在其自身使用 < transacted /> 的路由中使用 JdbcAggregationRepository 且使用了通用平台TransactionManager 时,可能需要配置 JdbcAggregationRepository 中的事务模板使用的 传播行为。
以下是进行该操作的方法:
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="propagationBehaviorName" value="PROPAGATION_NESTED" />
</bean>
propagation 由 org.springframework.transaction.TransactionDefinition 接口的常量指定,因此 propagationBehaviorName 方便地使用恒定名称。
120.17.6. PostgreSQL 问题单 复制链接链接已复制到粘贴板!
有特殊的数据库可能会导致 JdbcAggregationRepository 使用的最佳锁定问题。在数据完整性违反异常时,PostgreSQL 会将连接标记为无效( SQLState 23505 之一)。这使得连接在嵌套的事务中有效无法使用。详情可在 文档 中找到。
org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository 扩展 JdbcAggregationRepository,并使用特殊的 INSERT .ON CONFLICT .. 语句以提供最佳锁定行为。
此语句(具有默认聚合表定义):
INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING
详情请查看 PostgreSQL 文档。
当使用此 子句时,java.sql.PreparedStatement.executeUpdate () 调用返回 0, 而不是用 SQLState=23505 丢弃 SQLException。进一步处理与通用 JdbcAggregationRepository 完全相同,但没有将 PostgreSQL 连接标记为无效。