第 9 章 Multicloud 对象网关中的存储桶通知


bucket 通知允许您在存储桶中存在事件时向外部服务器发送通知。Multicloud Object Gateway (MCG)支持多个存储桶事件通知类型。您可以接收存储桶通知,用于流程创建、触发数据摘要等活动。MCG 支持以下通知配置中指定的事件类型:

  • s3:TestEvent
  • s3:ObjectCreated:*
  • s3:ObjectCreated:Put
  • s3:ObjectCreated:Post
  • s3:ObjectCreated:Copy
  • s3:ObjectCreated:CompleteMultipartUpload
  • s3:ObjectRemoved:*
  • s3:ObjectRemoved:Delete
  • s3:ObjectRemoved:DeleteMarkerCreated
  • s3:LifecycleExpiration:*
  • s3:LifecycleExpiration:Delete
  • s3:LifecycleExpiration:DeleteMarkerCreated
  • s3:ObjectRestore:*
  • s3:ObjectRestore:Post
  • s3:ObjectRestore:Completed
  • s3:ObjectRestore:Delete
  • s3:ObjectTagging:*
  • s3:ObjectTagging:Put
  • s3:ObjectTagging:Delete

9.1. 在 Multicloud 对象网关中配置存储桶通知

先决条件

  • 在配置前,请确保有以下之一:

    • 部署 Kafka 集群。

      例如,您可以通过引用 OpenShift 中的 AMQ Streams 入门来使用 AMQ /strimzi 部署 Kafka。

    • HTTP 服务器已连接。

      例如,您可以设置 HTTP 服务器来记录传入的 HTTP 请求,以便您可以使用 oc logs 命令观察它们,如下所示:

      $ cat http_logging_server.yaml
      apiVersion: v1
      kind: List
      metadata: {}
      items:
      - apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: http-logger
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: http-logger
          template:
            metadata:
              labels:
                app: http-logger
            spec:
              containers:
              - name: http-logger
                image: registry.redhat.io/ubi9/python-39:latest
                command:
                  - /bin/sh
                  - -c
                  - |
                    set -e  # Fail on any error
                    mkdir -p /tmp/app
                    pip install flask
                    cat <<EOF > /tmp/app/server.py
                    from flask import Flask, request
                    app = Flask(__name__)
                    @app.route("/", methods=["POST", "GET"])
                    def log_request():
                        body = request.get_data(as_text=True)
                        print(body)  # Simple one-line logging per request
                        return "", 200
                    if __name__ == "__main__":
                        app.run(host="0.0.0.0", port=8676, debug=True)
                    EOF
                    exec python /tmp/app/server.py
                ports:
                - containerPort: 8676
                  protocol: TCP
                securityContext:
                  runAsNonRoot: true
                  allowPrivilegeEscalation: false
      - apiVersion: v1
        kind: Service
        metadata:
          name: http-logger
          labels:
            app: http-logger
        spec:
          selector:
            app: http-logger
          ports:
            - port: 8676
              targetPort: 8676
              protocol: TCP
              name: http
      
      $ oc create -f http_logging_server.yaml -n <http-server-namespace>
  • 确保可从发送通知的 MCG 核心 pod 访问服务器。
  • 确保获取 MCG 凭证:

    NOOBAA_ACCESS_KEY=$(oc extract secret/noobaa-admin -n openshift-storage --keys=AWS_ACCESS_KEY_ID --to=- 2>/dev/null); \
    NOOBAA_SECRET_KEY=$(oc extract secret/noobaa-admin -n openshift-storage --keys=AWS_SECRET_ACCESS_KEY --to=- 2>/dev/null); \
    S3_ENDPOINT=https://$(oc get route s3 -n openshift-storage -o json | jq -r ".spec.host")
    alias aws_alias='AWS_ACCESS_KEY_ID=$NOOBAA_ACCESS_KEY AWS_SECRET_ACCESS_KEY=$NOOBAA_SECRET_KEY aws --endpoint $S3_ENDPOINT --no-verify-

流程

  1. 描述使用 json 文件的连接,如本地机器上的 connection.json

    对于 Kafka:

    {
      "name": "kafka_notif_conn_file" <-- any string
      "notification_protocol": "kafka",
      "metadata.broker.list": "<kafka-service-name>.<project>.svc.cluster.local:<kafka-service-port>",
      "topic": "my-topic",<--  refer to an existing KafkaTopic resource
    }

    metadata.broker.list 下的结构必须是 & lt;service-name>.<namespace>.svc.cluster.local:9092,主题必须引用命名空间中现有 kafkatopic 资源的名称。

    对于 HTTP:

    {
      "name": "http_notif_connection_config",
      "notification_protocol": "http", <-- or "https"
      "agent_request_object": {
        "host": "<http-service>.<http-server-namespace>.svc.cluster.local",
        "port": <http-server-port>
       }
    }

    其他选项:

    request_options_object
    值是一个 JSON,传递给 nodejs 的 http (s)请求(可选)。

    nodejs 的 http (s)请求选项支持的任何字段都可以使用,例如:

    'path'
    用于指定 url 路径
    'auth'
    用于 http 简单 auth。'auth' 的值的语法为:<name>:<password>。
  2. openshift-storage 命名空间中的文件创建 secret:

    $ oc create secret generic <connection-secret> --from-file=connect.json -n openshift-storage
  3. 使用连接 secret 和可选的 CephFS RWX PVC 更新 openshift-storage 命名空间中的 NooBaa CR。如果没有提供 PVC,MCG 会根据需要自动创建 PVC:

    $ oc get pvc bn-pvc -n openshift-storage
    NAME     STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS                VOLUMEATTRIBUTESCLASS   AGE
    bn-pvc   Bound    pvc-24683f8d-48e4-4c6b-b108-d507ca2f4fd1   25Gi       RWX            ocs-storagecluster-cephfs   <unset>                 25h
    $ oc patch noobaa noobaa --type='merge' -n openshift-storage -p '{
      "spec": {
        "bucketNotifications": {
          "connections": [
            {
              "name": <connection-secret>,
              "namespace": "openshift-storage"
            }
          ],
          "enabled": true,
          "pvc": "bn-pvc" <-- optional
        }
      }
    }'
    Important

    连接 secret 的名称在列表中必须是唯一的,即使命名空间不同。

    等待 noobaa-corenoobaa-endpoint pod 重启,然后继续下一步。

  4. 使用 S3 别名下的 noobaa-admin 凭证和 S3 端点在 MCG 存储桶上使用 S3:PutBucketNotification

    $ aws_alias s3api put-bucket-notification --bucket first.bucket --notification-configuration '{
      "TopicConfiguration": {
        "Id": "<notif_event_kafka>", <-- Unique string
        "Events": ["s3:ObjectCreated:*"], <-- a filter for events
        "Topic": "<connection-secret/connect.json>"
      }
    }'

验证步骤

  1. 验证存储桶上是否设置了存储桶通知配置:

    $ aws_alias s3api get-bucket-notification-configuration --bucket first.bucket
    {
        "TopicConfigurations": [
            {
                "Id": "notif_event_kafka",
                "TopicArn": "kafka-connection-secret/connect.json",
                "Events": [
                    "s3:ObjectCreated:*"
                ]
            }
        ]
    }
  2. 在存储桶中添加一些对象:

    echo 'a' | aws_alias s3 cp - s3://first.bucket/a

    等待一段时间并查询主题消息,以验证是否已发送和接收预期的通知。

    例如:

    对于 Kafka

    $ oc -n your-kafka-project rsh my-cluster-kafka-0 bin/kafka-console-consumer.sh \
      --bootstrap-server my-cluster-kafka-bootstrap.myproject.svc.cluster.local:9092 \
      --topic my-topic \
      --from-beginning --timeout-ms 10000 | grep '^{.*}' | jq -c '.' | jq

    对于 HTTP

    $ oc logs deployment/http-logger -n <http-server-namespace> | grep '^{.*}' | jq

    输出

    {
      "Records": [
        {
          "eventVersion": "2.3",
          "eventSource": "noobaa:s3",
          "eventTime": "2024-11-27T12:44:21.987Z",
          "s3": {
            "s3SchemaVersion": "1.0",
            "object": {
              "sequencer": 10,
              "key": "a",
              "eTag": "60b725f10c9c85c70d97880dfe8191b3"
            },
            "bucket": {
              "name": "second.bucket",
              "ownerIdentity": {
                "principalId": "admin@noobaa.io"
              },
              "arn": "arn:aws:s3:::first.bucket"
            }
          },
          "eventName": "ObjectCreated:Put",
          "userIdentity": {
            "principalId": "noobaa"
          },
          "requestParameters": {
            "sourceIPAddress": "100.64.0.3"
          },
          "responseElements": {
            "x-amz-request-id": "m3zvo0cm-5239xd-bhj",
            "x-amz-id-2": "m3zvo0cm-5239xd-bhj"
          }
        }
      ]
    }
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部