Chapter 13. Cassandra CQL


Both producer and consumer are supported

Apache Cassandra is an open source NoSQL database designed to handle large amounts on commodity hardware. Like Amazon’s DynamoDB, Cassandra has a peer-to-peer and master-less architecture to avoid single point of failure and garanty high availability. Like Google’s BigTable, Cassandra data is structured using column families which can be accessed through the Thrift RPC API or a SQL-like API called CQL.

Note

This component aims at integrating Cassandra 2.0+ using the CQL3 API (not the Thrift API). It’s based on Cassandra Java Driver provided by DataStax.

13.1. Configuring Options

Camel components are configured on two separate levels:

  • component level
  • endpoint level

13.1.1. Configuring Component Options

The component level is the highest level which holds general and common configurations that are inherited by the endpoints. For example a component may have security settings, credentials for authentication, urls for network connection and so forth.

Some components only have a few options, and others may have many. Because components typically have pre configured defaults that are commonly used, then you may often only need to configure a few options on a component; or none at all.

Configuring components can be done with the Component DSL, in a configuration file (application.properties|yaml), or directly with Java code.

13.1.2. Configuring Endpoint Options

Where you find yourself configuring the most is on endpoints, as endpoints often have many options, which allows you to configure what you need the endpoint to do. The options are also categorized into whether the endpoint is used as consumer (from) or as a producer (to), or used for both.

Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL as a type safe way of configuring endpoints.

A good practice when configuring options is to use Property Placeholders, which allows to not hardcode urls, port numbers, sensitive information, and other settings. In other words placeholders allows to externalize the configuration from your code, and gives more flexibility and reuse.

The following two sections lists all the options, firstly for the component followed by the endpoint.

13.2. Component Options

The Cassandra CQL component supports 3 options, which are listed below.

NameDescriptionDefaultType

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

lazyStartProducer (producer)

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

autowiredEnabled (advanced)

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

boolean

13.3. Endpoint Options

The Cassandra CQL endpoint is configured using URI syntax:

cql:beanRef:hosts:port/keyspace

with the following path and query parameters:

13.3.1. Path Parameters (4 parameters)

NameDescriptionDefaultType

beanRef (common)

beanRef is defined using bean:id.

 

String

hosts (common)

Hostname(s) Cassandra server(s). Multiple hosts can be separated by comma.

 

String

port (common)

Port number of Cassandra server(s).

 

Integer

keyspace (common)

Keyspace to use.

 

String

13.3.2. Query Parameters (30 parameters)

NameDescriptionDefaultType

clusterName (common)

Cluster name.

 

String

consistencyLevel (common)

Consistency level to use.

Enum values:

  • ANY
  • ONE
  • TWO
  • THREE
  • QUORUM
  • ALL
  • LOCAL_ONE
  • LOCAL_QUORUM
  • EACH_QUORUM
  • SERIAL
  • LOCAL_SERIAL
 

DefaultConsistencyLevel

cql (common)

CQL query to perform. Can be overridden with the message header with key CamelCqlQuery.

 

String

datacenter (common)

Datacenter to use.

datacenter1

String

loadBalancingPolicyClass (common)

To use a specific LoadBalancingPolicyClass.

 

String

password (common)

Password for session authentication.

 

String

prepareStatements (common)

Whether to use PreparedStatements or regular Statements.

true

boolean

resultSetConversionStrategy (common)

To use a custom class that implements logic for converting ResultSet into message body ALL, ONE, LIMIT_10, LIMIT_100…​

 

ResultSetConversionStrategy

session (common)

To use the Session instance (you would normally not use this option).

 

CqlSession

username (common)

Username for session authentication.

 

String

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

sendEmptyMessageWhenIdle (consumer)

If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.

false

boolean

exceptionHandler (consumer (advanced))

To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.

 

ExceptionHandler

exchangePattern (consumer (advanced))

Sets the exchange pattern when the consumer creates an exchange.

Enum values:

  • InOnly
  • InOut
  • InOptionalOut
 

ExchangePattern

pollStrategy (consumer (advanced))

A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel.

 

PollingConsumerPollStrategy

lazyStartProducer (producer)

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

backoffErrorThreshold (scheduler)

The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in.

 

int

backoffIdleThreshold (scheduler)

The number of subsequent idle polls that should happen before the backoffMultipler should kick-in.

 

int

backoffMultiplier (scheduler)

To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.

 

int

delay (scheduler)

Milliseconds before the next poll.

500

long

greedy (scheduler)

If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.

false

boolean

initialDelay (scheduler)

Milliseconds before the first poll starts.

1000

long

repeatCount (scheduler)

Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever.

0

long

runLoggingLevel (scheduler)

The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.

Enum values:

  • TRACE
  • DEBUG
  • INFO
  • WARN
  • ERROR
  • OFF

TRACE

LoggingLevel

scheduledExecutorService (scheduler)

Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool.

 

ScheduledExecutorService

scheduler (scheduler)

To use a cron scheduler from either camel-spring or camel-quartz component. Use value spring or quartz for built in scheduler.

none

Object

schedulerProperties (scheduler)

To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler.

 

Map

startScheduler (scheduler)

Whether the scheduler should be auto started.

true

boolean

timeUnit (scheduler)

Time unit for initialDelay and delay options.

Enum values:

  • NANOSECONDS
  • MICROSECONDS
  • MILLISECONDS
  • SECONDS
  • MINUTES
  • HOURS
  • DAYS

MILLISECONDS

TimeUnit

useFixedDelay (scheduler)

Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.

true

boolean

13.4. Endpoint Connection Syntax

The endpoint can initiate the Cassandra connection or use an existing one.

URIDescription

cql:localhost/keyspace

Single host, default port, usual for testing

cql:host1,host2/keyspace

Multi host, default port

cql:host1,host2:9042/keyspace

Multi host, custom port

cql:host1,host2

Default port and keyspace

cql:bean:sessionRef

Provided Session reference

cql:bean:clusterRef/keyspace

Provided Cluster reference

To fine tune the Cassandra connection (SSL options, pooling options, load balancing policy, retry policy, reconnection policy…​), create your own Cluster instance and give it to the Camel endpoint.

13.5. Messages

13.5.1. Incoming Message

The Camel Cassandra endpoint expects a bunch of simple objects (Object or Object[] or Collection<Object>) which will be bound to the CQL statement as query parameters. If message body is null or empty, then CQL query will be executed without binding parameters.

Headers

  • CamelCqlQuery (optional, String or RegularStatement)

    CQL query either as a plain String or built using the QueryBuilder.

13.5.2. Outgoing Message

The Camel Cassandra endpoint produces one or many a Cassandra Row objects depending on the resultSetConversionStrategy:

  • List<Row> if resultSetConversionStrategy is ALL or LIMIT_[0-9]+
  • Single` Row` if resultSetConversionStrategy is ONE
  • Anything else, if resultSetConversionStrategy is a custom implementation of the ResultSetConversionStrategy

13.6. Repositories

Cassandra can be used to store message keys or messages for the idempotent and aggregation EIP.

Cassandra might not be the best tool for queuing use cases yet, read Cassandra anti-patterns queues and queue like datasets. It’s advised to use LeveledCompaction and a small GC grace setting for these tables to allow tombstoned rows to be removed quickly.

13.7. Idempotent repository

The NamedCassandraIdempotentRepository stores messages keys in a Cassandra table like this:

CAMEL_IDEMPOTENT.cql

CREATE TABLE CAMEL_IDEMPOTENT (
  NAME varchar,   -- Repository name
  KEY varchar,    -- Message key
  PRIMARY KEY (NAME, KEY)
) WITH compaction = {'class':'LeveledCompactionStrategy'}
  AND gc_grace_seconds = 86400;

This repository implementation uses lightweight transactions (also known as Compare and Set) and requires Cassandra 2.0.7+.

Alternatively, the CassandraIdempotentRepository does not have a NAME column and can be extended to use a different data model.

OptionDefaultDescription

table

CAMEL_IDEMPOTENT

Table name

pkColumns

NAME,` KEY`

Primary key columns

name

 

Repository name, value used for NAME column

ttl

 

Key time to live

writeConsistencyLevel

 

Consistency level used to insert/delete key: ANY, ONE, TWO, QUORUM, LOCAL_QUORUM

readConsistencyLevel

 

Consistency level used to read/check key: ONE, TWO, QUORUM, LOCAL_QUORUM

13.8. Aggregation repository

The NamedCassandraAggregationRepository stores exchanges by correlation key in a Cassandra table like this:

CAMEL_AGGREGATION.cql

CREATE TABLE CAMEL_AGGREGATION (
  NAME varchar,        -- Repository name
  KEY varchar,         -- Correlation id
  EXCHANGE_ID varchar, -- Exchange id
  EXCHANGE blob,       -- Serialized exchange
  PRIMARY KEY (NAME, KEY)
) WITH compaction = {'class':'LeveledCompactionStrategy'}
  AND gc_grace_seconds = 86400;

Alternatively, the CassandraAggregationRepository does not have a NAME column and can be extended to use a different data model.

OptionDefaultDescription

table

CAMEL_AGGREGATION

Table name

pkColumns

NAME,KEY

Primary key columns

exchangeIdColumn

EXCHANGE_ID

Exchange Id column

exchangeColumn

EXCHANGE

Exchange content column

name

 

Repository name, value used for NAME column

ttl

 

Exchange time to live

writeConsistencyLevel

 

Consistency level used to insert/delete exchange: ANY, ONE, TWO, QUORUM, LOCAL_QUORUM

readConsistencyLevel

 

Consistency level used to read/check exchange: ONE, TWO, QUORUM, LOCAL_QUORUM

13.9. Examples

To insert something on a table you can use the following code:

String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
from("direct:input")
    .to("cql://localhost/camel_ks?cql=" + CQL);

At this point you should be able to insert data by using a list as body

Arrays.asList("davsclaus", "Claus", "Ibsen")

The same approach can be used for updating or querying the table.

13.10. Spring Boot Auto-Configuration

When using cql with Spring Boot make sure to use the following Maven dependency to have support for auto configuration:

<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-cassandraql-starter</artifactId>
  <version>{CamelSBProjectVersion}</version>
  <!-- Use your Camel Spring Boot version -->
</dependency>

The component supports 4 options, which are listed below.

NameDescriptionDefaultType

camel.component.cql.autowired-enabled

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

Boolean

camel.component.cql.bridge-error-handler

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

Boolean

camel.component.cql.enabled

Whether to enable auto configuration of the cql component. This is enabled by default.

 

Boolean

camel.component.cql.lazy-start-producer

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

Boolean

Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.