Skip to content

Kubernetes scheduler 源码阅读 #23

@LLLeon

Description

@LLLeon

Kubernetes scheduler 源码阅读

本文基于 Kubernetes 1.20 版本。

Kubernetes 调度器负责将 Pod 调度到集群内的节点上,它监听 API Server,查询还未分配 Node 的 Pod,然后根据调度策略为这些 Pod 分配节点(更新 Pod 的 NodeName 字段)。

0. 调度框架简介

调度框架是 Kubernetes Scheduler 的一种可插入架构,可以简化调度器的自定义。 它向现有的调度器增加了一组新的“插件” API。插件被编译到调度器程序中。 这些 API 允许大多数调度功能以插件的形式实现,同时使调度“核心”保持简单且可维护。

调度框架定义了一些扩展点。调度器插件注册后在一个或多个扩展点处被调用。 这些插件中的一些可以改变调度决策,而另一些仅用于提供信息。

每次调度一个 Pod 的尝试都分为两个阶段,即 调度周期绑定周期

  • 调度周期为 Pod 选择一个节点,绑定周期将该决策应用于集群。 调度周期和绑定周期一起被称为“调度上下文”。
  • 调度周期是串行运行的,而绑定周期可以并发运行。
  • 如果确定 Pod 不可调度或者存在内部错误,则可以终止调度周期或绑定周期。 Pod 将返回队列并重试。

scheduling-framework-extensions

以上简介内容来自官方文档,下面开始看源码。

Framework是一个接口,要实现此接口需要实现上面提到的各扩展点的方法:

framework 结构实现了 Framework 接口:

type framework struct {
	registry              Registry
	snapshotSharedLister  schedulerlisters.SharedLister
	waitingPods           *waitingPodsMap
	pluginNameToWeightMap map[string]int
	queueSortPlugins      []QueueSortPlugin
	preFilterPlugins      []PreFilterPlugin
	filterPlugins         []FilterPlugin
	preScorePlugins       []PreScorePlugin
	scorePlugins          []ScorePlugin
	reservePlugins        []ReservePlugin
	preBindPlugins        []PreBindPlugin
	bindPlugins           []BindPlugin
	postBindPlugins       []PostBindPlugin
	unreservePlugins      []UnreservePlugin
	permitPlugins         []PermitPlugin

	clientSet       clientset.Interface
	informerFactory informers.SharedInformerFactory
	volumeBinder    *volumebinder.VolumeBinder

	metricsRecorder *metricsRecorder

	// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
	// after the first failure.
	runAllFilters bool
}

1. 启动 kube-scheduler 进程

首先找到 scheduler 的入口:cmd/kube-scheduler/scheduler.go 中的 main 函数,代码编译后,可通过终端命令启动程序。

command := app.NewSchedulerCommand()
// ...
if err := command.Execute(); err != nil {
		os.Exit(1)
	}

进入下一个函数:cmd/kube-scheduler/app/server.go 中的 Run 函数:

Run 函数做的是基于给定的配置信息和 registryOptions 运行 scheduler,只有在出错或终端命令终止时退出:

// 因为 runCommand() 中设置了 context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

简单看一下 Scheduler 里面几个关键字段,先不必深入细节:

// 监控未调度的 Pod, 找到合适的节点, 并将绑定信息写回 API Server.
type Scheduler struct {
  // 通过 SchedulerCache 所做的变更会被 NodeLister 和 Algorithm 观察到.
	SchedulerCache internalcache.Cache
	Algorithm core.ScheduleAlgorithm
	podConditionUpdater podConditionUpdater
	
  // 用来驱逐 Pods 和更新抢占者 Pod 的 NominatedNode 字段.
	podPreemptor podPreemptor

  // 函数实现应该是阻塞的, 直到有可用的 Pod 才返回.
  // 此函数不使用 channel, 因为调度 Pod 会花一些时间, Pod 放在 channel 中可能会时间过长而 stale.
	NextPod func() *framework.PodInfo
	Error func(*framework.PodInfo, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

  // 处理 Pod PVC/PV 的绑定
	VolumeBinder *volumebinder.VolumeBinder

  // 是否禁用 Pod 抢占.
	DisablePreemption bool

  // 待调度 Pod 的队列, 后面会详细介绍
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map
	scheduledPodsHasSynced func() bool
}

回到主线,Run 函数:

// 初始化调度框架的插件注册表
// ...
// 创建 Scheduler 结构
sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
	)
// 准备事件广播器
// 设置 healthz 检查并启动该服务
// 另起一个 goroutine 运行 informers
// ...
// 运行 scheduler
sched.Run(ctx)

sched.Run 方法开始进行 Pod 调度:

// 执行监听和调度, 等待缓存同步完毕后, 阻塞的执行调度,直到 context cancel。
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
  // Pod 队列操作, 里面起了两个 goroutine
	sched.SchedulingQueue.Run()
  // sched.scheduleOne 实现 Pod 的调度逻辑
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

下面分开看 Pod 队列的操作和 Pod 调度。

2. 优先级队列 PriorityQueue

sched.SchedulingQueue.Run() 方法将 Pod 在不同的队列中进行移动。SchedulingQueue 是一个 interface,实现此接口的队列用来存储等待调度的 Pod。

实现此接口的结构是 PriorityQueue,它主要包含三个队列:

  • activeQ:调度器从 activeQ 队列寻找 Pod 来调度,队列最前面的 Pod 的优先级最高。
  • podBackoffQ:Pod 调度失败后放入此队列,里面的 Pod 按退避到期顺序 (即重试时间) 排序。在调度器从 activeQ 寻找 Pod 之前,会从 podBackoffQ 中弹出退避完成 (即到达重试时间) 的 Pod 进行重新调度。
  • unschedulableQ:是一个 map,用来存储那些已尝试并确定无法调度的 Pod。

简单看一下数据结构:

type PriorityQueue struct {
	lock sync.RWMutex // 此结构是非线程安全的
	
  // Heap 是一个 map+slice 实现的队列.
	activeQ *heap.Heap
	podBackoffQ *heap.Heap
	unschedulableQ *UnschedulablePodsMap
  
  // nominatedPods 是一个 map, 用来存储被提名的在节点上运行的 Pod.
	nominatedPods *nominatedPodMap
  // 表示调度周期的序号, 当有一个 Pod 从 activeQ 弹出时将加 1.
	schedulingCycle int64
  // 收到一个移动请求时, 缓存调度周期的序号. 
  // 当收到移动请求时, 如果尝试去调度在这个调度周期之前和之中的不可调度的 Pod, 它们将被放回到 activeQueue.
	moveRequestCycle int64
}

实际执行的是 PriorityQueue 的 Run 方法,它负责将 Pod 在三个队列中进行移动:

// wait.Until 函数来保证 flushBackoffQCompleted 和 flushUnschedulableQLeftover 两个方法失败时会不断重试.
func (p *PriorityQueue) Run() {
  // 每 1 秒执行一次 flushBackoffQCompleted 方法, 直到收到停止信号.
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
  // 每 30 秒执行一次 flushUnschedulableQLeftover 方法, 直到收到停止信号.
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

PriorityQueue.flushBackoffQCompleted 方法做的事情是把 BackoffQ 里面到达重试时间的 Pod 放回到 activeQ:

func (p *PriorityQueue) flushBackoffQCompleted() {
	// 省略加解锁代码(什么情况下会发生锁竞争?)
  for {
    rawPodInfo := p.podBackoffQ.Peek()
    
    // ...
    // 查看队列头部的 Pod 是否到达重新调度时间 (根据尝试次数计算), 未到达则 return.
    // 这里是先查看 Pod 符合 pop 的条件后才执行真正的 pop 动作.
    // ...
    
		_, err := p.podBackoffQ.Pop()
    // 入 activeQ 队列
		p.activeQ.Add(rawPodInfo)
    // 唤醒所有等待从 activeQ 队列 pop Pod 的 goroutine (sched.scheduleOne 方法会调用 sched.NextPod() 来获取 Pod)
		defer p.cond.Broadcast()
	}
}

PriorityQueue.flushUnschedulableQLeftover 方法会把在 unschedulableQ 队列中存放时间超过 unschedulableQTimeInterval (60 秒) 的 Pod 移到 podBackoffQ 或 activeQ。

func (p *PriorityQueue) flushUnschedulableQLeftover() {
  // 加锁
	// ...
  // 遍历 map 寻找到达重是时间的 Pod 放入 podsToMove
  // ...
  
	if len(podsToMove) > 0 {
    // 主要逻辑, 正在等待 backoff 时间的 Pod 会放入 podBackoffQ, 否则放入 activeQ
    // UnschedulableTimeout 表示一个事件, 用来统计 metrics
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

接下来看 Pod 真正的调度逻辑。

3. Pod 调度

Pod 的调度逻辑在 Scheduler.scheduleOne 方法中实现。大体思路是先找到合适的节点,缓存必要信息,假定 Pod 已经运行在该节点上,真正的绑定操作是异步进行的。

看一下它的调度逻辑:

  1. 从队列中获取一个待调度的 Pod。

  2. 获取此 Pod 所属调度器的 Profile,包括根据给定的配置创建的 Framework:

    // pkg/scheduler/profile/profile.go
    type Profile struct {
    	framework.Framework
    	Recorder events.EventRecorder
    }
    
    // pkg/scheduler/apis/config/types.go
    type KubeSchedulerProfile struct {
    	SchedulerName string
      // 调度器要用到的插件
    	Plugins *Plugins
    	PluginConfig []PluginConfig
    }
  3. 创建一个 CycleState 结构供插件读写数据,各插件的数据可以互相读写。

    // pkg/scheduler/framework/v1alpha1/cycle_state.go
    type CycleState struct {
    	mx      sync.RWMutex
    	storage map[StateKey]StateData
    	// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
    	recordPluginMetrics bool
    }
  4. 调用 sched.Algorithm.Schedule() 方法,经过调用一系列插件的过滤及打分,得到一个符合的节点。

    • 运行 Prefilter 插件集。
    • 调用 findNodesThatFitPod() 方法,经过 Filter 和 Extender 的过滤,得到符合条件的节点列表。
    • 运行 Prescore 插件集,都成功后进行后续逻辑。
    • 调用 prioritizeNodes() 方法执行 Score 插件集,也可以运行任何的 extender。
      • 每个插件的分数加在一起,就是一个节点的总分。
      • 返回 NodeScore 列表。
    • 选择一个得分最高的节点。
  5. 复制一份 Pod 信息,假定该 Pod 已经运行在选定的节点上,即使还没有绑定它们。这样调度器可以继续调度其它 Pod,而无需等待绑定操作完成(绑定操作是异步进行的)。

  6. 调用 AssumePodVolumes() 方法缓存 Pod 的节点选择。如果需要 PVC 绑定,则只在内存中缓存。

    • AssumePodVolumes 将把缓存的匹配 PV 和 PVC 提供给 podBindingCache 中的所选节点。
    • 用新的预绑定 PV 更新 pvCache。
    • 用新的带 annotations 集合的 PVC 更新 pvcCache。
    • 用为 PV 和 PVC 缓存的 API 更新再次更新 podBindingCache 。
  7. 运行 Reserve 插件。

  8. 调用 assume 方法来把 assumedPod 缓存起来,缓存前它会设置 assumed.Spec.NodeName = scheduleResult.SuggestedHost,即所谓的 Pod 绑定信息。

  9. 调用 RunPermitPlugins() 方法运行 Permit 插件集。Permit 插件用于防止或延迟 Pod 的绑定。

    • 如果返回的不是 Success 或 Wait,将不会继续执行剩下的 Permit 插件并返回错误。
    • 如果有任一插件返回的是 Wait,在所有插件运行完后,会创建一个 waitingPod(已经开始) 并将其放入 waitingPods map 里(后面会用到),随后返回。
    • 如果都返回 Success,则继续后面的异步绑定操作。
  10. 起一个 goroutine 进行绑定:

    • 执行 WaitOnPermit 方法在 Permit 阶段等待 Pod。
    • 绑定 Volumes。
      • 它使用前面假定的绑定更新 API,并等待 PV 控制器完成绑定操作。
      • 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 prebind 插件。同上面一样,失败后会触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 bind 操作。
      • 绑定的优先级:先执行 extender 再执行 framework 的 Bind 插件。其实就是把 Pod 与 Node 的绑定信息发送给 API Server 处理。
      • 绑定成功后,会调用 finishBinding() 方法使缓存的 Pod 过期。
    • 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 Postbind 插件。

至此,调度一个 Pod 的逻辑就梳理完毕了,不过,还有一些细节需要再梳理。

4. 参考

  1. 官方文档

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions