304.10. JDBC ベースのべき等リポジトリーの使用
Camel 2.7 で利用可能: このセクションでは、JDBC ベースのべき等リポジトリーを使用し ます。
TIP:*Abstract class* From Camel 2.9 以降では、カスタム JDBC のべき等リポジトリーを作成するために拡張できる抽象クラス org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository
があります。
まず、べき等リポジトリーによって使用されるデータベーステーブルを作成する必要があります。Camel 2.7 では、以下のスキーマを使用します。
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100) )
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100) )
Camel 2.8 では、作成されたAt 列を追加しました。
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP )
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP )
警告: SQL Server TIMESTAMP タイプは、固定長のバイナリー文字列タイプです。JDBC 時間タイプ DATE、TIME、または TIMESTAMP にはマッピングされません。
JdbcMessageIdRepository のカスタマイズ
Camel 2.9.1 以降、必要に応じて org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository
を調整するいくつかのオプションがあります。
パラメーター | デフォルト値 | 説明 |
---|---|---|
createTableIfNotExists | true | Camel が存在しない場合に、テーブルを作成しようとするかどうかを定義します。 |
tableExistsString | CAMEL_MESSAGEPROCESSED から 1 を選択します。ここで、1 = 0 | このクエリーは、テーブルがすでに存在しているかどうかを確認するために使用されます。テーブルが存在しないことを示すために例外をスローする必要があります。 |
createString | CREATE TABLE CAMEL_MESSAGEPROCESSED(processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP) | テーブルの作成に使用されるステートメント。 |
queryString | SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ? |
メッセージがリポジトリーにすでに存在するかどうかを判断するために使用されるクエリー(結果は「0」と等しくありません)。これは 2 つのパラメーターを取ります。この最初の 1 つはプロセッサー名( |
insertString | INSERT INTO CAMEL_MESSAGEPROCESSED(processorName, messageId, createdAt)VALUES(?, ?, ?) |
テーブルにエントリーを追加するために使用されるステートメント。3 つのパラメーターを取ります。1 つ目はプロセッサー名( |
deleteString | DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ? |
データベースからエントリーを削除するために使用されるステートメント。これには 2 つのパラメーターが使用されます。この最初の 1 つはプロセッサー名( |
JDBC ベースの集約リポジトリーの使用
Camel 2.6 で利用可能
INFO: Using JdbcAggregationRepository in Camel 2.6
Camel 2.6 では、JdbcAggregationRepository は camel-jdbc-aggregator
コンポーネントで提供されます。Camel 2.7 以降、JdbcAggregationRepository
は camel-sql
コンポーネントで提供されます。
JdbcAggregationRepository
は AggregationRepository
で、集約されたメッセージを即座に永続化します。これにより、デフォルトのアグリゲーターは AggregationRepository
のみのメモリーで使用されるため、メッセージを緩めることはありません。JdbcAggregationRepository
を使用すると、Camel とともに Aggregator の永続的なサポートを提供できます。
Exchange が正常に処理された場合にのみ、完了とマークされます。これは、確認
メソッドが AggregationRepository
で呼び出されると発生します。つまり、同じエクスチェンジが再び失敗すると、成功するまで再試行されます。
maximumRedeliveries
オプションを使用して、特定のリカバリーされたエクスチェンジの再配信の最大試行回数を制限できます。また、maximumRedeliveries
に達したときに Camel が Exchange を送信する場所を認識できるように deadLetterUri
オプションも設定する必要があります。
この テストなど、camel-sql のユニットテストにはいくつかの例があります。
データベース
稼働にするために、各Aggregator は集約と完了の 2 つのテーブルを使用します。規則により、完了した名前は 「_COMPLETED」
のサフィックスが付いた集約名と同じです。名前は、RepositoryName
プロパティーで Spring Bean で設定する必要があります。以下の例では、集約が使用されます。
両方のテーブルのテーブル構造の定義は同じです。いずれの場合も、String 値はキー(id)として使用されますが、B Blob にはバイトアレイでシリアライズされたエクスチェンジが含まれます。
ただし、1 つの違いに注意してください。id フィールドには、テーブルによっては同じコンテンツがありません。
集約テーブル ID では、メッセージを集約するためにコンポーネントによって使用される相関 ID を保持します。完了したテーブルの id は、対応する blob フィールドに保存されたエクスチェンジの ID を保持します。
これは、テーブルの作成に使用される SQL クエリーです。「aggregation」
を、お使いのアグリゲーターリポジトリー名に置き換えます。
CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );
CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT
NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE
aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT
NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );
本文とヘッダーのテキストとしての保存
Camel 2.11 から利用可能
JdbcAggregationRepository
を設定して、メッセージボディーを保存し、別の列で String として選択することができます。たとえば、本文を保存し、以下の 2 つのヘッダー companyName
および accountName
を保存するには、以下の SQL を使用します。
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) );
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob
NOT NULL, body varchar(1000), companyName varchar(1000), accountName
varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE
TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange
blob NOT NULL, body varchar(1000), companyName varchar(1000),
accountName varchar(1000), constraint aggregationRepo3_completed_pk
PRIMARY KEY (id) );
次に、以下のようにこの動作を有効にするためにリポジトリーを設定します。
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean>
<bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo3"/> <property
name="transactionManager" ref="txManager3"/> <property name="dataSource"
ref="dataSource3"/> <!-- configure to store the message body and
following headers as text in the repo --> <property
name="storeBodyAsText" value="true"/> <property
name="headersToStoreAsText"> <list> <value>companyName</value>
<value>accountName</value> </list> </property> </bean>
Codec(シリアライズ)
どの種類のペイロードも含まれる可能性があるため、Exchange は設計でシリアライズできません。これは、データベース BLOB フィールドに保存されるバイト配列に変換されます。これらの変換はすべて JdbcCodec
クラスによって処理されます。コードの詳細は、ClassLoadingAwareObjectInputStream
に注意してください。
ClassLoadingAwareObjectInputStream
は Apache ActiveMQ プロジェクトから再利用されました。ObjectInputStream
をラップし、現在のThread
ではなく ContextClassLoader
で使用します。この利点は、他のバンドルによって公開されるクラスをロードできることです。これにより、エクスチェンジボディーおよびヘッダーにカスタムタイプオブジェクト参照を持たせることができます。
Transaction
トランザクションのオーケストレーションには Spring PlatformTransactionManager
が必要です。
サービス(Start/Stop)
start
メソッドは、データベースの接続と、必要なテーブルが存在することを確認します。何らかの問題が発生した場合は、起動時に失敗します。
Aggregator の設定
対象の環境によっては、Aggregator に一部の設定が必要になる場合があります。すでに分かるように、各アグリゲーターには、独自のリポジトリー(データベース内に作成されたテーブルのペアあり)とデータソースが必要です。デフォルトの lobHandler
がデータベースシステムに適応されない場合は、lobHandler プロパティーで挿入できます。
以下は、Oracle の宣言です。
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>
<bean id="lobHandler"
class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property
name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean
id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/> <property
name="repositoryName" value="aggregation"/> <property name="dataSource"
ref="dataSource"/> <!-- Only with Oracle, else use default --> <property
name="lobHandler" ref="lobHandler"/> </bean>
楽観的ロック
Camel 2.12 以降では、optistic Locking
をオンにし、複数の Camel アプリケーションが集約リポジトリーと同じデータベースを共有するクラスター環境でこの JDBC ベースの集約リポジトリーを使用できます。競合状態がある場合、JDBC ドライバーは、JdbcAggregationRepository
が反応できるベンダー固有の例外をスローします。JDBC ドライバーから発生した例外は、最適なロックエラーとして考慮されたかを知るには、マッパーが必要です。そのため、org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper
を使用すると、必要に応じてカスタムロジックを実装できます。デフォルトの実装 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper
は以下のようになります。
以下のチェックが行われます。
発生した例外が SQLException
の場合、SQLState は 23 で始まる場合にチェックされます。
発生した例外が DataIntegrityViolationException
の場合
原因となった例外クラス名が、名前に "ConstraintViolation" である場合。
FQN クラス名のオプションの確認は、クラス名が設定されているかどうかに一致します。
FQN クラス名を追加でき、原因となった例外(またはネストされたもの)が FQN クラス名のいずれにも等しい場合は、楽観的ロックエラーになります。
以下は、JDBC ベンダーから 2 つの追加の FQN クラス名を定義する例です。
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra FQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/> <property
name="repositoryName" value="aggregation"/> <property name="dataSource"
ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper"
ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra
FQN class names from our JDBC driver --> <bean id="myExceptionMapper"
class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
<property name="classNames"> <util:set>
<value>com.foo.sql.MyViolationExceptoion</value>
<value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set>
</property> </bean>