Using the Streams for Apache Kafka Proxy


Red Hat Streams for Apache Kafka 2.8

Enhancing Kafka with specialized features

Abstract

Streams for Apache Kafka Proxy is an Apache Kafka protocol-aware proxy designed to enhance Kafka-based systems through its filter mechanism. In this preview, Streams for Apache Kafka Proxy includes a Record Encryption filter, which provides encryption at rest for data stored in a Kafka cluster.

Preface

Providing feedback on Red Hat documentation

We appreciate your feedback on our documentation.

To propose improvements, open a Jira issue and describe your suggested changes. Provide as much detail as possible to enable us to address your request quickly.

Prerequisite

  • You have a Red Hat Customer Portal account. This account enables you to log in to the Red Hat Jira Software instance.
    If you do not have an account, you will be prompted to create one.

Procedure

  1. Click the following: Create issue.
  2. In the Summary text box, enter a brief description of the issue.
  3. In the Description text box, provide the following information:

    • The URL of the page where you found the issue.
    • A detailed description of the issue.
      You can leave the information in any other fields at their default values.
  4. Add a reporter name.
  5. Click Create to submit the Jira issue to the documentation team.

Thank you for taking the time to provide feedback.

Technology preview

The Streams for Apache Kafka Proxy is a technology preview.

Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about the support scope, see Technology Preview Features Support Scope.

Streams for Apache Kafka Proxy is an Apache Kafka protocol-aware ("Layer 7") proxy designed to enhance Kafka-based systems. Through its filter mechanism it allows additional behavior to be introduced into a Kafka-based system without requiring changes to either your applications or the Kafka cluster itself. Built-in filters are provided as part of the solution.

Functioning as an intermediary, the Streams for Apache Kafka Proxy mediates communication between a Kafka cluster and its clients. It takes on the responsibility of receiving, filtering, and forwarding messages.

An API provides a convenient means for implementing custom logic within the proxy.

Chapter 2. Built-in filters

Streams for Apache Kafka proxy comes with a suite of built-in filters designed to enhance the functionality and security of your Kafka clusters.

2.1. Record Encryption filter

Streams for Apache Kafka Proxy’s Record Encryption filter enhances the security of Kafka messages. The filter uses industry-standard cryptographic techniques to apply encryption to Kafka messages, ensuring the confidentiality of data stored in the Kafka Cluster. Streams for Apache Kafka Proxy centralizes topic-level encryption, ensuring streamlined encryption across Kafka clusters.

There are three steps to using the filter:

  1. Setting up a Key Management System (KMS).
  2. Establishing the encryption keys within the KMS that will be used to encrypt the topics.
  3. Configuring the filter within Streams for Apache Kafka Proxy.

The filter integrates with a Key Management Service (KMS), which has ultimate responsibility for the safe storage of sensitive key material. The filter relies on a KMS implementation. Currently, Streams for Apache Kafka Proxy integrates with either HashiCorp Vault or AWS Key Management Service. You can provide implementations for your specific KMS systems. Additional KMS support will be added based on demand.

2.1.1. How encryption works

The Record Encryption filter uses envelope encryption to encrypt records with symmetric encryption keys. The filter encrypts records from produce requests and decrypts records from fetch responses.

Envelope encryption
Envelope encryption is an industry-standard technique suited for encrypting large volumes of data in an efficient manner. Data is encrypted with a Data Encryption Key (DEK). The DEK is encrypted using a Key Encryption Key (KEK). The KEK is stored securely in a Key Management System (KMS).
Symmetric encryption keys
AES(GCM) 256 bit encryption symmetric encryption keys are used to encrypt and decrypt record data.

The process is as follows:

  1. The filter intercepts produce requests from producing applications and transforms them by encrypting the records.
  2. The produce request is forwarded to the broker.
  3. The filter intercepts fetch responses from the broker and transforms them by decrypting the records.
  4. The fetch response is forwarded to the consuming application.

The filter encrypts the record value only. Record keys, headers, and timestamps are not encrypted.

The entire process is transparent from the point of view of Kafka clients and Kafka brokers. Neither are aware that the records are being encrypted, nor do they have any access to the encryption keys or have any influence on the ciphering process to secure the records.

2.1.1.1. How the filter encrypts records

The filter encrypts records from produce requests as follows:

  1. Filter selects a KEK to apply.
  2. Requests the KMS to generate a DEK for the KEK.
  3. Uses an encrypted DEK (DEK encrypted with the KEK) to encrypt the record.
  4. Replaces the original record with a ciphertext record (encrypted record, encrypted DEK, and metadata).

The filter uses a DEK reuse strategy. Encrypted records are sent to the same topic using the same DEK until a time-out or an encryption limit is reached.

2.1.1.2. How the filter decrypts records

The filter decrypts records from fetch responses as follows:

  1. Filter receives a cipher record from the Kafka broker.
  2. Reverses the process that constructed the cipher record.
  3. Uses KMS to decrypt the DEK.
  4. Uses the decrypted DEK to decrypt the encrypted record.
  5. Replaces the cipher record with a decrypted record.

The filter uses an LRU (least recently used) strategy for caching decrypted DEKs. Decrypted DEKs are kept in memory to reduce interactions with the KMS.

2.1.1.3. How the filter uses the KMS

To support the filter, the KMS provides the following:

  • A secure repository for storing Key Encryption Keys (KEKs)
  • A service for generating and decrypting Data Encryption Keys (DEKs)

KEKs stay within the KMS. The KMS generates a DEK (which is securely generated random data) for a given KEK, then returns the DEK and an encrypted DEK. The encrypted DEK has the same data but encrypted with the KEK. The KMS doesn’t store encrypted DEKs; they are stored as part of the cipher record in the broker.

Warning

The KMS must be available during runtime. If the KMS is unavailable, the filter will not be able to obtain new encrypted DEKs on the produce path or decrypt encrypted DEKs on the consume path. The filter will continue to use previously obtained DEKs, but eventually, production and consumption will become impossible. It is recommended to use the KMS in a high availability (HA) configuration.

2.1.1.4. Practicing key rotation

Key rotation involves periodically replacing cryptographic keys with new ones and is considered a best practice in cryptography.

The filter allows the rotation of Key Encryption Keys (KEKs) within the Key Management System (KMS). When a KEK is rotated, the new key material is eventually used for newly produced records. Existing records, encrypted with older KEK versions, remain decryptable as long as the previous KEK versions are still available in the KMS.

Important

If your encrypted topic is receiving regular traffic, the Data Encryption Key (DEK) will be refreshed as new records flow through. However, if messages are infrequent, the DEK might be used for up to 2 hours (by default) after its creation.

When the KEK is rotated in the external KMS, it will take up to 1 hour (by default) before all{fn-dek-refresh} records produced by the filter contain a DEK encrypted with the new key material. This is because existing encrypted DEKs are used for a configurable amount of time after creation, the Filter caches the encrypted DEK, one hour after creation they are eligible to be refreshed.

If you need to rotate key material immediately, execute a rolling restart of your cluster of Streams for Apache Kafka Proxy instances.

Warning

If an old KEK version is removed from the KMS, records encrypted with that key will become unreadable, causing fetch operations to fail. In such cases, the consumer offset must be advanced beyond those records.

2.1.1.5. What part of a record is encrypted?

The record encryption filter encrypts only the values of records, leaving record keys, headers, and timestamps untouched. Null record values, which might represent deletions in compacted topics, are transmitted to the broker unencrypted. This approach ensures that compacted topics function correctly.

2.1.1.6. Unencrypted topics

You may configure the system so that some topics are encrypted and others are not. This supports scenarios where topics with confidential information are encrypted and Kafka topics with non-sensitive information can be left unencrypted.

2.1.2. Setting up HashiCorp Vault

To use HashiCorp Vault with the Record Encryption filter, use the following setup:

  • Enable the Transit Engine as the Record Encryption filter relies on its APIs.
  • Create a Vault policy specifically for the filter with permissions for generating and decrypting Data Encryption Keys (DEKs) for envelope encryption.
  • Obtain a Vault token that includes the filter policy.
2.1.2.1. Enable the Transit Engine

The filter integrates with the HashiCorp Vault Transit Engine. Vault does not enable the Transit Engine by default. It must be enabled before it can be used by the filter.

2.1.2.1.1. Vault Transit Engine URL

The Vault Transit Engine URL is required so the filter knows the location of the Transit Engine within the Vault instance.

The URL is formed from the concatenation of the Api Address (reported by Vault reported by during starts up) with the complete path to Transit Engine, including the name of the engine itself. If Namespacing is used on the Vault instance, the path needs to include the namespace(s). The URL will end with /transit unless the -path parameter was used when enabling the engine.

If namespacing is not in use, the URL will look like this:

https://myvaultinstance:8200/v1/transit
Copy to Clipboard Toggle word wrap

If namespacing is in use, the path must include the namespaces. For example, if there is a parent namespace is a and a child namespace is b, the URL will look like this:

https://myvaultinstance:8200/v1/a/b/transit
Copy to Clipboard Toggle word wrap

If the name of the Transit engine was changed (using the -path argument to the vault secrets enable transit command) the URL will look like this:

https://myvaultinstance:8200/v1/mytransit
Copy to Clipboard Toggle word wrap

Establish a naming convention for keys to keep the filter’s keys separate from those used by other systems. Here, we use a prefix of KEK_ for filter key name. Adjust the instructions if a different naming convention is used.

2.1.2.1.3. Role of the administrator

To use the filter, an administrator (or administrative process) must create the encryption keys within Vault that will be used to encrypt the records. The organization deploying the Record Encryption filter is responsible for managing this administrator or process.

The administrator must have permissions to create keys beneath transit/keys/KEK_* in the Vault hierarchy.

As a guideline, the minimal Vault policy required by the administrator is as follows:

path "transit/keys/KEK_*" {
  capabilities = ["read", "write"]
}
Copy to Clipboard Toggle word wrap

The filter must authenticate to Vault in order to perform envelope encryption operations, such as generating and decrypting DEKs Therefore, a Vault identity with sufficient permissions must be created for the filter.

Create a Vault policy for the filter:

vault policy write kroxylicious_encryption_filter_policy - << EOF
path "transit/keys/KEK_*" {
  capabilities = ["read"]
}
path "/transit/datakey/plaintext/KEK_*" {
  capabilities = ["update"]
}
path "transit/decrypt/KEK_*" {
  capabilities = ["update"]
}
EOF
Copy to Clipboard Toggle word wrap

Create a Periodic (long-lived) Vault Token for the filter:

vault token create -display-name "kroxylicious record encryption" \
                   -policy=kroxylicious_encryption_filter_policy \
                   -period=768h \ 
1

                   -no-default-policy \ 
2

                   -orphan 
3
Copy to Clipboard Toggle word wrap
1
Causes the token to be periodic (with every renewal using the given period).
2
Detach the "default" policy from the policy set for this token. This is done so the token has least-privilege.
3
Create the token with no parent. This is done so that expiration of a parent won’t expire the token used by the filter.
Note

The example token create command illustrates the use of -no-default-policy and -orphan. The use of these flags is not functionally important. You may adapt the configuration of the token to suit the standards required by your organization.

The token create command yields the token. The token value is required later when configuring the vault within the filter.

token              hvs.CAESIFJ_HHo0VnnW6DSbioJ80NqmuYm2WlON-QxAPmiJScZUGh4KHGh2cy5KdkdFZUJMZmhDY0JCSVhnY2JrbUNEWnE
token_accessor     4uQZJbEnxW4YtbDBaW6yVzwP
token_policies     [kroxylicious_encryption_filter_policy]
Copy to Clipboard Toggle word wrap

The token must be renewed before expiration. It is the responsibility of the administrator to do this.

This can be done with a command like the following:

vault token renew --accessor <token_accessor>
Copy to Clipboard Toggle word wrap

To test whether the application identity and the policy are working correctly, a script can be used.

First, as the administrator, create a KEK in the hierarchy at this path transit/keys/KEK_testkey.

VAULT_TOKEN=<kroxylicious encryption filter token> validate_vault_token.sh <kek path>
Copy to Clipboard Toggle word wrap

The script should respond Ok. If errors are reported check the policy/token configuration.

transit/keys/KEK_testkey can now be removed.

2.1.2.2. Configuring the HashiCorp Vault KMS

For HashiCorp Vault, the KMS configuration looks like this. Use the Vault Token and Vault Transit Engine URL values that you gathered above.

kms: VaultKmsService                                          
1

kmsConfig:
  vaultTransitEngineUrl: <vault transit engine service url>   
2

  tls:                                                        
3

  vaultToken:                                                 
4

    passwordFile: /opt/vault/token
Copy to Clipboard Toggle word wrap
1
Name of the KMS provider. This must be VaultKmsService.
2
Vault Transit Engine URL including the protocol part, i.e. https: or http:
3
(Optional) TLS trust configuration.
4
File containing the Vault Token

For TLS trust and TLS client authentication configuration, the filter accepts the same TLS parameters as Upstream TLS except the PEM store type is currently not supported.

2.1.2.3. Creating HashiCorp Vault keys

As the administrator, use either the HashiCorp UI or CLI to create AES-256 symmetric keys following your key naming convention. The key type must be aes256-gcm96, which is Vault’s default key type.

Tip

It is recommended to use a key rotation policy.

If using the Vault CLI, the command will look like:

vault write -f transit/keys/KEK_trades type=aes256-gcm96 auto_rotate_period=90d
Copy to Clipboard Toggle word wrap

2.1.3. Setting up AWS KMS

To use AWS Key Management Service with the Record Encryption filter, use the following setup:

  • Establish an AWS KMS aliasing convention for keys
  • Configure the AWS KMS
  • Create AWS KMS keys

You’ll need a privileged AWS user that is capable of creating users and policies to perform the set-up.

The filter references KEKs within AWS via an AWS key alias.

Establish a naming convention for key aliases to keep the filter’s keys separate from those used by other systems. Here, we use a prefix of KEK_ for filter aliases. Adjust the instructions if a different naming convention is used.

2.1.3.1.1. Role of the administrator

To use the filter, an administrator (or administrative process) must create the encryption keys within AWS KMS that will be used to encrypt the records. The organization deploying the Record Encryption filter is responsible for managing this administrator or process.

The administrator must have permissions to create keys in AWS KMS. As a starting point, the built-in AWS policy AWSKeyManagementServicePowerUser confers sufficient key management privileges.

To get started, use the following commands to set up an administrator with permissions suitable for managing encryption keys in KMS through an AWS Cloud Shell. This example illustrates using the user name`kroxylicious-admin`, but you can choose a different name if preferred. Adjust the instructions accordingly if you use a different user name.

ADMIN=kroxylicious-admin
INITIAL_PASSWORD=$(tr -dc 'A-Za-z0-9!?%=' < /dev/urandom | head -c 10)
CONSOLE_URL=https://$(aws sts get-caller-identity --query Account --output text).signin.aws.amazon.com/console
aws iam create-user --user-name ${ADMIN}
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/AWSKeyManagementServicePowerUser
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/IAMUserChangePassword
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/AWSCloudShellFullAccess
aws iam create-login-profile --user-name ${ADMIN} --password ${INITIAL_PASSWORD} --password-reset-required
echo Now log in at ${CONSOLE_URL}  with user name ${ADMIN} password ${INITIAL_PASSWORD} and change the password.
Copy to Clipboard Toggle word wrap

The filter must authenticate to AWS in order to perform envelope encryption operations, such as generating and decrypting DEKs. Therefore, an AWS IAM identity with sufficient permissions must be created for the filter.

Use AWS IAM to create this identity and generate an Access Key for it. The Access Key/Secret Key pair is used by the filter. Do not enable console access for this user.

Using the CLI, the following commands create the IAM identity for the filter. This example uses the user name kroxylicious, but you can choose a different name if needed. Adjust the instructions accordingly if using a different user name.

aws iam create-user --user-name kroxylicious
aws iam create-access-key --user-name kroxylicious
Copy to Clipboard Toggle word wrap

Create an alias based policy granting permissions to use keys aliased by the established alias naming convention.

AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
cat > /tmp/policy << EOF
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "AliasBasedIAMPolicy",
			"Effect": "Allow",
			"Action": [
				"kms:Encrypt",
				"kms:Decrypt",
				"kms:GenerateDataKey*",
				"kms:DescribeKey"
			],
			"Resource": [
                "arn:aws:kms:*:${AWS_ACCOUNT_ID}:key/*"
			],
			"Condition": {
				"ForAnyValue:StringLike": {
					"kms:ResourceAliases": "alias/KEK_*"
				}
			}
		}
	]
}
EOF
aws iam create-policy --policy-name KroxyliciousRecordEncryption --policy-document file:///tmp/policy
Copy to Clipboard Toggle word wrap

Attach the alias policy to the filter’s application identity. This will allow the filter to perform key operations on all KEKs with aliases that match the specified naming convention.

AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
aws iam attach-user-policy --user-name kroxylicious --policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/KroxyliciousRecordEncryption"
Copy to Clipboard Toggle word wrap
2.1.3.2. Configuring the AWS KMS

For AWS KMS, the KMS configuration looks like this.

kms: AwsKmsService                                            
1

kmsConfig:
  endpointUrl: https://kms.<region>.amazonaws.com             
2

  tls:                                                        
3

  accessKey:
    passwordFile: /opt/aws/accessKey                          
4

  secretKey:
    passwordFile: /opt/aws/secretKey                          
5

  region: <region>                                            
6
Copy to Clipboard Toggle word wrap
1
Name of the KMS provider. This must be AwsKmsService.
2
AWS Endpoint URL. This must include the https:// scheme part.
3
(Optional) TLS trust configuration.
4
File containing the AWS Access Key
5
File containing the AWS Secret Key
6
AWS region identifier (e.g. us-east-1)

For TLS trust and TLS client authentication configuration, the filter accepts the same TLS parameters as Upstream TLS except the PEM store type is currently not supported.

2.1.3.3. Creating AWS KMS keys

As the administrator, use either the AWS Console or CLI to create a Symmetric key with Encrypt and decrypt usage. Multi-region keys are supported.

Note

It is not possible to make use of keys from other AWS accounts. For more information on this limitation, see the issue for AWS KMS serde improvements.

Give the key an alias as described in Section 2.1.3.1, “Establish an aliasing convention for keys within AWS KMS”.

If using the CLI, this can be done with commands like this:

KEY_ALIAS="KEK_<name>"
KEY_ID=$(aws kms create-key | jq -r '.KeyMetadata.KeyId')
# the create key command will produce JSON output including the KeyId
aws kms create-alias --alias-name alias/${KEY_ALIAS} --target-key-id ${KEY_ID}
Copy to Clipboard Toggle word wrap

Once the key is created, it is recommended to use a key rotation policy.

aws kms enable-key-rotation --key-id ${KEY_ID} --rotation-period-in-days 180
Copy to Clipboard Toggle word wrap

2.1.4. Setting up the Record Encryption filter

This procedure describes how to set up the Record Encryption filter. Provide the filter configuration and the Key Encryption Key (KEK) selector to use. The KEK selector maps topic name to key names. The filter looks up the resulting key name in the KMS.

Prerequisites

  • An instance of Streams for Apache Kafka Proxy. For information on deploying Streams for Apache Kafka Proxy, see the samples and examples.
  • A config map for Streams for Apache Kafka Proxy that includes the configuration for creating virtual clusters and filters.
  • A KMS is installed and set up for the filter with KEKs to encrypt records set up for topics.

Procedure

  1. Configure a RecordEncryption type filter.

    Example Record Encryption filter configuration

    filters:
      - type: RecordEncryption
        config:
          kms: <kms_service_name> 
    1
    
          kmsConfig:
            <kms_specific_config> 
    2
    
            # ...
          selector: <KEK_selector_service_name> 
    3
    
          selectorConfig:
            template: "KEK_${topicName}" 
    4
    
          experimental:
            encryptionDekRefreshAfterWriteSeconds: 3600 
    5
    
            encryptionDekExpireAfterWriteSeconds: 7200 
    6
    
            maxEncryptionsPerDek: 5000000 
    7
    
    	 # ...
    Copy to Clipboard Toggle word wrap

    1
    The KMS service name.
    2
    Configuration specific to the KMS provider.
    3
    The Key Encryption Key (KEK) selector to use. The ${topicName} is a literal understood by the proxy. For example, if using the TemplateKekSelector with the template KEK_${topicName}, create a key for every topic that is to be encrypted with the key name matching the topic name, prefixed by the string KEK_.
    4
    The template for deriving the KEK, based on a specific topic name.
    5
    How long after creation of a DEK before it becomes eligible for rotation. On the next encryption request, the cache will asynchronously create a new DEK. Encryption requests will continue to use the old DEK until the new DEK is ready.
    6
    How long after creation of a DEK until it is removed from the cache. This setting puts an upper bound on how long a DEK can remain cached.
    7
    The maximum number of records any DEK should be used to encrypt. After this limit is hit, that DEK will be destroyed and a new one created.
Note

encryptionDekRefreshAfterWriteSeconds and encryptionDekExpireAfterWriteSeconds help govern the "originator usage period" of the DEK. That is the period of time the DEK will be used to encrypt records. Keeping the period short helps reduce the blast radius in the event that DEK key material is leaked. However, there is a trade-off. The additional KMS API calls will increase produce/consume latency and may increase your KMS provider costs.

maxEncryptionsPerDek helps prevent key exhaustion by placing an upper limit of the amount of times that a DEK may be used to encrypt records.

  1. Verify that the encryption has been applied to the specified topics by producing messages through the proxy and then consuming directly and indirectly from the Kafka cluster.
Note

If the filter is unable to find the key in the KMS, the filter passes through the records belonging to that topic in the produce request without encrypting them.

2.2. (Preview) Record Validation filter

The Record Validation filter validates records sent by a producer. Only records that pass the validation are sent to the broker. This filter can be used to prevent poison messages—such as those containing corrupted data or invalid formats—from entering the Kafka system, which may otherwise lead to consumer failure.

The filter currently supports two modes of operation:

  1. Schema Validation ensures the content of the record conforms to a schema stored in an Apicurio Registry.
  2. JSON Syntax Validation ensures the content of the record contains syntactically valid JSON.

Validation rules can be applied to check the content of the Kafka record key or value.

If the validation fails, the product request is rejected and the producing application receives an error response. The broker will not receive the rejected records.

Note

This filter is currently in incubation and available as a preview. We would not recommend using it in a production environment.

This procedure describes how to set up the Record Validation filter. Provide the filter configuration and rules that the filter uses to check against Kafka record keys and values.

Prerequisites

  • An instance of Streams for Apache Kafka Proxy. For information on deploying Streams for Apache Kafka Proxy, see the samples and examples.
  • A config map for Streams for Apache Kafka Proxy that includes the configuration for creating a virtual cluster.
  • Apicurio Registry (if wanting to use Schema validation).

Procedure

  1. Configure a RecordValidation type filter.
filters:
  - type: RecordValidation
    config:
        rules:
        - topicNames:                                                  
1

            - <topic name>
          keyRule:
            <rule definition>                                          
2

          valueRule:
            <rule definition>                                          
3

        defaultRule:                                                   
4

          keyRule:
            <rule definition>                                          
5

          valueRule:
            <rule definition>                                          
6

        forwardPartialRequests: false                                  
7
Copy to Clipboard Toggle word wrap
1
List of topic names to which the validation rules will be applied.
2 5
Validation rules that are applied to the record’s key.
3 6
Validation rules that are applied to the record’s value.
4
(Optional) Default rule that is applied to any topics for which there is no explict rule defined.
7
If false, any record failing validation will cause the entire produce request to be rejected. If true, only partitions within the request where all records validated successfully will be forwarded to the broker. The response the client will receive will be a mixed response with response returned from the broker for the partitions whose records passed validation and error responses for the partitions that failed validation. If the client is using transactions, this setting is ignored. Any failing record validation will cause the entire produce request to be rejected.

Replace the token <rule definition> in the YAML configuration with either a Schema Validation rule or a JSON Syntax Validation rule depending on your requirements.

Example Schema Validation Rule Definition

The Schema Validation rule validates that the key or value matches a schema identified by its global ID within an Apicurio Schema Registry.

If the key or value does not adhere to the schema, the record will be rejected.

Additionally, if the kafka producer has embedded a global ID within the record it will be validated against the global ID defined by the rule. If they do not match, the record will be rejected. See the Apicurio documentation for details on how the global ID could be embedded into the record. The filter supports extracting ID’s from either the Apicurio globalId record header or from the initial bytes of the serialized content itself.

schemaValidationConfig:
    apicurioGlobalId: 1001                                       
1

    apicurioRegistryUrl: http://registry.local:8080              
2

allowNulls: true                                                 
3

allowEmpty: true                                                 
4
Copy to Clipboard Toggle word wrap
1
Apicurio registry global ID identifying the schema that will be enforced.
2
Apicurio Registry endpoint.
3
if true, the validator allows keys and or values to be null. The default is false.
4
if true, the validator allows keys and or values to be empty. The default is false.
Note

Schema validation mode currently has the capability to enforce only JSON schemas (issue)

Example JSON Syntax Validation Rule Definition

The JSON Syntax Validation rule validates that the key or value contains only syntactically correct JSON.

syntacticallyCorrectJson:
    validateObjectKeysUnique: true                               
1

allowNulls: true                                                 
2

allowEmpty: true                                                 
3
Copy to Clipboard Toggle word wrap
1
If true, the validator enforces that objects keys must be unique. The default is false.
2
if true, the validator allows keys and or values to be null. The default is false.
3
if true, the validator allows keys and or values to be empty. The default is false.

Streams for Apache Kafka includes example pre-configured installation artifacts for the proxy in the examples/proxy/ folder.

These installation files offer a quick setup for trying out the proxy.

Prerequisites

  • Installation requires an OpenShift user with cluster-admin role, such as system:admin.
  • The oc command-line tool is installed and configured to connect to the OpenShift cluster with admin access.
  • An OpenShift project namespace called proxy, which is the same namespace where the proxy is installed by default.
  • Any filter-specific prerequisites, as described in the README files provided with the examples.

Using the example files

  1. Download and extract the Streams for Apache Kafka Proxy installation artifacts.

    The artifacts are included with installation and example files available from Streams for Apache Kafka software downloads page.

  2. Follow the instructions in the README files, as applicable.

Chapter 3. Configuring proxies

Fine-tune your deployment by configuring proxies to include additional features according to your specific requirements.

3.1. Configuring virtual clusters

A Kafka cluster is represented by the proxy as a virtual cluster. Clients connect to the virtual cluster rather than the actual cluster. When Streams for Apache Kafka Proxy is deployed, it includes configuration to create virtual clusters.

A virtual cluster has exactly one target cluster, but many virtual clusters can target the same cluster. Each virtual cluster targets a single listener on the target cluster, so multiple listeners on the Kafka side are represented as multiple virtual clusters by the proxy. Clients connect to a virtual cluster using a bootstrap_servers address. The virtual cluster has a bootstrap address that maps to each broker in the target cluster. When a client connects to the proxy, communication is proxied to the target broker by rewriting the address. Responses back to clients are rewritten to reflect the appropriate network addresses of the virtual clusters.

You can secure virtual cluster connections from clients and to target clusters.

Streams for Apache Kafka Proxy accepts keys and certificates in PEM (Privacy Enhanced Mail), PKCS #12 (Public-Key Cryptography Standards), or JKS (Java KeyStore) keystore format.

Streams for Apache Kafka Proxy configuration is defined in a ConfigMap resource. Use the data properties of the ConfigMap resource to configure the following:

  • Virtual clusters that represent the Kafka clusters
  • Network addresses for broker communication in a Kafka cluster
  • Filters to introduce additional functionality to the Kafka deployment

In this example, configuration for the Record Encryption filter is shown.

Example Streams for Apache Kafka Proxy configuration

apiVersion: v1
kind: ConfigMap
metadata:
  name: proxy-config
data:
  config.yaml: |
    adminHttp: 
1

      endpoints:
        prometheus: {}
    virtualClusters: 
2

      my-cluster-proxy: 
3

        targetCluster:
          bootstrap_servers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093 
4

          tls: 
5

            trust:
              storeFile: /opt/proxy/trust/ca.p12
              storePassword:
                passwordFile: /opt/proxy/trust/ca.password
        clusterNetworkAddressConfigProvider: 
6

          type: SniRoutingClusterNetworkAddressConfigProvider 
7

          Config:
            bootstrapAddress: my-cluster-proxy.kafka:9092 
8

            brokerAddressPattern: broker$(nodeId).my-cluster-proxy.kafka
        logNetwork: false 
9

        logFrames: false
        tls: 
10

          key:
            storeFile: /opt/proxy/server/key-material/keystore.p12
            storePassword:
              passwordFile: /opt/proxy/server/keystore-password/storePassword
filters: 
11

  - type: EnvelopeEncryption 
12

    config: 
13

      kms: VaultKmsService
      kmsConfig:
        vaultTransitEngineUrl: https://vault.vault.svc.cluster.local:8200/v1/transit
        vaultToken:
          passwordFile: /opt/proxy/server/token.txt
        tls: 
14

          key:
            storeFile: /opt/cert/server.p12
            storePassword:
              passwordFile: /opt/cert/store.password
            keyPassword:
              passwordFile: /opt/cert/key.password
            storeType: PKCS12
      selector: TemplateKekSelector
      selectorConfig:
        template: "${topicName}"
Copy to Clipboard Toggle word wrap

1
Enables metrics for the proxy.
2
Virtual cluster configuration.
3
The name of the virtual cluster.
4
The bootstrap address of the target physical Kafka Cluster being proxied.
5
TLS configuration for the connection to the target cluster.
6
The configuration for the cluster network address configuration provider that controls how the virtual cluster is presented to the network.
7
The built-in types are PortPerBrokerClusterNetworkAddressConfigProvider and SniRoutingClusterNetworkAddressConfigProvider.
8
The hostname and port of the bootstrap used by the Kafka clients. The hostname must be resolved by the clients.
9
Logging is disabled by default. Enable logging related to network activity (logNetwork) and messages (logFrames) by setting the logging properties to true.
10
TLS encryption for securing connections with the clients.
11
Filter configuration.
12
The type of filter, which is the Record Encryption filter using Vault as the KMS in this example.
13
The configuration specific to the type of filter.
14
If required, you can also specify the credentials for TLS authentication with the KMS, with key names under which TLS certificates are stored.

3.3. Securing connections from clients

To secure client connections to virtual clusters, configure TLS on the virtual cluster by doing the following:

  • Obtain a server certificate for the virtual cluster from a Certificate Authority (CA).
    Ensure the certificate matches the names of the virtual cluster’s bootstrap and broker addresses.
    This may require wildcard certificates and Subject Alternative Names (SANs).
  • Provide the TLS configuration using the tls properties in the virtual cluster’s configuration to enable it to present the certificate to clients. Depending on your certificate format, apply one of the following examples.
Note

TLS is recommended on Kafka clients and virtual clusters for production configurations.

Example PKCS #12 configuration

virtualClusters:
  my-cluster-proxy:
    tls:
      key:
        storeFile: <path>/server.p12  
1

        storePassword:
          passwordFile: <path>/store.password 
2

        keyPassword:
          passwordFile: <path>/key.password 
3

        storeType: PKCS12 
4

      # ...
Copy to Clipboard Toggle word wrap

1
PKCS #12 store for the public CA certificate of the virtual cluster.
2
Password to protect the PKCS #12 store.
3
(Optional) Password for the key. If a password is not specified, the keystore’s password is used to decrypt the key too.
4
(Optional) Keystore type. If a keystore type is not specified, the default JKS (Java Keystore) type is used.

Example PEM configuration

virtualClusters:
  my-cluster-proxy:
    tls:
      key:
        privateKeyFile: <path>/server.key   
1

        certificateFile: <path>/server.crt 
2

        keyPassword:
          passwordFile: <path>/key.password 
3

# …
Copy to Clipboard Toggle word wrap

1
Private key of the virtual cluster.
2
Public CA certificate of the virtual cluster.
3
(Optional) Password for the key.

If required, configure the insecure property to disable trust and establish insecure connections with any Kafka Cluster, irrespective of certificate validity. However, this option is only intended for use in development and testing environments where proper certificates are hard to obtain and mange.

Example to enable insecure TLS

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        trust:
          insecure: true 
1

      #...
# …
Copy to Clipboard Toggle word wrap

1
Enables insecure TLS.

3.4. Securing connections to target clusters

To secure a virtual cluster connection to a target cluster, configure TLS on the virtual cluster. The target cluster must already be configured to use TLS.

Specify TLS for the virtual cluster configuration using targetCluster.tls properties

Use an empty object ({}) to inherit trust from the underlying platform on which the cluster is running. This option is suitable if the target cluster is using a TLS certificate signed by a public CA.

Example target cluster configuration for TLS

virtualClusters:
  my-cluster-proxy:
    targetCluster:
      bootstrap_servers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093
      tls: {}
      #...
Copy to Clipboard Toggle word wrap

If it is using a TLS certificate signed by a private CA, you must add truststore configuration for the target cluster.

Example truststore configuration for a target cluster

virtualClusters:
  my-cluster-proxy:
    targetCluster:
      bootstrap_servers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093
      tls:
        trust:
          storeFile: <path>/trust.p12 
1

          storePassword:
            passwordFile: <path>/store.password 
2

          storeType: PKCS12 
3

      #...
Copy to Clipboard Toggle word wrap

1
PKCS #12 store for the public CA certificate of the Kafka cluster.
2
Password to access the public Kafka cluster CA certificate.
3
(Optional) Keystore type. If a keystore type is not specified, the default JKS (Java Keystore) type is used.

For mTLS, you can add keystore configuration for the virtual cluster too.

Example keystore and truststore configuration for mTLS

virtualClusters:
  my-cluster-proxy:
    targetCluster:
      bootstrap_servers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093:9092
      tls:
        key:
          privateKeyFile: <path>/client.key 
1

          certificateFile: <path>/client.crt 
2

        trust:
          storeFile: <path>/server.crt
          storeType: PEM
# ...
Copy to Clipboard Toggle word wrap

1
Private key of the virtual cluster.
2
Public CA certificate of the virtual cluster.

For the purposes of testing outside of a production environment, you can set the insecure property to true to turn off TLS so that the Streams for Apache Kafka Proxy can connect to any Kafka cluster.

Example configuration to turn off TLS

virtualClusters:
  my-cluster-proxy:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        trust:
          insecure: true
      #...
Copy to Clipboard Toggle word wrap

3.5. Configuring network addresses

Virtual cluster configuration requires a network address configuration provider that manages network communication and provides broker address information to clients.

Streams for Apache Kafka Proxy has the following built-in providers:

  • Broker address provider (PortPerBrokerClusterNetworkAddressConfigProvider)
  • Node ID ranges provider (RangeAwarePortPerNodeClusterNetworkAddressConfigProvider)
  • SNI routing address provider (SniRoutingClusterNetworkAddressConfigProvider)
Important

Make sure that the virtual cluster bootstrap address and generated broker addresses are resolvable and routable by the Kafka client.

3.5.1. Broker address provider

The per-broker network address configuration provider opens one port for a virtual cluster’s bootstrap address and one port for each broker in the target Kafka cluster. The number of open ports is maintained dynamically. For example, if a broker is removed from the cluster, the port assigned to it is closed. If you have two virtual clusters, each targeting a Kafka cluster with three brokers, eight ports are bound in total.

This provider works best with straightforward configurations. Ideally, the target cluster should have sequential, stable broker IDs and a known minimum broker ID, such as 0, 1, 2 for a cluster with three brokers. While it can handle non-sequential broker IDs, this would require exposing ports equal to maxBrokerId - minBrokerId, which could be excessive if your cluster contains broker IDs like 0 and 20000.

The provider supports both cleartext and TLS downstream connections.

Example broker address configuration

clusterNetworkAddressConfigProvider:
  type: PortPerBrokerClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192 
1

    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com 
2

    brokerStartPort: 9193 
3

    numberOfBrokerPorts: 3 
4

    lowestTargetBrokerId: 1000 
5

    bindAddress: 192.168.0.1 
6
Copy to Clipboard Toggle word wrap

1
The hostname and port of the bootstrap address used by Kafka clients.
2
(Optional) The broker address pattern used to form broker addresses. If not defined, it defaults to the hostname part of the bootstrap address and the port number allocated to the broker.
3
(Optional) The starting number for the broker port range. Defaults to the port of the bootstrap address plus 1.
4
(Optional) The maximum number of broker ports that are permitted. Set this value according to the maximum number of brokers allowed by your operational rules. Defaults to 3.
5
(Optional) The lowest broker ID in the target cluster. Defaults to 0. This should match the lowest node.id (or broker.id) in the target cluster.
6
(Optional) The bind address used when binding the ports. If undefined, all network interfaces are bound.

Each broker’s ID must be greater than or equal to lowestTargetBrokerId and less than lowestTargetBrokerId + numberOfBrokerPorts. The current strategy for mapping node IDs to ports is as follows: nodeId → brokerStartPort + nodeId - lowestTargetBrokerId. The example configuration maps broker IDs 1000, 1001, and 1002 to ports 9193, 9194, and 9195, respectively. Reconfigure numberOfBrokerPorts to accommodate the number of brokers in the cluster.

The example broker address configuration creates the following broker addresses:

mybroker-0.mycluster.kafka.com:9193
mybroker-1.mycluster.kafka.com:9194
mybroker-2.mycluster.kafka.com:9194
Copy to Clipboard Toggle word wrap

The brokerAddressPattern configuration parameter accepts the $(nodeId) replacement token, which is optional. If included, $(nodeId) is replaced by the broker’s node.id (or broker.id) in the target cluster.

For example, with the configuration shown above, if your cluster has three brokers, your Kafka client receives broker addresses like this:

0.  mybroker-0.mycluster.kafka.com:9193
1.  mybroker-1.mycluster.kafka.com:9194
2.  mybroker-2.mycluster.kafka.com:9195
Copy to Clipboard Toggle word wrap

3.5.2. Node ID ranges provider

As an alternative to the broker address provider, the node ID ranges provider allows you to model specific ranges of node IDs in the target cluster, enabling efficient port allocation even when broker IDs are non-sequential or widely spaced This ensures a deterministic mapping of node IDs to ports while minimizing the number of ports needed.

Example node ID ranges configuration

clusterNetworkAddressConfigProvider:
  type: RangeAwarePortPerNodeClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192
    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com
    brokerStartPort: 9193
    nodeIdRanges: 
1

      - name: brokers 
2

        range:
          startInclusive: 0 
3

          endExclusive: 3 
4
Copy to Clipboard Toggle word wrap

1
The list of Node ID ranges, which must be non-empty.
2
The name of the range, which must be unique within the nodeIdRanges list.
3
The start of the range (inclusive).
4
The end of the range (exclusive). It must be greater than startInclusive; empty ranges are not allowed.

Node ID ranges must be distinct, meaning a node ID cannot belong to more than one range.

KRaft roles given to cluster nodes can be accommodated in the configuration. For example, consider a target cluster using KRaft with the following node IDs and roles:

  • nodeId: 0, roles: controller
  • nodeId: 1, roles: controller
  • nodeId: 2, roles: controller
  • nodeId: 1000, roles: broker
  • nodeId: 1001, roles: broker
  • nodeId: 1002, roles: broker
  • nodeId: 99999, roles: broker

This can be modeled as three node ID ranges, as shown in the following example.

Example node ID ranges configuration with KRaft roles

    clusterNetworkAddressConfigProvider:
      type: RangeAwarePortPerNodeClusterNetworkAddressConfigProvider
      config:
        bootstrapAddress: mycluster.kafka.com:9192
        nodeIdRanges:
          - name: controller
            range:
              startInclusive: 0
              endExclusive: 3
          - name: brokers
            range:
              startInclusive: 1000
              endExclusive: 1003
          - name: broker-outlier
            range:
              startInclusive: 99999
              endExclusive: 100000
Copy to Clipboard Toggle word wrap

This configuration results in the following mapping from node ID to port:

  • nodeId: 0 → port 9193
  • nodeId: 1 → port 9194
  • nodeId: 2 → port 9195
  • nodeId: 1000 → port 9196
  • nodeId: 1001 → port 9197
  • nodeId: 1002 → port 9198
  • nodeId: 99999 → port 9199

3.5.3. SNI routing address provider

The SNI ((Server Name Indication) routing provider opens a single port for all virtual clusters or a port for each. You can open a port for the whole cluster or each broker. The SNI routing provider uses SNI information to determine where to route the traffic, so requires downstream TLS.

Example SNI routing address provider configuration

clusterNetworkAddressConfigProvider:
  type: SniRoutingClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192 
1

    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com
    bindAddress: 192.168.0.1
Copy to Clipboard Toggle word wrap

1
A single address for all traffic, including bootstrap address and brokers.

In the SNI routing address configuration, the brokerAddressPattern specification is mandatory, as it is required to generate routes for each broker.

Note

Single port operation may have cost advantages when using load balancers of public clouds, as it allows a single cloud provider load balancer to be shared across all virtual clusters.

Chapter 4. Introducing metrics

If you want to introduce metrics to your Streams for Apache Kafka Proxy deployment, you can configure an insecure HTTP and Prometheus endpoint (at /metrics).

Add the following to the ConfigMap resource that defines the Streams for Apache Kafka Proxy configuration:

Minimal metrics configuration

adminHttp:
  endpoints:
    prometheus: {}
Copy to Clipboard Toggle word wrap

By default, the HTTP endpoint listens on 0.0.0.0:9190. You can change the hostname and port as follows:

Example metrics configuration with hostname and port

adminHttp:
  host: localhost
  port: 9999
  endpoints:
    prometheus: {}
Copy to Clipboard Toggle word wrap

The example files provided with the proxy include a PodMonitor resource. If you have enabled monitoring in OpenShift for user-defined projects, you can use a PodMonitor resource to ingest the proxy metrics.

Example PodMonitor resource configuration

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: proxy
  labels:
    app: proxy
spec:
  selector:
    matchLabels:
      app: proxy
  namespaceSelector:
    matchNames:
      - proxy
  podMetricsEndpoints:
  - path: /metrics
    port: metrics
Copy to Clipboard Toggle word wrap

Appendix A. Using your subscription

Streams for Apache Kafka is provided through a software subscription. To manage your subscriptions, access your account at the Red Hat Customer Portal.

Accessing Your Account

  1. Go to access.redhat.com.
  2. If you do not already have an account, create one.
  3. Log in to your account.

Activating a Subscription

  1. Go to access.redhat.com.
  2. Navigate to My Subscriptions.
  3. Navigate to Activate a subscription and enter your 16-digit activation number.

Downloading Zip and Tar Files

To access zip or tar files, use the customer portal to find the relevant files for download. If you are using RPM packages, this step is not required.

  1. Open a browser and log in to the Red Hat Customer Portal Product Downloads page at access.redhat.com/downloads.
  2. Locate the Streams for Apache Kafka for Apache Kafka entries in the INTEGRATION AND AUTOMATION category.
  3. Select the desired Streams for Apache Kafka product. The Software Downloads page opens.
  4. Click the Download link for your component.

Installing packages with DNF

To install a package and all the package dependencies, use:

dnf install <package_name>
Copy to Clipboard Toggle word wrap

To install a previously-downloaded package from a local directory, use:

dnf install <path_to_download_package>
Copy to Clipboard Toggle word wrap

Revised on 2024-11-13 12:21:28 UTC

Legal Notice

Copyright © 2024 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
Back to top
Red Hat logoGithubredditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust. Explore our recent updates.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

Theme

© 2025 Red Hat