
Record Encryption filter guide


Red Hat Streams for Apache Kafka 3.1

Encrypt Kafka records with envelope encryption for end-to-end security

Abstract

Streams for Apache Kafka Proxy is a protocol-aware proxy that extends and secures Kafka-based systems with a flexible filtering mechanism. This guide explains how to use the Record Encryption filter.

Providing feedback on Red Hat documentation

We appreciate your feedback on our documentation.

To propose improvements, open a Jira issue and describe your suggested changes. Provide as much detail as possible to enable us to address your request quickly.

Prerequisite

  • You have a Red Hat Customer Portal account. This account enables you to log in to the Red Hat Jira Software instance. If you do not have an account, you will be prompted to create one.

Procedure

  1. Click Create issue.
  2. In the Summary text box, enter a brief description of the issue.
  3. In the Description text box, provide the following information:

    • The URL of the page where you found the issue.
    • A detailed description of the issue.
      You can leave any other fields at their default values.
  4. Add a reporter name.
  5. Click Create to submit the Jira issue to the documentation team.

Thank you for taking the time to provide feedback.

About this guide

This guide covers using the Streams for Apache Kafka Proxy Record Encryption Filter to provide encryption-at-rest for Apache Kafka. Refer to other Streams for Apache Kafka Proxy guides for information on running the proxy or for advanced topics such as plugin development.

The Kroxylicious Record Encryption filter enhances the security of Kafka messages. The filter uses industry-standard cryptographic techniques to apply encryption to Kafka messages, ensuring the confidentiality of data stored in the Kafka Cluster. By centralizing topic-level encryption, Kroxylicious provides streamlined protection across Kafka clusters.

To use the filter, follow these steps:

  1. Set up a Key Management System (KMS)
  2. Establish encryption keys within the KMS for securing the topics
  3. Configure the filter within Kroxylicious

The filter integrates with a Key Management Service (KMS), which is responsible for the safe storage of sensitive key material. Kroxylicious supports the following KMS providers:

  • HashiCorp Vault (on-premise, HCP Vault, and Vault Enterprise; the HCP Vault Secrets free tier is not supported)
  • AWS Key Management Service
  • Azure Key Vault

You can provide implementations for your specific KMS systems. Additional KMS support may be added based on demand.

Chapter 1. How encryption works

The Record Encryption filter uses envelope encryption to encrypt records with symmetric encryption keys. The filter encrypts records from produce requests and decrypts records from fetch responses.

Envelope encryption
Envelope encryption is an industry-standard technique suited for encrypting large volumes of data in an efficient manner. Data is encrypted with a Data Encryption Key (DEK). The DEK is encrypted using a Key Encryption Key (KEK). The KEK is stored securely in a Key Management System (KMS).
Symmetric encryption keys
AES-GCM 256-bit symmetric encryption keys are used to encrypt and decrypt record data.
Note

If you are using Azure Key Vault and Managed HSM is not available, you can use RSA-OAEP-256 encryption, using a 2048-bit (or greater) asymmetric key instead of 256-bit AES-GCM symmetric keys. This approach is not quantum-resistant.

The process is as follows:

  1. The filter intercepts produce requests from producing applications and transforms them by encrypting the records.
  2. The produce request is forwarded to the broker.
  3. The filter intercepts fetch responses from the broker and transforms them by decrypting the records.
  4. The fetch response is forwarded to the consuming application.

The filter encrypts the record value only. Record keys, headers, and timestamps are not encrypted.

The entire process is transparent from the point of view of Kafka clients and Kafka brokers. Neither is aware that the records are being encrypted, and neither has access to the encryption keys or any influence over the ciphering process used to secure the records.
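The envelope-encryption flow described above can be sketched in a few lines. The following toy Python example is an illustration only, not real cryptography: a simple XOR stands in for AES-GCM, and the KMS interaction is inlined. It shows how the encrypted DEK travels with the ciphertext while the KEK never needs to leave the KMS:

```python
# Toy illustration of envelope encryption (NOT real cryptography):
# a DEK encrypts the record, a KEK wraps the DEK, and only the
# wrapped (encrypted) DEK is stored alongside the ciphertext.
import secrets

def xor(data: bytes, key: bytes) -> bytes:
    # Stand-in for a real cipher such as AES-GCM.
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))

def envelope_encrypt(record: bytes, kek: bytes) -> tuple[bytes, bytes]:
    dek = secrets.token_bytes(32)          # fresh Data Encryption Key
    ciphertext = xor(record, dek)          # record encrypted with the DEK
    wrapped_dek = xor(dek, kek)            # DEK encrypted with the KEK
    return ciphertext, wrapped_dek         # stored together in the broker

def envelope_decrypt(ciphertext: bytes, wrapped_dek: bytes, kek: bytes) -> bytes:
    dek = xor(wrapped_dek, kek)            # in reality, the KMS unwraps this
    return xor(ciphertext, dek)

kek = secrets.token_bytes(32)
ct, wdek = envelope_encrypt(b"payment: 42 EUR", kek)
assert envelope_decrypt(ct, wdek, kek) == b"payment: 42 EUR"
```

The point of the structure is efficiency: bulk data is encrypted with cheap symmetric DEKs, while the KMS only ever handles the small DEK-wrapping operations.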

1.1. How the filter encrypts records

The filter encrypts records from produce requests as follows:

  1. Filter selects a KEK to apply.
  2. Requests the KMS to generate a DEK for the KEK.
  3. Uses an encrypted DEK (DEK encrypted with the KEK) to encrypt the record.
  4. Replaces the original record with a ciphertext record (encrypted record, encrypted DEK, and metadata).

The filter uses a DEK reuse strategy. Encrypted records are sent to the same topic using the same DEK until a time-out or an encryption limit is reached.
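The DEK reuse strategy can be illustrated with a minimal sketch (hypothetical names, not the filter's actual implementation), in which a per-topic DEK is reused until an encryption-count limit or an age limit is reached:

```python
# Illustrative sketch of a DEK reuse policy: reuse the topic's DEK
# until a use-count limit or a time-out is reached, then replace it.
import time

class DekReusePolicy:
    def __init__(self, max_encryptions: int = 3, max_age_seconds: float = 3600.0):
        self.max_encryptions = max_encryptions
        self.max_age = max_age_seconds
        self._deks = {}  # topic -> (dek_id, created_at, uses)

    def dek_for(self, topic: str) -> str:
        now = time.monotonic()
        entry = self._deks.get(topic)
        if entry:
            dek_id, created, uses = entry
            if uses < self.max_encryptions and now - created < self.max_age:
                self._deks[topic] = (dek_id, created, uses + 1)
                return dek_id                      # reuse the current DEK
        dek_id = f"dek-{topic}-{now}"              # in reality, requested from the KMS
        self._deks[topic] = (dek_id, now, 1)
        return dek_id

policy = DekReusePolicy(max_encryptions=2)
a = policy.dek_for("trades")
b = policy.dek_for("trades")   # second use: same DEK
c = policy.dek_for("trades")   # limit reached: new DEK
assert a == b and b != c
```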

1.2. How the filter decrypts records

The filter decrypts records from fetch responses as follows:

  1. Filter receives a cipher record from the Kafka broker.
  2. Reverses the process that constructed the cipher record.
  3. Uses KMS to decrypt the DEK.
  4. Uses the decrypted DEK to decrypt the encrypted record.
  5. Replaces the cipher record with a decrypted record.

The filter uses an LRU (least recently used) strategy for caching decrypted DEKs. Decrypted DEKs are kept in memory to reduce interactions with the KMS.
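The caching behavior can be sketched as follows (an illustrative Python LRU cache, not the filter's actual code): a cache hit avoids the KMS round trip, and the least recently used DEK is evicted when the cache is full:

```python
# Illustrative LRU cache for decrypted DEKs.
from collections import OrderedDict

class DekCache:
    def __init__(self, capacity: int = 2):
        self.capacity = capacity
        self._cache = OrderedDict()  # encrypted-DEK id -> plaintext DEK

    def get(self, edek_id, decrypt_with_kms):
        if edek_id in self._cache:
            self._cache.move_to_end(edek_id)       # mark as recently used
            return self._cache[edek_id]
        dek = decrypt_with_kms(edek_id)            # KMS round trip on a miss
        self._cache[edek_id] = dek
        if len(self._cache) > self.capacity:
            self._cache.popitem(last=False)        # evict least recently used
        return dek

kms_calls = []
def fake_kms(edek_id):
    kms_calls.append(edek_id)
    return f"plain-{edek_id}"

cache = DekCache(capacity=2)
cache.get("e1", fake_kms)
cache.get("e2", fake_kms)
cache.get("e1", fake_kms)      # hit: no KMS call
cache.get("e3", fake_kms)      # evicts e2
assert kms_calls == ["e1", "e2", "e3"]
```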

1.3. How the filter uses the KMS

To support the filter, the KMS provides the following:

  • A secure repository for storing Key Encryption Keys (KEKs)
  • A service for generating and decrypting Data Encryption Keys (DEKs)

KEKs stay within the KMS. The KMS generates a DEK (securely generated random data) for a given KEK, then returns both the DEK and an encrypted DEK, which is the same key material encrypted with the KEK. The KMS does not store encrypted DEKs; they are stored as part of the cipher record in the broker.

Warning

The KMS must be available during runtime. If the KMS is unavailable, the filter will not be able to obtain new encrypted DEKs on the produce path or decrypt encrypted DEKs on the consume path. The filter will continue to use previously obtained DEKs, but eventually, production and consumption will become impossible. It is recommended to use the KMS in a high availability (HA) configuration.

1.4. Practicing key rotation

Key rotation involves periodically replacing cryptographic keys with new ones and is considered a best practice in cryptography.

The filter allows the rotation of Key Encryption Keys (KEKs) within the Key Management System (KMS). When a KEK is rotated, the new key material is eventually used for newly produced records. Existing records, encrypted with older KEK versions, remain decryptable as long as the previous KEK versions are still available in the KMS.

Important

If your encrypted topic is receiving regular traffic, the Data Encryption Key (DEK) will be refreshed as new records flow through. However, if messages are infrequent, the DEK might be used for up to 2 hours (by default) after its creation.

When the KEK is rotated in the external KMS, it takes up to 1 hour (by default) before all records produced by the filter contain a DEK encrypted with the new key material. This is because the filter caches encrypted DEKs and uses them for a configurable amount of time after creation; one hour after creation, a cached encrypted DEK becomes eligible to be refreshed.

If you need to rotate key material immediately, execute a rolling restart of your cluster of Streams for Apache Kafka Proxy instances.

Warning

If an old KEK version is removed from the KMS, records encrypted with that key will become unreadable, causing fetch operations to fail. In such cases, the consumer offset must be advanced beyond those records.

1.5. What part of a record is encrypted?

The record encryption filter encrypts only the values of records, leaving record keys, headers, and timestamps untouched. Null record values, which might represent deletions in compacted topics, are transmitted to the broker unencrypted. This approach ensures that compacted topics function correctly.

1.6. Unencrypted topics

You may configure the system so that some topics are encrypted and others are not. This supports scenarios where topics with confidential information are encrypted and Kafka topics with non-sensitive information can be left unencrypted.

Chapter 2. Preparing your KMS

This section assumes that you already have a supported KMS instance up and running. It describes how to prepare the KMS for use with the filter.

2.1. Preparing HashiCorp Vault

To use HashiCorp Vault with the Record Encryption filter, use the following setup:

  • Enable the Transit Engine, as the Record Encryption filter relies on its APIs.
  • Create a Vault policy specifically for the filter with permissions for generating and decrypting Data Encryption Keys (DEKs) for envelope encryption.
  • Obtain a Vault token that includes the filter policy.

2.1.1. Enable the Transit Engine

The filter integrates with the HashiCorp Vault Transit Engine. Vault does not enable the Transit Engine by default. It must be enabled before it can be used by the filter.
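For example, an administrator can enable the Transit Engine at its default path with the Vault CLI:

```shell
vault secrets enable transit
```

Use the -path flag if the engine should be mounted at a different path; this changes the Transit Engine URL described in the next section.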

2.1.1.1. Vault Transit Engine URL

The Vault Transit Engine URL is required so the filter knows the location of the Transit Engine within the Vault instance.

The URL is formed by concatenating the Api Address (reported by Vault during startup) with the complete path to the Transit Engine, including the name of the engine itself.

If namespacing is used on the Vault instance, the path must include the namespaces. The URL ends with /transit unless the -path parameter was used when enabling the engine.

If namespacing is not in use, the URL looks like this:

https://myvaultinstance:8200/v1/transit

If namespacing is in use, the path must include the namespaces. For example, if the parent namespace is a and the child namespace is b, the URL looks like this:

https://myvaultinstance:8200/v1/a/b/transit

If the name of the Transit engine was changed (using the -path argument to the vault secrets enable transit command), the URL looks like this:

https://myvaultinstance:8200/v1/mytransit
2.1.1.2. Role of the administrator

To use the filter, an administrator or an administrative process must create the encryption keys within Vault, which are used by the envelope encryption process.

The organization deploying the Record Encryption filter is responsible for managing this administrator or process.

The administrator must have permissions to create keys beneath transit/keys/KEK-* in the Vault hierarchy.

As a guideline, the minimal Vault policy required by the administrator is as follows:

path "transit/keys/KEK-*" {
  capabilities = ["read", "write"]
}
2.1.1.3. Establish an application identity for the filter

The filter must authenticate to Vault in order to perform envelope encryption operations, such as generating and decrypting DEKs. Therefore, a Vault identity with sufficient permissions must be created for the filter.

Create a Vault policy for the filter:

vault policy write kroxylicious_encryption_filter_policy - << EOF
path "transit/keys/KEK-*" {
  capabilities = ["read"]
}
path "/transit/datakey/plaintext/KEK-*" {
  capabilities = ["update"]
}
path "transit/decrypt/KEK-*" {
  capabilities = ["update"]
}
EOF

Create a Periodic (long-lived) Vault Token for the filter:

vault token create -display-name "kroxylicious record encryption" \
                   -policy=kroxylicious_encryption_filter_policy \
                   -period=768h \           1
                   -no-default-policy \     2
                   -orphan                  3
1 Causes the token to be periodic (with every renewal using the given period).
2 Detaches the "default" policy from the policy set for this token, so the token has least privilege.
3 Creates the token with no parent, so that expiration of a parent won't expire the token used by the filter.
Note

The example token create command illustrates the use of -no-default-policy and -orphan. The use of these flags is not functionally important. You may adapt the configuration of the token to suit the standards required by your organization.

The token create command yields the token. The token value is required later, when configuring the Vault KMS within the filter.

token              hvs.CAESIFJ_HHo0VnnW6DSbioJ80NqmuYm2WlON-QxAPmiJScZUGh4KHGh2cy5KdkdFZUJMZmhDY0JCSVhnY2JrbUNEWnE
token_accessor     4uQZJbEnxW4YtbDBaW6yVzwP
token_policies     [kroxylicious_encryption_filter_policy]

The token must be renewed before expiration. It is the responsibility of the administrator to do this.

This can be done with a command like the following:

vault token renew --accessor <token_accessor>
2.1.1.4. Testing the application identity for the filter using the CLI

To test whether the application identity and the policy are working correctly, use the validate_vault_token.sh script.

First, as the administrator, create a KEK in the hierarchy at the path transit/keys/KEK-testkey.
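Assuming the aes256-gcm96 key type used elsewhere in this guide, the test KEK can be created with a command like:

```shell
vault write -f transit/keys/KEK-testkey type=aes256-gcm96
```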

VAULT_TOKEN=<kroxylicious encryption filter token> validate_vault_token.sh <kek path>

The script should respond Ok. If errors are reported, check the policy and token configuration.

transit/keys/KEK-testkey can now be removed.
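By default, Vault Transit keys cannot be deleted; deletion must first be allowed on the key's configuration. The cleanup can be done with commands like the following:

```shell
vault write transit/keys/KEK-testkey/config deletion_allowed=true
vault delete transit/keys/KEK-testkey
```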

2.1.2. Creating HashiCorp Vault keys

This procedure describes how to create AES-256 symmetric keys for use with the Record Encryption filter. The procedure establishes a naming convention for keys, then uses the HashiCorp CLI to create a key with an optional rotation policy.

Prerequisites

  • Access to the HashiCorp Vault CLI with permissions to create keys.
  • Familiarity with basic HashiCorp Vault Transit Engine key management operations.

Procedure

  1. Establish a naming convention for keys to ensure that the filter’s keys remain separate from those used by other systems.

    In this example, a KEK- prefix is used for filter key names. Adjust the naming convention as needed.

  2. Create a symmetric key:

    vault write -f transit/keys/KEK-trades type=aes256-gcm96 auto_rotate_period=90d
    • The key type must be aes256-gcm96, which is the default type for the Transit Engine.
    • (Optional) The auto_rotate_period parameter enables automatic 90-day key rotation. Adjust the rotation period if required.
Tip

It is recommended to use a key rotation policy.

2.2. Preparing AWS KMS

To prepare AWS Key Management Service for use with the Record Encryption filter, use the following setup:

  • Establish an AWS KMS aliasing convention for keys
  • Create AWS KMS keys

You’ll need a privileged AWS user that is capable of creating users and policies to perform the set-up.

2.2.1. Establish an aliasing convention for keys within AWS KMS

The filter references KEKs within AWS via an AWS key alias.

Establish a naming convention for key aliases to keep the filter’s keys separate from those used by other systems. Here, we use a prefix of KEK- for filter aliases. Adjust the instructions if a different naming convention is used.

2.2.1.1. Role of the administrator

To use the filter, an administrator or an administrative process must create the encryption keys within AWS KMS, which are used by the envelope encryption process.

The organization deploying the Record Encryption filter is responsible for managing this administrator or process.

The administrator must have permissions to create keys in AWS KMS. As a starting point, the built-in AWS policy AWSKeyManagementServicePowerUser confers sufficient key management privileges.

To get started, use the following commands to set up an administrator with permissions suitable for managing encryption keys in KMS through an AWS Cloud Shell. This example illustrates using the user name kroxylicious-admin, but you can choose a different name if preferred. Adjust the instructions accordingly if you use a different user name.

ADMIN=kroxylicious-admin
INITIAL_PASSWORD=$(aws secretsmanager get-random-password  --output text)
CONSOLE_URL=https://$(aws sts get-caller-identity --query Account --output text).signin.aws.amazon.com/console
aws iam create-user --user-name ${ADMIN}
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/AWSKeyManagementServicePowerUser
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/IAMUserChangePassword
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/AWSCloudShellFullAccess
aws iam create-login-profile --user-name ${ADMIN} --password "${INITIAL_PASSWORD}" --password-reset-required
echo Now log in at ${CONSOLE_URL}  with user name ${ADMIN} password "${INITIAL_PASSWORD}" and change the password.
2.2.1.2. Create an alias-based policy for KEK aliases

Create an alias-based policy granting permissions to use keys aliased by the established alias naming convention.

AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
cat > /tmp/policy << EOF
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "AliasBasedIAMPolicy",
			"Effect": "Allow",
			"Action": [
				"kms:Encrypt",
				"kms:Decrypt",
				"kms:GenerateDataKey*",
				"kms:DescribeKey"
			],
			"Resource": [
                "arn:aws:kms:*:${AWS_ACCOUNT_ID}:key/*"
			],
			"Condition": {
				"ForAnyValue:StringLike": {
					"kms:ResourceAliases": "alias/KEK-*"
				}
			}
		}
	]
}
EOF
aws iam create-policy --policy-name KroxyliciousRecordEncryption --policy-document file:///tmp/policy
2.2.1.3. Establish an authentication mechanism for the filter

The filter must authenticate to AWS in order to perform envelope encryption operations, such as generating and decrypting DEKs.

2.2.1.4. Authenticating using long-term IAM identity

This procedure describes how to create a long-term IAM identity for the Record Encryption filter to authenticate to AWS KMS. The process involves creating an IAM user and access key, and attaching an alias-based policy that grants permissions to perform KMS operations on specific KEKs.

Note

Do not enable console access for this user. The filter requires only API access, and console access would unnecessarily increase the security risk.

Prerequisites

Procedure

  1. Create the IAM user and access key:

    aws iam create-user --user-name kroxylicious
    aws iam create-access-key --user-name kroxylicious

    This example uses kroxylicious as the user name, but you can substitute a different name if necessary.

  2. Attach the alias-based policy to the IAM identity:

    AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
    aws iam attach-user-policy --user-name kroxylicious --policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/KroxyliciousRecordEncryption"

    This step grants the user permission to perform KMS operations on KEKs that use the alias naming convention defined in the KroxyliciousRecordEncryption policy.

  3. Verify that the policy has been successfully attached:

    aws iam list-attached-user-policies --user-name kroxylicious

2.2.2. Creating AWS KMS keys

This procedure describes how to create a Symmetric key with encrypt and decrypt usage for the Record Encryption filter. The procedure uses the AWS CLI to create a key and, optionally, apply a rotation policy.

Note

Multi-region keys are supported.

Prerequisites

Procedure

  1. Create a key and apply the alias:

    KEY_ALIAS="KEK-<name>"
    KEY_ID=$(aws kms create-key | jq -r '.KeyMetadata.KeyId')
    # the create key command will produce JSON output including the KeyId
    aws kms create-alias --alias-name alias/${KEY_ALIAS} --target-key-id ${KEY_ID}
    Note

    You cannot use keys from other AWS accounts. For more information on this limitation, see the issue for AWS KMS serde improvements.

  2. (Optional) Enable a key rotation policy:

    aws kms enable-key-rotation --key-id ${KEY_ID} --rotation-period-in-days 180
    Tip

    It is recommended to use a key rotation policy.

2.3. Preparing Azure Key Vault

To prepare Azure Key Vault for use with the Record Encryption filter, use the following setup:

  • Setup Azure resources
  • Establish a naming convention for keys
  • Create Azure Key Vault keys

You’ll need a privileged Azure user that is capable of creating users and resources to perform the set-up.

2.3.1. Setting up Azure resources

This procedure describes how to prepare the Azure resources required to use the Streams for Apache Kafka Proxy Record Encryption filter with Azure Key Vault. This process uses the Azure CLI to create a resource group, provision a key vault, configure a key management user, and establish an authentication method that the filter will use to access the key vault.

Prerequisites

  • An Azure subscription that includes Azure Key Vault
  • An Azure user with sufficient permissions to create and manage users, resource groups, roles, and resources

Procedure

  1. Create a resource group

    Before you create a key vault, you must create a resource group to contain it.

    If you deploy Streams for Apache Kafka Proxy into multiple different environments, you must have separate resource groups and key vaults for each environment.

    Note

    Resource groups are tied to the region they were created with. While resources (such as key vaults) within a resource group can be deployed to any region, Microsoft recommends that they should be located in the same region as the resource group. Keep this in mind when choosing a region for your resource groups.

    If using the Azure CLI, you can create a resource group with a command like this:

    az group create --name "my-resource-group" --location "eastus"
  2. Create a key vault with the Azure CLI:

    KEY_VAULT_NAME="kroxylicious-key-vault"
    RESOURCE_GROUP_NAME="my-resource-group"
    az keyvault create --name ${KEY_VAULT_NAME} --resource-group ${RESOURCE_GROUP_NAME}
    • Key vault names must be unique.
    • RESOURCE_GROUP_NAME should match the name of the resource group created in the previous section.

      There are several methods to create key vault resources in Azure. This step describes how to do this using the Azure CLI. The Microsoft Azure documentation has further instructions on how to create key vaults via the Azure portal, Azure CLI, and Azure PowerShell.

      Note

      Microsoft’s best practice guide for Azure Key Vault recommends having one key vault per application, per environment, per region. For example, if you have Streams for Apache Kafka Proxy deployed in your development, test, and production environments, and each of those environments is deployed across two different regions, you would create six key vaults for Streams for Apache Kafka Proxy — one for each Streams for Apache Kafka Proxy deployment in each environment in each region.

  3. Create a key management user.

    To use the filter, a user such as the Key Vault Owner or any user with the Key Vault Crypto Officer RBAC role must create the encryption keys in the key vault. These keys are used by the envelope encryption process.

    When you create a key vault, only the creator (the Key Vault Owner) can manage it. Access to key vaults is controlled through RBAC roles. A user with either the Role Based Access Control Administrator role or the User Access Administrator role can grant those roles to others. For information on exceptions, see the Microsoft documentation.

    The following Azure CLI commands demonstrate creating a user in Microsoft Entra ID and granting key management RBAC privileges. This example illustrates using the domain example.com, but you must use your own domain, such as a *.onmicrosoft.com domain. Replace the USER_NAME value (kroxylicious-user) with a different username if preferred.

    KEY_VAULT_NAME="kroxylicious-key-vault"
    RESOURCE_GROUP_NAME="my-resource-group"
    
    SUBSCRIPTION_ID=$(az account show --query id --output tsv)
    PORTAL_URL="$(az cloud show --query endpoints.portal --output tsv)/$(az account show --query tenantId --output tsv)"
    
    USER_NAME="kroxylicious-user"
    DEFAULT_DOMAIN=$(az rest --method get --url https://graph.microsoft.com/v1.0/domains --query "value[?isDefault].id" -o tsv)
    USER_PRINCIPAL_NAME="${USER_NAME}@${DEFAULT_DOMAIN}"
    INITIAL_PASSWORD=$(tr -dc 'A-Za-z0-9!?%=' < /dev/urandom | head -c 16)
    
    # Create user
    az ad user create \
      --display-name "${USER_NAME}" \
      --user-principal-name "${USER_PRINCIPAL_NAME}" \
      --password "${INITIAL_PASSWORD}" \
      --force-change-password-next-sign-in true
    
    # Create RBAC role assignment
    az role assignment create \
      --role "Key Vault Crypto Officer" \
      --assignee ${USER_PRINCIPAL_NAME} \
      --scope "/subscriptions/${SUBSCRIPTION_ID}/resourceGroups/${RESOURCE_GROUP_NAME}/providers/Microsoft.KeyVault/vaults/${KEY_VAULT_NAME}"
    
    echo "Now log in at ${PORTAL_URL} with user name ${USER_PRINCIPAL_NAME} and password \"${INITIAL_PASSWORD}\" and change the password."
    • View all subscriptions for the logged-in user with az account list, and change the active subscription with az account set.
    • The Azure portal URL varies depending on your current Azure cloud.
    • The Key Vault Crypto Officer role lets a user perform any action on the keys within the key vault identified in the --scope, except managing permissions.
    • The scope in the role assignment command specifies the full scope hierarchy of the key vault. For more information on this format, see the Microsoft documentation.
  4. Establish an authentication mechanism for the filter.

    The filter must authenticate to Azure Key Vault in order to perform envelope encryption operations, such as generating and decrypting DEKs. See the following procedures for details of the authentication methods available.

2.3.1.1. Authenticating with Microsoft Identity Platform via OAuth 2.0

This procedure describes how to create a Service Principal (an application identity) within Microsoft Entra ID to allow the Record Encryption filter to authenticate to Azure Key Vault using the OAuth 2.0 Client Credentials flow. The process uses the Azure CLI to create the application identity and assign the required Azure Role-Based Access Control (RBAC) role.

Note

This identity is used solely for application-to-application (machine-to-machine) authentication. Grant the Service Principal only the minimum permissions needed to access Azure Key Vault to avoid increasing security risk.

Prerequisites

  • Access to the Azure CLI
  • A user with permissions to create and manage service principals and RBAC roles

Procedure

  1. Create the service principal and retrieve the credentials:

    az ad sp create-for-rbac --name "kroxylicious" --query '[appId, password, tenant, appId]' --output tsv

    This command creates the Service Principal and outputs, in order, the Client ID, Client Secret, Tenant ID, and App ID of the Service Principal (the Client ID and App ID are the same value). You need the App ID for the next step. You can replace kroxylicious with a different user name.

  2. Assign the built-in Azure Key Vault RBAC role to the service principal:

    PRINCIPAL_ID="00000000-0000-0000-0000-000000000000"
    KEY_VAULT_NAME="kroxylicious-key-vault"
    RESOURCE_GROUP="my-resource-group"
    SCOPE_ID=$(az keyvault show --name ${KEY_VAULT_NAME} --resource-group ${RESOURCE_GROUP} --query "id" --output tsv)
    az role assignment create --assignee ${PRINCIPAL_ID} --role "Key Vault Crypto Service Encryption User" --scope ${SCOPE_ID}
    • Set PRINCIPAL_ID to the App ID output in the previous step.
    • Set RESOURCE_GROUP to the resource group where your key vault is deployed.

      These commands assign the built-in Key Vault Crypto Service Encryption User role to the Service Principal, scoped to the target Key Vault. Replace KEY_VAULT_NAME and RESOURCE_GROUP with your actual values.

  3. (Optional) If you do not use the built-in RBAC roles, assign the following permissions to the managed identity instead:

    • Microsoft.KeyVault/vaults/keys/read
    • Microsoft.KeyVault/vaults/keys/wrap/action
    • Microsoft.KeyVault/vaults/keys/unwrap/action

      If you are using Managed HSM, also assign Microsoft.KeyVault/managedhsms/rng/action.

  4. Verify that the Service Principal has the correct role assignment and scope:

    PRINCIPAL_ID="00000000-0000-0000-0000-000000000000"
    SCOPE_ID=$(az keyvault show --name ${KEY_VAULT_NAME} --resource-group ${RESOURCE_GROUP} --query "id" --output tsv)
    az role assignment list --assignee "${PRINCIPAL_ID}" --query '[].{Role:roleDefinitionName, Scope:scope}' --output tsv --scope ${SCOPE_ID}
    • PRINCIPAL_ID is the Service Principal App ID output in the first step.

      Confirm that the output shows the Key Vault Crypto Service Encryption User role and the full resource ID of your key vault (for example, /subscriptions/…​/resourceGroups/…​/vaults/kroxylicious-key-vault).

2.3.2. Creating Azure Key Vault keys

This procedure describes how to create a key with wrapKey and unwrapKey operations enabled for use with the Record Encryption filter. The procedure uses the Azure CLI to create a key and, optionally, apply a rotation policy.

Prerequisites

  • Access to the Azure CLI.
  • A user with the Key Vault Crypto Officer role or equivalent RBAC permissions to manage keys in Azure Key Vault.
  • Familiarity with basic Azure Key Vault key management operations.

Procedure

  1. Establish a naming convention for keys within key vault for easy identification and use with the Record Encryption filter. Use separate key vaults per application, environment, and region. This ensures separation between the filter’s keys and those used by other systems.
  2. Select a key type.

    Note

    Available key types depend on your Azure Key Vault service tier. See the Azure Key Vault documentation for background information on key types.

    The filter accepts the following key types and their associated wrapping algorithms:

    • Symmetric Keys (oct or oct-HSM): Uses the quantum-resistant A256GCM (256-bit AES GCM) wrapping algorithm, which requires a Managed HSM subscription.
    • Asymmetric Keys (RSA or RSA-HSM): Uses the RSA-OAEP-256 (RSAES-OAEP with SHA-256 hash and MGF1/SHA-256 mask) wrapping algorithm, which is not quantum-resistant. This wrapping algorithm does not require a Managed HSM subscription to use (however, RSA-HSM is not available without a premium Azure Key Vault subscription).
  3. Create a key with the Azure CLI:

    KEY_VAULT_NAME="kroxylicious-key-vault"
    KEY_NAME="KEK-<name>"
    KEY_TYPE="rsa"
    az keyvault key create --vault-name ${KEY_VAULT_NAME} --name ${KEY_NAME} --kty ${KEY_TYPE} --ops wrapKey unwrapKey
    • Change KEY_TYPE if you want to use a key type other than RSA.
    • The wrapKey and unwrapKey operations must be enabled.
  4. (Optional) Enable a key rotation policy.

    1. Save your policy to a JSON file. You can refer to an example policy in the Azure CLI documentation.
    2. Apply the rotation policy:

      az keyvault key rotation-policy update --vault-name ${KEY_VAULT_NAME} --name ${KEY_NAME} --value path/to/my_policy.json
      Tip

      Using a key rotation policy is recommended.
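For illustration, a rotation policy file in the shape accepted by az keyvault key rotation-policy update might look like the following. The durations here (rotate 90 days after creation, expire after 2 years) are placeholders; confirm the current schema against the Azure CLI documentation:

```json
{
  "lifetimeActions": [
    {
      "trigger": { "timeAfterCreate": "P90D" },
      "action": { "type": "Rotate" }
    }
  ],
  "attributes": { "expiryTime": "P2Y" }
}
```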

Chapter 3. Configuring the Record Encryption filter

This section describes at a high level how to configure the Record Encryption filter using a previously prepared KMS. Subsections provide in-depth details.

Prerequisites

Procedure

  1. Configure the plugin for your supported KMS, as required.

  2. Create a filter configuration that references the configured KMS plugins.

    See Section 3.4, “Filter configuration”

  3. Apply the filter configuration.

3.1. HashiCorp Vault plugin configuration

For HashiCorp Vault, the KMS configuration used by the filter looks like this. Use the Vault Token and Vault Transit Engine URL values from the KMS setup.

kms: VaultKmsService                                          # (1)
kmsConfig:
  vaultTransitEngineUrl: <vault transit engine service url>   # (2)
  tls:                                                        # (3)
    # ...
  vaultToken:                                                 # (4)
    passwordFile: /opt/vault/token

1. Specifies the name of the KMS provider. Use VaultKmsService.
2. The Vault Transit Engine URL, including the scheme, such as https:// or http://.
3. (Optional) TLS trust configuration.
4. File containing the Vault token.

A TLS client certificate can be specified using a PKCS#12 or JKS key store file.

Example TLS client certificate configuration using a PKCS#12 key store file

key:
  storeFile: /opt/cert/server.p12          # (1)
  storeType: PKCS12                        # (2)
  storePassword:                           # (3)
    passwordFile: /opt/cert/store.password
  keyPassword:                             # (4)
    passwordFile: /opt/cert/key.password

1. storeFile specifies the PKCS#12 key store file.
2. storeType specifies the key store file type. Supported values include PKCS12 and JKS.
3. (Optional) Password for the key store file.
4. (Optional) Password for the key entry within the file.

A set of trust anchors for the TLS client can be specified using a PKCS#12 or JKS key store file.

Example TLS client trust configuration using a PKCS#12 key store file

trust:
  storeFile: /opt/cert/server.p12          # (1)
  storeType: PKCS12                        # (2)
  storePassword:                           # (3)
    passwordFile: /opt/cert/store.password

1. storeFile specifies the PKCS#12 trust store file.
2. storeType specifies the trust store file type. Supported values include PKCS12 and JKS.
3. (Optional) Password for the trust store file.

3.2. AWS KMS plugin configuration

For AWS KMS, the configuration for authenticating with the AWS KMS service looks like this:

Configuration for authenticating with a long-term IAM identity

kms: AwsKmsService                                            # (1)
kmsConfig:
  endpointUrl: https://kms.<region>.amazonaws.com             # (2)
  tls:                                                        # (3)
    # ...
  longTermCredentials:
    accessKeyId:
      passwordFile: /opt/aws/accessKey                        # (4)
    secretAccessKey:
      passwordFile: /opt/aws/secretKey                        # (5)
  region: <region>                                            # (6)

1. Specifies the name of the KMS provider. Use AwsKmsService.
2. The AWS KMS endpoint URL, which must include the https:// scheme.
3. (Optional) TLS trust configuration.
4. File containing the AWS access key ID.
5. File containing the AWS secret access key.
6. The AWS region identifier, such as us-east-1, specifying where your KMS resources are located. It must match the region of the KMS endpoint you are using.
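The consistency requirement in the region callout above, that the region field and the endpoint URL agree, can be expressed as a simple check. This is an illustrative sketch, not something the proxy itself runs, and it assumes the standard public endpoint naming:

```python
def endpoint_matches_region(endpoint_url: str, region: str) -> bool:
    """Check that the configured AWS KMS endpoint embeds the configured region."""
    return endpoint_url == f"https://kms.{region}.amazonaws.com"

# A matching pair passes; a mismatched pair indicates a misconfiguration.
assert endpoint_matches_region("https://kms.us-east-1.amazonaws.com", "us-east-1")
assert not endpoint_matches_region("https://kms.us-east-1.amazonaws.com", "eu-west-1")
```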

A TLS client certificate can be specified using a PKCS#12 or JKS key store file.

Example TLS client certificate configuration using a PKCS#12 key store file

key:
  storeFile: /opt/cert/server.p12          # (1)
  storeType: PKCS12                        # (2)
  storePassword:                           # (3)
    passwordFile: /opt/cert/store.password
  keyPassword:                             # (4)
    passwordFile: /opt/cert/key.password

1. storeFile specifies the PKCS#12 key store file.
2. storeType specifies the key store file type. Supported values include PKCS12 and JKS.
3. (Optional) Password for the key store file.
4. (Optional) Password for the key entry within the file.

A set of trust anchors for the TLS client can be specified using a PKCS#12 or JKS key store file.

Example TLS client trust configuration using a PKCS#12 key store file

trust:
  storeFile: /opt/cert/server.p12          # (1)
  storeType: PKCS12                        # (2)
  storePassword:                           # (3)
    passwordFile: /opt/cert/store.password

1. storeFile specifies the PKCS#12 trust store file.
2. storeType specifies the trust store file type. Supported values include PKCS12 and JKS.
3. (Optional) Password for the trust store file.

3.3. Azure Key Vault plugin configuration

For Azure Key Vault, the configuration for authenticating with the Microsoft Identity Platform looks like this:

Configuration for authenticating with Microsoft Identity Platform via OAuth 2.0

kms: AzureKeyVault
kmsConfig:
  keyVaultName: my-key-vault
  keyVaultHost: vault.azure.net
  tls:
    # ... client trust configuration for Azure Key Vault
  oauth2ClientCredentials:
    oauthEndpoint: https://login.microsoftonline.com
    tenantId: "00000000-0000-0000-0000-000000000000"
    clientId:
      passwordFile: /path/to/id
    clientSecret:
      passwordFile: /path/to/secret
    scope: https://vault.azure.net/.default
    tls:
      # ... client trust configuration for Microsoft Identity Platform OAuth2

  • kms specifies the name of the KMS provider. Use AzureKeyVault.
  • keyVaultName is the name of the key vault the filter uses.
  • keyVaultHost is the key vault host name, without the key vault name prefix.
  • oauthEndpoint is the URL used for the OAuth 2.0 Client Credentials flow.
  • tenantId is the 32-character identifier for the Microsoft Entra tenant where the OAuth credentials were created.
  • clientId.passwordFile specifies the file that contains the OAuth client ID.
  • clientSecret.passwordFile specifies the file that contains the OAuth client secret.
  • scope is the App ID URI of the target resource that the proxy authenticates to (your Azure Key Vault URI).

A TLS client certificate can be specified using a PKCS#12 or JKS key store file.

Example TLS client certificate configuration using a PKCS#12 key store file

key:
  storeFile: /opt/cert/server.p12          # (1)
  storeType: PKCS12                        # (2)
  storePassword:                           # (3)
    passwordFile: /opt/cert/store.password
  keyPassword:                             # (4)
    passwordFile: /opt/cert/key.password

1. storeFile specifies the PKCS#12 key store file.
2. storeType specifies the key store file type. Supported values include PKCS12 and JKS.
3. (Optional) Password for the key store file.
4. (Optional) Password for the key entry within the file.

A set of trust anchors for the TLS client can be specified using a PKCS#12 or JKS key store file.

Example TLS client trust configuration using a PKCS#12 key store file

trust:
  storeFile: /opt/cert/server.p12          # (1)
  storeType: PKCS12                        # (2)
  storePassword:                           # (3)
    passwordFile: /opt/cert/store.password

1. storeFile specifies the PKCS#12 trust store file.
2. storeType specifies the trust store file type. Supported values include PKCS12 and JKS.
3. (Optional) Password for the trust store file.

3.4. Filter configuration

This procedure describes how to configure the Record Encryption filter. Provide the filter configuration and the Key Encryption Key (KEK) selector to use. The KEK selector maps topic names to key names. The filter looks up the resulting key name in the KMS.

Prerequisites

Procedure

  1. Configure a RecordEncryption type filter.

    Example Record Encryption filter configuration

    kms: <kms_service_name>                          # (1)
    kmsConfig:
      <kms_specific_config>                          # (2)
      # ...
    selector: <KEK-selector_service_name>            # (3)
    selectorConfig:
      template: "KEK-$(topicName)"                   # (4)
    unresolvedKeyPolicy: PASSTHROUGH_UNENCRYPTED     # (5)
    experimental:
      encryptionDekRefreshAfterWriteSeconds: 3600    # (6)
      encryptionDekExpireAfterWriteSeconds: 7200     # (7)
      maxEncryptionsPerDek: 5000000                  # (8)

    1. The KMS service name.
    2. Configuration specific to the KMS provider.
    3. The Key Encryption Key (KEK) selector to use. The $(topicName) placeholder is a literal understood by the proxy. For example, when using the TemplateKekSelector with the template KEK-$(topicName), create a key for every topic to be encrypted, with the key name matching the topic name prefixed by the string KEK-.
    4. The template for deriving the KEK name from a topic name.
    5. (Optional) Policy governing the behavior when the KMS does not contain a key. The default, PASSTHROUGH_UNENCRYPTED, forwards the record to the target cluster unencrypted. Alternatively, specify REJECT to reject the entire produce request. REJECT is the safer choice when all traffic sent to the virtual cluster should be encrypted, because unencrypted data is never forwarded.
    6. How long after creation of a DEK before it becomes eligible for rotation. On the next encryption request, the cache asynchronously creates a new DEK. Encryption requests continue to use the old DEK until the new DEK is ready.
    7. How long after creation of a DEK until it is removed from the cache. This setting puts an upper bound on how long a DEK can remain cached.
    8. The maximum number of records a DEK may be used to encrypt. After this limit is reached, the DEK is destroyed and a new one is created.

    encryptionDekRefreshAfterWriteSeconds and encryptionDekExpireAfterWriteSeconds properties govern the originator usage period of the DEK, which is the amount of time the DEK remains valid for encrypting records. Shortening this period helps limit the impact if the DEK key material is leaked. However, shorter periods increase the number of KMS API calls, which might affect produce and consume latency and raise KMS provider costs.

    maxEncryptionsPerDek helps prevent key exhaustion by placing an upper limit on the number of times a DEK may be used to encrypt records.

  2. Verify that the encryption has been applied to the specified topics by producing messages through the proxy and then consuming directly and indirectly from the Kafka cluster.
Note

If the filter is unable to find the key in the KMS, the filter passes through the records belonging to that topic in the produce request without encrypting them.
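The selector template, unresolved-key policy, and DEK limits described above can be modeled in Python. This is an illustrative sketch, not the filter's implementation; the key names and thresholds are hypothetical:

```python
def resolve_kek_name(template: str, topic: str) -> str:
    """Expand the $(topicName) placeholder, as a template-based KEK selector does."""
    return template.replace("$(topicName)", topic)

def produce_action(topic: str, kms_keys: set, policy: str = "PASSTHROUGH_UNENCRYPTED") -> str:
    """Decide what happens to a produce request when the topic's KEK may be missing."""
    kek = resolve_kek_name("KEK-$(topicName)", topic)
    if kek in kms_keys:
        return "encrypt"
    # Key not found in the KMS: either reject the produce request or pass through.
    return "reject" if policy == "REJECT" else "forward-unencrypted"

def dek_needs_rotation(age_seconds: float, encryptions: int,
                       refresh_after_write: int = 3600,
                       max_encryptions: int = 5_000_000) -> bool:
    """A cached DEK becomes eligible for replacement once either limit is reached."""
    return age_seconds >= refresh_after_write or encryptions >= max_encryptions

kms_keys = {"KEK-payments"}  # hypothetical keys present in the KMS
assert produce_action("payments", kms_keys) == "encrypt"
assert produce_action("logs", kms_keys) == "forward-unencrypted"
assert produce_action("logs", kms_keys, policy="REJECT") == "reject"
assert dek_needs_rotation(age_seconds=10, encryptions=5_000_000)
```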

3.5. Example KafkaProtocolFilter resource

If your instance of Streams for Apache Kafka Proxy runs on OpenShift, you must use a KafkaProtocolFilter resource to contain the filter configuration.

Here’s a complete example of a KafkaProtocolFilter resource configured for record encryption with Vault KMS:

Example KafkaProtocolFilter resource

kind: KafkaProtocolFilter
metadata:
  name: my-encryption-filter
spec:
  type: RecordEncryption
  configTemplate:
    kms: VaultKmsService
    kmsConfig:
      vaultTransitEngineUrl: # ...
      tls: # ...
      vaultToken:
        passwordFile: ${secret:encryption-filter:vault-token}
    selector: TemplateKekSelector
    selectorConfig:
      template: "KEK-$(topicName)"
    unresolvedKeyPolicy: PASSTHROUGH_UNENCRYPTED
    experimental:
      encryptionDekRefreshAfterWriteSeconds: 3600
      encryptionDekExpireAfterWriteSeconds: 7200
      maxEncryptionsPerDek: 5000000

Refer to the Deploying and Managing Streams for Apache Kafka Proxy on OpenShift guide for more information about configuration on OpenShift.

Chapter 4. Monitoring the Record Encryption filter

This section describes how to monitor the Record Encryption filter.

4.1. Record Encryption filter metrics

The filter emits metrics that provide insights into its interactions with the configured KMS. They indicate the load the filter places on the KMS infrastructure and how often its interactions with the KMS fail.

The filter emits metrics that count the number of records that are being encrypted. This can help you verify that the filter is configured properly and encrypting specific topics as intended.

These metrics are made available automatically once metrics are enabled in the proxy.

4.1.1. KMS metrics

KMS metrics track and count the following types of interactions:

  • Generating DEK pairs
  • Decrypting EDEKs
  • Resolving KEK aliases
Table 4.1. KMS metrics

  Metric name                              | Type    | Labels             | Description
  kroxylicious_kms_operation_attempt_total | Counter | operation          | Count of KMS operations attempted.
  kroxylicious_kms_operation_outcome_total | Counter | operation, outcome | Count of KMS operations, grouped by outcome.

Table 4.2. Labels used on the KMS metrics

  Label     | Domain                                         | Description
  operation | generate_dek_pair, decrypt_edek, resolve_alias | Type of operation performed.
  outcome   | SUCCESS, EXCEPTION, NOT_FOUND                  | Result of the operation.
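The operations counted by these metrics make up the envelope-encryption flow: the filter asks the KMS to generate a DEK pair, stores the encrypted DEK (EDEK) alongside the ciphertext, and later asks the KMS to decrypt the EDEK. A toy in-memory model illustrates the flow (purely illustrative, not the filter's implementation; the XOR "wrapping" stands in for a real wrapping algorithm such as AES key wrap or RSA-OAEP):

```python
import secrets

class ToyKms:
    """In-memory stand-in for a KMS, illustrating the three counted operations."""

    def __init__(self):
        self._keks = {}

    def create_kek(self, alias: str) -> None:
        self._keks[alias] = secrets.token_bytes(32)

    def resolve_alias(self, alias: str):
        """resolve_alias: map a KEK alias to a key reference (None if missing)."""
        return alias if alias in self._keks else None

    def generate_dek_pair(self, alias: str):
        """generate_dek_pair: return a fresh DEK plus its encrypted form (EDEK)."""
        dek = secrets.token_bytes(32)
        edek = bytes(a ^ b for a, b in zip(dek, self._keks[alias]))
        return dek, edek

    def decrypt_edek(self, alias: str, edek: bytes) -> bytes:
        """decrypt_edek: recover the plaintext DEK from the EDEK."""
        return bytes(a ^ b for a, b in zip(edek, self._keks[alias]))

kms = ToyKms()
kms.create_kek("KEK-payments")
dek, edek = kms.generate_dek_pair("KEK-payments")
assert kms.decrypt_edek("KEK-payments", edek) == dek
```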

4.1.2. Encryption accounting metrics

Encryption accounting metrics count the number of records sent to topics that are encrypted and the number sent to topics that are not configured for encryption. These metrics are broken down by topic name. Use them to confirm that your configuration has the intended effect.

Table 4.3. Encryption accounting metrics

  Metric name                                             | Type    | Labels     | Description
  kroxylicious_filter_record_encryption_encrypted_records | Counter | topic_name | Count of records encrypted by the filter.
  kroxylicious_filter_record_encryption_plain_records     | Counter | topic_name | Count of records not encrypted by the filter.
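What the two counters report can be modeled in a few lines of Python (the topic names and record stream are made up for illustration):

```python
from collections import Counter

encrypted_records = Counter()  # models ..._encrypted_records{topic_name=...}
plain_records = Counter()      # models ..._plain_records{topic_name=...}

# Hypothetical stream of (topic, encrypted?) outcomes as the filter processes records.
for topic, was_encrypted in [("payments", True), ("payments", True), ("audit", False)]:
    (encrypted_records if was_encrypted else plain_records)[topic] += 1

assert encrypted_records["payments"] == 2
assert plain_records["audit"] == 1
```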

Chapter 5. Operations

This section documents the operational aspects of using the record encryption filter.

5.1. Handling lost KEKs

This section describes how to recover or mitigate the loss of a Key Encryption Key (KEK) required for decryption.

A KEK is considered lost if it is no longer usable for decryption even though the Key Management System (KMS) remains accessible to the proxy. For example, the key might be scheduled for deletion or in an invalid state.

Warning

Do not delete KEKs from your KMS. Determining which KEKs are still required for decryption is complex and error-prone. If a KEK is deleted while encrypted records still depend on it, those records become unrecoverable. As a result, consuming applications will encounter errors and stop processing unless additional action is taken. Only follow the procedures in this section if absolutely necessary.

When a consumer attempts to fetch a record that cannot be decrypted, the proxy returns an error. The exact error depends on the Kafka client library:

Apache Kafka client
Unexpected error code 91 while fetching at offset n from topic-partition <topic>-<partition>
librdkafka-based client
Fetch from broker 0 failed at offset n (leader epoch 0): Broker: Request illegally referred to resource that does not exist

These errors indicate that the KEK required for decryption is missing. Error code 91 (RESOURCE_NOT_FOUND) is returned by the Record Encryption filter when the KEK is unavailable.

To confirm the issue, check the proxy logs for entries like the following:

Failed to decrypt record in topic-partition <topic>-<partition> owing to key not found condition.
This will be reported to the client as a RESOURCE_NOT_FOUND(91).
Client may see a message like 'Unexpected error code 91 while fetching at offset' (java) or 'Request illegally referred to resource that does not exist' (librdkafka).
Cause message: key 'd691a642-d8b4-4445-b668-d390df7000bb' is not found (AWS error: ErrorResponse{type='KMSInvalidStateException', message='arn:aws:kms:us-east-1:000000000000:key/d691a642-d8b4-4445-b668-d390df7000bb is pending deletion.'}).
Raise log level to DEBUG to see the stack.

If you confirm that a KEK is lost, take one of the following actions:

  • Cancel key deletion
  • Restore key from backup
  • Delete or skip affected records

The actions are listed in recommended order to help restore record consumption. After applying any of the strategies, restart all proxy instances to resume consuming records.

5.1.1. Cancel key deletion

Some KMS providers schedule keys for deletion instead of deleting them immediately. During this time, the key appears unavailable but can still be recovered:

  1. Use your KMS console or API to check if the missing key is scheduled for deletion.
  2. If so, cancel the deletion to restore the key.

Refer to the documentation of the KMS for more details.

5.1.2. Restore key from backup

If the key was backed up, restore it from the backup:

  1. Use your KMS’s backup and restore tools to recover the KEK.
  2. Ensure that you also restore the original key metadata, such as the key identifier. The Record Encryption filter uses the identifier to reference the KEK in ciphertext records.
Important

Restoring the key material alone does not ensure compatibility with encrypted records. You must also recover related metadata, such as the key identifier, to resume successful decryption.

5.1.3. Delete or skip affected records

If the KEK cannot be recovered, you must do one of the following:

  • Delete the encrypted records
  • Advance consumer group offsets to skip the affected records

The process is as follows:

  1. Identify the earliest offset after which all records can be successfully decrypted.

    Proxy instances may not switch to a new KEK at the same time, so records encrypted with different keys might appear together in the log. As a result, there may be no single offset where encryption clearly transitions from one KEK to the next.

    Use kafka-console-consumer.sh with a binary search approach to find the lowest offset for each affected topic partition where decryption succeeds. Domain-specific knowledge can help narrow the search.

  2. Use the new starting offset for each affected topic partition to do one of the following:

    • Delete records using kafka-delete-records.sh
      This tool deletes all records up to the specified offset, including any that may still be readable.
    • Advance consumer group offsets using kafka-consumer-groups.sh
      You must reset offsets for every consumer group that needs to skip the records that cannot be decrypted.
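Step 1 can be sketched as a binary search over offsets. This assumes, as an approximation, that decryptability is monotonic in the offset; as noted above, records wrapped by different KEKs can interleave, so verify the offsets around the result manually:

```python
def earliest_decryptable_offset(low: int, high: int, can_decrypt) -> int:
    """Lowest offset in [low, high] for which can_decrypt(offset) is True,
    assuming every offset at or after the answer is decryptable.
    Returns high if no offset in the range can be decrypted."""
    while low < high:
        mid = (low + high) // 2
        if can_decrypt(mid):
            high = mid   # mid works; the answer is at or before mid
        else:
            low = mid + 1  # mid fails; the answer is after mid
    return low

# Toy partition where offsets below 1234 were encrypted with the lost KEK.
assert earliest_decryptable_offset(0, 10_000, lambda o: o >= 1234) == 1234
```

In practice the can_decrypt probe is a kafka-console-consumer.sh invocation at a given offset, checked for success or the error code 91 described above.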

Appendix A. Using your subscription

Streams for Apache Kafka is provided through a software subscription. To manage your subscriptions, access your account at the Red Hat Customer Portal.

A.1. Accessing Your Account

  1. Go to access.redhat.com.
  2. If you do not already have an account, create one.
  3. Log in to your account.

A.2. Activating a Subscription

  1. Go to access.redhat.com.
  2. Navigate to My Subscriptions.
  3. Navigate to Activate a subscription and enter your 16-digit activation number.

A.3. Downloading Zip and Tar Files

To access zip or tar files, use the customer portal to find the relevant files for download. If you are using RPM packages, this step is not required.

  1. Open a browser and log in to the Red Hat Customer Portal Product Downloads page at access.redhat.com/downloads.
  2. Locate the Streams for Apache Kafka entries in the INTEGRATION AND AUTOMATION category.
  3. Select the desired Streams for Apache Kafka product. The Software Downloads page opens.
  4. Click the Download link for your component.

A.4. Installing packages with DNF

To install a package and all the package dependencies, use:

dnf install <package_name>

To install a previously-downloaded package from a local directory, use:

dnf install <path_to_download_package>

Revised on 2025-12-16 10:57:57 UTC

Legal Notice

Copyright © Red Hat.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat Software Collections is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.