5.6.2.4. 实现控制器
在创建新 API 和控制器后,您可以实现控制器逻辑。
流程
将以下依赖项附加到
pom.xml文件中:<dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.2</version> </dependency>在本例中,将生成的控制器文件
MemcachedReconciler.java替换为以下示例实现:例 5.10.
MemcachedReconciler.java示例package com.example; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.ContainerPortBuilder; import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodSpecBuilder; import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder; import org.apache.commons.collections.CollectionUtils; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class MemcachedReconciler implements Reconciler<Memcached> { private final KubernetesClient client; public MemcachedReconciler(KubernetesClient client) { this.client = client; } // TODO Fill in the rest of the reconciler @Override public UpdateControl<Memcached> reconcile( Memcached resource, Context context) { // TODO: fill in logic Deployment deployment = client.apps() .deployments() .inNamespace(resource.getMetadata().getNamespace()) .withName(resource.getMetadata().getName()) .get(); if (deployment == null) { Deployment newDeployment = createMemcachedDeployment(resource); client.apps().deployments().create(newDeployment); return UpdateControl.noUpdate(); } int currentReplicas = deployment.getSpec().getReplicas(); int requiredReplicas = resource.getSpec().getSize(); if (currentReplicas != requiredReplicas) { deployment.getSpec().setReplicas(requiredReplicas); client.apps().deployments().createOrReplace(deployment); return UpdateControl.noUpdate(); } List<Pod> pods = client.pods() .inNamespace(resource.getMetadata().getNamespace()) .withLabels(labelsForMemcached(resource)) .list() .getItems(); List<String> podNames = pods.stream().map(p -> p.getMetadata().getName()).collect(Collectors.toList()); if (resource.getStatus() == null || !CollectionUtils.isEqualCollection(podNames, resource.getStatus().getNodes())) { if (resource.getStatus() == null) resource.setStatus(new MemcachedStatus()); resource.getStatus().setNodes(podNames); return UpdateControl.updateResource(resource); } return UpdateControl.noUpdate(); } private Map<String, String> labelsForMemcached(Memcached m) { Map<String, String> labels = new HashMap<>(); labels.put("app", "memcached"); labels.put("memcached_cr", m.getMetadata().getName()); return labels; } private Deployment createMemcachedDeployment(Memcached m) { Deployment deployment = new DeploymentBuilder() .withMetadata( new ObjectMetaBuilder() .withName(m.getMetadata().getName()) .withNamespace(m.getMetadata().getNamespace()) .build()) .withSpec( new DeploymentSpecBuilder() .withReplicas(m.getSpec().getSize()) .withSelector( new LabelSelectorBuilder().withMatchLabels(labelsForMemcached(m)).build()) .withTemplate( new PodTemplateSpecBuilder() .withMetadata( new ObjectMetaBuilder().withLabels(labelsForMemcached(m)).build()) .withSpec( new PodSpecBuilder() .withContainers( new ContainerBuilder() .withImage("memcached:1.4.36-alpine") .withName("memcached") .withCommand("memcached", "-m=64", "-o", "modern", "-v") .withPorts( new ContainerPortBuilder() .withContainerPort(11211) .withName("memcached") .build()) .build()) .build()) .build()) .build()) .build(); deployment.addOwnerReference(m); return deployment; } }示例控制器为每个
Memcached自定义资源(CR)运行以下协调逻辑:- 如果尚无 Memcached 部署,请创建一个。
-
确保部署大小与
MemcachedCR spec 指定的大小匹配。 -
使用
memcachedPod 的名称更新MemcachedCR 状态。
下面的小节解释了示例中的控制器如何监视资源以及如何触发协调循环。您可以跳过这些小节来直接进入运行 Operator。
5.6.2.4.1. 协调循环 复制链接链接已复制到粘贴板!
每个控制器都有一个协调器对象,它带有实现了协调循环的
Reconcile()方法。协调循环通过Deployment参数,如下例所示:Deployment deployment = client.apps() .deployments() .inNamespace(resource.getMetadata().getNamespace()) .withName(resource.getMetadata().getName()) .get();如以下示例所示,如果
Deployment是null,则需要创建部署。创建Deployment后,您可以确定是否需要协调。如果不需要协调,返回UpdateControl.noUpdate()的值,否则返回 'UpdateControl.updateStatus(resource)的值:if (deployment == null) { Deployment newDeployment = createMemcachedDeployment(resource); client.apps().deployments().create(newDeployment); return UpdateControl.noUpdate(); }获取
Deployment后,获取当前和所需的副本,如下例所示:int currentReplicas = deployment.getSpec().getReplicas(); int requiredReplicas = resource.getSpec().getSize();如果
currentReplicas与requiredReplicas不匹配,您必须更新Deployment,如下例所示:if (currentReplicas != requiredReplicas) { deployment.getSpec().setReplicas(requiredReplicas); client.apps().deployments().createOrReplace(deployment); return UpdateControl.noUpdate(); }以下示例演示了如何获取 pod 列表及其名称:
List<Pod> pods = client.pods() .inNamespace(resource.getMetadata().getNamespace()) .withLabels(labelsForMemcached(resource)) .list() .getItems(); List<String> podNames = pods.stream().map(p -> p.getMetadata().getName()).collect(Collectors.toList());检查资源是否已创建并使用 Memcached 资源验证 pod 名称。如果这两个条件之一都存在不匹配,请执行协调,如下例所示:
if (resource.getStatus() == null || !CollectionUtils.isEqualCollection(podNames, resource.getStatus().getNodes())) { if (resource.getStatus() == null) resource.setStatus(new MemcachedStatus()); resource.getStatus().setNodes(podNames); return UpdateControl.updateResource(resource); }