141.17. 将正文和标头存储为文本


您可以配置 JdbcAggregationRepository 以存储消息正文,并选择(ed)标头作为 String 在单独的列中选择。例如,要存储正文,并且以下两个标头 companyNameaccountName 使用以下 SQL:

CREATE TABLE aggregationRepo3 (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 version BIGINT NOT NULL,
 body varchar(1000),
 companyName varchar(1000),
 accountName varchar(1000),
 constraint aggregationRepo3_pk PRIMARY KEY (id)
);
CREATE TABLE aggregationRepo3_completed (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 version BIGINT NOT NULL,
 body varchar(1000),
 companyName varchar(1000),
 accountName varchar(1000),
 constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
);
Copy to Clipboard

然后,将存储库配置为启用此行为,如下所示:

<bean id="repo3"
  class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="repositoryName" value="aggregationRepo3"/>
  <property name="transactionManager" ref="txManager3"/>
  <property name="dataSource" ref="dataSource3"/>
  <!-- configure to store the message body and following headers as text in the repo -->
  <property name="storeBodyAsText" value="true"/>
  <property name="headersToStoreAsText">
    <list>
      <value>companyName</value>
      <value>accountName</value>
    </list>
  </property>
</bean>
Copy to Clipboard

141.17.1. codec (Serialization)

由于它们可以包含任何类型的有效载荷,因此交换不能按照设计序列化。它将转换为一个字节数组,以存储在数据库 BLOB 字段中。所有这些转换都由 JdbcCodec 类处理。代码的详情需要注意: ClassLoadingAwareObjectInputStream

ClassLoadingAwareObjectInputStream 已从 Apache ActiveMQ 项目中重复使用。它嵌套 ObjectInputStream,并将其与 ContextClassLoader 一起使用,而不是 当前的Thread。其好处在于能够加载由其他捆绑包公开的类。这允许交换正文和标头具有自定义类型对象引用。

141.17.2. transaction

需要 Spring PlatformTransactionManager 来编配事务。

141.17.2.1. 服务(Start/Stop)

start 方法验证数据库的连接以及所需的表是否存在。如果出现错误,它将在启动期间失败。

141.17.3. 聚合器配置

根据目标环境,聚合器可能需要一些配置。如您已经了解,每个聚合器应具有自己的存储库(在数据库中创建的对应表对)和数据源。如果默认的 lobHandler 没有适应您的数据库系统,它可以与 lobHandler 属性注入。

以下是 Oracle 的声明:

<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
  <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
</bean>
<bean id="nativeJdbcExtractor"
  class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
  class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="repositoryName" value="aggregation"/>
  <property name="dataSource" ref="dataSource"/>
  <!-- Only with Oracle, else use default -->
  <property name="lobHandler" ref="lobHandler"/>
</bean>
Copy to Clipboard

141.17.4. 最佳锁定

您可以在集群环境中打开 optimisticLocking,并在多个 Camel 应用程序为聚合存储库共享同一数据库的集群中使用此基于 JDBC 的聚合存储库。如果存在竞争条件,则 JDBC 驱动程序将抛出一个供应商特定异常,Jdb cAggregationRepository 可以响应。要了解导致 JDBC 驱动程序的例外情况被视为一个 optimistick locking 错误,我们需要一个映射程序来执行此操作。因此,有 org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper 允许您实施您的自定义逻辑(如果需要)。有默认的实现 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper,它的工作方式如下:

完成以下检查:

  • 如果导致异常是 SQLException,则如果以 23 开始,则会检查 SQLState。
  • 如果原因的例外是 DataIntegrityViolationException
  • 如果导致异常类名称的名称的名称中包含 "ConstraintViolation"。
  • 如果已经配置了任何类名称,则对 FQN 类名称进行可选检查。

您可以添加 FQN 类名,如果任何原因异常(或任何嵌套)等于任何 FQN 类名称,然后是最佳锁定错误。

下面是一个示例,我们从 JDBC 供应商定义 2 个额外的 FQN 类名称。

<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="repositoryName" value="aggregation"/>
  <property name="dataSource" ref="dataSource"/>
  <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
</bean>
<!-- use the default mapper with extraFQN class names from our JDBC driver -->
<bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
  <property name="classNames">
    <util:set>
      <value>com.foo.sql.MyViolationExceptoion</value>
      <value>com.foo.sql.MyOtherViolationExceptoion</value>
    </util:set>
  </property>
</bean>
Copy to Clipboard

141.17.5. 传播行为

JdbcAggregationRepository 使用 Spring-TX 的两个不同 事务模板。一个是只读的,一个用于读写操作。

但是,当在自身使用 < transacted /> 的路由中使用 JdbcAggregationRepository 并且使用了通用平台 TransactionManager 时,可能需要配置 JdbcAggregationRepository 中使用的 传播行为

以下是实现这个目标的方法:

<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="propagationBehaviorName" value="PROPAGATION_NESTED" />
</bean>
Copy to Clipboard

传播由 org.springframework.transaction.TransactionDefinition 接口的常量指定,因此 propagationBehaviorName 非常方便,允许使用常量名称。

141.17.6. PostgreSQL 问题单

存在特殊的数据库,可能会导致 JdbcAggregationRepository 使用的最佳锁定问题。如果数据完整性违反异常(SQLState 23505),PostgreSQL 会将连接标记为无效。这使得连接有效地在嵌套事务中不可用。详情可在 文档 中找到。

org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository 扩展 JdbcAggregationRepository,并使用特殊的 INSERT .ON CONFLICT .. 语句提供最佳锁定行为。

此声明是 (使用默认聚合表定义):

INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING
Copy to Clipboard

详情可在 PostgreSQL 文档 中找到。

使用此子句时,java.sql.PreparedStatement.executeUpdate () 调用返回 0, 而不是抛出带有 SQLState=23505 的 SQLException。进一步的处理与通用的 JdbcAggregationRepository 完全相同,但没有将 PostgreSQL 连接标记为无效。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat