이 콘텐츠는 선택한 언어로 제공되지 않습니다.

Chapter 2. Kafka Bridge quickstart


Use this quickstart to try out the AMQ Streams Kafka Bridge in your local development environment.

You will learn how to do the following:

  • Produce messages to topics and partitions in your Kafka cluster
  • Create a Kafka Bridge consumer
  • Perform basic consumer operations, such as subscribing the consumer to topics and retrieving the messages that you produced

In this quickstart, HTTP requests are formatted as curl commands that you can copy and paste to your terminal.

Ensure you have the prerequisites and then follow the tasks in the order provided in this chapter.

About data formats

In this quickstart, you will produce and consume messages in JSON format, not binary.

Prerequisites for the quickstart

  • A Kafka cluster is running on the host machine.

2.1. Downloading a Kafka Bridge archive

A zipped distribution of the AMQ Streams Kafka Bridge is available for download.

Procedure

  • Download the latest version of the AMQ Streams Kafka Bridge archive from the Customer Portal.

2.2. Configuring Kafka Bridge properties

This procedure describes how to configure the Kafka and HTTP connection properties used by the AMQ Streams Kafka Bridge.

You configure the Kafka Bridge, as any other Kafka client, using appropriate prefixes for Kafka-related properties.

  • kafka. for general configuration that applies to producers and consumers, such as server connection and security.
  • kafka.consumer. for consumer-specific configuration passed only to the consumer.
  • kafka.producer. for producer-specific configuration passed only to the producer.

As well as enabling HTTP access to a Kafka cluster, HTTP properties provide the capability to enable and define access control for the Kafka Bridge through Cross-Origin Resource Sharing (CORS). CORS is a HTTP mechanism that allows browser access to selected resources from more than one origin. To configure CORS, you define a list of allowed resource origins and HTTP methods to access them. Additional HTTP headers in requests describe the CORS origins that are permitted access to the Kafka cluster.

Procedure

  1. Edit the application.properties file provided with the AMQ Streams Kafka Bridge installation archive.

    Use the properties file to specify Kafka and HTTP-related properties, and to enable distributed tracing.

    1. Configure standard Kafka-related properties, including properties specific to the Kafka consumers and producers.

      Use:

      • kafka.bootstrap.servers to define the host/port connections to the Kafka cluster
      • kafka.producer.acks to provide acknowledgments to the HTTP client
      • kafka.consumer.auto.offset.reset to determine how to manage reset of the offset in Kafka

        For more information on configuration of Kafka properties, see the Apache Kafka website

    2. Configure HTTP-related properties to enable HTTP access to the Kafka cluster.

      For example:

      bridge.id=my-bridge
      http.enabled=true
      http.host=0.0.0.0
      http.port=8080 1
      http.cors.enabled=true 2
      http.cors.allowedOrigins=https://strimzi.io 3
      http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 4
      1
      The default HTTP configuration for the Kafka Bridge to listen on port 8080.
      2
      Set to true to enable CORS.
      3
      Comma-separated list of allowed CORS origins. You can use a URL or a Java regular expression.
      4
      Comma-separated list of allowed HTTP methods for CORS.

2.3. Installing the Kafka Bridge

Follow this procedure to install the AMQ Streams Kafka Bridge.

Procedure

  1. If you have not already done so, unzip the Kafka Bridge installation archive to any directory.
  2. Run the Kafka Bridge script using the configuration properties as a parameter:

    For example:

    ./bin/kafka_bridge_run.sh --config-file=<path>/configfile.properties
  3. Check to see that the installation was successful in the log.

    HTTP-Kafka Bridge started and listening on port 8080
    HTTP-Kafka Bridge bootstrap servers localhost:9092

2.4. Producing messages to topics and partitions

Use the Kafka Bridge to produce messages to a Kafka topic in JSON format by using the topics endpoint.

You can produce messages to topics in JSON format by using the topics endpoint. You can specify destination partitions for messages in the request body. The partitions endpoint provides an alternative method for specifying a single destination partition for all messages as a path parameter.

In this procedure, messages are produced to a topic called bridge-quickstart-topic.

Prerequisites

  • The Kafka cluster has a topic with three partitions.

    You can use the kafka-topics.sh utility to create topics.

    Example topic creation with three partitions

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1

    Verifying the topic was created

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic

Note

If you deployed AMQ Streams on OpenShift, you can create a topic using the KafkaTopic custom resource.

Procedure

  1. Using the Kafka Bridge, produce three messages to the topic you created:

    curl -X POST \
      http://localhost:8080/topics/bridge-quickstart-topic \
      -H 'content-type: application/vnd.kafka.json.v2+json' \
      -d '{
        "records": [
            {
                "key": "my-key",
                "value": "sales-lead-0001"
            },
            {
                "value": "sales-lead-0002",
                "partition": 2
            },
            {
                "value": "sales-lead-0003"
            }
        ]
    }'
    • sales-lead-0001 is sent to a partition based on the hash of the key.
    • sales-lead-0002 is sent directly to partition 2.
    • sales-lead-0003 is sent to a partition in the bridge-quickstart-topic topic using a round-robin method.
  2. If the request is successful, the Kafka Bridge returns an offsets array, along with a 200 code and a content-type header of application/vnd.kafka.v2+json. For each message, the offsets array describes:

    • The partition that the message was sent to
    • The current message offset of the partition

      Example response

      #...
      {
        "offsets":[
          {
            "partition":0,
            "offset":0
          },
          {
            "partition":2,
            "offset":0
          },
          {
            "partition":0,
            "offset":1
          }
        ]
      }

Additional topic requests

Make other curl requests to find information on topics and partitions.

List topics
curl -X GET \
  http://localhost:8080/topics

Example response

[
  "__strimzi_store_topic",
  "__strimzi-topic-operator-kstreams-topic-store-changelog",
  "bridge-quickstart-topic",
  "my-topic"
]

Get topic configuration and partition details
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic

Example response

{
  "name": "bridge-quickstart-topic",
  "configs": {
    "compression.type": "producer",
    "leader.replication.throttled.replicas": "",
    "min.insync.replicas": "1",
    "message.downconversion.enable": "true",
    "segment.jitter.ms": "0",
    "cleanup.policy": "delete",
    "flush.ms": "9223372036854775807",
    "follower.replication.throttled.replicas": "",
    "segment.bytes": "1073741824",
    "retention.ms": "604800000",
    "flush.messages": "9223372036854775807",
    "message.format.version": "2.8-IV1",
    "max.compaction.lag.ms": "9223372036854775807",
    "file.delete.delay.ms": "60000",
    "max.message.bytes": "1048588",
    "min.compaction.lag.ms": "0",
    "message.timestamp.type": "CreateTime",
    "preallocate": "false",
    "index.interval.bytes": "4096",
    "min.cleanable.dirty.ratio": "0.5",
    "unclean.leader.election.enable": "false",
    "retention.bytes": "-1",
    "delete.retention.ms": "86400000",
    "segment.ms": "604800000",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.index.bytes": "10485760"
  },
  "partitions": [
    {
      "partition": 0,
      "leader": 0,
      "replicas": [
        {
          "broker": 0,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 1,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true
        }
      ]
    },
    {
      "partition": 1,
      "leader": 2,
      "replicas": [
        {
          "broker": 2,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 0,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 1,
          "leader": false,
          "in_sync": true
        }
      ]
    },
    {
      "partition": 2,
      "leader": 1,
      "replicas": [
        {
          "broker": 1,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 0,
          "leader": false,
          "in_sync": true
        }
      ]
    }
  ]
}

List the partitions of a specific topic
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions

Example response

[
  {
    "partition": 0,
    "leader": 0,
    "replicas": [
      {
        "broker": 0,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 1,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true
      }
    ]
  },
  {
    "partition": 1,
    "leader": 2,
    "replicas": [
      {
        "broker": 2,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 0,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 1,
        "leader": false,
        "in_sync": true
      }
    ]
  },
  {
    "partition": 2,
    "leader": 1,
    "replicas": [
      {
        "broker": 1,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 0,
        "leader": false,
        "in_sync": true
      }
    ]
  }
]

List the details of a specific topic partition
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions/0

Example response

{
  "partition": 0,
  "leader": 0,
  "replicas": [
    {
      "broker": 0,
      "leader": true,
      "in_sync": true
    },
    {
      "broker": 1,
      "leader": false,
      "in_sync": true
    },
    {
      "broker": 2,
      "leader": false,
      "in_sync": true
    }
  ]
}

List the offsets of a specific topic partition
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets

Example response

{
  "beginning_offset": 0,
  "end_offset": 1
}

What to do next

After producing messages to topics and partitions, create a Kafka Bridge consumer.

2.5. Creating a Kafka Bridge consumer

Before you can perform any consumer operations in the Kafka cluster, you must first create a consumer by using the consumers endpoint. The consumer is referred to as a Kafka Bridge consumer.

Procedure

  1. Create a Kafka Bridge consumer in a new consumer group named bridge-quickstart-consumer-group:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "name": "bridge-quickstart-consumer",
        "auto.offset.reset": "earliest",
        "format": "json",
        "enable.auto.commit": false,
        "fetch.min.bytes": 512,
        "consumer.request.timeout.ms": 30000
      }'
    • The consumer is named bridge-quickstart-consumer and the embedded data format is set as json.
    • Some basic configuration settings are defined.
    • The consumer will not commit offsets to the log automatically because the enable.auto.commit setting is false. You will commit the offsets manually later in this quickstart.

      If the request is successful, the Kafka Bridge returns the consumer ID (instance_id) and base URL (base_uri) in the response body, along with a 200 code.

      Example response

      #...
      {
        "instance_id": "bridge-quickstart-consumer",
        "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
      }

  2. Copy the base URL (base_uri) to use in the other consumer operations in this quickstart.

What to do next

Now that you have created a Kafka Bridge consumer, you can subscribe it to topics.

Additional resources

2.6. Subscribing a Kafka Bridge consumer to topics

After you have created a Kafka Bridge consumer, subscribe it to one or more topics by using the subscription endpoint. When subscribed, the consumer starts receiving all messages that are produced to the topic.

Procedure

  • Subscribe the consumer to the bridge-quickstart-topic topic that you created earlier, in Producing messages to topics and partitions:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "topics": [
            "bridge-quickstart-topic"
        ]
    }'

    The topics array can contain a single topic (as shown here) or multiple topics. If you want to subscribe the consumer to multiple topics that match a regular expression, you can use the topic_pattern string instead of the topics array.

    If the request is successful, the Kafka Bridge returns a 204 (No Content) code only.

What to do next

After subscribing a Kafka Bridge consumer to topics, you can retrieve messages from the consumer.

2.7. Retrieving the latest messages from a Kafka Bridge consumer

Retrieve the latest messages from the Kafka Bridge consumer by requesting data from the records endpoint. In production, HTTP clients can call this endpoint repeatedly (in a loop).

Procedure

  1. Produce additional messages to the Kafka Bridge consumer, as described in Producing messages to topics and partitions.
  2. Submit a GET request to the records endpoint:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    After creating and subscribing to a Kafka Bridge consumer, a first GET request will return an empty response because the poll operation starts a rebalancing process to assign partitions.

  3. Repeat step two to retrieve messages from the Kafka Bridge consumer.

    The Kafka Bridge returns an array of messages — describing the topic name, key, value, partition, and offset — in the response body, along with a 200 code. Messages are retrieved from the latest offset by default.

    HTTP/1.1 200 OK
    content-type: application/vnd.kafka.json.v2+json
    #...
    [
      {
        "topic":"bridge-quickstart-topic",
        "key":"my-key",
        "value":"sales-lead-0001",
        "partition":0,
        "offset":0
      },
      {
        "topic":"bridge-quickstart-topic",
        "key":null,
        "value":"sales-lead-0003",
        "partition":0,
        "offset":1
      },
    #...
    Note

    If an empty response is returned, produce more records to the consumer as described in Producing messages to topics and partitions, and then try retrieving messages again.

What to do next

After retrieving messages from a Kafka Bridge consumer, try committing offsets to the log.

2.8. Commiting offsets to the log

Use the offsets endpoint to manually commit offsets to the log for all messages received by the Kafka Bridge consumer. This is required because the Kafka Bridge consumer that you created earlier, in Creating a Kafka Bridge consumer, was configured with the enable.auto.commit setting as false.

Procedure

  • Commit offsets to the log for the bridge-quickstart-consumer:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets

    Because no request body is submitted, offsets are committed for all the records that have been received by the consumer. Alternatively, the request body can contain an array (OffsetCommitSeekList) that specifies the topics and partitions that you want to commit offsets for.

    If the request is successful, the Kafka Bridge returns a 204 code only.

What to do next

After committing offsets to the log, try out the endpoints for seeking to offsets.

2.9. Seeking to offsets for a partition

Use the positions endpoints to configure the Kafka Bridge consumer to retrieve messages for a partition from a specific offset, and then from the latest offset. This is referred to in Apache Kafka as a seek operation.

Procedure

  1. Seek to a specific offset for partition 0 of the quickstart-bridge-topic topic:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "offsets": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0,
                "offset": 2
            }
        ]
    }'

    If the request is successful, the Kafka Bridge returns a 204 code only.

  2. Submit a GET request to the records endpoint:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    The Kafka Bridge returns messages from the offset that you seeked to.

  3. Restore the default message retrieval behavior by seeking to the last offset for the same partition. This time, use the positions/end endpoint.

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "partitions": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0
            }
        ]
    }'

    If the request is successful, the Kafka Bridge returns another 204 code.

Note

You can also use the positions/beginning endpoint to seek to the first offset for one or more partitions.

What to do next

In this quickstart, you have used the AMQ Streams Kafka Bridge to perform several common operations on a Kafka cluster. You can now delete the Kafka Bridge consumer that you created earlier.

2.10. Deleting a Kafka Bridge consumer

Delete the Kafka Bridge consumer that you used throughout this quickstart.

Procedure

  • Delete the Kafka Bridge consumer by sending a DELETE request to the instances endpoint.

    curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

    If the request is successful, the Kafka Bridge returns a 204 code.

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.