kubernetes-controller source code analysis

Compared with the scheduler, the controllers are easier to understand. The two key patterns are the informer and the workqueue. The controller manager is split into per-resource controllers (daemon, endpoint, and so on), and they all follow a similar design: use the informer pattern described earlier to watch and collect the relevant data, put it into a cached queue, then loop, dequeuing and processing the cached items one by one. Items that fail processing are re-enqueued, and that re-enqueue is delayed, with the rate controlled by a token-bucket algorithm. With the overall architecture and the main patterns in hand, what remains is to look at the concrete implementations.

1. informer

Same as before: the informer watches a set of resources for changes and stores the corresponding keys in a rate-limiting workqueue.
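
A minimal sketch of this wiring, assuming a reasonably recent client-go (the kubeconfig path, resync period, and the choice of the services informer are illustrative): the event handlers only turn the object into a "namespace/name" key and enqueue it, and all real work happens in the workers that drain the queue.

    package main

    import (
        "time"

        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        // hypothetical kubeconfig path, for illustration only
        config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
        if err != nil {
            panic(err)
        }
        clientset := kubernetes.NewForConfigOrDie(config)

        queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
        factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
        serviceInformer := factory.Core().V1().Services().Informer()

        enqueue := func(obj interface{}) {
            // store only the key; workers re-fetch the object from the
            // informer's local cache when they process it
            if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        }
        serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    enqueue,
            UpdateFunc: func(old, cur interface{}) { enqueue(cur) },
            DeleteFunc: enqueue, // the DeletionHandling key func also accepts tombstones
        })

        stopCh := make(chan struct{})
        factory.Start(stopCh)
        <-stopCh
    }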

2. rate-limiting workqueue

Changes observed by the informer go straight into the queue. When the controller fails to process an item, the item is re-enqueued with a delay; the delay is governed by a token-bucket rate limiter, which by default refills at 10 QPS and holds at most 100 tokens.
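
For reference, a sketch of how such a queue is constructed. This mirrors client-go's DefaultControllerRateLimiter: a per-item exponential-backoff limiter is combined with the overall token bucket (refills at 10 tokens/s, holds at most 100), and for any given item the longer of the two delays wins.

    package main

    import (
        "time"

        "golang.org/x/time/rate"
        "k8s.io/client-go/util/workqueue"
    )

    func newControllerQueue() workqueue.RateLimitingInterface {
        limiter := workqueue.NewMaxOfRateLimiter(
            // per-item exponential backoff on repeated failures: 5ms, 10ms, ... capped at 1000s
            workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
            // overall token bucket: 10 QPS with a burst of 100
            &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
        )
        return workqueue.NewRateLimitingQueue(limiter)
    }

    func main() {
        queue := newControllerQueue()
        queue.AddRateLimited("default/my-service") // delayed enqueue; the delay grows with failures
        key, _ := queue.Get()
        queue.Forget(key) // on success: reset this key's failure counter
        queue.Done(key)
    }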

Key class diagram (figure not reproduced here)

3. Controller-specific processing

3.1 certificates

3.2 daemon

3.3 deployment

3.4 endpoint

3.4.1 Enqueueing

The NewEndpointController() method in endpoint/endpoints_controller.go shows that the endpoint controller watches service and pod changes and enqueues work accordingly. Service add/update/delete events all enqueue the service itself. For pod add/delete, the services related to that pod are looked up and enqueued; for pod update, the services related to both the old and the new pod are enqueued (see the sketch after the snippet below).

    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc: e.enqueueService,
            UpdateFunc: func(old, cur interface{}) {
                e.enqueueService(cur)
            },
            DeleteFunc: e.enqueueService,
        },
        // TODO: Can we have much longer period here?
        FullServiceResyncPeriod,
    )
    e.serviceLister = serviceInformer.Lister()
    e.servicesSynced = serviceInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    e.addPod,
        UpdateFunc: e.updatePod,
        DeleteFunc: e.deletePod,
    })
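
For the pod side, a hedged sketch of the fan-out from a pod event to service keys, modeled on the controller's addPod/getPodServiceMemberships; podServiceKeys below is a simplified stand-in, not the upstream signature:

    package sketch

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/labels"
    )

    // podServiceKeys returns the "namespace/name" queue keys of every service
    // in the pod's namespace whose selector matches the pod's labels.
    func podServiceKeys(pod *v1.Pod, services []*v1.Service) []string {
        var keys []string
        for _, svc := range services {
            if svc.Namespace != pod.Namespace || svc.Spec.Selector == nil {
                continue // selector-less services get their endpoints out-of-band
            }
            selector := labels.Set(svc.Spec.Selector).AsSelector()
            if selector.Matches(labels.Set(pod.Labels)) {
                keys = append(keys, fmt.Sprintf("%s/%s", svc.Namespace, svc.Name))
            }
        }
        return keys
    }

On pod update this lookup runs against both the old and the new pod, so a label change re-syncs the services the pod is leaving as well as the ones it is joining.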

3.4.2 Dequeueing

Run() starts workers goroutines to drain the queue; this count comes from ConcurrentEndpointSyncs and defaults to 5. At the end, checkLeftoverEndpoints() lists all existing endpoints objects and enqueues them, so endpoints orphaned while the controller was down (for example, a missed service deletion) still get reconciled. A sketch of the worker loop follows the snippet below.

    defer utilruntime.HandleCrash()
    defer e.queue.ShutDown()

    glog.Infof("Starting endpoint controller")
    defer glog.Infof("Shutting down endpoint controller")

    if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(e.worker, time.Second, stopCh)
    }

    go func() {
        defer utilruntime.HandleCrash()
        time.Sleep(5 * time.Minute) // give time for our cache to fill
        e.checkLeftoverEndpoints()
    }()

    <-stopCh
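
Each of those goroutines runs a plain drain loop. Below is a sketch of it, with sync and maxRetries as illustrative stand-ins for e.syncService and the controller's actual retry cap; on failure the key goes back through AddRateLimited, which is where the token-bucket delay from section 2 applies:

    package sketch

    import (
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/client-go/util/workqueue"
    )

    const maxRetries = 15 // illustrative cap, not necessarily the upstream value

    func worker(queue workqueue.RateLimitingInterface, sync func(key string) error) {
        for processNextWorkItem(queue, sync) {
        }
    }

    func processNextWorkItem(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
        key, quit := queue.Get()
        if quit {
            return false // the queue was shut down; exit the worker loop
        }
        defer queue.Done(key)

        if err := sync(key.(string)); err == nil {
            queue.Forget(key) // success: clear the key's failure history
        } else if queue.NumRequeues(key) < maxRetries {
            queue.AddRateLimited(key) // failure: delayed re-enqueue via the rate limiter
        } else {
            queue.Forget(key) // too many failures: drop the key and report the error
            utilruntime.HandleError(err)
        }
        return true
    }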

3.4.3 Processing (syncService)

  • First, re-fetch the service by key; if it no longer exists, delete the corresponding endpoints object
  • If Spec.Selector is nil, do nothing; this shows that a service is tied to its endpoints through the selector
  • List all matching pods; service.Spec.Selector is matched against the pods' labels (pod.Labels)
  • Assemble the endpoint subsets
  • Fetch the existing endpoints object and compare
  • Create, update, or skip accordingly
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    service, err := e.serviceLister.Services(namespace).Get(name)
    if err != nil {
        // Delete the corresponding endpoint, as the service has been deleted.
        // TODO: Please note that this will delete an endpoint when a
        // service is deleted. However, if we're down at the time when
        // the service is deleted, we will miss that deletion, so this
        // doesn't completely solve the problem. See #6877.
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err))
            // Don't retry, as the key isn't going to magically become understandable.
            return nil
        }
        err = e.client.Core().Endpoints(namespace).Delete(name, nil)
        if err != nil && !errors.IsNotFound(err) {
            return err
        }
        return nil
    }

    if service.Spec.Selector == nil {
        // services without a selector receive no endpoints from this controller;
        // these services will receive the endpoints that are created out-of-band via the REST API.
        return nil
    }

    glog.V(5).Infof("About to update endpoints for service %q", key)
    pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
    if err != nil {
        // Since we're getting stuff from a local cache, it is
        // basically impossible to get this error.
        return err
    }

    subsets := []v1.EndpointSubset{}

    var tolerateUnreadyEndpoints bool
    if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
        b, err := strconv.ParseBool(v)
        if err == nil {
            tolerateUnreadyEndpoints = b
        } else {
            utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
        }
    }

    readyEps := 0
    notReadyEps := 0
    for i := range pods {
        // TODO: Do we need to copy here?
        pod := &(*pods[i])

        for i := range service.Spec.Ports {
            servicePort := &service.Spec.Ports[i]

            portName := servicePort.Name
            portProto := servicePort.Protocol
            portNum, err := podutil.FindPort(pod, servicePort)
            if err != nil {
                glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
                continue
            }
            if len(pod.Status.PodIP) == 0 {
                glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
                continue
            }
            if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
                glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
                continue
            }

            epp := v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
            epa := v1.EndpointAddress{
                IP:       pod.Status.PodIP,
                NodeName: &pod.Spec.NodeName,
                TargetRef: &v1.ObjectReference{
                    Kind:            "Pod",
                    Namespace:       pod.ObjectMeta.Namespace,
                    Name:            pod.ObjectMeta.Name,
                    UID:             pod.ObjectMeta.UID,
                    ResourceVersion: pod.ObjectMeta.ResourceVersion,
                }}

            hostname := pod.Spec.Hostname
            if len(hostname) > 0 &&
                pod.Spec.Subdomain == service.Name &&
                service.Namespace == pod.Namespace {
                epa.Hostname = hostname
            }

            if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
                subsets = append(subsets, v1.EndpointSubset{
                    Addresses: []v1.EndpointAddress{epa},
                    Ports:     []v1.EndpointPort{epp},
                })
                readyEps++
            } else {
                glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
                subsets = append(subsets, v1.EndpointSubset{
                    NotReadyAddresses: []v1.EndpointAddress{epa},
                    Ports:             []v1.EndpointPort{epp},
                })
                notReadyEps++
            }
        }
    }
    subsets = endpoints.RepackSubsets(subsets)

    // See if there's actually an update here.
    currentEndpoints, err := e.client.Core().Endpoints(service.Namespace).Get(service.Name, metav1.GetOptions{})
    if err != nil {
        if errors.IsNotFound(err) {
            currentEndpoints = &v1.Endpoints{
                ObjectMeta: metav1.ObjectMeta{
                    Name:   service.Name,
                    Labels: service.Labels,
                },
            }
        } else {
            return err
        }
    }

    if reflect.DeepEqual(currentEndpoints.Subsets, subsets) &&
        reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
        glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
        return nil
    }
    newEndpoints := currentEndpoints
    newEndpoints.Subsets = subsets
    newEndpoints.Labels = service.Labels
    if newEndpoints.Annotations == nil {
        newEndpoints.Annotations = make(map[string]string)
    }

    glog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, readyEps, notReadyEps)
    createEndpoints := len(currentEndpoints.ResourceVersion) == 0
    if createEndpoints {
        // No previous endpoints, create them
        _, err = e.client.Core().Endpoints(service.Namespace).Create(newEndpoints)
    } else {
        // Pre-existing
        _, err = e.client.Core().Endpoints(service.Namespace).Update(newEndpoints)
    }
    if err != nil {
        if createEndpoints && errors.IsForbidden(err) {
            // A request is forbidden primarily for two reasons:
            // 1. namespace is terminating, endpoint creation is not allowed by default.
            // 2. policy is misconfigured, in which case no service would function anywhere.
            // Given the frequency of 1, we log at a lower level.
            glog.V(5).Infof("Forbidden from creating endpoints: %v", err)
        }
        return err
    }

3.5 garbagecollector

3.6 job

3.7 namespace

3.8 node

3.9 replicaset

3.10 resourcequota

3.11 serviceaccount