9.18. 从外部来源加载配置值
使用配置提供程序从外部来源加载配置数据。供应商独立于 Apache Kafka 的 Streams 操作。您可以使用它们为所有 Kafka 组件加载配置数据,包括生成者和消费者。您可以在组件的配置中引用外部源并提供访问权限。供应商在不重启 Kafka 组件或提取文件的情况下加载数据,即使引用新的外部源也是如此。例如,使用供应商提供 Kafka Connect 连接器配置的凭证。配置必须包含对外部源的任何访问权限。
9.18.1. 启用配置供应商
您可以使用组件的 spec
配置中的 config.providers
属性启用一个或多个配置供应商。
启用配置供应商的配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect annotations: strimzi.io/use-connector-resources: "true" spec: # ... config: # ... config.providers: env config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider # ...
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect
annotations:
strimzi.io/use-connector-resources: "true"
spec:
# ...
config:
# ...
config.providers: env
config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
# ...
- KubernetesSecretConfigProvider
- 从 OpenShift 机密加载配置数据。您可以在存储配置数据的 secret 中指定 secret 和密钥。此提供程序可用于存储敏感配置数据,如密码或其他用户凭据。
- KubernetesConfigMapConfigProvider
- 从 OpenShift 配置映射加载配置数据。您可以在存储配置数据的配置映射中指定配置映射的名称和键。此提供程序可用于存储非敏感配置数据。
- EnvVarConfigProvider
- 从环境变量加载配置数据。您可以指定保存配置数据的环境变量的名称。此提供程序可用于配置容器中运行的应用,例如从机密映射的环境变量加载证书或 JAAS 配置。
- FileConfigProvider
- 从文件加载配置数据。您可以指定保存配置数据的文件的路径。此提供程序可用于从挂载到容器中的文件中加载配置数据。
- DirectoryConfigProvider
- 从目录中的文件加载配置数据。您可以指定存储配置文件的目录的路径。此提供程序可用于加载多个配置文件和将配置数据组织到单独的文件中。
要使用作为 OpenShift Configuration Provider 插件一部分的 KubernetesSecretConfigProvider
和 KubernetesConfigMapConfigProvider
,您必须为包含配置文件的命名空间设置访问权限。
您可以在不设置访问权限的情况下使用其他供应商。您可以通过执行以下操作来为 Kafka Connect 或 MirrorMaker 2 提供连接器配置:
- 将配置映射或 secret 挂载到 Kafka Connect pod 中作为环境变量或卷中
-
在 Kafka Connect 或 MirrorMaker 2 配置中启用
EnvVarConfigProvider
,FileConfigProvider
, 或DirectoryConfigProvider
-
使用
KafkaConnect
或KafkaMirrorMaker2
资源的spec
中的externalConfiguration
属性传递连接器配置
使用供应商有助于防止通过 Kafka Connect REST 接口传递受限信息。在以下情况下可以使用这个方法:
- 使用连接器用来连接和数据源通信的值挂载环境变量
- 使用配置 Kafka Connect 连接器的值挂载属性文件
- 在目录中挂载文件,其中包含连接器使用的 TLS 信任存储和密钥存储的值
将新的 Secret
或 ConfigMap
用于连接器时需要重启,这可能会破坏其他连接器。
9.18.2. 从 secret 或配置映射加载配置值
使用 KubernetesSecretConfigProvider
从 secret 或 KubernetesConfigMapConfigProvider
提供配置属性,从配置映射提供配置属性。
在此过程中,配置映射为连接器提供配置属性。属性指定为配置映射的键值。配置映射作为卷挂载到 Kafka Connect pod 中。
先决条件
- Kafka 集群正在运行。
- Cluster Operator 正在运行。
- 您有一个包含连接器配置的配置映射。
使用连接器属性的配置映射示例
apiVersion: v1 kind: ConfigMap metadata: name: my-connector-configuration data: option1: value1 option2: value2
apiVersion: v1
kind: ConfigMap
metadata:
name: my-connector-configuration
data:
option1: value1
option2: value2
流程
配置
KafkaConnect
资源。-
启用
KubernetesConfigMapConfigProvider
此处显示的规格支持从配置映射和 secret 加载值。
使用配置映射和 secret 的 Kafka 连接配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect annotations: strimzi.io/use-connector-resources: "true" spec: # ... config: # ... config.providers: secrets,configmaps config.providers.configmaps.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect annotations: strimzi.io/use-connector-resources: "true" spec: # ... config: # ... config.providers: secrets,configmaps
1 config.providers.configmaps.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider
2 config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
3 # ...
Copy to Clipboard Copied! -
启用
创建或更新资源以启用提供程序。
oc apply -f <kafka_connect_configuration_file>
oc apply -f <kafka_connect_configuration_file>
Copy to Clipboard Copied! 创建一个允许访问外部配置映射中值的角色。
要从配置映射中访问值的角色示例
apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: connector-configuration-role rules: - apiGroups: [""] resources: ["configmaps"] resourceNames: ["my-connector-configuration"] verbs: ["get"] # ...
apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: connector-configuration-role rules: - apiGroups: [""] resources: ["configmaps"] resourceNames: ["my-connector-configuration"] verbs: ["get"] # ...
Copy to Clipboard Copied! 该规则授予角色权限来访问
my-connector-configuration
配置映射。创建一个角色绑定,允许访问包含配置映射的命名空间。
访问包含配置映射的命名空间的角色绑定示例
apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: connector-configuration-role-binding subjects: - kind: ServiceAccount name: my-connect-connect namespace: my-project roleRef: kind: Role name: connector-configuration-role apiGroup: rbac.authorization.k8s.io # ...
apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: connector-configuration-role-binding subjects: - kind: ServiceAccount name: my-connect-connect namespace: my-project roleRef: kind: Role name: connector-configuration-role apiGroup: rbac.authorization.k8s.io # ...
Copy to Clipboard Copied! 角色绑定授予角色访问
my-project
命名空间的权限。服务帐户必须是 Kafka Connect 部署使用的相同。服务帐户名称是 <
cluster_name>-connect
,其中 <cluster_name
> 是KafkaConnect
自定义资源的名称。在连接器配置中引用配置映射。
引用配置映射的连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-connector labels: strimzi.io/cluster: my-connect spec: # ... config: option: ${configmaps:my-project/my-connector-configuration:option1} # ... # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-connector labels: strimzi.io/cluster: my-connect spec: # ... config: option: ${configmaps:my-project/my-connector-configuration:option1} # ... # ...
Copy to Clipboard Copied! 占位符结构是
configmaps:<path_and_file_name>:<property>
。KubernetesConfigMapConfigProvider
从外部配置映射读取和提取option1
属性值。
9.18.3. 从环境变量加载配置值
使用 EnvVarConfigProvider
提供配置属性作为环境变量。环境变量可以包含来自配置映射或 secret 的值。
在此过程中,环境变量为连接器提供配置属性,以便与 Amazon AWS 通信。连接器必须能够读取 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
。环境变量的值派生自挂载到 Kafka Connect pod 的 secret。
用户定义的环境变量的名称不能以 KAFKA_
或 STRIMZI_
开头。
先决条件
- Kafka 集群正在运行。
- Cluster Operator 正在运行。
- 您有一个包含连接器配置的 secret。
带有环境变量值的 secret 示例
apiVersion: v1 kind: Secret metadata: name: aws-creds type: Opaque data: awsAccessKey: QUtJQVhYWFhYWFhYWFhYWFg= awsSecretAccessKey: Ylhsd1lYTnpkMjl5WkE=
apiVersion: v1
kind: Secret
metadata:
name: aws-creds
type: Opaque
data:
awsAccessKey: QUtJQVhYWFhYWFhYWFhYWFg=
awsSecretAccessKey: Ylhsd1lYTnpkMjl5WkE=
流程
配置
KafkaConnect
资源。-
启用
EnvVarConfigProvider
-
使用
externalConfiguration
属性指定环境变量。
使用外部环境变量的 Kafka 连接配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect annotations: strimzi.io/use-connector-resources: "true" spec: # ... config: # ... config.providers: env config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider # ... externalConfiguration: env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect annotations: strimzi.io/use-connector-resources: "true" spec: # ... config: # ... config.providers: env
1 config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
2 # ... externalConfiguration: env: - name: AWS_ACCESS_KEY_ID
3 valueFrom: secretKeyRef: name: aws-creds
4 key: awsAccessKey
5 - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey # ...
Copy to Clipboard Copied! 注意secretKeyRef
属性引用 secret 中的键。如果您使用配置映射而不是 secret,请使用configMapKeyRef
属性。-
启用
创建或更新资源以启用提供程序。
oc apply -f <kafka_connect_configuration_file>
oc apply -f <kafka_connect_configuration_file>
Copy to Clipboard Copied! 在连接器配置中引用环境变量。
引用环境变量的连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-connector labels: strimzi.io/cluster: my-connect spec: # ... config: option: ${env:AWS_ACCESS_KEY_ID} option: ${env:AWS_SECRET_ACCESS_KEY} # ... # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-connector labels: strimzi.io/cluster: my-connect spec: # ... config: option: ${env:AWS_ACCESS_KEY_ID} option: ${env:AWS_SECRET_ACCESS_KEY} # ... # ...
Copy to Clipboard Copied! 占位符结构是
env:<environment_variable_name>
。EnvVarConfigProvider
从挂载的 secret 中读取并提取环境变量值。
9.18.4. 从目录中的文件载入配置值
使用 FileConfigProvider
从目录中的文件提供配置属性。文件可以是配置映射或 secret。
在此过程中,文件为连接器提供配置属性。数据库名称和密码被指定为 secret 的属性。secret 作为一个卷挂载到 Kafka Connect pod。卷挂载到路径 /opt/kafka/external-configuration/<volume-name
> 上。
先决条件
- Kafka 集群正在运行。
- Cluster Operator 正在运行。
- 您有一个包含连接器配置的 secret。
带有数据库属性的 secret 示例
apiVersion: v1 kind: Secret metadata: name: mysecret type: Opaque stringData: connector.properties: |- dbUsername: my-username dbPassword: my-password
apiVersion: v1
kind: Secret
metadata:
name: mysecret
type: Opaque
stringData:
connector.properties: |-
dbUsername: my-username
dbPassword: my-password
流程
配置
KafkaConnect
资源。-
启用
FileConfigProvider
-
使用
externalConfiguration
属性指定文件。
使用外部属性文件的 Kafka 连接配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: # ... config: config.providers: file config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider #... externalConfiguration: volumes: - name: connector-config secret: secretName: mysecret
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: # ... config: config.providers: file
1 config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
2 #... externalConfiguration: volumes: - name: connector-config
3 secret: secretName: mysecret
4 Copy to Clipboard Copied! -
启用
创建或更新资源以启用提供程序。
oc apply -f <kafka_connect_configuration_file>
oc apply -f <kafka_connect_configuration_file>
Copy to Clipboard Copied! 将连接器配置中的文件属性引用为占位符。
引用该文件的连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 2 config: database.hostname: 192.168.99.1 database.port: "3306" database.user: "${file:/opt/kafka/external-configuration/connector-config/mysecret:dbUsername}" database.password: "${file:/opt/kafka/external-configuration/connector-config/mysecret:dbPassword}" database.server.id: "184054" #...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 2 config: database.hostname: 192.168.99.1 database.port: "3306" database.user: "${file:/opt/kafka/external-configuration/connector-config/mysecret:dbUsername}" database.password: "${file:/opt/kafka/external-configuration/connector-config/mysecret:dbPassword}" database.server.id: "184054" #...
Copy to Clipboard Copied! 占位符结构是
file:<path_and_file_name>:<property>
。FileConfigProvider
从挂载的 secret 读取并提取数据库 username 和 password 属性值。
9.18.5. 从目录中的多个文件载入配置值
使用 DirectoryConfigProvider
从目录中的多个文件提供配置属性。文件可以是配置映射或 secret。
在此过程中,secret 为连接器提供 TLS 密钥存储和信任存储用户凭证。凭据位于单独的文件中。secret 作为卷挂载到 Kafka Connect pod 中。卷挂载到路径 /opt/kafka/external-configuration/<volume-name
> 上。
先决条件
- Kafka 集群正在运行。
- Cluster Operator 正在运行。
- 您有一个包含用户凭证的 secret。
使用用户凭证的 secret 示例
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate user.crt: <user_certificate> # Public key of the user user.key: <user_private_key> # Private key of the user user.p12: <store> # PKCS #12 store for user certificates and keys user.password: <password_for_store> # Protects the PKCS #12 store
apiVersion: v1
kind: Secret
metadata:
name: my-user
labels:
strimzi.io/kind: KafkaUser
strimzi.io/cluster: my-cluster
type: Opaque
data:
ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate
user.crt: <user_certificate> # Public key of the user
user.key: <user_private_key> # Private key of the user
user.p12: <store> # PKCS #12 store for user certificates and keys
user.password: <password_for_store> # Protects the PKCS #12 store
my-user
secret 为连接器提供密钥存储凭证(user.crt
和 user.key
)。
在部署 Kafka 集群时生成的 <cluster_name>-cluster-ca-cert
secret 将集群 CA 证书提供为信任存储凭证(ca.crt
)。
流程
配置
KafkaConnect
资源。-
启用
DirectoryConfigProvider
-
使用
externalConfiguration
属性指定文件。
使用外部属性文件的 Kafka 连接配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: # ... config: config.providers: directory config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider #... externalConfiguration: volumes: - name: cluster-ca secret: secretName: my-cluster-cluster-ca-cert - name: my-user secret: secretName: my-user
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: # ... config: config.providers: directory
1 config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
2 #... externalConfiguration: volumes:
3 - name: cluster-ca
4 secret: secretName: my-cluster-cluster-ca-cert
5 - name: my-user secret: secretName: my-user
6 Copy to Clipboard Copied! -
启用
创建或更新资源以启用提供程序。
oc apply -f <kafka_connect_configuration_file>
oc apply -f <kafka_connect_configuration_file>
Copy to Clipboard Copied! 将连接器配置中的文件属性引用为占位符。
引用文件的连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 2 config: # ... database.history.producer.security.protocol: SSL database.history.producer.ssl.truststore.type: PEM database.history.producer.ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/cluster-ca:ca.crt}" database.history.producer.ssl.keystore.type: PEM database.history.producer.ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/my-user:user.crt}" database.history.producer.ssl.keystore.key: "${directory:/opt/kafka/external-configuration/my-user:user.key}" #...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 2 config: # ... database.history.producer.security.protocol: SSL database.history.producer.ssl.truststore.type: PEM database.history.producer.ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/cluster-ca:ca.crt}" database.history.producer.ssl.keystore.type: PEM database.history.producer.ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/my-user:user.crt}" database.history.producer.ssl.keystore.key: "${directory:/opt/kafka/external-configuration/my-user:user.key}" #...
Copy to Clipboard Copied! 占位符结构是
directory:<path>:<file_name>
。DirectoryConfigProvider
从挂载的 secret 读取并提取凭证。