此内容没有您所选择的语言版本。
Chapter 7. Kafka Bridge
This chapter provides an overview of the AMQ Streams Kafka Bridge and helps you get started using its REST API to interact with AMQ Streams. To try out the Kafka Bridge in your local environment, see the Section 7.2, “Kafka Bridge quickstart” later in this chapter.
Additional resources
- For information on how to deploy the Kafka Bridge, see Section 2.7, “Kafka Bridge” in the Getting started with AMQ Streams chapter.
- For detailed information on configuring the Kafka Bridge, see Section 3.5, “Kafka Bridge configuration” in the Deployment configuration chapter
- To view the API documentation, including example requests and responses, see the Kafka Bridge API reference on the Strimzi website.
7.1. Kafka Bridge overview
The AMQ Streams Kafka Bridge provides an API for integrating HTTP-based clients with a Kafka cluster running on OpenShift. The API enables such clients to produce and consume messages without the requirement to use the native Kafka protocol.
The API has two main resources — consumers
and topics
— that are exposed and made accessible through endpoints to interact with consumers and producers in your Kafka cluster. The resources relate only to the Kafka Bridge, not the consumers and producers connected directly to Kafka.
You can:
- Send messages to a topic.
- Create and delete consumers.
- Subscribe consumers to topics, so that they start receiving messages from those topics.
- Unsubscribe consumers from topics.
- Assign partitions to consumers.
- Retrieve messages from topics.
- Commit a list of consumer offsets.
- Seek on a partition, so that a consumer starts receiving messages from the first or last offset position, or a given offset position.
You deploy the Kafka Bridge into your OpenShift cluster by using the Cluster Operator. For deployment instructions, see Section 2.7.1, “Deploying Kafka Bridge to your OpenShift cluster”.
After the Kafka Bridge is deployed, the Cluster Operator creates a Deployment, Service, and Pod in your OpenShift cluster, each named strimzi-kafka-bridge
by default.
7.1.1. Supported clients for the Kafka Bridge
You can use the Kafka Bridge to integrate both internal and external HTTP client applications with your Kafka cluster.
- Internal clients are container-based HTTP clients running in the same OpenShift cluster as the Kafka Bridge itself.
- External clients are HTTP clients running outside the OpenShift cluster in which the Kafka Bridge is deployed and running.
HTTP internal and external client integration
Internal clients can access the Kafka Bridge on the host and port defined in the KafkaBridge
custom resource.
External clients can access the Kafka Bridge through an OpenShift Route, a loadbalancer service, or using an Ingress.
Additional resources
-
For more information on configuring the host and port for the
KafkaBridge
resource, see Section 3.5.5.3, “Kafka Bridge HTTP configuration”. - For more information on integrating external clients, see Section 7.1.3, “Accessing the Kafka Bridge outside of OpenShift”.
7.1.2. Securing the Kafka Bridge
AMQ Streams does not currently provide any encryption, authentication, or authorization for the Kafka Bridge. This means that requests sent from external clients to the Kafka Bridge are:
- Not encrypted, and must use HTTP rather than HTTPS
- Sent without authentication
However, you can secure the Kafka Bridge using other methods, such as:
- OpenShift Network Policies that define which pods can access the Kafka Bridge.
- Reverse proxies with authentication or authorization, for example, OAuth2 proxies.
- API Gateways.
- Ingress or OpenShift Routes with TLS termination.
The Kafka Bridge supports TLS encryption and TLS and SASL authentication when connecting to the Kafka Brokers. Within your OpenShift cluster, you can configure:
- TLS or SASL-based authentication between the Kafka Bridge and your Kafka cluster
- A TLS-encrypted connection between the Kafka Bridge and your Kafka cluster.
For more information, see Section 3.5.4.1, “Authentication support in Kafka Bridge”.
You can use ACLs in Kafka brokers to restrict the topics that can be consumed and produced using the Kafka Bridge.
7.1.3. Accessing the Kafka Bridge outside of OpenShift
After deployment, the AMQ Streams Kafka Bridge can only be accessed by applications running in the same OpenShift cluster. These applications use the kafka-bridge-name-bridge-service
Service to access the API.
If you want to make the Kafka Bridge accessible to applications running outside of the OpenShift cluster, you can expose it manually by using one of the following features:
- Services of types LoadBalancer or NodePort
- Ingress resources
- OpenShift Routes
If you decide to create Services, use the following labels in the selector
to configure the pods to which the service will route the traffic:
# ...
selector:
strimzi.io/cluster: kafka-bridge-name 1
strimzi.io/kind: KafkaBridge
#...
- 1
- Name of the Kafka Bridge custom resource in your OpenShift cluster.
7.1.4. Requests to the Kafka Bridge
Specify data formats and HTTP headers to ensure valid requests are submitted to the Kafka Bridge.
7.1.4.1. Content Type headers
API request and response bodies are always encoded as JSON.
When performing consumer operations,
POST
requests must provide the followingContent-Type
header if there is a non-empty body:Content-Type: application/vnd.kafka.v2+json
When performing producer operations,
POST
requests must provideContent-Type
headers specifying the desired embedded data format, eitherjson
orbinary
, as shown in the following table.Embedded data format Content-Type header JSON
Content-Type: application/vnd.kafka.json.v2+json
Binary
Content-Type: application/vnd.kafka.binary.v2+json
You set the embedded data format when creating a consumer using the consumers/groupid
endpoint—for more information, see the next section.
The Content-Type
must not be set if the POST
request has an empty body. An empty body can be used to create a consumer with the default values.
7.1.4.2. Embedded data format
The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Two embedded data formats are supported: JSON and binary.
When creating a consumer using the /consumers/groupid
endpoint, the POST
request body must specify an embedded data format of either JSON or binary. This is specified in the format
field, for example:
{
"name": "my-consumer",
"format": "binary", 1
...
}
- 1
- A binary embedded data format.
The embedded data format specified when creating a consumer must match the data format of the Kafka messages it will consume.
If you choose to specify a binary embedded data format, subsequent producer requests must provide the binary data in the request body as Base64-encoded strings. For example, when sending messages using the /topics/topicname
endpoint, records.value
must be encoded in Base64:
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
Producer requests must also provide a Content-Type
header that corresponds to the embedded data format, for example, Content-Type: application/vnd.kafka.binary.v2+json
.
7.1.4.3. Accept headers
After creating a consumer, all subsequent GET requests must provide an Accept
header in the following format:
Accept: application/vnd.kafka.embedded-data-format.v2+json
The embedded-data-format
is either json
or binary
.
For example, when retrieving records for a subscribed consumer using an embedded data format of JSON, include this Accept header:
Accept: application/vnd.kafka.json.v2+json
7.1.5. Configuring loggers for the Kafka Bridge
The AMQ Streams Kafka bridge allows you to set a different log level for each operation that is defined by the related OpenAPI specification.
Each operation has a corresponding API endpoint through which the bridge receives requests from HTTP clients. You can change the log level on each endpoint to produce more or less fine-grained logging information about the incoming and outgoing HTTP requests.
Loggers are defined in the log4j.properties
file, which has the following default configuration for healthy
and ready
endpoints:
log4j.logger.http.openapi.operation.healthy=WARN, out log4j.additivity.http.openapi.operation.healthy=false log4j.logger.http.openapi.operation.ready=WARN, out log4j.additivity.http.openapi.operation.ready=false
The log level of all other operations is set to INFO
by default. Loggers are formatted as follows:
log4j.logger.http.openapi.operation.<operation-id>
Where <operation-id>
is the identifier of the specific operation. Following is the list of operations defined by the OpenAPI specification:
-
createConsumer
-
deleteConsumer
-
subscribe
-
unsubscribe
-
poll
-
assign
-
commit
-
send
-
sendToPartition
-
seekToBeginning
-
seekToEnd
-
seek
-
healthy
-
ready
-
openapi
7.1.6. Kafka Bridge API resources
For the full list of REST API endpoints and descriptions, including example requests and responses, see the Kafka Bridge API reference on the Strimzi website.
7.2. Kafka Bridge quickstart
Use this quickstart to try out the AMQ Streams Kafka Bridge in your local development environment. You will learn how to:
- Deploy the Kafka Bridge to your OpenShift cluster
- Expose the Kafka Bridge service to your local machine by using port-forwarding
- Produce messages to topics and partitions in your Kafka cluster
- Create a Kafka Bridge consumer
- Perform basic consumer operations, such as subscribing the consumer to topics and retrieving the messages that you produced
In this quickstart, HTTP requests are formatted as curl commands that you can copy and paste to your terminal. Access to an OpenShift cluster is required; to run and manage a local OpenShift cluster, use a tool such as Minikube, CodeReady Containers, or MiniShift.
Ensure you have the prerequisites and then follow the tasks in the order provided in this chapter.
About data formats
In this quickstart, you will produce and consume messages in JSON format, not binary. For more information on the data formats and HTTP headers used in the example requests, see Section 7.1.4, “Requests to the Kafka Bridge”.
Prerequisites for the quickstart
- Cluster administrator access to a local or remote OpenShift cluster.
- AMQ Streams is installed.
- A running Kafka cluster, deployed by the Cluster Operator, in an OpenShift namespace.
- The Entity Operator is deployed and running as part of the Kafka cluster.
7.2.1. Deploying the Kafka Bridge to your OpenShift cluster
AMQ Streams includes a YAML example that specifies the configuration of the AMQ Streams Kafka Bridge. Make some minimal changes to this file and then deploy an instance of the Kafka Bridge to your OpenShift cluster.
Procedure
Edit the
examples/kafka-bridge/kafka-bridge.yaml
file.apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaBridge metadata: name: quickstart 1 spec: replicas: 1 bootstrapServers: <cluster-name>-kafka-bootstrap:9092 2 http: port: 8080
- 1
- When the Kafka Bridge is deployed,
-bridge
is appended to the name of the deployment and other related resources. In this example, the Kafka Bridge deployment is namedquickstart-bridge
and the accompanying Kafka Bridge service is namedquickstart-bridge-service
. - 2
- In the
bootstrapServers
property, enter the name of the Kafka cluster as the<cluster-name>
.
Deploy the Kafka Bridge to your OpenShift cluster:
oc apply -f examples/kafka-bridge/kafka-bridge.yaml
A
quickstart-bridge
deployment, service, and other related resources are created in your OpenShift cluster.Verify that the Kafka Bridge was successfully deployed:
oc get deployments
NAME READY UP-TO-DATE AVAILABLE AGE quickstart-bridge 1/1 1 1 34m my-cluster-connect 1/1 1 1 24h my-cluster-entity-operator 1/1 1 1 24h #...
What to do next
After deploying the Kafka Bridge to your OpenShift cluster, expose the Kafka Bridge service to your local machine.
Additional resources
- For more detailed information about configuring the Kafka Bridge, see Section 3.5, “Kafka Bridge configuration”.
7.2.2. Exposing the Kafka Bridge service to your local machine
Next, use port forwarding to expose the AMQ Streams Kafka Bridge service to your local machine on http://localhost:8080.
Port forwarding is only suitable for development and testing purposes.
Procedure
List the names of the pods in your OpenShift cluster:
oc get pods -o name pod/kafka-consumer # ... pod/quickstart-bridge-589d78784d-9jcnr pod/strimzi-cluster-operator-76bcf9bc76-8dnfm
Connect to the
quickstart-bridge
pod on port8080
:oc port-forward pod/quickstart-bridge-589d78784d-9jcnr 8080:8080 &
NoteIf port 8080 on your local machine is already in use, use an alternative HTTP port, such as
8008
.
API requests are now forwarded from port 8080 on your local machine to port 8080 in the Kafka Bridge pod.
7.2.3. Producing messages to topics and partitions
Next, produce messages to topics in JSON format by using the topics endpoint. You can specify destination partitions for messages in the request body, as shown here. The partitions endpoint provides an alternative method for specifying a single destination partition for all messages as a path parameter.
Procedure
In a text editor, create a YAML definition for a Kafka topic with three partitions.
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic metadata: name: bridge-quickstart-topic labels: strimzi.io/cluster: <kafka-cluster-name> 1 spec: partitions: 3 2 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
-
Save the file to the
examples/topic
directory asbridge-quickstart-topic.yaml
. Create the topic in your OpenShift cluster:
oc apply -f examples/topic/bridge-quickstart-topic.yaml
Using the Kafka Bridge, produce three messages to the topic you created:
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'
-
sales-lead-0001
is sent to a partition based on the hash of the key. -
sales-lead-0002
is sent directly to partition 2. -
sales-lead-0003
is sent to a partition in thebridge-quickstart-topic
topic using a round-robin method.
-
If the request is successful, the Kafka Bridge returns an
offsets
array, along with a200
code and acontent-type
header ofapplication/vnd.kafka.v2+json
. For each message, theoffsets
array describes:- The partition that the message was sent to
The current message offset of the partition
Example response
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
What to do next
After producing messages to topics and partitions, create a Kafka Bridge consumer.
Additional resources
- POST /topics/{topicname} in the API reference documentation.
- POST /topics/{topicname}/partitions/{partitionid} in the API reference documentation.
7.2.4. Creating a Kafka Bridge consumer
Before you can perform any consumer operations in the Kafka cluster, you must first create a consumer by using the consumers endpoint. The consumer is referred to as a Kafka Bridge consumer.
Procedure
Create a Kafka Bridge consumer in a new consumer group named
bridge-quickstart-consumer-group
:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "bridge-quickstart-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": false, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'
-
The consumer is named
bridge-quickstart-consumer
and the embedded data format is set asjson
. - Some basic configuration settings are defined.
The consumer will not commit offsets to the log automatically because the
enable.auto.commit
setting isfalse
. You will commit the offsets manually later in this quickstart.If the request is successful, the Kafka Bridge returns the consumer ID (
instance_id
) and base URL (base_uri
) in the response body, along with a200
code.Example response
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
-
The consumer is named
-
Copy the base URL (
base_uri
) to use in the other consumer operations in this quickstart.
What to do next
Now that you have created a Kafka Bridge consumer, you can subscribe it to topics.
Additional resources
- POST /consumers/{groupid} in the API reference documentation.
7.2.5. Subscribing a Kafka Bridge consumer to topics
After you have created a Kafka Bridge consumer, subscribe it to one or more topics by using the subscription endpoint. Once subscribed, the consumer starts receiving all messages that are produced to the topic.
Procedure
Subscribe the consumer to the
bridge-quickstart-topic
topic that you created earlier, in Producing messages to topics and partitions:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "bridge-quickstart-topic" ] }'
The
topics
array can contain a single topic (as shown here) or multiple topics. If you want to subscribe the consumer to multiple topics that match a regular expression, you can use thetopic_pattern
string instead of thetopics
array.If the request is successful, the Kafka Bridge returns a
204
(No Content) code only.
What to do next
After subscribing a Kafka Bridge consumer to topics, you can retrieve messages from the consumer.
Additional resources
- POST /consumers/{groupid}/instances/{name}/subscription in the API reference documentation.
7.2.6. Retrieving the latest messages from a Kafka Bridge consumer
Next, retrieve the latest messages from the Kafka Bridge consumer by requesting data from the records endpoint. In production, HTTP clients can call this endpoint repeatedly (in a loop).
Procedure
- Produce additional messages to the Kafka Bridge consumer, as described in Producing messages to topics and partitions.
Submit a
GET
request to therecords
endpoint:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
After creating and subscribing to a Kafka Bridge consumer, a first GET request will return an empty response because the poll operation starts a rebalancing process to assign partitions.
Repeat step two to retrieve messages from the Kafka Bridge consumer.
The Kafka Bridge returns an array of messages — describing the topic name, key, value, partition, and offset — in the response body, along with a
200
code. Messages are retrieved from the latest offset by default.HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...
NoteIf an empty response is returned, produce more records to the consumer as described in Producing messages to topics and partitions, and then try retrieving messages again.
What to do next
After retrieving messages from a Kafka Bridge consumer, try committing offsets to the log.
Additional resources
- GET /consumers/{groupid}/instances/{name}/records in the API reference documentation.
7.2.7. Commiting offsets to the log
Next, use the offsets endpoint to manually commit offsets to the log for all messages received by the Kafka Bridge consumer. This is required because the Kafka Bridge consumer that you created earlier, in Creating a Kafka Bridge consumer, was configured with the enable.auto.commit
setting as false
.
Procedure
Commit offsets to the log for the
bridge-quickstart-consumer
:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
Because no request body is submitted, offsets are committed for all the records that have been received by the consumer. Alternatively, the request body can contain an array (OffsetCommitSeekList) that specifies the topics and partitions that you want to commit offsets for.
If the request is successful, the Kafka Bridge returns a
204
code only.
What to do next
After committing offsets to the log, try out the endpoints for seeking to offsets.
Additional resources
- POST /consumers/{groupid}/instances/{name}/offsets in the API reference documentation.
7.2.8. Seeking to offsets for a partition
Next, use the positions endpoints to configure the Kafka Bridge consumer to retrieve messages for a partition from a specific offset, and then from the latest offset. This is referred to in Apache Kafka as a seek operation.
Procedure
Seek to a specific offset for partition 0 of the
quickstart-bridge-topic
topic:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "offsets": [ { "topic": "bridge-quickstart-topic", "partition": 0, "offset": 2 } ] }'
If the request is successful, the Kafka Bridge returns a
204
code only.Submit a
GET
request to therecords
endpoint:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
The Kafka Bridge returns messages from the offset that you seeked to.
Restore the default message retrieval behavior by seeking to the last offset for the same partition. This time, use the positions/end endpoint.
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "bridge-quickstart-topic", "partition": 0 } ] }'
If the request is successful, the Kafka Bridge returns another
204
code.
You can also use the positions/beginning endpoint to seek to the first offset for one or more partitions.
What to do next
In this quickstart, you have used the AMQ Streams Kafka Bridge to perform several common operations on a Kafka cluster. You can now delete the Kafka Bridge consumer that you created earlier.
Additional resources
- POST /consumers/{groupid}/instances/{name}/positions in the API reference documentation.
- POST /consumers/{groupid}/instances/{name}/positions/beginning in the API reference documentation.
- POST /consumers/{groupid}/instances/{name}/positions/end in the API reference documentation.
7.2.9. Deleting a Kafka Bridge consumer
Finally, delete the Kafa Bridge consumer that you used throughout this quickstart.
Procedure
Delete the Kafka Bridge consumer by sending a
DELETE
request to the instances endpoint.curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
If the request is successful, the Kafka Bridge returns a
204
code only.
Additional resources
- DELETE /consumers/{groupid}/instances/{name} in the API reference documentation.