5.3. 创建基于 Go 的 Operator
Operator SDK 中的 Go 编程语言支持可以利用 Operator SDK 中的 Go 编程语言支持,为 Memcached 构建基于 Go 的 Operator 示例、分布式键值存储并管理其生命周期。
Kubebuilder 作为基于 Go 的 Operator 的构建解决方案嵌入 Operator SDK 中。
5.3.1. 使用 Operator SDK 创建基于 Go 的 Operator
Operator SDK 可简化 Kubernetes 原生应用程序的构建,构建该应用程序原本需要深入掌握特定于应用程序的操作知识。SDK 不仅降低了这一障碍,而且有助于减少许多常见管理功能(如计量或监控)所需的样板代码量。
本流程介绍了使用 SDK 提供的工具和库创建简单 Memcached Operator 的示例。
先决条件
- 开发工作站上安装 operator SDK v0.19.4 CLI
-
基于 Kubernetes 的集群(v1.8 或更高版本,支持
apps/v1beta2
API 组,如 OpenShift Container Platform 4.6)上已安装 operator Lifecycle Manager(OLM) -
使用具有
cluster-admin
权限的账户访问该集群 -
已安装 OpenShift CLI(
oc
)v4.6+
流程
创建 Operator 项目:
为项目创建一个目录:
$ mkdir -p $HOME/projects/memcached-operator
进入该目录:
$ cd $HOME/projects/memcached-operator
激活对 Go 模块的支持:
$ export GO111MODULE=on
运行
operator-sdk init
命令以初始化项目:$ operator-sdk init \ --domain=example.com \ --repo=github.com/example-inc/memcached-operator
注意operator-sdk init
命令默认使用go.kubebuilder.io/v2
插件。
更新 Operator 以使用支持的镜像:
在项目根级别 Dockerfile 中,更改默认运行程序镜像引用:
FROM gcr.io/distroless/static:nonroot
改为:
FROM registry.access.redhat.com/ubi8/ubi-minimal:latest
-
根据 Go 项目版本,您的 Dockerfile 可能包含
USER 65532:65532
或USER nonroot:nonroot
指令。在这两种情况下,删除该行,因为受支持的 runner 镜像不需要该行。 在
config/default/manager_auth_proxy_patch.yaml
文件中,修改image
值:gcr.io/kubebuilder/kube-rbac-proxy:<tag>
使用支持的镜像:
registry.redhat.io/openshift4/ose-kube-rbac-proxy:v4.6
通过替换以下行,更新 Makefile 中的
test
目标,以安装后续构建过程中所需的依赖项:例 5.1. 现有
test
目标test: generate fmt vet manifests go test ./... -coverprofile cover.out
使用以下行:
例 5.2. 更新了
test
目标ENVTEST_ASSETS_DIR=$(shell pwd)/testbin test: manifests generate fmt vet ## Run tests. mkdir -p ${ENVTEST_ASSETS_DIR} test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out
创建自定义资源定义(CRD)API 和控制器:
运行以下命令创建带有组
cache
、版本v1
和种类Memcached
的 API:$ operator-sdk create api \ --group=cache \ --version=v1 \ --kind=Memcached
提示时,输入
y
来创建资源和控制器:Create Resource [y/n] y Create Controller [y/n] y
输出示例
Writing scaffold for you to edit... api/v1/memcached_types.go controllers/memcached_controller.go ...
此过程在
api/v1/memcached_types.go
和controllers/memcached_controller.go
上生成 Memcached 资源 API。修改
api/v1/memcached_types.go
中的 Go 类型定义,使其具有以下spec
和status
:// MemcachedSpec defines the desired state of Memcached type MemcachedSpec struct { // +kubebuilder:validation:Minimum=0 // Size is the size of the memcached deployment Size int32 `json:"size"` } // MemcachedStatus defines the observed state of Memcached type MemcachedStatus struct { // Nodes are the names of the memcached pods Nodes []string `json:"nodes"` }
添加
+kubebuilder:subresource:status
marker,将status
子资源添加到 CRD 清单中:// Memcached is the Schema for the memcacheds API // +kubebuilder:subresource:status 1 type Memcached struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec MemcachedSpec `json:"spec,omitempty"` Status MemcachedStatus `json:"status,omitempty"` }
- 1
- 添加这一行。
这可让控制器在不更改剩余的 CR 对象的情况下更新 CR 状态。
为资源类型更新生成的代码:
$ make generate
提示在修改了
*_types.go
文件后,您必须运行make generate
命令来更新该资源类型生成的代码。以上 Makefile 目标调用
controller-gen
程序来更新api/v1/zz_generated.deepcopy.go
文件。这样可确保您的 API Go 类型定义实现了runtime.Object
接口,所有Kind
类型都必须实现。
生成和更新 CRD 清单:
$ make manifests
此 Makefile 目标调用
controller-gen
实用程序在config/crd/bases/cache.example.com_memcacheds.yaml
文件中生成 CRD 清单。可选:将自定义验证添加到 CRD 中。
当生成清单时,OpenAPI v3.0 模式会添加到
spec.validation
块中的 CRD 清单中。此验证块允许 Kubernetes 在Memcached
自定义资源(CR)创建或更新时验证其中的属性。作为 Operator 作者,您可以使用类似于注解的单行注释(称为 Kubebuilder markers )来配置 API 的自定义验证。这些标记必须始终具有
+kubebuilder:validation
前缀。例如,可以通过添加以下标记来添加一个 enum-type 规格:// +kubebuilder:validation:Enum=Lion;Wolf;Dragon type Alias string
API 代码中的标记用法会在 Kubebuilder 生成 CRD 和用于配置/代码生成的标记文档中讨论。Kubebuilder CRD 验证文档还提供了 OpenAPIv3 验证标记的完整列表。
如果添加任何自定义验证,请运行以下命令来更新 CRD 的 OpenAPI 验证部分:
$ make manifests
在创建新 API 和控制器后,您可以实现控制器逻辑。在本例中,将生成的控制器文件
controllers/memcached_controller.go
替换为以下示例实现:例 5.3.
memcached_controller.go
示例/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controllers import ( "context" "reflect" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" cachev1 "github.com/example-inc/memcached-operator/api/v1" ) // MemcachedReconciler reconciles a Memcached object type MemcachedReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme } // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list; func (r *MemcachedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { ctx := context.Background() log := r.Log.WithValues("memcached", req.NamespacedName) // Fetch the Memcached instance memcached := &cachev1.Memcached{} err := r.Get(ctx, req.NamespacedName, memcached) if err != nil { if errors.IsNotFound(err) { // Request object not found, could have been deleted after reconcile request. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. // Return and don't requeue log.Info("Memcached resource not found. Ignoring since object must be deleted") return ctrl.Result{}, nil } // Error reading the object - requeue the request. log.Error(err, "Failed to get Memcached") return ctrl.Result{}, err } // Check if the deployment already exists, if not create a new one found := &appsv1.Deployment{} err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found) if err != nil && errors.IsNotFound(err) { // Define a new deployment dep := r.deploymentForMemcached(memcached) log.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name) err = r.Create(ctx, dep) if err != nil { log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name) return ctrl.Result{}, err } // Deployment created successfully - return and requeue return ctrl.Result{Requeue: true}, nil } else if err != nil { log.Error(err, "Failed to get Deployment") return ctrl.Result{}, err } // Ensure the deployment size is the same as the spec size := memcached.Spec.Size if *found.Spec.Replicas != size { found.Spec.Replicas = &size err = r.Update(ctx, found) if err != nil { log.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name) return ctrl.Result{}, err } // Spec updated - return and requeue return ctrl.Result{Requeue: true}, nil } // Update the Memcached status with the pod names // List the pods for this memcached's deployment podList := &corev1.PodList{} listOpts := []client.ListOption{ client.InNamespace(memcached.Namespace), client.MatchingLabels(labelsForMemcached(memcached.Name)), } if err = r.List(ctx, podList, listOpts...); err != nil { log.Error(err, "Failed to list pods", "Memcached.Namespace", memcached.Namespace, "Memcached.Name", memcached.Name) return ctrl.Result{}, err } podNames := getPodNames(podList.Items) // Update status.Nodes if needed if !reflect.DeepEqual(podNames, memcached.Status.Nodes) { memcached.Status.Nodes = podNames err := r.Status().Update(ctx, memcached) if err != nil { log.Error(err, "Failed to update Memcached status") return ctrl.Result{}, err } } return ctrl.Result{}, nil } // deploymentForMemcached returns a memcached Deployment object func (r *MemcachedReconciler) deploymentForMemcached(m *cachev1.Memcached) *appsv1.Deployment { ls := labelsForMemcached(m.Name) replicas := m.Spec.Size dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: m.Name, Namespace: m.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: ls, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: ls, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ Image: "memcached:1.4.36-alpine", Name: "memcached", Command: []string{"memcached", "-m=64", "-o", "modern", "-v"}, Ports: []corev1.ContainerPort{{ ContainerPort: 11211, Name: "memcached", }}, }}, }, }, }, } // Set Memcached instance as the owner and controller ctrl.SetControllerReference(m, dep, r.Scheme) return dep } // labelsForMemcached returns the labels for selecting the resources // belonging to the given memcached CR name. func labelsForMemcached(name string) map[string]string { return map[string]string{"app": "memcached", "memcached_cr": name} } // getPodNames returns the pod names of the array of pods passed in func getPodNames(pods []corev1.Pod) []string { var podNames []string for _, pod := range pods { podNames = append(podNames, pod.Name) } return podNames } func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&cachev1.Memcached{}). Owns(&appsv1.Deployment{}). Complete(r) }
示例控制器为每个
Memcached
CR 运行以下协调逻辑:- 如果尚无 Memcached 部署,创建一个。
-
确保部署大小与
Memcached
CR spec 指定的大小相同。 -
使用
memcached
Pod 的名称更新Memcached
CR 状态。
接下来的两个子步骤检查控制器如何监视资源以及协调循环的触发方式。您可跳过这些步骤,直接构建和运行 Operator。
检查
controllers/memcached_controller.go
文件中的控制器实施,以查看控制器如何监视资源。SetupWithManager()
函数指定如何构建控制器来监控 CR 以及由该控制器拥有和管理的其他资源:例 5.4.
SetupWithManager()
功能import ( ... appsv1 "k8s.io/api/apps/v1" ... ) func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&cachev1.Memcached{}). Owns(&appsv1.Deployment{}). Complete(r) }
NewControllerManagedBy()
提供了一个控制器构建器,它允许各种控制器配置。for(&cachev1.Memcached{})
将Memcached
类型指定为要监视的主要资源。对于Memcached
类型的每个 Add、Update 或 Delete 事件,协调循环都会为该Memcached
对象发送一个协调Request
参数,其中包括命名空间和名称键。owns(&appsv1.Deployment{})
将Deployment
类型指定为要监视的辅助资源。对于Deployment
类型的每个 Add、Update 或 Delete 事件,事件处理程序会将每个事件映射到部署所有者的协调请求。在本例中,所有者是创建部署的Memcached
对象。每个控制器都有一个协调器对象,它带有实现了协调循环的
Reconcile()
方法。协调循环通过Request
参数传递,该参数是从缓存中查找主资源对象Memcached
的命名空间和名称键:例 5.5. 协调循环
import ( ctrl "sigs.k8s.io/controller-runtime" cachev1 "github.com/example-inc/memcached-operator/api/v1" ... ) func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // Lookup the Memcached instance for this reconcile request memcached := &cachev1.Memcached{} err := r.Get(ctx, req.NamespacedName, memcached) ... }
根据
Reconcile()
函数的返回值,协调Request
可能会重新排队,且可能会再次触发循环:例 5.6. 重排队列的逻辑
// Reconcile successful - don't requeue return reconcile.Result{}, nil // Reconcile failed due to error - requeue return reconcile.Result{}, err // Requeue for any reason other than error return reconcile.Result{Requeue: true}, nil
您可以将
Result.RequeueAfter
设置为在宽限期后重排队列请求:例 5.7. 宽限期后重排队列
import "time" // Reconcile for any reason other than an error after 5 seconds return ctrl.Result{RequeueAfter: time.Second*5}, nil
注意您可以返回带有
RequeueAfter
设置的Result
来定期协调一个 CR。有关协调器、客户端并与资源事件交互的更多信息,请参阅 Controller Runtime Client API 文档。
其它资源
- 如需有关 CRD 中的 OpenAPI v3.0 验证模式的更多信息,请参阅 Kubernetes 文档。