141.17. 将正文和标头存储为文本
您可以配置 JdbcAggregationRepository
以存储消息正文,并选择(ed)标头作为 String 在单独的列中选择。例如,要存储正文,并且以下两个标头 companyName
和 accountName
使用以下 SQL:
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, version BIGINT NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, version BIGINT NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) );
CREATE TABLE aggregationRepo3 (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
version BIGINT NOT NULL,
body varchar(1000),
companyName varchar(1000),
accountName varchar(1000),
constraint aggregationRepo3_pk PRIMARY KEY (id)
);
CREATE TABLE aggregationRepo3_completed (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
version BIGINT NOT NULL,
body varchar(1000),
companyName varchar(1000),
accountName varchar(1000),
constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
);
然后,将存储库配置为启用此行为,如下所示:
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean>
<bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo3"/>
<property name="transactionManager" ref="txManager3"/>
<property name="dataSource" ref="dataSource3"/>
<!-- configure to store the message body and following headers as text in the repo -->
<property name="storeBodyAsText" value="true"/>
<property name="headersToStoreAsText">
<list>
<value>companyName</value>
<value>accountName</value>
</list>
</property>
</bean>
141.17.1. codec (Serialization)
由于它们可以包含任何类型的有效载荷,因此交换不能按照设计序列化。它将转换为一个字节数组,以存储在数据库 BLOB 字段中。所有这些转换都由 JdbcCodec
类处理。代码的详情需要注意: ClassLoadingAwareObjectInputStream
。
ClassLoadingAwareObjectInputStream
已从 Apache ActiveMQ 项目中重复使用。它嵌套 ObjectInputStream
,并将其与 ContextClassLoader
一起使用,而不是 当前的Thread
。其好处在于能够加载由其他捆绑包公开的类。这允许交换正文和标头具有自定义类型对象引用。
141.17.2. transaction
需要 Spring PlatformTransactionManager
来编配事务。
141.17.2.1. 服务(Start/Stop)
start
方法验证数据库的连接以及所需的表是否存在。如果出现错误,它将在启动期间失败。
141.17.3. 聚合器配置
根据目标环境,聚合器可能需要一些配置。如您已经了解,每个聚合器应具有自己的存储库(在数据库中创建的对应表对)和数据源。如果默认的 lobHandler 没有适应您的数据库系统,它可以与 lobHandler
属性注入。
以下是 Oracle 的声明:
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
<property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
</bean>
<bean id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/>
<property name="repositoryName" value="aggregation"/>
<property name="dataSource" ref="dataSource"/>
<!-- Only with Oracle, else use default -->
<property name="lobHandler" ref="lobHandler"/>
</bean>
141.17.4. 最佳锁定
您可以在集群环境中打开 optimisticLocking
,并在多个 Camel 应用程序为聚合存储库共享同一数据库的集群中使用此基于 JDBC 的聚合存储库。如果存在竞争条件,则 JDBC 驱动程序将抛出一个供应商特定异常,Jdb cAggregationRepository
可以响应。要了解导致 JDBC 驱动程序的例外情况被视为一个 optimistick locking 错误,我们需要一个映射程序来执行此操作。因此,有 org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper
允许您实施您的自定义逻辑(如果需要)。有默认的实现 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper
,它的工作方式如下:
完成以下检查:
-
如果导致异常是
SQLException
,则如果以 23 开始,则会检查 SQLState。 -
如果原因的例外是
DataIntegrityViolationException
- 如果导致异常类名称的名称的名称中包含 "ConstraintViolation"。
- 如果已经配置了任何类名称,则对 FQN 类名称进行可选检查。
您可以添加 FQN 类名,如果任何原因异常(或任何嵌套)等于任何 FQN 类名称,然后是最佳锁定错误。
下面是一个示例,我们从 JDBC 供应商定义 2 个额外的 FQN 类名称。
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extraFQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/>
<property name="repositoryName" value="aggregation"/>
<property name="dataSource" ref="dataSource"/>
<property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
</bean>
<!-- use the default mapper with extraFQN class names from our JDBC driver -->
<bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
<property name="classNames">
<util:set>
<value>com.foo.sql.MyViolationExceptoion</value>
<value>com.foo.sql.MyOtherViolationExceptoion</value>
</util:set>
</property>
</bean>
141.17.5. 传播行为
JdbcAggregationRepository
使用 Spring-TX 的两个不同 事务模板。一个是只读的,一个用于读写操作。
但是,当在自身使用 < transacted />
的路由中使用 JdbcAggregationRepository
并且使用了通用平台 TransactionManager
时,可能需要配置 JdbcAggregationRepository
中使用的 传播行为。
以下是实现这个目标的方法:
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="propagationBehaviorName" value="PROPAGATION_NESTED" /> </bean>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="propagationBehaviorName" value="PROPAGATION_NESTED" />
</bean>
传播由 org.springframework.transaction.TransactionDefinition
接口的常量指定,因此 propagationBehaviorName
非常方便,允许使用常量名称。
141.17.6. PostgreSQL 问题单
存在特殊的数据库,可能会导致 JdbcAggregationRepository
使用的最佳锁定问题。如果数据完整性违反异常(SQLState 23505),PostgreSQL 会将连接标记为无效。这使得连接有效地在嵌套事务中不可用。详情可在 文档 中找到。
org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository
扩展 JdbcAggregationRepository
,并使用特殊的 INSERT .ON CONFLICT ..
语句提供最佳锁定行为。
此声明是 (使用默认聚合表定义):
INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING
INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING
详情可在 PostgreSQL 文档 中找到。
使用此子句时,java.sql.PreparedStatement.executeUpdate ()
调用返回 0,
而不是抛出带有 SQLState=23505 的 SQLException。进一步的处理与通用的 JdbcAggregationRepository
完全相同,但没有将 PostgreSQL 连接标记为无效。