第165章 SQL Component


SQL Component

sql: コンポーネントを使用すると、JDBC クエリーを使用してデータベースを操作することができます。このコンポーネントと JDBC コンポーネントの違いは、SQL の場合、クエリーがエンドポイントのプロパティーであり、クエリーに渡されるパラメーターとしてメッセージペイロードを使用する点です。
このコンポーネントは、実際の SQL 処理の背後で spring-jdbc を使用します。
SQL コンポーネントは以下もサポートします。
  • Idempotent Consumer EIP パターンの JDBC ベースのリポジトリー。詳細は以下を参照してください。
  • Aggregator EIP パターンの JDBC ベースのリポジトリー。詳細は以下を参照してください。
重要
このコンポーネントは、Transactional Client として使用できます。

Camel on EAP デプロイメント

このコンポーネントは、Red Hat JBoss Enterprise Application Platform (JBoss EAP) コンテナー上で簡素化されたデプロイメントモデルを提供する Camel on EAP (Wildfly Camel) フレームワークによってサポートされます。このモデルの詳細は、Deploying into a Web Server の Apache Camel on JBoss EAP の章を参照してください
camel-cdi コンポーネントと組み合わせて使用すると、Java EE アノテーションは Camel でデータソースを利用できるようにします。以下の例は @Named アノテーションを使用して、Camel が必要なデータソースを特定できるようにします。
public class DatasourceProducer {
    @Resource(name = "java:jboss/datasources/ExampleDS")
    DataSource dataSource;

    @Produces
    @Named("wildFlyExampleDS")
    public DataSource getDataSource() {
        return dataSource;
    }
}
Copy to Clipboard Toggle word wrap
データソースは、以下のように camel-sql エンドポイント設定の dataSource パラメーターから参照できるようになりました。
@ApplicationScoped
@ContextName("camel-sql-cdi-context")
@Startup
public class CdiRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("sql:select name from information_schema.users?dataSource=wildFlyExampleDS")
        .to("direct:end");
    }
}
Copy to Clipboard Toggle word wrap

URI 形式

警告
Camel 2.11 以降では、このコンポーネントはコンシューマー( from()など)とプロデューサーエンドポイント(g など)の両方を作成できます。to()).以前のバージョンでは、プロデューサーとしてのみ動作できました。
SQL コンポーネントは、以下のエンドポイント URI 表記を使用します。
sql:select * from table where id=# order by name[?options]
Copy to Clipboard Toggle word wrap
Camel 2.11 以降では、以下のように #name_of_the_parameter スタイルを使用して名前付きパラメーターを使用できます。
sql:select * from table where id=:#myId order by name[?options]
Copy to Clipboard Toggle word wrap
名前付きパラメーターを使用する場合、Camel は、メッセージヘッダーから java.util.Map 2. の場合にメッセージボディーから名前を検索します。
名前付きパラメーターを解決できない場合は、例外が発生します。
Camel 2.14 以降では、以下のように Simple 式をパラメーターとして使用できます。
sql:select * from table where id=:#${property.myId} order by name[?options]
Copy to Clipboard Toggle word wrap
SQL クエリーへのパラメーターを示す標準の ? 記号は、エンドポイントのオプションの指定に ? 記号が使用されるため、# 記号に置き換えられることに注意してください。? 記号の置換は、エンドポイントベースで設定できます。
Camel 2.17 以降では、以下のように SQL クエリーをクラスパスまたはファイルシステムのファイルに外部化できます。
sql:classpath:sql/myquery.sql[?options]
Copy to Clipboard Toggle word wrap
myquery.sql ファイルはクラスパスにあり、プレーンテキストです。
-- this is a comment
select *
from table
where
  id = :#${property.myId}
order by
  name
Copy to Clipboard Toggle word wrap
このファイルでは、複数の行を使用し、SQL を必要に応じてフォーマットできます。また、 ダッシュ行などのコメントも使用します。
URI にクエリーオプションは ?option=value&option=value&.. の形式で追加できます。

オプション

Expand
オプション デフォルト 説明
batch boolean false Camel 2.7.5、2.8、および 2.9: SQL バッチ更新ステートメントを実行します。true に設定されている場合は、インバウンドメッセージボディーのハンドルがどのように変化するかについて以下の注記を参照してください。
dataSourceRef 文字列 null 非推奨および は Camel 3.0: レジストリーで検索するための DataSource への参照で削除され ます。代わりに dataSource=#theName を使用してください。
dataSource 文字列 null Camel 2.11: レジストリーで検索するための DataSource への参照。
placeholder 文字列 # Camel 2.4: SQL クエリーで ? に置き換えられる文字を指定します。単純な String.replaceAll ()操作であり、SQL 解析が関与していないことに注意してください(引用符で囲まれた文字列も変更されます)。
usePlaceholder boolean true Camel 2.17: SQL クエリーでプレースホルダーを使用し、すべてのプレースホルダー文字を ? 記号に置き換えるかどうかを設定します。
template.<xxx> null クエリーを実行するために背後で使用される Spring JdbcTemplate に追加のオプションを設定します。例: template.maxRows=10詳細なドキュメントは、JdbcTemplate javadoc のドキュメント を参照してください。
allowNamedParameters boolean true Camel 2.11: クエリーで名前付きパラメーターの使用を許可するかどうか。
processingStrategy Camel 2.11:SQL consumer only: コンシューマーが行/バッチを処理したときに、プラグインがカスタム org.apache.camel.component.sql.SqlProcessingStrategy を使用してクエリーを実行できます。
prepareStatementStrategy Camel 2.11: プラグインがカスタムの org.apache.camel.component.sql.SqlPrepareStatementStrategy を使用して、クエリーおよび準備済みステートメントの準備を制御できます。
consumer.delay long 500 Camel 2.11:SQL consumer only: 各ポーリング間の遅延(ミリ秒単位)。
consumer.initialDelay long 1000 Camel 2.11:SQL コンシューマーのみ: ポーリングの開始前の Milliseconds。
consumer.useFixedDelay boolean false Camel 2.11:SQL コンシューマーのみ: ポーリング間の固定遅延を使用するには true に設定します。それ以外の場合は、固定レートが使用されます。詳細は、JDK の ScheduledExecutorService を参照してください。
maxMessagesPerPoll int 0 Camel 2.11:SQL コンシューマーのみ: ポーリングごとに収集するメッセージの最大数を定義する整数値。デフォルトでは最大値は設定されていません。
useIterator boolean true Camel 2.11:SQL consumer only: ポーリングが個別に処理されるときに返された各行が true の場合。false の場合、データの java.util.List 全体が IN ボディーとして設定されます。Camel 2.15.x 以前では、このオプションの前に consumer. 接頭辞を付ける必要があります(例: consumer.useIterator=true
routeEmptyResultSet boolean false Camel 2.11:SQL コンシューマーのみ: ポーリングするデータがない場合に、単一の空の エクスチェンジ をルーティングするかどうか。Camel 2.15.x 以前では、このオプションの前に consumer. 接頭辞を付ける必要があります(例: consumer.useIterator=true
onConsume 文字列 null Camel 2.11:SQL consumer only: 各行を処理した後、エクスチェンジ が正常に処理された場合、このクエリーを実行できます。たとえば、行を processed とマークします。クエリーには パラメーターを指定できます。Camel 2.15.x 以前では、このオプションの前に consumer. 接頭辞を付ける必要があります(例: consumer.useIterator=true
onConsumeFailed 文字列 null Camel 2.11:SQL consumer only: 各行の処理後、エクスチェンジ が失敗した場合にこのクエリーを実行できます。たとえば、行が失敗したとマークされます。クエリーには パラメーターを指定できます。Camel 2.15.x 以前では、このオプションの前に consumer. 接頭辞を付ける必要があります(例: consumer.useIterator=true
onConsumeBatchComplete 文字列 null Camel 2.11:SQL コンシューマーのみ: バッチ全体の処理後、このクエリーを実行して行を一括更新することができます。クエリーにはパラメーターを指定できません。Camel 2.15.x 以前では、このオプションの前に consumer. 接頭辞を付ける必要があります(例: consumer.useIterator=true
expectedUpdateCount int \-1 Camel 2.11:SQL コンシューマーのみ: consumer.onConsume を使用する場合、このオプションを使用して、更新される予想される行数を設定できます。通常、これを 1 に設定すると 1 行が更新されることが予想されます。Camel 2.15.x 以前では、このオプションの前に consumer. 接頭辞を付ける必要があります(例: consumer.useIterator=true
breakBatchOnConsumeFail boolean false Camel 2.11:SQL コンシューマーのみ: consumer.onConsume を使用し、失敗すると、このオプションはバッチを分割するか、バッチからの次の行の処理を継続するかどうかを制御します。Camel 2.15.x 以前では、このオプションの前に consumer. 接頭辞を付ける必要があります(例: consumer.useIterator=true
alwaysPopulateStatement boolean false Camel 2.11:SQL producer only: 有効にすると、org.apache.camel.component.sql.SqlPrepareStatementStrategypopulateStatement メソッドが常に呼び出されます(準備が想定されるパラメーターがない場合も同様です)。これが false の場合、1 つ以上の想定されるパラメーターが設定されている場合にのみ populateStatement が呼び出されます。たとえば、パラメーターなしの SQL クエリーのメッセージボディー/ヘッダーの読み取りを回避します。
separator char , Camel 2.11.1: パラメーター値がメッセージボディー(ボディーが String 型である場合)から取得される場合に使用する区切り文字。# プレースホルダーに挿入されます。名前付きパラメーターを使用する場合は、代わりに Map タイプが使用されることに注意してください。
outputType 文字列 SelectList
Camel 2.12.0: コンシューマーまたはプロデューサーの出力を Map の List に、SelectOne を単一の Java オブジェクトとして作成する(a)クエリーに列が 1 つしかない場合は、その JDBC Column オブジェクトが返されます。SELECT COUNT (*) FROM PROJECT はLong オブジェクトを返します。b (クエリーに複数の列がある場合) その結果の Map を返します。c) outputClass が設定されている場合には、列名に一致するすべてのセッターを呼び出すことで、クエリー結果を Java Bean オブジェクトに変換します。クラスにインスタンスを作成するデフォルトのコンストラクターがあるとします。d)クエリーによって複数の行が発生した場合は、一意でない結果例外が出力されます。
Camel 2.14.1 以降では、SelectList は、SelectOne として各行の Java オブジェクトへのマッピングもサポートしています(step c のみ)。
outputClass 文字列 null Camel 2.12.0: outputType=SelectOne の場合に変換として使用する完全なパッケージおよびクラス名を指定します。
outputHeader 文字列 null Camel 2.15: メッセージのボディーではなくヘッダーとして結果を保存します。これにより、既存のメッセージボディーをそのまま保存できます。
parametersCount int 0 Camel 2.11.2/2.12.0 がゼロより大きい場合、Camel はこの数のパラメーターを使用でき、JDBC メタデータ API 経由でクエリーするのではなく、置き換えられます。これは、JDBC ベンダーが正しいパラメーター数を返すことができない場合に、代わりに上書きできる場合に便利です。
noop boolean false Camel 2.12.0 が設定されている場合、は SQL クエリーの結果を無視し、既存の IN メッセージを処理継続の OUT メッセージとして使用します。
useMessageBodyForSql boolean false Camel 2.16: メッセージボディーを SQL として使用し、ヘッダーをパラメーターに使用するかどうか。このオプションが有効な場合には、uri の SQL は使用されません。その後、SQL パラメーターは、キー CamelSqlParameters を含むヘッダーに提供する必要があります。このオプションはプロデューサー専用です。
transacted boolean false Camel 2.16.2: SQL consumer only: トランザクションを有効または無効にします。有効にすると、エクスチェンジの処理が失敗した場合、コンシューマーは追加のエクスチェンジを処理しなくなり、ロールバックの Eager が発生します。

メッセージボディーの処理

SQL コンポーネントはメッセージボディーを java.util.Iterator タイプのオブジェクトに変換しようとし、この iterator を使用してクエリーパラメーターを埋めます(各クエリーパラメーターはエンドポイント URI の # 記号または他の設定されたプレースホルダーで表されます)。メッセージボディーが配列またはコレクションではない場合、変換により、ボディー自体の 1 つのオブジェクトのみに対して反復する iterator が生成されます。
たとえば、メッセージボディーが java.util.List のインスタンスである場合、リストの最初の項目は SQL クエリーの最初の # に置き換えられます。
batchtrue に設定されている場合、インバウンドメッセージボディーの解釈はパラメーターのイテレーターではなく、若干変更されます。コンポーネントは、パラメーター iterators が含まれるイテレーターを想定し、外部イテレーターのサイズによってバッチサイズが決まります。
Camel 2.16 以降では、useMessageBodyForSql オプションを使用してメッセージボディーを SQL ステートメントとして使用し、SQL パラメーターをキー SqlConstants.SQL_PARAMETERS のヘッダーに指定する必要があります。SQL クエリーはメッセージボディーからのものであるため、SQL コンポーネントはより動的に機能します。

クエリーの結果

選択 操作の場合、結果は JdbcTemplate.queryFor List ()メソッドによって返される List <Map<String, Object >> タイプのインスタンスになります。更新 操作の場合、結果は更新された行数で、整数 として返され ます
デフォルトでは、結果はメッセージのボディーに配置されます。outputHeader パラメーターを設定すると、結果はヘッダーに配置されます。これは、完全な Message Enrichment パターンを使用してヘッダーを追加する代わりに、シーケンスやその他の小さい値をヘッダーにクエリーするための簡潔な構文を提供します。outputHeaderoutputType を一緒に使用すると便利です。以下に例を示します。
from("jms:order.inbox")
	.to("sql:select order_seq.nextval from dual?outputHeader=OrderId&outputType=SelectOne")
	.to("jms:order.booking");
Copy to Clipboard Toggle word wrap

ヘッダーの値

更新 操作を実行すると、SQL コンポーネントは以下のメッセージヘッダーに更新数を保存します。
Expand
ヘッダー 説明
CamelSqlUpdateCount Apache Camel 2.0: 更新操作用に更新 された行数。Integer オブジェクトとして返されます。
CamelSqlRowCount Apache Camel 2.0: Integer オブジェクトとして返される 選択 操作に対して返される行数。
CamelSqlQuery Camel 2.8: 実行するクエリー。このクエリーは、エンドポイント URI で指定されたクエリーよりも優先されます。ヘッダーのクエリーパラメーターは、# 記号で はなく ? で表示さ れることに注意してください。
insert 操作の実行時に、SQL コンポーネントは生成されたキーとこれらの行の数を以下のメッセージヘッダーに格納します(Camel 2.12.4、2.13.1 で利用可能)。
Expand
ヘッダー
説明
CamelSqlGeneratedKeysRowCount
生成されたキーが含まれるヘッダーの行数。
CamelSqlGeneratedKeyRows
生成されたキー(キーのマップの一覧)を含む行。

生成されるキー

Camel 2.12.4、2.13.1、および 2.14 から利用可能
SQL INSERT を使用してデータを挿入すると、----------|----- は自動生成されたキーをサポートする可能性があります。SQL プロデューサーに、ヘッダーで生成されたキーを返すように指示できます。これには、ヘッダー CamelSqlRetrieveGeneratedKeys=true を設定します。次に、生成された鍵が上記の表に記載されているキーが含まれるヘッダーとして提供されます。
詳細は、この ユニットテスト を参照してください。

設定

URI の DataSource への参照を直接設定できるようになりました。
sql:select * from table where id=# order by name?dataSourceRef=myDS
Copy to Clipboard Toggle word wrap

以下の例では、クエリーを実行し、結果を List of rows として取得します。各行は Map<String, Object で、キーは列名になります。
まず、サンプルに使用するテーブルを設定します。これはユニットテストに基づいているため、java コードを実行します。
// this is the database we create with some initial data for our unit test
db = new EmbeddedDatabaseBuilder()
    .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
Copy to Clipboard Toggle word wrap
SQL スクリプト createAndPopulateDatabase.sql は、以下のように実行します。
create table projects (id integer primary key, project varchar(10), license varchar(5));
insert into projects values (1, 'Camel', 'ASF');
insert into projects values (2, 'AMQ', 'ASF');
insert into projects values (3, 'Linux', 'XXX');
Copy to Clipboard Toggle word wrap
次に、ルートと sql コンポーネントを設定します。sql エンドポイントの前に ダイレクト エンドポイントを使用することに注意してください。これにより、URI direct:simple を使用して direct エンドポイントにエクスチェンジを送信できます。これは、クライアントが長い sql: URI よりも簡単に使用できます。DataSource はレジストリーで検索されるため、標準の Spring XML を使用して DataSource を設定できます。
from("direct:simple")
    .to("sql:select * from projects where license = # order by id?dataSourceRef=jdbc/myDataSource")
    .to("mock:result");
Copy to Clipboard Toggle word wrap
次に、メッセージをデータベースをクエリーする sql コンポーネントにルーティングする direct エンドポイントに実行します。
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);

// send the query to direct that will route it to the sql where we will execute the query
// and bind the parameters with the data from the body. The body only contains one value
// in this case (XXX) but if we should use multi values then the body will be iterated
// so we could supply a List<String> instead containing each binding value.
template.sendBody("direct:simple", "XXX");

mock.assertIsSatisfied();

// the result is a List
List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody());

// and each row in the list is a Map
Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));

// and we should be able the get the project from the map that should be Linux
assertEquals("Linux", row.get("PROJECT"));
Copy to Clipboard Toggle word wrap
Spring XML の DataSource を以下のように設定できます。
 <jee:jndi-lookup id="myDS" jndi-name="jdbc/myDataSource"/>
Copy to Clipboard Toggle word wrap

名前付きパラメーターの使用

Camel 2.11 から利用可能
以下のルートでは、Projects テーブルからすべてのプロジェクトを取得します。SQL クエリーには、:#lic と :#min の 2 つの名前付きパラメーターがあることに注意してください。その後、Camel はメッセージボディーまたはメッセージヘッダーからこれらのパラメーターを検索します。上記の例では、名前付きパラメーターに定数値で 2 つのヘッダーを設定することに注意してください。
   from("direct:projects")
     .setHeader("lic", constant("ASF"))
     .setHeader("min", constant(123))
     .to("sql:select * from projects where license = :#lic and id > :#min order by id")
Copy to Clipboard Toggle word wrap
メッセージボディーが java.util.Map の場合、名前付きパラメーターはボディーから取得されます。
   from("direct:projects")
     .to("sql:select * from projects where license = :#lic and id > :#min order by id")
Copy to Clipboard Toggle word wrap

式パラメーターの使用

Camel 2.14 から利用可能
以下のルートでは、データベースからすべてのプロジェクトを取得します。ライセンスを定義するためにエクスチェンジのボディーを使用し、2 番目のパラメーターとしてプロパティーの値を使用します。
from("direct:projects")
  .setBody(constant("ASF"))
  .setProperty("min", constant(123))
  .to("sql:select * from projects where license = :#${body} and id > :#${property.min} order by id")
Copy to Clipboard Toggle word wrap

動的値による IN クエリーの使用

Camel 2.17 以降では、SQL プロデューサーは、IN ステートメントで SQL クエリーを使用できます。ここで、IN 値は動的に計算されます。たとえば、メッセージボディーやヘッダーなどから。
IN を使用するには、以下を行う必要があります。
  • パラメーター名の前に in: および を付けます。
  • パラメーターの前後に ( ) を追加します。
たとえば、以下のクエリーが使用されるとします。
-- this is a comment
select *
from projects
where project in (:#in:names)
order by id
Copy to Clipboard Toggle word wrap
以下のルートでは、以下のようになります。
from("direct:query")
    .to("sql:classpath:sql/selectProjectsIn.sql")
    .to("log:query")
    .to("mock:query");
Copy to Clipboard Toggle word wrap
次に、IN クエリーは、以下のような動的な値を持つキー名にヘッダーを使用できます。
// use an array
template.requestBodyAndHeader("direct:query", "Hi there!", "names", new String[]{"Camel", "AMQ"});


// use a list
List<String> names = new ArrayList<String>();
names.add("Camel");
names.add("AMQ");

template.requestBodyAndHeader("direct:query", "Hi there!", "names", names);


// use a string separated values with comma
template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ");
Copy to Clipboard Toggle word wrap
クエリーは、外部化されるのではなく、エンドポイントで指定することもできます(外部化により SQL クエリーのメンテナーンスが容易になることに注意してください)。
from("direct:query")
    .to("sql:select * from projects where project in (:#in:names) order by id")
    .to("log:query")
    .to("mock:query");
Copy to Clipboard Toggle word wrap

JDBC ベースのべき等リポジトリーの使用

このセクションでは、JDBC ベースのべき等リポジトリーを使用します。
抽象クラス
Camel 2.9 以降では、org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository の抽象クラスがあり、カスタム JDBC べき等リポジトリーをビルドできます。
まず、べき等リポジトリーで使用されるデータベーステーブルを作成する必要があります。
Camel 2.8 では、createdAt 列を追加しました。
 CREATE TABLE CAMEL_MESSAGEPROCESSED (
   processorName VARCHAR(255),
   messageId VARCHAR(100),
   createdAt TIMESTAMP
 )
Copy to Clipboard Toggle word wrap
警告
SQL Server TIMESTAMP タイプは、固定長の binary-string タイプです。JDBC 時間タイプ( DATETIME、または TIMESTAMP )にはマップされません。
processorName および messageId 列に対して、一意の制約を使用することが推奨されます。この制約の構文はデータベースとデータベースごとに異なるため、ここには表示されません。
次に、spring XML ファイルで javax.sql.DataSource を設定する必要があります。
<jdbc:embedded-database id="dataSource" type="DERBY" />
Copy to Clipboard Toggle word wrap
最後に、spring XML ファイルで JDBC べき等リポジトリーを作成することもできます。
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
	<constructor-arg ref="dataSource" />
	<constructor-arg value="myProcessorName" />
</bean>

<camel:camelContext>
	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
	</camel:errorHandler>
	
	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
		<camel:from uri="direct:start" />
		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
			<camel:header>messageId</camel:header>
			<camel:to uri="mock:result" />
		</camel:idempotentConsumer>
	</camel:route>
</camel:camelContext>
Copy to Clipboard Toggle word wrap

JdbcMessageIdRepository のカスタマイズ

Camel 2.9.1 以降では、ニーズに合わせて org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository を調整するオプションがいくつかあります。
Expand
パラメーター デフォルト値 説明
createTableIfNotExists true Camel が存在しない場合に Camel がテーブルの作成を試行するかどうかを定義します。
tableExistsString SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0 このクエリーは、テーブルがすでに存在しているかどうかを確認するために使用されます。テーブルが存在しないことを示す例外を出力する必要があります。
createString CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR (255), messageId VARCHAR (100), createdAt TIMESTAMP) テーブルの作成に使用されるステートメント。
queryString SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ? メッセージがすでにリポジトリーに存在するかどうかを確認するために使用されるクエリー(結果は 0 と等しくありません)。2 つのパラメーターを取ります。最初の 1 つはプロセッサー名()で、2 つ目はメッセージ ID (文字列)です。
insertString INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?) テーブルにエントリーを追加するために使用される ステートメント。3 つのパラメーターを取ります。1 つ目はプロセッサー名(文字列)で、2 つ目はメッセージ ID (文字列)で、3 つ目は、このエントリーがリポジトリーに追加されたときのタイムスタンプ(java.sql.Timestamp)です。
deleteString DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?AND messageId = ? データベースからエントリーを削除するために使用されるステートメント。2 つのパラメーターを取ります。最初の 1 つはプロセッサー名()で、2 つ目はメッセージ ID (文字列)です。
カスタマイズした org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository は以下のようになります。
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
	<constructor-arg ref="dataSource" />
	<constructor-arg value="myProcessorName" />
	<property name="tableExistsString" value="SELECT 1 FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0" />
	<property name="createString" value="CREATE TABLE CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)" />
	<property name="queryString" value="SELECT COUNT(*) FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" />
	<property name="insertString" value="INSERT INTO CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)" />
	<property name="deleteString" value="DELETE FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" />
</bean>
Copy to Clipboard Toggle word wrap

JDBC ベースの集約リポジトリーの使用

Camel 2.6 以降で利用可能
Camel 2.6 での JdbcAggregationRepository の使用
Camel 2.6 では、JdbcAggregationRepository は camel-jdbc-aggregator コンポーネントで提供されます。Camel 2.7 以降、JdbcAggregationRepositorycamel-sql コンポーネントで提供されます。
JdbcAggregationRepositoryAggregationRepository で、オンザフライで集約されたメッセージを永続化します。これにより、デフォルトのアグリゲーターはメモリーの AggregationRepository のみを使用するので、メッセージを失わないようにします。JdbcAggregationRepository を使用すると、Camel とともに Aggregator の永続的なサポートを提供できます。
これには、以下のオプションがあります。
Expand
オプション タイプ 説明
dataSource DataSource 必須: データベースへのアクセスに使用する javax.sql.DataSource
repositoryName 文字列 必須: リポジトリーの名前。
transactionManager TransactionManager 必須: データベースのトランザクションを管理する org.springframework.transaction.PlatformTransactionManager。TransactionManager はデータベースをサポートできる必要があります。
lobHandler LobHandler データベースの Lob タイプを処理する org.springframework.jdbc.support.lob.LobHandler。Oracle を使用する場合など、ベンダー固有の LobHandler を使用するには、このオプションを使用します。
returnOldExchange boolean get 操作が存在する場合は、get 操作によって古い既存のエクスチェンジが返されるかどうか。デフォルトでは、集計時に古いエクスチェンジを必要としないため、このオプションは false で最適化されます。
useRecovery boolean リカバリーが有効になっているかどうか。このオプションは、デフォルトで true です。有効にすると、Camel Aggregator は自動的に集約されたエクスチェンジをリカバリーし、再提出します。
recoveryInterval long recovery が有効になっている場合、バックグラウンドタスクは x 度ごとに実行され、失敗したエクスチェンジをスキャンしてリカバリーし、再送信します。デフォルトでは、この間隔は 5000 ミリ秒です。
maximumRedeliveries int リカバリーされたエクスチェンジの再配信試行の最大数を制限できます。有効にすると、すべての再配信試行に失敗すると、エクスチェンジはデッドレターチャネルに移動します。デフォルトでは、このオプションは無効です。このオプションを使用する場合は、deadLetterUri オプションも指定する必要があります。
deadLetterUri 文字列 使い切られるリカバリーされたエクスチェンジが移動される Dead Letter Channel のエンドポイント URI。このオプションを使用する場合は、maximumRedeliveries オプションも指定する必要があります。
storeBodyAsText boolean Camel 2.11: メッセージボディーを文字列として格納するかどうか(人間が読める)。デフォルトでは、このオプション ボディーをバイナリー形式で格納します。
headersToStoreAsText List<String> Camel 2.11: ヘッダーを文字列として保存し、人間が判読できる文字列として保存できます。デフォルトでは、このオプションは無効になっており、ヘッダーをバイナリー形式で保存します。
optimisticLocking false Camel 2.12: 複数の Camel アプリケーションが同じ JDBC ベースの集約リポジトリーを共有するクラスター化された環境で必要となる楽観的ロックを有効にします。
jdbcOptimisticLockingExceptionMapper Camel 2.12: カスタムの org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper をプラグインして、Camel が再試行できるようにベンダー固有のエラーコードを optimistick ロックエラーにマップできるようにします。これには、optimisticLocking を有効にする必要があります。

永続化時に保持される内容

JdbcAggregationRepository は、Serializable と互換性のあるデータタイプのみを保持します。データ型がそのようなタイプの場合はドロップされ、WARN がログに記録されます。また、メッセージ本文とメッセージ ヘッダーのみを保持 ます。Exchange プロパティーは永続化され ません
Camel 2.11 以降では、メッセージボディーを保存し、ヘッダーを String として個別の列に選択します。

復元

JdbcAggregationRepository はデフォルトで失敗したエクスチェンジ を復元し ます。これは、永続ストアで失敗した エクスチェンジをスキャンするバックグラウンドタスクを持つことで行われます。checkInterval オプションを使用して、このタスクの実行頻度を設定できます。リカバリーはトランザクションとして機能し、Camel が失敗したエクスチェンジのリカバリーおよび再配信を試み ます。リカバリー れたエクスチェンジは永続ストアから復元され、再送信されて再度送信されます。
エクスチェンジ のリカバリー/再配信時に、以下のヘッダーが設定されます。
Expand
ヘッダー タイプ 説明
Exchange.REDELIVERED ブール値 エクスチェンジ が再配信されていることを示すために が true に設定されます。
Exchange.REDELIVERY_COUNTER 整数 1 から始まる再配信の試行。
エクスチェンジ が正常に処理された場合のみ、完了とマークされます。これは、AggregationRepositoryconfirm メソッドが呼び出されたときに発生します。つまり、同じ エクスチェンジ が再び失敗すると、成功するまで再試行されます。
maximumRedeliveries オプションを使用して、特定のリカバリーエクスチェンジの再配信試行の最大数を制限でき ますmaximumRedeliveries に達したときに エクスチェンジ を送信する場所を Camel が認識できるように deadLetterUri オプションも設定する必要があります。
camel-sql のユニットテストの例の一部を確認できます(例: このテスト )。

データベース

各アグリゲーターは集約と完了した 2 つのテーブルを使用します。慣例により、完了により、接尾辞が _COMPLETED の集約と同じ名前が付けられます。名前は Spring Bean で RepositoryName プロパティーで設定する必要があります。以下の例では、集約が使用されます。
両方のテーブルのテーブル構造定義は同一です。いずれの場合も、String 値がキー()として使用されますが、Blob にはバイトアレイでシリアライズされたエクスチェンジが含まれます。ただし、1 つの違いを覚えておくべきです。id フィールドはテーブルに応じて同じコンテンツを持つことができません。集約テーブル ID は、メッセージ 集約するためにコンポーネントによって使用される相関 ID を保持します。完了したテーブルでは、id は、対応する Blob フィールドに保存されるエクスチェンジの ID を保持します。
以下は、テーブルの作成に使用する SQL クエリーです。aggregation はアグリゲーターリポジトリー名に置き換えます。
CREATE TABLE aggregation (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_pk PRIMARY KEY (id)
);
CREATE TABLE aggregation_completed (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_completed_pk PRIMARY KEY (id)
);
Copy to Clipboard Toggle word wrap

ボディーとヘッダーをテキストとして保存

Camel 2.11 から利用可能
JdbcAggregationRepository を設定してメッセージボディーを保存し、ヘッダーを別の列に String として選択できます。たとえば、ボディーと以下の 2 つのヘッダー companyName および accountName を保存するには、以下の SQL を使用します。
CREATE TABLE aggregationRepo3 (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    body varchar(1000),
    companyName varchar(1000),
    accountName varchar(1000),
    constraint aggregationRepo3_pk PRIMARY KEY (id)
);
CREATE TABLE aggregationRepo3_completed (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    body varchar(1000),
    companyName varchar(1000),
    accountName varchar(1000),
    constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
);
Copy to Clipboard Toggle word wrap
次に、以下のようにこの動作を有効にするようにリポジトリーを設定します。
    <bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
        <property name="repositoryName" value="aggregationRepo3"/>
        <property name="transactionManager" ref="txManager3"/>
        <property name="dataSource" ref="dataSource3"/>
        <!-- configure to store the message body and following headers as text in the repo -->
        <property name="storeBodyAsText" value="true"/>
        <property name="headersToStoreAsText">
          <list>
      	    <value>companyName</value>
    	    <value>accountName</value>
          </list>
        </property>
    </bean>
Copy to Clipboard Toggle word wrap

Codec (Serialization)

任意のタイプのペイロードを含めることができるため、エクスチェンジは設計によってシリアライズできません。データベース BLOB フィールドに保存されるバイトアレイに変換されます。これらの変換はすべて JdbcCodec クラスによって処理されます。コードの詳細の 1 つである ClassLoadingAwareObjectInputStream に注意してください。
ClassLoadingAwareObjectInputStreamApache ActiveMQ プロジェクトから再利用されました。ObjectInputStream をラップし、currentThread ではなく ContextClassLoader で使用します。この利点は、他のバンドルによって公開されるクラスをロードできることです。これにより、エクスチェンジボディーとヘッダーにカスタム型オブジェクトの参照を設定できます。

Transaction

トランザクションのオーケストレーションには、Spring PlatformTransactionManager が必要です。

サービス(開始/停止)

start メソッドは、データベースのコネクションと必要なテーブルが存在することを確認します。何らかの誤りがある場合は、起動時に失敗します。

Aggregator の設定

対象の環境によっては、Aggregator の設定が必要になる場合があります。ご存知のように、各アグリゲーターには独自のリポジトリー(データベースに作成された対応するテーブルのペアを含む)とデータソースが必要です。デフォルトの lobHandler がデータベースシステムに適応しない場合は、lobHandler プロパティーでインジェクトできます。
以下は Oracle の宣言です。
    <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
        <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
    </bean>

    <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>

    <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
        <property name="transactionManager" ref="transactionManager"/>
        <property name="repositoryName" value="aggregation"/>
        <property name="dataSource" ref="dataSource"/>
        <!-- Only with Oracle, else use default -->
        <property name="lobHandler" ref="lobHandler"/>
    </bean>
Copy to Clipboard Toggle word wrap

Optimistic locking

Camel 2.12 以降では、optimisticLocking をオンにし、複数の Camel アプリケーションが集約リポジトリーに対して同じデータベースを共有するクラスター環境でこの JDBC ベースの集約リポジトリーを使用できます。競合状態がある場合、JDBC ドライバーは JdbcAggregationRepository が応答できるベンダー固有の例外を出力します。JDBC ドライバーからの例外が optimistick ロックエラーと見なされるかを知るには、マッパーが必要です。そのため、org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper があり、必要に応じてカスタムロジックを実装できます。デフォルトの実装 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper は以下のように機能します。
以下のチェックが行われます。
  • 原因となった例外が SQLException の場合、SQLState は 23 で始まる場合にチェックされます。
  • 発生した例外が DataIntegrityViolationExceptionの場合
  • 発生した例外クラス名の名前が ConstraintViolation である場合。
  • クラス名が設定されている場合、FQN クラス名が一致するためのオプションチェック
さらに FQN クラス名を追加できます。発生した例外(またはネストされた)のいずれかが FQN クラス名と一致する場合は、optimistick locking エラーになります。
以下の例は、JDBC ベンダーから 2 つの追加の FQN クラス名を定義します。
    <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
        <property name="transactionManager" ref="transactionManager"/>
        <property name="repositoryName" value="aggregation"/>
        <property name="dataSource" ref="dataSource"/>
        <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
    </bean>

    <!-- use the default mapper with extra FQN class names from our JDBC driver -->
    <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
      <property name="classNames">
        <util:set>
          <value>com.foo.sql.MyViolationExceptoion</value>
          <value>com.foo.sql.MyOtherViolationExceptoion</value>
        </util:set>
      </property>
    </bean>
Copy to Clipboard Toggle word wrap
以下も参照してください。
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat