Chapter 9. Bucket notifications in the Multicloud Object Gateway
Bucket notifications allow you to send notifications to external servers when events occur in a bucket. The Multicloud Object Gateway (MCG) supports multiple bucket event notification types. You can receive bucket notifications for activities such as pipeline creation, triggering data digests, and so on. MCG supports the event types specified in the following notification configurations:
- s3:TestEvent
- s3:ObjectCreated:*
- s3:ObjectCreated:Put
- s3:ObjectCreated:Post
- s3:ObjectCreated:Copy
- s3:ObjectCreated:CompleteMultipartUpload
- s3:ObjectRemoved:*
- s3:ObjectRemoved:Delete
- s3:ObjectRemoved:DeleteMarkerCreated
- s3:LifecycleExpiration:*
- s3:LifecycleExpiration:Delete
- s3:LifecycleExpiration:DeleteMarkerCreated
- s3:ObjectRestore:*
- s3:ObjectRestore:Post
- s3:ObjectRestore:Completed
- s3:ObjectRestore:Delete
- s3:ObjectTagging:*
- s3:ObjectTagging:Put
- s3:ObjectTagging:Delete
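To illustrate how a wildcard event type such as `s3:ObjectCreated:*` relates to the specific event names in the list above, the following is a minimal sketch (a hypothetical helper, not part of the MCG API) of matching a concrete event against a configured filter:

```python
# Sketch: match a concrete S3 event name against a configured event-type
# filter, where a trailing "*" covers every sub-event of that category.
# This is an illustrative helper, not an MCG function.

def event_matches(configured: str, actual: str) -> bool:
    """Return True if 'actual' falls under the 'configured' filter."""
    if configured.endswith(":*"):
        # "s3:ObjectCreated:*" covers "s3:ObjectCreated:Put", ":Copy", etc.
        return actual.startswith(configured[:-1])
    return configured == actual

print(event_matches("s3:ObjectCreated:*", "s3:ObjectCreated:Put"))   # True
print(event_matches("s3:ObjectRemoved:*", "s3:ObjectCreated:Put"))   # False
print(event_matches("s3:ObjectTagging:Delete", "s3:ObjectTagging:Delete"))  # True
```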
9.1. Configuring bucket notifications in the Multicloud Object Gateway
Prerequisites

Before configuring, ensure you have one of the following:

- A deployed Kafka cluster. For example, you can deploy Kafka with AMQ/strimzi by referring to Getting Started with AMQ Streams on OpenShift.
- A connected HTTP server. For example, you can set up an HTTP server that logs incoming HTTP requests so that you can observe them with the `oc logs` command, as follows:

```
$ cat http_logging_server.yaml
apiVersion: v1
kind: List
metadata: {}
items:
  - apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: http-logger
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: http-logger
      template:
        metadata:
          labels:
            app: http-logger
        spec:
          containers:
            - name: http-logger
              image: registry.redhat.io/ubi9/python-39:latest
              command:
                - /bin/sh
                - -c
                - |
                  set -e  # Fail on any error
                  mkdir -p /tmp/app
                  pip install flask
                  cat <<EOF > /tmp/app/server.py
                  from flask import Flask, request

                  app = Flask(__name__)

                  @app.route("/", methods=["POST", "GET"])
                  def log_request():
                      body = request.get_data(as_text=True)
                      print(body)  # Simple one-line logging per request
                      return "", 200

                  if __name__ == "__main__":
                      app.run(host="0.0.0.0", port=8676, debug=True)
                  EOF
                  exec python /tmp/app/server.py
              ports:
                - containerPort: 8676
                  protocol: TCP
              securityContext:
                runAsNonRoot: true
                allowPrivilegeEscalation: false
  - apiVersion: v1
    kind: Service
    metadata:
      name: http-logger
      labels:
        app: http-logger
    spec:
      selector:
        app: http-logger
      ports:
        - port: 8676
          targetPort: 8676
          protocol: TCP
          name: http
```

```
$ oc create -f http_logging_server.yaml -n <http-server-namespace>
```
- Ensure that the server is reachable from the MCG core pod that sends the notifications.
Ensure that you obtain the MCG credentials:

```
NOOBAA_ACCESS_KEY=$(oc extract secret/noobaa-admin -n openshift-storage --keys=AWS_ACCESS_KEY_ID --to=- 2>/dev/null); \
NOOBAA_SECRET_KEY=$(oc extract secret/noobaa-admin -n openshift-storage --keys=AWS_SECRET_ACCESS_KEY --to=- 2>/dev/null); \
S3_ENDPOINT=https://$(oc get route s3 -n openshift-storage -o json | jq -r ".spec.host")

alias aws_alias='AWS_ACCESS_KEY_ID=$NOOBAA_ACCESS_KEY AWS_SECRET_ACCESS_KEY=$NOOBAA_SECRET_KEY aws --endpoint $S3_ENDPOINT --no-verify-ssl'
```
Procedure
Describe the connection using a json file, such as connection.json on your local machine.

For Kafka:

```
{
  "name": "kafka_notif_conn_file", <-- any string
  "notification_protocol": "kafka",
  "metadata.broker.list": "<kafka-service-name>.<project>.svc.cluster.local:<kafka-service-port>",
  "topic": "my-topic" <-- refer to an existing KafkaTopic resource
}
```

The structure under `metadata.broker.list` must be `<service-name>.<namespace>.svc.cluster.local:9092`, and the topic must refer to the name of an existing `KafkaTopic` resource in the namespace.

For HTTP:

```
{
  "name": "http_notif_connection_config",
  "notification_protocol": "http", <-- or "https"
  "agent_request_object": {
    "host": "<http-service>.<http-server-namespace>.svc.cluster.local",
    "port": <http-server-port>
  }
}
```

Additional options:
- request_options_object
  The value is a JSON object that is passed to nodejs's http(s) request (optional).
  Any field supported by nodejs's http(s) request options can be used, for example:
  - 'path'
    Used to specify the URL path.
  - 'auth'
    Used for HTTP simple auth. The syntax for the value of 'auth' is: <name>:<password>.
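As an illustration of the connection file layout described above, the following sketch assembles an HTTP connection configuration, including the optional `request_options_object` with `path` and `auth`. The service name, namespace, port, and credentials are placeholder values, not defaults:

```python
import json

# Sketch: assemble an HTTP connection configuration for MCG bucket
# notifications. Host, port, path, and credentials below are placeholders
# for illustration only.
connection = {
    "name": "http_notif_connection_config",
    "notification_protocol": "http",
    "agent_request_object": {
        # <service>.<namespace>.svc.cluster.local
        "host": "http-logger.my-namespace.svc.cluster.local",
        "port": 8676,
    },
    # Optional: fields passed through to nodejs's http(s) request options.
    "request_options_object": {
        "path": "/notifications",     # URL path on the target server
        "auth": "username:password",  # HTTP simple auth as <name>:<password>
    },
}

# Write the file that 'oc create secret generic ... --from-file=' consumes.
with open("connection.json", "w") as f:
    json.dump(connection, f, indent=2)
```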
Create a secret from the file in the openshift-storage namespace:

```
$ oc create secret generic <connection-secret> --from-file=connect.json -n openshift-storage
```

Update the NooBaa CR in the openshift-storage namespace with the connection secret and an optional CephFS RWX PVC. If no PVC is provided, MCG automatically creates one as needed:

```
$ oc get pvc bn-pvc -n openshift-storage
NAME     STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS                VOLUMEATTRIBUTESCLASS   AGE
bn-pvc   Bound    pvc-24683f8d-48e4-4c6b-b108-d507ca2f4fd1   25Gi       RWX            ocs-storagecluster-cephfs   <unset>                 25h
```

```
$ oc patch noobaa noobaa --type='merge' -n openshift-storage -p '{
  "spec": {
    "bucketNotifications": {
      "connections": [
        {
          "name": <connection-secret>,
          "namespace": "openshift-storage"
        }
      ],
      "enabled": true,
      "pvc": "bn-pvc" <-- optional
    }
  }
}'
```

Important: The name of a connection secret must be unique in the list, even if the namespaces are different.
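The merge patch applied in the step above can also be generated programmatically, which helps when templating the `oc patch` command. A minimal sketch, where "my-connection-secret" is a placeholder for your own secret name:

```python
import json

# Sketch: build the JSON merge patch for the NooBaa CR's bucketNotifications
# section. "my-connection-secret" is a placeholder secret name.
patch = {
    "spec": {
        "bucketNotifications": {
            "connections": [
                {"name": "my-connection-secret", "namespace": "openshift-storage"}
            ],
            "enabled": True,
            "pvc": "bn-pvc",  # optional; omit to let MCG create the PVC
        }
    }
}

# This string is what you would pass to: oc patch noobaa noobaa --type=merge -p '...'
print(json.dumps(patch))
```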
Wait for the noobaa-core and noobaa-endpoint pods to restart before continuing to the next step.

Use S3:PutBucketNotification on the MCG bucket, using the noobaa-admin credentials and the S3 endpoint under the S3 alias:

```
$ aws_alias s3api put-bucket-notification --bucket first.bucket --notification-configuration '{
  "TopicConfiguration": {
    "Id": "<notif_event_kafka>", <-- Unique string
    "Events": ["s3:ObjectCreated:*"], <-- a filter for events
    "Topic": "<connection-secret/connect.json>"
  }
}'
```
Verification steps

Verify that the bucket notification configuration is set on the bucket:

```
$ aws_alias s3api get-bucket-notification-configuration --bucket first.bucket
{
    "TopicConfigurations": [
        {
            "Id": "notif_event_kafka",
            "TopicArn": "kafka-connection-secret/connect.json",
            "Events": [
                "s3:ObjectCreated:*"
            ]
        }
    ]
}
```

Add some objects to the bucket:
```
$ echo 'a' | aws_alias s3 cp - s3://first.bucket/a
```

Wait for some time, then query the topic messages to verify that the expected notifications were sent and received.
For example:

For Kafka:

```
$ oc -n your-kafka-project rsh my-cluster-kafka-0 bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-bootstrap.myproject.svc.cluster.local:9092 \
    --topic my-topic \
    --from-beginning --timeout-ms 10000 | grep '^{.*}' | jq -c '.' | jq
```

For HTTP:

```
$ oc logs deployment/http-logger -n <http-server-namespace> | grep '^{.*}' | jq
```

Output:
```
{
  "Records": [
    {
      "eventVersion": "2.3",
      "eventSource": "noobaa:s3",
      "eventTime": "2024-11-27T12:44:21.987Z",
      "s3": {
        "s3SchemaVersion": "1.0",
        "object": {
          "sequencer": 10,
          "key": "a",
          "eTag": "60b725f10c9c85c70d97880dfe8191b3"
        },
        "bucket": {
          "name": "second.bucket",
          "ownerIdentity": {
            "principalId": "admin@noobaa.io"
          },
          "arn": "arn:aws:s3:::first.bucket"
        }
      },
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "noobaa"
      },
      "requestParameters": {
        "sourceIPAddress": "100.64.0.3"
      },
      "responseElements": {
        "x-amz-request-id": "m3zvo0cm-5239xd-bhj",
        "x-amz-id-2": "m3zvo0cm-5239xd-bhj"
      }
    }
  ]
}
```
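Received notifications follow the S3 event record structure shown in the output above. As a minimal sketch, a consumer might extract the interesting fields from such a record like this (the sample payload below is a trimmed copy of that output):

```python
import json

# Sketch: extract key fields from an MCG bucket-notification record.
# The sample payload is a trimmed version of the output shown above.
payload = json.loads("""
{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "eventSource": "noobaa:s3",
      "eventTime": "2024-11-27T12:44:21.987Z",
      "s3": {
        "object": {"key": "a", "eTag": "60b725f10c9c85c70d97880dfe8191b3"},
        "bucket": {"name": "second.bucket"}
      }
    }
  ]
}
""")

# A single HTTP body or Kafka message can carry several records.
for record in payload["Records"]:
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    print(f"{record['eventName']}: s3://{bucket}/{key}")
    # ObjectCreated:Put: s3://second.bucket/a
```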