Chapter 11. Kafka Bridge


This chapter provides an overview of the AMQ Streams Kafka Bridge on Red Hat Enterprise Linux and helps you get started using its REST API to interact with AMQ Streams. To try out the Kafka Bridge in your local environment, see Section 11.2, “Kafka Bridge quickstart” later in this chapter.

11.1. Kafka Bridge overview

The AMQ Streams Kafka Bridge provides an API for integrating HTTP-based clients with a Kafka cluster running on Red Hat Enterprise Linux. The API enables such clients to produce and consume messages without the requirement to use the native Kafka protocol.

The API has two main resources, consumers and topics, that are exposed through endpoints so that you can interact with consumers and producers in your Kafka cluster. The resources relate only to the Kafka Bridge, not to consumers and producers connected directly to Kafka.

You can:

  • Send messages to a topic.
  • Create and delete consumers.
  • Subscribe consumers to topics, so that they start receiving messages from those topics.
  • Retrieve a list of the topics to which a consumer is subscribed.
  • Unsubscribe consumers from topics.
  • Assign partitions to consumers.
  • Retrieve messages from topics.
  • Commit a list of consumer offsets.
  • Seek on a partition, so that a consumer starts receiving messages from the first or last offset position, or a given offset position.

Similar to an AMQ Streams installation, you can download the Kafka Bridge files for installation on Red Hat Enterprise Linux. See Section 11.1.4, “Downloading a Kafka Bridge archive”.

For more information on configuring the host and port for the Kafka Bridge, see Section 11.1.5, “Configuring Kafka Bridge properties”.

11.1.1. Requests to the Kafka Bridge

11.1.1.1. Authentication and encryption

Authentication and encryption between HTTP clients and the Kafka Bridge are not yet supported. This means that requests sent from clients to the Kafka Bridge are:

  • Not encrypted, and must use HTTP rather than HTTPS
  • Sent without authentication

You can configure TLS or SASL-based authentication between the Kafka Bridge and your Kafka cluster.

You configure the Kafka Bridge for authentication through its properties file.

11.1.1.2. Data formats and headers

Specify data formats and HTTP headers to ensure valid requests are submitted to the Kafka Bridge.

API request and response bodies are always encoded as JSON.

11.1.1.2.1. Content Type headers

A Content-Type header must be submitted for all requests (apart from the exception described below).

Consumer operations (/consumers endpoints) and producer operations (/topics endpoints) require different Content-Type headers.

Content-Type headers for consumer operations

Regardless of the embedded data format, POST requests for consumer operations must provide the following Content-Type header if the request body contains data:

Content-Type: application/vnd.kafka.v2+json

Do not send a Content-Type header if the POST request body is empty, or the request will fail.
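
The header rule for consumer operations can be sketched as a small client-side helper. This is a minimal Python illustration, not part of the Kafka Bridge; the function name consumer_post_headers is hypothetical, and passing None is assumed to mean "no request body".

```python
import json

CONSUMER_CONTENT_TYPE = "application/vnd.kafka.v2+json"


def consumer_post_headers(body=None):
    """Return (headers, encoded_body) for a POST to a /consumers endpoint.

    The Content-Type header is added only when there is a body to send,
    because a POST with an empty body must not carry the header.
    """
    if body is None:
        return {}, None
    encoded = json.dumps(body).encode("utf-8")
    return {"Content-Type": CONSUMER_CONTENT_TYPE}, encoded


# With a body: the header is present.
headers, data = consumer_post_headers({"name": "my-consumer"})
# Without a body: no headers and nothing to send.
empty_headers, empty_data = consumer_post_headers()
```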

Content-Type headers for producer operations

When performing producer operations, POST requests must provide the following Content-Type header specifying the desired embedded data format, either json or binary, as shown in the following table.

Embedded data format    Content-Type header

JSON                    Content-Type: application/vnd.kafka.json.v2+json

Binary                  Content-Type: application/vnd.kafka.binary.v2+json

You set the embedded data format when creating a consumer using the /consumers/groupid endpoint. For more information, see the next section.

The Content-Type must not be set if the POST request has an empty body. An empty body can be used to create a consumer with the default values.

11.1.1.2.2. Embedded data format

The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Two embedded data formats are supported: JSON or binary.

When creating a consumer using the /consumers/groupid endpoint, the POST request body must specify an embedded data format of either JSON or binary. This is specified in the format field in the request body, for example:

{
  "name": "my-consumer",
  "format": "binary",
...
}

In this example, the format field specifies a binary embedded data format.

If an embedded data format for the consumer is not specified, then a binary format is set.

The embedded data format specified when creating a consumer must match the data format of the Kafka messages it will consume.

If you choose to specify a binary embedded data format, subsequent producer requests must provide the binary data in the request body as Base64-encoded strings. For example, when sending messages by making POST requests to the /topics/topicname endpoint, the value must be encoded in Base64:

{
  "records": [
    {
      "key": "my-key",
      "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
    }
  ]
}

Producer requests must also provide a Content-Type header that corresponds to the embedded data format, for example, Content-Type: application/vnd.kafka.binary.v2+json.
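
As an illustration of the binary embedded data format, the following Python sketch builds a producer request body from raw bytes. The helper name binary_produce_body is hypothetical, and only the value field is encoded here, matching the description above.

```python
import base64
import json

BINARY_CONTENT_TYPE = "application/vnd.kafka.binary.v2+json"


def binary_produce_body(values):
    """Build a /topics/topicname request body from an iterable of bytes.

    Each value is Base64-encoded so that it can travel inside JSON.
    """
    records = [
        {"value": base64.b64encode(value).decode("ascii")} for value in values
    ]
    return json.dumps({"records": records}).encode("utf-8")


body = binary_produce_body([b"edwardthethreeleggedcat"])
headers = {"Content-Type": BINARY_CONTENT_TYPE}
```

A consumer that reads these records in binary format would reverse the step with base64.b64decode on each value.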

11.1.1.2.3. Accept headers

After creating a consumer, all subsequent GET requests must provide an Accept header in the following format:

Accept: application/vnd.kafka.embedded-data-format.v2+json

The embedded-data-format is either json or binary.

For example, when retrieving records for a subscribed consumer using an embedded data format of JSON, include this Accept header:

Accept: application/vnd.kafka.json.v2+json
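
A client can derive the Accept header from the embedded data format it chose when creating the consumer. The following Python sketch shows one way to do this; the function name accept_header is hypothetical.

```python
def accept_header(embedded_format):
    """Return the Accept header for GET requests made by a consumer.

    embedded_format must be "json" or "binary", the two formats the
    Kafka Bridge supports.
    """
    if embedded_format not in ("json", "binary"):
        raise ValueError(f"unsupported embedded data format: {embedded_format!r}")
    return {"Accept": f"application/vnd.kafka.{embedded_format}.v2+json"}
```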

11.1.2. Configuring loggers for the Kafka Bridge

The AMQ Streams Kafka Bridge allows you to set a different log level for each operation that is defined by the related OpenAPI specification.

Each operation has a corresponding API endpoint through which the bridge receives requests from HTTP clients. You can change the log level on each endpoint to produce more or less fine-grained logging information about the incoming and outgoing HTTP requests.

Loggers are defined in the log4j.properties file, which has the following default configuration for healthy and ready endpoints:

log4j.logger.http.openapi.operation.healthy=WARN, out
log4j.additivity.http.openapi.operation.healthy=false
log4j.logger.http.openapi.operation.ready=WARN, out
log4j.additivity.http.openapi.operation.ready=false

The log level of all other operations is set to INFO by default. Loggers are formatted as follows:

log4j.logger.http.openapi.operation.<operation-id>

Where <operation-id> is the identifier of the specific operation. The OpenAPI specification defines the following operations:

  • createConsumer
  • deleteConsumer
  • subscribe
  • unsubscribe
  • poll
  • assign
  • commit
  • send
  • sendToPartition
  • seekToBeginning
  • seekToEnd
  • seek
  • healthy
  • ready
  • openapi
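
To show how the logger naming scheme maps to entries in log4j.properties, the following Python sketch generates the lines for one operation. It is an illustration only: the function name logger_lines is hypothetical, and reusing the out appender and the additivity line for operations other than healthy and ready is an assumption based on the defaults shown above.

```python
OPERATION_IDS = {
    "createConsumer", "deleteConsumer", "subscribe", "unsubscribe", "poll",
    "assign", "commit", "send", "sendToPartition", "seekToBeginning",
    "seekToEnd", "seek", "healthy", "ready", "openapi",
}


def logger_lines(operation_id, level):
    """Return log4j.properties lines that set the log level of one operation."""
    if operation_id not in OPERATION_IDS:
        raise ValueError(f"unknown operation: {operation_id!r}")
    name = f"http.openapi.operation.{operation_id}"
    return [
        # Assumption: the "out" appender from the default configuration is reused.
        f"log4j.logger.{name}={level}, out",
        f"log4j.additivity.{name}=false",
    ]


# Example: more detailed logging for the poll operation.
print("\n".join(logger_lines("poll", "DEBUG")))
```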

11.1.3. Kafka Bridge API resources

For the full list of REST API endpoints and descriptions, including example requests and responses, see the Kafka Bridge API reference.

11.1.4. Downloading a Kafka Bridge archive

A zipped distribution of the AMQ Streams Kafka Bridge is available for download from the Red Hat website.

Procedure

  • Download the latest version of the Red Hat AMQ Streams Kafka Bridge archive from the Customer Portal.

11.1.5. Configuring Kafka Bridge properties

This procedure describes how to configure the properties used by the AMQ Streams Kafka Bridge.

As with any other Kafka client, you configure the Kafka Bridge using appropriate prefixes for Kafka-related properties:

  • kafka. for general configuration that applies to producers and consumers, such as server connection and security.
  • kafka.consumer. for consumer-specific configuration passed only to the consumer.
  • kafka.producer. for producer-specific configuration passed only to the producer.

Procedure

  1. Edit the application.properties file provided with the AMQ Streams Kafka Bridge installation archive.

    Use the properties file to specify Kafka and HTTP-related properties.

    1. Configure standard Kafka-related properties, including properties specific to the Kafka consumers and producers.

      Use:

      • kafka.bootstrap.servers to define the host/port connections to the Kafka cluster
      • kafka.producer.acks to provide acknowledgments to the HTTP client
      • kafka.consumer.auto.offset.reset to determine how to manage reset of the offset in Kafka

        For more information on configuration of Kafka properties, see the Apache Kafka website.

    2. Configure HTTP-related properties to enable HTTP access to the Kafka cluster.

      http.enabled=true
      http.host=0.0.0.0
      http.port=8080
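
The prefix scheme can be illustrated by grouping the entries of a properties file by prefix. The following Python sketch is not how the Kafka Bridge itself reads its configuration; the function name group_bridge_properties is hypothetical, and the values for kafka.bootstrap.servers, kafka.producer.acks, and kafka.consumer.auto.offset.reset are illustrative.

```python
SAMPLE_PROPERTIES = """\
kafka.bootstrap.servers=localhost:9092
kafka.producer.acks=1
kafka.consumer.auto.offset.reset=earliest
http.enabled=true
http.host=0.0.0.0
http.port=8080
"""


def group_bridge_properties(text):
    """Group key=value lines by prefix, checking the most specific prefix first."""
    groups = {"kafka.consumer.": {}, "kafka.producer.": {}, "kafka.": {}, "http.": {}}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blank lines, comments, and malformed entries
        key, value = line.split("=", 1)
        key = key.strip()
        for prefix, bucket in groups.items():
            if key.startswith(prefix):
                bucket[key[len(prefix):]] = value.strip()
                break
    return groups


groups = group_bridge_properties(SAMPLE_PROPERTIES)
```

In this sketch, kafka.producer.acks lands in the producer group with the key acks, while kafka.bootstrap.servers lands in the general kafka. group that applies to both producers and consumers.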

11.1.6. Installing the Kafka Bridge

Follow this procedure to install the AMQ Streams Kafka Bridge on Red Hat Enterprise Linux.

Procedure

  1. If you have not already done so, unzip the AMQ Streams Kafka Bridge installation archive to any directory.
  2. Run the Kafka Bridge script using the configuration properties as a parameter:

    For example:

    ./bin/kafka_bridge_run.sh --config-file=<path>/configfile.properties
  3. Check in the log that the installation was successful.

    HTTP-Kafka Bridge started and listening on port 8080
    HTTP-Kafka Bridge bootstrap servers localhost:9092

11.2. Kafka Bridge quickstart

Use this quickstart to try out the AMQ Streams Kafka Bridge on Red Hat Enterprise Linux. You will learn how to:

  • Install the Kafka Bridge
  • Produce messages to topics and partitions in your Kafka cluster
  • Create a Kafka Bridge consumer
  • Perform basic consumer operations, such as subscribing the consumer to topics and retrieving the messages that you produced

In this quickstart, HTTP requests are formatted as curl commands that you can copy and paste to your terminal.

Ensure you have the prerequisites and then follow the tasks in the order provided in this chapter.

About data formats

In this quickstart, you will produce and consume messages in JSON format, not binary. For more information on the data formats and HTTP headers used in the example requests, see Section 11.1.1, “Requests to the Kafka Bridge”.

11.2.1. Deploying the Kafka Bridge locally

Deploy an instance of the AMQ Streams Kafka Bridge to the host. Use the application.properties file provided with the installation archive to apply the default configuration settings.

Procedure

  1. Open the application.properties file and check that the default HTTP-related settings are defined:

    http.enabled=true
    http.host=0.0.0.0
    http.port=8080

    This configures the Kafka Bridge to listen for requests on port 8080.

  2. Run the Kafka Bridge script using the configuration properties as a parameter:

    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties

11.2.2. Producing messages to topics and partitions

Produce messages to a topic in JSON format by using the topics endpoint.

You can specify destination partitions for messages in the request body, as shown below. The partitions endpoint provides an alternative method for specifying a single destination partition for all messages as a path parameter.

Procedure

  1. Create a Kafka topic using the kafka-topics.sh utility:

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1 --config retention.ms=7200000 --config segment.bytes=1073741824

    Specify three partitions.

  2. Verify that the topic was created:

    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic bridge-quickstart-topic
  3. Using the Kafka Bridge, produce three messages to the topic you created:

    curl -X POST \
      http://localhost:8080/topics/bridge-quickstart-topic \
      -H 'content-type: application/vnd.kafka.json.v2+json' \
      -d '{
        "records": [
            {
                "key": "my-key",
                "value": "sales-lead-0001"
            },
            {
                "value": "sales-lead-0002",
                "partition": 2
            },
            {
                "value": "sales-lead-0003"
            }
        ]
    }'
    • sales-lead-0001 is sent to a partition based on the hash of the key.
    • sales-lead-0002 is sent directly to partition 2.
    • sales-lead-0003 is sent to a partition in the bridge-quickstart-topic topic using a round-robin method.
  4. If the request is successful, the Kafka Bridge returns an offsets array, along with a 200 (OK) code and a content-type header of application/vnd.kafka.v2+json. For each message, the offsets array describes:

    • The partition that the message was sent to
    • The current message offset of the partition

      Example response

      #...
      {
        "offsets":[
          {
            "partition":0,
            "offset":0
          },
          {
            "partition":2,
            "offset":0
          },
          {
            "partition":0,
            "offset":1
          }
        ]
      }
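
The same produce request can be built from an HTTP client in code. The following Python sketch uses only the standard library and constructs the request without sending it. The helper names are hypothetical, and pairing records with offsets by position assumes that the offsets array is returned in the same order as the records were sent.

```python
import json
import urllib.request

BRIDGE_URL = "http://localhost:8080"  # bridge address used in this quickstart

QUICKSTART_RECORDS = [
    {"key": "my-key", "value": "sales-lead-0001"},
    {"value": "sales-lead-0002", "partition": 2},
    {"value": "sales-lead-0003"},
]


def build_produce_request(topic, records):
    """Build (but do not send) a POST request to the topics endpoint."""
    return urllib.request.Request(
        f"{BRIDGE_URL}/topics/{topic}",
        data=json.dumps({"records": records}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
        method="POST",
    )


def pair_offsets(records, response_body):
    """Pair each record value with the partition and offset reported for it."""
    offsets = json.loads(response_body)["offsets"]
    return [
        (record["value"], entry["partition"], entry["offset"])
        for record, entry in zip(records, offsets)
    ]


request = build_produce_request("bridge-quickstart-topic", QUICKSTART_RECORDS)
# To send it against a running bridge:
#   with urllib.request.urlopen(request) as response:
#       print(pair_offsets(QUICKSTART_RECORDS, response.read()))
```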

What to do next

After producing messages to topics and partitions, create a Kafka Bridge consumer.

11.2.3. Creating a Kafka Bridge consumer

Before you can perform any consumer operations on the Kafka cluster, you must first create a consumer by using the consumers endpoint. The consumer is referred to as a Kafka Bridge consumer.

Procedure

  1. Create a Kafka Bridge consumer in a new consumer group named bridge-quickstart-consumer-group:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "name": "bridge-quickstart-consumer",
        "auto.offset.reset": "earliest",
        "format": "json",
        "enable.auto.commit": false,
        "fetch.min.bytes": 512,
        "consumer.request.timeout.ms": 30000
      }'
    • The consumer is named bridge-quickstart-consumer and the embedded data format is set as json.
    • The consumer will not commit offsets to the log automatically because the enable.auto.commit setting is false. You will commit the offsets manually later in this quickstart.

      Note

      The Kafka Bridge generates a random consumer name if you do not specify a consumer name in the request body.

      If the request is successful, the Kafka Bridge returns the consumer ID (instance_id) and base URL (base_uri) in the response body, along with a 200 (OK) code.

      Example response

      #...
      {
        "instance_id": "bridge-quickstart-consumer",
        "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
      }

  2. Copy the base URL (base_uri) to use in the other consumer operations in this quickstart.
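
Rather than copying the base URL by hand, a client can derive the other consumer endpoints from the response body. The following Python sketch assumes a response shaped like the example above; the function name consumer_endpoints is hypothetical. The host in base_uri may differ from the address you use to reach the bridge, in which case you need to adjust it.

```python
import json


def consumer_endpoints(create_response_body):
    """Derive per-consumer endpoint URLs from the create-consumer response."""
    response = json.loads(create_response_body)
    base_uri = response["base_uri"].rstrip("/")
    return {
        "instance_id": response["instance_id"],
        "subscription": f"{base_uri}/subscription",
        "records": f"{base_uri}/records",
        "offsets": f"{base_uri}/offsets",
        "positions": f"{base_uri}/positions",
    }
```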

What to do next

Now that you have created a Kafka Bridge consumer, you can subscribe it to topics.

11.2.4. Subscribing a Kafka Bridge consumer to topics

Subscribe the Kafka Bridge consumer to one or more topics by using the subscription endpoint. Once subscribed, the consumer starts receiving all messages that are produced to the topic.

Procedure

  • Subscribe the consumer to the bridge-quickstart-topic topic that you created earlier, in Producing messages to topics and partitions:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "topics": [
            "bridge-quickstart-topic"
        ]
    }'

    The topics array can contain a single topic (as shown above) or multiple topics. If you want to subscribe the consumer to multiple topics that match a regular expression, you can use the topic_pattern string instead of the topics array.

    If the request is successful, the Kafka Bridge returns a 204 No Content code only.
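
The choice between the topics array and the topic_pattern string can be sketched as a helper that builds the subscription request body. This is a minimal Python illustration; the function name subscription_body and the example pattern are hypothetical.

```python
def subscription_body(topics=None, topic_pattern=None):
    """Build the body for the subscription endpoint.

    Pass either a list of topic names or a regular expression string,
    but not both.
    """
    if (topics is None) == (topic_pattern is None):
        raise ValueError("specify exactly one of topics or topic_pattern")
    if topics is not None:
        return {"topics": list(topics)}
    return {"topic_pattern": topic_pattern}


# A single topic, as in the quickstart request:
single = subscription_body(topics=["bridge-quickstart-topic"])
# All topics whose names match a regular expression (illustrative pattern):
by_pattern = subscription_body(topic_pattern="bridge-quickstart-.*")
```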

What to do next

After subscribing a Kafka Bridge consumer to topics, you can retrieve messages from the consumer.

11.2.5. Retrieving the latest messages from a Kafka Bridge consumer

Retrieve the latest messages from the Kafka Bridge consumer by requesting data from the records endpoint. In production, HTTP clients can call this endpoint repeatedly (in a loop).

Procedure

  1. Produce additional messages to the topic that the Kafka Bridge consumer is subscribed to, as described in Producing messages to topics and partitions.
  2. Submit a GET request to the records endpoint:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    After creating and subscribing to a Kafka Bridge consumer, a first GET request will return an empty response because the poll operation triggers a rebalancing process to assign partitions.

  3. Repeat step two to retrieve messages from the Kafka Bridge consumer.

    The Kafka Bridge returns an array of messages (describing the topic name, key, value, partition, and offset) in the response body, along with a 200 (OK) code. Messages are retrieved from the latest offset by default.

    HTTP/1.1 200 OK
    content-type: application/vnd.kafka.json.v2+json
    #...
    [
      {
        "topic":"bridge-quickstart-topic",
        "key":"my-key",
        "value":"sales-lead-0001",
        "partition":0,
        "offset":0
      },
      {
        "topic":"bridge-quickstart-topic",
        "key":null,
        "value":"sales-lead-0003",
        "partition":0,
        "offset":1
      },
    #...
    Note

    If an empty response is returned, produce more records to the consumer as described in Producing messages to topics and partitions, and then try retrieving messages again.
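
Because the first request can return an empty response, a client typically retries the records endpoint a few times. The following Python sketch shows one way to structure such a loop. The function names, the retry count, and the delay are illustrative assumptions, and an empty response body is treated as an empty array.

```python
import json
import time
import urllib.request

RECORDS_URL = (
    "http://localhost:8080/consumers/bridge-quickstart-consumer-group"
    "/instances/bridge-quickstart-consumer/records"
)


def fetch_records(url=RECORDS_URL):
    """Send one GET request to the records endpoint and decode the JSON array."""
    request = urllib.request.Request(
        url, headers={"Accept": "application/vnd.kafka.json.v2+json"}
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read() or b"[]")


def poll_until_records(fetch=fetch_records, attempts=5, delay_s=1.0):
    """Call fetch until it returns records or the attempts run out."""
    for attempt in range(attempts):
        records = fetch()
        if records:
            return records
        if attempt < attempts - 1:
            time.sleep(delay_s)  # give the rebalance time to complete
    return []
```

Passing the fetch function as a parameter keeps the retry logic separate from the HTTP call, so the loop can be exercised without a running bridge.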

What to do next

After retrieving messages from a Kafka Bridge consumer, try committing offsets to the log.

11.2.6. Committing offsets to the log

Use the offsets endpoint to manually commit offsets to the log for all messages received by the Kafka Bridge consumer. This is required because the Kafka Bridge consumer that you created earlier, in Creating a Kafka Bridge consumer, was configured with the enable.auto.commit setting as false.

Procedure

  • Commit offsets to the log for the bridge-quickstart-consumer:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets

    Because no request body is submitted, offsets are committed for all the records that have been received by the consumer. Alternatively, the request body can contain an array (OffsetCommitSeekList) that specifies the topics and partitions that you want to commit offsets for.

    If the request is successful, the Kafka Bridge returns a 204 No Content code only.
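
Both forms of the commit request can be sketched in Python as follows. The request is built but not sent. The function name build_commit_request is hypothetical, and the shape of each offsets entry (topic, partition, offset) is an assumption based on the seek request shown in Section 11.2.7; check the Kafka Bridge API reference for the exact schema.

```python
import json
import urllib.request


def build_commit_request(consumer_url, offsets=None):
    """Build a POST request to the offsets endpoint of a consumer.

    With offsets=None the request has no body and no Content-Type header,
    which commits offsets for all records received by the consumer.
    """
    url = f"{consumer_url}/offsets"
    if offsets is None:
        return urllib.request.Request(url, method="POST")
    return urllib.request.Request(
        url,
        # Assumed body shape: {"offsets": [{"topic", "partition", "offset"}, ...]}
        data=json.dumps({"offsets": offsets}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
        method="POST",
    )
```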

What to do next

After committing offsets to the log, try out the endpoints for seeking to offsets.

11.2.7. Seeking to offsets for a partition

Use the positions endpoints to configure the Kafka Bridge consumer to retrieve messages for a partition from a specific offset, and then from the latest offset. This is referred to in Apache Kafka as a seek operation.

Procedure

  1. Seek to a specific offset for partition 0 of the bridge-quickstart-topic topic:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "offsets": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0,
                "offset": 2
            }
        ]
    }'

    If the request is successful, the Kafka Bridge returns a 204 No Content code only.

  2. Submit a GET request to the records endpoint:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    The Kafka Bridge returns messages starting from the offset that you specified in the seek request.

  3. Restore the default message retrieval behavior by seeking to the last offset for the same partition. This time, use the positions/end endpoint.

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "partitions": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0
            }
        ]
    }'

    If the request is successful, the Kafka Bridge returns another 204 No Content code.

Note

You can also use the positions/beginning endpoint to seek to the first offset for one or more partitions.
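
The three seek variants differ only in the endpoint path and the request body. The following Python sketch builds the URL and body for each; the function names are hypothetical, and the body for positions/beginning is assumed to have the same shape as the positions/end body shown above.

```python
def seek_to_offset(consumer_url, topic, partition, offset):
    """Return (url, body) for seeking one partition to a specific offset."""
    body = {"offsets": [{"topic": topic, "partition": partition, "offset": offset}]}
    return f"{consumer_url}/positions", body


def seek_to_edge(consumer_url, edge, topic_partitions):
    """Return (url, body) for seeking partitions to their first or last offset.

    edge is "beginning" or "end"; topic_partitions is a list of
    (topic, partition) pairs.
    """
    if edge not in ("beginning", "end"):
        raise ValueError(f"edge must be 'beginning' or 'end', got {edge!r}")
    body = {
        "partitions": [
            {"topic": topic, "partition": partition}
            for topic, partition in topic_partitions
        ]
    }
    return f"{consumer_url}/positions/{edge}", body
```

Either result can be sent as a POST request with the application/vnd.kafka.v2+json Content-Type header, as in the curl commands in this procedure.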

What to do next

In this quickstart, you have used the AMQ Streams Kafka Bridge to perform several common operations on a Kafka cluster. You can now delete the Kafka Bridge consumer that you created earlier.

11.2.8. Deleting a Kafka Bridge consumer

Finally, delete the Kafka Bridge consumer that you used throughout this quickstart.

Procedure

  • Delete the Kafka Bridge consumer by sending a DELETE request to the instances endpoint.

    curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

    If the request is successful, the Kafka Bridge returns a 204 No Content code only.
