12.2. cacheReplicationJMSExample
Example: JMS cache replication
Note
Please note that this example is not finished yet. It is based on an OSGi iTest rather than on a real-life example. Nevertheless, it is a very good starting point for all Camel Cache Riders!
JMS replication is the most powerful and most secure way to replicate a cache. Used together with the Camel Cache replication options, it is also the easiest way. This basic example is divided into a few important steps that have to be completed to get cache replication to work.
The first step is to write your own implementation of CacheManagerFactory.
public class TestingCacheManagerFactory extends CacheManagerFactory { [...] //This constructor is very useful when using Camel with Spring/Blueprint public TestingCacheManagerFactory(String xmlName, TopicConnection replicationTopicConnection, Topic replicationTopic, QueueConnection getQueueConnection, Queue getQueue) { this.xmlName = xmlName; this.replicationTopicConnection = replicationTopicConnection; this.replicationTopic = replicationTopic; this.getQueue = getQueue; this.getQueueConnection = getQueueConnection; } @Override protected synchronized CacheManager createCacheManagerInstance() { //Singleton if (cacheManager == null) { cacheManager = new WrappedCacheManager(getClass().getResourceAsStream(xmlName)); } return cacheManager; } //Wrapping Ehcache's CacheManager to be able to add JMSCacheManagerPeerProvider public class WrappedCacheManager extends CacheManager { public WrappedCacheManager(InputStream xmlConfig) { super(xmlConfig); JMSCacheManagerPeerProvider jmsCMPP = new JMSCacheManagerPeerProvider(this, replicationTopicConnection, replicationTopic, getQueueConnection, getQueue, AcknowledgementMode.AUTO_ACKNOWLEDGE, true); cacheManagerPeerProviders.put(jmsCMPP.getScheme(), jmsCMPP); jmsCMPP.init(); } } }
The next step is to write your own implementation of CacheLoaderWrapper; the easiest one is:
public class WrappedJMSCacheLoader implements CacheLoaderWrapper { [...] //This constructor is very useful when using Camel with Spring/Blueprint public WrappedJMSCacheLoader(QueueConnection getQueueConnection, Queue getQueue, AcknowledgementMode acknowledgementMode, int timeoutMillis) { this.getQueueConnection = getQueueConnection; this.getQueue = getQueue; this.acknowledgementMode = acknowledgementMode; this.timeoutMillis = timeoutMillis; } @Override public void init(Ehcache cache) { jmsCacheLoader = new JMSCacheLoader(cache, defaultLoaderArgument, getQueueConnection, getQueue, acknowledgementMode, timeoutMillis); } @Override public CacheLoader clone(Ehcache arg0) throws CloneNotSupportedException { return jmsCacheLoader.clone(arg0); } @Override public void dispose() throws CacheException { jmsCacheLoader.dispose(); } [...] }
In the third step, prepare the values for the following Camel Cache options:
- cacheManagerFactory
- eventListenerRegistry
- cacheLoaderRegistry
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <!-- JMS connections, both obtained from the "amqCF" connection factory bean,
         which is not defined in this file -->
    <bean id="queueConnection1" factory-bean="amqCF" factory-method="createQueueConnection"
          class="javax.jms.QueueConnection" />
    <bean id="topicConnection1" factory-bean="amqCF" factory-method="createTopicConnection"
          class="javax.jms.TopicConnection" />

    <!-- JMS destinations; their names come from the "getQueue" and "getTopic" beans,
         which are not defined in this file -->
    <bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg ref="getQueue" />
    </bean>
    <bean id="topic1" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg ref="getTopic" />
    </bean>

    <!-- Ehcache JMS replicator used as the cache event listener.
         NOTE(review): the meaning of the six positional arguments is not visible here;
         confirm it against the JMSCacheReplicator constructor. -->
    <bean id="jmsListener1" class="net.sf.ehcache.distribution.jms.JMSCacheReplicator">
        <constructor-arg index="0" value="true" />
        <constructor-arg index="1" value="true" />
        <constructor-arg index="2" value="true" />
        <constructor-arg index="3" value="true" />
        <constructor-arg index="4" value="false" />
        <constructor-arg index="5" value="0" />
    </bean>

    <!-- Cache loader wrapper: queue connection, queue, acknowledgement mode, timeout in ms -->
    <bean id="jmsLoader1" class="my.cache.replication.WrappedJMSCacheLoader">
        <constructor-arg index="0" ref="queueConnection1" />
        <constructor-arg index="1" ref="queue1" />
        <constructor-arg index="2" value="AUTO_ACKNOWLEDGE" />
        <constructor-arg index="3" value="30000" />
    </bean>

    <!-- Value for the cacheManagerFactory option -->
    <bean id="cacheManagerFactory1" class="my.cache.replication.TestingCacheManagerFactory">
        <constructor-arg index="0" value="ehcache_jms_test.xml" />
        <constructor-arg index="1" ref="topicConnection1" />
        <constructor-arg index="2" ref="topic1" />
        <constructor-arg index="3" ref="queueConnection1" />
        <constructor-arg index="4" ref="queue1" />
    </bean>

    <!-- Value for the eventListenerRegistry option -->
    <bean id="eventListenerRegistry1" class="org.apache.camel.component.cache.CacheEventListenerRegistry">
        <constructor-arg>
            <list>
                <ref bean="jmsListener1" />
            </list>
        </constructor-arg>
    </bean>

    <!-- Value for the cacheLoaderRegistry option -->
    <bean id="cacheLoaderRegistry1" class="org.apache.camel.component.cache.CacheLoaderRegistry">
        <constructor-arg>
            <list>
                <ref bean="jmsLoader1"/>
            </list>
        </constructor-arg>
    </bean>

</beans>
The final step is to define some routes using the Cache component:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <!-- Names of the JMS queue and topic, exposed as plain String beans -->
    <bean id="getQueue" class="java.lang.String">
        <constructor-arg value="replicationGetQueue" />
    </bean>
    <bean id="getTopic" class="java.lang.String">
        <constructor-arg value="replicationTopic" />
    </bean>

    <!-- Import the xml file explained at step three -->
    <import resource="JMSReplicationCache1.xml"/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <!-- Cache endpoint wired to the three beans prepared in step three.
             Inside an XML attribute the query separator must be written as &amp; -->
        <camel:endpoint id="fooCache1"
                        uri="cache:foo?cacheManagerFactory=#cacheManagerFactory1&amp;eventListenerRegistry=#eventListenerRegistry1&amp;cacheLoaderRegistry=#cacheLoaderRegistry1"/>

        <!-- Sends messages from direct:addRoute to the cache endpoint with the
             operation header set to CamelCacheAdd and the key header set to "foo" -->
        <camel:route>
            <camel:from uri="direct:addRoute"/>
            <camel:setHeader headerName="CamelCacheOperation">
                <camel:constant>CamelCacheAdd</camel:constant>
            </camel:setHeader>
            <camel:setHeader headerName="CamelCacheKey">
                <camel:constant>foo</camel:constant>
            </camel:setHeader>
            <camel:to ref="fooCache1"/>
        </camel:route>
    </camelContext>

    <!-- ActiveMQ connection factory; the broker URL is vm://localhost with broker.persistent=false -->
    <bean id="amqCF" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
    </bean>

    <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <ref bean="amqCF"/>
        </property>
    </bean>

</beans>