6.4. 部署 Kafka 连接
Kafka Connect 是一个在 Apache Kafka 和其他系统间流传输数据的工具。例如,Kafka Connect 可能会将 Kafka 与外部数据库或存储和消息传递系统集成。
在 AMQ Streams 中,Kafka Connect 部署为分布式模式。Kafka Connect 也可以以独立模式工作,但 AMQ Streams 不支持它。
使用 连接器 的概念,Kafka Connect 提供了一个框架,用于将大量数据移到 Kafka 集群中,同时保持可扩展性和可靠性。
Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect
集群,以及利用 KafkaConnector
资源创建的连接器。
要使用 Kafka Connect,您需要执行以下操作。
术语 连接器 会互换使用,以表示在 Kafka Connect 集群或连接器类中运行的连接器实例。在本指南中,当含义从上下文中清除时,会使用术语 连接器。
6.4.1. 将 Kafka Connect 部署到 OpenShift 集群
此流程演示了如何使用 Cluster Operator 将 Kafka Connect 集群部署到 OpenShift 集群。
Kafka Connect 集群部署使用可配置的节点数(也称为 worker),将连接器工作负载作为 任务 分发,以便消息流高度可扩展且可靠。
部署使用 YAML 文件来提供规格来创建 KafkaConnect
资源。
AMQ Streams 提供示例配置文件。在此过程中,我们使用以下示例文件:
-
examples/connect/kafka-connect.yaml
流程
部署 Kafka 连接到 OpenShift 集群。使用
examples/connect/kafka-connect.yaml
文件来部署 Kafka Connect。oc apply -f examples/connect/kafka-connect.yaml
检查部署的状态:
oc get pods -n <my_cluster_operator_namespace>
输出显示部署名称和就绪度
NAME READY STATUS RESTARTS my-connect-cluster-connect-<pod_id> 1/1 Running 0
my-connect-cluster
是 Kafka Connect 集群的名称。pod ID 标识创建的每个 pod。
使用默认部署,您可以创建一个 Kafka Connect pod。
READY
显示就绪/预期的副本数。当STATUS
显示为Running
时,部署可以成功。
其他资源
6.4.2. 为多个实例配置 Kafka 连接
如果您运行多个 Kafka Connect 实例,您必须更改以下配置属性 的默认配置
:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: # ... config: group.id: connect-cluster 1 offset.storage.topic: connect-cluster-offsets 2 config.storage.topic: connect-cluster-configs 3 status.storage.topic: connect-cluster-status 4 # ... # ...
对于具有相同 group.id
的所有 Kafka Connect 实例,这三个主题的值必须相同。
除非更改默认设置,否则每个连接到同一 Kafka 集群的 Kafka Connect 实例都使用相同的值部署。实际上,所有实例都是在集群中运行并使用相同的主题的所有实例。
如果多个 Kafka Connect 集群尝试使用相同的主题,Kafka Connect 将无法正常工作,并生成错误。
如果要运行多个 Kafka Connect 实例,请更改每个实例的这些属性值。
6.4.3. 添加连接器
Kafka Connect 使用连接器与其他系统集成来流传输数据。连接器是 Kafka Connector
类的实例,可以是以下类型之一:
- 源连接器
- 源连接器是一个运行时实体,它从外部系统获取数据并将其传送到 Kafka 作为信息。
- sink 连接器
- sink 连接器是一个运行时实体,它从 Kafka 主题获取信息并将其传送到外部系统。
Kafka Connect 使用插件架构为连接器提供实施工件。插件允许连接到其他系统,并提供额外的配置来操作数据。插件包括连接器和其他组件,如数据转换器和转换。连接器使用特定类型的外部系统运行。每个连接器都定义了其配置架构。您提供到 Kafka Connect 的配置,以在 Kafka Connect 中创建连接器实例。然后,连接器实例定义了一组用于在系统之间移动数据的任务。
使用以下方法之一将连接器插件添加到 Kafka Connect 中:
将插件添加到容器镜像后,您可以使用以下方法启动、停止和管理连接器实例:
您还可以使用这些选项创建新的连接器实例。
6.4.3.1. 自动使用连接器插件构建新容器镜像
配置 Kafka Connect,以便 AMQ Streams 会自动使用额外的连接器构建新容器镜像。您可以使用 KafkaConnect
自定义资源的 .spec.build.plugins
属性定义连接器插件。AMQ Streams 将自动下载连接器插件并将其添加到新容器镜像中。容器被推送到 .spec.build.output
中指定的容器存储库中,并在 Kafka Connect 部署中自动使用。
先决条件
- 必须部署 Cluster Operator。
- 容器 registry。
您需要提供自己的容器 registry,其中可将镜像推送到、存储和拉取镜像。AMQ Streams 支持私有容器 registry 以及公共 registry,如 Quay 或 Docker Hub。
流程
通过在
.spec.build.output
中指定容器 registry 来配置KafkaConnect
自定义资源,并在.spec.build.plugins
中指定其他连接器:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... build: output: 2 type: docker image: my-registry.io/my-org/my-connect-cluster:latest pushSecret: my-registry-credentials plugins: 3 - name: debezium-postgres-connector artifacts: - type: tgz url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.1.3.Final/debezium-connector-postgres-2.1.3.Final-plugin.tar.gz sha512sum: c4ddc97846de561755dc0b021a62aba656098829c70eb3ade3b817ce06d852ca12ae50c0281cc791a5a131cb7fc21fb15f4b8ee76c6cae5dd07f9c11cb7c6e79 - name: camel-telegram artifacts: - type: tgz url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz sha512sum: d6d9f45e0d1dbfcc9f6d1c7ca2046168c764389c78bc4b867dab32d24f710bb74ccf2a007d7d7a8af2dfca09d9a52ccbc2831fc715c195a3634cca055185bd91 #...
创建或更新资源:
$ oc apply -f <kafka_connect_configuration_file>
- 等待新容器镜像构建,并且部署 Kafka Connect 集群。
-
使用 Kafka Connect REST API 或
KafkaConnector
自定义资源使用您添加的连接器插件。
6.4.3.2. 使用 Kafka Connect 基础镜像中的连接器插件构建新容器镜像
使用 Kafka Connect 基础镜像中的连接器插件创建自定义 Docker 镜像,将自定义镜像添加到 /opt/kafka/plugins
目录中。
您可以使用 Red Hat Ecosystem Catalog 上的 Kafka 容器镜像作为基础镜像,使用额外的连接器插件创建自己的自定义镜像。
在启动时,Kafka Connect 的 AMQ Streams 版本会加载 /opt/kafka/plugins
目录中包含的任何第三方连接器插件。
流程
使用
registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0
作为基础镜像创建新的Dockerfile
:FROM registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
插件文件示例
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-<version>.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-<version>.jar │ ├── debezium-core-<version>.jar │ ├── LICENSE.txt │ ├── mongodb-driver-core-<version>.jar │ ├── README.md │ └── # ... ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-<version>.jar │ ├── debezium-core-<version>.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-<version>.jar │ ├── mysql-connector-java-<version>.jar │ ├── README.md │ └── # ... └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-<version>.jar ├── debezium-core-<version>.jar ├── LICENSE.txt ├── postgresql-<version>.jar ├── protobuf-java-<version>.jar ├── README.md └── # ...
COPY 命令指向要复制到容器镜像的插件文件。
本例为 Debezium 连接器(MongoDB、MySQL 和 PostgreSQL)添加了插件,但并非所有文件都被列为 brevity。在 Kafka Connect 中运行的 Debezium 与任何其他 Kafka Connect 任务相同。
- 构建容器镜像。
- 将自定义镜像推送到容器 registry。
指向新容器镜像。
您可以使用以下方法之一指向镜像:
编辑
KafkaConnect
自定义资源的KafkaConnect.spec.image
属性。如果设置,此属性会覆盖 Cluster Operator 中的
STRIMZI_KAFKA_CONNECT_IMAGES
环境变量。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... image: my-new-container-image 2 config: 3 #...
-
编辑
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
文件中的STRIMZI_KAFKA_CONNECT_IMAGES
环境变量,然后重新安装 Cluster Operator。
6.4.3.3. 部署 KafkaConnector 资源
部署 KafkaConnector
资源来管理连接器。KafkaConnector
自定义资源提供了一种 OpenShift 原生方法来管理 Cluster Operator 连接器。您不需要发送 HTTP 请求来管理连接器,就像 Kafka Connect REST API 一样。您可以通过更新对应的 KafkaConnector
资源来管理正在运行的连接器实例,然后应用更新。Cluster Operator 更新正在运行的连接器实例的配置。您可以通过删除对应的 KafkaConnector
来删除连接器。
KafkaConnector
资源必须部署到与其链接到的 Kafka Connect 集群相同的命名空间中。
在此过程中显示的配置中,autoRestart
属性被设置为 true
。这可让自动重启失败的连接器和任务。最多进行 7 个重启尝试,之后必须手动重新启动。您可以注解 KafkaConnector
资源来 重启连接器 或 手动重启连接器任务。
连接器示例
您可以使用您自己的连接器,或尝试 AMQ Streams 提供的示例。直到 Apache Kafka 3.1.0 之前,Apache Kafka 中包含文件连接器插件示例。从 Apache Kafka 的 3.1.1 和 3.2.0 版本开始,需要将示例 添加到插件路径中,作为任何其他连接器。
AMQ Streams 为示例文件连接器插件提供了 KafkaConnector
配置文件(example /connect/source-connector.yaml )的示例
KafkaConnector 配置文件,它将以下连接器实例创建为 KafkaConnector
资源:
-
从 Kafka 许可证文件(源)读取每行的
FileStreamSourceConnector
实例,并将数据作为信息写入单个 Kafka 主题。 -
从 Kafka 主题读取消息的
FileStreamSinkConnector
实例,并将信息写入临时文件(接收器)。
我们使用此流程使用示例文件创建连接器。
示例连接器不应在生产环境中使用。
先决条件
- Kafka Connect 部署
- Cluster Operator 正在运行
流程
使用以下方法之一将
FileStreamSourceConnector
和FileStreamSinkConnector
插件添加到 Kafka Connect 中:在 Kafka Connect 配置中,将
strimzi.io/use-connector-resources 注解设置为
true
。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: # ...
启用
KafkaConnector
资源后,Cluster Operator 会监视它们。编辑
examples/connect/source-connector.yaml
文件:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 autoRestart: 5 enabled: true config: 6 file: "/opt/kafka/LICENSE" 7 topic: my-topic 8 # ...
- 1
KafkaConnector
资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。- 2
- 在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
- 3
- 连接器类的完整名称或别名。这应该存在于 Kafka Connect 集群使用的镜像中。
- 4
- 连接器可创建的最大 Kafka Connect 任务数量。
- 5
- 启用自动重启失败的连接器和任务。
- 6
- 连接器配置 作为键值对。
- 7
- 这个示例源连接器配置从
/opt/kafka/LICENSE
文件中读取数据。 - 8
- 将源数据发布到的 Kafka 主题。
在 OpenShift 集群中创建源
KafkaConnector
:oc apply -f examples/connect/source-connector.yaml
创建
examples/connect/sink-connector.yaml
文件:touch examples/connect/sink-connector.yaml
将以下 YAML 粘贴到
sink-connector.yaml
文件中:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect spec: class: org.apache.kafka.connect.file.FileStreamSinkConnector 1 tasksMax: 2 config: 2 file: "/tmp/my-file" 3 topics: my-topic 4
在 OpenShift 集群中创建 sink
KafkaConnector
:oc apply -f examples/connect/sink-connector.yaml
检查是否创建了连接器资源:
oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name my-source-connector my-sink-connector
将 <my_connect_cluster> 替换为 Kafka Connect 集群的名称。
在容器中,执行
kafka-console-consumer.sh
来读取源连接器写入主题的消息:oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
将 <my_kafka_cluster> 替换为 Kafka 集群的名称。
源和接收器连接器配置选项
连接器配置在 KafkaConnector
资源的 spec.config
属性中定义。
FileStreamSourceConnector
和 FileStreamSinkConnector
类支持与 Kafka Connect REST API 相同的配置选项。其他连接器支持不同的配置选项。
名称 | 类型 | 默认值 | Description |
---|---|---|---|
| 字符串 | null | 要写入消息的源文件。如果没有指定,则使用标准输入。 |
| list | null | 将数据发布到的 Kafka 主题。 |
名称 | 类型 | 默认值 | Description |
---|---|---|---|
| 字符串 | null | 要写入消息的目标文件。如果没有指定,则使用标准输出。 |
| list | null | 从中读取数据的一个或多个 Kafka 主题。 |
| 字符串 | null | 与一个或多个 Kafka 主题匹配的正则表达式,以便从中读取数据。 |
6.4.3.4. 手动重启连接器
如果您使用 KafkaConnector
资源来管理连接器,请使用 restart
注解来手动触发连接器重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器的
KafkaConnector
自定义资源的名称:oc get KafkaConnector
通过在 OpenShift 中注解
KafkaConnector
资源来重启连接器。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart=true
restart
注解设置为true
。等待下一个协调发生(默认为两分钟)。
Kafka 连接器会重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector
自定义资源中删除。
6.4.3.5. 手动重启 Kafka 连接器任务
如果您使用 KafkaConnector
资源来管理连接器,请使用 restart-task
注解来手动触发连接器任务的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器任务的
KafkaConnector
自定义资源的名称:oc get KafkaConnector
从
KafkaConnector
自定义资源查找要重启的任务 ID。任务 ID 是非负整数,从 0 开始:oc describe KafkaConnector <kafka_connector_name>
通过在 OpenShift 中注解
KafkaConnector
资源,使用 ID 重启连接器任务:oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task=0
在本例中,任务
0
被重启。等待下一个协调发生(默认为两分钟)。
只要协调过程检测到注解,Kafka 连接器任务会被重启。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector
自定义资源中删除。
6.4.3.6. 公开 Kafka Connect API
使用 Kafka Connect REST API 作为使用 KafkaConnector
资源管理连接器的替代选择。Kafka Connect REST API 作为一个运行在 <connect_cluster_name>-connect-api:8083
的服务其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。服务在创建 Kafka Connect 实例时创建。
Kafka Connect REST API 支持的操作请参考 Apache Kafka Connect API 文档。
strimzi.io/use-connector-resources
注解启用 KafkaConnectors。如果您将注解应用到 KafkaConnect
资源配置,则需要将其删除以使用 Kafka Connect API。否则,Cluster Operator 会恢复使用 Kafka Connect REST API 进行的手动更改。
您可以将连接器配置添加为 JSON 对象。
添加连接器配置的 curl 请求示例
curl -X POST \ http://my-connect-cluster-connect-api:8083/connectors \ -H 'Content-Type: application/json' \ -d '{ "name": "my-source-connector", "config": { "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "file": "/opt/kafka/LICENSE", "topic":"my-topic", "tasksMax": "4", "type": "source" } }'
该 API 仅在 OpenShift 集群中访问。如果要使 Kafka Connect API 可供 OpenShift 集群中运行的应用程序访问,您可以通过创建以下功能之一来手动公开它:
-
LoadBalancer
或NodePort
类型服务 -
Ingress
资源(仅限 Kubernetes) - OpenShift 路由(仅限 OpenShift)
连接是不安全的,因此建议进行外部访问。
如果您决定创建服务,请使用 < connect_cluster_name>-connect-api
服务 选择器
中的标签来配置服务将流量路由到的 pod:
服务的选择器配置
# ... selector: strimzi.io/cluster: my-connect-cluster 1 strimzi.io/kind: KafkaConnect strimzi.io/name: my-connect-cluster-connect 2 #...
您还必须创建一个允许来自外部客户端的 HTTP 请求的 NetworkPolicy
。
允许请求 Kafka Connect API 的 NetworkPolicy 示例
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: my-custom-connect-network-policy
spec:
ingress:
- from:
- podSelector: 1
matchLabels:
app: my-connector-manager
ports:
- port: 8083
protocol: TCP
podSelector:
matchLabels:
strimzi.io/cluster: my-connect-cluster
strimzi.io/kind: KafkaConnect
strimzi.io/name: my-connect-cluster-connect
policyTypes:
- Ingress
- 1
- 允许连接到 API 的 pod 标签。
要在集群外添加连接器配置,请使用 curl 命令中公开 API 的资源 URL。
6.4.3.7. 限制对 Kafka Connect API 的访问
仅将对 Kafka Connect API 的访问限制为可信用户,以防止未经授权的操作和潜在的安全问题。Kafka Connect API 提供了大量更改连接器配置的功能,这有助于采取安全措施。有权访问 Kafka Connect API 的人员可能会获得管理员可能假设的敏感信息是安全的。
Kafka Connect REST API 可以被经过身份验证访问 OpenShift 集群的任何人访问,并知道端点 URL,其中包括主机名/IP 地址和端口号。
例如,假设机构使用 Kafka Connect 集群和连接器将客户数据库的敏感数据流传输到中央数据库。管理员使用配置供应商插件存储与连接到客户数据库和中央数据库(如数据库连接详情和身份验证凭据)相关的敏感信息。配置提供程序保护此敏感信息无法向未授权用户公开。但是,有权访问 Kafka Connect API 的用户仍然可以获得对客户数据库的访问权限,而无需同意管理员。它们可以通过设置虚拟数据库并配置连接器来连接它。然后,它们修改连接器配置以指向客户数据库,而不是将数据发送到中央数据库,而是将其发送到虚拟数据库。通过将连接器配置为连接到虚拟数据库,可以截获连接到客户端数据库的登录详情和凭证,即使它们存储在配置提供程序中。
如果您使用 KafkaConnector
自定义资源,默认情况下,OpenShift RBAC 规则只允许 OpenShift 集群管理员更改连接器。您还可以指定非集群管理员来管理 AMQ Streams 资源。在 Kafka Connect 配置中启用 KafkaConnector
资源后,Cluster Operator 会恢复使用 Kafka Connect REST API 所做的更改。如果您不使用 KafkaConnector
资源,默认的 RBAC 规则不会限制对 Kafka Connect API 的访问。如果要使用 OpenShift RBAC 限制对 Kafka Connect REST API 的直接访问,则需要启用和使用 KafkaConnector
资源。
为了提高安全性,我们建议为 Kafka Connect API 配置以下属性:
org.apache.kafka.disallowed.login.modules
(Kafka 3.4 或更高版本)设置
org.apache.kafka.disallowed.login.modules
Java 系统属性,以防止使用不安全的登录模块。例如,指定com.sun.security.auth.module.JndiLoginModule
可防止使用 KafkaJndiLoginModule
。禁止登录模块的配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: # ... jvmOptions: javaSystemProperties: - name: org.apache.kafka.disallowed.login.modules value: com.sun.security.auth.module.JndiLoginModule, org.apache.kafka.common.security.kerberos.KerberosLoginModule # ...
只允许可信登录模块,并按照您使用的版本的 Kafka 的最新建议进行操作。作为最佳实践,您应该通过使用
org.apache.kafka.disallowed.login.modules
系统属性明确禁止 Kafka Connect 配置中不安全的登录模块。connector.client.config.override.policy
将
connector.client.config.override.policy
属性设置为None
,以防止连接器配置覆盖 Kafka Connect 配置及其使用的用户和制作者。指定连接器覆盖策略的配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: # ... config: connector.client.config.override.policy: None # ...
6.4.3.8. 从 Kafka Connect API 切换到使用 KafkaConnector 自定义资源
您可以从使用 Kafka Connect API 切换到使用 KafkaConnector
自定义资源来管理连接器。要进行交换机,请按照所示的顺序执行以下操作:
-
使用配置部署
KafkaConnector
资源,以创建您的连接器实例。 -
通过将
strimzi.io/use-connector-resources
注解设置为true
,在 Kafka Connect 配置中启用KafkaConnector
资源。
如果在创建 KafkaConnector
资源前启用 KafkaConnector 资源,则删除所有连接器。
要从使用 KafkaConnector
资源切换到使用 Kafka Connect API,首先从 Kafka Connect 配置中删除启用 KafkaConnector
资源的注解。否则,Cluster Operator 会恢复使用 Kafka Connect REST API 进行的手动更改。
在进行交换机时 ,检查 KafkaConnect
资源的状态。metadata.generation
的值(当前版本的部署)必须与 status.observedGeneration
(资源最新协调)匹配。当 Kafka Connect 集群为 Ready
时,您可以删除 KafkaConnector
资源。