16.2. cacheReplicationJMSExample


Example: JMS cache replication

Note
Please note that this example is not finished yet. It is based on an OSGi iTest rather than a real-life example. Nevertheless, it is a very good starting point for all Camel Cache Riders!
JMS replication is the most powerful and most secure replication method. Used together with the Camel Cache replication options, it is also the easiest one. This basic example is divided into a few important steps that must be completed to get cache replication working.
The first step is to write your own implementation of CacheManagerFactory.
/**
 * {@code CacheManagerFactory} implementation that creates a single Ehcache
 * {@code CacheManager} from a classpath XML configuration and registers a
 * {@code JMSCacheManagerPeerProvider} on it, using the JMS topic/queue objects
 * supplied at construction time.
 *
 * <p>The field declarations are elided from this listing.
 */
public class TestingCacheManagerFactory extends CacheManagerFactory {

[...]

    /**
     * Stores the configuration resource name and the JMS objects used later by
     * {@link WrappedCacheManager}. This constructor is very useful when using
     * Camel with Spring/Blueprint, because every dependency can be injected as
     * a constructor argument.
     *
     * @param xmlName                    name of the Ehcache XML configuration, resolved via
     *                                   {@code getClass().getResourceAsStream(...)}
     * @param replicationTopicConnection JMS topic connection handed to the peer provider
     * @param replicationTopic           JMS topic handed to the peer provider
     * @param getQueueConnection         JMS queue connection handed to the peer provider
     * @param getQueue                   JMS queue handed to the peer provider
     */
    public TestingCacheManagerFactory(String xmlName, 
            TopicConnection replicationTopicConnection, Topic replicationTopic, 
            QueueConnection getQueueConnection, Queue getQueue) {
        this.xmlName = xmlName;
        this.replicationTopicConnection = replicationTopicConnection;
        this.replicationTopic = replicationTopic;
        this.getQueue = getQueue;
        this.getQueueConnection = getQueueConnection;
    }

    /**
     * Returns the cache manager, creating it on the first call.
     *
     * <p>The method is {@code synchronized}, so the null check and the
     * assignment cannot interleave between threads calling it on the same
     * factory instance.
     *
     * @return the lazily created {@link WrappedCacheManager}
     */
    @Override
    protected synchronized CacheManager createCacheManagerInstance() {
        //Singleton
        if (cacheManager == null) {
            // NOTE(review): the stream returned by getResourceAsStream is never closed
            // here, and it is null when the resource is missing - confirm how the
            // CacheManager(InputStream) constructor treats both cases.
            cacheManager = new WrappedCacheManager(getClass().getResourceAsStream(xmlName));
        }

        return cacheManager;
    }

    /**
     * Wraps Ehcache's {@code CacheManager} to be able to add a
     * {@code JMSCacheManagerPeerProvider}.
     *
     * <p>This is a non-static inner class on purpose: its constructor reads the
     * JMS fields of the enclosing factory instance.
     */
    public class WrappedCacheManager extends CacheManager {
        /**
         * Builds the cache manager from the given configuration stream, then
         * creates, registers and initialises the JMS peer provider.
         *
         * @param xmlConfig stream with the Ehcache XML configuration
         */
        public WrappedCacheManager(InputStream xmlConfig) {
            super(xmlConfig);
            // NOTE(review): the meaning of the trailing boolean 'true' is not visible
            // in this listing - confirm against the JMSCacheManagerPeerProvider API.
            JMSCacheManagerPeerProvider jmsCMPP = new JMSCacheManagerPeerProvider(this,
                            replicationTopicConnection,
                            replicationTopic,
                            getQueueConnection,
                            getQueue,
                            AcknowledgementMode.AUTO_ACKNOWLEDGE,
                            true);
            // Register the provider under its own scheme before initialising it.
            cacheManagerPeerProviders.put(jmsCMPP.getScheme(), jmsCMPP);
            jmsCMPP.init();
        }
    }
}
The next step is to write your own implementation of CacheLoaderWrapper. The simplest one is:
/**
 * {@code CacheLoaderWrapper} implementation that defers creation of a
 * {@code JMSCacheLoader} until {@link #init(Ehcache)} supplies the cache, and
 * then delegates to it.
 *
 * <p>The field declarations and the remaining delegating methods are elided
 * from this listing.
 */
public class WrappedJMSCacheLoader implements CacheLoaderWrapper {

[...]

    /**
     * Stores the JMS settings used to build the {@code JMSCacheLoader} in
     * {@link #init(Ehcache)}. This constructor is very useful when using Camel
     * with Spring/Blueprint, because every dependency can be injected as a
     * constructor argument.
     *
     * @param getQueueConnection  JMS queue connection passed to the loader
     * @param getQueue            JMS queue passed to the loader
     * @param acknowledgementMode acknowledgement mode passed to the loader
     * @param timeoutMillis       timeout passed to the loader; milliseconds, going by the name
     */
    public WrappedJMSCacheLoader(QueueConnection getQueueConnection,
            Queue getQueue, AcknowledgementMode acknowledgementMode,
            int timeoutMillis) {
        this.getQueueConnection = getQueueConnection;
        this.getQueue = getQueue;
        this.acknowledgementMode = acknowledgementMode;
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Creates the wrapped {@code JMSCacheLoader} for the given cache from the
     * settings captured by the constructor.
     *
     * @param cache the cache the loader is created for
     */
    @Override
    public void init(Ehcache cache) {
        // NOTE(review): defaultLoaderArgument is declared in the elided part of the
        // class; its value is not visible here.
        jmsCacheLoader = new JMSCacheLoader(cache, defaultLoaderArgument,
                getQueueConnection, getQueue, acknowledgementMode,
                timeoutMillis);
    }

    /**
     * Delegates cloning to the wrapped loader.
     *
     * <p>NOTE(review): jmsCacheLoader is only assigned in {@link #init(Ehcache)};
     * presumably init must have been called first - confirm.
     *
     * @param arg0 the cache passed through to the wrapped loader's {@code clone}
     * @return whatever the wrapped loader's {@code clone} returns
     * @throws CloneNotSupportedException declared for the delegated call
     */
    @Override
    public CacheLoader clone(Ehcache arg0) throws CloneNotSupportedException {
        return jmsCacheLoader.clone(arg0);
    }

    /**
     * Delegates disposal to the wrapped loader.
     *
     * @throws CacheException declared for the delegated call
     */
    @Override
    public void dispose() throws CacheException {
        jmsCacheLoader.dispose();
    }

[...]

}
In the third step, take care of the Camel Cache options by preparing their values:
  • cacheManagerFactory
  • eventListenerRegistry
  • cacheLoaderRegistry
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <!-- Prepares the three values used by the Camel Cache endpoint:
         cacheManagerFactory1, eventListenerRegistry1 and cacheLoaderRegistry1.
         The beans amqCF, getQueue and getTopic are referenced here but not defined
         in this file; the file that imports this one must provide them. -->

    <!-- JMS connections obtained from the amqCF connection factory bean. -->
    <bean id="queueConnection1" factory-bean="amqCF" factory-method="createQueueConnection" class="javax.jms.QueueConnection" />
    <bean id="topicConnection1" factory-bean="amqCF" factory-method="createTopicConnection" class="javax.jms.TopicConnection" />
    <!-- ActiveMQ destinations; the constructor argument is taken from the getQueue / getTopic beans. -->
    <bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg ref="getQueue" />
    </bean>
    <bean id="topic1" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg ref="getTopic" />
    </bean>

    <!-- Cache event listener used for replication.
         NOTE(review): the meaning of the six positional arguments is not visible
         here; confirm them against the JMSCacheReplicator constructor. -->
    <bean id="jmsListener1" class="net.sf.ehcache.distribution.jms.JMSCacheReplicator">
        <constructor-arg index="0" value="true" />
        <constructor-arg index="1" value="true" />
        <constructor-arg index="2" value="true" />
        <constructor-arg index="3" value="true" />
        <constructor-arg index="4" value="false" />
        <constructor-arg index="5" value="0" />
    </bean>

    <!-- Cache loader wrapper; arguments are getQueueConnection, getQueue,
         acknowledgementMode and timeoutMillis, in that order. -->
    <bean id="jmsLoader1" class="my.cache.replication.WrappedJMSCacheLoader">
        <constructor-arg index="0" ref="queueConnection1" />
        <constructor-arg index="1" ref="queue1" />
        <constructor-arg index="2" value="AUTO_ACKNOWLEDGE" />
        <constructor-arg index="3" value="30000" />
    </bean>

    <!-- Cache manager factory; arguments are xmlName, replicationTopicConnection,
         replicationTopic, getQueueConnection and getQueue, in that order. -->
    <bean id="cacheManagerFactory1" class="my.cache.replication.TestingCacheManagerFactory">
        <constructor-arg index="0" value="ehcache_jms_test.xml" />
        <constructor-arg index="1" ref="topicConnection1" />
        <constructor-arg index="2" ref="topic1" />
        <constructor-arg index="3" ref="queueConnection1" />
        <constructor-arg index="4" ref="queue1" />
    </bean>

    <!-- Registry holding the event listeners; here only the JMS replicator. -->
    <bean id="eventListenerRegistry1" class="org.apache.camel.component.cache.CacheEventListenerRegistry">
        <constructor-arg>
            <list>
                <ref bean="jmsListener1" />
            </list>
        </constructor-arg>
    </bean>

    <!-- Registry holding the cache loaders; here only the JMS loader wrapper. -->
    <bean id="cacheLoaderRegistry1" class="org.apache.camel.component.cache.CacheLoaderRegistry">
        <constructor-arg>
            <list>
                <ref bean="jmsLoader1"/>
            </list>
        </constructor-arg>
    </bean>
</beans>
The final step is to define some routes using the Cache component:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <!-- Destination names consumed by the imported file (beans queue1 and topic1). -->
    <bean id="getQueue" class="java.lang.String">
        <constructor-arg value="replicationGetQueue" />
    </bean>

    <bean id="getTopic" class="java.lang.String">
        <constructor-arg value="replicationTopic" />
    </bean>

    <!-- Import the xml file explained at step three -->
    <import resource="JMSReplicationCache1.xml"/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <!-- Cache endpoint wired to the three beans prepared in step three.
             The URI options are separated by escaped ampersands, because a bare
             ampersand is not well-formed inside an XML attribute value. -->
        <camel:endpoint id="fooCache1" uri="cache:foo?cacheManagerFactory=#cacheManagerFactory1&amp;eventListenerRegistry=#eventListenerRegistry1&amp;cacheLoaderRegistry=#cacheLoaderRegistry1"/>

        <!-- Adds the message body to the cache under the fixed key "foo". -->
        <camel:route>
            <camel:from uri="direct:addRoute"/>
            <camel:setHeader headerName="CamelCacheOperation">
                <camel:constant>CamelCacheAdd</camel:constant>
            </camel:setHeader>
            <camel:setHeader headerName="CamelCacheKey">
                <camel:constant>foo</camel:constant>
            </camel:setHeader>
            <camel:to ref="fooCache1"/>
        </camel:route>

    </camelContext>

    <!-- ActiveMQ connection factory, referenced as amqCF by the imported file. -->
    <bean id="amqCF" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
    </bean>

    <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <ref bean="amqCF"/>
        </property>
    </bean>

</beans>
Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.