Chapter 5. Transaction Demarcation
Abstract
Transaction demarcation refers to the procedures for starting, committing, and rolling back transactions. This chapter describes the mechanisms that are available for controlling transaction demarcation, both by programming and by configuration.
5.1. Demarcation by Marking the Route
Overview
Apache Camel provides a simple mechanism for initiating a transaction in a route, by inserting the transacted() command in the Java DSL or by inserting the <transacted/> tag in the XML DSL.
Sample route with JDBC resource
Figure 5.1, “Demarcation by Marking the Route” shows an example of a route that is made transactional by adding the transacted() DSL command to the route. All of the route nodes following the transacted() node are included in the transaction scope. In this example, the two following nodes access a JDBC resource.
Figure 5.1. Demarcation by Marking the Route
The transacted processor demarcates transactions as follows: when an exchange enters the transacted processor, the processor invokes the default transaction manager to begin a transaction (attaching it to the current thread); when the exchange reaches the end of the remaining route, the transacted processor invokes the transaction manager to commit the current transaction.
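The begin and commit steps described above can be modeled in plain Java. The following is a minimal sketch, not Camel's implementation: DemarcationSketch, SketchTxManager, and runTransacted are hypothetical names, and the rollback on failure is an assumption based on usual transaction manager behavior rather than on anything stated in this section.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the transacted processor; NOT Camel's implementation.
class DemarcationSketch {

    // Hypothetical stand-in for a transaction manager; it only records calls.
    static class SketchTxManager {
        final List<String> log = new ArrayList<>();
        void begin()    { log.add("begin"); }
        void commit()   { log.add("commit"); }
        void rollback() { log.add("rollback"); }
    }

    // Runs the nodes that follow transacted() inside a single transaction.
    static void runTransacted(SketchTxManager tm, List<Runnable> remainingNodes) {
        tm.begin();                      // the exchange enters the transacted processor
        try {
            for (Runnable node : remainingNodes) {
                node.run();              // every node runs on the calling thread
            }
            tm.commit();                 // the exchange reached the end of the route
        } catch (RuntimeException e) {
            tm.rollback();               // assumption: a failure rolls back
            throw e;
        }
    }

    public static void main(String[] args) {
        SketchTxManager tm = new SketchTxManager();
        runTransacted(tm, List.<Runnable>of(
            () -> tm.log.add("credit"),
            () -> tm.log.add("debit")));
        System.out.println(tm.log);
    }
}
```

In this model, the two nodes run strictly between the begin and commit calls, which is the scope that the transacted processor establishes for the rest of the route.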
Route definition in Java DSL
The following Java DSL example shows how to define a transactional route by marking the route with the transacted() DSL command:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .beanRef("accountService","debit");
    }
}
In this example, the file endpoint reads some files in XML format that describe a transfer of funds from one account to another. The first beanRef() invocation credits the specified sum of money to the beneficiary's account, and the second beanRef() invocation subtracts the specified sum of money from the sender's account. Both beanRef() invocations update a database resource, which is assumed to be bound to the transaction through the transaction manager (for example, see Section 2.6.1, “JDBC Data Source”). For a sample implementation of the accountService bean, see Section 4.2, “Spring JDBC Template”.
Using SpringRouteBuilder
The beanRef() Java DSL command is available only in the SpringRouteBuilder class. It enables you to reference a bean by specifying the bean's Spring registry ID (for example, accountService). If you do not use the beanRef() command, you could inherit from the org.apache.camel.builder.RouteBuilder class instead.
Route definition in Spring XML
The preceding route can equivalently be expressed in Spring XML, where the <transacted/> tag is used to mark the route as transactional, as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       ... >

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="file:src/data?noop=true"/>
      <transacted/>
      <bean ref="accountService" method="credit"/>
      <bean ref="accountService" method="debit"/>
    </route>
  </camelContext>

</beans>
Default transaction manager and transacted policy
To demarcate transactions, the transacted processor must be associated with a particular transaction manager instance. To save you having to specify the transaction manager every time you invoke transacted(), the transacted processor automatically picks a sensible default. For example, if there is only one instance of a transaction manager in your Spring configuration, the transacted processor implicitly picks this transaction manager and uses it to demarcate transactions.
A transacted processor can also be configured with a transacted policy, of TransactedPolicy type, which encapsulates a propagation policy and a transaction manager (see Section 5.3, “Propagation Policies” for details). The following rules are used to pick the default transaction manager or transaction policy:
- If there is only one bean of org.apache.camel.spi.TransactedPolicy type, use this bean.
  Note: The TransactedPolicy type is a base type of the SpringTransactionPolicy type that is described in Section 5.3, “Propagation Policies”. Hence, the bean referred to here could be a SpringTransactionPolicy bean.
- If there is a bean of org.apache.camel.spi.TransactedPolicy type that has the ID PROPAGATION_REQUIRED, use this bean.
- If there is only one bean of org.springframework.transaction.PlatformTransactionManager type, use this bean.
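The order in which these rules apply can be sketched in plain Java. This is a simplified model, not Camel's actual lookup code: the registry is represented as a map from bean ID to bean, SketchPolicy and SketchTxManager are hypothetical stand-ins for the TransactedPolicy and PlatformTransactionManager types, and returning an empty result when no rule matches is an assumption, because the rules above do not cover that case.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified model of the lookup rules; NOT Camel's actual lookup code.
class DefaultPolicyLookupSketch {

    // Hypothetical stand-ins for org.apache.camel.spi.TransactedPolicy and
    // org.springframework.transaction.PlatformTransactionManager.
    interface SketchPolicy {}
    interface SketchTxManager {}

    // Returns the IDs of all registry beans that are instances of the given type.
    static List<String> idsOfType(Map<String, Object> registry, Class<?> type) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, Object> e : registry.entrySet()) {
            if (type.isInstance(e.getValue())) {
                ids.add(e.getKey());
            }
        }
        return ids;
    }

    // Applies the three rules in order and returns the ID of the chosen bean.
    static Optional<String> pickDefault(Map<String, Object> registry) {
        List<String> policies = idsOfType(registry, SketchPolicy.class);
        if (policies.size() == 1) {
            return Optional.of(policies.get(0));          // rule 1: the only policy bean
        }
        if (policies.contains("PROPAGATION_REQUIRED")) {
            return Optional.of("PROPAGATION_REQUIRED");   // rule 2: policy with this ID
        }
        List<String> managers = idsOfType(registry, SketchTxManager.class);
        if (managers.size() == 1) {
            return Optional.of(managers.get(0));          // rule 3: the only manager bean
        }
        return Optional.empty();   // assumption: this case is not covered by the rules
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        registry.put("PROPAGATION_REQUIRED", new SketchPolicy() {});
        registry.put("PROPAGATION_NEVER", new SketchPolicy() {});
        registry.put("txManager", new SketchTxManager() {});
        System.out.println(pickDefault(registry).orElse("none"));
    }
}
```

With two policy beans in the registry, the first rule does not apply, so the second rule selects the bean whose ID is PROPAGATION_REQUIRED.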
You also have the option of specifying a bean explicitly by providing the bean ID as an argument to transacted() (see the section called “Sample route with PROPAGATION_NEVER policy in Java DSL”).
Transaction scope
If you insert a transacted processor into a route, a new transaction is created each time an exchange passes through this node and the transaction's scope is defined as follows:
- The transaction is associated with the current thread only.
- The transaction scope encompasses all of the route nodes following the transacted processor.
In particular, the route nodes preceding the transacted processor are not included in the transaction (the situation is different if the route begins with a transactional endpoint; see Section 5.2, “Demarcation by Transactional Endpoints”). For example, the following route is incorrect, because the transacted() DSL command mistakenly appears after the first beanRef() call (which accesses the database resource):
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .beanRef("accountService","credit")
            .transacted()  // <-- WARNING: Transaction started in the wrong place!
            .beanRef("accountService","debit");
    }
}
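To see why the placement matters, consider what a rollback is able to undo. The following is a minimal sketch in plain Java, not Camel code: ScopeSketch and SketchDb are hypothetical names, the snapshot-based rollback is an assumption chosen for brevity, and the debit step is made to fail on purpose so that a rollback always happens.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of transaction scope; NOT Camel or JDBC code.
class ScopeSketch {

    // Hypothetical in-memory "database" with a single-level snapshot rollback.
    static class SketchDb {
        final Map<String, Integer> balances = new HashMap<>();
        private Map<String, Integer> snapshot = new HashMap<>();

        void begin()    { snapshot = new HashMap<>(balances); }
        void rollback() { balances.clear(); balances.putAll(snapshot); }
        void add(String account, int delta) { balances.merge(account, delta, Integer::sum); }
    }

    // transactedFirst == true models transacted() placed before both bean calls;
    // false models the incorrect route, where the credit runs before transacted().
    static SketchDb run(boolean transactedFirst) {
        SketchDb db = new SketchDb();
        db.add("sender", 100);
        db.add("receiver", 0);

        if (transactedFirst) {
            db.begin();
        }
        db.add("receiver", 50);              // credit step
        if (!transactedFirst) {
            db.begin();                      // transaction started too late
        }
        try {
            db.add("sender", -50);           // debit step
            throw new IllegalStateException("simulated failure after debit");
        } catch (IllegalStateException e) {
            db.rollback();                   // undoes only work done since begin()
        }
        return db;
    }

    public static void main(String[] args) {
        System.out.println("transacted first: " + run(true).balances);
        System.out.println("transacted late:  " + run(false).balances);
    }
}
```

When the transaction begins before the credit, the rollback restores both balances. When it begins after the credit, the credit survives the rollback, so the receiver keeps money that the sender never paid.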
No thread pools in a transactional route
It is crucial to understand that a given transaction is associated with the current thread only. It follows that you must not create a thread pool in the middle of a transactional route, because the processing in the new threads will not participate in the current transaction. For example, the following route is bound to cause problems:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .threads(3)  // WARNING: Subthreads are not in transaction scope!
            .beanRef("accountService","credit")
            .beanRef("accountService","debit");
    }
}
A route like the preceding one is liable to corrupt your database, because the threads() DSL command is incompatible with transacted routes. Even if the threads() call precedes the transacted() call, the route will not behave as expected.
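The underlying reason can be illustrated with a thread-local variable, which is a common way of attaching state to the current thread. The following is a minimal sketch, not Camel or Spring code: ThreadBoundTxSketch and CURRENT_TX are hypothetical names, and the use of a ThreadLocal to hold the transaction context is an assumption made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of a thread-bound transaction; NOT Camel or Spring code.
class ThreadBoundTxSketch {

    // Hypothetical stand-in for the transaction context that a transaction
    // manager attaches to the current thread.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    static boolean inTransaction() {
        return CURRENT_TX.get() != null;
    }

    // Asks a pooled worker thread whether it can see the caller's transaction.
    static boolean visibleFromWorker() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Callable<Boolean> check = ThreadBoundTxSketch::inTransaction;
            return pool.submit(check).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1");                    // "begin" on the route thread
        System.out.println(inTransaction());       // checked on the route thread
        System.out.println(visibleFromWorker());   // checked on a pooled thread
        CURRENT_TX.remove();
    }
}
```

The route thread sees the transaction context, but the pooled thread does not, so any database work performed on the pooled thread would run outside the transaction.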
Breaking a route into fragments
If you want to break a route into fragments and have each route fragment participate in the current transaction, you can use direct: endpoints. For example, to send exchanges to separate route fragments, depending on whether the transfer amount is big (greater than 100) or small (less than or equal to 100), you can use the choice() DSL command and direct endpoints, as follows:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .choice().when(xpath("/transaction/transfer[amount > 100]"))
                .to("direct:txbig")
            .otherwise()
                .to("direct:txsmall");

        from("direct:txbig")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/big");

        from("direct:txsmall")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/small");
    }
}
Both the fragment beginning with direct:txbig and the fragment beginning with direct:txsmall participate in the current transaction, because the direct endpoints are synchronous. This means that the fragments execute in the same thread as the first route fragment and, therefore, they are included in the same transaction scope.
Note
You must not use seda endpoints to join the route fragments, because seda consumer endpoints create a new thread (or threads) to execute the route fragment (asynchronous processing). Hence, the fragments would not participate in the original transaction.
Resource endpoints
Certain Apache Camel components act as resource endpoints when they appear as the destination of a route (for example, if they appear in the to() DSL command). That is, these endpoints can access a transactional resource, such as a database or a persistent queue. The resource endpoints can participate in the current transaction, as long as they are associated with the same transaction manager as the transacted processor that initiated the current transaction. If you need to access multiple resources, you must deploy your application in a J2EE container, which gives you access to a global transaction manager.
Sample route with resource endpoints
For example, the following route sends the order for a money transfer to two different JMS queues: the credits queue processes the order to credit the receiver's account, and the debits queue processes the order to debit the sender's account. Because a credit must occur only if there is a corresponding debit, it makes sense to enclose the enqueueing operations in a single transaction. If the transaction succeeds, both the credit order and the debit order are enqueued; if an error occurs, neither order is enqueued.
from("file:src/data?noop=true")
    .transacted()
    .to("jmstx:queue:credits")
    .to("jmstx:queue:debits");
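The all-or-nothing behavior of the two enqueueing operations can be modeled in plain Java. The following is a minimal sketch, not JMS or Camel code: AtomicEnqueueSketch and SketchSession are hypothetical names, and buffering the sends until commit is an assumption about how a transacted session behaves, chosen to keep the model small.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of transactional enqueueing; NOT JMS or Camel code.
class AtomicEnqueueSketch {

    // Hypothetical in-memory stand-in for a transacted messaging session:
    // sends are buffered and reach the queues only on commit.
    static class SketchSession {
        private final Map<String, List<String>> queues = new HashMap<>();
        private final List<String[]> pending = new ArrayList<>();

        void send(String queue, String body) {
            pending.add(new String[] {queue, body});
        }

        void commit() {
            for (String[] message : pending) {
                queues.computeIfAbsent(message[0], k -> new ArrayList<>()).add(message[1]);
            }
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }

        int depth(String queue) {
            return queues.getOrDefault(queue, List.of()).size();
        }
    }

    // Enqueues the order on both queues in one transaction.
    // Returns true if the transaction committed, false if it rolled back.
    static boolean transfer(SketchSession session, String order, boolean failMidway) {
        try {
            session.send("credits", order);
            if (failMidway) {
                throw new IllegalStateException("simulated error between the two sends");
            }
            session.send("debits", order);
            session.commit();
            return true;
        } catch (RuntimeException e) {
            session.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        SketchSession session = new SketchSession();
        System.out.println(transfer(session, "order-1", false));
        System.out.println(session.depth("credits") + " " + session.depth("debits"));
    }
}
```

If the simulated error occurs after the first send, the rollback discards the pending credit order as well, so neither queue receives a message.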