Using the Streams for Apache Kafka Proxy
Enhancing Kafka with specialized features
Abstract
Preface
Providing feedback on Red Hat documentation
We appreciate your feedback on our documentation.
To propose improvements, open a Jira issue and describe your suggested changes. Provide as much detail as possible to enable us to address your request quickly.
Prerequisite
- You have a Red Hat Customer Portal account. This account enables you to log in to the Red Hat Jira Software instance. If you do not have an account, you will be prompted to create one.
Procedure
- Click the following: Create issue.
- In the Summary text box, enter a brief description of the issue.
- In the Description text box, provide the following information:
  - The URL of the page where you found the issue.
  - A detailed description of the issue.
  You can leave the information in any other fields at their default values.
- Add a reporter name.
- Click Create to submit the Jira issue to the documentation team.
Thank you for taking the time to provide feedback.
Technology preview
The Streams for Apache Kafka Proxy is a technology preview.
Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about the support scope, see Technology Preview Features Support Scope.
Chapter 1. Streams for Apache Kafka Proxy overview
Streams for Apache Kafka Proxy is an Apache Kafka protocol-aware ("Layer 7") proxy designed to enhance Kafka-based systems. Through its filter mechanism it allows additional behavior to be introduced into a Kafka-based system without requiring changes to either your applications or the Kafka cluster itself. Built-in filters are provided as part of the solution.
Functioning as an intermediary, the Streams for Apache Kafka Proxy mediates communication between a Kafka cluster and its clients. It takes on the responsibility of receiving, filtering, and forwarding messages.
An API provides a convenient means for implementing custom logic within the proxy.
Chapter 2. Built-in filters
Streams for Apache Kafka Proxy comes with a suite of built-in filters designed to enhance the functionality and security of your Kafka clusters.
2.1. Record Encryption filter
Streams for Apache Kafka Proxy’s Record Encryption filter enhances the security of Kafka messages. The filter uses industry-standard cryptographic techniques to apply encryption to Kafka messages, ensuring the confidentiality of data stored in the Kafka Cluster. Streams for Apache Kafka Proxy centralizes topic-level encryption, ensuring streamlined encryption across Kafka clusters.
There are three steps to using the filter:
- Setting up a Key Management System (KMS).
- Establishing the encryption keys within the KMS that will be used to encrypt the topics.
- Configuring the filter within Streams for Apache Kafka Proxy.
The filter integrates with a Key Management Service (KMS), which has ultimate responsibility for the safe storage of sensitive key material. Currently, Streams for Apache Kafka Proxy integrates with either HashiCorp Vault or AWS Key Management Service. You can also provide implementations for your specific KMS systems, and additional KMS support will be added based on demand.
2.1.1. How encryption works
The Record Encryption filter uses envelope encryption to encrypt records with symmetric encryption keys. The filter encrypts records from produce requests and decrypts records from fetch responses.
- Envelope encryption
- Envelope encryption is an industry-standard technique suited for encrypting large volumes of data in an efficient manner. Data is encrypted with a Data Encryption Key (DEK). The DEK is encrypted using a Key Encryption Key (KEK). The KEK is stored securely in a Key Management System (KMS).
- Symmetric encryption keys
- AES-GCM 256-bit symmetric encryption keys are used to encrypt and decrypt record data.
The process is as follows:
- The filter intercepts produce requests from producing applications and transforms them by encrypting the records.
- The produce request is forwarded to the broker.
- The filter intercepts fetch responses from the broker and transforms them by decrypting the records.
- The fetch response is forwarded to the consuming application.
The filter encrypts the record value only. Record keys, headers, and timestamps are not encrypted.
The entire process is transparent from the point of view of Kafka clients and Kafka brokers. Neither is aware that the records are being encrypted, and neither has access to the encryption keys or any influence on the ciphering process used to secure the records.
2.1.1.1. How the filter encrypts records
The filter encrypts records from produce requests as follows:
- Filter selects a KEK to apply.
- Requests the KMS to generate a DEK for the KEK.
- Uses an encrypted DEK (DEK encrypted with the KEK) to encrypt the record.
- Replaces the original record with a ciphertext record (encrypted record, encrypted DEK, and metadata).
The filter uses a DEK reuse strategy. Encrypted records are sent to the same topic using the same DEK until a time-out or an encryption limit is reached.
2.1.1.2. How the filter decrypts records
The filter decrypts records from fetch responses as follows:
- Filter receives a cipher record from the Kafka broker.
- Reverses the process that constructed the cipher record.
- Uses KMS to decrypt the DEK.
- Uses the decrypted DEK to decrypt the encrypted record.
- Replaces the cipher record with a decrypted record.
The filter uses an LRU (least recently used) strategy for caching decrypted DEKs. Decrypted DEKs are kept in memory to reduce interactions with the KMS.
2.1.1.3. How the filter uses the KMS
To support the filter, the KMS provides the following:
- A secure repository for storing Key Encryption Keys (KEKs)
- A service for generating and decrypting Data Encryption Keys (DEKs)
KEKs stay within the KMS. The KMS generates a DEK (securely generated random data) for a given KEK, then returns both the DEK and an encrypted DEK. The encrypted DEK contains the same key material, but encrypted with the KEK. The KMS doesn’t store encrypted DEKs; they are stored as part of the cipher record in the broker.
The KMS must be available during runtime. If the KMS is unavailable, the filter will not be able to obtain new encrypted DEKs on the produce path or decrypt encrypted DEKs on the consume path. The filter will continue to use previously obtained DEKs, but eventually, production and consumption will become impossible. It is recommended to use the KMS in a high availability (HA) configuration.
2.1.1.4. Practicing key rotation
Key rotation involves periodically replacing cryptographic keys with new ones and is considered a best practice in cryptography.
The filter allows the rotation of Key Encryption Keys (KEKs) within the Key Management System (KMS). When a KEK is rotated, the new key material is eventually used for newly produced records. Existing records, encrypted with older KEK versions, remain decryptable as long as the previous KEK versions are still available in the KMS.
If your encrypted topic is receiving regular traffic, the Data Encryption Key (DEK) will be refreshed as new records flow through. However, if messages are infrequent, the DEK might be used for up to 2 hours (by default) after its creation.
When the KEK is rotated in the external KMS, it takes up to 1 hour (by default) before all records produced by the filter contain a DEK encrypted with the new key material. This is because the filter caches encrypted DEKs and uses them for a configurable amount of time after creation; one hour after creation, they become eligible to be refreshed.
If you need to rotate key material immediately, execute a rolling restart of your cluster of Streams for Apache Kafka Proxy instances.
If an old KEK version is removed from the KMS, records encrypted with that key will become unreadable, causing fetch operations to fail. In such cases, the consumer offset must be advanced beyond those records.
2.1.1.5. What part of a record is encrypted?
The record encryption filter encrypts only the values of records, leaving record keys, headers, and timestamps untouched. Null record values, which might represent deletions in compacted topics, are transmitted to the broker unencrypted. This approach ensures that compacted topics function correctly.
2.1.1.6. Unencrypted topics
You may configure the system so that some topics are encrypted and others are not. This supports scenarios where topics with confidential information are encrypted and Kafka topics with non-sensitive information can be left unencrypted.
2.1.2. Setting up HashiCorp Vault
To use HashiCorp Vault with the Record Encryption filter, use the following setup:
- Enable the Transit Engine as the Record Encryption filter relies on its APIs.
- Create a Vault policy specifically for the filter with permissions for generating and decrypting Data Encryption Keys (DEKs) for envelope encryption.
- Obtain a Vault token that includes the filter policy.
2.1.2.1. Enable the Transit Engine
The filter integrates with the HashiCorp Vault Transit Engine. Vault does not enable the Transit Engine by default. It must be enabled before it can be used by the filter.
2.1.2.1.1. Vault Transit Engine URL
The Vault Transit Engine URL is required so the filter knows the location of the Transit Engine within the Vault instance.
The URL is formed from the concatenation of the Api Address (reported by Vault during start up) with the complete path to the Transit Engine, including the name of the engine itself. If namespacing is used on the Vault instance, the path needs to include the namespace(s). The URL will end with /transit unless the -path parameter was used when enabling the engine.
If namespacing is not in use, the URL will look like this:
https://myvaultinstance:8200/v1/transit
If namespacing is in use, the path must include the namespaces. For example, if the parent namespace is a and the child namespace is b, the URL will look like this:
https://myvaultinstance:8200/v1/a/b/transit
If the name of the Transit engine was changed (using the -path argument to the vault secrets enable transit command) the URL will look like this:
https://myvaultinstance:8200/v1/mytransit
2.1.2.1.2. Establish the naming convention for keys within the Vault hierarchy
Establish a naming convention for keys to keep the filter’s keys separate from those used by other systems. Here, we use a prefix of KEK_ for filter key names. Adjust the instructions if a different naming convention is used.
2.1.2.1.3. Role of the administrator
To use the filter, an administrator (or administrative process) must create the encryption keys within Vault that will be used to encrypt the records. The organization deploying the Record Encryption filter is responsible for managing this administrator or process.
The administrator must have permissions to create keys beneath transit/keys/KEK_* in the Vault hierarchy.
As a guideline, the minimal Vault policy required by the administrator is as follows:
path "transit/keys/KEK_*" {
capabilities = ["read", "write"]
}
path "transit/keys/KEK_*" {
capabilities = ["read", "write"]
}
2.1.2.1.4. Establish an application identity for the filter
The filter must authenticate to Vault in order to perform envelope encryption operations, such as generating and decrypting DEKs. Therefore, a Vault identity with sufficient permissions must be created for the filter.
Create a Vault policy for the filter:
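The following is a minimal sketch of such a policy, assuming the Transit Engine is mounted at transit and the KEK_ naming convention described earlier, and assuming the filter uses the Transit datakey and decrypt endpoints; adjust the paths to match your engine mount and namespace:

vault policy write kroxylicious_encryption_filter_policy - << EOF
# read KEK metadata
path "transit/keys/KEK_*" {
  capabilities = ["read"]
}
# generate DEKs for a KEK
path "transit/datakey/plaintext/KEK_*" {
  capabilities = ["update"]
}
# decrypt DEKs with a KEK
path "transit/decrypt/KEK_*" {
  capabilities = ["update"]
}
EOF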
Create a Periodic (long-lived) Vault Token for the filter:
vault token create -display-name "kroxylicious record encryption" \
-policy=kroxylicious_encryption_filter_policy \
-period=768h \
-no-default-policy \
-orphan
- 1
- Causes the token to be periodic (with every renewal using the given period).
- 2
- Detach the "default" policy from the policy set for this token. This is done so the token has least-privilege.
- 3
- Create the token with no parent. This is done so that expiration of a parent won’t expire the token used by the filter.
The example token create command illustrates the use of -no-default-policy and -orphan. The use of these flags is not functionally important. You may adapt the configuration of the token to suit the standards required by your organization.
The token create command yields the token. The token value is required later when configuring the vault within the filter.
token hvs.CAESIFJ_HHo0VnnW6DSbioJ80NqmuYm2WlON-QxAPmiJScZUGh4KHGh2cy5KdkdFZUJMZmhDY0JCSVhnY2JrbUNEWnE
token_accessor 4uQZJbEnxW4YtbDBaW6yVzwP
token_policies [kroxylicious_encryption_filter_policy]
The token must be renewed before expiration. It is the responsibility of the administrator to do this.
This can be done with a command like the following:
vault token renew --accessor <token_accessor>
2.1.2.1.5. Testing the application identity for the filter using the CLI
To test whether the application identity and the policy are working correctly, you can use the validate_vault_token.sh script.
First, as the administrator, create a KEK in the hierarchy at the path transit/keys/KEK_testkey.
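If you are using the Vault CLI, the test key can be created with a command along these lines; the aes256-gcm96 key type matches the type used for filter KEKs:

vault write -f transit/keys/KEK_testkey type=aes256-gcm96

Then run the validation script, passing the filter’s token and the KEK path: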
VAULT_TOKEN=<kroxylicious encryption filter token> validate_vault_token.sh <kek path>
The script should respond Ok. If errors are reported, check the policy and token configuration.
The transit/keys/KEK_testkey key can now be removed.
2.1.2.2. Configuring the HashiCorp Vault KMS
For HashiCorp Vault, the KMS configuration looks like this. Use the Vault Token and Vault Transit Engine URL values that you gathered above.
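A minimal sketch of the Vault KMS configuration follows; the service and property names (VaultKmsService, vaultTransitEngineUrl, vaultToken) are illustrative of the shape described here, and the URL and file path are examples only:

kms: VaultKmsService
kmsConfig:
  vaultTransitEngineUrl: https://myvaultinstance:8200/v1/transit
  vaultToken:
    passwordFile: /opt/proxy/vault/token.txt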
For TLS trust and TLS client authentication configuration, the filter accepts the same TLS parameters as Upstream TLS except the PEM store type is currently not supported.
2.1.2.3. Creating HashiCorp Vault keys
As the administrator, use either the HashiCorp UI or CLI to create AES-256 symmetric keys following your key naming convention. The key type must be aes256-gcm96, which is Vault’s default key type.
It is recommended to use a key rotation policy.
If using the Vault CLI, the command will look like:
vault write -f transit/keys/KEK_trades type=aes256-gcm96 auto_rotate_period=90d
2.1.3. Setting up AWS KMS
To use AWS Key Management Service with the Record Encryption filter, use the following setup:
- Establish an AWS KMS aliasing convention for keys
- Configure the AWS KMS
- Create AWS KMS keys
You’ll need a privileged AWS user that is capable of creating users and policies to perform the set-up.
2.1.3.1. Establish an aliasing convention for keys within AWS KMS
The filter references KEKs within AWS via an AWS key alias.
Establish a naming convention for key aliases to keep the filter’s keys separate from those used by other systems. Here, we use a prefix of KEK_ for filter aliases. Adjust the instructions if a different naming convention is used.
2.1.3.1.1. Role of the administrator
To use the filter, an administrator (or administrative process) must create the encryption keys within AWS KMS that will be used to encrypt the records. The organization deploying the Record Encryption filter is responsible for managing this administrator or process.
The administrator must have permissions to create keys in AWS KMS. As a starting point, the built-in AWS policy AWSKeyManagementServicePowerUser confers sufficient key management privileges.
To get started, use the following commands to set up an administrator with permissions suitable for managing encryption keys in KMS through an AWS Cloud Shell. This example uses the user name kroxylicious-admin, but you can choose a different name if preferred. Adjust the instructions accordingly if you use a different user name.
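As a sketch, the administrator user can be created and granted key-management privileges with commands like the following; your organization may require additional controls, such as a console password policy or MFA:

# create the administrator identity
aws iam create-user --user-name kroxylicious-admin
# grant the built-in key-management policy
aws iam attach-user-policy --user-name kroxylicious-admin \
    --policy-arn arn:aws:iam::aws:policy/AWSKeyManagementServicePowerUser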
2.1.3.1.2. Establish an application identity for the filter
The filter must authenticate to AWS in order to perform envelope encryption operations, such as generating and decrypting DEKs. Therefore, an AWS IAM identity with sufficient permissions must be created for the filter.
Use AWS IAM to create this identity and generate an Access Key for it. The Access Key/Secret Key pair is used by the filter. Do not enable console access for this user.
Using the CLI, the following commands create the IAM identity for the filter. This example uses the user name kroxylicious, but you can choose a different name if needed. Adjust the instructions accordingly if using a different user name.
aws iam create-user --user-name kroxylicious
aws iam create-access-key --user-name kroxylicious
2.1.3.1.3. Create an alias-based policy for KEK aliases
Create an alias-based policy granting permissions to use keys aliased by the established alias naming convention.
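The following sketch creates such a policy, named KroxyliciousRecordEncryption to match the ARN used in the next step. It allows the key operations the filter needs, restricted to keys whose aliases start with KEK_; the statement ID and the temporary file path are illustrative:

cat > /tmp/kek_policy.json << EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AliasBasedIAMPolicy",
      "Effect": "Allow",
      "Action": [
        "kms:Encrypt",
        "kms:Decrypt",
        "kms:GenerateDataKey*",
        "kms:DescribeKey"
      ],
      "Resource": "*",
      "Condition": {
        "ForAnyValue:StringLike": {
          "kms:ResourceAliases": "alias/KEK_*"
        }
      }
    }
  ]
}
EOF
aws iam create-policy --policy-name KroxyliciousRecordEncryption \
    --policy-document file:///tmp/kek_policy.json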
2.1.3.1.4. Apply the alias-based policy to the filter’s application identity
Attach the alias policy to the filter’s application identity. This will allow the filter to perform key operations on all KEKs with aliases that match the specified naming convention.
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
aws iam attach-user-policy --user-name kroxylicious --policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/KroxyliciousRecordEncryption"
2.1.3.2. Configuring the AWS KMS
For AWS KMS, the KMS configuration looks like this.
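A minimal sketch of the AWS KMS configuration follows; the service and property names (AwsKmsService, endpointUrl, accessKey, secretKey, region) are illustrative of the shape described here, and the endpoint, region, and file paths are examples only. Use the Access Key and Secret Key created for the filter’s application identity:

kms: AwsKmsService
kmsConfig:
  endpointUrl: https://kms.us-east-1.amazonaws.com
  accessKey:
    passwordFile: /opt/proxy/aws/access-key.txt
  secretKey:
    passwordFile: /opt/proxy/aws/secret-key.txt
  region: us-east-1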
For TLS trust and TLS client authentication configuration, the filter accepts the same TLS parameters as Upstream TLS except the PEM store type is currently not supported.
2.1.3.3. Creating AWS KMS keys
As the administrator, use either the AWS Console or CLI to create a Symmetric key with Encrypt and decrypt usage. Multi-region keys are supported.
It is not possible to make use of keys from other AWS accounts. For more information on this limitation, see the issue for AWS KMS serde improvements.
Give the key an alias as described in Section 2.1.3.1, “Establish an aliasing convention for keys within AWS KMS”.
If using the CLI, this can be done with commands like this:
KEY_ALIAS="KEK_<name>"
KEY_ID=$(aws kms create-key | jq -r '.KeyMetadata.KeyId')
# the create key command will produce JSON output including the KeyId
aws kms create-alias --alias-name alias/${KEY_ALIAS} --target-key-id ${KEY_ID}
KEY_ALIAS="KEK_<name>"
KEY_ID=$(aws kms create-key | jq -r '.KeyMetadata.KeyId')
# the create key command will produce JSON output including the KeyId
aws kms create-alias --alias-name alias/${KEY_ALIAS} --target-key-id ${KEY_ID}
Once the key is created, it is recommended to use a key rotation policy.
aws kms enable-key-rotation --key-id ${KEY_ID} --rotation-period-in-days 180
2.1.4. Setting up the Record Encryption filter
This procedure describes how to set up the Record Encryption filter. Provide the filter configuration and the Key Encryption Key (KEK) selector to use. The KEK selector maps topic names to key names. The filter looks up the resulting key name in the KMS.
Prerequisites
- An instance of Streams for Apache Kafka Proxy. For information on deploying Streams for Apache Kafka Proxy, see the samples and examples.
- A config map for Streams for Apache Kafka Proxy that includes the configuration for creating virtual clusters and filters.
- A KMS is installed and set up for the filter, with KEKs created to encrypt the records of the required topics.
Procedure
- Configure a RecordEncryption type filter.
Example Record Encryption filter configuration
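The following sketch illustrates the shape of such a configuration, using Vault as the KMS; the URL and file path are examples, the experimental block is an assumption, and the numbered comments correspond to the callouts below:

filters:
  - type: RecordEncryption
    config:
      kms: VaultKmsService                                  # 1
      kmsConfig:                                            # 2
        vaultTransitEngineUrl: https://myvaultinstance:8200/v1/transit
        vaultToken:
          passwordFile: /opt/proxy/vault/token.txt
      selector: TemplateKekSelector                         # 3
      selectorConfig:
        template: KEK_${topicName}                          # 4
      experimental:
        encryptionDekRefreshAfterWriteSeconds: 3600         # 5
        encryptionDekExpireAfterWriteSeconds: 7200          # 6
        maxEncryptionsPerDek: 5000000                       # 7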
- 1
- The KMS service name.
- 2
- Configuration specific to the KMS provider.
- 3
- The Key Encryption Key (KEK) selector to use. The ${topicName} is a literal understood by the proxy. For example, if using the TemplateKekSelector with the template KEK_${topicName}, create a key for every topic that is to be encrypted, with the key name matching the topic name prefixed by the string KEK_.
- 4
- The template for deriving the KEK, based on a specific topic name.
- 5
- How long after creation of a DEK before it becomes eligible for rotation. On the next encryption request, the cache will asynchronously create a new DEK. Encryption requests will continue to use the old DEK until the new DEK is ready.
- 6
- How long after creation of a DEK until it is removed from the cache. This setting puts an upper bound on how long a DEK can remain cached.
- 7
- The maximum number of records any DEK should be used to encrypt. After this limit is hit, that DEK will be destroyed and a new one created.
encryptionDekRefreshAfterWriteSeconds and encryptionDekExpireAfterWriteSeconds help govern the "originator usage period" of the DEK. That is the period of time the DEK will be used to encrypt records. Keeping the period short helps reduce the blast radius in the event that DEK key material is leaked. However, there is a trade-off. The additional KMS API calls will increase produce/consume latency and may increase your KMS provider costs.
maxEncryptionsPerDek helps prevent key exhaustion by placing an upper limit of the amount of times that a DEK may be used to encrypt records.
- Verify that the encryption has been applied to the specified topics by producing messages through the proxy and then consuming directly and indirectly from the Kafka cluster.
If the filter is unable to find the key in the KMS, the filter passes through the records belonging to that topic in the produce request without encrypting them.
2.2. (Preview) Record Validation filter
The Record Validation filter validates records sent by a producer. Only records that pass the validation are sent to the broker. This filter can be used to prevent poison messages—such as those containing corrupted data or invalid formats—from entering the Kafka system, which may otherwise lead to consumer failure.
The filter currently supports two modes of operation:
- Schema Validation ensures the content of the record conforms to a schema stored in an Apicurio Registry.
- JSON Syntax Validation ensures the content of the record contains syntactically valid JSON.
Validation rules can be applied to check the content of the Kafka record key or value.
If the validation fails, the produce request is rejected and the producing application receives an error response. The broker does not receive the rejected records.
This filter is currently in incubation and available as a preview. We would not recommend using it in a production environment.
2.2.1. (Preview) Setting up the Record Validation filter
This procedure describes how to set up the Record Validation filter. Provide the filter configuration and rules that the filter uses to check against Kafka record keys and values.
Prerequisites
- An instance of Streams for Apache Kafka Proxy. For information on deploying Streams for Apache Kafka Proxy, see the samples and examples.
- A config map for Streams for Apache Kafka Proxy that includes the configuration for creating a virtual cluster.
- Apicurio Registry (if wanting to use Schema validation).
Procedure
- Configure a RecordValidation type filter.
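The following sketch shows the general shape such a configuration might take; treat the property names as placeholders (in particular the partial-forwarding flag shown here as forwardPartialRequests), and replace <rule definition> as described after the callouts. The numbered comments correspond to the callouts below:

filters:
  - type: RecordValidation
    config:
      rules:
        - topicNames:                  # 1
            - <topic name>
          keyRule:                     # 2
            <rule definition>
          valueRule:                   # 3
            <rule definition>
      defaultRule:                     # 4
        keyRule:                       # 5
          <rule definition>
        valueRule:                     # 6
          <rule definition>
      forwardPartialRequests: false    # 7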
- 1
- List of topic names to which the validation rules will be applied.
- 2 5
- Validation rules that are applied to the record’s key.
- 3 6
- Validation rules that are applied to the record’s value.
- 4
- (Optional) Default rule that is applied to any topics for which there is no explicit rule defined.
- 7
- If false, any record failing validation causes the entire produce request to be rejected. If true, only the partitions within the request where all records pass validation are forwarded to the broker; the client receives a mixed response, with broker responses for the partitions whose records passed validation and error responses for the partitions that failed validation. If the client is using transactions, this setting is ignored and any failing record validation causes the entire produce request to be rejected.
Replace the token <rule definition> in the YAML configuration with either a Schema Validation rule or a JSON Syntax Validation rule depending on your requirements.
Example Schema Validation Rule Definition
The Schema Validation rule validates that the key or value matches a schema identified by its global ID within an Apicurio Schema Registry.
If the key or value does not adhere to the schema, the record will be rejected.
Additionally, if the Kafka producer has embedded a global ID within the record, it is validated against the global ID defined by the rule. If they do not match, the record is rejected. See the Apicurio documentation for details on how the global ID can be embedded into the record. The filter supports extracting IDs from either the Apicurio globalId record header or from the initial bytes of the serialized content itself.
schemaValidationConfig:
apicurioGlobalId: 1001
apicurioRegistryUrl: http://registry.local:8080
allowNulls: true
allowEmpty: true
Schema validation mode currently has the capability to enforce only JSON schemas (issue)
Example JSON Syntax Validation Rule Definition
The JSON Syntax Validation rule validates that the key or value contains only syntactically correct JSON.
syntacticallyCorrectJson:
validateObjectKeysUnique: true
allowNulls: true
allowEmpty: true
2.3. Using example deployment files to set up filters
Streams for Apache Kafka includes example pre-configured installation artifacts for the proxy in the examples/proxy/ folder.
These installation files offer a quick setup for trying out the proxy.
Prerequisites
- Installation requires an OpenShift user with cluster-admin role, such as system:admin.
- The oc command-line tool is installed and configured to connect to the OpenShift cluster with admin access.
- An OpenShift project namespace called proxy, which is the same namespace where the proxy is installed by default.
- Any filter-specific prerequisites, as described in the README files provided with the examples.
Using the example files
- Download and extract the Streams for Apache Kafka Proxy installation artifacts. The artifacts are included with the installation and example files available from the Streams for Apache Kafka software downloads page.
- Follow the instructions in the README files, as applicable.
Chapter 3. Configuring proxies
Fine-tune your deployment by configuring proxies to include additional features according to your specific requirements.
3.1. Configuring virtual clusters
A Kafka cluster is represented by the proxy as a virtual cluster. Clients connect to the virtual cluster rather than the actual cluster. When Streams for Apache Kafka Proxy is deployed, it includes configuration to create virtual clusters.
A virtual cluster has exactly one target cluster, but many virtual clusters can target the same cluster. Each virtual cluster targets a single listener on the target cluster, so multiple listeners on the Kafka side are represented as multiple virtual clusters by the proxy. Clients connect to a virtual cluster using a bootstrap_servers address. The virtual cluster has a bootstrap address that maps to each broker in the target cluster. When a client connects to the proxy, communication is proxied to the target broker by rewriting the address. Responses back to clients are rewritten to reflect the appropriate network addresses of the virtual clusters.
You can secure virtual cluster connections from clients and to target clusters.
Streams for Apache Kafka Proxy accepts keys and certificates in PEM (Privacy Enhanced Mail), PKCS #12 (Public-Key Cryptography Standards), or JKS (Java KeyStore) keystore format.
3.2. Example Streams for Apache Kafka Proxy configuration
Streams for Apache Kafka Proxy configuration is defined in a ConfigMap resource. Use the data properties of the ConfigMap resource to configure the following:
- Virtual clusters that represent the Kafka clusters
- Network addresses for broker communication in a Kafka cluster
- Filters to introduce additional functionality to the Kafka deployment
In this example, configuration for the Record Encryption filter is shown.
Example Streams for Apache Kafka Proxy configuration
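The following sketch outlines such a configuration, using the SNI routing provider and the Record Encryption filter with Vault as the KMS; the cluster name, hostnames, and file paths are illustrative, and the numbered comments correspond to the callouts below:

adminHttp:                                                               # 1
  endpoints:
    prometheus: {}
virtualClusters:                                                         # 2
  my-cluster-proxy:                                                      # 3
    targetCluster:
      bootstrap_servers: my-cluster-kafka-bootstrap.kafka.svc:9093       # 4
      tls:                                                               # 5
        trust:
          storeFile: /opt/proxy/trust/ca.p12
          storePassword:
            passwordFile: /opt/proxy/trust/ca.password
    clusterNetworkAddressConfigProvider:                                 # 6
      type: SniRoutingClusterNetworkAddressConfigProvider                # 7
      config:
        bootstrapAddress: my-cluster-proxy.kafka:9092                    # 8
        brokerAddressPattern: broker$(nodeId).my-cluster-proxy.kafka
    logNetwork: false                                                    # 9
    logFrames: false
    tls:                                                                 # 10
      key:
        storeFile: /opt/proxy/server/keystore.p12
        storePassword:
          passwordFile: /opt/proxy/server/keystore.password
filters:                                                                 # 11
  - type: RecordEncryption                                               # 12
    config:                                                              # 13
      kms: VaultKmsService
      kmsConfig:
        vaultTransitEngineUrl: https://vault.vault.svc:8200/v1/transit
        vaultToken:
          passwordFile: /opt/proxy/server/token.txt
        tls:                                                             # 14
          trust:
            storeFile: /opt/proxy/kms-trust/ca.p12
            storePassword:
              passwordFile: /opt/proxy/kms-trust/ca.password
      selector: TemplateKekSelector
      selectorConfig:
        template: KEK_${topicName}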
- 1
- Enables metrics for the proxy.
- 2
- Virtual cluster configuration.
- 3
- The name of the virtual cluster.
- 4
- The bootstrap address of the target physical Kafka Cluster being proxied.
- 5
- TLS configuration for the connection to the target cluster.
- 6
- The configuration for the cluster network address configuration provider that controls how the virtual cluster is presented to the network.
- 7
- The built-in types are PortPerBrokerClusterNetworkAddressConfigProvider and SniRoutingClusterNetworkAddressConfigProvider.
- 8
- The hostname and port of the bootstrap used by the Kafka clients. The hostname must be resolved by the clients.
- 9
- Logging is disabled by default. Enable logging related to network activity (logNetwork) and messages (logFrames) by setting the logging properties to true.
- 10
- TLS encryption for securing connections with the clients.
- 11
- Filter configuration.
- 12
- The type of filter, which is the Record Encryption filter using Vault as the KMS in this example.
- 13
- The configuration specific to the type of filter.
- 14
- If required, you can also specify the credentials for TLS authentication with the KMS, with key names under which TLS certificates are stored.
3.3. Securing connections from clients
To secure client connections to virtual clusters, configure TLS on the virtual cluster by doing the following:
- Obtain a server certificate for the virtual cluster from a Certificate Authority (CA). Ensure the certificate matches the names of the virtual cluster’s bootstrap and broker addresses. This may require wildcard certificates and Subject Alternative Names (SANs).
- Provide the TLS configuration using the tls properties in the virtual cluster’s configuration to enable it to present the certificate to clients. Depending on your certificate format, apply one of the following examples.
TLS is recommended on Kafka clients and virtual clusters for production configurations.
Example PKCS #12 configuration
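A sketch of the PKCS #12 form follows; the property names (storeFile, storePassword, keyPassword, storeType) and the file paths are illustrative of the store-based TLS shape described here, and the numbered comments correspond to the callouts below:

virtualClusters:
  demo:
    tls:
      key:
        storeFile: /opt/proxy/server/keystore.p12               # 1
        storePassword:
          passwordFile: /opt/proxy/server/keystore.password     # 2
        keyPassword:
          passwordFile: /opt/proxy/server/key.password          # 3
        storeType: PKCS12                                       # 4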
- 1
- PKCS #12 store for the public CA certificate of the virtual cluster.
- 2
- Password to protect the PKCS #12 store.
- 3
- (Optional) Password for the key. If a password is not specified, the keystore’s password is used to decrypt the key too.
- 4
- (Optional) Keystore type. If a keystore type is not specified, the default JKS (Java Keystore) type is used.
Example PEM configuration
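A sketch of the equivalent PEM form, assuming separate certificate and private key files; the property names (privateKeyFile, certificateFile) and the paths are illustrative:

virtualClusters:
  demo:
    tls:
      key:
        privateKeyFile: /opt/proxy/server/key.pem
        certificateFile: /opt/proxy/server/cert.pem
        keyPassword:
          passwordFile: /opt/proxy/server/key.password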
If required, configure the insecure property to disable trust and establish insecure connections with any Kafka cluster, irrespective of certificate validity. However, this option is only intended for use in development and testing environments where proper certificates are hard to obtain and manage.
Example to enable insecure TLS
- 1
- Enables insecure TLS.
3.4. Securing connections to target clusters
To secure a virtual cluster connection to a target cluster, configure TLS on the virtual cluster. The target cluster must already be configured to use TLS.
Specify TLS for the virtual cluster configuration using the targetCluster.tls properties.
Use an empty object ({}) to inherit trust from the underlying platform on which the cluster is running. This option is suitable if the target cluster is using a TLS certificate signed by a public CA.
Example target cluster configuration for TLS
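A sketch of this case, where the empty tls object inherits trust from the platform; the bootstrap address is illustrative:

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls: {}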
If it is using a TLS certificate signed by a private CA, you must add truststore configuration for the target cluster.
Example truststore configuration for a target cluster
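A sketch of the private-CA case, adding trust material for the target cluster; the property names and paths are illustrative:

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        trust:
          storeFile: /opt/proxy/trust/ca.p12
          storePassword:
            passwordFile: /opt/proxy/trust/ca.password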
For mTLS, you can add keystore configuration for the virtual cluster too.
Example keystore and truststore configuration for mTLS
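A sketch of the mTLS case, where the virtual cluster also presents a client certificate to the target cluster; the property names and paths are illustrative:

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        key:
          storeFile: /opt/proxy/client/keystore.p12
          storePassword:
            passwordFile: /opt/proxy/client/keystore.password
        trust:
          storeFile: /opt/proxy/trust/ca.p12
          storePassword:
            passwordFile: /opt/proxy/trust/ca.password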
For the purposes of testing outside of a production environment, you can set the insecure property to true to turn off TLS so that the Streams for Apache Kafka Proxy can connect to any Kafka cluster.
Example configuration to turn off TLS
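A sketch of disabling TLS trust for testing, using the insecure property described above; the bootstrap address is illustrative:

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        trust:
          insecure: true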
3.5. Configuring network addresses
Virtual cluster configuration requires a network address configuration provider that manages network communication and provides broker address information to clients.
Streams for Apache Kafka Proxy has the following built-in providers:
- Broker address provider (PortPerBrokerClusterNetworkAddressConfigProvider)
- Node ID ranges provider (RangeAwarePortPerNodeClusterNetworkAddressConfigProvider)
- SNI routing address provider (SniRoutingClusterNetworkAddressConfigProvider)
Make sure that the virtual cluster bootstrap address and generated broker addresses are resolvable and routable by the Kafka client.
3.5.1. Broker address provider
The per-broker network address configuration provider opens one port for a virtual cluster’s bootstrap address and one port for each broker in the target Kafka cluster. The number of open ports is maintained dynamically. For example, if a broker is removed from the cluster, the port assigned to it is closed. If you have two virtual clusters, each targeting a Kafka cluster with three brokers, eight ports are bound in total.
This provider works best with straightforward configurations. Ideally, the target cluster should have sequential, stable broker IDs and a known minimum broker ID, such as 0, 1, 2 for a cluster with three brokers. While it can handle non-sequential broker IDs, this would require exposing ports equal to maxBrokerId - minBrokerId, which could be excessive if your cluster contains broker IDs like 0 and 20000.
The provider supports both cleartext and TLS downstream connections.
Example broker address configuration
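A sketch of such a configuration follows; the hostnames and bind address are illustrative, the values match the port mapping discussed below, and the numbered comments correspond to the callouts below:

clusterNetworkAddressConfigProvider:
  type: PortPerBrokerClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192                       # 1
    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com     # 2
    brokerStartPort: 9193                                            # 3
    numberOfBrokerPorts: 3                                           # 4
    lowestTargetBrokerId: 1000                                       # 5
    bindAddress: 192.168.0.1                                         # 6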
- 1
- The hostname and port of the bootstrap address used by Kafka clients.
- 2
- (Optional) The broker address pattern used to form broker addresses. If not defined, it defaults to the hostname part of the bootstrap address and the port number allocated to the broker.
- 3
- (Optional) The starting number for the broker port range. Defaults to the port of the bootstrap address plus 1.
- 4
- (Optional) The maximum number of broker ports that are permitted. Set this value according to the maximum number of brokers allowed by your operational rules. Defaults to 3.
- 5
- (Optional) The lowest broker ID (lowestTargetBrokerId) in the target cluster. Node IDs are mapped to ports relative to this value.
- 6
- (Optional) The bind address used when binding the ports. If undefined, all network interfaces are bound.
Each broker’s ID must be greater than or equal to lowestTargetBrokerId and less than lowestTargetBrokerId + numberOfBrokerPorts. The current strategy for mapping node IDs to ports is as follows: nodeId → brokerStartPort + nodeId - lowestTargetBrokerId. The example configuration maps broker IDs 1000, 1001, and 1002 to ports 9193, 9194, and 9195, respectively. Reconfigure numberOfBrokerPorts to accommodate the number of brokers in the cluster.
The example broker address configuration creates the following broker addresses:
mybroker-0.mycluster.kafka.com:9193
mybroker-1.mycluster.kafka.com:9194
mybroker-2.mycluster.kafka.com:9195
The brokerAddressPattern configuration parameter accepts the $(nodeId) replacement token, which is optional. If included, $(nodeId) is replaced by the broker’s node.id (or broker.id) in the target cluster.
For example, with the configuration shown above, if your cluster has three brokers, your Kafka client receives broker addresses like this:
0. mybroker-0.mycluster.kafka.com:9193
1. mybroker-1.mycluster.kafka.com:9194
2. mybroker-2.mycluster.kafka.com:9195
3.5.2. Node ID ranges provider
As an alternative to the broker address provider, the node ID ranges provider allows you to model specific ranges of node IDs in the target cluster, enabling efficient port allocation even when broker IDs are non-sequential or widely spaced. This ensures a deterministic mapping of node IDs to ports while minimizing the number of ports needed.
Example node ID ranges configuration
Node ID ranges must be distinct, meaning a node ID cannot belong to more than one range.
KRaft roles given to cluster nodes can be accommodated in the configuration. For example, consider a target cluster using KRaft with the following node IDs and roles:
- nodeId: 0, roles: controller
- nodeId: 1, roles: controller
- nodeId: 2, roles: controller
- nodeId: 1000, roles: broker
- nodeId: 1001, roles: broker
- nodeId: 1002, roles: broker
- nodeId: 99999, roles: broker
This can be modeled as three node ID ranges, as shown in the following example.
Example node ID ranges configuration with KRaft roles
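A sketch of such a configuration follows; the range property names (nodeIdRanges, startInclusive, endExclusive) and the hostname are illustrative, and the ranges are chosen to produce the mapping listed next:

clusterNetworkAddressConfigProvider:
  type: RangeAwarePortPerNodeClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192
    nodeIdRanges:
      - name: controllers
        range:
          startInclusive: 0
          endExclusive: 3
      - name: brokers
        range:
          startInclusive: 1000
          endExclusive: 1003
      - name: outlier-broker
        range:
          startInclusive: 99999
          endExclusive: 100000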
This configuration results in the following mapping from node ID to port:
- nodeId: 0 → port 9193
- nodeId: 1 → port 9194
- nodeId: 2 → port 9195
- nodeId: 1000 → port 9196
- nodeId: 1001 → port 9197
- nodeId: 1002 → port 9198
- nodeId: 99999 → port 9199
3.5.3. SNI routing address provider
The SNI (Server Name Indication) routing provider opens a single port for all virtual clusters, or a port for each. You can open a port for the whole cluster or for each broker. The SNI routing provider uses SNI information to determine where to route the traffic, so it requires downstream TLS.
Example SNI routing address provider configuration
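A sketch of such a configuration follows; the hostnames are illustrative, a single port is shared by the bootstrap address and all brokers, and the numbered comment corresponds to the callout below:

clusterNetworkAddressConfigProvider:
  type: SniRoutingClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192                       # 1
    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com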
- 1
- A single address for all traffic, including bootstrap address and brokers.
In the SNI routing address configuration, the brokerAddressPattern specification is mandatory, as it is required to generate routes for each broker.
Single port operation may have cost advantages when using load balancers of public clouds, as it allows a single cloud provider load balancer to be shared across all virtual clusters.
Chapter 4. Introducing metrics
If you want to introduce metrics to your Streams for Apache Kafka Proxy deployment, you can configure an insecure HTTP endpoint that exposes Prometheus metrics (at /metrics).
Add the following to the ConfigMap resource that defines the Streams for Apache Kafka Proxy configuration:
Minimal metrics configuration
adminHttp:
endpoints:
prometheus: {}
By default, the HTTP endpoint listens on 0.0.0.0:9190. You can change the hostname and port as follows:
Example metrics configuration with hostname and port
adminHttp:
host: localhost
port: 9999
endpoints:
prometheus: {}
The example files provided with the proxy include a PodMonitor resource. If you have enabled monitoring in OpenShift for user-defined projects, you can use a PodMonitor resource to ingest the proxy metrics.
Example PodMonitor resource configuration
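A sketch of such a resource follows; the metadata, labels, and port name are assumptions that must match your proxy Deployment, while the /metrics path matches the endpoint described above:

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: proxy-metrics
  namespace: proxy
  labels:
    app: proxy
spec:
  selector:
    matchLabels:
      app: proxy          # must match the labels on the proxy pods
  podMetricsEndpoints:
    - path: /metrics
      port: metrics       # assumed name of the container port exposing the admin HTTP endpoint (default 9190)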
Appendix A. Using your subscription
Streams for Apache Kafka is provided through a software subscription. To manage your subscriptions, access your account at the Red Hat Customer Portal.
Accessing Your Account
- Go to access.redhat.com.
- If you do not already have an account, create one.
- Log in to your account.
Activating a Subscription
- Go to access.redhat.com.
- Navigate to My Subscriptions.
- Navigate to Activate a subscription and enter your 16-digit activation number.
Downloading Zip and Tar Files
To access zip or tar files, use the customer portal to find the relevant files for download. If you are using RPM packages, this step is not required.
- Open a browser and log in to the Red Hat Customer Portal Product Downloads page at access.redhat.com/downloads.
- Locate the Streams for Apache Kafka entries in the INTEGRATION AND AUTOMATION category.
- Select the desired Streams for Apache Kafka product. The Software Downloads page opens.
- Click the Download link for your component.
Installing packages with DNF
To install a package and all the package dependencies, use:
dnf install <package_name>
To install a previously-downloaded package from a local directory, use:
dnf install <path_to_download_package>
Revised on 2024-11-13 12:21:28 UTC