此内容没有您所选择的语言版本。

16.2. cacheReplicationJMSExample


Example: JMS cache replication

Note
Please note that this example is not finished yet. It is based on an OSGi iTest rather than on a real-life example. Nevertheless, it is a very good starting point for all Camel Cache Riders!
JMS replication is the most powerful and secure way to replicate a cache. Used together with the Camel Cache replication options, it is also the easiest way. This basic example is divided into a few important steps that have to be completed to get cache replication to work.
The first step is to write your own implementation of CacheManagerFactory.
/**
 * {@code CacheManagerFactory} that builds a single Ehcache {@code CacheManager} from an XML
 * configuration resource and registers a {@code JMSCacheManagerPeerProvider} on it, using the
 * JMS topic/queue objects handed to the constructor.
 */
public class TestingCacheManagerFactory extends CacheManagerFactory {

[...]

    /**
     * Stores the configuration resource name and the JMS objects used for replication.
     * This constructor is very useful when using Camel with Spring/Blueprint.
     *
     * @param xmlName name of the Ehcache XML configuration; it is resolved with
     *        {@code getClass().getResourceAsStream(xmlName)}
     * @param replicationTopicConnection topic connection passed to the JMS peer provider
     * @param replicationTopic topic passed to the JMS peer provider
     * @param getQueueConnection queue connection passed to the JMS peer provider
     * @param getQueue queue passed to the JMS peer provider
     */
    public TestingCacheManagerFactory(String xmlName, 
            TopicConnection replicationTopicConnection, Topic replicationTopic, 
            QueueConnection getQueueConnection, Queue getQueue) {
        this.xmlName = xmlName;
        this.replicationTopicConnection = replicationTopicConnection;
        this.replicationTopic = replicationTopic;
        this.getQueue = getQueue;
        this.getQueueConnection = getQueueConnection;
    }

    /**
     * Returns the cache manager, creating it on the first call only.
     * The method is {@code synchronized}, so concurrent callers cannot create two instances.
     *
     * @return the single {@code CacheManager} held by this factory
     */
    @Override
    protected synchronized CacheManager createCacheManagerInstance() {
        //Singleton
        if (cacheManager == null) {
            InputStream xmlConfig = getClass().getResourceAsStream(xmlName);
            try {
                cacheManager = new WrappedCacheManager(xmlConfig);
            } finally {
                // Ehcache's CacheManager(InputStream) leaves closing the stream to the caller,
                // so release the classpath resource here whether or not construction succeeded.
                if (xmlConfig != null) {
                    try {
                        xmlConfig.close();
                    } catch (java.io.IOException ignored) {
                        // The configuration has already been consumed (or construction has
                        // already failed); a failure to close is not actionable.
                    }
                }
            }
        }

        return cacheManager;
    }

    /**
     * Wraps Ehcache's {@code CacheManager} to be able to add a {@code JMSCacheManagerPeerProvider}.
     * It is an inner (non-static) class because it reads the JMS fields of the enclosing factory.
     */
    public class WrappedCacheManager extends CacheManager {
        /**
         * Builds the manager from the given configuration, then creates a JMS peer provider,
         * registers it under its own scheme and initialises it.
         *
         * @param xmlConfig Ehcache XML configuration stream; this constructor does not close it
         */
        public WrappedCacheManager(InputStream xmlConfig) {
            super(xmlConfig);
            // NOTE(review): the trailing 'true' is presumably the provider's "listen to topic"
            // flag; confirm against the JMSCacheManagerPeerProvider constructor documentation.
            JMSCacheManagerPeerProvider jmsCMPP = new JMSCacheManagerPeerProvider(this,
                            replicationTopicConnection,
                            replicationTopic,
                            getQueueConnection,
                            getQueue,
                            AcknowledgementMode.AUTO_ACKNOWLEDGE,
                            true);
            cacheManagerPeerProviders.put(jmsCMPP.getScheme(), jmsCMPP);
            jmsCMPP.init();
        }
    }
}
The next step is to write your own implementation of CacheLoaderWrapper. The easiest one is:
/**
 * {@code CacheLoaderWrapper} whose work is delegated to a {@code JMSCacheLoader}.
 * The delegate is created in {@link #init(Ehcache)} from the values given to the constructor.
 */
public class WrappedJMSCacheLoader implements CacheLoaderWrapper {

[...]

    /**
     * Remembers the JMS settings that {@link #init(Ehcache)} later hands to the delegate loader.
     * This constructor is very useful when using Camel with Spring/Blueprint.
     *
     * @param getQueueConnection queue connection passed to the delegate loader
     * @param getQueue queue passed to the delegate loader
     * @param acknowledgementMode acknowledgement mode passed to the delegate loader
     * @param timeoutMillis timeout, in milliseconds, passed to the delegate loader
     */
    public WrappedJMSCacheLoader(QueueConnection getQueueConnection,
            Queue getQueue, AcknowledgementMode acknowledgementMode,
            int timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
        this.acknowledgementMode = acknowledgementMode;
        this.getQueue = getQueue;
        this.getQueueConnection = getQueueConnection;
    }

    /**
     * Creates the delegate {@code JMSCacheLoader} for the given cache and stores it.
     *
     * @param cache the cache the delegate loader is created for
     */
    @Override
    public void init(Ehcache cache) {
        final JMSCacheLoader loader = new JMSCacheLoader(
                cache,
                defaultLoaderArgument,
                getQueueConnection,
                getQueue,
                acknowledgementMode,
                timeoutMillis);
        this.jmsCacheLoader = loader;
    }

    /**
     * Delegates cloning to the wrapped loader.
     *
     * @param arg0 the cache passed through to the delegate's {@code clone}
     * @return whatever the delegate's {@code clone} returns
     * @throws CloneNotSupportedException if the delegate throws it
     */
    @Override
    public CacheLoader clone(Ehcache arg0) throws CloneNotSupportedException {
        final CacheLoader copy = this.jmsCacheLoader.clone(arg0);
        return copy;
    }

    /**
     * Delegates disposal to the wrapped loader.
     *
     * @throws CacheException if the delegate throws it
     */
    @Override
    public void dispose() throws CacheException {
        this.jmsCacheLoader.dispose();
    }

[...]

}
In the third step, prepare the values for the following Camel Cache options:
  • cacheManagerFactory
  • eventListenerRegistry
  • cacheLoaderRegistry
<!--
  Step three: beans that provide the values for the Camel Cache options
  cacheManagerFactory, eventListenerRegistry and cacheLoaderRegistry.
  The beans "amqCF", "getQueue" and "getTopic" are referenced here but are not defined in this file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <!-- JMS connections, created by calling factory methods on the "amqCF" bean -->
    <bean id="queueConnection1" factory-bean="amqCF" factory-method="createQueueConnection" class="javax.jms.QueueConnection" />
    <bean id="topicConnection1" factory-bean="amqCF" factory-method="createTopicConnection" class="javax.jms.TopicConnection" />
    <!-- ActiveMQ queue and topic; each is constructed from the bean named in its constructor-arg -->
    <bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg ref="getQueue" />
    </bean>
    <bean id="topic1" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg ref="getTopic" />
    </bean>

    <!--
      Ehcache JMS replicator used as the cache event listener.
      NOTE(review): the six arguments are presumably replicatePuts, replicateUpdates,
      replicateUpdatesViaCopy, replicateRemovals, replicateAsync and the asynchronous
      replication interval; confirm against the JMSCacheReplicator constructor documentation.
    -->
    <bean id="jmsListener1" class="net.sf.ehcache.distribution.jms.JMSCacheReplicator">
        <constructor-arg index="0" value="true" />
        <constructor-arg index="1" value="true" />
        <constructor-arg index="2" value="true" />
        <constructor-arg index="3" value="true" />
        <constructor-arg index="4" value="false" />
        <constructor-arg index="5" value="0" />
    </bean>

    <!-- Cache loader from step two: queue connection, queue, acknowledgement mode, timeout in milliseconds -->
    <bean id="jmsLoader1" class="my.cache.replication.WrappedJMSCacheLoader">
        <constructor-arg index="0" ref="queueConnection1" />
        <constructor-arg index="1" ref="queue1" />
        <constructor-arg index="2" value="AUTO_ACKNOWLEDGE" />
        <constructor-arg index="3" value="30000" />
    </bean>

    <!-- Cache manager factory from step one: Ehcache XML name, topic connection, topic, queue connection, queue -->
    <bean id="cacheManagerFactory1" class="my.cache.replication.TestingCacheManagerFactory">
        <constructor-arg index="0" value="ehcache_jms_test.xml" />
        <constructor-arg index="1" ref="topicConnection1" />
        <constructor-arg index="2" ref="topic1" />
        <constructor-arg index="3" ref="queueConnection1" />
        <constructor-arg index="4" ref="queue1" />
    </bean>

    <!-- Registry holding the event listeners; here a single-element list with jmsListener1 -->
    <bean id="eventListenerRegistry1" class="org.apache.camel.component.cache.CacheEventListenerRegistry">
        <constructor-arg>
            <list>
                <ref bean="jmsListener1" />
            </list>
        </constructor-arg>
    </bean>

    <!-- Registry holding the cache loaders; here a single-element list with jmsLoader1 -->
    <bean id="cacheLoaderRegistry1" class="org.apache.camel.component.cache.CacheLoaderRegistry">
        <constructor-arg>
            <list>
                <ref bean="jmsLoader1"/>
            </list>
        </constructor-arg>
    </bean>
</beans>
The final step is to define some routes using the Cache component:
<!--
  Final step: Camel context with a route that uses the Cache component,
  plus the ActiveMQ beans and the destination names used for replication.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <!-- String beans holding the names "replicationGetQueue" and "replicationTopic" -->
    <bean id="getQueue" class="java.lang.String">
        <constructor-arg value="replicationGetQueue" />
    </bean>

    <bean id="getTopic" class="java.lang.String">
        <constructor-arg value="replicationTopic" />
    </bean>

    <!-- Import the xml file explained at step three -->
    <import resource="JMSReplicationCache1.xml"/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <!--
          Cache endpoint wired to the three beans prepared in step three.
          The options are separated by an ampersand, which must be written as &amp; inside an
          XML attribute value.
        -->
        <camel:endpoint id="fooCache1" uri="cache:foo?cacheManagerFactory=#cacheManagerFactory1&amp;eventListenerRegistry=#eventListenerRegistry1&amp;cacheLoaderRegistry=#cacheLoaderRegistry1"/>

        <!-- Sets the CamelCacheOperation and CamelCacheKey headers, then sends the exchange to the cache endpoint -->
        <camel:route>
            <camel:from uri="direct:addRoute"/>
            <camel:setHeader headerName="CamelCacheOperation">
                <camel:constant>CamelCacheAdd</camel:constant>
            </camel:setHeader>
            <camel:setHeader headerName="CamelCacheKey">
                <camel:constant>foo</camel:constant>
            </camel:setHeader>
            <camel:to ref="fooCache1"/>
        </camel:route>

    </camelContext>

    <!-- ActiveMQ connection factory; the broker URL uses the vm:// scheme with broker.persistent=false -->
    <bean id="amqCF" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
    </bean>

    <!-- Camel JMS component registered as "activemq", using the connection factory above -->
    <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <ref bean="amqCF"/>
        </property>
    </bean>

</beans>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.