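5.6.2.4. Implementing the controller


After creating a new API and controller, you can implement the controller logic.

Procedure

  1. Append the following dependency to the pom.xml file: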

        <dependency>
          <groupId>commons-collections</groupId>
          <artifactId>commons-collections</artifactId>
          <version>3.2.2</version>
        </dependency>
  2. For this example, replace the generated controller file MemcachedReconciler.java with the following example implementation:

    Example 5.10. Example MemcachedReconciler.java

    package com.example;
    
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.javaoperatorsdk.operator.api.reconciler.Context;
    import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
    import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
    import io.fabric8.kubernetes.api.model.ContainerBuilder;
    import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
    import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
    import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
    import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
    import io.fabric8.kubernetes.api.model.Pod;
    import io.fabric8.kubernetes.api.model.PodSpecBuilder;
    import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder;
    import io.fabric8.kubernetes.api.model.apps.Deployment;
    import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
    import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
    import org.apache.commons.collections.CollectionUtils;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    public class MemcachedReconciler implements Reconciler<Memcached> {
      private final KubernetesClient client;
    
      public MemcachedReconciler(KubernetesClient client) {
        this.client = client;
      }
    
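      // Reconciles one Memcached CR: ensures that the Deployment exists, that it has
      // the requested size, and that the CR status lists the current pod names.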
    
      @Override
      public UpdateControl<Memcached> reconcile(
          Memcached resource, Context context) {
          // Look up the Deployment that shares its name and namespace with the Memcached CR.
          Deployment deployment = client.apps()
                  .deployments()
                  .inNamespace(resource.getMetadata().getNamespace())
                  .withName(resource.getMetadata().getName())
                  .get();
    
          if (deployment == null) {
              Deployment newDeployment = createMemcachedDeployment(resource);
              client.apps().deployments().create(newDeployment);
              return UpdateControl.noUpdate();
          }
    
          int currentReplicas = deployment.getSpec().getReplicas();
          int requiredReplicas = resource.getSpec().getSize();
    
          if (currentReplicas != requiredReplicas) {
              deployment.getSpec().setReplicas(requiredReplicas);
              client.apps().deployments().createOrReplace(deployment);
              return UpdateControl.noUpdate();
          }
    
          List<Pod> pods = client.pods()
              .inNamespace(resource.getMetadata().getNamespace())
              .withLabels(labelsForMemcached(resource))
              .list()
              .getItems();
    
          List<String> podNames =
              pods.stream().map(p -> p.getMetadata().getName()).collect(Collectors.toList());
    
    
          if (resource.getStatus() == null
                   || !CollectionUtils.isEqualCollection(podNames, resource.getStatus().getNodes())) {
               if (resource.getStatus() == null) resource.setStatus(new MemcachedStatus());
               resource.getStatus().setNodes(podNames);
               return UpdateControl.updateResource(resource);
          }
    
          return UpdateControl.noUpdate();
      }
    
      private Map<String, String> labelsForMemcached(Memcached m) {
        Map<String, String> labels = new HashMap<>();
        labels.put("app", "memcached");
        labels.put("memcached_cr", m.getMetadata().getName());
        return labels;
      }
    
      private Deployment createMemcachedDeployment(Memcached m) {
          Deployment deployment = new DeploymentBuilder()
              .withMetadata(
                  new ObjectMetaBuilder()
                      .withName(m.getMetadata().getName())
                      .withNamespace(m.getMetadata().getNamespace())
                      .build())
              .withSpec(
                  new DeploymentSpecBuilder()
                      .withReplicas(m.getSpec().getSize())
                      .withSelector(
                          new LabelSelectorBuilder().withMatchLabels(labelsForMemcached(m)).build())
                      .withTemplate(
                          new PodTemplateSpecBuilder()
                              .withMetadata(
                                  new ObjectMetaBuilder().withLabels(labelsForMemcached(m)).build())
                              .withSpec(
                                  new PodSpecBuilder()
                                      .withContainers(
                                          new ContainerBuilder()
                                              .withImage("memcached:1.4.36-alpine")
                                              .withName("memcached")
                                              .withCommand("memcached", "-m=64", "-o", "modern", "-v")
                                              .withPorts(
                                                  new ContainerPortBuilder()
                                                      .withContainerPort(11211)
                                                      .withName("memcached")
                                                      .build())
                                              .build())
                                      .build())
                              .build())
                      .build())
              .build();
        deployment.addOwnerReference(m);
        return deployment;
      }
    }

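    The example controller runs the following reconciliation logic for each Memcached custom resource (CR):

    • Creates a Memcached deployment if it does not exist.
    • Ensures that the deployment size matches the size specified by the Memcached CR spec.
    • Updates the Memcached CR status with the names of the memcached pods.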
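
The order of these three checks can be sketched without any Kubernetes dependency. The following is a minimal illustration only, not part of the SDK or of the example controller: the names ReconcilePlan, Action, and decide() are hypothetical, and the inputs are plain values that stand in for what the controller reads from the cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: ReconcilePlan, Action, and decide() are illustrative
// names and do not exist in the Java Operator SDK or the Fabric8 client.
final class ReconcilePlan {

    enum Action { CREATE_DEPLOYMENT, SCALE_DEPLOYMENT, UPDATE_STATUS, NOTHING }

    /**
     * Mirrors the order of checks in the example reconcile() method.
     *
     * @param currentReplicas replicas of the existing Deployment, or null if none exists
     * @param desiredSize     size requested in the Memcached CR spec
     * @param podNames        names of the pods that currently match the labels
     * @param statusNodes     names recorded in the CR status, or null if no status is set
     */
    static Action decide(Integer currentReplicas, int desiredSize,
                         List<String> podNames, List<String> statusNodes) {
        if (currentReplicas == null) {
            return Action.CREATE_DEPLOYMENT;   // no Deployment yet
        }
        if (currentReplicas != desiredSize) {
            return Action.SCALE_DEPLOYMENT;    // Deployment size differs from the spec
        }
        if (statusNodes == null || !sameNames(podNames, statusNodes)) {
            return Action.UPDATE_STATUS;       // status is missing or out of date
        }
        return Action.NOTHING;                 // everything already matches
    }

    // Order-insensitive comparison: sort copies of both lists, then compare them.
    private static boolean sameNames(List<String> a, List<String> b) {
        List<String> left = new ArrayList<>(a);
        List<String> right = new ArrayList<>(b);
        Collections.sort(left);
        Collections.sort(right);
        return left.equals(right);
    }
}
```

Each call returns at most one action, which reflects how the example controller returns early after creating or scaling the Deployment and leaves the remaining checks to a later reconciliation.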

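The following subsections explain how the controller in the example watches resources and how the reconcile loop is triggered. You can skip these subsections and go directly to running the Operator.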

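5.6.2.4.1. Reconcile loop

  1. Every controller has a reconciler object with a reconcile() method that implements the reconcile loop. The loop starts by looking up the Deployment that has the same name and namespace as the Memcached resource, as shown in the following example: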

            Deployment deployment = client.apps()
                    .deployments()
                    .inNamespace(resource.getMetadata().getNamespace())
                    .withName(resource.getMetadata().getName())
                    .get();
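  2. If the Deployment is null, create it, as shown in the following example. After creating the Deployment, determine whether the custom resource itself needs to be updated. If it does not, return UpdateControl.noUpdate(); otherwise, return UpdateControl.updateStatus(resource):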

            if (deployment == null) {
                Deployment newDeployment = createMemcachedDeployment(resource);
                client.apps().deployments().create(newDeployment);
                return UpdateControl.noUpdate();
            }
  3. After getting the Deployment, get the current and required replica counts, as shown in the following example:

            int currentReplicas = deployment.getSpec().getReplicas();
            int requiredReplicas = resource.getSpec().getSize();
  4. If currentReplicas does not match requiredReplicas, you must update the Deployment, as shown in the following example:

            if (currentReplicas != requiredReplicas) {
                deployment.getSpec().setReplicas(requiredReplicas);
                client.apps().deployments().createOrReplace(deployment);
                return UpdateControl.noUpdate();
            }
  5. The following example shows how to obtain the list of pods and their names:

            List<Pod> pods = client.pods()
                .inNamespace(resource.getMetadata().getNamespace())
                .withLabels(labelsForMemcached(resource))
                .list()
                .getItems();
    
            List<String> podNames =
                pods.stream().map(p -> p.getMetadata().getName()).collect(Collectors.toList());
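  6. Check whether the Memcached resource has a status and whether the pod names match the nodes recorded in that status. If either check fails, set the status to the current pod names and return the updated resource, as shown in the following example: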

            if (resource.getStatus() == null
                    || !CollectionUtils.isEqualCollection(podNames, resource.getStatus().getNodes())) {
                if (resource.getStatus() == null) resource.setStatus(new MemcachedStatus());
                resource.getStatus().setNodes(podNames);
                return UpdateControl.updateResource(resource);
            }
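
The comparison in step 6 relies on CollectionUtils.isEqualCollection, which treats two collections as equal when every element occurs the same number of times in both, regardless of order. The following standard-library sketch illustrates that kind of check. It is an assumption-labeled illustration, not the library's implementation: PodNameComparison, cardinalities(), and sameElements() are hypothetical names.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; it is not part of Apache Commons Collections.
final class PodNameComparison {

    // Counts how many times each name occurs in the collection.
    static Map<String, Integer> cardinalities(Collection<String> names) {
        Map<String, Integer> counts = new HashMap<>();
        for (String name : names) {
            counts.merge(name, 1, Integer::sum);
        }
        return counts;
    }

    // True when both collections hold the same names with the same counts,
    // in any order.
    static boolean sameElements(Collection<String> a, Collection<String> b) {
        return a.size() == b.size() && cardinalities(a).equals(cardinalities(b));
    }
}
```

Because the check ignores order, a pod list that the API server returns in a different order than the one stored in the status does not trigger an unnecessary update of the resource.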