304.10. JDBC ベースのべき等リポジトリーの使用


Camel 2.7 で利用可能: このセクションでは、JDBC ベースのべき等リポジトリーを使用し ます。

TIP:*Abstract class* From Camel 2.9 以降では、カスタム JDBC のべき等リポジトリーを作成するために拡張できる抽象クラス org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository があります。

 

まず、べき等リポジトリーによって使用されるデータベーステーブルを作成する必要があります。Camel 2.7 では、以下のスキーマを使用します。

Copy to Clipboard Toggle word wrap
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100) )

 

Camel 2.8 では、作成されたAt 列を追加しました。

Copy to Clipboard Toggle word wrap
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP )

警告: SQL Server TIMESTAMP タイプは、固定長のバイナリー文字列タイプです。JDBC 時間タイプ DATETIME、または TIMESTAMP にはマッピングされません。

JdbcMessageIdRepository のカスタマイズ

Camel 2.9.1 以降、必要に応じて org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository を調整するいくつかのオプションがあります。

パラメーターデフォルト値説明

createTableIfNotExists

true

Camel が存在しない場合に、テーブルを作成しようとするかどうかを定義します。

tableExistsString

CAMEL_MESSAGEPROCESSED から 1 を選択します。ここで、1 = 0

このクエリーは、テーブルがすでに存在しているかどうかを確認するために使用されます。テーブルが存在しないことを示すために例外をスローする必要があります。

createString

CREATE TABLE CAMEL_MESSAGEPROCESSED(processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)

テーブルの作成に使用されるステートメント。

queryString

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

メッセージがリポジトリーにすでに存在するかどうかを判断するために使用されるクエリー(結果は「0」と等しくありません)。これは 2 つのパラメーターを取ります。この最初の 1 つはプロセッサー名(文字列)で、2 つ目はメッセージ ID(文字列)です。

insertString

INSERT INTO CAMEL_MESSAGEPROCESSED(processorName, messageId, createdAt)VALUES(?, ?, ?)

テーブルにエントリーを追加するために使用されるステートメント。3 つのパラメーターを取ります。1 つ目はプロセッサー名(文字列)で、2 つ目はメッセージ ID(文字列)で、3 つ目はこのエントリーがリポジトリーに追加される際のタイムスタンプ(java.sql.Timestamp)です。

deleteString

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ?

データベースからエントリーを削除するために使用されるステートメント。これには 2 つのパラメーターが使用されます。この最初の 1 つはプロセッサー名(文字列)で、2 つ目はメッセージ ID(文字列)です。

JDBC ベースの集約リポジトリーの使用

Camel 2.6 で利用可能

INFO: Using JdbcAggregationRepository in Camel 2.6

Camel 2.6 では、JdbcAggregationRepository は camel-jdbc-aggregator コンポーネントで提供されます。Camel 2.7 以降、JdbcAggregationRepositorycamel-sql コンポーネントで提供されます。

JdbcAggregationRepositoryAggregationRepository で、集約されたメッセージを即座に永続化します。これにより、デフォルトのアグリゲーターは AggregationRepository のみのメモリーで使用されるため、メッセージを緩めることはありません。JdbcAggregationRepository を使用すると、Camel とともに Aggregator の永続的なサポートを提供できます。

Exchange が正常に処理された場合にのみ、完了とマークされます。これは、確認 メソッドが AggregationRepository で呼び出されると発生します。つまり、同じエクスチェンジが再び失敗すると、成功するまで再試行されます。

maximumRedeliveries オプションを使用して、特定のリカバリーされたエクスチェンジの再配信の最大試行回数を制限できます。また、maximumRedeliveries に達したときに Camel が Exchange を送信する場所を認識できるように deadLetterUri オプションも設定する必要があります。

この テストなど、camel-sql のユニットテストにはいくつかの例があります。

データベース

稼働にするために、各Aggregator は集約と完了の 2 つのテーブルを使用します。規則により、完了した名前は 「_COMPLETED」 のサフィックスが付いた集約名と同じです。名前は、RepositoryName プロパティーで Spring Bean で設定する必要があります。以下の例では、集約が使用されます。

両方のテーブルのテーブル構造の定義は同じです。いずれの場合も、String 値はキー(id)として使用されますが、B Blob にはバイトアレイでシリアライズされたエクスチェンジが含まれます。
ただし、1 つの違いに注意してください。id フィールドには、テーブルによっては同じコンテンツがありません。
集約テーブル ID では、メッセージを集約するためにコンポーネントによって使用される相関 ID を保持します。完了したテーブルの id は、対応する blob フィールドに保存されたエクスチェンジの ID を保持します。

これは、テーブルの作成に使用される SQL クエリーです。「aggregation」 を、お使いのアグリゲーターリポジトリー名に置き換えます。

Copy to Clipboard Toggle word wrap
CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT
NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE
aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT
NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );

本文とヘッダーのテキストとしての保存

Camel 2.11 から利用可能

JdbcAggregationRepository を設定して、メッセージボディーを保存し、別の列で String として選択することができます。たとえば、本文を保存し、以下の 2 つのヘッダー companyName および accountName を保存するには、以下の SQL を使用します。

Copy to Clipboard Toggle word wrap
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob
NOT NULL, body varchar(1000), companyName varchar(1000), accountName
varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE
TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange
blob NOT NULL, body varchar(1000), companyName varchar(1000),
accountName varchar(1000), constraint aggregationRepo3_completed_pk
PRIMARY KEY (id) );

 

次に、以下のようにこの動作を有効にするためにリポジトリーを設定します。

Copy to Clipboard Toggle word wrap
<bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo3"/> <property
name="transactionManager" ref="txManager3"/> <property name="dataSource"
ref="dataSource3"/> <!-- configure to store the message body and
following headers as text in the repo --> <property
name="storeBodyAsText" value="true"/> <property
name="headersToStoreAsText"> <list> <value>companyName</value>
<value>accountName</value> </list> </property> </bean>

Codec(シリアライズ)

どの種類のペイロードも含まれる可能性があるため、Exchange は設計でシリアライズできません。これは、データベース BLOB フィールドに保存されるバイト配列に変換されます。これらの変換はすべて JdbcCodec クラスによって処理されます。コードの詳細は、ClassLoadingAwareObjectInputStream に注意してください。

ClassLoadingAwareObjectInputStreamApache ActiveMQ プロジェクトから再利用されました。ObjectInputStream をラップし、現在のThread ではなく ContextClassLoader で使用します。この利点は、他のバンドルによって公開されるクラスをロードできることです。これにより、エクスチェンジボディーおよびヘッダーにカスタムタイプオブジェクト参照を持たせることができます。

Transaction

トランザクションのオーケストレーションには Spring PlatformTransactionManager が必要です。

サービス(Start/Stop)

start メソッドは、データベースの接続と、必要なテーブルが存在することを確認します。何らかの問題が発生した場合は、起動時に失敗します。

Aggregator の設定

対象の環境によっては、Aggregator に一部の設定が必要になる場合があります。すでに分かるように、各アグリゲーターには、独自のリポジトリー(データベース内に作成されたテーブルのペアあり)とデータソースが必要です。デフォルトの lobHandler がデータベースシステムに適応されない場合は、lobHandler プロパティーで挿入できます。

以下は、Oracle の宣言です。

Copy to Clipboard Toggle word wrap
<bean id="lobHandler"
class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property
name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean
id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/> <property
name="repositoryName" value="aggregation"/> <property name="dataSource"
ref="dataSource"/> <!-- Only with Oracle, else use default --> <property
name="lobHandler" ref="lobHandler"/> </bean>

  楽観的ロック

Camel 2.12 以降では、optistic Locking をオンにし、複数の Camel アプリケーションが集約リポジトリーと同じデータベースを共有するクラスター環境でこの JDBC ベースの集約リポジトリーを使用できます。競合状態がある場合、JDBC ドライバーは、JdbcAggregationRepository が反応できるベンダー固有の例外をスローします。JDBC ドライバーから発生した例外は、最適なロックエラーとして考慮されたかを知るには、マッパーが必要です。そのため、org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper を使用すると、必要に応じてカスタムロジックを実装できます。デフォルトの実装 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper は以下のようになります。

以下のチェックが行われます。

発生した例外が SQLException の場合、SQLState は 23 で始まる場合にチェックされます。

発生した例外が DataIntegrityViolationExceptionの場合

原因となった例外クラス名が、名前に "ConstraintViolation" である場合。

FQN クラス名のオプションの確認は、クラス名が設定されているかどうかに一致します。

FQN クラス名を追加でき、原因となった例外(またはネストされたもの)が FQN クラス名のいずれにも等しい場合は、楽観的ロックエラーになります。

以下は、JDBC ベンダーから 2 つの追加の FQN クラス名を定義する例です。

Copy to Clipboard Toggle word wrap
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/> <property
name="repositoryName" value="aggregation"/> <property name="dataSource"
ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper"
ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra
FQN class names from our JDBC driver --> <bean id="myExceptionMapper"
class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
<property name="classNames"> <util:set>
<value>com.foo.sql.MyViolationExceptoion</value>
<value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set>
</property> </bean>
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat, Inc.