Using the AMQ Streams Kafka Bridge
Use the AMQ Streams Kafka Bridge to connect with a Kafka cluster
Abstract
Making open source more inclusive
Red Hat is committed to replacing problematic language in our code, documentation, and web properties. We are beginning with these four terms: master, slave, blacklist, and whitelist. Because of the enormity of this endeavor, these changes will be implemented gradually over several upcoming releases. For more details, see our CTO Chris Wright’s message.
Chapter 1. Kafka Bridge overview
Use the AMQ Streams Kafka Bridge to make HTTP requests to a Kafka cluster.
You can use the Kafka Bridge to integrate HTTP client applications with your Kafka cluster.
HTTP client integration
1.1. Running the Kafka Bridge
Install the AMQ Streams Kafka Bridge to run in the same environment as your Kafka cluster.
You can download and add the Kafka Bridge installation artifacts to your host machine. To try out the Kafka Bridge in your local environment, see the Kafka Bridge quickstart.
If you deployed AMQ Streams on OpenShift, you can use the AMQ Streams Cluster Operator to deploy the Kafka Bridge to the OpenShift cluster. You’ll need a running Kafka cluster that was deployed by the Cluster Operator in an OpenShift namespace. You can configure your deployment to access the Kafka Bridge outside the OpenShift cluster.
1.2. Kafka Bridge interface
The Kafka Bridge provides a RESTful interface that allows HTTP-based clients to interact with a Kafka cluster. It offers the advantages of a web API connection to AMQ Streams, without the need for client applications to interpret the Kafka protocol.
The API has two main resources — consumers and topics — that are exposed and made accessible through endpoints to interact with consumers and producers in your Kafka cluster. The resources relate only to the Kafka Bridge, not the consumers and producers connected directly to Kafka.
1.2.1. HTTP requests
The Kafka Bridge supports HTTP requests to a Kafka cluster, with methods to:
- Send messages to a topic.
- Retrieve messages from topics.
- Retrieve a list of partitions for a topic.
- Create and delete consumers.
- Subscribe consumers to topics, so that they start receiving messages from those topics.
- Retrieve a list of topics that a consumer is subscribed to.
- Unsubscribe consumers from topics.
- Assign partitions to consumers.
- Commit a list of consumer offsets.
- Seek on a partition, so that a consumer starts receiving messages from the first or last offset position, or a given offset position.
The methods provide JSON responses and HTTP response code error handling. Messages can be sent in JSON or binary formats.
Clients can produce and consume messages without the requirement to use the native Kafka protocol.
1.3. Securing connectivity to the Kafka cluster
You can configure the following between the Kafka Bridge and your Kafka cluster:
- TLS or SASL-based authentication
- A TLS-encrypted connection
You configure the Kafka Bridge for authentication through its properties file.
You can also use ACLs in Kafka brokers to restrict the topics that can be consumed and produced using the Kafka Bridge.
1.4. Securing the Kafka Bridge HTTP interface
Authentication and encryption between HTTP clients and the Kafka Bridge is not supported directly by the Kafka Bridge. Requests sent from clients to the Kafka Bridge are sent without authentication or encryption. Requests must use HTTP rather than HTTPS.
You can combine the Kafka Bridge with the following tools to secure it:
- Network policies and firewalls that define which pods can access the Kafka Bridge
- Reverse proxies (for example, OAuth 2.0)
- API gateways
1.5. Requests to the Kafka Bridge
Specify data formats and HTTP headers to ensure valid requests are submitted to the Kafka Bridge.
1.5.1. Content-Type headers
API request and response bodies are always encoded as JSON.
When performing consumer operations, POST requests must provide the following Content-Type header if there is a non-empty body:

Content-Type: application/vnd.kafka.v2+json

When performing producer operations, POST requests must provide Content-Type headers specifying the embedded data format of the messages produced. This can be either json or binary.

| Embedded data format | Content-Type header |
|---|---|
| JSON | Content-Type: application/vnd.kafka.json.v2+json |
| Binary | Content-Type: application/vnd.kafka.binary.v2+json |
The embedded data format is set per consumer, as described in the next section.
The Content-Type must not be set if the POST request has an empty body. An empty body can be used to create a consumer with the default values.
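For instance, a consumer with default values can be created with an empty body, in which case no Content-Type header is set (a sketch; the bridge address and the group name my-group are assumptions):

```shell
# Empty request body, so no Content-Type header is required
curl -X POST http://localhost:8080/consumers/my-group
```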
1.5.2. Embedded data format
The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Two embedded data formats are supported: JSON and binary.
When creating a consumer using the /consumers/groupid endpoint, the POST request body must specify an embedded data format of either JSON or binary. This is specified in the format field, for example:
{
  "name": "my-consumer",
  "format": "binary",
  # ...
}

Here, the format field specifies a binary embedded data format.
The embedded data format specified when creating a consumer must match the data format of the Kafka messages it will consume.
If you choose to specify a binary embedded data format, subsequent producer requests must provide the binary data in the request body as Base64-encoded strings. For example, when sending messages using the /topics/topicname endpoint, records.value must be encoded in Base64:
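As a sketch (the record key and value are illustrative assumptions), the Base64-encoded request body can be built as follows:

```shell
# Base64-encode the key and value for the binary embedded data format
KEY=$(printf 'my-key' | base64)            # bXkta2V5
VALUE=$(printf 'Hello, Kafka!' | base64)   # SGVsbG8sIEthZmthIQ==

# Print a request body in which records.value (and records.key) are Base64 strings
cat <<EOF
{
  "records": [
    { "key": "${KEY}", "value": "${VALUE}" }
  ]
}
EOF
```

This body would then be sent in a POST request to the /topics/topicname endpoint.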
Producer requests must also provide a Content-Type header that corresponds to the embedded data format, for example, Content-Type: application/vnd.kafka.binary.v2+json.
1.5.3. Message format
When sending messages using the /topics endpoint, you enter the message payload in the request body, in the records parameter.
The records parameter can contain any of these optional fields:
- Message headers
- Message key
- Message value
- Destination partition
In a POST request to /topics, header values are provided in binary format, encoded as Base64.
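A request of the following shape illustrates all four optional fields (a sketch; the topic name, key, value, and header are assumptions, and "dmFsdWU=" is the Base64 encoding of "value"):

```shell
curl -X POST http://localhost:8080/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
      {
        "key": "my-key",
        "value": "sales-lead-0001",
        "partition": 2,
        "headers": [
          { "key": "trace-id", "value": "dmFsdWU=" }
        ]
      }
    ]
  }'
```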
1.5.4. Accept headers
After creating a consumer, all subsequent GET requests must provide an Accept header in the following format:
Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
The EMBEDDED-DATA-FORMAT is either json or binary.
For example, when retrieving records for a subscribed consumer using an embedded data format of JSON, include this Accept header:
Accept: application/vnd.kafka.json.v2+json
1.6. CORS
Cross-Origin Resource Sharing (CORS) allows you to specify allowed methods and originating URLs for accessing the Kafka cluster in your Kafka Bridge HTTP configuration.
Example CORS configuration for Kafka Bridge
# ...
http.cors.enabled=true
http.cors.allowedOrigins=https://strimzi.io
http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
CORS allows for simple and preflighted requests between origin sources on different domains.
Simple requests are suitable for standard requests using the GET, HEAD, and POST methods.

A preflighted request sends an HTTP OPTIONS request as an initial check that the actual request is safe to send. On confirmation, the actual request is sent. Preflight requests are suitable for methods that require greater safeguards, such as PUT and DELETE, or that use non-standard headers.

All requests require an Origin value in their header, which is the source of the HTTP request.
1.6.1. Simple request
For example, this simple request header specifies the origin as https://strimzi.io.
Origin: https://strimzi.io
The header information is added to the request.
curl -v -X GET HTTP-ADDRESS/bridge-consumer/records \
  -H 'Origin: https://strimzi.io' \
  -H 'content-type: application/vnd.kafka.v2+json'
In the response from the Kafka Bridge, an Access-Control-Allow-Origin header is returned.
HTTP/1.1 200 OK
Access-Control-Allow-Origin: *
Returning an asterisk (*) shows the resource can be accessed by any domain.
1.6.2. Preflighted request
An initial preflight request is sent to Kafka Bridge using an OPTIONS method. The HTTP OPTIONS request sends header information to check that Kafka Bridge will allow the actual request.
Here the preflight request checks that a POST request is valid from https://strimzi.io.
OPTIONS /my-group/instances/my-user/subscription HTTP/1.1
Origin: https://strimzi.io
Access-Control-Request-Method: POST
Access-Control-Request-Headers: Content-Type
OPTIONS is added to the header information of the preflight request.
curl -v -X OPTIONS -H 'Origin: https://strimzi.io' \
-H 'Access-Control-Request-Method: POST' \
-H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridge responds to the initial request to confirm that the request will be accepted. The response header returns allowed origins, methods and headers.
HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://strimzi.io
Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH
Access-Control-Allow-Headers: content-type
If the origin or method is rejected, an error message is returned.
The actual request does not require the Access-Control-Request-Method header, as it was confirmed in the preflight request, but it does require the Origin header.
curl -v -X POST HTTP-ADDRESS/topics/bridge-topic \
-H 'Origin: https://strimzi.io' \
-H 'content-type: application/vnd.kafka.v2+json'
The response shows the originating URL is allowed.
HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://strimzi.io
1.7. Configuring loggers for the Kafka Bridge
The AMQ Streams Kafka Bridge allows you to set a different log level for each operation that is defined by the related OpenAPI specification.
Each operation has a corresponding API endpoint through which the bridge receives requests from HTTP clients. You can change the log level on each endpoint to produce more or less fine-grained logging information about the incoming and outgoing HTTP requests.
Loggers are defined in the log4j.properties file, which has the following default configuration for healthy and ready endpoints:
log4j.logger.http.openapi.operation.healthy=WARN, out
log4j.additivity.http.openapi.operation.healthy=false
log4j.logger.http.openapi.operation.ready=WARN, out
log4j.additivity.http.openapi.operation.ready=false
The log level of all other operations is set to INFO by default. Loggers are formatted as follows:
log4j.logger.http.openapi.operation.<operation_id>
Where <operation_id> is the identifier of the specific operation.
List of operations defined by the OpenAPI specification
- createConsumer
- deleteConsumer
- subscribe
- unsubscribe
- poll
- assign
- commit
- send
- sendToPartition
- seekToBeginning
- seekToEnd
- seek
- healthy
- ready
- openapi
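For example, to raise the logging detail for the send operation, the corresponding logger can be configured following the same pattern as the defaults shown above (the DEBUG level here is an illustrative choice):

```properties
log4j.logger.http.openapi.operation.send=DEBUG, out
log4j.additivity.http.openapi.operation.send=false
```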
Chapter 2. Kafka Bridge quickstart
Use this quickstart to try out the AMQ Streams Kafka Bridge in your local development environment.
You will learn how to do the following:
- Produce messages to topics and partitions in your Kafka cluster
- Create a Kafka Bridge consumer
- Perform basic consumer operations, such as subscribing the consumer to topics and retrieving the messages that you produced
In this quickstart, HTTP requests are formatted as curl commands that you can copy and paste to your terminal.
Ensure you have the prerequisites and then follow the tasks in the order provided in this chapter.
About data formats
In this quickstart, you will produce and consume messages in JSON format, not binary.
Prerequisites for the quickstart
- A Kafka cluster is running on the host machine.
2.1. Downloading a Kafka Bridge archive
A zipped distribution of the AMQ Streams Kafka Bridge is available for download.
Procedure
- Download the latest version of the AMQ Streams Kafka Bridge archive from the Customer Portal.
2.2. Configuring Kafka Bridge properties
This procedure describes how to configure the Kafka and HTTP connection properties used by the AMQ Streams Kafka Bridge.
You configure the Kafka Bridge, as with any other Kafka client, using appropriate prefixes for Kafka-related properties:

- kafka. for general configuration that applies to producers and consumers, such as server connection and security.
- kafka.consumer. for consumer-specific configuration passed only to the consumer.
- kafka.producer. for producer-specific configuration passed only to the producer.
As well as enabling HTTP access to a Kafka cluster, HTTP properties provide the capability to enable and define access control for the Kafka Bridge through Cross-Origin Resource Sharing (CORS). CORS is an HTTP mechanism that allows browser access to selected resources from more than one origin. To configure CORS, you define a list of allowed resource origins and HTTP methods to access them. Additional HTTP headers in requests describe the CORS origins that are permitted access to the Kafka cluster.
Prerequisites
Procedure
Edit the application.properties file provided with the AMQ Streams Kafka Bridge installation archive.

Use the properties file to specify Kafka and HTTP-related properties, and to enable distributed tracing.
Configure standard Kafka-related properties, including properties specific to the Kafka consumers and producers.
Use:

- kafka.bootstrap.servers to define the host/port connections to the Kafka cluster
- kafka.producer.acks to provide acknowledgments to the HTTP client
- kafka.consumer.auto.offset.reset to determine how to manage reset of the offset in Kafka

For more information on configuration of Kafka properties, see the Apache Kafka website.
Configure HTTP-related properties to enable HTTP access to the Kafka cluster.
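As a sketch, the Kafka and HTTP sections of application.properties might look like the following (all values are illustrative assumptions, not required defaults):

```properties
# Kafka-related properties (the kafka. prefix applies to producers and consumers)
kafka.bootstrap.servers=localhost:9092
kafka.producer.acks=1
kafka.consumer.auto.offset.reset=earliest

# HTTP-related properties
http.host=0.0.0.0
http.port=8080
```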
2.3. Installing the Kafka Bridge
Follow this procedure to install the AMQ Streams Kafka Bridge.
Prerequisites
Procedure
- If you have not already done so, unzip the Kafka Bridge installation archive to any directory.
Run the Kafka Bridge script using the configuration properties as a parameter:
For example:
./bin/kafka_bridge_run.sh --config-file=<path>/configfile.properties
Check to see that the installation was successful in the log.

HTTP-Kafka Bridge started and listening on port 8080
HTTP-Kafka Bridge bootstrap servers localhost:9092
2.4. Producing messages to topics and partitions
Use the Kafka Bridge to produce messages to a Kafka topic in JSON format by using the topics endpoint. You can specify destination partitions for messages in the request body. The partitions endpoint provides an alternative method for specifying a single destination partition for all messages as a path parameter.
In this procedure, messages are produced to a topic called bridge-quickstart-topic.
Prerequisites
The Kafka cluster has a topic with three partitions. You can use the kafka-topics.sh utility to create topics.

Example topic creation with three partitions

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1

Verifying the topic was created

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
If you deployed AMQ Streams on OpenShift, you can create a topic using the KafkaTopic custom resource.
Procedure
Using the Kafka Bridge, produce three messages to the topic you created:
- sales-lead-0001 is sent to a partition based on the hash of the key.
- sales-lead-0002 is sent directly to partition 2.
- sales-lead-0003 is sent to a partition in the bridge-quickstart-topic topic using a round-robin method.
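The three messages described above can be produced with a request of the following shape (a sketch; the key my-key is an illustrative assumption, and the bridge is assumed at localhost:8080):

```shell
curl -X POST http://localhost:8080/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
      { "key": "my-key", "value": "sales-lead-0001" },
      { "value": "sales-lead-0002", "partition": 2 },
      { "value": "sales-lead-0003" }
    ]
  }'
```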
If the request is successful, the Kafka Bridge returns an offsets array, along with a 200 code and a content-type header of application/vnd.kafka.v2+json. For each message, the offsets array describes:

- The partition that the message was sent to
- The current message offset of the partition
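An illustrative response body (the partition assignments and offset values depend on what the partitions already contain):

```json
{
  "offsets": [
    { "partition": 0, "offset": 0 },
    { "partition": 2, "offset": 0 },
    { "partition": 1, "offset": 0 }
  ]
}
```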
Additional topic requests
Make other curl requests to find information on topics and partitions.
- List topics

curl -X GET http://localhost:8080/topics

- Get topic configuration and partition details

curl -X GET http://localhost:8080/topics/bridge-quickstart-topic

- List the partitions of a specific topic

curl -X GET http://localhost:8080/topics/bridge-quickstart-topic/partitions

- List the details of a specific topic partition

curl -X GET http://localhost:8080/topics/bridge-quickstart-topic/partitions/0

- List the offsets of a specific topic partition

curl -X GET http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets

Example response

{
  "beginning_offset": 0,
  "end_offset": 1
}
What to do next
After producing messages to topics and partitions, create a Kafka Bridge consumer.
2.5. Creating a Kafka Bridge consumer
Before you can perform any consumer operations in the Kafka cluster, you must first create a consumer by using the consumers endpoint. The consumer is referred to as a Kafka Bridge consumer.
Procedure
Create a Kafka Bridge consumer in a new consumer group named bridge-quickstart-consumer-group:

- The consumer is named bridge-quickstart-consumer and the embedded data format is set as json.
- Some basic configuration settings are defined.
- The consumer will not commit offsets to the log automatically because the enable.auto.commit setting is false. You will commit the offsets manually later in this quickstart.

If the request is successful, the Kafka Bridge returns the consumer ID (instance_id) and base URL (base_uri) in the response body, along with a 200 code.

Example response

{
  "instance_id": "bridge-quickstart-consumer",
  "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
}

Copy the base URL (base_uri) to use in the other consumer operations in this quickstart.
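A creation request consistent with the settings described above might look like the following (a sketch; the fetch.min.bytes and consumer.request.timeout.ms values are illustrative assumptions, and the bridge is assumed at localhost:8080):

```shell
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "bridge-quickstart-consumer",
    "auto.offset.reset": "earliest",
    "format": "json",
    "enable.auto.commit": false,
    "fetch.min.bytes": 512,
    "consumer.request.timeout.ms": 30000
  }'
```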
What to do next
Now that you have created a Kafka Bridge consumer, you can subscribe it to topics.
2.6. Subscribing a Kafka Bridge consumer to topics
After you have created a Kafka Bridge consumer, subscribe it to one or more topics by using the subscription endpoint. When subscribed, the consumer starts receiving all messages that are produced to the topic.
Procedure
Subscribe the consumer to the bridge-quickstart-topic topic that you created earlier, in Producing messages to topics and partitions:

The topics array can contain a single topic or multiple topics. If you want to subscribe the consumer to multiple topics that match a regular expression, you can use the topic_pattern string instead of the topics array.

If the request is successful, the Kafka Bridge returns a 204 (No Content) code only.
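Such a subscription request might look like the following (a sketch, reusing the consumer URL established when the consumer was created):

```shell
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "bridge-quickstart-topic"
    ]
  }'
```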
What to do next
After subscribing a Kafka Bridge consumer to topics, you can retrieve messages from the consumer.
2.7. Retrieving the latest messages from a Kafka Bridge consumer

Retrieve the latest messages from the Kafka Bridge consumer by requesting data from the records endpoint. In production, HTTP clients can call this endpoint repeatedly (in a loop).
Procedure
- Produce additional messages to the Kafka Bridge consumer, as described in Producing messages to topics and partitions.
Submit a GET request to the records endpoint:

curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'

After creating and subscribing to a Kafka Bridge consumer, a first GET request will return an empty response because the poll operation starts a rebalancing process to assign partitions.
Repeat step two to retrieve messages from the Kafka Bridge consumer.
The Kafka Bridge returns an array of messages — describing the topic name, key, value, partition, and offset — in the response body, along with a 200 code. Messages are retrieved from the latest offset by default.

Note: If an empty response is returned, produce more records to the consumer as described in Producing messages to topics and partitions, and then try retrieving messages again.
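An illustrative response body (the records, partitions, and offsets shown are assumptions for this sketch):

```json
[
  {
    "topic": "bridge-quickstart-topic",
    "key": "my-key",
    "value": "sales-lead-0001",
    "partition": 0,
    "offset": 0
  },
  {
    "topic": "bridge-quickstart-topic",
    "key": null,
    "value": "sales-lead-0003",
    "partition": 1,
    "offset": 0
  }
]
```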
What to do next
After retrieving messages from a Kafka Bridge consumer, try committing offsets to the log.
2.8. Committing offsets to the log
Use the offsets endpoint to manually commit offsets to the log for all messages received by the Kafka Bridge consumer. This is required because the Kafka Bridge consumer that you created earlier, in Creating a Kafka Bridge consumer, was configured with the enable.auto.commit setting as false.
Procedure
Commit offsets to the log for the bridge-quickstart-consumer:

curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets

Because no request body is submitted, offsets are committed for all the records that have been received by the consumer. Alternatively, the request body can contain an array (OffsetCommitSeekList) that specifies the topics and partitions that you want to commit offsets for.
If the request is successful, the Kafka Bridge returns a 204 code only.
What to do next
After committing offsets to the log, try out the endpoints for seeking to offsets.
2.9. Seeking to offsets for a partition
Use the positions endpoints to configure the Kafka Bridge consumer to retrieve messages for a partition from a specific offset, and then from the latest offset. This is referred to in Apache Kafka as a seek operation.
Procedure
Seek to a specific offset for partition 0 of the bridge-quickstart-topic topic:

If the request is successful, the Kafka Bridge returns a 204 code only.

Submit a GET request to the records endpoint:

curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'

The Kafka Bridge returns messages from the offset that you seeked to.
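The seek request in the first step of this procedure might look like the following (a sketch; seeking to offset 2 is an illustrative assumption):

```shell
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "bridge-quickstart-topic",
            "partition": 0,
            "offset": 2
        }
    ]
  }'
```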
Restore the default message retrieval behavior by seeking to the last offset for the same partition. This time, use the positions/end endpoint.
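A sketch of such a positions/end request (partition 0 is assumed, matching the earlier seek):

```shell
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "partitions": [
        {
            "topic": "bridge-quickstart-topic",
            "partition": 0
        }
    ]
  }'
```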
If the request is successful, the Kafka Bridge returns another 204 code.
You can also use the positions/beginning endpoint to seek to the first offset for one or more partitions.
What to do next
In this quickstart, you have used the AMQ Streams Kafka Bridge to perform several common operations on a Kafka cluster. You can now delete the Kafka Bridge consumer that you created earlier.
2.10. Deleting a Kafka Bridge consumer
Delete the Kafka Bridge consumer that you used throughout this quickstart.
Procedure
Delete the Kafka Bridge consumer by sending a DELETE request to the instances endpoint.

curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

If the request is successful, the Kafka Bridge returns a 204 code.
Chapter 3. AMQ Streams Kafka Bridge API Reference

3.1. Overview
The AMQ Streams Kafka Bridge provides a REST API for integrating HTTP-based client applications with a Kafka cluster. You can use the API to create and manage consumers and send and receive records over HTTP rather than the native Kafka protocol.
3.1.1. Version information
Version : 0.1.0
3.1.2. Tags
- Consumers : Consumer operations to create consumers in your Kafka cluster and perform common actions, such as subscribing to topics, retrieving processed records, and committing offsets.
- Producer : Producer operations to send records to a specified topic or topic partition.
- Seek : Seek operations that enable a consumer to begin receiving messages from a given offset position.
- Topics : Topic operations to send messages to a specified topic or topic partition, optionally including message keys in requests. You can also retrieve topics and topic metadata.
3.1.3. Consumes

- application/json

3.1.4. Produces

- application/json
3.2. Definitions

3.2.1. AssignedTopicPartitions
Type : < string, < integer (int32) > array > map
3.2.2. BridgeInfo

Information about the Kafka Bridge instance.

| Name | Schema |
|---|---|
| bridge_version | string |
3.2.3. Consumer

| Name | Description | Schema |
|---|---|---|
| auto.offset.reset | Resets the offset position for the consumer. If set to latest (default), messages are read from the latest offset. If set to earliest, messages are read from the first offset. | string |
| consumer.request.timeout.ms | Sets the maximum amount of time, in milliseconds, for the consumer to wait for messages for a request. If the timeout period is reached without a response, an error is returned. | integer |
| enable.auto.commit | If set to true (default), message offsets are committed automatically for the consumer. If set to false, message offsets must be committed manually. | boolean |
| fetch.min.bytes | Sets the minimum amount of data, in bytes, for the consumer to receive. The broker waits until the data to send exceeds this amount. | integer |
| format | The allowable message format for the consumer, which can be binary or json (default). | string |
| isolation.level | If set to read_uncommitted (default), all records are retrieved, regardless of transaction outcome. If set to read_committed, only records from committed transactions are retrieved. | string |
| name | The unique name for the consumer instance. The name is unique within the scope of the consumer group. The name is used in URLs. | string |
3.2.4. ConsumerRecord

| Name | Schema |
|---|---|
| headers | KafkaHeaderList |
| key | string |
| offset | integer (int64) |
| partition | integer (int32) |
| topic | string |
| value | string |
3.2.5. ConsumerRecordList
Type : < ConsumerRecord > array
3.2.6. CreatedConsumer

| Name | Description | Schema |
|---|---|---|
| base_uri | Base URI used to construct URIs for subsequent requests against this consumer instance. | string |
| instance_id | Unique ID for the consumer instance in the group. | string |
3.2.7. Error

| Name | Schema |
|---|---|
| error_code | integer (int32) |
| message | string |
3.2.8. KafkaHeader

| Name | Description | Schema |
|---|---|---|
| key | | string |
| value | The header value in binary format, base64-encoded | string (byte) |
3.2.9. KafkaHeaderList
Type : < KafkaHeader > array
3.2.10. OffsetCommitSeek

| Name | Schema |
|---|---|
| offset | integer (int64) |
| partition | integer (int32) |
| topic | string |
3.2.11. OffsetCommitSeekList

| Name | Schema |
|---|---|
| offsets | < OffsetCommitSeek > array |
3.2.12. OffsetRecordSent

| Name | Schema |
|---|---|
| offset | integer (int64) |
| partition | integer (int32) |
3.2.13. OffsetRecordSentList

| Name | Schema |
|---|---|
| offsets | < OffsetRecordSent > array |
3.2.14. OffsetsSummary

| Name | Schema |
|---|---|
| beginning_offset | integer (int64) |
| end_offset | integer (int64) |
3.2.15. Partition

| Name | Schema |
|---|---|
| partition | integer (int32) |
| topic | string |
3.2.16. PartitionMetadata

| Name | Schema |
|---|---|
| leader | integer (int32) |
| partition | integer (int32) |
| replicas | < Replica > array |
3.2.17. Partitions

| Name | Schema |
|---|---|
| partitions | < Partition > array |
3.2.18. ProducerRecord

| Name | Schema |
|---|---|
| headers | KafkaHeaderList |
| partition | integer (int32) |
3.2.19. ProducerRecordList

| Name | Schema |
|---|---|
| records | < ProducerRecord > array |
3.2.20. ProducerRecordToPartition
Type : object
3.2.21. ProducerRecordToPartitionList 复制链接链接已复制到粘贴板!
| Name | Schema |
|---|---|
|
records | < ProducerRecordToPartition > array |
3.2.22. Replica 复制链接链接已复制到粘贴板!
| Name | Schema |
|---|---|
|
broker | integer (int32) |
|
in_sync | boolean |
|
leader | boolean |
3.2.23. SubscribedTopicList 复制链接链接已复制到粘贴板!
| Name | Schema |
|---|---|
|
partitions | < AssignedTopicPartitions > array |
|
topics |
3.2.24. TopicMetadata 复制链接链接已复制到粘贴板!
| Name | Description | Schema |
|---|---|---|
|
configs | Per-topic configuration overrides | < string, string > map |
|
name | Name of the topic | string |
|
partitions | < PartitionMetadata > array |
3.2.25. Topics 复制链接链接已复制到粘贴板!
| Name | Description | Schema |
|---|---|---|
|
topic_pattern | A regex topic pattern for matching multiple topics | string |
|
topics | < string > array |
3.3. Paths
3.3.1. GET /
3.3.1.1. Description
Retrieves information about the Kafka Bridge instance, in JSON format.
3.3.1.2. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | Information about Kafka Bridge instance. |  |
3.3.1.3. Produces
- application/json
3.3.1.4. Example HTTP response
3.3.1.4.1. Response 200
{
  "bridge_version" : "0.16.0"
}
3.3.2. POST /consumers/{groupid}
3.3.2.1. Description
Creates a consumer instance in the given consumer group. You can optionally specify a consumer name and supported configuration options. The response returns a base URI, which must be used to construct URLs for subsequent requests against this consumer instance.
3.3.2.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group in which to create the consumer. | string |
| Body | body | Name and configuration of the consumer. The name is unique within the scope of the consumer group. If a name is not specified, a randomly generated name is assigned. All parameters are optional. The supported configuration options are shown in the following example. |  |
3.3.2.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | Consumer created successfully. | CreatedConsumer |
| 409 | A consumer instance with the specified name already exists in the Kafka Bridge. | Error |
| 422 | One or more consumer configuration options have invalid values. | Error |
3.3.2.4. Consumes
- application/vnd.kafka.v2+json
3.3.2.5. Produces
- application/vnd.kafka.v2+json
3.3.2.6. Tags
- Consumers
3.3.2.7. Example HTTP request
3.3.2.7.1. Request body
3.3.2.8. Example HTTP response
3.3.2.8.1. Response 200
{
  "instance_id" : "consumer1",
  "base_uri" : "http://localhost:8080/consumers/my-group/instances/consumer1"
}
3.3.2.8.2. Response 409
{
  "error_code" : 409,
  "message" : "A consumer instance with the specified name already exists in the Kafka Bridge."
}
3.3.2.8.3. Response 422
{
  "error_code" : 422,
  "message" : "One or more consumer configuration options have invalid values."
}
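To make the flow concrete, here is a minimal sketch of building the consumer-creation request body and deriving follow-up URLs from the returned base_uri. The consumer name, configuration values, and host below are illustrative assumptions, not fixed by the API; the request is sent with Content-Type application/vnd.kafka.v2+json.

```python
import json

# Build a consumer-creation request body; all fields are optional.
# "my-consumer" and the option values here are illustrative only.
body = {
    "name": "my-consumer",
    "format": "json",                 # embedded data format for this consumer
    "auto.offset.reset": "earliest",  # where to start when no committed offset exists
    "enable.auto.commit": False,      # commit offsets explicitly via POST .../offsets
}
payload = json.dumps(body)

# The bridge responds with a CreatedConsumer object (see the 200 example above);
# subsequent requests must be built from its base_uri.
response = {
    "instance_id": "my-consumer",
    "base_uri": "http://localhost:8080/consumers/my-group/instances/my-consumer",
}
records_url = response["base_uri"] + "/records"
```

Constructing URLs from base_uri, rather than hard-coding them, keeps clients working when the bridge is reached through a proxy or route that rewrites the host.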
3.3.3. DELETE /consumers/{groupid}/instances/{name}
3.3.3.1. Description
Deletes a specified consumer instance. The request for this operation MUST use the base URL (including the host and port) returned in the response from the POST request to /consumers/{groupid} that was used to create this consumer.
3.3.3.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the consumer belongs. | string |
| Path | name | Name of the consumer to delete. | string |
3.3.3.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Consumer removed successfully. | No Content |
| 404 | The specified consumer instance was not found. | Error |
3.3.3.4. Consumes
- application/vnd.kafka.v2+json
3.3.3.5. Produces
- application/vnd.kafka.v2+json
3.3.3.6. Tags
- Consumers
3.3.3.7. Example HTTP response
3.3.3.7.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.4. POST /consumers/{groupid}/instances/{name}/assignments
3.3.4.1. Description
Assigns one or more topic partitions to a consumer.
3.3.4.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the consumer belongs. | string |
| Path | name | Name of the consumer to assign topic partitions to. | string |
| Body | body | List of topic partitions to assign to the consumer. | Partitions |
3.3.4.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Partitions assigned successfully. | No Content |
| 404 | The specified consumer instance was not found. | Error |
| 409 | Subscriptions to topics, partitions, and patterns are mutually exclusive. | Error |
3.3.4.4. Consumes
- application/vnd.kafka.v2+json
3.3.4.5. Produces
- application/vnd.kafka.v2+json
3.3.4.6. Tags
- Consumers
3.3.4.7. Example HTTP request
3.3.4.7.1. Request body
3.3.4.8. Example HTTP response
3.3.4.8.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.4.8.2. Response 409
{
  "error_code" : 409,
  "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
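A minimal sketch of an assignments request body, following the Partitions schema (section 3.2.17); the topic name and partition numbers are illustrative.

```python
import json

# Partitions body for POST /consumers/{groupid}/instances/{name}/assignments.
# Assigning partitions directly bypasses group rebalancing, so it cannot be
# combined with a topic or pattern subscription (hence the 409 above).
body = {
    "partitions": [
        {"topic": "topic1", "partition": 0},
        {"topic": "topic1", "partition": 1},
    ]
}
payload = json.dumps(body)
```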
3.3.5. POST /consumers/{groupid}/instances/{name}/offsets
3.3.5.1. Description
Commits a list of consumer offsets. To commit offsets for all records fetched by the consumer, leave the request body empty.
3.3.5.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the consumer belongs. | string |
| Path | name | Name of the consumer. | string |
| Body | body | List of consumer offsets to commit to the consumer offsets commit log. You can specify one or more topic partitions to commit offsets for. | OffsetCommitSeekList |
3.3.5.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Commit made successfully. | No Content |
| 404 | The specified consumer instance was not found. | Error |
3.3.5.4. Consumes
- application/vnd.kafka.v2+json
3.3.5.5. Produces
- application/vnd.kafka.v2+json
3.3.5.6. Tags
- Consumers
3.3.5.7. Example HTTP request
3.3.5.7.1. Request body
3.3.5.8. Example HTTP response
3.3.5.8.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
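A minimal sketch of an offsets-commit body, following the OffsetCommitSeekList schema (section 3.2.11); the topics and offset values are illustrative. An empty request body commits offsets for all fetched records instead.

```python
import json

# OffsetCommitSeekList body for POST .../offsets. Each entry names the
# topic partition and the offset to record in the commit log.
body = {
    "offsets": [
        {"topic": "topic1", "partition": 0, "offset": 15},
        {"topic": "topic1", "partition": 1, "offset": 42},
    ]
}
payload = json.dumps(body)
```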
3.3.6. POST /consumers/{groupid}/instances/{name}/positions
3.3.6.1. Description
Configures a subscribed consumer to fetch records from a particular offset the next time it fetches a set of records from a given topic partition. This overrides the default fetch behavior for consumers. You can specify one or more topic partitions.
3.3.6.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the consumer belongs. | string |
| Path | name | Name of the subscribed consumer. | string |
| Body | body | List of partition offsets from which the subscribed consumer will next fetch records. | OffsetCommitSeekList |
3.3.6.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Seek performed successfully. | No Content |
| 404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error |
3.3.6.4. Consumes
- application/vnd.kafka.v2+json
3.3.6.5. Produces
- application/vnd.kafka.v2+json
3.3.6.6. Tags
- Consumers
- Seek
3.3.6.7. Example HTTP request
3.3.6.7.1. Request body
3.3.6.8. Example HTTP response
3.3.6.8.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.7. POST /consumers/{groupid}/instances/{name}/positions/beginning
3.3.7.1. Description
Configures a subscribed consumer to seek (and subsequently read from) the first offset in one or more given topic partitions.
3.3.7.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the subscribed consumer belongs. | string |
| Path | name | Name of the subscribed consumer. | string |
| Body | body | List of topic partitions to which the consumer is subscribed. The consumer will seek to the first offset in the specified partitions. | Partitions |
3.3.7.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Seek to the beginning performed successfully. | No Content |
| 404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error |
3.3.7.4. Consumes
- application/vnd.kafka.v2+json
3.3.7.5. Produces
- application/vnd.kafka.v2+json
3.3.7.6. Tags
- Consumers
- Seek
3.3.7.7. Example HTTP request
3.3.7.7.1. Request body
3.3.7.8. Example HTTP response
3.3.7.8.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.8. POST /consumers/{groupid}/instances/{name}/positions/end
3.3.8.1. Description
Configures a subscribed consumer to seek (and subsequently read from) the offset at the end of one or more of the given topic partitions.
3.3.8.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the subscribed consumer belongs. | string |
| Path | name | Name of the subscribed consumer. | string |
| Body | body | List of topic partitions to which the consumer is subscribed. The consumer will seek to the last offset in the specified partitions. | Partitions |
3.3.8.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Seek to the end performed successfully. | No Content |
| 404 | The specified consumer instance was not found, or the specified consumer instance did not have one of the specified partitions assigned. | Error |
3.3.8.4. Consumes
- application/vnd.kafka.v2+json
3.3.8.5. Produces
- application/vnd.kafka.v2+json
3.3.8.6. Tags
- Consumers
- Seek
3.3.8.7. Example HTTP request
3.3.8.7.1. Request body
3.3.8.8. Example HTTP response
3.3.8.8.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
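The three seek variants accept different bodies: a plain POST to .../positions takes explicit offsets (OffsetCommitSeekList, section 3.2.10), while .../positions/beginning and .../positions/end take a bare partition list (Partitions, section 3.2.17). A sketch with illustrative topic names and offsets:

```python
import json

# Partitions body shared by the seek-to-beginning and seek-to-end endpoints:
to_end = json.dumps({"partitions": [{"topic": "topic1", "partition": 0}]})

# OffsetCommitSeekList body for a plain .../positions seek, naming the
# explicit offset to resume fetching from:
to_offset = json.dumps(
    {"offsets": [{"topic": "topic1", "partition": 0, "offset": 20}]}
)
```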
3.3.9. GET /consumers/{groupid}/instances/{name}/records
3.3.9.1. Description
Retrieves records for a subscribed consumer, including message values, topics, and partitions. The request for this operation MUST use the base URL (including the host and port) returned in the response from the POST request to /consumers/{groupid} that was used to create this consumer.
3.3.9.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the subscribed consumer belongs. | string |
| Path | name | Name of the subscribed consumer to retrieve records from. | string |
| Query | max_bytes | The maximum size, in bytes, of unencoded keys and values that can be included in the response. If the response exceeds this size, an error response with code 422 is returned. | integer |
| Query | timeout | The maximum amount of time, in milliseconds, that the HTTP Bridge spends retrieving records before timing out the request. | integer |
3.3.9.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | Poll request executed successfully. | ConsumerRecordList |
| 404 | The specified consumer instance was not found. | Error |
| 406 | The format used in the consumer creation request does not match the embedded format in the Accept header of this request. | Error |
| 422 | Response exceeds the maximum number of bytes the consumer can receive. | Error |
3.3.9.4. Produces
- application/vnd.kafka.json.v2+json
- application/vnd.kafka.binary.v2+json
- application/vnd.kafka.v2+json
3.3.9.5. Tags
- Consumers
3.3.9.6. Example HTTP response
3.3.9.6.1. Response 200
3.3.9.6.2. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.9.6.3. Response 406
{
  "error_code" : 406,
  "message" : "The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request."
}
3.3.9.6.4. Response 422
{
  "error_code" : 422,
  "message" : "Response exceeds the maximum number of bytes the consumer can receive"
}
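A sketch of building a poll request from the base_uri returned at consumer creation; the host, names, and query values below are illustrative. The Accept header must carry the embedded data format chosen when the consumer was created, otherwise the 406 response above is returned.

```python
from urllib.parse import urlencode

# base_uri as returned by the consumer-creation response (illustrative values).
base_uri = "http://localhost:8080/consumers/my-group/instances/my-consumer"

# timeout is in milliseconds; max_bytes caps unencoded key+value size.
query = urlencode({"timeout": 3000, "max_bytes": 100000})
url = f"{base_uri}/records?{query}"

# Accept must match the consumer's embedded format ("json" here).
headers = {"Accept": "application/vnd.kafka.json.v2+json"}
```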
3.3.10. POST /consumers/{groupid}/instances/{name}/subscription
3.3.10.1. Description
Subscribes a consumer to one or more topics. You can describe the topics to which the consumer will subscribe in a list (of Topics type) or as a topic_pattern field. Each call replaces the existing subscriptions for the consumer.
3.3.10.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the subscribed consumer belongs. | string |
| Path | name | Name of the consumer to subscribe to topics. | string |
| Body | body | List of topics to which the consumer will subscribe. | Topics |
3.3.10.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Consumer subscribed successfully. | No Content |
| 404 | The specified consumer instance was not found. | Error |
| 409 | Subscriptions to topics, partitions, and patterns are mutually exclusive. | Error |
| 422 | A list (of Topics type) or a topic_pattern must be specified. | Error |
3.3.10.4. Consumes
- application/vnd.kafka.v2+json
3.3.10.5. Produces
- application/vnd.kafka.v2+json
3.3.10.6. Tags
- Consumers
3.3.10.7. Example HTTP request
3.3.10.7.1. Request body
{
  "topics" : [ "topic1", "topic2" ]
}
3.3.10.8. Example HTTP response
3.3.10.8.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.10.8.2. Response 409
{
  "error_code" : 409,
  "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
3.3.10.8.3. Response 422
{
  "error_code" : 422,
  "message" : "A list (of Topics type) or a topic_pattern must be specified."
}
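Because topic lists and patterns are mutually exclusive, a subscription body carries exactly one of the two fields; mixing them yields the 409 above, and omitting both yields the 422. A sketch with illustrative topic names:

```python
import json

# Subscribe by explicit list (Topics schema, section 3.2.25)...
by_list = json.dumps({"topics": ["topic1", "topic2"]})

# ...or by regex pattern; every topic matching the pattern is subscribed.
by_pattern = json.dumps({"topic_pattern": "topic\\d+"})
```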
3.3.11. GET /consumers/{groupid}/instances/{name}/subscription
3.3.11.1. Description
Retrieves a list of the topics to which the consumer is subscribed.
3.3.11.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the subscribed consumer belongs. | string |
| Path | name | Name of the subscribed consumer. | string |
3.3.11.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | List of subscribed topics and partitions. | SubscribedTopicList |
| 404 | The specified consumer instance was not found. | Error |
3.3.11.4. Produces
- application/vnd.kafka.v2+json
3.3.11.5. Tags
- Consumers
3.3.11.6. Example HTTP response
3.3.11.6.1. Response 200
3.3.11.6.2. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.12. DELETE /consumers/{groupid}/instances/{name}/subscription
3.3.12.1. Description
Unsubscribes a consumer from all topics.
3.3.12.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | groupid | ID of the consumer group to which the subscribed consumer belongs. | string |
| Path | name | Name of the consumer to unsubscribe from topics. | string |
3.3.12.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 204 | Consumer unsubscribed successfully. | No Content |
| 404 | The specified consumer instance was not found. | Error |
3.3.12.4. Tags
- Consumers
3.3.12.5. Example HTTP response
3.3.12.5.1. Response 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
3.3.13. GET /healthy
3.3.13.1. Description
Checks if the bridge is running. This does not necessarily imply that it is ready to accept requests.
3.3.13.2. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | The bridge is healthy. | No Content |
3.3.14. GET /openapi
3.3.14.1. Description
Retrieves the OpenAPI v2 specification in JSON format.
3.3.14.2. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | OpenAPI v2 specification in JSON format retrieved successfully. | string |
3.3.14.3. Produces
- application/json
3.3.15. GET /ready
3.3.15.1. Description
Checks if the bridge is ready and can accept requests.
3.3.15.2. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | The bridge is ready. | No Content |
3.3.16. GET /topics
3.3.16.1. Description
Retrieves a list of all topics.
3.3.16.2. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | List of topics. | < string > array |
3.3.16.3. Produces
- application/vnd.kafka.v2+json
3.3.16.4. Tags
- Topics
3.3.16.5. Example HTTP response
3.3.16.5.1. Response 200
[ "topic1", "topic2" ]
3.3.17. POST /topics/{topicname}
3.3.17.1. Description
Sends one or more records to a given topic, optionally specifying a partition, key, or both.
3.3.17.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | topicname | Name of the topic to send records to or retrieve metadata from. | string |
| Body | body | List of records to send to the topic. | ProducerRecordList |
3.3.17.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | Records sent successfully. | OffsetRecordSentList |
| 404 | The specified topic was not found. | Error |
| 422 | The record list is not valid. | Error |
3.3.17.4. Consumes
- application/vnd.kafka.json.v2+json
- application/vnd.kafka.binary.v2+json
3.3.17.5. Produces
- application/vnd.kafka.v2+json
3.3.17.6. Tags
- Producer
- Topics
3.3.17.7. Example HTTP request
3.3.17.7.1. Request body
3.3.17.8. Example HTTP response
3.3.17.8.1. Response 200
3.3.17.8.2. Response 404
{
  "error_code" : 404,
  "message" : "The specified topic was not found."
}
3.3.17.8.3. Response 422
{
  "error_code" : 422,
  "message" : "The record list contains invalid records."
}
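A sketch of a ProducerRecordList body (section 3.2.19) for this endpoint; the keys, values, and partition number are illustrative. Each record needs a value, while key and partition are optional; the request is sent with a Content-Type that names the embedded format, such as application/vnd.kafka.json.v2+json.

```python
import json

# ProducerRecordList body for POST /topics/{topicname}. Records sharing a key
# land on the same partition; an explicit partition overrides key-based routing.
body = {
    "records": [
        {"key": "key-1", "value": "first message"},
        {"value": "second message", "partition": 2},
    ]
}
payload = json.dumps(body)

# A 200 response returns an OffsetRecordSentList describing where each record
# landed, e.g. {"offsets": [{"partition": 0, "offset": 5}, ...]}.
```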
3.3.18. GET /topics/{topicname}
3.3.18.1. Description
Retrieves the metadata about a given topic.
3.3.18.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | topicname | Name of the topic to send records to or retrieve metadata from. | string |
3.3.18.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | Topic metadata | TopicMetadata |
3.3.18.4. Produces
- application/vnd.kafka.v2+json
3.3.18.5. Tags
- Topics
3.3.18.6. Example HTTP response
3.3.18.6.1. Response 200
3.3.19. GET /topics/{topicname}/partitions
3.3.19.1. Description
Retrieves a list of partitions for the topic.
3.3.19.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | topicname | Name of the topic to send records to or retrieve metadata from. | string |
3.3.19.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | List of partitions | < PartitionMetadata > array |
| 404 | The specified topic was not found. | Error |
3.3.19.4. Produces
- application/vnd.kafka.v2+json
3.3.19.5. Tags
- Topics
3.3.19.6. Example HTTP response
3.3.19.6.1. Response 200
3.3.19.6.2. Response 404
{
  "error_code" : 404,
  "message" : "The specified topic was not found."
}
3.3.20. POST /topics/{topicname}/partitions/{partitionid}
3.3.20.1. Description
Sends one or more records to a given topic partition, optionally specifying a key.
3.3.20.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | partitionid | ID of the partition to send records to or retrieve metadata from. | integer |
| Path | topicname | Name of the topic to send records to or retrieve metadata from. | string |
| Body | body | List of records to send to a given topic partition, including a value (required) and a key (optional). | ProducerRecordToPartitionList |
3.3.20.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | Records sent successfully. | OffsetRecordSentList |
| 404 | The specified topic partition was not found. | Error |
| 422 | The record is not valid. | Error |
3.3.20.4. Consumes
- application/vnd.kafka.json.v2+json
- application/vnd.kafka.binary.v2+json
3.3.20.5. Produces
- application/vnd.kafka.v2+json
3.3.20.6. Tags
- Producer
- Topics
3.3.20.7. Example HTTP request
3.3.20.7.1. Request body
3.3.20.8. Example HTTP response
3.3.20.8.1. Response 200
3.3.20.8.2. Response 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
3.3.20.8.3. Response 422
{
  "error_code" : 422,
  "message" : "The record is not valid."
}
3.3.21. GET /topics/{topicname}/partitions/{partitionid}
3.3.21.1. Description
Retrieves partition metadata for the topic partition.
3.3.21.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | partitionid | ID of the partition to send records to or retrieve metadata from. | integer |
| Path | topicname | Name of the topic to send records to or retrieve metadata from. | string |
3.3.21.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | Partition metadata | PartitionMetadata |
| 404 | The specified topic partition was not found. | Error |
3.3.21.4. Produces
- application/vnd.kafka.v2+json
3.3.21.5. Tags
- Topics
3.3.21.6. Example HTTP response
3.3.21.6.1. Response 200
3.3.21.6.2. Response 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
3.3.22. GET /topics/{topicname}/partitions/{partitionid}/offsets
3.3.22.1. Description
Retrieves a summary of the offsets for the topic partition.
3.3.22.2. Parameters
| Type | Name | Description | Schema |
|---|---|---|---|
| Path | partitionid | ID of the partition. | integer |
| Path | topicname | Name of the topic containing the partition. | string |
3.3.22.3. Responses
| HTTP Code | Description | Schema |
|---|---|---|
| 200 | A summary of the offsets for the topic partition. | OffsetsSummary |
| 404 | The specified topic partition was not found. | Error |
3.3.22.4. Produces
- application/vnd.kafka.v2+json
3.3.22.5. Tags
- Topics
3.3.22.6. Example HTTP response
3.3.22.6.1. Response 200
{
  "beginning_offset" : 10,
  "end_offset" : 50
}
3.3.22.6.2. Response 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
Appendix A. Using your subscription
AMQ Streams is provided through a software subscription. To manage your subscriptions, access your account at the Red Hat Customer Portal.
Accessing Your Account
- Go to access.redhat.com.
- If you do not already have an account, create one.
- Log in to your account.
Activating a Subscription
- Go to access.redhat.com.
- Navigate to My Subscriptions.
- Navigate to Activate a subscription and enter your 16-digit activation number.
Downloading Zip and Tar Files
To access zip or tar files, use the customer portal to find the relevant files for download. If you are using RPM packages, this step is not required.
- Open a browser and log in to the Red Hat Customer Portal Product Downloads page at access.redhat.com/downloads.
- Locate the AMQ Streams for Apache Kafka entries in the INTEGRATION AND AUTOMATION category.
- Select the desired AMQ Streams product. The Software Downloads page opens.
- Click the Download link for your component.
Revised on 2022-04-13 17:50:11 UTC