5.3. 创建基于 Go 的 Operator
Operator SDK 中的 Go 编程语言支持可以利用 Operator SDK 中的 Go 编程语言支持,为 Memcached 构建基于 Go 的 Operator 示例、分布式键值存储并管理其生命周期。
Kubebuilder 作为基于 Go 的 Operator 的构建解决方案嵌入 Operator SDK 中。
5.3.1. 使用 Operator SDK 创建基于 Go 的 Operator 复制链接链接已复制到粘贴板!
Operator SDK 可简化 Kubernetes 原生应用程序的构建,构建该应用程序原本需要深入掌握特定于应用程序的操作知识。SDK 不仅降低了这一障碍,而且有助于减少许多常见管理功能(如计量或监控)所需的样板代码量。
本流程介绍了使用 SDK 提供的工具和库创建简单 Memcached Operator 的示例。
先决条件
- 开发工作站上安装 operator SDK v0.19.4 CLI
-
基于 Kubernetes 的集群(v1.8 或更高版本,支持
apps/v1beta2API 组,如 OpenShift Container Platform 4.6)上已安装 operator Lifecycle Manager(OLM) -
使用具有
cluster-admin权限的账户访问该集群 -
已安装 OpenShift CLI(
oc)v4.6+
流程
创建 Operator 项目:
为项目创建一个目录:
$ mkdir -p $HOME/projects/memcached-operator进入该目录:
$ cd $HOME/projects/memcached-operator激活对 Go 模块的支持:
$ export GO111MODULE=on运行
operator-sdk init命令以初始化项目:$ operator-sdk init \ --domain=example.com \ --repo=github.com/example-inc/memcached-operator注意operator-sdk init命令默认使用go.kubebuilder.io/v2插件。
更新 Operator 以使用支持的镜像:
在项目根级别 Dockerfile 中,更改默认运行程序镜像引用:
FROM gcr.io/distroless/static:nonroot改为:
FROM registry.access.redhat.com/ubi8/ubi-minimal:latest-
根据 Go 项目版本,您的 Dockerfile 可能包含
USER 65532:65532或USER nonroot:nonroot指令。在这两种情况下,删除该行,因为受支持的 runner 镜像不需要该行。 在
config/default/manager_auth_proxy_patch.yaml文件中,修改image值:gcr.io/kubebuilder/kube-rbac-proxy:<tag>使用支持的镜像:
registry.redhat.io/openshift4/ose-kube-rbac-proxy:v4.6
通过替换以下行,更新 Makefile 中的
test目标,以安装后续构建过程中所需的依赖项:例 5.1. 现有
test目标test: generate fmt vet manifests go test ./... -coverprofile cover.out使用以下行:
例 5.2. 更新了
test目标ENVTEST_ASSETS_DIR=$(shell pwd)/testbin test: manifests generate fmt vet ## Run tests. mkdir -p ${ENVTEST_ASSETS_DIR} test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out创建自定义资源定义(CRD)API 和控制器:
运行以下命令创建带有组
cache、版本v1和种类Memcached的 API:$ operator-sdk create api \ --group=cache \ --version=v1 \ --kind=Memcached提示时,输入
y来创建资源和控制器:Create Resource [y/n] y Create Controller [y/n] y输出示例
Writing scaffold for you to edit... api/v1/memcached_types.go controllers/memcached_controller.go ...此过程在
api/v1/memcached_types.go和controllers/memcached_controller.go上生成 Memcached 资源 API。修改
api/v1/memcached_types.go中的 Go 类型定义,使其具有以下spec和status:// MemcachedSpec defines the desired state of Memcached type MemcachedSpec struct { // +kubebuilder:validation:Minimum=0 // Size is the size of the memcached deployment Size int32 `json:"size"` } // MemcachedStatus defines the observed state of Memcached type MemcachedStatus struct { // Nodes are the names of the memcached pods Nodes []string `json:"nodes"` }添加
+kubebuilder:subresource:statusmarker,将status子资源添加到 CRD 清单中:// Memcached is the Schema for the memcacheds API // +kubebuilder:subresource:status1 type Memcached struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec MemcachedSpec `json:"spec,omitempty"` Status MemcachedStatus `json:"status,omitempty"` }- 1
- 添加这一行。
这可让控制器在不更改剩余的 CR 对象的情况下更新 CR 状态。
为资源类型更新生成的代码:
$ make generate提示在修改了
*_types.go文件后,您必须运行make generate命令来更新该资源类型生成的代码。以上 Makefile 目标调用
controller-gen程序来更新api/v1/zz_generated.deepcopy.go文件。这样可确保您的 API Go 类型定义实现了runtime.Object接口,所有Kind类型都必须实现。
生成和更新 CRD 清单:
$ make manifests此 Makefile 目标调用
controller-gen实用程序在config/crd/bases/cache.example.com_memcacheds.yaml文件中生成 CRD 清单。可选:将自定义验证添加到 CRD 中。
当生成清单时,OpenAPI v3.0 模式会添加到
spec.validation块中的 CRD 清单中。此验证块允许 Kubernetes 在Memcached自定义资源(CR)创建或更新时验证其中的属性。作为 Operator 作者,您可以使用类似于注解的单行注释(称为 Kubebuilder markers )来配置 API 的自定义验证。这些标记必须始终具有
+kubebuilder:validation前缀。例如,可以通过添加以下标记来添加一个 enum-type 规格:// +kubebuilder:validation:Enum=Lion;Wolf;Dragon type Alias stringAPI 代码中的标记用法会在 Kubebuilder 生成 CRD 和用于配置/代码生成的标记文档中讨论。Kubebuilder CRD 验证文档还提供了 OpenAPIv3 验证标记的完整列表。
如果添加任何自定义验证,请运行以下命令来更新 CRD 的 OpenAPI 验证部分:
$ make manifests
在创建新 API 和控制器后,您可以实现控制器逻辑。在本例中,将生成的控制器文件
controllers/memcached_controller.go替换为以下示例实现:例 5.3.
memcached_controller.go示例/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controllers import ( "context" "reflect" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" cachev1 "github.com/example-inc/memcached-operator/api/v1" ) // MemcachedReconciler reconciles a Memcached object type MemcachedReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme } // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list; func (r *MemcachedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { ctx := context.Background() log := r.Log.WithValues("memcached", req.NamespacedName) // Fetch the Memcached instance memcached := &cachev1.Memcached{} err := r.Get(ctx, req.NamespacedName, memcached) if err != nil { if errors.IsNotFound(err) { // Request object not found, could have been deleted after reconcile request. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. // Return and don't requeue log.Info("Memcached resource not found. Ignoring since object must be deleted") return ctrl.Result{}, nil } // Error reading the object - requeue the request. log.Error(err, "Failed to get Memcached") return ctrl.Result{}, err } // Check if the deployment already exists, if not create a new one found := &appsv1.Deployment{} err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found) if err != nil && errors.IsNotFound(err) { // Define a new deployment dep := r.deploymentForMemcached(memcached) log.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name) err = r.Create(ctx, dep) if err != nil { log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name) return ctrl.Result{}, err } // Deployment created successfully - return and requeue return ctrl.Result{Requeue: true}, nil } else if err != nil { log.Error(err, "Failed to get Deployment") return ctrl.Result{}, err } // Ensure the deployment size is the same as the spec size := memcached.Spec.Size if *found.Spec.Replicas != size { found.Spec.Replicas = &size err = r.Update(ctx, found) if err != nil { log.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name) return ctrl.Result{}, err } // Spec updated - return and requeue return ctrl.Result{Requeue: true}, nil } // Update the Memcached status with the pod names // List the pods for this memcached's deployment podList := &corev1.PodList{} listOpts := []client.ListOption{ client.InNamespace(memcached.Namespace), client.MatchingLabels(labelsForMemcached(memcached.Name)), } if err = r.List(ctx, podList, listOpts...); err != nil { log.Error(err, "Failed to list pods", "Memcached.Namespace", memcached.Namespace, "Memcached.Name", memcached.Name) return ctrl.Result{}, err } podNames := getPodNames(podList.Items) // Update status.Nodes if needed if !reflect.DeepEqual(podNames, memcached.Status.Nodes) { memcached.Status.Nodes = podNames err := r.Status().Update(ctx, memcached) if err != nil { log.Error(err, "Failed to update Memcached status") return ctrl.Result{}, err } } return ctrl.Result{}, nil } // deploymentForMemcached returns a memcached Deployment object func (r *MemcachedReconciler) deploymentForMemcached(m *cachev1.Memcached) *appsv1.Deployment { ls := labelsForMemcached(m.Name) replicas := m.Spec.Size dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: m.Name, Namespace: m.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: ls, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: ls, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ Image: "memcached:1.4.36-alpine", Name: "memcached", Command: []string{"memcached", "-m=64", "-o", "modern", "-v"}, Ports: []corev1.ContainerPort{{ ContainerPort: 11211, Name: "memcached", }}, }}, }, }, }, } // Set Memcached instance as the owner and controller ctrl.SetControllerReference(m, dep, r.Scheme) return dep } // labelsForMemcached returns the labels for selecting the resources // belonging to the given memcached CR name. func labelsForMemcached(name string) map[string]string { return map[string]string{"app": "memcached", "memcached_cr": name} } // getPodNames returns the pod names of the array of pods passed in func getPodNames(pods []corev1.Pod) []string { var podNames []string for _, pod := range pods { podNames = append(podNames, pod.Name) } return podNames } func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&cachev1.Memcached{}). Owns(&appsv1.Deployment{}). Complete(r) }示例控制器为每个
MemcachedCR 运行以下协调逻辑:- 如果尚无 Memcached 部署,创建一个。
-
确保部署大小与
MemcachedCR spec 指定的大小相同。 -
使用
memcachedPod 的名称更新MemcachedCR 状态。
接下来的两个子步骤检查控制器如何监视资源以及协调循环的触发方式。您可跳过这些步骤,直接构建和运行 Operator。
检查
controllers/memcached_controller.go文件中的控制器实施,以查看控制器如何监视资源。SetupWithManager()函数指定如何构建控制器来监控 CR 以及由该控制器拥有和管理的其他资源:例 5.4.
SetupWithManager()功能import ( ... appsv1 "k8s.io/api/apps/v1" ... ) func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&cachev1.Memcached{}). Owns(&appsv1.Deployment{}). Complete(r) }NewControllerManagedBy()提供了一个控制器构建器,它允许各种控制器配置。for(&cachev1.Memcached{})将Memcached类型指定为要监视的主要资源。对于Memcached类型的每个 Add、Update 或 Delete 事件,协调循环都会为该Memcached对象发送一个协调Request参数,其中包括命名空间和名称键。owns(&appsv1.Deployment{})将Deployment类型指定为要监视的辅助资源。对于Deployment类型的每个 Add、Update 或 Delete 事件,事件处理程序会将每个事件映射到部署所有者的协调请求。在本例中,所有者是创建部署的Memcached对象。每个控制器都有一个协调器对象,它带有实现了协调循环的
Reconcile()方法。协调循环通过Request参数传递,该参数是从缓存中查找主资源对象Memcached的命名空间和名称键:例 5.5. 协调循环
import ( ctrl "sigs.k8s.io/controller-runtime" cachev1 "github.com/example-inc/memcached-operator/api/v1" ... ) func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // Lookup the Memcached instance for this reconcile request memcached := &cachev1.Memcached{} err := r.Get(ctx, req.NamespacedName, memcached) ... }根据
Reconcile()函数的返回值,协调Request可能会重新排队,且可能会再次触发循环:例 5.6. 重排队列的逻辑
// Reconcile successful - don't requeue return reconcile.Result{}, nil // Reconcile failed due to error - requeue return reconcile.Result{}, err // Requeue for any reason other than error return reconcile.Result{Requeue: true}, nil您可以将
Result.RequeueAfter设置为在宽限期后重排队列请求:例 5.7. 宽限期后重排队列
import "time" // Reconcile for any reason other than an error after 5 seconds return ctrl.Result{RequeueAfter: time.Second*5}, nil注意您可以返回带有
RequeueAfter设置的Result来定期协调一个 CR。有关协调器、客户端并与资源事件交互的更多信息,请参阅 Controller Runtime Client API 文档。