66.16. 将正文和标头存储为文本
您可以配置 JdbcAggregationRepository
以存储消息正文,并在单独的列中选择(ed)标头作为 String。例如,要存储正文,以下两个标头 companyName
和 accountName
,请使用以下 SQL:
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, version BIGINT NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, version BIGINT NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) );
然后,将存储库配置为启用此行为,如下所示:
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean>
66.16.1. codec (Serialization)
由于他们可以包含任何类型的有效负载,因此通过设计,交换不可序列化。它将转换为字节数组,以存储在数据库 BLOB 字段中。所有这些转换都由 JdbcCodec
类处理。代码的详情需要您注意的是: ClassLoadingAwareObjectInputStream
。
ClassLoadingAwareObjectInputStream
已从 Apache ActiveMQ 项目中重复使用。它嵌套 ObjectInputStream
,并将其与 ContextClassLoader
一起使用,而不是 当前的Thread
。优点是能够加载由其他捆绑包公开的类。这允许交换正文和标头具有自定义类型对象引用。
66.16.2. 事务
需要 Spring PlatformTransactionManager
来编配事务。
66.16.2.1. 服务(Start/Stop)
start
方法验证数据库的连接以及所需的表是否存在。如果出现错误,它将在启动过程中失败。
66.16.3. 聚合器配置
根据目标环境,聚合器可能需要一些配置。如您知道,每个聚合器应具有自己的存储库(以及数据库中创建的对应表对)和数据源。如果默认 lobHandler 没有适应您的数据库系统,可以使用 lobHandler
属性注入它。
以下是 Oracle 的声明:
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>
66.16.4. 光率锁定
您可以在集群环境中打开 optimisticLocking
并使用基于 JDBC 的聚合存储库,其中多个 Camel 应用程序为聚合存储库共享相同的数据库。如果存在竞争条件,则 JDBC 驱动程序会抛出特定于供应商的异常,JdbcAggregationRepository
可以对其做出反应。要了解 JDBC 驱动程序导致的例外被认为是选择锁定错误,我们需要一个映射程序来执行此操作。因此,有一个 org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper
,您可以根据需要实施自定义逻辑。有一个默认的实现 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper
,它可以正常工作:
完成以下检查:
-
如果原因的例外是
SQLException
,则检查 SQLState (如果以 23 开始)。 -
如果原因的例外是
DataIntegrityViolationException
- 如果原因的例外类名称的名称中具有 "ConstraintViolation"。
- 如果配置了任何类名称,对 FQN 类名称的可选检查匹配。
您可以添加 FQN classnames,如果任何原因异常(或任何嵌套)等于任何 FQN 类名称,则它有一个 optimistick locking 错误。
下面是一个例子,其中从 JDBC 供应商定义 2 个额外的 FQN 类名称。
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extraFQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean>
66.16.5. 传播行为
JdbcAggregationRepository
使用来自 Spring-TX 的两个不同 事务模板。一个是只读的,一个用于读写操作。
但是,当在自身使用 < transacted />
的路由中使用 JdbcAggregationRepository
时,可能需要配置 JdbcAggregationRepository
中的事务模板使用的 传播行为。
以下是实现它的方法:
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="propagationBehaviorName" value="PROPAGATION_NESTED" /> </bean>
传播由 org.springframework.transaction.TransactionDefinition
接口常量来指定,因此 propagationBehaviorName
是方便的,允许使用常量名称。
66.16.6. PostgreSQL 问题单
有特殊数据库可能会导致 JdbcAggregationRepository
使用的最佳锁定问题。如果数据完整性违反异常(带有 SQLState 23505),PostgreSQL 会将连接标记为无效。这使得连接在嵌套的事务中有效不可用。详情可在 文档 中找到。
org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository
扩展 JdbcAggregationRepository
并使用特殊的 INSERT。ON CONFLICT ..
语句提供选择锁定行为。
这个声明是(带有默认聚合表定义):
INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING
详情请参考 PostgreSQL 文档。
当使用此条款时,java.sql.PreparedStatement.executeUpdate ()
调用会返回 0,
而不是抛出 SQLException with SQLState=23505。进一步处理与通用 JdbcAggregationRepository
完全相同,但没有将 PostgreSQL 连接标记为无效。