kubernetes scheduler源码阅读

scheduler

k8s 的分配器,给pod分配node.主要目的是通过阅读源码了解pod的分配"规则"/"优先级",以及如何自己定制 的开发自己的"规则"/"优先级";理解其运行原理及数据流;以及类之间的关系理解其设计,学习其架构 .阅读时有几个关键点 controller模式/podQueue,schedulerCache作用/pod状态变化 (init/assumed/add) /"规则","优先级".

1.程序主流程

1.1.入口

程序入口在plugin/cmd的scheduler.go,进入run方法后会有如下代码,

    //创建k8s客户端
  kubecli, err := createClient(s)
  if err != nil {
    return fmt.Errorf("unable to create kube client: %v", err)
    }
    //创建事件写入客户端(api,event)
  recorder := createRecorder(kubecli, s)

1.2.客户端创建

接下来这一段代码就是前面提到的 controller模式->SharedIndexInformer.其次pkg/client/informers/informers_generated/externalversion(外部调用),internalversion(内部调用) 进到里面,每一个包的结构都一致,外面是interface,方法为可用的版本号(v1,batev1),再里面是具体的实现,而实现都是组合了sharedindexinformer[HTML_REMOVED],然后提供一个创建informer的方法,这个方法递交给factory来进行统一创建;其实看到这里感到很困惑,这里有大量的重复代码,有必要这样吗?是不是有更好的实现,其实通过文件名generate就可以猜测到,这个文件夹应该是自动生成的.果然在cmd/libs/go2idl/包里面有着各种各样的自动生成代码类.


以下是我截取的informer的部分目录结构,可以看到类之间的关系,factory定义了各个模块的获取方法 如:core/app等,然后及是各个模块的定义自有的版本号如v1,batev1等,版本号文件夹就是功能的具体实现 ,中间模块版本号都是分开的,而最终的xxxInformer 里面的Informer方法都调用factory informerFor方法各自的shareIndexInformer对象注册到 factory informer列表中 ,这样即从结构上区分开了各个功能模块及版本,而最终由一个factory来创建,代码结构清晰且创建过程集中不臃肿.

├── externalversions
│   ├── core
│   │   ├── BUILD
│   │   ├── interface.go
│   │   └── v1
│   │       ├── BUILD
│   │       ├── componentstatus.go
│   │       ├── configmap.go
│   │       ├── endpoints.go
│   │       ├── event.go
│   │       ├── interface.go
│   │       ├── limitrange.go
│   │       ├── namespace.go
│   │       ├── node.go
│   │       ├── persistentvolumeclaim.go
│   │       ├── persistentvolume.go
│   │       ├── pod.go
│   │       ├── podtemplate.go
│   │       ├── replicationcontroller.go
│   │       ├── resourcequota.go
│   │       ├── secret.go
│   │       ├── serviceaccount.go
│   │       └── service.go
│   ├── factory.go
    informerFactory := informers.NewSharedInformerFactory(kubecli, 0)

    sched, err := CreateScheduler(
        s,
        kubecli,
        informerFactory.Core().V1().Nodes(),
        informerFactory.Core().V1().PersistentVolumes(),
        informerFactory.Core().V1().PersistentVolumeClaims(),
        informerFactory.Core().V1().ReplicationControllers(),
        informerFactory.Extensions().V1beta1().ReplicaSets(),
        informerFactory.Apps().V1beta1().StatefulSets(),
        informerFactory.Core().V1().Services(),
        recorder,
    )

至此,event事件创建器, nodes,pv,pvc,rc,replicasets.statefulset,sv 监听都已经准备好了,只要各个infomer 执行了Informer()方法, 其监听器就会创建 并放入 factory的informer列表中,最终factory.Run来启动这些sharedindexinformer, 再注册handler事件来处理这些监听器所产生的缓存器内容,或者是通过Listner来获取其索引信息(计算策略时这样使用)

注意:从后面的初始化过程看来,只有node显示的使用了nodeInformer.Informer()方法并添加了handler,而其他的都是通过 Listener这个方法,隐藏的调用了Informer()方法.

3.进入到CreateScheduler方法里面

func CreateScheduler(
    s *options.SchedulerServer,
    kubecli *clientset.Clientset,
    nodeInformer coreinformers.NodeInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer extensionsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
    configurator := factory.NewConfigFactory(
        s.SchedulerName,
        kubecli,
        nodeInformer,
        pvInformer,
        pvcInformer,
        replicationControllerInformer,
        replicaSetInformer,
        statefulSetInformer,
        serviceInformer,
        s.HardPodAffinitySymmetricWeight,
    )

    // Rebuild the configurator with a default Create(...) method.
    configurator = &schedulerConfigurator{
        configurator,
        s.PolicyConfigFile,
        s.AlgorithmProvider,
        s.PolicyConfigMapName,
        s.PolicyConfigMapNamespace,
        s.UseLegacyPolicyConfig,
    }

    return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
        cfg.Recorder = recorder
    })
}

这里似乎信息量并不多,只知道最终创建了scheduler对象 ;接下来分别进入factory.NewConfigFactory ,scheduler.NewFromConfigurator

func NewConfigFactory(
    schedulerName string,
    client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer extensionsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    hardPodAffinitySymmetricWeight int,
) scheduler.Configurator {
    stopEverything := make(chan struct{})
    schedulerCache := schedulercache.New(30*time.Second, stopEverything)
    c := &ConfigFactory{
        client:                         client,
        podLister:                      schedulerCache,
        podQueue:                       cache.NewFIFO(cache.MetaNamespaceKeyFunc),
        pVLister:                       pvInformer.Lister(),
        pVCLister:                      pvcInformer.Lister(),
        serviceLister:                  serviceInformer.Lister(),
        controllerLister:               replicationControllerInformer.Lister(),
        replicaSetLister:               replicaSetInformer.Lister(),
        statefulSetLister:              statefulSetInformer.Lister(),
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  schedulerName,
        hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
    }

    // On add/delete to the scheduled pods, remove from the assumed pods.
    // We construct this here instead of in CreateFromKeys because
    // ScheduledPodLister is something we provide to plug in functions that
    // they may need to call.
    var scheduledPodIndexer cache.Indexer
    scheduledPodIndexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
        c.createAssignedNonTerminatedPodLW(),
        &v1.Pod{},
        0,
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addPodToCache,
            UpdateFunc: c.updatePodInCache,
            DeleteFunc: c.deletePodFromCache,
        },
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    c.scheduledPodLister = corelisters.NewPodLister(scheduledPodIndexer)

    // Only nodes in the "Ready" condition with status == "True" are schedulable
    nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
        0,
    )
    c.nodeLister = nodeInformer.Lister()

    // TODO(harryz) need to fill all the handlers here and below for equivalence cache

    return c
}

schedulercache.New(30*time.Second, stopEverything)这一句进去看到,创建了schedulerCache类,这个类很关键,其中几个重要的成员变量,assumedPods,podStates,nodes,可以看到缓存了pod及node信息,但是为什么有2个pod信息,以下是代码中给出的状态图,可以这么理解: 所有Running ,pod都处于add状态,这时可以更新,删除,而所有未分配的pod叫Init状态, 分配好了后-->到了assume状态,可以变为add/forget/expire但是不能删除;问题来了,在什么时机assumed -->add/forget/finishing 然后expired不用及继续往下看这里只是对schedulercache的初始化

** 来解释一下这个状态的变化首先
Initial(未分配podQueue) --Pop(),Scheduler()--> assumed --bingding-成功-> finishing(最终expired从队列中remove) --> successEvent
                                                 |
                                                 ----binding-失败-> forget(remove)<最终可能被backOff机制或是监听事件重新放入podQueue重复这个过程>
                                                 这时候pod如果启动成功变为Running状态,就会被Watch到,如果没有过期此时assumed状态变为add状态
//   +-------------------------------------------+  +----+
//   |                            Add            |  |    |
//   |                                           |  |    | Update
//   +      Assume                Add            v  v    |
//Initial +--------> Assumed +------------+---> Added <--+
//   ^                +   +               |       +
//   |                |   |               |       |
//   |                |   |           Add |       | Remove
//   |                |   |               |       |
//   |                |   |               +       |
//   +----------------+   +-----------> Expired   +----> Deleted
//         Forget             Expire
//

下一步是初始化ConfigFactory; - podListener/schedulerCache都引用了刚刚创建的schedulercache, - podQueue为FIFO的缓存器在之前的controller中可以知道FIFO是一个缓存器(一个周期内,同一个对象会被处理一次,最新的那个版本,而且不会处理删除操作<先add/delete这种情况是不会被处理的> - 各种Lisener,使用各种Informer.Listener()返回,这里就是各个Informer()创建sharedIndexInformer注册到了factory,并在这里返回其索引再下一步cache.NewIndexerInformer 是不是很熟悉,依然是controller里面一个公共方法,创建一个Controller,并返回其索引再看传入的参数c.createAssignedNonTerminatedPodLW 从字面上理解 已分配的终端的pod但是还没有成功或是失败的 ,另一个参数ResourceEventHandlerFuncs 传入的是 前面初始化的ConfigFactory.add/update/deleteToCache,其实就是将这些没有分配的pod进行 add/add-->update/add-->delete/assumed-->add/这样的操作 然后将返回的索引赋值给c.scheduledPodLister:已经分配的pod列表. 再下一步就是nodeInformer的初始化,注册到factory中,并传入handler,而这些handler实际上就是操作schedulerCache里面的node信息的 至此,schedulerConfig就创建完成了; 总结一下:

  • 1.创建schedulerCache:用于保存node和进行过分配的pod信息
  • 2.各个informer的创建及注册到factory过程,但是只有node有处理操作(对schedulerCache node进行操作)
  • 3.创建一个Controller模式,来监听已经分配的但未成功的pod ,将其存入schedulerCache里面
  • 4.初始化 podQueue (FIFO存储器<处理最新的一次,且忽略delete操作>)

接下来是回到CreateScheduler方法的 接下来先理清一下几个类的关系,ConfigFactory实现了Configurator接口,而schedulerConfigurator组合了ConfigFactory但是没有去实现Configurator接口,它和 ConfigFactory关系更像是继承关系. 所以接下来的调用过程scheduler.NewFromConfigurator-->schedulerConfigurator.create()-->schedulerConfigurator.CreateFromConfig()-->ConfigFactory.CreateFromConfig()

    f.Run()
    // TODO(resouer) use equivalence cache instead of nil here when #36238 get merged
    algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
    podBackoff := util.CreateDefaultPodBackoff()
    return &scheduler.Config{
        SchedulerCache: f.schedulerCache,
        // The scheduler only needs to consider schedulable nodes.
        NodeLister:          &nodePredicateLister{f.nodeLister},
        Algorithm:           algo,
        Binder:              &binder{f.client},
        PodConditionUpdater: &podConditionUpdater{f.client},
        NextPod: func() *v1.Pod {
            return f.getNextPod()
        },
        Error:          f.MakeDefaultErrorFunc(podBackoff, f.podQueue),
        StopEverything: f.StopEverything,
    }, nil

create()时回去加载 策略信息,可以通过configMap/policyFile两种方式进行加载 最终都到了CreateFromKeys(),f.Run这里 最开始的configfactory进行了run可以进去看一下

第一行,又是很熟悉的东西,这里再次使用Controller模式里面的reflect生产者,监听没有被分配过的pod 将其存入podQueue. 第二行,就是之前监听的 已经分配过的Controller进行Run的操作

cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &v1.Pod{}, f.podQueue, 0).RunUntil(f.StopEverything)

    // Begin populating scheduled pods.
    go f.scheduledPodPopulator.Run(f.StopEverything)

回到CreateFromKeys(),接下来就是初始化策略分配器,和scheduler参数,这里注意一点,就是backOff这里, 这个方法会一直请查询queue里面的pod,防止分配失败的pod丢失

至此,shceduler就初始化完成了,接下来startHTTP,scheduler.Run从这个run方法可以知道,这里就是podQueue的消费地方,然后通过策略器分配后将 pod置为assumed状态然后调用api进行binding操作,成功后置为finishing状态<等待30s过期,过期了此条记录生命周期完结>然后写入success event

2. pod分配规则及优先级分数

后面来讲讲,策略器按照什么算法进行分配pod的

首先在configurator里面引入了下面的包 _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" 进去看只有一行代码:

import (
    // Import defaults of algorithmprovider for initialization.
    _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
)

default中有一个init()函数,以及一些常量

  • 1.首先通过查询已经分配的pod索引来加载已经在使用的predicate/prioties
  • 2.其次注册默认的predicates<可以看到之前监听的pvc信息在这里被使用>
  • 2.1 NoVolumeZoneConflict: 没有卷区域冲突,翻译注释的意思是 >VolumeZonePredicate评估一个pod是否适合,因为它要求的卷 某些卷可能具有区域调度约束。 要求是任何卷区域标签必须与节点上的等效区域标签相匹配。 节点具有更多区域标签约束(例如,假设的复制卷可能允许区域范围的访问)是可以的。目前,这仅支持PersistentVolumeClaims,并且仅在绑定的PersistentVolume上查找标签。 使用在pod规范中内联声明的卷(即不使用PersistentVolume)可能更难,因为它需要在调度期间确定卷的区域,并且可能需要呼叫到云提供商。 似乎我们正在远离内联卷声明。

  • 2.2 MaxEBSVolumeCount:最大的

  • 2.3 MaxGCEPDVolumeCount
  • 2.4 MaxAzureDiskVolumeCount
  • 2.5 MatchInterPodAffinity
  • 2.6 NoDiskConflict
  • 2.7 GeneralPredicates
  • 2.8 PodToleratesNodeTaints
  • 2.9 CheckNodeMemoryPressure
  • 2.10 CheckNodeDiskPressure
  • 3.注册默认的Priorities/predicates -->AlgorithmProviderConfig --> algorithmProviderMap["default"]
  • 3.1 SelectorSpreadPriority
  • 3.2 InterPodAffinityPriority
  • 3.3 LeastRequestedPriority
  • 3.4 BalancedResourceAllocation
  • 3.5 NodePreferAvoidPodsPriority
  • 3.6 NodeAffinityPriority
  • 3.7 TaintTolerationPriority
  • 4.注册自动适配器Priorities/predicates -->AlgorithmProviderConfig --> algorithmProviderMap["ClusterAutoscalerProvider"]
  • 5.注册默认的fitPredicateMap
  • 5.1 PodFitsPorts
  • 5.2 PodFitsHostPorts
  • 5.3 PodFitsResources
  • 5.4 HostName
  • 5.5 MatchNodeSelector
  • 6.注册默认的priorityFunctionMap
  • 6.1 ServiceSpreadingPriority
  • 6.2 EqualPriority
  • 6.3 ImageLocalityPriority
  • 6.4 MostRequestedPriority

这些信息都初始化到了 plugin里面的常量里面去了,另外plugin更像是策略包和factory之间的适配器,负责它们之间的数据 传递.我们暂且把 predicates定义为"规则":满足规则的node才能被分配,Priorities定义为"优先级",通过这些优先级 来计算满足"规则"节点所得的优先级分数,最终根据分数来确定绑定的node.在这里可以自定义一些特殊需求的"规则"/'优先级" 并在 init函数里面注册到plugin常量池里面去,以达到二次开发目的.

入口

algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)

3. 二次开发:自定义规则及优先级算法

4.类图

4.1 nodeInformer类图

notify

4.2 scheduler config类图

notify

最后把nodeInformer类图及 scheduler配置主要类图贴出来