Chapter 385. ZooKeeper Component


Available as of Camel version 2.9

The ZooKeeper component allows interaction with a ZooKeeper cluster and exposes the following features to Camel:

  1. Creation of nodes in any of the ZooKeeper create modes.
  2. Get and Set the data contents of arbitrary cluster nodes (data being set must be convertible to byte[]).
  3. Create and retrieve the list of child nodes attached to a particular node.
  4. A Distributed RoutePolicy that leverages a Leader election coordinated by ZooKeeper to determine if exchanges should get processed.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zookeeper</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

385.1. URI format

zookeeper://zookeeper-server[:port][/path][?options]

The path from the URI specifies the node in the ZooKeeper server (a.k.a. znode) that will be the target of the endpoint.

385.2. Options

The ZooKeeper component supports 2 options, which are listed below.

| Name | Description | Default | Type |
|---|---|---|---|
| configuration (advanced) | To use a shared ZooKeeperConfiguration | | ZooKeeperConfiguration |
| resolvePropertyPlaceholders (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean |

The ZooKeeper endpoint is configured using URI syntax:

zookeeper:serverUrls/path

with the following path and query parameters:

385.2.1. Path Parameters (2 parameters):

| Name | Description | Default | Type |
|---|---|---|---|
| serverUrls | Required. The ZooKeeper server hosts (multiple servers can be separated by commas) | | String |
| path | Required. The node in the ZooKeeper server (aka znode) | | String |
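As a rough illustration of how the two path parameters and the query options combine, the following sketch assembles an endpoint URI string from its parts. The class ZooKeeperUriSketch and its method endpointUri are hypothetical helpers written for this example and are not part of camel-zookeeper; only the URI shape and the option names come from this chapter.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of camel-zookeeper): assembles an endpoint URI
// of the form zookeeper://serverUrls/path?options from its parts.
class ZooKeeperUriSketch {

    static String endpointUri(List<String> servers, String path, Map<String, String> options) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        // Multiple servers are separated by commas, as described for serverUrls.
        String serverUrls = String.join(",", servers);
        // Make sure the znode path starts with a slash.
        String znode = path.startsWith("/") ? path : "/" + path;
        StringBuilder uri = new StringBuilder("zookeeper://").append(serverUrls).append(znode);
        if (!options.isEmpty()) {
            // Query options are appended as name=value pairs joined by '&'.
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((name, value) -> query.add(name + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("create", "true");
        options.put("createMode", "PERSISTENT");
        System.out.println(endpointUri(Arrays.asList("localhost:39913"), "/somepath/somenode", options));
    }
}
```

The resulting string can be passed to from(...) or to(...) in a route. The sketch does no URL encoding, so it is only suitable for simple option values such as the booleans and enum names listed in this chapter.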

385.2.2. Query Parameters (12 parameters):

| Name | Description | Default | Type |
|---|---|---|---|
| awaitExistence (common) | Deprecated. Not in use. | true | boolean |
| listChildren (common) | Whether the children of the node should be listed. | false | boolean |
| timeout (common) | The time interval to wait on connection before timing out. | 5000 | int |
| backoff (consumer) | The time interval to back off for after an error before retrying. | 5000 | long |
| bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
| repeat (consumer) | Whether changes to the znode should be 'watched' and repeatedly processed. | false | boolean |
| sendEmptyMessageOnDelete (consumer) | Whether an empty message should be sent to the consumer when a znode is deleted. | true | boolean |
| exceptionHandler (consumer) | To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled, this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN or ERROR level and ignored. | | ExceptionHandler |
| exchangePattern (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern |
| create (producer) | Whether the endpoint should create the node if it does not currently exist. | false | boolean |
| createMode (producer) | The create mode that should be used for the newly created node. | EPHEMERAL | String |
| synchronous (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |

385.3. Spring Boot Auto-Configuration

The component supports 57 options, which are listed below.

| Name | Description | Default | Type |
|---|---|---|---|
| camel.component.zookeeper.cluster.service.attributes | Custom service attributes. | | Map |
| camel.component.zookeeper.cluster.service.auth-info-list | | | List |
| camel.component.zookeeper.cluster.service.base-path | | | String |
| camel.component.zookeeper.cluster.service.connection-timeout | | | Long |
| camel.component.zookeeper.cluster.service.connection-timeout-unit | | | TimeUnit |
| camel.component.zookeeper.cluster.service.curator-framework | | | CuratorFramework |
| camel.component.zookeeper.cluster.service.enabled | Sets if the zookeeper cluster service should be enabled or not, default is false. | false | Boolean |
| camel.component.zookeeper.cluster.service.id | Cluster Service ID | | String |
| camel.component.zookeeper.cluster.service.max-close-wait | | | Long |
| camel.component.zookeeper.cluster.service.max-close-wait-unit | | | TimeUnit |
| camel.component.zookeeper.cluster.service.namespace | | | String |
| camel.component.zookeeper.cluster.service.nodes | | | List |
| camel.component.zookeeper.cluster.service.order | Service lookup order/priority. | | Integer |
| camel.component.zookeeper.cluster.service.reconnect-base-sleep-time | | | Long |
| camel.component.zookeeper.cluster.service.reconnect-base-sleep-time-unit | | | TimeUnit |
| camel.component.zookeeper.cluster.service.reconnect-max-retries | | | Integer |
| camel.component.zookeeper.cluster.service.reconnect-max-sleep-time | | | Long |
| camel.component.zookeeper.cluster.service.reconnect-max-sleep-time-unit | | | TimeUnit |
| camel.component.zookeeper.cluster.service.retry-policy | | | RetryPolicy |
| camel.component.zookeeper.cluster.service.session-timeout | | | Long |
| camel.component.zookeeper.cluster.service.session-timeout-unit | | | TimeUnit |
| camel.component.zookeeper.configuration.backoff | The time interval to back off for after an error before retrying. | 5000 | Long |
| camel.component.zookeeper.configuration.create | Whether the endpoint should create the node if it does not currently exist. | false | Boolean |
| camel.component.zookeeper.configuration.create-mode | The create mode that should be used for the newly created node. | EPHEMERAL | String |
| camel.component.zookeeper.configuration.list-children | Whether the children of the node should be listed. | false | Boolean |
| camel.component.zookeeper.configuration.path | The node in the ZooKeeper server (aka znode) | | String |
| camel.component.zookeeper.configuration.repeat | Whether changes to the znode should be 'watched' and repeatedly processed. | false | Boolean |
| camel.component.zookeeper.configuration.send-empty-message-on-delete | Whether an empty message should be sent to the consumer when a znode is deleted. | true | Boolean |
| camel.component.zookeeper.configuration.servers | The ZooKeeper server hosts | | List |
| camel.component.zookeeper.configuration.timeout | The time interval to wait on connection before timing out. | 5000 | Integer |
| camel.component.zookeeper.enabled | Enable zookeeper component | true | Boolean |
| camel.component.zookeeper.resolve-property-placeholders | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean |
| camel.component.zookeeper.service-registry.attributes | Custom service attributes. | | Map |
| camel.component.zookeeper.service-registry.auth-info-list | | | List |
| camel.component.zookeeper.service-registry.base-path | | | String |
| camel.component.zookeeper.service-registry.connection-timeout | | | Long |
| camel.component.zookeeper.service-registry.connection-timeout-unit | | | TimeUnit |
| camel.component.zookeeper.service-registry.curator-framework | | | CuratorFramework |
| camel.component.zookeeper.service-registry.deregister-services-on-stop | | | Boolean |
| camel.component.zookeeper.service-registry.enabled | Sets if the zookeeper service registry should be enabled or not, default is false. | false | Boolean |
| camel.component.zookeeper.service-registry.id | Service Registry ID | | String |
| camel.component.zookeeper.service-registry.max-close-wait | | | Long |
| camel.component.zookeeper.service-registry.max-close-wait-unit | | | TimeUnit |
| camel.component.zookeeper.service-registry.namespace | | | String |
| camel.component.zookeeper.service-registry.nodes | | | List |
| camel.component.zookeeper.service-registry.order | Service lookup order/priority. | | Integer |
| camel.component.zookeeper.service-registry.override-service-host | | | Boolean |
| camel.component.zookeeper.service-registry.reconnect-base-sleep-time | | | Long |
| camel.component.zookeeper.service-registry.reconnect-base-sleep-time-unit | | | TimeUnit |
| camel.component.zookeeper.service-registry.reconnect-max-retries | | | Integer |
| camel.component.zookeeper.service-registry.reconnect-max-sleep-time | | | Long |
| camel.component.zookeeper.service-registry.reconnect-max-sleep-time-unit | | | TimeUnit |
| camel.component.zookeeper.service-registry.retry-policy | | | RetryPolicy |
| camel.component.zookeeper.service-registry.service-host | | | String |
| camel.component.zookeeper.service-registry.session-timeout | | | Long |
| camel.component.zookeeper.service-registry.session-timeout-unit | | | TimeUnit |
| camel.component.zookeeper.configuration.await-existence | Not in use | true | Boolean |

385.4. Use cases

385.4.1. Reading from a znode

The following snippet reads the data from the znode /somepath/somenode/, provided that it already exists. The data retrieved is placed into an exchange and passed on to the rest of the route:

from("zookeeper://localhost:39913/somepath/somenode").to("mock:result");

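If the node does not yet exist, a flag can be supplied to have the endpoint await its creation. The corresponding entry in the query parameter table is awaitExistence, which that table marks as deprecated and not in use:

from("zookeeper://localhost:39913/somepath/somenode?awaitExistence=true").to("mock:result");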

385.4.2. Reading from a znode (additional Camel 2.10 onwards)

When data is read due to a WatchedEvent received from the ZooKeeper ensemble, the CamelZookeeperEventType header holds ZooKeeper’s EventType value from that WatchedEvent. If the data is read initially (not triggered by a WatchedEvent) the CamelZookeeperEventType header will not be set.
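The distinction between an initial read and a read triggered by a WatchedEvent can be sketched as follows. To keep the example runnable without Camel or ZooKeeper on the classpath, a plain Map stands in for the message headers and a String stands in for ZooKeeper's EventType value; the class and method names are hypothetical. In a real route the same check would be made against the incoming message's headers.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch: a plain Map plays the role of the Camel message headers so the
// example runs without Camel on the classpath.
class EventTypeHeaderSketch {

    // Header name as documented above.
    static final String EVENT_TYPE_HEADER = "CamelZookeeperEventType";

    // Describes why the znode data was read, based on whether the header is set.
    static String describeRead(Map<String, Object> headers) {
        Object eventType = headers.get(EVENT_TYPE_HEADER);
        if (eventType == null) {
            // No header: the data was read initially, not because of a WatchedEvent.
            return "initial read";
        }
        return "read triggered by " + eventType;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(describeRead(headers));
        // "NodeDataChanged" is used here as a String stand-in for the EventType value.
        headers.put(EVENT_TYPE_HEADER, "NodeDataChanged");
        System.out.println(describeRead(headers));
    }
}
```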

385.4.3. Writing to a znode

The following snippet will write the payload of the exchange into the znode at /somepath/somenode/ provided that it already exists:

from("direct:write-to-znode")
    .to("zookeeper://localhost:39913/somepath/somenode");

For flexibility, the endpoint allows the target znode to be specified dynamically as a message header. If a header keyed by the string CamelZooKeeperNode is present, the value of the header is used as the path to the znode on the server. For instance, using the same route definition as above, the following code snippet writes the data not to /somepath/somenode but to the path from the header, /somepath/someothernode.

Warning

The testPayload must be convertible to byte[], as the data stored in ZooKeeper is byte based.

Object testPayload = ...
template.sendBodyAndHeader("direct:write-to-znode", testPayload, "CamelZooKeeperNode", "/somepath/someothernode");
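One way to sidestep any doubt about whether a payload is convertible to byte[] is to convert it explicitly before sending. The sketch below does this for text payloads; it assumes UTF-8 is the desired encoding, and the class name PayloadBytesSketch is hypothetical. The resulting byte[] could then be used as the message body.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: explicit String <-> byte[] conversion for znode payloads.
// UTF-8 is an assumption made for this sketch, not something the component mandates.
class PayloadBytesSketch {

    // Encodes a text payload into the bytes that would be stored in the znode.
    static byte[] toBytes(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes bytes read back from a znode into text, using the same encoding.
    static String fromBytes(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = toBytes("some configuration value");
        System.out.println(stored.length + " bytes");
        System.out.println(fromBytes(stored));
    }
}
```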

To also create the node if it does not exist, use the create option:

from("direct:create-and-write-to-znode")
    .to("zookeeper://localhost:39913/somepath/somenode?create=true");

Starting with version 2.11, it is also possible to delete a node by setting the header CamelZookeeperOperation to DELETE:

from("direct:delete-znode")
    .setHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, constant("DELETE"))
    .to("zookeeper://localhost:39913/somepath/somenode");

or equivalently:

<route>
  <from uri="direct:delete-znode" />
  <setHeader headerName="CamelZookeeperOperation">
     <constant>DELETE</constant>
  </setHeader>
  <to uri="zookeeper://localhost:39913/somepath/somenode" />
</route>

ZooKeeper nodes can have different types: they can be 'Ephemeral' or 'Persistent', and 'Sequenced' or 'Unsequenced'. For further information on each type, see the ZooKeeper documentation. By default, endpoints create unsequenced, ephemeral nodes, but the type can be changed via the createMode URI parameter or via a special message header. The values expected for the create mode are the names from the CreateMode enumeration:

  • PERSISTENT
  • PERSISTENT_SEQUENTIAL
  • EPHEMERAL
  • EPHEMERAL_SEQUENTIAL

For example, to create a persistent znode via the URI config:

from("direct:create-and-write-to-persistent-znode")
    .to("zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT");

or using the header CamelZookeeperCreateMode.

Warning

The testPayload must be convertible to byte[], as the data stored in ZooKeeper is byte based.

Object testPayload = ...
template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", testPayload, "CamelZookeeperCreateMode", "PERSISTENT");
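The four create mode names combine the two properties described above. The following sketch models them with a local stand-in enum, so it runs without the ZooKeeper library. CreateModeSketch and its parse method are hypothetical; the real values come from ZooKeeper's CreateMode enumeration. The fallback to EPHEMERAL mirrors the documented default.

```java
// Local stand-in for the create mode names listed above. It mirrors the
// ephemeral/persistent and sequenced/unsequenced combinations but is not the
// ZooKeeper CreateMode class itself.
enum CreateModeSketch {
    PERSISTENT(false, false),
    PERSISTENT_SEQUENTIAL(false, true),
    EPHEMERAL(true, false),
    EPHEMERAL_SEQUENTIAL(true, true);

    final boolean ephemeral;
    final boolean sequential;

    CreateModeSketch(boolean ephemeral, boolean sequential) {
        this.ephemeral = ephemeral;
        this.sequential = sequential;
    }

    // Resolves a createMode value; a missing value falls back to the documented
    // default (EPHEMERAL), and an unknown name raises IllegalArgumentException.
    static CreateModeSketch parse(String name) {
        if (name == null || name.isEmpty()) {
            return EPHEMERAL;
        }
        return valueOf(name);
    }

    public static void main(String[] args) {
        CreateModeSketch mode = parse("PERSISTENT_SEQUENTIAL");
        System.out.println(mode + " ephemeral=" + mode.ephemeral + " sequential=" + mode.sequential);
    }
}
```

Validating the value before building the URI or setting the header gives an early, local failure for a misspelled mode name.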

385.5. ZooKeeper enabled Route policies

ZooKeeper allows for very simple and effective leader election out of the box. This component exploits this election capability in a RoutePolicy to control when and how routes are enabled. This policy would typically be used in fail-over scenarios, to control identical instances of a route across a cluster of Camel based servers. A very common scenario is a simple 'Master-Slave' setup where there are multiple instances of a route distributed across a cluster but only one of them, that of the master, should be running at a time. If the master fails, a new master should be elected from the available slaves and the route in this new master should be started.

The policy uses a common znode path across all instances of the RoutePolicy that will be involved in the election. Each policy writes its id into this node, and ZooKeeper orders the writes in the order it received them. The policy then reads the listing of the node to find the position of its id; this position determines whether the route should be started. The policy is configured at startup with the number of route instances that should be started across the cluster, and if its position in the list is less than this value, its route is started. For a Master-Slave scenario, the policy is configured with 1 route instance, and only the first entry in the listing starts its route. All policies watch for updates to the listing and, if the listing changes, recalculate whether their route should be started. For more information on ZooKeeper's leader election capability, see the ZooKeeper documentation.

The following example uses the node /someapp/somepolicy for the election and is set up to start only the top entry in the node listing, i.e. to elect a master:

ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:39913/someapp/somepolicy", 1);
from("direct:policy-controlled")
    .routePolicy(policy)
    .to("mock:controlled");
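The position rule described above (start the route only if the policy's own entry is among the first N in the listing) can be sketched in isolation. This is an illustration of the rule, not the component's implementation. The class name, the method name and the candidate ids are hypothetical, and the sketch assumes that sorting the entry names reproduces the order in which ZooKeeper received the writes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration of the election rule only; names and ids are made up for the example.
class ElectionPositionSketch {

    // Returns true when ownId is among the first enabledCount entries of the
    // sorted listing, i.e. when its position is less than enabledCount.
    static boolean shouldStartRoute(List<String> listing, String ownId, int enabledCount) {
        List<String> ordered = new ArrayList<>(listing);
        // Assumption: lexicographic order of the names matches the write order.
        Collections.sort(ordered);
        int position = ordered.indexOf(ownId);
        // An id that is missing from the listing never starts its route.
        return position >= 0 && position < enabledCount;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
                "candidate-0000000002", "candidate-0000000001", "candidate-0000000003");
        // With enabledCount = 1 only the first entry is elected master.
        for (String id : listing) {
            System.out.println(id + " starts route: " + shouldStartRoute(listing, id, 1));
        }
    }
}
```

Because every policy watches the listing, re-running this check whenever the listing changes is what lets a former slave take over when the master's entry disappears.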

There are currently 3 policies defined in the component, with different SLAs:

  • ZooKeeperRoutePolicy
  • CuratorLeaderRoutePolicy (since 2.19)
  • MultiMasterCuratorLeaderRoutePolicy (since 2.19)

ZooKeeperRoutePolicy supports multiple active nodes, but its activation kicks in only after a Camel component and its corresponding Consumer have already been started. Depending on your route definitions, this introduces the risk that your component starts consuming events and producing Exchanges before the policy can establish that the node should not be activated.

CuratorLeaderRoutePolicy supports only a single active node, but it is bound to a different CamelContext lifecycle method; this policy kicks in before any route or consumer is started, so you can be sure that no event is processed before the policy makes its decision.

MultiMasterCuratorLeaderRoutePolicy supports multiple active nodes, and it is bound to the same lifecycle method as CuratorLeaderRoutePolicy; this policy kicks in before any route or consumer is started, so you can be sure that no event is processed before the policy makes its decision.

385.6. See Also

  • Configuring Camel
  • Component
  • Endpoint
  • Getting Started
© 2024 Red Hat, Inc.