Chapter 76. JCache
JCache Component
Available as of Camel 2.17
The jcache component lets you perform caching operations using JCache (JSR-107) as the cache implementation. The cache is created on demand; if a cache with that name already exists, it is used with its original settings.
This component supports producer endpoints and event-based consumer endpoints.
The consumer is event-based and can be used to listen and respond to specific cache activities. If you need to perform selections from a pre-existing cache, use the processors defined for the cache component.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jcache</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
URI format
jcache:cacheName[?options]
You can append query options to the URI in the following format:
?option=value&option=#beanRef&...
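To make the option syntax concrete, here is a minimal sketch in plain Java that assembles an endpoint URI string from a cache name and a map of options. It assumes the component's URI scheme is `jcache`. The cache name `orders` and the bean name `myExpiryFactory` are hypothetical, and the helper itself is illustrative only, not part of Camel. Values are appended verbatim, so no URL encoding is performed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative helper only; this class is not part of Camel.
final class JCacheUriSketch {
    private JCacheUriSketch() {
    }

    // Builds "jcache:<cacheName>" followed by "?k=v&k=v" when options are present.
    static String endpointUri(String cacheName, Map<String, String> options) {
        if (options.isEmpty()) {
            return "jcache:" + cacheName;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "jcache:" + cacheName + query;
    }

    // Sample call: one literal value and one registry reference (#beanRef form).
    static String example() {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("statisticsEnabled", "true");
        options.put("expiryPolicyFactory", "#myExpiryFactory"); // hypothetical bean name
        return endpointUri("orders", options);
    }
}
```

Calling `JCacheUriSketch.example()` yields `jcache:orders?statisticsEnabled=true&expiryPolicyFactory=#myExpiryFactory`, which can be used wherever an endpoint URI is expected, for example in `from(...)` or `to(...)`.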
Options
Name | Default Value | Description |
---|---|---|
cachingProvider | null | The fully qualified class name of the javax.cache.spi.CachingProvider. Mandatory in an OSGi environment. |
cacheConfiguration | null | A reference to a javax.cache.configuration.Configuration instance. |
cacheConfigurationProperties | null | A reference to a java.util.Properties that the javax.cache.spi.CachingProvider uses to create the javax.cache.CacheManager. |
configurationUri | null | An implementation-specific URI for the javax.cache.CacheManager. |
cacheLoaderFactory | null | A reference to a javax.cache.configuration.Factory for javax.cache.integration.CacheLoader. |
cacheWriterFactory | null | A reference to a javax.cache.configuration.Factory for javax.cache.integration.CacheWriter. |
expiryPolicyFactory | null | A reference to a javax.cache.configuration.Factory for javax.cache.expiry.ExpiryPolicy. |
readThrough | false | A flag indicating whether "read-through" mode is required. |
writeThrough | false | A flag indicating whether "write-through" mode is required. |
storeByValue | true | A flag indicating whether the cache is store-by-value (true) or store-by-reference (false). |
statisticsEnabled | false | A flag indicating whether statistics gathering is enabled. |
managementEnabled | false | A flag indicating whether management is enabled. |
filteredEvents | null | A comma-separated list of event types to filter. Overridden by eventFilters when both are specified. |
eventFilters | null | A comma-separated list of javax.cache.event.CacheEntryEventFilter references. Overrides filteredEvents when both are specified. |
oldValueRequired | false | A flag indicating whether the old value is required for events. The event types are CREATED, UPDATED, REMOVED, and EXPIRED. |
synchronous | false | A flag indicating whether the event listener should block the thread that caused the event. |
action | null | The default action to apply. A value set in the message header takes priority. |
createCacheIfNotExists | true | Whether the cache identified by cacheName is created if it does not already exist. |
Header variables
Name | Type | Description |
---|---|---|
CamelJCacheAction | java.lang.String | The action to perform. Supported values are PUT, PUTALL, PUTIFABSENT, GET, GETALL, GETANDREMOVE, GETANDREPLACE, GETANDPUT, REPLACE, REMOVE, REMOVEALL, INVOKE, and CLEAR. |
CamelJCacheResult | java.lang.Object | The result of an action, for example a Boolean for PUT, REMOVE, and REPLACE. |
CamelJCacheEventType | java.lang.String | The type of event (javax.cache.event.EventType). |
CamelJCacheKey | java.lang.Object | The key to which an action applies. |
CamelJCacheKeys | java.util.Set<java.lang.Object> | A set of keys to which an action applies. Used for GETALL, REMOVEALL, and INVOKE. |
CamelJCacheOldValue | java.lang.Object | On the consumer side, this header contains the old value associated with a key. On the producer side, this header must contain the expected old value in order to use a CAS-like (compare-and-set) operation. |
CamelJCacheEntryProcessor | javax.cache.processor.EntryProcessor | The entry processor to use for the INVOKE action. |
CamelJCacheEntryArgs | java.util.Collection<java.lang.Object> | Additional arguments to pass to the javax.cache.processor.EntryProcessor. |
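As an illustration of how the headers combine, the following sketch builds the header maps for a PUT and for a CAS-like REPLACE using only the header names from the table above. The class and method names are hypothetical and the code uses plain Java collections; it is not Camel API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper only; header names are taken from the table above.
final class JCacheHeadersSketch {
    static final String ACTION = "CamelJCacheAction";
    static final String KEY = "CamelJCacheKey";
    static final String OLD_VALUE = "CamelJCacheOldValue";

    private JCacheHeadersSketch() {
    }

    // Headers asking the producer to PUT the message body under the given key.
    static Map<String, Object> put(Object key) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(ACTION, "PUT");
        headers.put(KEY, key);
        return headers;
    }

    // Headers asking for a REPLACE that only applies if the current
    // cached value matches expectedOldValue (the CAS-like case).
    static Map<String, Object> replaceIfEquals(Object key, Object expectedOldValue) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(ACTION, "REPLACE");
        headers.put(KEY, key);
        headers.put(OLD_VALUE, expectedOldValue);
        return headers;
    }
}
```

A map built this way could be passed, together with the new value as the message body, to `ProducerTemplate.sendBodyAndHeaders(endpointUri, body, headers)`. According to the table, the outcome of the action is then reported in the CamelJCacheResult header.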
JCache based idempotent repository example:
JCacheIdempotentRepository idempotentRepo = new JCacheIdempotentRepository();
idempotentRepo.setCacheName("idempotent-cache");

from("direct:in")
    .idempotentConsumer(header("messageId"), idempotentRepo)
    .to("mock:out");
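The effect of the idempotent consumer is that an exchange is forwarded only the first time its messageId is seen; repeats are dropped. The sketch below imitates that behavior with an in-memory set standing in for the JCache-backed repository. The class is hypothetical, not Camel API, and is meant only to show the filtering rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for an idempotent repository: remembers the ids it has seen.
final class IdempotentFilterSketch {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time an id is offered, false for every repeat.
    boolean firstTime(String messageId) {
        return seen.add(messageId);
    }

    // Keeps only the ids that have not been seen before, in arrival order.
    List<String> filter(List<String> messageIds) {
        List<String> forwarded = new ArrayList<>();
        for (String id : messageIds) {
            if (firstTime(id)) {
                forwarded.add(id);
            }
        }
        return forwarded;
    }
}
```

With the ids `a, b, a, c, b` as input, `filter` returns `a, b, c`. In the route above, the set of seen ids lives in the cache named idempotent-cache rather than in local memory.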
JCache based aggregation repository example:
package org.apache.camel.component.jcache.processor.aggregate;

import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jcache.JCacheConfiguration;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.junit.Test;

public class JCacheAggregationRepositoryRoutesTest extends JCacheAggregationRepositoryTestSupport {
    private static final String MOCK_GOTCHA = "mock:gotcha";
    private static final String DIRECT_ONE = "direct:one";
    private static final String DIRECT_TWO = "direct:two";

    @EndpointInject(uri = MOCK_GOTCHA)
    private MockEndpoint mock;

    @Produce(uri = DIRECT_ONE)
    private ProducerTemplate produceOne;

    @Produce(uri = DIRECT_TWO)
    private ProducerTemplate produceTwo;

    @Test
    public void checkAggregationFromTwoRoutes() throws Exception {
        final JCacheAggregationRepository repoOne = createRepository(false);
        final JCacheAggregationRepository repoTwo = createRepository(false);

        final int completionSize = 4;
        final String correlator = "CORRELATOR";

        // First route: aggregates by the correlator header using repoOne.
        RouteBuilder rbOne = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(DIRECT_ONE).routeId("AggregatingRouteOne")
                    .aggregate(header(correlator))
                    .aggregationRepository(repoOne)
                    .aggregationStrategy(new MyAggregationStrategy())
                    .completionSize(completionSize)
                    .to(MOCK_GOTCHA);
            }
        };

        // Second route: same aggregation settings, but using repoTwo.
        RouteBuilder rbTwo = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(DIRECT_TWO).routeId("AggregatingRouteTwo")
                    .aggregate(header(correlator))
                    .aggregationRepository(repoTwo)
                    .aggregationStrategy(new MyAggregationStrategy())
                    .completionSize(completionSize)
                    .to(MOCK_GOTCHA);
            }
        };

        context().addRoutes(rbOne);
        context().addRoutes(rbTwo);
        context().start();

        // Expect a single aggregated message whose body is the sum of all four inputs.
        mock.expectedMessageCount(1);
        mock.expectedBodiesReceived(1 + 2 + 3 + 4);

        produceOne.sendBodyAndHeader(1, correlator, correlator);
        produceTwo.sendBodyAndHeader(2, correlator, correlator);
        produceOne.sendBodyAndHeader(3, correlator, correlator);
        produceOne.sendBodyAndHeader(4, correlator, correlator);

        mock.assertIsSatisfied();
    }

    // Sums the Integer bodies of the old and new exchanges; a null body counts as 0.
    private class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            } else {
                Integer n = newExchange.getIn().getBody(Integer.class);
                Integer o = oldExchange.getIn().getBody(Integer.class);
                Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
                oldExchange.getIn().setBody(v, Integer.class);
                return oldExchange;
            }
        }
    }

    protected JCacheAggregationRepository createRepository(boolean optimistic) throws Exception {
        JCacheAggregationRepository repository = new JCacheAggregationRepository();
        repository.setConfiguration(new JCacheConfiguration());
        repository.setCacheName("aggregation-repository");
        repository.setOptimistic(optimistic);
        return repository;
    }
}
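The test expects a single message with body 10 even though the four inputs arrive over two routes. This appears to rely on both repositories using the same cache name (aggregation-repository), so that partial results are shared. The plain Java sketch below reproduces only the arithmetic of the strategy combined with a completion size; the class is hypothetical and involves no Camel or JCache code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: sums incoming bodies and emits a total per completed group.
final class CompletionSizeSketch {
    private CompletionSizeSketch() {
    }

    static List<Integer> aggregate(int completionSize, int... bodies) {
        List<Integer> completed = new ArrayList<>();
        Integer running = null; // null plays the role of "no oldExchange yet"
        int count = 0;
        for (int body : bodies) {
            // The first body of a group passes through; later bodies are added to it.
            running = (running == null) ? body : running + body;
            count++;
            if (count == completionSize) {
                completed.add(running); // group complete, emit its total
                running = null;
                count = 0;
            }
        }
        return completed;
    }
}
```

`CompletionSizeSketch.aggregate(4, 1, 2, 3, 4)` returns a list containing the single value 10, matching `expectedBodiesReceived(1 + 2 + 3 + 4)` in the test. With only three inputs the group never completes and the list is empty.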