Apache Camel Component Reference
Red Hat JBoss Fuse
Configuration reference for Camel components
Version 6.2
Copyright © 2011-2015 Red Hat, Inc. and/or its affiliates.
17 Jan 2018
Abstract
Apache Camel has over 100 components and each component is highly configurable. This guide describes the settings for each of the components.
Chapter 1. Components Overview
Abstract
This chapter provides a summary of all the components available for Apache Camel.
1.1. List of Components
Table of components
The following components are available for use with Apache Camel.
Component | Endpoint URI | Artifact ID | Description |
---|---|---|---|
ActiveMQ | activemq:[queue:|topic:]DestinationName | activemq-core | For JMS Messaging with Apache ActiveMQ. |
AHC | ahc:http[s]://Hostname[:Port][/ResourceUri] | camel-ahc | Calls external HTTP servers using the Async Http Client library. |
AHC-WS | ahc-ws[s]://Hostname[:Port][/ResourceUri] | camel-ahc-ws | Calls external WebSocket servers using the Async Http Client library. |
AMQP | amqp:[queue:|topic:]DestinationName[?Options]] | camel-amqp | For messaging with the AMQP protocol. |
APNS | apns:notify[?Options] apns:consumer[?Options] | camel-apns | For sending notifications to Apple iOS devices. |
Atmosphere-WebSocket | atmosphere-websocket:///RelativePath[?Options] | camel-atmosphere-websocket | Accepts connections from external WebSocket clients using Atmosphere. |
Atom | atom://AtomUri[?Options] | camel-atom | Working with Apache Abdera for atom integration, such as consuming an atom feed. |
Avro | avro:http://Hostname[:Port][?Options] | camel-avro | Working with Apache Avro for data serialization. |
AWS-CW | aws-cw://Namespace[?Options] | camel-aws | For sending metrics to Amazon CloudWatch. |
AWS-DDB | aws-ddb://TableName[?Options] | camel-aws | For working with Amazon's DynamoDB (DDB). |
AWS-SDB | aws-sdb://DomainName[?Options] | camel-aws | For working with Amazon's SimpleDB (SDB). |
AWS-SES | aws-ses://From[?Options] | camel-aws | For working with Amazon's Simple Email Service (SES). |
AWS-S3 | aws-s3://BucketName[?Options] | camel-aws | For working with Amazon's Simple Storage Service (S3). |
AWS-SNS | aws-sns://TopicName[?Options] | camel-aws | For Messaging with Amazon's Simple Notification Service (SNS). |
AWS-SQS | aws-sqs://QueueName[?Options] | camel-aws | For Messaging with Amazon's Simple Queue Service (SQS). |
Bean | bean:BeanID[?methodName=Method] | camel-core | Uses the Bean Binding to bind message exchanges to beans in the Registry. Is also used for exposing and invoking POJO (Plain Old Java Objects). |
Bean Validation | bean-validator:Something[?Options] | camel-bean-validator | Validates the payload of a message using the Java Validation API (JSR 303 and JAXP Validation) and its reference implementation Hibernate Validator. |
Browse | browse: Name | camel-core | Provdes a simple BrowsableEndpoint which can be useful for testing, visualisation tools or debugging. The exchanges sent to the endpoint are all available to be browsed. |
Cache | cache://CacheName[?Options] | camel-cache | The cache component enables you to perform caching operations using EHCache as the Cache Implementation. |
Class | class:ClassName[?method=MethodName] | camel-core | Uses the Bean binding to bind message exchanges to beans in the registry. Is also used for exposing and invoking POJOs (Plain Old Java Objects). |
CMIS | cmis:CmisServerUrl[?Options] | camel-cmis | Uses the Apache Chemistry client API to interface with CMIS supporting CMS. |
Cometd | cometd://Hostname[:Port]/ChannelName[?Options] | camel-cometd | A transport for working with the jetty implementation of the cometd/bayeux protocol. |
Context | context:CamelContextId:LocalEndpointName | camel-context | Refers to an endpoint in a different CamelContext . |
ControlBus | controlbus:Command[?Options] | camel-core | ControlBus Enterprise Integration Pattern that allows you to send messages to endpoints for managing and monitoring your Camel applications. |
CouchDB | couchdb:http://Hostname[:Port] /Database[?Options]://Name[?Options] | camel-couchdb | Allows you to treat CouchDB instances as a producer or consumer of messages. |
Crypto | crypto:sign:Name[?Options] crypto:verify:Name[?Options] | camel-crypto | Sign and verify exchanges using the Signature Service of the Java Cryptographic Extension. |
CXF | cxf://Address[?Options] | camel-cxf | Working with Apache CXF for web services integration. |
CXF Bean | cxf:BeanName | camel-cxf | Proceess the exchange using a JAX WS or JAX RS annotated bean from the registry. |
CXFRS | cxfrs:bean:RsEndpoint[?Options] | camel-cxf | Provides integration with Apache CXF for connecting to JAX-RS services hosted in CXF. |
DataFormat | dataformat:Name:(marshal|unmarshal)[?Options] | camel-core | Enables you to marshal or unmarshal a message in one of the standard Camel data formats, by sending it to an endpoint. |
DataSet | dataset:Name[?Options] | camel-core | For load & soak testing the DataSet provides a way to create huge numbers of messages for sending to Components or asserting that they are consumed correctly. |
Direct | direct:EndpointID[?Options] | camel-core | Synchronous call (single-threaded) to another endpoint from same CamelContext. |
Direct-VM | direct-vm:EndpointID[?Options] | camel-core | Synchronous call (single-threaded) to another endpoint in another CamelContext running in the same JVM. |
Disruptor | disruptor:Name[?Options] disruptor-vm:Name[?Options] | camel-disruptor | Similar to a SEDA endpoint, but uses a Disruptor instead of a blocking queue. |
DNS | dns:Operation | camel-dns | Look up domain information and run DNS queries using DNSJava |
Dropbox | dropbox://[Operation][?Options] | camel-dropbox | Sends or receives messages from Dropbox remote folders. |
ElasticSearch | elasticsearch:ClusterName | camel-elasticsearch | For interfacing with an ElasticSearch server. |
EventAdmin | eventadmin:topic | camel-eventadmin | |
Exec | exec://Executable[?Options] | camel-exec | Execute system command. |
Fabric | fabric:ClusterID[:PublishedURI][?Options] | fabric-camel | Look up or publish a fabric endpoint. |
facebook://[Endpoint][?Options] | camel-facebook | Provides access to all of the Facebook APIs accessible using Facebook4J. | |
File2 | file://DirectoryName[?Options] | camel-core | Sending messages to a file or polling a file or directory. |
Flatpack | flatpack:[fixed|delim]: ConfigFile | camel-flatpack | Processing fixed width or delimited files or messages using the FlatPack library |
FOP | fop:OutputFormat | camel-fop | Renders the message into different output formats using Apache FOP. |
Freemarker | freemarker: TemplateResource | camel-freemarker | Generates a response using a Freemarker template. |
FTP2 | ftp://[Username@]Hostname[:Port]/Directoryname[?Options] | camel-ftp | Sending and receiving files over FTP. |
GAuth | gauth://Name[?Options] | camel-gae | Used by web applications to implement a Google-specific OAuth consumer |
GHTTP | ghttp:///Path[?Options] ghttp://Hostname[:Port]/Path[?Options] ghttps://Hostname[:Port]/Path[?Options] | camel-gae | Provides connectivity to the GAE URL fetch service and can also be used to receive messages from servlets. |
GLogin | glogin://Hostname[:Port][?Options] | camel-gae | Used by Camel applications outside Google App Engine (GAE) for programmatic login to GAE applications. |
GMail | gmail://Username@gmail.com[?Options] gmail://Username@googlemail.com[?Options] | camel-gae | Supports sending of emails via the GAE mail service. |
GTask | gtask://QueueName | camel-gae | Supports asynchronous message processing on GAE using the task queueing service as a message queue. |
Geocoder | geocoder:Address:Name[?Options] geocoder:latlng:Latitude,Longitude[?Options] | camel-geocoder | Looks up geocodes (latitude and longitude) for a given address, or performs reverse look-up. |
GoogleDrive | google-drive://EndpointPrefix/Endpoint[?Options] | camel-google-drive | Provides access to the Google Drive file storage service. |
Guava EventBus | guava-eventbus:BusName[?EventClass=ClassName] | camel-guava-eventbus | The Google Guava EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). This component provides integration bridge between Camel and Google Guava EventBus infrastructure. |
Hazelcast | hazelcast://StoreType:CacheName[?Options] | camel-hazelcast | Hazelcast is a data grid entirely implemented in Java (single JAR). This component supports map, multimap, seda, queue, set, atomic number and simple cluster. |
HBase | hbase://Table[?Options] | camel-hbase | For reading/writing from/to an HBase store (Hadoop database). |
HDFS | hdfs://Hostname[:Port][/Path][?Options] | camel-hdfs | Reads from and writes to a Hadoop Distributed File System (HDFS) using Hadoop 1.x. |
HDFS2 | hdfs2://Hostname[:Port][/Path][?Options] | camel-hdfs2 | Reads from and writes to a Hadoop Distributed File System (HDFS) using Hadoop 2.x. |
HL7 | mina:tcp://Host[:Port] | camel-hl7 | For working with the HL7 MLLP protocol and the HL7 model using the HAPI library. |
HTTP | http://Hostname[:Port][/ResourceUri] | camel-http | For calling out to external HTTP servers, using Apache HTTP Client 3.x. |
HTTP4 | http://Hostname[:Port][/ResourceUri] | camel-http4 | For calling out to external HTTP servers, using Apache HTTP Client 4.x. |
iBATIS | ibatis:OperationName[?Options] | camel-ibatis | Performs a query, poll, insert, update or delete in a relational database using Apache iBATIS. |
IMap | imap://[UserName@]Host[:Port][?Options] | camel-mail | Receiving email using IMap. |
IRC | irc:Host[:Port]/#Room | camel-irc | For IRC communication. |
JavaSpace | javaspace:jini://Host[?Options] | camel-javaspace | Sending and receiving messages through JavaSpace. |
JClouds | jclouds:[Blobstore|ComputService]:Provider | camel-jclouds | For interacting with cloud compute & blobstore service via JClouds. |
JCR | jcr://UserName:Password@Repository/path/to/node | camel-jcr | Storing a message in a JCR (JSR-170) compliant repository like Apache Jackrabbit. |
JDBC | jdbc:DataSourceName[?Options] | camel-jdbc | For performing JDBC queries and operations. |
Jetty | jetty:http://Host[:Port][/ResourceUri] | camel-jetty | For exposing services over HTTP. |
JGroups | jgroups:ClusterName[?Options] | camel-jgroups | Exchanges messages with JGroups clusters. |
Jing |
rng:LocalOrRemoteResource rnc:LocalOrRemoteResource | camel-jing | Validates the payload of a message using RelaxNG or RelaxNG compact syntax. |
JMS | jms:[temp:][queue:|topic:]DestinationName[?Options] | camel-jms | Working with JMS providers. |
JMX | jmx://Platform[?Options] | camel-jmx | For working with JMX notification listeners. |
JPA | jpa:[EntityClassName][?Options] | camel-jpa | For using a database as a queue via the JPA specification for working with OpenJPA, Hibernate or TopLink. |
Jsch | scp://Hostname/Destination | camel-jsch | Support for the scp protocol. |
JT400 | jt400://User:Pwd@System/PathToDTAQ | camel-jt400 | For integrating with data queues on an AS/400 (aka System i, IBM i, i5, ...) system. |
Kafka | kafka://Hostname[:Port][?Options] | camel-kafka | Sends or receives messages from an Apache Kafka message broker. |
Kestrel | kestrel://[AddressList/]Queuename[?Options] | camel-kestrel | For producing to or consuming from Kestrel queues. |
Krati | krati://[PathToDatastore/][?Options] | camel-krati | For producing to or consuming to Krati datastores. |
Language | language://LanguageName[:Script][?Options] | camel-core | Executes language scripts. |
LDAP | ldap:Host[:Port]?base=...[&scope=Scope] | camel-ldap | Performing searches on LDAP servers (Scope must be one of object|onelevel|subtree ). |
LevelDB | N/A | camel-leveldb | A very lightweight and embeddable key-value database. |
List | list:ListID | camel-core | Provides a simple BrowsableEndpoint which can be useful for testing, visualisation tools or debugging. The exchanges sent to the endpoint are all available to be browsed. |
Log | log:LoggingCategory[?level=LoggingLevel] | camel-core | Uses Jakarta Commons Logging to log the message exchange to some underlying logging system like log4j. |
Lucene | lucene:SearcherName:insert[?analyzer=Analyzer] lucene:SearcherName:query[?analyzer=Analyzer] | camel-lucene | Uses Apache Lucene to perform Java-based indexing and full text based searches using advanced analysis/tokenization capabilities. |
Master | REVISIT | ||
Metrics | metrics:[meter|counter|histogram|timer]:Metricname[?Options] | camel-metrics |
Enables you to collect various metrics directly from Camel routes using the Metrics Java library.
|
MINA | mina:tcp://Hostname[:Port][?Options] mina:udp://Hostname[:Port][?Options] mina:multicast://Hostname[:Port][?Options] mina:vm://Hostname[:Port][?Options] | camel-mina | Working with Apache MINA. |
MINA2 | mina2:tcp://Hostname[:Port][?Options] mina2:udp://Hostname[:Port][?Options] mina2:vm://Hostname[:Port][?Options] | camel-mina2 | Working with Apache MINA 2.x. |
Mock | mock:EndpointID | camel-core | For testing routes and mediation rules using mocks. |
MongoDB | mongodb:Connection[?Options] | camel-mongodb | Interacts with MongoDB databases and collections. Offers producer endpoints to perform CRUD-style operations and more against databases and collections, as well as consumer endpoints to listen on collections and dispatch objects to Camel routes. |
MQTT | mqtt:Name | camel-mqtt | Component for communicating with MQTT M2M message brokers |
MSV | msv:LocalOrRemoteResource | camel-msv | Validates the payload of a message using the MSV Library. |
Mustache | mustache:TemplateName[?Options] | camel-mustache | Enables you to process a message using a Mustache template. |
MVEL | mvel:TemplateName[?Options] | camel-mvel | Enables you to process a message using an MVEL template. |
MyBatis | mybatis:StatementName | camel-mybatis | Performs a query, poll, insert, update or delete in a relational database using MyBatis. |
Nagios | nagios://Host[:Port][?Options] | camel-nagios | Sending passive checks to Nagios using JSendNSCA. |
Netty | netty:tcp://localhost:99999[?Options] netty:udp://Remotehost:99999/[?Options] | camel-netty |
Enables you to work with TCP and UDP protocols, using the Java NIO based capabilities offered by Netty version 3.x.
|
Netty4 | netty4:tcp://localhost:99999[?Options] netty4:udp://Remotehost:99999/[?Options] | camel-netty4 |
Enables you to work with TCP and UDP protocols, using the Java NIO based capabilities offered by Netty version 4.x.
|
Netty HTTP | netty-http:http://Hostname[:Port][?Options] | camel-netty-http | An extension to the Netty component, facilitating the HTTP transport, using Netty version 3.x. |
Netty4 HTTP | netty4-http:http://Hostname[:Port][?Options] | camel-netty4-http | An extension to the Netty component, facilitating the HTTP transport, using Netty version 4.x. |
Olingo2 | olingo2://Endpoint/ResourcePath[?Options] | camel-olingo2 |
Communicates with OData 2.0 services using Apache Olingo 2.0.
|
Pax-Logging | paxlogging:Appender | camel-paxlogging |
Receives Pax Logging events in the context of an OSGi container.
|
POP | pop3://[UserName@]Host[:Port][?Options] | camel-mail | Receives email using POP3 and JavaMail. |
Printer | lpr://localhost[:Port]/default[?Options] lpr://RemoteHost[:Port]/path/to/printer[?Options] | camel-printer | Provides a way to direct payloads on a route to a printer. |
Properties | properties://Key[?Options] | camel-properties | Facilitates using property placeholders directly in endpoint URI definitions. |
Quartz | quartz://[GroupName/]TimerName[?Options] quartz://GroupName/TimerName/CronExpression | camel-quartz | Provides a scheduled delivery of messages using the Quartz scheduler. |
Quartz2 | quartz2://[GroupName/]TimerName[?Options] quartz2://GroupName/TimerName/CronExpression | camel-quartz2 | Provides a scheduled delivery of messages using the Quartz Scheduler 2.x. |
Quickfix | quickfix-server:ConfigFile quickfix-client:ConfigFile | camel-quickfix | Implementation of the QuickFix for Java engine which allow to send/receive FIX messages. |
RabbitMQ | rabbitmq://Hostname[:Port]/ExchangeName[?Options] | camel-rabbitmq | Enables you to produce and consume messages from a RabbitMQ instance. |
Ref | ref:EndpointID | camel-core | Component for lookup of existing endpoints bound in the Registry. |
Restlet | restlet:RestletUrl[?Options] | camel-restlet | Component for consuming and producing Restful resources using Restlet. |
RMI | rmi://RmiRegistryHost:RmiRegistryPort/RegistryPath | camel-rmi | Working with RMI. |
Routebox | routebox:routeboxName[?Options] | camel-routebox | |
RSS | rss:Uri | camel-rss | Working with ROME for RSS integration, such as consuming an RSS feed. |
RNC | rnc:LocalOrRemoteResource | camel-jing | Validates the payload of a message using RelaxNG Compact Syntax. |
RNG | rng:LocalOrRemoteResource | camel-jing | Validates the payload of a message using RelaxNG. |
Salesforce | salesforce:Topic[?Options] | camel-salesforce | Enables producer and consumer endpoints to communicate with Salesforce using Java DTOs. |
SAP | sap:[destination:DestinationName|server:ServerName]rfcName[?Options] | camel-sap | Enables outbound and inbound communication to and from SAP systems using synchronous remote function calls, sRFC. |
Chapter 124, SAP NetWeaver | sap-netweaver:https://Hostname[:Port]/Path[?Options] | camel-sap-netweaver | Integrates with the SAP NetWeaver Gateway using HTTP transports. |
Schematron | schematron://Path[?Options] | camel-schematron | Validates XML documents using Schematron. |
SEDA | seda:EndpointID | camel-core | Used to deliver messages to a java.util.concurrent.BlockingQueue, useful when creating SEDA style processing pipelines within the same CamelContext. |
SERVLET | servlet://RelativePath[?Options] | camel-servlet | Provides HTTP based endpoints for consuming HTTP requests that arrive at a HTTP endpoint and this endpoint is bound to a published Servlet. |
ServletListener | N/A | camel-servletlistener | Used for bootstrapping Camel applications in Web applications. |
SFTP | sftp://[Username@]Hostname[:Port]/Directoryname[?Options] | camel-ftp | Sending and receiving files over SFTP. |
Sip | sip://User@Hostname[:Port][?Options] sips://User@Hostname[:Port][?Options] | camel-sip | Publish/subscribe communication capability using the telecom SIP protocol. RFC3903 - Session Initiation Protocol (SIP) Extension for Event |
SJMS | sjms:[queue:|topic:]destinationName[?Options] | camel-sjms | A JMS client for Camel that employs best practices for JMS client creation and configuration. |
SMPP | smpp://UserInfo@Host[:Port][?Options] | camel-smpp | To send and receive SMS using Short Messaging Service Center using the JSMPP library. |
SMTP | smtp://[UserName@]Host[:Port][?Options] | camel-mail | Sending email using SMTP and JavaMail. |
SNMP | snmp://Hostname[:Port][?Options] | camel-snmp | Gives you the ability to poll SNMP capable devices or receive traps. |
Solr | solr://Hostname[:Port]/Solr[?Options] | camel-solr | Uses the Solrj client API to interface with an Apache Lucene Solr server. |
Splunk | splunk://Endpoint[?Options] | camel-splunk | Enables you to publish events and search for events in Splunk. |
Spring Batch | spring-batch:Job[?Options] | camel-spring-batch | To bridge Camel and Spring Batch. |
Spring Event | spring-event://dummy | camel-spring | Publishes or consumes Spring ApplicationEvents objects in a Spring context. |
Spring Integration | spring-integration:DefaultChannelName[?Options] | camel-spring-integration | The bridge component of Camel and Spring Integration. |
Spring LDAP | spring-ldap:SpringLdapTemplate[?Options] | camel-spring-ldap | Provides a Camel wrapper for Spring LDAP. |
Spring Redis | spring-redis://Hostname[:Port][?Options] | camel-spring-redis | Enables sending and receiving messages from Redis, which is an advanced key-value store, where keys can contain strings, hashes, lists, sets and sorted sets. |
Spring Web Services | spring-ws:[MappingType:]Address[?Options] | camel-spring-ws | Client-side support for accessing web services, and server-side support for creating your own contract-first web services using Spring Web Services. |
SQL | sql:SqlQueryString[?Options] | camel-sql | Performing SQL queries using JDBC. |
SSH | ssh:[Username[:Password]@]Host[:Port][?Options] | camel-ssh | For sending commands to a SSH server. |
StAX | stax:ContentHandlerClassName | camel-stax | Process messages through a SAX ContentHandler. |
Stomp | stomp:queue:Destination[?Options] | camel-stomp | For sending messages to or receiving messages from a Stomp compliant broker, such as Apache ActiveMQ. |
Stream | stream:[in|out|err|header][?Options] | camel-stream | Read or write to an input/output/error/file stream rather like Unix pipes. |
String Template | string-template:TemplateURI[?Options] | camel-stringtemplate | Generates a response using a String Template. |
Stub | stub:SomeOtherCamelUri | camel-core | Allows you to stub out some physical middleware endpoint for easier testing or debugging. |
Test | test:RouterEndpointUri | camel-spring | Creates a Mock endpoint which expects to receive all the message bodies that could be polled from the given underlying endpoint. |
Timer | timer:EndpointID[?Options] | camel-core | A timer endpoint. |
twitter://[Endpoint][?Options] | camel-twitter | A Twitter endpoint. | |
UrlRewrite | N/A | camel-urlrewrite | Enables you to plug URL rewrite functionality into the HTTP, HTTP4, Jetty, or AHC components. |
Validation | validator:LocalOrRemoteResource | camel-spring | Validates the payload of a message using XML Schema and JAXP Validation. |
Velocity | velocity:TemplateURI[?Options] | camel-velocity | Generates a response using an Apache Velocity template. |
Vertx | vertx:ChannelName[?Options] | camel-vertx | For working with the Vertx Event Bus. |
VM | vm:EndpointID | camel-core | Used to deliver messages to a java.util.concurrent.BlockingQueue, useful when creating SEDA style processing pipelines within the same JVM. |
Weather | weather://DummyName[?Options] | camel-weather | Polls weather information from Open Weather Map: a site that provides free global weather and forecast information. |
Websocket | websocket://Hostname[:Port]/Path | camel-websocket | Communicating with Websocket clients. |
XML RPC | xmlrpc://ServerURI[?Options] | camel-xmlrpc | Provides a data format for XML, which allows serialization and deserialization of request messages and response message using Apache XmlRpc's bindary data format. |
XML Security | N/A | camel-xmlsecurity | Generate and validate XML signatures as described in the W3C standard XML Signature Syntax and Processing. |
XMPP | xmpp:Hostname[:Port][/Room] | camel-xmpp | Working with XMPP and Jabber. |
XQuery | xquery:TemplateURI | camel-saxon | Generates a response using an XQuery template. |
XSLT | xslt:TemplateURI[?Options] | camel-spring | Enables you to process a message using an XSLT template. |
Yammer | yammer:[function][?Options] | camel-yammer | Enables you to interact with the Yammer enterprise social network. |
Zookeeper | zookeeper://Hostname[:Port]/Path | camel-zookeeper | Working with ZooKeeper cluster(s). |
Chapter 2. ActiveMQ
ActiveMQ Component
The ActiveMQ component allows messages to be sent to a JMS Queue or Topic; or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ.
This component is based on the JMS Component and uses Spring's JMS support for declarative transactions, using Spring's
JmsTemplate
for sending and a MessageListenerContainer
for consuming. All the options from the JMS component also apply for this component.
To use this component, make sure you have the
activemq.jar
or activemq-core.jar
on your classpath along with any Apache Camel dependencies such as camel-core.jar
, camel-spring.jar
and camel-jms.jar
.
Transacted and caching
See section Transactions and Cache Levels below on JMS page if you are using transactions with JMS as it can impact performance.
URI format
activemq:[queue:|topic:]destinationName
Where destinationName is an ActiveMQ queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue,
FOO.BAR
, use:
activemq:FOO.BAR
You can include the optional
queue:
prefix, if you prefer:
activemq:queue:FOO.BAR
To connect to a topic, you must include the
topic:
prefix. For example, to connect to the topic, Stocks.Prices
, use:
activemq:topic:Stocks.Prices
Options
See Options on the JMS component as all these options also apply for this component.
Configuring the Connection Factory
The following test case shows how to add an ActiveMQComponent to the CamelContext using the
activeMQComponent()
method while specifying the brokerURL used to connect to ActiveMQ.
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Configuring the Connection Factory using Spring XML
You can configure the ActiveMQ broker URL on the ActiveMQComponent as follows
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext xmlns="http://camel.apache.org/schema/spring"> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://somehost:61616"/> </bean> </beans>
Using connection pooling
When sending to an ActiveMQ broker using Camel it's recommended to use a pooled connection factory to handle efficient pooling of JMS connections, sessions and producers. This is documented in the page ActiveMQ Spring Support.
You can grab Jencks AMQ pool with Maven:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.3.2</version> </dependency>
And then setup the activemq component as follows:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> </bean>
Note
Notice the init and destroy methods on the pooled connection factory. This is important to ensure the connection pool is properly started and shutdown.
The
PooledConnectionFactory
will then create a connection pool with up to 8 connections in use at the same time. Each connection can be shared by many sessions. There is an option named maxActive
you can use to configure the maximum number of sessions per connection; the default value is 500
. From ActiveMQ 5.7 onwards the option has been renamed to better reflect its purpose, being named as maxActiveSessionPerConnection
. Notice the concurrentConsumers
is set to a higher value than maxConnections
is. This is okay, as each consumer is using a session, and as a session can share the same connection, we are in the safe. In this example we can have 8 * 500 = 4000 active sessions at the same time.
Invoking MessageListener POJOs in a route
The ActiveMQ component also provides a helper Type Converter from a JMS MessageListener to a Processor. This means that the Bean component is capable of invoking any JMS MessageListener bean directly inside any route.
So for example you can create a MessageListener in JMS as follows:
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
Then use it in your route as follows
from("file://foo/bar"). bean(MyListener.class);
That is, you can reuse any of the Apache Camel Components and easily integrate them into your JMS
MessageListener
POJO\!
Using ActiveMQ Destination Options
Available as of ActiveMQ 5.6
You can configure the Destination Options in the endpoint uri, using the "destination." prefix. For example to mark a consumer as exclusive, and set its prefetch size to 50, you can do as follows:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file://src/test/data?noop=true"/> <to uri="activemq:queue:foo"/> </route> <route> <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. --> <from uri="activemq:foo?destination.consumer.exclusive=true&estination.consumer.prefetchSize=50"/> <to uri="mock:results"/> </route> </camelContext>
Consuming Advisory Messages
ActiveMQ can generate Advisory messages which are put in topics that you can consume. Such messages can help you send alerts in case you detect slow consumers or to build statistics (number of messages/produced per day, etc.) The following Spring DSL example shows you how to read messages from a topic.
<route> <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" /> <convertBodyTo type="java.lang.String"/> <transform> <simple>${in.body} </simple> </transform> <to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" /> </route>
If you consume a message on a queue, you should see the following files under data/activemq folder :
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
and containing string:
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140 -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles- 3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles- 3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
Getting Component JAR
You need this dependency:
activemq-camel
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-camel</artifactId> <version>5.6.0</version> </dependency>
Chapter 3. AHC
Async Http Client (AHC) Component
Available as of Camel 2.8
The ahc: component provides HTTP based endpoints for consuming external HTTP resources (as a client to call external servers using HTTP). The component uses the Async Http Client library.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ahc</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
ahc:http://hostname[:port][/resourceUri][?options] ahc:https://hostname[:port][/resourceUri][?options]
Will by default use port 80 for HTTP and 443 for HTTPS.
You can append query options to the URI in the following format,
?option=value&option=value&...
AhcEndpoint Options
Name | Default Value | Description |
---|---|---|
throwExceptionOnFailure
|
true
|
Option to disable throwing the AhcOperationFailedException in case of failed responses from the remote server. This allows you to get all responses regardless of the HTTP status code.
|
bridgeEndpoint
|
false
|
If the option is true, then the Exchange.HTTP_URI header is ignored, and use the endpoint's URI for request. You may also set the throwExcpetionOnFailure to be false to let the AhcProducer send all the fault response back. |
transferException
|
false
|
If enabled and an Exchange failed processing on the consumer side, and if the caused Exception was send back serialized in the response as a application/x-java-serialized-object content type (for example using Jetty or Servlet Camel components). On the producer side the exception will be deserialized and thrown as is, instead of the AhcOperationFailedException . The caused exception is required to be serialized.
|
client
|
null
|
To use a custom com.ning.http.client.AsyncHttpClient .
|
clientConfig
|
null
|
To configure the AsyncHttpClient to use a custom com.ning.http.client.AsyncHttpClientConfig .
|
clientConfig.x
|
null
|
To configure additional properties of the com.ning.http.client.AsyncHttpClientConfig instance used by the endpoint. Note that configuration options set using this parameter will be merged with those set using the clientConfig parameter or the instance set at the component level with properties set using this parameter taking priority.
|
clientConfig.realm.x
|
null
|
Camel 2.11: To configure realm properties of the com.ning.http.client.AsyncHttpClientConfig The options which can be used are the options from com.ning.http.client.Realm.RealmBuilder . eg to set scheme, you can configure "clientConfig.realm.scheme=DIGEST"
|
binding
|
null
|
To use a custom org.apache.camel.component.ahc.AhcBinding .
|
sslContextParameters
|
null
|
Camel 2.9: Reference to a org.apache.camel.util.jsse.SSLContextParameters in the CAMEL:Registry. This reference overrides any configured SSLContextParameters at the component level. See Using the JSSE Configuration Utility. Note that configuring this option will override any SSL/TLS configuration options provided through the clientConfig option at the endpoint or component level.
|
bufferSize
|
4096
|
Camel 2.10.3: The initial in-memory buffer size used when transferring data between Camel and AHC Client. |
AhcComponent Options
Name | Default Value | Description |
---|---|---|
client
|
null
|
To use a custom com.ning.http.client.AsyncHttpClient .
|
clientConfig
|
null
|
To configure the AsyncHttpClient s use a custom com.ning.http.client.AsyncHttpClientConfig .
|
binding
|
null
|
To use a custom org.apache.camel.component.ahc.AhcBinding .
|
sslContextParameters
|
null
|
Camel 2.9: To configure custom SSL/TLS configuration options at the component level. See Using the JSSE Configuration Utility for more details. Note that configuring this option will override any SSL/TLS configuration options provided through the clientConfig option at the endpoint or component level. |
Notice that setting any of the options on the
AhcComponent
will propagate those options to AhcEndpoint
s being created. However the AhcEndpoint
can also configure/override a custom option. Options set on endpoints will always take precedence over options from the AhcComponent
.
Message Headers
Name | Type | Description |
---|---|---|
Exchange.HTTP_URI
|
String
|
URI to call. Will override existing URI set directly on the endpoint. |
Exchange.HTTP_PATH
|
String
|
Request URI's path, the header will be used to build the request URI with the HTTP_URI. If the path is start with "/", http producer will try to find the relative path based on the Exchange.HTTP_BASE_URI header or the exchange.getFromEndpoint().getEndpointUri();
|
Exchange.HTTP_QUERY
|
String
|
URI parameters. Will override existing URI parameters set directly on the endpoint. |
Exchange.HTTP_RESPONSE_CODE
|
int
|
The HTTP response code from the external server. Is 200 for OK. |
Exchange.HTTP_CHARACTER_ENCODING
|
String
|
Character encoding. |
Exchange.CONTENT_TYPE
|
String
|
The HTTP content type. Is set on both the IN and OUT message to provide a content type, such as text/html .
|
Exchange.CONTENT_ENCODING
|
String
|
The HTTP content encoding. Is set on both the IN and OUT message to provide a content encoding, such as gzip .
|
Message Body
Camel will store the HTTP response from the external server on the OUT body. All headers from the IN message will be copied to the OUT message, so headers are preserved during routing. Additionally Camel will add the HTTP response headers as well to the OUT message headers.
Response code
Camel will handle according to the HTTP response code:
- Response code is in the range 100..299, Camel regards it as a success response.
- Response code is in the range 300..399, Camel regards it as a redirection response and will throw a
AhcOperationFailedException
with the information. - Response code is 400+, Camel regards it as an external server failure and will throw a
AhcOperationFailedException
with the information. The option,throwExceptionOnFailure
, can be set tofalse
to prevent theAhcOperationFailedException
from being thrown for failed response codes. This allows you to get any response from the remote server.
AhcOperationFailedException
This exception contains the following information:
- The HTTP status code
- The HTTP status line (text of the status code)
- Redirect location, if server returned a redirect
- Response body as a
java.lang.String
, if server provided a body as response
Calling using GET or POST
The following algorithm is used to determine if either
GET
or POST
HTTP method should be used: 1. Use method provided in header. 2. GET
if query string is provided in header. 3. GET
if endpoint is configured with a query string. 4. POST
if there is data to send (body is not null). 5. GET
otherwise.
Configuring URI to call
You can set the HTTP producer's URI directly form the endpoint URI. In the route below, Camel will call out to the external server,
oldhost
, using HTTP.
from("direct:start") .to("ahc:http://oldhost");
And the equivalent Spring sample:
<camelContext xmlns="http://activemq.apache.org/camel/schema/spring"> <route> <from uri="direct:start"/> <to uri="ahc:http://oldhost"/> </route> </camelContext>
You can override the HTTP endpoint URI by adding a header with the key,
Exchange.HTTP_URI
, on the message.
from("direct:start") .setHeader(Exchange.HTTP_URI, constant("http://newhost")) .to("ahc:http://oldhost");
Configuring URI Parameters
The ahc producer supports URI parameters to be sent to the HTTP server. The URI parameters can either be set directly on the endpoint URI or as a header with the key
Exchange.HTTP_QUERY
on the message.
from("direct:start") .to("ahc:http://oldhost?order=123&detail=short");
Or options provided in a header:
from("direct:start") .setHeader(Exchange.HTTP_QUERY, constant("order=123&detail=short")) .to("ahc:http://oldhost");
How to set the http method (GET/POST/PUT/DELETE/HEAD/OPTIONS/TRACE) to the HTTP producer
The HTTP component provides a way to set the HTTP request method by setting the message header. Here is an example;
from("direct:start") .setHeader(Exchange.HTTP_METHOD, constant("POST")) .to("ahc:http://www.google.com") .to("mock:results");
And the equivalent Spring sample:
<camelContext xmlns="http://activemq.apache.org/camel/schema/spring"> <route> <from uri="direct:start"/> <setHeader headerName="CamelHttpMethod"> <constant>POST</constant> </setHeader> <to uri="ahc:http://www.google.com"/> <to uri="mock:results"/> </route> </camelContext>
Configuring charset
If you are using
POST
to send data you can configure the charset
using the Exchange
property:
exchange.setProperty(Exchange.CHARSET_NAME, "iso-8859-1");
URI Parameters from the endpoint URI
In this sample we have the complete URI endpoint that is just what you would have typed in a web browser. Multiple URI parameters can of course be set using the
&
character as separator, just as you would in the web browser. Camel does no tricks here.
// we query for Camel at the Google page template.sendBody("ahc:http://www.google.com/search?q=Camel", null);
URI Parameters from the Message
Map headers = new HashMap(); headers.put(Exchange.HTTP_QUERY, "q=Camel&lr=lang_en"); // we query for Camel and English language at Google template.sendBody("ahc:http://www.google.com/search", null, headers);
In the header value above notice that it should not be prefixed with
?
and you can separate parameters as usual with the &
char.
Getting the Response Code
You can get the HTTP response code from the AHC component by getting the value from the Out message header with
Exchange.HTTP_RESPONSE_CODE
.
Exchange exchange = template.send("ahc:http://www.google.com/search", new Processor() { public void process(Exchange exchange) throws Exception { exchange.getIn().setHeader(Exchange.HTTP_QUERY, constant("hl=en&q=activemq")); } }); Message out = exchange.getOut(); int responseCode = out.getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
Configuring AsyncHttpClient
The
AsyncHttpClient
client uses a AsyncHttpClientConfig
to configure the client. See the documentation at Async Http Client for more details.
The example below shows how to use a builder to create the
AsyncHttpClientConfig
which we configure on the AhcComponent
.
// create a client config builder AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder(); // use the builder to set the options we want, in this case we want to follow redirects and try // at most 3 retries to send a request to the host AsyncHttpClientConfig config = builder.setFollowRedirects(true).setMaxRequestRetry(3).build(); // lookup AhcComponent AhcComponent component = context.getComponent("ahc", AhcComponent.class); // and set our custom client config to be used component.setClientConfig(config);
In Camel 2.9, the AHC component uses Async HTTP library 1.6.4. This newer version provides added support for plain bean style configuration. The
AsyncHttpClientConfigBean
class provides getters and setters for the configuration options available in AsyncHttpClientConfig
. An instance of AsyncHttpClientConfigBean
may be passed directly to the AHC component or referenced in an endpoint URI using the clientConfig
URI parameter.
Also available in Camel 2.9 is the ability to set configuration options directly in the URI. URI parameters starting with "clientConfig." can be used to set the various configurable properties of
AsyncHttpClientConfig
. The properties specified in the endpoint URI are merged with those specified in the configuration referenced by the "clientConfig" URI parameter with those being set using the "clientConfig." parameter taking priority. The AsyncHttpClientConfig
instance referenced is always copied for each endpoint such that settings on one endpoint will remain independent of settings on any previously created endpoints. The example below shows how to configure the AHC component using the "clientConfig." type URI parameters.
from("direct:start") .to("ahc:http://localhost:8080/foo?clientConfig.maxRequestRetry=3&clientConfig.followRedirects=true")
SSL Support (HTTPS)
Using the JSSE Configuration Utility
As of Camel 2.9, the AHC component supports SSL/TLS configuration through the Camel JSSE Configuration Utility. This utility greatly decreases the amount of component specific code you need to write and is configurable at the endpoint and component levels. The following examples demonstrate how to use the utility with the AHC component.
Programmatic configuration of the component
KeyStoreParameters ksp = new KeyStoreParameters(); ksp.setResource("/users/home/server/keystore.jks"); ksp.setPassword("keystorePassword"); KeyManagersParameters kmp = new KeyManagersParameters(); kmp.setKeyStore(ksp); kmp.setKeyPassword("keyPassword"); SSLContextParameters scp = new SSLContextParameters(); scp.setKeyManagers(kmp); AhcComponent component = context.getComponent("ahc", AhcComponent.class); component.setSslContextParameters(scp));
Spring DSL based configuration of endpoint
... <camel:sslContextParameters id="sslContextParameters"> <camel:keyManagers keyPassword="keyPassword"> <camel:keyStore resource="/users/home/server/keystore.jks" password="keystorePassword"/> </camel:keyManagers> </camel:sslContextParameters>... ... <to uri="ahc:https://localhost/foo?sslContextParameters=#sslContextParameters"/> ...
Chapter 4. AHC-WS
Async Http Client (AHC) Websocket Client Component
Available as of Camel 2.14
The ahc-ws component provides Websocket based endpoints for a client communicating with external servers over Websocket (as a client opening a websocket connection to an external server). The component uses the Chapter 3, AHC component that in turn uses the Async Http Client library.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ahc-ws</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI Format
ahc-ws://hostname[:port][/resourceUri][?options] ahc-wss://hostname[:port][/resourceUri][?options]
Will by default use port 80 for ahc-ws and 443 for ahc-wss.
AHC-WS Options
As the AHC-WS component is based on the AHC component, you can use the various configuration options of the AHC component.
Writing and Reading Data over Websocket
An ahc-ws endpoint can either write data to the socket or read from the socket, depending on whether the endpoint is configured as the producer or the consumer, respectively.
Configuring URI to Write or Read Data
In the route below, Camel will write to the specified websocket connection.
from("direct:start") .to("ahc-ws://targethost");
And the equivalent Spring sample:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <to uri="ahc-ws://targethost"/> </route> </camelContext>
In the route below, Camel will read from the specified websocket connection.
from("ahc-ws://targethost") .to("direct:next");
And the equivalent Spring sample:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="ahc-ws://targethost"/> <to uri="direct:next"/> </route> </camelContext>
Chapter 5. AMQP
AMQP
The AMQP component supports the AMQP protocol via the Qpid project.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ampq</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
amqp:[queue:|topic:]destinationName[?options]
You can specify all of the various configuration options of the JMS component after the destination name.
Chapter 6. APNS
Apns Component
Available as of Camel 2.8
The apns component is used for sending notifications to iOS devices. The apns components use javapns library. The component supports sending notifications to Apple Push Notification Servers (APNS) and consuming feedback from the servers.
The consumer is configured with a default polling time of 3600 seconds. It is advisable to consume the feedback stream from Apple Push Notification Servers regularly at larger intervals to avoid flooding the servers.
The feedback stream gives information about inactive devices. This information can be consumed infrequently (every two or three hours) if your mobile application is not heavily used.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-apns</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
To send notifications:
apns:notify[?options]
To consume feedback:
apns:consumer[?options]
Options
Producer
Property | Default | Description |
---|---|---|
tokens
|
Empty by default. Configure this property in case you want to statically declare tokens related to devices you want to notify. Tokens are separated by comma. |
Consumer
Property | Default | Description |
---|---|---|
delay
|
3600
|
Delay in seconds between each poll. |
initialDelay
|
10
|
Seconds before polling starts. |
timeUnit
|
SECONDS
|
Time Unit for polling. |
userFixedDelay
|
true
|
If true , use fixed delay between pools, otherwise fixed rate is used. See ScheduledExecutorService in JDK for details.
|
You can append query options to the URI in the following format,
?option=value&option=value&...
Component
The
ApnsComponent
must be configured with a com.notnoop.apns.ApnsService
. The service can be created and configured using the org.apache.camel.component.apns.factory.ApnsServiceFactory
. See further below for an example. For further information, see the test source code.
Exchange data format
When Camel fetches feedback data corresponding to inactive devices, it retrieves a List of InactiveDevice objects. Each InactiveDevice object on the retrieved list will be set as the In body, and then processed by the consumer endpoint.
Message Headers
Camel Apns uses these headers.
Property | Default | Description |
---|---|---|
CamelApnsTokens
|
Empty by default. | |
CamelApnsMessageType
|
STRING, PAYLOAD
|
If you choose PAYLOAD as the message type, the message will be considered an APNS payload and sent as is. If you choose STRING, the message will be converted to an APNS payload |
ApnsServiceFactory Builder Callback
ApnsServiceFactory
comes with an empty callback method that can be used to configure or replace the default ApnsServiceBuilder
instance. The method has the following format:
protected ApnsServiceBuilder configureServiceBuilder(ApnsServiceBuilder serviceBuilder);It is used in the following way:
ApnsServiceFactory proxiedApnsServiceFactory = new ApnsServiceFactory(){ @Override protected ApnsServiceBuilder configureServiceBuilder(ApnsServiceBuilder serviceBuilder) { return serviceBuilder.withSocksProxy("my.proxy.com", 6666); } };
Samples
Camel Xml route
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <!-- Replace by desired values --> <bean id="apnsServiceFactory" class="org.apache.camel.component.apns.factory.ApnsServiceFactory"> <!-- Optional configuration of feedback host and port --> <!-- <property name="feedbackHost" value="localhost" /> --> <!-- <property name="feedbackPort" value="7843" /> --> <!-- Optional configuration of gateway host and port --> <!-- <property name="gatewayHost" value="localhost" /> --> <!-- <property name="gatewayPort" value="7654" /> --> <!-- Declaration of certificate used --> <!-- from Camel 2.11 onwards you can use prefix: classpath:, file: to refer to load the certificate from classpath or file. Default it classpath --> <property name="certificatePath" value="certificate.p12" /> <property name="certificatePassword" value="MyCertPassword" /> <!-- Optional connection strategy - By Default: No need to configure --> <!-- Possible options: NON_BLOCKING, QUEUE, POOL or Nothing --> <!-- <property name="connectionStrategy" value="POOL" /> --> <!-- Optional pool size --> <!-- <property name="poolSize" value="15" /> --> <!-- Optional connection strategy - By Default: No need to configure --> <!-- Possible options: EVERY_HALF_HOUR, EVERY_NOTIFICATION or Nothing (Corresponds to NEVER javapns option) --> <!-- <property name="reconnectionPolicy" value="EVERY_HALF_HOUR" /> --> </bean> <bean id="apnsService" factory-bean="apnsServiceFactory" factory-method="getApnsService" /> <!-- Replace this declaration by wanted configuration --> <bean id="apns" class="org.apache.camel.component.apns.ApnsComponent"> <property name="apnsService" ref="apnsService" /> </bean> <camelContext id="camel-apns-test" xmlns="http://camel.apache.org/schema/spring"> <route id="apns-test"> <from uri="apns:consumer?initialDelay=10&elay=3600&imeUnit=SECONDS" /> <to uri="log:org.apache.camel.component.apns?showAll=true&ultiline=true" /> <to uri="mock:result" /> </route> </camelContext> </beans>
Camel Java route
Create camel context and declare apns component programmatically
protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); ApnsServiceFactory apnsServiceFactory = new ApnsServiceFactory(); apnsServiceFactory.setCertificatePath("classpath:/certificate.p12"); apnsServiceFactory.setCertificatePassword("MyCertPassword"); ApnsService apnsService = apnsServiceFactory.getApnsService(camelContext); ApnsComponent apnsComponent = new ApnsComponent(apnsService); camelContext.addComponent("apns", apnsComponent); return camelContext; }
ApnsProducer - iOS target device dynamically configured via header: "CamelApnsTokens"
protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { from("direct:test") .setHeader(ApnsConstants.HEADER_TOKENS, constant(IOS_DEVICE_TOKEN)) .to("apns:notify"); } } }
ApnsProducer - iOS target device statically configured via uri
protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { from("direct:test"). to("apns:notify?tokens=" + IOS_DEVICE_TOKEN); } }; }
ApnsConsumer
from("apns:consumer?initialDelay=10&delay=3600&timeUnit=SECONDS") .to("log:com.apache.camel.component.apns?showAll=true&multiline=true") .to("mock:result");
See Also
Chapter 7. Atmosphere-Websocket
Atmosphere Websocket Servlet Component
Available as of Camel 2.14
The atmosphere-websocket: component provides Websocket based endpoints for a servlet communicating with external clients over Websocket (as a servlet accepting websocket connections from external clients). The component uses the Chapter 128, SERVLET component and uses the Atmosphere library to support the Websocket transport in various Servlet containers (e..g., Jetty, Tomcat, ...).
Unlike the Chapter 159, Websocket component that starts the embedded Jetty server, this component uses the servlet provider of the container.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-atmosphere-websocket</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI Format
atmosphere-websocket:///relative path[?options]
Reading and Writing Data over Websocket
An atmopshere-websocket endpoint can either write data to the socket or read from the socket, depending on whether the endpoint is configured as the producer or the consumer, respectively.
Configuring URI to Read or Write Data
In the route below, Camel will read from the specified websocket connection.
from("atmosphere-websocket:///servicepath") .to("direct:next");
And the equivalent Spring sample:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="atmosphere-websocket:///servicepath"/> <to uri="direct:next"/> </route> </camelContext>
In the route below, Camel will read from the specified websocket connection.
from("direct:next") .to("atmosphere-websocket:///servicepath");
And the equivalent Spring sample:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:next"/> <to uri="atmosphere-websocket:///servicepath"/> </route> </camelContext>
Chapter 8. Atom
Atom Component
The atom: component is used for polling atom feeds.
Apache Camel will poll the feed every 500 milliseconds by default. Note: The component currently supports only polling (consuming) feeds.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-atom</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
atom://atomUri[?options]
Where atomUri is the URI to the Atom feed to poll.
Options
Property | Default | Description |
---|---|---|
splitEntries
|
true
|
If true Apache Camel will poll the feed and for the subsequent polls return each entry poll by poll. If the feed contains 7 entries then Apache Camel will return the first entry on the first poll, the 2nd entry on the next poll, until no more entries where as Apache Camel will do a new update on the feed. If false then Apache Camel will poll a fresh feed on every invocation.
|
filter
|
true
|
Is only used by the split entries to filter the entries to return. Apache Camel will default use the UpdateDateFilter that only return new entries from the feed. So the client consuming from the feed never receives the same entry more than once. The filter will return the entries ordered by the newest last.
|
lastUpdate
|
null
|
Is only used by the filter, as the starting timestamp for selection never entries (uses the entry.updated timestamp). Syntax format is: yyyy-MM-ddTHH:MM:ss . Example: 2007-12-24T17:45:59 .
|
throttleEntries
|
true
|
Camel 2.5: Sets whether all entries identified in a single feed poll should be delivered immediately. If true , only one entry is processed per consumer.delay . Only applicable when splitEntries is set to true .
|
feedHeader
|
true
|
Sets whether to add the Abdera Feed object as a header. |
sortEntries
|
false
|
If splitEntries is true , this sets whether to sort those entries by updated date.
|
consumer.delay
|
500
|
Delay in millis between each poll. |
consumer.initialDelay
|
1000
|
Millis before polling starts. |
consumer.userFixedDelay
|
false
|
If true , use fixed delay between pools, otherwise fixed rate is used. See ScheduledExecutorService in JDK for details.
|
You can append query options to the URI in the following format,
?option=value&option=value&...
Exchange data format
Apache Camel will set the In body on the returned
Exchange
with the entries. Depending on the splitEntries
flag Apache Camel will either return one Entry
or a List<Entry>
.
Option | Value | Behavior |
---|---|---|
splitEntries
|
true
|
Only a single entry from the currently being processed feed is set: exchange.in.body(Entry)
|
splitEntries
|
false
|
The entire list of entries from the feed is set: exchange.in.body(List<Entry>)
|
Apache Camel can set the
Feed
object on the in header (see feedHeader
option to disable this):
Message Headers
Apache Camel atom uses these headers.
Header | Description |
---|---|
CamelAtomFeed
|
Apache Camel 2.0: When consuming the org.apache.abdera.model.Feed object is set to this header.
|
Samples
In the following sample we poll James Strachan's blog:
from("atom://http://macstrac.blogspot.com/feeds/posts/default").to("seda:feeds");
In this sample we want to filter only good blogs we like to a SEDA queue. The sample also shows how to set up Apache Camel standalone, not running in any container or using Spring.
@Override protected CamelContext createCamelContext() throws Exception { // First we register a blog service in our bean registry SimpleRegistry registry = new SimpleRegistry(); registry.put("blogService", new BlogService()); // Then we create the camel context with our bean registry context = new DefaultCamelContext(registry); // Then we add all the routes we need using the route builder DSL syntax context.addRoutes(createMyRoutes()); // And finally we must start Camel to let the magic routing begins context.start(); return context; } /** * This is the route builder where we create our routes using the Camel DSL syntax */ protected RouteBuilder createMyRoutes() throws Exception { return new RouteBuilder() { public void configure() throws Exception { // We pool the atom feeds from the source for further processing in the seda queue // we set the delay to 1 second for each pool. // Using splitEntries=true will during polling only fetch one Atom Entry at any given time. // As the feed.atom file contains 7 entries, using this will require 7 polls to fetch the entire // content. When Camel have reach the end of entries it will refresh the atom feed from URI source // and restart - but as Camel by default uses the UpdatedDateFilter it will only deliver new // blog entries to "seda:feeds". So only when James Strachan updates his blog with a new entry // Camel will create an exchange for the seda:feeds. from("atom:file:src/test/data/feed.atom?splitEntries=true&consumer.delay=1000").to("seda:feeds"); // From the feeds we filter each blot entry by using our blog service class from("seda:feeds").filter().method("blogService", "isGoodBlog").to("seda:goodBlogs"); // And the good blogs is moved to a mock queue as this sample is also used for unit testing // this is one of the strengths in Camel that you can also use the mock endpoint for your // unit tests from("seda:goodBlogs").to("mock:result"); } }; } /** * This is the actual junit test method that does the assertion that our routes is working * as expected */ @Test public void testFiltering() throws Exception { // create and start Camel context = createCamelContext(); context.start(); // Get the mock endpoint MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); // There should be at least two good blog entries from the feed mock.expectedMinimumMessageCount(2); // Asserts that the above expectations is true, will throw assertions exception if it failed // Camel will default wait max 20 seconds for the assertions to be true, if the conditions // is true sooner Camel will continue mock.assertIsSatisfied(); // stop Camel after use context.stop(); } /** * Services for blogs */ public class BlogService { /** * Tests the blogs if its a good blog entry or not */ public boolean isGoodBlog(Exchange exchange) { Entry entry = exchange.getIn().getBody(Entry.class); String title = entry.getTitle(); // We like blogs about Camel boolean good = title.toLowerCase().contains("camel"); return good; } }
Chapter 9. avro
Avro Component
Available as of Camel 2.10
This component provides a dataformat for avro, which allows serialization and deserialization of messages using Apache Avro's binary dataformat. Moreover, it provides support for Apache Avro's rpc, by providing producers and consumers endpoint for using avro over netty or http.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-avro</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
Apache Avro Overview
Avro allows you to define message types and a protocol using a json like format and then generate java code for the specified types and messages. An example of how a schema looks like is below.
{"namespace": "org.apache.camel.avro.generated", "protocol": "KeyValueProtocol", "types": [ {"name": "Key", "type": "record", "fields": [ {"name": "key", "type": "string"} ] }, {"name": "Value", "type": "record", "fields": [ {"name": "value", "type": "string"} ] } ], "messages": { "put": { "request": [{"name": "key", "type": "Key"}, {"name": "value", "type": "Value"} ], "response": "null" }, "get": { "request": [{"name": "key", "type": "Key"}], "response": "Value" } } }
You can easily generate classes from a schema, using maven, ant etc. More details can be found at the Apache Avro documentation.
However, it doesn't enforce a schema first approach and you can create schema for your existing classes. Since 2.12 you can use existing protocol interfaces to make RCP calls. You should use interface for the protocol itself and POJO beans or primitive/String classes for parameter and result types. Here is an example of the class that corresponds to schema above:
package org.apache.camel.avro.reflection; public interface KeyValueProtocol { void put(String key, Value value); Value get(String key); } class Value { private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
Note: Existing classes can be used only for RPC (see below), not in data format.
Using the Avro data format
Using the avro data format is as easy as specifying that the class that you want to marshal or unmarshal in your route.
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:in"/> <marshal> <avro instanceClass="org.apache.camel.dataformat.avro.Message"/> </marshal> <to uri="log:out"/> </route> </camelContext>
An alternative can be to specify the dataformat inside the context and reference it from your route.
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <dataFormats> <avro id="avro" instanceClass="org.apache.camel.dataformat.avro.Message"/> </dataFormats> <route> <from uri="direct:in"/> <marshal ref="avro"/> <to uri="log:out"/> </route> </camelContext>
In the same manner you can umarshal using the avro data format.
Using Avro RPC in Camel
As mentioned above Avro also provides RPC support over multiple transports such as http and netty. Camel provides consumers and producers for these two transports.
avro:[transport]:[host]:[port][?options]
The supported transport values are currently http or netty.
Since 2.12 you can specify message name right in the URI:
avro:[transport]:[host]:[port][/messageName][?options]
For consumers this allows you to have multiple routes attached to the same socket. Dispatching to correct route will be done by the avro component automatically. Route with no messageName specified (if any) will be used as default.
When using camel producers for avro ipc, the "in" message body needs to contain the parameters of the operation specified in the avro protocol. The response will be added in the body of the "out" message.
In a similar manner when using camel avro consumers for avro ipc, the requests parameters will be placed inside the "in" message body of the created exchange and once the exchange is processed the body of the "out" message will be send as a response.
Note: By default consumer parameters are wrapped into array. If you've got only one parameter, since 2.12 you can use
singleParameter
URI option to receive it direcly in the "in" message body without array wrapping.
Avro RPC URI Options
Name | Version | Description |
---|---|---|
protocolClassName
|
The class name of the avro protocol. | |
singleParameter
|
2.12 | If true, consumer parameter won't be wrapped into array. Will fail if protocol specifies more then 1 parameter for the message |
protocol
|
Avro procol object. Can be used instead of protocolClassName when complex protocol needs to be created. One cane used #name notation to refer beans from the Registry
|
|
reflectionProtocol
|
2.12 |
If protocol object provided is reflection protocol. Should be used only with protocol parameter because for protocolClassName protocol type will be autodetected
|
Avro RPC Headers
Name | Description |
---|---|
CamelAvroMessageName
|
The name of the message to send. In consumer overrides message name from URI (if any) |
Examples
An example of using camel avro producers via http:
<route> <from uri="direct:start"/> <to uri="avro:http:localhost:{{avroport}}?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol"/> <to uri="log:avro"/> </route>
In the example above you need to fill
CamelAvroMessageName
header. Since 2.12 you can use following syntax to call constant messages:
<route> <from uri="direct:start"/> <to uri="avro:http:localhost:{{avroport}}/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol"/> <to uri="log:avro"/> </route>
An example of consuming messages using camel avro consumers via Netty:
<route> <from uri="avro:netty:localhost:{{avroport}}?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol"/> <choice> <when> <el>${in.headers.CamelAvroMessageName == 'put'}</el> <process ref="putProcessor"/> </when> <when> <el>${in.headers.CamelAvroMessageName == 'get'}</el> <process ref="getProcessor"/> </when> </choice> </route>
Since 2.12 you can set up two distinct routes to perform the same task:
<route> <from uri="avro:netty:localhost:{{avroport}}/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol"> <process ref="putProcessor"/> </route> <route> <from uri="avro:netty:localhost:{{avroport}}/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol&singleParameter=true"/> <process ref="getProcessor"/> </route>
In the example above, get takes only one parameter, so
singleParameter
is used and getProcessor
will receive Value class directly in body, while putProcessor
will receive an array of size 2 with String key and Value value filled as array contents.
Chapter 10. AWS
10.1. Introduction to the AWS Components
Camel Components for Amazon Web Services
The Camel Components for Amazon Web Services provide connectivity to AWS services from Camel.
AWS service | Camel component | Camel Version | Component description |
---|---|---|---|
Simple Queue Service (SQS) | AWS-SQS | 2.6 | Supports sending and receiving messages using SQS |
Simple Notification Service (SNS) | AWS-SNS | 2.8 | Supports sending messages using SNS |
Simple Storage Service (S3) | AWS-S3 | 2.8 | Supports storing and retrieving of objects using S3 |
Simple Email Service (SES) | AWS-SES | 2.8.4 | Supports sending emails using SES |
SimpleDB | AWS-SDB | 2.8.4 | Supports storing retrieving data to/from SDB |
DynamoDB | AWS-DDB | 2.10.0 | Supports storing retrieving data to/from DDB |
CloudWatch | AWS-CW | 2.10.3 | Supports sending metrics to CloudWatch |
Simple Workflow | AWS-SWF | 2.13.0 | Supports managing workflows with SWF |
10.2. AWS-CW
CW Component
*Available as of Camel 2.11
The CW component allows messages to be sent to an Amazon CloudWatch metrics. The implementation of the Amazon API is provided by the AWS SDK.
Prerequisites
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon CloudWatch. More information are available at Amazon CloudWatch.
URI Format
aws-cw://namespace[?options]
The metrics will be created if they don't already exists. You can append query options to the URI in the following format,
?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
amazonCwClient |
null
|
Producer |
Reference to a com.amazonaws.services.cloudwatch.AmazonCloudWatch in the Registry.
|
accessKey |
null
|
Producer | Amazon AWS Access Key |
secretKey |
null
|
Producer | Amazon AWS Secret Key |
name |
null
|
Producer | The metric name which is used if the message header 'CamelAwsCwMetricName' is not present. |
value |
1.0
|
Producer | The metric value which is used if the message header 'CamelAwsCwMetricValue' is not present. |
unit |
Count
|
Producer | The metric unit which is used if the message header 'CamelAwsCwMetricUnit' is not present. |
namespace |
null
|
Producer | The metric namespace which is used if the message header 'CamelAwsCwMetricNamespace' is not present. |
timestamp |
null
|
Producer | The metric timestamp which is used if the message header 'CamelAwsCwMetricTimestamp' is not present. |
amazonCwEndpoint |
null
|
Producer | The region with which the AWS-CW client wants to work with. |
Required CW component options
You have to provide the amazonCwClient in the Registry or your accessKey and secretKey to access the Amazon's CloudWatch.
Usage
Message headers evaluated by the CW producer
Header | Type | Description |
---|---|---|
CamelAwsCwMetricName
|
String
|
The Amazon CW metric name. |
CamelAwsCwMetricValue
|
Double
|
The Amazon CW metric value. |
CamelAwsCwMetricUnit
|
String
|
The Amazon CW metric unit. |
CamelAwsCwMetricNamespace
|
String
|
The Amazon CW metric namespace. |
CamelAwsCwMetricTimestamp
|
Date
|
The Amazon CW metric timestamp. |
CamelAwsCwMetricDimensionName
|
String
|
Camel 2.12: The Amazon CW metric dimension name. |
CamelAwsCwMetricDimensionValue
|
String
|
Camel 2.12: The Amazon CW metric dimension value. |
CamelAwsCwMetricDimensions
|
Map<String, String>
|
Camel 2.12: A map of dimension names and dimension values. |
Advanced AmazonCloudWatch configuration
If you need more control over the
AmazonCloudWatch
instance configuration you can create your own instance and refer to it from the URI:
from("direct:start") .to("aws-cw://namepsace?amazonCwClient=#client");
For example if your Camel Application is running behind a firewall:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonCloudWatch client = new AmazonCloudWatchClient(awsCredentials, clientConfiguration); registry.bind("client", client);
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version}
must be replaced by the actual version of Camel (2.10 or higher).
10.3. AWS-DDB
DDB Component
Available as of Camel 2.10
The DynamoDB component supports storing and retrieving data from/to Amazon's DynamoDB service.
Prerequisites
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon DynamoDB. More information are available at Amazon DynamoDB.
URI Format
aws-ddb://domainName[?options]
You can append query options to the URI in the following format, ?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
amazonDDBClient |
null
|
Producer |
Reference to a com.amazonaws.services.dynamodb.AmazonDynamoDB in the Registry.
|
accessKey |
null
|
Producer | Amazon AWS Access Key |
secretKey |
null
|
Producer | Amazon AWS Secret Key |
amazonDdbEndpoint |
null
|
Producer | The region with which the AWS-DDB client wants to work with. |
tableName |
null
|
Producer | The name of the table currently worked with. |
readCapacity |
0
|
Producer | The provisioned throughput to reserve for reading resources from your table |
writeCapacity |
0
|
Producer | The provisioned throughput to reserved for writing resources to your table |
consistentRead |
false
|
Producer | Determines whether or not strong consistency should be enforced when data is read. |
operation |
PutAttributes
|
Producer | Valid values are BatchGetItems, DeleteItem, DeleteTable, DescribeTable, GetItem, PutItem, Query, Scan, UpdateItem, UpdateTable. |
Required DDB component options
You have to provide the amazonDDBClient in the Registry or your accessKey and secretKey to access the Amazon's DynamoDB.
Usage
Message headers evaluated by the DDB producer
Header | Type | Description |
---|---|---|
CamelAwsDdbBatchItems
|
Map<String, KeysAndAttributes>
|
A map of the table name and corresponding items to get by primary key. |
CamelAwsDdbTableName
|
String
|
Table Name for this operation. |
CamelAwsDdbKey
|
Key
|
The primary key that uniquely identifies each item in a table. |
CamelAwsDdbReturnValues
|
String
|
Use this parameter if you want to get the attribute name-value pairs before or after they are modified(NONE, ALL_OLD, UPDATED_OLD, ALL_NEW, UPDATED_NEW). |
CamelAwsDdbUpdateCondition
|
Map<String, ExpectedAttributeValue>
|
Designates an attribute for a conditional modification. |
CamelAwsDdbAttributeNames
|
Collection<String>
|
If attribute names are not specified then all attributes will be returned. |
CamelAwsDdbConsistentRead
|
Boolean
|
If set to true, then a consistent read is issued, otherwise eventually consistent is used. |
CamelAwsDdbItem
|
Map<String, AttributeValue>
|
A map of the attributes for the item, and must include the primary key values that define the item. |
CamelAwsDdbExactCount
|
Boolean
|
If set to true, Amazon DynamoDB returns a total number of items that match the query parameters, instead of a list of the matching items and their attributes. |
CamelAwsDdbStartKey
|
Key
|
Primary key of the item from which to continue an earlier query. |
CamelAwsDdbHashKeyValue
|
AttributeValue
|
Value of the hash component of the composite primary key. |
CamelAwsDdbLimit
|
Integer
|
The maximum number of items to return. |
CamelAwsDdbScanRangeKeyCondition
|
Condition
|
A container for the attribute values and comparison operators to use for the query. |
CamelAwsDdbScanIndexForward
|
Boolean
|
Specifies forward or backward traversal of the index. |
CamelAwsDdbScanFilter
|
Map<String, Condition>
|
Evaluates the scan results and returns only the desired values. |
CamelAwsDdbUpdateValues
|
Map<String, AttributeValueUpdate>
|
Map of attribute name to the new value and action for the update. |
Message headers set during BatchGetItems operation
Header | Type | Description |
---|---|---|
CamelAwsDdbBatchResponse
|
Map<String,BatchResponse>
|
Table names and the respective item attributes from the tables. |
CamelAwsDdbUnprocessedKeys
|
Map<String,KeysAndAttributes>
|
Contains a map of tables and their respective keys that were not processed with the current response. |
Message headers set during DeleteItem operation
Header | Type | Description |
---|---|---|
CamelAwsDdbAttributes
|
Map<String, AttributeValue>
|
The list of attributes returned by the operation. |
Message headers set during DeleteTable operation
Header | Type | Description |
---|---|---|
CamelAwsDdbProvisionedThroughput
|
ProvisionedThroughputDescription
|
The value of the ProvisionedThroughput property for this table |
CamelAwsDdbCreationDate
|
Date
|
Creation DateTime of this table. |
CamelAwsDdbTableItemCount
|
Long
|
Item count for this table. |
CamelAwsDdbKeySchema
|
KeySchema
|
The KeySchema that identifies the primary key for this table. |
CamelAwsDdbTableName
|
String
|
The table name. |
CamelAwsDdbTableSize
|
Long
|
The table size in bytes. |
CamelAwsDdbTableStatus
|
String
|
The status of the table: CREATING, UPDATING, DELETING, ACTIVE |
Message headers set during DescribeTable operation
Header | Type | Description |
---|---|---|
CamelAwsDdbProvisionedThroughput
|
{{ProvisionedThroughputDescription} | The value of the ProvisionedThroughput property for this table |
CamelAwsDdbCreationDate
|
Date
|
Creation DateTime of this table. |
CamelAwsDdbTableItemCount
|
Long
|
Item count for this table. |
CamelAwsDdbKeySchema
|
{{KeySchema | The KeySchema that identifies the primary key for this table. |
CamelAwsDdbTableName
|
String
|
The table name. |
CamelAwsDdbTableSize
|
Long
|
The table size in bytes. |
CamelAwsDdbTableStatus
|
String
|
The status of the table: CREATING, UPDATING, DELETING, ACTIVE |
CamelAwsDdbReadCapacity
|
Long
|
ReadCapacityUnits property of this table. |
CamelAwsDdbWriteCapacity
|
Long
|
WriteCapacityUnits property of this table. |
Message headers set during GetItem operation
Header | Type | Description |
---|---|---|
CamelAwsDdbAttributes
|
Map<String, AttributeValue>
|
The list of attributes returned by the operation. |
Message headers set during PutItem operation
Header | Type | Description |
---|---|---|
CamelAwsDdbAttributes
|
Map<String, AttributeValue>
|
The list of attributes returned by the operation. |
Message headers set during Query operation
Header | Type | Description |
---|---|---|
CamelAwsDdbItems
|
List<java.util.Map<String,AttributeValue>>
|
The list of attributes returned by the operation. |
CamelAwsDdbLastEvaluatedKey
|
Key
|
Primary key of the item where the query operation stopped, inclusive of the previous result set. |
CamelAwsDdbConsumedCapacity
|
Double
|
The number of Capacity Units of the provisioned throughput of the table consumed during the operation. |
CamelAwsDdbCount
|
Integer
|
Number of items in the response. |
Message headers set during Scan operation
Header | Type | Description |
---|---|---|
CamelAwsDdbItems
|
List<java.util.Map<String,AttributeValue>>
|
The list of attributes returned by the operation. |
CamelAwsDdbLastEvaluatedKey
|
Key
|
Primary key of the item where the query operation stopped, inclusive of the previous result set. |
CamelAwsDdbConsumedCapacity
|
Double
|
The number of Capacity Units of the provisioned throughput of the table consumed during the operation. |
CamelAwsDdbCount
|
Integer
|
Number of items in the response. |
CamelAwsDdbScannedCount
|
Integer
|
Number of items in the complete scan before any filters are applied. |
Message headers set during UpdateItem operation
Header | Type | Description |
---|---|---|
CamelAwsDdbAttributes
|
Map<String, AttributeValue>
|
The list of attributes returned by the operation. |
Advanced AmazonDynamoDB configuration
If you need more control over the
AmazonDynamoDB
instance configuration you can create your own instance and refer to it from the URI:
from("direct:start") .to("aws-ddb://domainName?amazonDDBClient=#client");
For example if your Camel Application is running behind a firewall:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonDynamoDB client = new AmazonDynamoDBClient(awsCredentials, clientConfiguration); registry.bind("client", client);
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version}
must be replaced by the actual version of Camel (2.10 or higher).
10.4. AWS-S3
S3 Component
Available as of Camel 2.8
The S3 component supports storing and retrieving objetcs from/to Amazon's S3 service.
Prerequisites
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon S3. More information are available at Amazon S3.
URI Format
aws-s3://bucket-name[?options]
The bucket will be created if it don't already exists. You can append query options to the URI in the following format, ?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
amazonS3Client |
null
|
Shared |
Reference to a com.amazonaws.services.sqs.AmazonS3 in the Registry.
|
accessKey |
null
|
Shared | Amazon AWS Access Key |
secretKey |
null
|
Shared | Amazon AWS Secret Key |
amazonS3Endpoint |
null
|
Shared | The region with which the AWS-S3 client wants to work with. |
region |
null
|
Producer |
The region who the bucket is located. This option is used in the com.amazonaws.services.s3.model.CreateBucketRequest .
|
deleteAfterRead |
true
|
Consumer | Delete objects from S3 after it has been retrieved. |
deleteAfterWrite |
false
|
Producer | Camel 2.11.0 Delete file object after the S3 file has been uploaded |
maxMessagesPerPoll | 10 | Consumer |
The maximum number of objects which can be retrieved in one poll. Used in in the com.amazonaws.services.s3.model.ListObjectsRequest .
|
policy |
null
|
Shared |
*Camel 2.8.4*: The policy for this queue to set in the com.amazonaws.services.s3.AmazonS3#setBucketPolicy() method.
|
storageClass |
null
|
Producer |
*Camel 2.8.4*: The storage class to set in the com.amazonaws.services.s3.model.PutObjectRequest request.
|
prefix |
null
|
Consumer |
*Camel 2.10.1*: The prefix which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in.
|
multiPartUpload |
false
|
Producer |
Camel 2.15.0: If true , Camel uploads the file in multi-part format, where the part size can be specified by the partSize option.
|
partSize |
25 * 1024 * 1024
|
Producer |
Camel 2.15.0: Specifies the partSize used in multi-part upload. Default is 25 MB.
|
Required S3 component options
You have to provide the amazonS3Client in the Registry or your accessKey and secretKey to access the Amazon's S3.
Batch Consumer
This component implements the Batch Consumer.
This allows you for instance to know how many messages exists in this batch and for instance let the Aggregator aggregate this number of messages.
Usage
Message headers evaluated by the S3 producer
Header | Type | Description |
---|---|---|
CamelAwsS3Key
|
String
|
The key under which this object will be stored. |
CamelAwsS3ContentLength
|
Long
|
The content length of this object. |
CamelAwsS3ContentType
|
String
|
The content type of this object. |
CamelAwsS3ContentControl
|
String
|
The content control of this object. |
CamelAwsS3ContentDisposition
|
String
|
The content disposition of this object. |
CamelAwsS3ContentEncoding
|
String
|
The content encoding of this object. |
CamelAwsS3ContentMD5
|
String
|
The md5 checksum of this object. |
CamelAwsS3LastModified
|
java.util.Date
|
The last modified timestamp of this object. |
CamelAwsS3StorageClass
|
String
|
*Camel 2.8.4:* The storage class of this object. |
CamelAwsS3CannedAcl
|
String
|
Camel 2.11.0: The canned acl that will be applied to the object. see com.amazonaws.services.s3.model.CannedAccessControlList for allowed values.
|
CamelAwsS3Acl
|
com.amazonaws.services.s3.model.AccessControlList
|
Camel 2.11.0: a well constructed Amazon S3 Access Control List object. see com.amazonaws.services.s3.model.AccessControlList for more details
|
Message headers set by the S3 producer
Header | Type | Description |
---|---|---|
CamelAwsS3ETag
|
String
|
The ETag value for the newly uploaded object. |
CamelAwsS3VersionId
|
String
|
The optional version ID of the newly uploaded object. |
Message headers set by the S3 consumer
Header | Type | Description |
---|---|---|
CamelAwsS3Key
|
String
|
The key under which this object is stored. |
CamelAwsS3BucketName
|
String
|
The name of the bucket in which this object is contained. |
CamelAwsS3ETag
|
String
|
The hex encoded 128-bit MD5 digest of the associated object according to RFC 1864. This data is used as an integrity check to verify that the data received by the caller is the same data that was sent by Amazon S3. |
CamelAwsS3LastModified
|
Date
|
The value of the Last-Modified header, indicating the date and time at which Amazon S3 last recorded a modification to the associated object. |
CamelAwsS3VersionId
|
String
|
The version ID of the associated Amazon S3 object if available. Version IDs are only assigned to objects when an object is uploaded to an Amazon S3 bucket that has object versioning enabled. |
CamelAwsS3ContentType
|
String
|
The Content-Type HTTP header, which indicates the type of content stored in the associated object. The value of this header is a standard MIME type. |
CamelAwsS3ContentMD5
|
String
|
The base64 encoded 128-bit MD5 digest of the associated object (content - not including headers) according to RFC 1864. This data is used as a message integrity check to verify that the data received by Amazon S3 is the same data that the caller sent. |
CamelAwsS3ContentLength
|
Long
|
The Content-Length HTTP header indicating the size of the associated object in bytes. |
CamelAwsS3ContentEncoding
|
String
|
The optional Content-Encoding HTTP header specifying what content encodings have been applied to the object and what decoding mechanisms must be applied in order to obtain the media-type referenced by the Content-Type field. |
CamelAwsS3ContentDisposition
|
String
|
The optional Content-Disposition HTTP header, which specifies presentational information such as the recommended filename for the object to be saved as. |
CamelAwsS3ContentControl
|
String
|
The optional Cache-Control HTTP header which allows the user to specify caching behavior along the HTTP request/reply chain. |
Advanced AmazonS3 configuration
If your Camel Application is running behind a firewall or if you need to have more control over the
AmazonS3
instance configuration, you can create your own instance:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonS3 client = new AmazonS3Client(awsCredentials, clientConfiguration); registry.bind("client", client);
and refer to it in your Camel aws-s3 component configuration:
from("aws-s3://MyBucket?amazonS3Client=#client&delay=5000&maxMessagesPerPoll=5") .to("mock:result");
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
$\{camel-version\}
must be replaced by the actual version of Camel (2.8 or higher).
10.5. AWS-SDB
SDB Component
Available as of Camel 2.8.4
The sdb component supports storing and retrieving data from/to Amazon's SDB service.
Prerequisites
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon SDB. More information are available at Amazon SDB.
URI Format
aws-sdb://domainName[?options]
You can append query options to the URI in the following format, ?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
amazonSDBClient |
null
|
Producer |
Reference to a com.amazonaws.services.simpledb.AmazonSimpleDB in the Registry.
|
accessKey |
null
|
Producer | Amazon AWS Access Key |
secretKey |
null
|
Producer | Amazon AWS Secret Key |
amazonSdbEndpoint |
null
|
Producer | The region with which the AWS-SDB client wants to work with. |
domainName |
null
|
Producer | The name of the domain currently worked with. |
maxNumberOfDomains |
100
|
Producer | The maximum number of domain names you want returned. The range is 1 * to 100. |
consistentRead |
false
|
Producer | Determines whether or not strong consistency should be enforced when data is read. |
operation |
PutAttributes
|
Producer | Valid values are BatchDeleteAttributes, BatchPutAttributes, DeleteAttributes, DeleteDomain, DomainMetadata, GetAttributes, ListDomains, PutAttributes, Select. |
Required SDB component options
You have to provide the amazonSDBClient in the Registry or your accessKey and secretKey to access the Amazon's SDB.
Usage
Message headers evaluated by the SDB producer
Header | Type | Description |
---|---|---|
CamelAwsSdbAttributes
|
Collection<Attribute>
|
List of attributes to be acted upon. |
CamelAwsSdbAttributeNames
|
Collection<String>
|
The names of the attributes to be retrieved. |
CamelAwsSdbConsistentRead
|
Boolean
|
Determines whether or not strong consistency should be enforced when data is read. |
CamelAwsSdbDeletableItems
|
Collection<DeletableItem>
|
A list of items on which to perform the delete operation in a batch. |
CamelAwsSdbDomainName
|
String
|
The name of the domain currently worked with. |
CamelAwsSdbItemName
|
String
|
The unique key for this item |
CamelAwsSdbMaxNumberOfDomains
|
Integer
|
The maximum number of domain names you want returned. The range is 1 * to 100. |
CamelAwsSdbNextToken
|
String
|
A string specifying where to start the next list of domain/item names. |
CamelAwsSdbOperation
|
String
|
To override the operation from the URI options. |
CamelAwsSdbReplaceableAttributes
|
Collection<ReplaceableAttribute>
|
List of attributes to put in an Item. |
CamelAwsSdbReplaceableItems
|
Collection<ReplaceableItem>
|
A list of items to put in a Domain. |
CamelAwsSdbSelectExpression
|
String
|
The expression used to query the domain. |
CamelAwsSdbUpdateCondition
|
UpdateCondition
|
The update condition which, if specified, determines whether the specified attributes will be updated/deleted or not. |
Message headers set during DomainMetadata operation
Header | Type | Description |
---|---|---|
CamelAwsSdbTimestamp
|
Integer
|
The data and time when metadata was calculated, in Epoch (UNIX) seconds. |
CamelAwsSdbItemCount
|
Integer
|
The number of all items in the domain. |
CamelAwsSdbAttributeNameCount
|
Integer
|
The number of unique attribute names in the domain. |
CamelAwsSdbAttributeValueCount
|
Integer
|
The number of all attribute name/value pairs in the domain. |
CamelAwsSdbAttributeNameSize
|
Long
|
The total size of all unique attribute names in the domain, in bytes. |
CamelAwsSdbAttributeValueSize
|
Long
|
The total size of all attribute values in the domain, in bytes. |
CamelAwsSdbItemNameSize
|
Long
|
The total size of all item names in the domain, in bytes. |
Message headers set during GetAttributes operation
Header | Type | Description |
---|---|---|
CamelAwsSdbAttributes
|
List<Attribute>
|
The list of attributes returned by the operation. |
Message headers set during ListDomains operation
Header | Type | Description |
---|---|---|
CamelAwsSdbDomainNames
|
List<String>
|
A list of domain names that match the expression. |
CamelAwsSdbNextToken
|
String
|
An opaque token indicating that there are more domains than the specified MaxNumberOfDomains still available. |
Message headers set during Select operation
Header | Type | Description |
---|---|---|
CamelAwsSdbItems
|
List<Item>
|
A list of items that match the select expression. |
CamelAwsSdbNextToken
|
String
|
An opaque token indicating that more items than MaxNumberOfItems were matched, the response size exceeded 1 megabyte, or the execution time exceeded 5 seconds. |
Advanced AmazonSimpleDB configuration
If you need more control over the
AmazonSimpleDB
instance configuration you can create your own instance and refer to it from the URI:
from("direct:start") .to("aws-sdb://domainName?amazonSDBClient=#client");
For example if your Camel Application is running behind a firewall:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonSimpleDB client = new AmazonSimpleDBClient(awsCredentials, clientConfiguration); registry.bind("client", client);
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version}
must be replaced by the actual version of Camel (2.8.4 or higher).
10.6. AWS-SES
SES Component
Available as of Camel 2.8.4
The ses component supports sending emails with Amazon's SES service.
Prerequisites
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon SES. More information are available at Amazon SES.
URI Format
aws-ses://from[?options]
You can append query options to the URI in the following format, ?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
amazonSESClient |
null
|
Producer |
Reference to a com.amazonaws.services.simpleemail.AmazonSimpleEmailService in the Registry.
|
accessKey |
null
|
Producer | Amazon AWS Access Key |
secretKey |
null
|
Producer | Amazon AWS Secret Key |
amazonSESEndpoint |
null
|
Producer | The region with which the AWS-SES client wants to work with. |
subject |
null
|
Producer | The subject which is used if the message header 'CamelAwsSesSubject' is not present. |
to |
null
|
Producer | List of destination email address. Can be overriden with 'CamelAwsSesTo' header. |
returnPath |
null
|
Producer | The email address to which bounce notifications are to be forwarded, override it using 'CamelAwsSesReturnPath' header. |
replyToAddresses |
null
|
Producer | List of reply-to email address(es) for the message, override it using 'CamelAwsSesReplyToAddresses' header. |
Required SES component options
You have to provide the amazonSESClient in the Registry or your accessKey and secretKey to access the Amazon's SES.
Usage
Message headers evaluated by the SES producer
Header | Type | Description |
---|---|---|
CamelAwsSesFrom
|
String
|
The sender's email address. |
CamelAwsSesTo
|
List<String>
|
The destination(s) for this email. |
CamelAwsSesSubject
|
String
|
The subject of the message. |
CamelAwsSesReplyToAddresses
|
List<String>
|
The reply-to email address(es) for the message. |
CamelAwsSesReturnPath
|
String
|
The email address to which bounce notifications are to be forwarded. |
CamelAwsSesHtmlEmail
|
Boolean
|
Since Camel 2.12.3 The flag to show if email content is HTML.
|
Message headers set by the SES producer
Header | Type | Description |
---|---|---|
CamelAwsSesMessageId
|
String
|
The Amazon SES message ID. |
Advanced AmazonSimpleEmailService configuration
If you need more control over the
AmazonSimpleEmailService
instance configuration you can create your own instance and refer to it from the URI:
from("direct:start") .to("aws-ses://example@example.com?amazonSESClient=#client");
For example if your Camel Application is running behind a firewall:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonSimpleEmailService client = new AmazonSimpleEmailServiceClient(awsCredentials, clientConfiguration); registry.bind("client", client);
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version}
must be replaced by the actual version of Camel (2.8.4 or higher).
10.7. AWS-SNS
SNS Component
Available as of Camel 2.8
The SNS component allows messages to be sent to an Amazon Simple Notification Topic. The implementation of the Amazon API is provided by the AWS SDK.
Prerequisites
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon SNS. More information are available at Amazon SNS.
URI Format
aws-sns://topicName[?options]
The topic will be created if they don't already exists. You can append query options to the URI in the following format,
?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
amazonSNSClient |
null
|
Producer |
Reference to a com.amazonaws.services.sns.AmazonSNS in the Registry.
|
accessKey |
null
|
Producer | Amazon AWS Access Key |
secretKey |
null
|
Producer | Amazon AWS Secret Key |
subject |
null
|
Producer | The subject which is used if the message header 'CamelAwsSnsSubject' is not present. |
amazonSNSEndpoint |
null
|
Producer | The region with which the AWS-SNS client wants to work with. |
policy |
null
|
Producer |
Camel 2.8.4: The policy for this queue to set in the com.amazonaws.services.sns.model.SetTopicAttributesRequest .
|
Required SNS component options
You have to provide the amazonSNSClient in the Registry or your accessKey and secretKey to access the Amazon's SNS.
Usage
Message headers evaluated by the SNS producer
Header | Type | Description |
---|---|---|
CamelAwsSnsSubject
|
String
|
The Amazon SNS message subject. If not set, the subject from the SnsConfiguration is used.
|
Message headers set by the SNS producer
Header | Type | Description |
---|---|---|
CamelAwsSnsMessageId
|
String
|
The Amazon SNS message ID. |
Advanced AmazonSNS configuration
If you need more control over the
AmazonSNS
instance configuration you can create your own instance and refer to it from the URI:
from("direct:start") .to("aws-sns://MyTopic?amazonSNSClient=#client");
For example if your Camel Application is running behind a firewall:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonSNS client = new AmazonSNSClient(awsCredentials, clientConfiguration); registry.bind("client", client);
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
$\{camel-version\}
must be replaced by the actual version of Camel (2.8 or higher).
10.8. AWS-SQS
SQS Component
Available as of Camel 2.6
The sqs component supports sending and receiving messages to Amazon's SQS service.
Prerequisites
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon SQS. More information are available at Amazon SQS.
URI Format
aws-sqs://queue-name[?options]
The queue will be created if they don't already exists. You can append query options to the URI in the following format, ?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
amazonSQSClient |
null
|
Shared |
Reference to a com.amazonaws.services.sqs.AmazonSQS in the Registry.
|
accessKey |
null
|
Shared | Amazon AWS Access Key |
secretKey |
null
|
Shared | Amazon AWS Secret Key |
amazonSQSEndpoint |
null
|
Shared | The region with which the AWS-SQS client wants to work with. Only works if Camel creates the AWS-SQS client, i.e., if you explicitly set amazonSQSClient, then this setting will have no effect. You would have to set it on the client you create directly. |
attributeNames |
null
|
Consumer |
A list of attributes to set in the com.amazonaws.services.sqs.model.ReceiveMessageRequest .
|
concurrentConsumers |
1
|
Consumer | Camel 2.15.0 Allows you to use multiple threads to poll the SQS queue to increase throughput. |
defaultVisibilityTimeout |
null
|
Shared |
The visibility timeout (in seconds) to set in the com.amazonaws.services.sqs.model.CreateQueueRequest .
|
deleteAfterRead |
true
|
Consumer | Delete message from SQS after it has been read |
deleteIfFiltered |
true
|
Consumer | Camel 2.12.2,2.13.0 Whether or not to send the DeleteMessage to the SQS queue if an exchange fails to get through a filter. If 'false' and exchange does not make it through a Camel filter upstream in the route, then don't send DeleteMessage. |
maxMessagesPerPoll |
null
|
Consumer |
The maximum number of messages which can be received in one poll to set in the com.amazonaws.services.sqs.model.ReceiveMessageRequest .
|
visibilityTimeout |
null
|
Shared |
The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest . This only make sense if its different from defaultVisibilityTimeout . It changes the queue visibility timeout attribute permanently.
|
messageVisibilityTimeout |
null
|
Consumer |
Camel 2.8: The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request to set in the com.amazonaws.services.sqs.model.ReceiveMessageRequest . It does NOT change the queue visibility timeout attribute permanently.
|
extendMessageVisibility |
false
|
Consumer |
Camel 2.10: If enabled then a scheduled background task will keep extending the message visibility on SQS. This is needed if it taks a long time to process the message. If set to true defaultVisibilityTimeout must be set. See details at Amazon docs.
|
maximumMessageSize |
null
|
Shared |
Camel 2.8: The maximumMessageSize (in bytes) an SQS message can contain for this queue, to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest .
|
messageRetentionPeriod |
null
|
Shared |
Camel 2.8: The messageRetentionPeriod (in seconds) a message will be retained by SQS for this queue, to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest .
|
policy |
null
|
Shared |
Camel 2.8: The policy for this queue to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest .
|
delaySeconds |
null
|
Producer | Camel 2.9.3: Delay sending messages for a number of seconds. |
waitTimeSeconds |
0
|
Producer | Camel 2.11: Duration in seconds (0 to 20) that the ReceiveMessage action call will wait until a message is in the queue to include in the response. |
receiveMessageWaitTimeSeconds |
0
|
Shared | Camel 2.11: If you do not specify WaitTimeSeconds in the request, the queue attribute ReceiveMessageWaitTimeSeconds is used to determine how long to wait. |
queueOwnerAWSAccountId |
null
|
Shared | Camel 2.12: Specify the queue owner aws account id when you need to connect the queue with different account owner. |
region
|
null
|
Shared
|
Camel 2.12.3: Specify the queue region which could be used with
queueOwnerAWSAccountId to build the service URL.
|
redrivePolicy
|
null
|
Shared
|
Camel 2.15.0: Specify the policy that sends a message to DeadLetter queue. See detail at Amazon docs.
|
Required SQS component options
You have to provide the amazonSQSClient in the Registry or your accessKey and secretKey to access the Amazon's SQS.
Batch Consumer
This component implements the Batch Consumer.
This allows you for instance to know how many messages exists in this batch and for instance let the Aggregator aggregate this number of messages.
Usage
Message headers set by the SQS producer
Header | Type | Description |
---|---|---|
CamelAwsSqsMD5OfBody
|
String
|
The MD5 checksum of the Amazon SQS message. |
CamelAwsSqsMessageId
|
String
|
The Amazon SQS message ID. |
CamelAwsSqsDelaySeconds
|
Integer
|
Since Camel 2.11, the delay seconds that the Amazon SQS message can be see by others. |
Message headers set by the SQS consumer
Header | Type | Description |
---|---|---|
CamelAwsSqsMD5OfBody
|
String
|
The MD5 checksum of the Amazon SQS message. |
CamelAwsSqsMessageId
|
String
|
The Amazon SQS message ID. |
CamelAwsSqsReceiptHandle
|
String
|
The Amazon SQS message receipt handle. |
CamelAwsSqsAttributes
|
Map<String, String>
|
The Amazon SQS message attributes. |
Advanced AmazonSQS configuration
If your Camel Application is running behind a firewall or if you need to have more control over the AmazonSQS instance configuration, you can create your own instance:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonSQS client = new AmazonSQSClient(awsCredentials, clientConfiguration); registry.bind("client", client);
and refer to it in your Camel aws-sqs component configuration:
from("aws-sqs://MyQueue?amazonSQSClient=#client&delay=5000&maxMessagesPerPoll=5") .to("mock:result");
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
$\{camel-version\}
must be replaced by the actual version of Camel (2.6 or higher).
JMS-style Selectors
SQS does not allow selectors, but you can effectively achieve this by using the Camel Filter EIP and setting an appropriate
visibilityTimeout
. When SQS dispatches a message, it will wait up to the visibility timeout before it will try to dispatch the message to a different consumer unless a DeleteMessage is received. By default, Camel will always send the DeleteMessage at the end of the route, unless the route ended in failure. To achieve appropriate filtering and not send the DeleteMessage even on successful completion of the route, use a Filter:
from("aws-sqs://MyQueue?amazonSQSClient=#client&defaultVisibilityTimeout=5000&deleteIfFiltered=false") .filter("${header.login} == true") .to("mock:result");
In the above code, if an exchange doesn't have an appropriate header, it will not make it through the filter AND also not be deleted from the SQS queue. After 5000 miliseconds, the message will become visible to other consumers.
10.9. AWS-SWF
SWF Component
Available as of Camel 2.13
The Simple Workflow component supports managing workflows from Amazon's Simple Workflow service.
Note
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon Simple Workflow. More information are available at Amazon Simple Workflow.
URI Format
aws-swf://<workflow|activity>[?options]
You can append query options to the URI in the following format, ?options=value&option2=value&...
URI Options
Name
|
Default Value
|
Context
|
Description
|
---|---|---|---|
amazonSWClient
|
null
|
All |
A reference to a com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient in the Registry.
|
accessKey
|
null
|
All
|
Amazon AWS Access Key.
|
secretKey
|
null
|
All
|
Amazon AWS Secret Key.
|
sWClient.XXX
|
null
|
All
|
Properties to set on AmazonSimpleWorkflowClient in use.
|
clientConfiguration.XXX
|
null
|
All
|
Properties to set on ClientConfiguration in use.
|
startWorkflowOptions.XXX
|
null
|
Workflow/Producer
|
Properties to set on useStartWorkflowOptions in use.
|
operation
|
START
|
Workflow/Producer
|
The operation to perform on the workflow. Supported operations are:
SIGNAL , CANCEL , TERMINATE , GET_STATE , START , DESCRIBE , GET_HISTORY .
|
domainName
|
null
|
All
|
The workflow domain to use.
|
activityList
|
null
|
Activity/Consumer
|
The list name to consume activities from.
|
workflowList
|
null
|
Workflow/Consumer
|
The list name to consume workflows from.
|
eventName
|
null
|
All | The workflow or activity event name to use. |
version
|
null
|
All | The workflow or activity event version to use. |
signalName
|
null
|
Workflow/Producer | The name of the signal to send to the workflow. |
childPolicy
|
null
|
Workflow/Producer | The policy to use on child workflows when terminating a workflow. |
terminationReason
|
null
|
Workflow/Producer | The reason for terminating a workflow. |
stateResultType
|
Object
|
Workflow/Producer | The type of the result when a workflow state is queried. |
terminationDetails
|
null
|
Workflow/Producer | Details for terminating a workflow. |
dataConverter
|
JsonDataConverter
|
All |
An instance of com.amazonaws.services.simpleworkflow.flow.DataConverter to use for serializing/deserializing the data.
|
activitySchedulingOptions
|
null
|
Activity/Producer |
An instance of ActivitySchedulingOptions used to specify different timeout options.
|
activityTypeExecutionOptions
|
null
|
Activity/Consumer |
An instance of ActivityTypeExecutionOptions .
|
activityTypeRegistrationOptions
|
null
|
Activity/Consumer |
An instance of ActivityTypeRegistrationOptions .
|
workflowTypeRegistrationOptions
|
null
|
Workflow/Consumer |
An instance of WorkflowTypeRegistrationOptions .
|
Note
You have to provide the amazonSWClient in the Registry or your accessKey and secretKey to access the Amazon's Simple Workflow Service.
Usage
Message headers evaluated by the SWF Workflow Producer
A workflow producer allows interacting with a workflow. It can start a new workflow execution, query its state, send signals to a running workflow, or terminate and cancel it.
Header
|
Type
|
Description
|
---|---|---|
CamelSWFOperation
|
String
|
The operation to perform on the workflow. Supported operations are: SIGNAL, CANCEL, TERMINATE, GET_STATE, START, DESCRIBE, GET_HISTORY.
|
CamelSWFWorkflowId
|
String
|
A workflow ID to use.
|
CamelAwsDdbKeyCamelSWFRunId
|
String
|
A worfklow run ID to use.
|
CamelSWFStateResultType
|
String
|
The type of the result when a workflow state is queried.
|
CamelSWFEventName
|
String
|
The workflow or activity event name to use.
|
CamelSWFVersion
|
String
|
The workflow or activity event version to use.
|
CamelSWFReason
|
String
|
The reason for terminating a workflow.
|
CamelSWFDetails
|
String
|
Details for terminating a workflow.
|
CamelSWFChildPolicy
|
String
|
The policy to use on child workflows when terminating a workflow.
|
Message headers set by the SWF Workflow Producer
Header
|
Type
|
Description
|
---|---|---|
CamelSWFWorkflowId
|
String
|
The worfklow ID used or newly generated.
|
CamelAwsDdbKeyCamelSWFRunId
|
String
|
The worfklow run ID used or generated.
|
Message headers set by the SWF Workflow Consumer
A workflow consumer represents the workflow logic. When it is started, it will start polling workflow decision tasks and process them. In addition to processing decision tasks, a workflow consumer route, will also receive signals (send from a workflow producer) or state queries. The primary purpose of a workflow consumer is to schedule activity tasks for execution using activity producers. Actually activity tasks can be scheduled only from a thread started by a workflow consumer.
Header
|
Type
|
Description
|
---|---|---|
CamelSWFAction
|
String
|
Indicates what type is the current event: CamelSWFActionExecute, CamelSWFSignalReceivedAction or CamelSWFGetStateAction.
|
CamelSWFWorkflowReplaying
|
boolean
|
Indicates whether the current decision task is a replay or not.
|
CamelSWFWorkflowStartTime
|
long
|
The time of the start event for this decision task.
|
Message headers set by the SWF Activity Producer
An activity producer allows scheduling activity tasks. An activity producer can be used only from a thread started by a workflow consumer ie, it can process synchronous exchanges started by a workflow consumer.
Header
|
Type
|
Description
|
---|---|---|
CamelSWFEventName
|
String
|
The activity name to schedule.
|
CamelSWFVersion
|
String
|
The activity version to schedule.
|
Message headers set by the SWF Activity Consumer
Header
|
Type
|
Description
|
---|---|---|
CamelSWFTaskToken
|
String
|
The task token that is required to report task completion for manually completed tasks.
|
Advanced amazonSWClient configuration
If you need more control over the AmazonSimpleWorkflowClient instance configuration you can create your own instance and refer to it from the URI:
The
#client
refers to a AmazonSimpleWorkflowClient in the Registry.
For example if your Camel Application is running behind a firewall:
AWSCredentials awsCredentials = new BasicAWSCredentials("myAccessKey", "mySecretKey"); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setProxyHost("http://myProxyHost"); clientConfiguration.setProxyPort(8080); AmazonSimpleWorkflowClient client = new AmazonSimpleWorkflowClient(awsCredentials, clientConfiguration); registry.bind("client", client);
Dependencies
Maven users will need to add the following dependency to their pom.xml.
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-aws</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version
} must be replaced by the actual version of Camel (2.13 or higher).
Chapter 11. Bean
Bean Component
The bean: component binds beans to Apache Camel message exchanges.
URI format
bean:beanID[?options]
Where beanID can be any string which is used to lookup look up the bean in the Registry
Options
Name | Type | Default | Description |
---|---|---|---|
method
|
String
|
null
|
The method name from the bean that will be invoked. If not provided, Camel will try to determine the method itself. In case of ambiguity an exception will be thrown. See Bean Binding for more details. |
cache
|
boolean
|
false
|
If enabled, Apache Camel will cache the result of the first Registry look-up. Cache can be enabled if the bean in the Registry is defined as a singleton scope. |
You can append query options to the URI in the following format,
?option=value&option=value&...
Using
The object instance that is used to consume messages must be explicitly registered with the Registry. For example, if you are using Spring you must define the bean in the Spring configuration,
spring.xml
; or if you don't use Spring, put the bean in JNDI.
// lets populate the context with the services we need // note that we could just use a spring.xml file to avoid this step JndiContext context = new JndiContext(); context.bind("bye", new SayService("Good Bye!")); CamelContext camelContext = new DefaultCamelContext(context);
Once an endpoint has been registered, you can build routes that use it to process exchanges.
// lets add simple route camelContext.addRoutes(new RouteBuilder() { public void configure() { from("direct:hello").to("bean:bye"); } });
A bean: endpoint cannot be defined as the input to the route; i.e. you cannot consume from it, you can only route from some inbound message Endpoint to the bean endpoint as output. So consider using a direct: or queue: endpoint as the input.
You can use the
createProxy()
methods on ProxyHelper to create a proxy that will generate BeanExchanges and send them to any endpoint:
Endpoint endpoint = camelContext.getEndpoint("direct:hello"); ISay proxy = ProxyHelper.createProxy(endpoint, ISay.class); String rc = proxy.say(); assertEquals("Good Bye!", rc);
And the same route using Spring DSL:
<route> <from uri="direct:hello"> <to uri="bean:bye"/> </route>
Bean as endpoint
Apache Camel also supports invoking Bean as an Endpoint. In the route below:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <to uri="myBean"/> <to uri="mock:results"/> </route> </camelContext> <bean id="myBean" class="org.apache.camel.spring.bind.ExampleBean"/>
What happens is that when the exchange is routed to the
myBean
, Apache Camel will use the Bean Binding to invoke the bean. The source for the bean is just a plain POJO:
public class ExampleBean { public String sayHello(String name) { return "Hello " + name + "!"; } }
Apache Camel will use Bean Binding to invoke the
sayHello
method, by converting the Exchange's In body to the String
type and storing the output of the method on the Exchange Out body.
Java DSL bean syntax
Java DSL comes with syntactic sugar for the Bean component. Instead of specifying the bean explicitly as the endpoint (i.e.
to("bean:beanName")
) you can use the following syntax:
// Send message to the bean endpoint // and invoke method resolved using Bean Binding. from("direct:start").beanRef("beanName"); // Send message to the bean endpoint // and invoke given method. from("direct:start").beanRef("beanName", "methodName");
Instead of passing name of the reference to the bean (so that Camel will lookup for it in the registry), you can specify the bean itself:
// Send message to the given bean instance. from("direct:start").bean(new ExampleBean()); // Explicit selection of bean method to be invoked. from("direct:start").bean(new ExampleBean(), "methodName"); // Camel will create the instance of bean and cache it for you. from("direct:start").bean(ExampleBean.class);
Bean Binding
How bean methods to be invoked are chosen (if they are not specified explicitly through the method parameter) and how parameter values are constructed from the Message are all defined by the Bean Binding mechanism which is used throughout all of the various Bean Integration mechanisms in Apache Camel.
- Class component
Chapter 12. Bean Validator
Bean Validator Component
Available as of Apache Camel 2.3
The Validator component performs bean validation of the message body using the Java Bean Validation API (JSR 303). Camel uses the reference implementation, which is Hibernate Validator.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-bean-validator</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
bean-validator:label[?options]
or
bean-validator://label[?options]
Where
label
is an arbitrary text value describing the endpoint. You can append query options to the URI in the following format, ?option=value&option=value&...
URI Options
The following URI options are supported:
Option | Default | Description |
---|---|---|
group
|
javax.validation.groups.Default
|
The custom validation group to use. |
messageInterpolator
|
org.hibernate.validator.engine. ResourceBundleMessageInterpolator
|
Reference to a custom javax.validation.MessageInterpolator in the Registry.
|
traversableResolver
|
org.hibernate.validator.engine.resolver. DefaultTraversableResolver
|
Reference to a custom javax.validation.TraversableResolver in the Registry.
|
constraintValidatorFactory
|
org.hibernate.validator.engine. ConstraintValidatorFactoryImpl
|
Reference to a custom javax.validation.ConstraintValidatorFactory in the Registry.
|
OSGi deployment
To use Hibernate Validator in the OSGi environment use dedicated
ValidationProviderResolver
implementation, just as org.apache.camel.component.bean.validator.HibernateValidationProviderResolver
. The snippet below demonstrates this approach. Keep in mind that you can use HibernateValidationProviderResolver
starting from the Camel 2.13.0.
Example 12.1. Using HibernateValidationProviderResolver
from("direct:test") .to("bean-validator://ValidationProviderResolverTest?validationProviderResolver=#myValidationProviderResolver"); ... <bean id="myValidationProviderResolver" class="org.apache.camel.component.bean.validator.HibernateValidationProviderResolver"/>
If no custom
ValidationProviderResolver
is defined and the validator component has been deployed into the OSGi environment, the HibernateValidationProviderResolver
will be automatically used.
Example
Assumed we have a Java bean with the following annotations
Car.java
// Java public class Car { @NotNull private String manufacturer; @NotNull @Size(min = 5, max = 14, groups = OptionalChecks.class) private String licensePlate; // getter and setter }
and an interface definition for our custom validation group
OptionalChecks.java
public interface OptionalChecks { }
with the following Apache Camel route, only the @NotNull constraints on the attributes manufacturer and licensePlate will be validated (Apache Camel uses the default group
javax.validation.groups.Default
).
from("direct:start") .to("bean-validator://x") .to("mock:end")
If you want to check the constraints from the group
OptionalChecks
, you have to define the route like this
from("direct:start") .to("bean-validator://x?group=OptionalChecks") .to("mock:end")
If you want to check the constraints from both groups, you have to define a new interface first
AllChecks.java
@GroupSequence({Default.class, OptionalChecks.class}) public interface AllChecks { }
and then your route definition should looks like this
from("direct:start") .to("bean-validator://x?group=AllChecks") .to("mock:end")
And if you have to provide your own message interpolator, traversable resolver and constraint validator factory, you have to write a route like this
<bean id="myMessageInterpolator" class="my.ConstraintValidatorFactory" /> <bean id="myTraversableResolver" class="my.TraversableResolver" /> <bean id="myConstraintValidatorFactory" class="my.ConstraintValidatorFactory" /> from("direct:start") .to("bean-validator://x?group=AllChecks&messageInterpolator=#myMessageInterpolator&traversableResolver=#myTraversableResolver&constraintValidatorFactory=#myConstraintValidatorFactory") .to("mock:end")
It's also possible to describe your constraints as XML and not as Java annotations. In this case, you have to provide the file
META-INF/validation.xml
which could looks like this
validation.xml
<?xml version="1.0" encoding="UTF-8"?> <validation-config xmlns="http://jboss.org/xml/ns/javax/validation/configuration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://jboss.org/xml/ns/javax/validation/configuration"> <default-provider>org.hibernate.validator.HibernateValidator</default-provider> <message-interpolator>org.hibernate.validator.engine.ResourceBundleMessageInterpolator</message-interpolator> <traversable-resolver>org.hibernate.validator.engine.resolver.DefaultTraversableResolver</traversable-resolver> <constraint-validator-factory>org.hibernate.validator.engine.ConstraintValidatorFactoryImpl</constraint-validator-factory> <constraint-mapping>/constraints-car.xml</constraint-mapping> </validation-config>
and the
constraints-car.xml
file
constraints-car.xml
<?xml version="1.0" encoding="UTF-8"?> <constraint-mappings xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://jboss.org/xml/ns/javax/validation/mapping validation-mapping-1.0.xsd" xmlns="http://jboss.org/xml/ns/javax/validation/mapping"> <default-package>org.apache.camel.component.bean.validator</default-package> <bean class="CarWithoutAnnotations" ignore-annotations="true"> <field name="manufacturer"> <constraint annotation="javax.validation.constraints.NotNull" /> </field> <field name="licensePlate"> <constraint annotation="javax.validation.constraints.NotNull" /> <constraint annotation="javax.validation.constraints.Size"> <groups> <value>org.apache.camel.component.bean.validator.OptionalChecks</value> </groups> <element name="min">5</element> <element name="max">14</element> </constraint> </field> </bean> </constraint-mappings>
Chapter 13. Beanstalk
Beanstalk component
Available in Camel 2.15
camel-beanstalk project provides a Camel component for job retrieval and post-processing of Beanstalk jobs.
You can find the detailed explanation of Beanstalk job life cycle at Beanstalk protocol.
Dependencies
Maven users need to add the following dependency to their
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-beanstalk</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version
} must be replaced by the actual version of Camel (2.15.0 or higher).
URI format
beanstalk://[host[:port]][/tube][?options]
You may omit either
port
or both host
and port
: for the Beanstalk defaults to be used (“localhost” and 11300). If you omit tube
, Beanstalk component will use the tube with name “default”.
When listening, you may probably want to watch for jobs from several tubes. Just separate them with plus sign, e.g.
beanstalk://localhost:11300/tube1+tube2
Tube name will be URL decoded, so if your tube names include special characters like + or ?, you need to URL-encode them appropriately, or use the RAW syntax, see more details here.
By the way, you cannot specify several tubes when you are writing jobs into Beanstalk.
Common URI options
Name
|
Default value
|
Description
|
---|---|---|
jobPriority | 1000 | Job priority. (0 is the highest, see Beanstalk protocol) |
jobDelay | 0 | Job delay in seconds. |
jobTimeToRun | 60 | Job time to run in seconds. (when 0, the beanstalkd daemon raises it to 1 automatically, see Beanstalk protocol) |
Producer UIR options
Producer behaviour is affected by the
command
parameter which tells what to do with the job, it can be
Name
|
Default value
|
Description
|
---|---|---|
command | put |
|
Consumer UIR options
The consumer may delete the job immediately after reserving it or wait until Camel routes process it. While the first scenario is more like a “message queue”, the second is similar to “job queue”. This behavior is controlled by
consumer.awaitJob
parameter, which equals true
by default (following Beanstalkd nature).
When synchronous, the consumer calls
delete
on successful job completion and calls bury
on failure. You can choose which command to execute in the case of failure by specifying consumer.onFailure
parameter in the URI. It can take values of bury
, delete
or release
.
There is a boolean parameter
consumer.useBlockIO
which corresponds to the same parameter in JavaBeanstalkClient library. By default it is true
.
Be careful when specifying
release
, as the failed job will immediately become available in the same tube and your consumer will try to acquire it again. You can release
and specify jobDelay though.
Name
|
Default value
|
Description
|
---|---|---|
onFailure | bury | Command to use when processing failed. You can choose among: bury, delete or release. |
useBlockIO | true | Whether to use blockIO. |
awaitJob | true | Whether to wait for job to complete before ack the job from beanstalk |
The beanstalk consumer is a Scheduled Polling Consumer which means there is more options you can configure, such as how frequent the consumer should poll. For more details see Polling Consumer.
Consumer Headers
The consumer stores a number of job headers in the Exchange message:
Property
|
Type
|
Description
|
---|---|---|
beanstalk.jobId | long | Job ID |
beanstalk.tube | string | the name of the tube that contains this job |
beanstalk.state | string | “ready” or “delayed” or “reserved” or “buried” (must be “reserved”) |
beanstalk.priority | long | the priority value set |
beanstalk.age | int | the time in seconds since the put command that created this job |
beanstalk.time-left | int | the number of seconds left until the server puts this job into the ready queue |
beanstalk.timeouts | int | the number of times this job has timed out during a reservation |
beanstalk.releases | int | the number of times a client has released this job from a reservation |
beanstalk.buries | int | the number of times this job has been buried |
beanstalk.kicks | int | the number of times this job has been kicked |
Examples
This Camel component lets you both request the jobs for processing and supply them to Beanstalkd daemon. Our simple demo routes may look like
from("beanstalk:testTube"). log("Processing job #${property.beanstalk.jobId} with body ${in.body}"). process(new Processor() { @Override public void process(Exchange exchange) { // try to make integer value out of body exchange.getIn().setBody( Integer.valueOf(exchange.getIn().getBody(classOf[String])) ); } }). log("Parsed job #${property.beanstalk.jobId} to body ${in.body}");
from("timer:dig?period=30seconds"). setBody(constant(10)).log("Kick ${in.body} buried/delayed tasks"). to("beanstalk:testTube?command=kick");
In the first route we are listening for new jobs in tube “testTube”. When they are arriving, we are trying to parse integer value from the message body. If done successful, we log it and this successful exchange completion makes Camel component to delete this job from Beanstalk automatically. Contrary, when we cannot parse the job data, the exchange failed and the Camel component buries it by default, so that it can be processed later or probably we are going to inspect failed jobs manually.
So the second route periodically requests Beanstalk to kick 10 jobs out of buried and/or delayed state to the normal queue.
Chapter 14. Box
Box Component
Available as of Camel 2.14
The Box component provides access to all of the Box.com APIs accessible using box-java-sdk-v2. It allows producing messages to upload and download files, create, edit, and manage folders, etc. It also supports APIs that allow polling for updates to user accounts and even changes to enterprise accounts, etc.
Box.com requires the use of OAuth2.0 for all client application authentication. In order to use camel-box with your account, you'll need to create a new application within Box.com at https://app.box.com/developers/services/edit/. The Box application's client id and secret will allow access to Box APIs which require a current user. A user access token is generated and managed by the API for an end user. Alternatively the Camel application can register an implementation of com.box.boxjavalibv2.authorization.IAuthSecureStorage to provide an com.box.boxjavalibv2.dao.IAuthData OAuth token.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-box</artifactId> <version>${camel-version}</version> </dependency>
URI format
box://endpoint-prefix/endpoint?[options]
Endpoint prefix can be one of:
- collaborations
- comments
- events
- files
- folders
- groups
- poll-events
- search
- shared-comments
- shared-files
- shared-folders
- shared-items
- users
Box Component
The Box Component can be configured with the options below. These options can be provided using the component's bean property
configuration
of type org.apache.camel.component.box.BoxConfiguration
. These options can also be specified in the endpoint URI.
Option
|
Type
|
Description
|
authSecureStorage
|
com.box.boxjavalibv2.authorization.IAuthSecureStorage
|
OAuth Secure Storage callback, can be used to provide and or save OAuth tokens. The callback may return null on first call to allow the component to login and authorize application and obtain an OAuth token, which can then be saved in the secure storage. For the component to be able to create a token automatically a user password must be provided.
|
boxConfig
|
com.box.boxjavalibv2.IBoxConfig
|
Custom Box SDK configuration, not required normally
|
clientId
|
String
|
Box application client ID
|
clientSecret
|
String
|
Box application client secret
|
connectionManagerBuilder
|
com.box.boxjavalibv2.BoxConnectionManagerBuilder
|
Custom Box connection manager builder, used to override default settings like max connections for underlying HttpClient.
|
httpParams
|
java.util.Map
|
Custom HTTP params for settings like proxy host
|
loginTimeout
|
int
|
amount of time the component will wait for a response from Box.com, default is 30 seconds
|
refreshListener
|
com.box.boxjavalibv2.authorization.OAuthRefreshListener
|
OAuth listener for token updates, if the Camel application needs to use the access token outside the route
|
revokeOnShutdown
|
boolean
|
Flag to revoke OAuth refresh token on route shutdown, default false. Will require a fresh refresh token on restart using either a custom IAuthSecureStorage or automatic component login by providing a user password
|
sharedLink
|
String
|
Box shared link for shared-* endpoints, can be a link for a shared comment, file or folder
|
sharedPassword
|
String
|
Password associated with the shared link, MUST be provided with sharedLink
|
userName
|
String
|
Box user name, MUST be provided
|
userPassword
|
String
|
Box user password, MUST be provided if authSecureStorage is not set, or returns null on first call
|
Producer Endpoints:
Producer endpoints can use endpoint prefixes followed by endpoint names and associated options described next. A shorthand alias can be used for some endpoints. The endpoint URI MUST contain a prefix.
Endpoint options that are not mandatory are denoted by []. When there are no mandatory options for an endpoint, one of the set of [] options MUST be provided. Producer endpoints can also use a special option
inBody
that in turn should contain the name of the endpoint option whose value will be contained in the Camel Exchange In message.
Any of the endpoint options can be provided in either the endpoint URI, or dynamically in a message header. The message header name must be of the format
CamelBox.<option>
. Note that the inBody
option overrides message header, i.e. the endpoint option inBody=option
would override a CamelBox.option
header.
If a value is not provided for the option defaultRequest either in the endpoint URI or in a message header, it will be assumed to be
null
. Note that the null
value will only be used if other options do not satisfy matching endpoints.
In case of Box API errors the endpoint will throw a RuntimeCamelException with a com.box.restclientv2.exceptions.BoxSDKException derived exception cause.
Endpoint Prefix collaborations
For more information on Box collaborations see https://developers.box.com/docs/#collaborations. The following endpoints can be invoked with the prefix
collaborations
as follows:
box://collaborations/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
createCollaboration
|
create
|
collabRequest, folderId
|
com.box.boxjavalibv2.dao.BoxCollaboration
|
deleteCollaboration
|
delete
|
collabId, defaultRequest
|
|
getAllCollaborations
|
allCollaborations
|
getAllCollabsRequest
|
java.util.List
|
getCollaboration
|
collaboration
|
collabId, defaultRequest
|
com.box.boxjavalibv2.dao.BoxCollaboration
|
updateCollaboration
|
update
|
collabId, collabRequest
|
com.box.boxjavalibv2.dao.BoxCollaboration
|
URI Options for collaborations
Name
|
Type
|
collabId
|
String
|
collabRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxCollabRequestObject
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
folderId
|
String
|
getAllCollabsRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxGetAllCollabsRequestObject
|
Endpoint Prefix events
For more information on Box events see https://developers.box.com/docs/#events. Although this endpoint can be used by producers, Box events are better used as a consumer endpoint using the poll-events endpoint prefix. The following endpoints can be invoked with the prefix
events
as follows:
box://events/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
getEventOptions
|
eventOptions
|
defaultRequest
|
com.box.boxjavalibv2.dao.BoxCollection
|
getEvents
|
events
|
eventRequest
|
com.box.boxjavalibv2.dao.BoxEventCollection
|
URI Options for events
Name
|
Type
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
eventRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxEventRequestObject
|
Endpoint Prefix groups
For more information on Box groups see https://developers.box.com/docs/#groups. The following endpoints can be invoked with the prefix
groups
as follows:
box://groups/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
createGroup
|
|
[groupRequest], [name]
|
com.box.boxjavalibv2.dao.BoxGroup
|
createMembership
|
|
[groupId, role, userId], [groupMembershipRequest]
|
com.box.boxjavalibv2.dao.BoxGroupMembership
|
deleteGroup
|
delete
|
defaultRequest, groupId
|
|
deleteMembership
|
delete
|
defaultRequest, membershipId
|
|
getAllCollaborations
|
allCollaborations
|
defaultRequest, groupId
|
com.box.boxjavalibv2.dao.BoxCollection
|
getAllGroups
|
allGroups
|
defaultRequest
|
com.box.boxjavalibv2.dao.BoxCollection
|
getMembership
|
membership
|
defaultRequest, membershipId
|
com.box.boxjavalibv2.dao.BoxGroupMembership
|
getMemberships
|
memberships
|
defaultRequest, groupId
|
com.box.boxjavalibv2.dao.BoxCollection
|
updateGroup
|
update
|
groupId, groupRequest
|
com.box.boxjavalibv2.dao.BoxGroup
|
updateMembership
|
update
|
[groupMembershipRequest], [role], membershipId
|
com.box.boxjavalibv2.dao.BoxGroupMembership
|
URI Options for groups
Name
|
Type
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
groupId
|
String
|
groupMembershipRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxGroupMembershipRequestObject
|
groupRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxGroupRequestObject
|
membershipId
|
String
|
name
|
String
|
role
|
String
|
userId
|
String
|
Endpoint Prefix search
For more information on Box search API see https://developers.box.com/docs/#search. The following endpoints can be invoked with the prefix
search
as follows:
box://search/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
search
|
|
defaultRequest, searchQuery
|
com.box.boxjavalibv2.dao.BoxCollection
|
URI Options for search
Name
|
Type
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
searchQuery
|
String
|
Endpoint Prefix comments and shared-comments
For more information on Box comments see https://developers.box.com/docs/#comments. The following endpoints can be invoked with the prefix comments or
shared-comments
as follows. The shared-comments prefix requires sharedLink and sharedPassword properties.
box://comments/endpoint?[options] box://shared-comments/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
addComment
|
|
[commentRequest], [commentedItemId, commentedItemType, message]
|
com.box.boxjavalibv2.dao.BoxComment
|
deleteComment
|
delete
|
commentId, defaultRequest
|
|
getComment
|
comment
|
commentId, defaultRequest
|
com.box.boxjavalibv2.dao.BoxComment
|
updateComment
|
update
|
commentId, commentRequest
|
com.box.boxjavalibv2.dao.BoxComment
|
URI Options for comments and shared-comments
Name
|
Type
|
commentId
|
String
|
commentRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxCommentRequestObject
|
commentedItemId
|
String
|
commentedItemType
|
com.box.boxjavalibv2.dao.IBoxType
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
message
|
String
|
Endpoint Prefix files and shared-files
For more information on Box files see https://developers.box.com/docs/#files. The following endpoints can be invoked with the prefix
files
or shared-files
as follows. The shared-files
prefix requires sharedLink and sharedPassword properties.
box://files/endpoint?[options] box://shared-files/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
copyFile
|
|
fileId, itemCopyRequest
|
com.box.boxjavalibv2.dao.BoxFile
|
createSharedLink
|
create
|
fileId, sharedLinkRequest
|
com.box.boxjavalibv2.dao.BoxFile
|
deleteFile
|
|
defaultRequest, fileId
|
|
downloadFile
|
download
|
[destination, listener], [listener, outputStreams], defaultRequest, fileId
|
java.io.InputStream
|
downloadThumbnail
|
download
|
extension, fileId, imageRequest
|
java.io.InputStream
|
getFile
|
file
|
defaultRequest, fileId
|
com.box.boxjavalibv2.dao.BoxFile
|
getFileComments
|
fileComments
|
defaultRequest, fileId
|
com.box.boxjavalibv2.dao.BoxCollection
|
getFileVersions
|
fileVersions
|
defaultRequest, fileId
|
java.util.List
|
getPreview
|
preview
|
extension, fileId, imageRequest
|
com.box.boxjavalibv2.dao.BoxPreview
|
getThumbnail
|
thumbnail
|
extension, fileId, imageRequest
|
com.box.boxjavalibv2.dao.BoxThumbnail
|
updateFileInfo
|
update
|
fileId, fileRequest
|
com.box.boxjavalibv2.dao.BoxFile
|
uploadFile
|
upload
|
fileUploadRequest
|
com.box.boxjavalibv2.dao.BoxFile
|
uploadNewVersion
|
upload
|
fileId, fileUploadRequest
|
com.box.boxjavalibv2.dao.BoxFile
|
URI Options for files and shared-files
Name
|
Type
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
destination
|
java.io.File
|
extension
|
String
|
fileId
|
String
|
fileRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxFileRequestObject
|
fileUploadRequest
|
com.box.restclientv2.requestsbase.BoxFileUploadRequestObject
|
imageRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxImageRequestObject
|
itemCopyRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxItemCopyRequestObject
|
listener
|
com.box.boxjavalibv2.filetransfer.IFileTransferListener
|
outputStreams
|
java.io.OutputStream[]
|
sharedLinkRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxSharedLinkRequestObject
|
Endpoint Prefix folders and shared-folders
For more information on Box folders see https://developers.box.com/docs/#folders. The following endpoints can be invoked with the prefix
folders
or shared-folders
as follows. The prefix shared-folders requires sharedLink and sharedPassword properties.
box://folders/endpoint?[options] box://shared-folders/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
copyFolder
|
|
folderId, itemCopyRequest
|
com.box.boxjavalibv2.dao.BoxFolder
|
createFolder
|
create
|
folderRequest
|
com.box.boxjavalibv2.dao.BoxFolder
|
createSharedLink
|
create
|
folderId, sharedLinkRequest
|
com.box.boxjavalibv2.dao.BoxFolder
|
deleteFolder
|
delete
|
folderDeleteRequest, folderId
|
|
getFolder
|
folder
|
defaultRequest, folderId
|
com.box.boxjavalibv2.dao.BoxFolder
|
getFolderCollaborations
|
folderCollaborations
|
defaultRequest, folderId
|
java.util.List
|
getFolderItems
|
folderItems
|
folderId, pagingRequest
|
com.box.boxjavalibv2.dao.BoxCollection
|
updateFolderInfo
|
update
|
folderId, folderRequest
|
com.box.boxjavalibv2.dao.BoxFolder
|
URI Options for folders or shared-folders
Name
|
Type
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
folderDeleteRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxFolderDeleteRequestObject
|
folderId
|
String
|
folderRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxFolderRequestObject
|
itemCopyRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxItemCopyRequestObject
|
pagingRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxPagingRequestObject
|
sharedLinkRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxSharedLinkRequestObject
|
Endpoint Prefix shared-items
For more information on Box shared items see https://developers.box.com/docs/#shared-items. The following endpoints can be invoked with the prefix
shared-items
as follows:
box://shared-items/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
getSharedItem
|
sharedItem
|
defaultRequest
|
com.box.boxjavalibv2.dao.BoxItem
|
URI Options for shared-items
Name
|
Type
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
Endpoint Prefix users
For information on Box users see https://developers.box.com/docs/#users. The following endpoints can be invoked with the prefix
users
as follows:
box://users/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
addEmailAlias
|
|
emailAliasRequest, userId
|
com.box.boxjavalibv2.dao.BoxEmailAlias
|
createEnterpriseUser
|
create
|
userRequest
|
com.box.boxjavalibv2.dao.BoxUser
|
deleteEmailAlias
|
|
defaultRequest, emailId, userId
|
|
deleteEnterpriseUser
|
|
userDeleteRequest, userId
|
|
getAllEnterpriseUser
|
allEnterpriseUser
|
defaultRequest, filterTerm
|
java.util.List
|
getCurrentUser
|
currentUser
|
defaultRequest
|
com.box.boxjavalibv2.dao.BoxUser
|
getEmailAliases
|
emailAliases
|
defaultRequest, userId
|
java.util.List
|
moveFolderToAnotherUser
|
|
folderId, simpleUserRequest, userId
|
com.box.boxjavalibv2.dao.BoxFolder
|
updateUserInformaiton
|
update
|
userId, userRequest
|
com.box.boxjavalibv2.dao.BoxUser
|
updateUserPrimaryLogin
|
update
|
userId, userUpdateLoginRequest
|
com.box.boxjavalibv2.dao.BoxUser
|
URI Options for users
Name
|
Type
|
defaultRequest
|
com.box.restclientv2.requestsbase.BoxDefaultRequestObject
|
emailAliasRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxEmailAliasRequestObject
|
emailId
|
String
|
filterTerm
|
String
|
folderId
|
String
|
simpleUserRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxSimpleUserRequestObject
|
userDeleteRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxUserDeleteRequestObject
|
userId
|
String
|
userRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxUserRequestObject
|
userUpdateLoginRequest
|
com.box.boxjavalibv2.requests.requestobjects.BoxUserUpdateLoginRequestObject
|
Consumer Endpoints:
For more information on Box events see https://developers.box.com/docs/#events and for long polling see https://developers.box.com/docs/#events-long-polling. Consumer endpoints can only use the endpoint prefix poll-events as shown in the example next. By default the consumer will split the com.box.boxjavalibv2.dao.BoxEventCollection from every long poll and create an exchange for every com.box.boxjavalibv2.dao.BoxEvent. To make the consumer return the entire collection in a single exchange, use the URI option consumer.splitResult=false.
box://poll-events/endpoint?[options]
Endpoint
|
Shorthand Alias
|
Options
|
Result Body Type
|
poll
|
|
limit, streamPosition, streamType
|
com.box.boxjavalibv2.dao.BoxEvent by default, or com.box.boxjavalibv2.dao.BoxEventCollection when consumer.splitResult=false
|
URI Options for poll-events
Name
|
Type
|
limit
|
Integer
|
streamPosition
|
Long
|
streamType
|
String
|
splitResult
|
boolean
|
Message header
Any of the options can be provided in a message header for producer endpoints with CamelBox. prefix.
Message body
All result message bodies utilize objects provided by the Box Java SDK. Producer endpoints can specify the option name for incoming message body in the inBody endpoint parameter.
Type Converter
The Box component also provides a Camel type converter to convert GenericFile objects from File component to a com.box.restclientv2.requestsbase.BoxFileUploadRequestObject to upload files to Box.com. The target folderId for the upload can be specified in the exchange property CamelBox.folderId. If the exchange property is not specified the value defaults to "0" for the root folder ID.
Use cases
The following route uploads new files to the user's root folder:
from("file:...") .to("box://files/upload/inBody=fileUploadRequest");
The following route polls user's account for updates:
from("box://poll-events/poll?streamPosition=-1&streamType=all&limit=100") .to("bean:blah");
The following route uses a producer with dynamic header options. The fileId property has the Box file id , so its assigned to the CamelBox.fileId header as follows:
from("direct:foo") .setHeader("CamelBox.fileId", header("fileId")) .to("box://files/download") .to("file://...");
Chapter 15. Browse
Browse Component
Available as of Apache Camel 2.0
The Browse component provides a simple BrowsableEndpoint which can be useful for testing, visualisation tools or debugging. The exchanges sent to the endpoint are all available to be browsed.
URI format
browse:someName
Where someName can be any string to uniquely identify the endpoint.
Sample
In the route below, we insert a
browse:
component to be able to browse the Exchanges that are passing through:
from("activemq:order.in").to("browse:orderReceived").to("bean:processOrder");
We can now inspect the received exchanges from within the Java code:
private CamelContext context; public void inspectRecievedOrders() { BrowsableEndpoint browse = context.getEndpoint("browse:orderReceived", BrowsableEndpoint.class); List<Exchange> exchanges = browse.getExchanges(); ... // then we can inspect the list of received exchanges from Java for (Exchange exchange : exchanges) { String payload = exchange.getIn().getBody(); ... } }
Chapter 16. Cache
16.1. Cache Component
Available as of Camel 2.1
The cache component enables you to perform caching operations using EHCache as the Cache Implementation. The cache itself is created on demand or if a cache of that name already exists then it is simply utilized with its original settings.
This component supports producer and event based consumer endpoints.
The Cache consumer is an event based consumer and can be used to listen and respond to specific cache activities. If you need to perform selections from a pre-existing cache, use the processors defined for the cache component.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-cache</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
cache://cacheName[?options]
You can append query options to the URI in the following format,
?option=value&option=#beanRef&...
Options
The Cache component supports the following options:
Name | Default Value | Description |
---|---|---|
maxElementsInMemory
|
1000
|
The number of elements that may be stored in the defined cache |
memoryStoreEvictionPolicy
|
MemoryStoreEvictionPolicy.LFU
|
The number of elements that may be stored in the defined cache. Options include
|
overflowToDisk
|
true
|
Specifies whether cache may overflow to disk |
eternal
|
false
|
Sets whether elements are eternal. If eternal, timeouts are ignored and the element never expires.
|
timeToLiveSeconds
|
300
|
The maximum time between creation time and when an element expires. Is used only if the element is not eternal
|
timeToIdleSeconds
|
300
|
The maximum amount of time between accesses before an element expires |
diskPersistent
|
false
|
Whether the disk store persists between restarts of the Virtual Machine. |
diskExpiryThreadIntervalSeconds
|
120
|
The number of seconds between runs of the disk expiry thread. |
cacheManagerFactory
|
null
|
Camel 2.8: If you want to use a custom factory which instantiates and creates the EHCache net.sf.ehcache.CacheManager . Type: abstract org.apache.camel.component.cache.CacheManagerFactory
|
eventListenerRegistry
|
null
|
Camel 2.8: Sets a list of EHCache net.sf.ehcache.event.CacheEventListener for all new caches\- no need to define it per cache in EHCache xml config anymore. Type: org.apache.camel.component.cache.CacheEventListenerRegistry
|
cacheLoaderRegistry
|
null
|
Camel 2.8: Sets a list of org.apache.camel.component.cache.CacheLoaderWrapper that extends EHCache net.sf.ehcache.loader.CacheLoader for all new caches\- no need to define it per cache in EHCache xml config anymore. Type: org.apache.camel.component.cache.CacheLoaderRegistry
|
key
|
null
|
Camel 2.10: To configure using a cache key by default. If a key is provided in the message header, then the key from the header takes precedence. |
operation
|
null
|
Camel 2.10: To configure using an cache operation by default. If an operation in the message header, then the operation from the header takes precedence. |
objectCache
|
false
|
Camel 2.10: Whether to turn on allowing to store non serializable objects in the cache. If this option is enabled then overflow to disk cannot be enabled as well. |
configurationFile
|
|
Camel 2.13/2.12.3: To configure the location of the
ehcache.xml file to use, such as classpath:com/foo/mycache.xml to load from classpath. If no configuration is given, then the default settings from EHCache is used.
|
configuration
|
|
To use a custom
org.apache.camel.component.cache.CacheConfiguration configuration.
|
Cache component options
Name
|
Default Value
|
Description
|
---|---|---|
configuration
|
|
To use a custom
org.apache.camel.component.cache.CacheConfiguration configuration.
|
cacheManagerFactory
|
|
To use a custom
org.apache.camel.component.cache.CacheManagerFactory .
|
configurationFile
|
|
Camel 2.13/2.12.3: To configure the location of the
ehcache.xml file to use, such as classpath:com/foo/mycache.xml to load from classpath. If no configuration is given, then the default settings from EHCache is used.
|
Message Headers Camel 2.8+
Header | Description |
---|---|
CamelCacheOperation
|
The operation to be performed on the cache. The valid options are
|
CamelCacheKey
|
The cache key used to store the Message in the cache. The cache key is optional if the CamelCacheOperation is CamelCacheDeleteAll
|
Header changes in Camel 2.8
The header names and supported values have changed to be prefixed with
CamelCache
and use mixed case. This makes them easier to identify and keep separate from other headers. The CacheConstants
variable names remain unchanged, just their values have been changed. Also, these headers are now removed from the exchange after the cache operation is performed.
The
CamelCacheAdd
and CamelCacheUpdate
operations support additional headers:
Header | Type | Description |
---|---|---|
CamelCacheTimeToLive
|
Integer
|
Camel 2.11: Time to live in seconds. |
CamelCacheTimeToIdle
|
Integer
|
Camel 2.11: Time to idle in seconds. |
CamelCacheEternal
|
Boolean
|
Camel 2.11: Whether the content is eternal. |
Cache Producer
Sending data to the cache involves the ability to direct payloads in exchanges to be stored in a pre-existing or created-on-demand cache. The mechanics of doing this involve
- setting the Message Exchange Headers shown above.
- ensuring that the Message Exchange Body contains the message directed to the cache
Cache Consumer
Receiving data from the cache involves the ability of the CacheConsumer to listen on a pre-existing or created-on-demand Cache using an event Listener and receive automatic notifications when any cache activity take place (i.e CamelCacheGet/CamelCacheUpdate/CamelCacheDelete/CamelCacheDeleteAll). Upon such an activity taking place
- an exchange containing Message Exchange Headers and a Message Exchange Body containing the just added/updated payload is placed and sent.
- in case of a CamelCacheDeleteAll operation, the Message Exchange Header CamelCacheKey and the Message Exchange Body are not populated.
Cache Processors
There are a set of nice processors with the ability to perform cache lookups and selectively replace payload content at the
- body
- token
- xpath level
Example 1: Configuring the cache
from("cache://MyApplicationCache" + "?maxElementsInMemory=1000" + "&memoryStoreEvictionPolicy=" + "MemoryStoreEvictionPolicy.LFU" + "&overflowToDisk=true" + "&eternal=true" + "&timeToLiveSeconds=300" + "&timeToIdleSeconds=true" + "&diskPersistent=true" + "&diskExpiryThreadIntervalSeconds=300")
Example 2: Adding keys to the cache
RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start") .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD)) .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")) .to("cache://TestCache1") } };
Example 2: Updating existing keys in a cache
RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start") .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_UPDATE)) .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")) .to("cache://TestCache1") } };
Example 3: Deleting existing keys in a cache
RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start") .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_DELETE)) .setHeader(CacheConstants.CACHE_KEY", constant("Ralph_Waldo_Emerson")) .to("cache://TestCache1") } };
Example 4: Deleting all existing keys in a cache
RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start") .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_DELETEALL)) .to("cache://TestCache1"); } };
Example 5: Notifying any changes registering in a Cache to Processors and other Producers
RouteBuilder builder = new RouteBuilder() { public void configure() { from("cache://TestCache1") .process(new Processor() { public void process(Exchange exchange) throws Exception { String operation = (String) exchange.getIn().getHeader(CacheConstants.CACHE_OPERATION); String key = (String) exchange.getIn().getHeader(CacheConstants.CACHE_KEY); Object body = exchange.getIn().getBody(); // Do something } }) } };
Example 6: Using Processors to selectively replace payload with cache values
RouteBuilder builder = new RouteBuilder() { public void configure() { //Message Body Replacer from("cache://TestCache1") .filter(header(CacheConstants.CACHE_KEY).isEqualTo("greeting")) .process(new CacheBasedMessageBodyReplacer("cache://TestCache1","farewell")) .to("direct:next"); //Message Token replacer from("cache://TestCache1") .filter(header(CacheConstants.CACHE_KEY).isEqualTo("quote")) .process(new CacheBasedTokenReplacer("cache://TestCache1","novel","#novel#")) .process(new CacheBasedTokenReplacer("cache://TestCache1","author","#author#")) .process(new CacheBasedTokenReplacer("cache://TestCache1","number","#number#")) .to("direct:next"); //Message XPath replacer from("cache://TestCache1"). .filter(header(CacheConstants.CACHE_KEY).isEqualTo("XML_FRAGMENT")) .process(new CacheBasedXPathReplacer("cache://TestCache1","book1","/books/book1")) .process (new CacheBasedXPathReplacer("cache://TestCache1","book2","/books/book2")) .to("direct:next"); } };
Example 7: Getting an entry from the Cache
from("direct:start") // Prepare headers .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_GET)) .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")). .to("cache://TestCache1"). // Check if entry was not found .choice().when(header(CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNull()). // If not found, get the payload and put it to cache .to("cxf:bean:someHeavyweightOperation"). .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD)) .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")) .to("cache://TestCache1") .end() .to("direct:nextPhase");
Example 8: Checking for an entry in the Cache
Note: The CHECK command tests existence of an entry in the cache but doesn't place a message in the body.
from("direct:start") // Prepare headers .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_CHECK)) .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")). .to("cache://TestCache1"). // Check if entry was not found .choice().when(header(CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNull()). // If not found, get the payload and put it to cache .to("cxf:bean:someHeavyweightOperation"). .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD)) .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")) .to("cache://TestCache1") .end();
Management of EHCache
Here's a snippet on how to expose them via JMX in a Spring application context:
<bean id="ehCacheManagementService" class="net.sf.ehcache.management.ManagementService" init-method="init" lazy-init="false"> <constructor-arg> <bean class="net.sf.ehcache.CacheManager" factory-method="getInstance"/> </constructor-arg> <constructor-arg> <bean class="org.springframework.jmx.support.JmxUtils" factory-method="locateMBeanServer"/> </constructor-arg> <constructor-arg value="true"/> <constructor-arg value="true"/> <constructor-arg value="true"/> <constructor-arg value="true"/> </bean>
Of course you can do the same thing in straight Java:
ManagementService.registerMBeans(CacheManager.getInstance(), mbeanServer, true, true, true, true);
You can get cache hits, misses, in-memory hits, disk hits, size stats this way. You can also change CacheConfiguration parameters on the fly.
Cache replication Camel 2.8+
The Camel Cache component is able to distribute a cache across server nodes using several different replication mechanisms including: RMI, JGroups, JMS and Cache Server.
There are two different ways to make it work:
- You can configure
ehcache.xml
manually, or - You can configure these three options:
cacheManagerFactory
eventListenerRegistry
cacheLoaderRegistry
Configuring Camel Cache replication using the first option is a bit of hard work as you have to configure all caches separately. So in a situation when the all names of caches are not known, using
ehcache.xml
is not a good idea.
The second option is much better when you want to use many different caches as you do not need to define options per cache. This is because replication options are set per
CacheManager
and per CacheEndpoint
. Also it is the only way when cache names are not know at the development phase.
Note
It might be useful to read the EHCache manual to get a better understanding of the Camel Cache replication mechanism.
Example: JMS cache replication
JMS replication is the most powerful and secured replication method. Used together with Camel Cache replication makes it also rather simple. An example is available on a separate page.
16.2. cacheReplicationJMSExample
Example: JMS cache replication
Note
Please note, that this example is not finished yet. It is based on OSGi iTest instead of real life example. But no matter to that it is very good staring point for all Camel Cache Riders!
JMS replication is the most powerful and secured way. Used altogether with Camel Cache replication options is also the most easy way. This basic example is divided to few important steps that have to be made to get the cache replication to work.
The first step is to write your own implementation of
CacheManagerFactory
.
public class TestingCacheManagerFactory extends CacheManagerFactory { [...] //This constructor is very useful when using Camel with Spring/Blueprint public TestingCacheManagerFactory(String xmlName, TopicConnection replicationTopicConnection, Topic replicationTopic, QueueConnection getQueueConnection, Queue getQueue) { this.xmlName = xmlName; this.replicationTopicConnection = replicationTopicConnection; this.replicationTopic = replicationTopic; this.getQueue = getQueue; this.getQueueConnection = getQueueConnection; } @Override protected synchronized CacheManager createCacheManagerInstance() { //Singleton if (cacheManager == null) { cacheManager = new WrappedCacheManager(getClass().getResourceAsStream(xmlName)); } return cacheManager; } //Wrapping Ehcache's CacheManager to be able to add JMSCacheManagerPeerProvider public class WrappedCacheManager extends CacheManager { public WrappedCacheManager(InputStream xmlConfig) { super(xmlConfig); JMSCacheManagerPeerProvider jmsCMPP = new JMSCacheManagerPeerProvider(this, replicationTopicConnection, replicationTopic, getQueueConnection, getQueue, AcknowledgementMode.AUTO_ACKNOWLEDGE, true); cacheManagerPeerProviders.put(jmsCMPP.getScheme(), jmsCMPP); jmsCMPP.init(); } } }
Next step is to write your own implementation of
CacheLoaderWrapper
, the easiest one is:
public class WrappedJMSCacheLoader implements CacheLoaderWrapper { [...] //This constructor is very useful when using Camel with Spring/Blueprint public WrappedJMSCacheLoader(QueueConnection getQueueConnection, Queue getQueue, AcknowledgementMode acknowledgementMode, int timeoutMillis) { this.getQueueConnection = getQueueConnection; this.getQueue = getQueue; this.acknowledgementMode = acknowledgementMode; this.timeoutMillis = timeoutMillis; } @Override public void init(Ehcache cache) { jmsCacheLoader = new JMSCacheLoader(cache, defaultLoaderArgument, getQueueConnection, getQueue, acknowledgementMode, timeoutMillis); } @Override public CacheLoader clone(Ehcache arg0) throws CloneNotSupportedException { return jmsCacheLoader.clone(arg0); } @Override public void dispose() throws CacheException { jmsCacheLoader.dispose(); } [...] }
At the third step you can take care about Camel Cache options (prepare their values):
- cacheManagerFactory
- eventListenerRegistry
- cacheLoaderRegistry
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <bean id="queueConnection1" factory-bean="amqCF" factory-method="createQueueConnection" class="javax.jms.QueueConnection" /> <bean id="topicConnection1" factory-bean="amqCF" factory-method="createTopicConnection" class="javax.jms.TopicConnection" /> <bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg ref="getQueue" /> </bean> <bean id="topic1" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg ref="getTopic" /> </bean> <bean id="jmsListener1" class="net.sf.ehcache.distribution.jms.JMSCacheReplicator"> <constructor-arg index="0" value="true" /> <constructor-arg index="1" value="true" /> <constructor-arg index="2" value="true" /> <constructor-arg index="3" value="true" /> <constructor-arg index="4" value="false" /> <constructor-arg index="5" value="0" /> </bean> <bean id="jmsLoader1" class="my.cache.replication.WrappedJMSCacheLoader"> <constructor-arg index="0" ref="queueConnection1" /> <constructor-arg index="1" ref="queue1" /> <constructor-arg index="2" value="AUTO_ACKNOWLEDGE" /> <constructor-arg index="3" value="30000" /> </bean> <bean id="cacheManagerFactory1" class="my.cache.replication.TestingCacheManagerFactory"> <constructor-arg index="0" value="ehcache_jms_test.xml" /> <constructor-arg index="1" ref="topicConnection1" /> <constructor-arg index="2" ref="topic1" /> <constructor-arg index="3" ref="queueConnection1" /> <constructor-arg index="4" ref="queue1" /> </bean> <bean id="eventListenerRegistry1" class="org.apache.camel.component.cache.CacheEventListenerRegistry"> <constructor-arg> <list> <ref bean="jmsListener1" /> </list> </constructor-arg> </bean> <bean id="cacheLoaderRegistry1" class="org.apache.camel.component.cache.CacheLoaderRegistry"> <constructor-arg> <list> <ref bean="jmsLoader1"/> </list> </constructor-arg> </bean> </beans>
The final step is to define some routes using Cache component
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <bean id="getQueue" class="java.lang.String"> <constructor-arg value="replicationGetQueue" /> </bean> <bean id="getTopic" class="java.lang.String"> <constructor-arg value="replicationTopic" /> </bean> <!-- Import the xml file explained at step three --> <import resource="JMSReplicationCache1.xml"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <camel:endpoint id="fooCache1" uri="cache:foo?cacheManagerFactory=#cacheManagerFactory1&ventListenerRegistry=#eventListenerRegistry1&acheLoaderRegistry=#cacheLoaderRegistry1"/> <camel:route> <camel:from uri="direct:addRoute"/> <camel:setHeader headerName="CamelCacheOperation"> <camel:constant>CamelCacheAdd</camel:constant> </camel:setHeader> <camel:setHeader headerName="CamelCacheKey"> <camel:constant>foo</camel:constant> </camel:setHeader> <camel:to ref="fooCache1"/> </camel:route> </camelContext> <bean id="amqCF" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost?broker.persistent=false"/> </bean> <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent"> <property name="connectionFactory"> <ref bean="amqCF"/> </property> </bean> </beans>
Chapter 17. Cassandra
Camel Cassandra Component
Available as of Camel 2.15
Apache Cassandra is an open source NoSQL database designed to handle large amounts on commodity hardware. Like Amazon's DynamoDB, Cassandra has a peer-to-peer and master-less architecture to avoid single point of failure and garanty high availability. Like Google's BigTable, Cassandra data is structured using column families which can be accessed through the Thrift RPC API or a SQL-like API called CQL.
This component aims at integrating Cassandra 2.0+ using the CQL3 API (not the Thrift API). It's based on Cassandra Java Driver provided by DataStax.
Maven users will need to add the following dependency to their
pom.xml
:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-cassandraql</artifactId> <version>x.y.z</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
The endpoint can initiate the Cassandra connection or use an existing one.
URI | Description |
---|---|
cql:localhost/keyspace
|
Single host, default port, usual for testing |
cql:host1,host2/keyspace
|
Multi host, default port |
cql:host1,host2:9042/keyspace
|
Multi host, custom port |
cql:host1,host2
|
Default port and keyspace |
cql:bean:sessionRef
|
Provided Session reference |
cql:bean:clusterRef/keyspace
|
Provided Cluster reference |
To fine tune the Cassandra connection (SSL options, pooling options, load balancing policy, retry policy, reconnection policy...), create your own Cluster instance and give it to the Camel endpoint.
Endpoint Options
Option | Default | Description |
---|---|---|
clusterName
|
Cluster name
|
|
username and password
|
Session authentication
|
|
cql
|
CQL query. Can be overriden with a message header.
|
|
consistencyLevel
|
ANY , ONE , TWO , QUORUM , LOCAL_QUORUM ...
|
|
prepareStatements
|
true
|
Use prepared statement (default) or not |
resultSetConversionStrategy
|
ALL
|
How is ResultSet converted transformed into message body
ALL , ONE , LIMIT_10 , LIMIT_100 ...
|
Messages
Incoming Message
The Camel Cassandra endpoint expects a bunch of simple objects (
Object
or Object[]
or Collection<Object>
) which will be bound to the CQL statement as query parameters. If message body is null or empty, then CQL query will be executed without binding parameters.
Headers:
CamelCqlQuery
(optional,String
orRegularStatement
): CQL query either as a plain String or built using theQueryBuilder
.
Outgoing Message
The Camel Cassandra endpoint produces one or many a Cassandra Row objects depending on the
resultSetConversionStrategy
:
List<Row>
ifresultSetConversionStrategy
isALL
orLIMIT_[0-9]+
- Single
Row
ifresultSetConversionStrategy
isONE
- Anything else, if
resultSetConversionStrategy
is a custom implementation of theResultSetConversionStrategy
Repositories
Cassandra can be used to store message keys or messages for the idempotent and aggregation EIP.
Cassandra might not be the best tool for queuing use cases yet, read Cassandra anti-patterns queues and queue like datasets. It's advised to use LeveledCompaction and a small GC grace setting for these tables to allow tombstoned rows to be removed quickly.
Idempotent repository
The
NamedCassandraIdempotentRepository
stores messages keys in a Cassandra table like this:
CREATE TABLE CAMEL_IDEMPOTENT ( NAME varchar, -- Repository name KEY varchar, -- Message key PRIMARY KEY (NAME, KEY) ) WITH compaction = {'class':'LeveledCompactionStrategy'} AND gc_grace_seconds = 86400;
This repository implementation uses lightweight transactions (also known as Compare and Set) and requires Cassandra 2.0.7+.
Alternatively, the
CassandraIdempotentRepository
does not have a NAME
column and can be extended to use a different data model.
Option | Default | Description |
---|---|---|
table
|
CAMEL_IDEMPOTENT
|
Table name |
pkColumns
|
NAME , KEY
|
Primary key columns |
name
|
Repository name, value used for NAME column
|
|
ttl
|
Key time to live | |
writeConsistencyLevel
|
Consistency level used to insert/delete key: ANY , ONE , TWO , QUORUM , LOCAL_QUORUM …
|
|
readConsistencyLevel
|
Consistency level used to read/check key: ONE , TWO , QUORUM , LOCAL_QUORUM …
|
Aggregation repository
The
NamedCassandraAggregationRepository
stores exchanges by correlation key in a Cassandra table like this:
CREATE TABLE CAMEL_AGGREGATION ( NAME varchar, -- Repository name KEY varchar, -- Correlation id EXCHANGE_ID varchar, -- Exchange id EXCHANGE blob, -- Serialized exchange PRIMARY KEY (NAME, KEY) ) WITH compaction = {'class':'LeveledCompactionStrategy'} AND gc_grace_seconds = 86400;
Alternatively, the
CassandraAggregationRepository
does not have a NAME
column and can be extended to use a different data model.
Option | Default | Description |
---|---|---|
table
|
CAMEL_AGGREGATION
|
Table name |
pkColumns
|
NAME ,KEY
|
Primary key columns |
exchangeIdColumn
|
EXCHANGE_ID
|
Exchange Id column |
exchangeColumn
|
EXCHANGE
|
Exchange content column |
name
|
Repository name, value used for NAME column
|
|
ttl
|
Exchange time to live | |
writeConsistencyLevel
|
Consistency level used to insert/delete exchange: ANY , ONE , TWO , QUORUM , LOCAL_QUORUM …
|
|
readConsistencyLevel
|
Consistency level used to read/check exchange: ONE , TWO , QUORUM , LOCAL_QUORUM …
|
Chapter 18. Chunk
Chunk Component
Available as of Camel 2.15
The chunk: component allows for processing a message using a Chunk template. This can be ideal when using Templating to generate responses for requests.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-chunk</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
chunk:templateName[?options]
Where templateName is the classpath-local URI of the template to invoke.
You can append query options to the URI in the following format,
?option=value&option=value&...
Options
Option
|
Default
|
Description
|
---|---|---|
encoding
|
null
|
Character encoding of the resource content.
|
themesFolder
|
null
|
Alternative folder to scan for a template name.
|
themeSubfolder
|
null
|
Alternative subfolder to scan for a template name if themeFolder parameter is set.
|
themeLayer
|
null
|
A specific layer of a template file to use as template.
|
extension
|
null
|
Alternative extension to scan for a template name if themeFolder and themeSubfolder are set.
|
Chunk component will look for a specific template in themes folder with extensions .chtml or .cxml. If you need to specify a different folder or extensions, you will need to use the specific options listed above.
Chunk Context
Camel will provide exchange information in the Chunk context (just a
Map
). The Exchange
is transferred as:
key
|
value
|
---|---|
exchange
|
The
Exchange itself.
|
exchange.properties
|
The
Exchange properties.
|
headers
|
The headers of the In message.
|
camelContext
|
The Camel Context.
|
request
|
The In message.
|
body
|
The In message body.
|
response
|
The Out message (only for InOut message exchange pattern).
|
Dynamic templates
Camel provides two headers by which you can define a different resource location for a template or the template content itself. If any of these headers is set then Camel uses this over the endpoint configured resource. This allows you to provide a dynamic template at runtime.
Header
|
Type
|
Description
|
Support Version
|
---|---|---|---|
ChunkConstants.CHUNK_RESOURCE_URI
|
String
|
A URI for the template resource to use instead of the endpoint configured.
|
|
ChunkConstants.CHUNK_TEMPLATE
|
String
|
The template to use instead of the endpoint configured.
|
|
Samples
For example you could use something like:
from("activemq:My.Queue"). to("chunk:template");
To use a Chunk template to formulate a response for a message for InOut message exchanges (where there is a
JMSReplyTo
header).
If you want to use InOnly and consume the message and send it to another destination you could use:
from("activemq:My.Queue"). to("chunk:template"). to("activemq:Another.Queue");
It's possible to specify what template the component should use dynamically via a header, so for example:
from("direct:in"). setHeader(ChunkConstants.CHUNK_RESOURCE_URI).constant("template"). to("chunk:dummy");
An example of Chunk component options use:
from("direct:in"). to("chunk:file_example?themeFolder=template&themeSubfolder=subfolder&extension=chunk");
In this example Chunk component will look for the file file_example.chunk in the folder template/subfolder.
The Email Sample
In this sample we want to use Chunk templating for an order confirmation email. The email template is laid out in Chunk as:
Dear {$headers.lastName}, {$headers.firstName} Thanks for the order of {$headers.item}. Regards Camel Riders Bookstore {$body}
Chapter 19. Class
Class Component
Available as of Apache Camel 2.4
URI format
class:className[?options]
Where className is the fully qualified class name to create and use as bean.
Options
Name | Type | Default | Description |
---|---|---|---|
method
|
String
|
null
|
The method name that bean will be invoked. If not provided, Apache Camel will try to pick the method itself. In case of ambiguity an exception is thrown. See Bean Binding for more details. |
multiParameterArray
|
boolean
|
false
|
How to treat the parameters which are passed from the message body; if it is true , the In message body should be an array of parameters.
|
You can append query options to the URI in the following format,
?option=value&option=value&...
Using
You simply use the class component just as the Bean component but by specifying the fully qualified classname instead. For example to use the
MyFooBean
you have to do as follows:
from("direct:start").to("class:org.apache.camel.component.bean.MyFooBean").to("mock:result");
You can also specify which method to invoke on the
MyFooBean
, for example hello
:
from("direct:start").to("class:org.apache.camel.component.bean.MyFooBean?method=hello").to("mock:result");
Setting properties on the created instance
In the endpoint uri you can specify properties to set on the created instance, for example if it has a
setPrefix
method:
from("direct:start") .to("class:org.apache.camel.component.bean.MyPrefixBean?prefix=Bye") .to("mock:result");
And you can also use the
#
syntax to refer to properties to be looked up in the Registry.
from("direct:start") .to("class:org.apache.camel.component.bean.MyPrefixBean?cool=#foo") .to("mock:result");
Which will lookup a bean from the Registry with the id
foo
and invoke the setCool
method on the created instance of the MyPrefixBean
class.
Note
See more details at the Bean component as the class component works in much the same way.
Chapter 20. CMIS
CMIS Component
Available as of Camel 2.11 The cmis component uses the Apache Chemistry client API and allows you to add/read nodes to/from a CMIS compliant content repositories.
URI Format
cmis://cmisServerUrl[?options]
You can append query options to the URI in the following format, ?options=value&option2=value&...
URI Options
Name | Default Value | Context | Description |
---|---|---|---|
queryMode |
false
|
Producer | If true, will execute the cmis query from the message body and return result, otherwise will create a node in the cmis repository |
query |
String
|
Consumer | The cmis query to execute against the repository. If not specified, the consumer will retrieve every node from the content repository by iterating the content tree recursively |
username |
null
|
Both | Username for the cmis repository |
password |
null
|
Both | Password for the cmis repository |
repositoryId |
null
|
Both | The Id of the repository to use. If not specified the first available repository is used |
pageSize |
100
|
Both | Number of nodes to retrieve per page |
readCount |
0
|
Both | Max number of nodes to read |
readContent |
false
|
Both | If set to true, the content of document node will be retrieved in addition to the properties |
Usage
Message headers evaluated by the producer
Header | Default Value | Description |
---|---|---|
CamelCMISFolderPath
|
/
|
The current folder to use during the execution. If not specified will use the root folder |
CamelCMISRetrieveContent
|
false
|
In queryMode this header will force the producer to retrieve the content of document nodes.
|
CamelCMISReadSize
|
0
|
Max number of nodes to read. |
cmis:path
|
null
|
If CamelCMISFolderPath is not set, will try to find out the path of the node from this cmis property and it is name
|
cmis:name
|
null
|
If CamelCMISFolderPath is not set, will try to find out the path of the node from this cmis property and it is path
|
cmis:objectTypeId
|
null
|
The type of the node |
cmis:contentStreamMimeType
|
null
|
The mimetype to set for a document |
Message headers set during querying Producer operation
Header | Type | Description |
---|---|---|
CamelCMISResultCount
|
Integer
|
Number of nodes returned from the query. |
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-cmis</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version}
must be replaced by the actual version of Camel (2.11 or higher).
Chapter 21. Cometd
Cometd Component
The cometd: component is a transport for working with the jetty implementation of the cometd/bayeux protocol. Using this component in combination with the dojo toolkit library it's possible to push Apache Camel messages directly into the browser using an AJAX based mechanism.
URI format
cometd://host:port/channelName[?options]
The channelName represents a topic that can be subscribed to by the Apache Camel endpoints.
Examples
cometd://localhost:8080/service/mychannel cometds://localhost:8443/service/mychannel
where
cometds:
represents an SSL configured endpoint.
Options
Name | Default Value | Description |
---|---|---|
resourceBase
|
The root directory for the web resources or classpath. Use the protocol file: or classpath: depending if you want that the component loads the resource from file system or classpath. Classpath is required for OSGI deployment where the resources are packaged in the jar | |
baseResource
|
Camel 2.7: The root directory for the web resources or classpath. Use the protocol file: or classpath: depending if you want that the component loads the resource from file system or classpath. Classpath is required for OSGI deployment where the resources are packaged in the jar | |
timeout
|
240000
|
The server side poll timeout in milliseconds. This is how long the server will hold a reconnect request before responding. |
interval
|
0
|
The client side poll timeout in milliseconds. How long a client will wait between reconnects |
maxInterval
|
30000
|
The max client side poll timeout in milliseconds. A client will be removed if a connection is not received in this time. |
multiFrameInterval
|
1500
|
The client side poll timeout, if multiple connections are detected from the same browser. |
jsonCommented
|
true
|
If true , the server will accept JSON wrapped in a comment and will generate JSON wrapped in a comment. This is a defence against Ajax Hijacking.
|
logLevel
|
1
|
0 =none, 1 =info, 2 =debug.
|
sslContextParameters |
Camel 2.9: Reference to a org.apache.camel.util.jsse.SSLContextParameters in the Registry. This reference overrides any configured SSLContextParameters at the component level. See Using the JSSE Configuration Utility.
|
|
crossOriginFilterOn
|
false
|
Camel 2.10: If true , the server will support for cross-domain filtering
|
allowedOrigins
|
*
|
Camel 2.10: The origins domain that support to cross, if the crosssOriginFilterOn is true
|
filterPath
|
Camel 2.10: The filterPath will be used by the CrossOriginFilter, if the crosssOriginFilterOn is true
|
|
disconnectLocalSession
|
true
|
Camel 2.10.5/2.11.1: (Producer only): Whether to disconnect local sessions after publishing a message to its channel. Disconnecting local session is needed as they are not swept by default by CometD, and therefore you can run out of memory. |
You can append query options to the URI in the following format,
?option=value&option=value&...
Here is some examples of how to pass the parameters.
For file (when the Webapp resources are located in the Web Application directory)
cometd://localhost:8080?resourceBase=file./webapp
. For classpath (when the web resources are packaged inside the Webapp folder) cometd://localhost:8080?resourceBase=classpath:webapp
.
Authentication
Available as of Camel 2.8
You can configure custom
SecurityPolicy
and Extension
's to the CometdComponent
which allows you to use authentication as documented here
Setting up SSL for Cometd Component
Using the JSSE Configuration Utility
As of Camel 2.9, the Cometd component supports SSL/TLS configuration through the Camel JSSE Configuration Utility. This utility greatly decreases the amount of component specific code you need to write and is configurable at the endpoint and component levels. The following examples demonstrate how to use the utility with the Cometd component. You need to configure SSL on the
CometdComponent
class.
Programmatic configuration of the component
KeyStoreParameters ksp = new KeyStoreParameters(); ksp.setResource("/users/home/server/keystore.jks"); ksp.setPassword("keystorePassword"); KeyManagersParameters kmp = new KeyManagersParameters(); kmp.setKeyStore(ksp); kmp.setKeyPassword("keyPassword"); TrustManagersParameters tmp = new TrustManagersParameters(); tmp.setKeyStore(ksp); SSLContextParameters scp = new SSLContextParameters(); scp.setKeyManagers(kmp); scp.setTrustManagers(tmp); CometdComponent commetdComponent = getContext().getComponent("cometds", CometdComponent.class); commetdComponent.setSslContextParameters(scp);
Spring DSL based configuration of endpoint
... <camel:sslContextParameters id="sslContextParameters"> <camel:keyManagers keyPassword="keyPassword"> <camel:keyStore resource="/users/home/server/keystore.jks" password="keystorePassword"/> </camel:keyManagers> <camel:trustManagers> <camel:keyStore resource="/users/home/server/keystore.jks" password="keystorePassword"/> </camel:keyManagers> </camel:sslContextParameters> <bean id="cometd" class="org. apache. camel. component.cometd.CometdComponent"> <property name="sslContextParameters" ref="sslContextParameters"/> </bean> ... <to uri="cometds://127.0.0.1:443/service/test?baseResource=file:./target/test-classes/webapp&timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&logLevel=2&sslContextParameters=#sslContextParameters"/>...
Chapter 22. Context
Context Component
Available as of Camel 2.7
The context component allows you to create new Camel Components from a CamelContext with a number of routes which is then treated as a black box, allowing you to refer to the local endpoints within the component from other CamelContexts.
It is similar to the Routebox component in idea, though the Context component tries to be really simple for end users; just a simple convention over configuration approach to refer to local endpoints inside the CamelContext Component.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-context</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
context:camelContextId:localEndpointName[?options]
Or you can omit the "context:" prefix.
camelContextId:localEndpointName[?options]
- camelContextId is the ID you used to register the CamelContext into the Registry.
- localEndpointName can be a valid Camel URI evaluated within the black box CamelContext. Or it can be a logical name which is mapped to any local endpoints. For example if you locally have endpoints like direct:invoices and seda:purchaseOrders inside a CamelContext of id supplyChain, then you can just use the URIs supplyChain:invoices or supplyChain:purchaseOrders to omit the physical endpoint kind and use pure logical URIs.
You can append query options to the URI in the following format,
?option=value&option=value&...
Example
In this example we'll create a black box context, then we'll use it from another CamelContext.
Defining the context component
First you need to create a CamelContext, add some routes in it, start it and then register the CamelContext into the Registry (JNDI, Spring, Guice or OSGi etc).
This can be done in the usual Camel way from this test case (see the createRegistry() method); this example shows Java and JNDI being used...
// lets create our black box as a camel context and a set of routes DefaultCamelContext blackBox = new DefaultCamelContext(registry); blackBox.setName("blackBox"); blackBox.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { // receive purchase orders, lets process it in some way then send an invoice // to our invoice endpoint from("direct:purchaseOrder"). setHeader("received").constant("true"). to("direct:invoice"); } }); blackBox.start(); registry.bind("accounts", blackBox);
Notice in the above route we are using pure local endpoints (direct and seda). Also note we expose this CamelContext using the accounts ID. We can do the same thing in Spring via
<camelContext id="accounts" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:purchaseOrder"/> ... <to uri="direct:invoice"/> </route> </camelContext>
Using the context component
Then in another CamelContext we can then refer to this "accounts black box" by just sending to accounts:purchaseOrder and consuming from accounts:invoice.
If you prefer to be more verbose and explicit you could use context:accounts:purchaseOrder or even context:accounts:direct://purchaseOrder if you prefer. But using logical endpoint URIs is preferred as it hides the implementation detail and provides a simple logical naming scheme.
For example if we wish to then expose this accounts black box on some middleware (outside of the black box) we can do things like...
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <!-- consume from an ActiveMQ into the black box --> <from uri="activemq:Accounts.PurchaseOrders"/> <to uri="accounts:purchaseOrders"/> </route> <route> <!-- lets send invoices from the black box to a different ActiveMQ Queue --> <from uri="accounts:invoice"/> <to uri="activemq:UK.Accounts.Invoices"/> </route> </camelContext>
Naming endpoints
A context component instance can have many public input and output endpoints that can be accessed from outside it's CamelContext. When there are many it is recommended that you use logical names for them to hide the middleware as shown above.
However when there is only one input, output or error/dead letter endpoint in a component we recommend using the common posix shell names in, out and err
Chapter 23. ControlBus Component
ControlBus Component
Available as of Camel 2.11
The controlbus: component provides easy management of Camel applications based on the Control Bus EIP pattern. For example, by sending a message to an Endpoint you can control the lifecycle of routes, or gather performance statistics.
controlbus:command[?options]
Where command can be any string to identify which type of command to use.
Commands
Command | Description |
---|---|
route
|
To control routes using the routeId and action parameter.
|
language
|
Allows you to specify a Language to use for evaluating the message body. If there is any result from the evaluation, then the result is put in the message body. |
Options
Name | Default Value | Description |
---|---|---|
routeId
|
null
|
To specify a route by its id .
|
action
|
null
|
To denote an action that can be either: start , stop , or status . To either start or stop a route, or to get the status of the route as output in the message body. You can use suspend and resume from Camel 2.11.1 onwards to either suspend or resume a route. And from Camel 2.11.1 onwards you can use stats to get performance statics returned in XML format; the routeId option can be used to define which route to get the performance stats for, if routeId is not defined, then you get statistics for the entire CamelContext.
|
async
|
false
|
Whether to execute the control bus task asynchronously. Important: If this option is enabled, then any result from the task is not set on the Exchange. This is only possible if executing tasks synchronously. |
loggingLevel
|
INFO
|
Logging level used for logging when task is done, or if any exceptions occurred during processing the task. |
You can append query options to the URI in the following format,
?option=value&option=value&...
Samples
Using route command
The route command allows you to do common tasks on a given route very easily, for example to start a route, you can send an empty message to this endpoint:
template.sendBody("controlbus:route?routeId=foo&action=start", null);
To get the status of the route, you can do:
String status = template.requestBody("controlbus:route?routeId=foo&action=status", null, String.class);
Getting performance statistics
Available as of Camel 2.11.1
This requires JMX to be enabled (is by default) then you can get the performance statics per route, or for the CamelContext. For example to get the statics for a route named foo, we can do:
String xml = template.requestBody("controlbus:route?routeId=foo&action=stats", null, String.class);
The returned statics is in XML format. Its the same data you can get from JMX with the
dumpRouteStatsAsXml
operation on the ManagedRouteMBean
.
To get statics for the entire CamelContext you just omit the routeId parameter as shown below:
String xml = template.requestBody("controlbus:route?action=stats", null, String.class);
Using Simple language
You can use the Simple language with the control bus, for example to stop a specific route, you can send a message to the
"controlbus:language:simple"
endpoint containing the following message:
template.sendBody("controlbus:language:simple", "${camelContext.stopRoute('myRoute')}");
As this is a void operation, no result is returned. However, if you want the route status you can do:
String status = template.requestBody("controlbus:language:simple", "${camelContext.getRouteStatus('myRoute')}", String.class);
Notice: its easier to use the
route
command to control lifecycle of routes. The language
command allows you to execute a language script that has stronger powers such as Groovy or to some extend the Simple language.
For example to shutdown Camel itself you can do:
template.sendBody("controlbus:language:simple?async=true", "${camelContext.stop()}");
Notice we use
async=true
to stop Camel asynchronously as otherwise we would be trying to stop Camel while it was in-flight processing the message we sent to the control bus component.
Note
You can also use other languages such as Groovy, etc.
- ControlBus EIP
- JMX Component
- Using JMX with Camel
Chapter 24. CouchDB
Camel CouchDB component
Available as of Camel 2.11
The couchdb: component allows you to treat CouchDB instances as a producer or consumer of messages. Using the lightweight LightCouch API, this camel component has the following features:
- As a consumer, monitors couch changesets for inserts, updates and deletes and publishes these as messages into camel routes.
- As a producer, can save or update documents into couch.
- Can support as many endpoints as required, eg for multiple databases across multiple instances.
- Ability to have events trigger for only deletes, only inserts/updates or all (default).
- Headers set for sequenceId, document revision, document id, and HTTP method type.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-couchdb</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
couchdb:http://hostname[:port]/database?[options]
Where hostname is the hostname of the running couchdb instance. Port is optional and if not specified then defaults to 5984.
Options
Property | Default | Description |
---|---|---|
deletes
|
true
|
document deletes are published as events |
updates
|
true
|
document inserts/updates are published as events |
heartbeat
|
30000
|
how often to send an empty message to keep socket alive in millis |
createDatabase
|
true
|
create the database if it does not already exist |
username
|
null
|
username in case of authenticated databases |
password
|
null
|
password for authenticated databases |
Headers
The following headers are set on exchanges during message transport.
Property | Value |
---|---|
CouchDbDatabase
|
the database the message came from |
CouchDbSeq
|
the couchdb changeset sequence number of the update / delete message |
CouchDbId
|
the couchdb document id |
CouchDbRev
|
the couchdb document revision |
CouchDbMethod
|
the method (delete / update) |
Headers are set by the consumer once the message is received. The producer will also set the headers for downstream processors once the insert/update has taken place. Any headers set prior to the producer are ignored. That means for example, if you set CouchDbId as a header, it will not be used as the id for insertion, the id of the document will still be used.
Message Body
The component will use the message body as the document to be inserted. If the body is an instance of String, then it will be marshalled into a GSON object before insert. This means that the string must be valid JSON or the insert / update will fail. If the body is an instance of a com.google.gson.JsonElement then it will be inserted as is. Otherwise the producer will throw an exception of unsupported body type.
Samples
For example if you wish to consume all inserts, updates and deletes from a CouchDB instance running locally, on port 9999 then you could use the following:
from("couchdb:http://localhost:9999").process(someProcessor);
If you were only interested in deletes, then you could use the following
from("couchdb:http://localhost:9999?updates=false").process(someProcessor);
If you wanted to insert a message as a document, then the body of the exchange is used
from("someProducingEndpoint").process(someProcessor).to("couchdb:http://localhost:9999")
Chapter 25. Crypto (Digital Signatures)
Crypto component for Digital Signatures
Available as of Apache Camel 2.3
Using Apache Camel cryptographic endpoints and Java's Cryptographic extension it is easy to create Digital Signatures for Exchanges. Apache Camel provides a pair of flexible endpoints which get used in concert to create a signature for an exchange in one part of the exchange's workflow and then verify the signature in a later part of the workflow.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-crypto</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
Introduction
Digital signatures make use Asymmetric Cryptographic techniques to sign messages. From a (very) high level, the algorithms use pairs of complimentary keys with the special property that data encrypted with one key can only be decrypted with the other. One, the private key, is closely guarded and used to 'sign' the message while the other, public key, is shared around to anyone interested in verifying your messages. Messages are signed by encrypting a digest of the message with the private key. This encrypted digest is transmitted along with the message. On the other side the verifier recalculates the message digest and uses the public key to decrypt the the digest in the signature. If both digest match the verifier knows only the holder of the private key could have created the signature.
Apache Camel uses the Signature service from the Java Cryptographic Extension to do all the heavy cryptographic lifting required to create exchange signatures. The following are some excellent sources for explaining the mechanics of Cryptography, Message digests and Digital Signatures and how to leverage them with the JCE.
- Bruce Schneier's Applied Cryptography
- Beginning Cryptography with Java by David Hook
- The ever insightful, Wikipedia Digital_signatures
URI format
As mentioned Apache Camel provides a pair of crypto endpoints to create and verify signatures
crypto:sign:name[?options] crypto:verify:name[?options]
crypto:sign
creates the signature and stores it in the Header keyed by the constantExchange.SIGNATURE
, i.e."CamelDigitalSignature"
.crypto:verify
will read in the contents of this header and do the verification calculation.
In order to correctly function, sign and verify need to share a pair of keys, sign requiring a
PrivateKey
and verify a PublicKey
(or a Certificate
containing one). Using the JCE is is very simple to generate these key pairs but it is usually most secure to use a KeyStore to house and share your keys. The DSL is very flexible about how keys are supplied and provides a number of mechanisms.
Note a
crypto:sign
endpoint is typically defined in one route and the complimentary crypto:verify
in another, though for simplicity in the examples they appear one after the other. It goes without saying that both sign and verify should be configured identically.
Options
Name | Type | Default | Description |
---|---|---|---|
algorithm
|
String
|
DSA
|
The name of the JCE Signature algorithm that will be used. |
alias
|
String
|
null
|
An alias name that will be used to select a key from the keystore. |
bufferSize
|
Integer
|
2048
|
the size of the buffer used in the signature process. |
certificate
|
Certificate
|
null
|
A Certificate used to verify the signature of the exchange's payload. Either this or a Public Key is required. |
keystore
|
KeyStore
|
null
|
A reference to a JCE Keystore that stores keys and certificates used to sign and verify. |
provider
|
String
|
null
|
The name of the JCE Security Provider that should be used. |
privateKey
|
PrivatKey
|
null
|
The private key used to sign the exchange's payload. |
publicKey
|
PublicKey
|
null
|
The public key used to verify the signature of the exchange's payload. |
secureRandom
|
secureRandom
|
null
|
A reference to a SecureRandom object that wil lbe used to initialize the Signature service.
|
password
|
char[]
|
null
|
The password for the keystore. |
clearHeaders
|
String
|
true
|
Remove camel crypto headers from Message after a verify operation (value can be "true" /{{"false"}}).
|
1) Raw keys
The most basic way to way to sign and verify an exchange is with a KeyPair as follows.
from("direct:keypair").to("crypto:sign://basic?privateKey=#myPrivateKey", "crypto:verify://basic?publicKey=#myPublicKey", "mock:result");
The same can be achieved with the Spring XML Extensions using references to keys
<route> <from uri="direct:keypair"/> <to uri="crypto:sign://basic?privateKey=#myPrivateKey" /> <to uri="crypto:verify://basic?publicKey=#myPublicKey" /> <to uri="mock:result"/> </route>
2) KeyStores and Aliases.
The JCE provides a very versatile KeyStore for housing pairs of PrivateKeys and Certificates keeping them encrypted and password protected. They can be retrieved from it by applying an alias to the retrieval apis. There are a number of ways to get keys and Certificates into a keystore most often this is done with the external 'keytool' application. This is a good example of using keytool to create a KeyStore with a self signed Cert and Private key.
The examples use a Keystore with a key and cert aliased by 'bob'. The password for the keystore and the key is 'letmein'
The following shows how to use a Keystore via the Fluent builders, it also shows how to load and initialize the keystore.
from("direct:keystore").to("crypto:sign://keystore?keystore=#keystore&alias=bob&password=letmein", "crypto:verify://keystore?keystore=#keystore&alias=bob", "mock:result");
Again in Spring a ref is used to lookup an actual keystore instance.
<route> <from uri="direct:keystore"/> <to uri="crypto:sign://keystore?keystore=#keystore&lias=bob&assword=letmein" /> <to uri="crypto:verify://keystore?keystore=#keystore&lias=bob" /> <to uri="mock:result"/> </route>
3) Changing JCE Provider and Algorithm
Changing the Signature algorithm or the Security provider is a simple matter of specifying their names. You will need to also use Keys that are compatible with the algorithm you choose.
KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA"); keyGen.initialize(512, new SecureRandom()); keyPair = keyGen.generateKeyPair(); PrivateKey privateKey = keyPair.getPrivate(); PublicKey publicKey = keyPair.getPublic(); // we can set the keys explicitly on the endpoint instances. context.getEndpoint("crypto:sign://rsa?algorithm=MD5withRSA", DigitalSignatureEndpoint.class).setPrivateKey(privateKey); context.getEndpoint("crypto:verify://rsa?algorithm=MD5withRSA", DigitalSignatureEndpoint.class).setPublicKey(publicKey); from("direct:algorithm").to("crypto:sign://rsa?algorithm=MD5withRSA", "crypto:verify://rsa?algorithm=MD5withRSA", "mock:result");
from("direct:provider").to("crypto:sign://provider?privateKey=#myPrivateKey&provider=SUN", "crypto:verify://provider?publicKey=#myPublicKey&provider=SUN", "mock:result");
or
<route> <from uri="direct:algorithm"/> <to uri="crypto:sign://rsa?algorithm=MD5withRSA&rivateKey=#rsaPrivateKey" /> <to uri="crypto:verify://rsa?algorithm=MD5withRSA&ublicKey=#rsaPublicKey" /> <to uri="mock:result"/> </route>
<route> <from uri="direct:provider"/> <to uri="crypto:sign://provider?privateKey=#myPrivateKey&rovider=SUN" /> <to uri="crypto:verify://provider?publicKey=#myPublicKey&rovider=SUN" /> <to uri="mock:result"/> </route>
4) Changing the Signature Mesasge Header
It may be desirable to change the message header used to store the signature. A different header name can be specified in the route definition as follows
from("direct:signature-header").to("crypto:sign://another?privateKey=#myPrivateKey&signatureHeader=AnotherDigitalSignature", "crypto:verify://another?publicKey=#myPublicKey&signatureHeader=AnotherDigitalSignature", "mock:result");
or
<route> <from uri="direct:signature-header"/> <to uri="crypto:sign://another?privateKey=#myPrivateKey&ignatureHeader=AnotherDigitalSignature" /> <to uri="crypto:verify://another?publicKey=#myPublicKey&ignatureHeader=AnotherDigitalSignature" /> <to uri="mock:result"/> </route>
5) Changing the buffersize
In case you need to update the size of the buffer...
from("direct:buffersize").to("crypto:sign://buffer?privateKey=#myPrivateKey&buffersize=1024", "crypto:verify://buffer?publicKey=#myPublicKey&buffersize=1024", "mock:result");
or
<route> <from uri="direct:buffersize" /> <to uri="crypto:sign://buffer?privateKey=#myPrivateKey&uffersize=1024" /> <to uri="crypto:verify://buffer?publicKey=#myPublicKey&uffersize=1024" /> <to uri="mock:result"/> </route>
6) Supplying Keys dynamically.
When using a Recipient list or similar EIP the recipient of an exchange can vary dynamically. Using the same key across all recipients may neither be feasible or desirable. It would be useful to be able to specify the signature keys dynamically on a per exchange basis. The exchange could then be dynamically enriched with the key of its target recipient prior to signing. To facilitate this the signature mechanisms allow for keys to be supplied dynamically via the message headers below
Exchange.SIGNATURE_PRIVATE_KEY
,"CamelSignaturePrivateKey"
Exchange.SIGNATURE_PUBLIC_KEY_OR_CERT
,"CamelSignaturePublicKeyOrCert"
from("direct:headerkey-sign").to("crypto:sign://alias"); from("direct:headerkey-verify").to("crypto:verify://alias", "mock:result");
or
<route> <from uri="direct:headerkey-sign"/> <to uri="crypto:sign://headerkey" /> </route> <route> <from uri="direct:headerkey-verify"/> <to uri="crypto:verify://headerkey" /> <to uri="mock:result"/> </route>
Better again would be to dynamically supply a keystore alias. Again the alias can be supplied in a message header
Exchange.KEYSTORE_ALIAS
,"CamelSignatureKeyStoreAlias"
from("direct:alias-sign").to("crypto:sign://alias?keystore=#keystore"); from("direct:alias-verify").to("crypto:verify://alias?keystore=#keystore", "mock:result");
or
<route> <from uri="direct:alias-sign"/> <to uri="crypto:sign://alias?keystore=#keystore" /> </route> <route> <from uri="direct:alias-verify"/> <to uri="crypto:verify://alias?keystore=#keystore" /> <to uri="mock:result"/> </route>
The header would be set as follows
Exchange unsigned = getMandatoryEndpoint("direct:alias-sign").createExchange(); unsigned.getIn().setBody(payload); unsigned.getIn().setHeader(DigitalSignatureConstants.KEYSTORE_ALIAS, "bob"); unsigned.getIn().setHeader(DigitalSignatureConstants.KEYSTORE_PASSWORD, "letmein".toCharArray()); template.send("direct:alias-sign", unsigned); Exchange signed = getMandatoryEndpoint("direct:alias-sign").createExchange(); signed.getIn().copyFrom(unsigned.getOut()); signed.getIn().setHeader(KEYSTORE_ALIAS, "bob"); template.send("direct:alias-verify", signed);
See also:
- Crypto is also available as a Data Format
Chapter 26. CXF
CXF Component
The cxf: component provides integration with Apache CXF for connecting to JAX-WS services hosted in CXF.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-cxf</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
Note
If you want to learn about CXF dependencies, see the
WHICH-JARS
text file.
Note
When using CXF as a consumer, the CAMEL:CXF Bean Component allows you to factor out how message payloads are received from their processing as a RESTful or SOAP web service. This has the potential of using a multitude of transports to consume web services. The bean component's configuration is also simpler and provides the fastest method to implement web services using Camel and CXF.
Note
When using CXF in streaming modes (see DataFormat option), then also read about Stream caching.
URI format
cxf:bean:cxfEndpoint[?options]
Where cxfEndpoint represents a bean ID that references a bean in the Spring bean registry. With this URI format, most of the endpoint details are specified in the bean definition.
cxf://someAddress[?options]
Where someAddress specifies the CXF endpoint's address. With this URI format, most of the endpoint details are specified using options.
For either style above, you can append options to the URI as follows:
cxf:bean:cxfEndpoint?wsdlURL=wsdl/hello_world.wsdl&dataFormat=PAYLOAD
Options
Name | Required | Description |
---|---|---|
wsdlURL
|
No |
The location of the WSDL. WSDL is obtained from endpoint address by default. For example:
file://local/wsdl/hello.wsdl or wsdl/hello.wsdl
|
serviceClass
|
Yes |
The name of the SEI (Service Endpoint Interface) class. This class can have, but does not require, JSR181 annotations. Since 2.0, this option is only required by POJO mode. If the wsdlURL option is provided, serviceClass is not required for PAYLOAD and MESSAGE mode. When wsdlURL option is used without serviceClass, the serviceName and portName (endpointName for Spring configuration) options MUST be provided.
Since 2.0, it is possible to use
# notation to reference a serviceClass object instance from the registry..
Please be advised that the referenced object cannot be a Proxy (Spring AOP Proxy is OK) as it relies on
Object.getClass().getName() method for non Spring AOP Proxy.
Since 2.8, it is possible to omit both wsdlURL and serviceClass options for PAYLOAD and MESSAGE mode. When they are omitted, arbitrary XML elements can be put in CxfPayload's body in PAYLOAD mode to facilitate CXF Dispatch Mode.
For example:
org.apache.camel.Hello
|
serviceName
|
Only if more than one serviceName present in WSDL
|
The service name this service is implementing, it maps to the
wsdl:service@name . For example:
{http://org.apache.camel}ServiceName
|
endpointName
|
Only if more than one portName under the serviceName is present, and it is required for camel-cxf consumer since camel 2.2
|
The port name this service is implementing, it maps to the
wsdl:port@name . For example:
{http://org.apache.camel}PortName
|
dataFormat
|
No |
Which message data format the CXF endpoint supports. Possible values are: POJO (default), PAYLOAD , MESSAGE .
|
relayHeaders
|
No |
Please see the Description ofrelayHeaders option section for this option. Should a CXF endpoint relay headers along the route. Currently only available when dataFormat=POJO Default: true Example: true , false
|
wrapped
|
No |
Which kind of operation the CXF endpoint producer will invoke. Possible values are: true , false (default).
|
wrappedStyle
|
No |
Since 2.5.0 The WSDL style that describes how parameters are represented in the SOAP body. If the value is false , CXF will chose the document-literal unwrapped style, If the value is true , CXF will chose the document-literal wrapped style
|
setDefaultBus
|
No |
Specifies whether or not to use the default CXF bus for this endpoint. Possible values are: true , false (default).
|
bus
|
No |
Use
# notation to reference a bus object from the registry—for example, bus=#busName . The referenced object must be an instance of org.apache.cxf.Bus .
By default, uses the default bus created by CXF Bus Factory.
|
cxfBinding
|
No |
Use
# notation to reference a CXF binding object from the registry—for example, cxfBinding=#bindingName . The referenced object must be an instance of org.apache.camel.component.cxf.CxfBinding .
|
headerFilterStrategy
|
No |
Use # notation to reference a header filter strategy object from the registry—for example, headerFilterStrategy=#strategyName . The referenced object must be an instance of org.apache.camel.spi.HeaderFilterStrategy .
|
loggingFeatureEnabled
|
No |
New in 2.3, this option enables CXF Logging Feature which writes inbound and outbound SOAP messages to log. Possible values are: true , false (default).
|
defaultOperationName
|
No |
New in 2.4, this option will set the default
operationName that will be used by the CxfProducer that invokes the remote service. For example:
defaultOperationName =greetMe
|
defaultOperationNamespace
|
No |
New in 2.4, this option will set the default operationNamespace that will be used by the CxfProducer which invokes the remote service. For example:
defaultOperationNamespace = http://apache.org/hello_world_soap_http
|
synchronous
|
No |
New in 2.5, this option will let CXF endpoint decide to use sync or async API to do the underlying work. The default value is false , which means camel-cxf endpoint will try to use async API by default.
|
publishedEndpointUrl
|
No |
New in 2.5, this option overrides the endpoint URL that appears in the published WSDL that is accessed using the service address URL plus
?wsdl . For example:
publshedEndpointUrl=http://example.com/service
|
properties.propName
|
No |
Camel 2.8: Allows you to set custom CXF properties in the endpoint URI. For example, setting properties.mtom-enabled=true to enable MTOM. To make sure that CXF does not switch the thread when starting the invocation, you can set properties.org.apache.cxf.interceptor.OneWayProcessorInterceptor.USE_ORIGINAL_THREAD=true .
|
allowStreaming
|
No | New in 2.8.2. This option controls whether the CXF component, when running in PAYLOAD mode (see below), will DOM parse the incoming messages into DOM Elements or keep the payload as a javax.xml.transform.Source object that would allow streaming in some cases. |
skipFaultLogging
|
No | New in 2.11. This option controls whether the PhaseInterceptorChain skips logging the Fault that it catches. |
cxfEndpointConfigurer
|
No
|
New in Camel 2.11. This option could apply the implementation of
org.apache.camel.component.cxf.CxfEndpointConfigurer which supports to configure the CXF endpoint in programmatic way. Since Camel 2.15.0, user can configure the CXF server and client by implementing configure{Server|Client} method of CxfEndpointConfigurer .
|
username
|
No
|
New in Camel 2.12.3 This option is used to set the basic authentication information of username for the CXF client.
|
password
|
No
|
New in Camel 2.12.3 This option is used to set the basic authentication information of password for the CXF client.
|
continuationTimeout
|
No
|
New in Camel 2.14.0 This option is used to set the CXF continuation timeout which could be used in CxfConsumer by default when the CXF server is using Jetty or Servlet transport. (Before Camel 2.14.0, CxfConsumer just set the continuation timeout to be 0, which means the continuation suspend operation never timeout.)
Default: 30000 Example: continuation=80000
|
The
serviceName
and portName
are QNames, so if you provide them be sure to prefix them with their {namespace}
as shown in the examples above.
The descriptions of the dataformats
DataFormat | Description |
---|---|
POJO
|
POJOs (plain old Java objects) are the Java parameters to the method being invoked on the target server. Both Protocol and Logical JAX-WS handlers are supported. |
PAYLOAD
|
PAYLOAD is the message payload (the contents of the soap:body ) after message configuration in the CXF endpoint is applied. Only Protocol JAX-WS handler is supported. Logical JAX-WS handler is not supported.
|
MESSAGE
|
MESSAGE is the raw message that is received from the transport layer. It is not suppose to touch or change Stream, some of the CXF interceptors will be removed if you are using this kind of DataFormat so you can't see any soap headers after the camel-cxf consumer and JAX-WS handler is not supported.
|
CXF_MESSAGE
|
New in Camel 2.8.2, CXF_MESSAGE allows for invoking the full capabilities of CXF interceptors by converting the message from the transport layer into a raw SOAP message
|
You can determine the data format mode of an exchange by retrieving the exchange property,
CamelCXFDataFormat
. The exchange key constant is defined in org.apache.camel.component.cxf.CxfConstants.DATA_FORMAT_PROPERTY
.
Configuring the CXF Endpoints with Apache Aries Blueprint.
Since Camel 2.8, there is support for using Aries blueprint dependency injection for your CXF endpoints. The schema is very similar to the Spring schema, so the transition is fairly transparent.
For example:
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" xmlns:camel-cxf="http://camel.apache.org/schema/blueprint/cxf" xmlns:cxfcore="http://cxf.apache.org/blueprint/core" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> <camel-cxf:cxfEndpoint id="routerEndpoint" address="http://localhost:9001/router" serviceClass="org.apache.servicemix.examples.cxf.HelloWorld"> <camel-cxf:properties> <entry key="dataFormat" value="MESSAGE"/> </camel-cxf:properties> </camel-cxf:cxfEndpoint> <camel-cxf:cxfEndpoint id="serviceEndpoint" address="http://localhost:9000/SoapContext/SoapPort" serviceClass="org.apache.servicemix.examples.cxf.HelloWorld"> </camel-cxf:cxfEndpoint> <camelContext xmlns="http://camel.apache.org/schema/blueprint"> <route> <from uri="routerEndpoint"/> <to uri="log:request"/> </route> </camelContext> </blueprint>
Currently the endpoint element is the first supported CXF namespacehandler.
You can also use the bean references just as in spring
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" xmlns:jaxws="http://cxf.apache.org/blueprint/jaxws" xmlns:cxf="http://cxf.apache.org/blueprint/core" xmlns:camel="http://camel.apache.org/schema/blueprint" xmlns:camelcxf="http://camel.apache.org/schema/blueprint/cxf" xsi:schemaLocation=" http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd http://cxf.apache.org/blueprint/jaxws http://cxf.apache.org/schemas/blueprint/jaxws.xsd http://cxf.apache.org/blueprint/core http://cxf.apache.org/schemas/blueprint/core.xsd "> <camelcxf:cxfEndpoint id="reportIncident" address="/camel-example-cxf-blueprint/webservices/incident" wsdlURL="META-INF/wsdl/report_incident.wsdl" serviceClass="org.apache.camel.example.reportincident.ReportIncidentEndpoint"> </camelcxf:cxfEndpoint> <bean id="reportIncidentRoutes" class="org.apache.camel.example.reportincident.ReportIncidentRoutes" /> <camelContext xmlns="http://camel.apache.org/schema/blueprint"> <routeBuilder ref="reportIncidentRoutes"/> </camelContext> </blueprint>
How to enable CXF's LoggingOutInterceptor in MESSAGE mode
CXF's
LoggingOutInterceptor
outputs outbound message that goes on the wire to logging system (java.util.logging
). Since the LoggingOutInterceptor
is in PRE_STREAM
phase (but PRE_STREAM
phase is removed in MESSAGE
mode), you have to configure LoggingOutInterceptor
to be run during the WRITE
phase. The following is an example.
<bean id="loggingOutInterceptor" class="org.apache.cxf.interceptor.LoggingOutInterceptor"> <!-- it really should have been user-prestream but CXF does have such phase! --> <constructor-arg value="target/write"/> </bean> <cxf:cxfEndpoint id="serviceEndpoint" address="http://localhost:9002/helloworld" serviceClass="org.apache.camel.component.cxf.HelloService"> <cxf:outInterceptors> <ref bean="loggingOutInterceptor"/> </cxf:outInterceptors> <cxf:properties> <entry key="dataFormat" value="MESSAGE"/> </cxf:properties> </cxf:cxfEndpoint>
Description of relayHeaders option
There are in-band and out-of-band on-the-wire headers from the perspective of a JAXWS WSDL-first developer.
The in-band headers are headers that are explicitly defined as part of the WSDL binding contract for an endpoint such as SOAP headers.
The out-of-band headers are headers that are serialized over the wire, but are not explicitly part of the WSDL binding contract.
Headers relaying/filtering is bi-directional.
When a route has a CXF endpoint and the developer needs to have on-the-wire headers, such as SOAP headers, be relayed along the route to be consumed say by another JAXWS endpoint, then
relayHeaders
should be set to true
, which is the default value.
Available only in POJO mode
The
relayHeaders=true
setting expresses an intent to relay the headers. The actual decision on whether a given header is relayed is delegated to a pluggable instance that implements the MessageHeadersRelay
interface. A concrete implementation of MessageHeadersRelay
will be consulted to decide if a header needs to be relayed or not. There is already an implementation of SoapMessageHeadersRelay
which binds itself to well-known SOAP name spaces. Currently only out-of-band headers are filtered, and in-band headers will always be relayed when relayHeaders=true
. If there is a header on the wire, whose name space is unknown to the runtime, then a fall back DefaultMessageHeadersRelay
will be used, which simply allows all headers to be relayed.
The
relayHeaders=false
setting asserts that all headers, in-band and out-of-band, will be dropped.
You can plugin your own
MessageHeadersRelay
implementations overriding or adding additional ones to the list of relays. In order to override a preloaded relay instance just make sure that your MessageHeadersRelay
implementation services the same name spaces as the one you looking to override. Also note, that the overriding relay has to service all of the name spaces as the one you looking to override, or else a runtime exception on route start up will be thrown as this would introduce an ambiguity in name spaces to relay instance mappings.
<cxf:cxfEndpoint ...> <cxf:properties> <entry key="org.apache.camel.cxf.message.headers.relays"> <list> <ref bean="customHeadersRelay"/> </list> </entry> </cxf:properties> </cxf:cxfEndpoint> <bean id="customHeadersRelay" class="org.apache.camel.component.cxf.soap.headers.CustomHeadersRelay"/>
Take a look at the tests that show how you'd be able to relay/drop headers here:
Changes since Release 2.0
POJO
andPAYLOAD
modes are supported. InPOJO
mode, only out-of-band message headers are available for filtering as the in-band headers have been processed and removed from the header list by CXF. The in-band headers are incorporated into theMessageContentList
inPOJO
mode. Thecamel-cxf
component does make any attempt to remove the in-band headers from theMessageContentList
If filtering of in-band headers is required, please usePAYLOAD
mode or plug in a (pretty straightforward) CXF interceptor/JAXWS Handler to the CXF endpoint.- The Message Header Relay mechanism has been merged into
CxfHeaderFilterStrategy
. TherelayHeaders
option, its semantics, and default value remain the same, but it is a property ofCxfHeaderFilterStrategy
. Here is an example of configuring it.<bean id="dropAllMessageHeadersStrategy" class="org.apache.camel.component.cxf.common.header.CxfHeaderFilterStrategy"> <!-- Set relayHeaders to false to drop all SOAP headers --> <property name="relayHeaders" value="false"/> </bean>
Then, your endpoint can reference theCxfHeaderFilterStrategy
.<route> <from uri="cxf:bean:routerNoRelayEndpoint?headerFilterStrategy=#dropAllMessageHeadersStrategy"/> <to uri="cxf:bean:serviceNoRelayEndpoint?headerFilterStrategy=#dropAllMessageHeadersStrategy"/> </route>
- The
MessageHeadersRelay
interface has changed slightly and has been renamed toMessageHeaderFilter
. It is a property ofCxfHeaderFilterStrategy
. Here is an example of configuring user defined Message Header Filters:<bean id="customMessageFilterStrategy" class="org.apache.camel.component.cxf.common.header.CxfHeaderFilterStrategy"> <property name="messageHeaderFilters"> <list> <!-- SoapMessageHeaderFilter is the built in filter. It can be removed by omitting it. --> <bean class="org.apache.camel.component.cxf.common.header.SoapMessageHeaderFilter"/> <!-- Add custom filter here --> <bean class="org.apache.camel.component.cxf.soap.headers.CustomHeaderFilter"/> </list> </property> </bean>
- Other than
relayHeaders
, there are new properties that can be configured inCxfHeaderFilterStrategy
.
Name | Description | type | Required? | Default value |
---|---|---|---|---|
relayHeaders
|
All message headers will be processed by Message Header Filters |
boolean
|
No |
true (1.6.1 behavior)
|
relayAllMessageHeaders
|
All message headers will be propagated (without processing by Message Header Filters) |
boolean
|
No |
false (1.6.1 behavior)
|
allowFilterNamespaceClash
|
If two filters overlap in activation namespace, the property control how it should be handled. If the value is true , last one wins. If the value is false , it will throw an exception
|
boolean
|
No |
false (1.6.1 behavior)
|
Configure the CXF endpoints with Spring
You can configure the CXF endpoint with the Spring configuration file shown below, and you can also embed the endpoint into the
camelContext
tags. When you are invoking the service endpoint, you can set the operationName
and operationNamespace
headers to explicitly state which operation you are calling.
NOTE In Camel 2.x we change to use
http://camel.apache.org/schema/cxf
as the CXF endpoint's target namespace.
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cxf="http://camel.apache.org/schema/cxf" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> ...
Note
In Apache Camel 2.x, the
http://activemq.apache.org/camel/schema/cxfEndpoint
namespace was changed to http://camel.apache.org/schema/cxf
.
Be sure to include the JAX-WS
schemaLocation
attribute specified on the root beans element. This allows CXF to validate the file and is required. Also note the namespace declarations at the end of the <cxf:cxfEndpoint/>
tag--these are required because the combined {namespace}localName
syntax is presently not supported for this tag's attribute values.
The
cxf:cxfEndpoint
element supports many additional attributes:
Name | Value |
---|---|
PortName
|
The endpoint name this service is implementing, it maps to the wsdl:port@name . In the format of ns:PORT_NAME where ns is a namespace prefix valid at this scope.
|
serviceName
|
The service name this service is implementing, it maps to the wsdl:service@name . In the format of ns:SERVICE_NAME where ns is a namespace prefix valid at this scope.
|
wsdlURL
|
The location of the WSDL. Can be on the classpath, file system, or be hosted remotely. |
bindingId
|
The bindingId for the service model to use.
|
address
|
The service publish address. |
bus
|
The bus name that will be used in the JAX-WS endpoint. |
serviceClass
|
The class name of the SEI (Service Endpoint Interface) class which could have JSR181 annotation or not. |
It also supports many child elements:
Name | Value |
---|---|
cxf:inInterceptors
|
The incoming interceptors for this endpoint. A list of <bean> or <ref> .
|
cxf:inFaultInterceptors
|
The incoming fault interceptors for this endpoint. A list of <bean> or <ref> .
|
cxf:outInterceptors
|
The outgoing interceptors for this endpoint. A list of <bean> or <ref> .
|
cxf:outFaultInterceptors
|
The outgoing fault interceptors for this endpoint. A list of <bean> or <ref> .
|
cxf:properties
|
A properties map which should be supplied to the JAX-WS endpoint. See below. |
cxf:handlers
|
A JAX-WS handler list which should be supplied to the JAX-WS endpoint. See below. |
cxf:dataBinding
|
You can specify the which DataBinding will be use in the endpoint. This can be supplied using the Spring <bean class="MyDataBinding"/> syntax.
|
cxf:binding
|
You can specify the BindingFactory for this endpoint to use. This can be supplied using the Spring <bean class="MyBindingFactory"/> syntax.
|
cxf:features
|
The features that hold the interceptors for this endpoint. A list of <bean> s or <ref> s
|
cxf:schemaLocations
|
The schema locations for endpoint to use. A list of <schemaLocation> s
|
cxf:serviceFactory
|
The service factory for this endpoint to use. This can be supplied using the Spring <bean class="MyServiceFactory"/> syntax
|
You can find more advanced examples which show how to provide interceptors, properties and handlers here: http://cxf.apache.org/docs/jax-ws-configuration.html
Note
You can use CXF:properties to set the CXF endpoint's
dataFormat
and setDefaultBus
properties from a Spring configuration file, as follows:
<cxf:cxfEndpoint id="testEndpoint" address="http://localhost:9000/router" serviceClass="org.apache.camel.component.cxf.HelloService" endpointName="s:PortName" serviceName="s:ServiceName" xmlns:s="http://www.example.com/test"> <cxf:properties> <entry key="dataFormat" value="MESSAGE"/> <entry key="setDefaultBus" value="true"/> </cxf:properties> </cxf:cxfEndpoint>
How to make the camel-cxf component use log4j instead of java.util.logging
CXF's default logger is
java.util.logging
. If you want to change it to log4j
, proceed as follows. Create a file, in the classpath, named META-INF/cxf/org.apache.cxf.logger
. This file should contain the fully-qualified name of the class, org.apache.cxf.common.logging.Log4jLogger
, with no comments, on a single line.
How to let camel-cxf response message with xml start document
If you are using some SOAP client such as PHP, you will get this kind of error, because CXF doesn't add the XML start document
<?xml version="1.0" encoding="utf-8"?>
.
Error:sendSms: SoapFault exception: [Client] looks like we got no XML document in [...]
To resolved this issue, you just need to tell StaxOutInterceptor to write the XML start document for you.
public class WriteXmlDeclarationInterceptor extends AbstractPhaseInterceptor<SoapMessage> { public WriteXmlDeclarationInterceptor() { super(Phase.PRE_STREAM); addBefore(StaxOutInterceptor.class.getName()); } public void handleMessage(SoapMessage message) throws Fault { message.put("org.apache.cxf.stax.force-start-document", Boolean.TRUE); } }
You can add a customer interceptor like this and configure it into you
camel-cxf
endpont
<cxf:cxfEndpoint id="routerEndpoint" address="http://localhost:${CXFTestSupport.port2}/CXFGreeterRouterTest/CamelContext/RouterPort" serviceClass="org.apache.hello_world_soap_http.GreeterImpl" skipFaultLogging="true"> <cxf:outInterceptors> <!-- This interceptor will force the CXF server send the XML start document to client --> <bean class="org.apache.camel.component.cxf.WriteXmlDeclarationInterceptor"/> </cxf:outInterceptors> <cxf:properties> <!-- Set the publishedEndpointUrl which could override the service address from generated WSDL as you want --> <entry key="publishedEndpointUrl" value="http://www.simple.com/services/test" /> </cxf:properties> </cxf:cxfEndpoint>
Or adding a message header for it like this if you are using Camel 2.4.
// set up the response context which force start document Map<String, Object> map = new HashMap<String, Object>(); map.put("org.apache.cxf.stax.force-start-document", Boolean.TRUE); exchange.getOut().setHeader(Client.RESPONSE_CONTEXT, map);
How to consume a message from a camel-cxf endpoint in POJO data format
The
camel-cxf
endpoint consumer POJO
data format is based on the cxf invoker, so the message header has a property with the name of CxfConstants.OPERATION_NAME
and the message body is a list of the SEI method parameters.
public class PersonProcessor implements Processor { private static final transient Logger LOG = LoggerFactory.getLogger(PersonProcessor.class); @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { LOG.info("processing exchange in camel"); BindingOperationInfo boi = (BindingOperationInfo)exchange.getProperty(BindingOperationInfo.class.toString()); if (boi != null) { LOG.info("boi.isUnwrapped" + boi.isUnwrapped()); } // Get the parameters list which element is the holder. MessageContentsList msgList = (MessageContentsList)exchange.getIn().getBody(); Holder<String> personId = (Holder<String>)msgList.get(0); Holder<String> ssn = (Holder<String>)msgList.get(1); Holder<String> name = (Holder<String>)msgList.get(2); if (personId.value == null || personId.value.length() == 0) { LOG.info("person id 123, so throwing exception"); // Try to throw out the soap fault message org.apache.camel.wsdl_first.types.UnknownPersonFault personFault = new org.apache.camel.wsdl_first.types.UnknownPersonFault(); personFault.setPersonId(""); org.apache.camel.wsdl_first.UnknownPersonFault fault = new org.apache.camel.wsdl_first.UnknownPersonFault("Get the null value of person name", personFault); // Since camel has its own exception handler framework, we can't throw the exception to trigger it // We just set the fault message in the exchange for camel-cxf component handling and return exchange.getOut().setFault(true); exchange.getOut().setBody(fault); return; } name.value = "Bonjour"; ssn.value = "123"; LOG.info("setting Bonjour as the response"); // Set the response message, first element is the return value of the operation, // the others are the holders of method parameters exchange.getOut().setBody(new Object[] {null, personId, ssn, name}); } }
How to prepare the message for the camel-cxf endpoint in POJO data format
The
camel-cxf
endpoint producer is based on the cxf client API. First you need to specify the operation name in the message header, then add the method parameters to a list, and initialize the message with this parameter list. The response message's body is a messageContentsList
, you can get the result from that list.
If you don't specify the operation name in the message header,
CxfProducer
will try to use the defaultOperationName
from CxfEndpoint
. If there is no defaultOperationName
set on CxfEndpoint
, it will pick up the first operation name from the operation list.
If you want to get the object array from the message body, you can get the body using
message.getbody(Object[].class)
, as follows:
Exchange senderExchange = new DefaultExchange(context, ExchangePattern.InOut); final List<String> params = new ArrayList<String>(); // Prepare the request message for the camel-cxf procedure params.add(TEST_MESSAGE); senderExchange.getIn().setBody(params); senderExchange.getIn().setHeader(CxfConstants.OPERATION_NAME, ECHO_OPERATION); Exchange exchange = template.send("direct:EndpointA", senderExchange); org.apache.camel.Message out = exchange.getOut(); // The response message's body is an MessageContentsList which first element is the return value of the operation, // If there are some holder parameters, the holder parameter will be filled in the reset of List. // The result will be extract from the MessageContentsList with the String class type MessageContentsList result = (MessageContentsList)out.getBody(); LOG.info("Received output text: " + result.get(0)); Map<String, Object> responseContext = CastUtils.cast((Map<?, ?>)out.getHeader(Client.RESPONSE_CONTEXT)); assertNotNull(responseContext); assertEquals("We should get the response context here", "UTF-8", responseContext.get(org.apache.cxf.message.Message.ENCODING)); assertEquals("Reply body on Camel is wrong", "echo " + TEST_MESSAGE, result.get(0));
How to deal with the message for a camel-cxf endpoint in PAYLOAD data format
In Apache Camel 2.0:
CxfMessage.getBody()
will return an org.apache.camel.component.cxf.CxfPayload
object, which has getters for SOAP message headers and Body elements. This change enables decoupling the native CXF message from the Apache Camel message.
protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { from(SIMPLE_ENDPOINT_URI + "&dataFormat=PAYLOAD").to("log:info").process(new Processor() { @SuppressWarnings("unchecked") public void process(final Exchange exchange) throws Exception { CxfPayload<SoapHeader> requestPayload = exchange.getIn().getBody(CxfPayload.class); List<Source> inElements = requestPayload.getBodySources(); List<Source> outElements = new ArrayList<Source>(); // You can use a customer toStringConverter to turn a CxfPayLoad message into String as you want String request = exchange.getIn().getBody(String.class); XmlConverter converter = new XmlConverter(); String documentString = ECHO_RESPONSE; Element in = new XmlConverter().toDOMElement(inElements.get(0)); // Just check the element namespace if (!in.getNamespaceURI().equals(ELEMENT_NAMESPACE)) { throw new IllegalArgumentException("Wrong element namespace"); } if (in.getLocalName().equals("echoBoolean")) { documentString = ECHO_BOOLEAN_RESPONSE; checkRequest("ECHO_BOOLEAN_REQUEST", request); } else { documentString = ECHO_RESPONSE; checkRequest("ECHO_REQUEST", request); } Document outDocument = converter.toDOMDocument(documentString); outElements.add(new DOMSource(outDocument.getDocumentElement())); // set the payload header with null CxfPayload<SoapHeader> responsePayload = new CxfPayload<SoapHeader>(null, outElements, null); exchange.getOut().setBody(responsePayload); } }); } }; }
How to get and set SOAP headers in POJO mode
POJO
means that the data format is a list of Java objects when the CXF endpoint produces or consumes Camel exchanges. Even though Apache Camel exposes the message body as POJOs in this mode, the CXF component still provides access to read and write SOAP headers. However, since CXF interceptors remove in-band SOAP headers from the header list after they have been processed, only out-of-band SOAP headers are available in POJO mode.
The following example illustrates how to get/set SOAP headers. Suppose we have a route that forwards from one CXF endpoint to another. That is, SOAP Client -> Apache Camel -> CXF service. We can attach two processors to obtain/insert SOAP headers at (1) before request goes out to the CXF service and (2) before response comes back to the SOAP Client. Processor (1) and (2) in this example are InsertRequestOutHeaderProcessor and InsertResponseOutHeaderProcessor. Our route looks like this:
<route> <from uri="cxf:bean:routerRelayEndpointWithInsertion"/> <process ref="InsertRequestOutHeaderProcessor" /> <to uri="cxf:bean:serviceRelayEndpointWithInsertion"/> <process ref="InsertResponseOutHeaderProcessor" /> </route>
In 2.x SOAP headers are propagated to and from Apache Camel Message headers. The Apache Camel message header name is
org.apache.cxf.headers.Header.list
, which is a constant defined in CXF (org.apache.cxf.headers.Header.HEADER_LIST
). The header value is a List<>
of CXF SoapHeader
objects (org.apache.cxf.binding.soap.SoapHeader
). The following snippet is the InsertResponseOutHeaderProcessor
(that inserts a new SOAP header in the response message). The way to access SOAP headers in both InsertResponseOutHeaderProcessor
and InsertRequestOutHeaderProcessor
are actually the same. The only difference between the two processors is setting the direction of the inserted SOAP header.
public static class InsertResponseOutHeaderProcessor implements Processor { @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { // You should be able to get the header if exchange is routed from camel-cxf endpoint List<SoapHeader> soapHeaders = CastUtils.cast((List<?>)exchange.getIn().getHeader(Header.HEADER_LIST)); if (soapHeaders == null) { // we just create a new soap headers in case the header is null soapHeaders = new ArrayList<SoapHeader>(); } // Insert a new header String xml = "<?xml version=\"1.0\" encoding=\"utf-8\"?><outofbandHeader " + "xmlns=\"http://cxf.apache.org/outofband/Header\" hdrAttribute=\"testHdrAttribute\" " + "xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\" soap:mustUnderstand=\"1\">" + "<name>New_testOobHeader</name><value>New_testOobHeaderValue</value></outofbandHeader>"; SoapHeader newHeader = new SoapHeader(soapHeaders.get(0).getName(), DOMUtils.readXml(new StringReader(xml)).getDocumentElement()); // make sure direction is OUT since it is a response message. newHeader.setDirection(Direction.DIRECTION_OUT); //newHeader.setMustUnderstand(false); soapHeaders.add(newHeader); } }
How to get and set SOAP headers in PAYLOAD mode
We have already shown how to access SOAP message (
CxfPayload
object) in PAYLOAD
mode (see the section called “How to deal with the message for a camel-cxf endpoint in PAYLOAD data format”).
Once you obtain a
CxfPayload
object, you can invoke the CxfPayload.getHeaders()
method that returns a List
of DOM Elements (SOAP headers).
from(getRouterEndpointURI()).process(new Processor() { @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { CxfPayload<SoapHeader> payload = exchange.getIn().getBody(CxfPayload.class); List<Source> elements = payload.getBodySources(); assertNotNull("We should get the elements here", elements); assertEquals("Get the wrong elements size", 1, elements.size()); Element el = new XmlConverter().toDOMElement(elements.get(0)); elements.set(0, new DOMSource(el)); assertEquals("Get the wrong namespace URI", "http://camel.apache.org/pizza/types", el.getNamespaceURI()); List<SoapHeader> headers = payload.getHeaders(); assertNotNull("We should get the headers here", headers); assertEquals("Get the wrong headers size", headers.size(), 1); assertEquals("Get the wrong namespace URI", ((Element)(headers.get(0).getObject())).getNamespaceURI(), "http://camel.apache.org/pizza/types"); } }) .to(getServiceEndpointURI());
SOAP headers are not available in MESSAGE mode
SOAP headers are not available in
MESSAGE
mode as SOAP processing is skipped.
How to throw a SOAP Fault from Apache Camel
If you are using a CXF endpoint to consume the SOAP request, you may need to throw the SOAP
Fault
from the camel context. Basically, you can use the throwFault
DSL to do that; it works for POJO
, PAYLOAD
and MESSAGE
data format. You can define the soap fault like this:
SOAP_FAULT = new SoapFault(EXCEPTION_MESSAGE, SoapFault.FAULT_CODE_CLIENT); Element detail = SOAP_FAULT.getOrCreateDetail(); Document doc = detail.getOwnerDocument(); Text tn = doc.createTextNode(DETAIL_TEXT); detail.appendChild(tn);
Then throw it as you like:
from(routerEndpointURI).setFaultBody(constant(SOAP_FAULT));
If your CXF endpoint is working in the
MESSAGE
data format, you could set the the SOAP Fault message in the message body and set the response code in the message header.
from(routerEndpointURI).process(new Processor() { public void process(Exchange exchange) throws Exception { Message out = exchange.getOut(); // Set the message body with the out.setBody(this.getClass().getResourceAsStream("SoapFaultMessage.xml")); // Set the response code here out.setHeader(org.apache.cxf.message.Message.RESPONSE_CODE, new Integer(500)); } });
The same is true for the POJO data format. You can set the SOAP Fault on the Out body and also indicate it's a fault by calling
Message.setFault(true)
, as follows:
from("direct:start").onException(SoapFault.class).maximumRedeliveries(0).handled(true) .process(new Processor() { public void process(Exchange exchange) throws Exception { SoapFault fault = exchange .getProperty(Exchange.EXCEPTION_CAUGHT, SoapFault.class); exchange.getOut().setFault(true); exchange.getOut().setBody(fault); } }).end().to(serviceURI);
How to propagate a CXF endpoint's request and response context
cxf client API provides a way to invoke the operation with request and response context. If you are using a CXF endpoint producer to invoke the external Web service, you can set the request context and get the response context with the following code:
CxfExchange exchange = (CxfExchange)template.send(getJaxwsEndpointUri(), new Processor() { public void process(final Exchange exchange) { final List<String> params = new ArrayList<String>(); params.add(TEST_MESSAGE); // Set the request context to the inMessage Map<String, Object> requestContext = new HashMap<String, Object>(); requestContext.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, JAXWS_SERVER_ADDRESS); exchange.getIn().setBody(params); exchange.getIn().setHeader(Client.REQUEST_CONTEXT , requestContext); exchange.getIn().setHeader(CxfConstants.OPERATION_NAME, GREET_ME_OPERATION); } }); org.apache.camel.Message out = exchange.getOut(); // The output is an object array, the first element of the array is the return value Object\[\] output = out.getBody(Object\[\].class); LOG.info("Received output text: " + output\[0\]); // Get the response context form outMessage Map<String, Object> responseContext = CastUtils.cast((Map)out.getHeader(Client.RESPONSE_CONTEXT)); assertNotNull(responseContext); assertEquals("Get the wrong wsdl opertion name", "{http://apache.org/hello_world_soap_http}greetMe", responseContext.get("javax.xml.ws.wsdl.operation").toString());
Attachment Support
POJO Mode: Both SOAP with Attachment and MTOM are supported (see example in Payload Mode for enabling MTOM). However, SOAP with Attachment is not tested. Since attachments are marshalled and unmarshalled into POJOs, users typically do not need to deal with the attachment themself. Attachments are propagated to Camel message's attachments since 2.1. So, it is possible to retreive attachments by Camel Message API
DataHandler Message.getAttachment(String id)
.
Payload Mode: MTOM is supported since 2.1. Attachments can be retrieved by Camel Message APIs mentioned above. SOAP with Attachment is not supported as there is no SOAP processing in this mode.
To enable MTOM, set the CXF endpoint property "mtom_enabled" to true. (I believe you can only do it with Spring.)
<cxf:cxfEndpoint id="routerEndpoint" address="http://localhost:${CXFTestSupport.port1}/CxfMtomRouterPayloadModeTest/jaxws-mtom/hello" wsdlURL="mtom.wsdl" serviceName="ns:HelloService" endpointName="ns:HelloPort" xmlns:ns="http://apache.org/camel/cxf/mtom_feature"> <cxf:properties> <!-- enable mtom by setting this property to true --> <entry key="mtom-enabled" value="true"/> <!-- set the camel-cxf endpoint data fromat to PAYLOAD mode --> <entry key="dataFormat" value="PAYLOAD"/> </cxf:properties>
You can produce a Camel message with attachment to send to a CXF endpoint in Payload mode.
Exchange exchange = context.createProducerTemplate().send("direct:testEndpoint", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); List<Source> elements = new ArrayList<Source>(); elements.add(new DOMSource(DOMUtils.readXml(new StringReader(MtomTestHelper.REQ_MESSAGE)).getDocumentElement())); CxfPayload<SoapHeader> body = new CxfPayload<SoapHeader>(new ArrayList<SoapHeader>(), elements, null); exchange.getIn().setBody(body); exchange.getIn().addAttachment(MtomTestHelper.REQ_PHOTO_CID, new DataHandler(new ByteArrayDataSource(MtomTestHelper.REQ_PHOTO_DATA, "application/octet-stream"))); exchange.getIn().addAttachment(MtomTestHelper.REQ_IMAGE_CID, new DataHandler(new ByteArrayDataSource(MtomTestHelper.requestJpeg, "image/jpeg"))); } }); // process response CxfPayload<SoapHeader> out = exchange.getOut().getBody(CxfPayload.class); Assert.assertEquals(1, out.getBody().size()); Map<String, String> ns = new HashMap<String, String>(); ns.put("ns", MtomTestHelper.SERVICE_TYPES_NS); ns.put("xop", MtomTestHelper.XOP_NS); XPathUtils xu = new XPathUtils(ns); Element oute = new XmlConverter().toDOMElement(out.getBody().get(0)); Element ele = (Element)xu.getValue("//ns:DetailResponse/ns:photo/xop:Include", oute, XPathConstants.NODE); String photoId = ele.getAttribute("href").substring(4); // skip "cid:" ele = (Element)xu.getValue("//ns:DetailResponse/ns:image/xop:Include", oute, XPathConstants.NODE); String imageId = ele.getAttribute("href").substring(4); // skip "cid:" DataHandler dr = exchange.getOut().getAttachment(photoId); Assert.assertEquals("application/octet-stream", dr.getContentType()); MtomTestHelper.assertEquals(MtomTestHelper.RESP_PHOTO_DATA, IOUtils.readBytesFromStream(dr.getInputStream())); dr = exchange.getOut().getAttachment(imageId); Assert.assertEquals("image/jpeg", dr.getContentType()); BufferedImage image = ImageIO.read(dr.getInputStream()); Assert.assertEquals(560, image.getWidth()); Assert.assertEquals(300, image.getHeight());
You can also consume a Camel message received from a CXF endpoint in Payload mode.
public static class MyProcessor implements Processor { @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { CxfPayload<SoapHeader> in = exchange.getIn().getBody(CxfPayload.class); // verify request assertEquals(1, in.getBody().size()); Map<String, String> ns = new HashMap<String, String>(); ns.put("ns", MtomTestHelper.SERVICE_TYPES_NS); ns.put("xop", MtomTestHelper.XOP_NS); XPathUtils xu = new XPathUtils(ns); Element body = new XmlConverter().toDOMElement(in.getBody().get(0)); Element ele = (Element)xu.getValue("//ns:Detail/ns:photo/xop:Include", body, XPathConstants.NODE); String photoId = ele.getAttribute("href").substring(4); // skip "cid:" assertEquals(MtomTestHelper.REQ_PHOTO_CID, photoId); ele = (Element)xu.getValue("//ns:Detail/ns:image/xop:Include", body, XPathConstants.NODE); String imageId = ele.getAttribute("href").substring(4); // skip "cid:" assertEquals(MtomTestHelper.REQ_IMAGE_CID, imageId); DataHandler dr = exchange.getIn().getAttachment(photoId); assertEquals("application/octet-stream", dr.getContentType()); MtomTestHelper.assertEquals(MtomTestHelper.REQ_PHOTO_DATA, IOUtils.readBytesFromStream(dr.getInputStream())); dr = exchange.getIn().getAttachment(imageId); assertEquals("image/jpeg", dr.getContentType()); MtomTestHelper.assertEquals(MtomTestHelper.requestJpeg, IOUtils.readBytesFromStream(dr.getInputStream())); // create response List<Source> elements = new ArrayList<Source>(); elements.add(new DOMSource(DOMUtils.readXml(new StringReader(MtomTestHelper.RESP_MESSAGE)).getDocumentElement())); CxfPayload<SoapHeader> sbody = new CxfPayload<SoapHeader>(new ArrayList<SoapHeader>(), elements, null); exchange.getOut().setBody(sbody); exchange.getOut().addAttachment(MtomTestHelper.RESP_PHOTO_CID, new DataHandler(new ByteArrayDataSource(MtomTestHelper.RESP_PHOTO_DATA, "application/octet-stream"))); exchange.getOut().addAttachment(MtomTestHelper.RESP_IMAGE_CID, new DataHandler(new ByteArrayDataSource(MtomTestHelper.responseJpeg, "image/jpeg"))); } }
Message Mode: Attachments are not supported as it does not process the message at all.
CXF_MESSAGE Mode: MTOM is supported, and Attachments can be retrieved by Camel Message APIs mentioned above. Note that when receiving a multipart (that is, MTOM) message the default
SOAPMessage
to String
converter will provide the complete multi-part payload on the body. If you require just the SOAP XML as a String
, you can set the message body with message.getSOAPPart()
, and Camel convert can do the rest of work for you.
How to propagate stack trace information
It is possible to configure a CXF endpoint so that, when a Java exception is thrown on the server side, the stack trace for the exception is marshalled into a fault message and returned to the client. To enable this feaure, set the
dataFormat
to PAYLOAD
and set the faultStackTraceEnabled
property to true
in the cxfEndpoint
element, as follows:
<cxf:cxfEndpoint id="router" address="http://localhost:9002/TestMessage" wsdlURL="ship.wsdl" endpointName="s:TestSoapEndpoint" serviceName="s:TestService" xmlns:s="http://test"> <cxf:properties> <!-- enable sending the stack trace back to client; the default value is false--> <entry key="faultStackTraceEnabled" value="true" /> <entry key="dataFormat" value="PAYLOAD" /> </cxf:properties> </cxf:cxfEndpoint>
For security reasons, the stack trace does not include the causing exception (that is, the part of a stack trace that follows
Caused by
). If you want to include the causing exception in the stack trace, set the exceptionMessageCauseEnabled
property to true
in the cxfEndpoint
element, as follows:
<cxf:cxfEndpoint id="router" address="http://localhost:9002/TestMessage"
wsdlURL="ship.wsdl"
endpointName="s:TestSoapEndpoint"
serviceName="s:TestService"
xmlns:s="http://test">
<cxf:properties>
<!-- enable to show the cause exception message and the default value is false -->
<entry key="exceptionMessageCauseEnabled" value="true" />
<!-- enable to send the stack trace back to client, the default value is false-->
<entry key="faultStackTraceEnabled" value="true" />
<entry key="dataFormat" value="PAYLOAD" />
</cxf:properties>
</cxf:cxfEndpoint>
Warning
You should only enable the
exceptionMessageCauseEnabled
flag for testing and diagnostic purposes. It is normal practice for servers to conceal the original cause of an exception to make it harder for hostile users to probe the server.
Streaming Support in PAYLOAD mode
In 2.8.2, the camel-cxf component now supports streaming of incoming messages when using PAYLOAD mode. Previously, the incoming messages would have been completely DOM parsed. For large messages, this is time consuming and uses a significant amount of memory. Starting in 2.8.2, the incoming messages can remain as a javax.xml.transform.Source while being routed and, if nothing modifies the payload, can then be directly streamed out to the target destination. For common "simple proxy" use cases (example: from("cxf:...").to("cxf:...")), this can provide very significant performance increases as well as significantly lowered memory requirements.
However, there are cases where streaming may not be appropriate or desired. Due to the streaming nature, invalid incoming XML may not be caught until later in the processing chain. Also, certain actions may require the message to be DOM parsed anyway (like WS-Security or message tracing and such) in which case the advantages of the streaming is limited. At this point, there are two ways to control the streaming:
- Endpoint property: you can add "allowStreaming=false" as an endpoint property to turn the streaming on/off.
- Component property: the CxfComponent object also has an allowStreaming property that can set the default for endpoints created from that component.
- Global system property: you can add a system property of "org.apache.camel.component.cxf.streaming" to "false" to turn if off. That sets the global default, but setting the endpoint property above will override this value for that endpoint.
Using the generic CXF Dispatch mode
From 2.8.0, the camel-cxf component supports the generic CXF dispatch mode that can transport messages of arbitrary structures (i.e., not bound to a specific XML schema). To use this mode, you simply omit specifying the wsdlURL and serviceClass attributes of the CXF endpoint.
<cxf:cxfEndpoint id="testEndpoint" address="http://localhost:9000/SoapContext/SoapAnyPort"> <cxf:properties> <entry key="dataFormat" value="PAYLOAD"/> </cxf:properties> </cxf:cxfEndpoint>
It is noted that the default CXF dispatch client does not send a specific SOAPAction header. Therefore, when the target service requires a specific SOAPAction value, it is supplied in the Camel header using the key SOAPAction (case-insensitive).
Chapter 27. CXF Bean Component
CXF Bean Component (2.0 or later)
The cxfbean: component allows other Camel endpoints to send exchange and invoke Web service bean objects. Currently, it only supports JAXRS and JAXWS (new to Camel 2.1) annotated service beans.
Important
CxfBeanEndpoint
is a ProcessorEndpoint
so it has no consumers. It works similarly to a Bean component.
URI format
cxfbean:serviceBeanRef
Where serviceBeanRef is a registry key to look up the service bean object. If
serviceBeanRef
references a List
object, elements of the List
are the service bean objects accepted by the endpoint.
Options
Name | Description | Example | Required? | Default Value |
---|---|---|---|---|
bus
|
CXF bus reference specified by the # notation. The referenced object must be an instance of org.apache.cxf.Bus .
|
bus=#busName
|
No | Default bus created by CXF Bus Factory |
cxfBeanBinding
|
CXF bean binding specified by the # notation. The referenced object must be an instance of org.apache.camel.component.cxf.cxfbean.CxfBeanBinding .
|
cxfBinding=#bindingName
|
No |
DefaultCxfBeanBinding
|
headerFilterStrategy
|
Header filter strategy specified by the # notation. The referenced object must be an instance of org.apache.camel.spi.HeaderFilterStrategy .
|
headerFilterStrategy=#strategyName
|
No |
CxfHeaderFilterStrategy
|
populateFromClass
|
Since 2.3, the wsdlLocation annotated in the POJO is ignored (by default) unless this option is set to false. Prior to 2.3, the wsdlLocation annotated in the POJO is always honored and it is not possible to ignore.
|
true , false
|
No |
true
|
providers
|
Since 2.5, setting the providers for the CXFRS endpoint. |
providers=#providerRef1,#providerRef2
|
No |
null
|
setDefaultBus
|
Will set the default bus when CXF endpoint create a bus by itself. |
true , false
|
No |
false
|
Headers
Name | Description | Type | Required? | Default Value | In/Out | Examples |
---|---|---|---|---|---|---|
CamelHttpCharacterEncoding (before 2.0-m2: CamelCxfBeanCharacterEncoding )
|
Character encoding |
String
|
No | None | In | ISO-8859-1 |
CamelContentType (before 2.0-m2: CamelCxfBeanContentType )
|
Content type |
String
|
No | \**/*\* | In |
text/xml
|
CamelHttpBaseUri (2.0-m3 and before:
CamelCxfBeanRequestBasePath )
|
The value of this header will be set in the CXF message as the Message.BASE_PATH property. It is needed by CXF JAX-RS processing. Basically, it is the scheme, host and port portion of the request URI.
|
String
|
Yes | The Endpoint URI of the source endpoint in the Camel exchange | In | http://localhost:9000 |
CamelHttpPath (before 2.0-m2: CamelCxfBeanRequestPat{} h)
|
Request URI's path |
String
|
Yes | None | In |
consumer/123
|
CamelHttpMethod (before 2.0-m2: CamelCxfBeanVerb )
|
RESTful request verb |
String
|
Yes | None | In |
GET , PUT , POST , DELETE
|
CamelHttpResponseCode
|
HTTP response code |
Integer
|
No | None | Out | 200 |
Note
Currently, CXF Bean component has (only) been tested with Jetty HTTP component it can understand headers from Jetty HTTP component without requiring conversion.
A Working Sample
This sample shows how to create a route that starts a Jetty HTTP server. The route sends requests to a CXF Bean and invokes a JAXRS annotated service.
First, create a route as follows. The
from
endpoint is a Jetty HTTP endpoint that is listening on port 9000. Notice that the matchOnUriPrefix
option must be set to true
because RESTful request URI will not match the endpoint's URI http://localhost:9000 exactly.
<route> <from uri="jetty:http://localhost:9000?matchOnUriPrefix=true" /> <to uri="cxfbean:customerServiceBean" /> <to uri="mock:endpointA" /> </route>
The
to
endpoint is a CXF Bean with bean name customerServiceBean
. The name will be looked up from the registry. Next, we make sure our service bean is available in Spring registry. We create a bean definition in the Spring configuration. In this example, we create a List of service beans (of one element). We could have created just a single bean without a List.
<util:list id="customerServiceBean"> <bean class="org.apache.camel.component.cxf.jaxrs.testbean.CustomerService" /> </util:list> <bean class="org.apache.camel.wsdl_first.PersonImpl" id="jaxwsBean" />
That's it. Once the route is started, the web service is ready for business. A HTTP client can make a request and receive response.
url = new URL("http://localhost:9000/customerservice/orders/223/products/323"); in = url.openStream(); assertEquals("{\"Product\":{\"description\":\"product 323\",\"id\":323}}", CxfUtils.getStringFromInputStream(in));
Chapter 28. CXFRS
CXFRS Component
The cxfrs: component provides integration with Apache CXF for connecting to JAX-RS services hosted in CXF.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-cxf</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
Note
When using CXF as a consumer, the CAMEL:CXF Bean Component allows you to factor out how message payloads are received from their processing as a RESTful or SOAP web service. This has the potential of using a multitude of transports to consume web services. The bean component's configuration is also simpler and provides the fastest method to implement web services using Camel and CXF.
URI format
cxfrs://address?options
Where address represents the CXF endpoint's address
cxfrs:bean:rsEndpoint
Where rsEndpoint represents the Spring bean's name which represents the CXFRS client or server
For either style above, you can append options to the URI as follows:
cxfrs:bean:cxfEndpoint?resourceClasses=org.apache.camel.rs.Example
Options
Name | Description | Example | Required? | default value |
---|---|---|---|---|
resourceClasses
|
The resource classes which you want to export as REST service. Multiple classes can be separated by a comma. |
resourceClasses=org.apache.camel.rs.Example1, org.apache.camel.rs.Exchange2
|
No | None |
httpClientAPI
|
New to Apache Camel 2.1 If true, the CxfRsProducer will use the HttpClientAPI to invoke the service |
httpClientAPI=true
|
No |
true
|
synchronous
|
New in 2.5, this option will let CxfRsConsumer decide to use sync or async API to do the underlying work. The default value is false which means it will try to use async API by default. |
synchronous=true
|
No |
false
|
throwExceptionOnFailure
|
New in 2.6, this option tells the CxfRsProducer to inspect return codes and will generate an Exception if the return code is larger than 207. |
throwExceptionOnFailure=true
|
No |
true
|
maxClientCacheSize
|
New in 2.6, you can set the In message header, CamelDestinationOverrideUrl , to dynamically override the target destination Web Service or REST Service defined in your routes. The implementation caches CXF clients or ClientFactoryBean in CxfProvider and CxfRsProvider . This option allows you to configure the maximum size of the cache.
|
maxClientCacheSize=5
|
No | 10 |
setDefaultBus
|
New in 2.9.0. Will set the default bus when CXF endpoint create a bus by itself |
setDefaultBus=true
|
No |
false
|
bus
|
New in 2.9.0. A default bus created by CXF Bus Factory. Use \# notation to reference a bus object from the registry. The referenced object must be an instance of org.apache.cxf.Bus .
|
bus=#busName
|
No | None |
bindingStyle
|
As of 2.11. Sets how requests and responses will be mapped to/from Camel. Two values are possible:
|
bindingStyle=SimpleConsumer
|
No | Default |
binding
|
Allows you to specify a custom
CxfRsBinding implementation to perform low-level processing of the raw CXF request and response objects. The implementation must be bound in the Camel registry, and you must use the hash (#) notation to refer to it.
|
binding=#myBinding
|
No
|
DefaultCxfRsBinding
|
providers
|
Since Camel 2.12.2 set custom JAX-RS providers list to the CxfRs endpoint.
|
providers=#MyProviders
|
No
|
None
|
schemaLocations
|
Since Camel 2.12.2 Sets the locations of the schemas which can be used to validate the incoming XML or JAXB-driven JSON.
|
schemaLocations=#MySchemaLocations
|
No
|
None
|
features
|
Since Camel 2.12.3 Set the feature list to the CxfRs endpoint.
|
features=#MyFeatures
|
No
|
None
|
properties
|
Since Camel 2.12.4 Set the properties to the CxfRs endpoint.
|
properties=#MyProperties
|
No
|
None
|
inInterceptors
|
Since Camel 2.12.4 Set the inInterceptors to the CxfRs endpoint.
|
inInterceptors=#MyInterceptors
|
No
|
None
|
outInterceptors
|
Since Camel 2.12.4 Set the outInterceptor to the CxfRs endpoint.
|
outInterceptors=#MyInterceptors
|
No
|
None
|
inFaultInterceptors
|
Since Camel 2.12.4 Set the inFaultInterceptors to the CxfRs endpoint.
|
inFaultInterceptors=#MyInterceptors
|
No
|
None
|
outFaultIntercetpros
|
Since Camel 2.12.4 Set the outFaultInterceptors to the CxfRs endpoint.
|
outFaultInterceptors=#MyInterceptors
|
No
|
None
|
continuationTimeout
|
Since Camel 2.14.0 This option is used to set the CXF continuation timeout which could be used in CxfConsumer by default when the CXF server is using Jetty or Servlet transport. (Before Camel 2.14.0, CxfConsumer just set the continuation timeout to be 0, which means the continuation suspend operation never timeout.)
|
continuationTimeout=800000
|
No
|
30000
|
ignoreDeleteMethodMessageBody
|
Since Camel 2.14.1 This option is used to tell
CxfRsProducer to ignore the message body of the DELETE method when using HTTP API.
|
ignoreDeleteMethodMessageBody=true
|
No
|
false
|
modelRef
|
Since Camel 2.14.2 This option is used to specify the model file which is useful for the resource class without annotation.
Since Camel 2.15 This option can point to a model file without specifying a service class for emulating document-only endpoints.
|
modelRef=classpath:/CustomerServiceModel.xml
|
No
|
None
|
performInvocation
|
Since Camel 2.15 When the option is true, camel will perform the invocation of the resource class instance and put the response object into the exchange for further processing.
|
performInvocation=true
|
No
|
false
|
propagateContexts
|
Since Camel 2.15 When
true , JAXRS UriInfo, HttpHeaders, Request and SecurityContext contexts will be available to custom CXFRS processors as typed Camel exchange properties. These contexts can be used to analyze the current requests using JAX-RS API.
|
|
|
|
You can also configure the CXF REST endpoint through the Spring configuration. Since there are lots of differences between the CXF REST client and CXF REST Server, we provide different configurations for them. Please check out the schema file and the CXF JAX-RS documentation for more information.
How to configure the REST endpoint in Apache Camel
In camel-cxf schema file, there are two elements for the REST endpoint definition. cxf:rsServer for REST consumer, cxf:rsClient for REST producer. You can find a Apache Camel REST service route configuration example here.
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cxf="http://camel.apache.org/schema/cxf" xmlns:jaxrs="http://cxf.apache.org/jaxrs" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <!-- Defined the real JAXRS back end service --> <jaxrs:server id="restService" address="http://localhost:9002/rest" staticSubresourceResolution="true"> <jaxrs:serviceBeans> <ref bean="customerService"/> </jaxrs:serviceBeans> </jaxrs:server> <!--bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.JSONProvider"/--> <bean id="customerService" class="org.apache.camel.component.cxf.jaxrs.testbean.CustomerService" /> <!-- Defined the server endpoint to create the cxf-rs consumer --> <cxf:rsServer id="rsServer" address="http://localhost:9000/route" serviceClass="org.apache.camel.component.cxf.jaxrs.testbean.CustomerService" loggingFeatureEnabled="true" loggingSizeLimit="20" skipFaultLogging="true"/> <!-- Defined the client endpoint to create the cxf-rs consumer --> <cxf:rsClient id="rsClient" address="http://localhost:9002/rest" serviceClass="org.apache.camel.component.cxf.jaxrs.testbean.CustomerService" loggingFeatureEnabled="true" skipFaultLogging="true"/> <!-- The camel route context --> <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="cxfrs://bean://rsServer"/> <!-- We can remove this configure as the CXFRS producer is using the HttpAPI by default --> <setHeader headerName="CamelCxfRsUsingHttpAPI"> <constant>True</constant> </setHeader> <to uri="cxfrs://bean://rsClient"/> </route> </camelContext> </beans>
How to override the CXF producer address from message header
The camel-cxfrs producer supports to override the services address by setting the message with the key of "CamelDestinationOverrideUrl".
// set up the service address from the message header to override the setting of CXF endpoint exchange.getIn().setHeader(Exchange.DESTINATION_OVERRIDE_URL, constant(getServiceAddress()));
Consuming a REST Request - Simple Binding Style
Available as of Camel 2.11
The
Default
binding style is rather low-level, requiring the user to manually process the MessageContentsList
object coming into the route. Thus, it tightly couples the route logic with the method signature and parameter indices of the JAX-RS operation. Somewhat inelegant, difficult and error-prone.
In contrast, the
SimpleConsumer
binding style performs the following mappings, in order to make the request data more accessible to you within the Camel Message:
- JAX-RS Parameters (@HeaderParam, @QueryParam, etc.) are injected as IN message headers. The header name matches the value of the annotation.
- The request entity (POJO or other type) becomes the IN message body. If a single entity cannot be identified in the JAX-RS method signature, it falls back to the original
MessageContentsList
. - Binary
@Multipart
body parts become IN message attachments, supportingDataHandler
,InputStream
,DataSource
and CXF'sAttachment
class. - Non-binary
@Multipart
body parts are mapped as IN message headers. The header name matches the Body Part name.
Additionally, the following rules apply to the Response mapping:
- If the message body type is different to
javax.ws.rs.core.Response
(user-built response), a newResponse
is created and the message body is set as the entity (so long it's not null). The response status code is taken from theExchange.HTTP_RESPONSE_CODE
header, or defaults to 200 OK if not present. - If the message body type is equal to
javax.ws.rs.core.Response
, it means that the user has built a custom response, and therefore it is respected and it becomes the final response. - In all cases, Camel headers permitted by custom or default
HeaderFilterStrategy
are added to the HTTP response.
Enabling the Simple Binding Style
This binding style can be activated by setting the
bindingStyle
parameter in the consumer endpoint to value SimpleConsumer
:
from("cxfrs:bean:rsServer?bindingStyle=SimpleConsumer") .to("log:TEST?showAll=true");
Examples of request binding with different method signatures
Below is a list of method signatures along with the expected result from the Simple binding.
public Response doAction(BusinessObject request);
Request payload is placed in IN message body, replacing the original MessageContentsList.
public Response doAction(BusinessObject request, @HeaderParam("abcd") String abcd, @QueryParam("defg") String defg);
Request payload placed in IN message body, replacing the original MessageContentsList. Both request params mapped as IN message headers with names abcd and defg.
public Response doAction(@HeaderParam("abcd") String abcd, @QueryParam("defg") String defg);
Both request params mapped as IN message headers with names abcd and defg. The original MessageContentsList is preserved, even though it only contains the 2 parameters.
public Response doAction(@Multipart(value="body1") BusinessObject request, @Multipart(value="body2") BusinessObject request2);
The first parameter is transferred as a header with name body1, and the second one is mapped as header body2. The original MessageContentsList is preserved as the IN message body.
public Response doAction(InputStream abcd);
The InputStream is unwrapped from the MessageContentsList and preserved as the IN message body.
public Response doAction(DataHandler abcd);
The DataHandler is unwrapped from the MessageContentsList and preserved as the IN message body.
More examples of the Simple Binding Style
Given a JAX-RS resource class with this method:
@POST @Path("/customers/{type}") public Response newCustomer(Customer customer, @PathParam("type") String type, @QueryParam("active") @DefaultValue("true") boolean active) { return null; }
Serviced by the following route:
from("cxfrs:bean:rsServer?bindingStyle=SimpleConsumer") .recipientList(simple("direct:${header.operationName}")); from("direct:newCustomer") .log("Request: type=${header.type}, active=${header.active}, customerData=${body}");
The following HTTP request with XML payload (given that the Customer DTO is JAXB-annotated):
POST /customers/gold?active=true Payload: <Customer> <fullName>Raul Kripalani</fullName> <country>Spain</country> <project>Apache Camel</project> </Customer>
Will print the message:
Request: type=gold, active=true, customerData=<Customer.toString() representation>
For more examples on how to process requests and write responses can be found here.
Consuming a REST Request - Default Binding Style
The CXF JAX-RS front end implements the JAX-RS (JSR-311) API, so we can export the resources classes as a REST service. And we leverage the CXF Invoker API to turn a REST request into a normal Java object method invocation. Unlike the
camel-restlet
component, you don't need to specify the URI template within your endpoint, CXF takes care of the REST request URI to resource class method mapping according to the JSR-311 specification. All you need to do in Apache Camel is delegate this method request to a right processor or endpoint.
Here is an example of a CXFRS route:
private static final String CXF_RS_ENDPOINT_URI = "cxfrs://http://localhost:" + CXT + "/rest?resourceClasses=org.apache.camel.component.cxf.jaxrs.testbean.CustomerServiceResource"; protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() { errorHandler(new NoErrorHandlerBuilder()); from(CXF_RS_ENDPOINT_URI).process(new Processor() { public void process(Exchange exchange) throws Exception { Message inMessage = exchange.getIn(); // Get the operation name from in message String operationName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); if ("getCustomer".equals(operationName)) { String httpMethod = inMessage.getHeader(Exchange.HTTP_METHOD, String.class); assertEquals("Get a wrong http method", "GET", httpMethod); String path = inMessage.getHeader(Exchange.HTTP_PATH, String.class); // The parameter of the invocation is stored in the body of in message String id = inMessage.getBody(String.class); if ("/customerservice/customers/126".equals(path)) { Customer customer = new Customer(); customer.setId(Long.parseLong(id)); customer.setName("Willem"); // We just put the response Object into the out message body exchange.getOut().setBody(customer); } else { if ("/customerservice/customers/400".equals(path)) { // We return the remote client IP address this time org.apache.cxf.message.Message cxfMessage = inMessage.getHeader(CxfConstants.CAMEL_CXF_MESSAGE, org.apache.cxf.message.Message.class); ServletRequest request = (ServletRequest) cxfMessage.get("HTTP.REQUEST"); String remoteAddress = request.getRemoteAddr(); Response r = Response.status(200).entity("The remoteAddress is " + remoteAddress).build(); exchange.getOut().setBody(r); return; } if ("/customerservice/customers/123".equals(path)) { // send a customer response back Response r = Response.status(200).entity("customer response back!").build(); exchange.getOut().setBody(r); return; } if ("/customerservice/customers/456".equals(path)) { Response r = Response.status(404).entity("Can't found the customer with uri " + path).build(); throw new WebApplicationException(r); } else { throw new RuntimeCamelException("Can't found the customer with uri " + path); } } } if ("updateCustomer".equals(operationName)) { assertEquals("Get a wrong customer message header", "header1;header2", inMessage.getHeader("test")); String httpMethod = inMessage.getHeader(Exchange.HTTP_METHOD, String.class); assertEquals("Get a wrong http method", "PUT", httpMethod); Customer customer = inMessage.getBody(Customer.class); assertNotNull("The customer should not be null.", customer); // Now you can do what you want on the customer object assertEquals("Get a wrong customer name.", "Mary", customer.getName()); // set the response back exchange.getOut().setBody(Response.ok().build()); } } }); } }; }
The corresponding resource class used to configure the endpoint is defined as an interface:
@Path("/customerservice/") public interface CustomerServiceResource { @GET @Path("/customers/{id}/") Customer getCustomer(@PathParam("id") String id); @PUT @Path("/customers/") Response updateCustomer(Customer customer); }
Important
By default, JAX-RS resource classes are used to configure the JAX-RS properties only. The methods will not be executed during the routing of messages to the endpoint, the route itself is responsible for all processing instead.
Note
Note that starting from Camel 2.15, it is also sufficient to provide an interface only, as opposed to a no-op service implementation class for the default mode. Starting from Camel 2.15, if the
performInvocation
option is enabled, the service implementation will be invoked first, the response will be set on the Camel exchange and the route execution will continue as usual. This can be useful for integrating the existing JAX-RS implementations into Camel routes and for post-processing JAX-RS Responses in custom processors.
How to invoke the REST service through camel-cxfrs producer ?
The CXF JAXRS front end implements a proxy-based client API, with this API you can invoke the remote REST service through a proxy. The
camel-cxfrs
producer is based on this proxy API. You just need to specify the operation name in the message header and prepare the parameter in the message body, the camel-cxfrs
producer will generate the right REST request for you.
Here is an example:
Exchange exchange = template.send("direct://proxy", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message inMessage = exchange.getIn(); setupDestinationURL(inMessage); // set the operation name inMessage.setHeader(CxfConstants.OPERATION_NAME, "getCustomer"); // using the proxy client API inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE); // set a customer header inMessage.setHeader("key", "value"); // set the parameters , if you just have one parameter // camel will put this object into an Object[] itself inMessage.setBody("123"); } }); // get the response message Customer response = (Customer) exchange.getOut().getBody(); assertNotNull("The response should not be null ", response); assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123"); assertEquals("Get a wrong customer name", response.getName(), "John"); assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key"));
CXF JAXRS front end also provides a http centric client API, You can also invoke this API from
camel-cxfrs
producer. You need to specify the HTTP_PATH and Http method and let the the producer know to use the HTTP centric client by using the URI option httpClientAPI or set the message header with CxfConstants.CAMEL_CXF_RS_USING_HTTP_API
. You can turn the response object to the type class that you specify with CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS
.
Exchange exchange = template.send("direct://http", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message inMessage = exchange.getIn(); setupDestinationURL(inMessage); // using the http central client API inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.TRUE); // set the Http method inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); // set the relative path inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/123"); // Specify the response class , cxfrs will use InputStream as the response object type inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class); // set a customer header inMessage.setHeader("key", "value"); // since we use the Get method, so we don't need to set the message body inMessage.setBody(null); } }); // get the response message Customer response = (Customer) exchange.getOut().getBody(); assertNotNull("The response should not be null ", response); assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123"); assertEquals("Get a wrong customer name", response.getName(), "John"); assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key"));
From Apache Camel 2.1, we also support to specify the query parameters from CXFRS URI for the CXFRS HTTP centric client.
Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13"
To support the Dynamical routing, you can override the URI's query parameters by using the
CxfConstants.CAMEL_CXF_RS_QUERY_MAP
header to set the parameter map for it.
Map<String, String> queryMap = new LinkedHashMap<String, String>(); queryMap.put("q1", "new"); queryMap.put("q2", "world"); inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_QUERY_MAP, queryMap);
Chapter 29. DataFormat Component
Data Format Component
Available as of Camel 2.12
The dataformat: component allows to use Data Format as a Camel Component.
URI format
dataformat:name:(marshal|unmarshal)[?options]
Where name is the name of the Data Format. And then followed by the operation which must either be
marshal
or unmarshal
. The options is used for configuring the Data Format in use. See the Data Format documentation for which options it support.
Samples
For example to use the JAXB Data Format we can do as follows:
from("activemq:My.Queue"). to("dataformat:jaxb:unmarshal?contextPath=com.acme.model"). to("mqseries:Another.Queue");
And in XML DSL you do:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="activemq:My.Queue"/> <to uri="dataformat:jaxb:unmarshal?contextPath=com.acme.model"/> <to uri="mqseries:Another.Queue"/> </route> </camelContext>
Chapter 30. DataSet
DataSet Component
The DataSet component (available since 1.3.0) provides a mechanism to easily perform load & soak testing of your system. It works by allowing you to create DataSet instances both as a source of messages and as a way to assert that the data set is received.
Apache Camel will use the throughput logger when sending dataset's.
URI format
dataset:name[?options]
Apache Camel ships with a support implementation of
org.apache.camel.component.dataset.DataSet
, the org.apache.camel.component.dataset.DataSetSupport
class, that can be used as a base for implementing your own DataSet. Apache Camel also ships with a default implementation, the org.apache.camel.component.dataset.SimpleDataSet
that can be used for testing.
Options
Option | Default | Description |
---|---|---|
produceDelay
|
3 | Allows a delay in ms to be specified, which causes producers to pause in order to simulate slow producers. Uses a minimum of 3 ms delay unless you set this option to -1 to force no delay at all. |
consumeDelay
|
0 | Allows a delay in ms to be specified, which causes consumers to pause in order to simulate slow consumers. |
preloadSize
|
0 | Sets how many messages should be preloaded (sent) before the route completes its initialization. |
initialDelay
|
1000 | Camel 2.1: Time period in millis to wait before starting sending messages. |
minRate
|
0 | Wait until the DataSet contains at least this number of messages |
You can append query options to the URI in the following format,
?option=value&option=value&...
Configuring DataSet
Apache Camel will lookup in the Registry for a bean implementing the DataSet interface. So you can register your own DataSet as:
<bean id="myDataSet" class="com.mycompany.MyDataSet"> <property name="size" value="100"/> </bean>
Example
For example, to test that a set of messages are sent to a queue and then consumed from the queue without losing any messages:
// send the dataset to a queue from("dataset:foo").to("activemq:SomeQueue"); // now lets test that the messages are consumed correctly from("activemq:SomeQueue").to("dataset:foo");
The above would look in the Registry to find the foo DataSet instance which is used to create the messages.
Then you create a DataSet implementation, such as using the
SimpleDataSet
as described below, configuring things like how big the data set is and what the messages look like etc.
Properties on SimpleDataSet
Property | Type | Default | Description |
---|---|---|---|
defaultBody
|
Object
|
<hello>world!</hello>
|
Specifies the default message body. For SimpleDataSet it is a constant payload; though if you want to create custom payloads per message, create your own derivation of DataSetSupport .
|
reportCount
|
long
|
-1
|
Specifies the number of messages to be received before reporting progress. Useful for showing progress of a large load test. If < 0, then size / 5, if is 0 then size , else set to reportCount value.
|
size
|
long
|
10
|
Specifies how many messages to send/consume. |
Chapter 31. Direct
Direct Component
The direct: component provides direct, synchronous invocation of any consumers when a producer sends a message exchange. This endpoint can be used to connect existing routes in the same camel context.
Note
The SEDA component provides asynchronous invocation of any consumers when a producer sends a message exchange.
Note
The VM component provides connections between Camel contexts as long they run in the same JVM.
URI format
direct:someName[?options]
Where someName can be any string to uniquely identify the endpoint
Options
Name | Default Value | Description |
---|---|---|
block
|
false
|
Camel 2.11.1: If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active. |
timeout
|
30000
|
Camel 2.11.1: The timeout value to use if block is enabled. |
You can append query options to the URI in the following format,
?option=value&option=value&...
Samples
In the route below we use the direct component to link the two routes together:
from("activemq:queue:order.in") .to("bean:orderServer?method=validate") .to("direct:processOrder"); from("direct:processOrder") .to("bean:orderService?method=process") .to("activemq:queue:order.out");
And the sample using spring DSL:
<route> <from uri="activemq:queue:order.in"/> <to uri="bean:orderService?method=validate"/> <to uri="direct:processOrder"/> </route> <route> <from uri="direct:processOrder"/> <to uri="bean:orderService?method=process"/> <to uri="activemq:queue:order.out"/> </route>
See also samples from the SEDA component, how they can be used together.
Chapter 32. Direct-VM
Direct VM Component
Available as of Camel 2.10
The direct-vm: component provides direct, synchronous invocation of any consumers in the JVM when a producer sends a message exchange. This endpoint can be used to connect existing routes in the same camel context, as well from other camel contexts in the same JVM.
This component differs from the Direct component in that Direct-VM supports communication across CamelContext instances - so you can use this mechanism to communicate across web applications (provided that camel-core.jar is on the system/boot classpath).
At runtime you can swap in new consumers, by stopping the existing consumer(s) and start new consumers. But at any given time there can be at most only one active consumer for a given endpoint.
This component allows also to connect routes deployed in different OSGI Bundles as you can see here after. Even if they are running in different bundles, the camel routes will use the same thread. That autorises to develop applications using Transactions - Tx.
URI format
direct-vm:someName
Where someName can be any string to uniquely identify the endpoint
Options
Name | Default Value | Description |
---|---|---|
block
|
false
|
Camel 2.11.1: If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active. |
timeout
|
30000
|
Camel 2.11.1: The timeout value to use if block is enabled. |
Samples
In the route below we use the direct component to link the two routes together:
from("activemq:queue:order.in") .to("bean:orderServer?method=validate") .to("direct-vm:processOrder");
And now in another CamelContext, such as another OSGi bundle
from("direct-vm:processOrder") .to("bean:orderService?method=process") .to("activemq:queue:order.out");
And the sample using spring DSL:
<route> <from uri="activemq:queue:order.in"/> <to uri="bean:orderService?method=validate"/> <to uri="direct-vm:processOrder"/> </route> <route> <from uri="direct-vm:processOrder"/> <to uri="bean:orderService?method=process"/> <to uri="activemq:queue:order.out"/> </route>
Chapter 33. Disruptor
Disruptor Component
Available as of Camel 2.12
The disruptor: component provides asynchronous SEDA behavior much as the standard SEDA Component, but utilizes a Disruptor instead of a BlockingQueue utilized by the standard SEDA. Alternatively, a
disruptor-vm: endpoint is supported by this component, providing an alternative to the standard VM. As with the SEDA component, buffers of the disruptor: endpoints are only visible within a single CamelContext and no support is provided for persistence or recovery. The buffers of the *disruptor-vm:* endpoints also provides support for communication across CamelContexts instances so you can use this mechanism to communicate across web applications (provided that camel-disruptor.jar is on the system/boot classpath).
The main advantage of choosing to use the Disruptor Component over the SEDA or the VM Component is performance in use cases where there is high contention between producer(s) and/or multicasted or concurrent Consumers. In those cases, significant increases of throughput and reduction of latency has been observed. Performance in scenarios without contention is comparable to the SEDA and VM Components.
The Disruptor is implemented with the intention of mimicing the behaviour and options of the SEDA and VM Components as much as possible. The main differences with the them are the following:
- The buffer used is always bounded in size (default 1024 exchanges).
- As a the buffer is always bouded, the default behaviour for the Disruptor is to block while the buffer is full instead of throwing an exception. This default behaviour may be configured on the component (see options).
- The Disruptor enpoints don't implement the BrowsableEndpoint interface. As such, the exchanges currently in the Disruptor can't be retrieved, only the amount of exchanges.
- The Disruptor requires its consumers (multicasted or otherwise) to be statically configured. Adding or removing consumers on the fly requires complete flushing of all pending exchanges in the Disruptor.
- As a result of the reconfiguration: Data sent over a Disruptor is directly processed and 'gone' if there is at least one consumer, late joiners only get new exchanges published after they've joined.
- The pollTimeout option is not supported by the Disruptor Component.
- When a producer blocks on a full Disruptor, it does not respond to thread interrupts.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-disruptor</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
disruptor:someName[?options]
or
disruptor-vm:someName[?options]
Where *someName* can be any string that uniquely identifies the endpoint within the current CamelContext (or across contexts in case of *disruptor-vm:*). You can append query options to the URI in the following format:
?option=value&option=value&...
Options
All the following options are valid for both the *disruptor:* and *disruptor-vm:* components.
Name | Default | Description |
---|---|---|
size | 1024 | The maximum capacity of the Disruptors ringbuffer. Will be effectively increased to the nearest power of two. Notice: Mind if you use this option, then its the first endpoint being created with the queue name, that determines the size. To make sure all endpoints use same size, then configure the size option on all of them, or the first endpoint being created. |
bufferSize | Component only: The maximum default size (capacity of the number of messages it can hold) of the Disruptors ringbuffer. This option is used if size is not in use. | |
queueSize | Component only: Additional option to specify the <em>bufferSize</em> to maintain maximum compatibility with the SEDA Component. | |
concurrentConsumers | 1 | Number of concurrent threads processing exchanges. |
waitForTaskToComplete | IfReplyExpected | Option to specify whether the caller should wait for the async task to complete or not before continuing. The following three options are supported: Always, Never or IfReplyExpected. The first two values are self-explanatory. The last value, IfReplyExpected, will only wait if the message is Request Reply based. See more information about Async messaging. |
timeout | 30000 | Timeout (in milliseconds) before a producer will stop waiting for an asynchronous task to complete. See waitForTaskToComplete and Async for more details. You can disable timeout by using 0 or a negative value. |
defaultMultipleConsumers | Component only: Allows to set the default allowance of multiple consumers for endpoints created by this comonent used when multipleConsumers is not provided. | |
multipleConsumers
|
false
|
Specifies whether multiple consumers are allowed. If enabled, you can use Disruptor for Publish-Subscribe messaging. That is, you can send a message to the SEDA queue and have each consumer receive a copy of the message. When enabled, this option should be specified on every consumer endpoint.
|
limitConcurrentConsumers | true | Whether to limit the number of concurrentConsumers to the maximum of 500. By default, an exception will be thrown if a Disruptor endpoint is configured with a greater number. You can disable that check by turning this option off. |
blockWhenFull | true | Whether a thread that sends messages to a full Disruptor will block until the ringbuffer's capacity is no longer exhausted. By default, the calling thread will block and wait until the message can be accepted. By disabling this option, an exception will be thrown stating that the queue is full. |
defaultBlockWhenFull | Component only: Allows to set the default producer behaviour when the ringbuffer is full for endpoints created by this comonent used when blockWhenFull is not provided. | |
waitStrategy | Blocking | Defines the strategy used by consumer threads to wait on new exchanges to be published. The options allowed are:Blocking, Sleeping, BusySpin and Yielding. Refer to the section below for more information on this subject |
defaultWaitStrategy | Component only: Allows to set the default wait strategy for endpoints created by this comonent used when waitStrategy is not provided. | |
producerType | Multi |
Defines the producers allowed on the Disruptor. The options allowed are: Multi to allow multiple producers and Single to enable certain optimizations only allowed when one concurrent producer (on one thread or otherwise synchronized) is active.
|
Wait strategies
The wait strategy effects the type of waiting performed by the consumer threads that are currently waiting for the next exchange to be published. The following strategies can be chosen:
Name | Description | Advice |
---|---|---|
Blocking | Blocking strategy that uses a lock and condition variable for Consumers waiting on a barrier. | This strategy can be used when throughput and low-latency are not as important as CPU resource. |
Sleeping | Sleeping strategy that initially spins, then uses a Thread.yield(), and eventually for the minimum number of nanos the OS and JVM will allow while the Consumers are waiting on a barrier. | This strategy is a good compromise between performance and CPU resource. Latency spikes can occur after quiet periods. |
BusySpin | Busy Spin strategy that uses a busy spin loop for Consumers waiting on a barrier. | This strategy will use CPU resource to avoid syscalls which can introduce latency jitter. It is best used when threads can be bound to specific CPU cores. |
Yielding | Yielding strategy that uses a Thread.yield() for Consumers waiting on a barrier after an initially spinning. | This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes. |
Use of Request Reply
The Disruptor component supports using Request Reply, where the caller will wait for the Async route to complete. For instance:
from("mina:tcp://0.0.0.0:9876?textline=true&sync=true").to("disruptor:input"); from("disruptor:input").to("bean:processInput").to("bean:createResponse");
In the route above, we have a TCP listener on port 9876 that accepts incoming requests. The request is routed to the disruptor:input buffer. As it is a Request Reply message, we wait for the response. When the consumer on the disruptor:input buffer is complete, it copies the response to the original message response.
Concurrent consumers
By default, the Disruptor endpoint uses a single consumer thread, but you can configure it to use concurrent consumer threads. So instead of thread pools you can use:
from("disruptor:stageName?concurrentConsumers=5").process(...)
As for the difference between the two, note a thread pool can increase/shrink dynamically at runtime depending on load, whereas the number of concurrent consumers is always fixed and supported by the Disruptor internally so performance will be higher.
Thread pools
Be aware that adding a thread pool to a Disruptor endpoint by doing something like:
from("disruptor:stageName").thread(5).process(...)
Can wind up with adding a normal BlockingQueue to be used in conjunction with the Disruptor, effectively negating part of the performance gains achieved by using the Disruptor. Instead, it is advices to directly configure number of threads that process messages on a Disruptor endpoint using the concurrentConsumers option.
Sample
In the route below we use the Disruptor to send the request to this async queue to be able to send a fire-and-forget message for further processing in another thread, and return a constant reply in this thread to the original caller.
public void configure() throws Exception { from("direct:start") // send it to the disruptor that is async .to("disruptor:next") // return a constant response .transform(constant("OK")); from("disruptor:next").to("mock:result"); }
Here we send a Hello World message and expects the reply to be OK.
Object out = template.requestBody("direct:start", "Hello World"); assertEquals("OK", out);
The "Hello World" message will be consumed from the Disruptor from another thread for further processing. Since this is from a unit test, it will be sent to a mock endpoint where we can do assertions in the unit test.
Using multipleConsumers
In this example we have defined two consumers and registered them as spring beans.
<!-- define the consumers as spring beans --> <bean id="consumer1" class="org.apache.camel.spring.example.FooEventConsumer"/> <bean id="consumer2" class="org.apache.camel.spring.example.AnotherFooEventConsumer"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <!-- define a shared endpoint which the consumers can refer to instead of using url --> <endpoint id="foo" uri="disruptor:foo?multipleConsumers=true"/> </camelContext>
Since we have specified multipleConsumers=true on the Disruptor foo endpoint we can have those two or more consumers receive their own copy of the message as a kind of pub-sub style messaging. As the beans are part of an unit test they simply send the message to a mock endpoint, but notice how we can use @Consume to consume from the Disruptor.
public class FooEventConsumer { @EndpointInject(uri = "mock:result") private ProducerTemplate destination; @Consume(ref = "foo") public void doSomething(String body) { destination.sendBody("foo" + body); } }
Extracting disruptor information
If needed, information such as buffer size, etc. can be obtained without using JMX in this fashion:
DisruptorEndpoint disruptor = context.getEndpoint("disruptor:xxxx"); int size = disruptor.getBufferSize();
Chapter 34. DNS
DNS
Available as of Camel 2.7
This is an additional component for Camel to run DNS queries, using DNSJava. The component is a thin layer on top of DNSJava. The component offers the following operations:
ip
- To resolve a domain by its IP address.
lookup
- To look up information about the domain.
dig
- To run DNS queries.
Requires SUN JVM
The DNSJava library requires running on the SUN JVM. If you use Apache ServiceMix or Apache Karaf, you'll need to adjust the
etc/jre.properties
file, to add sun.net.spi.nameservice
to the list of Java platform packages exported. The server will need restarting before this change takes effect.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-dns</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
The URI scheme for a DNS component is as follows
dns://operation
This component only supports producers.
Options
None.
Headers
Header | Type | Operations | Description |
---|---|---|---|
dns.domain
|
String
|
ip
|
The domain name. Mandatory. |
dns.name
|
String
|
lookup
|
The name to lookup. Mandatory. |
dns.type
|
- |
lookup , dig
|
The type of the lookup. Should match the values of org.xbill.dns.Type . Optional.
|
dns.class
|
- |
lookup , dig
|
he DNS class of the lookup. Should match the values of org.xbill.dns.DClass . Optional.
|
dns.query
|
String
|
dig
|
The query itself. Mandatory. |
dns.server
|
String
|
dig
|
The server in particular for the query. If none is given, the default one specified by the OS will be used. Optional. |
Examples
IP lookup
<route id="IPCheck"> <from uri="direct:start"/> <to uri="dns:ip"/> </route>
This looks up a domain's IP. For example, www.example.com resolves to 192.0.32.10. The IP address to lookup must be provided in the header with key
"dns.domain"
.
DNS lookup
<route id="IPCheck"> <from uri="direct:start"/> <to uri="dns:lookup"/> </route>
This returns a set of DNS records associated with a domain. The name to lookup must be provided in the header with key
"dns.name"
.
DNS Dig
Dig is a Unix command-line utility to run DNS queries.
<route id="IPCheck"> <from uri="direct:start"/> <to uri="dns:dig"/> </route>
The query must be provided in the header with key
"dns.query"
.
Chapter 35. Docker
Docker Component
Available as of Camel 2.15
Camel component for communicating with Docker.
The Docker Camel component leverages the docker-java via the Docker Remote API.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-docker</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
docker://[operation]?[options]
Where operation is the specific action to perform on Docker.
Header Strategy
All URI option can be passed as Header properties. Values found in a message header take precedence over URI parameters. A header property takes the form of a URI option prefixed with *CamelDocker* as shown below
URI Option | Header Property |
---|---|
containerId | CamelDockerContainerId |
General Options
The following parameters can be used with any invocation of the component
Option | Header | Description | Default Value |
---|---|---|---|
host | CamelDockerHost | Mandatory: Docker host | localhost |
port | CamelDockerPort | Mandatory:Docker port | 2375 |
username | CamelDockerUserName | User name to authenticate with | |
password | CamelDockerPassword | Password to authenticate with | |
CamelDockerEmail | Email address associated with the user | ||
secure | CamelDockerSecure | Use HTTPS communication | false |
requestTimeout | CamelDockerRequestTimeout | Request timeout for response (in seconds) | 30 |
certPath | CamelDockerCertPath | Location containing the SSL certificate chain |
Consumer Operations
The consumer supports the following operations.
Operation | Options | Description | Produces |
---|---|---|---|
events | initialRange | Monitor Docker events (Streaming) | Event |
Producer Operations
The following producer operations are available.
Misc Operation | Options | Description | Returns |
---|---|---|---|
auth | Check auth configuration | ||
info | System wide information | Info | |
ping | Ping the Docker server | ||
version | Show the docker version information | Version |
Image Operation | Options | Description | Body Content | Returns |
---|---|---|---|---|
image/list | filter, showAll | List images | List<Image> | |
image/create | repository | Create an image | InputStream | CreateImageResponse |
image/build | noCache, quiet, remove, tag | Build an image from Dockerfile via stdin | InputStream or File | InputStream |
image/pull | repository, registry, tag | Pull an image from the registry | InputStream | |
image/push | name | Push an image on the registry | InputStream | |
image/search | term | Search for images | List<SearchItem> | |
image/remove | imageId | Remove an image | ||
image/tag | imageId, repository, tag, force | Tag an image into a repository | ||
image/inspect | imageId | Inspect an image | InspectImageResponse |
Container Operation | Options | Description | Body Content | Returns |
---|---|---|---|---|
container/list | showSize, showAll, before, since, limit, List containers | initialRange | List<Container> | |
container/create | imageId, name, exposedPorts, workingDir, disableNetwork, hostname, user, tty, stdInOpen, stdInOnce, memoryLimit, memorySwap, cpuShares, attachStdIn, attachStdOut, attachStdErr, env, cmd, dns, image, volumes, volumesFrom | Create a container | CreateContainerResponse | |
container/start |
containerId, binds, links, lxcConf, portBindings, privileged, publishAllPorts, dns, dnsSearch, volumesFrom, networkMode, devices, restartPolicy, capAdd, capDrop
|
Start a container | ||
container/inspect | containerId | Inspect a container | InspectContainerResponse | |
container/wait | containerId | Wait a container | Integer | |
container/log | containerId, stdOut, stdErr, timestamps, followStream, tailAll, tail | Get container logs | InputStream | |
container/attach | containerId, stdOut, stdErr, timestamps, logs, followStream | Attach to a container | InputStream | |
container/stop | containerId, timeout | Stop a container | ||
container/restart | containerId, timeout | Restart a container | ||
container/diff | containerId | Inspect changes on a container | ChangeLog | |
container/kill | containerId, signal | Kill a container | ||
container/top | containerId, psArgs | List processes running in a container | TopContainerResponse | |
container/pause | containerId | Pause a container | ||
container/unpause | containerId | Unpause a container | ||
container/commit | containerId, repository, message, tag, attachStdIn, attachStdOut, attachStdErr, cmd, disableNetwork, pause, env, exposedPorts, hostname, memory, memorySwap, openStdIn, portSpecs, stdInOnce, tty, user, volumes, hostname | Create a new image from a container's changes | String | |
container/copyfile | containerId, resource, hostPath | Copy files or folders from a container | InputStream | |
container/remove | containerId, force, removeVolumes | Remove a container |
Examples
The following example consumes events from Docker:
from("docker://events?host=192.168.59.103&port=2375").to("log:event");
The following example queries Docker for system wide information
from("docker://info?host=192.168.59.103&port=2375").to("log:info");
Chapter 36. Dozer
Dozer Component
The dozer: component provides the ability to map between Java beans using the Dozer mapping framework. Camel also supports the ability to trigger Dozer mappings as a type converter. The primary differences between using a Dozer endpoint and a Dozer converter are:
- The ability to manage Dozer mapping configuration on a per-endpoint basis vs. global configuration via the converter registry.
- A Dozer endpoint can be configured to marshal/unmarshal input and output data using Camel data formats to support a single, any-to-any transformation endpoint
- The Dozer component allows for fine-grained integration and extension of Dozer to support additional functionality (e.g. mapping literal values, using expressions for mappings, etc.).
In order to use the Dozer component, Maven users will need to add the following dependency to their
pom.xml
:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-dozer</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
The Dozer component only supports producer endpoints.
dozer:endpointId[?options]
Where endpointId is a name used to uniquely identify the Dozer endpoint configuration.
An example Dozer endpoint URI:
from("direct:orderInput"). to("dozer:transformOrder?mappingFile=orderMapping.xml&targetModel=example.XYZOrder"). to("direct:orderOutput");
Options
Name
|
Default
|
Description
|
---|---|---|
mappingFile
|
dozerBeanMapping.xml
|
The location of a Dozer configuration file. The file is loaded from the classpath by default, but you can use
file: , classpath: , or http: to load the configuration from a specific location.
|
unmarshalId
|
none
|
The id of a dataFormat defined within the Camel Context to use for unmarshalling the mapping input from a non-Java type. |
marshalId
|
none
|
The id of a dataFormat defined within the Camel Context to use for marshalling the mapping output to a non-Java type. |
sourceModel
|
none
|
Fully-qualified class name for the source type used in the mapping. If specified, the input to the mapping is converted to the specified type before being mapped with Dozer. |
targetModel
|
none
|
Fully-qualified class name for the target type used in the mapping. This option is required. |
mappingConfiguration
|
none | The name of a DozerBeanMapperConfiguration bean in the Camel registry which should be used for configuring the Dozer mapping. This is an alternative to the mappingFile option that can be used for fine-grained control over how Dozer is configured. Remember to use a "#" prefix in the value to indicate that the bean is in the Camel registry (e.g. "#myDozerConfig"). |
Using Data Formats with Dozer
Dozer does not support non-Java sources and targets for mappings, so it cannot, for example, map an XML document to a Java object on its own. Luckily, Camel has extensive support for marshalling between Java and a wide variety of formats using data formats. The Dozer component takes advantage of this support by allowing you to specify that input and output data should be passed through a data format prior to processing via Dozer. You can always do this on your own outside the call to Dozer, but supporting it directly in the Dozer component allows you to use a single endpoints to configure any-to-any transformation within Camel.
As an example, let's say you wanted to map between an XML data structure and a JSON data structure using the Dozer component. If you had the following data formats defined in a Camel Context:
<dataFormats> <json library="Jackson" id="myjson"/> <jaxb contextPath="org.example" id="myjaxb"/> </dataFormats>
You could then configure a Dozer endpoint to unmarshal the input XML using a JAXB data format and marshal the mapping output using Jackson.
<endpoint uri="dozer:xml2json?marshalId=myjson&unmarshalId=myjaxb&targetModel=org.example.Order"/>
Configuring Dozer
All Dozer endpoints require a Dozer mapping configuration file which defines mappings between source and target objects. The component will default to a location of META-INF/dozerBeanMapping.xml if the mappingFile or mappingConfiguration options are not specified on an endpoint. If you need to supply multiple mapping configuration files for a single endpoint or specify additional configuration options (e.g. event listeners, custom converters, etc.), then you can use an instance of
org.apache.camel.converter.dozer.DozerBeanMapperConfiguration
.
<bean id="mapper" class="org.apache.camel.converter.dozer.DozerBeanMapperConfiguration"> <property name="mappingFiles"> <list> <value>mapping1.xml</value> <value>mapping2.xml</value> </list> </property> </bean>
Mapping Extensions
The Dozer component implements a number of extensions to the Dozer mapping framework as custom converters. These converters implement mapping functions that are not supported directly by Dozer itself.
Variable Mappings
Variable mappings allow you to map the value of a variable definition within a Dozer configuration into a target field instead of using the value of a source field. This is equivalent to constant mapping in other mapping frameworks, where can you assign a literal value to a target field. To use a variable mapping, simply define a variable within your mapping configuration and then map from the VariableMapper class into your target field of choice:
<mappings xmlns="http://dozer.sourceforge.net" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://dozer.sourceforge.net http://dozer.sourceforge.net/schema/beanmapping.xsd"> <configuration> <variables> <variable name="CUST_ID">ACME-SALES</variable> </variables> </configuration> <mapping> <class-a>org.apache.camel.component.dozer.VariableMapper</class-a> <class-b>org.example.Order</class-b> <field custom-converter-id="_variableMapping" custom-converter-param="${CUST_ID}"> <a>literal</a> <b>custId</b> </field> </mapping> </mappings>
Custom Mappings
Custom mappings allow you to define your own logic for how a source field is mapped to a target field. They are similar in function to Dozer customer converters, with two notable differences:
- You can have multiple converter methods in a single class with custom mappings.
- There is no requirement to implement a Dozer-specific interface with custom mappings.
A custom mapping is declared by using the built-in '_customMapping' converter in your mapping configuration. The parameter to this converter has the following syntax:
[class-name][,method-name]
Method name is optional - the Dozer component will search for a method that matches the input and output types required for a mapping. An example custom mapping and configuration are provided below.
public class CustomMapper { // All customer ids must be wrapped in "[ ]" public Object mapCustomer(String customerId) { return "[" + customerId + "]"; } }
<mappings xmlns="http://dozer.sourceforge.net" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://dozer.sourceforge.net http://dozer.sourceforge.net/schema/beanmapping.xsd"> <mapping> <class-a>org.example.A</class-a> <class-b>org.example.B</class-b> <field custom-converter-id="_customMapping" custom-converter-param="org.example.CustomMapper,mapCustomer"> <a>header.customerNum</a> <b>custId</b> </field> </mapping> </mappings>
Expression Mappings
Expression mappings allow you to use the powerful language capabilities of Camel to evaluate an expression and assign the result to a target field in a mapping. Any language that Camel supports can be used in an expression mapping. Basic examples of expressions include the ability to map a Camel message header or exchange property to a target field or to concatenate multiple source fields into a target field. The syntax of a mapping expression is:
[language]:[expression]
An example of mapping a message header into a target field:
<mappings xmlns="http://dozer.sourceforge.net" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://dozer.sourceforge.net http://dozer.sourceforge.net/schema/beanmapping.xsd"> <mapping> <class-a>org.apache.camel.component.dozer.ExpressionMapper</class-a> <class-b>org.example.B</class-b> <field custom-converter-id="_expressionMapping" custom-converter-param="simple:\${header.customerNumber}"> <a>expression</a> <b>custId</b> </field> </mapping> </mappings>
Note that any properties within your expression must be escaped with "\" to prevent an error when Dozer attempts to resolve variable values defined using the EL.
Chapter 37. Dropbox
Camel Dropbox component
Available as of Camel 2.14
The dropbox: component allows you to treat Dropbox remote folders as a producer or consumer of messages. Using the Dropbox Java Core API (reference version for this component is 1.7.x), this camel component has the following features:
- As a consumer, download files and search files by queries
- As a producer, download files, move files between remote directories, delete files/dir, upload files and search files by queries
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-dropbox</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
dropbox://[operation]?[options]
Where operation is the specific action (typically is a CRUD action) to perform on Dropbox remote folder.
Operation
Operation
|
Description
|
---|---|
del
|
deletes files or directories on Dropbox
|
get
|
download files from Dropbox
|
move
|
move files from folders on Dropbox
|
put
|
upload files on Dropbox
|
search
|
search files on Dropbox based on string queries
|
Operations require additional options to work, some are mandatory for the specific operation.
Options
In order to work with Dropbox API you need to obtain an accessToken and a clientIdentifier. You can refer to the Dropbox documentation that expalins how to get them.
Below are listed the mandatory options for all operations:
Property
|
Mandatory
|
Description
|
---|---|---|
accessToken
|
true
|
The access token to make API requests for a specific Dropbox user
|
clientIdentifier
|
true
|
Name of the app registered to make API requests
|
Del operation
Delete files on Dropbox.
Works only as Camel producer.
Below are listed the options for this operation:
Property
|
Mandatory
|
Description
|
---|---|---|
remotePath
|
true
|
Folder or file to delete on Dropbox
|
Samples
from("direct:start").to("dropbox://del?accessToken=XXX&clientIdentifier=XXX&remotePath=/root/folder1").to("mock:result");
from("direct:start").to("dropbox://del?accessToken=XXX&clientIdentifier=XXX&remotePath=/root/folder1/file1.tar.gz").to("mock:result");
Result Message Headers
The following headers are set on message result:
Property
|
Value
|
---|---|
DELETED_PATH
|
name of the path deleted on dropbox
|
Result Message Body
The following objects are set on message body result:
Object type
|
Description
|
---|---|
String
|
name of the path deleted on dropbox
|
Get (download) operation
Download files from Dropbox.
Works as Camel producer or Camel consumer.
Below are listed the options for this operation:
Property
|
Mandatory
|
Description
|
---|---|---|
remotePath
|
true
|
Folder or file to download from Dropbox
|
Samples
from("direct:start").to("dropbox://get?accessToken=XXX&clientIdentifier=XXX&remotePath=/root/folder1/file1.tar.gz").to("file:///home/kermit/?fileName=file1.tar.gz");
from("direct:start").to("dropbox://get?accessToken=XXX&clientIdentifier=XXX&remotePath=/root/folder1").to("mock:result");
from("dropbox://get?accessToken=XXX&clientIdentifier=XXX&remotePath=/root/folder1").to("file:///home/kermit/");
Result Message Headers
The following headers are set on message result:
Property
|
Value
|
---|---|
DOWNLOADED_FILE
|
in case of single file download, path of the remote file downloaded
|
DOWNLOADED_FILES
|
in case of multiple files download, path of the remote files downloaded
|
Result Message Body
The following objects are set on message body result:
Object type
|
Description
|
---|---|
ByteArrayOutputStream
|
in case of single file download, stream representing the file downloaded
|
Map<String, ByteArrayOutputStream>
|
in case of multiple files download, a map with as key the path of the remote file downloaded and as value the stream representing the file downloaded
|
Move operation
Move files on Dropbox between one folder to another.
Works only as Camel producer.
Below are listed the options for this operation:
Property
|
Mandatory
|
Description
|
---|---|---|
remotePath
|
true
|
Original file or folder to move
|
newRemotePath
|
true
|
Destination file or folder
|
Samples
from("direct:start").to("dropbox://move?accessToken=XXX&clientIdentifier=XXX&remotePath=/root/folder1&newRemotePath=/root/folder2").to("mock:result");
Result Message Headers
The following headers are set on message result:
Property
|
Value
|
---|---|
MOVED_PATH
|
name of the path moved on dropbox
|
Result Message Body
The following objects are set on message body result:
Object type
|
Description
|
---|---|
String
|
name of the path moved on dropbox
|
Put (upload) operation
Upload files on Dropbox.
Works as Camel producer.
Below are listed the options for this operation:
Property
|
Mandatory
|
Description
|
---|---|---|
uploadMode
|
true
|
add or force this option specifies how a file should be saved on dropbox: in case of "add" the new file will be renamed if a file with the same name already exists on dropbox. in case of "force" if a file with the same name already exists on dropbox, this will be overwritten.
|
localPath
|
true
|
Folder or file to upload on Dropbox from the local filesystem .
|
remotePath
|
false
|
Folder destination on Dropbox. If the property is not set, the component will upload the file on a remote path equal to the local path.
|
Samples
from("direct:start").to("dropbox://put?accessToken=XXX&clientIdentifier=XXX&uploadMode=add&localPath=/root/folder1").to("mock:result");
from("direct:start").to("dropbox://put?accessToken=XXX&clientIdentifier=XXX&uploadMode=add&localPath=/root/folder1&remotePath=/root/folder2").to("mock:result");
Result Message Headers
The following headers are set on message result:
Property
|
Value
|
---|---|
UPLOADED_FILE
|
in case of single file upload, path of the remote path uploaded
|
UPLOADED_FILES
|
in case of multiple files upload, string with the remote paths uploaded
|
Result Message Body
The following objects are set on message body result:
Object type
|
Description
|
---|---|
String
|
in case of single file upload, result of the upload operation, OK or KO
|
Map<String, DropboxResultCode>
|
in case of multiple files upload, a map with as key the path of the remote file uploaded and as value the result of the upload operation, OK or KO
|
Search operation
Search inside a remote Dropbox folder including its sub directories.
Works as Camel producer and as Camel consumer.
Below are listed the options for this operation:
Property
|
Mandatory
|
Description
|
---|---|---|
remotePath
|
true
|
Folder on Dropbox where to search in.
|
query
|
false
|
A space-separated list of substrings to search for. A file matches only if it contains all the substrings. If this option is not set, all files will be matched.
|
Samples
from("dropbox://search?accessToken=XXX&clientIdentifier=XXX&remotePath=/XXX&query=XXX").to("mock:result");
from("direct:start").to("dropbox://search?accessToken=XXX&clientIdentifier=XXX&remotePath=/XXX").to("mock:result");
Result Message Headers
The following headers are set on message result:
Property
|
Value
|
---|---|
FOUNDED_FILES
|
list of file path founded
|
Result Message Body
The following objects are set on message body result:
Object type
|
Description
|
---|---|
List<DbxEntry>
|
list of file path founded. For more information on this object refer to Dropbox documentation, http://dropbox.github.io/dropbox-sdk-java/api-docs/v1.7.x/com/dropbox/core/DbxEntry.html
|
Chapter 38. ElasticSearch
ElasticSearch Component
Available as of Camel 2.11
The ElasticSearch component allows you to interface with an ElasticSearch server.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-elasticsearch</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
URI format
elasticsearch://clusterName?[options]
Tip
if you want to run against a local (in JVM/classloader) ElasticSearch server, just set the clusterName value in the URI to
local
. See the client guide for more details.
Endpoint Options
The following options may be configured on the ElasticSearch endpoint. All are required to be set as either an endpoint URI parameter or as a header (headers override endpoint properties)
name | description |
---|---|
operation | required, indicates the operation to perform |
indexName |
the name of the index to act against
|
ip |
the TransportClient remote host ip to use Camel 2.12
|
Message Operations
The following ElasticSearch operations are currently supported. Simply set an endpoint URI option or exchange header with a key of
operation
and a value set to one of the following. Some operations also require other parameters or the message body to be set.
operation | message body | description |
---|---|---|
INDEX
|
Map , String , byte[] or XContentBuilder content to index
|
Adds content to an index and returns the content's indexId in the body.
|
GET_BY_ID
|
Index ID of content to retrieve |
Retrieves the specified index and returns a GetResult object in the body.
|
DELETE
|
Index ID of content to delete |
Deletes the specified indexId and returns a DeleteResult object in the body.
|
BULK_INDEX
|
A List or Collection of any type that is already accepted (
XContentBuilder , Map , byte[] , or String )
|
Camel 2.14, Adds content to an index and return a List of the id of the successfully indexed documents in the body.
|
BULK
|
A List or Collection of any type that is already accepted (
XContentBuilder , Map , byte[] , or String )
|
Camel 2.15, Adds content to an index and returns the BulkResponse object in the body.
|
Index Example
Below is a simple INDEX example
from("direct:index") .to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
<route> <from uri="direct:index" /> <to uri="elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"/> </route>
A client would simply need to pass a body message containing a Map to the route. The result body contains the indexId created.
Map<String, String> map = new HashMap<String, String>(); map.put("content", "test"); String indexId = template.requestBody("direct:index", map, String.class);
For more information, see these resources
Chapter 39. EventAdmin
EventAdmin component
Available in Camel 2.6
The
eventadmin
component can be used in an OSGi environment to receive OSGi EventAdmin events and process them.
Dependencies
Maven users need to add the following dependency to their
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-eventadmin</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version}
must be replaced by the actual version of Camel (2.6.0 or higher).
URI format
eventadmin:topic[?options]
where
topic
is the name of the topic to listen too.
URI options
Name | Default value | Description |
---|---|---|
send
|
false
|
Whether to use 'send' or 'synchronous' deliver. Default false (async delivery) |
Message headers
Name | Type | Message | Description |
---|
Message body
The
in
message body will be set to the received Event.
Example usage
<route> <from uri="eventadmin:*"/> <to uri="stream:out"/> </route>
Chapter 40. Exec
Exec component
Available in Apache Camel 2.3
The
exec
component can be used to execute system commands.
Dependencies
Maven users need to add the following dependency to their
pom.xml
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-exec</artifactId> <version>${camel-version}</version> </dependency>
where
${camel-version}
must be replaced by the actual version of Apache Camel (2.3.0 or higher).
URI format
exec://executable[?options]
where
executable
is the name, or file path, of the system command that will be executed. If executable name is used (e.g. exec:java
), the executable must in the system path.
URI options
Name | Default value | Description |
---|---|---|
args
|
null
|
The arguments of the executable. The arguments may be one or many whitespace-separated tokens, that can be quoted with
" , e.g. args="arg 1" arg2 will use two arguments arg 1 and arg2 . To include the quotes use "" , e.g. args=""arg 1"" arg2 will use the arguments "arg 1" and arg2 .
|
workingDir
|
null
|
The directory in which the command should be executed. If null , the working directory of the current process will be used.
|
timeout
|
Long.MAX_VALUE
|
The timeout, in milliseconds, after which the executable should be terminated. If execution has has not finished within the timeout, the component will send a termination request. |
outFile
|
null
|
The name of a file, created by the executable, that should be considered as its output. If no outFile is set, the standard output (stdout) of the executable will be considered as output.
|
binding
|
a DefaultExecBinding instance
|
A reference to a org.apache.commons.exec.ExecBinding in the Registry.
|
commandExecutor
|
a DefaultCommandExecutor instance
|
A reference to a org.apache.commons.exec.ExecCommandExecutor in the Registry, that customizes the command execution. The default command executor utilizes the commons-exec library. It adds a shutdown hook for every executed command.
|
useStderrOnEmptyStdout
|
false
|
A boolean indicating that when stdout is empty, this component will populate the Camel Message Body with stderr . This behavior is disabled (false ) by default.
|
Message headers
The supported headers are defined in
org.apache.camel.component.exec.ExecBinding
.
Name | Type | Message | Description |
---|---|---|---|
ExecBinding.EXEC_COMMAND_EXECUTABLE
|
String
|
in
|
The name of the system command that will be executed. Overrides the executable in the URI.
|
ExecBinding.EXEC_COMMAND_ARGS
|
java.util.List<String>
|
in
|
The arguments of the executable. The arguments are used literally, no quoting is applied. Overrides existing args in the URI.
|
ExecBinding.EXEC_COMMAND_ARGS
|
String
|
in
|
Camel 2.5: The arguments of the executable as a Single string where each argument is whitespace separated (see args in URI option). The arguments are used literally, no quoting is applied. Overrides existing args in the URI.
|
ExecBinding.EXEC_COMMAND_OUT_FILE
|
String
|
in
|
The name of a file, created by the executable, that should be considered as output of the executable. Overrides existing outFile in the URI.
|
ExecBinding.EXEC_COMMAND_TIMEOUT
|
long
|
in
|
The timeout, in milliseconds, after which the executable should be terminated. Overrides any existing timeout in the URI.
|
ExecBinding.EXEC_COMMAND_WORKING_DIR
|
String
|
in
|
The directory in which the command should be executed. Overrides any existing workingDir in the URI.
|
ExecBinding.EXEC_EXIT_VALUE
|
int
|
out
|
The value of this header is the exit value of the executable. Non-zero exit values typically indicate abnormal termination. Note that the exit value is OS-dependent. |
ExecBinding.EXEC_STDERR
|
java.io.InputStream
|
out
|
The value of this header points to the standard error stream (stderr) of the executable. If no stderr is written, the value is null .
|
ExecBinding.EXEC_USE_STDERR_ON_EMPTY_STDOUT
|
boolean
|
in
|
Indicates that when stdout is empty, this component will populate the Camel Message Body with stderr . This behavior is disabled (false ) by default.
|
Message body
If the
Exec
component receives an in
message body that is convertible to java.io.InputStream
, it is used to feed input to the executable via its stdin. After execution, the message body is the result of the execution, that is, an org.apache.camel.components.exec.ExecResult
instance containing the stdout, stderr, exit value, and out file. This component supports the following ExecResult
type converters for convenience:
From | To |
---|---|
ExecResult
|
java.io.InputStream
|
ExecResult
|
String
|
ExecResult
|
byte []
|
ExecResult
|
org.w3c.dom.Document
|
Executing word count (Linux)
The example below executes
wc
(word count, Linux) to count the words in file /usr/share/dict/words
. The word count (output) is written in the standart output stream of wc
.
from("direct:exec") .to("exec:wc?args=--words /usr/share/dict/words") .process(new Processor() { public void process(Exchange exchange) throws Exception { // By default, the body is ExecResult instance assertIsInstanceOf(ExecResult.class, exchange.getIn().getBody()); // Use the Camel Exec String type converter to convert the ExecResult to String // In this case, the stdout is considered as output String wordCountOutput = exchange.getIn().getBody(String.class); // do something with the word count } });
Executing java
The example below executes
java
with 2 arguments: -server
and -version
, provided that java
is in the system path.
from("direct:exec") .to("exec:java?args=-server -version")
The example below executes
java
in c:/temp
with 3 arguments: -server
, -version
and the sytem property user.name
.
from("direct:exec") .to("exec:c:/program files/jdk/bin/java?args=-server -version -Duser.name=Camel&workingDir=c:/temp")
Executing Ant scripts
The following example executes Apache Ant (Windows only) with the build file
CamelExecBuildFile.xml
, provided that ant.bat
is in the system path, and that CamelExecBuildFile.xml
is in the current directory.
from("direct:exec") .to("exec:ant.bat?args=-f CamelExecBuildFile.xml")
In the next example, the
ant.bat
command redirects its output to CamelExecOutFile.txt
with -l
. The file CamelExecOutFile.txt
is used as the out file with outFile=CamelExecOutFile.txt
. The example assumes that ant.bat
is in the system path, and that CamelExecBuildFile.xml
is in the current directory.
from("direct:exec") .to("exec:ant.bat?args=-f CamelExecBuildFile.xml -l CamelExecOutFile.txt&outFile=CamelExecOutFile.txt") .process(new Processor() { public void process(Exchange exchange) throws Exception { InputStream outFile = exchange.getIn().getBody(InputStream.class); assertIsInstanceOf(InputStream.class, outFile); // do something with the out file here } });
Executing echo (Windows)
Commands such as
echo
and dir
can be executed only with the command interpreter of the operating system. This example shows how to execute such a command - echo
- in Windows.
from("direct:exec").to("exec:cmd?args=/C echo echoString")
Chapter 41. Fabric Component
Abstract
The Fabric component implements a location discovery mechanism for Apache Camel endpoints. This mechanism can also be used to provide load-balancing over a cluster of endpoints. On the client side (producer endpoints), endpoints are represented by an abstract ID and at run time, the ID is resolved to a specific endpoint URI. Because the URI is stored in a distributed registry (provided by Fuse Fabric), this enables you to create flexible applications whose topology can be specified at deploy time and updated dynamically.
Dependencies
The Fabric component can only be used in the context of a fabric-enabled Red Hat JBoss Fuse container. You must ensure that the
fabric-camel
feature is installed. If necessary, you can install it using the following console command:
karaf@root> features:install fabric-camel
Alternatively, if you decide to use a custom feature to deploy your application, you can ensure that the
fabric-camel
feature is installed by including it in your feature definition. For example:
<?xml version="1.0" encoding="UTF-8"?> <features> <feature name="fabric-component-example"> <feature>fabric-camel</feature> <bundle>URIforMyBundle</bundle> <!-- Specify any other required bundles or features --> ... </feature> </features>
For more details about features, see Deploying Features.
URI format
A fabric endpoint has the following URI format:
fabric:ClusterID[:PublishedURI[?Options]]
The format of the URI depends on whether it is used to specify a consumer endpoint or a producer endpoint.
For a Fabric producer endpoint, the URI format is:
fabric:ClusterID:PublishedURI[?Options]
Where the specified URI,
PublishedURI
, is published in the fabric registry and associated with the ClusterId
cluster. The options, Options
, are used when creating the producer endpoint instance, but the options are not published with the PublishedURI
in the fabric registry.
For a Fabric consumer endpoint, the URI format is:
fabric:ClusterID
Where the client looks up the ID,
ClusterId
, in the fabric registry to discover the URI to connect to.
URI options
The Fabric component itself does not support any URI options. It is possible, however, to specify options for the published URI. These options are stored in the fabric registry as part of the URI and are used as follows:
- Server-only options—options that are applicable only to the server are applied to the server endpoint (consumer endpoint) at run time.
- Client-only options—options that are applicable only to the client are applied to the client endpoint (producer endpoint) at run time.
- Common options—options common to the client and the server are applied to both.
Use cases for fabric endpoints
Fabric endpoints essentially provide a discovery mechanism for Apache Camel endpoints. For example, they support the following basic use cases:
Location discovery
Figure 41.1, “Location Discovery through Fabric” gives an overview of how Fabric endpoints enable location discovery at run time.
Figure 41.1. Location Discovery through Fabric
The server side of this application is defined by a route that starts with a Fabric endpoint, where the Fabric endpoint publishes the URI,
jetty:http://0.0.0.0:9090
. When this route is started, it automatically registers the Jetty URI in the fabric registry, under the cluster ID, foo
.
The client side of the application is defined by a route that ends with the Fabric endpoint,
fabric:foo
. Now, when the client route starts, it automatically looks up the ID, foo
, in the fabric registry and retrieves the associated Jetty endpoint URI. The client then creates a producer endpoint using the discovered Jetty URI and connects to the corresponding server port.
Load-balancing cluster
Figure 41.2, “Load Balancing through Fabric” gives an overview of how Fabric endpoints enable you to create a load-balancing cluster.
Figure 41.2. Load Balancing through Fabric
In this case, two Jetty servers are created, with the URIs,
jetty:http://0.0.0.0:9090
and jetty:http://0.0.0.0:9191
. Because these published URIs are both prefixed by fabric:foo:
, both of the Jetty URIs are registered under the same cluster ID, foo
, in the fabric registry.
Now, when the client routes starts, it automatically looks up the ID,
foo
, in the fabric registry. Because the foo
ID is associated with multiple endpoint URIs, fabric implements a random load balancing algorithm to choose one of the available URIs. The client then creates a producer endpoint, using the chosen URI.
Auto-reconnect feature
Fabric endpoints support auto-reconnection. So, if a client endpoint (producer endpoint) loses its connection to a server endpoint, it will automatically go back to the fabric registry, ask for another URI, and then connect to the new URI.
Publishing an endpoint URI
To publish an endpoint URI,
PublishedURI
, in the fabric registry, define a fabric endpoint with the publisher syntax, FabricScheme:ClusterID:PublishedURI
. Note that this syntax can only be used in a consumer endpoint (that is, an endpoint that appears in a from
DSL command).
Example 41.1, “Publishing a URI ” shows a route that implements a Jetty HTTP server, where the Jetty URI is published to the fabric registry under the ID,
cluster
. The route is a simply HTTP server that returns the constant message, Response from Zookeeper agent
, in the body of the HTTP response.
Example 41.1. Publishing a URI
<?xml version="1.0" encoding="UTF-8"?> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> <bean id="fabric-camel" class="io.fabric8.camel.FabricComponent"/> <camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/blueprint"> <route id="fabric-server"> <from uri="fabric-camel:cluster:jetty:http://0.0.0.0:9090/fabric"/> <log message="Request received : ${body}"/> <setHeader headerName="karaf.name"> <simple>${sys.karaf.name}</simple> </setHeader> <transform> <simple>Response from Zookeeper agent</simple> </transform> </route> </camelContext> </blueprint>
Note the following points about the preceding sample:
- The Fabric component uses the
CuratorFramework
object to connect to the ZooKeeper server (Fabric registry), where the reference to theCuratorFramework
object is provided automatically. - The
from
DSL command defines the fabric URI,fabric-camel:cluster:jetty:http://0.0.0.0:9090/fabric
. At run time, this causes two things to happen:- The specified
jetty
URI is published to the fabric registry under the cluster ID,cluster
. - The Jetty endpoint is activated and used as the consumer endpoint of the route (just as if it had been specified without the
fabric-camel:cluster:
prefix).
Because the route is implemented in blueprint XML, you would normally add the file containing this code to the
src/main/resources/OSGI-INF/blueprint
directory of a Maven project.
Looking up an endpoint URI
To look up a URI in the fabric registry, simply specify the fabric endpoint URI with an ID, in the format,
FabricScheme:ClusterID
. This syntax is used in a producer endpoint (for example, an endpoint that appears in a to
DSL command).
Example 41.2, “Looking up a URI” shows a route that implements a HTTP client, where the HTTP endpoint is discovered dynamically at run time, by looking up the specified ID,
cluster
, in the fabric registry.
Example 41.2. Looking up a URI
<?xml version="1.0" encoding="UTF-8"?> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> <bean id="fabric-camel" class="io.fabric8.camel.FabricComponent"/> <camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/blueprint"> <route id="fabric-client"> <from uri="timer://foo?fixedRate=true&period=10000"/> <setBody> <simple>Hello from Zookeeper server</simple> </setBody> <to uri="fabric-camel:cluster"/> <log message=">>> ${body} : ${header.karaf.name}"/> </route> </camelContext> <reference interface="org.apache.camel.spi.ComponentResolver" filter="(component=jetty)"/> </blueprint>
Because the route is implemented in blueprint XML, you would normally add the file containing this code to the
src/main/resources/OSGI-INF/blueprint
directory of a Maven project.
Load-balancing example
In principle, implementing load balancing is easy using fabric endpoints. All that you have to do is to publish more than one endpoint URI under the same cluster ID. Now, when a client looks up that cluster ID, it gets a random selection out of the list of available endpoint URIs.
The servers in the load-balancing cluster have almost the same configuration. Essentially, the only difference between them is that they publish an endpoint URI with a different hostname and/or IP port. Instead of creating a separate OSGi bundle for every single server in the load-balancing cluster, however, it is better to define a template that enables you to specify the host or port using a configuration variable.
Example 41.3, “Server Template for a Load-Balancing Cluster” illustrates the template approach to defining servers in a load-balancing cluster.
Example 41.3. Server Template for a Load-Balancing Cluster
<?xml version="1.0" encoding="UTF-8"?> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" xsi:schemaLocation=" http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> <!-- osgi blueprint property placeholder --> <cm:property-placeholder id="myConfig" persistent-id="io.fabric8.examples.camel.loadbalancing.server"/> <bean id="fabric-camel" class="io.fabric8.camel.FabricComponent"/> <camelContext id="