Chapter 29. Infinispan
Both producer and consumer are supported
This component allows you to interact with Infinispan distributed data grid / cache using the Hot Rod procol. Infinispan is an extremely scalable, highly available key/value data store and data grid platform written in Java.
If you use Maven, you must add the following dependency to your pom.xml
:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-infinispan</artifactId> <version>{CamelSBVersion}</version> <!-- use the same version as your Camel core version --> </dependency>
29.1. URI format
infinispan://cacheName?[options]
The producer allows sending messages to a remote cache using the HotRod protocol. The consumer allows listening for events from a remote cache using the HotRod protocol.
29.2. Configuring Options
Camel components are configured on two separate levels:
- component level
- endpoint level
29.2.1. Configuring Component Options
The component level is the highest level which holds general and common configurations that are inherited by the endpoints. For example a component may have security settings, credentials for authentication, urls for network connection and so forth.
Some components only have a few options, and others may have many. Because components typically have pre configured defaults that are commonly used, then you may often only need to configure a few options on a component; or none at all.
Configuring components can be done with the Component DSL, in a configuration file (application.properties|yaml), or directly with Java code.
29.2.2. Configuring Endpoint Options
Where you find yourself configuring the most is on endpoints, as endpoints often have many options, which allows you to configure what you need the endpoint to do. The options are also categorized into whether the endpoint is used as consumer (from) or as a producer (to), or used for both.
Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL as a type safe way of configuring endpoints.
A good practice when configuring options is to use Property Placeholders, which allows to not hardcode urls, port numbers, sensitive information, and other settings. In other words placeholders allows to externalize the configuration from your code, and gives more flexibility and reuse.
The following two sections lists all the options, firstly for the component followed by the endpoint.
29.3. Component Options
The Infinispan component supports 26 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
configuration (common) | Component configuration. | InfinispanRemoteConfiguration | |
hosts (common) | Specifies the host of the cache on Infinispan instance. | String | |
queryBuilder (common) | Specifies the query builder. | InfinispanQueryBuilder | |
secure (common) | Define if we are connecting to a secured Infinispan instance. | false | boolean |
bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean |
customListener (consumer) | Returns the custom listener in use, if provided. | InfinispanRemoteCustomListener | |
eventTypes (consumer) | Specifies the set of event types to register by the consumer.Multiple event can be separated by comma. The possible event types are: CLIENT_CACHE_ENTRY_CREATED, CLIENT_CACHE_ENTRY_MODIFIED, CLIENT_CACHE_ENTRY_REMOVED, CLIENT_CACHE_ENTRY_EXPIRED, CLIENT_CACHE_FAILOVER. | String | |
defaultValue (producer) | Set a specific default value for some producer operations. | Object | |
key (producer) | Set a specific key for producer operations. | Object | |
lazyStartProducer (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean |
oldValue (producer) | Set a specific old value for some producer operations. | Object | |
operation (producer) | The operation to perform. Enum values:
| PUT | InfinispanOperation |
value (producer) | Set a specific value for producer operations. | Object | |
password ( security) | Define the password to access the infinispan instance. | String | |
saslMechanism ( security) | Define the SASL Mechanism to access the infinispan instance. | String | |
securityRealm ( security) | Define the security realm to access the infinispan instance. | String | |
securityServerName ( security) | Define the security server name to access the infinispan instance. | String | |
username ( security) | Define the username to access the infinispan instance. | String | |
autowiredEnabled (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean |
cacheContainer (advanced) | Autowired Specifies the cache Container to connect. | RemoteCacheManager | |
cacheContainerConfiguration (advanced) | Autowired The CacheContainer configuration. Used if the cacheContainer is not defined. | Configuration | |
configurationProperties (advanced) | Implementation specific properties for the CacheManager. | Map | |
configurationUri (advanced) | An implementation specific URI for the CacheManager. | String | |
flags (advanced) | A comma separated list of org.infinispan.client.hotrod.Flag to be applied by default on each cache invocation. | String | |
remappingFunction (advanced) | Set a specific remappingFunction to use in a compute operation. | BiFunction | |
resultHeader (advanced) | Store the operation result in a header instead of the message body. By default, resultHeader == null and the query result is stored in the message body, any existing content in the message body is discarded. If resultHeader is set, the value is used as the name of the header to store the query result and the original message body is preserved. This value can be overridden by an in message header named: CamelInfinispanOperationResultHeader. | String |
29.4. Endpoint Options
The Infinispan endpoint is configured using URI syntax:
infinispan:cacheName
with the following path and query parameters:
29.4.1. Path Parameters (1 parameters)
Name | Description | Default | Type |
---|---|---|---|
cacheName (common) | Required The name of the cache to use. Use current to use the existing cache name from the currently configured cached manager. Or use default for the default cache manager name. | String |
29.4.2. Query Parameters (26 parameters)
Name | Description | Default | Type |
---|---|---|---|
hosts (common) | Specifies the host of the cache on Infinispan instance. | String | |
queryBuilder (common) | Specifies the query builder. | InfinispanQueryBuilder | |
secure (common) | Define if we are connecting to a secured Infinispan instance. | false | boolean |
bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean |
customListener (consumer) | Returns the custom listener in use, if provided. | InfinispanRemoteCustomListener | |
eventTypes (consumer) | Specifies the set of event types to register by the consumer.Multiple event can be separated by comma. The possible event types are: CLIENT_CACHE_ENTRY_CREATED, CLIENT_CACHE_ENTRY_MODIFIED, CLIENT_CACHE_ENTRY_REMOVED, CLIENT_CACHE_ENTRY_EXPIRED, CLIENT_CACHE_FAILOVER. | String | |
exceptionHandler (consumer (advanced)) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | ExceptionHandler | |
exchangePattern (consumer (advanced)) | Sets the exchange pattern when the consumer creates an exchange. Enum values:
| ExchangePattern | |
defaultValue (producer) | Set a specific default value for some producer operations. | Object | |
key (producer) | Set a specific key for producer operations. | Object | |
lazyStartProducer (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean |
oldValue (producer) | Set a specific old value for some producer operations. | Object | |
operation (producer) | The operation to perform. Enum values:
| PUT | InfinispanOperation |
value (producer) | Set a specific value for producer operations. | Object | |
password ( security) | Define the password to access the infinispan instance. | String | |
saslMechanism ( security) | Define the SASL Mechanism to access the infinispan instance. | String | |
securityRealm ( security) | Define the security realm to access the infinispan instance. | String | |
securityServerName ( security) | Define the security server name to access the infinispan instance. | String | |
username ( security) | Define the username to access the infinispan instance. | String | |
cacheContainer (advanced) | Autowired Specifies the cache Container to connect. | RemoteCacheManager | |
cacheContainerConfiguration (advanced) | Autowired The CacheContainer configuration. Used if the cacheContainer is not defined. | Configuration | |
configurationProperties (advanced) | Implementation specific properties for the CacheManager. | Map | |
configurationUri (advanced) | An implementation specific URI for the CacheManager. | String | |
flags (advanced) | A comma separated list of org.infinispan.client.hotrod.Flag to be applied by default on each cache invocation. | String | |
remappingFunction (advanced) | Set a specific remappingFunction to use in a compute operation. | BiFunction | |
resultHeader (advanced) | Store the operation result in a header instead of the message body. By default, resultHeader == null and the query result is stored in the message body, any existing content in the message body is discarded. If resultHeader is set, the value is used as the name of the header to store the query result and the original message body is preserved. This value can be overridden by an in message header named: CamelInfinispanOperationResultHeader. | String |
29.5. Camel Operations
This section lists all available operations, along with their header information.
Operation Name | Description |
---|---|
InfinispanOperation.PUT | Puts a key/value pair in the cache, optionally with expiration |
InfinispanOperation.PUTASYNC | Asynchronously puts a key/value pair in the cache, optionally with expiration |
InfinispanOperation.PUTIFABSENT | Puts a key/value pair in the cache if it did not exist, optionally with expiration |
InfinispanOperation.PUTIFABSENTASYNC | Asynchronously puts a key/value pair in the cache if it did not exist, optionally with expiration |
Required Headers:
- CamelInfinispanKey
- CamelInfinispanValue
Optional Headers:
- CamelInfinispanLifespanTime
- CamelInfinispanLifespanTimeUnit
- CamelInfinispanMaxIdleTime
- CamelInfinispanMaxIdleTimeUnit
Result Header:
- CamelInfinispanOperationResult
Operation Name | Description |
---|---|
InfinispanOperation.PUTALL | Adds multiple entries to a cache, optionally with expiration |
CamelInfinispanOperation.PUTALLASYNC | Asynchronously adds multiple entries to a cache, optionally with expiration |
Required Headers:
- CamelInfinispanMap
Optional Headers:
- CamelInfinispanLifespanTime
- CamelInfinispanLifespanTimeUnit
- CamelInfinispanMaxIdleTime
- CamelInfinispanMaxIdleTimeUnit
Operation Name | Description |
---|---|
InfinispanOperation.GET | Retrieves the value associated with a specific key from the cache |
InfinispanOperation.GETORDEFAULT | Retrieves the value, or default value, associated with a specific key from the cache |
Required Headers:
- CamelInfinispanKey
Operation Name | Description |
---|---|
InfinispanOperation.CONTAINSKEY | Determines whether a cache contains a specific key |
Required Headers
- CamelInfinispanKey
Result Header
- CamelInfinispanOperationResult
Operation Name | Description |
---|---|
InfinispanOperation.CONTAINSVALUE | Determines whether a cache contains a specific value |
Required Headers:
- CamelInfinispanKey
Operation Name | Description |
---|---|
InfinispanOperation.REMOVE | Removes an entry from a cache, optionally only if the value matches a given one |
InfinispanOperation.REMOVEASYNC | Asynchronously removes an entry from a cache, optionally only if the value matches a given one |
Required Headers:
- CamelInfinispanKey
Optional Headers:
- CamelInfinispanValue
Result Header:
- CamelInfinispanOperationResult
Operation Name | Description |
---|---|
InfinispanOperation.REPLACE | Conditionally replaces an entry in the cache, optionally with expiration |
InfinispanOperation.REPLACEASYNC | Asynchronously conditionally replaces an entry in the cache, optionally with expiration |
Required Headers:
- CamelInfinispanKey
- CamelInfinispanValue
- CamelInfinispanOldValue
Optional Headers:
- CamelInfinispanLifespanTime
- CamelInfinispanLifespanTimeUnit
- CamelInfinispanMaxIdleTime
- CamelInfinispanMaxIdleTimeUnit
Result Header:
- CamelInfinispanOperationResult
Operation Name | Description |
---|---|
InfinispanOperation.CLEAR | Clears the cache |
InfinispanOperation.CLEARASYNC | Asynchronously clears the cache |
Operation Name | Description |
---|---|
InfinispanOperation.SIZE | Returns the number of entries in the cache |
Result Header
- CamelInfinispanOperationResult
Operation Name | Description |
---|---|
InfinispanOperation.STATS | Returns statistics about the cache |
Result Header:
- CamelInfinispanOperationResult
Operation Name | Description |
---|---|
InfinispanOperation.QUERY | Executes a query on the cache |
Required Headers:
- CamelInfinispanQueryBuilder
Result Header:
- CamelInfinispanOperationResult
Write methods like put(key, value) and remove(key) do not return the previous value by default.
29.6. Message Headers
Name | Default Value | Type | Context | Description |
---|---|---|---|---|
CamelInfinispanCacheName |
| String | Shared | The cache participating in the operation or event. |
CamelInfinispanOperation |
| InfinispanOperation | Producer | The operation to perform. |
CamelInfinispanMap |
| Map | Producer | A Map to use in case of CamelInfinispanOperationPutAll operation |
CamelInfinispanKey |
| Object | Shared | The key to perform the operation to or the key generating the event. |
CamelInfinispanValue |
| Object | Producer | The value to use for the operation. |
CamelInfinispanEventType |
| String | Consumer | The type of the received event. |
CamelInfinispanLifespanTime |
| long | Producer | The Lifespan time of a value inside the cache. Negative values are interpreted as infinity. |
CamelInfinispanTimeUnit |
| String | Producer | The Time Unit of an entry Lifespan Time. |
CamelInfinispanMaxIdleTime |
| long | Producer | The maximum amount of time an entry is allowed to be idle for before it is considered as expired. |
CamelInfinispanMaxIdleTimeUnit |
| String | Producer | The Time Unit of an entry Max Idle Time. |
CamelInfinispanQueryBuilder | null | InfinispanQueryBuilder | Producer | The QueryBuilde to use for QUERY command, if not present the command defaults to InifinispanConfiguration’s one |
CamelInfinispanOperationResultHeader | null | String | Producer | Store the operation result in a header instead of the message body |
29.7. Examples
Put a key/value into a named cache:
from("direct:start") .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.PUT) (1) .setHeader(InfinispanConstants.KEY).constant("123") (2) .to("infinispan:myCacheName&cacheContainer=#cacheContainer"); (3)
Where,
- 1 - Set the operation to perform
- 2 - Set the key used to identify the element in the cache
3 - Use the configured cache manager
cacheContainer
from the registry to put an element to the cache namedmyCacheName
It is possible to configure the lifetime and/or the idle time before the entry expires and gets evicted from the cache, as example:
from("direct:start") .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.GET) .setHeader(InfinispanConstants.KEY).constant("123") .setHeader(InfinispanConstants.LIFESPAN_TIME).constant(100L) (1) .setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT.constant(TimeUnit.MILLISECONDS.toString()) (2) .to("infinispan:myCacheName");
where,
- 1 - Set the lifespan of the entry
- 2 - Set the time unit for the lifespan
Queries
from("direct:start") .setHeader(InfinispanConstants.OPERATION, InfinispanConstants.QUERY) .setHeader(InfinispanConstants.QUERY_BUILDER, new InfinispanQueryBuilder() { @Override public Query build(QueryFactory<Query> qf) { return qf.from(User.class).having("name").like("%abc%").build(); } }) .to("infinispan:myCacheName?cacheContainer=#cacheManager") ;
The .proto descriptors for domain objects must be registered with the remote Data Grid server, see Remote Query Example in the official Infinispan documentation.
Custom Listeners
from("infinispan://?cacheContainer=#cacheManager&customListener=#myCustomListener") .to("mock:result");
The instance of myCustomListener
must exist and Camel should be able to look it up from the Registry
. Users are encouraged to extend the org.apache.camel.component.infinispan.remote.InfinispanRemoteCustomListener
class and annotate the resulting class with @ClientListener
which can be found found in package org.infinispan.client.hotrod.annotation
.
29.8. Using the Infinispan based idempotent repository
In this section we will use the Infinispan based idempotent repository.
Java Example
InfinispanRemoteConfiguration conf = new InfinispanRemoteConfiguration(); (1) conf.setHosts("localhost:1122") InfinispanRemoteIdempotentRepository repo = new InfinispanRemoteIdempotentRepository("idempotent"); (2) repo.setConfiguration(conf); context.addRoutes(new RouteBuilder() { @Override public void configure() { from("direct:start") .idempotentConsumer(header("MessageID"), repo) (3) .to("mock:result"); } });
where,
- 1 - Configure the cache
- 2 - Configure the repository bean
- 3 - Set the repository to the route
XML Example
<bean id="infinispanRepo" class="org.apache.camel.component.infinispan.remote.InfinispanRemoteIdempotentRepository" destroy-method="stop"> <constructor-arg value="idempotent"/> (1) <property name="configuration"> (2) <bean class="org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration"> <property name="hosts" value="localhost:11222"/> </bean> </property> </bean> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start" /> <idempotentConsumer messageIdRepositoryRef="infinispanRepo"> (3) <header>MessageID</header> <to uri="mock:result" /> </idempotentConsumer> </route> </camelContext>
where,
- 1 - Set the name of the cache that will be used by the repository
- 2 - Configure the repository bean
- 3 - Set the repository to the route
29.9. Using the Infinispan based aggregation repository
In this section we will use the Infinispan based aggregation repository.
Java Example
InfinispanRemoteConfiguration conf = new InfinispanRemoteConfiguration(); (1) conf.setHosts("localhost:1122") InfinispanRemoteAggregationRepository repo = new InfinispanRemoteAggregationRepository(); (2) repo.setCacheName("aggregation"); repo.setConfiguration(conf); context.addRoutes(new RouteBuilder() { @Override public void configure() { from("direct:start") .aggregate(header("MessageID")) .completionSize(3) .aggregationRepository(repo) (3) .aggregationStrategyRef("myStrategy") .to("mock:result"); } });
where,
- 1 - Configure the cache
- 2 - Create the repository bean
- 3 - Set the repository to the route
XML Example
<bean id="infinispanRepo" class="org.apache.camel.component.infinispan.remote.InfinispanRemoteAggregationRepository" destroy-method="stop"> <constructor-arg value="aggregation"/> (1) <property name="configuration"> (2) <bean class="org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration"> <property name="hosts" value="localhost:11222"/> </bean> </property> </bean> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start" /> <aggregate strategyRef="myStrategy" completionSize="3" aggregationRepositoryRef="infinispanRepo"> (3) <correlationExpression> <header>MessageID</header> </correlationExpression> <to uri="mock:result"/> </aggregate> </route> </camelContext>
where,
- 1 - Set the name of the cache that will be used by the repository
- 2 - Configure the repository bean
- 3 - Set the repository to the route
With the release of Infinispan 11, it is required to set the encoding configuration on any cache created. This is critical for consuming events too. For more information have a look at Data Encoding and MediaTypes in the official Infinispan documentation.
29.10. Spring Boot Auto-Configuration
When using infinispan with Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
<dependency> <groupId>org.apache.camel.springboot</groupId> <artifactId>camel-infinispan-starter</artifactId> </dependency>
The component supports 23 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
camel.component.infinispan.autowired-enabled | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | Boolean |
camel.component.infinispan.bridge-error-handler | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | Boolean |
camel.component.infinispan.cache-container | Specifies the cache Container to connect. The option is a org.infinispan.client.hotrod.RemoteCacheManager type. | RemoteCacheManager | |
camel.component.infinispan.cache-container-configuration | The CacheContainer configuration. Used if the cacheContainer is not defined. The option is a org.infinispan.client.hotrod.configuration.Configuration type. | Configuration | |
camel.component.infinispan.configuration | Component configuration. The option is a org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration type. | InfinispanRemoteConfiguration | |
camel.component.infinispan.configuration-properties | Implementation specific properties for the CacheManager. | Map | |
camel.component.infinispan.configuration-uri | An implementation specific URI for the CacheManager. | String | |
camel.component.infinispan.custom-listener | Returns the custom listener in use, if provided. The option is a org.apache.camel.component.infinispan.remote.InfinispanRemoteCustomListener type. | InfinispanRemoteCustomListener | |
camel.component.infinispan.enabled | Whether to enable auto configuration of the infinispan component. This is enabled by default. | Boolean | |
camel.component.infinispan.event-types | Specifies the set of event types to register by the consumer.Multiple event can be separated by comma. The possible event types are: CLIENT_CACHE_ENTRY_CREATED, CLIENT_CACHE_ENTRY_MODIFIED, CLIENT_CACHE_ENTRY_REMOVED, CLIENT_CACHE_ENTRY_EXPIRED, CLIENT_CACHE_FAILOVER. | String | |
camel.component.infinispan.flags | A comma separated list of org.infinispan.client.hotrod.Flag to be applied by default on each cache invocation. | String | |
camel.component.infinispan.hosts | Specifies the host of the cache on Infinispan instance. | String | |
camel.component.infinispan.lazy-start-producer | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | Boolean |
camel.component.infinispan.operation | The operation to perform. | InfinispanOperation | |
camel.component.infinispan.password | Define the password to access the infinispan instance. | String | |
camel.component.infinispan.query-builder | Specifies the query builder. The option is a org.apache.camel.component.infinispan.InfinispanQueryBuilder type. | InfinispanQueryBuilder | |
camel.component.infinispan.remapping-function | Set a specific remappingFunction to use in a compute operation. The option is a java.util.function.BiFunction type. | BiFunction | |
camel.component.infinispan.result-header | Store the operation result in a header instead of the message body. By default, resultHeader == null and the query result is stored in the message body, any existing content in the message body is discarded. If resultHeader is set, the value is used as the name of the header to store the query result and the original message body is preserved. This value can be overridden by an in message header named: CamelInfinispanOperationResultHeader. | String | |
camel.component.infinispan.sasl-mechanism | Define the SASL Mechanism to access the infinispan instance. | String | |
camel.component.infinispan.secure | Define if we are connecting to a secured Infinispan instance. | false | Boolean |
camel.component.infinispan.security-realm | Define the security realm to access the infinispan instance. | String | |
camel.component.infinispan.security-server-name | Define the security server name to access the infinispan instance. | String | |
camel.component.infinispan.username | Define the username to access the infinispan instance. | String |